This source file includes the following definitions.
- mca_coll_base_alltoallv_intra_basic_inplace
- ompi_coll_base_alltoallv_intra_pairwise
- ompi_coll_base_alltoallv_intra_basic_linear
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 #include "ompi_config.h"
  28 
  29 #include "mpi.h"
  30 #include "ompi/constants.h"
  31 #include "ompi/datatype/ompi_datatype.h"
  32 #include "ompi/communicator/communicator.h"
  33 #include "ompi/mca/coll/coll.h"
  34 #include "ompi/mca/coll/base/coll_tags.h"
  35 #include "ompi/mca/pml/pml.h"
  36 #include "ompi/mca/coll/base/coll_base_functions.h"
  37 #include "coll_base_topo.h"
  38 #include "coll_base_util.h"
  39 
  40 int
  41 mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts, const int *rdisps,
  42                                             struct ompi_datatype_t *rdtype,
  43                                             struct ompi_communicator_t *comm,
  44                                             mca_coll_base_module_t *module)
  45 {
  46     int i, j, size, rank, err=MPI_SUCCESS;
  47     char *allocated_buffer, *tmp_buffer;
  48     size_t max_size;
  49     ptrdiff_t ext, gap = 0;
  50 
  51     
  52 
  53     size = ompi_comm_size(comm);
  54     rank = ompi_comm_rank(comm);
  55 
  56     
  57     if (1 == size) {
  58         return MPI_SUCCESS;
  59     }
  60     
  61     ompi_datatype_type_extent (rdtype, &ext);
  62     for (i = 0, max_size = 0 ; i < size ; ++i) {
  63         if (i == rank) {
  64             continue;
  65         }
  66         size_t size = opal_datatype_span(&rdtype->super, rcounts[i], &gap);
  67         max_size = size > max_size ? size : max_size;
  68     }
  69     
  70 
  71     if (OPAL_UNLIKELY(0 == max_size)) {
  72         return MPI_SUCCESS;
  73     }
  74 
  75     
  76     allocated_buffer = calloc (max_size, 1);
  77     if (NULL == allocated_buffer) {
  78         return OMPI_ERR_OUT_OF_RESOURCE;
  79     }
  80     tmp_buffer = allocated_buffer - gap;
  81 
  82     
  83     
  84     for (i = 0 ; i < size ; ++i) {
  85         for (j = i+1 ; j < size ; ++j) {
  86             if (i == rank && 0 != rcounts[j]) {
  87                 
  88                 err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[j],
  89                                                            tmp_buffer, (char *) rbuf + rdisps[j] * ext);
  90                 if (MPI_SUCCESS != err) { goto error_hndl; }
  91 
  92                 
  93                 err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer,  rcounts[j], rdtype,
  94                                                      j, MCA_COLL_BASE_TAG_ALLTOALLV,
  95                                                      (char *)rbuf + rdisps[j] * ext, rcounts[j], rdtype,
  96                                                      j, MCA_COLL_BASE_TAG_ALLTOALLV,
  97                                                      comm, MPI_STATUS_IGNORE);
  98                 if (MPI_SUCCESS != err) { goto error_hndl; }
  99             } else if (j == rank && 0 != rcounts[i]) {
 100                 
 101                 err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[i],
 102                                                            tmp_buffer, (char *) rbuf + rdisps[i] * ext);
 103                 if (MPI_SUCCESS != err) { goto error_hndl; }
 104 
 105                 
 106                 err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer,  rcounts[i], rdtype,
 107                                                      i, MCA_COLL_BASE_TAG_ALLTOALLV,
 108                                                      (char *) rbuf + rdisps[i] * ext, rcounts[i], rdtype,
 109                                                      i, MCA_COLL_BASE_TAG_ALLTOALLV,
 110                                                      comm, MPI_STATUS_IGNORE);
 111                 if (MPI_SUCCESS != err) { goto error_hndl; }
 112             }
 113         }
 114     }
 115 
 116  error_hndl:
 117     
 118     free (allocated_buffer);
 119 
 120     
 121     return err;
 122 }
 123 
 124 int
 125 ompi_coll_base_alltoallv_intra_pairwise(const void *sbuf, const int *scounts, const int *sdisps,
 126                                          struct ompi_datatype_t *sdtype,
 127                                          void* rbuf, const int *rcounts, const int *rdisps,
 128                                          struct ompi_datatype_t *rdtype,
 129                                          struct ompi_communicator_t *comm,
 130                                          mca_coll_base_module_t *module)
 131 {
 132     int line = -1, err = 0, rank, size, step = 0, sendto, recvfrom;
 133     void *psnd, *prcv;
 134     ptrdiff_t sext, rext;
 135 
 136     if (MPI_IN_PLACE == sbuf) {
 137         return mca_coll_base_alltoallv_intra_basic_inplace (rbuf, rcounts, rdisps,
 138                                                              rdtype, comm, module);
 139     }
 140 
 141     size = ompi_comm_size(comm);
 142     rank = ompi_comm_rank(comm);
 143 
 144     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 145                  "coll:base:alltoallv_intra_pairwise rank %d", rank));
 146 
 147     ompi_datatype_type_extent(sdtype, &sext);
 148     ompi_datatype_type_extent(rdtype, &rext);
 149 
 150    
 151     for (step = 0; step < size; step++) {
 152 
 153         
 154         sendto  = (rank + step) % size;
 155         recvfrom = (rank + size - step) % size;
 156 
 157         
 158         psnd = (char*)sbuf + (ptrdiff_t)sdisps[sendto] * sext;
 159         prcv = (char*)rbuf + (ptrdiff_t)rdisps[recvfrom] * rext;
 160 
 161         
 162         err = ompi_coll_base_sendrecv( psnd, scounts[sendto], sdtype, sendto,
 163                                         MCA_COLL_BASE_TAG_ALLTOALLV,
 164                                         prcv, rcounts[recvfrom], rdtype, recvfrom,
 165                                         MCA_COLL_BASE_TAG_ALLTOALLV,
 166                                         comm, MPI_STATUS_IGNORE, rank);
 167         if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl;  }
 168     }
 169 
 170     return MPI_SUCCESS;
 171 
 172  err_hndl:
 173     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 174                  "%s:%4d\tError occurred %d, rank %2d at step %d", __FILE__, line,
 175                  err, rank, step));
 176     (void)line;  
 177     return err;
 178 }
 179 
 180 
 181 
 182 
 183 
 184 
 185 
 186 
 187 
 188 int
 189 ompi_coll_base_alltoallv_intra_basic_linear(const void *sbuf, const int *scounts, const int *sdisps,
 190                                             struct ompi_datatype_t *sdtype,
 191                                             void *rbuf, const int *rcounts, const int *rdisps,
 192                                             struct ompi_datatype_t *rdtype,
 193                                             struct ompi_communicator_t *comm,
 194                                             mca_coll_base_module_t *module)
 195 {
 196     int i, size, rank, err, nreqs;
 197     char *psnd, *prcv;
 198     ptrdiff_t sext, rext;
 199     ompi_request_t **preq, **reqs;
 200     mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
 201     mca_coll_base_comm_t *data = base_module->base_data;
 202 
 203     if (MPI_IN_PLACE == sbuf) {
 204         return  mca_coll_base_alltoallv_intra_basic_inplace (rbuf, rcounts, rdisps,
 205                                                               rdtype, comm, module);
 206     }
 207 
 208     size = ompi_comm_size(comm);
 209     rank = ompi_comm_rank(comm);
 210 
 211     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 212                  "coll:base:alltoallv_intra_basic_linear rank %d", rank));
 213 
 214     ompi_datatype_type_extent(sdtype, &sext);
 215     ompi_datatype_type_extent(rdtype, &rext);
 216 
 217     
 218     psnd = ((char *) sbuf) + (ptrdiff_t)sdisps[rank] * sext;
 219     prcv = ((char *) rbuf) + (ptrdiff_t)rdisps[rank] * rext;
 220     if (0 != scounts[rank]) {
 221         err = ompi_datatype_sndrcv(psnd, scounts[rank], sdtype,
 222                               prcv, rcounts[rank], rdtype);
 223         if (MPI_SUCCESS != err) {
 224             return err;
 225         }
 226     }
 227 
 228     
 229     if (1 == size) {
 230         return MPI_SUCCESS;
 231     }
 232 
 233     
 234     nreqs = 0;
 235     reqs = preq = ompi_coll_base_comm_get_reqs(data, 2 * size);
 236     if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; goto err_hndl; }
 237 
 238     
 239     for (i = 0; i < size; ++i) {
 240         if (i == rank) {
 241             continue;
 242         }
 243 
 244         ++nreqs;
 245         prcv = ((char *) rbuf) + (ptrdiff_t)rdisps[i] * rext;
 246         err = MCA_PML_CALL(irecv_init(prcv, rcounts[i], rdtype,
 247                                       i, MCA_COLL_BASE_TAG_ALLTOALLV, comm,
 248                                       preq++));
 249         if (MPI_SUCCESS != err) { goto err_hndl; }
 250     }
 251 
 252     
 253     for (i = 0; i < size; ++i) {
 254         if (i == rank) {
 255             continue;
 256         }
 257 
 258         ++nreqs;
 259         psnd = ((char *) sbuf) + (ptrdiff_t)sdisps[i] * sext;
 260         err = MCA_PML_CALL(isend_init(psnd, scounts[i], sdtype,
 261                                       i, MCA_COLL_BASE_TAG_ALLTOALLV,
 262                                       MCA_PML_BASE_SEND_STANDARD, comm,
 263                                       preq++));
 264         if (MPI_SUCCESS != err) { goto err_hndl; }
 265     }
 266 
 267     
 268     MCA_PML_CALL(start(nreqs, reqs));
 269 
 270     
 271 
 272 
 273 
 274 
 275 
 276     err = ompi_request_wait_all(nreqs, reqs, MPI_STATUSES_IGNORE);
 277 
 278  err_hndl:
 279     
 280     if (MPI_ERR_IN_STATUS == err) {
 281         for( i = 0; i < nreqs; i++ ) {
 282             if (MPI_REQUEST_NULL == reqs[i]) continue;
 283             if (MPI_ERR_PENDING == reqs[i]->req_status.MPI_ERROR) continue;
 284             err = reqs[i]->req_status.MPI_ERROR;
 285             break;
 286         }
 287     }
 288     
 289     ompi_coll_base_free_reqs(reqs, nreqs);
 290 
 291     return err;
 292 }