root/ompi/communicator/comm_cid.c


DEFINITIONS

This source file includes the following definitions.
  1. mca_comm_cid_context_construct
  2. mca_comm_cid_context_destruct
  3. ompi_comm_allreduce_context_construct
  4. ompi_comm_allreduce_context_destruct
  5. ompi_comm_cid_init
  6. mca_comm_cid_context_alloc
  7. ompi_comm_allreduce_context_alloc
  8. ompi_comm_nextcid_nb
  9. ompi_comm_nextcid
  10. ompi_comm_allreduce_getnextcid
  11. ompi_comm_checkcid
  12. ompi_comm_nextcid_check_flag
  13. ompi_comm_activate_nb
  14. ompi_comm_activate
  15. ompi_comm_activate_nb_complete
  16. ompi_comm_allreduce_intra_nb
  17. ompi_comm_allreduce_inter_nb
  18. ompi_comm_allreduce_inter_leader_exchange
  19. ompi_comm_allreduce_inter_leader_reduce
  20. ompi_comm_allreduce_inter_bcast
  21. ompi_comm_allreduce_bridged_schedule_bcast
  22. ompi_comm_allreduce_bridged_xchng_complete
  23. ompi_comm_allreduce_bridged_reduce_complete
  24. ompi_comm_allreduce_intra_bridge_nb
  25. ompi_comm_allreduce_pmix_reduce_complete
  26. ompi_comm_allreduce_intra_pmix_nb
  27. ompi_comm_allreduce_group_broadcast
  28. ompi_comm_allreduce_group_recv_complete
  29. ompi_comm_allreduce_group_nb

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2017 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2007      Cisco Systems, Inc.  All rights reserved.
  14  * Copyright (c) 2007      Voltaire All rights reserved.
  15  * Copyright (c) 2006-2010 University of Houston.  All rights reserved.
  16  * Copyright (c) 2009      Sun Microsystems, Inc.  All rights reserved.
  17  * Copyright (c) 2012-2016 Los Alamos National Security, LLC.  All rights
  18  *                         reserved.
  19  * Copyright (c) 2012      Oak Ridge National Labs.  All rights reserved.
  20  * Copyright (c) 2013-2016 Intel, Inc.  All rights reserved.
  21  * Copyright (c) 2014-2016 Research Organization for Information Science
  22  *                         and Technology (RIST). All rights reserved.
  23  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  24  * Copyright (c) 2017      Mellanox Technologies. All rights reserved.
  25  * Copyright (c) 2018      Amazon.com, Inc. or its affiliates.  All Rights reserved.
  26  * $COPYRIGHT$
  27  *
  28  * Additional copyrights may follow
  29  *
  30  * $HEADER$
  31  */
  32 
  33 #include "ompi_config.h"
  34 
  35 #include "opal/dss/dss.h"
  36 #include "opal/mca/pmix/pmix.h"
  37 #include "opal/util/printf.h"
  38 
  39 #include "ompi/proc/proc.h"
  40 #include "ompi/communicator/communicator.h"
  41 #include "ompi/op/op.h"
  42 #include "ompi/constants.h"
  43 #include "opal/class/opal_pointer_array.h"
  44 #include "opal/class/opal_list.h"
  45 #include "ompi/mca/pml/pml.h"
  46 #include "ompi/mca/rte/rte.h"
  47 #include "ompi/mca/coll/base/base.h"
  48 #include "ompi/request/request.h"
  49 #include "ompi/runtime/mpiruntime.h"
  50 
  51 struct ompi_comm_cid_context_t;
  52 
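     /*
      * All of the allreduce variants in this file share this non-blocking
      * signature: they combine 'count' ints from inbuf into outbuf with 'op'
      * and hand back a request in *req, which the ompi_comm_request schedule
      * machinery waits on before running the next callback in the chain.
      */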
  53 typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
  54                                               struct ompi_comm_cid_context_t *cid_context,
  55                                               ompi_request_t **req);
  56 
  57 
  58 struct ompi_comm_cid_context_t {
  59     opal_object_t super;
  60 
  61     ompi_communicator_t *newcomm;
  62     ompi_communicator_t **newcommp;
  63     ompi_communicator_t *comm;
  64     ompi_communicator_t *bridgecomm;
  65 
  66     ompi_comm_allreduce_impl_fn_t allreduce_fn;
  67 
  68     int nextcid;
  69     int nextlocal_cid;
  70     int start;
  71     int flag, rflag;
  72     int local_leader;
  73     int remote_leader;
  74     int iter;
  75     /** storage for activate barrier */
  76     int ok;
  77     char *port_string;
  78     bool send_first;
  79     int pml_tag;
  80     char *pmix_tag;
  81 };
  82 
  83 typedef struct ompi_comm_cid_context_t ompi_comm_cid_context_t;
  84 
  85 static void mca_comm_cid_context_construct (ompi_comm_cid_context_t *context)
  86 {
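         /* zero every field that follows the base object (super) */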
  87     memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super));
  88 }
  89 
  90 static void mca_comm_cid_context_destruct (ompi_comm_cid_context_t *context)
  91 {
  92     free (context->port_string);
  93     free (context->pmix_tag);
  94 }
  95 
  96 OBJ_CLASS_INSTANCE (ompi_comm_cid_context_t, opal_object_t,
  97                     mca_comm_cid_context_construct,
  98                     mca_comm_cid_context_destruct);
  99 
 100 struct ompi_comm_allreduce_context_t {
 101     opal_object_t super;
 102 
 103     int *inbuf;
 104     int *outbuf;
 105     int count;
 106     struct ompi_op_t *op;
 107     ompi_comm_cid_context_t *cid_context;
 108     int *tmpbuf;
 109 
 110     /* for group allreduce */
 111     int peers_comm[3];
 112 };
 113 
 114 typedef struct ompi_comm_allreduce_context_t ompi_comm_allreduce_context_t;
 115 
 116 static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t *context)
 117 {
 118     memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super));
 119 }
 120 
 121 static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context)
 122 {
 123     free (context->tmpbuf);
 124 }
 125 
 126 OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t,
 127                     ompi_comm_allreduce_context_construct,
 128                     ompi_comm_allreduce_context_destruct);
 129 
 130 /**
 131  * These functions make sure that we determine the global result over
 132  * an intra-communicator (simple), an inter-communicator, and a
 133  * pseudo inter-communicator described by two separate intra-comms
 134  * and a bridge-comm (intercomm-create scenario).
 135  */
 136 
 137 /* non-blocking intracommunicator allreduce */
 138 static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count,
 139                                          struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
 140                                          ompi_request_t **req);
 141 
 142 /* non-blocking intercommunicator allreduce */
 143 static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count,
 144                                          struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
 145                                          ompi_request_t **req);
 146 
 147 static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count,
 148                                          struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
 149                                          ompi_request_t **req);
 150 
 151 static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, int count,
 152                                               struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
 153                                               ompi_request_t **req);
 154 
 155 static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int count,
 156                                                 struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
 157                                                 ompi_request_t **req);
 158 
 159 static opal_mutex_t ompi_cid_lock = OPAL_MUTEX_STATIC_INIT;
 160 
 161 
 162 int ompi_comm_cid_init (void)
 163 {
 164     return OMPI_SUCCESS;
 165 }
 166 
 167 static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
 168                                                             ompi_communicator_t *bridgecomm, const void *arg0,
 169                                                             const void *arg1, const char *pmix_tag, bool send_first,
 170                                                             int mode)
 171 {
 172     ompi_comm_cid_context_t *context;
 173 
 174     context = OBJ_NEW(ompi_comm_cid_context_t);
 175     if (OPAL_UNLIKELY(NULL == context)) {
 176         return NULL;
 177     }
 178 
 179     context->newcomm       = newcomm;
 180     context->comm          = comm;
 181     context->bridgecomm    = bridgecomm;
 182     context->pml_tag       = 0;
 183 
 184     /* Determine which implementation of allreduce we have to use
 185      * for the current mode. */
 186     switch (mode) {
 187     case OMPI_COMM_CID_INTRA:
 188         context->allreduce_fn = ompi_comm_allreduce_intra_nb;
 189         break;
 190     case OMPI_COMM_CID_INTER:
 191         context->allreduce_fn = ompi_comm_allreduce_inter_nb;
 192         break;
 193     case OMPI_COMM_CID_GROUP:
 194         context->allreduce_fn = ompi_comm_allreduce_group_nb;
 195         context->pml_tag = ((int *) arg0)[0];
 196         break;
 197     case OMPI_COMM_CID_INTRA_PMIX:
 198         context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb;
 199         context->local_leader = ((int *) arg0)[0];
 200         if (arg1) {
 201             context->port_string = strdup ((char *) arg1);
 202         }
 203         context->pmix_tag = strdup ((char *) pmix_tag);
 204         break;
 205     case OMPI_COMM_CID_INTRA_BRIDGE:
 206         context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb;
 207         context->local_leader = ((int *) arg0)[0];
 208         context->remote_leader = ((int *) arg1)[0];
 209         break;
 210     default:
 211         OBJ_RELEASE(context);
 212         return NULL;
 213     }
 214 
 215     context->send_first = send_first;
 216     context->iter = 0;
 217     context->ok = 1;
 218 
 219     return context;
 220 }
 221 
 222 static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int *inbuf, int *outbuf,
 223                                                                          int count, struct ompi_op_t *op,
 224                                                                          ompi_comm_cid_context_t *cid_context)
 225 {
 226     ompi_comm_allreduce_context_t *context;
 227 
 228     context = OBJ_NEW(ompi_comm_allreduce_context_t);
 229     if (OPAL_UNLIKELY(NULL == context)) {
 230         return NULL;
 231     }
 232 
 233     context->inbuf = inbuf;
 234     context->outbuf = outbuf;
 235     context->count = count;
 236     context->op = op;
 237     context->cid_context = cid_context;
 238 
 239     return context;
 240 }
 241 
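     /*
      * Overview of the CID agreement loop implemented by the three callbacks
      * declared below:
      *   1. getnextcid: each participant reserves its lowest locally free CID
      *      and the group agrees on the maximum of these values (MPI_MAX).
      *   2. checkcid: each participant checks whether that maximum is free
      *      (or reserves it) locally and contributes a 0/1 flag (MPI_MIN).
      *   3. nextcid_check_flag: a non-zero minimum means everybody accepted
      *      the CID, so it is committed to the new communicator; otherwise
      *      the reservation is dropped and the loop restarts past that CID.
      */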
 242 /* find the next available local cid and start an allreduce */
 243 static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);
 244 /* verify that the maximum cid is locally available and start an allreduce */
 245 static int ompi_comm_checkcid (ompi_comm_request_t *request);
 246 /* verify that the cid was available globally */
 247 static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request);
 248 
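     /*
      * Serializes concurrent CID allocations: ompi_comm_allreduce_getnextcid
      * only proceeds when the current allocation holds the lowest
      * (parent CID << 32 | tag) id and reschedules itself otherwise, which
      * keeps concurrent communicator creations from racing each other's
      * allreduces.
      */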
 249 static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX;
 250 
 251 int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
 252                           ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
 253                           bool send_first, int mode, ompi_request_t **req)
 254 {
 255     ompi_comm_cid_context_t *context;
 256     ompi_comm_request_t *request;
 257 
 258     context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1,
 259                                           "nextcid", send_first, mode);
 260     if (NULL == context) {
 261         return OMPI_ERR_OUT_OF_RESOURCE;
 262     }
 263 
 264     context->start = ompi_mpi_communicators.lowest_free;
 265 
 266     request = ompi_comm_request_get ();
 267     if (NULL == request) {
 268         OBJ_RELEASE(context);
 269         return OMPI_ERR_OUT_OF_RESOURCE;
 270     }
 271 
 272     request->context = &context->super;
 273 
 274     ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
 275     ompi_comm_request_start (request);
 276 
 277     *req = &request->super;
 278 
 279 
 280     return OMPI_SUCCESS;
 281 }
 282 
 283 int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
 284                        ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
 285                        bool send_first, int mode)
 286 {
 287     ompi_request_t *req;
 288     int rc;
 289 
 290     rc = ompi_comm_nextcid_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req);
 291     if (OMPI_SUCCESS != rc) {
 292         return rc;
 293     }
 294 
 295     ompi_request_wait_completion (req);
 296     rc = req->req_status.MPI_ERROR;
 297     ompi_comm_request_return ((ompi_comm_request_t *) req);
 298 
 299     return rc;
 300 }
 301 
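     /*
      * Illustrative usage sketch (hedged: the parameter values below are
      * examples, not copied from a particular caller). Communicator-creation
      * code typically pairs ompi_comm_nextcid() with ompi_comm_activate():
      *
      *     rc = ompi_comm_nextcid (newcomm, comm, NULL, NULL, NULL,
      *                             false, OMPI_COMM_CID_INTRA);
      *     ... fill in the remaining fields of newcomm ...
      *     rc = ompi_comm_activate (&newcomm, comm, NULL, NULL, NULL,
      *                              false, OMPI_COMM_CID_INTRA);
      */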
 302 static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
 303 {
 304     ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
 305     int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag);
 306     ompi_request_t *subreq;
 307     bool flag;
 308     int ret;
 309     int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);
 310 
 311     if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
 312         return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
 313     }
 314 
 315     if (ompi_comm_cid_lowest_id < my_id) {
 316         OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 317         return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
 318     }
 319 
 320     ompi_comm_cid_lowest_id = my_id;
 321 
 322     /**
 323      * Core of the algorithm: reserve the lowest locally free CID at or above 'start' (non-participants contribute 0)
 324      */
 325     if( participate ){
 326         flag = false;
 327         context->nextlocal_cid = mca_pml.pml_max_contextid;
 328         for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
 329             flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
 330                                                          context->comm);
 331             if (true == flag) {
 332                 context->nextlocal_cid = i;
 333                 break;
 334             }
 335         }
 336     } else {
 337         context->nextlocal_cid = 0;
 338     }
 339 
 340     ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
 341                                  context, &subreq);
 342     /* there was a failure during non-blocking collective
 343      * all we can do is abort
 344      */
 345     if (OMPI_SUCCESS != ret) {
 346         goto err_exit;
 347     }
 348 
 349     if ( ((unsigned int) context->nextlocal_cid == mca_pml.pml_max_contextid) ) {
 350         /* We are out of local CID space; the others already know this from the allreduce above */
 351         ret = OMPI_ERR_OUT_OF_RESOURCE;
 352         goto err_exit;
 353     }
 354     OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 355 
 356     /* next we want to verify that the resulting commid is ok */
 357     return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1);
 358 err_exit:
 359     if (participate && flag) {
 360         opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
 361     }
 362     ompi_comm_cid_lowest_id = INT64_MAX;
 363     OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 364     return ret;
 365 
 366 }
 367 
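     /*
      * Runs once the MPI_MAX allreduce from getnextcid completes: nextcid now
      * holds the group-wide candidate. Each participant verifies that the
      * candidate is free locally (reserving it, or releasing its own different
      * reservation) and the resulting 0/1 flags are combined with MPI_MIN.
      */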
 368 static int ompi_comm_checkcid (ompi_comm_request_t *request)
 369 {
 370     ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
 371     ompi_request_t *subreq;
 372     int ret;
 373     int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);
 374 
 375     if (OMPI_SUCCESS != request->super.req_status.MPI_ERROR) {
 376         if (participate) {
 377             opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
 378         }
 379         return request->super.req_status.MPI_ERROR;
 380     }
 381 
 382     if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
 383         return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0);
 384     }
 385 
 386     if( !participate ){
 387         context->flag = 1;
 388     } else {
 389         context->flag = (context->nextcid == context->nextlocal_cid);
 390         if ( participate && !context->flag) {
 391             opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
 392 
 393             context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators,
 394                                                                   context->nextcid, context->comm);
 395         }
 396     }
 397 
 398     ++context->iter;
 399 
 400     ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MIN, context, &subreq);
 401     if (OMPI_SUCCESS == ret) {
 402         ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1);
 403     } else {
 404         if (participate && context->flag ) {
 405             opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
 406         }
 407         ompi_comm_cid_lowest_id = INT64_MAX;
 408     }
 409 
 410     OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 411     return ret;
 412 }
 413 
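     /*
      * Runs once the MPI_MIN allreduce from checkcid completes. A non-zero
      * rflag means every participant accepted the candidate: commit it to the
      * new communicator and release the CID allocator. Otherwise drop our
      * reservation, move the search start past the rejected CID and retry.
      */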
 414 static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
 415 {
 416     ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
 417     int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);
 418 
 419     if (OMPI_SUCCESS != request->super.req_status.MPI_ERROR) {
 420         if (participate) {
 421             opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL);
 422         }
 423         return request->super.req_status.MPI_ERROR;
 424     }
 425 
 426     if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
 427         return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0);
 428     }
 429 
 430     if (0 != context->rflag) {
 431         if( !participate ) {
 432             /* we need to provide something sane here,
 433              * but we cannot use `nextcid` since it may already be
 434              * in use locally; take the next locally available CID
 435              */
 436             context->nextlocal_cid = mca_pml.pml_max_contextid;
 437             for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
 438                 bool flag;
 439                 flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
 440                                                                 context->comm);
 441                 if (true == flag) {
 442                     context->nextlocal_cid = i;
 443                     break;
 444                 }
 445             }
 446             context->nextcid = context->nextlocal_cid;
 447         }
 448 
 449         /* store the agreed-upon CID in the new communicator */
 450         context->newcomm->c_contextid = context->nextcid;
 451         opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);
 452 
 453         /* unlock the cid generator */
 454         ompi_comm_cid_lowest_id = INT64_MAX;
 455         OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 456 
 457         /* done! */
 458         return OMPI_SUCCESS;
 459     }
 460 
 461     if (participate && (0 != context->flag)) {
 462         /* we could use this CID, but the others do not agree */
 463         opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL);
 464         context->start = context->nextcid + 1; /* that's where we can start the next round */
 465     }
 466 
 467     ++context->iter;
 468 
 469     OPAL_THREAD_UNLOCK(&ompi_cid_lock);
 470 
 471     /* try again */
 472     return ompi_comm_allreduce_getnextcid (request);
 473 }
 474 
 475 /**************************************************************************/
 476 /**************************************************************************/
 477 /**************************************************************************/
 478 /* This routine serves two purposes:
 479  * - the allreduce acts as a kind of barrier, which ensures
 480  *   that no fragments arrive on the new communicator before
 481  *   everybody has set up the comm structure.
 482  * - some components (e.g. the MagPIe collective component)
 483  *   might want to generate new communicators and communicate
 484  *   using the new comm right away. They can safely do so
 485  *   after this 'barrier'.
 486  *
 487  * The reason this routine lives in comm_cid.c and not in
 488  * comm.c is that this file contains the allreduce
 489  * implementations which are required, and thus we avoid
 490  * having duplicate code.
 491  */
 492 
 493 /* Non-blocking version of ompi_comm_activate */
 494 static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request);
 495 
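     /*
      * Activation flow: processes that belong to the new communicator first
      * register it with the PML (add_comm), then all callers enter the
      * allreduce-based 'barrier'; ompi_comm_activate_nb_complete finally
      * selects the collective component for the participating processes.
      */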
 496 int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm,
 497                            ompi_communicator_t *bridgecomm, const void *arg0,
 498                            const void *arg1, bool send_first, int mode, ompi_request_t **req)
 499 {
 500     ompi_comm_cid_context_t *context;
 501     ompi_comm_request_t *request;
 502     ompi_request_t *subreq;
 503     int ret = 0;
 504 
 505     context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate",
 506                                           send_first, mode);
 507     if (NULL == context) {
 508         return OMPI_ERR_OUT_OF_RESOURCE;
 509     }
 510 
 511     /* keep track of the pointer so it can be set to MPI_COMM_NULL on failure */
 512     context->newcommp = newcomm;
 513 
 514     request = ompi_comm_request_get ();
 515     if (NULL == request) {
 516         OBJ_RELEASE(context);
 517         return OMPI_ERR_OUT_OF_RESOURCE;
 518     }
 519 
 520     request->context = &context->super;
 521 
 522     if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) {
 523         /* Initialize the PML stuff in the newcomm  */
 524         if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) {
 525             OBJ_RELEASE(*newcomm);
 526             OBJ_RELEASE(context);
 527             *newcomm = MPI_COMM_NULL;
 528             return ret;
 529         }
 530         OMPI_COMM_SET_PML_ADDED(*newcomm);
 531     }
 532 
 533     /* Step 1: the barrier, after which it is allowed to
 534      * send messages over the new communicator
 535      */
 536     ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context,
 537                                  &subreq);
 538     if (OMPI_SUCCESS != ret) {
 539         ompi_comm_request_return (request);
 540         return ret;
 541     }
 542 
 543     ompi_comm_request_schedule_append (request, ompi_comm_activate_nb_complete, &subreq, 1);
 544     ompi_comm_request_start (request);
 545 
 546     *req = &request->super;
 547 
 548     return OMPI_SUCCESS;
 549 }
 550 
 551 int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm,
 552                         ompi_communicator_t *bridgecomm, const void *arg0,
 553                         const void *arg1, bool send_first, int mode)
 554 {
 555     ompi_request_t *req;
 556     int rc;
 557 
 558     rc = ompi_comm_activate_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req);
 559     if (OMPI_SUCCESS != rc) {
 560         return rc;
 561     }
 562 
 563     ompi_request_wait_completion (req);
 564     rc = req->req_status.MPI_ERROR;
 565     ompi_comm_request_return ((ompi_comm_request_t *) req);
 566 
 567     return rc;
 568 }
 569 
 570 static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request)
 571 {
 572     ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
 573     int ret;
 574 
 575     /**
 576      * Check to see if this process is in the new communicator.
 577      *
 578      * Specifically, this function is invoked by all processes in the
 579      * old communicator, regardless of whether they are in the new
 580      * communicator or not.  This is because it is far simpler to use
 581      * MPI collective functions on the old communicator to determine
 582      * some data for the new communicator (e.g., remote_leader) than
 583      * to kludge up our own pseudo-collective routines over just the
 584      * processes in the new communicator.  Hence, *all* processes in
 585      * the old communicator need to invoke this function.
 586      *
 587      * That being said, only processes in the new communicator need to
 588      * select a coll module for the new communicator.  More
 589      * specifically, processes who are not in the new communicator
 590      * should *not* select a coll module -- for example,
 591      * ompi_comm_rank(newcomm) returns MPI_UNDEFINED for processes who
 592      * are not in the new communicator.  This can cause errors in the
 593      * selection / initialization of a coll module.  Plus, it's
 594      * wasteful -- processes in the new communicator will end up
 595      * freeing the new communicator anyway, so we might as well leave
 596      * the coll selection as NULL (the coll base comm unselect code
 597      * handles that case properly).
 598      */
 599     if (MPI_UNDEFINED == (context->newcomm)->c_local_group->grp_my_rank) {
 600         return OMPI_SUCCESS;
 601     }
 602 
 603     /* Let the collective components fight over who will do
 604        collectives on this new comm.  */
 605     if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(context->newcomm))) {
 606         OBJ_RELEASE(context->newcomm);
 607         *context->newcommp = MPI_COMM_NULL;
 608         return ret;
 609     }
 610 
 611     /* For an inter-communicator, we have to deal with a potential
 612      * problem if the local_comm that we created has a lower CID than
 613      * the parent comm. This is not a problem as long as the user calls
 614      * MPI_Comm_free on the inter-communicator. However, if the
 615      * communicators are not freed by the user but released by Open MPI
 616      * in MPI_Finalize, we walk through the list of still existing
 617      * communicators and free them one by one, so local_comm is freed
 618      * before the inter-communicator itself. The local_comm pointer in
 619      * the inter-communicator then still holds the 'previous' address of
 620      * local_comm, which leads to a segmentation violation. To prevent
 621      * that, we increase the reference count of local_comm by one if its
 622      * CID is lower than that of the parent. We cannot, however, do so
 623      * if the CID of local_comm is larger than the CID of the
 624      * inter-communicator, since a regular MPI_Comm_free would then leave
 625      * local_comm hanging around and we would not recycle CIDs properly,
 626      * which was the cause of this trouble in the first place.
 627      */
 628     if (OMPI_COMM_IS_INTER(context->newcomm)) {
 629         if (OMPI_COMM_CID_IS_LOWER(context->newcomm, context->comm)) {
 630             OMPI_COMM_SET_EXTRA_RETAIN (context->newcomm);
 631             OBJ_RETAIN (context->newcomm);
 632         }
 633     }
 634 
 635     /* done */
 636     return OMPI_SUCCESS;
 637 }
 638 
 639 /**************************************************************************/
 640 /**************************************************************************/
 641 /**************************************************************************/
 642 static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
 643                                          ompi_comm_cid_context_t *context, ompi_request_t **req)
 644 {
 645     ompi_communicator_t *comm = context->comm;
 646 
 647     return comm->c_coll->coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm,
 648                                          req, comm->c_coll->coll_iallreduce_module);
 649 }
 650 
 651 /* Non-blocking version of ompi_comm_allreduce_inter */
 652 static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request);
 653 static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request);
 654 static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request);
 655 
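     /*
      * Inter-communicator scheme: each group ireduces its values to its local
      * rank 0, the two rank-0 leaders swap the partial results point-to-point
      * over the inter-communicator and reduce them, and each leader then
      * ibcasts the final result within its own local group.
      */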
 656 static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf,
 657                                          int count, struct ompi_op_t *op,
 658                                          ompi_comm_cid_context_t *cid_context,
 659                                          ompi_request_t **req)
 660 {
 661     ompi_communicator_t *intercomm = cid_context->comm;
 662     ompi_comm_allreduce_context_t *context;
 663     ompi_comm_request_t *request;
 664     ompi_request_t *subreq;
 665     int local_rank, rc;
 666 
 667     if (!OMPI_COMM_IS_INTER (cid_context->comm)) {
 668         return MPI_ERR_COMM;
 669     }
 670 
 671     request = ompi_comm_request_get ();
 672     if (OPAL_UNLIKELY(NULL == request)) {
 673         return OMPI_ERR_OUT_OF_RESOURCE;
 674     }
 675 
 676     context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
 677     if (OPAL_UNLIKELY(NULL == context)) {
 678         ompi_comm_request_return (request);
 679         return OMPI_ERR_OUT_OF_RESOURCE;
 680     }
 681 
 682     request->context = &context->super;
 683 
 684     /* Allocate temporary arrays */
 685     local_rank = ompi_comm_rank (intercomm);
 686 
 687     if (0 == local_rank) {
 688         context->tmpbuf  = (int *) calloc (count, sizeof(int));
 689         if (OPAL_UNLIKELY (NULL == context->tmpbuf)) {
 690             ompi_comm_request_return (request);
 691             return OMPI_ERR_OUT_OF_RESOURCE;
 692         }
 693     }
 694 
 695     /* Execute the inter-allreduce: the result from the local group will end up in the buffer
 696      * of the remote group, and vice versa. */
 697     rc = intercomm->c_local_comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, 0,
 698                                                        intercomm->c_local_comm, &subreq,
 699                                                        intercomm->c_local_comm->c_coll->coll_ireduce_module);
 700     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 701         ompi_comm_request_return (request);
 702         return rc;
 703     }
 704 
 705     if (0 == local_rank) {
 706         ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_exchange, &subreq, 1);
 707     } else {
 708         ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_bcast, &subreq, 1);
 709     }
 710 
 711     ompi_comm_request_start (request);
 712     *req = &request->super;
 713 
 714     return OMPI_SUCCESS;
 715 }
 716 
 717 
 718 static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request)
 719 {
 720     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 721     ompi_communicator_t *intercomm = context->cid_context->comm;
 722     ompi_request_t *subreqs[2];
 723     int rc;
 724 
 725     /* the local leaders exchange their data and determine the overall result
 726        for both groups */
 727     rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
 728                              intercomm, subreqs));
 729     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 730         return rc;
 731     }
 732 
 733     rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
 734                              MCA_PML_BASE_SEND_STANDARD, intercomm, subreqs + 1));
 735     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 736         return rc;
 737     }
 738 
 739     return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2);
 740 }
 741 
 742 static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request)
 743 {
 744     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 745 
 746     ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT);
 747 
 748     return ompi_comm_allreduce_inter_bcast (request);
 749 }
 750 
 751 
 752 static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request)
 753 {
 754     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 755     ompi_communicator_t *comm = context->cid_context->comm->c_local_comm;
 756     ompi_request_t *subreq;
 757     int rc;
 758 
 759     /* both roots have the same result; broadcast it to the local group */
 760     rc = comm->c_coll->coll_ibcast (context->outbuf, context->count, MPI_INT, 0, comm,
 761                                    &subreq, comm->c_coll->coll_ibcast_module);
 762     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 763         return rc;
 764     }
 765 
 766     return ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
 767 }
 768 
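     /*
      * Final broadcast of the leader's result to the local group. This step is
      * shared by the intra-bridge and PMIx variants below, which both follow a
      * reduce / leader-exchange / broadcast pattern.
      */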
 769 static int ompi_comm_allreduce_bridged_schedule_bcast (ompi_comm_request_t *request)
 770 {
 771     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 772     ompi_communicator_t *comm = context->cid_context->comm;
 773     ompi_request_t *subreq;
 774     int rc;
 775 
 776     rc = comm->c_coll->coll_ibcast (context->outbuf, context->count, MPI_INT,
 777                                    context->cid_context->local_leader, comm,
 778                                    &subreq, comm->c_coll->coll_ibcast_module);
 779     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 780         return rc;
 781     }
 782 
 783     return ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
 784 }
 785 
 786 static int ompi_comm_allreduce_bridged_xchng_complete (ompi_comm_request_t *request)
 787 {
 788     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 789 
 790     /* step 3: reduce leader data */
 791     ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT);
 792 
 793     /* schedule the broadcast to local peers */
 794     return ompi_comm_allreduce_bridged_schedule_bcast (request);
 795 }
 796 
 797 static int ompi_comm_allreduce_bridged_reduce_complete (ompi_comm_request_t *request)
 798 {
 799     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 800     ompi_communicator_t *bridgecomm = context->cid_context->bridgecomm;
 801     ompi_request_t *subreq[2];
 802     int rc;
 803 
 804     /* step 2: leader exchange */
 805     rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, context->cid_context->remote_leader,
 806                              OMPI_COMM_ALLREDUCE_TAG, bridgecomm, subreq + 1));
 807     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 808         return rc;
 809     }
 810 
 811     rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, context->cid_context->remote_leader,
 812                              OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bridgecomm,
 813                              subreq));
 814     if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
 815         return rc;
 816     }
 817 
 818     return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_xchng_complete, subreq, 2);
 819 }
 820 
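     /*
      * Bridge scheme (intercomm-create): reduce within each intra-comm to its
      * local leader, the two leaders exchange partial results over bridgecomm
      * (isend/irecv to remote_leader), reduce, and then broadcast the result
      * over the local intra-comm.
      */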
 821 static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf,
 822                                                 int count, struct ompi_op_t *op,
 823                                                 ompi_comm_cid_context_t *cid_context,
 824                                                 ompi_request_t **req)
 825 {
 826     ompi_communicator_t *comm = cid_context->comm;
 827     ompi_comm_allreduce_context_t *context;
 828     int local_rank = ompi_comm_rank (comm);
 829     ompi_comm_request_t *request;
 830     ompi_request_t *subreq;
 831     int rc;
 832 
 833     context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
 834     if (OPAL_UNLIKELY(NULL == context)) {
 835         return OMPI_ERR_OUT_OF_RESOURCE;
 836     }
 837 
 838     if (local_rank == cid_context->local_leader) {
 839         context->tmpbuf = (int *) calloc (count, sizeof (int));
 840         if (OPAL_UNLIKELY(NULL == context->tmpbuf)) {
 841             OBJ_RELEASE(context);
 842             return OMPI_ERR_OUT_OF_RESOURCE;
 843         }
 844     }
 845 
 846     request = ompi_comm_request_get ();
 847     if (OPAL_UNLIKELY(NULL == request)) {
 848         OBJ_RELEASE(context);
 849         return OMPI_ERR_OUT_OF_RESOURCE;
 850     }
 851 
 852     request->context = &context->super;
 853 
 854     if (cid_context->local_leader == local_rank) {
 855         memcpy (context->tmpbuf, inbuf, count * sizeof (int));
 856     }
 857 
 858     /* step 1: reduce to the local leader */
 859     rc = comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op,
 860                                     cid_context->local_leader, comm, &subreq,
 861                                     comm->c_coll->coll_ireduce_module);
 862     if ( OMPI_SUCCESS != rc ) {
 863         ompi_comm_request_return (request);
 864         return rc;
 865     }
 866 
 867     if (cid_context->local_leader == local_rank) {
 868         rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_reduce_complete,
 869                                                 &subreq, 1);
 870     } else {
 871         /* go ahead and schedule the broadcast */
 872         ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
 873 
 874         rc = ompi_comm_allreduce_bridged_schedule_bcast (request);
 875     }
 876 
 877     if (OMPI_SUCCESS != rc) {
 878         ompi_comm_request_return (request);
 879         return rc;
 880     }
 881 
 882     ompi_comm_request_start (request);
 883 
 884     *req = &request->super;
 885 
 886     return OMPI_SUCCESS;
 887 }
 888 
 889 static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *request)
 890 {
 891     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
 892     ompi_comm_cid_context_t *cid_context = context->cid_context;
 893     int32_t size_count = context->count;
 894     opal_value_t info;
 895     opal_pmix_pdata_t pdat;
 896     opal_buffer_t sbuf;
 897     int rc;
 898     int bytes_written;
 899     const int output_id = 0;
 900     const int verbosity_level = 1;
 901 
 902     OBJ_CONSTRUCT(&sbuf, opal_buffer_t);
 903 
 904     if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) {
 905         OBJ_DESTRUCT(&sbuf);
 906         opal_output_verbose (verbosity_level, output_id, "pack failed. rc  %d\n", rc);
 907         return rc;
 908     }
 909 
 910     OBJ_CONSTRUCT(&info, opal_value_t);
 911     OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t);
 912 
 913     info.type = OPAL_BYTE_OBJECT;
 914     pdat.value.type = OPAL_BYTE_OBJECT;
 915 
 916     opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size);
 917     OBJ_DESTRUCT(&sbuf);
 918 
 919     bytes_written = opal_asprintf(&info.key,
 920                              cid_context->send_first ? "%s:%s:send:%d"
 921                                                      : "%s:%s:recv:%d",
 922                              cid_context->port_string,
 923                              cid_context->pmix_tag,
 924                              cid_context->iter);
 925 
 926     if (bytes_written == -1) {
 927         opal_output_verbose (verbosity_level, output_id, "writing info.key failed\n");
 928     } else {
 929         bytes_written = opal_asprintf(&pdat.value.key,
 930                                  cid_context->send_first ? "%s:%s:recv:%d"
 931                                                          : "%s:%s:send:%d",
 932                                  cid_context->port_string,
 933                                  cid_context->pmix_tag,
 934                                  cid_context->iter);
 935 
 936         if (bytes_written == -1) {
 937             opal_output_verbose (verbosity_level, output_id, "writing pdat.value.key failed\n");
 938         }
 939     }
 940 
 941     if (bytes_written == -1) {
 942         // write with separate calls,
 943         // just in case the args are the cause of failure
 944         opal_output_verbose (verbosity_level, output_id, "send first: %d\n", cid_context->send_first);
 945         opal_output_verbose (verbosity_level, output_id, "port string: %s\n", cid_context->port_string);
 946         opal_output_verbose (verbosity_level, output_id, "pmix tag: %s\n", cid_context->pmix_tag);
 947         opal_output_verbose (verbosity_level, output_id, "iter: %d\n", cid_context->iter);
 948         return OMPI_ERR_OUT_OF_RESOURCE;
 949     }
 950 
 951     /* This macro is not actually non-blocking. If a non-blocking version becomes available, this
 952      * function needs to be reworked to take advantage of it. */
 953     OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600);  // give them 10 minutes
 954     OBJ_DESTRUCT(&info);
 955     if (OPAL_SUCCESS != rc) {
 956         OBJ_DESTRUCT(&pdat);
 957         return rc;
 958     }
 959 
 960     OBJ_CONSTRUCT(&sbuf, opal_buffer_t);
 961     opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size);
 962     pdat.value.data.bo.bytes = NULL;
 963     pdat.value.data.bo.size = 0;
 964     OBJ_DESTRUCT(&pdat);
 965 
 966     rc = opal_dss.unpack (&sbuf, context->outbuf, &size_count, OPAL_INT);
 967     OBJ_DESTRUCT(&sbuf);
 968     if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
 969         return rc;
 970     }
 971 
 972     ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, size_count, MPI_INT);
 973 
 974     return ompi_comm_allreduce_bridged_schedule_bcast (request);
 975 }
 976 
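     /*
      * PMIx scheme (connect/accept): the two sides share no communicator, so
      * after reducing to the local leader the leaders exchange the packed
      * partial results through OPAL_PMIX_EXCHANGE, keyed on the port string,
      * PMIx tag and iteration count, then reduce and broadcast locally.
      */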
 977 static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf,
 978                                               int count, struct ompi_op_t *op,
 979                                               ompi_comm_cid_context_t *cid_context,
 980                                               ompi_request_t **req)
 981 {
 982     ompi_communicator_t *comm = cid_context->comm;
 983     ompi_comm_allreduce_context_t *context;
 984     int local_rank = ompi_comm_rank (comm);
 985     ompi_comm_request_t *request;
 986     ompi_request_t *subreq;
 987     int rc;
 988 
 989     context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
 990     if (OPAL_UNLIKELY(NULL == context)) {
 991         return OMPI_ERR_OUT_OF_RESOURCE;
 992     }
 993 
 994     if (cid_context->local_leader == local_rank) {
 995         context->tmpbuf = (int *) calloc (count, sizeof(int));
 996         if (OPAL_UNLIKELY(NULL == context->tmpbuf)) {
 997             OBJ_RELEASE(context);
 998             return OMPI_ERR_OUT_OF_RESOURCE;
 999         }
1000     }
1001 
1002     request = ompi_comm_request_get ();
1003     if (NULL == request) {
1004         OBJ_RELEASE(context);
1005         return OMPI_ERR_OUT_OF_RESOURCE;
1006     }
1007 
1008     request->context = &context->super;
1009 
1010     /* comm is an intra-communicator */
1011     rc = comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op,
1012                                     cid_context->local_leader, comm,
1013                                     &subreq, comm->c_coll->coll_ireduce_module);
1014     if ( OMPI_SUCCESS != rc ) {
1015         ompi_comm_request_return (request);
1016         return rc;
1017     }
1018 
1019     if (cid_context->local_leader == local_rank) {
1020         rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_pmix_reduce_complete,
1021                                                 &subreq, 1);
1022     } else {
1023         /* go ahead and schedule the broadcast */
1024         rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
1025 
1026         rc = ompi_comm_allreduce_bridged_schedule_bcast (request);
1027     }
1028 
1029     if (OMPI_SUCCESS != rc) {
1030         ompi_comm_request_return (request);
1031         return rc;
1032     }
1033 
1034     ompi_comm_request_start (request);
1035     *req = (ompi_request_t *) request;
1036 
1037     /* use the same function as bridged to schedule the broadcast */
1038     return OMPI_SUCCESS;
1039 }
1040 
1041 static int ompi_comm_allreduce_group_broadcast (ompi_comm_request_t *request)
1042 {
1043     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
1044     ompi_comm_cid_context_t *cid_context = context->cid_context;
1045     ompi_request_t *subreq[2];
1046     int subreq_count = 0;
1047     int rc;
1048 
1049     for (int i = 0 ; i < 2 ; ++i) {
1050         if (MPI_PROC_NULL != context->peers_comm[i + 1]) {
1051             rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[i+1],
1052                                     cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD,
1053                                     cid_context->comm, subreq + subreq_count++));
1054             if (OMPI_SUCCESS != rc) {
1055                 return rc;
1056             }
1057         }
1058     }
1059 
1060     return ompi_comm_request_schedule_append (request, NULL, subreq, subreq_count);
1061 }
1062 
1063 static int ompi_comm_allreduce_group_recv_complete (ompi_comm_request_t *request)
1064 {
1065     ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
1066     ompi_comm_cid_context_t *cid_context = context->cid_context;
1067     int *tmp = context->tmpbuf;
1068     ompi_request_t *subreq[2];
1069     int rc;
1070 
1071     for (int i = 0 ; i < 2 ; ++i) {
1072         if (MPI_PROC_NULL != context->peers_comm[i + 1]) {
1073             ompi_op_reduce (context->op, tmp, context->outbuf, context->count, MPI_INT);
1074             tmp += context->count;
1075         }
1076     }
1077 
1078     if (MPI_PROC_NULL != context->peers_comm[0]) {
1079         /* interior node */
1080         rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[0],
1081                                 cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD,
1082                                 cid_context->comm, subreq));
1083         if (OMPI_SUCCESS != rc) {
1084             return rc;
1085         }
1086 
1087         rc = MCA_PML_CALL(irecv(context->outbuf, context->count, MPI_INT, context->peers_comm[0],
1088                                 cid_context->pml_tag, cid_context->comm, subreq + 1));
1089         if (OMPI_SUCCESS != rc) {
1090             return rc;
1091         }
1092 
1093         return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_broadcast, subreq, 2);
1094     }
1095 
1096     /* root */
1097     return ompi_comm_allreduce_group_broadcast (request);
1098 }
1099 
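     /*
      * Group scheme (only a subset of 'comm' forms the new communicator): a
      * binary tree is built over the new group's ranks (parent (r-1)/2,
      * children 2r+1 and 2r+2), the ranks are translated into the parent
      * communicator, values are reduced up to group rank 0, and the result is
      * then sent back down the same tree.
      */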
1100 static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count,
1101                                          struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
1102                                          ompi_request_t **req)
1103 {
1104     ompi_group_t *group = cid_context->newcomm->c_local_group;
1105     const int group_size = ompi_group_size (group);
1106     const int group_rank = ompi_group_rank (group);
1107     ompi_communicator_t *comm = cid_context->comm;
1108     int peers_group[3], *tmp, subreq_count = 0;
1109     ompi_comm_allreduce_context_t *context;
1110     ompi_comm_request_t *request;
1111     ompi_request_t *subreq[3];
1112 
1113     context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
1114     if (NULL == context) {
1115         return OMPI_ERR_OUT_OF_RESOURCE;
1116     }
1117 
1118     tmp = context->tmpbuf = calloc (sizeof (int), count * 3);
1119     if (NULL == context->tmpbuf) {
1120         OBJ_RELEASE(context);
1121         return OMPI_ERR_OUT_OF_RESOURCE;
1122     }
1123 
1124     request = ompi_comm_request_get ();
1125     if (NULL == request) {
1126         OBJ_RELEASE(context);
1127         return OMPI_ERR_OUT_OF_RESOURCE;
1128     }
1129 
1130     request->context = &context->super;
1131 
1132     /* allreduce over the group via a binary tree: reduce to group rank 0, then broadcast back down */
1133     peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL;
1134     peers_group[1] = (group_rank * 2 + 1) < group_size ? group_rank * 2 + 1: MPI_PROC_NULL;
1135     peers_group[2] = (group_rank * 2 + 2) < group_size ? group_rank * 2 + 2 : MPI_PROC_NULL;
1136 
1137     /* translate the ranks into the ranks of the parent communicator */
1138     ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, context->peers_comm);
1139 
1140     /* reduce */
1141     memmove (outbuf, inbuf, sizeof (int) * count);
1142 
1143     for (int i = 0 ; i < 2 ; ++i) {
1144         if (MPI_PROC_NULL != context->peers_comm[i + 1]) {
1145             int rc = MCA_PML_CALL(irecv(tmp, count, MPI_INT, context->peers_comm[i + 1],
1146                                         cid_context->pml_tag, comm, subreq + subreq_count++));
1147             if (OMPI_SUCCESS != rc) {
1148                 ompi_comm_request_return (request);
1149                 return rc;
1150             }
1151 
1152             tmp += count;
1153         }
1154     }
1155 
1156     ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_recv_complete, subreq, subreq_count);
1157 
1158     ompi_comm_request_start (request);
1159     *req = &request->super;
1160 
1161     return OMPI_SUCCESS;
1162 }
