This source file includes the following definitions.
- ompi_coll_base_reduce_scatter_intra_nonoverlapping
- ompi_coll_base_reduce_scatter_intra_basic_recursivehalving
- ompi_coll_base_reduce_scatter_intra_ring
- ompi_sum_counts
- ompi_coll_base_reduce_scatter_intra_butterfly
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "ompi_config.h"
27
28 #include "mpi.h"
29 #include "opal/util/bit_ops.h"
30 #include "ompi/constants.h"
31 #include "ompi/datatype/ompi_datatype.h"
32 #include "ompi/communicator/communicator.h"
33 #include "ompi/mca/coll/coll.h"
34 #include "ompi/mca/coll/base/coll_tags.h"
35 #include "ompi/mca/pml/pml.h"
36 #include "ompi/op/op.h"
37 #include "ompi/mca/coll/base/coll_base_functions.h"
38 #include "coll_base_topo.h"
39 #include "coll_base_util.h"
40
41
42
43
44
45
46
47 int ompi_coll_base_reduce_scatter_intra_nonoverlapping(const void *sbuf, void *rbuf,
48 const int *rcounts,
49 struct ompi_datatype_t *dtype,
50 struct ompi_op_t *op,
51 struct ompi_communicator_t *comm,
52 mca_coll_base_module_t *module)
53 {
54 int err, i, rank, size, total_count, *displs = NULL;
55 const int root = 0;
56 char *tmprbuf = NULL, *tmprbuf_free = NULL;
57
58 rank = ompi_comm_rank(comm);
59 size = ompi_comm_size(comm);
60
61 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_scatter_intra_nonoverlapping, rank %d", rank));
62
63 for (i = 0, total_count = 0; i < size; i++) { total_count += rcounts[i]; }
64
65
66 tmprbuf = (char*) rbuf;
67 if (MPI_IN_PLACE == sbuf) {
68
69 if (root == rank) {
70 err = comm->c_coll->coll_reduce (MPI_IN_PLACE, tmprbuf, total_count,
71 dtype, op, root, comm, comm->c_coll->coll_reduce_module);
72 } else {
73 err = comm->c_coll->coll_reduce(tmprbuf, NULL, total_count,
74 dtype, op, root, comm, comm->c_coll->coll_reduce_module);
75 }
76 } else {
77 if (root == rank) {
78
79
80 ptrdiff_t dsize, gap = 0;
81 dsize = opal_datatype_span(&dtype->super, total_count, &gap);
82
83 tmprbuf_free = (char*) malloc(dsize);
84 tmprbuf = tmprbuf_free - gap;
85 }
86 err = comm->c_coll->coll_reduce (sbuf, tmprbuf, total_count,
87 dtype, op, root, comm, comm->c_coll->coll_reduce_module);
88 }
89 if (MPI_SUCCESS != err) {
90 if (NULL != tmprbuf_free) free(tmprbuf_free);
91 return err;
92 }
93
94 displs = (int*) malloc(size * sizeof(int));
95 displs[0] = 0;
96 for (i = 1; i < size; i++) {
97 displs[i] = displs[i-1] + rcounts[i-1];
98 }
99 if (MPI_IN_PLACE == sbuf && root == rank) {
100 err = comm->c_coll->coll_scatterv (tmprbuf, rcounts, displs, dtype,
101 MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
102 root, comm, comm->c_coll->coll_scatterv_module);
103 } else {
104 err = comm->c_coll->coll_scatterv (tmprbuf, rcounts, displs, dtype,
105 rbuf, rcounts[rank], dtype,
106 root, comm, comm->c_coll->coll_scatterv_module);
107 }
108 free(displs);
109 if (NULL != tmprbuf_free) free(tmprbuf_free);
110
111 return err;
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 int
132 ompi_coll_base_reduce_scatter_intra_basic_recursivehalving( const void *sbuf,
133 void *rbuf,
134 const int *rcounts,
135 struct ompi_datatype_t *dtype,
136 struct ompi_op_t *op,
137 struct ompi_communicator_t *comm,
138 mca_coll_base_module_t *module)
139 {
140 int i, rank, size, count, err = OMPI_SUCCESS;
141 int tmp_size, remain = 0, tmp_rank, *disps = NULL;
142 ptrdiff_t extent, buf_size, gap = 0;
143 char *recv_buf = NULL, *recv_buf_free = NULL;
144 char *result_buf = NULL, *result_buf_free = NULL;
145
146
147 rank = ompi_comm_rank(comm);
148 size = ompi_comm_size(comm);
149
150 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_scatter_intra_basic_recursivehalving, rank %d", rank));
151
152
153 disps = (int*) malloc(sizeof(int) * size);
154 if (NULL == disps) return OMPI_ERR_OUT_OF_RESOURCE;
155
156 disps[0] = 0;
157 for (i = 0; i < (size - 1); ++i) {
158 disps[i + 1] = disps[i] + rcounts[i];
159 }
160 count = disps[size - 1] + rcounts[size - 1];
161
162
163 if (0 == count) {
164 free(disps);
165 return OMPI_SUCCESS;
166 }
167
168
169 ompi_datatype_type_extent(dtype, &extent);
170 buf_size = opal_datatype_span(&dtype->super, count, &gap);
171
172
173 if (MPI_IN_PLACE == sbuf) {
174 sbuf = rbuf;
175 }
176
177
178 recv_buf_free = (char*) malloc(buf_size);
179 recv_buf = recv_buf_free - gap;
180 if (NULL == recv_buf_free) {
181 err = OMPI_ERR_OUT_OF_RESOURCE;
182 goto cleanup;
183 }
184
185
186 result_buf_free = (char*) malloc(buf_size);
187 result_buf = result_buf_free - gap;
188
189
190 err = ompi_datatype_sndrcv(sbuf, count, dtype, result_buf, count, dtype);
191 if (OMPI_SUCCESS != err) goto cleanup;
192
193
194
195
196 tmp_size = opal_next_poweroftwo (size);
197 tmp_size >>= 1;
198 remain = size - tmp_size;
199
200
201
202
203 if (rank < 2 * remain) {
204 if ((rank & 1) == 0) {
205 err = MCA_PML_CALL(send(result_buf, count, dtype, rank + 1,
206 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
207 MCA_PML_BASE_SEND_STANDARD,
208 comm));
209 if (OMPI_SUCCESS != err) goto cleanup;
210
211
212 tmp_rank = -1;
213 } else {
214 err = MCA_PML_CALL(recv(recv_buf, count, dtype, rank - 1,
215 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
216 comm, MPI_STATUS_IGNORE));
217
218
219 ompi_op_reduce(op, recv_buf, result_buf, count, dtype);
220
221
222 tmp_rank = rank / 2;
223 }
224 } else {
225
226
227 tmp_rank = rank - remain;
228 }
229
230
231
232 if (tmp_rank >= 0) {
233 int *tmp_disps = NULL, *tmp_rcounts = NULL;
234 int mask, send_index, recv_index, last_index;
235
236
237
238
239 tmp_rcounts = (int*) malloc(tmp_size * sizeof(int));
240 if (NULL == tmp_rcounts) {
241 err = OMPI_ERR_OUT_OF_RESOURCE;
242 goto cleanup;
243 }
244 tmp_disps = (int*) malloc(tmp_size * sizeof(int));
245 if (NULL == tmp_disps) {
246 free(tmp_rcounts);
247 err = OMPI_ERR_OUT_OF_RESOURCE;
248 goto cleanup;
249 }
250
251 for (i = 0 ; i < tmp_size ; ++i) {
252 if (i < remain) {
253
254 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
255 } else {
256 tmp_rcounts[i] = rcounts[i + remain];
257 }
258 }
259
260 tmp_disps[0] = 0;
261 for (i = 0; i < tmp_size - 1; ++i) {
262 tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
263 }
264
265
266
267
268
269 mask = tmp_size >> 1;
270 send_index = recv_index = 0;
271 last_index = tmp_size;
272 while (mask > 0) {
273 int tmp_peer, peer, send_count, recv_count;
274 struct ompi_request_t *request;
275
276 tmp_peer = tmp_rank ^ mask;
277 peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
278
279
280 send_count = recv_count = 0;
281 if (tmp_rank < tmp_peer) {
282 send_index = recv_index + mask;
283 for (i = send_index ; i < last_index ; ++i) {
284 send_count += tmp_rcounts[i];
285 }
286 for (i = recv_index ; i < send_index ; ++i) {
287 recv_count += tmp_rcounts[i];
288 }
289 } else {
290 recv_index = send_index + mask;
291 for (i = send_index ; i < recv_index ; ++i) {
292 send_count += tmp_rcounts[i];
293 }
294 for (i = recv_index ; i < last_index ; ++i) {
295 recv_count += tmp_rcounts[i];
296 }
297 }
298
299
300
301 if (recv_count > 0) {
302 err = MCA_PML_CALL(irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
303 recv_count, dtype, peer,
304 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
305 comm, &request));
306 if (OMPI_SUCCESS != err) {
307 free(tmp_rcounts);
308 free(tmp_disps);
309 goto cleanup;
310 }
311 }
312 if (send_count > 0) {
313 err = MCA_PML_CALL(send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
314 send_count, dtype, peer,
315 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
316 MCA_PML_BASE_SEND_STANDARD,
317 comm));
318 if (OMPI_SUCCESS != err) {
319 free(tmp_rcounts);
320 free(tmp_disps);
321 goto cleanup;
322 }
323 }
324
325
326
327 if (recv_count > 0) {
328 err = ompi_request_wait(&request, MPI_STATUS_IGNORE);
329 if (OMPI_SUCCESS != err) {
330 free(tmp_rcounts);
331 free(tmp_disps);
332 goto cleanup;
333 }
334
335 ompi_op_reduce(op,
336 recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
337 result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
338 recv_count, dtype);
339 }
340
341
342 send_index = recv_index;
343 last_index = recv_index + mask;
344 mask >>= 1;
345 }
346
347
348 if (0 != rcounts[rank]) {
349 err = ompi_datatype_sndrcv(result_buf + disps[rank] * extent,
350 rcounts[rank], dtype,
351 rbuf, rcounts[rank], dtype);
352 if (OMPI_SUCCESS != err) {
353 free(tmp_rcounts);
354 free(tmp_disps);
355 goto cleanup;
356 }
357 }
358
359 free(tmp_rcounts);
360 free(tmp_disps);
361 }
362
363
364
365 if (rank < (2 * remain)) {
366 if ((rank & 1) == 0) {
367 if (rcounts[rank]) {
368 err = MCA_PML_CALL(recv(rbuf, rcounts[rank], dtype, rank + 1,
369 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
370 comm, MPI_STATUS_IGNORE));
371 if (OMPI_SUCCESS != err) goto cleanup;
372 }
373 } else {
374 if (rcounts[rank - 1]) {
375 err = MCA_PML_CALL(send(result_buf + disps[rank - 1] * extent,
376 rcounts[rank - 1], dtype, rank - 1,
377 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
378 MCA_PML_BASE_SEND_STANDARD,
379 comm));
380 if (OMPI_SUCCESS != err) goto cleanup;
381 }
382 }
383 }
384
385 cleanup:
386 if (NULL != disps) free(disps);
387 if (NULL != recv_buf_free) free(recv_buf_free);
388 if (NULL != result_buf_free) free(result_buf_free);
389
390 return err;
391 }
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455 int
456 ompi_coll_base_reduce_scatter_intra_ring( const void *sbuf, void *rbuf, const int *rcounts,
457 struct ompi_datatype_t *dtype,
458 struct ompi_op_t *op,
459 struct ompi_communicator_t *comm,
460 mca_coll_base_module_t *module)
461 {
462 int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
463 int inbi, *displs = NULL;
464 char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
465 char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
466 ptrdiff_t extent, max_real_segsize, dsize, gap = 0;
467 ompi_request_t *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
468
469 size = ompi_comm_size(comm);
470 rank = ompi_comm_rank(comm);
471
472 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
473 "coll:base:reduce_scatter_intra_ring rank %d, size %d",
474 rank, size));
475
476
477
478
479 displs = (int*) malloc(size * sizeof(int));
480 if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
481 displs[0] = 0;
482 total_count = rcounts[0];
483 max_block_count = rcounts[0];
484 for (i = 1; i < size; i++) {
485 displs[i] = total_count;
486 total_count += rcounts[i];
487 if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
488 }
489
490
491 if (1 == size) {
492 if (MPI_IN_PLACE != sbuf) {
493 ret = ompi_datatype_copy_content_same_ddt(dtype, total_count,
494 (char*)rbuf, (char*)sbuf);
495 if (ret < 0) { line = __LINE__; goto error_hndl; }
496 }
497 free(displs);
498 return MPI_SUCCESS;
499 }
500
501
502
503
504
505
506 ret = ompi_datatype_type_extent(dtype, &extent);
507 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
508
509 max_real_segsize = opal_datatype_span(&dtype->super, max_block_count, &gap);
510 dsize = opal_datatype_span(&dtype->super, total_count, &gap);
511
512 accumbuf_free = (char*)malloc(dsize);
513 if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
514 accumbuf = accumbuf_free - gap;
515
516 inbuf_free[0] = (char*)malloc(max_real_segsize);
517 if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
518 inbuf[0] = inbuf_free[0] - gap;
519 if (size > 2) {
520 inbuf_free[1] = (char*)malloc(max_real_segsize);
521 if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
522 inbuf[1] = inbuf_free[1] - gap;
523 }
524
525
526 if (MPI_IN_PLACE == sbuf) {
527 sbuf = rbuf;
528 }
529
530 ret = ompi_datatype_copy_content_same_ddt(dtype, total_count,
531 accumbuf, (char*)sbuf);
532 if (ret < 0) { line = __LINE__; goto error_hndl; }
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551 send_to = (rank + 1) % size;
552 recv_from = (rank + size - 1) % size;
553
554 inbi = 0;
555
556 ret = MCA_PML_CALL(irecv(inbuf[inbi], max_block_count, dtype, recv_from,
557 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
558 &reqs[inbi]));
559 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
560 tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
561 ret = MCA_PML_CALL(send(tmpsend, rcounts[recv_from], dtype, send_to,
562 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
563 MCA_PML_BASE_SEND_STANDARD, comm));
564 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
565
566 for (k = 2; k < size; k++) {
567 const int prevblock = (rank + size - k) % size;
568
569 inbi = inbi ^ 0x1;
570
571
572 ret = MCA_PML_CALL(irecv(inbuf[inbi], max_block_count, dtype, recv_from,
573 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
574 &reqs[inbi]));
575 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
576
577
578 ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
579 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
580
581
582
583
584 tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
585 ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, rcounts[prevblock], dtype);
586
587
588 ret = MCA_PML_CALL(send(tmprecv, rcounts[prevblock], dtype, send_to,
589 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
590 MCA_PML_BASE_SEND_STANDARD, comm));
591 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
592 }
593
594
595 ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
596 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
597
598
599
600 tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
601 ompi_op_reduce(op, inbuf[inbi], tmprecv, rcounts[rank], dtype);
602
603
604 ret = ompi_datatype_copy_content_same_ddt(dtype, rcounts[rank], (char *)rbuf, tmprecv);
605 if (ret < 0) { line = __LINE__; goto error_hndl; }
606
607 if (NULL != displs) free(displs);
608 if (NULL != accumbuf_free) free(accumbuf_free);
609 if (NULL != inbuf_free[0]) free(inbuf_free[0]);
610 if (NULL != inbuf_free[1]) free(inbuf_free[1]);
611
612 return MPI_SUCCESS;
613
614 error_hndl:
615 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
616 __FILE__, line, rank, ret));
617 (void)line;
618 if (NULL != displs) free(displs);
619 if (NULL != accumbuf_free) free(accumbuf_free);
620 if (NULL != inbuf_free[0]) free(inbuf_free[0]);
621 if (NULL != inbuf_free[1]) free(inbuf_free[1]);
622 return ret;
623 }
624
625
626
627
628
/*
 * ompi_sum_counts
 *
 * Returns displs[last] + counts[last] - displs[first], where first and last
 * are the real-rank indices corresponding to the virtual block range
 * [lo, hi].  When displs holds the running (exclusive) prefix sums of counts
 * this is the total number of elements in those blocks.
 *
 * Virtual index v maps to real indices as follows: for v < nprocs_rem it
 * covers the pair (2v, 2v + 1); otherwise it covers the single index
 * v + nprocs_rem.
 */
