root/ompi/mca/coll/base/coll_base_reduce_scatter.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ompi_coll_base_reduce_scatter_intra_nonoverlapping
  2. ompi_coll_base_reduce_scatter_intra_basic_recursivehalving
  3. ompi_coll_base_reduce_scatter_intra_ring
  4. ompi_sum_counts
  5. ompi_coll_base_reduce_scatter_intra_butterfly

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2017 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
  14  * Copyright (c) 2009      University of Houston. All rights reserved.
  15  * Copyright (c) 2013      Los Alamos National Security, LLC. All rights
  16  *                         reserved.
  17  * Copyright (c) 2015-2016 Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  */
  25 
  26 #include "ompi_config.h"
  27 
  28 #include "mpi.h"
  29 #include "opal/util/bit_ops.h"
  30 #include "ompi/constants.h"
  31 #include "ompi/datatype/ompi_datatype.h"
  32 #include "ompi/communicator/communicator.h"
  33 #include "ompi/mca/coll/coll.h"
  34 #include "ompi/mca/coll/base/coll_tags.h"
  35 #include "ompi/mca/pml/pml.h"
  36 #include "ompi/op/op.h"
  37 #include "ompi/mca/coll/base/coll_base_functions.h"
  38 #include "coll_base_topo.h"
  39 #include "coll_base_util.h"
  40 
  41 /*******************************************************************************
  42  * ompi_coll_base_reduce_scatter_intra_nonoverlapping
  43  *
  44  * This function just calls a reduce to rank 0, followed by an
  45  * appropriate scatterv call.
  46  */
  47 int ompi_coll_base_reduce_scatter_intra_nonoverlapping(const void *sbuf, void *rbuf,
  48                                                         const int *rcounts,
  49                                                         struct ompi_datatype_t *dtype,
  50                                                         struct ompi_op_t *op,
  51                                                         struct ompi_communicator_t *comm,
  52                                                         mca_coll_base_module_t *module)
  53 {
  54     int err, i, rank, size, total_count, *displs = NULL;
  55     const int root = 0;
  56     char *tmprbuf = NULL, *tmprbuf_free = NULL;
  57 
  58     rank = ompi_comm_rank(comm);
  59     size = ompi_comm_size(comm);
  60 
  61     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_scatter_intra_nonoverlapping, rank %d", rank));
  62 
  63     for (i = 0, total_count = 0; i < size; i++) { total_count += rcounts[i]; }
  64 
  65     /* Reduce to rank 0 (root) and scatterv */
  66     tmprbuf = (char*) rbuf;
  67     if (MPI_IN_PLACE == sbuf) {
  68         /* rbuf on root (0) is big enough to hold whole data */
  69         if (root == rank) {
  70             err = comm->c_coll->coll_reduce (MPI_IN_PLACE, tmprbuf, total_count,
  71                                             dtype, op, root, comm, comm->c_coll->coll_reduce_module);
  72         } else {
  73             err = comm->c_coll->coll_reduce(tmprbuf, NULL, total_count,
  74                                            dtype, op, root, comm, comm->c_coll->coll_reduce_module);
  75         }
  76     } else {
  77         if (root == rank) {
  78             /* We must allocate temporary receive buffer on root to ensure that
  79                rbuf is big enough */
  80             ptrdiff_t dsize, gap = 0;
  81             dsize = opal_datatype_span(&dtype->super, total_count, &gap);
  82 
  83             tmprbuf_free = (char*) malloc(dsize);
  84             tmprbuf = tmprbuf_free - gap;
  85         }
  86         err = comm->c_coll->coll_reduce (sbuf, tmprbuf, total_count,
  87                                         dtype, op, root, comm, comm->c_coll->coll_reduce_module);
  88     }
  89     if (MPI_SUCCESS != err) {
  90         if (NULL != tmprbuf_free) free(tmprbuf_free);
  91         return err;
  92     }
  93 
  94     displs = (int*) malloc(size * sizeof(int));
  95     displs[0] = 0;
  96     for (i = 1; i < size; i++) {
  97         displs[i] = displs[i-1] + rcounts[i-1];
  98     }
  99     if (MPI_IN_PLACE == sbuf && root == rank) {
 100         err =  comm->c_coll->coll_scatterv (tmprbuf, rcounts, displs, dtype,
 101                                            MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
 102                                            root, comm, comm->c_coll->coll_scatterv_module);
 103     } else {
 104         err =  comm->c_coll->coll_scatterv (tmprbuf, rcounts, displs, dtype,
 105                                            rbuf, rcounts[rank], dtype,
 106                                            root, comm, comm->c_coll->coll_scatterv_module);
 107     }
 108     free(displs);
 109     if (NULL != tmprbuf_free) free(tmprbuf_free);
 110 
 111     return err;
 112 }
 113 
 114 /*
 115  * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
 116  * I have removed the part which handles "large" message sizes
 117  * (non-overlapping version of reduce_Scatter).
 118  */
 119 
 120 /* copied function (with appropriate renaming) starts here */
 121 
 122 /*
 123  *  reduce_scatter_intra_basic_recursivehalving
 124  *
 125  *  Function:   - reduce scatter implementation using recursive-halving
 126  *                algorithm
 127  *  Accepts:    - same as MPI_Reduce_scatter()
 128  *  Returns:    - MPI_SUCCESS or error code
 129  *  Limitation: - Works only for commutative operations.
 130  */
 131 int
 132 ompi_coll_base_reduce_scatter_intra_basic_recursivehalving( const void *sbuf,
 133                                                             void *rbuf,
 134                                                             const int *rcounts,
 135                                                             struct ompi_datatype_t *dtype,
 136                                                             struct ompi_op_t *op,
 137                                                             struct ompi_communicator_t *comm,
 138                                                             mca_coll_base_module_t *module)
 139 {
 140     int i, rank, size, count, err = OMPI_SUCCESS;
 141     int tmp_size, remain = 0, tmp_rank, *disps = NULL;
 142     ptrdiff_t extent, buf_size, gap = 0;
 143     char *recv_buf = NULL, *recv_buf_free = NULL;
 144     char *result_buf = NULL, *result_buf_free = NULL;
 145 
 146     /* Initialize */
 147     rank = ompi_comm_rank(comm);
 148     size = ompi_comm_size(comm);
 149 
 150     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_scatter_intra_basic_recursivehalving, rank %d", rank));
 151 
 152     /* Find displacements and the like */
 153     disps = (int*) malloc(sizeof(int) * size);
 154     if (NULL == disps) return OMPI_ERR_OUT_OF_RESOURCE;
 155 
 156     disps[0] = 0;
 157     for (i = 0; i < (size - 1); ++i) {
 158         disps[i + 1] = disps[i] + rcounts[i];
 159     }
 160     count = disps[size - 1] + rcounts[size - 1];
 161 
 162     /* short cut the trivial case */
 163     if (0 == count) {
 164         free(disps);
 165         return OMPI_SUCCESS;
 166     }
 167 
 168     /* get datatype information */
 169     ompi_datatype_type_extent(dtype, &extent);
 170     buf_size = opal_datatype_span(&dtype->super, count, &gap);
 171 
 172     /* Handle MPI_IN_PLACE */
 173     if (MPI_IN_PLACE == sbuf) {
 174         sbuf = rbuf;
 175     }
 176 
 177     /* Allocate temporary receive buffer. */
 178     recv_buf_free = (char*) malloc(buf_size);
 179     recv_buf = recv_buf_free - gap;
 180     if (NULL == recv_buf_free) {
 181         err = OMPI_ERR_OUT_OF_RESOURCE;
 182         goto cleanup;
 183     }
 184 
 185     /* allocate temporary buffer for results */
 186     result_buf_free = (char*) malloc(buf_size);
 187     result_buf = result_buf_free - gap;
 188 
 189     /* copy local buffer into the temporary results */
 190     err = ompi_datatype_sndrcv(sbuf, count, dtype, result_buf, count, dtype);
 191     if (OMPI_SUCCESS != err) goto cleanup;
 192 
 193     /* figure out power of two mapping: grow until larger than
 194        comm size, then go back one, to get the largest power of
 195        two less than comm size */
 196     tmp_size = opal_next_poweroftwo (size);
 197     tmp_size >>= 1;
 198     remain = size - tmp_size;
 199 
 200     /* If comm size is not a power of two, have the first "remain"
 201        procs with an even rank send to rank + 1, leaving a power of
 202        two procs to do the rest of the algorithm */
 203     if (rank < 2 * remain) {
 204         if ((rank & 1) == 0) {
 205             err = MCA_PML_CALL(send(result_buf, count, dtype, rank + 1,
 206                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 207                                     MCA_PML_BASE_SEND_STANDARD,
 208                                     comm));
 209             if (OMPI_SUCCESS != err) goto cleanup;
 210 
 211             /* we don't participate from here on out */
 212             tmp_rank = -1;
 213         } else {
 214             err = MCA_PML_CALL(recv(recv_buf, count, dtype, rank - 1,
 215                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 216                                     comm, MPI_STATUS_IGNORE));
 217 
 218             /* integrate their results into our temp results */
 219             ompi_op_reduce(op, recv_buf, result_buf, count, dtype);
 220 
 221             /* adjust rank to be the bottom "remain" ranks */
 222             tmp_rank = rank / 2;
 223         }
 224     } else {
 225         /* just need to adjust rank to show that the bottom "even
 226            remain" ranks dropped out */
 227         tmp_rank = rank - remain;
 228     }
 229 
 230     /* For ranks not kicked out by the above code, perform the
 231        recursive halving */
 232     if (tmp_rank >= 0) {
 233         int *tmp_disps = NULL, *tmp_rcounts = NULL;
 234         int mask, send_index, recv_index, last_index;
 235 
 236         /* recalculate disps and rcounts to account for the
 237            special "remainder" processes that are no longer doing
 238            anything */
 239         tmp_rcounts = (int*) malloc(tmp_size * sizeof(int));
 240         if (NULL == tmp_rcounts) {
 241             err = OMPI_ERR_OUT_OF_RESOURCE;
 242             goto cleanup;
 243         }
 244         tmp_disps = (int*) malloc(tmp_size * sizeof(int));
 245         if (NULL == tmp_disps) {
 246             free(tmp_rcounts);
 247             err = OMPI_ERR_OUT_OF_RESOURCE;
 248             goto cleanup;
 249         }
 250 
 251         for (i = 0 ; i < tmp_size ; ++i) {
 252             if (i < remain) {
 253                 /* need to include old neighbor as well */
 254                 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
 255             } else {
 256                 tmp_rcounts[i] = rcounts[i + remain];
 257             }
 258         }
 259 
 260         tmp_disps[0] = 0;
 261         for (i = 0; i < tmp_size - 1; ++i) {
 262             tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
 263         }
 264 
 265         /* do the recursive halving communication.  Don't use the
 266            dimension information on the communicator because I
 267            think the information is invalidated by our "shrinking"
 268            of the communicator */
 269         mask = tmp_size >> 1;
 270         send_index = recv_index = 0;
 271         last_index = tmp_size;
 272         while (mask > 0) {
 273             int tmp_peer, peer, send_count, recv_count;
 274             struct ompi_request_t *request;
 275 
 276             tmp_peer = tmp_rank ^ mask;
 277             peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
 278 
 279             /* figure out if we're sending, receiving, or both */
 280             send_count = recv_count = 0;
 281             if (tmp_rank < tmp_peer) {
 282                 send_index = recv_index + mask;
 283                 for (i = send_index ; i < last_index ; ++i) {
 284                     send_count += tmp_rcounts[i];
 285                 }
 286                 for (i = recv_index ; i < send_index ; ++i) {
 287                     recv_count += tmp_rcounts[i];
 288                 }
 289             } else {
 290                 recv_index = send_index + mask;
 291                 for (i = send_index ; i < recv_index ; ++i) {
 292                     send_count += tmp_rcounts[i];
 293                 }
 294                 for (i = recv_index ; i < last_index ; ++i) {
 295                     recv_count += tmp_rcounts[i];
 296                 }
 297             }
 298 
 299             /* actual data transfer.  Send from result_buf,
 300                receive into recv_buf */
 301             if (recv_count > 0) {
 302                 err = MCA_PML_CALL(irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
 303                                          recv_count, dtype, peer,
 304                                          MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 305                                          comm, &request));
 306                 if (OMPI_SUCCESS != err) {
 307                     free(tmp_rcounts);
 308                     free(tmp_disps);
 309                     goto cleanup;
 310                 }
 311             }
 312             if (send_count > 0) {
 313                 err = MCA_PML_CALL(send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
 314                                         send_count, dtype, peer,
 315                                         MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 316                                         MCA_PML_BASE_SEND_STANDARD,
 317                                         comm));
 318                 if (OMPI_SUCCESS != err) {
 319                     free(tmp_rcounts);
 320                     free(tmp_disps);
 321                     goto cleanup;
 322                 }
 323             }
 324 
 325             /* if we received something on this step, push it into
 326                the results buffer */
 327             if (recv_count > 0) {
 328                 err = ompi_request_wait(&request, MPI_STATUS_IGNORE);
 329                 if (OMPI_SUCCESS != err) {
 330                     free(tmp_rcounts);
 331                     free(tmp_disps);
 332                     goto cleanup;
 333                 }
 334 
 335                 ompi_op_reduce(op,
 336                                recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
 337                                result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
 338                                recv_count, dtype);
 339             }
 340 
 341             /* update for next iteration */
 342             send_index = recv_index;
 343             last_index = recv_index + mask;
 344             mask >>= 1;
 345         }
 346 
 347         /* copy local results from results buffer into real receive buffer */
 348         if (0 != rcounts[rank]) {
 349             err = ompi_datatype_sndrcv(result_buf + disps[rank] * extent,
 350                                        rcounts[rank], dtype,
 351                                        rbuf, rcounts[rank], dtype);
 352             if (OMPI_SUCCESS != err) {
 353                 free(tmp_rcounts);
 354                 free(tmp_disps);
 355                 goto cleanup;
 356             }
 357         }
 358 
 359         free(tmp_rcounts);
 360         free(tmp_disps);
 361     }
 362 
 363     /* Now fix up the non-power of two case, by having the odd
 364        procs send the even procs the proper results */
 365     if (rank < (2 * remain)) {
 366         if ((rank & 1) == 0) {
 367             if (rcounts[rank]) {
 368                 err = MCA_PML_CALL(recv(rbuf, rcounts[rank], dtype, rank + 1,
 369                                         MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 370                                         comm, MPI_STATUS_IGNORE));
 371                 if (OMPI_SUCCESS != err) goto cleanup;
 372             }
 373         } else {
 374             if (rcounts[rank - 1]) {
 375                 err = MCA_PML_CALL(send(result_buf + disps[rank - 1] * extent,
 376                                         rcounts[rank - 1], dtype, rank - 1,
 377                                         MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 378                                         MCA_PML_BASE_SEND_STANDARD,
 379                                         comm));
 380                 if (OMPI_SUCCESS != err) goto cleanup;
 381             }
 382         }
 383     }
 384 
 385  cleanup:
 386     if (NULL != disps) free(disps);
 387     if (NULL != recv_buf_free) free(recv_buf_free);
 388     if (NULL != result_buf_free) free(result_buf_free);
 389 
 390     return err;
 391 }
 392 
 393 /* copied function (with appropriate renaming) ends here */
 394 
 395 
 396 /*
 397  *   ompi_coll_base_reduce_scatter_intra_ring
 398  *
 399  *   Function:       Ring algorithm for reduce_scatter operation
 400  *   Accepts:        Same as MPI_Reduce_scatter()
 401  *   Returns:        MPI_SUCCESS or error code
 402  *
 403  *   Description:    Implements ring algorithm for reduce_scatter:
 404  *                   the block sizes defined in rcounts are exchanged and
  405  *                   updated until they reach proper destination.
 406  *                   Algorithm requires 2 * max(rcounts) extra buffering
 407  *
 408  *   Limitations:    The algorithm DOES NOT preserve order of operations so it
 409  *                   can be used only for commutative operations.
 410  *         Example on 5 nodes:
 411  *         Initial state
 412  *   #      0              1             2              3             4
 413  *        [00]           [10]   ->     [20]           [30]           [40]
 414  *        [01]           [11]          [21]  ->       [31]           [41]
 415  *        [02]           [12]          [22]           [32]  ->       [42]
 416  *    ->  [03]           [13]          [23]           [33]           [43] --> ..
 417  *        [04]  ->       [14]          [24]           [34]           [44]
 418  *
 419  *        COMPUTATION PHASE
 420  *         Step 0: rank r sends block (r-1) to rank (r+1) and
 421  *                 receives block (r+1) from rank (r-1) [with wraparound].
 422  *   #      0              1             2              3             4
 423  *        [00]           [10]        [10+20]   ->     [30]           [40]
 424  *        [01]           [11]          [21]          [21+31]  ->     [41]
 425  *    ->  [02]           [12]          [22]           [32]         [32+42] -->..
 426  *      [43+03] ->       [13]          [23]           [33]           [43]
 427  *        [04]         [04+14]  ->     [24]           [34]           [44]
 428  *
 429  *         Step 1:
 430  *   #      0              1             2              3             4
 431  *        [00]           [10]        [10+20]       [10+20+30] ->     [40]
 432  *    ->  [01]           [11]          [21]          [21+31]      [21+31+41] ->
 433  *     [32+42+02] ->     [12]          [22]           [32]         [32+42]
 434  *        [03]        [43+03+13] ->    [23]           [33]           [43]
 435  *        [04]         [04+14]      [04+14+24]  ->    [34]           [44]
 436  *
 437  *         Step 2:
 438  *   #      0              1             2              3             4
 439  *     -> [00]           [10]        [10+20]       [10+20+30]   [10+20+30+40] ->
 440  *   [21+31+41+01]->     [11]          [21]          [21+31]      [21+31+41]
 441  *     [32+42+02]   [32+42+02+12]->    [22]           [32]         [32+42]
 442  *        [03]        [43+03+13]   [43+03+13+23]->    [33]           [43]
 443  *        [04]         [04+14]      [04+14+24]    [04+14+24+34] ->   [44]
 444  *
 445  *         Step 3:
 446  *   #      0             1              2              3             4
 447  * [10+20+30+40+00]     [10]         [10+20]       [10+20+30]   [10+20+30+40]
 448  *  [21+31+41+01] [21+31+41+01+11]     [21]          [21+31]      [21+31+41]
 449  *    [32+42+02]   [32+42+02+12] [32+42+02+12+22]     [32]         [32+42]
 450  *       [03]        [43+03+13]    [43+03+13+23] [43+03+13+23+33]    [43]
 451  *       [04]         [04+14]       [04+14+24]    [04+14+24+34] [04+14+24+34+44]
 452  *    DONE :)
 453  *
 454  */
