This source file includes the following definitions:
- ompi_coll_base_reduce_scatter_block_basic_linear
- ompi_coll_base_reduce_scatter_block_intra_recursivedoubling
- ompi_range_sum
- ompi_coll_base_reduce_scatter_block_intra_recursivehalving
- ompi_coll_base_reduce_scatter_block_intra_butterfly
- ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 #include "ompi_config.h"
  28 
  29 #include "mpi.h"
  30 #include "opal/util/bit_ops.h"
  31 #include "ompi/constants.h"
  32 #include "ompi/datatype/ompi_datatype.h"
  33 #include "ompi/communicator/communicator.h"
  34 #include "ompi/mca/coll/coll.h"
  35 #include "ompi/mca/coll/basic/coll_basic.h"
  36 #include "ompi/mca/pml/pml.h"
  37 #include "ompi/op/op.h"
  38 #include "coll_tags.h"
  39 #include "coll_base_functions.h"
  40 #include "coll_base_topo.h"
  41 #include "coll_base_util.h"
  42 
  43 
  44 
  45 
  46 
  47 
  48 
  49 
  50 
  51 
  52 
  53 
  54 int
  55 ompi_coll_base_reduce_scatter_block_basic_linear(const void *sbuf, void *rbuf, int rcount,
  56                                                  struct ompi_datatype_t *dtype,
  57                                                  struct ompi_op_t *op,
  58                                                  struct ompi_communicator_t *comm,
  59                                                  mca_coll_base_module_t *module)
  60 {
  61     int rank, size, count, err = OMPI_SUCCESS;
  62     ptrdiff_t gap, span;
  63     char *recv_buf = NULL, *recv_buf_free = NULL;
  64 
  65     
  66     rank = ompi_comm_rank(comm);
  67     size = ompi_comm_size(comm);
  68 
  69     
  70     count = rcount * size;
  71     if (0 == count) {
  72         return OMPI_SUCCESS;
  73     }
  74 
  75     
  76     span = opal_datatype_span(&dtype->super, count, &gap);
  77 
  78     
  79     if (MPI_IN_PLACE == sbuf) {
  80         sbuf = rbuf;
  81     }
  82 
  83     if (0 == rank) {
  84         
  85 
  86         recv_buf_free = (char*) malloc(span);
  87         if (NULL == recv_buf_free) {
  88             err = OMPI_ERR_OUT_OF_RESOURCE;
  89             goto cleanup;
  90         }
  91         recv_buf = recv_buf_free - gap;
  92     }
  93 
  94     
  95     err =
  96         comm->c_coll->coll_reduce(sbuf, recv_buf, count, dtype, op, 0,
  97                                  comm, comm->c_coll->coll_reduce_module);
  98 
  99     
 100     if (MPI_SUCCESS == err) {
 101         err = comm->c_coll->coll_scatter(recv_buf, rcount, dtype,
 102                                         rbuf, rcount, dtype, 0,
 103                                         comm, comm->c_coll->coll_scatter_module);
 104     }
 105 
 106  cleanup:
 107     if (NULL != recv_buf_free) free(recv_buf_free);
 108 
 109     return err;
 110 }
 111 
 112 
 113 
 114 
 115 
 116 
 117 
 118 
 119 
 120 
 121 
 122 
 123 
 124 
 125 
 126 
 127 int
 128 ompi_coll_base_reduce_scatter_block_intra_recursivedoubling(
 129     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 130     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 131     mca_coll_base_module_t *module)
 132 {
 133     struct ompi_datatype_t *dtypesend = NULL, *dtyperecv = NULL;
 134     char *tmprecv_raw = NULL, *tmpbuf_raw = NULL, *tmprecv, *tmpbuf;
 135     ptrdiff_t span, gap, totalcount, extent;
 136     int blocklens[2], displs[2];
 137     int err = MPI_SUCCESS;
 138     int comm_size = ompi_comm_size(comm);
 139     int rank = ompi_comm_rank(comm);
 140 
 141     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 142                  "coll:base:reduce_scatter_block_intra_recursivedoubling: rank %d/%d",
 143                  rank, comm_size));
 144     if (rcount == 0)
 145         return MPI_SUCCESS;
 146     if (comm_size < 2)
 147         return MPI_SUCCESS;
 148 
 149     totalcount = comm_size * rcount;
 150     ompi_datatype_type_extent(dtype, &extent);
 151     span = opal_datatype_span(&dtype->super, totalcount, &gap);
 152     tmpbuf_raw = malloc(span);
 153     tmprecv_raw = malloc(span);
 154     if (NULL == tmpbuf_raw || NULL == tmprecv_raw) {
 155         err = OMPI_ERR_OUT_OF_RESOURCE;
 156         goto cleanup_and_return;
 157     }
 158     tmpbuf = tmpbuf_raw - gap;
 159     tmprecv = tmprecv_raw - gap;
 160 
 161     if (sbuf != MPI_IN_PLACE) {
 162         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, (char *)sbuf);
 163         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 164     } else {
 165         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, rbuf);
 166         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 167     }
 168     int is_commutative = ompi_op_is_commute(op);
 169 
 170     
 171     int rdoubling_step = 0;
 172     for (int mask = 1; mask < comm_size; mask <<= 1) {
 173         int remote = rank ^ mask;
 174         int cur_tree_root = ompi_rounddown(rank, mask);
 175         int remote_tree_root = ompi_rounddown(remote, mask);
 176 
 177         
 178 
 179 
 180 
 181 
 182 
 183 
 184 
 185 
 186 
 187 
 188 
 189 
 190         
 191         blocklens[0] = rcount * cur_tree_root;
 192         blocklens[1] = (comm_size >= cur_tree_root + mask) ?
 193                        rcount * (comm_size - cur_tree_root - mask) : 0;
 194         displs[0] = 0;
 195         displs[1] = comm_size * rcount - blocklens[1];
 196         err = ompi_datatype_create_indexed(2, blocklens, displs, dtype, &dtypesend);
 197         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 198         err = ompi_datatype_commit(&dtypesend);
 199         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 200 
 201         
 202         blocklens[0] = rcount * remote_tree_root;
 203         blocklens[1] = (comm_size >= remote_tree_root + mask) ?
 204                        rcount * (comm_size - remote_tree_root - mask) : 0;
 205         displs[0] = 0;
 206         displs[1] = comm_size * rcount - blocklens[1];
 207         err = ompi_datatype_create_indexed(2, blocklens, displs, dtype, &dtyperecv);
 208         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 209         err = ompi_datatype_commit(&dtyperecv);
 210         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 211 
 212         int is_block_received = 0;
 213         if (remote < comm_size) {
 214             err = ompi_coll_base_sendrecv(tmpbuf, 1, dtypesend, remote,
 215                                           MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 216                                           tmprecv, 1, dtyperecv, remote,
 217                                           MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 218                                           comm, MPI_STATUS_IGNORE, rank);
 219             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 220             is_block_received = 1;
 221         }
 222         
 223 
 224 
 225 
 226 
 227         if (remote_tree_root + mask > comm_size) {
 228             
 229 
 230 
 231 
 232             int nprocs_alldata = comm_size - cur_tree_root - mask;
 233             for (int rhalving_mask = mask >> 1; rhalving_mask > 0; rhalving_mask >>= 1) {
 234                 remote = rank ^ rhalving_mask;
 235                 int tree_root = ompi_rounddown(rank, rhalving_mask << 1);
 236                 
 237 
 238 
 239 
 240 
 241                 if ((remote > rank) && (rank < tree_root + nprocs_alldata)
 242                     && (remote >= tree_root + nprocs_alldata)) {
 243                     err = MCA_PML_CALL(send(tmprecv, 1, dtyperecv, remote,
 244                                             MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 245                                             MCA_PML_BASE_SEND_STANDARD, comm));
 246                     if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 247 
 248                 } else if ((remote < rank) && (remote < tree_root + nprocs_alldata) &&
 249                            (rank >= tree_root + nprocs_alldata)) {
 250                     err = MCA_PML_CALL(recv(tmprecv, 1, dtyperecv, remote,
 251                                             MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 252                                             comm, MPI_STATUS_IGNORE));
 253                     if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 254                     is_block_received = 1;
 255                 }
 256             }
 257         }
 258 
 259         if (is_block_received) {
 260             
 261             if (is_commutative || (remote_tree_root < cur_tree_root)) {
 262                 ompi_op_reduce(op, tmprecv, tmpbuf, blocklens[0], dtype);
 263                 ompi_op_reduce(op, tmprecv + (ptrdiff_t)displs[1] * extent,
 264                                tmpbuf + (ptrdiff_t)displs[1] * extent,
 265                                blocklens[1], dtype);
 266             } else {
 267                 ompi_op_reduce(op, tmpbuf, tmprecv, blocklens[0], dtype);
 268                 ompi_op_reduce(op, tmpbuf + (ptrdiff_t)displs[1] * extent,
 269                                tmprecv + (ptrdiff_t)displs[1] * extent,
 270                                blocklens[1], dtype);
 271                 err = ompi_datatype_copy_content_same_ddt(dtyperecv, 1,
 272                                                           tmpbuf, tmprecv);
 273                 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 274             }
 275         }
 276         rdoubling_step++;
 277         err = ompi_datatype_destroy(&dtypesend);
 278         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 279         err = ompi_datatype_destroy(&dtyperecv);
 280         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 281     }
 282     err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
 283                                               tmpbuf + (ptrdiff_t)rank * rcount * extent);
 284     if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 285 
 286 cleanup_and_return:
 287     if (dtypesend)
 288         ompi_datatype_destroy(&dtypesend);
 289     if (dtyperecv)
 290         ompi_datatype_destroy(&dtyperecv);
 291     if (tmpbuf_raw)
 292         free(tmpbuf_raw);
 293     if (tmprecv_raw)
 294         free(tmprecv_raw);
 295     return err;
 296 }
 297 
 298 
 299 
 300 
 301 
 302 
 303 static int ompi_range_sum(int a, int b, int r)
 304 {
 305     if (r < a)
 306         return b - a + 1;
 307     else if (r > b)
 308         return 2 * (b - a + 1);
 309     return 2 * (r - a + 1) + b - r;
 310 }
 311 
 312 
 313 
 314 
 315 
 316 
 317 
 318 
 319 
 320 
 321 
 322 
 323 
 324 
 325 int
 326 ompi_coll_base_reduce_scatter_block_intra_recursivehalving(
 327     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 328     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 329     mca_coll_base_module_t *module)
 330 {
 331     char *tmprecv_raw = NULL, *tmpbuf_raw = NULL, *tmprecv, *tmpbuf;
 332     ptrdiff_t span, gap, totalcount, extent;
 333     int err = MPI_SUCCESS;
 334     int comm_size = ompi_comm_size(comm);
 335     int rank = ompi_comm_rank(comm);
 336 
 337     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 338                  "coll:base:reduce_scatter_block_intra_recursivehalving: rank %d/%d",
 339                  rank, comm_size));
 340     if (rcount == 0 || comm_size < 2)
 341         return MPI_SUCCESS;
 342 
 343     if (!ompi_op_is_commute(op)) {
 344         OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 345                      "coll:base:reduce_scatter_block_intra_recursivehalving: rank %d/%d "
 346                      "switching to basic reduce_scatter_block", rank, comm_size));
 347         return ompi_coll_base_reduce_scatter_block_basic_linear(sbuf, rbuf, rcount, dtype,
 348                                                                 op, comm, module);
 349     }
 350     totalcount = comm_size * rcount;
 351     ompi_datatype_type_extent(dtype, &extent);
 352     span = opal_datatype_span(&dtype->super, totalcount, &gap);
 353     tmpbuf_raw = malloc(span);
 354     tmprecv_raw = malloc(span);
 355     if (NULL == tmpbuf_raw || NULL == tmprecv_raw) {
 356         err = OMPI_ERR_OUT_OF_RESOURCE;
 357         goto cleanup_and_return;
 358     }
 359     tmpbuf = tmpbuf_raw - gap;
 360     tmprecv = tmprecv_raw - gap;
 361 
 362     if (sbuf != MPI_IN_PLACE) {
 363         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, (char *)sbuf);
 364         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 365     } else {
 366         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, rbuf);
 367         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 368     }
 369 
 370     
 371 
 372 
 373 
 374 
 375 
 376 
 377 
 378 
 379 
 380 
 381 
 382 
 383     
 384     int nprocs_pof2 = opal_next_poweroftwo(comm_size);
 385     nprocs_pof2 >>= 1;
 386     int nprocs_rem = comm_size - nprocs_pof2;
 387 
 388     int vrank = -1;
 389     if (rank < 2 * nprocs_rem) {
 390         if ((rank % 2) == 0) {
 391             
 392             err = MCA_PML_CALL(send(tmpbuf, totalcount, dtype, rank + 1,
 393                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 394                                     MCA_PML_BASE_SEND_STANDARD, comm));
 395             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 396             
 397             vrank = -1;
 398         } else {
 399             
 400             err = MCA_PML_CALL(recv(tmprecv, totalcount, dtype, rank - 1,
 401                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 402                                     comm, MPI_STATUS_IGNORE));
 403             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 404             ompi_op_reduce(op, tmprecv, tmpbuf, totalcount, dtype);
 405             
 406             vrank = rank / 2;
 407         }
 408     } else {
 409         
 410         vrank = rank - nprocs_rem;
 411     }
 412 
 413     if (vrank != -1) {
 414         
 415 
 416 
 417 
 418 
 419 
 420 
 421 
 422 
 423         int send_index = 0, recv_index = 0, last_index = nprocs_pof2;
 424         for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
 425             int vpeer = vrank ^ mask;
 426             int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 427 
 428             
 429 
 430 
 431 
 432 
 433 
 434             int send_count = 0, recv_count = 0;
 435             if (vrank < vpeer) {
 436                 
 437                 send_index = recv_index + mask;
 438                 send_count = rcount * ompi_range_sum(send_index, last_index - 1, nprocs_rem - 1);
 439                 recv_count = rcount * ompi_range_sum(recv_index, send_index - 1, nprocs_rem - 1);
 440             } else {
 441                 
 442                 recv_index = send_index + mask;
 443                 send_count = rcount * ompi_range_sum(send_index, recv_index - 1, nprocs_rem - 1);
 444                 recv_count = rcount * ompi_range_sum(recv_index, last_index - 1, nprocs_rem - 1);
 445             }
 446             ptrdiff_t rdispl = rcount * ((recv_index <= nprocs_rem - 1) ?
 447                                          2 * recv_index : nprocs_rem + recv_index);
 448             ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
 449                                          2 * send_index : nprocs_rem + send_index);
 450             struct ompi_request_t *request = NULL;
 451 
 452             if (recv_count > 0) {
 453                 err = MCA_PML_CALL(irecv(tmprecv + rdispl * extent, recv_count,
 454                                          dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 455                                          comm, &request));
 456                 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 457             }
 458             if (send_count > 0) {
 459                 err = MCA_PML_CALL(send(tmpbuf + sdispl * extent, send_count,
 460                                         dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 461                                         MCA_PML_BASE_SEND_STANDARD,
 462                                         comm));
 463                 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 464             }
 465             if (recv_count > 0) {
 466                 err = ompi_request_wait(&request, MPI_STATUS_IGNORE);
 467                 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 468                 ompi_op_reduce(op, tmprecv + rdispl * extent,
 469                                tmpbuf + rdispl * extent, recv_count, dtype);
 470             }
 471             send_index = recv_index;
 472             last_index = recv_index + mask;
 473         }
 474         err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
 475                                                   tmpbuf + (ptrdiff_t)rank * rcount * extent);
 476         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 477     }
 478 
 479     
 480     if (rank < 2 * nprocs_rem) {
 481         if ((rank % 2) == 0) {
 482             
 483             err = MCA_PML_CALL(recv(rbuf, rcount, dtype, rank + 1,
 484                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK, comm,
 485                                     MPI_STATUS_IGNORE));
 486             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 487         } else {
 488             
 489             err = MCA_PML_CALL(send(tmpbuf + (ptrdiff_t)(rank - 1) * rcount * extent,
 490                                     rcount, dtype, rank - 1,
 491                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 492                                     MCA_PML_BASE_SEND_STANDARD, comm));
 493             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 494         }
 495     }
 496 
 497 cleanup_and_return:
 498     if (tmpbuf_raw)
 499         free(tmpbuf_raw);
 500     if (tmprecv_raw)
 501         free(tmprecv_raw);
 502     return err;
 503 }
 504 
 505 static int ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
 506     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 507     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 508     mca_coll_base_module_t *module);
 509 
 510 
 511 
 512 
 513 
 514 
 515 
 516 
 517 
 518 
 519 
 520 
 521 
 522 
 523 
 524 
 525 
 526 
 527 
 528 
 529 
 530 
 531 
 532 
 533 
 534 
 535 
 536 
 537 
 538 
 539 
 540 
 541 
 542 
 543 
 544 
 545 
 546 
 547 
 548 
 549 
 550 
 551 
 552 
 553 
 554 
 555 
 556 
 557 
 558 
 559 
 560 
 561 
 562 
 563 
 564 
 565 
 566 int
 567 ompi_coll_base_reduce_scatter_block_intra_butterfly(
 568     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 569     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 570     mca_coll_base_module_t *module)
 571 {
 572     char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
 573     ptrdiff_t span, gap, totalcount, extent;
 574     int err = MPI_SUCCESS;
 575     int comm_size = ompi_comm_size(comm);
 576     int rank = ompi_comm_rank(comm);
 577 
 578     OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
 579                  "coll:base:reduce_scatter_block_intra_butterfly: rank %d/%d",
 580                  rank, comm_size));
 581     if (rcount == 0 || comm_size < 2)
 582         return MPI_SUCCESS;
 583 
 584     if (!(comm_size & (comm_size - 1))) {
 585         
 586         return ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
 587                    sbuf, rbuf, rcount, dtype, op, comm, module);
 588     }
 589 
 590     totalcount = comm_size * rcount;
 591     ompi_datatype_type_extent(dtype, &extent);
 592     span = opal_datatype_span(&dtype->super, totalcount, &gap);
 593     tmpbuf[0] = malloc(span);
 594     tmpbuf[1] = malloc(span);
 595     if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
 596         err = OMPI_ERR_OUT_OF_RESOURCE;
 597         goto cleanup_and_return;
 598     }
 599     psend = tmpbuf[0] - gap;
 600     precv = tmpbuf[1] - gap;
 601 
 602     if (sbuf != MPI_IN_PLACE) {
 603         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, (char *)sbuf);
 604         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 605     } else {
 606         err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, rbuf);
 607         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 608     }
 609 
 610     
 611 
 612 
 613 
 614 
 615 
 616 
 617 
 618 
 619 
 620 
 621 
 622 
 623     
 624     int nprocs_pof2 = opal_next_poweroftwo(comm_size);
 625     nprocs_pof2 >>= 1;
 626     int nprocs_rem = comm_size - nprocs_pof2;
 627     int log2_size = opal_cube_dim(nprocs_pof2);
 628 
 629     int vrank = -1;
 630     if (rank < 2 * nprocs_rem) {
 631         if ((rank % 2) == 0) {
 632             
 633             err = MCA_PML_CALL(send(psend, totalcount, dtype, rank + 1,
 634                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 635                                     MCA_PML_BASE_SEND_STANDARD, comm));
 636             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 637             
 638             vrank = -1;
 639         } else {
 640             
 641             err = MCA_PML_CALL(recv(precv, totalcount, dtype, rank - 1,
 642                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 643                                     comm, MPI_STATUS_IGNORE));
 644             if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 645             ompi_op_reduce(op, precv, psend, totalcount, dtype);
 646             
 647             vrank = rank / 2;
 648         }
 649     } else {
 650         
 651         vrank = rank - nprocs_rem;
 652     }
 653 
 654     if (vrank != -1) {
 655         
 656 
 657 
 658 
 659 
 660 
 661 
 662 
 663 
 664 
 665 
 666 
 667         int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
 668         for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
 669             int vpeer = vrank ^ mask;
 670             int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 671 
 672             nblocks /= 2;
 673             if ((vrank & mask) == 0) {
 674                 
 675                 send_index += nblocks;
 676             } else {
 677                 
 678                 recv_index += nblocks;
 679             }
 680             int send_count = rcount * ompi_range_sum(send_index,
 681                                           send_index + nblocks - 1, nprocs_rem - 1);
 682             int recv_count = rcount * ompi_range_sum(recv_index,
 683                                           recv_index + nblocks - 1, nprocs_rem - 1);
 684             ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
 685                                          2 * send_index : nprocs_rem + send_index);
 686             ptrdiff_t rdispl = rcount * ((recv_index <= nprocs_rem - 1) ?
 687                                          2 * recv_index : nprocs_rem + recv_index);
 688 
 689             err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
 690                                           dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 691                                           precv + (ptrdiff_t)rdispl * extent, recv_count,
 692                                           dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 693                                           comm, MPI_STATUS_IGNORE, rank);
 694             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 695 
 696             if (vrank < vpeer) {
 697                 
 698                 ompi_op_reduce(op, psend + (ptrdiff_t)rdispl * extent,
 699                                precv + (ptrdiff_t)rdispl * extent, recv_count, dtype);
 700                 char *p = psend;
 701                 psend = precv;
 702                 precv = p;
 703             } else {
 704                 
 705                 ompi_op_reduce(op, precv + (ptrdiff_t)rdispl * extent,
 706                                psend + (ptrdiff_t)rdispl * extent, recv_count, dtype);
 707             }
 708             send_index = recv_index;
 709         }
 710         
 711 
 712 
 713 
 714         int vpeer = ompi_mirror_perm(vrank, log2_size);
 715         int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 716 
 717         if (vpeer < nprocs_rem) {
 718             
 719 
 720 
 721 
 722             ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
 723                                          2 * send_index : nprocs_rem + send_index);
 724             err = MCA_PML_CALL(send(psend + (ptrdiff_t)sdispl * extent,
 725                                     rcount, dtype, peer - 1,
 726                                     MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 727                                     MCA_PML_BASE_SEND_STANDARD, comm));
 728             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 729         }
 730 
 731         
 732         ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
 733                                       2 * send_index : nprocs_rem + send_index);
 734         
 735         if (vpeer < nprocs_rem)
 736             sdispl += rcount;
 737         if (vpeer != vrank) {
 738             err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, rcount,
 739                                           dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 740                                           rbuf, rcount, dtype, peer,
 741                                           MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 742                                           comm, MPI_STATUS_IGNORE, rank);
 743             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 744         } else {
 745             err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
 746                                                       psend + (ptrdiff_t)sdispl * extent);
 747             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 748         }
 749 
 750     } else {
 751         
 752         int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
 753         int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
 754         err = MCA_PML_CALL(recv(rbuf, rcount, dtype, peer,
 755                                 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK, comm,
 756                                 MPI_STATUS_IGNORE));
 757         if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
 758     }
 759 
 760 cleanup_and_return:
 761     if (tmpbuf[0])
 762         free(tmpbuf[0]);
 763     if (tmpbuf[1])
 764         free(tmpbuf[1]);
 765     return err;
 766 }
 767 
 768 
 769 
 770 
 771 
 772 
 773 
 774 
 775 
 776 
 777 
 778 
 779 
 780 
 781 
 782 
 783 
 784 
 785 
 786 
 787 
 788 
 789 
 790 
 791 
 792 
 793 
 794 
 795 
 796 
 797 
 798 
 799 
 800 
 801 
 802 
 803 
 804 
 805 
 806 
 807 
 808 
 809 static int
 810 ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
 811     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
 812     struct ompi_op_t *op, struct ompi_communicator_t *comm,
 813     mca_coll_base_module_t *module)
 814 {
 815     char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
 816     ptrdiff_t span, gap, totalcount, extent;
 817     int err = MPI_SUCCESS;
 818     int comm_size = ompi_comm_size(comm);
 819     int rank = ompi_comm_rank(comm);
 820 
 821     if (rcount == 0 || comm_size < 2)
 822         return MPI_SUCCESS;
 823 
 824     totalcount = comm_size * rcount;
 825     ompi_datatype_type_extent(dtype, &extent);
 826     span = opal_datatype_span(&dtype->super, totalcount, &gap);
 827     tmpbuf[0] = malloc(span);
 828     tmpbuf[1] = malloc(span);
 829     if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
 830         err = OMPI_ERR_OUT_OF_RESOURCE;
 831         goto cleanup_and_return;
 832     }
 833     psend = tmpbuf[0] - gap;
 834     precv = tmpbuf[1] - gap;
 835 
 836     
 837     int log2_comm_size = opal_cube_dim(comm_size);
 838     char *pdata = (sbuf != MPI_IN_PLACE) ? (char *)sbuf : rbuf;
 839     for (int i = 0; i < comm_size; i++) {
 840         char *src = pdata + (ptrdiff_t)i * extent * rcount;
 841         char *dst = psend + (ptrdiff_t)ompi_mirror_perm(i, log2_comm_size) * extent * rcount;
 842         err = ompi_datatype_copy_content_same_ddt(dtype, rcount, dst, src);
 843         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 844     }
 845 
 846     int nblocks = totalcount, send_index = 0, recv_index = 0;
 847     for (int mask = 1; mask < comm_size; mask <<= 1) {
 848         int peer = rank ^ mask;
 849         nblocks /= 2;
 850 
 851         if ((rank & mask) == 0) {
 852             
 853             send_index += nblocks;
 854         } else {
 855             
 856             recv_index += nblocks;
 857         }
 858         err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)send_index * extent,
 859                                       nblocks, dtype, peer,
 860                                       MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 861                                       precv + (ptrdiff_t)recv_index * extent,
 862                                       nblocks, dtype, peer,
 863                                       MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
 864                                       comm, MPI_STATUS_IGNORE, rank);
 865         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 866 
 867         if (rank < peer) {
 868             
 869             ompi_op_reduce(op, psend + (ptrdiff_t)recv_index * extent,
 870                            precv + (ptrdiff_t)recv_index * extent, nblocks, dtype);
 871             char *p = psend;
 872             psend = precv;
 873             precv = p;
 874         } else {
 875             
 876             ompi_op_reduce(op, precv + (ptrdiff_t)recv_index * extent,
 877                            psend + (ptrdiff_t)recv_index * extent, nblocks, dtype);
 878         }
 879         send_index = recv_index;
 880     }
 881     
 882     err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
 883                                               psend + (ptrdiff_t)recv_index * extent);
 884     if (MPI_SUCCESS != err) { goto cleanup_and_return; }
 885 
 886 cleanup_and_return:
 887     if (tmpbuf[0])
 888         free(tmpbuf[0]);
 889     if (tmpbuf[1])
 890         free(tmpbuf[1]);
 891     return err;
 892 }