root/ompi/mca/coll/base/coll_base_alltoall.c


DEFINITIONS

This source file includes the following definitions:
  1. mca_coll_base_alltoall_intra_basic_inplace
  2. ompi_coll_base_alltoall_intra_pairwise
  3. ompi_coll_base_alltoall_intra_bruck
  4. ompi_coll_base_alltoall_intra_linear_sync
  5. ompi_coll_base_alltoall_intra_two_procs
  6. ompi_coll_base_alltoall_intra_basic_linear

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2013-2016 Los Alamos National Security, LLC. All Rights
 *                         reserved.
 * Copyright (c) 2014-2017 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017      IBM Corporation. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "coll_base_topo.h"
#include "coll_base_util.h"
/* MPI_IN_PLACE all-to-all algorithm. TODO: implement a better one. */
int
mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
                                           struct ompi_datatype_t *rdtype,
                                           struct ompi_communicator_t *comm,
                                           mca_coll_base_module_t *module)
{
    int i, j, size, rank, err = MPI_SUCCESS, line;
    ptrdiff_t ext, gap = 0;
    ompi_request_t *req;
    char *allocated_buffer = NULL, *tmp_buffer;
    size_t max_size;

    /* Initialize. */

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    /* If only one process, we're done. */
    if (1 == size) {
        return MPI_SUCCESS;
    }

    /* Find the largest receive amount */
    ompi_datatype_type_extent (rdtype, &ext);
    max_size = opal_datatype_span(&rdtype->super, rcount, &gap);

    /* Allocate a temporary buffer */
    allocated_buffer = calloc (max_size, 1);
    if( NULL == allocated_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }
    tmp_buffer = allocated_buffer - gap;
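    /* From here on, max_size is reused as the extent-based stride of a
     * single rcount-element block inside rbuf. */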
    max_size = ext * rcount;

    /* in-place alltoall slow algorithm (but works) */
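    /* Walk every ordered pair (i, j) with i < j; a rank takes part in an
     * iteration only when it is i or j.  The participating rank first saves
     * the block destined for the peer in tmp_buffer, then receives the
     * peer's block directly into that slot of rbuf while sending the saved
     * copy, so each pairwise swap needs only one block of scratch space. */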
    for (i = 0 ; i < size ; ++i) {
        for (j = i+1 ; j < size ; ++j) {
            if (i == rank) {
                /* Copy the data into the temporary buffer */
                err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
                                                       (char *) rbuf + j * max_size);
                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

                /* Exchange data with the peer */
                err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * j, rcount, rdtype,
                                          j, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req));
                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

                err = MCA_PML_CALL(send ((char *) tmp_buffer,  rcount, rdtype,
                                          j, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
                                          comm));
                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
            } else if (j == rank) {
                /* Copy the data into the temporary buffer */
                err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
                                                       (char *) rbuf + i * max_size);
                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

                /* Exchange data with the peer */
                err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * i, rcount, rdtype,
                                          i, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req));
                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

                err = MCA_PML_CALL(send ((char *) tmp_buffer,  rcount, rdtype,
                                          i, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
                                          comm));
                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
            } else {
                continue;
            }
            /* Wait for the receive request to complete */
            err = ompi_request_wait (&req, MPI_STATUS_IGNORE);
            if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
        }
    }

 error_hndl:
    /* Free the temporary buffer */
    if( NULL != allocated_buffer )
        free (allocated_buffer);

    if( MPI_SUCCESS != err ) {
        OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                     "%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
                     rank));
        (void)line;  // silence compiler warning
    }

    /* All done */
    return err;
}

int ompi_coll_base_alltoall_intra_pairwise(const void *sbuf, int scount,
                                            struct ompi_datatype_t *sdtype,
                                            void* rbuf, int rcount,
                                            struct ompi_datatype_t *rdtype,
                                            struct ompi_communicator_t *comm,
                                            mca_coll_base_module_t *module)
{
    int line = -1, err = 0, rank, size, step, sendto, recvfrom;
    void * tmpsend, *tmprecv;
    ptrdiff_t lb, sext, rext;

    if (MPI_IN_PLACE == sbuf) {
        return mca_coll_base_alltoall_intra_basic_inplace (rbuf, rcount, rdtype,
                                                            comm, module);
    }

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:alltoall_intra_pairwise rank %d", rank));

    err = ompi_datatype_get_extent (sdtype, &lb, &sext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
    err = ompi_datatype_get_extent (rdtype, &lb, &rext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    /* Perform pairwise exchange - starting from 1 so the local copy is last */
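    /* At step s each rank sends to (rank + s) % size and receives from
     * (rank - s + size) % size, so every step pairs up disjoint
     * sender/receiver sets.  E.g. with size = 4, rank 1 sends to
     * 2, 3, 0, 1 and receives from 0, 3, 2, 1 over steps 1..4; the last
     * step (s == size) degenerates into the local self-copy. */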
    for (step = 1; step < size + 1; step++) {

        /* Determine sender and receiver for this step. */
        sendto  = (rank + step) % size;
        recvfrom = (rank + size - step) % size;

        /* Determine sending and receiving locations */
        tmpsend = (char*)sbuf + (ptrdiff_t)sendto * sext * (ptrdiff_t)scount;
        tmprecv = (char*)rbuf + (ptrdiff_t)recvfrom * rext * (ptrdiff_t)rcount;

        /* send and receive */
        err = ompi_coll_base_sendrecv( tmpsend, scount, sdtype, sendto,
                                        MCA_COLL_BASE_TAG_ALLTOALL,
                                        tmprecv, rcount, rdtype, recvfrom,
                                        MCA_COLL_BASE_TAG_ALLTOALL,
                                        comm, MPI_STATUS_IGNORE, rank);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
    }

    return MPI_SUCCESS;

 err_hndl:
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "%s:%4d\tError occurred %d, rank %2d", __FILE__, line,
                 err, rank));
    (void)line;  // silence compiler warning
    return err;
}


int ompi_coll_base_alltoall_intra_bruck(const void *sbuf, int scount,
                                         struct ompi_datatype_t *sdtype,
                                         void* rbuf, int rcount,
                                         struct ompi_datatype_t *rdtype,
                                         struct ompi_communicator_t *comm,
                                         mca_coll_base_module_t *module)
{
    int i, k, line = -1, rank, size, err = 0;
    int sendto, recvfrom, distance, *displs = NULL, *blen = NULL;
    char *tmpbuf = NULL, *tmpbuf_free = NULL;
    ptrdiff_t sext, rext, span, gap = 0;
    struct ompi_datatype_t *new_ddt;

    if (MPI_IN_PLACE == sbuf) {
        return mca_coll_base_alltoall_intra_basic_inplace (rbuf, rcount, rdtype,
                                                            comm, module);
    }

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:alltoall_intra_bruck rank %d", rank));

    err = ompi_datatype_type_extent (sdtype, &sext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    err = ompi_datatype_type_extent (rdtype, &rext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    span = opal_datatype_span(&sdtype->super, (int64_t)size * scount, &gap);

    displs = (int *) malloc(size * sizeof(int));
    if (displs == NULL) { line = __LINE__; err = -1; goto err_hndl; }
    blen = (int *) malloc(size * sizeof(int));
    if (blen == NULL) { line = __LINE__; err = -1; goto err_hndl; }

    /* tmp buffer allocation for message data */
    tmpbuf_free = (char *)malloc(span);
    if (tmpbuf_free == NULL) { line = __LINE__; err = -1; goto err_hndl; }
    tmpbuf = tmpbuf_free - gap;

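    /* Bruck's algorithm in three phases:
     *   1) a local rotation so that tmpbuf block t holds the send block for
     *      rank (rank + t) % size,
     *   2) ceil(log2(size)) communication rounds in which all blocks whose
     *      index has a given bit set travel as one indexed datatype,
     *   3) a local inverse rotation that puts each received block at its
     *      final position in rbuf. */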
    /* Step 1 - local rotation - shift up by rank */
    err = ompi_datatype_copy_content_same_ddt (sdtype,
                                               (int32_t) ((ptrdiff_t)(size - rank) * (ptrdiff_t)scount),
                                               tmpbuf,
                                               ((char*) sbuf) + (ptrdiff_t)rank * (ptrdiff_t)scount * sext);
    if (err < 0) {
        line = __LINE__; err = -1; goto err_hndl;
    }

    if (rank != 0) {
        err = ompi_datatype_copy_content_same_ddt (sdtype, (ptrdiff_t)rank * (ptrdiff_t)scount,
                                                   tmpbuf + (ptrdiff_t)(size - rank) * (ptrdiff_t)scount * sext,
                                                   (char*) sbuf);
        if (err < 0) {
            line = __LINE__; err = -1; goto err_hndl;
        }
    }

    /* Step 2 - communication rounds */
    for (distance = 1; distance < size; distance<<=1) {

        sendto = (rank + distance) % size;
        recvfrom = (rank - distance + size) % size;
        k = 0;

        /* create indexed datatype */
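        /* Select every block whose index has the current distance bit set,
         * i.e. (i & distance) == distance.  For size = 8 and distance = 2
         * this picks blocks 2, 3, 6 and 7: after the rotation, block i is
         * bound for the rank at offset i, and each offset is covered by its
         * binary decomposition across the log2(size) rounds. */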
        for (i = 1; i < size; i++) {
            if ((i & distance) == distance) {
                displs[k] = (ptrdiff_t)i * (ptrdiff_t)scount;
                blen[k] = scount;
                k++;
            }
        }
        /* Set indexes and displacements */
        err = ompi_datatype_create_indexed(k, blen, displs, sdtype, &new_ddt);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
        /* Commit the new datatype */
        err = ompi_datatype_commit(&new_ddt);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

        /* Sendreceive */
        err = ompi_coll_base_sendrecv ( tmpbuf, 1, new_ddt, sendto,
                                         MCA_COLL_BASE_TAG_ALLTOALL,
                                         rbuf, 1, new_ddt, recvfrom,
                                         MCA_COLL_BASE_TAG_ALLTOALL,
                                         comm, MPI_STATUS_IGNORE, rank );
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

        /* Copy back new data from recvbuf to tmpbuf */
        err = ompi_datatype_copy_content_same_ddt(new_ddt, 1, tmpbuf, (char *) rbuf);
        if (err < 0) { line = __LINE__; err = -1; goto err_hndl; }

        /* free ddt */
        err = ompi_datatype_destroy(&new_ddt);
        if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
    } /* end of for (distance = 1... */

    /* Step 3 - local inverse rotation */
    for (i = 0; i < size; i++) {

        err = ompi_datatype_copy_content_same_ddt (rdtype, (int32_t) rcount,
                                                   ((char*)rbuf) + ((ptrdiff_t)((rank - i + size) % size) * (ptrdiff_t)rcount * rext),
                                                   tmpbuf + (ptrdiff_t)i * (ptrdiff_t)rcount * rext);
        if (err < 0) { line = __LINE__; err = -1; goto err_hndl; }
    }

    /* Step 4 - clean up */
    if (tmpbuf_free != NULL) free(tmpbuf_free);
    if (displs != NULL) free(displs);
    if (blen != NULL) free(blen);
    return OMPI_SUCCESS;

 err_hndl:
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
                 rank));
    (void)line;  // silence compiler warning
    if (tmpbuf_free != NULL) free(tmpbuf_free);
    if (displs != NULL) free(displs);
    if (blen != NULL) free(blen);
    return err;
}

/*
 * alltoall_intra_linear_sync
 *
 * Function:       Linear implementation of alltoall with limited number
 *                 of outstanding requests.
 * Accepts:        Same as MPI_Alltoall(), and the maximum number of
 *                 outstanding requests (actual number is 2 * max, since
 *                 we count receive and send requests separately).
 * Returns:        MPI_SUCCESS or error code
 *
 * Description:    Algorithm is the following:
 *                 1) post K irecvs, K <= N
 *                 2) post K isends, K <= N
 *                 3) while not done
 *                    - wait for any request to complete
 *                    - replace that request by the new one of the same type.
 */
int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
                                               struct ompi_datatype_t *sdtype,
                                               void* rbuf, int rcount,
                                               struct ompi_datatype_t *rdtype,
                                               struct ompi_communicator_t *comm,
                                               mca_coll_base_module_t *module,
                                               int max_outstanding_reqs)
{
    int line, error, ri, si, rank, size, nrreqs, nsreqs, total_reqs;
    int nreqs = 0;
    char *psnd, *prcv;
    ptrdiff_t slb, sext, rlb, rext;

    ompi_request_t **reqs = NULL;

    if (MPI_IN_PLACE == sbuf) {
        return mca_coll_base_alltoall_intra_basic_inplace (rbuf, rcount, rdtype,
                                                            comm, module);
    }

    /* Initialize. */

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "ompi_coll_base_alltoall_intra_linear_sync rank %d", rank));

    error = ompi_datatype_get_extent(sdtype, &slb, &sext);
    if (OMPI_SUCCESS != error) {
        return error;
    }
    sext *= scount;

    error = ompi_datatype_get_extent(rdtype, &rlb, &rext);
    if (OMPI_SUCCESS != error) {
        return error;
    }
    rext *= rcount;

    /* simple optimization */

    psnd = ((char *) sbuf) + (ptrdiff_t)rank * sext;
    prcv = ((char *) rbuf) + (ptrdiff_t)rank * rext;

    error = ompi_datatype_sndrcv(psnd, scount, sdtype, prcv, rcount, rdtype);
    if (MPI_SUCCESS != error) {
        return error;
    }

    /* If only one process, we're done. */

    if (1 == size) {
        return MPI_SUCCESS;
    }

    /* Initiate send/recv to/from others. */
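    /* Clamp the request window to size - 1 peers; a non-positive or
     * oversized max_outstanding_reqs means "no limit". */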
    total_reqs = (((max_outstanding_reqs > (size - 1)) ||
                   (max_outstanding_reqs <= 0)) ?
                  (size - 1) : (max_outstanding_reqs));
    if (0 < total_reqs) {
        reqs = ompi_coll_base_comm_get_reqs(module->base_data, 2 * total_reqs);
        if (NULL == reqs) { error = -1; line = __LINE__; goto error_hndl; }
        reqs[0] = reqs[1] = MPI_REQUEST_NULL;
    }

    prcv = (char *) rbuf;
    psnd = (char *) sbuf;

    /* Post the first batch of irecv and isend requests */
    for (nreqs = 0, nrreqs = 0, ri = (rank + 1) % size; nreqs < total_reqs;
         ri = (ri + 1) % size, ++nrreqs) {
        nreqs++;
        error = MCA_PML_CALL(irecv
                             (prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
                              MCA_COLL_BASE_TAG_ALLTOALL, comm, &reqs[nreqs - 1]));
        if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
    }
    for (nsreqs = 0, si = (rank + size - 1) % size; nreqs < 2 * total_reqs;
         si = (si + size - 1) % size, ++nsreqs) {
        nreqs++;
        error = MCA_PML_CALL(isend
                             (psnd + (ptrdiff_t)si * sext, scount, sdtype, si,
                              MCA_COLL_BASE_TAG_ALLTOALL,
                              MCA_PML_BASE_SEND_STANDARD, comm, &reqs[nreqs - 1]));
        if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
    }

    /* Wait for requests to complete */
    if (nreqs == 2 * (size - 1)) {
        /* Optimization for the case when all requests have been posted */
        error = ompi_request_wait_all(nreqs, reqs, MPI_STATUSES_IGNORE);
        if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }

    } else {
        /* As requests complete, replace them with corresponding requests:
           - wait for any request to complete, mark the request as
             MPI_REQUEST_NULL
           - if it was a receive request, replace it with a new irecv request
             (if any)
           - if it was a send request, replace it with a new isend request
             (if any)
        */
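        /* reqs[0 .. total_reqs-1] hold the receive requests and
         * reqs[total_reqs .. 2*total_reqs-1] the send requests, so the
         * index returned by wait_any tells us which kind completed. */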
        int ncreqs = 0;
        while (ncreqs < 2 * (size - 1)) {
            int completed;
            error = ompi_request_wait_any(2 * total_reqs, reqs, &completed,
                                          MPI_STATUS_IGNORE);
            if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
            reqs[completed] = MPI_REQUEST_NULL;
            ncreqs++;
            if (completed < total_reqs) {
                if (nrreqs < (size - 1)) {
                    error = MCA_PML_CALL(irecv
                                         (prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
                                          MCA_COLL_BASE_TAG_ALLTOALL, comm,
                                          &reqs[completed]));
                    if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
                    ++nrreqs;
                    ri = (ri + 1) % size;
                }
            } else {
                if (nsreqs < (size - 1)) {
                    error = MCA_PML_CALL(isend
                                         (psnd + (ptrdiff_t)si * sext, scount, sdtype, si,
                                          MCA_COLL_BASE_TAG_ALLTOALL,
                                          MCA_PML_BASE_SEND_STANDARD, comm,
                                          &reqs[completed]));
                    if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
                    ++nsreqs;
                    si = (si + size - 1) % size;
                }
            }
        }
    }

    /* All done */
    return MPI_SUCCESS;

 error_hndl:
    /* find a real error code */
    if (MPI_ERR_IN_STATUS == error) {
        for( ri = 0; ri < nreqs; ri++ ) {
            if (MPI_REQUEST_NULL == reqs[ri]) continue;
            if (MPI_ERR_PENDING == reqs[ri]->req_status.MPI_ERROR) continue;
            error = reqs[ri]->req_status.MPI_ERROR;
            break;
        }
    }
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "%s:%4d\tError occurred %d, rank %2d", __FILE__, line, error,
                 rank));
    (void)line;  // silence compiler warning
    ompi_coll_base_free_reqs(reqs, nreqs);
    return error;
}


int ompi_coll_base_alltoall_intra_two_procs(const void *sbuf, int scount,
                                             struct ompi_datatype_t *sdtype,
                                             void* rbuf, int rcount,
                                             struct ompi_datatype_t *rdtype,
                                             struct ompi_communicator_t *comm,
                                             mca_coll_base_module_t *module)
{
    int line = -1, err = 0, rank, remote;
    void * tmpsend, *tmprecv;
    ptrdiff_t sext, rext, lb;

    if (MPI_IN_PLACE == sbuf) {
        return mca_coll_base_alltoall_intra_basic_inplace (rbuf, rcount, rdtype,
                                                            comm, module);
    }

    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "ompi_coll_base_alltoall_intra_two_procs rank %d", rank));

    if (2 != ompi_comm_size(comm)) {
        return MPI_ERR_UNSUPPORTED_OPERATION;
    }

    err = ompi_datatype_get_extent (sdtype, &lb, &sext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    err = ompi_datatype_get_extent (rdtype, &lb, &rext);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

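    /* With exactly two ranks, remote = rank ^ 1 flips 0 <-> 1: each rank
     * swaps the block addressed to its peer over the wire, then copies its
     * own block locally through the datatype engine. */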
    /* exchange data */
    remote = rank ^ 1;

    tmpsend = (char*)sbuf + (ptrdiff_t)remote * sext * (ptrdiff_t)scount;
    tmprecv = (char*)rbuf + (ptrdiff_t)remote * rext * (ptrdiff_t)rcount;

    /* send and receive */
    err = ompi_coll_base_sendrecv ( tmpsend, scount, sdtype, remote,
                                     MCA_COLL_BASE_TAG_ALLTOALL,
                                     tmprecv, rcount, rdtype, remote,
                                     MCA_COLL_BASE_TAG_ALLTOALL,
                                     comm, MPI_STATUS_IGNORE, rank );
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    /* ddt sendrecv your own data */
    err = ompi_datatype_sndrcv((char*) sbuf + (ptrdiff_t)rank * sext * (ptrdiff_t)scount,
                               (int32_t) scount, sdtype,
                               (char*) rbuf + (ptrdiff_t)rank * rext * (ptrdiff_t)rcount,
                               (int32_t) rcount, rdtype);
    if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }

    /* done */
    return MPI_SUCCESS;

 err_hndl:
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
                 rank));
    (void)line;  // silence compiler warning
    return err;
}


/*
 * Linear functions are copied from the BASIC coll module.  They do not
 * segment the message and are simple implementations, but for some small
 * numbers of nodes and/or small data sizes they are just as fast as the
 * base/tree-based segmenting operations, and as such may be selected by
 * the decision functions.  They are copied into this module because of the
 * way we select modules in V1; in V2 we will handle this differently and
 * so will not have to duplicate code.
 * GEF Oct05 after asking Jeff.
 */

/* copied function (with appropriate renaming) starts here */

int ompi_coll_base_alltoall_intra_basic_linear(const void *sbuf, int scount,
                                               struct ompi_datatype_t *sdtype,
                                               void* rbuf, int rcount,
                                               struct ompi_datatype_t *rdtype,
                                               struct ompi_communicator_t *comm,
                                               mca_coll_base_module_t *module)
{
    int i, rank, size, err, line;
    int nreqs = 0;
    char *psnd, *prcv;
    MPI_Aint lb, sndinc, rcvinc;
    ompi_request_t **req, **sreq, **rreq;
    mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
    mca_coll_base_comm_t *data = base_module->base_data;

    if (MPI_IN_PLACE == sbuf) {
        return mca_coll_base_alltoall_intra_basic_inplace (rbuf, rcount, rdtype,
                                                            comm, module);
    }

    /* Initialize. */

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "ompi_coll_base_alltoall_intra_basic_linear rank %d", rank));

    err = ompi_datatype_get_extent(sdtype, &lb, &sndinc);
    if (OMPI_SUCCESS != err) {
        return err;
    }
    sndinc *= scount;

    err = ompi_datatype_get_extent(rdtype, &lb, &rcvinc);
    if (OMPI_SUCCESS != err) {
        return err;
    }
    rcvinc *= rcount;

    /* simple optimization */

    psnd = ((char *) sbuf) + (ptrdiff_t)rank * sndinc;
    prcv = ((char *) rbuf) + (ptrdiff_t)rank * rcvinc;

    err = ompi_datatype_sndrcv(psnd, scount, sdtype, prcv, rcount, rdtype);
    if (MPI_SUCCESS != err) {
        return err;
    }

    /* If only one process, we're done. */

    if (1 == size) {
        return MPI_SUCCESS;
    }

    /* Initiate all send/recv to/from others. */

    req = rreq = ompi_coll_base_comm_get_reqs(data, (size - 1) * 2);
    if (NULL == req) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hndl; }

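    /* req points at an array of 2 * (size - 1) persistent requests: the
     * first size - 1 slots (rreq) will hold the receives, the next
     * size - 1 slots (sreq) the sends; one call to start() fires them all. */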
    prcv = (char *) rbuf;
    psnd = (char *) sbuf;

    /* Post all receives first -- a simple optimization */

    for (nreqs = 0, i = (rank + 1) % size; i != rank;
         i = (i + 1) % size, ++rreq) {
        nreqs++;
        err = MCA_PML_CALL(irecv_init
                           (prcv + (ptrdiff_t)i * rcvinc, rcount, rdtype, i,
                           MCA_COLL_BASE_TAG_ALLTOALL, comm, rreq));
        if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
    }

    /* Now post all sends in reverse order
       - We would like to minimize the search time through the message queue
         when messages actually arrive in the order in which they were posted.
     */
    sreq = rreq;
    for (i = (rank + size - 1) % size; i != rank;
         i = (i + size - 1) % size, ++sreq) {
        nreqs++;
        err = MCA_PML_CALL(isend_init
                           (psnd + (ptrdiff_t)i * sndinc, scount, sdtype, i,
                           MCA_COLL_BASE_TAG_ALLTOALL,
                           MCA_PML_BASE_SEND_STANDARD, comm, sreq));
        if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
    }

    /* Start your engines.  This will never return an error. */

    MCA_PML_CALL(start(nreqs, req));

    /* Wait for them all.  If there's an error, note that we don't
     * care what the error was -- just that there *was* an error.  The
     * PML will finish all requests, even if one or more of them fail.
     * i.e., by the end of this call, all the requests are free-able.
     * So free them anyway -- even if there was an error, and return
     * the error after we free everything. */

    err = ompi_request_wait_all(nreqs, req, MPI_STATUSES_IGNORE);
    if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }

 err_hndl:
    if (MPI_SUCCESS != err) {
        /* find a real error code */
        if (MPI_ERR_IN_STATUS == err) {
            for( i = 0; i < nreqs; i++ ) {
                if (MPI_REQUEST_NULL == req[i]) continue;
                if (MPI_ERR_PENDING == req[i]->req_status.MPI_ERROR) continue;
                err = req[i]->req_status.MPI_ERROR;
                break;
            }
        }
        OPAL_OUTPUT( (ompi_coll_base_framework.framework_output,"%s:%4d\tError occurred %d, rank %2d",
                      __FILE__, line, err, rank) );
        (void)line;  // silence compiler warning
    }
    /* Free the reqs in all cases as they are persistent requests */
    ompi_coll_base_free_reqs(req, nreqs);

    /* All done */
    return err;
}

/* copied function (with appropriate renaming) ends here */
