root/ompi/mca/coll/base/coll_base_allreduce.c


DEFINITIONS

This source file includes the following definitions.
  1. ompi_coll_base_allreduce_intra_nonoverlapping
  2. ompi_coll_base_allreduce_intra_recursivedoubling
  3. ompi_coll_base_allreduce_intra_ring
  4. ompi_coll_base_allreduce_intra_ring_segmented
  5. ompi_coll_base_allreduce_intra_basic_linear
  6. ompi_coll_base_allreduce_intra_redscat_allgather

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2017 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2009      University of Houston. All rights reserved.
  14  * Copyright (c) 2013      Los Alamos National Security, LLC. All Rights
  15  *                         reserved.
  16  * Copyright (c) 2015-2017 Research Organization for Information Science
  17  *                         and Technology (RIST). All rights reserved.
  18  * Copyright (c) 2018      Siberian State University of Telecommunications
  19  *                         and Information Science. All rights reserved.
  20  * $COPYRIGHT$
  21  *
  22  * Additional copyrights may follow
  23  *
  24  * $HEADER$
  25  */
  26 
  27 #include "ompi_config.h"
  28 
  29 #include "mpi.h"
  30 #include "opal/util/bit_ops.h"
  31 #include "ompi/constants.h"
  32 #include "ompi/datatype/ompi_datatype.h"
  33 #include "ompi/communicator/communicator.h"
  34 #include "ompi/mca/coll/coll.h"
  35 #include "ompi/mca/coll/base/coll_tags.h"
  36 #include "ompi/mca/pml/pml.h"
  37 #include "ompi/op/op.h"
  38 #include "ompi/mca/coll/base/coll_base_functions.h"
  39 #include "coll_base_topo.h"
  40 #include "coll_base_util.h"
  41 
  42 /*
  43  * ompi_coll_base_allreduce_intra_nonoverlapping
  44  *
  45  * This function simply calls a reduce followed by a broadcast.
  46  * Both called functions are base functions, but they complete
  47  * sequentially, i.e. with no additional overlap: even if the number of
  48  * segments used is greater than the topo depth, once the first segment
  49  * of data is fully 'reduced' it is not broadcast while the reduce
  50  * continues (cost = cost-reduce + cost-bcast + decision x 3)
  51  *
  52  */
  53 int
  54 ompi_coll_base_allreduce_intra_nonoverlapping(const void *sbuf, void *rbuf, int count,
  55                                                struct ompi_datatype_t *dtype,
  56                                                struct ompi_op_t *op,
  57                                                struct ompi_communicator_t *comm,
  58                                                mca_coll_base_module_t *module)
  59 {
  60     int err, rank;
  61 
  62     rank = ompi_comm_rank(comm);
  63 
  64     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:allreduce_intra_nonoverlapping rank %d", rank));
  65 
  66     /* Reduce to 0 and broadcast. */
  67 
  68     if (MPI_IN_PLACE == sbuf) {
  69         if (0 == rank) {
  70             err = comm->c_coll->coll_reduce (MPI_IN_PLACE, rbuf, count, dtype,
  71                                             op, 0, comm, comm->c_coll->coll_reduce_module);
  72         } else {
  73             err = comm->c_coll->coll_reduce (rbuf, NULL, count, dtype, op, 0,
  74                                             comm, comm->c_coll->coll_reduce_module);
  75         }
  76     } else {
  77         err = comm->c_coll->coll_reduce (sbuf, rbuf, count, dtype, op, 0,
  78                                         comm, comm->c_coll->coll_reduce_module);
  79     }
  80     if (MPI_SUCCESS != err) {
  81         return err;
  82     }
  83 
  84     return comm->c_coll->coll_bcast (rbuf, count, dtype, 0, comm,
  85                                     comm->c_coll->coll_bcast_module);
  86 }
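
/*
 * Editor's sketch: the reduce-then-broadcast composition implemented above,
 * expressed with the public MPI API. This is a hypothetical, illustrative
 * helper only -- the real function goes through the communicator's selected
 * coll modules instead -- but the MPI_IN_PLACE handling is the same.
 */
static inline int sketch_allreduce_as_reduce_bcast(const void *sbuf, void *rbuf,
                                                   int count, MPI_Datatype dtype,
                                                   MPI_Op op, MPI_Comm comm)
{
    int err, rank;
    MPI_Comm_rank(comm, &rank);
    if (MPI_IN_PLACE == sbuf) {
        /* In-place: every rank holds its contribution in rbuf */
        err = MPI_Reduce((0 == rank) ? MPI_IN_PLACE : rbuf,
                         (0 == rank) ? rbuf : NULL,
                         count, dtype, op, 0, comm);
    } else {
        err = MPI_Reduce(sbuf, rbuf, count, dtype, op, 0, comm);
    }
    if (MPI_SUCCESS != err) {
        return err;
    }
    return MPI_Bcast(rbuf, count, dtype, 0, comm);
}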
  87 
  88 /*
  89  *   ompi_coll_base_allreduce_intra_recursivedoubling
  90  *
  91  *   Function:       Recursive doubling algorithm for allreduce operation
  92  *   Accepts:        Same as MPI_Allreduce()
  93  *   Returns:        MPI_SUCCESS or error code
  94  *
  95  *   Description:    Implements recursive doubling algorithm for allreduce.
  96  *                   Original (non-segmented) implementation is used in MPICH-2
  97  *                   for small and intermediate size messages.
  98  *                   The algorithm preserves order of operations so it can
  99  *                   be used both by commutative and non-commutative operations.
 100  *
 101  *         Example on 7 nodes:
 102  *         Initial state
 103  *         #      0       1      2       3      4       5      6
 104  *               [0]     [1]    [2]     [3]    [4]     [5]    [6]
 105  *         Initial adjustment step for non-power of two nodes.
 106  *         old rank      1              3              5      6
 107  *         new rank      0              1              2      3
 108  *                     [0+1]          [2+3]          [4+5]   [6]
 109  *         Step 1
 110  *         old rank      1              3              5      6
 111  *         new rank      0              1              2      3
 112  *                     [0+1+]         [0+1+]         [4+5+]  [4+5+]
 113  *                     [2+3+]         [2+3+]         [6   ]  [6   ]
 114  *         Step 2
 115  *         old rank      1              3              5      6
 116  *         new rank      0              1              2      3
 117  *                     [0+1+]         [0+1+]         [0+1+]  [0+1+]
 118  *                     [2+3+]         [2+3+]         [2+3+]  [2+3+]
 119  *                     [4+5+]         [4+5+]         [4+5+]  [4+5+]
 120  *                     [6   ]         [6   ]         [6   ]  [6   ]
 121  *         Final adjustment step for non-power of two nodes
 122  *         #      0       1      2       3      4       5      6
 123  *              [0+1+] [0+1+] [0+1+]  [0+1+] [0+1+]  [0+1+] [0+1+]
 124  *              [2+3+] [2+3+] [2+3+]  [2+3+] [2+3+]  [2+3+] [2+3+]
 125  *              [4+5+] [4+5+] [4+5+]  [4+5+] [4+5+]  [4+5+] [4+5+]
 126  *              [6   ] [6   ] [6   ]  [6   ] [6   ]  [6   ] [6   ]
 127  *
 128  */
 129 int
 130 ompi_coll_base_allreduce_intra_recursivedoubling(const void *sbuf, void *rbuf,
 131                                                   int count,
 132                                                   struct ompi_datatype_t *dtype,
 133                                                   struct ompi_op_t *op,
 134                                                   struct ompi_communicator_t *comm,
 135                                                   mca_coll_base_module_t *module)
 136 {
 137     int ret, line, rank, size, adjsize, remote, distance;
 138     int newrank, newremote, extra_ranks;
 139     char *tmpsend = NULL, *tmprecv = NULL, *tmpswap = NULL, *inplacebuf_free = NULL, *inplacebuf;
 140     ptrdiff_t span, gap = 0;
 141 
 142     size = ompi_comm_size(comm);
 143     rank = ompi_comm_rank(comm);
 144 
 145     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 146                  "coll:base:allreduce_intra_recursivedoubling rank %d", rank));
 147 
 148     /* Special case for size == 1 */
 149     if (1 == size) {
 150         if (MPI_IN_PLACE != sbuf) {
 151             ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
 152             if (ret < 0) { line = __LINE__; goto error_hndl; }
 153         }
 154         return MPI_SUCCESS;
 155     }
 156 
 157     /* Allocate and initialize temporary send buffer */
 158     span = opal_datatype_span(&dtype->super, count, &gap);
 159     inplacebuf_free = (char*) malloc(span);
 160     if (NULL == inplacebuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
 161     inplacebuf = inplacebuf_free - gap;
 162 
 163     if (MPI_IN_PLACE == sbuf) {
 164         ret = ompi_datatype_copy_content_same_ddt(dtype, count, inplacebuf, (char*)rbuf);
 165         if (ret < 0) { line = __LINE__; goto error_hndl; }
 166     } else {
 167         ret = ompi_datatype_copy_content_same_ddt(dtype, count, inplacebuf, (char*)sbuf);
 168         if (ret < 0) { line = __LINE__; goto error_hndl; }
 169     }
 170 
 171     tmpsend = (char*) inplacebuf;
 172     tmprecv = (char*) rbuf;
 173 
 174     /* Determine nearest power of two less than or equal to size */
 175     adjsize = opal_next_poweroftwo (size);
 176     adjsize >>= 1;
 177 
 178     /* Handle non-power-of-two case:
 179        - Even ranks less than 2 * extra_ranks send their data to (rank + 1)
 180        and set their new rank to -1.
 181        - Odd ranks less than 2 * extra_ranks receive data from (rank - 1),
 182        apply the appropriate operation, and set their new rank to rank/2.
 183        - Everyone else sets their new rank to rank - extra_ranks.
 184     */
 185     extra_ranks = size - adjsize;
 186     if (rank <  (2 * extra_ranks)) {
 187         if (0 == (rank % 2)) {
 188             ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank + 1),
 189                                     MCA_COLL_BASE_TAG_ALLREDUCE,
 190                                     MCA_PML_BASE_SEND_STANDARD, comm));
 191             if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 192             newrank = -1;
 193         } else {
 194             ret = MCA_PML_CALL(recv(tmprecv, count, dtype, (rank - 1),
 195                                     MCA_COLL_BASE_TAG_ALLREDUCE, comm,
 196                                     MPI_STATUS_IGNORE));
 197             if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 198             /* tmpsend = tmprecv (op) tmpsend */
 199             ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
 200             newrank = rank >> 1;
 201         }
 202     } else {
 203         newrank = rank - extra_ranks;
 204     }
 205 
 206     /* Communication/Computation loop
 207        - Exchange message with remote node.
 208        - Perform appropriate operation taking into account order of operations:
 209        result = value (op) result
 210     */
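    /* Editor's worked example (size = 7): adjsize = 4, extra_ranks = 3, so
       new ranks 0..3 (old ranks 1, 3, 5, 6) run this loop with distances 1
       and 2; e.g. new rank 0 (old rank 1) first pairs with new rank 1
       (old rank 3 = 1*2 + 1), then with new rank 2 (old rank 5). */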
 211     for (distance = 0x1; distance < adjsize; distance <<=1) {
 212         if (newrank < 0) break;
 213         /* Determine remote node */
 214         newremote = newrank ^ distance;
 215         remote = (newremote < extra_ranks)?
 216             (newremote * 2 + 1):(newremote + extra_ranks);
 217 
 218         /* Exchange the data */
 219         ret = ompi_coll_base_sendrecv_actual(tmpsend, count, dtype, remote,
 220                                              MCA_COLL_BASE_TAG_ALLREDUCE,
 221                                              tmprecv, count, dtype, remote,
 222                                              MCA_COLL_BASE_TAG_ALLREDUCE,
 223                                              comm, MPI_STATUS_IGNORE);
 224         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 225 
 226         /* Apply operation */
 227         if (rank < remote) {
 228             /* tmprecv = tmpsend (op) tmprecv */
 229             ompi_op_reduce(op, tmpsend, tmprecv, count, dtype);
 230             tmpswap = tmprecv;
 231             tmprecv = tmpsend;
 232             tmpsend = tmpswap;
 233         } else {
 234             /* tmpsend = tmprecv (op) tmpsend */
 235             ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
 236         }
 237     }
 238 
 239     /* Handle non-power-of-two case:
 240        - Odd ranks less than 2 * extra_ranks send result from tmpsend to
 241        (rank - 1)
 242        - Even ranks less than 2 * extra_ranks receive result from (rank + 1)
 243     */
 244     if (rank < (2 * extra_ranks)) {
 245         if (0 == (rank % 2)) {
 246             ret = MCA_PML_CALL(recv(rbuf, count, dtype, (rank + 1),
 247                                     MCA_COLL_BASE_TAG_ALLREDUCE, comm,
 248                                     MPI_STATUS_IGNORE));
 249             if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 250             tmpsend = (char*)rbuf;
 251         } else {
 252             ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank - 1),
 253                                     MCA_COLL_BASE_TAG_ALLREDUCE,
 254                                     MCA_PML_BASE_SEND_STANDARD, comm));
 255             if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 256         }
 257     }
 258 
 259     /* Ensure that the final result is in rbuf */
 260     if (tmpsend != rbuf) {
 261         ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, tmpsend);
 262         if (ret < 0) { line = __LINE__; goto error_hndl; }
 263     }
 264 
 265     if (NULL != inplacebuf_free) free(inplacebuf_free);
 266     return MPI_SUCCESS;
 267 
 268  error_hndl:
 269     OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
 270                  __FILE__, line, rank, ret));
 271     (void)line;  // silence compiler warning
 272     if (NULL != inplacebuf_free) free(inplacebuf_free);
 273     return ret;
 274 }
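
/*
 * Editor's sketch: the non-power-of-two rank folding used by the recursive
 * doubling implementation above, in isolation (hypothetical helper; the
 * real code uses opal_next_poweroftwo). For size = 7: adjsize = 4,
 * extra_ranks = 3, and old ranks 1, 3, 5, 6 become new ranks 0, 1, 2, 3,
 * matching the diagram in the function's header comment.
 */
static inline int sketch_recursivedoubling_newrank(int rank, int size)
{
    int adjsize = 1;
    while (adjsize <= size) {
        adjsize <<= 1;        /* next power of two strictly greater than size */
    }
    adjsize >>= 1;            /* largest power of two <= size */

    int extra_ranks = size - adjsize;
    if (rank < 2 * extra_ranks) {
        /* even ranks hand their data to (rank + 1) and sit out */
        return (0 == rank % 2) ? -1 : rank / 2;
    }
    return rank - extra_ranks;
}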
 275 
 276 /*
 277  *   ompi_coll_base_allreduce_intra_ring
 278  *
 279  *   Function:       Ring algorithm for allreduce operation
 280  *   Accepts:        Same as MPI_Allreduce()
 281  *   Returns:        MPI_SUCCESS or error code
 282  *
 283  *   Description:    Implements ring algorithm for allreduce: the message is
 284  *                   automatically segmented into N segments of size M/N.
 285  *                   The algorithm requires 2*(N - 1) steps.
 286  *
 287  *   Limitations:    The algorithm DOES NOT preserve order of operations so it
 288  *                   can be used only for commutative operations.
 289  *                   In addition, the algorithm cannot work if the total
 290  *                   count is less than the communicator size.
 291  *         Example on 5 nodes:
 292  *         Initial state
 293  *   #      0              1             2              3             4
 294  *        [00]           [10]          [20]           [30]           [40]
 295  *        [01]           [11]          [21]           [31]           [41]
 296  *        [02]           [12]          [22]           [32]           [42]
 297  *        [03]           [13]          [23]           [33]           [43]
 298  *        [04]           [14]          [24]           [34]           [44]
 299  *
 300  *        COMPUTATION PHASE
 301  *         Step 0: rank r sends block r to rank (r+1) and receives block (r-1)
 302  *                 from rank (r-1) [with wraparound].
 303  *    #     0              1             2              3             4
 304  *        [00]          [00+10]        [20]           [30]           [40]
 305  *        [01]           [11]         [11+21]         [31]           [41]
 306  *        [02]           [12]          [22]          [22+32]         [42]
 307  *        [03]           [13]          [23]           [33]         [33+43]
 308  *      [44+04]          [14]          [24]           [34]           [44]
 309  *
 310  *         Step 1: rank r sends block (r-1) to rank (r+1) and receives block
 311  *                 (r-2) from rank (r-1) [with wraparound].
 312  *    #      0              1             2              3             4
 313  *         [00]          [00+10]     [01+10+20]        [30]           [40]
 314  *         [01]           [11]         [11+21]      [11+21+31]        [41]
 315  *         [02]           [12]          [22]          [22+32]      [22+32+42]
 316  *      [33+43+03]        [13]          [23]           [33]         [33+43]
 317  *        [44+04]       [44+04+14]       [24]           [34]           [44]
 318  *
 319  *         Step 2: rank r sends block (r-2) to rank (r+1) and receives block
 320  *                 (r-3) from rank (r-1) [with wraparound].
 321  *    #      0              1             2              3             4
 322  *         [00]          [00+10]     [01+10+20]    [01+10+20+30]      [40]
 323  *         [01]           [11]         [11+21]      [11+21+31]    [11+21+31+41]
 324  *     [22+32+42+02]      [12]          [22]          [22+32]      [22+32+42]
 325  *      [33+43+03]    [33+43+03+13]     [23]           [33]         [33+43]
 326  *        [44+04]       [44+04+14]  [44+04+14+24]      [34]           [44]
 327  *
 328  *         Step 3: rank r sends block (r-3) to rank (r+1) and receives block
 329  *                 (r-4) from rank (r-1) [with wraparound].
 330  *    #      0              1             2              3             4
 331  *         [00]          [00+10]     [01+10+20]    [01+10+20+30]     [FULL]
 332  *        [FULL]           [11]        [11+21]      [11+21+31]    [11+21+31+41]
 333  *     [22+32+42+02]     [FULL]          [22]         [22+32]      [22+32+42]
 334  *      [33+43+03]    [33+43+03+13]     [FULL]          [33]         [33+43]
 335  *        [44+04]       [44+04+14]  [44+04+14+24]      [FULL]         [44]
 336  *
 337  *        DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1.
 338  *
 339  */
 340 int
 341 ompi_coll_base_allreduce_intra_ring(const void *sbuf, void *rbuf, int count,
 342                                      struct ompi_datatype_t *dtype,
 343                                      struct ompi_op_t *op,
 344                                      struct ompi_communicator_t *comm,
 345                                      mca_coll_base_module_t *module)
 346 {
 347     int ret, line, rank, size, k, recv_from, send_to, block_count, inbi;
 348     int early_segcount, late_segcount, split_rank, max_segcount;
 349     size_t typelng;
 350     char *tmpsend = NULL, *tmprecv = NULL, *inbuf[2] = {NULL, NULL};
 351     ptrdiff_t true_lb, true_extent, lb, extent;
 352     ptrdiff_t block_offset, max_real_segsize;
 353     ompi_request_t *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
 354 
 355     size = ompi_comm_size(comm);
 356     rank = ompi_comm_rank(comm);
 357 
 358     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 359                  "coll:base:allreduce_intra_ring rank %d, count %d", rank, count));
 360 
 361     /* Special case for size == 1 */
 362     if (1 == size) {
 363         if (MPI_IN_PLACE != sbuf) {
 364             ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
 365             if (ret < 0) { line = __LINE__; goto error_hndl; }
 366         }
 367         return MPI_SUCCESS;
 368     }
 369 
 370     /* Special case for count less than size - use recursive doubling */
 371     if (count < size) {
 372         OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:allreduce_ring rank %d/%d, count %d, switching to recursive doubling", rank, size, count));
 373         return (ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf,
 374                                                                   count,
 375                                                                   dtype, op,
 376                                                                   comm, module));
 377     }
 378 
 379     /* Allocate and initialize temporary buffers */
 380     ret = ompi_datatype_get_extent(dtype, &lb, &extent);
 381     if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 382     ret = ompi_datatype_get_true_extent(dtype, &true_lb, &true_extent);
 383     if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 384     ret = ompi_datatype_type_size( dtype, &typelng);
 385     if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 386 
 387     /* Determine the number of elements per block and corresponding
 388        block sizes.
 389        The blocks are divided into "early" and "late" ones:
 390        blocks 0 .. (split_rank - 1) are "early" and
 391        blocks (split_rank) .. (size - 1) are "late".
 392        Early blocks are at most 1 element larger than the late ones.
 393     */
 394     COLL_BASE_COMPUTE_BLOCKCOUNT( count, size, split_rank,
 395                                    early_segcount, late_segcount );
 396     max_segcount = early_segcount;
 397     max_real_segsize = true_extent + (max_segcount - 1) * extent;
 398 
 399 
 400     inbuf[0] = (char*)malloc(max_real_segsize);
 401     if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
 402     if (size > 2) {
 403         inbuf[1] = (char*)malloc(max_real_segsize);
 404         if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
 405     }
 406 
 407     /* Handle MPI_IN_PLACE */
 408     if (MPI_IN_PLACE != sbuf) {
 409         ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
 410         if (ret < 0) { line = __LINE__; goto error_hndl; }
 411     }
 412 
 413     /* Computation loop */
 414 
 415     /*
 416        For each of the remote nodes:
 417        - post irecv for block (r-1)
 418        - send block (r)
 419        - in loop for every step k = 2 .. n
 420        - post irecv for block (r + n - k) % n
 421        - wait on block (r + n - k + 1) % n to arrive
 422        - compute on block (r + n - k + 1) % n
 423        - send block (r + n - k + 1) % n
 424        - wait on block (r + 1)
 425        - compute on block (r + 1)
 426        - send block (r + 1) to rank (r + 1)
 427        Note that we must be careful when computing the beginning of buffers;
 428        for send operations and computation we must compute the exact block size.
 429     */
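    /* Editor's worked example: size = 5, count = 23 gives split_rank = 3,
       early_segcount = 5, late_segcount = 4 above, so rank 4's own block
       starts at element 4*4 + 3 = 19 and holds late_segcount = 4 elements. */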
 430     send_to = (rank + 1) % size;
 431     recv_from = (rank + size - 1) % size;
 432 
 433     inbi = 0;
 434     /* Initialize first receive from the neighbor on the left */
 435     ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
 436                              MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
 437     if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 438     /* Send first block (my block) to the neighbor on the right */
 439     block_offset = ((rank < split_rank)?
 440                     ((ptrdiff_t)rank * (ptrdiff_t)early_segcount) :
 441                     ((ptrdiff_t)rank * (ptrdiff_t)late_segcount + split_rank));
 442     block_count = ((rank < split_rank)? early_segcount : late_segcount);
 443     tmpsend = ((char*)rbuf) + block_offset * extent;
 444     ret = MCA_PML_CALL(send(tmpsend, block_count, dtype, send_to,
 445                             MCA_COLL_BASE_TAG_ALLREDUCE,
 446                             MCA_PML_BASE_SEND_STANDARD, comm));
 447     if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 448 
 449     for (k = 2; k < size; k++) {
 450         const int prevblock = (rank + size - k + 1) % size;
 451 
 452         inbi = inbi ^ 0x1;
 453 
 454         /* Post irecv for the current block */
 455         ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
 456                                  MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
 457         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 458 
 459         /* Wait on previous block to arrive */
 460         ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
 461         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 462 
 463         /* Apply operation on previous block: result goes to rbuf
 464            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
 465         */
 466         block_offset = ((prevblock < split_rank)?
 467                         ((ptrdiff_t)prevblock * early_segcount) :
 468                         ((ptrdiff_t)prevblock * late_segcount + split_rank));
 469         block_count = ((prevblock < split_rank)? early_segcount : late_segcount);
 470         tmprecv = ((char*)rbuf) + (ptrdiff_t)block_offset * extent;
 471         ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, block_count, dtype);
 472 
 473         /* send previous block to send_to */
 474         ret = MCA_PML_CALL(send(tmprecv, block_count, dtype, send_to,
 475                                 MCA_COLL_BASE_TAG_ALLREDUCE,
 476                                 MCA_PML_BASE_SEND_STANDARD, comm));
 477         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 478     }
 479 
 480     /* Wait on the last block to arrive */
 481     ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
 482     if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 483 
 484     /* Apply operation on the last block (from neighbor (rank + 1)):
 485        rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
 486     recv_from = (rank + 1) % size;
 487     block_offset = ((recv_from < split_rank)?
 488                     ((ptrdiff_t)recv_from * early_segcount) :
 489                     ((ptrdiff_t)recv_from * late_segcount + split_rank));
 490     block_count = ((recv_from < split_rank)? early_segcount : late_segcount);
 491     tmprecv = ((char*)rbuf) + (ptrdiff_t)block_offset * extent;
 492     ompi_op_reduce(op, inbuf[inbi], tmprecv, block_count, dtype);
 493 
 494     /* Distribution loop - variation of ring allgather */
 495     send_to = (rank + 1) % size;
 496     recv_from = (rank + size - 1) % size;
 497     for (k = 0; k < size - 1; k++) {
 498         const int recv_data_from = (rank + size - k) % size;
 499         const int send_data_from = (rank + 1 + size - k) % size;
 500         const ptrdiff_t send_block_offset =
 501             ((send_data_from < split_rank)?
 502              ((ptrdiff_t)send_data_from * early_segcount) :
 503              ((ptrdiff_t)send_data_from * late_segcount + split_rank));
 504         const ptrdiff_t recv_block_offset =
 505             ((recv_data_from < split_rank)?
 506              ((ptrdiff_t)recv_data_from * early_segcount) :
 507              ((ptrdiff_t)recv_data_from * late_segcount + split_rank));
 508         block_count = ((send_data_from < split_rank)?
 509                        early_segcount : late_segcount);
 510 
 511         tmprecv = (char*)rbuf + (ptrdiff_t)recv_block_offset * extent;
 512         tmpsend = (char*)rbuf + (ptrdiff_t)send_block_offset * extent;
 513 
 514         ret = ompi_coll_base_sendrecv(tmpsend, block_count, dtype, send_to,
 515                                        MCA_COLL_BASE_TAG_ALLREDUCE,
 516                                        tmprecv, max_segcount, dtype, recv_from,
 517                                        MCA_COLL_BASE_TAG_ALLREDUCE,
 518                                        comm, MPI_STATUS_IGNORE, rank);
 519         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}
 520 
 521     }
 522 
 523     if (NULL != inbuf[0]) free(inbuf[0]);
 524     if (NULL != inbuf[1]) free(inbuf[1]);
 525 
 526     return MPI_SUCCESS;
 527 
 528  error_hndl:
 529     OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
 530                  __FILE__, line, rank, ret));
 531     ompi_coll_base_free_reqs(reqs, 2);
 532     (void)line;  // silence compiler warning
 533     if (NULL != inbuf[0]) free(inbuf[0]);
 534     if (NULL != inbuf[1]) free(inbuf[1]);
 535     return ret;
 536 }
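
/*
 * Editor's sketch: the early/late block split computed by
 * COLL_BASE_COMPUTE_BLOCKCOUNT in the ring algorithm above, written out as
 * a hypothetical helper. Blocks 0 .. split-1 ("early") get one extra element
 * when count does not divide evenly; block r then starts at offset r*early
 * if r < split, else r*late + split. E.g. count = 23, nblocks = 5 gives
 * early = 5, late = 4, split = 3 (block sizes 5, 5, 5, 4, 4).
 */
static inline void sketch_block_split(int count, int nblocks,
                                      int *split, int *early, int *late)
{
    *late  = count / nblocks;
    *split = count % nblocks;
    *early = *late + ((*split != 0) ? 1 : 0);
}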
 537 
 538 /*
 539  *   ompi_coll_base_allreduce_intra_ring_segmented
 540  *
 541  *   Function:       Pipelined ring algorithm for allreduce operation
 542  *   Accepts:        Same as MPI_Allreduce(), segment size
 543  *   Returns:        MPI_SUCCESS or error code
 544  *
 545  *   Description:    Implements pipelined ring algorithm for allreduce:
 546  *                   user supplies suggested segment size for the pipelining of
 547  *                   reduce operation.
 548  *                   The segment size determines the number of phases, np, for
 549  *                   the algorithm execution.
 550  *                   The message is automatically divided into N blocks of
 551  *                   approximately (count / N) elements, and each block is
 552  *                   processed in np pipelined phases. At the end of the
 553  *                   reduction phase, an allgather-like step is executed.
 554  *                   Algorithm requires (np + 1)*(N - 1) steps.
 555  *
 556  *   Limitations:    The algorithm DOES NOT preserve order of operations so it
 557  *                   can be used only for commutative operations.
 558  *                   In addition, the algorithm cannot work if the total count
 559  *                   is less than size * segment count.
 560  *         Example on 3 nodes with 2 phases
 561  *         Initial state
 562  *   #      0              1             2
 563  *        [00a]          [10a]         [20a]
 564  *        [00b]          [10b]         [20b]
 565  *        [01a]          [11a]         [21a]
 566  *        [01b]          [11b]         [21b]
 567  *        [02a]          [12a]         [22a]
 568  *        [02b]          [12b]         [22b]
 569  *
 570  *        COMPUTATION PHASE 0 (a)
 571  *         Step 0: rank r sends block ra to rank (r+1) and receives block (r-1)a
 572  *                 from rank (r-1) [with wraparound].
 573  *    #     0              1             2
 574  *        [00a]        [00a+10a]       [20a]
 575  *        [00b]          [10b]         [20b]
 576  *        [01a]          [11a]       [11a+21a]
 577  *        [01b]          [11b]         [21b]
 578  *      [22a+02a]        [12a]         [22a]
 579  *        [02b]          [12b]         [22b]
 580  *
 581  *         Step 1: rank r sends block (r-1)a to rank (r+1) and receives block
 582  *                 (r-2)a from rank (r-1) [with wraparound].
 583  *    #     0              1             2
 584  *        [00a]        [00a+10a]   [00a+10a+20a]
 585  *        [00b]          [10b]         [20b]
 586  *    [11a+21a+01a]      [11a]       [11a+21a]
 587  *        [01b]          [11b]         [21b]
 588  *      [22a+02a]    [22a+02a+12a]     [22a]
 589  *        [02b]          [12b]         [22b]
 590  *
 591  *        COMPUTATION PHASE 1 (b)
 592  *         Step 0: rank r sends block rb to rank (r+1) and receives block (r-1)b
 593  *                 from rank (r-1) [with wraparound].
 594  *    #     0              1             2
 595  *        [00a]        [00a+10a]       [20a]
 596  *        [00b]        [00b+10b]       [20b]
 597  *        [01a]          [11a]       [11a+21a]
 598  *        [01b]          [11b]       [11b+21b]
 599  *      [22a+02a]        [12a]         [22a]
 600  *      [22b+02b]        [12b]         [22b]
 601  *
 602  *         Step 1: rank r sends block (r-1)b to rank (r+1) and receives block
 603  *                 (r-2)b from rank (r-1) [with wraparound].
 604  *    #     0              1             2
 605  *        [00a]        [00a+10a]   [00a+10a+20a]
 606  *        [00b]        [00b+10b]   [00b+10b+20b]
 607  *    [11a+21a+01a]      [11a]       [11a+21a]
 608  *    [11b+21b+01b]      [11b]       [11b+21b]
 609  *      [22a+02a]    [22a+02a+12a]     [22a]
 610  *      [22b+02b]    [22b+02b+12b]     [22b]
 611  *
 612  *
 613  *        DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1 (same as
 614  *         in the regular ring algorithm).
 615  *
 616  */
 617 int
 618 ompi_coll_base_allreduce_intra_ring_segmented(const void *sbuf, void *rbuf, int count,
 619                                                struct ompi_datatype_t *dtype,
 620                                                struct ompi_op_t *op,
 621                                                struct ompi_communicator_t *comm,
 622                                                mca_coll_base_module_t *module,
 623                                                uint32_t segsize)
 624 {
 625     int ret, line, rank, size, k, recv_from, send_to;
 626     int early_blockcount, late_blockcount, split_rank;
 627     int segcount, max_segcount, num_phases, phase, block_count, inbi;
 628     size_t typelng;
 629     char *tmpsend = NULL, *tmprecv = NULL, *inbuf[2] = {NULL, NULL};
 630     ptrdiff_t block_offset, max_real_segsize;
 631     ompi_request_t *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
 632     ptrdiff_t lb, extent, gap;
 633 
 634     size = ompi_comm_size(comm);
 635     rank = ompi_comm_rank(comm);
 636 
 637     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 638                  "coll:base:allreduce_intra_ring_segmented rank %d, count %d", rank, count));
 639 
 640     /* Special case for size == 1 */
 641     if (1 == size) {
 642         if (MPI_IN_PLACE != sbuf) {
 643             ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
 644             if (ret < 0) { line = __LINE__; goto error_hndl; }
 645         }
 646         return MPI_SUCCESS;
 647     }
 648 
 649     /* Determine segment count based on the suggested segment size */
 650     ret = ompi_datatype_type_size( dtype, &typelng);
 651     if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 652     segcount = count;
 653     COLL_BASE_COMPUTED_SEGCOUNT(segsize, typelng, segcount)
 654 
 655     /* Special case for count less than size * segcount - use regular ring */
 656     if (count < (size * segcount)) {
 657         OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:allreduce_ring_segmented rank %d/%d, count %d, switching to regular ring", rank, size, count));
 658         return (ompi_coll_base_allreduce_intra_ring(sbuf, rbuf, count, dtype, op,
 659                                                     comm, module));
 660     }
 661 
 662     /* Determine the number of phases of the algorithm */
 663     num_phases = count / (size * segcount);
 664     if ((count % (size * segcount) >= size) &&
 665         (count % (size * segcount) > ((size * segcount) / 2))) {
 666         num_phases++;
 667     }
 668 
 669     /* Determine the number of elements per block and corresponding
 670        block sizes.
 671        The blocks are divided into "early" and "late" ones:
 672        blocks 0 .. (split_rank - 1) are "early" and
 673        blocks (split_rank) .. (size - 1) are "late".
 674        Early blocks are at most 1 element larger than the late ones.
 675        Note, these blocks will be split into num_phases segments,
 676        the largest of which will have max_segcount elements.
 677     */
 678     COLL_BASE_COMPUTE_BLOCKCOUNT( count, size, split_rank,
 679                                    early_blockcount, late_blockcount );
 680     COLL_BASE_COMPUTE_BLOCKCOUNT( early_blockcount, num_phases, inbi,
 681                                    max_segcount, k);
 682 
 683     ret = ompi_datatype_get_extent(dtype, &lb, &extent);
 684     if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 685     max_real_segsize = opal_datatype_span(&dtype->super, max_segcount, &gap);
 686 
 687     /* Allocate and initialize temporary buffers */
 688     inbuf[0] = (char*)malloc(max_real_segsize);
 689     if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
 690     if (size > 2) {
 691         inbuf[1] = (char*)malloc(max_real_segsize);
 692         if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
 693     }
 694 
 695     /* Handle MPI_IN_PLACE */
 696     if (MPI_IN_PLACE != sbuf) {
 697         ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
 698         if (ret < 0) { line = __LINE__; goto error_hndl; }
 699     }
 700 
 701     /* Computation loop: for each phase, repeat ring allreduce computation loop */
 702     for (phase = 0; phase < num_phases; phase ++) {
 703         ptrdiff_t phase_offset;
 704         int early_phase_segcount, late_phase_segcount, split_phase, phase_count;
 705 
 706         /*
 707            For each of the remote nodes:
 708            - post irecv for block (r-1)
 709            - send block (r)
 710            To do this, first compute block offset and count, and use block offset
 711            to compute phase offset.
 712            - in loop for every step k = 2 .. n
 713            - post irecv for block (r + n - k) % n
 714            - wait on block (r + n - k + 1) % n to arrive
 715            - compute on block (r + n - k + 1) % n
 716            - send block (r + n - k + 1) % n
 717            - wait on block (r + 1)
 718            - compute on block (r + 1)
 719            - send block (r + 1) to rank (r + 1)
 720            Note that we must be careful when computing the beginning of buffers;
 721            for send operations and computation we must compute the exact block size.
 722         */
 723         send_to = (rank + 1) % size;
 724         recv_from = (rank + size - 1) % size;
 725 
 726         inbi = 0;
 727         /* Initialize first receive from the neighbor on the left */
 728         ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
 729                                  MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
 730         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 731         /* Send first block (my block) to the neighbor on the right:
 732            - compute my block and phase offset
 733            - send data */
 734         block_offset = ((rank < split_rank)?
 735                         ((ptrdiff_t)rank * (ptrdiff_t)early_blockcount) :
 736                         ((ptrdiff_t)rank * (ptrdiff_t)late_blockcount + split_rank));
 737         block_count = ((rank < split_rank)? early_blockcount : late_blockcount);
 738         COLL_BASE_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
 739                                       early_phase_segcount, late_phase_segcount)
 740         phase_count = ((phase < split_phase)?
 741                        (early_phase_segcount) : (late_phase_segcount));
 742         phase_offset = ((phase < split_phase)?
 743                         ((ptrdiff_t)phase * (ptrdiff_t)early_phase_segcount) :
 744                         ((ptrdiff_t)phase * (ptrdiff_t)late_phase_segcount + split_phase));
 745         tmpsend = ((char*)rbuf) + (ptrdiff_t)(block_offset + phase_offset) * extent;
 746         ret = MCA_PML_CALL(send(tmpsend, phase_count, dtype, send_to,
 747                                 MCA_COLL_BASE_TAG_ALLREDUCE,
 748                                 MCA_PML_BASE_SEND_STANDARD, comm));
 749         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 750 
 751         for (k = 2; k < size; k++) {
 752             const int prevblock = (rank + size - k + 1) % size;
 753 
 754             inbi = inbi ^ 0x1;
 755 
 756             /* Post irecv for the current block */
 757             ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
 758                                      MCA_COLL_BASE_TAG_ALLREDUCE, comm,
 759                                      &reqs[inbi]));
 760             if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 761 
 762             /* Wait on previous block to arrive */
 763             ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
 764             if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 765 
 766             /* Apply operation on previous block: result goes to rbuf
 767                rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
 768             */
 769             block_offset = ((prevblock < split_rank)?
 770                             ((ptrdiff_t)prevblock * (ptrdiff_t)early_blockcount) :
 771                             ((ptrdiff_t)prevblock * (ptrdiff_t)late_blockcount + split_rank));
 772             block_count = ((prevblock < split_rank)?
 773                            early_blockcount : late_blockcount);
 774             COLL_BASE_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
 775                                           early_phase_segcount, late_phase_segcount)
 776             phase_count = ((phase < split_phase)?
 777                            (early_phase_segcount) : (late_phase_segcount));
 778             phase_offset = ((phase < split_phase)?
 779                             ((ptrdiff_t)phase * (ptrdiff_t)early_phase_segcount) :
 780                             ((ptrdiff_t)phase * (ptrdiff_t)late_phase_segcount + split_phase));
 781             tmprecv = ((char*)rbuf) + (ptrdiff_t)(block_offset + phase_offset) * extent;
 782             ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, phase_count, dtype);
 783 
 784             /* send previous block to send_to */
 785             ret = MCA_PML_CALL(send(tmprecv, phase_count, dtype, send_to,
 786                                     MCA_COLL_BASE_TAG_ALLREDUCE,
 787                                     MCA_PML_BASE_SEND_STANDARD, comm));
 788             if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 789         }
 790 
 791         /* Wait on the last block to arrive */
 792         ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
 793         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
 794 
 795         /* Apply operation on the last block (from neighbor (rank + 1)):
 796            rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
 797         recv_from = (rank + 1) % size;
 798         block_offset = ((recv_from < split_rank)?
 799                         ((ptrdiff_t)recv_from * (ptrdiff_t)early_blockcount) :
 800                         ((ptrdiff_t)recv_from * (ptrdiff_t)late_blockcount + split_rank));
 801         block_count = ((recv_from < split_rank)?
 802                        early_blockcount : late_blockcount);
 803         COLL_BASE_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
 804                                       early_phase_segcount, late_phase_segcount)
 805         phase_count = ((phase < split_phase)?
 806                        (early_phase_segcount) : (late_phase_segcount));
 807         phase_offset = ((phase < split_phase)?
 808                         ((ptrdiff_t)phase * (ptrdiff_t)early_phase_segcount) :
 809                         ((ptrdiff_t)phase * (ptrdiff_t)late_phase_segcount + split_phase));
 810         tmprecv = ((char*)rbuf) + (ptrdiff_t)(block_offset + phase_offset) * extent;
 811         ompi_op_reduce(op, inbuf[inbi], tmprecv, phase_count, dtype);
 812     }
 813 
 814     /* Distribution loop - variation of ring allgather */
 815     send_to = (rank + 1) % size;
 816     recv_from = (rank + size - 1) % size;
 817     for (k = 0; k < size - 1; k++) {
 818         const int recv_data_from = (rank + size - k) % size;
 819         const int send_data_from = (rank + 1 + size - k) % size;
 820         const ptrdiff_t send_block_offset =
 821             ((send_data_from < split_rank)?
 822              ((ptrdiff_t)send_data_from * (ptrdiff_t)early_blockcount) :
 823              ((ptrdiff_t)send_data_from * (ptrdiff_t)late_blockcount + split_rank));
 824         const ptrdiff_t recv_block_offset =
 825             ((recv_data_from < split_rank)?
 826              ((ptrdiff_t)recv_data_from * (ptrdiff_t)early_blockcount) :
 827              ((ptrdiff_t)recv_data_from * (ptrdiff_t)late_blockcount + split_rank));
 828         block_count = ((send_data_from < split_rank)?
 829                        early_blockcount : late_blockcount);
 830 
 831         tmprecv = (char*)rbuf + (ptrdiff_t)recv_block_offset * extent;
 832         tmpsend = (char*)rbuf + (ptrdiff_t)send_block_offset * extent;
 833 
 834         ret = ompi_coll_base_sendrecv(tmpsend, block_count, dtype, send_to,
 835                                        MCA_COLL_BASE_TAG_ALLREDUCE,
 836                                        tmprecv, early_blockcount, dtype, recv_from,
 837                                        MCA_COLL_BASE_TAG_ALLREDUCE,
 838                                        comm, MPI_STATUS_IGNORE, rank);
 839         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}
 840 
 841     }
 842 
 843     if (NULL != inbuf[0]) free(inbuf[0]);
 844     if (NULL != inbuf[1]) free(inbuf[1]);
 845 
 846     return MPI_SUCCESS;
 847 
 848  error_hndl:
 849     OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
 850                  __FILE__, line, rank, ret));
 851     ompi_coll_base_free_reqs(reqs, 2);
 852     (void)line;  // silence compiler warning
 853     if (NULL != inbuf[0]) free(inbuf[0]);
 854     if (NULL != inbuf[1]) free(inbuf[1]);
 855     return ret;
 856 }
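
/*
 * Editor's sketch: how the segmented ring above cuts each rank's block into
 * phases. The block (early/late_blockcount elements) is split again by
 * COLL_BASE_COMPUTE_BLOCKCOUNT into num_phases segments, the early ones one
 * element larger. Hypothetical helper; e.g. block_count = 7, num_phases = 2
 * gives phase counts 4 and 3 at offsets 0 and 4 within the block.
 */
static inline void sketch_phase_segment(int block_count, int num_phases, int phase,
                                        int *phase_count, int *phase_offset)
{
    int late  = block_count / num_phases;
    int split = block_count % num_phases;
    int early = late + ((split != 0) ? 1 : 0);
    *phase_count  = (phase < split) ? early : late;
    *phase_offset = (phase < split) ? phase * early : phase * late + split;
}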
 857 
 858 /*
 859  * Linear functions are copied from the BASIC coll module.
 860  * They do not segment the message and are simple implementations,
 861  * but for some small numbers of nodes and/or small data sizes they
 862  * are just as fast as base/tree-based segmenting operations,
 863  * and as such may be selected by the decision functions.
 864  * These are copied into this module due to the way we select modules
 865  * in V1; i.e. in V2 we will handle this differently and so will not
 866  * have to duplicate code.
 867  * GEF Oct05 after asking Jeff.
 868  */
 869 
 870 /* copied function (with appropriate renaming) starts here */
 871 
 872 
 873 /*
 874  *      allreduce_intra_basic_linear
 875  *
 876  *      Function:       - allreduce using other MPI collectives
 877  *      Accepts:        - same as MPI_Allreduce()
 878  *      Returns:        - MPI_SUCCESS or error code
 879  */
 880 int
 881 ompi_coll_base_allreduce_intra_basic_linear(const void *sbuf, void *rbuf, int count,
 882                                              struct ompi_datatype_t *dtype,
 883                                              struct ompi_op_t *op,
 884                                              struct ompi_communicator_t *comm,
 885                                              mca_coll_base_module_t *module)
 886 {
 887     int err, rank;
 888 
 889     rank = ompi_comm_rank(comm);
 890 
 891     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:allreduce_intra_basic_linear rank %d", rank));
 892 
 893     /* Reduce to 0 and broadcast. */
 894 
 895     if (MPI_IN_PLACE == sbuf) {
 896         if (0 == rank) {
 897             err = ompi_coll_base_reduce_intra_basic_linear (MPI_IN_PLACE, rbuf, count, dtype,
 898                                                              op, 0, comm, module);
 899         } else {
 900             err = ompi_coll_base_reduce_intra_basic_linear(rbuf, NULL, count, dtype,
 901                                                             op, 0, comm, module);
 902         }
 903     } else {
 904         err = ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
 905                                                         op, 0, comm, module);
 906     }
 907     if (MPI_SUCCESS != err) {
 908         return err;
 909     }
 910 
 911     return ompi_coll_base_bcast_intra_basic_linear(rbuf, count, dtype, 0, comm, module);
 912 }
 913 
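/*
 * Editor's sketch (for the Rabenseifner algorithm documented below): the
 * virtual-rank fold of step 1 and its inverse, written out as hypothetical
 * helpers. With r = p - p' extra processes, odd ranks below 2r drop out
 * (vrank -1), surviving even ranks take vrank = rank / 2, and the remaining
 * ranks shift down by r; the inverse translates a virtual peer back to its
 * real rank, matching the "dest" computation inside the function.
 */
static inline int sketch_rabenseifner_vrank(int rank, int nprocs_rem)
{
    if (rank < 2 * nprocs_rem) {
        return (0 == rank % 2) ? rank / 2 : -1;
    }
    return rank - nprocs_rem;
}

static inline int sketch_rabenseifner_real_rank(int vrank, int nprocs_rem)
{
    return (vrank < nprocs_rem) ? vrank * 2 : vrank + nprocs_rem;
}
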
 914 /*
 915  * ompi_coll_base_allreduce_intra_redscat_allgather
 916  *
 917  * Function:  Allreduce using Rabenseifner's algorithm.
 918  * Accepts:   Same arguments as MPI_Allreduce
 919  * Returns:   MPI_SUCCESS or error code
 920  *
 921  * Description: an implementation of Rabenseifner's allreduce algorithm [1, 2].
 922  *   [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
 923  *       Optimization of Collective Communication Operations in MPICH.
 924  *       The International Journal of High Performance Computing Applications,
 925  *       Vol. 19, Issue 1, pp. 49--66.
 926  *   [2] http://www.hlrs.de/mpi/myreduce.html.
 927  *
 928  * This algorithm is a combination of a reduce-scatter implemented with
 929  * recursive vector halving and recursive distance doubling, followed
 930  * by an allgather implemented with recursive doubling [1].
 931  *
 932  * Step 1. If the number of processes is not a power of two, reduce it to
 933  * the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
 934  * by removing r = p - p' extra processes as follows. In the first 2r processes
 935  * (ranks 0 to 2r - 1), all the even ranks send the second half of the input
 936  * vector to their right neighbor (rank + 1), and all the odd ranks send
 937  * the first half of the input vector to their left neighbor (rank - 1).
 938  * The even ranks compute the reduction on the first half of the vector and
 939  * the odd ranks compute the reduction on the second half. The odd ranks then
 940  * send the result to their left neighbors (the even ranks). As a result,
 941  * the even ranks among the first 2r processes now contain the reduction with
 942  * the input vector on their right neighbors (the odd ranks). These odd ranks
 943  * do not participate in the rest of the algorithm, which leaves behind
 944  * a power-of-two number of processes. The first r even-ranked processes and
 945  * the last p - 2r processes are now renumbered from 0 to p' - 1.
 946  *
 947  * Step 2. The remaining processes now perform a reduce-scatter by using
 948  * recursive vector halving and recursive distance doubling. The even-ranked
 949  * processes send the second half of their buffer to rank + 1 and the odd-ranked
 950  * processes send the first half of their buffer to rank - 1. All processes
 951  * then compute the reduction between the local buffer and the received buffer.
 952  * In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
 953  * distance is doubled. At the end, each of the p' processes has 1 / p' of the
 954  * total reduction result.
 955  *
 956  * Step 3. An allgather is performed by using recursive vector doubling and
 957  * distance halving. All exchanges are executed in reverse order relative
 958  * to recursive doubling on previous step. If the number of processes is not
 959  * a power of two, the total result vector must be sent to the r processes
 960  * that were removed in the first step.
 961  *
 962  * Limitations:
 963  *   count >= 2^{\floor{\log_2 p}}
 964  *   commutative operations only
 965  *   intra-communicators only
 966  *
 967  * Memory requirements (per process):
 968  *   count * typesize + 4 * \log_2(p) * sizeof(int) = O(count)
 969  */
 970 int ompi_coll_base_allreduce_intra_redscat_allgather(
 971     const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
 972     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 973     mca_coll_base_module_t *module)
 974 {
 975     int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
 976 
 977     int comm_size = ompi_comm_size(comm);
 978     int rank = ompi_comm_rank(comm);
 979     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 980                  "coll:base:allreduce_intra_redscat_allgather: rank %d/%d",
 981                  rank, comm_size));
 982 
 983     /* Find nearest power-of-two less than or equal to comm_size */
 984     int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1);   /* ilog2(comm_size) */
 985     assert(nsteps >= 0);
 986     int nprocs_pof2 = 1 << nsteps;                              /* flp2(comm_size) */
 987 
 988     if (count < nprocs_pof2 || !ompi_op_is_commute(op)) {
 989         OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 990                      "coll:base:allreduce_intra_redscat_allgather: rank %d/%d "
 991                      "count %d switching to basic linear allreduce",
 992                      rank, comm_size, count));
 993         return ompi_coll_base_allreduce_intra_basic_linear(sbuf, rbuf, count, dtype,
 994                                                            op, comm, module);
 995     }
 996 
 997     int err = MPI_SUCCESS;
 998     ptrdiff_t lb, extent, dsize, gap = 0;
 999     ompi_datatype_get_extent(dtype, &lb, &extent);
1000     dsize = opal_datatype_span(&dtype->super, count, &gap);
1001 
1002     /* Temporary buffer for receiving messages */
1003     char *tmp_buf = NULL;
1004     char *tmp_buf_raw = (char *)malloc(dsize);
1005     if (NULL == tmp_buf_raw)
1006         return OMPI_ERR_OUT_OF_RESOURCE;
1007     tmp_buf = tmp_buf_raw - gap;
1008 
1009     if (sbuf != MPI_IN_PLACE) {
1010         err = ompi_datatype_copy_content_same_ddt(dtype, count, (char *)rbuf,
1011                                                   (char *)sbuf);
1012         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1013     }
1014 
1015     /*
1016      * Step 1. Reduce the number of processes to the nearest lower power of two
1017      * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
1018      * 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
1019      *    the second half of the input vector to their right neighbor (rank + 1)
1020      *    and all the odd ranks send the first half of the input vector to their
1021      *    left neighbor (rank - 1).
1022      * 2. All 2r processes compute the reduction on their half.
1023      * 3. The odd ranks then send the result to their left neighbors
1024      *    (the even ranks).
1025      *
1026      * The even ranks (0 to 2r - 1) now contain the reduction with the input
1027      * vector on their right neighbors (the odd ranks). The first r even
1028      * processes and the p - 2r last processes are renumbered from
1029      * 0 to 2^{\floor{\log_2 p}} - 1.
1030      */
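    /* Editor's worked example: p = 6 gives p' = 4, r = nprocs_rem = 2;
       ranks 0 and 1 pair up, as do 2 and 3, the odd ranks 1 and 3 drop out,
       and the survivors 0, 2, 4, 5 take virtual ranks 0, 1, 2, 3. */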
1031 
1032     int vrank, step, wsize;
1033     int nprocs_rem = comm_size - nprocs_pof2;
1034 
1035     if (rank < 2 * nprocs_rem) {
1036         int count_lhalf = count / 2;
1037         int count_rhalf = count - count_lhalf;
1038 
1039         if (rank % 2 != 0) {
1040             /*
1041              * Odd process -- exchange with rank - 1
1042              * Send the left half of the input vector to the left neighbor,
1043              * Recv the right half of the input vector from the left neighbor
1044              */
1045             err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
1046                                           MCA_COLL_BASE_TAG_ALLREDUCE,
1047                                           (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
1048                                           count_rhalf, dtype, rank - 1,
1049                                           MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1050                                           MPI_STATUS_IGNORE, rank);
1051             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1052 
1053             /* Reduce on the right half of the buffers (result in rbuf) */
1054             ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
1055                            (char *)rbuf + (ptrdiff_t)count_lhalf * extent, count_rhalf, dtype);
1056 
1057             /* Send the right half to the left neighbor */
1058             err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
1059                                     count_rhalf, dtype, rank - 1,
1060                                     MCA_COLL_BASE_TAG_ALLREDUCE,
1061                                     MCA_PML_BASE_SEND_STANDARD, comm));
1062             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1063 
1064             /* This process does not participate in the recursive doubling phase */
1065             vrank = -1;
1066 
1067         } else {
1068             /*
1069              * Even process -- exchange with rank + 1
1070              * Send the right half of the input vector to the right neighbor,
1071              * Recv the left half of the input vector from the right neighbor
1072              */
1073             err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
1074                                           count_rhalf, dtype, rank + 1,
1075                                           MCA_COLL_BASE_TAG_ALLREDUCE,
1076                                           tmp_buf, count_lhalf, dtype, rank + 1,
1077                                           MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1078                                           MPI_STATUS_IGNORE, rank);
1079             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1080 
1081             /* Reduce on the left half of the buffers (result in rbuf) */
1082             ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);
1083 
1084             /* Recv the right half from the right neighbor */
1085             err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
1086                                     count_rhalf, dtype, rank + 1,
1087                                     MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1088                                     MPI_STATUS_IGNORE));
1089             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1090 
1091             vrank = rank / 2;
1092         }
1093     } else { /* rank >= 2 * nprocs_rem */
1094         vrank = rank - nprocs_rem;
1095     }
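     /*
      * Worked example for the pairing above (illustrative numbers only):
      * with count = 7, count_lhalf = 3 and count_rhalf = 4.  The odd rank
      * of a pair reduces the right 4 elements and sends them back; the
      * even rank reduces the left 3 elements and receives the reduced
      * right half, so the even rank ends up holding the full 7-element
      * reduction of the pair in rbuf.
      */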
1096 
1097     /*
1098      * Step 2. Reduce-scatter implemented with recursive vector halving and
1099      * recursive distance doubling. We now have p' = 2^{\floor{\log_2 p}}
1100      * processes (a power of two) with new ranks (vrank) and the intermediate result in rbuf.
1101      *
1102      * The even-ranked processes send the right half of their buffer to rank + 1
1103      * and the odd-ranked processes send the left half of their buffer to
1104      * rank - 1. All processes then compute the reduction between the local
1105      * buffer and the received buffer. In the next \log_2(p') - 1 steps, the
1106      * buffers are recursively halved, and the distance is doubled. At the end,
1107      * each of the p' processes has 1 / p' of the total reduction result.
1108      */
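     /*
      * Illustrative trace (hypothetical values): for p' = 4 and count = 8,
      * step 0 exchanges halves of the full window (wsize = 8, 4 elements
      * each way) and step 1 exchanges halves of the surviving half
      * (wsize = 4, 2 elements each way), after which each of the 4
      * processes owns a distinct block of count / p' = 2 elements of the
      * final result.
      */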
1109     rindex = malloc(sizeof(*rindex) * nsteps);
1110     sindex = malloc(sizeof(*sindex) * nsteps);
1111     rcount = malloc(sizeof(*rcount) * nsteps);
1112     scount = malloc(sizeof(*scount) * nsteps);
1113     if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
1114         err = OMPI_ERR_OUT_OF_RESOURCE;
1115         goto cleanup_and_return;
1116     }
1117 
1118     if (vrank != -1) {
1119         step = 0;
1120         wsize = count;
1121         sindex[0] = rindex[0] = 0;
1122 
1123         for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
1124             /*
1125              * On each iteration: rindex[step] == sindex[step] -- the beginning of
1126              * the current window. The length of the current window is stored in wsize.
1127              */
1128             int vdest = vrank ^ mask;
1129             /* Translate vdest virtual rank to real rank */
1130             int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
1131 
1132             if (rank < dest) {
1133                 /*
1134                  * Recv into the left half of the current window, send the right
1135                  * half of the window to the peer (perform reduce on the left
1136                  * half of the current window)
1137                  */
1138                 rcount[step] = wsize / 2;
1139                 scount[step] = wsize - rcount[step];
1140                 sindex[step] = rindex[step] + rcount[step];
1141             } else {
1142                 /*
1143                  * Recv into the right half of the current window, send the left
1144                  * half of the window to the peer (perform reduce on the right
1145                  * half of the current window)
1146                  */
1147                 scount[step] = wsize / 2;
1148                 rcount[step] = wsize - scount[step];
1149                 rindex[step] = sindex[step] + scount[step];
1150             }
1151 
1152             /* Send part of data from the rbuf, recv into the tmp_buf */
1153             err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
1154                                           scount[step], dtype, dest,
1155                                           MCA_COLL_BASE_TAG_ALLREDUCE,
1156                                           (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
1157                                           rcount[step], dtype, dest,
1158                                           MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1159                                           MPI_STATUS_IGNORE, rank);
1160             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1161 
1162             /* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
1163             ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
1164                            (char *)rbuf + (ptrdiff_t)rindex[step] * extent,
1165                            rcount[step], dtype);
1166 
1167             /* Move the current window to the received message */
1168             if (step + 1 < nsteps) {
1169                 rindex[step + 1] = rindex[step];
1170                 sindex[step + 1] = rindex[step];
1171                 wsize = rcount[step];
1172                 step++;
1173             }
1174         }
1175         /*
1176          * Assertion: each process has 1 / p' of the total reduction result:
1177          * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
1178          */
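             /*
              * Concrete trace (illustrative only) for vrank 0 with p' = 4 and
              * count = 8.  vrank 0 pairs with vdest 1, then vdest 2; its real
              * rank is below both peers, so it always keeps the left half:
              *
              *     step 0: rindex = 0, rcount = 4, sindex = 4, scount = 4
              *     step 1: rindex = 0, rcount = 2, sindex = 2, scount = 2
              *
              * leaving elements [0, 2) of the reduction in rbuf.
              */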
1179 
1180         /*
1181          * Step 3. Allgather by the recursive doubling algorithm.
1182          * Each process has 1 / p' of the total reduction result:
1183          * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
1184          * All exchanges are executed in reverse order relative to the
1185          * reduce-scatter phase (the previous step).
1186          */
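             /*
              * Continuing the illustrative trace for vrank 0: the allgather
              * walks the masks in reverse (mask = 2, then mask = 1).  With
              * mask = 2 it sends rbuf[0..2) and receives rbuf[2..4) from
              * vdest 2; with mask = 1 it sends rbuf[0..4) and receives
              * rbuf[4..8) from vdest 1, so rbuf again holds all count
              * elements, now fully reduced.
              */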
1187 
1188         step = nsteps - 1;
1189 
1190         for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
1191             int vdest = vrank ^ mask;
1192             /* Translate vdest virtual rank to real rank */
1193             int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
1194 
1195             /*
1196              * Send rcount[step] elements from rbuf[rindex[step]...]
1197              * Recv scount[step] elements to rbuf[sindex[step]...]
1198              */
1199             err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
1200                                           rcount[step], dtype, dest,
1201                                           MCA_COLL_BASE_TAG_ALLREDUCE,
1202                                           (char *)rbuf + (ptrdiff_t)sindex[step] * extent,
1203                                           scount[step], dtype, dest,
1204                                           MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1205                                           MPI_STATUS_IGNORE, rank);
1206             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1207             step--;
1208         }
1209     }
1210 
1211     /*
1212      * Step 4. Send total result to excluded odd ranks.
1213      */
1214     if (rank < 2 * nprocs_rem) {
1215         if (rank % 2 != 0) {
1216             /* Odd process -- recv result from rank - 1 */
1217             err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
1218                                     MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1219                                     MPI_STATUS_IGNORE));
1220             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1221 
1222         } else {
1223             /* Even process -- send result to rank + 1 */
1224             err = MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
1225                                     MCA_COLL_BASE_TAG_ALLREDUCE,
1226                                     MCA_PML_BASE_SEND_STANDARD, comm));
1227             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1228         }
1229     }
1230 
1231   cleanup_and_return:
1232     if (NULL != tmp_buf_raw)
1233         free(tmp_buf_raw);
1234     if (NULL != rindex)
1235         free(rindex);
1236     if (NULL != sindex)
1237         free(sindex);
1238     if (NULL != rcount)
1239         free(rcount);
1240     if (NULL != scount)
1241         free(scount);
1242     return err;
1243 }
1244 
1245 /* copied function (with appropriate renaming) ends here */
