This source file includes the following definitions.
- ompi_coll_base_allgather_intra_bruck
- ompi_coll_base_allgather_intra_recursivedoubling
- ompi_coll_base_allgather_intra_ring
- ompi_coll_base_allgather_intra_neighborexchange
- ompi_coll_base_allgather_intra_two_procs
- ompi_coll_base_allgather_intra_basic_linear
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 #include "ompi_config.h"
25
26 #include "mpi.h"
27 #include "opal/util/bit_ops.h"
28 #include "ompi/constants.h"
29 #include "ompi/datatype/ompi_datatype.h"
30 #include "ompi/communicator/communicator.h"
31 #include "ompi/mca/coll/coll.h"
32 #include "ompi/mca/coll/base/coll_tags.h"
33 #include "ompi/mca/coll/base/coll_base_functions.h"
34 #include "coll_base_topo.h"
35 #include "coll_base_util.h"
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 int ompi_coll_base_allgather_intra_bruck(const void *sbuf, int scount,
86 struct ompi_datatype_t *sdtype,
87 void* rbuf, int rcount,
88 struct ompi_datatype_t *rdtype,
89 struct ompi_communicator_t *comm,
90 mca_coll_base_module_t *module)
91 {
92 int line = -1, rank, size, sendto, recvfrom, distance, blockcount, err = 0;
93 ptrdiff_t rlb, rext;
94 char *tmpsend = NULL, *tmprecv = NULL;
95
96 size = ompi_comm_size(comm);
97 rank = ompi_comm_rank(comm);
98
99 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
100 "coll:base:allgather_intra_bruck rank %d", rank));
101
102 err = ompi_datatype_get_extent (rdtype, &rlb, &rext);
103 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
104
105
106
107
108
109
110 tmprecv = (char*) rbuf;
111 if (MPI_IN_PLACE != sbuf) {
112 tmpsend = (char*) sbuf;
113 err = ompi_datatype_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
114 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
115
116 } else if (0 != rank) {
117 tmpsend = ((char*)rbuf) + (ptrdiff_t)rank * (ptrdiff_t)rcount * rext;
118 err = ompi_datatype_copy_content_same_ddt(rdtype, rcount, tmprecv, tmpsend);
119 if (err < 0) { line = __LINE__; goto err_hndl; }
120 }
121
122
123
124
125
126
127
128
129
130
131
132 blockcount = 1;
133 tmpsend = (char*) rbuf;
134 for (distance = 1; distance < size; distance<<=1) {
135
136 recvfrom = (rank + distance) % size;
137 sendto = (rank - distance + size) % size;
138
139 tmprecv = tmpsend + (ptrdiff_t)distance * (ptrdiff_t)rcount * rext;
140
141 if (distance <= (size >> 1)) {
142 blockcount = distance;
143 } else {
144 blockcount = size - distance;
145 }
146
147
148 err = ompi_coll_base_sendrecv(tmpsend, blockcount * rcount, rdtype,
149 sendto, MCA_COLL_BASE_TAG_ALLGATHER,
150 tmprecv, blockcount * rcount, rdtype,
151 recvfrom, MCA_COLL_BASE_TAG_ALLGATHER,
152 comm, MPI_STATUS_IGNORE, rank);
153 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
154
155 }
156
157
158
159
160
161
162
163
164
165
166 if (0 != rank) {
167 char *free_buf = NULL, *shift_buf = NULL;
168 ptrdiff_t span, gap = 0;
169
170 span = opal_datatype_span(&rdtype->super, (int64_t)(size - rank) * rcount, &gap);
171
172 free_buf = (char*)calloc(span, sizeof(char));
173 if (NULL == free_buf) {
174 line = __LINE__; err = OMPI_ERR_OUT_OF_RESOURCE; goto err_hndl;
175 }
176 shift_buf = free_buf - gap;
177
178
179 err = ompi_datatype_copy_content_same_ddt(rdtype, ((ptrdiff_t)(size - rank) * (ptrdiff_t)rcount),
180 shift_buf, rbuf);
181 if (err < 0) { line = __LINE__; goto err_hndl; }
182
183
184 tmpsend = (char*) rbuf + (ptrdiff_t)(size - rank) * (ptrdiff_t)rcount * rext;
185 err = ompi_datatype_copy_content_same_ddt(rdtype, (ptrdiff_t)rank * (ptrdiff_t)rcount,
186 rbuf, tmpsend);
187 if (err < 0) { line = __LINE__; goto err_hndl; }
188
189
190 tmprecv = (char*) rbuf + (ptrdiff_t)rank * (ptrdiff_t)rcount * rext;
191 err = ompi_datatype_copy_content_same_ddt(rdtype, (ptrdiff_t)(size - rank) * (ptrdiff_t)rcount,
192 tmprecv, shift_buf);
193 if (err < 0) { line = __LINE__; goto err_hndl; }
194
195 free(free_buf);
196 }
197
198 return OMPI_SUCCESS;
199
200 err_hndl:
201 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
202 __FILE__, line, err, rank));
203 (void)line;
204 return err;
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 int
253 ompi_coll_base_allgather_intra_recursivedoubling(const void *sbuf, int scount,
254 struct ompi_datatype_t *sdtype,
255 void* rbuf, int rcount,
256 struct ompi_datatype_t *rdtype,
257 struct ompi_communicator_t *comm,
258 mca_coll_base_module_t *module)
259 {
260 int line = -1, rank, size, pow2size, err;
261 int remote, distance, sendblocklocation;
262 ptrdiff_t rlb, rext;
263 char *tmpsend = NULL, *tmprecv = NULL;
264
265 size = ompi_comm_size(comm);
266 rank = ompi_comm_rank(comm);
267
268 pow2size = opal_next_poweroftwo (size);
269 pow2size >>=1;
270
271
272
273
274
275 if (pow2size != size) {
276 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
277 "coll:base:allgather_intra_recursivedoubling WARNING: non-pow-2 size %d, switching to bruck algorithm",
278 size));
279
280 return ompi_coll_base_allgather_intra_bruck(sbuf, scount, sdtype,
281 rbuf, rcount, rdtype,
282 comm, module);
283 }
284
285 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
286 "coll:base:allgather_intra_recursivedoubling rank %d, size %d",
287 rank, size));
288
289 err = ompi_datatype_get_extent (rdtype, &rlb, &rext);
290 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
291
292
293
294
295
296 if (MPI_IN_PLACE != sbuf) {
297 tmpsend = (char*) sbuf;
298 tmprecv = (char*) rbuf + (ptrdiff_t)rank * (ptrdiff_t)rcount * rext;
299 err = ompi_datatype_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
300 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
301
302 }
303
304
305
306
307
308
309 sendblocklocation = rank;
310 for (distance = 0x1; distance < size; distance<<=1) {
311 remote = rank ^ distance;
312
313 if (rank < remote) {
314 tmpsend = (char*)rbuf + (ptrdiff_t)sendblocklocation * (ptrdiff_t)rcount * rext;
315 tmprecv = (char*)rbuf + (ptrdiff_t)(sendblocklocation + distance) * (ptrdiff_t)rcount * rext;
316 } else {
317 tmpsend = (char*)rbuf + (ptrdiff_t)sendblocklocation * (ptrdiff_t)rcount * rext;
318 tmprecv = (char*)rbuf + (ptrdiff_t)(sendblocklocation - distance) * (ptrdiff_t)rcount * rext;
319 sendblocklocation -= distance;
320 }
321
322
323 err = ompi_coll_base_sendrecv(tmpsend, (ptrdiff_t)distance * (ptrdiff_t)rcount, rdtype,
324 remote, MCA_COLL_BASE_TAG_ALLGATHER,
325 tmprecv, (ptrdiff_t)distance * (ptrdiff_t)rcount, rdtype,
326 remote, MCA_COLL_BASE_TAG_ALLGATHER,
327 comm, MPI_STATUS_IGNORE, rank);
328 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
329
330 }
331
332 return OMPI_SUCCESS;
333
334 err_hndl:
335 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
336 __FILE__, line, err, rank));
337 (void)line;
338 return err;
339 }
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358 int ompi_coll_base_allgather_intra_ring(const void *sbuf, int scount,
359 struct ompi_datatype_t *sdtype,
360 void* rbuf, int rcount,
361 struct ompi_datatype_t *rdtype,
362 struct ompi_communicator_t *comm,
363 mca_coll_base_module_t *module)
364 {
365 int line = -1, rank, size, err, sendto, recvfrom, i, recvdatafrom, senddatafrom;
366 ptrdiff_t rlb, rext;
367 char *tmpsend = NULL, *tmprecv = NULL;
368
369 size = ompi_comm_size(comm);
370 rank = ompi_comm_rank(comm);
371
372 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
373 "coll:base:allgather_intra_ring rank %d", rank));
374
375 err = ompi_datatype_get_extent (rdtype, &rlb, &rext);
376 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
377
378
379
380
381
382 tmprecv = (char*) rbuf + (ptrdiff_t)rank * (ptrdiff_t)rcount * rext;
383 if (MPI_IN_PLACE != sbuf) {
384 tmpsend = (char*) sbuf;
385 err = ompi_datatype_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
386 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
387 }
388
389
390
391
392
393
394
395
396
397 sendto = (rank + 1) % size;
398 recvfrom = (rank - 1 + size) % size;
399
400 for (i = 0; i < size - 1; i++) {
401 recvdatafrom = (rank - i - 1 + size) % size;
402 senddatafrom = (rank - i + size) % size;
403
404 tmprecv = (char*)rbuf + (ptrdiff_t)recvdatafrom * (ptrdiff_t)rcount * rext;
405 tmpsend = (char*)rbuf + (ptrdiff_t)senddatafrom * (ptrdiff_t)rcount * rext;
406
407
408 err = ompi_coll_base_sendrecv(tmpsend, rcount, rdtype, sendto,
409 MCA_COLL_BASE_TAG_ALLGATHER,
410 tmprecv, rcount, rdtype, recvfrom,
411 MCA_COLL_BASE_TAG_ALLGATHER,
412 comm, MPI_STATUS_IGNORE, rank);
413 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
414
415 }
416
417 return OMPI_SUCCESS;
418
419 err_hndl:
420 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
421 __FILE__, line, err, rank));
422 (void)line;
423 return err;
424 }
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483 int
484 ompi_coll_base_allgather_intra_neighborexchange(const void *sbuf, int scount,
485 struct ompi_datatype_t *sdtype,
486 void* rbuf, int rcount,
487 struct ompi_datatype_t *rdtype,
488 struct ompi_communicator_t *comm,
489 mca_coll_base_module_t *module)
490 {
491 int line = -1, rank, size, i, even_rank, err;
492 int neighbor[2], offset_at_step[2], recv_data_from[2], send_data_from;
493 ptrdiff_t rlb, rext;
494 char *tmpsend = NULL, *tmprecv = NULL;
495
496 size = ompi_comm_size(comm);
497 rank = ompi_comm_rank(comm);
498
499 if (size % 2) {
500 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
501 "coll:base:allgather_intra_neighborexchange WARNING: odd size %d, switching to ring algorithm",
502 size));
503 return ompi_coll_base_allgather_intra_ring(sbuf, scount, sdtype,
504 rbuf, rcount, rdtype,
505 comm, module);
506 }
507
508 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
509 "coll:base:allgather_intra_neighborexchange rank %d", rank));
510
511 err = ompi_datatype_get_extent (rdtype, &rlb, &rext);
512 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
513
514
515
516
517
518 tmprecv = (char*) rbuf + (ptrdiff_t)rank *(ptrdiff_t) rcount * rext;
519 if (MPI_IN_PLACE != sbuf) {
520 tmpsend = (char*) sbuf;
521 err = ompi_datatype_sndrcv(tmpsend, scount, sdtype, tmprecv, rcount, rdtype);
522 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
523 }
524
525
526 even_rank = !(rank % 2);
527 if (even_rank) {
528 neighbor[0] = (rank + 1) % size;
529 neighbor[1] = (rank - 1 + size) % size;
530 recv_data_from[0] = rank;
531 recv_data_from[1] = rank;
532 offset_at_step[0] = (+2);
533 offset_at_step[1] = (-2);
534 } else {
535 neighbor[0] = (rank - 1 + size) % size;
536 neighbor[1] = (rank + 1) % size;
537 recv_data_from[0] = neighbor[0];
538 recv_data_from[1] = neighbor[0];
539 offset_at_step[0] = (-2);
540 offset_at_step[1] = (+2);
541 }
542
543
544
545
546
547
548
549
550 tmprecv = (char*)rbuf + (ptrdiff_t)neighbor[0] * (ptrdiff_t)rcount * rext;
551 tmpsend = (char*)rbuf + (ptrdiff_t)rank * (ptrdiff_t)rcount * rext;
552
553 err = ompi_coll_base_sendrecv(tmpsend, rcount, rdtype, neighbor[0],
554 MCA_COLL_BASE_TAG_ALLGATHER,
555 tmprecv, rcount, rdtype, neighbor[0],
556 MCA_COLL_BASE_TAG_ALLGATHER,
557 comm, MPI_STATUS_IGNORE, rank);
558 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
559
560
561 if (even_rank) {
562 send_data_from = rank;
563 } else {
564 send_data_from = recv_data_from[0];
565 }
566
567 for (i = 1; i < (size / 2); i++) {
568 const int i_parity = i % 2;
569 recv_data_from[i_parity] =
570 (recv_data_from[i_parity] + offset_at_step[i_parity] + size) % size;
571
572 tmprecv = (char*)rbuf + (ptrdiff_t)recv_data_from[i_parity] * (ptrdiff_t)rcount * rext;
573 tmpsend = (char*)rbuf + (ptrdiff_t)send_data_from * rcount * rext;
574
575
576 err = ompi_coll_base_sendrecv(tmpsend, (ptrdiff_t)2 * (ptrdiff_t)rcount, rdtype,
577 neighbor[i_parity],
578 MCA_COLL_BASE_TAG_ALLGATHER,
579 tmprecv, (ptrdiff_t)2 * (ptrdiff_t)rcount, rdtype,
580 neighbor[i_parity],
581 MCA_COLL_BASE_TAG_ALLGATHER,
582 comm, MPI_STATUS_IGNORE, rank);
583 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
584
585 send_data_from = recv_data_from[i_parity];
586 }
587
588 return OMPI_SUCCESS;
589
590 err_hndl:
591 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
592 __FILE__, line, err, rank));
593 (void)line;
594 return err;
595 }
596
597
598 int ompi_coll_base_allgather_intra_two_procs(const void *sbuf, int scount,
599 struct ompi_datatype_t *sdtype,
600 void* rbuf, int rcount,
601 struct ompi_datatype_t *rdtype,
602 struct ompi_communicator_t *comm,
603 mca_coll_base_module_t *module)
604 {
605 int line = -1, err, rank, remote;
606 char *tmpsend = NULL, *tmprecv = NULL;
607 ptrdiff_t rext, lb;
608
609 rank = ompi_comm_rank(comm);
610
611 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
612 "ompi_coll_base_allgather_intra_two_procs rank %d", rank));
613
614 if (2 != ompi_comm_size(comm)) {
615 return MPI_ERR_UNSUPPORTED_OPERATION;
616 }
617
618 err = ompi_datatype_get_extent (rdtype, &lb, &rext);
619 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
620
621
622
623
624
625 remote = rank ^ 0x1;
626
627 tmpsend = (char*)sbuf;
628 if (MPI_IN_PLACE == sbuf) {
629 tmpsend = (char*)rbuf + (ptrdiff_t)rank * (ptrdiff_t)rcount * rext;
630 scount = rcount;
631 sdtype = rdtype;
632 }
633 tmprecv = (char*)rbuf + (ptrdiff_t)remote * (ptrdiff_t)rcount * rext;
634
635 err = ompi_coll_base_sendrecv(tmpsend, scount, sdtype, remote,
636 MCA_COLL_BASE_TAG_ALLGATHER,
637 tmprecv, rcount, rdtype, remote,
638 MCA_COLL_BASE_TAG_ALLGATHER,
639 comm, MPI_STATUS_IGNORE, rank);
640 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
641
642
643 if (MPI_IN_PLACE != sbuf) {
644 err = ompi_datatype_sndrcv((char*)sbuf, scount, sdtype,
645 (char*)rbuf + (ptrdiff_t)rank * (ptrdiff_t)rcount * rext, rcount, rdtype);
646 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
647 }
648
649 return MPI_SUCCESS;
650
651 err_hndl:
652 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
653 __FILE__, line, err, rank));
654 (void)line;
655 return err;
656 }
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680 int
681 ompi_coll_base_allgather_intra_basic_linear(const void *sbuf, int scount,
682 struct ompi_datatype_t *sdtype,
683 void *rbuf,
684 int rcount,
685 struct ompi_datatype_t *rdtype,
686 struct ompi_communicator_t *comm,
687 mca_coll_base_module_t *module)
688 {
689 int err;
690 ptrdiff_t lb, extent;
691
692
693
694
695
696
697 if (MPI_IN_PLACE == sbuf && 0 != ompi_comm_rank(comm)) {
698 ompi_datatype_get_extent(rdtype, &lb, &extent);
699 sbuf = ((char*) rbuf) + (ompi_comm_rank(comm) * extent * rcount);
700 sdtype = rdtype;
701 scount = rcount;
702 }
703
704
705
706 err = comm->c_coll->coll_gather(sbuf, scount, sdtype,
707 rbuf, rcount, rdtype,
708 0, comm, comm->c_coll->coll_gather_module);
709 if (MPI_SUCCESS == err) {
710 size_t length = (ptrdiff_t)rcount * ompi_comm_size(comm);
711 if( length < (size_t)INT_MAX ) {
712 err = comm->c_coll->coll_bcast(rbuf, (ptrdiff_t)rcount * ompi_comm_size(comm), rdtype,
713 0, comm, comm->c_coll->coll_bcast_module);
714 } else {
715 ompi_datatype_t* temptype;
716 ompi_datatype_create_contiguous(ompi_comm_size(comm), rdtype, &temptype);
717 ompi_datatype_commit(&temptype);
718 err = comm->c_coll->coll_bcast(rbuf, rcount, temptype,
719 0, comm, comm->c_coll->coll_bcast_module);
720 ompi_datatype_destroy(&temptype);
721 }
722 }
723
724
725
726 return err;
727 }
728
729