This source file includes the following definitions:
- ompi_coll_base_allreduce_intra_nonoverlapping
- ompi_coll_base_allreduce_intra_recursivedoubling
- ompi_coll_base_allreduce_intra_ring
- ompi_coll_base_allreduce_intra_ring_segmented
- ompi_coll_base_allreduce_intra_basic_linear
- ompi_coll_base_allreduce_intra_redscat_allgather

#include "ompi_config.h"

#include "mpi.h"
#include "opal/util/bit_ops.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "coll_base_topo.h"
#include "coll_base_util.h"
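
/*
 * ompi_coll_base_allreduce_intra_nonoverlapping
 *
 * Implements allreduce as a reduce to rank 0 followed by a broadcast from
 * rank 0, with no overlap between the two phases.  The reduce and bcast used
 * are whatever modules are currently selected on the communicator.
 */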
int
ompi_coll_base_allreduce_intra_nonoverlapping(const void *sbuf, void *rbuf, int count,
                                              struct ompi_datatype_t *dtype,
                                              struct ompi_op_t *op,
                                              struct ompi_communicator_t *comm,
                                              mca_coll_base_module_t *module)
{
    int err, rank;

    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:allreduce_intra_nonoverlapping rank %d", rank));

    /* Reduce to rank 0.  With MPI_IN_PLACE the data to reduce lives in rbuf,
     * so the root reduces in place and the other ranks contribute rbuf as
     * their send buffer. */
    if (MPI_IN_PLACE == sbuf) {
        if (0 == rank) {
            err = comm->c_coll->coll_reduce (MPI_IN_PLACE, rbuf, count, dtype,
                                             op, 0, comm, comm->c_coll->coll_reduce_module);
        } else {
            err = comm->c_coll->coll_reduce (rbuf, NULL, count, dtype, op, 0,
                                             comm, comm->c_coll->coll_reduce_module);
        }
    } else {
        err = comm->c_coll->coll_reduce (sbuf, rbuf, count, dtype, op, 0,
                                         comm, comm->c_coll->coll_reduce_module);
    }
    if (MPI_SUCCESS != err) {
        return err;
    }

    /* Broadcast the reduced result from rank 0 to everyone */
    return comm->c_coll->coll_bcast (rbuf, count, dtype, 0, comm,
                                     comm->c_coll->coll_bcast_module);
}
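
/*
 * ompi_coll_base_allreduce_intra_recursivedoubling
 *
 * Recursive-doubling allreduce.  The rank count is first reduced to the
 * nearest power of two: within the first 2 * extra_ranks ranks, each even
 * rank sends its data to its odd neighbor and then sits out the main loop.
 * The remaining "virtual" ranks exchange and reduce the full buffer with a
 * partner at distance 1, 2, 4, ... until every surviving rank holds the
 * complete result, which is finally sent back to the ranks that sat out.
 * Runs in O(log2(size)) exchange steps, each moving the full message.
 */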
int
ompi_coll_base_allreduce_intra_recursivedoubling(const void *sbuf, void *rbuf,
                                                 int count,
                                                 struct ompi_datatype_t *dtype,
                                                 struct ompi_op_t *op,
                                                 struct ompi_communicator_t *comm,
                                                 mca_coll_base_module_t *module)
{
    int ret, line, rank, size, adjsize, remote, distance;
    int newrank, newremote, extra_ranks;
    char *tmpsend = NULL, *tmprecv = NULL, *tmpswap = NULL, *inplacebuf_free = NULL, *inplacebuf;
    ptrdiff_t span, gap = 0;

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:allreduce_intra_recursivedoubling rank %d", rank));

    /* Special case for size == 1 */
    if (1 == size) {
        if (MPI_IN_PLACE != sbuf) {
            ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
            if (ret < 0) { line = __LINE__; goto error_hndl; }
        }
        return MPI_SUCCESS;
    }

    /* Allocate and initialize the temporary send buffer */
    span = opal_datatype_span(&dtype->super, count, &gap);
    inplacebuf_free = (char*) malloc(span);
    if (NULL == inplacebuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
    inplacebuf = inplacebuf_free - gap;

    if (MPI_IN_PLACE == sbuf) {
        ret = ompi_datatype_copy_content_same_ddt(dtype, count, inplacebuf, (char*)rbuf);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    } else {
        ret = ompi_datatype_copy_content_same_ddt(dtype, count, inplacebuf, (char*)sbuf);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    }

    tmpsend = (char*) inplacebuf;
    tmprecv = (char*) rbuf;

    /* Determine the nearest power of two less than or equal to size */
    adjsize = opal_next_poweroftwo (size);
    adjsize >>= 1;

    /* Handle the non-power-of-two case:
       - Even ranks below 2 * extra_ranks send their data to (rank + 1),
         set newrank to -1, and drop out of the main loop.
       - Odd ranks below 2 * extra_ranks receive from (rank - 1), apply the
         operation, and continue with newrank = rank / 2.
       - All other ranks continue with newrank = rank - extra_ranks. */
    extra_ranks = size - adjsize;
    if (rank < (2 * extra_ranks)) {
        if (0 == (rank % 2)) {
            ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank + 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
            newrank = -1;
        } else {
            ret = MCA_PML_CALL(recv(tmprecv, count, dtype, (rank - 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
            /* tmpsend = tmprecv (op) tmpsend */
            ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
            newrank = rank >> 1;
        }
    } else {
        newrank = rank - extra_ranks;
    }

    /* Communication/computation loop: exchange the full message with a
       partner at distance 1, 2, 4, ... and apply the reduction, keeping the
       lower rank's contribution as the left operand so the reduction order
       is consistent across ranks. */
    for (distance = 0x1; distance < adjsize; distance <<=1) {
        if (newrank < 0) break;
        /* Determine the remote node */
        newremote = newrank ^ distance;
        remote = (newremote < extra_ranks)?
            (newremote * 2 + 1):(newremote + extra_ranks);

        /* Exchange the data */
        ret = ompi_coll_base_sendrecv_actual(tmpsend, count, dtype, remote,
                                             MCA_COLL_BASE_TAG_ALLREDUCE,
                                             tmprecv, count, dtype, remote,
                                             MCA_COLL_BASE_TAG_ALLREDUCE,
                                             comm, MPI_STATUS_IGNORE);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Apply the operation */
        if (rank < remote) {
            /* tmprecv = tmpsend (op) tmprecv; then swap the buffers */
            ompi_op_reduce(op, tmpsend, tmprecv, count, dtype);
            tmpswap = tmprecv;
            tmprecv = tmpsend;
            tmpsend = tmpswap;
        } else {
            /* tmpsend = tmprecv (op) tmpsend */
            ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
        }
    }

    /* Handle the non-power-of-two case:
       - Odd ranks below 2 * extra_ranks send the result to (rank - 1).
       - Even ranks below 2 * extra_ranks receive the result from (rank + 1). */
    if (rank < (2 * extra_ranks)) {
        if (0 == (rank % 2)) {
            ret = MCA_PML_CALL(recv(rbuf, count, dtype, (rank + 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
            tmpsend = (char*)rbuf;
        } else {
            ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank - 1),
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
        }
    }

    /* Ensure that the final result ends up in rbuf */
    if (tmpsend != rbuf) {
        ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, tmpsend);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    }

    if (NULL != inplacebuf_free) free(inplacebuf_free);
    return MPI_SUCCESS;

 error_hndl:
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
                 __FILE__, line, rank, ret));
    (void)line;
    if (NULL != inplacebuf_free) free(inplacebuf_free);
    return ret;
}
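
/*
 * ompi_coll_base_allreduce_intra_ring
 *
 * Ring allreduce: the buffer is split into "size" blocks and the algorithm
 * proceeds in two phases.
 *   1. Reduce-scatter: in size - 1 steps each rank sends one block to
 *      (rank + 1) and receives/reduces one block from (rank - 1); after the
 *      last step rank r holds the fully reduced block (r + 1) % size.
 *   2. Allgather: the reduced blocks travel around the ring for another
 *      size - 1 steps until every rank has the complete result.
 * Each rank sends and receives roughly 2 * count * (size - 1) / size
 * elements in total, which makes this attractive for large messages.  When
 * count < size there is less than one element per block, so the code falls
 * back to recursive doubling.
 */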
int
ompi_coll_base_allreduce_intra_ring(const void *sbuf, void *rbuf, int count,
                                    struct ompi_datatype_t *dtype,
                                    struct ompi_op_t *op,
                                    struct ompi_communicator_t *comm,
                                    mca_coll_base_module_t *module)
{
    int ret, line, rank, size, k, recv_from, send_to, block_count, inbi;
    int early_segcount, late_segcount, split_rank, max_segcount;
    size_t typelng;
    char *tmpsend = NULL, *tmprecv = NULL, *inbuf[2] = {NULL, NULL};
    ptrdiff_t true_lb, true_extent, lb, extent;
    ptrdiff_t block_offset, max_real_segsize;
    ompi_request_t *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:allreduce_intra_ring rank %d, count %d", rank, count));

    /* Special case for size == 1 */
    if (1 == size) {
        if (MPI_IN_PLACE != sbuf) {
            ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
            if (ret < 0) { line = __LINE__; goto error_hndl; }
        }
        return MPI_SUCCESS;
    }

    /* Special case for count less than size: fall back to recursive doubling */
    if (count < size) {
        OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:allreduce_ring rank %d/%d, count %d, switching to recursive doubling", rank, size, count));
        return (ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf,
                                                                 count,
                                                                 dtype, op,
                                                                 comm, module));
    }

    /* Datatype parameters needed for buffer sizing and offsets */
    ret = ompi_datatype_get_extent(dtype, &lb, &extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    ret = ompi_datatype_get_true_extent(dtype, &true_lb, &true_extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    ret = ompi_datatype_type_size( dtype, &typelng);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    /* Determine the block sizes: the first split_rank blocks hold
       early_segcount elements, the remaining blocks hold late_segcount. */
    COLL_BASE_COMPUTE_BLOCKCOUNT( count, size, split_rank,
                                  early_segcount, late_segcount );
    max_segcount = early_segcount;
    max_real_segsize = true_extent + (max_segcount - 1) * extent;

    /* Allocate and initialize temporary receive buffers (double-buffered) */
    inbuf[0] = (char*)malloc(max_real_segsize);
    if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
    if (size > 2) {
        inbuf[1] = (char*)malloc(max_real_segsize);
        if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
    }

    /* Handle MPI_IN_PLACE */
    if (MPI_IN_PLACE != sbuf) {
        ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    }

    /* Computation loop (reduce-scatter phase): in size - 1 steps pass the
       partially reduced blocks around the ring, posting the next irecv before
       reducing and forwarding the previously received block. */
    send_to = (rank + 1) % size;
    recv_from = (rank + size - 1) % size;

    inbi = 0;
    /* Initialize the first receive from the neighbor on the left */
    ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
                             MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    /* Send my own block to the neighbor on the right */
    block_offset = ((rank < split_rank)?
                    ((ptrdiff_t)rank * (ptrdiff_t)early_segcount) :
                    ((ptrdiff_t)rank * (ptrdiff_t)late_segcount + split_rank));
    block_count = ((rank < split_rank)? early_segcount : late_segcount);
    tmpsend = ((char*)rbuf) + block_offset * extent;
    ret = MCA_PML_CALL(send(tmpsend, block_count, dtype, send_to,
                            MCA_COLL_BASE_TAG_ALLREDUCE,
                            MCA_PML_BASE_SEND_STANDARD, comm));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    for (k = 2; k < size; k++) {
        const int prevblock = (rank + size - k + 1) % size;

        inbi = inbi ^ 0x1;

        /* Post an irecv for the current block */
        ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
                                 MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Wait for the previous block to arrive */
        ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Apply the operation on the previous block:
           rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock] */
        block_offset = ((prevblock < split_rank)?
                        ((ptrdiff_t)prevblock * early_segcount) :
                        ((ptrdiff_t)prevblock * late_segcount + split_rank));
        block_count = ((prevblock < split_rank)? early_segcount : late_segcount);
        tmprecv = ((char*)rbuf) + (ptrdiff_t)block_offset * extent;
        ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, block_count, dtype);

        /* Forward the partial result to the neighbor on the right */
        ret = MCA_PML_CALL(send(tmprecv, block_count, dtype, send_to,
                                MCA_COLL_BASE_TAG_ALLREDUCE,
                                MCA_PML_BASE_SEND_STANDARD, comm));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    }

    /* Wait for the last block to arrive */
    ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    /* Apply the operation on the last block: after this step this rank holds
       the fully reduced block (rank + 1) % size */
    recv_from = (rank + 1) % size;
    block_offset = ((recv_from < split_rank)?
                    ((ptrdiff_t)recv_from * early_segcount) :
                    ((ptrdiff_t)recv_from * late_segcount + split_rank));
    block_count = ((recv_from < split_rank)? early_segcount : late_segcount);
    tmprecv = ((char*)rbuf) + (ptrdiff_t)block_offset * extent;
    ompi_op_reduce(op, inbuf[inbi], tmprecv, block_count, dtype);

    /* Distribution loop (allgather phase): circulate the fully reduced
       blocks around the ring in size - 1 steps */
    send_to = (rank + 1) % size;
    recv_from = (rank + size - 1) % size;
    for (k = 0; k < size - 1; k++) {
        const int recv_data_from = (rank + size - k) % size;
        const int send_data_from = (rank + 1 + size - k) % size;
        const int send_block_offset =
            ((send_data_from < split_rank)?
             ((ptrdiff_t)send_data_from * early_segcount) :
             ((ptrdiff_t)send_data_from * late_segcount + split_rank));
        const int recv_block_offset =
            ((recv_data_from < split_rank)?
             ((ptrdiff_t)recv_data_from * early_segcount) :
             ((ptrdiff_t)recv_data_from * late_segcount + split_rank));
        block_count = ((send_data_from < split_rank)?
                       early_segcount : late_segcount);

        tmprecv = (char*)rbuf + (ptrdiff_t)recv_block_offset * extent;
        tmpsend = (char*)rbuf + (ptrdiff_t)send_block_offset * extent;

        ret = ompi_coll_base_sendrecv(tmpsend, block_count, dtype, send_to,
                                      MCA_COLL_BASE_TAG_ALLREDUCE,
                                      tmprecv, max_segcount, dtype, recv_from,
                                      MCA_COLL_BASE_TAG_ALLREDUCE,
                                      comm, MPI_STATUS_IGNORE, rank);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}

    }

    if (NULL != inbuf[0]) free(inbuf[0]);
    if (NULL != inbuf[1]) free(inbuf[1]);

    return MPI_SUCCESS;

 error_hndl:
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
                 __FILE__, line, rank, ret));
    ompi_coll_base_free_reqs(reqs, 2);
    (void)line;
    if (NULL != inbuf[0]) free(inbuf[0]);
    if (NULL != inbuf[1]) free(inbuf[1]);
    return ret;
}
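
/*
 * ompi_coll_base_allreduce_intra_ring_segmented
 *
 * Pipelined variant of the ring allreduce.  Each of the "size" blocks is
 * further split into num_phases segments of at most segcount elements
 * (derived from the segsize argument and the datatype size), and the
 * reduce-scatter phase of the plain ring algorithm is run once per segment.
 * This caps the size of any single reduce-scatter message at roughly segsize
 * bytes, at the cost of more communication steps.  The final allgather phase
 * still moves whole blocks.  When count < size * segcount the code falls
 * back to the unsegmented ring algorithm.
 */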
int
ompi_coll_base_allreduce_intra_ring_segmented(const void *sbuf, void *rbuf, int count,
                                              struct ompi_datatype_t *dtype,
                                              struct ompi_op_t *op,
                                              struct ompi_communicator_t *comm,
                                              mca_coll_base_module_t *module,
                                              uint32_t segsize)
{
    int ret, line, rank, size, k, recv_from, send_to;
    int early_blockcount, late_blockcount, split_rank;
    int segcount, max_segcount, num_phases, phase, block_count, inbi;
    size_t typelng;
    char *tmpsend = NULL, *tmprecv = NULL, *inbuf[2] = {NULL, NULL};
    ptrdiff_t block_offset, max_real_segsize;
    ompi_request_t *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
    ptrdiff_t lb, extent, gap;

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:allreduce_intra_ring_segmented rank %d, count %d", rank, count));

    /* Special case for size == 1 */
    if (1 == size) {
        if (MPI_IN_PLACE != sbuf) {
            ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
            if (ret < 0) { line = __LINE__; goto error_hndl; }
        }
        return MPI_SUCCESS;
    }

    /* Determine the segment count from the suggested segment size */
    ret = ompi_datatype_type_size( dtype, &typelng);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    segcount = count;
    COLL_BASE_COMPUTED_SEGCOUNT(segsize, typelng, segcount)

    /* Special case for count less than size * segcount: use the regular ring */
    if (count < (size * segcount)) {
        OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:allreduce_ring_segmented rank %d/%d, count %d, switching to regular ring", rank, size, count));
        return (ompi_coll_base_allreduce_intra_ring(sbuf, rbuf, count, dtype, op,
                                                    comm, module));
    }

    /* Determine the number of phases of the algorithm */
    num_phases = count / (size * segcount);
    if ((count % (size * segcount) >= size) &&
        (count % (size * segcount) > ((size * segcount) / 2))) {
        num_phases++;
    }

    /* Determine block and segment sizes: the first split_rank blocks hold
       early_blockcount elements, the rest hold late_blockcount elements;
       max_segcount is the largest segment of the largest block. */
    COLL_BASE_COMPUTE_BLOCKCOUNT( count, size, split_rank,
                                  early_blockcount, late_blockcount );
    COLL_BASE_COMPUTE_BLOCKCOUNT( early_blockcount, num_phases, inbi,
                                  max_segcount, k);

    ret = ompi_datatype_get_extent(dtype, &lb, &extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    max_real_segsize = opal_datatype_span(&dtype->super, max_segcount, &gap);

    /* Allocate and initialize temporary receive buffers (double-buffered) */
    inbuf[0] = (char*)malloc(max_real_segsize);
    if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
    if (size > 2) {
        inbuf[1] = (char*)malloc(max_real_segsize);
        if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
    }

    /* Handle MPI_IN_PLACE */
    if (MPI_IN_PLACE != sbuf) {
        ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
        if (ret < 0) { line = __LINE__; goto error_hndl; }
    }

    /* Computation loop: one pipelined reduce-scatter pass per phase, each
       pass operating on the phase-th segment of every block */
    for (phase = 0; phase < num_phases; phase ++) {
        ptrdiff_t phase_offset;
        int early_phase_segcount, late_phase_segcount, split_phase, phase_count;

        /* Ring neighbors */
        send_to = (rank + 1) % size;
        recv_from = (rank + size - 1) % size;

        inbi = 0;
        /* Initialize the first receive from the neighbor on the left */
        ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
                                 MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Send the phase-th segment of my own block to the neighbor on
           the right */
        block_offset = ((rank < split_rank)?
                        ((ptrdiff_t)rank * (ptrdiff_t)early_blockcount) :
                        ((ptrdiff_t)rank * (ptrdiff_t)late_blockcount + split_rank));
        block_count = ((rank < split_rank)? early_blockcount : late_blockcount);
        COLL_BASE_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
                                     early_phase_segcount, late_phase_segcount)
        phase_count = ((phase < split_phase)?
                       (early_phase_segcount) : (late_phase_segcount));
        phase_offset = ((phase < split_phase)?
                        ((ptrdiff_t)phase * (ptrdiff_t)early_phase_segcount) :
                        ((ptrdiff_t)phase * (ptrdiff_t)late_phase_segcount + split_phase));
        tmpsend = ((char*)rbuf) + (ptrdiff_t)(block_offset + phase_offset) * extent;
        ret = MCA_PML_CALL(send(tmpsend, phase_count, dtype, send_to,
                                MCA_COLL_BASE_TAG_ALLREDUCE,
                                MCA_PML_BASE_SEND_STANDARD, comm));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        for (k = 2; k < size; k++) {
            const int prevblock = (rank + size - k + 1) % size;

            inbi = inbi ^ 0x1;

            /* Post an irecv for the current block */
            ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
                                     MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                     &reqs[inbi]));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

            /* Wait for the previous block to arrive */
            ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

            /* Apply the operation on the previous block's segment:
               rbuf[prevblock, phase] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock, phase] */
            block_offset = ((prevblock < split_rank)?
                            ((ptrdiff_t)prevblock * (ptrdiff_t)early_blockcount) :
                            ((ptrdiff_t)prevblock * (ptrdiff_t)late_blockcount + split_rank));
            block_count = ((prevblock < split_rank)?
                           early_blockcount : late_blockcount);
            COLL_BASE_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
                                         early_phase_segcount, late_phase_segcount)
            phase_count = ((phase < split_phase)?
                           (early_phase_segcount) : (late_phase_segcount));
            phase_offset = ((phase < split_phase)?
                            ((ptrdiff_t)phase * (ptrdiff_t)early_phase_segcount) :
                            ((ptrdiff_t)phase * (ptrdiff_t)late_phase_segcount + split_phase));
            tmprecv = ((char*)rbuf) + (ptrdiff_t)(block_offset + phase_offset) * extent;
            ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, phase_count, dtype);

            /* Forward the partial result to the neighbor on the right */
            ret = MCA_PML_CALL(send(tmprecv, phase_count, dtype, send_to,
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
        }

        /* Wait for the last block to arrive */
        ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Apply the operation on the last segment: after this step this rank
           holds the fully reduced phase-th segment of block (rank + 1) % size */
        recv_from = (rank + 1) % size;
        block_offset = ((recv_from < split_rank)?
                        ((ptrdiff_t)recv_from * (ptrdiff_t)early_blockcount) :
                        ((ptrdiff_t)recv_from * (ptrdiff_t)late_blockcount + split_rank));
        block_count = ((recv_from < split_rank)?
                       early_blockcount : late_blockcount);
        COLL_BASE_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
                                     early_phase_segcount, late_phase_segcount)
        phase_count = ((phase < split_phase)?
                       (early_phase_segcount) : (late_phase_segcount));
        phase_offset = ((phase < split_phase)?
                        ((ptrdiff_t)phase * (ptrdiff_t)early_phase_segcount) :
                        ((ptrdiff_t)phase * (ptrdiff_t)late_phase_segcount + split_phase));
        tmprecv = ((char*)rbuf) + (ptrdiff_t)(block_offset + phase_offset) * extent;
        ompi_op_reduce(op, inbuf[inbi], tmprecv, phase_count, dtype);
    }

    /* Distribution loop (allgather phase): circulate the fully reduced
       blocks around the ring in size - 1 steps */
    send_to = (rank + 1) % size;
    recv_from = (rank + size - 1) % size;
    for (k = 0; k < size - 1; k++) {
        const int recv_data_from = (rank + size - k) % size;
        const int send_data_from = (rank + 1 + size - k) % size;
        const int send_block_offset =
            ((send_data_from < split_rank)?
             ((ptrdiff_t)send_data_from * (ptrdiff_t)early_blockcount) :
             ((ptrdiff_t)send_data_from * (ptrdiff_t)late_blockcount + split_rank));
        const int recv_block_offset =
            ((recv_data_from < split_rank)?
             ((ptrdiff_t)recv_data_from * (ptrdiff_t)early_blockcount) :
             ((ptrdiff_t)recv_data_from * (ptrdiff_t)late_blockcount + split_rank));
        block_count = ((send_data_from < split_rank)?
                       early_blockcount : late_blockcount);

        tmprecv = (char*)rbuf + (ptrdiff_t)recv_block_offset * extent;
        tmpsend = (char*)rbuf + (ptrdiff_t)send_block_offset * extent;

        ret = ompi_coll_base_sendrecv(tmpsend, block_count, dtype, send_to,
                                      MCA_COLL_BASE_TAG_ALLREDUCE,
                                      tmprecv, early_blockcount, dtype, recv_from,
                                      MCA_COLL_BASE_TAG_ALLREDUCE,
                                      comm, MPI_STATUS_IGNORE, rank);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}

    }

    if (NULL != inbuf[0]) free(inbuf[0]);
    if (NULL != inbuf[1]) free(inbuf[1]);

    return MPI_SUCCESS;

 error_hndl:
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
                 __FILE__, line, rank, ret));
    ompi_coll_base_free_reqs(reqs, 2);
    (void)line;
    if (NULL != inbuf[0]) free(inbuf[0]);
    if (NULL != inbuf[1]) free(inbuf[1]);
    return ret;
}
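
/*
 * ompi_coll_base_allreduce_intra_basic_linear
 *
 * Simplest allreduce: a linear reduce to rank 0 followed by a linear
 * broadcast from rank 0, both using the basic (non-tuned) implementations.
 */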
int
ompi_coll_base_allreduce_intra_basic_linear(const void *sbuf, void *rbuf, int count,
                                            struct ompi_datatype_t *dtype,
                                            struct ompi_op_t *op,
                                            struct ompi_communicator_t *comm,
                                            mca_coll_base_module_t *module)
{
    int err, rank;

    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:allreduce_intra_basic_linear rank %d", rank));

    /* Reduce to rank 0 (handling MPI_IN_PLACE), then broadcast the result */
    if (MPI_IN_PLACE == sbuf) {
        if (0 == rank) {
            err = ompi_coll_base_reduce_intra_basic_linear (MPI_IN_PLACE, rbuf, count, dtype,
                                                            op, 0, comm, module);
        } else {
            err = ompi_coll_base_reduce_intra_basic_linear(rbuf, NULL, count, dtype,
                                                           op, 0, comm, module);
        }
    } else {
        err = ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
                                                       op, 0, comm, module);
    }
    if (MPI_SUCCESS != err) {
        return err;
    }

    return ompi_coll_base_bcast_intra_basic_linear(rbuf, count, dtype, 0, comm, module);
}
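
/*
 * ompi_coll_base_allreduce_intra_redscat_allgather
 *
 * Allreduce built from a reduce-scatter followed by an allgather, in the
 * style of Rabenseifner's algorithm:
 *   1. If the communicator size is not a power of two, the first
 *      2 * (size - 2^floor(log2(size))) ranks are paired up and each pair is
 *      collapsed onto its even member, so the rest of the algorithm runs on
 *      a power-of-two number of "virtual" ranks.
 *   2. Reduce-scatter by recursive vector halving: in each of the log2(p')
 *      steps a rank exchanges half of its current window with a partner and
 *      reduces the half it keeps.
 *   3. Allgather by recursive doubling over the same partners in reverse
 *      order, so every rank ends up with the full reduced vector.
 *   4. The result is copied back to the ranks that were collapsed in step 1.
 * Requires a commutative operation and count >= 2^floor(log2(size));
 * otherwise the code falls back to the basic linear allreduce.
 */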
int ompi_coll_base_allreduce_intra_redscat_allgather(
    const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
    struct ompi_op_t *op, struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;

    int comm_size = ompi_comm_size(comm);
    int rank = ompi_comm_rank(comm);
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:allreduce_intra_redscat_allgather: rank %d/%d",
                 rank, comm_size));

    /* Find the nearest power-of-two number of processes <= comm_size */
    int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1);   /* ilog2(comm_size) */
    assert(nsteps >= 0);
    int nprocs_pof2 = 1 << nsteps;                              /* flp2(comm_size) */

    if (count < nprocs_pof2 || !ompi_op_is_commute(op)) {
        OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                     "coll:base:allreduce_intra_redscat_allgather: rank %d/%d "
                     "count %d switching to basic linear allreduce",
                     rank, comm_size, count));
        return ompi_coll_base_allreduce_intra_basic_linear(sbuf, rbuf, count, dtype,
                                                           op, comm, module);
    }

    int err = MPI_SUCCESS;
    ptrdiff_t lb, extent, dsize, gap = 0;
    ompi_datatype_get_extent(dtype, &lb, &extent);
    dsize = opal_datatype_span(&dtype->super, count, &gap);

    /* Temporary buffer for receiving messages */
    char *tmp_buf = NULL;
    char *tmp_buf_raw = (char *)malloc(dsize);
    if (NULL == tmp_buf_raw)
        return OMPI_ERR_OUT_OF_RESOURCE;
    tmp_buf = tmp_buf_raw - gap;

    if (sbuf != MPI_IN_PLACE) {
        err = ompi_datatype_copy_content_same_ddt(dtype, count, (char *)rbuf,
                                                  (char *)sbuf);
        if (MPI_SUCCESS != err) { goto cleanup_and_return; }
    }

    /*
     * Step 1. Reduce the number of processes to the nearest lower power of
     * two p' = 2^floor(log2(p)) by removing r = p - p' processes.  Within
     * the first 2r ranks, even ranks reduce the left half of the buffer and
     * odd ranks reduce the right half; the odd ranks then send their reduced
     * half to their even neighbor and drop out of the rest of the algorithm
     * (vrank == -1).
     */
    int vrank, step, wsize;
    int nprocs_rem = comm_size - nprocs_pof2;

    if (rank < 2 * nprocs_rem) {
        int count_lhalf = count / 2;
        int count_rhalf = count - count_lhalf;

        if (rank % 2 != 0) {
            /*
             * Odd rank: exchange halves with the even neighbor (rank - 1),
             * reduce the right half, send the reduced right half back, and
             * step aside.
             */
            err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
                                          MCA_COLL_BASE_TAG_ALLREDUCE,
                                          (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
                                          count_rhalf, dtype, rank - 1,
                                          MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* Reduce the right half of the buffer */
            ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
                           (char *)rbuf + count_lhalf * extent, count_rhalf, dtype);

            /* Send the reduced right half to the left neighbor */
            err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                                    count_rhalf, dtype, rank - 1,
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* This process does not participate in the rest of the algorithm */
            vrank = -1;

        } else {
            /*
             * Even rank: exchange halves with the odd neighbor (rank + 1),
             * reduce the left half, then receive the reduced right half back
             * so this rank holds the full pairwise-reduced vector.
             */
            err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                                          count_rhalf, dtype, rank + 1,
                                          MCA_COLL_BASE_TAG_ALLREDUCE,
                                          tmp_buf, count_lhalf, dtype, rank + 1,
                                          MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* Reduce the left half of the buffer */
            ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);

            /* Receive the reduced right half from the right neighbor */
            err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                                    count_rhalf, dtype, rank + 1,
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            vrank = rank / 2;
        }
    } else {
        /* Ranks beyond the first 2r keep their data; shift their rank down */
        vrank = rank - nprocs_rem;
    }

    /*
     * Step 2. Reduce-scatter by recursive vector halving and distance
     * doubling.  rindex/rcount and sindex/scount record, per step, which
     * part of the buffer was received/sent so the allgather in step 3 can
     * retrace the path in reverse.
     */
    rindex = malloc(sizeof(*rindex) * nsteps);
    sindex = malloc(sizeof(*sindex) * nsteps);
    rcount = malloc(sizeof(*rcount) * nsteps);
    scount = malloc(sizeof(*scount) * nsteps);
    if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
        err = OMPI_ERR_OUT_OF_RESOURCE;
        goto cleanup_and_return;
    }

    if (vrank != -1) {
        step = 0;
        wsize = count;
        sindex[0] = rindex[0] = 0;

        for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
            /*
             * Each iteration: exchange half of the current window with the
             * partner and reduce the half this rank keeps.
             */
            int vdest = vrank ^ mask;
            /* Translate the virtual rank vdest to a real rank */
            int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;

            if (rank < dest) {
                /*
                 * Recv into the left half of the current window, send the
                 * right half to the peer (reduce the left half).
                 */
                rcount[step] = wsize / 2;
                scount[step] = wsize - rcount[step];
                sindex[step] = rindex[step] + rcount[step];
            } else {
                /*
                 * Recv into the right half of the current window, send the
                 * left half to the peer (reduce the right half).
                 */
                scount[step] = wsize / 2;
                rcount[step] = wsize - scount[step];
                rindex[step] = sindex[step] + scount[step];
            }

            /* Send part of the data from rbuf, receive into tmp_buf */
            err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
                                          scount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_ALLREDUCE,
                                          (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
                                          rcount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* Local reduce: rbuf[] = tmp_buf[] (op) rbuf[] */
            ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
                           (char *)rbuf + (ptrdiff_t)rindex[step] * extent,
                           rcount[step], dtype);

            /* Move the current window to the received part of the buffer */
            if (step + 1 < nsteps) {
                rindex[step + 1] = rindex[step];
                sindex[step + 1] = rindex[step];
                wsize = rcount[step];
                step++;
            }
        }

        /*
         * Step 3. Allgather by recursive doubling: retrace the reduce-scatter
         * steps in reverse order, each time sending the block this rank owns
         * and receiving the block the partner owns.
         */
        step = nsteps - 1;

        for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
            int vdest = vrank ^ mask;
            /* Translate the virtual rank vdest to a real rank */
            int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;

            /*
             * Send rcount[step] elements from rbuf[rindex[step]...],
             * recv scount[step] elements into rbuf[sindex[step]...]
             */
            err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
                                          rcount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_ALLREDUCE,
                                          (char *)rbuf + (ptrdiff_t)sindex[step] * extent,
                                          scount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
            step--;
        }
    }

    /*
     * Step 4. Send the result to the odd ranks that were excluded in step 1.
     */
    if (rank < 2 * nprocs_rem) {
        if (rank % 2 != 0) {
            /* Odd rank: receive the full result from the even neighbor */
            err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
                                    MCA_COLL_BASE_TAG_ALLREDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (OMPI_SUCCESS != err) { goto cleanup_and_return; }

        } else {
            /* Even rank: send the full result to (rank + 1) */
            err = MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
                                    MCA_COLL_BASE_TAG_ALLREDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
        }
    }

  cleanup_and_return:
    if (NULL != tmp_buf_raw)
        free(tmp_buf_raw);
    if (NULL != rindex)
        free(rindex);
    if (NULL != sindex)
        free(sindex);
    if (NULL != rcount)
        free(rcount);
    if (NULL != scount)
        free(scount);
    return err;
}