root/ompi/mca/coll/basic/coll_basic_reduce_scatter_block.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_coll_basic_reduce_scatter_block_intra
  2. mca_coll_basic_reduce_scatter_block_inter

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
  13  * Copyright (c) 2012      Oak Ridge National Labs.  All rights reserved.
  14  * Copyright (c) 2012      Sandia National Laboratories. All rights reserved.
  15  * Copyright (c) 2014-2018 Research Organization for Information Science
  16  *                         and Technology (RIST). All rights reserved.
  17  * $COPYRIGHT$
  18  *
  19  * Additional copyrights may follow
  20  *
  21  * $HEADER$
  22  */
  23 
  24 #include "ompi_config.h"
  25 #include "coll_basic.h"
  26 
  27 #include <stdio.h>
  28 #include <errno.h>
  29 
  30 #include "mpi.h"
  31 #include "opal/util/bit_ops.h"
  32 #include "ompi/constants.h"
  33 #include "ompi/mca/coll/coll.h"
  34 #include "ompi/mca/coll/base/coll_tags.h"
  35 #include "ompi/mca/pml/pml.h"
  36 #include "ompi/datatype/ompi_datatype.h"
  37 #include "coll_basic.h"
  38 #include "ompi/op/op.h"
  39 
  40 #define COMMUTATIVE_LONG_MSG 8 * 1024 * 1024
  41 
  42 /*
  43  *      reduce_scatter_block
  44  *
  45  *      Function:       - reduce then scatter
  46  *      Accepts:        - same as MPI_Reduce_scatter_block()
  47  *      Returns:        - MPI_SUCCESS or error code
  48  *
  49  * Algorithm:
  50  *     reduce and scatter (needs to be cleaned
  51  *     up at some point)
  52  */
  53 int
  54 mca_coll_basic_reduce_scatter_block_intra(const void *sbuf, void *rbuf, int rcount,
  55                                           struct ompi_datatype_t *dtype,
  56                                           struct ompi_op_t *op,
  57                                           struct ompi_communicator_t *comm,
  58                                           mca_coll_base_module_t *module)
  59 {
  60     return ompi_coll_base_reduce_scatter_block_basic_linear(sbuf, rbuf, rcount, dtype, op, comm, module);
  61 }
  62 
  63 /*
  64  *      reduce_scatter_block_inter
  65  *
  66  *      Function:       - reduce/scatter operation
  67  *      Accepts:        - same arguments as MPI_Reduce_scatter()
  68  *      Returns:        - MPI_SUCCESS or error code
  69  */
  70 int
  71 mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcount,
  72                                           struct ompi_datatype_t *dtype,
  73                                           struct ompi_op_t *op,
  74                                           struct ompi_communicator_t *comm,
  75                                           mca_coll_base_module_t *module)
  76 {
  77     int err, i, rank, root = 0, rsize, lsize;
  78     int totalcounts;
  79     ptrdiff_t gap, span;
  80     char *tmpbuf = NULL, *tmpbuf2 = NULL;
  81     char *lbuf = NULL, *buf;
  82     ompi_request_t *req;
  83 
  84     rank = ompi_comm_rank(comm);
  85     rsize = ompi_comm_remote_size(comm);
  86     lsize = ompi_comm_size(comm);
  87 
  88     totalcounts = lsize * rcount;
  89 
  90     /*
  91      * The following code basically does an interreduce followed by a
  92      * intrascatter.  This is implemented by having the roots of each
  93      * group exchange their sbuf.  Then, the roots receive the data
  94      * from each of the remote ranks and execute the reduce.  When
  95      * this is complete, they have the reduced data available to them
  96      * for doing the scatter.  They do this on the local communicator
  97      * associated with the intercommunicator.
  98      *
  99      * Note: There are other ways to implement MPI_Reduce_scatter_block on
 100      * intercommunicators.  For example, one could do a MPI_Reduce locally,
 101      * then send the results to the other root which could scatter it.
 102      *
 103      */
 104     if (rank == root) {
 105         span = opal_datatype_span(&dtype->super, totalcounts, &gap);
 106 
 107         tmpbuf = (char *) malloc(span);
 108         tmpbuf2 = (char *) malloc(span);
 109         if (NULL == tmpbuf || NULL == tmpbuf2) {
 110             return OMPI_ERR_OUT_OF_RESOURCE;
 111         }
 112         lbuf = tmpbuf - gap;
 113         buf = tmpbuf2 - gap;
 114 
 115         /* Do a send-recv between the two root procs. to avoid deadlock */
 116         err = MCA_PML_CALL(isend(sbuf, totalcounts, dtype, 0,
 117                                  MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 118                                  MCA_PML_BASE_SEND_STANDARD, comm, &req));
 119         if (OMPI_SUCCESS != err) {
 120             goto exit;
 121         }
 122 
 123         err = MCA_PML_CALL(recv(lbuf, totalcounts, dtype, 0,
 124                                 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
 125                                 MPI_STATUS_IGNORE));
 126         if (OMPI_SUCCESS != err) {
 127             goto exit;
 128         }
 129 
 130         err = ompi_request_wait( &req, MPI_STATUS_IGNORE);
 131         if (OMPI_SUCCESS != err) {
 132             goto exit;
 133         }
 134 
 135 
 136         /* Loop receiving and calling reduction function (C or Fortran)
 137          * The result of this reduction operations is then in
 138          * tmpbuf2.
 139          */
 140         for (i = 1; i < rsize; i++) {
 141             char *tbuf;
 142             err = MCA_PML_CALL(recv(buf, totalcounts, dtype, i,
 143                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
 144                                     MPI_STATUS_IGNORE));
 145             if (MPI_SUCCESS != err) {
 146                 goto exit;
 147             }
 148 
 149             /* Perform the reduction */
 150             ompi_op_reduce(op, lbuf, buf, totalcounts, dtype);
 151             /* swap the buffers */
 152             tbuf = lbuf; lbuf = buf; buf = tbuf;
 153         }
 154     } else {
 155         /* If not root, send data to the root. */
 156         err = MCA_PML_CALL(send(sbuf, totalcounts, dtype, root,
 157                                 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
 158                                 MCA_PML_BASE_SEND_STANDARD, comm));
 159         if (OMPI_SUCCESS != err) {
 160             goto exit;
 161         }
 162     }
 163 
 164     /* Now do a scatterv on the local communicator */
 165     err = comm->c_local_comm->c_coll->coll_scatter(lbuf, rcount, dtype,
 166                                    rbuf, rcount, dtype, 0,
 167                                    comm->c_local_comm,
 168                                    comm->c_local_comm->c_coll->coll_scatter_module);
 169 
 170   exit:
 171     if (NULL != tmpbuf) {
 172         free(tmpbuf);
 173     }
 174 
 175     if (NULL != tmpbuf2) {
 176         free(tmpbuf2);
 177     }
 178 
 179     return err;
 180 }

/* [<][>][^][v][top][bottom][index][help] */