This source file includes the following definitions:
- ompi_coll_base_reduce_scatter_block_basic_linear
- ompi_coll_base_reduce_scatter_block_intra_recursivedoubling
- ompi_range_sum
- ompi_coll_base_reduce_scatter_block_intra_recursivehalving
- ompi_coll_base_reduce_scatter_block_intra_butterfly
- ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include "ompi_config.h"
28
29 #include "mpi.h"
30 #include "opal/util/bit_ops.h"
31 #include "ompi/constants.h"
32 #include "ompi/datatype/ompi_datatype.h"
33 #include "ompi/communicator/communicator.h"
34 #include "ompi/mca/coll/coll.h"
35 #include "ompi/mca/coll/basic/coll_basic.h"
36 #include "ompi/mca/pml/pml.h"
37 #include "ompi/op/op.h"
38 #include "coll_tags.h"
39 #include "coll_base_functions.h"
40 #include "coll_base_topo.h"
41 #include "coll_base_util.h"
42
43
44
45
46
47
48
49
50
51
52
53
54 int
55 ompi_coll_base_reduce_scatter_block_basic_linear(const void *sbuf, void *rbuf, int rcount,
56 struct ompi_datatype_t *dtype,
57 struct ompi_op_t *op,
58 struct ompi_communicator_t *comm,
59 mca_coll_base_module_t *module)
60 {
61 int rank, size, count, err = OMPI_SUCCESS;
62 ptrdiff_t gap, span;
63 char *recv_buf = NULL, *recv_buf_free = NULL;
64
65
66 rank = ompi_comm_rank(comm);
67 size = ompi_comm_size(comm);
68
69
70 count = rcount * size;
71 if (0 == count) {
72 return OMPI_SUCCESS;
73 }
74
75
76 span = opal_datatype_span(&dtype->super, count, &gap);
77
78
79 if (MPI_IN_PLACE == sbuf) {
80 sbuf = rbuf;
81 }
82
83 if (0 == rank) {
84
85
86 recv_buf_free = (char*) malloc(span);
87 if (NULL == recv_buf_free) {
88 err = OMPI_ERR_OUT_OF_RESOURCE;
89 goto cleanup;
90 }
91 recv_buf = recv_buf_free - gap;
92 }
93
94
95 err =
96 comm->c_coll->coll_reduce(sbuf, recv_buf, count, dtype, op, 0,
97 comm, comm->c_coll->coll_reduce_module);
98
99
100 if (MPI_SUCCESS == err) {
101 err = comm->c_coll->coll_scatter(recv_buf, rcount, dtype,
102 rbuf, rcount, dtype, 0,
103 comm, comm->c_coll->coll_scatter_module);
104 }
105
106 cleanup:
107 if (NULL != recv_buf_free) free(recv_buf_free);
108
109 return err;
110 }
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 int
128 ompi_coll_base_reduce_scatter_block_intra_recursivedoubling(
129 const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
130 struct ompi_op_t *op, struct ompi_communicator_t *comm,
131 mca_coll_base_module_t *module)
132 {
133 struct ompi_datatype_t *dtypesend = NULL, *dtyperecv = NULL;
134 char *tmprecv_raw = NULL, *tmpbuf_raw = NULL, *tmprecv, *tmpbuf;
135 ptrdiff_t span, gap, totalcount, extent;
136 int blocklens[2], displs[2];
137 int err = MPI_SUCCESS;
138 int comm_size = ompi_comm_size(comm);
139 int rank = ompi_comm_rank(comm);
140
141 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
142 "coll:base:reduce_scatter_block_intra_recursivedoubling: rank %d/%d",
143 rank, comm_size));
144 if (rcount == 0)
145 return MPI_SUCCESS;
146 if (comm_size < 2)
147 return MPI_SUCCESS;
148
149 totalcount = comm_size * rcount;
150 ompi_datatype_type_extent(dtype, &extent);
151 span = opal_datatype_span(&dtype->super, totalcount, &gap);
152 tmpbuf_raw = malloc(span);
153 tmprecv_raw = malloc(span);
154 if (NULL == tmpbuf_raw || NULL == tmprecv_raw) {
155 err = OMPI_ERR_OUT_OF_RESOURCE;
156 goto cleanup_and_return;
157 }
158 tmpbuf = tmpbuf_raw - gap;
159 tmprecv = tmprecv_raw - gap;
160
161 if (sbuf != MPI_IN_PLACE) {
162 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, (char *)sbuf);
163 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
164 } else {
165 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, rbuf);
166 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
167 }
168 int is_commutative = ompi_op_is_commute(op);
169
170
171 int rdoubling_step = 0;
172 for (int mask = 1; mask < comm_size; mask <<= 1) {
173 int remote = rank ^ mask;
174 int cur_tree_root = ompi_rounddown(rank, mask);
175 int remote_tree_root = ompi_rounddown(remote, mask);
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191 blocklens[0] = rcount * cur_tree_root;
192 blocklens[1] = (comm_size >= cur_tree_root + mask) ?
193 rcount * (comm_size - cur_tree_root - mask) : 0;
194 displs[0] = 0;
195 displs[1] = comm_size * rcount - blocklens[1];
196 err = ompi_datatype_create_indexed(2, blocklens, displs, dtype, &dtypesend);
197 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
198 err = ompi_datatype_commit(&dtypesend);
199 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
200
201
202 blocklens[0] = rcount * remote_tree_root;
203 blocklens[1] = (comm_size >= remote_tree_root + mask) ?
204 rcount * (comm_size - remote_tree_root - mask) : 0;
205 displs[0] = 0;
206 displs[1] = comm_size * rcount - blocklens[1];
207 err = ompi_datatype_create_indexed(2, blocklens, displs, dtype, &dtyperecv);
208 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
209 err = ompi_datatype_commit(&dtyperecv);
210 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
211
212 int is_block_received = 0;
213 if (remote < comm_size) {
214 err = ompi_coll_base_sendrecv(tmpbuf, 1, dtypesend, remote,
215 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
216 tmprecv, 1, dtyperecv, remote,
217 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
218 comm, MPI_STATUS_IGNORE, rank);
219 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
220 is_block_received = 1;
221 }
222
223
224
225
226
227 if (remote_tree_root + mask > comm_size) {
228
229
230
231
232 int nprocs_alldata = comm_size - cur_tree_root - mask;
233 for (int rhalving_mask = mask >> 1; rhalving_mask > 0; rhalving_mask >>= 1) {
234 remote = rank ^ rhalving_mask;
235 int tree_root = ompi_rounddown(rank, rhalving_mask << 1);
236
237
238
239
240
241 if ((remote > rank) && (rank < tree_root + nprocs_alldata)
242 && (remote >= tree_root + nprocs_alldata)) {
243 err = MCA_PML_CALL(send(tmprecv, 1, dtyperecv, remote,
244 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
245 MCA_PML_BASE_SEND_STANDARD, comm));
246 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
247
248 } else if ((remote < rank) && (remote < tree_root + nprocs_alldata) &&
249 (rank >= tree_root + nprocs_alldata)) {
250 err = MCA_PML_CALL(recv(tmprecv, 1, dtyperecv, remote,
251 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
252 comm, MPI_STATUS_IGNORE));
253 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
254 is_block_received = 1;
255 }
256 }
257 }
258
259 if (is_block_received) {
260
261 if (is_commutative || (remote_tree_root < cur_tree_root)) {
262 ompi_op_reduce(op, tmprecv, tmpbuf, blocklens[0], dtype);
263 ompi_op_reduce(op, tmprecv + (ptrdiff_t)displs[1] * extent,
264 tmpbuf + (ptrdiff_t)displs[1] * extent,
265 blocklens[1], dtype);
266 } else {
267 ompi_op_reduce(op, tmpbuf, tmprecv, blocklens[0], dtype);
268 ompi_op_reduce(op, tmpbuf + (ptrdiff_t)displs[1] * extent,
269 tmprecv + (ptrdiff_t)displs[1] * extent,
270 blocklens[1], dtype);
271 err = ompi_datatype_copy_content_same_ddt(dtyperecv, 1,
272 tmpbuf, tmprecv);
273 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
274 }
275 }
276 rdoubling_step++;
277 err = ompi_datatype_destroy(&dtypesend);
278 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
279 err = ompi_datatype_destroy(&dtyperecv);
280 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
281 }
282 err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
283 tmpbuf + (ptrdiff_t)rank * rcount * extent);
284 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
285
286 cleanup_and_return:
287 if (dtypesend)
288 ompi_datatype_destroy(&dtypesend);
289 if (dtyperecv)
290 ompi_datatype_destroy(&dtyperecv);
291 if (tmpbuf_raw)
292 free(tmpbuf_raw);
293 if (tmprecv_raw)
294 free(tmprecv_raw);
295 return err;
296 }
297
298
299
300
301
302
/*
 * ompi_range_sum: number of real (rcount-sized) blocks covered by the
 * virtual blocks a..b (inclusive), where every virtual block whose index is
 * <= r stands for two real blocks and every other virtual block for one.
 * All callers in this file pass r = nprocs_rem - 1.
 */
