This source file includes the following definitions:
- ompi_coll_base_allgatherv_intra_bruck
- ompi_coll_base_allgatherv_intra_ring
- ompi_coll_base_allgatherv_intra_neighborexchange
- ompi_coll_base_allgatherv_intra_two_procs
- ompi_coll_base_allgatherv_intra_basic_default
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "ompi_config.h"
27
28 #include "mpi.h"
29 #include "ompi/constants.h"
30 #include "ompi/datatype/ompi_datatype.h"
31 #include "ompi/communicator/communicator.h"
32 #include "ompi/mca/coll/coll.h"
33 #include "ompi/mca/coll/base/coll_tags.h"
34 #include "ompi/mca/coll/base/coll_base_functions.h"
35 #include "coll_base_topo.h"
36 #include "coll_base_util.h"
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 int ompi_coll_base_allgatherv_intra_bruck(const void *sbuf, int scount,
94 struct ompi_datatype_t *sdtype,
95 void *rbuf, const int *rcounts,
96 const int *rdispls,
97 struct ompi_datatype_t *rdtype,
98 struct ompi_communicator_t *comm,
99 mca_coll_base_module_t *module)
100 {
101 int line = -1, err = 0, rank, size, sendto, recvfrom, distance, blockcount, i;
102 int *new_rcounts = NULL, *new_rdispls = NULL, *new_scounts = NULL, *new_sdispls = NULL;
103 ptrdiff_t rlb, rext;
104 char *tmpsend = NULL, *tmprecv = NULL;
105 struct ompi_datatype_t *new_rdtype, *new_sdtype;
106
107 size = ompi_comm_size(comm);
108 rank = ompi_comm_rank(comm);
109
110 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
111 "coll:base:allgather_intra_bruck rank %d", rank));
112
113 err = ompi_datatype_get_extent (rdtype, &rlb, &rext);
114 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
115
116
117
118
119
120 tmprecv = (char*) rbuf + (ptrdiff_t)rdispls[rank] * rext;
121 if (MPI_IN_PLACE != sbuf) {
122 tmpsend = (char*) sbuf;
123 err = ompi_datatype_sndrcv(tmpsend, scount, sdtype,
124 tmprecv, rcounts[rank], rdtype);
125 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
126
127 }
128
129
130
131
132
133
134
135
136
137
138
139
140 blockcount = 1;
141 tmpsend = (char*) rbuf;
142
143 new_rcounts = (int*) calloc(4*size, sizeof(int));
144 if (NULL == new_rcounts) { err = -1; line = __LINE__; goto err_hndl; }
145 new_rdispls = new_rcounts + size;
146 new_scounts = new_rdispls + size;
147 new_sdispls = new_scounts + size;
148
149 for (distance = 1; distance < size; distance<<=1) {
150
151 recvfrom = (rank + distance) % size;
152 sendto = (rank - distance + size) % size;
153
154 if (distance <= (size >> 1)) {
155 blockcount = distance;
156 } else {
157 blockcount = size - distance;
158 }
159
160
161 for (i = 0; i < blockcount; i++) {
162 const int tmp_srank = (rank + i) % size;
163 const int tmp_rrank = (recvfrom + i) % size;
164 new_scounts[i] = rcounts[tmp_srank];
165 new_sdispls[i] = rdispls[tmp_srank];
166 new_rcounts[i] = rcounts[tmp_rrank];
167 new_rdispls[i] = rdispls[tmp_rrank];
168 }
169 err = ompi_datatype_create_indexed(blockcount, new_scounts, new_sdispls,
170 rdtype, &new_sdtype);
171 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
172 err = ompi_datatype_create_indexed(blockcount, new_rcounts, new_rdispls,
173 rdtype, &new_rdtype);
174
175 err = ompi_datatype_commit(&new_sdtype);
176 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
177 err = ompi_datatype_commit(&new_rdtype);
178 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
179
180
181 err = ompi_coll_base_sendrecv(rbuf, 1, new_sdtype, sendto,
182 MCA_COLL_BASE_TAG_ALLGATHERV,
183 rbuf, 1, new_rdtype, recvfrom,
184 MCA_COLL_BASE_TAG_ALLGATHERV,
185 comm, MPI_STATUS_IGNORE, rank);
186 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
187
188 ompi_datatype_destroy(&new_sdtype);
189 ompi_datatype_destroy(&new_rdtype);
190 }
191
192 free(new_rcounts);
193
194 return OMPI_SUCCESS;
195
196 err_hndl:
197 if( NULL != new_rcounts ) free(new_rcounts);
198
199 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
200 __FILE__, line, err, rank));
201 (void)line;
202 return err;
203 }
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221 int ompi_coll_base_allgatherv_intra_ring(const void *sbuf, int scount,
222 struct ompi_datatype_t *sdtype,
223 void* rbuf, const int *rcounts, const int *rdisps,
224 struct ompi_datatype_t *rdtype,
225 struct ompi_communicator_t *comm,
226 mca_coll_base_module_t *module)
227 {
228 int line = -1, rank, size, sendto, recvfrom, i, recvdatafrom, senddatafrom, err = 0;
229 ptrdiff_t rlb, rext;
230 char *tmpsend = NULL, *tmprecv = NULL;
231
232 size = ompi_comm_size(comm);
233 rank = ompi_comm_rank(comm);
234
235 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
236 "coll:base:allgatherv_intra_ring rank %d", rank));
237
238 err = ompi_datatype_get_extent (rdtype, &rlb, &rext);
239 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
240
241
242
243
244
245 tmprecv = (char*) rbuf + (ptrdiff_t)rdisps[rank] * rext;
246 if (MPI_IN_PLACE != sbuf) {
247 tmpsend = (char*) sbuf;
248 err = ompi_datatype_sndrcv(tmpsend, scount, sdtype,
249 tmprecv, rcounts[rank], rdtype);
250 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
251 }
252
253
254
255
256
257
258
259
260
261 sendto = (rank + 1) % size;
262 recvfrom = (rank - 1 + size) % size;
263
264 for (i = 0; i < size - 1; i++) {
265 recvdatafrom = (rank - i - 1 + size) % size;
266 senddatafrom = (rank - i + size) % size;
267
268 tmprecv = (char*)rbuf + rdisps[recvdatafrom] * rext;
269 tmpsend = (char*)rbuf + rdisps[senddatafrom] * rext;
270
271
272 err = ompi_coll_base_sendrecv(tmpsend, rcounts[senddatafrom], rdtype,
273 sendto, MCA_COLL_BASE_TAG_ALLGATHERV,
274 tmprecv, rcounts[recvdatafrom], rdtype,
275 recvfrom, MCA_COLL_BASE_TAG_ALLGATHERV,
276 comm, MPI_STATUS_IGNORE, rank);
277 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
278 }
279
280 return OMPI_SUCCESS;
281
282 err_hndl:
283 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
284 __FILE__, line, err, rank));
285 (void)line;
286 return err;
287 }
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347 int
348 ompi_coll_base_allgatherv_intra_neighborexchange(const void *sbuf, int scount,
349 struct ompi_datatype_t *sdtype,
350 void* rbuf, const int *rcounts, const int *rdispls,
351 struct ompi_datatype_t *rdtype,
352 struct ompi_communicator_t *comm,
353 mca_coll_base_module_t *module)
354 {
355 int line = -1, rank, size, i, even_rank, err = 0;
356 int neighbor[2], offset_at_step[2], recv_data_from[2], send_data_from;
357 int new_scounts[2], new_sdispls[2], new_rcounts[2], new_rdispls[2];
358 ptrdiff_t rlb, rext;
359 char *tmpsend = NULL, *tmprecv = NULL;
360 struct ompi_datatype_t *new_rdtype, *new_sdtype;
361
362 size = ompi_comm_size(comm);
363 rank = ompi_comm_rank(comm);
364
365 if (size % 2) {
366 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
367 "coll:base:allgatherv_intra_neighborexchange WARNING: odd size %d, switching to ring algorithm",
368 size));
369 return ompi_coll_base_allgatherv_intra_ring(sbuf, scount, sdtype,
370 rbuf, rcounts,
371 rdispls, rdtype,
372 comm, module);
373 }
374
375 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
376 "coll:base:allgatherv_intra_neighborexchange rank %d", rank));
377
378 err = ompi_datatype_get_extent (rdtype, &rlb, &rext);
379 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
380
381
382
383
384
385 tmprecv = (char*) rbuf + (ptrdiff_t)rdispls[rank] * rext;
386 if (MPI_IN_PLACE != sbuf) {
387 tmpsend = (char*) sbuf;
388 err = ompi_datatype_sndrcv(tmpsend, scount, sdtype,
389 tmprecv, rcounts[rank], rdtype);
390 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
391 }
392
393
394 even_rank = !(rank % 2);
395 if (even_rank) {
396 neighbor[0] = (rank + 1) % size;
397 neighbor[1] = (rank - 1 + size) % size;
398 recv_data_from[0] = rank;
399 recv_data_from[1] = rank;
400 offset_at_step[0] = (+2);
401 offset_at_step[1] = (-2);
402 } else {
403 neighbor[0] = (rank - 1 + size) % size;
404 neighbor[1] = (rank + 1) % size;
405 recv_data_from[0] = neighbor[0];
406 recv_data_from[1] = neighbor[0];
407 offset_at_step[0] = (-2);
408 offset_at_step[1] = (+2);
409 }
410
411
412
413
414
415
416
417
418
419
420 tmprecv = (char*)rbuf + (ptrdiff_t)rdispls[neighbor[0]] * rext;
421 tmpsend = (char*)rbuf + (ptrdiff_t)rdispls[rank] * rext;
422 err = ompi_coll_base_sendrecv(tmpsend, rcounts[rank], rdtype,
423 neighbor[0], MCA_COLL_BASE_TAG_ALLGATHERV,
424 tmprecv, rcounts[neighbor[0]], rdtype,
425 neighbor[0], MCA_COLL_BASE_TAG_ALLGATHERV,
426 comm, MPI_STATUS_IGNORE, rank);
427 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
428
429
430 if (even_rank) {
431 send_data_from = rank;
432 } else {
433 send_data_from = recv_data_from[0];
434 }
435
436 for (i = 1; i < (size / 2); i++) {
437 const int i_parity = i % 2;
438 recv_data_from[i_parity] =
439 (recv_data_from[i_parity] + offset_at_step[i_parity] + size) % size;
440
441
442
443
444
445
446 new_scounts[0] = rcounts[send_data_from];
447 new_scounts[1] = rcounts[(send_data_from + 1)];
448 new_sdispls[0] = rdispls[send_data_from];
449 new_sdispls[1] = rdispls[(send_data_from + 1)];
450 err = ompi_datatype_create_indexed(2, new_scounts, new_sdispls, rdtype,
451 &new_sdtype);
452 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
453 err = ompi_datatype_commit(&new_sdtype);
454 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
455
456 new_rcounts[0] = rcounts[recv_data_from[i_parity]];
457 new_rcounts[1] = rcounts[(recv_data_from[i_parity] + 1)];
458 new_rdispls[0] = rdispls[recv_data_from[i_parity]];
459 new_rdispls[1] = rdispls[(recv_data_from[i_parity] + 1)];
460 err = ompi_datatype_create_indexed(2, new_rcounts, new_rdispls, rdtype,
461 &new_rdtype);
462 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
463 err = ompi_datatype_commit(&new_rdtype);
464 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
465
466 tmprecv = (char*)rbuf;
467 tmpsend = (char*)rbuf;
468
469
470 err = ompi_coll_base_sendrecv(tmpsend, 1, new_sdtype, neighbor[i_parity],
471 MCA_COLL_BASE_TAG_ALLGATHERV,
472 tmprecv, 1, new_rdtype, neighbor[i_parity],
473 MCA_COLL_BASE_TAG_ALLGATHERV,
474 comm, MPI_STATUS_IGNORE, rank);
475 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
476
477 send_data_from = recv_data_from[i_parity];
478
479 ompi_datatype_destroy(&new_sdtype);
480 ompi_datatype_destroy(&new_rdtype);
481 }
482
483 return OMPI_SUCCESS;
484
485 err_hndl:
486 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
487 __FILE__, line, err, rank));
488 (void)line;
489 return err;
490 }
491
492
493 int ompi_coll_base_allgatherv_intra_two_procs(const void *sbuf, int scount,
494 struct ompi_datatype_t *sdtype,
495 void* rbuf, const int *rcounts,
496 const int *rdispls,
497 struct ompi_datatype_t *rdtype,
498 struct ompi_communicator_t *comm,
499 mca_coll_base_module_t *module)
500 {
501 int line = -1, err = 0, rank, remote;
502 char *tmpsend = NULL, *tmprecv = NULL;
503 ptrdiff_t rext, lb;
504
505 rank = ompi_comm_rank(comm);
506
507 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
508 "ompi_coll_base_allgatherv_intra_two_procs rank %d", rank));
509
510 if (2 != ompi_comm_size(comm)) {
511 return MPI_ERR_UNSUPPORTED_OPERATION;
512 }
513
514 err = ompi_datatype_get_extent (rdtype, &lb, &rext);
515 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
516
517
518
519
520
521 remote = rank ^ 0x1;
522
523 tmpsend = (char*)sbuf;
524 if (MPI_IN_PLACE == sbuf) {
525 tmpsend = (char*)rbuf + (ptrdiff_t)rdispls[rank] * rext;
526 scount = rcounts[rank];
527 sdtype = rdtype;
528 }
529 tmprecv = (char*)rbuf + (ptrdiff_t)rdispls[remote] * rext;
530
531 err = ompi_coll_base_sendrecv(tmpsend, scount, sdtype, remote,
532 MCA_COLL_BASE_TAG_ALLGATHERV,
533 tmprecv, rcounts[remote], rdtype, remote,
534 MCA_COLL_BASE_TAG_ALLGATHERV,
535 comm, MPI_STATUS_IGNORE, rank);
536 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
537
538
539 if (MPI_IN_PLACE != sbuf) {
540 err = ompi_datatype_sndrcv((char*)sbuf, scount, sdtype,
541 (char*)rbuf + (ptrdiff_t)rdispls[rank] * rext,
542 rcounts[rank], rdtype);
543 if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
544 }
545
546 return MPI_SUCCESS;
547
548 err_hndl:
549 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tError occurred %d, rank %2d",
550 __FILE__, line, err, rank));
551 (void)line;
552 return err;
553 }
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578 int
579 ompi_coll_base_allgatherv_intra_basic_default(const void *sbuf, int scount,
580 struct ompi_datatype_t *sdtype,
581 void *rbuf, const int *rcounts,
582 const int *disps,
583 struct ompi_datatype_t *rdtype,
584 struct ompi_communicator_t *comm,
585 mca_coll_base_module_t *module)
586 {
587 int size, rank, err;
588 MPI_Aint extent, lb;
589 char *send_buf = NULL;
590 struct ompi_datatype_t *newtype, *send_type;
591
592 size = ompi_comm_size(comm);
593 rank = ompi_comm_rank(comm);
594
595
596
597
598
599 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
600 "ompi_coll_base_allgatherv_intra_basic_default rank %d",
601 rank));
602
603 if (MPI_IN_PLACE == sbuf) {
604 ompi_datatype_get_extent(rdtype, &lb, &extent);
605 send_type = rdtype;
606 send_buf = (char*)rbuf;
607 send_buf += ((ptrdiff_t)disps[rank] * extent);
608 scount = rcounts[rank];
609 } else {
610 send_buf = (char*)sbuf;
611 send_type = sdtype;
612 }
613
614 err = comm->c_coll->coll_gatherv(send_buf,
615 scount, send_type,rbuf,
616 rcounts, disps, rdtype, 0,
617 comm, comm->c_coll->coll_gatherv_module);
618 if (MPI_SUCCESS != err) {
619 return err;
620 }
621
622
623
624
625
626
627
628
629
630
631
632
633
634 err = ompi_datatype_create_indexed(size,rcounts,disps,rdtype,&newtype);
635 if (MPI_SUCCESS != err) {
636 return err;
637 }
638
639 err = ompi_datatype_commit(&newtype);
640 if(MPI_SUCCESS != err) {
641 return err;
642 }
643
644 comm->c_coll->coll_bcast(rbuf, 1, newtype, 0, comm,
645 comm->c_coll->coll_bcast_module);
646
647 ompi_datatype_destroy (&newtype);
648
649 return MPI_SUCCESS;
650 }
651
652