This source file includes the following definitions.
- mca_comm_cid_context_construct
- mca_comm_cid_context_destruct
- ompi_comm_allreduce_context_construct
- ompi_comm_allreduce_context_destruct
- ompi_comm_cid_init
- mca_comm_cid_context_alloc
- ompi_comm_allreduce_context_alloc
- ompi_comm_nextcid_nb
- ompi_comm_nextcid
- ompi_comm_allreduce_getnextcid
- ompi_comm_checkcid
- ompi_comm_nextcid_check_flag
- ompi_comm_activate_nb
- ompi_comm_activate
- ompi_comm_activate_nb_complete
- ompi_comm_allreduce_intra_nb
- ompi_comm_allreduce_inter_nb
- ompi_comm_allreduce_inter_leader_exchange
- ompi_comm_allreduce_inter_leader_reduce
- ompi_comm_allreduce_inter_bcast
- ompi_comm_allreduce_bridged_schedule_bcast
- ompi_comm_allreduce_bridged_xchng_complete
- ompi_comm_allreduce_bridged_reduce_complete
- ompi_comm_allreduce_intra_bridge_nb
- ompi_comm_allreduce_pmix_reduce_complete
- ompi_comm_allreduce_intra_pmix_nb
- ompi_comm_allreduce_group_broadcast
- ompi_comm_allreduce_group_recv_complete
- ompi_comm_allreduce_group_nb
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 
  32 
  33 #include "ompi_config.h"
  34 
  35 #include "opal/dss/dss.h"
  36 #include "opal/mca/pmix/pmix.h"
  37 #include "opal/util/printf.h"
  38 
  39 #include "ompi/proc/proc.h"
  40 #include "ompi/communicator/communicator.h"
  41 #include "ompi/op/op.h"
  42 #include "ompi/constants.h"
  43 #include "opal/class/opal_pointer_array.h"
  44 #include "opal/class/opal_list.h"
  45 #include "ompi/mca/pml/pml.h"
  46 #include "ompi/mca/rte/rte.h"
  47 #include "ompi/mca/coll/base/base.h"
  48 #include "ompi/request/request.h"
  49 #include "ompi/runtime/mpiruntime.h"
  50 
/* Forward declaration so the callback type below can name the CID context. */
struct ompi_comm_cid_context_t;

/*
 * Common signature of the non-blocking integer allreduce implementations
 * declared later in this file.  An implementation reduces `count` ints from
 * `inbuf` into `outbuf` with `op`, using the communicators stored in
 * `cid_context`, and hands back the request tracking the operation in `req`.
 * The return value is an OMPI status code.
 */
typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
                                              struct ompi_comm_cid_context_t *cid_context,
                                              ompi_request_t **req);
  56 
  57 
/*
 * State carried through one CID-selection or communicator-activation
 * operation.  The object is attached to an ompi_comm_request_t via
 * request->context and is read by every step of the state machine below.
 */
struct ompi_comm_cid_context_t {
    opal_object_t super;

    /* communicator that receives the agreed context id / gets activated */
    ompi_communicator_t *newcomm;
    /* caller's handle; reset to MPI_COMM_NULL if activation fails */
    ompi_communicator_t **newcommp;
    /* communicator the agreement (allreduce) runs over */
    ompi_communicator_t *comm;
    /* communicator used for the leader exchange in the bridged allreduce */
    ompi_communicator_t *bridgecomm;

    /* allreduce implementation chosen from `mode` in mca_comm_cid_context_alloc */
    ompi_comm_allreduce_impl_fn_t allreduce_fn;

    /* result of the MPI_MAX allreduce over every process's nextlocal_cid */
    int nextcid;
    /* slot reserved locally in ompi_mpi_communicators for this attempt */
    int nextlocal_cid;
    /* first index examined when searching for a free local slot */
    int start;
    /* flag: local "nextcid is usable here"; rflag: MPI_MIN of flag over all processes */
    int flag, rflag;
    /* leader ranks; set from arg0/arg1 in the PMIX and BRIDGE modes */
    int local_leader;
    int remote_leader;
    /* incremented per agreement round; also formatted into the PMIx key */
    int iter;
    
    /* in/out buffer of the activation allreduce; initialised to 1 */
    int ok;
    /* private copy of arg1 (PMIX mode only); freed by the destructor */
    char *port_string;
    /* selects the "send"/"recv" key naming in the PMIx exchange */
    bool send_first;
    /* taken from arg0 in GROUP mode, 0 otherwise; also part of the ordering id */
    int pml_tag;
    /* private copy of the tag string (PMIX mode only); freed by the destructor */
    char *pmix_tag;
};

typedef struct ompi_comm_cid_context_t ompi_comm_cid_context_t;
  84 
  85 static void mca_comm_cid_context_construct (ompi_comm_cid_context_t *context)
  86 {
  87     memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super));
  88 }
  89 
  90 static void mca_comm_cid_context_destruct (ompi_comm_cid_context_t *context)
  91 {
  92     free (context->port_string);
  93     free (context->pmix_tag);
  94 }
  95 
