This source file includes the following definitions:
- ompi_osc_rdma_pending_op_construct
- ompi_osc_rdma_pending_op_destruct
- ompi_osc_rdma_atomic_complete
- compare_ranks
- ompi_osc_rdma_get_peers
- ompi_osc_rdma_release_peers
- ompi_osc_rdma_handle_post
- ompi_osc_rdma_check_posts
- ompi_osc_rdma_post_peer
- ompi_osc_rdma_post_atomic
- ompi_osc_rdma_start_atomic
- ompi_osc_rdma_complete_atomic
- ompi_osc_rdma_wait_atomic
- ompi_osc_rdma_test_atomic
- ompi_osc_rdma_fence_atomic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include "ompi_config.h"
28
29 #include "osc_rdma.h"
30 #include "osc_rdma_frag.h"
31 #include "osc_rdma_active_target.h"
32
33 #include "mpi.h"
34 #include "opal/threads/mutex.h"
35 #include "ompi/communicator/communicator.h"
36 #include "ompi/mca/osc/base/base.h"
37
38
39
40
41
42
43
44 struct ompi_osc_rdma_pending_post_t {
45 opal_list_item_t super;
46 int rank;
47 };
48 typedef struct ompi_osc_rdma_pending_post_t ompi_osc_rdma_pending_post_t;
49
50 static OBJ_CLASS_INSTANCE(ompi_osc_rdma_pending_post_t, opal_list_item_t, NULL, NULL);
51
52 static void ompi_osc_rdma_pending_op_construct (ompi_osc_rdma_pending_op_t *pending_op)
53 {
54 pending_op->op_frag = NULL;
55 pending_op->op_buffer = NULL;
56 pending_op->op_result = NULL;
57 pending_op->op_complete = false;
58 pending_op->cbfunc = NULL;
59 pending_op->module = NULL;
60 }
61
62 static void ompi_osc_rdma_pending_op_destruct (ompi_osc_rdma_pending_op_t *pending_op)
63 {
64 if (NULL != pending_op->op_frag) {
65 ompi_osc_rdma_frag_complete (pending_op->op_frag);
66 }
67
68 if (NULL != pending_op->module) {
69 (void) opal_atomic_fetch_add_32 (&pending_op->module->pending_ops, -1);
70 }
71
72 ompi_osc_rdma_pending_op_construct (pending_op);
73 }
74
75 OBJ_CLASS_INSTANCE(ompi_osc_rdma_pending_op_t, opal_list_item_t,
76 ompi_osc_rdma_pending_op_construct,
77 ompi_osc_rdma_pending_op_destruct);
78
79
80
81
82 void ompi_osc_rdma_atomic_complete (mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint,
83 void *local_address, mca_btl_base_registration_handle_t *local_handle,
84 void *context, void *data, int status)
85 {
86 ompi_osc_rdma_pending_op_t *pending_op = (ompi_osc_rdma_pending_op_t *) context;
87
88 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "pending atomic %p complete with status %d", (void*)pending_op, status);
89
90 if (pending_op->op_result) {
91 memmove (pending_op->op_result, pending_op->op_buffer, pending_op->op_size);
92 }
93
94 if (NULL != pending_op->cbfunc) {
95 pending_op->cbfunc (pending_op->cbdata, pending_op->cbcontext, status);
96 }
97
98 if (NULL != pending_op->op_frag) {
99 ompi_osc_rdma_frag_complete (pending_op->op_frag);
100 pending_op->op_frag = NULL;
101 }
102
103 pending_op->op_complete = true;
104 OBJ_RELEASE(pending_op);
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
 * qsort() comparator for arrays of int ranks.
 *
 * Returns -1, 0 or 1 when the rank at @ptra is less than, equal to or
 * greater than the rank at @ptrb. The result is built from two
 * comparisons instead of a subtraction so extreme values cannot overflow.
 */