static int ompi_range_sum(int a, int b, int r)
{
    int width = b - a + 1;   /* virtual blocks in [a, b] */
    int doubled = 0;         /* how many of them have index <= r */

    if (r >= a)
        doubled = ((r < b) ? r : b) - a + 1;

    return width + doubled;
}
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325 int
326 ompi_coll_base_reduce_scatter_block_intra_recursivehalving(
327 const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
328 struct ompi_op_t *op, struct ompi_communicator_t *comm,
329 mca_coll_base_module_t *module)
330 {
331 char *tmprecv_raw = NULL, *tmpbuf_raw = NULL, *tmprecv, *tmpbuf;
332 ptrdiff_t span, gap, totalcount, extent;
333 int err = MPI_SUCCESS;
334 int comm_size = ompi_comm_size(comm);
335 int rank = ompi_comm_rank(comm);
336
337 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
338 "coll:base:reduce_scatter_block_intra_recursivehalving: rank %d/%d",
339 rank, comm_size));
340 if (rcount == 0 || comm_size < 2)
341 return MPI_SUCCESS;
342
343 if (!ompi_op_is_commute(op)) {
344 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
345 "coll:base:reduce_scatter_block_intra_recursivehalving: rank %d/%d "
346 "switching to basic reduce_scatter_block", rank, comm_size));
347 return ompi_coll_base_reduce_scatter_block_basic_linear(sbuf, rbuf, rcount, dtype,
348 op, comm, module);
349 }
350 totalcount = comm_size * rcount;
351 ompi_datatype_type_extent(dtype, &extent);
352 span = opal_datatype_span(&dtype->super, totalcount, &gap);
353 tmpbuf_raw = malloc(span);
354 tmprecv_raw = malloc(span);
355 if (NULL == tmpbuf_raw || NULL == tmprecv_raw) {
356 err = OMPI_ERR_OUT_OF_RESOURCE;
357 goto cleanup_and_return;
358 }
359 tmpbuf = tmpbuf_raw - gap;
360 tmprecv = tmprecv_raw - gap;
361
362 if (sbuf != MPI_IN_PLACE) {
363 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, (char *)sbuf);
364 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
365 } else {
366 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, tmpbuf, rbuf);
367 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
368 }
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384 int nprocs_pof2 = opal_next_poweroftwo(comm_size);
385 nprocs_pof2 >>= 1;
386 int nprocs_rem = comm_size - nprocs_pof2;
387
388 int vrank = -1;
389 if (rank < 2 * nprocs_rem) {
390 if ((rank % 2) == 0) {
391
392 err = MCA_PML_CALL(send(tmpbuf, totalcount, dtype, rank + 1,
393 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
394 MCA_PML_BASE_SEND_STANDARD, comm));
395 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
396
397 vrank = -1;
398 } else {
399
400 err = MCA_PML_CALL(recv(tmprecv, totalcount, dtype, rank - 1,
401 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
402 comm, MPI_STATUS_IGNORE));
403 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
404 ompi_op_reduce(op, tmprecv, tmpbuf, totalcount, dtype);
405
406 vrank = rank / 2;
407 }
408 } else {
409
410 vrank = rank - nprocs_rem;
411 }
412
413 if (vrank != -1) {
414
415
416
417
418
419
420
421
422
423 int send_index = 0, recv_index = 0, last_index = nprocs_pof2;
424 for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
425 int vpeer = vrank ^ mask;
426 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
427
428
429
430
431
432
433
434 int send_count = 0, recv_count = 0;
435 if (vrank < vpeer) {
436
437 send_index = recv_index + mask;
438 send_count = rcount * ompi_range_sum(send_index, last_index - 1, nprocs_rem - 1);
439 recv_count = rcount * ompi_range_sum(recv_index, send_index - 1, nprocs_rem - 1);
440 } else {
441
442 recv_index = send_index + mask;
443 send_count = rcount * ompi_range_sum(send_index, recv_index - 1, nprocs_rem - 1);
444 recv_count = rcount * ompi_range_sum(recv_index, last_index - 1, nprocs_rem - 1);
445 }
446 ptrdiff_t rdispl = rcount * ((recv_index <= nprocs_rem - 1) ?
447 2 * recv_index : nprocs_rem + recv_index);
448 ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
449 2 * send_index : nprocs_rem + send_index);
450 struct ompi_request_t *request = NULL;
451
452 if (recv_count > 0) {
453 err = MCA_PML_CALL(irecv(tmprecv + rdispl * extent, recv_count,
454 dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
455 comm, &request));
456 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
457 }
458 if (send_count > 0) {
459 err = MCA_PML_CALL(send(tmpbuf + sdispl * extent, send_count,
460 dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
461 MCA_PML_BASE_SEND_STANDARD,
462 comm));
463 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
464 }
465 if (recv_count > 0) {
466 err = ompi_request_wait(&request, MPI_STATUS_IGNORE);
467 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
468 ompi_op_reduce(op, tmprecv + rdispl * extent,
469 tmpbuf + rdispl * extent, recv_count, dtype);
470 }
471 send_index = recv_index;
472 last_index = recv_index + mask;
473 }
474 err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
475 tmpbuf + (ptrdiff_t)rank * rcount * extent);
476 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
477 }
478
479
480 if (rank < 2 * nprocs_rem) {
481 if ((rank % 2) == 0) {
482
483 err = MCA_PML_CALL(recv(rbuf, rcount, dtype, rank + 1,
484 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK, comm,
485 MPI_STATUS_IGNORE));
486 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
487 } else {
488
489 err = MCA_PML_CALL(send(tmpbuf + (ptrdiff_t)(rank - 1) * rcount * extent,
490 rcount, dtype, rank - 1,
491 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
492 MCA_PML_BASE_SEND_STANDARD, comm));
493 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
494 }
495 }
496
497 cleanup_and_return:
498 if (tmpbuf_raw)
499 free(tmpbuf_raw);
500 if (tmprecv_raw)
501 free(tmprecv_raw);
502 return err;
503 }
504
505 static int ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
506 const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
507 struct ompi_op_t *op, struct ompi_communicator_t *comm,
508 mca_coll_base_module_t *module);
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566 int
567 ompi_coll_base_reduce_scatter_block_intra_butterfly(
568 const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
569 struct ompi_op_t *op, struct ompi_communicator_t *comm,
570 mca_coll_base_module_t *module)
571 {
572 char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
573 ptrdiff_t span, gap, totalcount, extent;
574 int err = MPI_SUCCESS;
575 int comm_size = ompi_comm_size(comm);
576 int rank = ompi_comm_rank(comm);
577
578 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
579 "coll:base:reduce_scatter_block_intra_butterfly: rank %d/%d",
580 rank, comm_size));
581 if (rcount == 0 || comm_size < 2)
582 return MPI_SUCCESS;
583
584 if (!(comm_size & (comm_size - 1))) {
585
586 return ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
587 sbuf, rbuf, rcount, dtype, op, comm, module);
588 }
589
590 totalcount = comm_size * rcount;
591 ompi_datatype_type_extent(dtype, &extent);
592 span = opal_datatype_span(&dtype->super, totalcount, &gap);
593 tmpbuf[0] = malloc(span);
594 tmpbuf[1] = malloc(span);
595 if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
596 err = OMPI_ERR_OUT_OF_RESOURCE;
597 goto cleanup_and_return;
598 }
599 psend = tmpbuf[0] - gap;
600 precv = tmpbuf[1] - gap;
601
602 if (sbuf != MPI_IN_PLACE) {
603 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, (char *)sbuf);
604 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
605 } else {
606 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, rbuf);
607 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
608 }
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624 int nprocs_pof2 = opal_next_poweroftwo(comm_size);
625 nprocs_pof2 >>= 1;
626 int nprocs_rem = comm_size - nprocs_pof2;
627 int log2_size = opal_cube_dim(nprocs_pof2);
628
629 int vrank = -1;
630 if (rank < 2 * nprocs_rem) {
631 if ((rank % 2) == 0) {
632
633 err = MCA_PML_CALL(send(psend, totalcount, dtype, rank + 1,
634 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
635 MCA_PML_BASE_SEND_STANDARD, comm));
636 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
637
638 vrank = -1;
639 } else {
640
641 err = MCA_PML_CALL(recv(precv, totalcount, dtype, rank - 1,
642 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
643 comm, MPI_STATUS_IGNORE));
644 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
645 ompi_op_reduce(op, precv, psend, totalcount, dtype);
646
647 vrank = rank / 2;
648 }
649 } else {
650
651 vrank = rank - nprocs_rem;
652 }
653
654 if (vrank != -1) {
655
656
657
658
659
660
661
662
663
664
665
666
667 int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
668 for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
669 int vpeer = vrank ^ mask;
670 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
671
672 nblocks /= 2;
673 if ((vrank & mask) == 0) {
674
675 send_index += nblocks;
676 } else {
677
678 recv_index += nblocks;
679 }
680 int send_count = rcount * ompi_range_sum(send_index,
681 send_index + nblocks - 1, nprocs_rem - 1);
682 int recv_count = rcount * ompi_range_sum(recv_index,
683 recv_index + nblocks - 1, nprocs_rem - 1);
684 ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
685 2 * send_index : nprocs_rem + send_index);
686 ptrdiff_t rdispl = rcount * ((recv_index <= nprocs_rem - 1) ?
687 2 * recv_index : nprocs_rem + recv_index);
688
689 err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
690 dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
691 precv + (ptrdiff_t)rdispl * extent, recv_count,
692 dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
693 comm, MPI_STATUS_IGNORE, rank);
694 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
695
696 if (vrank < vpeer) {
697
698 ompi_op_reduce(op, psend + (ptrdiff_t)rdispl * extent,
699 precv + (ptrdiff_t)rdispl * extent, recv_count, dtype);
700 char *p = psend;
701 psend = precv;
702 precv = p;
703 } else {
704
705 ompi_op_reduce(op, precv + (ptrdiff_t)rdispl * extent,
706 psend + (ptrdiff_t)rdispl * extent, recv_count, dtype);
707 }
708 send_index = recv_index;
709 }
710
711
712
713
714 int vpeer = ompi_mirror_perm(vrank, log2_size);
715 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
716
717 if (vpeer < nprocs_rem) {
718
719
720
721
722 ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
723 2 * send_index : nprocs_rem + send_index);
724 err = MCA_PML_CALL(send(psend + (ptrdiff_t)sdispl * extent,
725 rcount, dtype, peer - 1,
726 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
727 MCA_PML_BASE_SEND_STANDARD, comm));
728 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
729 }
730
731
732 ptrdiff_t sdispl = rcount * ((send_index <= nprocs_rem - 1) ?
733 2 * send_index : nprocs_rem + send_index);
734
735 if (vpeer < nprocs_rem)
736 sdispl += rcount;
737 if (vpeer != vrank) {
738 err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, rcount,
739 dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
740 rbuf, rcount, dtype, peer,
741 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
742 comm, MPI_STATUS_IGNORE, rank);
743 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
744 } else {
745 err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
746 psend + (ptrdiff_t)sdispl * extent);
747 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
748 }
749
750 } else {
751
752 int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
753 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
754 err = MCA_PML_CALL(recv(rbuf, rcount, dtype, peer,
755 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK, comm,
756 MPI_STATUS_IGNORE));
757 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
758 }
759
760 cleanup_and_return:
761 if (tmpbuf[0])
762 free(tmpbuf[0]);
763 if (tmpbuf[1])
764 free(tmpbuf[1]);
765 return err;
766 }
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809 static int
810 ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
811 const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
812 struct ompi_op_t *op, struct ompi_communicator_t *comm,
813 mca_coll_base_module_t *module)
814 {
815 char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
816 ptrdiff_t span, gap, totalcount, extent;
817 int err = MPI_SUCCESS;
818 int comm_size = ompi_comm_size(comm);
819 int rank = ompi_comm_rank(comm);
820
821 if (rcount == 0 || comm_size < 2)
822 return MPI_SUCCESS;
823
824 totalcount = comm_size * rcount;
825 ompi_datatype_type_extent(dtype, &extent);
826 span = opal_datatype_span(&dtype->super, totalcount, &gap);
827 tmpbuf[0] = malloc(span);
828 tmpbuf[1] = malloc(span);
829 if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
830 err = OMPI_ERR_OUT_OF_RESOURCE;
831 goto cleanup_and_return;
832 }
833 psend = tmpbuf[0] - gap;
834 precv = tmpbuf[1] - gap;
835
836
837 int log2_comm_size = opal_cube_dim(comm_size);
838 char *pdata = (sbuf != MPI_IN_PLACE) ? (char *)sbuf : rbuf;
839 for (int i = 0; i < comm_size; i++) {
840 char *src = pdata + (ptrdiff_t)i * extent * rcount;
841 char *dst = psend + (ptrdiff_t)ompi_mirror_perm(i, log2_comm_size) * extent * rcount;
842 err = ompi_datatype_copy_content_same_ddt(dtype, rcount, dst, src);
843 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
844 }
845
846 int nblocks = totalcount, send_index = 0, recv_index = 0;
847 for (int mask = 1; mask < comm_size; mask <<= 1) {
848 int peer = rank ^ mask;
849 nblocks /= 2;
850
851 if ((rank & mask) == 0) {
852
853 send_index += nblocks;
854 } else {
855
856 recv_index += nblocks;
857 }
858 err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)send_index * extent,
859 nblocks, dtype, peer,
860 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
861 precv + (ptrdiff_t)recv_index * extent,
862 nblocks, dtype, peer,
863 MCA_COLL_BASE_TAG_REDUCE_SCATTER_BLOCK,
864 comm, MPI_STATUS_IGNORE, rank);
865 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
866
867 if (rank < peer) {
868
869 ompi_op_reduce(op, psend + (ptrdiff_t)recv_index * extent,
870 precv + (ptrdiff_t)recv_index * extent, nblocks, dtype);
871 char *p = psend;
872 psend = precv;
873 precv = p;
874 } else {
875
876 ompi_op_reduce(op, precv + (ptrdiff_t)recv_index * extent,
877 psend + (ptrdiff_t)recv_index * extent, nblocks, dtype);
878 }
879 send_index = recv_index;
880 }
881
882 err = ompi_datatype_copy_content_same_ddt(dtype, rcount, rbuf,
883 psend + (ptrdiff_t)recv_index * extent);
884 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
885
886 cleanup_and_return:
887 if (tmpbuf[0])
888 free(tmpbuf[0]);
889 if (tmpbuf[1])
890 free(tmpbuf[1]);
891 return err;
892 }