This source file includes the following definitions.
- mca_comm_cid_context_construct
- mca_comm_cid_context_destruct
- ompi_comm_allreduce_context_construct
- ompi_comm_allreduce_context_destruct
- ompi_comm_cid_init
- mca_comm_cid_context_alloc
- ompi_comm_allreduce_context_alloc
- ompi_comm_nextcid_nb
- ompi_comm_nextcid
- ompi_comm_allreduce_getnextcid
- ompi_comm_checkcid
- ompi_comm_nextcid_check_flag
- ompi_comm_activate_nb
- ompi_comm_activate
- ompi_comm_activate_nb_complete
- ompi_comm_allreduce_intra_nb
- ompi_comm_allreduce_inter_nb
- ompi_comm_allreduce_inter_leader_exchange
- ompi_comm_allreduce_inter_leader_reduce
- ompi_comm_allreduce_inter_bcast
- ompi_comm_allreduce_bridged_schedule_bcast
- ompi_comm_allreduce_bridged_xchng_complete
- ompi_comm_allreduce_bridged_reduce_complete
- ompi_comm_allreduce_intra_bridge_nb
- ompi_comm_allreduce_pmix_reduce_complete
- ompi_comm_allreduce_intra_pmix_nb
- ompi_comm_allreduce_group_broadcast
- ompi_comm_allreduce_group_recv_complete
- ompi_comm_allreduce_group_nb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 #include "ompi_config.h"
34
35 #include "opal/dss/dss.h"
36 #include "opal/mca/pmix/pmix.h"
37 #include "opal/util/printf.h"
38
39 #include "ompi/proc/proc.h"
40 #include "ompi/communicator/communicator.h"
41 #include "ompi/op/op.h"
42 #include "ompi/constants.h"
43 #include "opal/class/opal_pointer_array.h"
44 #include "opal/class/opal_list.h"
45 #include "ompi/mca/pml/pml.h"
46 #include "ompi/mca/rte/rte.h"
47 #include "ompi/mca/coll/base/base.h"
48 #include "ompi/request/request.h"
49 #include "ompi/runtime/mpiruntime.h"
50
51 struct ompi_comm_cid_context_t;
52
53 typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
54 struct ompi_comm_cid_context_t *cid_context,
55 ompi_request_t **req);
56
57
58 struct ompi_comm_cid_context_t {
59 opal_object_t super;
60
61 ompi_communicator_t *newcomm;
62 ompi_communicator_t **newcommp;
63 ompi_communicator_t *comm;
64 ompi_communicator_t *bridgecomm;
65
66 ompi_comm_allreduce_impl_fn_t allreduce_fn;
67
68 int nextcid;
69 int nextlocal_cid;
70 int start;
71 int flag, rflag;
72 int local_leader;
73 int remote_leader;
74 int iter;
75
76 int ok;
77 char *port_string;
78 bool send_first;
79 int pml_tag;
80 char *pmix_tag;
81 };
82
83 typedef struct ompi_comm_cid_context_t ompi_comm_cid_context_t;
84
85 static void mca_comm_cid_context_construct (ompi_comm_cid_context_t *context)
86 {
87 memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super));
88 }
89
90 static void mca_comm_cid_context_destruct (ompi_comm_cid_context_t *context)
91 {
92 free (context->port_string);
93 free (context->pmix_tag);
94 }
95
/* Class descriptor for ompi_comm_cid_context_t (derives from opal_object_t). */
OBJ_CLASS_INSTANCE (ompi_comm_cid_context_t,
                    opal_object_t,
                    mca_comm_cid_context_construct,
                    mca_comm_cid_context_destruct);
99
100 struct ompi_comm_allreduce_context_t {
101 opal_object_t super;
102
103 int *inbuf;
104 int *outbuf;
105 int count;
106 struct ompi_op_t *op;
107 ompi_comm_cid_context_t *cid_context;
108 int *tmpbuf;
109
110
111 int peers_comm[3];
112 };
113
114 typedef struct ompi_comm_allreduce_context_t ompi_comm_allreduce_context_t;
115
116 static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t *context)
117 {
118 memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super));
119 }
120
121 static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context)
122 {
123 free (context->tmpbuf);
124 }
125
/* Class descriptor for ompi_comm_allreduce_context_t (derives from opal_object_t). */
OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t,
                    opal_object_t,
                    ompi_comm_allreduce_context_construct,
                    ompi_comm_allreduce_context_destruct);