static int compare_ranks (const void *ptra, const void *ptrb)
{
    const int lhs = *(const int *) ptra;
    const int rhs = *(const int *) ptrb;

    return (lhs > rhs) - (lhs < rhs);
}
133
134
135
136
137
138
139
140
141
142
143
144
145 static ompi_osc_rdma_peer_t **ompi_osc_rdma_get_peers (ompi_osc_rdma_module_t *module, ompi_group_t *sub_group)
146 {
147 int size = ompi_group_size(sub_group);
148 ompi_osc_rdma_peer_t **peers;
149 int *ranks1, *ranks2;
150 int ret;
151
152 ranks1 = calloc (size, sizeof(int));
153 ranks2 = calloc (size, sizeof(int));
154 peers = calloc (size, sizeof (ompi_osc_rdma_peer_t *));
155 if (NULL == ranks1 || NULL == ranks2 || NULL == peers) {
156 free (ranks1);
157 free (ranks2);
158 free (peers);
159 return NULL;
160 }
161
162 for (int i = 0 ; i < size ; ++i) {
163 ranks1[i] = i;
164 }
165
166 ret = ompi_group_translate_ranks (sub_group, size, ranks1, module->comm->c_local_group,
167 ranks2);
168 free (ranks1);
169 if (OMPI_SUCCESS != ret) {
170 free (ranks2);
171 free (peers);
172 return NULL;
173 }
174
175 qsort (ranks2, size, sizeof (int), compare_ranks);
176 for (int i = 0 ; i < size ; ++i) {
177 peers[i] = ompi_osc_rdma_module_peer (module, ranks2[i]);
178 if (NULL == peers[i]) {
179 free (peers);
180 peers = NULL;
181 break;
182 }
183
184 OBJ_RETAIN(peers[i]);
185 }
186 free (ranks2);
187
188 return peers;
189 }
190
191 static void ompi_osc_rdma_release_peers (ompi_osc_rdma_peer_t **peers, int npeers)
192 {
193 for (int i = 0 ; i < npeers ; ++i) {
194 OBJ_RELEASE(peers[i]);
195 }
196
197 free (peers);
198 }
199
200 static void ompi_osc_rdma_handle_post (ompi_osc_rdma_module_t *module, int rank, ompi_osc_rdma_peer_t **peers, int npeers) {
201 ompi_osc_rdma_state_t *state = module->state;
202 ompi_osc_rdma_pending_post_t *pending_post;
203
204
205 for (int j = 0 ; j < npeers ; ++j) {
206 if (rank == peers[j]->rank) {
207 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "got expected post from %d. still expecting posts from %d processes",
208 rank, (int) (npeers - state->num_post_msgs - 1));
209
210 ompi_osc_rdma_counter_add (&state->num_post_msgs, 1);
211 return;
212 }
213 }
214
215
216 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "got unexpected post from %d . queueing for later", rank);
217 pending_post = OBJ_NEW(ompi_osc_rdma_pending_post_t);
218 pending_post->rank = rank;
219 OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_posts, &pending_post->super));
220 }
221
222 static void ompi_osc_rdma_check_posts (ompi_osc_rdma_module_t *module)
223 {
224 ompi_osc_rdma_state_t *state = module->state;
225 ompi_osc_rdma_sync_t *sync = &module->all_sync;
226 int count = 0;
227
228 if (OMPI_OSC_RDMA_SYNC_TYPE_PSCW == sync->type) {
229 count = sync->num_peers;
230 }
231
232 for (int i = 0 ; i < OMPI_OSC_RDMA_POST_PEER_MAX ; ++i) {
233
234 if (0 == state->post_peers[i]) {
235 continue;
236 }
237
238 ompi_osc_rdma_handle_post (module, state->post_peers[i] - 1, sync->peer_list.peers, count);
239 state->post_peers[i] = 0;
240 }
241 }
242
243 static int ompi_osc_rdma_post_peer (ompi_osc_rdma_module_t *module, ompi_osc_rdma_peer_t *peer)
244 {
245 uint64_t target = (uint64_t) (intptr_t) peer->state + offsetof (ompi_osc_rdma_state_t, post_index);
246 ompi_osc_rdma_lock_t post_index, result, _tmp_value;
247 int my_rank = ompi_comm_rank (module->comm);
248 int ret;
249
250 if (peer->rank == my_rank) {
251 ompi_osc_rdma_handle_post (module, my_rank, NULL, 0);
252 return OMPI_SUCCESS;
253 }
254
255
256 if (!ompi_osc_rdma_peer_local_state (peer)) {
257 ret = ompi_osc_rdma_lock_btl_fop (module, peer, target, MCA_BTL_ATOMIC_ADD, 1, &post_index, true);
258 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
259 return ret;
260 }
261 } else {
262 post_index = ompi_osc_rdma_counter_add ((osc_rdma_atomic_counter_t *) (intptr_t) target, 1) - 1;
263 }
264
265 post_index &= OMPI_OSC_RDMA_POST_PEER_MAX - 1;
266
267 target = (uint64_t) (intptr_t) peer->state + offsetof (ompi_osc_rdma_state_t, post_peers) +
268 sizeof (osc_rdma_counter_t) * post_index;
269
270 do {
271 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "attempting to post to index %d @ rank %d", (int)post_index, peer->rank);
272
273 _tmp_value = 0;
274
275
276 if (!ompi_osc_rdma_peer_local_state (peer)) {
277 ret = ompi_osc_rdma_lock_btl_cswap (module, peer, target, 0, 1 + (int64_t) my_rank, &result);
278 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
279 return ret;
280 }
281 } else {
282 result = !ompi_osc_rdma_lock_compare_exchange ((osc_rdma_atomic_counter_t *) target, &_tmp_value,
283 1 + (osc_rdma_counter_t) my_rank);
284 }
285
286 if (OPAL_LIKELY(0 == result)) {
287 break;
288 }
289
290
291 ompi_osc_rdma_check_posts (module);
292
293
294 nanosleep (&(struct timespec) {.tv_sec = 0, .tv_nsec = 100}, NULL);
295 } while (1);
296
297 return OMPI_SUCCESS;
298 }
299
300 int ompi_osc_rdma_post_atomic (ompi_group_t *group, int assert, ompi_win_t *win)
301 {
302 ompi_osc_rdma_module_t *module = GET_MODULE(win);
303 ompi_osc_rdma_peer_t **peers;
304 ompi_osc_rdma_state_t *state = module->state;
305 int ret = OMPI_SUCCESS;
306
307 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "post: %p, %d, %s", (void*) group, assert, win->w_name);
308
309
310 if (module->pw_group) {
311 return OMPI_ERR_RMA_SYNC;
312 }
313
314
315 OBJ_RETAIN(group);
316
317 OPAL_THREAD_LOCK(&module->lock);
318
319
320 if (NULL != module->pw_group) {
321 OPAL_THREAD_UNLOCK(&(module->lock));
322 return OMPI_ERR_RMA_SYNC;
323 }
324 module->pw_group = group;
325
326
327
328
329 state->num_complete_msgs = 0;
330 OPAL_THREAD_UNLOCK(&module->lock);
331
332 if ((assert & MPI_MODE_NOCHECK) || 0 == ompi_group_size (group)) {
333 return OMPI_SUCCESS;
334 }
335
336
337 peers = ompi_osc_rdma_get_peers (module, module->pw_group);
338 if (OPAL_UNLIKELY(NULL == peers)) {
339 return OMPI_ERR_OUT_OF_RESOURCE;
340 }
341
342 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "sending post messages");
343
344
345 for (int i = 0 ; i < ompi_group_size(module->pw_group) ; ++i) {
346 ret = ompi_osc_rdma_post_peer (module, peers[i]);
347 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
348 break;
349 }
350 }
351
352 ompi_osc_rdma_release_peers (peers, ompi_group_size(module->pw_group));
353
354 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "post complete");
355
356 return ret;
357 }
358
359 int ompi_osc_rdma_start_atomic (ompi_group_t *group, int assert, ompi_win_t *win)
360 {
361 ompi_osc_rdma_module_t *module = GET_MODULE(win);
362 ompi_osc_rdma_pending_post_t *pending_post, *next;
363 ompi_osc_rdma_state_t *state = module->state;
364 ompi_osc_rdma_sync_t *sync = &module->all_sync;
365 int group_size = ompi_group_size (group);
366
367 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "start: %p, %d, %s", (void*) group, assert,
368 win->w_name);
369
370 OPAL_THREAD_LOCK(&module->lock);
371
372
373 if (ompi_osc_rdma_access_epoch_active (module)) {
374 OPAL_THREAD_UNLOCK(&module->lock);
375 return OMPI_ERR_RMA_SYNC;
376 }
377
378
379 sync->num_peers = ompi_group_size (group);
380 sync->sync.pscw.group = group;
381
382
383 state->num_post_msgs = 0;
384
385 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "start group size %d", sync->num_peers);
386
387 if (0 == ompi_group_size (group)) {
388
389 OPAL_THREAD_UNLOCK(&module->lock);
390 return OMPI_SUCCESS;
391 }
392
393 opal_atomic_wmb ();
394
395 sync->type = OMPI_OSC_RDMA_SYNC_TYPE_PSCW;
396
397
398
399 sync->epoch_active = true;
400
401
402 sync->peer_list.peers = ompi_osc_rdma_get_peers (module, group);
403 if (NULL == sync->peer_list.peers) {
404 OPAL_THREAD_UNLOCK(&module->lock);
405 return OMPI_ERR_OUT_OF_RESOURCE;
406 }
407
408
409 OBJ_RETAIN(group);
410
411 if (!(assert & MPI_MODE_NOCHECK)) {
412
413 OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_rdma_pending_post_t) {
414 for (int i = 0 ; i < group_size ; ++i) {
415 ompi_osc_rdma_peer_t *peer = sync->peer_list.peers[i];
416
417 if (pending_post->rank == peer->rank) {
418 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "found queued post from %d. still expecting posts "
419 "from %d processes", peer->rank, (int) (group_size - state->num_post_msgs - 1));
420 opal_list_remove_item (&module->pending_posts, &pending_post->super);
421 OBJ_RELEASE(pending_post);
422 ompi_osc_rdma_counter_add (&state->num_post_msgs, 1);
423 break;
424 }
425 }
426 }
427
428
429 while (state->num_post_msgs != group_size) {
430 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "waiting for post messages. have %d of %d",
431 (int) state->num_post_msgs, group_size);
432 ompi_osc_rdma_check_posts (module);
433 ompi_osc_rdma_progress (module);
434 }
435 } else {
436 state->num_post_msgs = group_size;
437 }
438
439 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "start complete");
440
441 OPAL_THREAD_UNLOCK(&module->lock);
442 return OMPI_SUCCESS;
443 }
444
445 int ompi_osc_rdma_complete_atomic (ompi_win_t *win)
446 {
447 ompi_osc_rdma_module_t *module = GET_MODULE(win);
448 ompi_osc_rdma_sync_t *sync = &module->all_sync;
449 ompi_osc_rdma_peer_t **peers;
450 ompi_group_t *group;
451 int group_size;
452 int ret __opal_attribute_unused__;
453
454 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "complete: %s", win->w_name);
455
456 OPAL_THREAD_LOCK(&module->lock);
457 if (OMPI_OSC_RDMA_SYNC_TYPE_PSCW != sync->type) {
458 OPAL_THREAD_UNLOCK(&module->lock);
459 return OMPI_ERR_RMA_SYNC;
460 }
461
462
463 group = sync->sync.pscw.group;
464 group_size = sync->num_peers;
465 sync->type = OMPI_OSC_RDMA_SYNC_TYPE_NONE;
466 sync->epoch_active = false;
467
468
469 OBJ_RELEASE(group);
470
471 peers = sync->peer_list.peers;
472 if (NULL == peers) {
473
474 OPAL_THREAD_UNLOCK(&(module->lock));
475 OBJ_RELEASE(group);
476 return OMPI_SUCCESS;
477 }
478
479 sync->peer_list.peers = NULL;
480
481 OPAL_THREAD_UNLOCK(&(module->lock));
482
483 ompi_osc_rdma_sync_rdma_complete (sync);
484
485
486 for (int i = 0 ; i < group_size ; ++i) {
487 ompi_osc_rdma_peer_t *peer = peers[i];
488 intptr_t target = (intptr_t) peer->state + offsetof (ompi_osc_rdma_state_t, num_complete_msgs);
489
490 if (!ompi_osc_rdma_peer_local_state (peer)) {
491 ret = ompi_osc_rdma_lock_btl_op (module, peer, target, MCA_BTL_ATOMIC_ADD, 1, true);
492 assert (OMPI_SUCCESS == ret);
493 } else {
494 (void) ompi_osc_rdma_counter_add ((osc_rdma_atomic_counter_t *) target, 1);
495 }
496 }
497
498
499 ompi_osc_rdma_release_peers (peers, group_size);
500
501 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "complete complete");
502
503 return OMPI_SUCCESS;
504 }
505
506 int ompi_osc_rdma_wait_atomic (ompi_win_t *win)
507 {
508 ompi_osc_rdma_module_t *module = GET_MODULE(win);
509 ompi_osc_rdma_state_t *state = module->state;
510 ompi_group_t *group;
511 int group_size;
512
513 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "wait: %s", win->w_name);
514
515 OPAL_THREAD_LOCK(&module->lock);
516 if (NULL == module->pw_group) {
517 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "no matching post");
518 OPAL_THREAD_UNLOCK(&module->lock);
519 return OMPI_ERR_RMA_SYNC;
520 }
521
522 group_size = ompi_group_size (module->pw_group);
523 OPAL_THREAD_UNLOCK(&module->lock);
524
525 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "waiting on complete message. have %d of %d",
526 (int) state->num_complete_msgs, group_size);
527
528 while (group_size != state->num_complete_msgs) {
529 ompi_osc_rdma_progress (module);
530 opal_atomic_mb ();
531 }
532
533 OPAL_THREAD_LOCK(&module->lock);
534 group = module->pw_group;
535 module->pw_group = NULL;
536 OPAL_THREAD_UNLOCK(&module->lock);
537
538 OBJ_RELEASE(group);
539
540 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "wait complete");
541
542 return OMPI_SUCCESS;
543 }
544
545
546 int ompi_osc_rdma_test_atomic (ompi_win_t *win, int *flag)
547 {
548 ompi_osc_rdma_module_t *module = GET_MODULE(win);
549 ompi_osc_rdma_state_t *state = module->state;
550 ompi_group_t *group;
551 int group_size;
552
553 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "test: %s", win->w_name);
554
555 OPAL_THREAD_LOCK(&module->lock);
556 if (NULL == module->pw_group) {
557 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "no matching post");
558 OPAL_THREAD_UNLOCK(&module->lock);
559 return OMPI_ERR_RMA_SYNC;
560 }
561
562 group_size = ompi_group_size (module->pw_group);
563
564 *flag = (group_size == state->num_complete_msgs);
565 OPAL_THREAD_UNLOCK(&module->lock);
566
567 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "checking on complete message. have %d of %d",
568 (int) state->num_complete_msgs, group_size);
569
570 if (!*flag) {
571 ompi_osc_rdma_progress (module);
572 return OMPI_SUCCESS;
573 }
574
575 state->num_complete_msgs = 0;
576
577 OPAL_THREAD_LOCK(&(module->lock));
578 group = module->pw_group;
579 module->pw_group = NULL;
580 OPAL_THREAD_UNLOCK(&(module->lock));
581
582 OBJ_RELEASE(group);
583
584 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "test complete. returning flag: true");
585
586 return OMPI_SUCCESS;
587 }
588
589 int ompi_osc_rdma_fence_atomic (int assert, ompi_win_t *win)
590 {
591 ompi_osc_rdma_module_t *module = GET_MODULE(win);
592 int ret = OMPI_SUCCESS;
593
594 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "fence: %d, %s", assert, win->w_name);
595
596
597 if (ompi_osc_rdma_in_passive_epoch (module) || module->pw_group) {
598 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "can not start fence epoch due to conflicting epoch");
599 return OMPI_ERR_RMA_SYNC;
600 }
601
602
603
604 OPAL_THREAD_LOCK(&module->lock);
605
606
607 if (0 == (assert & MPI_MODE_NOSUCCEED)) {
608 module->all_sync.type = OMPI_OSC_RDMA_SYNC_TYPE_FENCE;
609 module->all_sync.num_peers = ompi_comm_size (module->comm);
610
611
612 }
613
614
615
616
617 module->all_sync.epoch_active = false;
618
619
620
621
622
623 ompi_osc_rdma_sync_rdma_complete (&module->all_sync);
624
625
626 ret = module->comm->c_coll->coll_barrier(module->comm, module->comm->c_coll->coll_barrier_module);
627
628 if (assert & MPI_MODE_NOSUCCEED) {
629
630
631 module->all_sync.type = OMPI_OSC_RDMA_SYNC_TYPE_NONE;
632 }
633
634 OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "fence complete");
635
636 OPAL_THREAD_UNLOCK(&module->lock);
637
638 return ret;
639 }