root/ompi/mca/coll/basic/coll_basic_reduce.c


DEFINITIONS

This source file includes the following definitions:
  1. mca_coll_basic_reduce_log_intra
  2. mca_coll_basic_reduce_lin_inter
  3. mca_coll_basic_reduce_log_inter

/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2015 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2015      Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "coll_basic.h"

#include <stdio.h>

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"


/*
 *      reduce_log_intra
 *
 *      Function:       - reduction using O(log N) algorithm
 *      Accepts:        - same as MPI_Reduce()
 *      Returns:        - MPI_SUCCESS or error code
 *
 *      Perform a reduction on each dimension of the hypercube.
 *      An example for 8 procs (dimensions = 3):
 *
 *      Stage 1, reduce on X dimension:  1 -> 0, 3 -> 2, 5 -> 4, 7 -> 6
 *
 *          6----<---7          proc_0: 0+1
 *         /|       /|          proc_1: 1
 *        / |      / |          proc_2: 2+3
 *       /  |     /  |          proc_3: 3
 *      4----<---5   |          proc_4: 4+5
 *      |   2--< |---3          proc_5: 5
 *      |  /     |  /           proc_6: 6+7
 *      | /      | /            proc_7: 7
 *      |/       |/
 *      0----<---1
 *
 *      Stage 2, reduce on Y dimension: 2 -> 0, 6 -> 4
 *
 *          6--------7          proc_0: 0+1+2+3
 *         /|       /|          proc_1: 1
 *        v |      / |          proc_2: 2+3
 *       /  |     /  |          proc_3: 3
 *      4--------5   |          proc_4: 4+5+6+7
 *      |   2--- |---3          proc_5: 5
 *      |  /     |  /           proc_6: 6+7
 *      | v      | /            proc_7: 7
 *      |/       |/
 *      0--------1
 *
 *      Stage 3, reduce on Z dimension: 4 -> 0
 *
 *          6--------7          proc_0: 0+1+2+3+4+5+6+7
 *         /|       /|          proc_1: 1
 *        / |      / |          proc_2: 2+3
 *       /  |     /  |          proc_3: 3
 *      4--------5   |          proc_4: 4+5+6+7
 *      |   2--- |---3          proc_5: 5
 *      v  /     |  /           proc_6: 6+7
 *      | /      | /            proc_7: 7
 *      |/       |/
 *      0--------1
 */
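/*
 * Editor's illustrative sketch (not part of the module): the pairing
 * below reproduces the three stages drawn above for root 0.  A vrank
 * whose lowest set bit equals the current mask sends its partial
 * result to (vrank & ~mask) during that stage and then drops out:
 *
 *     #include <stdio.h>
 *
 *     int main(void)
 *     {
 *         const int size = 8;            // assumed cube size (dim = 3)
 *         for (int vrank = 1; vrank < size; ++vrank) {
 *             int mask = vrank & -vrank; // lowest set bit of vrank
 *             printf("stage mask=%d: vrank %d -> vrank %d\n",
 *                    mask, vrank, vrank & ~mask);
 *         }
 *         return 0;
 *     }
 *
 * yielding the edges 1->0, 3->2, 5->4, 7->6 at mask 1; 2->0 and 6->4
 * at mask 2; and 4->0 at mask 4, matching the diagrams.
 */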
int
mca_coll_basic_reduce_log_intra(const void *sbuf, void *rbuf, int count,
                                struct ompi_datatype_t *dtype,
                                struct ompi_op_t *op,
                                int root, struct ompi_communicator_t *comm,
                                mca_coll_base_module_t *module)
{
    int i, size, rank, vrank;
    int err, peer, dim, mask;
    ptrdiff_t lb, extent, dsize, gap;
    char *free_buffer = NULL;
    char *free_rbuf = NULL;
    char *pml_buffer = NULL;
    char *snd_buffer = NULL;
    char *rcv_buffer = (char*)rbuf;
    char *inplace_temp = NULL;

    /* JMS Workaround for now -- if the operation is not commutative,
     * just call the linear algorithm.  Need to talk to Edgar / George
     * about fixing this algorithm here to work with non-commutative
     * operations. */

    if (!ompi_op_is_commute(op)) {
        return ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
                                                        op, root, comm, module);
    }
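
    /* Editor's note: commutativity matters here because for root != 0
     * the code below rotates ranks into vranks (vrank = (rank - root +
     * size) % size), so operands are visited in the rotated order
     * (root, root+1, ..., root-1) rather than (0, 1, ..., size-1);
     * only a commutative operation guarantees that both orders produce
     * the same result. */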

    /* Some variables */
    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);
    vrank = ompi_op_is_commute(op) ? (rank - root + size) % size : rank;
    dim = comm->c_cube_dim;

    /* Allocate the incoming and resulting message buffers, spanning the
     * full true extent of the datatype (see the gap arithmetic below). */

    ompi_datatype_get_extent(dtype, &lb, &extent);
    dsize = opal_datatype_span(&dtype->super, count, &gap);

    free_buffer = (char*)malloc(dsize);
    if (NULL == free_buffer) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    pml_buffer = free_buffer - gap;
    /* read the comment about commutative operations (a few lines
     * further down) */
    if (ompi_op_is_commute(op)) {
        rcv_buffer = pml_buffer;
    }
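
    /* Editor's worked example of the span/gap arithmetic above, with
     * illustrative numbers: for a datatype whose true lower bound is 8
     * bytes and which spans 24 bytes of memory for `count` elements,
     * opal_datatype_span() returns dsize = 24 and gap = 8, so
     *
     *     pml_buffer = free_buffer - 8;
     *
     * and the bytes the datatype engine actually touches,
     * pml_buffer + 8 through pml_buffer + 31, fall exactly inside the
     * allocation free_buffer through free_buffer + 23. */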

    /* Allocate a shadow send buffer if the MPI_IN_PLACE option has been
     * used, since in that case rbuf doubles as the send data. */

    if (MPI_IN_PLACE == sbuf) {
        inplace_temp = (char*)malloc(dsize);
        if (NULL == inplace_temp) {
            err = OMPI_ERR_OUT_OF_RESOURCE;
            goto cleanup_and_return;
        }
        sbuf = inplace_temp - gap;
        err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)sbuf, (char*)rbuf);
        if (MPI_SUCCESS != err) {
            goto cleanup_and_return;
        }
    }
    snd_buffer = (char*)sbuf;

    if (rank != root && 0 == (vrank & 1)) {
        /* root is the only one required to provide a valid rbuf.
         * Assume rbuf is invalid for all other ranks, so fix it up
         * here to be valid on all non-leaf ranks */
        free_rbuf = (char*)malloc(dsize);
        if (NULL == free_rbuf) {
            err = OMPI_ERR_OUT_OF_RESOURCE;
            goto cleanup_and_return;
        }
        rbuf = free_rbuf - gap;
    }

    /* Loop over cube dimensions. High processes send to low ones in the
     * dimension. */

    for (i = 0, mask = 1; i < dim; ++i, mask <<= 1) {

        /* A high-proc sends to low-proc and stops. */
        if (vrank & mask) {
            peer = vrank & ~mask;
            if (ompi_op_is_commute(op)) {
                peer = (peer + root) % size;
            }

            err = MCA_PML_CALL(send(snd_buffer, count,
                                    dtype, peer, MCA_COLL_BASE_TAG_REDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != err) {
                goto cleanup_and_return;
            }
            snd_buffer = (char*)rbuf;
            break;
        }

        /* A low-proc receives, reduces, and moves to a higher
         * dimension. */

        else {
            peer = vrank | mask;
            if (peer >= size) {
                continue;
            }
            if (ompi_op_is_commute(op)) {
                peer = (peer + root) % size;
            }

            /* Most of the time (every iteration except the first for
             * commutative operations) we receive into the user-provided
             * buffer (rbuf).  The exception exists so that we do not
             * have to copy sbuf to a temporary location up front: since
             * the operation is commutative, we do not care in which
             * order operands are combined, so on the first iteration we
             * can receive the data into pml_buffer and then apply the
             * operation between that buffer and the user-provided data. */

            err = MCA_PML_CALL(recv(rcv_buffer, count, dtype, peer,
                                    MCA_COLL_BASE_TAG_REDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != err) {
                goto cleanup_and_return;
            }
            /* Perform the operation.  Once snd_buffer has moved off the
             * user's sbuf, the local pml_buffer is the accumulation
             * target; until then, we fold the send buffer in first. */
            if (snd_buffer != sbuf) {
                /* the target buffer is the locally allocated one */
                ompi_op_reduce(op, rcv_buffer, pml_buffer, count, dtype);
            } else {
                /* If we're commutative, we don't care about the order of
                 * operations and we can just reduce the operations now.
                 * If we are not commutative, we have to copy the send
                 * buffer into a temp buffer (pml_buffer) and then reduce
                 * what we just received against it. */
                if (!ompi_op_is_commute(op)) {
                    ompi_datatype_copy_content_same_ddt(dtype, count, pml_buffer,
                                                        (char*)sbuf);
                    ompi_op_reduce(op, rbuf, pml_buffer, count, dtype);
                } else {
                    ompi_op_reduce(op, (void *)sbuf, pml_buffer, count, dtype);
                }
                /* now we have to send the buffer containing the computed data */
                snd_buffer = pml_buffer;
                /* starting from now we always receive in the user
                 * provided buffer */
                rcv_buffer = (char*)rbuf;
            }
        }
    }
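
    /* Editor's trace of the buffer juggling above, for vrank 0 with
     * size 8 and a commutative op: at mask = 1 the data from vrank 1
     * arrives in pml_buffer, sbuf is folded into it, and from then on
     * snd_buffer points at pml_buffer while rcv_buffer points at rbuf;
     * at mask = 2 and mask = 4 the partial results of vranks 2 and 4
     * arrive in rbuf and are reduced into pml_buffer, which ends up
     * holding the full result that the epilogue below copies (or
     * sends) to the root. */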

    /* Get the result to the root if needed. */
    err = MPI_SUCCESS;
    if (0 == vrank) {
        if (root == rank) {
            ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, snd_buffer);
        } else {
            err = MCA_PML_CALL(send(snd_buffer, count,
                                    dtype, root, MCA_COLL_BASE_TAG_REDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
        }
    } else if (rank == root) {
        err = MCA_PML_CALL(recv(rcv_buffer, count, dtype, 0,
                                MCA_COLL_BASE_TAG_REDUCE,
                                comm, MPI_STATUS_IGNORE));
        if (rcv_buffer != rbuf) {
            ompi_op_reduce(op, rcv_buffer, rbuf, count, dtype);
        }
    }

  cleanup_and_return:
    if (NULL != inplace_temp) {
        free(inplace_temp);
    }
    if (NULL != free_buffer) {
        free(free_buffer);
    }
    if (NULL != free_rbuf) {
        free(free_rbuf);
    }

    /* All done */

    return err;
}
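
/*
 * Editor's usage sketch: applications reach the routine above through
 * MPI_Reduce() whenever the coll framework selects the basic
 * component's log algorithm for an intracommunicator (that selection
 * happens outside this file).  A minimal caller:
 *
 *     #include <mpi.h>
 *     #include <stdio.h>
 *
 *     int main(int argc, char **argv)
 *     {
 *         int rank, local, sum = 0;
 *         MPI_Init(&argc, &argv);
 *         MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 *         local = rank + 1;   // each rank contributes rank + 1
 *         MPI_Reduce(&local, &sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
 *         if (0 == rank) printf("sum = %d\n", sum);
 *         MPI_Finalize();
 *         return 0;
 *     }
 */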


/*
 *      reduce_lin_inter
 *
 *      Function:       - reduction using O(N) algorithm
 *      Accepts:        - same as MPI_Reduce()
 *      Returns:        - MPI_SUCCESS or error code
 */
int
mca_coll_basic_reduce_lin_inter(const void *sbuf, void *rbuf, int count,
                                struct ompi_datatype_t *dtype,
                                struct ompi_op_t *op,
                                int root, struct ompi_communicator_t *comm,
                                mca_coll_base_module_t *module)
{
    int i, err, size;
    ptrdiff_t dsize, gap;
    char *free_buffer = NULL;
    char *pml_buffer = NULL;

    /* Initialize */
    size = ompi_comm_remote_size(comm);

    if (MPI_PROC_NULL == root) {
        /* do nothing */
        err = OMPI_SUCCESS;
    } else if (MPI_ROOT != root) {
        /* If not root, send data to the root. */
        err = MCA_PML_CALL(send(sbuf, count, dtype, root,
                                MCA_COLL_BASE_TAG_REDUCE,
                                MCA_PML_BASE_SEND_STANDARD, comm));
    } else {
        /* Root receives and reduces messages */
        dsize = opal_datatype_span(&dtype->super, count, &gap);

        free_buffer = (char*)malloc(dsize);
        if (NULL == free_buffer) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        pml_buffer = free_buffer - gap;

        /* Initialize the receive buffer. */
        err = MCA_PML_CALL(recv(rbuf, count, dtype, 0,
                                MCA_COLL_BASE_TAG_REDUCE, comm,
                                MPI_STATUS_IGNORE));
        if (MPI_SUCCESS != err) {
            if (NULL != free_buffer) {
                free(free_buffer);
            }
            return err;
        }

        /* Loop receiving and calling reduction function (C or Fortran). */
        for (i = 1; i < size; i++) {
            err = MCA_PML_CALL(recv(pml_buffer, count, dtype, i,
                                    MCA_COLL_BASE_TAG_REDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != err) {
                if (NULL != free_buffer) {
                    free(free_buffer);
                }
                return err;
            }

            /* Perform the reduction */
            ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
        }

        if (NULL != free_buffer) {
            free(free_buffer);
        }
    }

    /* All done */
    return err;
}
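
/*
 * Editor's note on the three root values handled above: in an
 * intercommunicator reduce, the receiving process in the root's group
 * passes MPI_ROOT, its local peers pass MPI_PROC_NULL, and every rank
 * of the remote group passes the root's rank and contributes data.  A
 * hedged sketch, assuming `inter` is an intercommunicator and the
 * hypothetical flag `in_root_group` is nonzero in the root's group:
 *
 *     int lrank, root, local = 1, sum = 0;
 *     MPI_Comm_rank(inter, &lrank);
 *     if (in_root_group) {
 *         root = (0 == lrank) ? MPI_ROOT : MPI_PROC_NULL;
 *     } else {
 *         root = 0;   // rank of the receiver within its own group
 *     }
 *     MPI_Reduce(&local, &sum, 1, MPI_INT, MPI_SUM, root, inter);
 */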


/*
 *      reduce_log_inter
 *
 *      Function:       - reduction using O(log N) algorithm
 *                        (not yet implemented)
 *      Accepts:        - same as MPI_Reduce()
 *      Returns:        - OMPI_ERR_NOT_IMPLEMENTED
 */
int
mca_coll_basic_reduce_log_inter(const void *sbuf, void *rbuf, int count,
                                struct ompi_datatype_t *dtype,
                                struct ompi_op_t *op,
                                int root, struct ompi_communicator_t *comm,
                                mca_coll_base_module_t *module)
{
    return OMPI_ERR_NOT_IMPLEMENTED;
}
