root/ompi/mca/coll/base/coll_base_reduce_scatter_block.c


DEFINITIONS

This source file includes the following definitions:
  1. ompi_coll_base_reduce_scatter_block_basic_linear
  2. ompi_coll_base_reduce_scatter_block_intra_recursivedoubling
  3. ompi_range_sum
  4. ompi_coll_base_reduce_scatter_block_intra_recursivehalving
  5. ompi_coll_base_reduce_scatter_block_intra_butterfly
  6. ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2017 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
  14  * Copyright (c) 2012      Oak Ridge National Labs.  All rights reserved.
  15  * Copyright (c) 2012      Sandia National Laboratories. All rights reserved.
  16  * Copyright (c) 2014-2018 Research Organization for Information Science
  17  *                         and Technology (RIST). All rights reserved.
  18  * Copyright (c) 2018      Siberian State University of Telecommunications
  19  *                         and Information Sciences. All rights reserved.
  20  * $COPYRIGHT$
  21  *
  22  * Additional copyrights may follow
  23  *
  24  * $HEADER$
  25  */
  26 
  27 #include "ompi_config.h"
  28 
  29 #include "mpi.h"
  30 #include "opal/util/bit_ops.h"
  31 #include "ompi/constants.h"
  32 #include "ompi/datatype/ompi_datatype.h"
  33 #include "ompi/communicator/communicator.h"
  34 #include "ompi/mca/coll/coll.h"
  35 #include "ompi/mca/coll/basic/coll_basic.h"
  36 #include "ompi/mca/pml/pml.h"
  37 #include "ompi/op/op.h"
  38 #include "coll_tags.h"
  39 #include "coll_base_functions.h"
  40 #include "coll_base_topo.h"
  41 #include "coll_base_util.h"
  42 
  43 /*
   44  *      ompi_coll_base_reduce_scatter_block_basic_linear
  45  *
  46  *      Function:       - reduce then scatter
  47  *      Accepts:        - same as MPI_Reduce_scatter_block()
  48  *      Returns:        - MPI_SUCCESS or error code
  49  *
   50  * Algorithm:
   51  *     reduce the full vector to rank 0, then scatter the blocks
   52  *     (needs to be cleaned up at some point)
  53  */
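      /*
       * Worked example (illustrative): with comm_size = 4 and rcount = 2, the
       * root reduces all 8 elements into a temporary buffer and the scatter
       * then leaves the reduced elements [2*i, 2*i + 1] in rank i's rbuf.
       */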
  54 int
  55 ompi_coll_base_reduce_scatter_block_basic_linear(const void *sbuf, void *rbuf, int rcount,
  56                                                  struct ompi_datatype_t *dtype,
  57                                                  struct ompi_op_t *op,
  58                                                  struct ompi_communicator_t *comm,
  59                                                  mca_coll_base_module_t *module)
  60 {
  61     int rank, size, count, err = OMPI_SUCCESS;
  62     ptrdiff_t gap, span;
  63     char *recv_buf = NULL, *recv_buf_free = NULL;
  64 
  65     /* Initialize */
  66     rank = ompi_comm_rank(comm);
  67     size = ompi_comm_size(comm);
  68 
  69     /* short cut the trivial case */
  70     count = rcount * size;
  71     if (0 == count) {
  72         return OMPI_SUCCESS;
  73     }
  74 
  75     /* get datatype information */
  76     span = opal_datatype_span(&dtype->super, count, &gap);
  77 
  78     /* Handle MPI_IN_PLACE */
  79     if (MPI_IN_PLACE == sbuf) {
  80         sbuf = rbuf;
  81     }
  82 
  83     if (0 == rank) {
  84         /* temporary receive buffer.  See coll_basic_reduce.c for
  85            details on sizing */
  86         recv_buf_free = (char*) malloc(span);
  87         if (NULL == recv_buf_free) {
  88             err = OMPI_ERR_OUT_OF_RESOURCE;
  89             goto cleanup;
  90         }
  91         recv_buf = recv_buf_free - gap;
  92     }
  93 
  94     /* reduction */
  95     err =
  96         comm->c_coll->coll_reduce(sbuf, recv_buf, count, dtype, op, 0,
  97                                  comm, comm->c_coll->coll_reduce_module);
  98 
  99     /* scatter */
 100     if (MPI_SUCCESS == err) {
 101         err = comm->c_coll->coll_scatter(recv_buf, rcount, dtype,
 102                                         rbuf, rcount, dtype, 0,
 103                                         comm, comm->c_coll->coll_scatter_module);
 104     }
 105 
 106  cleanup:
 107     if (NULL != recv_buf_free) free(recv_buf_free);
 108 
 109     return err;
 110 }
 111 
 112 /*
 113  * ompi_coll_base_reduce_scatter_block_intra_recursivedoubling
 114  *
 115  * Function:  Recursive doubling algorithm for reduce_scatter_block.
 116  * Accepts:   Same as MPI_Reduce_scatter_block
 117  * Returns:   MPI_SUCCESS or error code
 118  *
  119  * Description:  Implements the recursive doubling algorithm for MPI_Reduce_scatter_block.
  120  *               The algorithm preserves the order of operations, so it can
  121  *               be used with both commutative and non-commutative operations.
 122  *
 123  * Time complexity: \alpha\log(p) + \beta*m(\log(p)-(p-1)/p) + \gamma*m(\log(p)-(p-1)/p),
 124  *                  where m = rcount * comm_size, p = comm_size
 125  * Memory requirements (per process): 2 * rcount * comm_size * typesize
 126  */
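      /*
       * For example, with p = 4 the formula gives 2\alpha + 1.25*m\beta + 1.25*m\gamma:
       * two rounds in which each process sends (and receives) a total of
       * 1.25*m = 5*rcount elements.
       */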
 127 int
 128 ompi_coll_base_reduce_scatter_block_intra_recursivedoubling(
 129     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 130     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 131     mca_coll_base_module_t *module)
 132 {
 133     struct ompi_datatype_t *dtypesend = NULL, *dtyperecv = NULL;
 134     char *tmprecv_raw = NULL, *tmpbuf_raw = NULL, *tmprecv, *tmpbuf;
 135     ptrdiff_t span, gap, totalcount, extent;
 136     int blocklens[2], displs[2];
 137     int err = MPI_SUCCESS;
 138     int comm_size = ompi_comm_size(comm);
 139     int rank = ompi_comm_rank(comm);
 140 
 141     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 142                  "coll:base:reduce_scatter_block_intra_recursivedoubling: rank %d/%d",
 143                  rank, comm_size));
 144     if (rcount == 0)
 145         return MPI_SUCCESS;
 146     if (comm_size < 2)
 147         return MPI_SUCCESS;
 148 
 149     totalcount = comm_size * rcount;
 150     ompi_datatype_type_extent(dtype, &extent);
 151     span = opal_datatype_span(&dtype->super, totalcount, &gap);
 152     tmpbuf_raw = malloc(span);
 153     tmprecv_raw = malloc(span);
 154     if (NULL == tmpbuf_raw || NULL == tmprecv_raw) {
 155         err = OMPI_ERR_OUT_OF_RESOURCE;
 156         goto cleanup_and_return;
 157     }
 158     tmpbuf = tmpbuf_raw - gap;
 159     tmprecv = tmprecv_raw - gap;
 160 
 161     if (sbuf != MPI_IN_PLACE) {
 162         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, (char *)sbuf);
 163         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 164     } else {
 165         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, rbuf);
 166         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 167     }
 168     int is_commutative = ompi_op_is_commute(op);
 169 
 170     /* Recursive distance doubling */
 171     int rdoubling_step = 0;
 172     for (int mask = 1; mask < comm_size; mask <<= 1) {
 173         int remote = rank ^ mask;
 174         int cur_tree_root = ompi_rounddown(rank, mask);
 175         int remote_tree_root = ompi_rounddown(remote, mask);
 176 
 177         /*
  178          * Let m be the block size (rcount elements) and p the comm_size,
  179          * so p*m is the total message size in sbuf.
  180          * Step 1: processes send and recv (p*m - m) amount of data
  181          * Step 2: processes send and recv (p*m - 2*m) amount of data
  182          * Step 3: processes send and recv (p*m - 4*m) amount of data
  183          * ...
  184          * Step ceil(\log_2(p)): send and recv (p*m - m*2^{\floor{\log_2(p-1)}})
 185          *
 186          * Send block from tmpbuf: [0..cur_tree_root - 1], [cur_tree_root + mask, p - 1]
 187          * Recv block into tmprecv: [0..remote_tree_root - 1], [remote_tree_root + mask, p - 1]
 188          */
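              /*
               * Worked example (illustrative): p = 4, rcount = 1.  At mask = 1 each
               * process sends and receives 3 blocks (p*m - m); at mask = 2 it sends
               * and receives 2 blocks (p*m - 2*m), for a total of 5 elements,
               * consistent with the header formula with m = p*rcount = 4.
               */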
 189 
 190         /* Send type */
 191         blocklens[0] = rcount * cur_tree_root;
 192         blocklens[1] = (comm_size >= cur_tree_root + mask) ?
 193                        rcount * (comm_size - cur_tree_root - mask) : 0;
 194         displs[0] = 0;
 195         displs[1] = comm_size * rcount - blocklens[1];
 196         err = ompi_datatype_create_indexed(2, blocklens, displs, dtype, &dtypesend);
 197         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 198         err = ompi_datatype_commit(&dtypesend);
 199         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 200 
 201         /* Recv type */
 202         blocklens[0] = rcount * remote_tree_root;
 203         blocklens[1] = (comm_size >= remote_tree_root + mask) ?
 204                        rcount * (comm_size - remote_tree_root - mask) : 0;
 205         displs[0] = 0;
 206         displs[1] = comm_size * rcount - blocklens[1];
 207         err = ompi_datatype_create_indexed(2, blocklens, displs, dtype, &dtyperecv);
 208         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 209         err = ompi_datatype_commit(&dtyperecv);
 210         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 211 
 212         int is_block_received = 0;
 213         if (remote < comm_size) {
 214             err = ompi_coll_base_sendrecv(tmpbuf, 1, dtypesend, remote,
 215                                           MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 216                                           tmprecv, 1, dtyperecv, remote,
 217                                           MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 218                                           comm, MPI_STATUS_IGNORE, rank);
 219             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 220             is_block_received = 1;
 221         }
 222         /*
  223          * Non-power-of-two case: if a process did not have a peer
  224          * to communicate with, we need to send it the current result.
  225          * A recursive halving scheme is used to find that process.
 226          */
 227         if (remote_tree_root + mask > comm_size) {
 228             /*
 229              * Compute the number of processes in current subtree
 230              * that have all the data
 231              */
 232             int nprocs_alldata = comm_size - cur_tree_root - mask;
 233             for (int rhalving_mask = mask >> 1; rhalving_mask > 0; rhalving_mask >>= 1) {
 234                 remote = rank ^ rhalving_mask;
 235                 int tree_root = ompi_rounddown(rank, rhalving_mask << 1);
 236                 /*
 237                  * Send only if:
 238                  * 1) current process has data: (remote > rank) && (rank < tree_root + nprocs_alldata)
 239                  * 2) remote process does not have data at any step: remote >= tree_root + nprocs_alldata
 240                  */
 241                 if ((remote > rank) && (rank < tree_root + nprocs_alldata)
 242                     && (remote >= tree_root + nprocs_alldata)) {
 243                     err = MCA_PML_CALL(send(tmprecv, 1, dtyperecv, remote,
 244                                             MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 245                                             MCA_PML_BASE_SEND_STANDARD, comm));
 246                     if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 247 
 248                 } else if ((remote < rank) && (remote < tree_root + nprocs_alldata) &&
 249                            (rank >= tree_root + nprocs_alldata)) {
 250                     err = MCA_PML_CALL(recv(tmprecv, 1, dtyperecv, remote,
 251                                             MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 252                                             comm, MPI_STATUS_IGNORE));
 253                     if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 254                     is_block_received = 1;
 255                 }
 256             }
 257         }
 258 
 259         if (is_block_received) {
 260             /* After reduction the result must be in tmpbuf */
 261             if (is_commutative || (remote_tree_root < cur_tree_root)) {
 262                 ompi_op_reduce(op, tmprecv, tmpbuf, blocklens[0], dtype);
 263                 ompi_op_reduce(op, tmprecv + (ptrdiff_t)displs[1] * extent,
 264                                tmpbuf + (ptrdiff_t)displs[1] * extent,
 265                                blocklens[1], dtype);
 266             } else {
 267                 ompi_op_reduce(op, tmpbuf, tmprecv, blocklens[0], dtype);
 268                 ompi_op_reduce(op, tmpbuf + (ptrdiff_t)displs[1] * extent,
 269                                tmprecv + (ptrdiff_t)displs[1] * extent,
 270                                blocklens[1], dtype);
 271                 err = ompi_datatype_copy_content_same_ddt(dtyperecv, 1,
 272                                                           tmpbuf, tmprecv);
 273                 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 274             }
 275         }
 276         rdoubling_step++;
 277         err = ompi_datatype_destroy(&dtypesend);
 278         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 279         err = ompi_datatype_destroy(&dtyperecv);
 280         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 281     }
 282     err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
 283                                               tmpbuf + (ptrdiff_t)rank * rcount * extent);
 284     if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 285 
 286 cleanup_and_return:
 287     if (dtypesend)
 288         ompi_datatype_destroy(&dtypesend);
 289     if (dtyperecv)
 290         ompi_datatype_destroy(&dtyperecv);
 291     if (tmpbuf_raw)
 292         free(tmpbuf_raw);
 293     if (tmprecv_raw)
 294         free(tmprecv_raw);
 295     return err;
 296 }
 297 
 298 /*
  299  * ompi_range_sum: Returns the sum of the values at indices a..b of the array
 300  *   index: 0 1 2 3 4 ... r r+1 r+2 ... nproc_pof2
 301  *   value: 2 2 2 2 2 ... 2  1   1  ... 1
 302  */
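      /*
       * Worked example (illustrative): ompi_range_sum(2, 5, 3) sums the values at
       * indices 2..5, i.e. 2 + 2 + 1 + 1 = 6 = 2*(r - a + 1) + b - r.  In the
       * callers below an index is a virtual rank and the value is the number of
       * original blocks that virtual rank holds.
       */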
 303 static int ompi_range_sum(int a, int b, int r)
 304 {
 305     if (r < a)
 306         return b - a + 1;
 307     else if (r > b)
 308         return 2 * (b - a + 1);
 309     return 2 * (r - a + 1) + b - r;
 310 }
 311 
 312 /*
 313  * ompi_coll_base_reduce_scatter_block_intra_recursivehalving
 314  *
 315  * Function:  Recursive halving algorithm for reduce_scatter_block
 316  * Accepts:   Same as MPI_Reduce_scatter_block
 317  * Returns:   MPI_SUCCESS or error code
 318  *
  319  * Description:  Implements the recursive halving algorithm for MPI_Reduce_scatter_block.
  320  *               The algorithm can be used with commutative operations only.
 321  *
 322  * Limitations:  commutative operations only
 323  * Memory requirements (per process): 2 * rcount * comm_size * typesize
 324  */
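      /*
       * Outline (see the step comments below): Step 1 folds the process count
       * down to the nearest lower power of two, Step 2 performs recursive vector
       * halving among the remaining processes, and Step 3 sends the final blocks
       * back to the excluded even ranks.
       */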
 325 int
 326 ompi_coll_base_reduce_scatter_block_intra_recursivehalving(
 327     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 328     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 329     mca_coll_base_module_t *module)
 330 {
 331     char *tmprecv_raw = NULL, *tmpbuf_raw = NULL, *tmprecv, *tmpbuf;
 332     ptrdiff_t span, gap, totalcount, extent;
 333     int err = MPI_SUCCESS;
 334     int comm_size = ompi_comm_size(comm);
 335     int rank = ompi_comm_rank(comm);
 336 
 337     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 338                  "coll:base:reduce_scatter_block_intra_recursivehalving: rank %d/%d",
 339                  rank, comm_size));
 340     if (rcount == 0 || comm_size < 2)
 341         return MPI_SUCCESS;
 342 
 343     if (!ompi_op_is_commute(op)) {
 344         OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 345                      "coll:base:reduce_scatter_block_intra_recursivehalving: rank %d/%d "
 346                      "switching to basic reduce_scatter_block", rank, comm_size));
 347         return ompi_coll_base_reduce_scatter_block_basic_linear(sbuf, rbuf, rcount, dtype,
 348                                                                 op, comm, module);
 349     }
 350     totalcount = comm_size * rcount;
 351     ompi_datatype_type_extent(dtype, &extent);
 352     span = opal_datatype_span(&dtype->super, totalcount, &gap);
 353     tmpbuf_raw = malloc(span);
 354     tmprecv_raw = malloc(span);
 355     if (NULL == tmpbuf_raw || NULL == tmprecv_raw) {
 356         err = OMPI_ERR_OUT_OF_RESOURCE;
 357         goto cleanup_and_return;
 358     }
 359     tmpbuf = tmpbuf_raw - gap;
 360     tmprecv = tmprecv_raw - gap;
 361 
 362     if (sbuf != MPI_IN_PLACE) {
 363         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, (char *)sbuf);
 364         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 365     } else {
 366         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, rbuf);
 367         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 368     }
 369 
 370     /*
 371      * Step 1. Reduce the number of processes to the nearest lower power of two
 372      * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
 373      * In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
 374      * the input vector to their neighbor (rank + 1) and all the odd ranks recv
 375      * the input vector and perform local reduction.
  376      * The odd ranks (1 to 2r - 1) then hold the reduction of their own input
  377      * vector with that of their even neighbor. The first r odd
  378      * processes and the last p - 2r processes are renumbered from
 379      * 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the
 380      * rest of the algorithm.
 381      */
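          /*
           * Worked example (illustrative): p = 6 gives p' = 4 and r = 2.  Ranks 0
           * and 2 send their vectors to ranks 1 and 3, which reduce them locally;
           * the surviving ranks 1, 3, 4, 5 continue as vranks 0, 1, 2, 3.
           */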
 382 
 383     /* Find nearest power-of-two less than or equal to comm_size */
 384     int nprocs_pof2 = opal_next_poweroftwo(comm_size);
 385     nprocs_pof2 >>= 1;
 386     int nprocs_rem = comm_size - nprocs_pof2;
 387 
 388     int vrank = -1;
 389     if (rank < 2 * nprocs_rem) {
 390         if ((rank % 2) == 0) {
 391             /* Even process */
 392             err = MCA_PML_CALL(send(tmpbuf, totalcount, dtype, rank + 1,
 393                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 394                                     MCA_PML_BASE_SEND_STANDARD, comm));
 395             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
  396             /* This process does not participate in the rest of the algorithm */
 397             vrank = -1;
 398         } else {
 399             /* Odd process */
 400             err = MCA_PML_CALL(recv(tmprecv, totalcount, dtype, rank - 1,
 401                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 402                                     comm, MPI_STATUS_IGNORE));
 403             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 404             ompi_op_reduce(op, tmprecv, tmpbuf, totalcount, dtype);
 405             /* Adjust rank to be the bottom "remain" ranks */
 406             vrank = rank / 2;
 407         }
 408     } else {
 409         /* Adjust rank to show that the bottom "even remain" ranks dropped out */
 410         vrank = rank - nprocs_rem;
 411     }
 412 
 413     if (vrank != -1) {
 414         /*
 415          * Step 2. Recursive vector halving. We have p' = 2^{\floor{\log_2 p}}
 416          * power-of-two number of processes with new ranks (vrank) and partial
 417          * result in tmpbuf.
 418          * All processes then compute the reduction between the local
 419          * buffer and the received buffer. In the next \log_2(p') - 1 steps, the
 420          * buffers are recursively halved. At the end, each of the p' processes
 421          * has 1 / p' of the total reduction result.
 422          */
 423         int send_index = 0, recv_index = 0, last_index = nprocs_pof2;
 424         for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
 425             int vpeer = vrank ^ mask;
 426             int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 427 
 428             /*
 429              * Calculate the recv_count and send_count because the
 430              * even-numbered processes who no longer participate will
 431              * have their result calculated by the process to their
 432              * right (rank + 1).
 433              */
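                  /*
                   * Worked example (illustrative): p = 6 (p' = 4, r = 2), first
                   * iteration (mask = 2), vrank 0 (rank 1) paired with vpeer 2 (rank 4):
                   * send_index becomes 2, so send_count = rcount * ompi_range_sum(2, 3, 1)
                   * = 2 * rcount at sdispl = 4 * rcount, while recv_count =
                   * rcount * ompi_range_sum(0, 1, 1) = 4 * rcount at rdispl = 0.
                   */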
 434             int send_count = 0, recv_count = 0;
 435             if (vrank < vpeer) {
 436                 /* Send the right half of the buffer, recv the left half */
 437                 send_index = recv_index + mask;
 438                 send_count = rcount * ompi_range_sum(send_index, last_index - 1, nprocs_rem - 1);
 439                 recv_count = rcount * ompi_range_sum(recv_index, send_index - 1, nprocs_rem - 1);
 440             } else {
 441                 /* Send the left half of the buffer, recv the right half */
 442                 recv_index = send_index + mask;
 443                 send_count = rcount * ompi_range_sum(send_index, recv_index - 1, nprocs_rem - 1);
 444                 recv_count = rcount * ompi_range_sum(recv_index, last_index - 1, nprocs_rem - 1);
 445             }
 446             ptrdiff_t rdispl = rcount * ((recv_index <= nprocs_rem - 1) ?
 447                                          2 * recv_index : nprocs_rem + recv_index);
 448             ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
 449                                          2 * send_index : nprocs_rem + send_index);
 450             struct ompi_request_t *request = NULL;
 451 
 452             if (recv_count > 0) {
 453                 err = MCA_PML_CALL(irecv(tmprecv + rdispl * extent, recv_count,
 454                                          dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 455                                          comm, &request));
 456                 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 457             }
 458             if (send_count > 0) {
 459                 err = MCA_PML_CALL(send(tmpbuf + sdispl * extent, send_count,
 460                                         dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 461                                         MCA_PML_BASE_SEND_STANDARD,
 462                                         comm));
 463                 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 464             }
 465             if (recv_count > 0) {
 466                 err = ompi_request_wait(&request, MPI_STATUS_IGNORE);
 467                 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 468                 ompi_op_reduce(op, tmprecv + rdispl * extent,
 469                                tmpbuf + rdispl * extent, recv_count, dtype);
 470             }
 471             send_index = recv_index;
 472             last_index = recv_index + mask;
 473         }
 474         err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
 475                                                   tmpbuf + (ptrdiff_t)rank * rcount * extent);
 476         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 477     }
 478 
 479     /* Step 3. Send the result to excluded even ranks */
 480     if (rank < 2 * nprocs_rem) {
 481         if ((rank % 2) == 0) {
 482             /* Even process */
 483             err = MCA_PML_CALL(recv(rbuf, rcount, dtype, rank + 1,
 484                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK, comm,
 485                                     MPI_STATUS_IGNORE));
 486             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 487         } else {
 488             /* Odd process */
 489             err = MCA_PML_CALL(send(tmpbuf + (ptrdiff_t)(rank - 1) * rcount * extent,
 490                                     rcount, dtype, rank - 1,
 491                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 492                                     MCA_PML_BASE_SEND_STANDARD, comm));
 493             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 494         }
 495     }
 496 
 497 cleanup_and_return:
 498     if (tmpbuf_raw)
 499         free(tmpbuf_raw);
 500     if (tmprecv_raw)
 501         free(tmprecv_raw);
 502     return err;
 503 }
 504 
 505 static int ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
 506     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 507     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 508     mca_coll_base_module_t *module);
 509 
 510 /*
 511  * ompi_coll_base_reduce_scatter_block_intra_butterfly
 512  *
 513  * Function:  Butterfly algorithm for reduce_scatter_block
 514  * Accepts:   Same as MPI_Reduce_scatter_block
 515  * Returns:   MPI_SUCCESS or error code
 516  *
  517  * Description:  Implements the butterfly algorithm for MPI_Reduce_scatter_block [*].
  518  *               The algorithm can be used with both commutative and non-commutative
  519  *               operations, for power-of-two and non-power-of-two numbers of processes.
 520  *
 521  * [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter
 522  *     with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.
 523  *
 524  * Time complexity:
 525  *   m\lambda + (\alpha + m\beta + m\gamma) +
 526  *   + 2\log_2(p)\alpha + 2m(1-1/p)\beta + m(1-1/p)\gamma +
 527  *   + 3(\alpha + m/p\beta) = O(m\lambda + log(p)\alpha + m\beta + m\gamma),
 528  *   where m = rcount * comm_size, p = comm_size
 529  * Memory requirements (per process): 2 * rcount * comm_size * typesize
 530  *
 531  * Example: comm_size=6, nprocs_pof2=4, nprocs_rem=2, rcount=1, sbuf=[0,1,...,5]
 532  * Step 1. Reduce the number of processes to 4
 533  * rank 0: [0|1|2|3|4|5]: send to 1: vrank -1
 534  * rank 1: [0|1|2|3|4|5]: recv from 0, op: vrank 0: [0|2|4|6|8|10]
 535  * rank 2: [0|1|2|3|4|5]: send to 3: vrank -1
 536  * rank 3: [0|1|2|3|4|5]: recv from 2, op: vrank 1: [0|2|4|6|8|10]
 537  * rank 4: [0|1|2|3|4|5]: vrank 2: [0|1|2|3|4|5]
 538  * rank 5: [0|1|2|3|4|5]: vrank 3: [0|1|2|3|4|5]
 539  *
 540  * Step 2. Butterfly. Buffer of 6 elements is divided into 4 blocks.
 541  * Round 1 (mask=1, nblocks=2)
 542  * 0: vrank -1
 543  * 1: vrank  0 [0 2|4 6|8|10]: exch with 1: send [2,3], recv [0,1]: [0 4|8 12|*|*]
 544  * 2: vrank -1
 545  * 3: vrank  1 [0 2|4 6|8|10]: exch with 0: send [0,1], recv [2,3]: [**|**|16|20]
 546  * 4: vrank  2 [0 1|2 3|4|5] : exch with 3: send [2,3], recv [0,1]: [0 2|4 6|*|*]
 547  * 5: vrank  3 [0 1|2 3|4|5] : exch with 2: send [0,1], recv [2,3]: [**|**|8|10]
 548  *
 549  * Round 2 (mask=2, nblocks=1)
 550  * 0: vrank -1
 551  * 1: vrank  0 [0 4|8 12|*|*]: exch with 2: send [1], recv [0]: [0 6|**|*|*]
 552  * 2: vrank -1
 553  * 3: vrank  1 [**|**|16|20] : exch with 3: send [3], recv [2]: [**|**|24|*]
 554  * 4: vrank  2 [0 2|4 6|*|*] : exch with 0: send [0], recv [1]: [**|12 18|*|*]
 555  * 5: vrank  3 [**|**|8|10]  : exch with 1: send [2], recv [3]: [**|**|*|30]
 556  *
 557  * Step 3. Exchange with remote process according to a mirror permutation:
 558  *         mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3
  559  * 0: vrank -1: recv "0" from process 1
 560  * 1: vrank  0 [0 6|**|*|*]: send "0" to 0, copy "6" to rbuf (mperm(0)=0)
 561  * 2: vrank -1: recv result "12" from process 4
 562  * 3: vrank  1 [**|**|24|*]
 563  * 4: vrank  2 [**|12 18|*|*]: send "12" to 2, send "18" to 3, recv "24" from 3
 564  * 5: vrank  3 [**|**|*|30]: copy "30" to rbuf (mperm(3)=3)
 565  */
 566 int
 567 ompi_coll_base_reduce_scatter_block_intra_butterfly(
 568     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 569     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 570     mca_coll_base_module_t *module)
 571 {
 572     char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
 573     ptrdiff_t span, gap, totalcount, extent;
 574     int err = MPI_SUCCESS;
 575     int comm_size = ompi_comm_size(comm);
 576     int rank = ompi_comm_rank(comm);
 577 
 578     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 579                  "coll:base:reduce_scatter_block_intra_butterfly: rank %d/%d",
 580                  rank, comm_size));
 581     if (rcount == 0 || comm_size < 2)
 582         return MPI_SUCCESS;
 583 
 584     if (!(comm_size & (comm_size - 1))) {
 585         /* Special case: comm_size is a power of two */
 586         return ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
 587                    sbuf, rbuf, rcount, dtype, op, comm, module);
 588     }
 589 
 590     totalcount = comm_size * rcount;
 591     ompi_datatype_type_extent(dtype, &extent);
 592     span = opal_datatype_span(&dtype->super, totalcount, &gap);
 593     tmpbuf[0] = malloc(span);
 594     tmpbuf[1] = malloc(span);
 595     if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
 596         err = OMPI_ERR_OUT_OF_RESOURCE;
 597         goto cleanup_and_return;
 598     }
 599     psend = tmpbuf[0] - gap;
 600     precv = tmpbuf[1] - gap;
 601 
 602     if (sbuf != MPI_IN_PLACE) {
 603         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, (char *)sbuf);
 604         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 605     } else {
 606         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, rbuf);
 607         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 608     }
 609 
 610     /*
 611      * Step 1. Reduce the number of processes to the nearest lower power of two
 612      * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
 613      * In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
 614      * the input vector to their neighbor (rank + 1) and all the odd ranks recv
 615      * the input vector and perform local reduction.
  616      * The odd ranks (1 to 2r - 1) then hold the reduction of their own input
  617      * vector with that of their even neighbor. The first r odd
  618      * processes and the last p - 2r processes are renumbered from
 619      * 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the
 620      * rest of the algorithm.
 621      */
 622 
 623     /* Find nearest power-of-two less than or equal to comm_size */
 624     int nprocs_pof2 = opal_next_poweroftwo(comm_size);
 625     nprocs_pof2 >>= 1;
 626     int nprocs_rem = comm_size - nprocs_pof2;
 627     int log2_size = opal_cube_dim(nprocs_pof2);
 628 
 629     int vrank = -1;
 630     if (rank < 2 * nprocs_rem) {
 631         if ((rank % 2) == 0) {
 632             /* Even process */
 633             err = MCA_PML_CALL(send(psend, totalcount, dtype, rank + 1,
 634                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 635                                     MCA_PML_BASE_SEND_STANDARD, comm));
 636             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 637             /* This process does not participate in the rest of the algorithm */
 638             vrank = -1;
 639         } else {
 640             /* Odd process */
 641             err = MCA_PML_CALL(recv(precv, totalcount, dtype, rank - 1,
 642                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 643                                     comm, MPI_STATUS_IGNORE));
 644             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 645             ompi_op_reduce(op, precv, psend, totalcount, dtype);
 646             /* Adjust rank to be the bottom "remain" ranks */
 647             vrank = rank / 2;
 648         }
 649     } else {
 650         /* Adjust rank to show that the bottom "even remain" ranks dropped out */
 651         vrank = rank - nprocs_rem;
 652     }
 653 
 654     if (vrank != -1) {
 655         /*
  656          * Now the psend vector of rcount * comm_size elements is divided into
 657          * nprocs_pof2 blocks:
 658          * block 0 has 2*rcount elems (for process 0 and 1)
 659          * block 1 has 2*rcount elems (for process 2 and 3)
 660          * ...
 661          * block r-1 has 2*rcount elems (for process 2*(r-1) and 2*(r-1)+1)
 662          * block r has rcount elems (for process r+r)
 663          * block r+1 has rcount elems (for process r+r+1)
 664          * ...
 665          * block nprocs_pof2 - 1 has rcount elems (for process r + nprocs_pof2-1)
 666          */
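              /*
               * Worked example (illustrative): p = 6 (p' = 4, r = 2): block 0 starts
               * at element 0, block 1 at 2*rcount, block 2 at 4*rcount and block 3
               * at 5*rcount, which is what the sdispl/rdispl expressions below compute.
               */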
 667         int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
 668         for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
 669             int vpeer = vrank ^ mask;
 670             int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 671 
 672             nblocks /= 2;
 673             if ((vrank & mask) == 0) {
 674                 /* Send the upper half of reduction buffer, recv the lower half */
 675                 send_index += nblocks;
 676             } else {
  677                 /* Send the lower half of the reduction buffer, recv the upper half */
 678                 recv_index += nblocks;
 679             }
 680             int send_count = rcount * ompi_range_sum(send_index,
 681                                           send_index + nblocks - 1, nprocs_rem - 1);
 682             int recv_count = rcount * ompi_range_sum(recv_index,
 683                                           recv_index + nblocks - 1, nprocs_rem - 1);
 684             ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
 685                                          2 * send_index : nprocs_rem + send_index);
 686             ptrdiff_t rdispl = rcount * ((recv_index <= nprocs_rem - 1) ?
 687                                          2 * recv_index : nprocs_rem + recv_index);
 688 
 689             err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
 690                                           dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 691                                           precv + (ptrdiff_t)rdispl * extent, recv_count,
 692                                           dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 693                                           comm, MPI_STATUS_IGNORE, rank);
 694             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 695 
 696             if (vrank < vpeer) {
 697                 /* precv = psend <op> precv */
 698                 ompi_op_reduce(op, psend + (ptrdiff_t)rdispl * extent,
 699                                precv + (ptrdiff_t)rdispl * extent, recv_count, dtype);
 700                 char *p = psend;
 701                 psend = precv;
 702                 precv = p;
 703             } else {
 704                 /* psend = precv <op> psend */
 705                 ompi_op_reduce(op, precv + (ptrdiff_t)rdispl * extent,
 706                                psend + (ptrdiff_t)rdispl * extent, recv_count, dtype);
 707             }
 708             send_index = recv_index;
 709         }
 710         /*
 711          * psend points to the result: [send_index, send_index + recv_count - 1]
 712          * Exchange results with remote process according to a mirror permutation.
 713          */
 714         int vpeer = ompi_mirror_perm(vrank, log2_size);
 715         int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 716 
 717         if (vpeer < nprocs_rem) {
 718             /*
  719              * This process holds two result blocks: one for the excluded
  720              * process and its own. Send the excluded process its result.
 721              */
 722             ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
 723                                          2 * send_index : nprocs_rem + send_index);
 724             err = MCA_PML_CALL(send(psend + (ptrdiff_t)sdispl * extent,
 725                                     rcount, dtype, peer - 1,
 726                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 727                                     MCA_PML_BASE_SEND_STANDARD, comm));
 728             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 729         }
 730 
  731         /* Exchange results with the remote process given by the mirror permutation */
 732         ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
 733                                       2 * send_index : nprocs_rem + send_index);
 734         /* If process has two blocks, then send the second block (own block) */
 735         if (vpeer < nprocs_rem)
 736             sdispl += rcount;
 737         if (vpeer != vrank) {
 738             err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, rcount,
 739                                           dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 740                                           rbuf, rcount, dtype, peer,
 741                                           MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 742                                           comm, MPI_STATUS_IGNORE, rank);
 743             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 744         } else {
 745             err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
 746                                                       psend + (ptrdiff_t)sdispl * extent);
 747             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 748         }
 749 
 750     } else {
 751         /* Excluded process: receive result */
 752         int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
 753         int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 754         err = MCA_PML_CALL(recv(rbuf, rcount, dtype, peer,
 755                                 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK, comm,
 756                                 MPI_STATUS_IGNORE));
 757         if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 758     }
 759 
 760 cleanup_and_return:
 761     if (tmpbuf[0])
 762         free(tmpbuf[0]);
 763     if (tmpbuf[1])
 764         free(tmpbuf[1]);
 765     return err;
 766 }
 767 
 768 /*
 769  * ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2
 770  *
 771  * Function:    Butterfly algorithm for reduce_scatter_block
 772  * Accepts:     Same as MPI_Reduce_scatter_block
 773  * Returns:     MPI_SUCCESS or error code
 774  * Limitations: Power-of-two number of processes.
 775  *
  776  * Description:  Implements the butterfly algorithm for MPI_Reduce_scatter_block [*].
  777  *               The algorithm can be used with both commutative and non-commutative
  778  *               operations, for a power-of-two number of processes.
 779  *
 780  * [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter
 781  *     with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.
 782  *
 783  * Time complexity:
 784  *   m\lambda + 2\log_2(p)\alpha + 2m(1-1/p)\beta + m(1-1/p)\gamma + m/p\lambda =
 785  *   = O(m\lambda + log(p)\alpha + m\beta + m\gamma),
 786  *   where m = rcount * comm_size, p = comm_size
 787  * Memory requirements (per process): 2 * rcount * comm_size * typesize
 788  *
 789  * Example: comm_size=4, rcount=1, sbuf=[0,1,2,3]
 790  * Step 1. Permute the blocks according to a mirror permutation:
 791  *    mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3
 792  *    sbuf=[0|1|2|3] ==> psend=[0|2|1|3]
 793  *
 794  * Step 2. Butterfly
 795  * Round 1 (mask=1, nblocks=2)
 796  * 0: [0|2|1|3]: exch with 1: send [2,3], recv [0,1]: [0|4|*|*]
 797  * 1: [0|2|1|3]: exch with 0: send [0,1], recv [2,3]: [*|*|2|6]
 798  * 2: [0|2|1|3]: exch with 3: send [2,3], recv [0,1]: [0|4|*|*]
 799  * 3: [0|2|1|3]: exch with 2: send [0,1], recv [2,3]: [*|*|2|6]
 800  *
 801  * Round 2 (mask=2, nblocks=1)
 802  * 0: [0|4|*|*]: exch with 2: send [1], recv [0]: [0|*|*|*]
 803  * 1: [*|*|2|6]: exch with 3: send [3], recv [2]: [*|*|4|*]
 804  * 2: [0|4|*|*]: exch with 0: send [0], recv [1]: [*|8|*|*]
 805  * 3: [*|*|2|6]: exch with 1: send [2], recv [3]: [*|*|*|12]
 806  *
 807  * Step 3. Copy result to rbuf
 808  */
 809 static int
 810 ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
 811     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 812     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 813     mca_coll_base_module_t *module)
 814 {
 815     char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
 816     ptrdiff_t span, gap, totalcount, extent;
 817     int err = MPI_SUCCESS;
 818     int comm_size = ompi_comm_size(comm);
 819     int rank = ompi_comm_rank(comm);
 820 
 821     if (rcount == 0 || comm_size < 2)
 822         return MPI_SUCCESS;
 823 
 824     totalcount = comm_size * rcount;
 825     ompi_datatype_type_extent(dtype, &extent);
 826     span = opal_datatype_span(&dtype->super, totalcount, &gap);
 827     tmpbuf[0] = malloc(span);
 828     tmpbuf[1] = malloc(span);
 829     if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
 830         err = OMPI_ERR_OUT_OF_RESOURCE;
 831         goto cleanup_and_return;
 832     }
 833     psend = tmpbuf[0] - gap;
 834     precv = tmpbuf[1] - gap;
 835 
 836     /* Permute the blocks according to a mirror permutation */
 837     int log2_comm_size = opal_cube_dim(comm_size);
 838     char *pdata = (sbuf != MPI_IN_PLACE) ? (char *)sbuf : rbuf;
 839     for (int i = 0; i < comm_size; i++) {
 840         char *src = pdata + (ptrdiff_t)i * extent * rcount;
 841         char *dst = psend + (ptrdiff_t)ompi_mirror_perm(i, log2_comm_size) * extent * rcount;
 842         err = ompi_datatype_copy_content_same_ddt(dtype, rcount, dst, src);
 843         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 844     }
 845 
 846     int nblocks = totalcount, send_index = 0, recv_index = 0;
 847     for (int mask = 1; mask < comm_size; mask <<= 1) {
 848         int peer = rank ^ mask;
 849         nblocks /= 2;
 850 
 851         if ((rank & mask) == 0) {
 852             /* Send the upper half of reduction buffer, recv the lower half */
 853             send_index += nblocks;
 854         } else {
  855             /* Send the lower half of the reduction buffer, recv the upper half */
 856             recv_index += nblocks;
 857         }
 858         err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)send_index * extent,
 859                                       nblocks, dtype, peer,
 860                                       MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 861                                       precv + (ptrdiff_t)recv_index * extent,
 862                                       nblocks, dtype, peer,
 863                                       MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 864                                       comm, MPI_STATUS_IGNORE, rank);
 865         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 866 
 867         if (rank < peer) {
 868             /* precv = psend <op> precv */
 869             ompi_op_reduce(op, psend + (ptrdiff_t)recv_index * extent,
 870                            precv + (ptrdiff_t)recv_index * extent, nblocks, dtype);
 871             char *p = psend;
 872             psend = precv;
 873             precv = p;
 874         } else {
 875             /* psend = precv <op> psend */
 876             ompi_op_reduce(op, precv + (ptrdiff_t)recv_index * extent,
 877                            psend + (ptrdiff_t)recv_index * extent, nblocks, dtype);
 878         }
 879         send_index = recv_index;
 880     }
 881     /* Copy the result to the rbuf */
 882     err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
 883                                               psend + (ptrdiff_t)recv_index * extent);
 884     if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 885 
 886 cleanup_and_return:
 887     if (tmpbuf[0])
 888         free(tmpbuf[0]);
 889     if (tmpbuf[1])
 890         free(tmpbuf[1]);
 891     return err;
 892 }
