root/ompi/mca/coll/base/coll_base_exscan.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ompi_coll_base_exscan_intra_linear
  2. ompi_coll_base_exscan_intra_recursivedoubling

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2018      Siberian State University of Telecommunications
   4  *                         and Information Science. All rights reserved.
   5  * Copyright (c) 2018      Research Organization for Information Science
   6  *                         and Technology (RIST).  All rights reserved.
   7  * $COPYRIGHT$
   8  *
   9  * Additional copyrights may follow
  10  *
  11  * $HEADER$
  12  */
  13 
  14 #include "ompi_config.h"
  15 
  16 #include "mpi.h"
  17 #include "ompi/constants.h"
  18 #include "ompi/datatype/ompi_datatype.h"
  19 #include "ompi/communicator/communicator.h"
  20 #include "ompi/mca/coll/coll.h"
  21 #include "ompi/mca/coll/base/coll_base_functions.h"
  22 #include "ompi/mca/coll/base/coll_tags.h"
  23 #include "ompi/mca/coll/base/coll_base_util.h"
  24 #include "ompi/mca/pml/pml.h"
  25 #include "ompi/op/op.h"
  26 
  27 /*
  28  * ompi_coll_base_exscan_intra_linear
  29  *
  30  * Function:  Linear algorithm for exclusive scan.
  31  * Accepts:   Same as MPI_Exscan
  32  * Returns:   MPI_SUCCESS or error code
  33  */
  34 int
  35 ompi_coll_base_exscan_intra_linear(const void *sbuf, void *rbuf, int count,
  36                                   struct ompi_datatype_t *dtype,
  37                                   struct ompi_op_t *op,
  38                                   struct ompi_communicator_t *comm,
  39                                   mca_coll_base_module_t *module)
  40 {
  41     int size, rank, err;
  42     ptrdiff_t dsize, gap;
  43     char *free_buffer = NULL;
  44     char *reduce_buffer = NULL;
  45 
  46     rank = ompi_comm_rank(comm);
  47     size = ompi_comm_size(comm);
  48 
  49     /* For MPI_IN_PLACE, just adjust send buffer to point to
  50      * receive buffer. */
  51     if (MPI_IN_PLACE == sbuf) {
  52         sbuf = rbuf;
  53     }
  54 
  55     /* If we're rank 0, then just send our sbuf to the next rank, and
  56      * we are done. */
  57     if (0 == rank) {
  58         return MCA_PML_CALL(send(sbuf, count, dtype, rank + 1,
  59                                  MCA_COLL_BASE_TAG_EXSCAN,
  60                                  MCA_PML_BASE_SEND_STANDARD, comm));
  61     }
  62 
  63     /* If we're the last rank, then just receive the result from the
  64      * prior rank, and we are done. */
  65     else if ((size - 1) == rank) {
  66         return MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
  67                                  MCA_COLL_BASE_TAG_EXSCAN, comm,
  68                                  MPI_STATUS_IGNORE));
  69     }
  70 
  71     /* Otherwise, get the result from the prior rank, combine it with my
  72      * data, and send it to the next rank */
  73 
  74     /* Get a temporary buffer to perform the reduction into.  Rationale
  75      * for malloc'ing this size is provided in coll_basic_reduce.c. */
  76     dsize = opal_datatype_span(&dtype->super, count, &gap);
  77 
  78     free_buffer = (char*)malloc(dsize);
  79     if (NULL == free_buffer) {
  80         return OMPI_ERR_OUT_OF_RESOURCE;
  81     }
  82     reduce_buffer = free_buffer - gap;
  83     err = ompi_datatype_copy_content_same_ddt(dtype, count,
  84                                               reduce_buffer, (char*)sbuf);
  85 
  86     /* Receive the reduced value from the prior rank */
  87     err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
  88                             MCA_COLL_BASE_TAG_EXSCAN, comm, MPI_STATUS_IGNORE));
  89     if (MPI_SUCCESS != err) {
  90         goto error;
  91     }
  92 
  93     /* Now reduce the prior rank's result with my source buffer.  The source
  94      * buffer had been previously copied into the temporary reduce_buffer. */
  95     ompi_op_reduce(op, rbuf, reduce_buffer, count, dtype);
  96 
  97     /* Send my result off to the next rank */
  98     err = MCA_PML_CALL(send(reduce_buffer, count, dtype, rank + 1,
  99                             MCA_COLL_BASE_TAG_EXSCAN,
 100                             MCA_PML_BASE_SEND_STANDARD, comm));
 101     /* Error */
 102   error:
 103     free(free_buffer);
 104 
 105     /* All done */
 106     return err;
 107 }
 108 
 109 
 110 /*
 111  * ompi_coll_base_exscan_intra_recursivedoubling
 112  *
 113  * Function:  Recursive doubling algorithm for exclusive scan.
 114  * Accepts:   Same as MPI_Exscan
 115  * Returns:   MPI_SUCCESS or error code
 116  *
 117  * Description:  Implements recursive doubling algorithm for MPI_Exscan.
 118  *               The algorithm preserves order of operations so it can
 119  *               be used both by commutative and non-commutative operations.
 120  *
 121  * Example for 5 processes and commutative operation MPI_SUM:
 122  * Process:  0                 1             2              3               4
 123  * recvbuf:  -                 -             -              -               -
 124  *   psend: [0]               [1]           [2]            [3]             [4]
 125  *
 126  *  Step 1:
 127  * recvbuf:  -                [0]            -             [2]              -
 128  *   psend: [1+0]             [0+1]         [3+2]          [2+3]           [4]
 129  *
 130  *  Step 2:
 131  * recvbuf:  -                [0]            [1+0]          [(0+1)+2]       -
 132  *   psend: [(3+2)+(1+0)]     [(2+3)+(0+1)]  [(1+0)+(3+2)]  [(1+0)+(2+3)]  [4]
 133  *
 134  *  Step 3:
 135  * recvbuf:  -                [0]            [1+0]          [(0+1)+2]      [(3+2)+(1+0)]
 136  *   psend: [4+((3+2)+(1+0))]                                              [((3+2)+(1+0))+4]
 137  *
 138  * Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma)
 139  * Memory requirements (per process): 2 * count * typesize = O(count)
 140  * Limitations: intra-communicators only
 141  */
