This source file includes the following definitions:
- compare_ranks
- ompi_osc_pt2pt_get_peers
- ompi_osc_pt2pt_release_peers
- ompi_osc_pt2pt_fence
- ompi_osc_pt2pt_start
- ompi_osc_pt2pt_complete
- ompi_osc_pt2pt_post
- ompi_osc_pt2pt_wait
- ompi_osc_pt2pt_test
- osc_pt2pt_incoming_complete
- osc_pt2pt_incoming_post
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "ompi_config.h"

#include <string.h>

#include "osc_pt2pt.h"
#include "osc_pt2pt_header.h"
#include "osc_pt2pt_data_move.h"
#include "osc_pt2pt_frag.h"

#include "mpi.h"
#include "opal/runtime/opal_progress.h"
#include "opal/threads/mutex.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/base/base.h"
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * qsort(3) comparator for arrays of int ranks (ascending order).
 *
 * @param ptra pointer to the first int
 * @param ptrb pointer to the second int
 * @return -1, 0 or 1 when the first rank is less than, equal to, or greater than the second
 */
static int compare_ranks (const void *ptra, const void *ptrb)
{
    const int lhs = *(const int *) ptra;
    const int rhs = *(const int *) ptrb;

    /* each comparison yields 0 or 1, so the difference is exactly -1, 0 or 1 and never overflows */
    return (lhs > rhs) - (lhs < rhs);
}
66
67
68
69
70
71
72
73
74
75
76
77
78 static ompi_osc_pt2pt_peer_t **ompi_osc_pt2pt_get_peers (ompi_osc_pt2pt_module_t *module, ompi_group_t *sub_group)
79 {
80 int size = ompi_group_size(sub_group);
81 ompi_osc_pt2pt_peer_t **peers;
82 int *ranks1, *ranks2;
83 int ret;
84
85 ranks1 = calloc (size, sizeof(int));
86 ranks2 = calloc (size, sizeof(int));
87 peers = calloc (size, sizeof (ompi_osc_pt2pt_peer_t *));
88 if (NULL == ranks1 || NULL == ranks2 || NULL == peers) {
89 free (ranks1);
90 free (ranks2);
91 free (peers);
92 return NULL;
93 }
94
95 for (int i = 0 ; i < size ; ++i) {
96 ranks1[i] = i;
97 }
98
99 ret = ompi_group_translate_ranks (sub_group, size, ranks1, module->comm->c_local_group,
100 ranks2);
101 free (ranks1);
102 if (OMPI_SUCCESS != ret) {
103 free (ranks2);
104 free (peers);
105 return NULL;
106 }
107
108 qsort (ranks2, size, sizeof (int), compare_ranks);
109 for (int i = 0 ; i < size ; ++i) {
110 peers[i] = ompi_osc_pt2pt_peer_lookup (module, ranks2[i]);
111 OBJ_RETAIN(peers[i]);
112 }
113 free (ranks2);
114
115 return peers;
116 }
117
118 static void ompi_osc_pt2pt_release_peers (ompi_osc_pt2pt_peer_t **peers, int npeers)
119 {
120 if (peers) {
121 for (int i = 0 ; i < npeers ; ++i) {
122 OBJ_RELEASE(peers[i]);
123 }
124
125 free (peers);
126 }
127 }
128
129 int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
130 {
131 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
132 uint32_t incoming_reqs;
133 int ret = OMPI_SUCCESS;
134
135 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
136 "osc pt2pt: fence start"));
137
138
139 if (ompi_osc_pt2pt_in_passive_epoch (module)) {
140 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
141 "osc pt2pt: could not enter fence. already in an access epoch"));
142 return OMPI_ERR_RMA_SYNC;
143 }
144
145
146 if (0 == (assert & MPI_MODE_NOSUCCEED)) {
147 module->all_sync.type = OMPI_OSC_PT2PT_SYNC_TYPE_FENCE;
148 module->all_sync.eager_send_active = true;
149 }
150
151
152 if (0 != (assert & MPI_MODE_NOPRECEDE)) {
153 module->comm->c_coll->coll_barrier (module->comm, module->comm->c_coll->coll_barrier_module);
154 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
155 "osc pt2pt: fence end (short circuit)"));
156 return ret;
157 }
158
159
160 ret = ompi_osc_pt2pt_frag_flush_all(module);
161 if (OMPI_SUCCESS != ret) {
162 return ret;
163 }
164
165 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
166 "osc pt2pt: fence done sending"));
167
168
169 ret = module->comm->c_coll->coll_reduce_scatter_block ((void *) module->epoch_outgoing_frag_count,
170 &incoming_reqs, 1, MPI_UINT32_T,
171 MPI_SUM, module->comm,
172 module->comm->c_coll->coll_reduce_scatter_block_module);
173 if (OMPI_SUCCESS != ret) {
174 return ret;
175 }
176
177 OPAL_THREAD_LOCK(&module->lock);
178 bzero ((void *) module->epoch_outgoing_frag_count, sizeof(uint32_t) * ompi_comm_size(module->comm));
179
180 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
181 "osc pt2pt: fence expects %d requests",
182 incoming_reqs));
183
184
185 OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, -incoming_reqs);
186
187
188 while (module->outgoing_frag_count < 0 || module->active_incoming_frag_count < 0) {
189 opal_condition_wait(&module->cond, &module->lock);
190 }
191
192 if (assert & MPI_MODE_NOSUCCEED) {
193
194
195 ompi_osc_pt2pt_sync_reset (&module->all_sync);
196 }
197
198 module->all_sync.epoch_active = false;
199 OPAL_THREAD_UNLOCK(&module->lock);
200
201 module->comm->c_coll->coll_barrier (module->comm, module->comm->c_coll->coll_barrier_module);
202
203 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
204 "osc pt2pt: fence end: %d", ret));
205
206 return OMPI_SUCCESS;
207 }
208
209
210 int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
211 {
212 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
213 ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
214
215 OPAL_THREAD_LOCK(&sync->lock);
216
217
218 if (ompi_osc_pt2pt_access_epoch_active (module)) {
219 OPAL_THREAD_UNLOCK(&sync->lock);
220 return OMPI_ERR_RMA_SYNC;
221 }
222
223
224 sync->num_peers = ompi_group_size (group);
225 sync->sync.pscw.group = group;
226
227
228 sync->sync_expected = sync->num_peers;
229
230
231
232
233
234 sync->eager_send_active = false;
235
236 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
237 "ompi_osc_pt2pt_start entering with group size %d...",
238 sync->num_peers));
239
240 sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_PSCW;
241
242
243
244 sync->epoch_active = true;
245
246
247 OBJ_RETAIN(group);
248
249 if (0 == ompi_group_size (group)) {
250
251 sync->eager_send_active = true;
252 OPAL_THREAD_UNLOCK(&sync->lock);
253 return OMPI_SUCCESS;
254 }
255
256 opal_atomic_wmb ();
257
258
259 sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group);
260 if (NULL == sync->peer_list.peers) {
261 OPAL_THREAD_UNLOCK(&sync->lock);
262 return OMPI_ERR_OUT_OF_RESOURCE;
263 }
264
265 if (!(assert & MPI_MODE_NOCHECK)) {
266 for (int i = 0 ; i < sync->num_peers ; ++i) {
267 ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];
268
269 if (ompi_osc_pt2pt_peer_unex (peer)) {
270
271 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
272 "found unexpected post from %d",
273 peer->rank));
274 OPAL_THREAD_ADD_FETCH32 (&sync->sync_expected, -1);
275 ompi_osc_pt2pt_peer_set_unex (peer, false);
276 }
277 }
278 } else {
279 sync->sync_expected = 0;
280 }
281
282 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
283 "post messages still needed: %d", sync->sync_expected));
284
285
286
287
288 if (0 == sync->sync_expected) {
289 sync->eager_send_active = true;
290 }
291
292 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
293 "ompi_osc_pt2pt_start complete. eager sends active: %d",
294 sync->eager_send_active));
295
296 OPAL_THREAD_UNLOCK(&sync->lock);
297 return OMPI_SUCCESS;
298 }
299
300
301 int ompi_osc_pt2pt_complete (ompi_win_t *win)
302 {
303 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
304 ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
305 int my_rank = ompi_comm_rank (module->comm);
306 ompi_osc_pt2pt_peer_t **peers;
307 int ret = OMPI_SUCCESS;
308 ompi_group_t *group;
309 size_t group_size;
310
311 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
312 "ompi_osc_pt2pt_complete entering..."));
313
314 OPAL_THREAD_LOCK(&sync->lock);
315 if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW != sync->type) {
316 OPAL_THREAD_UNLOCK(&sync->lock);
317 return OMPI_ERR_RMA_SYNC;
318 }
319
320
321 ompi_osc_pt2pt_sync_wait_nolock (sync);
322
323
324 group = sync->sync.pscw.group;
325 group_size = sync->num_peers;
326
327 peers = sync->peer_list.peers;
328
329
330 ompi_osc_pt2pt_sync_reset (sync);
331 OPAL_THREAD_UNLOCK(&sync->lock);
332
333 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
334 "ompi_osc_pt2pt_complete all posts received. sending complete messages..."));
335
336
337
338
339
340
341
342
343 for (size_t i = 0 ; i < group_size ; ++i) {
344 ompi_osc_pt2pt_header_complete_t complete_req;
345 int rank = peers[i]->rank;
346
347 if (my_rank == rank) {
348
349 osc_pt2pt_incoming_complete (module, rank, 0);
350 continue;
351 }
352
353 complete_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_COMPLETE;
354 complete_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
355 complete_req.frag_count = module->epoch_outgoing_frag_count[rank];
356 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
357 #if OPAL_ENABLE_DEBUG
358 complete_req.padding[0] = 0;
359 complete_req.padding[1] = 0;
360 #endif
361 osc_pt2pt_hton(&complete_req, ompi_comm_peer_lookup (module->comm, rank));
362 #endif
363
364 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, rank);
365
366
367
368 if (peer->active_frag) {
369 ompi_osc_pt2pt_frag_t *active_frag = (ompi_osc_pt2pt_frag_t *) peer->active_frag;
370 if (active_frag->remain_len < sizeof (complete_req)) {
371 ++complete_req.frag_count;
372 }
373 }
374
375 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
376 "ompi_osc_pt2pt_complete sending complete message to %d. frag_count: %u",
377 rank, complete_req.frag_count));
378
379 ret = ompi_osc_pt2pt_control_send (module, rank, &complete_req,
380 sizeof(ompi_osc_pt2pt_header_complete_t));
381 if (OMPI_SUCCESS != ret) {
382 break;
383 }
384
385 ret = ompi_osc_pt2pt_frag_flush_target (module, rank);
386 if (OMPI_SUCCESS != ret) {
387 break;
388 }
389
390
391 module->epoch_outgoing_frag_count[rank] = 0;
392 }
393
394 if (peers) {
395
396 ompi_osc_pt2pt_release_peers (peers, group_size);
397 }
398
399 if (OMPI_SUCCESS != ret) {
400 return ret;
401 }
402
403 OPAL_THREAD_LOCK(&module->lock);
404
405
406 while (module->outgoing_frag_count < 0) {
407 opal_condition_wait(&module->cond, &module->lock);
408 }
409
410
411 OPAL_THREAD_UNLOCK(&module->lock);
412
413
414 OBJ_RELEASE(group);
415
416 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
417 "ompi_osc_pt2pt_complete complete"));
418
419 return OMPI_SUCCESS;
420 }
421
422
423 int ompi_osc_pt2pt_post (ompi_group_t *group, int assert, ompi_win_t *win)
424 {
425 int ret = OMPI_SUCCESS;
426 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
427 ompi_osc_pt2pt_header_post_t post_req;
428 ompi_osc_pt2pt_peer_t **peers;
429
430
431 if (module->pw_group) {
432 return OMPI_ERR_RMA_SYNC;
433 }
434
435 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
436 "ompi_osc_pt2pt_post entering with group size %d...",
437 ompi_group_size (group)));
438
439 OPAL_THREAD_LOCK(&module->lock);
440
441
442 if (NULL != module->pw_group) {
443 OPAL_THREAD_UNLOCK(&(module->lock));
444 return OMPI_ERR_RMA_SYNC;
445 }
446
447
448 OBJ_RETAIN(group);
449
450 module->pw_group = group;
451
452
453
454
455 module->num_complete_msgs = -ompi_group_size(module->pw_group);
456
457 OPAL_THREAD_UNLOCK(&(module->lock));
458
459 if ((assert & MPI_MODE_NOCHECK) || 0 == ompi_group_size (group)) {
460 return OMPI_SUCCESS;
461 }
462
463 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
464 "sending post messages"));
465
466
467 peers = ompi_osc_pt2pt_get_peers (module, module->pw_group);
468 if (OPAL_UNLIKELY(NULL == peers)) {
469 return OMPI_ERR_OUT_OF_RESOURCE;
470 }
471
472
473 for (int i = 0 ; i < ompi_group_size(module->pw_group) ; ++i) {
474 ompi_osc_pt2pt_peer_t *peer = peers[i];
475 int rank = peer->rank;
476
477 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Sending post message to rank %d", rank));
478 ompi_proc_t *proc = ompi_comm_peer_lookup (module->comm, rank);
479
480
481 if (ompi_proc_local() == proc) {
482 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_complete self post"));
483 osc_pt2pt_incoming_post (module, ompi_comm_rank(module->comm));
484 continue;
485 }
486
487 post_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_POST;
488 post_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
489 osc_pt2pt_hton(&post_req, proc);
490
491
492
493 ret = ompi_osc_pt2pt_control_send_unbuffered(module, rank, &post_req,
494 sizeof(ompi_osc_pt2pt_header_post_t));
495 if (OMPI_SUCCESS != ret) {
496 break;
497 }
498 }
499
500 ompi_osc_pt2pt_release_peers (peers, ompi_group_size(module->pw_group));
501
502 return ret;
503 }
504
505
506 int ompi_osc_pt2pt_wait (ompi_win_t *win)
507 {
508 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
509 ompi_group_t *group;
510
511 if (NULL == module->pw_group) {
512 return OMPI_ERR_RMA_SYNC;
513 }
514
515 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
516 "ompi_osc_pt2pt_wait entering... module %p", (void *) module));
517
518 OPAL_THREAD_LOCK(&module->lock);
519 while (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
520 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "module %p, num_complete_msgs = %d, "
521 "active_incoming_frag_count = %d", (void *) module, module->num_complete_msgs,
522 module->active_incoming_frag_count));
523 opal_condition_wait(&module->cond, &module->lock);
524 }
525
526 group = module->pw_group;
527 module->pw_group = NULL;
528 OPAL_THREAD_UNLOCK(&module->lock);
529
530 OBJ_RELEASE(group);
531
532 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
533 "ompi_osc_pt2pt_wait complete"));
534
535 return OMPI_SUCCESS;
536 }
537
538
539 int ompi_osc_pt2pt_test (ompi_win_t *win, int *flag)
540 {
541 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
542 ompi_group_t *group;
543 int ret = OMPI_SUCCESS;
544
545 #if !OPAL_ENABLE_PROGRESS_THREADS
546 opal_progress();
547 #endif
548
549 if (NULL == module->pw_group) {
550 return OMPI_ERR_RMA_SYNC;
551 }
552
553 OPAL_THREAD_LOCK(&(module->lock));
554
555 if (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
556 *flag = 0;
557 } else {
558 *flag = 1;
559
560 group = module->pw_group;
561 module->pw_group = NULL;
562
563 OBJ_RELEASE(group);
564 }
565
566 OPAL_THREAD_UNLOCK(&(module->lock));
567
568 return ret;
569 }
570
571 void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, int frag_count)
572 {
573 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
574 "osc pt2pt: process_complete got complete message from %d. expected fragment count %d. "
575 "current incomming count: %d. expected complete msgs: %d", source,
576 frag_count, module->active_incoming_frag_count, module->num_complete_msgs));
577
578
579 OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, -frag_count);
580
581
582 opal_atomic_wmb ();
583
584 if (0 == OPAL_THREAD_ADD_FETCH32(&module->num_complete_msgs, 1)) {
585 OPAL_THREAD_LOCK(&module->lock);
586 opal_condition_broadcast (&module->cond);
587 OPAL_THREAD_UNLOCK(&module->lock);
588 }
589 }
590
591 void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
592 {
593 ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
594
595 OPAL_THREAD_LOCK(&sync->lock);
596
597
598 if (!ompi_osc_pt2pt_sync_pscw_peer (module, source, NULL)) {
599 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
600
601 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
602 "received unexpected post message from %d for future PSCW synchronization",
603 source));
604
605 ompi_osc_pt2pt_peer_set_unex (peer, true);
606 OPAL_THREAD_UNLOCK(&sync->lock);
607 } else {
608 OPAL_THREAD_UNLOCK(&sync->lock);
609
610 ompi_osc_pt2pt_sync_expected (sync);
611
612 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
613 "received post message for PSCW synchronization. post messages still needed: %d",
614 sync->sync_expected));
615 }
616 }