/* Register ompi_comm_cid_context_t as an OPAL class derived from opal_object_t,
 * with the constructor/destructor defined above. */
OBJ_CLASS_INSTANCE (ompi_comm_cid_context_t, opal_object_t,
                    mca_comm_cid_context_construct,
                    mca_comm_cid_context_destruct);
  99 
/*
 * Per-call state of one multi-step allreduce (inter, bridged, PMIx, group).
 * It records the arguments of the allreduce call so that later steps,
 * which only receive the request, can find them again.
 */
struct ompi_comm_allreduce_context_t {
    opal_object_t super;

    /* caller-supplied input and output buffers (`count` ints each) */
    int *inbuf;
    int *outbuf;
    int count;
    /* reduction operation */
    struct ompi_op_t *op;
    /* CID context the allreduce was started for (not retained here) */
    ompi_comm_cid_context_t *cid_context;
    /* scratch buffer allocated on the leader/root; freed by the destructor */
    int *tmpbuf;

    /* NOTE(review): not referenced in the visible part of this file --
     * presumably used by the group allreduce; confirm there. */
    int peers_comm[3];
};

typedef struct ompi_comm_allreduce_context_t ompi_comm_allreduce_context_t;
 115 
 116 static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t *context)
 117 {
 118     memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super));
 119 }
 120 
 121 static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context)
 122 {
 123     free (context->tmpbuf);
 124 }
 125 
/* Register ompi_comm_allreduce_context_t as an OPAL class derived from
 * opal_object_t, with the constructor/destructor defined above. */
OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t,
                    ompi_comm_allreduce_context_construct,
                    ompi_comm_allreduce_context_destruct);
 129 
 130 
 131 
 132 
 133 
 134 
 135 
 136 
 137 
/*
 * Non-blocking allreduce implementations.  All share the
 * ompi_comm_allreduce_impl_fn_t signature; mca_comm_cid_context_alloc picks
 * one of them according to the requested mode.
 */

/* OMPI_COMM_CID_INTRA: allreduce over cid_context->comm */
static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count,
                                         struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
                                         ompi_request_t **req);

/* OMPI_COMM_CID_INTER: cid_context->comm must be an inter-communicator */
static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count,
                                         struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
                                         ompi_request_t **req);

/* OMPI_COMM_CID_GROUP */
static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count,
                                         struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
                                         ompi_request_t **req);

/* OMPI_COMM_CID_INTRA_PMIX */
static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, int count,
                                              struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
                                              ompi_request_t **req);

/* OMPI_COMM_CID_INTRA_BRIDGE: two groups joined through cid_context->bridgecomm */
static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int count,
                                                struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
                                                ompi_request_t **req);
 158 
/* Taken (with trylock) by every step of the CID state machine below while it
 * touches ompi_comm_cid_lowest_id or reserves/releases communicator slots. */
static opal_mutex_t ompi_cid_lock = OPAL_MUTEX_STATIC_INIT;
 160 
 161 
/*
 * Initialisation hook of the CID subsystem.  There is nothing to set up in
 * this implementation, so it always reports success.
 */
int ompi_comm_cid_init (void)
{
    return OMPI_SUCCESS;
}
 166 
 167 static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
 168                                                             ompi_communicator_t *bridgecomm, const void *arg0,
 169                                                             const void *arg1, const char *pmix_tag, bool send_first,
 170                                                             int mode)
 171 {
 172     ompi_comm_cid_context_t *context;
 173 
 174     context = OBJ_NEW(ompi_comm_cid_context_t);
 175     if (OPAL_UNLIKELY(NULL == context)) {
 176         return NULL;
 177     }
 178 
 179     context->newcomm       = newcomm;
 180     context->comm          = comm;
 181     context->bridgecomm    = bridgecomm;
 182     context->pml_tag       = 0;
 183 
 184     
 185 
 186     switch (mode) {
 187     case OMPI_COMM_CID_INTRA:
 188         context->allreduce_fn = ompi_comm_allreduce_intra_nb;
 189         break;
 190     case OMPI_COMM_CID_INTER:
 191         context->allreduce_fn = ompi_comm_allreduce_inter_nb;
 192         break;
 193     case OMPI_COMM_CID_GROUP:
 194         context->allreduce_fn = ompi_comm_allreduce_group_nb;
 195         context->pml_tag = ((int *) arg0)[0];
 196         break;
 197     case OMPI_COMM_CID_INTRA_PMIX:
 198         context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb;
 199         context->local_leader = ((int *) arg0)[0];
 200         if (arg1) {
 201             context->port_string = strdup ((char *) arg1);
 202         }
 203         context->pmix_tag = strdup ((char *) pmix_tag);
 204         break;
 205     case OMPI_COMM_CID_INTRA_BRIDGE:
 206         context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb;
 207         context->local_leader = ((int *) arg0)[0];
 208         context->remote_leader = ((int *) arg1)[0];
 209         break;
 210     default:
 211         OBJ_RELEASE(context);
 212         return NULL;
 213     }
 214 
 215     context->send_first = send_first;
 216     context->iter = 0;
 217     context->ok = 1;
 218 
 219     return context;
 220 }
 221 
 222 static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int *inbuf, int *outbuf,
 223                                                                          int count, struct ompi_op_t *op,
 224                                                                          ompi_comm_cid_context_t *cid_context)
 225 {
 226     ompi_comm_allreduce_context_t *context;
 227 
 228     context = OBJ_NEW(ompi_comm_allreduce_context_t);
 229     if (OPAL_UNLIKELY(NULL == context)) {
 230         return NULL;
 231     }
 232 
 233     context->inbuf = inbuf;
 234     context->outbuf = outbuf;
 235     context->count = count;
 236     context->op = op;
 237     context->cid_context = cid_context;
 238 
 239     return context;
 240 }
 241 
 242 