129
130
131
132
133
134
135
136
137
138 static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count,
139 struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
140 ompi_request_t **req);
141
142
143 static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count,
144 struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
145 ompi_request_t **req);
146
147 static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count,
148 struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
149 ompi_request_t **req);
150
151 static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, int count,
152 struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
153 ompi_request_t **req);
154
155 static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int count,
156 struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
157 ompi_request_t **req);
158
159 static opal_mutex_t ompi_cid_lock = OPAL_MUTEX_STATIC_INIT;
160
161
162 int ompi_comm_cid_init (void)
163 {
164 return OMPI_SUCCESS;
165 }
166
167 static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
168 ompi_communicator_t *bridgecomm, const void *arg0,
169 const void *arg1, const char *pmix_tag, bool send_first,
170 int mode)
171 {
172 ompi_comm_cid_context_t *context;
173
174 context = OBJ_NEW(ompi_comm_cid_context_t);
175 if (OPAL_UNLIKELY(NULL == context)) {
176 return NULL;
177 }
178
179 context->newcomm = newcomm;
180 context->comm = comm;
181 context->bridgecomm = bridgecomm;
182 context->pml_tag = 0;
183
184
185
186 switch (mode) {
187 case OMPI_COMM_CID_INTRA:
188 context->allreduce_fn = ompi_comm_allreduce_intra_nb;
189 break;
190 case OMPI_COMM_CID_INTER:
191 context->allreduce_fn = ompi_comm_allreduce_inter_nb;
192 break;
193 case OMPI_COMM_CID_GROUP:
194 context->allreduce_fn = ompi_comm_allreduce_group_nb;
195 context->pml_tag = ((int *) arg0)[0];
196 break;
197 case OMPI_COMM_CID_INTRA_PMIX:
198 context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb;
199 context->local_leader = ((int *) arg0)[0];
200 if (arg1) {
201 context->port_string = strdup ((char *) arg1);
202 }
203 context->pmix_tag = strdup ((char *) pmix_tag);
204 break;
205 case OMPI_COMM_CID_INTRA_BRIDGE:
206 context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb;
207 context->local_leader = ((int *) arg0)[0];
208 context->remote_leader = ((int *) arg1)[0];
209 break;
210 default:
211 OBJ_RELEASE(context);
212 return NULL;
213 }
214
215 context->send_first = send_first;
216 context->iter = 0;
217 context->ok = 1;
218
219 return context;
220 }
221
222 static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int *inbuf, int *outbuf,
223 int count, struct ompi_op_t *op,
224 ompi_comm_cid_context_t *cid_context)
225 {
226 ompi_comm_allreduce_context_t *context;
227
228 context = OBJ_NEW(ompi_comm_allreduce_context_t);
229 if (OPAL_UNLIKELY(NULL == context)) {
230 return NULL;
231 }
232
233 context->inbuf = inbuf;
234 context->outbuf = outbuf;
235 context->count = count;
236 context->op = op;
237 context->cid_context = cid_context;
238
239 return context;
240 }
241
242
243 static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);
244
245 static int ompi_comm_checkcid (ompi_comm_request_t *request);
246
247 static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request);
248
249 static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX;
250
251 int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
252 ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
253 bool send_first, int mode, ompi_request_t **req)
254 {
255 ompi_comm_cid_context_t *context;
256 ompi_comm_request_t *request;
257
258 context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1,
259 "nextcid", send_first, mode);
260 if (NULL == context) {
261 return OMPI_ERR_OUT_OF_RESOURCE;
262 }
263
264 context->start = ompi_mpi_communicators.lowest_free;
265
266 request = ompi_comm_request_get ();
267 if (NULL == request) {
268 OBJ_RELEASE(context);
269 return OMPI_ERR_OUT_OF_RESOURCE;
270 }
271
272 request->context = &context->super;
273
274 ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
275 ompi_comm_request_start (request);
276
277 *req = &request->super;
278
279
280 return OMPI_SUCCESS;
281 }
282
283 int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
284 ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
285 bool send_first, int mode)
286 {
287 ompi_request_t *req;
288 int rc;
289
290 rc = ompi_comm_nextcid_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req);
291 if (OMPI_SUCCESS != rc) {
292 return rc;
293 }
294
295 ompi_request_wait_completion (req);
296 rc = req->req_status.MPI_ERROR;
297 ompi_comm_request_return ((ompi_comm_request_t *) req);
298
299 return rc;
300 }
301
302 static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
303 {
304 ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
305 int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag);
306 ompi_request_t *subreq;
307 bool flag;
308 int ret;
309 int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);
310
311 if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
312 return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
313 }
314
315 if (ompi_comm_cid_lowest_id < my_id) {
316 OPAL_THREAD_UNLOCK(&ompi_cid_lock);
317 return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
318 }
319
320 ompi_comm_cid_lowest_id = my_id;
321
322
323
324
325 if( participate ){
326 flag = false;
327 context->nextlocal_cid = mca_pml.pml_max_contextid;
328 for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
329 flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
330 context->comm);
331 if (true == flag) {
332 context->nextlocal_cid = i;
333 break;
334 }
335 }
336 } else {
337 context->nextlocal_cid = 0;
338 }
339
340 ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
341 context, &subreq);
342
343
344
345 if (OMPI_SUCCESS != ret) {
346 goto err_exit;
347 }
348
349 if ( ((unsigned int) context->nextlocal_cid == mca_pml.pml_max_contextid) ) {
350
351 ret = OMPI_ERR_OUT_OF_RESOURCE;
352 goto err_exit;
353 }
354 OPAL_THREAD_UNLOCK(&ompi_cid_lock);
355
356
357 return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1);
358 err_exit:
359 if (participate && flag) {
360 opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
361 }
362 ompi_comm_cid_lowest_id = INT64_MAX;
363 OPAL_THREAD_UNLOCK(&ompi_cid_lock);
364 return ret;
365
366 }
367
368 static int ompi_comm_checkcid (ompi_comm_request_t *request)
369 {
370 ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
371 ompi_request_t *subreq;
372 int ret;
373 int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);
374
375 if (OMPI_SUCCESS != request->super.req_status.MPI_ERROR) {
376 if (participate) {
377 opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
378 }
379 return request->super.req_status.MPI_ERROR;
380 }
381
382 if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
383 return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0);
384 }
385
386 if( !participate ){
387 context->flag = 1;
388 } else {
389 context->flag = (context->nextcid == context->nextlocal_cid);
390 if ( participate && !context->flag) {
391 opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
392
393 context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators,
394 context->nextcid, context->comm);
395 }
396 }
397
398 ++context->iter;
399
400 ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MIN, context, &subreq);
401 if (OMPI_SUCCESS == ret) {
402 ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1);
403 } else {
404 if (participate && context->flag ) {
405 opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
406 }
407 ompi_comm_cid_lowest_id = INT64_MAX;
408 }
409
410 OPAL_THREAD_UNLOCK(&ompi_cid_lock);
411 return ret;
412 }
413
414 static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
415 {
416 ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
417 int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);
418
419 if (OMPI_SUCCESS != request->super.req_status.MPI_ERROR) {
420 if (participate) {
421 opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL);
422 }
423 return request->super.req_status.MPI_ERROR;
424 }
425
426 if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
427 return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0);
428 }
429
430 if (0 != context->rflag) {
431 if( !participate ) {
432
433
434
435
436 context->nextlocal_cid = mca_pml.pml_max_contextid;
437 for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
438 bool flag;
439 flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
440 context->comm);
441 if (true == flag) {
442 context->nextlocal_cid = i;
443 break;
444 }
445 }
446 context->nextcid = context->nextlocal_cid;
447 }
448
449
450 context->newcomm->c_contextid = context->nextcid;
451 opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);
452
453
454 ompi_comm_cid_lowest_id = INT64_MAX;
455 OPAL_THREAD_UNLOCK(&ompi_cid_lock);
456
457
458 return OMPI_SUCCESS;
459 }
460
461 if (participate && (0 != context->flag)) {
462
463 opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL);
464 context->start = context->nextcid + 1;
465 }
466
467 ++context->iter;
468
469 OPAL_THREAD_UNLOCK(&ompi_cid_lock);
470
471
472 return ompi_comm_allreduce_getnextcid (request);
473 }
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494 static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request);
495
496 int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm,
497 ompi_communicator_t *bridgecomm, const void *arg0,
498 const void *arg1, bool send_first, int mode, ompi_request_t **req)
499 {
500 ompi_comm_cid_context_t *context;
501 ompi_comm_request_t *request;
502 ompi_request_t *subreq;
503 int ret = 0;
504
505 context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate",
506 send_first, mode);
507 if (NULL == context) {
508 return OMPI_ERR_OUT_OF_RESOURCE;
509 }
510
511
512 context->newcommp = newcomm;
513
514 request = ompi_comm_request_get ();
515 if (NULL == request) {
516 OBJ_RELEASE(context);
517 return OMPI_ERR_OUT_OF_RESOURCE;
518 }
519
520 request->context = &context->super;
521
522 if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) {
523
524 if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) {
525 OBJ_RELEASE(*newcomm);
526 OBJ_RELEASE(context);
527 *newcomm = MPI_COMM_NULL;
528 return ret;
529 }
530 OMPI_COMM_SET_PML_ADDED(*newcomm);
531 }
532
533
534
535
536 ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context,
537 &subreq);
538 if (OMPI_SUCCESS != ret) {
539 ompi_comm_request_return (request);
540 return ret;
541 }
542
543 ompi_comm_request_schedule_append (request, ompi_comm_activate_nb_complete, &subreq, 1);
544 ompi_comm_request_start (request);
545
546 *req = &request->super;
547
548 return OMPI_SUCCESS;
549 }
550
551 int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm,
552 ompi_communicator_t *bridgecomm, const void *arg0,
553 const void *arg1, bool send_first, int mode)
554 {
555 ompi_request_t *req;
556 int rc;
557
558 rc = ompi_comm_activate_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req);
559 if (OMPI_SUCCESS != rc) {
560 return rc;
561 }
562
563 ompi_request_wait_completion (req);
564 rc = req->req_status.MPI_ERROR;
565 ompi_comm_request_return ((ompi_comm_request_t *) req);
566
567 return rc;
568 }
569
570 static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request)
571 {
572 ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
573 int ret;
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599 if (MPI_UNDEFINED == (context->newcomm)->c_local_group->grp_my_rank) {
600 return OMPI_SUCCESS;
601 }
602
603
604
605 if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(context->newcomm))) {
606 OBJ_RELEASE(context->newcomm);
607 *context->newcommp = MPI_COMM_NULL;
608 return ret;
609 }
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628 if (OMPI_COMM_IS_INTER(context->newcomm)) {
629 if (OMPI_COMM_CID_IS_LOWER(context->newcomm, context->comm)) {
630 OMPI_COMM_SET_EXTRA_RETAIN (context->newcomm);
631 OBJ_RETAIN (context->newcomm);
632 }
633 }
634
635
636 return OMPI_SUCCESS;
637 }
638
639
640
641
642 static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
643 ompi_comm_cid_context_t *context, ompi_request_t **req)
644 {
645 ompi_communicator_t *comm = context->comm;
646
647 return comm->c_coll->coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm,
648 req, comm->c_coll->coll_iallreduce_module);
649 }
650
651
652 static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request);
653 static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request);
654 static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request);
655
656 static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf,
657 int count, struct ompi_op_t *op,
658 ompi_comm_cid_context_t *cid_context,
659 ompi_request_t **req)
660 {
661 ompi_communicator_t *intercomm = cid_context->comm;
662 ompi_comm_allreduce_context_t *context;
663 ompi_comm_request_t *request;
664 ompi_request_t *subreq;
665 int local_rank, rc;
666
667 if (!OMPI_COMM_IS_INTER (cid_context->comm)) {
668 return MPI_ERR_COMM;
669 }
670
671 request = ompi_comm_request_get ();
672 if (OPAL_UNLIKELY(NULL == request)) {
673 return OMPI_ERR_OUT_OF_RESOURCE;
674 }
675
676 context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
677 if (OPAL_UNLIKELY(NULL == context)) {
678 ompi_comm_request_return (request);
679 return OMPI_ERR_OUT_OF_RESOURCE;
680 }
681
682 request->context = &context->super;
683
684
685 local_rank = ompi_comm_rank (intercomm);
686
687 if (0 == local_rank) {
688 context->tmpbuf = (int *) calloc (count, sizeof(int));
689 if (OPAL_UNLIKELY (NULL == context->tmpbuf)) {
690 ompi_comm_request_return (request);
691 return OMPI_ERR_OUT_OF_RESOURCE;
692 }
693 }
694
695
696
697 rc = intercomm->c_local_comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, 0,
698 intercomm->c_local_comm, &subreq,
699 intercomm->c_local_comm->c_coll->coll_ireduce_module);
700 if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
701 ompi_comm_request_return (request);
702 return rc;
703 }
704
705 if (0 == local_rank) {
706 ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_exchange, &subreq, 1);
707 } else {
708 ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_bcast, &subreq, 1);
709 }
710
711 ompi_comm_request_start (request);
712 *req = &request->super;
713
714 return OMPI_SUCCESS;
715 }
716
717
718 static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request)
719 {
720 ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
721 ompi_communicator_t *intercomm = context->cid_context->comm;
722 ompi_request_t *subreqs[2];
723 int rc;
724
725
726
727 rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
728 intercomm, subreqs));
729 if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
730 return rc;
731 }
732
733 rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
734 MCA_PML_BASE_SEND_STANDARD, intercomm, subreqs + 1));
735 if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
736 return rc;
737 }
738
739 return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2);
740 }
741
742 static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request)
743 {
744 ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
745
746 ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT);
747
748 return ompi_comm_allreduce_inter_bcast (request);
749 }
750
751
752 static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request)
753 {
754 ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
755 ompi_communicator_t *comm = context->cid_context->comm->c_local_comm;
756 ompi_request_t *subreq;
757 int rc;
758
759
760 rc = comm->c_coll->coll_ibcast (context->outbuf, context->count, MPI_INT, 0, comm,
761 &subreq, comm->c_coll->coll_ibcast_module);
762 if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
763 return rc;
764 }
765
766 return ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
767 }
768
769 static int ompi_comm_allreduce_bridged_schedule_bcast (ompi_comm_request_t *request)
770 {
771 ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
772 ompi_communicator_t *comm = context->cid_context->comm;
773 ompi_request_t *subreq;
774 int rc;
775
776 rc = comm->c_coll->coll_ibcast (context->outbuf, context->count, MPI_INT,
777 context->cid_context->local_leader, comm,
778 &subreq, comm->c_coll->coll_ibcast_module);
779 if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
780 return rc;
781 }
782
783 return ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
784 }
785
786 static int ompi_comm_allreduce_bridged_xchng_complete (ompi_comm_request_t *request)
787 {
788 ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
789
790
791 ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT);
792
793
794 return ompi_comm_allreduce_bridged_schedule_bcast (request);
795 }
796
797 static int ompi_comm_allreduce_bridged_reduce_complete (ompi_comm_request_t *request)
798 {
799 ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
800 ompi_communicator_t *bridgecomm = context->cid_context->bridgecomm;
801 ompi_request_t *subreq[2];
802 int rc;
803
804
805 rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, context->cid_context->remote_leader,
806 OMPI_COMM_ALLREDUCE_TAG, bridgecomm, subreq + 1));
807 if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
808 return rc;
809 }
810
811 rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, context->cid_context->remote_leader,
812 OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bridgecomm,
813 subreq));
814 if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
815 return rc;
816 }
817
818 return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_xchng_complete, subreq, 2);
819 }
820
821 static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf,
822 int count, struct ompi_op_t *op,
823 ompi_comm_cid_context_t *cid_context,
824 ompi_request_t **req)
825 {
826 ompi_communicator_t *comm = cid_context->comm;
827 ompi_comm_allreduce_context_t *context;
828 int local_rank = ompi_comm_rank (comm);
829 ompi_comm_request_t *request;
830 ompi_request_t *subreq;
831 int rc;
832
833 context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
834 if (OPAL_UNLIKELY(NULL == context)) {
835 return OMPI_ERR_OUT_OF_RESOURCE;
836 }
837
838 if (local_rank == cid_context->local_leader) {
839 context->tmpbuf = (int *) calloc (count, sizeof (int));
840 if (OPAL_UNLIKELY(NULL == context->tmpbuf)) {
841 OBJ_RELEASE(context);
842 return OMPI_ERR_OUT_OF_RESOURCE;
843 }
844 }
845
846 request = ompi_comm_request_get ();
847 if (OPAL_UNLIKELY(NULL == request)) {
848 OBJ_RELEASE(context);
849 return OMPI_ERR_OUT_OF_RESOURCE;
850 }
851
852 request->context = &context->super;
853
854 if (cid_context->local_leader == local_rank) {
855 memcpy (context->tmpbuf, inbuf, count * sizeof (int));
856 }
857
858
859 rc = comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op,
860 cid_context->local_leader, comm, &subreq,
861 comm->c_coll->coll_ireduce_module);
862 if ( OMPI_SUCCESS != rc ) {
863 ompi_comm_request_return (request);
864 return rc;
865 }
866
867 if (cid_context->local_leader == local_rank) {
868 rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_reduce_complete,
869 &subreq, 1);
870 } else {
871
872 ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
873
874 rc = ompi_comm_allreduce_bridged_schedule_bcast (request);
875 }
876
877 if (OMPI_SUCCESS != rc) {
878 ompi_comm_request_return (request);
879 return rc;
880 }
881
882 ompi_comm_request_start (request);
883
884 *req = &request->super;
885
886 return OMPI_SUCCESS;
887 }
888
889 static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *request)
890 {
891 ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
892 ompi_comm_cid_context_t *cid_context = context->cid_context;
893 int32_t size_count = context->count;
894 opal_value_t info;
895 opal_pmix_pdata_t pdat;
896 opal_buffer_t sbuf;
897 int rc;
898 int bytes_written;
899 const int output_id = 0;
900 const int verbosity_level = 1;
901
902 OBJ_CONSTRUCT(&sbuf, opal_buffer_t);
903
904 if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) {
905 OBJ_DESTRUCT(&sbuf);
906 opal_output_verbose (verbosity_level, output_id, "pack failed. rc %d\n", rc);
907 return rc;
908 }
909
910 OBJ_CONSTRUCT(&info, opal_value_t);
911 OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t);
912
913 info.type = OPAL_BYTE_OBJECT;
914 pdat.value.type = OPAL_BYTE_OBJECT;
915
916 opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size);
917 OBJ_DESTRUCT(&sbuf);
918
919 bytes_written = opal_asprintf(&info.key,
920 cid_context->send_first ? "%s:%s:send:%d"
921 : "%s:%s:recv:%d",
922 cid_context->port_string,
923 cid_context->pmix_tag,
924 cid_context->iter);
925
926 if (bytes_written == -1) {
927 opal_output_verbose (verbosity_level, output_id, "writing info.key failed\n");
928 } else {
929 bytes_written = opal_asprintf(&pdat.value.key,
930 cid_context->send_first ? "%s:%s:recv:%d"
931 : "%s:%s:send:%d",
932 cid_context->port_string,
933 cid_context->pmix_tag,
934 cid_context->iter);
935
936 if (bytes_written == -1) {
937 opal_output_verbose (verbosity_level, output_id, "writing pdat.value.key failed\n");
938 }
939 }
940
941 if (bytes_written == -1) {
942
943
944 opal_output_verbose (verbosity_level, output_id, "send first: %d\n", cid_context->send_first);
945 opal_output_verbose (verbosity_level, output_id, "port string: %s\n", cid_context->port_string);
946 opal_output_verbose (verbosity_level, output_id, "pmix tag: %s\n", cid_context->pmix_tag);
947 opal_output_verbose (verbosity_level, output_id, "iter: %d\n", cid_context->iter);
948 return OMPI_ERR_OUT_OF_RESOURCE;
949 }
950
951
952
953 OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600);
954 OBJ_DESTRUCT(&info);
955 if (OPAL_SUCCESS != rc) {
956 OBJ_DESTRUCT(&pdat);
957 return rc;
958 }
959
960 OBJ_CONSTRUCT(&sbuf, opal_buffer_t);
961 opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size);
962 pdat.value.data.bo.bytes = NULL;
963 pdat.value.data.bo.size = 0;
964 OBJ_DESTRUCT(&pdat);
965
966 rc = opal_dss.unpack (&sbuf, context->outbuf, &size_count, OPAL_INT);
967 OBJ_DESTRUCT(&sbuf);
968 if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
969 return rc;
970 }
971
972 ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, size_count, MPI_INT);
973
974 return ompi_comm_allreduce_bridged_schedule_bcast (request);
975 }
976
977 static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf,
978 int count, struct ompi_op_t *op,
979 ompi_comm_cid_context_t *cid_context,
980 ompi_request_t **req)
981 {
982 ompi_communicator_t *comm = cid_context->comm;
983 ompi_comm_allreduce_context_t *context;
984 int local_rank = ompi_comm_rank (comm);
985 ompi_comm_request_t *request;
986 ompi_request_t *subreq;
987 int rc;
988
989 context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
990 if (OPAL_UNLIKELY(NULL == context)) {
991 return OMPI_ERR_OUT_OF_RESOURCE;
992 }
993
994 if (cid_context->local_leader == local_rank) {
995 context->tmpbuf = (int *) calloc (count, sizeof(int));
996 if (OPAL_UNLIKELY(NULL == context->tmpbuf)) {
997 OBJ_RELEASE(context);
998 return OMPI_ERR_OUT_OF_RESOURCE;
999 }
1000 }
1001
1002 request = ompi_comm_request_get ();
1003 if (NULL == request) {
1004 OBJ_RELEASE(context);
1005 return OMPI_ERR_OUT_OF_RESOURCE;
1006 }
1007
1008 request->context = &context->super;
1009
1010
1011 rc = comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op,
1012 cid_context->local_leader, comm,
1013 &subreq, comm->c_coll->coll_ireduce_module);
1014 if ( OMPI_SUCCESS != rc ) {
1015 ompi_comm_request_return (request);
1016 return rc;
1017 }
1018
1019 if (cid_context->local_leader == local_rank) {
1020 rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_pmix_reduce_complete,
1021 &subreq, 1);
1022 } else {
1023
1024 rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
1025
1026 rc = ompi_comm_allreduce_bridged_schedule_bcast (request);
1027 }
1028
1029 if (OMPI_SUCCESS != rc) {
1030 ompi_comm_request_return (request);
1031 return rc;
1032 }
1033
1034 ompi_comm_request_start (request);
1035 *req = (ompi_request_t *) request;
1036
1037
1038 return OMPI_SUCCESS;
1039 }
1040
1041 static int ompi_comm_allreduce_group_broadcast (ompi_comm_request_t *request)
1042 {
1043 ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
1044 ompi_comm_cid_context_t *cid_context = context->cid_context;
1045 ompi_request_t *subreq[2];
1046 int subreq_count = 0;
1047 int rc;
1048
1049 for (int i = 0 ; i < 2 ; ++i) {
1050 if (MPI_PROC_NULL != context->peers_comm[i + 1]) {
1051 rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[i+1],
1052 cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD,
1053 cid_context->comm, subreq + subreq_count++));
1054 if (OMPI_SUCCESS != rc) {
1055 return rc;
1056 }
1057 }
1058 }
1059
1060 return ompi_comm_request_schedule_append (request, NULL, subreq, subreq_count);
1061 }
1062
1063 static int ompi_comm_allreduce_group_recv_complete (ompi_comm_request_t *request)
1064 {
1065 ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
1066 ompi_comm_cid_context_t *cid_context = context->cid_context;
1067 int *tmp = context->tmpbuf;
1068 ompi_request_t *subreq[2];
1069 int rc;
1070
1071 for (int i = 0 ; i < 2 ; ++i) {
1072 if (MPI_PROC_NULL != context->peers_comm[i + 1]) {
1073 ompi_op_reduce (context->op, tmp, context->outbuf, context->count, MPI_INT);
1074 tmp += context->count;
1075 }
1076 }
1077
1078 if (MPI_PROC_NULL != context->peers_comm[0]) {
1079
1080 rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[0],
1081 cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD,
1082 cid_context->comm, subreq));
1083 if (OMPI_SUCCESS != rc) {
1084 return rc;
1085 }
1086
1087 rc = MCA_PML_CALL(irecv(context->outbuf, context->count, MPI_INT, context->peers_comm[0],
1088 cid_context->pml_tag, cid_context->comm, subreq + 1));
1089 if (OMPI_SUCCESS != rc) {
1090 return rc;
1091 }
1092
1093 return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_broadcast, subreq, 2);
1094 }
1095
1096
1097 return ompi_comm_allreduce_group_broadcast (request);
1098 }
1099
1100 static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count,
1101 struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
1102 ompi_request_t **req)
1103 {
1104 ompi_group_t *group = cid_context->newcomm->c_local_group;
1105 const int group_size = ompi_group_size (group);
1106 const int group_rank = ompi_group_rank (group);
1107 ompi_communicator_t *comm = cid_context->comm;
1108 int peers_group[3], *tmp, subreq_count = 0;
1109 ompi_comm_allreduce_context_t *context;
1110 ompi_comm_request_t *request;
1111 ompi_request_t *subreq[3];
1112
1113 context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
1114 if (NULL == context) {
1115 return OMPI_ERR_OUT_OF_RESOURCE;
1116 }
1117
1118 tmp = context->tmpbuf = calloc (sizeof (int), count * 3);
1119 if (NULL == context->tmpbuf) {
1120 OBJ_RELEASE(context);
1121 return OMPI_ERR_OUT_OF_RESOURCE;
1122 }
1123
1124 request = ompi_comm_request_get ();
1125 if (NULL == request) {
1126 OBJ_RELEASE(context);
1127 return OMPI_ERR_OUT_OF_RESOURCE;
1128 }
1129
1130 request->context = &context->super;
1131
1132
1133 peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL;
1134 peers_group[1] = (group_rank * 2 + 1) < group_size ? group_rank * 2 + 1: MPI_PROC_NULL;
1135 peers_group[2] = (group_rank * 2 + 2) < group_size ? group_rank * 2 + 2 : MPI_PROC_NULL;
1136
1137
1138 ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, context->peers_comm);
1139
1140
1141 memmove (outbuf, inbuf, sizeof (int) * count);
1142
1143 for (int i = 0 ; i < 2 ; ++i) {
1144 if (MPI_PROC_NULL != context->peers_comm[i + 1]) {
1145 int rc = MCA_PML_CALL(irecv(tmp, count, MPI_INT, context->peers_comm[i + 1],
1146 cid_context->pml_tag, comm, subreq + subreq_count++));
1147 if (OMPI_SUCCESS != rc) {
1148 ompi_comm_request_return (request);
1149 return rc;
1150 }
1151
1152 tmp += count;
1153 }
1154 }
1155
1156 ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_recv_complete, subreq, subreq_count);
1157
1158 ompi_comm_request_start (request);
1159 *req = &request->super;
1160
1161 return OMPI_SUCCESS;
1162 }