static int ompi_sum_counts(const int *counts, int *displs, int nprocs_rem, int lo, int hi)
{
    int first, last;

    /* Lowest real index covered by virtual block lo */
    if (lo < nprocs_rem) {
        first = lo * 2;
    } else {
        first = lo + nprocs_rem;
    }

    /* Highest real index covered by virtual block hi */
    if (hi < nprocs_rem) {
        last = hi * 2 + 1;
    } else {
        last = hi + nprocs_rem;
    }

    return displs[last] + counts[last] - displs[first];
}
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690 int
691 ompi_coll_base_reduce_scatter_intra_butterfly(
692 const void *sbuf, void *rbuf, const int *rcounts, struct ompi_datatype_t *dtype,
693 struct ompi_op_t *op, struct ompi_communicator_t *comm,
694 mca_coll_base_module_t *module)
695 {
696 char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
697 int *displs = NULL, index;
698 ptrdiff_t span, gap, totalcount, extent;
699 int err = MPI_SUCCESS;
700 int comm_size = ompi_comm_size(comm);
701 int rank = ompi_comm_rank(comm);
702
703 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
704 "coll:base:reduce_scatter_intra_butterfly: rank %d/%d",
705 rank, comm_size));
706 if (comm_size < 2)
707 return MPI_SUCCESS;
708
709 displs = malloc(sizeof(*displs) * comm_size);
710 if (NULL == displs) {
711 err = OMPI_ERR_OUT_OF_RESOURCE;
712 goto cleanup_and_return;
713 }
714 displs[0] = 0;
715 for (int i = 1; i < comm_size; i++) {
716 displs[i] = displs[i - 1] + rcounts[i - 1];
717 }
718 totalcount = displs[comm_size - 1] + rcounts[comm_size - 1];
719
720 ompi_datatype_type_extent(dtype, &extent);
721 span = opal_datatype_span(&dtype->super, totalcount, &gap);
722 tmpbuf[0] = malloc(span);
723 tmpbuf[1] = malloc(span);
724 if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
725 err = OMPI_ERR_OUT_OF_RESOURCE;
726 goto cleanup_and_return;
727 }
728 psend = tmpbuf[0] - gap;
729 precv = tmpbuf[1] - gap;
730
731 if (sbuf != MPI_IN_PLACE) {
732 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, (char *)sbuf);
733 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
734 } else {
735 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, rbuf);
736 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
737 }
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753 int nprocs_pof2 = opal_next_poweroftwo(comm_size);
754 nprocs_pof2 >>= 1;
755 int nprocs_rem = comm_size - nprocs_pof2;
756 int log2_size = opal_cube_dim(nprocs_pof2);
757
758 int vrank = -1;
759 if (rank < 2 * nprocs_rem) {
760 if ((rank % 2) == 0) {
761
762 err = MCA_PML_CALL(send(psend, totalcount, dtype, rank + 1,
763 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
764 MCA_PML_BASE_SEND_STANDARD, comm));
765 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
766
767 vrank = -1;
768 } else {
769
770 err = MCA_PML_CALL(recv(precv, totalcount, dtype, rank - 1,
771 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
772 comm, MPI_STATUS_IGNORE));
773 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
774 ompi_op_reduce(op, precv, psend, totalcount, dtype);
775
776 vrank = rank / 2;
777 }
778 } else {
779
780 vrank = rank - nprocs_rem;
781 }
782
783 if (vrank != -1) {
784
785
786
787
788
789
790
791
792
793
794
795 int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
796 for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
797 int vpeer = vrank ^ mask;
798 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
799
800 nblocks /= 2;
801 if ((vrank & mask) == 0) {
802
803 send_index += nblocks;
804 } else {
805
806 recv_index += nblocks;
807 }
808
809
810 int send_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
811 send_index, send_index + nblocks - 1);
812 index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
813 ptrdiff_t sdispl = displs[index];
814
815
816 int recv_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
817 recv_index, recv_index + nblocks - 1);
818 index = (recv_index < nprocs_rem) ? 2 * recv_index : nprocs_rem + recv_index;
819 ptrdiff_t rdispl = displs[index];
820
821 err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
822 dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER,
823 precv + (ptrdiff_t)rdispl * extent, recv_count,
824 dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER,
825 comm, MPI_STATUS_IGNORE, rank);
826 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
827
828 if (vrank < vpeer) {
829
830 ompi_op_reduce(op, psend + (ptrdiff_t)rdispl * extent,
831 precv + (ptrdiff_t)rdispl * extent, recv_count, dtype);
832 char *p = psend;
833 psend = precv;
834 precv = p;
835 } else {
836
837 ompi_op_reduce(op, precv + (ptrdiff_t)rdispl * extent,
838 psend + (ptrdiff_t)rdispl * extent, recv_count, dtype);
839 }
840 send_index = recv_index;
841 }
842
843
844
845
846 int vpeer = ompi_mirror_perm(vrank, log2_size);
847 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
848 index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
849
850 if (vpeer < nprocs_rem) {
851
852
853
854
855 err = MCA_PML_CALL(send(psend + (ptrdiff_t)displs[index] * extent,
856 rcounts[index], dtype, peer - 1,
857 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
858 MCA_PML_BASE_SEND_STANDARD, comm));
859 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
860 }
861
862
863 if (vpeer < nprocs_rem)
864 index++;
865 if (vpeer != vrank) {
866 err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)displs[index] * extent,
867 rcounts[index], dtype, peer,
868 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
869 rbuf, rcounts[rank], dtype, peer,
870 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
871 comm, MPI_STATUS_IGNORE, rank);
872 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
873 } else {
874 err = ompi_datatype_copy_content_same_ddt(dtype, rcounts[rank], rbuf,
875 psend + (ptrdiff_t)displs[rank] * extent);
876 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
877 }
878
879 } else {
880
881 int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
882 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
883 err = MCA_PML_CALL(recv(rbuf, rcounts[rank], dtype, peer,
884 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
885 MPI_STATUS_IGNORE));
886 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
887 }
888
889 cleanup_and_return:
890 if (displs)
891 free(displs);
892 if (tmpbuf[0])
893 free(tmpbuf[0]);
894 if (tmpbuf[1])
895 free(tmpbuf[1]);
896 return err;
897 }