This source file includes the following definitions.
- start_shared
- end_shared
- start_exclusive
- end_exclusive
- ompi_osc_ucx_lock
- ompi_osc_ucx_unlock
- ompi_osc_ucx_lock_all
- ompi_osc_ucx_unlock_all
- ompi_osc_ucx_sync
- ompi_osc_ucx_flush
- ompi_osc_ucx_flush_all
- ompi_osc_ucx_flush_local
- ompi_osc_ucx_flush_local_all
1
2
3
4
5
6
7
8
9
10 #include "ompi_config.h"
11
12 #include "ompi/mca/osc/osc.h"
13 #include "ompi/mca/osc/base/base.h"
14 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
15 #include "opal/mca/common/ucx/common_ucx.h"
16
17 #include "osc_ucx.h"
18
/*
 * Class instance for ompi_osc_ucx_lock_t, registered with opal_object_t as
 * the second macro argument. Objects of this class are created with OBJ_NEW()
 * and dropped with OBJ_RELEASE() by the lock/unlock routines in this file.
 * NOTE(review): opal_object_t is presumably the parent class and the two NULL
 * arguments the (absent) constructor/destructor hooks -- confirm against the
 * OBJ_CLASS_INSTANCE definition.
 */
19 OBJ_CLASS_INSTANCE(ompi_osc_ucx_lock_t, opal_object_t, NULL, NULL);
20
21 static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
22 uint64_t result_value = -1;
23 uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
24 int ret = OMPI_SUCCESS;
25
26 while (true) {
27 ret = opal_common_ucx_wpmem_fetch(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD, 1,
28 target, &result_value, sizeof(result_value),
29 remote_addr);
30 if (OMPI_SUCCESS != ret) {
31 return ret;
32 }
33
34 assert((int64_t)result_value >= 0);
35 if (result_value >= TARGET_LOCK_EXCLUSIVE) {
36 ret = opal_common_ucx_wpmem_post(module->state_mem,
37 UCP_ATOMIC_POST_OP_ADD, (-1), target,
38 sizeof(uint64_t), remote_addr);
39 if (OMPI_SUCCESS != ret) {
40 return ret;
41 }
42 } else {
43 break;
44 }
45 ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
46 }
47
48 return ret;
49 }
50
51 static inline int end_shared(ompi_osc_ucx_module_t *module, int target) {
52 uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
53 return opal_common_ucx_wpmem_post(module->state_mem, UCP_ATOMIC_POST_OP_ADD,
54 (-1), target, sizeof(uint64_t), remote_addr);
55 }
56
57 static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
58 uint64_t result_value = -1;
59 uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
60 int ret = OMPI_SUCCESS;
61
62 for (;;) {
63 ret = opal_common_ucx_wpmem_cmpswp(module->state_mem,
64 TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE,
65 target, &result_value, sizeof(result_value),
66 remote_addr);
67 if (OMPI_SUCCESS != ret) {
68 return ret;
69 }
70 if (result_value == TARGET_LOCK_UNLOCKED) {
71 return OMPI_SUCCESS;
72 }
73
74 ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
75 }
76 }
77
78 static inline int end_exclusive(ompi_osc_ucx_module_t *module, int target) {
79 uint64_t result_value = 0;
80 uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
81 int ret = OMPI_SUCCESS;
82
83 ret = opal_common_ucx_wpmem_fetch(module->state_mem,
84 UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
85 target, &result_value, sizeof(result_value),
86 remote_addr);
87 if (OMPI_SUCCESS != ret) {
88 return ret;
89 }
90
91 assert(result_value >= TARGET_LOCK_EXCLUSIVE);
92
93 return ret;
94 }
95
96 int ompi_osc_ucx_lock(int lock_type, int target, int assert, struct ompi_win_t *win) {
97 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
98 ompi_osc_ucx_lock_t *lock = NULL;
99 ompi_osc_ucx_epoch_t original_epoch = module->epoch_type.access;
100 int ret = OMPI_SUCCESS;
101
102 if (module->lock_count == 0) {
103 if (module->epoch_type.access != NONE_EPOCH &&
104 module->epoch_type.access != FENCE_EPOCH) {
105 return OMPI_ERR_RMA_SYNC;
106 }
107 } else {
108 ompi_osc_ucx_lock_t *item = NULL;
109 assert(module->epoch_type.access == PASSIVE_EPOCH);
110 opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &item);
111 if (item != NULL) {
112 return OMPI_ERR_RMA_SYNC;
113 }
114 }
115
116 module->epoch_type.access = PASSIVE_EPOCH;
117 module->lock_count++;
118 assert(module->lock_count <= ompi_comm_size(module->comm));
119
120 lock = OBJ_NEW(ompi_osc_ucx_lock_t);
121 lock->target_rank = target;
122
123 if ((assert & MPI_MODE_NOCHECK) == 0) {
124 lock->is_nocheck = false;
125 if (lock_type == MPI_LOCK_EXCLUSIVE) {
126 ret = start_exclusive(module, target);
127 lock->type = LOCK_EXCLUSIVE;
128 } else {
129 ret = start_shared(module, target);
130 lock->type = LOCK_SHARED;
131 }
132 } else {
133 lock->is_nocheck = true;
134 }
135
136 if (ret == OMPI_SUCCESS) {
137 opal_hash_table_set_value_uint32(&module->outstanding_locks, (uint32_t)target, (void *)lock);
138 } else {
139 OBJ_RELEASE(lock);
140 module->epoch_type.access = original_epoch;
141 }
142
143 return ret;
144 }
145
146 int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) {
147 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
148 ompi_osc_ucx_lock_t *lock = NULL;
149 int ret = OMPI_SUCCESS;
150
151 if (module->epoch_type.access != PASSIVE_EPOCH) {
152 return OMPI_ERR_RMA_SYNC;
153 }
154
155 opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &lock);
156 if (lock == NULL) {
157 return OMPI_ERR_RMA_SYNC;
158 }
159
160 opal_hash_table_remove_value_uint32(&module->outstanding_locks,
161 (uint32_t)target);
162
163 ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target);
164 if (ret != OMPI_SUCCESS) {
165 return ret;
166 }
167
168 if (lock->is_nocheck == false) {
169 if (lock->type == LOCK_EXCLUSIVE) {
170 ret = end_exclusive(module, target);
171 } else {
172 ret = end_shared(module, target);
173 }
174 }
175
176 OBJ_RELEASE(lock);
177
178 module->lock_count--;
179 assert(module->lock_count >= 0);
180 if (module->lock_count == 0) {
181 module->epoch_type.access = NONE_EPOCH;
182 }
183
184 return ret;
185 }
186
187 int ompi_osc_ucx_lock_all(int assert, struct ompi_win_t *win) {
188 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
189 int ret = OMPI_SUCCESS;
190
191 if (module->epoch_type.access != NONE_EPOCH &&
192 module->epoch_type.access != FENCE_EPOCH) {
193 return OMPI_ERR_RMA_SYNC;
194 }
195
196 module->epoch_type.access = PASSIVE_ALL_EPOCH;
197
198 if (0 == (assert & MPI_MODE_NOCHECK)) {
199 int i, comm_size;
200 module->lock_all_is_nocheck = false;
201 comm_size = ompi_comm_size(module->comm);
202 for (i = 0; i < comm_size; i++) {
203 ret = start_shared(module, i);
204 if (ret != OMPI_SUCCESS) {
205 int j;
206 for (j = 0; j < i; j++) {
207 end_shared(module, j);
208 }
209 return ret;
210 }
211 }
212 } else {
213 module->lock_all_is_nocheck = true;
214 }
215 assert(OMPI_SUCCESS == ret);
216 return OMPI_SUCCESS;
217 }
218
219 int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) {
220 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*)win->w_osc_module;
221 int comm_size = ompi_comm_size(module->comm);
222 int ret = OMPI_SUCCESS;
223
224 if (module->epoch_type.access != PASSIVE_ALL_EPOCH) {
225 return OMPI_ERR_RMA_SYNC;
226 }
227
228 assert(module->lock_count == 0);
229
230 ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
231 if (ret != OMPI_SUCCESS) {
232 return ret;
233 }
234
235 if (!module->lock_all_is_nocheck) {
236 int i;
237 for (i = 0; i < comm_size; i++) {
238 ret |= end_shared(module, i);
239 }
240 }
241
242 module->epoch_type.access = NONE_EPOCH;
243
244 return ret;
245 }
246
247 int ompi_osc_ucx_sync(struct ompi_win_t *win) {
248 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
249 int ret = OMPI_SUCCESS;
250
251 if (module->epoch_type.access != PASSIVE_EPOCH &&
252 module->epoch_type.access != PASSIVE_ALL_EPOCH) {
253 return OMPI_ERR_RMA_SYNC;
254 }
255
256 opal_atomic_mb();
257
258 ret = opal_common_ucx_wpmem_fence(module->mem);
259 if (ret != OMPI_SUCCESS) {
260 OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
261 }
262
263 return ret;
264 }
265
266 int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) {
267 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
268 int ret = OMPI_SUCCESS;
269
270 if (module->epoch_type.access != PASSIVE_EPOCH &&
271 module->epoch_type.access != PASSIVE_ALL_EPOCH) {
272 return OMPI_ERR_RMA_SYNC;
273 }
274
275 ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target);
276 if (ret != OMPI_SUCCESS) {
277 return ret;
278 }
279
280 return OMPI_SUCCESS;
281 }
282
283 int ompi_osc_ucx_flush_all(struct ompi_win_t *win) {
284 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
285 int ret = OMPI_SUCCESS;
286
287 if (module->epoch_type.access != PASSIVE_EPOCH &&
288 module->epoch_type.access != PASSIVE_ALL_EPOCH) {
289 return OMPI_ERR_RMA_SYNC;
290 }
291
292 ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
293 if (ret != OMPI_SUCCESS) {
294 return ret;
295 }
296
297 return OMPI_SUCCESS;
298 }
299
/*
 * Local-flush entry point for a single target.
 *
 * Currently a straight delegation to ompi_osc_ucx_flush(), so it has the
 * same checks and return values as that function.
 */
int ompi_osc_ucx_flush_local(int target, struct ompi_win_t *win) {
    int rc = ompi_osc_ucx_flush(target, win);

    return rc;
}
305
/*
 * Local-flush entry point for all targets.
 *
 * Currently a straight delegation to ompi_osc_ucx_flush_all(), so it has the
 * same checks and return values as that function.
 */
int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win) {
    int rc = ompi_osc_ucx_flush_all(win);

    return rc;
}