/* Steps of the CID agreement state machine, in the order they normally run:
 * getnextcid -> checkcid -> nextcid_check_flag (which loops back to
 * getnextcid when the processes did not agree). */
static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);

static int ompi_comm_checkcid (ompi_comm_request_t *request);

static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request);

/* Ordering id ((cid of comm) << 32 | pml_tag, see getnextcid) of the CID
 * request currently being served; INT64_MAX while none is in progress.  In
 * the visible code it is only read or written with ompi_cid_lock held. */
static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX;
 250 
 251 int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
 252                           ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
 253                           bool send_first, int mode, ompi_request_t **req)
 254 {
 255     ompi_comm_cid_context_t *context;
 256     ompi_comm_request_t *request;
 257 
 258     context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1,
 259                                           "nextcid", send_first, mode);
 260     if (NULL == context) {
 261         return OMPI_ERR_OUT_OF_RESOURCE;
 262     }
 263 
 264     context->start = ompi_mpi_communicators.lowest_free;
 265 
 266     request = ompi_comm_request_get ();
 267     if (NULL == request) {
 268         OBJ_RELEASE(context);
 269         return OMPI_ERR_OUT_OF_RESOURCE;
 270     }
 271 
 272     request->context = &context->super;
 273 
 274     ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
 275     ompi_comm_request_start (request);
 276 
 277     *req = &request->super;
 278 
 279 
 280     return OMPI_SUCCESS;
 281 }
 282 
 283 int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
 284                        ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
 285                        bool send_first, int mode)
 286 {
 287     ompi_request_t *req;
 288     int rc;
 289 
 290     rc = ompi_comm_nextcid_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req);
 291     if (OMPI_SUCCESS != rc) {
 292         return rc;
 293     }
 294 
 295     ompi_request_wait_completion (req);
 296     rc = req->req_status.MPI_ERROR;
 297     ompi_comm_request_return ((ompi_comm_request_t *) req);
 298 
 299     return rc;
 300 }
 301 
 302 static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
 303 {
 304     ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
 305     int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag);
 306     ompi_request_t *subreq;
 307     bool flag;
 308     int ret;
 309     int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);
 310 
 311     if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
 312         return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
 313     }
 314 
 315     if (ompi_comm_cid_lowest_id < my_id) {
 316         OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 317         return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
 318     }
 319 
 320     ompi_comm_cid_lowest_id = my_id;
 321 
 322     
 323 
 324 
 325     if( participate ){
 326         flag = false;
 327         context->nextlocal_cid = mca_pml.pml_max_contextid;
 328         for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
 329             flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
 330                                                          context->comm);
 331             if (true == flag) {
 332                 context->nextlocal_cid = i;
 333                 break;
 334             }
 335         }
 336     } else {
 337         context->nextlocal_cid = 0;
 338     }
 339 
 340     ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
 341                                  context, &subreq);
 342     
 343 
 344 
 345     if (OMPI_SUCCESS != ret) {
 346         goto err_exit;
 347     }
 348 
 349     if ( ((unsigned int) context->nextlocal_cid == mca_pml.pml_max_contextid) ) {
 350         
 351         ret = OMPI_ERR_OUT_OF_RESOURCE;
 352         goto err_exit;
 353     }
 354     OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 355 
 356     
 357     return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1);
 358 err_exit:
 359     if (participate && flag) {
 360         opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
 361     }
 362     ompi_comm_cid_lowest_id = INT64_MAX;
 363     OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 364     return ret;
 365 
 366 }
 367 
 368 static int ompi_comm_checkcid (ompi_comm_request_t *request)
 369 {
 370     ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
 371     ompi_request_t *subreq;
 372     int ret;
 373     int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);
 374 
 375     if (OMPI_SUCCESS != request->super.req_status.MPI_ERROR) {
 376         if (participate) {
 377             opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
 378         }
 379         return request->super.req_status.MPI_ERROR;
 380     }
 381 
 382     if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
 383         return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0);
 384     }
 385 
 386     if( !participate ){
 387         context->flag = 1;
 388     } else {
 389         context->flag = (context->nextcid == context->nextlocal_cid);
 390         if ( participate && !context->flag) {
 391             opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
 392 
 393             context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators,
 394                                                                   context->nextcid, context->comm);
 395         }
 396     }
 397 
 398     ++context->iter;
 399 
 400     ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MIN, context, &subreq);
 401     if (OMPI_SUCCESS == ret) {
 402         ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1);
 403     } else {
 404         if (participate && context->flag ) {
 405             opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
 406         }
 407         ompi_comm_cid_lowest_id = INT64_MAX;
 408     }
 409 
 410     OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 411     return ret;
 412 }
 413 
