root/ompi/mca/coll/base/coll_base_alltoallv.c


DEFINITIONS

This source file includes the following definitions:
  1. mca_coll_base_alltoallv_intra_basic_inplace
  2. ompi_coll_base_alltoallv_intra_pairwise
  3. ompi_coll_base_alltoallv_intra_basic_linear

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
 * Copyright (c) 2013      Los Alamos National Security, LLC. All Rights
 *                         reserved.
 * Copyright (c) 2013      FUJITSU LIMITED.  All rights reserved.
 * Copyright (c) 2014-2017 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017      IBM Corporation. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "coll_base_topo.h"
#include "coll_base_util.h"
int
mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts, const int *rdisps,
                                            struct ompi_datatype_t *rdtype,
                                            struct ompi_communicator_t *comm,
                                            mca_coll_base_module_t *module)
{
    int i, j, size, rank, err=MPI_SUCCESS;
    char *allocated_buffer, *tmp_buffer;
    size_t max_size;
    ptrdiff_t ext, gap = 0;

    /* Initialize. */

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    /* If only one process, we're done. */
    if (1 == size) {
        return MPI_SUCCESS;
    }
    /* Find the largest receive amount */
    ompi_datatype_type_extent (rdtype, &ext);
    for (i = 0, max_size = 0 ; i < size ; ++i) {
        if (i == rank) {
            continue;
        }
        size_t size = opal_datatype_span(&rdtype->super, rcounts[i], &gap);
        max_size = size > max_size ? size : max_size;
    }
    /* The gap will always be the same as we are working on the same datatype */

    if (OPAL_UNLIKELY(0 == max_size)) {
        return MPI_SUCCESS;
    }

    /* Allocate a temporary buffer */
    allocated_buffer = calloc (max_size, 1);
    if (NULL == allocated_buffer) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
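    /* opal_datatype_span() reported 'gap' as the offset of the first byte
     * of data relative to the nominal buffer start; shifting the base
     * pointer back by that gap keeps every access inside the allocation. */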
    tmp_buffer = allocated_buffer - gap;

    /* Initiate all send/recv to/from others. */
    /* in-place alltoallv slow algorithm (but works) */
    for (i = 0 ; i < size ; ++i) {
        for (j = i+1 ; j < size ; ++j) {
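            /* Every rank walks the same (i, j) pair schedule, but only the
             * two ranks named in a pair actually exchange data, so matching
             * send/recv calls always line up across the communicator. */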
            if (i == rank && 0 != rcounts[j]) {
                /* Copy the data into the temporary buffer */
                err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[j],
                                                           tmp_buffer, (char *) rbuf + rdisps[j] * ext);
                if (MPI_SUCCESS != err) { goto error_hndl; }

                /* Exchange data with the peer */
                err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[j], rdtype,
                                                     j, MCA_COLL_BASE_TAG_ALLTOALLV,
                                                     (char *)rbuf + rdisps[j] * ext, rcounts[j], rdtype,
                                                     j, MCA_COLL_BASE_TAG_ALLTOALLV,
                                                     comm, MPI_STATUS_IGNORE);
                if (MPI_SUCCESS != err) { goto error_hndl; }
            } else if (j == rank && 0 != rcounts[i]) {
                /* Copy the data into the temporary buffer */
                err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[i],
                                                           tmp_buffer, (char *) rbuf + rdisps[i] * ext);
                if (MPI_SUCCESS != err) { goto error_hndl; }

                /* Exchange data with the peer */
                err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[i], rdtype,
                                                     i, MCA_COLL_BASE_TAG_ALLTOALLV,
                                                     (char *) rbuf + rdisps[i] * ext, rcounts[i], rdtype,
                                                     i, MCA_COLL_BASE_TAG_ALLTOALLV,
                                                     comm, MPI_STATUS_IGNORE);
                if (MPI_SUCCESS != err) { goto error_hndl; }
            }
        }
    }

 error_hndl:
    /* Free the temporary buffer */
    free (allocated_buffer);

    /* All done */
    return err;
}
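
/*
 * Usage note (an illustrative sketch, not part of the library): this
 * in-place routine backs the MPI_IN_PLACE form of MPI_Alltoallv, where
 * the send arguments are ignored and the receive buffer holds both the
 * outgoing and incoming data.  A minimal user-level call that reaches it
 * might look like the following (counts and displacements hypothetical):
 *
 *     int rcounts[4] = {1, 1, 1, 1}, rdisps[4] = {0, 1, 2, 3};
 *     int buf[4];                                // one int per peer
 *     MPI_Alltoallv(MPI_IN_PLACE, NULL, NULL, MPI_DATATYPE_NULL,
 *                   buf, rcounts, rdisps, MPI_INT, MPI_COMM_WORLD);
 */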

int
ompi_coll_base_alltoallv_intra_pairwise(const void *sbuf, const int *scounts, const int *sdisps,
                                         struct ompi_datatype_t *sdtype,
                                         void* rbuf, const int *rcounts, const int *rdisps,
                                         struct ompi_datatype_t *rdtype,
                                         struct ompi_communicator_t *comm,
                                         mca_coll_base_module_t *module)
{
    int line = -1, err = 0, rank, size, step = 0, sendto, recvfrom;
    void *psnd, *prcv;
    ptrdiff_t sext, rext;

    if (MPI_IN_PLACE == sbuf) {
        return mca_coll_base_alltoallv_intra_basic_inplace (rbuf, rcounts, rdisps,
                                                             rdtype, comm, module);
    }

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:alltoallv_intra_pairwise rank %d", rank));

    ompi_datatype_type_extent(sdtype, &sext);
    ompi_datatype_type_extent(rdtype, &rext);

    /* Perform pairwise exchanges; at step 0 sendto == recvfrom == rank,
       so the local exchange is handled by the sendrecv-with-self case */
    for (step = 0; step < size; step++) {

        /* Determine sender and receiver for this step. */
        sendto  = (rank + step) % size;
        recvfrom = (rank + size - step) % size;

        /* Determine sending and receiving locations */
        psnd = (char*)sbuf + (ptrdiff_t)sdisps[sendto] * sext;
        prcv = (char*)rbuf + (ptrdiff_t)rdisps[recvfrom] * rext;

        /* send and receive */
        err = ompi_coll_base_sendrecv( psnd, scounts[sendto], sdtype, sendto,
                                        MCA_COLL_BASE_TAG_ALLTOALLV,
                                        prcv, rcounts[recvfrom], rdtype, recvfrom,
                                        MCA_COLL_BASE_TAG_ALLTOALLV,
                                        comm, MPI_STATUS_IGNORE, rank);
        if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
    }

    return MPI_SUCCESS;

 err_hndl:
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "%s:%4d\tError occurred %d, rank %2d at step %d", __FILE__, line,
                 err, rank, step));
    (void)line;  // silence compiler warning
    return err;
}
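
/*
 * Worked example (illustrative): with size = 4, the schedule seen by rank 1 is
 *
 *     step 0: sendto = 1, recvfrom = 1   (local copy via sendrecv-to-self)
 *     step 1: sendto = 2, recvfrom = 0
 *     step 2: sendto = 3, recvfrom = 3
 *     step 3: sendto = 0, recvfrom = 2
 *
 * Each rank sends to (rank + step) % size while receiving from
 * (rank - step + size) % size, so at every step the send targets and
 * receive sources form matching pairs across the communicator.
 */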

/**
 * Linear functions are copied from the basic coll module.  For
 * small numbers of nodes and/or small data sizes they are just as
 * fast as tree-based segmenting operations, and as such they may be
 * selected by the decision functions.  They are duplicated here because
 * of the way modules are selected in v1; v2 will handle selection
 * differently and will not need the duplicated code.
 */
int
ompi_coll_base_alltoallv_intra_basic_linear(const void *sbuf, const int *scounts, const int *sdisps,
                                            struct ompi_datatype_t *sdtype,
                                            void *rbuf, const int *rcounts, const int *rdisps,
                                            struct ompi_datatype_t *rdtype,
                                            struct ompi_communicator_t *comm,
                                            mca_coll_base_module_t *module)
{
    int i, size, rank, err, nreqs;
    char *psnd, *prcv;
    ptrdiff_t sext, rext;
    ompi_request_t **preq, **reqs;
    mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
    mca_coll_base_comm_t *data = base_module->base_data;

    if (MPI_IN_PLACE == sbuf) {
        return  mca_coll_base_alltoallv_intra_basic_inplace (rbuf, rcounts, rdisps,
                                                              rdtype, comm, module);
    }

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:alltoallv_intra_basic_linear rank %d", rank));

    ompi_datatype_type_extent(sdtype, &sext);
    ompi_datatype_type_extent(rdtype, &rext);

    /* Simple optimization - handle send to self first */
    psnd = ((char *) sbuf) + (ptrdiff_t)sdisps[rank] * sext;
    prcv = ((char *) rbuf) + (ptrdiff_t)rdisps[rank] * rext;
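    /* ompi_datatype_sndrcv() performs the local copy, converting between
     * the send and receive datatypes when they differ. */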
    if (0 != scounts[rank]) {
        err = ompi_datatype_sndrcv(psnd, scounts[rank], sdtype,
                                   prcv, rcounts[rank], rdtype);
        if (MPI_SUCCESS != err) {
            return err;
        }
    }

    /* If only one process, we're done. */
    if (1 == size) {
        return MPI_SUCCESS;
    }

    /* Now, initiate all send/recv to/from others. */
    nreqs = 0;
    reqs = preq = ompi_coll_base_comm_get_reqs(data, 2 * size);
    if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; goto err_hndl; }

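    /* The requests created below are persistent (irecv_init/isend_init):
     * they start out inactive, are activated together by
     * MCA_PML_CALL(start), completed by ompi_request_wait_all(), and
     * returned via ompi_coll_base_free_reqs() on both the success and
     * error paths. */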
    /* Post all receives first */
    for (i = 0; i < size; ++i) {
        if (i == rank) {
            continue;
        }

        ++nreqs;
        prcv = ((char *) rbuf) + (ptrdiff_t)rdisps[i] * rext;
        err = MCA_PML_CALL(irecv_init(prcv, rcounts[i], rdtype,
                                      i, MCA_COLL_BASE_TAG_ALLTOALLV, comm,
                                      preq++));
        if (MPI_SUCCESS != err) { goto err_hndl; }
    }

    /* Now post all sends */
    for (i = 0; i < size; ++i) {
        if (i == rank) {
            continue;
        }

        ++nreqs;
        psnd = ((char *) sbuf) + (ptrdiff_t)sdisps[i] * sext;
        err = MCA_PML_CALL(isend_init(psnd, scounts[i], sdtype,
                                      i, MCA_COLL_BASE_TAG_ALLTOALLV,
                                      MCA_PML_BASE_SEND_STANDARD, comm,
                                      preq++));
        if (MPI_SUCCESS != err) { goto err_hndl; }
    }

    /* Start your engines.  This will never return an error. */
    MCA_PML_CALL(start(nreqs, reqs));

    /* Wait for them all.  If there's an error, note that we don't care
     * what the error was -- just that there *was* an error.  The PML
     * will finish all requests, even if one or more of them fail.
     * i.e., by the end of this call, all the requests are free-able.
     * So free them anyway -- even if there was an error, and return the
     * error after we free everything. */
    err = ompi_request_wait_all(nreqs, reqs, MPI_STATUSES_IGNORE);

 err_hndl:
    /* find a real error code */
    if (MPI_ERR_IN_STATUS == err) {
        for( i = 0; i < nreqs; i++ ) {
            if (MPI_REQUEST_NULL == reqs[i]) continue;
            if (MPI_ERR_PENDING == reqs[i]->req_status.MPI_ERROR) continue;
            err = reqs[i]->req_status.MPI_ERROR;
            break;
        }
    }
    /* Free the requests in all cases as they are persistent */
    ompi_coll_base_free_reqs(reqs, nreqs);

    return err;
}
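
/*
 * Background sketch (illustrative, not part of this file): the
 * irecv_init/isend_init + start + wait_all sequence above mirrors the MPI
 * persistent-request pattern, which at user level looks roughly like:
 *
 *     MPI_Request req;
 *     MPI_Recv_init(buf, count, MPI_INT, src, tag, comm, &req);  // create inactive request
 *     MPI_Start(&req);                                           // activate it
 *     MPI_Wait(&req, MPI_STATUS_IGNORE);                         // complete this round
 *     MPI_Request_free(&req);                                    // release when done
 *
 * Here the PML-level equivalents are used, and the request array is cached
 * on the communicator (ompi_coll_base_comm_get_reqs), so repeated
 * collective calls avoid reallocating it.
 */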
