This source file includes the following definitions:
- ompi_osc_ucx_handle_incoming_post
- ompi_osc_ucx_fence
- ompi_osc_ucx_start
- ompi_osc_ucx_complete
- ompi_osc_ucx_post
- ompi_osc_ucx_wait
- ompi_osc_ucx_test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "ompi_config.h"
27
28 #include "ompi/mca/osc/osc.h"
29 #include "ompi/mca/osc/base/base.h"
30 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
31 #include "opal/mca/common/ucx/common_ucx.h"
32
33 #include "osc_ucx.h"
34
35 typedef struct ompi_osc_ucx_pending_post {
36 opal_list_item_t super;
37 int rank;
38 } ompi_osc_ucx_pending_post_t;
39
40 OBJ_CLASS_INSTANCE(ompi_osc_ucx_pending_post_t, opal_list_item_t, NULL, NULL);
41
42 static inline void ompi_osc_ucx_handle_incoming_post(ompi_osc_ucx_module_t *module, volatile uint64_t *post_ptr, int ranks_in_win_grp[], int grp_size) {
43 int i, post_rank = (*post_ptr) - 1;
44 ompi_osc_ucx_pending_post_t *pending_post = NULL;
45
46 (*post_ptr) = 0;
47
48 for (i = 0; i < grp_size; i++) {
49 if (post_rank == ranks_in_win_grp[i]) {
50 module->post_count++;
51 return;
52 }
53 }
54
55
56 pending_post = OBJ_NEW(ompi_osc_ucx_pending_post_t);
57 pending_post->rank = post_rank;
58 opal_list_append(&module->pending_posts, &pending_post->super);
59 }
60
61 int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) {
62 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
63 int ret = OMPI_SUCCESS;
64
65 if (module->epoch_type.access != NONE_EPOCH &&
66 module->epoch_type.access != FENCE_EPOCH) {
67 return OMPI_ERR_RMA_SYNC;
68 }
69
70 if (assert & MPI_MODE_NOSUCCEED) {
71 module->epoch_type.access = NONE_EPOCH;
72 } else {
73 module->epoch_type.access = FENCE_EPOCH;
74 }
75
76 if (!(assert & MPI_MODE_NOPRECEDE)) {
77 ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
78 if (ret != OMPI_SUCCESS) {
79 return ret;
80 }
81 }
82
83 return module->comm->c_coll->coll_barrier(module->comm,
84 module->comm->c_coll->coll_barrier_module);
85 }
86
87 int ompi_osc_ucx_start(struct ompi_group_t *group, int assert, struct ompi_win_t *win) {
88 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
89 int i, size, *ranks_in_grp = NULL, *ranks_in_win_grp = NULL;
90 ompi_group_t *win_group = NULL;
91 int ret = OMPI_SUCCESS;
92
93 if (module->epoch_type.access != NONE_EPOCH &&
94 module->epoch_type.access != FENCE_EPOCH) {
95 return OMPI_ERR_RMA_SYNC;
96 }
97
98 module->epoch_type.access = START_COMPLETE_EPOCH;
99
100 OBJ_RETAIN(group);
101 module->start_group = group;
102 size = ompi_group_size(module->start_group);
103
104 ranks_in_grp = malloc(sizeof(int) * size);
105 ranks_in_win_grp = malloc(sizeof(int) * ompi_comm_size(module->comm));
106
107 for (i = 0; i < size; i++) {
108 ranks_in_grp[i] = i;
109 }
110
111 ret = ompi_comm_group(module->comm, &win_group);
112 if (ret != OMPI_SUCCESS) {
113 return OMPI_ERROR;
114 }
115
116 ret = ompi_group_translate_ranks(module->start_group, size, ranks_in_grp,
117 win_group, ranks_in_win_grp);
118 if (ret != OMPI_SUCCESS) {
119 return OMPI_ERROR;
120 }
121
122 if ((assert & MPI_MODE_NOCHECK) == 0) {
123 ompi_osc_ucx_pending_post_t *pending_post, *next;
124
125
126 OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_ucx_pending_post_t) {
127 for (i = 0; i < size; i++) {
128 if (pending_post->rank == ranks_in_win_grp[i]) {
129 opal_list_remove_item(&module->pending_posts, &pending_post->super);
130 OBJ_RELEASE(pending_post);
131 module->post_count++;
132 break;
133 }
134 }
135 }
136
137
138 while (module->post_count != size) {
139 for (i = 0; i < OMPI_OSC_UCX_POST_PEER_MAX; i++) {
140 if (0 == module->state.post_state[i]) {
141 continue;
142 }
143
144 ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[i]), ranks_in_win_grp, size);
145 }
146 opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
147 }
148
149 module->post_count = 0;
150 }
151
152 free(ranks_in_grp);
153 ompi_group_free(&win_group);
154
155 module->start_grp_ranks = ranks_in_win_grp;
156
157 return ret;
158 }
159
160 int ompi_osc_ucx_complete(struct ompi_win_t *win) {
161 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
162 int i, size;
163 int ret = OMPI_SUCCESS;
164
165 if (module->epoch_type.access != START_COMPLETE_EPOCH) {
166 return OMPI_ERR_RMA_SYNC;
167 }
168
169 module->epoch_type.access = NONE_EPOCH;
170
171 ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_WORKER, 0);
172 if (ret != OMPI_SUCCESS) {
173 return ret;
174 }
175
176 size = ompi_group_size(module->start_group);
177 for (i = 0; i < size; i++) {
178 uint64_t remote_addr = module->state_addrs[module->start_grp_ranks[i]] + OSC_UCX_STATE_COMPLETE_COUNT_OFFSET;
179
180 ret = opal_common_ucx_wpmem_post(module->mem, UCP_ATOMIC_POST_OP_ADD,
181 1, module->start_grp_ranks[i], sizeof(uint64_t),
182 remote_addr);
183 if (ret != OMPI_SUCCESS) {
184 OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_post failed: %d", ret);
185 }
186
187 ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP,
188 module->start_grp_ranks[i]);
189 if (ret != OMPI_SUCCESS) {
190 return ret;
191 }
192 }
193
194 OBJ_RELEASE(module->start_group);
195 module->start_group = NULL;
196 free(module->start_grp_ranks);
197
198 return ret;
199 }
200
201 int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t *win) {
202 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
203 int ret = OMPI_SUCCESS;
204
205 if (module->epoch_type.exposure != NONE_EPOCH) {
206 return OMPI_ERR_RMA_SYNC;
207 }
208
209 OBJ_RETAIN(group);
210 module->post_group = group;
211
212 if ((assert & MPI_MODE_NOCHECK) == 0) {
213 int i, j, size;
214 ompi_group_t *win_group = NULL;
215 int *ranks_in_grp = NULL, *ranks_in_win_grp = NULL;
216 int myrank = ompi_comm_rank(module->comm);
217
218 size = ompi_group_size(module->post_group);
219 ranks_in_grp = malloc(sizeof(int) * size);
220 ranks_in_win_grp = malloc(sizeof(int) * ompi_comm_size(module->comm));
221
222 for (i = 0; i < size; i++) {
223 ranks_in_grp[i] = i;
224 }
225
226 ret = ompi_comm_group(module->comm, &win_group);
227 if (ret != OMPI_SUCCESS) {
228 return OMPI_ERROR;
229 }
230
231 ret = ompi_group_translate_ranks(module->post_group, size, ranks_in_grp,
232 win_group, ranks_in_win_grp);
233 if (ret != OMPI_SUCCESS) {
234 return OMPI_ERROR;
235 }
236
237 for (i = 0; i < size; i++) {
238 uint64_t remote_addr = module->state_addrs[ranks_in_win_grp[i]] + OSC_UCX_STATE_POST_INDEX_OFFSET;
239 uint64_t curr_idx = 0, result = 0;
240
241
242 ret = opal_common_ucx_wpmem_fetch(module->mem, UCP_ATOMIC_FETCH_OP_FADD,
243 1, ranks_in_win_grp[i], &result,
244 sizeof(result), remote_addr);
245 if (ret != OMPI_SUCCESS) {
246 return OMPI_ERROR;
247 }
248
249 curr_idx = result & (OMPI_OSC_UCX_POST_PEER_MAX - 1);
250
251 remote_addr = module->state_addrs[ranks_in_win_grp[i]] + OSC_UCX_STATE_POST_STATE_OFFSET + sizeof(uint64_t) * curr_idx;
252
253
254 do {
255 ret = opal_common_ucx_wpmem_cmpswp(module->mem, 0, result,
256 myrank + 1, &result, sizeof(result),
257 remote_addr);
258 if (ret != OMPI_SUCCESS) {
259 return OMPI_ERROR;
260 }
261
262 if (result == 0)
263 break;
264
265
266 for (j = 0; j < OMPI_OSC_UCX_POST_PEER_MAX; j++) {
267
268 if (0 == module->state.post_state[j]) {
269 continue;
270 }
271
272 ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[j]), NULL, 0);
273 }
274
275 ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
276 usleep(100);
277 } while (1);
278 }
279
280 free(ranks_in_grp);
281 free(ranks_in_win_grp);
282 ompi_group_free(&win_group);
283 }
284
285 module->epoch_type.exposure = POST_WAIT_EPOCH;
286
287 return ret;
288 }
289
290 int ompi_osc_ucx_wait(struct ompi_win_t *win) {
291 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
292 int size;
293
294 if (module->epoch_type.exposure != POST_WAIT_EPOCH) {
295 return OMPI_ERR_RMA_SYNC;
296 }
297
298 size = ompi_group_size(module->post_group);
299
300 while (module->state.complete_count != (uint64_t)size) {
301
302 opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
303 }
304
305 module->state.complete_count = 0;
306
307 OBJ_RELEASE(module->post_group);
308 module->post_group = NULL;
309
310 module->epoch_type.exposure = NONE_EPOCH;
311
312 return OMPI_SUCCESS;
313 }
314
315 int ompi_osc_ucx_test(struct ompi_win_t *win, int *flag) {
316 ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
317 int size;
318
319 if (module->epoch_type.exposure != POST_WAIT_EPOCH) {
320 return OMPI_ERR_RMA_SYNC;
321 }
322
323 size = ompi_group_size(module->post_group);
324
325 opal_progress();
326
327 if (module->state.complete_count == (uint64_t)size) {
328 OBJ_RELEASE(module->post_group);
329 module->post_group = NULL;
330
331 module->state.complete_count = 0;
332
333 module->epoch_type.exposure = NONE_EPOCH;
334 *flag = 1;
335 } else {
336 *flag = 0;
337 }
338
339 return OMPI_SUCCESS;
340 }