root/ompi/mca/coll/base/coll_base_scan.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ompi_coll_base_scan_intra_linear
  2. ompi_coll_base_scan_intra_recursivedoubling

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2018      Siberian State University of Telecommunications
   4  *                         and Information Science. All rights reserved.
   5  * Copyright (c) 2018      Research Organization for Information Science
   6  *                         and Technology (RIST).  All rights reserved.
   7  * $COPYRIGHT$
   8  *
   9  * Additional copyrights may follow
  10  *
  11  * $HEADER$
  12  */
  13 
  14 #include "ompi_config.h"
  15 
  16 #include "mpi.h"
  17 #include "ompi/constants.h"
  18 #include "ompi/datatype/ompi_datatype.h"
  19 #include "ompi/communicator/communicator.h"
  20 #include "ompi/mca/coll/coll.h"
  21 #include "ompi/mca/coll/base/coll_base_functions.h"
  22 #include "ompi/mca/coll/base/coll_tags.h"
  23 #include "ompi/mca/coll/base/coll_base_util.h"
  24 #include "ompi/mca/pml/pml.h"
  25 #include "ompi/op/op.h"
  26 
  27 /*
  28  * ompi_coll_base_scan_intra_linear
  29  *
  30  * Function:  Linear algorithm for inclusive scan.
  31  * Accepts:   Same as MPI_Scan
  32  * Returns:   MPI_SUCCESS or error code
  33  */
  34 int
  35 ompi_coll_base_scan_intra_linear(const void *sbuf, void *rbuf, int count,
  36                                 struct ompi_datatype_t *dtype,
  37                                 struct ompi_op_t *op,
  38                                 struct ompi_communicator_t *comm,
  39                                 mca_coll_base_module_t *module)
  40 {
  41     int size, rank, err;
  42     ptrdiff_t dsize, gap;
  43     char *free_buffer = NULL;
  44     char *pml_buffer = NULL;
  45 
  46     /* Initialize */
  47 
  48     rank = ompi_comm_rank(comm);
  49     size = ompi_comm_size(comm);
  50 
  51     /* If I'm rank 0, just copy into the receive buffer */
  52 
  53     if (0 == rank) {
  54         if (MPI_IN_PLACE != sbuf) {
  55             err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
  56             if (MPI_SUCCESS != err) {
  57                 return err;
  58             }
  59         }
  60     }
  61 
  62     /* Otherwise receive previous buffer and reduce. */
  63 
  64     else {
  65         /* Allocate a temporary buffer.  Rationale for this size is
  66          * listed in coll_basic_reduce.c.  Use this temporary buffer to
  67          * receive into, later. */
  68 
  69         dsize = opal_datatype_span(&dtype->super, count, &gap);
  70         free_buffer = malloc(dsize);
  71         if (NULL == free_buffer) {
  72             return OMPI_ERR_OUT_OF_RESOURCE;
  73         }
  74         pml_buffer = free_buffer - gap;
  75 
  76         /* Copy the send buffer into the receive buffer. */
  77 
  78         if (MPI_IN_PLACE != sbuf) {
  79             err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
  80             if (MPI_SUCCESS != err) {
  81                 if (NULL != free_buffer) {
  82                     free(free_buffer);
  83                 }
  84                 return err;
  85             }
  86         }
  87 
  88         /* Receive the prior answer */
  89 
  90         err = MCA_PML_CALL(recv(pml_buffer, count, dtype,
  91                                 rank - 1, MCA_COLL_BASE_TAG_SCAN, comm,
  92                                 MPI_STATUS_IGNORE));
  93         if (MPI_SUCCESS != err) {
  94             if (NULL != free_buffer) {
  95                 free(free_buffer);
  96             }
  97             return err;
  98         }
  99 
 100         /* Perform the operation */
 101 
 102         ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
 103 
 104         /* All done */
 105 
 106         if (NULL != free_buffer) {
 107             free(free_buffer);
 108         }
 109     }
 110 
 111     /* Send result to next process. */
 112 
 113     if (rank < (size - 1)) {
 114         return MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
 115                                  MCA_COLL_BASE_TAG_SCAN,
 116                                  MCA_PML_BASE_SEND_STANDARD, comm));
 117     }
 118 
 119     /* All done */
 120 
 121     return MPI_SUCCESS;
 122 }
 123 
 124 
 125 /*
 126  * ompi_coll_base_scan_intra_recursivedoubling
 127  *
 128  * Function:  Recursive doubling algorithm for inclusive scan.
 129  * Accepts:   Same as MPI_Scan
 130  * Returns:   MPI_SUCCESS or error code
 131  *
 132  * Description:  Implements recursive doubling algorithm for MPI_Scan.
 133  *               The algorithm preserves order of operations so it can
 134  *               be used both by commutative and non-commutative operations.
 135  *
 136  * Example for 5 processes and commutative operation MPI_SUM:
 137  * Process:   0                 1              2              3              4
 138  * recvbuf:  [0]               [1]            [2]            [3]            [4]
 139  *   psend:  [0]               [1]            [2]            [3]            [4]
 140  *
 141  *  Step 1:
 142  * recvbuf:  [0]               [0+1]          [2]            [2+3]          [4]
 143  *   psend:  [1+0]             [0+1]          [3+2]          [2+3]          [4]
 144  *
 145  *  Step 2:
 146  * recvbuf:  [0]               [0+1]          [(1+0)+2]      [(1+0)+(2+3)]  [4]
 147  *  psend:   [(3+2)+(1+0)]     [(2+3)+(0+1)]  [(1+0)+(3+2)]  [(1+0)+(2+3)]  [4]
 148  *
 149  *  Step 3:
 150  * recvbuf:  [0]               [0+1]           [(1+0)+2]     [(1+0)+(2+3)]  [((3+2)+(1+0))+4]
 151  *   psend:  [4+((3+2)+(1+0))]                                              [((3+2)+(1+0))+4]
 152  *
 153  * Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma)
 154  * Memory requirements (per process): 2 * count * typesize = O(count)
 155  * Limitations: intra-communicators only
 156  */
 157 int ompi_coll_base_scan_intra_recursivedoubling(
 158     const void *sendbuf, void *recvbuf, int count, struct ompi_datatype_t *datatype,
 159     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 160     mca_coll_base_module_t *module)
 161 {
 162     int err = MPI_SUCCESS;
 163     char *tmpsend_raw = NULL, *tmprecv_raw = NULL;
 164     int comm_size = ompi_comm_size(comm);
 165     int rank = ompi_comm_rank(comm);
 166 
 167     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 168                  "coll:base:scan_intra_recursivedoubling: rank %d/%d",
 169                  rank, comm_size));
 170     if (count == 0)
 171         return MPI_SUCCESS;
 172 
 173     if (sendbuf != MPI_IN_PLACE) {
 174         err = ompi_datatype_copy_content_same_ddt(datatype, count, recvbuf, (char *)sendbuf);
 175         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 176     }
 177     if (comm_size < 2)
 178         return MPI_SUCCESS;
 179 
 180     ptrdiff_t dsize, gap;
 181     dsize = opal_datatype_span(&datatype->super, count, &gap);
 182     tmpsend_raw = malloc(dsize);
 183     tmprecv_raw = malloc(dsize);
 184     if (NULL == tmpsend_raw || NULL == tmprecv_raw) {
 185         err = OMPI_ERR_OUT_OF_RESOURCE;
 186         goto cleanup_and_return;
 187     }
 188     char *psend = tmpsend_raw - gap;
 189     char *precv = tmprecv_raw - gap;
 190     err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, recvbuf);
 191     if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 192     int is_commute = ompi_op_is_commute(op);
 193 
 194     for (int mask = 1; mask < comm_size; mask <<= 1) {
 195         int remote = rank ^ mask;
 196         if (remote < comm_size) {
 197             err = ompi_coll_base_sendrecv(psend, count, datatype, remote,
 198                                           MCA_COLL_BASE_TAG_SCAN,
 199                                           precv, count, datatype, remote,
 200                                           MCA_COLL_BASE_TAG_SCAN, comm,
 201                                           MPI_STATUS_IGNORE, rank);
 202             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 203 
 204             if (rank > remote) {
 205                 /* Accumulate prefix reduction: recvbuf = precv <op> recvbuf */
 206                 ompi_op_reduce(op, precv, recvbuf, count, datatype);
 207                 /* Partial result: psend = precv <op> psend */
 208                 ompi_op_reduce(op, precv, psend, count, datatype);
 209             } else {
 210                 if (is_commute) {
 211                     /* psend = precv <op> psend */
 212                     ompi_op_reduce(op, precv, psend, count, datatype);
 213                 } else {
 214                     /* precv = psend <op> precv */
 215                     ompi_op_reduce(op, psend, precv, count, datatype);
 216                     char *tmp = psend;
 217                     psend = precv;
 218                     precv = tmp;
 219                 }
 220             }
 221         }
 222     }
 223 
 224 cleanup_and_return:
 225     if (NULL != tmpsend_raw)
 226         free(tmpsend_raw);
 227     if (NULL != tmprecv_raw)
 228         free(tmprecv_raw);
 229     return err;
 230 }

/* [<][>][^][v][top][bottom][index][help] */