/*
 * State-machine step 3: runs after the MPI_MIN allreduce of step 2.
 *
 * If every process could reserve the proposed id (rflag != 0) the id is
 * assigned to newcomm and the operation is finished.  Otherwise the local
 * reservation is dropped, the search start is moved past the rejected id and
 * the machine goes back to ompi_comm_allreduce_getnextcid.
 */
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
{
    ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
    int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);

    if (OMPI_SUCCESS != request->super.req_status.MPI_ERROR) {
        /* the allreduce of step 2 failed: clear the proposed slot */
        if (participate) {
            opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL);
        }
        return request->super.req_status.MPI_ERROR;
    }

    if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
        /* lock is busy: try again later instead of blocking */
        return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0);
    }

    if (0 != context->rflag) {
        if( !participate ) {
            /* A non-member did not reserve anything in the earlier steps; it
             * now takes the first locally free slot for newcomm, independent
             * of the id the members agreed on. */
            context->nextlocal_cid = mca_pml.pml_max_contextid;
            for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
                bool flag;
                flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
                                                                context->comm);
                if (true == flag) {
                    context->nextlocal_cid = i;
                    break;
                }
            }
            context->nextcid = context->nextlocal_cid;
        }

        /* record the id and replace the placeholder by the new communicator */
        context->newcomm->c_contextid = context->nextcid;
        opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);

        /* allow the next CID request to proceed */
        ompi_comm_cid_lowest_id = INT64_MAX;
        OPAL_THREAD_UNLOCK(&ompi_cid_lock);

        /* done */
        return OMPI_SUCCESS;
    }

    if (participate && (0 != context->flag)) {
        /* we held the proposed slot but another process could not: release it
         * and resume the search right after it */
        opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL);
        context->start = context->nextcid + 1; 
    }

    ++context->iter;

    OPAL_THREAD_UNLOCK(&ompi_cid_lock);

    /* no agreement: start another round */
    return ompi_comm_allreduce_getnextcid (request);
}
 474 
 475 
 476 
 477 
 478 
 479 
 480 
 481 
 482 
 483 
 484 
 485 
 486 
 487 
 488 
 489 
 490 
 491 
 492 
 493 
/* Completion step of ompi_comm_activate_nb; runs after the activation allreduce. */
static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request);
 495 
 496 int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm,
 497                            ompi_communicator_t *bridgecomm, const void *arg0,
 498                            const void *arg1, bool send_first, int mode, ompi_request_t **req)
 499 {
 500     ompi_comm_cid_context_t *context;
 501     ompi_comm_request_t *request;
 502     ompi_request_t *subreq;
 503     int ret = 0;
 504 
 505     context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate",
 506                                           send_first, mode);
 507     if (NULL == context) {
 508         return OMPI_ERR_OUT_OF_RESOURCE;
 509     }
 510 
 511     
 512     context->newcommp = newcomm;
 513 
 514     request = ompi_comm_request_get ();
 515     if (NULL == request) {
 516         OBJ_RELEASE(context);
 517         return OMPI_ERR_OUT_OF_RESOURCE;
 518     }
 519 
 520     request->context = &context->super;
 521 
 522     if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) {
 523         
 524         if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) {
 525             OBJ_RELEASE(*newcomm);
 526             OBJ_RELEASE(context);
 527             *newcomm = MPI_COMM_NULL;
 528             return ret;
 529         }
 530         OMPI_COMM_SET_PML_ADDED(*newcomm);
 531     }
 532 
 533     
 534 
 535 
 536     ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context,
 537                                  &subreq);
 538     if (OMPI_SUCCESS != ret) {
 539         ompi_comm_request_return (request);
 540         return ret;
 541     }
 542 
 543     ompi_comm_request_schedule_append (request, ompi_comm_activate_nb_complete, &subreq, 1);
 544     ompi_comm_request_start (request);
 545 
 546     *req = &request->super;
 547 
 548     return OMPI_SUCCESS;
 549 }
 550 
 551 int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm,
 552                         ompi_communicator_t *bridgecomm, const void *arg0,
 553                         const void *arg1, bool send_first, int mode)
 554 {
 555     ompi_request_t *req;
 556     int rc;
 557 
 558     rc = ompi_comm_activate_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req);
 559     if (OMPI_SUCCESS != rc) {
 560         return rc;
 561     }
 562 
 563     ompi_request_wait_completion (req);
 564     rc = req->req_status.MPI_ERROR;
 565     ompi_comm_request_return ((ompi_comm_request_t *) req);
 566 
 567     return rc;
 568 }
 569 
 570 static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request)
 571 {
 572     ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
 573     int ret;
 574 
 575     
 576 
 577 
 578 
 579 
 580 
 581 
 582 
 583 
 584 
 585 
 586 
 587 
 588 
 589 
 590 
 591 
 592 
 593 
 594 
 595 
 596 
 597 
 598 
 599     if (MPI_UNDEFINED == (context->newcomm)->c_local_group->grp_my_rank) {
 600         return OMPI_SUCCESS;
 601     }
 602 
 603     
 604 
 605     if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(context->newcomm))) {
 606         OBJ_RELEASE(context->newcomm);
 607         *context->newcommp = MPI_COMM_NULL;
 608         return ret;
 609     }
 610 
 611     
 612 
 613 
 614 
 615 
 616 
 617 
 618 
 619 
 620 
 621 
 622 
 623 
 624 
 625 
 626 
 627 
 628     if (OMPI_COMM_IS_INTER(context->newcomm)) {
 629         if (OMPI_COMM_CID_IS_LOWER(context->newcomm, context->comm)) {
 630             OMPI_COMM_SET_EXTRA_RETAIN (context->newcomm);
 631             OBJ_RETAIN (context->newcomm);
 632         }
 633     }
 634 
 635     
 636     return OMPI_SUCCESS;
 637 }
 638 
 639 
 640 
 641 
 642 static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
 643                                          ompi_comm_cid_context_t *context, ompi_request_t **req)
 644 {
 645     ompi_communicator_t *comm = context->comm;
 646 
 647     return comm->c_coll->coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm,
 648                                          req, comm->c_coll->coll_iallreduce_module);
 649 }
 650 
 651 