int ompi_coll_base_exscan_intra_recursivedoubling(
    const void *sendbuf, void *recvbuf, int count, struct ompi_datatype_t *datatype,
    struct ompi_op_t *op, struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    int err = MPI_SUCCESS;
    char *tmpsend_raw = NULL, *tmprecv_raw = NULL;
    int comm_size = ompi_comm_size(comm);
    int rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:exscan_intra_recursivedoubling: rank %d/%d",
                 rank, comm_size));
    /* Nothing to exchange for zero-length data; for a single process
     * MPI_Exscan leaves recvbuf undefined at rank 0, so also a no-op. */
    if (count == 0)
        return MPI_SUCCESS;
    if (comm_size < 2)
        return MPI_SUCCESS;

    /* Two working buffers of one datatype span each:
     *   psend -- running "partial result" exchanged with peers
     *   precv -- landing area for the peer's partial result
     * opal_datatype_span() accounts for a possibly negative lower bound;
     * the true-extent offset 'gap' is subtracted to get usable pointers. */
    ptrdiff_t dsize, gap;
    dsize = opal_datatype_span(&datatype->super, count, &gap);
    tmpsend_raw = malloc(dsize);
    tmprecv_raw = malloc(dsize);
    if (NULL == tmpsend_raw || NULL == tmprecv_raw) {
        err = OMPI_ERR_OUT_OF_RESOURCE;
        goto cleanup_and_return;
    }
    char *psend = tmpsend_raw - gap;
    char *precv = tmprecv_raw - gap;
    /* Seed psend with this rank's contribution (which lives in recvbuf
     * when MPI_IN_PLACE was given). */
    if (sendbuf != MPI_IN_PLACE) {
        err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, (char *)sendbuf);
        if (MPI_SUCCESS != err) { goto cleanup_and_return; }
    } else {
        err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, recvbuf);
        if (MPI_SUCCESS != err) { goto cleanup_and_return; }
    }
    int is_commute = ompi_op_is_commute(op);
    /* recvbuf holds no prefix yet; the first lower-rank contribution is
     * copied (not reduced) into it. */
    int is_first_block = 1;

    /* Recursive doubling: at step k, exchange partial results with the
     * peer whose rank differs only in bit k.  Peers beyond comm_size are
     * simply skipped (handles non-power-of-two sizes; see the 5-process
     * example in the header comment). */
    for (int mask = 1; mask < comm_size; mask <<= 1) {
        int remote = rank ^ mask;
        if (remote < comm_size) {
            err = ompi_coll_base_sendrecv(psend, count, datatype, remote,
                                          MCA_COLL_BASE_TAG_EXSCAN,
                                          precv, count, datatype, remote,
                                          MCA_COLL_BASE_TAG_EXSCAN, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            if (rank > remote) {
                /* Assertion: rank > 0 and rbuf is valid */
                /* The peer's partial result covers only lower ranks, so
                 * it belongs in this rank's exclusive prefix. */
                if (is_first_block) {
                    err = ompi_datatype_copy_content_same_ddt(datatype, count,
                                                              recvbuf, precv);
                    if (MPI_SUCCESS != err) { goto cleanup_and_return; }
                    is_first_block = 0;
                } else {
                    /* Accumulate prefix reduction: recvbuf = precv <op> recvbuf */
                    ompi_op_reduce(op, precv, recvbuf, count, datatype);
                }
                /* Partial result: psend = precv <op> psend */
                ompi_op_reduce(op, precv, psend, count, datatype);
            } else {
                /* Peer is a higher rank: its data does not enter recvbuf,
                 * only the partial result forwarded in later steps. */
                if (is_commute) {
                    /* psend = precv <op> psend */
                    ompi_op_reduce(op, precv, psend, count, datatype);
                } else {
                    /* Non-commutative op: reduce in the opposite direction
                     * to preserve rank order, leaving the result in precv,
                     * then swap the buffer roles so psend again names the
                     * current partial result. */
                    /* precv = psend <op> precv */
                    ompi_op_reduce(op, psend, precv, count, datatype);
                    char *tmp = psend;
                    psend = precv;
                    precv = tmp;
                }
            }
        }
    }

cleanup_and_return:
    if (NULL != tmpsend_raw)
        free(tmpsend_raw);
    if (NULL != tmprecv_raw)
        free(tmprecv_raw);
    return err;
}

/* [<][>][^][v][top][bottom][index][help] */