int
ompi_coll_base_reduce_scatter_intra_ring( const void *sbuf, void *rbuf, const int *rcounts,
                                          struct ompi_datatype_t *dtype,
                                          struct ompi_op_t *op,
                                          struct ompi_communicator_t *comm,
                                          mca_coll_base_module_t *module)
{
    int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
    int inbi, *displs = NULL;
    char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
    /* Two staging buffers are used so the receive for step k+1 can be
       posted while the block from step k is still being reduced/sent
       (communication/computation overlap). */
    char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
    ptrdiff_t extent, max_real_segsize, dsize, gap = 0;
    ompi_request_t *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:reduce_scatter_intra_ring rank %d, size %d",
                 rank, size));

    /* Determine the maximum number of elements per node,
       corresponding block size, and displacements array.
    */
    displs = (int*) malloc(size * sizeof(int));
    if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
    displs[0] = 0;
    total_count = rcounts[0];
    max_block_count = rcounts[0];
    for (i = 1; i < size; i++) {
        displs[i] = total_count;
        total_count += rcounts[i];
        if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
    }

    /* Special case for size == 1: no communication, just copy if needed */
    if (1 == size) {
        if (MPI_IN_PLACE != sbuf) {
            ret = ompi_datatype_copy_content_same_ddt(dtype, total_count,
                                                      (char*)rbuf, (char*)sbuf);
            if (ret < 0) { line = __LINE__; goto error_hndl; }
        }
        free(displs);
        return MPI_SUCCESS;
    }

    /* Allocate and initialize temporary buffers, we need:
       - a temporary buffer to perform reduction (size total_count) since
       rbuf can be of rcounts[rank] size.
       - up to two temporary buffers used for communication/computation overlap.
    */
    ret = ompi_datatype_type_extent(dtype, &extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    max_real_segsize = opal_datatype_span(&dtype->super, max_block_count, &gap);
    dsize = opal_datatype_span(&dtype->super, total_count, &gap);

    accumbuf_free = (char*)malloc(dsize);
    if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
    accumbuf = accumbuf_free - gap;

    inbuf_free[0] = (char*)malloc(max_real_segsize);
    if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
    inbuf[0] = inbuf_free[0] - gap;
    /* The second staging buffer is only needed when there is more than one
       communication step (size > 2). */
    if (size > 2) {
        inbuf_free[1] = (char*)malloc(max_real_segsize);
        if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
        inbuf[1] = inbuf_free[1] - gap;
    }

    /* Handle MPI_IN_PLACE for size > 1 */
    if (MPI_IN_PLACE == sbuf) {
        sbuf = rbuf;
    }

    /* Seed the accumulation buffer with the local contribution */
    ret = ompi_datatype_copy_content_same_ddt(dtype, total_count,
                                              accumbuf, (char*)sbuf);
    if (ret < 0) { line = __LINE__; goto error_hndl; }

    /* Computation loop */

    /*
       For each of the remote nodes:
       - post irecv for block (r-2) from (r-1) with wrap around
       - send block (r-1) to (r+1)
       - in loop for every step k = 2 .. n
       - post irecv for block (r - 1 + n - k) % n
       - wait on block (r + n - k) % n to arrive
       - compute on block (r + n - k ) % n
       - send block (r + n - k) % n
       - wait on block (r)
       - compute on block (r)
       - copy block (r) to rbuf
       Note that we must be careful when computing the begining of buffers and
       for send operations and computation we must compute the exact block size.
    */
    send_to = (rank + 1) % size;
    recv_from = (rank + size - 1) % size;

    inbi = 0;
    /* Initialize first receive from the neighbor on the left */
    ret = MCA_PML_CALL(irecv(inbuf[inbi], max_block_count, dtype, recv_from,
                             MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
                             &reqs[inbi]));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
    ret = MCA_PML_CALL(send(tmpsend, rcounts[recv_from], dtype, send_to,
                            MCA_COLL_BASE_TAG_REDUCE_SCATTER,
                            MCA_PML_BASE_SEND_STANDARD, comm));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    for (k = 2; k < size; k++) {
        const int prevblock = (rank + size - k) % size;

        /* Flip between the two staging buffers */
        inbi = inbi ^ 0x1;

        /* Post irecv for the current block */
        ret = MCA_PML_CALL(irecv(inbuf[inbi], max_block_count, dtype, recv_from,
                                 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
                                 &reqs[inbi]));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Wait on previous block to arrive */
        ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Apply operation on previous block: result goes to rbuf
           rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
        */
        tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
        ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, rcounts[prevblock], dtype);

        /* send previous block to send_to */
        ret = MCA_PML_CALL(send(tmprecv, rcounts[prevblock], dtype, send_to,
                                MCA_COLL_BASE_TAG_REDUCE_SCATTER,
                                MCA_PML_BASE_SEND_STANDARD, comm));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    }

    /* Wait on the last block to arrive */
    ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    /* Apply operation on the last block (my block)
       rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
    tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
    ompi_op_reduce(op, inbuf[inbi], tmprecv, rcounts[rank], dtype);

    /* Copy result from tmprecv to rbuf */
    ret = ompi_datatype_copy_content_same_ddt(dtype, rcounts[rank], (char *)rbuf, tmprecv);
    if (ret < 0) { line = __LINE__; goto error_hndl; }

    if (NULL != displs) free(displs);
    if (NULL != accumbuf_free) free(accumbuf_free);
    if (NULL != inbuf_free[0]) free(inbuf_free[0]);
    if (NULL != inbuf_free[1]) free(inbuf_free[1]);

    return MPI_SUCCESS;

 error_hndl:
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
                 __FILE__, line, rank, ret));
    (void)line;  // silence compiler warning
    if (NULL != displs) free(displs);
    if (NULL != accumbuf_free) free(accumbuf_free);
    if (NULL != inbuf_free[0]) free(inbuf_free[0]);
    if (NULL != inbuf_free[1]) free(inbuf_free[1]);
    return ret;
}
 624 
 625 /*
 626  * ompi_sum_counts: Returns sum of counts [lo, hi]
 627  *                  lo, hi in {0, 1, ..., nprocs_pof2 - 1}
 628  */
 629 static int ompi_sum_counts(const int *counts, int *displs, int nprocs_rem, int lo, int hi)
 630 {
 631     /* Adjust lo and hi for taking into account blocks of excluded processes */
 632     lo = (lo < nprocs_rem) ? lo * 2 : lo + nprocs_rem;
 633     hi = (hi < nprocs_rem) ? hi * 2 + 1 : hi + nprocs_rem;
 634     return displs[hi] + counts[hi] - displs[lo];
 635 }
 636 
 637 /*
 638  * ompi_coll_base_reduce_scatter_intra_butterfly
 639  *
 640  * Function:  Butterfly algorithm for reduce_scatter
 641  * Accepts:   Same as MPI_Reduce_scatter
 642  * Returns:   MPI_SUCCESS or error code
 643  *
 644  * Description:  Implements butterfly algorithm for MPI_Reduce_scatter [*].
 645  *               The algorithm can be used both by commutative and non-commutative
 646  *               operations, for power-of-two and non-power-of-two number of processes.
 647  *
 648  * [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter
 649  *     with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.
 650  *
 651  * Time complexity: O(m\lambda + log(p)\alpha + m\beta + m\gamma),
 652  *   where m = sum of rcounts[], p = comm_size
 653  * Memory requirements (per process): 2 * m * typesize + comm_size
 654  *
 655  * Example: comm_size=6, nprocs_pof2=4, nprocs_rem=2, rcounts[]=1, sbuf=[0,1,...,5]
 656  * Step 1. Reduce the number of processes to 4
 657  * rank 0: [0|1|2|3|4|5]: send to 1: vrank -1
 658  * rank 1: [0|1|2|3|4|5]: recv from 0, op: vrank 0: [0|2|4|6|8|10]
 659  * rank 2: [0|1|2|3|4|5]: send to 3: vrank -1
 660  * rank 3: [0|1|2|3|4|5]: recv from 2, op: vrank 1: [0|2|4|6|8|10]
 661  * rank 4: [0|1|2|3|4|5]: vrank 2: [0|1|2|3|4|5]
 662  * rank 5: [0|1|2|3|4|5]: vrank 3: [0|1|2|3|4|5]
 663  *
 664  * Step 2. Butterfly. Buffer of 6 elements is divided into 4 blocks.
 665  * Round 1 (mask=1, nblocks=2)
 666  * 0: vrank -1
 667  * 1: vrank  0 [0 2|4 6|8|10]: exch with 1: send [2,3], recv [0,1]: [0 4|8 12|*|*]
 668  * 2: vrank -1
 669  * 3: vrank  1 [0 2|4 6|8|10]: exch with 0: send [0,1], recv [2,3]: [**|**|16|20]
 670  * 4: vrank  2 [0 1|2 3|4|5] : exch with 3: send [2,3], recv [0,1]: [0 2|4 6|*|*]
 671  * 5: vrank  3 [0 1|2 3|4|5] : exch with 2: send [0,1], recv [2,3]: [**|**|8|10]
 672  *
 673  * Round 2 (mask=2, nblocks=1)
 674  * 0: vrank -1
 675  * 1: vrank  0 [0 4|8 12|*|*]: exch with 2: send [1], recv [0]: [0 6|**|*|*]
 676  * 2: vrank -1
 677  * 3: vrank  1 [**|**|16|20] : exch with 3: send [3], recv [2]: [**|**|24|*]
 678  * 4: vrank  2 [0 2|4 6|*|*] : exch with 0: send [0], recv [1]: [**|12 18|*|*]
 679  * 5: vrank  3 [**|**|8|10]  : exch with 1: send [2], recv [3]: [**|**|*|30]
 680  *
 681  * Step 3. Exchange with remote process according to a mirror permutation:
 682  *         mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3
 683  * 0: vrank -1: recv "0" from process 0
 684  * 1: vrank  0 [0 6|**|*|*]: send "0" to 0, copy "6" to rbuf (mperm(0)=0)
 685  * 2: vrank -1: recv result "12" from process 4
 686  * 3: vrank  1 [**|**|24|*]
 687  * 4: vrank  2 [**|12 18|*|*]: send "12" to 2, send "18" to 3, recv "24" from 3
 688  * 5: vrank  3 [**|**|*|30]: copy "30" to rbuf (mperm(3)=3)
 689  */
 690 int
 691 ompi_coll_base_reduce_scatter_intra_butterfly(
 692     const void *sbuf, void *rbuf, const int *rcounts, struct ompi_datatype_t *dtype,
 693     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 694     mca_coll_base_module_t *module)
 695 {
 696     char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
 697     int *displs = NULL, index;
 698     ptrdiff_t span, gap, totalcount, extent;
 699     int err = MPI_SUCCESS;
 700     int comm_size = ompi_comm_size(comm);
 701     int rank = ompi_comm_rank(comm);
 702 
 703     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 704                  "coll:base:reduce_scatter_intra_butterfly: rank %d/%d",
 705                  rank, comm_size));
 706     if (comm_size < 2)
 707         return MPI_SUCCESS;
 708 
 709     displs = malloc(sizeof(*displs) * comm_size);
 710     if (NULL == displs) {
 711         err = OMPI_ERR_OUT_OF_RESOURCE;
 712         goto cleanup_and_return;
 713     }
 714     displs[0] = 0;
 715     for (int i = 1; i < comm_size; i++) {
 716         displs[i] = displs[i - 1] + rcounts[i - 1];
 717     }
 718     totalcount = displs[comm_size - 1] + rcounts[comm_size - 1];
 719 
 720     ompi_datatype_type_extent(dtype, &extent);
 721     span = opal_datatype_span(&dtype->super, totalcount, &gap);
 722     tmpbuf[0] = malloc(span);
 723     tmpbuf[1] = malloc(span);
 724     if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
 725         err = OMPI_ERR_OUT_OF_RESOURCE;
 726         goto cleanup_and_return;
 727     }
 728     psend = tmpbuf[0] - gap;
 729     precv = tmpbuf[1] - gap;
 730 
 731     if (sbuf != MPI_IN_PLACE) {
 732         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, (char *)sbuf);
 733         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 734     } else {
 735         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, rbuf);
 736         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 737     }
 738 
 739     /*
 740      * Step 1. Reduce the number of processes to the nearest lower power of two
 741      * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
 742      * In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
 743      * the input vector to their neighbor (rank + 1) and all the odd ranks recv
 744      * the input vector and perform local reduction.
 745      * The odd ranks (0 to 2r - 1) contain the reduction with the input
 746      * vector on their neighbors (the even ranks). The first r odd
 747      * processes and the p - 2r last processes are renumbered from
 748      * 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the
 749      * rest of the algorithm.
 750      */
 751 
 752     /* Find nearest power-of-two less than or equal to comm_size */
 753     int nprocs_pof2 = opal_next_poweroftwo(comm_size);
 754     nprocs_pof2 >>= 1;
 755     int nprocs_rem = comm_size - nprocs_pof2;
 756     int log2_size = opal_cube_dim(nprocs_pof2);
 757 
 758     int vrank = -1;
 759     if (rank < 2 * nprocs_rem) {
 760         if ((rank % 2) == 0) {
 761             /* Even process */
 762             err = MCA_PML_CALL(send(psend, totalcount, dtype, rank + 1,
 763                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 764                                     MCA_PML_BASE_SEND_STANDARD, comm));
 765             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 766             /* This process does not participate in the rest of the algorithm */
 767             vrank = -1;
 768         } else {
 769             /* Odd process */
 770             err = MCA_PML_CALL(recv(precv, totalcount, dtype, rank - 1,
 771                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 772                                     comm, MPI_STATUS_IGNORE));
 773             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 774             ompi_op_reduce(op, precv, psend, totalcount, dtype);
 775             /* Adjust rank to be the bottom "remain" ranks */
 776             vrank = rank / 2;
 777         }
 778     } else {
 779         /* Adjust rank to show that the bottom "even remain" ranks dropped out */
 780         vrank = rank - nprocs_rem;
 781     }
 782 
 783     if (vrank != -1) {
 784         /*
 785          * Now, psend vector of size totalcount is divided into nprocs_pof2 blocks:
 786          * block 0:   rcounts[0] and rcounts[1] -- for process 0 and 1
 787          * block 1:   rcounts[2] and rcounts[3] -- for process 2 and 3
 788          * ...
 789          * block r-1: rcounts[2*(r-1)] and rcounts[2*(r-1)+1]
 790          * block r:   rcounts[r+r]
 791          * block r+1: rcounts[r+r+1]
 792          * ...
 793          * block nprocs_pof2 - 1: rcounts[r+nprocs_pof2-1]
 794          */
 795         int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
 796         for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
 797             int vpeer = vrank ^ mask;
 798             int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 799 
 800             nblocks /= 2;
 801             if ((vrank & mask) == 0) {
 802                 /* Send the upper half of reduction buffer, recv the lower half */
 803                 send_index += nblocks;
 804             } else {
 805                 /* Send the upper half of reduction buffer, recv the lower half */
 806                 recv_index += nblocks;
 807             }
 808 
 809             /* Send blocks: [send_index, send_index + nblocks - 1] */
 810             int send_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
 811                                              send_index, send_index + nblocks - 1);
 812             index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
 813             ptrdiff_t sdispl = displs[index];
 814 
 815             /* Recv blocks: [recv_index, recv_index + nblocks - 1] */
 816             int recv_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
 817                                              recv_index, recv_index + nblocks - 1);
 818             index = (recv_index < nprocs_rem) ? 2 * recv_index : nprocs_rem + recv_index;
 819             ptrdiff_t rdispl = displs[index];
 820 
 821             err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
 822                                           dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 823                                           precv + (ptrdiff_t)rdispl * extent, recv_count,
 824                                           dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 825                                           comm, MPI_STATUS_IGNORE, rank);
 826             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 827 
 828             if (vrank < vpeer) {
 829                 /* precv = psend <op> precv */
 830                 ompi_op_reduce(op, psend + (ptrdiff_t)rdispl * extent,
 831                                precv + (ptrdiff_t)rdispl * extent, recv_count, dtype);
 832                 char *p = psend;
 833                 psend = precv;
 834                 precv = p;
 835             } else {
 836                 /* psend = precv <op> psend */
 837                 ompi_op_reduce(op, precv + (ptrdiff_t)rdispl * extent,
 838                                psend + (ptrdiff_t)rdispl * extent, recv_count, dtype);
 839             }
 840             send_index = recv_index;
 841         }
 842         /*
 843          * psend points to the result block [send_index]
 844          * Exchange results with remote process according to a mirror permutation.
 845          */
 846         int vpeer = ompi_mirror_perm(vrank, log2_size);
 847         int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 848         index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
 849 
 850         if (vpeer < nprocs_rem) {
 851             /*
 852              * Process has two blocks: for excluded process and own.
 853              * Send the first block to excluded process.
 854              */
 855             err = MCA_PML_CALL(send(psend + (ptrdiff_t)displs[index] * extent,
 856                                     rcounts[index], dtype, peer - 1,
 857                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 858                                     MCA_PML_BASE_SEND_STANDARD, comm));
 859             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 860         }
 861 
 862         /* If process has two blocks, then send the second block (own block) */
 863         if (vpeer < nprocs_rem)
 864             index++;
 865         if (vpeer != vrank) {
 866             err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)displs[index] * extent,
 867                                           rcounts[index], dtype, peer,
 868                                           MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 869                                           rbuf, rcounts[rank], dtype, peer,
 870                                           MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 871                                           comm, MPI_STATUS_IGNORE, rank);
 872             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 873         } else {
 874             err = ompi_datatype_copy_content_same_ddt(dtype, rcounts[rank], rbuf,
 875                                                       psend + (ptrdiff_t)displs[rank] * extent);
 876             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 877         }
 878 
 879     } else {
 880         /* Excluded process: receive result */
 881         int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
 882         int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 883         err = MCA_PML_CALL(recv(rbuf, rcounts[rank], dtype, peer,
 884                                 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
 885                                 MPI_STATUS_IGNORE));
 886         if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 887     }
 888 
 889 cleanup_and_return:
 890     if (displs)
 891         free(displs);
 892     if (tmpbuf[0])
 893         free(tmpbuf[0]);
 894     if (tmpbuf[1])
 895         free(tmpbuf[1]);
 896     return err;
 897 }

/* [<][>][^][v][top][bottom][index][help] */