/* Steps of the inter-communicator allreduce (see ompi_comm_allreduce_inter_nb):
 * the local root exchanges partial results with the remote root, folds them
 * together, and the result is broadcast inside the local group. */
static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request);
static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request);
static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request);
 655 
 656 static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf,
 657                                          int count, struct ompi_op_t *op,
 658                                          ompi_comm_cid_context_t *cid_context,
 659                                          ompi_request_t **req)
 660 {
 661     ompi_communicator_t *intercomm = cid_context->comm;
 662     ompi_comm_allreduce_context_t *context;
 663     ompi_comm_request_t *request;
 664     ompi_request_t *subreq;
 665     int local_rank, rc;
 666 
 667     if (!OMPI_COMM_IS_INTER (cid_context->comm)) {
 668         return MPI_ERR_COMM;
 669     }
 670 
 671     request = ompi_comm_request_get ();
 672     if (OPAL_UNLIKELY(NULL == request)) {
 673         return OMPI_ERR_OUT_OF_RESOURCE;
 674     }
 675 
 676     context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
 677     if (OPAL_UNLIKELY(NULL == context)) {
 678         ompi_comm_request_return (request);
 679         return OMPI_ERR_OUT_OF_RESOURCE;
 680     }
 681 
 682     request->context = &context->super;
 683 
 684     
 685     local_rank = ompi_comm_rank (intercomm);
 686 
 687     if (0 == local_rank) {
 688         context->tmpbuf  = (int *) calloc (count, sizeof(int));
 689         if (OPAL_UNLIKELY (NULL == context->tmpbuf)) {
 690             ompi_comm_request_return (request);
 691             return OMPI_ERR_OUT_OF_RESOURCE;
 692         }
 693     }
 694 
 695     
 696 
 697     rc = intercomm->c_local_comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, 0,
 698                                                        intercomm->c_local_comm, &subreq,
 699                                                        intercomm->c_local_comm->c_coll->coll_ireduce_module);
 700     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 701         ompi_comm_request_return (request);
 702         return rc;
 703     }
 704 
 705     if (0 == local_rank) {
 706         ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_exchange, &subreq, 1);
 707     } else {
 708         ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_bcast, &subreq, 1);
 709     }
 710 
 711     ompi_comm_request_start (request);
 712     *req = &request->super;
 713 
 714     return OMPI_SUCCESS;
 715 }
 716 
 717 
 718 static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request)
 719 {
 720     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 721     ompi_communicator_t *intercomm = context->cid_context->comm;
 722     ompi_request_t *subreqs[2];
 723     int rc;
 724 
 725     
 726 
 727     rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
 728                              intercomm, subreqs));
 729     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 730         return rc;
 731     }
 732 
 733     rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
 734                              MCA_PML_BASE_SEND_STANDARD, intercomm, subreqs + 1));
 735     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 736         return rc;
 737     }
 738 
 739     return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2);
 740 }
 741 
 742 static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request)
 743 {
 744     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 745 
 746     ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT);
 747 
 748     return ompi_comm_allreduce_inter_bcast (request);
 749 }
 750 
 751 
 752 static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request)
 753 {
 754     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 755     ompi_communicator_t *comm = context->cid_context->comm->c_local_comm;
 756     ompi_request_t *subreq;
 757     int rc;
 758 
 759     
 760     rc = comm->c_coll->coll_ibcast (context->outbuf, context->count, MPI_INT, 0, comm,
 761                                    &subreq, comm->c_coll->coll_ibcast_module);
 762     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 763         return rc;
 764     }
 765 
 766     return ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
 767 }
 768 
 769 static int ompi_comm_allreduce_bridged_schedule_bcast (ompi_comm_request_t *request)
 770 {
 771     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 772     ompi_communicator_t *comm = context->cid_context->comm;
 773     ompi_request_t *subreq;
 774     int rc;
 775 
 776     rc = comm->c_coll->coll_ibcast (context->outbuf, context->count, MPI_INT,
 777                                    context->cid_context->local_leader, comm,
 778                                    &subreq, comm->c_coll->coll_ibcast_module);
 779     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 780         return rc;
 781     }
 782 
 783     return ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
 784 }
 785 
 786 static int ompi_comm_allreduce_bridged_xchng_complete (ompi_comm_request_t *request)
 787 {
 788     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 789 
 790     
 791     ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT);
 792 
 793     
 794     return ompi_comm_allreduce_bridged_schedule_bcast (request);
 795 }
 796 
 797 static int ompi_comm_allreduce_bridged_reduce_complete (ompi_comm_request_t *request)
 798 {
 799     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 800     ompi_communicator_t *bridgecomm = context->cid_context->bridgecomm;
 801     ompi_request_t *subreq[2];
 802     int rc;
 803 
 804     
 805     rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, context->cid_context->remote_leader,
 806                              OMPI_COMM_ALLREDUCE_TAG, bridgecomm, subreq + 1));
 807     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 808         return rc;
 809     }
 810 
 811     rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, context->cid_context->remote_leader,
 812                              OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bridgecomm,
 813                              subreq));
 814     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 815         return rc;
 816     }
 817 
 818     return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_xchng_complete, subreq, 2);
 819 }
 820 
 821 static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf,
 822                                                 int count, struct ompi_op_t *op,
 823                                                 ompi_comm_cid_context_t *cid_context,
 824                                                 ompi_request_t **req)
 825 {
 826     ompi_communicator_t *comm = cid_context->comm;
 827     ompi_comm_allreduce_context_t *context;
 828     int local_rank = ompi_comm_rank (comm);
 829     ompi_comm_request_t *request;
 830     ompi_request_t *subreq;
 831     int rc;
 832 
 833     context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
 834     if (OPAL_UNLIKELY(NULL == context)) {
 835         return OMPI_ERR_OUT_OF_RESOURCE;
 836     }
 837 
 838     if (local_rank == cid_context->local_leader) {
 839         context->tmpbuf = (int *) calloc (count, sizeof (int));
 840         if (OPAL_UNLIKELY(NULL == context->tmpbuf)) {
 841             OBJ_RELEASE(context);
 842             return OMPI_ERR_OUT_OF_RESOURCE;
 843         }
 844     }
 845 
 846     request = ompi_comm_request_get ();
 847     if (OPAL_UNLIKELY(NULL == request)) {
 848         OBJ_RELEASE(context);
 849         return OMPI_ERR_OUT_OF_RESOURCE;
 850     }
 851 
 852     request->context = &context->super;
 853 
 854     if (cid_context->local_leader == local_rank) {
 855         memcpy (context->tmpbuf, inbuf, count * sizeof (int));
 856     }
 857 
 858     
 859     rc = comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op,
 860                                     cid_context->local_leader, comm, &subreq,
 861                                     comm->c_coll->coll_ireduce_module);
 862     if ( OMPI_SUCCESS != rc ) {
 863         ompi_comm_request_return (request);
 864         return rc;
 865     }
 866 
 867     if (cid_context->local_leader == local_rank) {
 868         rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_reduce_complete,
 869                                                 &subreq, 1);
 870     } else {
 871         
 872         ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
 873 
 874         rc = ompi_comm_allreduce_bridged_schedule_bcast (request);
 875     }
 876 
 877     if (OMPI_SUCCESS != rc) {
 878         ompi_comm_request_return (request);
 879         return rc;
 880     }
 881 
 882     ompi_comm_request_start (request);
 883 
 884     *req = &request->super;
 885 
 886     return OMPI_SUCCESS;
 887 }
 888 
 889 static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *request)
 890 {
 891     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 892     ompi_comm_cid_context_t *cid_context = context->cid_context;
 893     int32_t size_count = context->count;
 894     opal_value_t info;
 895     opal_pmix_pdata_t pdat;
 896     opal_buffer_t sbuf;
 897     int rc;
 898     int bytes_written;
 899     const int output_id = 0;
 900     const int verbosity_level = 1;
 901 
 902     OBJ_CONSTRUCT(&sbuf, opal_buffer_t);
 903 
 904     if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) {
 905         OBJ_DESTRUCT(&sbuf);
 906         opal_output_verbose (verbosity_level, output_id, "pack failed. rc  %d\n", rc);
 907         return rc;
 908     }
 909 
 910     OBJ_CONSTRUCT(&info, opal_value_t);
 911     OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t);
 912 
 913     info.type = OPAL_BYTE_OBJECT;
 914     pdat.value.type = OPAL_BYTE_OBJECT;
 915 
 916     opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size);
 917     OBJ_DESTRUCT(&sbuf);
 918 
 919     bytes_written = opal_asprintf(&info.key,
 920                              cid_context->send_first ? "%s:%s:send:%d"
 921                                                      : "%s:%s:recv:%d",
 922                              cid_context->port_string,
 923                              cid_context->pmix_tag,
 924                              cid_context->iter);
 925 
 926     if (bytes_written == -1) {
 927         opal_output_verbose (verbosity_level, output_id, "writing info.key failed\n");
 928     } else {
 929         bytes_written = opal_asprintf(&pdat.value.key,
 930                                  cid_context->send_first ? "%s:%s:recv:%d"
 931                                                          : "%s:%s:send:%d",
 932                                  cid_context->port_string,
 933                                  cid_context->pmix_tag,
 934                                  cid_context->iter);
 935 
 936         if (bytes_written == -1) {
 937             opal_output_verbose (verbosity_level, output_id, "writing pdat.value.key failed\n");
 938         }
 939     }
 940 
 941     if (bytes_written == -1) {
 942         
 943         
 944         opal_output_verbose (verbosity_level, output_id, "send first: %d\n", cid_context->send_first);
 945         opal_output_verbose (verbosity_level, output_id, "port string: %s\n", cid_context->port_string);
 946         opal_output_verbose (verbosity_level, output_id, "pmix tag: %s\n", cid_context->pmix_tag);
 947         opal_output_verbose (verbosity_level, output_id, "iter: %d\n", cid_context->iter);
 948         return OMPI_ERR_OUT_OF_RESOURCE;
 949     }
 950 
 951     
 952 
 953     OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600);  
 954     OBJ_DESTRUCT(&info);
 955     if (OPAL_SUCCESS != rc) {
 956         OBJ_DESTRUCT(&pdat);
 957         return rc;
 958     }
 959 
 960     OBJ_CONSTRUCT(&sbuf, opal_buffer_t);
 961     opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size);
 962     pdat.value.data.bo.bytes = NULL;
 963     pdat.value.data.bo.size = 0;
 964     OBJ_DESTRUCT(&pdat);
 965 
 966     rc = opal_dss.unpack (&sbuf, context->outbuf, &size_count, OPAL_INT);
 967     OBJ_DESTRUCT(&sbuf);
 968     if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
 969         return rc;
 970     }
 971 
 972     ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, size_count, MPI_INT);
 973 
 974     return ompi_comm_allreduce_bridged_schedule_bcast (request);
 975 }
 976 
 977 static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf,
 978                                               int count, struct ompi_op_t *op,
 979                                               ompi_comm_cid_context_t *cid_context,
 980                                               ompi_request_t **req)
 981 {
 982     ompi_communicator_t *comm = cid_context->comm;
 983     ompi_comm_allreduce_context_t *context;
 984     int local_rank = ompi_comm_rank (comm);
 985     ompi_comm_request_t *request;
 986     ompi_request_t *subreq;
 987     int rc;
 988 
 989     context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
 990     if (OPAL_UNLIKELY(NULL == context)) {
 991         return OMPI_ERR_OUT_OF_RESOURCE;
 992     }
 993 
 994     if (cid_context->local_leader == local_rank) {
 995         context->tmpbuf = (int *) calloc (count, sizeof(int));
 996         if (OPAL_UNLIKELY(NULL == context->tmpbuf)) {
 997             OBJ_RELEASE(context);
 998             return OMPI_ERR_OUT_OF_RESOURCE;
 999         }
1000     }
1001 
1002     request = ompi_comm_request_get ();
1003     if (NULL == request) {
1004         OBJ_RELEASE(context);
1005         return OMPI_ERR_OUT_OF_RESOURCE;
1006     }
1007 
1008     request->context = &context->super;
1009 
1010     
1011     rc = comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op,
1012                                     cid_context->local_leader, comm,
1013                                     &subreq, comm->c_coll->coll_ireduce_module);
1014     if ( OMPI_SUCCESS != rc ) {
1015         ompi_comm_request_return (request);
1016         return rc;
1017     }
1018 
1019     if (cid_context->local_leader == local_rank) {
1020         rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_pmix_reduce_complete,
1021                                                 &subreq, 1);
1022     } else {
1023         
1024         rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
1025 
1026         rc = ompi_comm_allreduce_bridged_schedule_bcast (request);
1027     }
1028 
1029     if (OMPI_SUCCESS != rc) {
1030         ompi_comm_request_return (request);
1031         return rc;
1032     }
1033 
1034     ompi_comm_request_start (request);
1035     *req = (ompi_request_t *) request;
1036 
1037     
1038     return OMPI_SUCCESS;
1039 }
1040 
1041 static int ompi_comm_allreduce_group_broadcast (ompi_comm_request_t *request)
1042 {
1043     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
1044     ompi_comm_cid_context_t *cid_context = context->cid_context;
1045     ompi_request_t *subreq[2];
1046     int subreq_count = 0;
1047     int rc;
1048 
1049     for (int i = 0 ; i < 2 ; ++i) {
1050         if (MPI_PROC_NULL != context->peers_comm[i + 1]) {
1051             rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[i+1],
1052                                     cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD,
1053                                     cid_context->comm, subreq + subreq_count++));
1054             if (OMPI_SUCCESS != rc) {
1055                 return rc;
1056             }
1057         }
1058     }
1059 
1060     return ompi_comm_request_schedule_append (request, NULL, subreq, subreq_count);
1061 }
1062 
1063 static int ompi_comm_allreduce_group_recv_complete (ompi_comm_request_t *request)
1064 {
1065     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
1066     ompi_comm_cid_context_t *cid_context = context->cid_context;
1067     int *tmp = context->tmpbuf;
1068     ompi_request_t *subreq[2];
1069     int rc;
1070 
1071     for (int i = 0 ; i < 2 ; ++i) {
1072         if (MPI_PROC_NULL != context->peers_comm[i + 1]) {
1073             ompi_op_reduce (context->op, tmp, context->outbuf, context->count, MPI_INT);
1074             tmp += context->count;
1075         }
1076     }
1077 
1078     if (MPI_PROC_NULL != context->peers_comm[0]) {
1079         
1080         rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[0],
1081                                 cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD,
1082                                 cid_context->comm, subreq));
1083         if (OMPI_SUCCESS != rc) {
1084             return rc;
1085         }
1086 
1087         rc = MCA_PML_CALL(irecv(context->outbuf, context->count, MPI_INT, context->peers_comm[0],
1088                                 cid_context->pml_tag, cid_context->comm, subreq + 1));
1089         if (OMPI_SUCCESS != rc) {
1090             return rc;
1091         }
1092 
1093         return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_broadcast, subreq, 2);
1094     }
1095 
1096     
1097     return ompi_comm_allreduce_group_broadcast (request);
1098 }
1099 
1100 static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count,
1101                                          struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
1102                                          ompi_request_t **req)
1103 {
1104     ompi_group_t *group = cid_context->newcomm->c_local_group;
1105     const int group_size = ompi_group_size (group);
1106     const int group_rank = ompi_group_rank (group);
1107     ompi_communicator_t *comm = cid_context->comm;
1108     int peers_group[3], *tmp, subreq_count = 0;
1109     ompi_comm_allreduce_context_t *context;
1110     ompi_comm_request_t *request;
1111     ompi_request_t *subreq[3];
1112 
1113     context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
1114     if (NULL == context) {
1115         return OMPI_ERR_OUT_OF_RESOURCE;
1116     }
1117 
1118     tmp = context->tmpbuf = calloc (sizeof (int), count * 3);
1119     if (NULL == context->tmpbuf) {
1120         OBJ_RELEASE(context);
1121         return OMPI_ERR_OUT_OF_RESOURCE;
1122     }
1123 
1124     request = ompi_comm_request_get ();
1125     if (NULL == request) {
1126         OBJ_RELEASE(context);
1127         return OMPI_ERR_OUT_OF_RESOURCE;
1128     }
1129 
1130     request->context = &context->super;
1131 
1132     
1133     peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL;
1134     peers_group[1] = (group_rank * 2 + 1) < group_size ? group_rank * 2 + 1: MPI_PROC_NULL;
1135     peers_group[2] = (group_rank * 2 + 2) < group_size ? group_rank * 2 + 2 : MPI_PROC_NULL;
1136 
1137     
1138     ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, context->peers_comm);
1139 
1140     
1141     memmove (outbuf, inbuf, sizeof (int) * count);
1142 
1143     for (int i = 0 ; i < 2 ; ++i) {
1144         if (MPI_PROC_NULL != context->peers_comm[i + 1]) {
1145             int rc = MCA_PML_CALL(irecv(tmp, count, MPI_INT, context->peers_comm[i + 1],
1146                                         cid_context->pml_tag, comm, subreq + subreq_count++));
1147             if (OMPI_SUCCESS != rc) {
1148                 ompi_comm_request_return (request);
1149                 return rc;
1150             }
1151 
1152             tmp += count;
1153         }
1154     }
1155 
1156     ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_recv_complete, subreq, subreq_count);
1157 
1158     ompi_comm_request_start (request);
1159     *req = &request->super;
1160 
1161     return OMPI_SUCCESS;
1162 }