This source file includes following definitions.
- mca_coll_base_reduce_local
- ompi_coll_base_reduce_generic
- ompi_coll_base_reduce_intra_chain
- ompi_coll_base_reduce_intra_pipeline
- ompi_coll_base_reduce_intra_binary
- ompi_coll_base_reduce_intra_binomial
- ompi_coll_base_reduce_intra_in_order_binary
- ompi_coll_base_reduce_intra_basic_linear
- ompi_coll_base_reduce_intra_redscat_gather
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include "ompi_config.h"
28
29 #include "mpi.h"
30 #include "opal/util/bit_ops.h"
31 #include "ompi/constants.h"
32 #include "ompi/datatype/ompi_datatype.h"
33 #include "ompi/communicator/communicator.h"
34 #include "ompi/mca/coll/coll.h"
35 #include "ompi/mca/coll/base/coll_tags.h"
36 #include "ompi/mca/pml/pml.h"
37 #include "ompi/op/op.h"
38 #include "ompi/mca/coll/base/coll_base_functions.h"
39 #include "coll_base_topo.h"
40 #include "coll_base_util.h"
41
42 int mca_coll_base_reduce_local(const void *inbuf, void *inoutbuf, int count,
43 struct ompi_datatype_t * dtype, struct ompi_op_t * op,
44 mca_coll_base_module_t *module)
45 {
46
47 ompi_op_reduce(op, (void *)inbuf, inoutbuf, count, dtype);
48 return OMPI_SUCCESS;
49 }
50
51
52
53
54
55
56
57
58
59
60
61
62 int ompi_coll_base_reduce_generic( const void* sendbuf, void* recvbuf, int original_count,
63 ompi_datatype_t* datatype, ompi_op_t* op,
64 int root, ompi_communicator_t* comm,
65 mca_coll_base_module_t *module,
66 ompi_coll_tree_t* tree, int count_by_segment,
67 int max_outstanding_reqs )
68 {
69 char *inbuf[2] = {NULL, NULL}, *inbuf_free[2] = {NULL, NULL};
70 char *accumbuf = NULL, *accumbuf_free = NULL;
71 char *local_op_buffer = NULL, *sendtmpbuf = NULL;
72 ptrdiff_t extent, size, gap = 0, segment_increment;
73 ompi_request_t **sreq = NULL, *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
74 int num_segments, line, ret, segindex, i, rank;
75 int recvcount, prevcount, inbi;
76
77
78
79
80
81 ompi_datatype_type_extent( datatype, &extent );
82 num_segments = (int)(((size_t)original_count + (size_t)count_by_segment - (size_t)1) / (size_t)count_by_segment);
83 segment_increment = (ptrdiff_t)count_by_segment * extent;
84
85 sendtmpbuf = (char*) sendbuf;
86 if( sendbuf == MPI_IN_PLACE ) {
87 sendtmpbuf = (char *)recvbuf;
88 }
89
90 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:reduce_generic count %d, msg size %ld, segsize %ld, max_requests %d",
91 original_count, (unsigned long)((ptrdiff_t)num_segments * (ptrdiff_t)segment_increment),
92 (unsigned long)segment_increment, max_outstanding_reqs));
93
94 rank = ompi_comm_rank(comm);
95
96
97
98 if( tree->tree_nextsize > 0 ) {
99 ptrdiff_t real_segment_size;
100
101
102
103 accumbuf = (char*)recvbuf;
104 if( (NULL == accumbuf) || (root != rank) ) {
105
106 size = opal_datatype_span(&datatype->super, original_count, &gap);
107 accumbuf_free = (char*)malloc(size);
108 if (accumbuf_free == NULL) {
109 line = __LINE__; ret = -1; goto error_hndl;
110 }
111 accumbuf = accumbuf_free - gap;
112 }
113
114
115
116
117 if (!ompi_op_is_commute(op) && MPI_IN_PLACE != sendbuf) {
118 ompi_datatype_copy_content_same_ddt(datatype, original_count,
119 (char*)accumbuf,
120 (char*)sendtmpbuf);
121 }
122
123 real_segment_size = opal_datatype_span(&datatype->super, count_by_segment, &gap);
124 inbuf_free[0] = (char*) malloc(real_segment_size);
125 if( inbuf_free[0] == NULL ) {
126 line = __LINE__; ret = -1; goto error_hndl;
127 }
128 inbuf[0] = inbuf_free[0] - gap;
129
130
131 if( (num_segments > 1) || (tree->tree_nextsize > 1) ) {
132 inbuf_free[1] = (char*) malloc(real_segment_size);
133 if( inbuf_free[1] == NULL ) {
134 line = __LINE__; ret = -1; goto error_hndl;
135 }
136 inbuf[1] = inbuf_free[1] - gap;
137 }
138
139
140 inbi = 0;
141 recvcount = 0;
142
143 for( segindex = 0; segindex <= num_segments; segindex++ ) {
144 prevcount = recvcount;
145
146 recvcount = count_by_segment;
147 if( segindex == (num_segments-1) )
148 recvcount = original_count - (ptrdiff_t)count_by_segment * (ptrdiff_t)segindex;
149
150
151 for( i = 0; i < tree->tree_nextsize; i++ ) {
152
153
154
155
156
157 if( segindex < num_segments ) {
158 void* local_recvbuf = inbuf[inbi];
159 if( 0 == i ) {
160
161
162
163
164
165
166
167
168
169
170 if( (ompi_op_is_commute(op)) &&
171 !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
172 local_recvbuf = accumbuf + (ptrdiff_t)segindex * (ptrdiff_t)segment_increment;
173 }
174 }
175
176 ret = MCA_PML_CALL(irecv(local_recvbuf, recvcount, datatype,
177 tree->tree_next[i],
178 MCA_COLL_BASE_TAG_REDUCE, comm,
179 &reqs[inbi]));
180 if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl;}
181 }
182
183
184
185
186 ret = ompi_request_wait(&reqs[inbi ^ 1],
187 MPI_STATUSES_IGNORE );
188 if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
189 local_op_buffer = inbuf[inbi ^ 1];
190 if( i > 0 ) {
191
192
193
194
195
196 if( 1 == i ) {
197 if( (ompi_op_is_commute(op)) &&
198 !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
199 local_op_buffer = sendtmpbuf + (ptrdiff_t)segindex * (ptrdiff_t)segment_increment;
200 }
201 }
202
203 ompi_op_reduce(op, local_op_buffer,
204 accumbuf + (ptrdiff_t)segindex * (ptrdiff_t)segment_increment,
205 recvcount, datatype );
206 } else if ( segindex > 0 ) {
207 void* accumulator = accumbuf + (ptrdiff_t)(segindex-1) * (ptrdiff_t)segment_increment;
208 if( tree->tree_nextsize <= 1 ) {
209 if( (ompi_op_is_commute(op)) &&
210 !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
211 local_op_buffer = sendtmpbuf + (ptrdiff_t)(segindex-1) * (ptrdiff_t)segment_increment;
212 }
213 }
214 ompi_op_reduce(op, local_op_buffer, accumulator, prevcount,
215 datatype );
216
217
218
219
220 if (rank != tree->tree_root) {
221
222 ret = MCA_PML_CALL( send( accumulator, prevcount,
223 datatype, tree->tree_prev,
224 MCA_COLL_BASE_TAG_REDUCE,
225 MCA_PML_BASE_SEND_STANDARD,
226 comm) );
227 if (ret != MPI_SUCCESS) {
228 line = __LINE__; goto error_hndl;
229 }
230 }
231
232
233
234 if (segindex == num_segments) break;
235 }
236
237
238 inbi = inbi ^ 1;
239 }
240 }
241
242
243 if( inbuf_free[0] != NULL) free(inbuf_free[0]);
244 if( inbuf_free[1] != NULL) free(inbuf_free[1]);
245 if( accumbuf_free != NULL ) free(accumbuf_free);
246 }
247
248
249
250
251
252
253
254
255
256
257
258 else {
259
260
261
262
263 if ((0 == max_outstanding_reqs) ||
264 (num_segments <= max_outstanding_reqs)) {
265
266 segindex = 0;
267 while ( original_count > 0) {
268 if (original_count < count_by_segment) {
269 count_by_segment = original_count;
270 }
271 ret = MCA_PML_CALL( send((char*)sendbuf +
272 (ptrdiff_t)segindex * (ptrdiff_t)segment_increment,
273 count_by_segment, datatype,
274 tree->tree_prev,
275 MCA_COLL_BASE_TAG_REDUCE,
276 MCA_PML_BASE_SEND_STANDARD,
277 comm) );
278 if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
279 segindex++;
280 original_count -= count_by_segment;
281 }
282 }
283
284
285
286
287
288
289
290 else {
291
292 int creq = 0;
293
294 sreq = ompi_coll_base_comm_get_reqs(module->base_data, max_outstanding_reqs);
295 if (NULL == sreq) { line = __LINE__; ret = -1; goto error_hndl; }
296
297
298 for (segindex = 0; segindex < max_outstanding_reqs; segindex++) {
299 ret = MCA_PML_CALL( isend((char*)sendbuf +
300 (ptrdiff_t)segindex * (ptrdiff_t)segment_increment,
301 count_by_segment, datatype,
302 tree->tree_prev,
303 MCA_COLL_BASE_TAG_REDUCE,
304 MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
305 &sreq[segindex]) );
306 if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
307 original_count -= count_by_segment;
308 }
309
310 creq = 0;
311 while ( original_count > 0 ) {
312
313 ret = ompi_request_wait(&sreq[creq], MPI_STATUS_IGNORE);
314 if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
315
316 if( original_count < count_by_segment ) {
317 count_by_segment = original_count;
318 }
319 ret = MCA_PML_CALL( isend((char*)sendbuf +
320 (ptrdiff_t)segindex * (ptrdiff_t)segment_increment,
321 count_by_segment, datatype,
322 tree->tree_prev,
323 MCA_COLL_BASE_TAG_REDUCE,
324 MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
325 &sreq[creq]) );
326 if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
327 creq = (creq + 1) % max_outstanding_reqs;
328 segindex++;
329 original_count -= count_by_segment;
330 }
331
332
333 ret = ompi_request_wait_all( max_outstanding_reqs, sreq,
334 MPI_STATUSES_IGNORE );
335 if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
336 }
337 }
338 return OMPI_SUCCESS;
339
340 error_hndl:
341
342 if (MPI_ERR_IN_STATUS == ret) {
343 for( i = 0; i < 2; i++ ) {
344 if (MPI_REQUEST_NULL == reqs[i]) continue;
345 if (MPI_ERR_PENDING == reqs[i]->req_status.MPI_ERROR) continue;
346 ret = reqs[i]->req_status.MPI_ERROR;
347 break;
348 }
349 }
350 ompi_coll_base_free_reqs(reqs, 2);
351 if( NULL != sreq ) {
352 if (MPI_ERR_IN_STATUS == ret) {
353 for( i = 0; i < max_outstanding_reqs; i++ ) {
354 if (MPI_REQUEST_NULL == sreq[i]) continue;
355 if (MPI_ERR_PENDING == sreq[i]->req_status.MPI_ERROR) continue;
356 ret = sreq[i]->req_status.MPI_ERROR;
357 break;
358 }
359 }
360 ompi_coll_base_free_reqs(sreq, max_outstanding_reqs);
361 }
362 if( inbuf_free[0] != NULL ) free(inbuf_free[0]);
363 if( inbuf_free[1] != NULL ) free(inbuf_free[1]);
364 if( accumbuf_free != NULL ) free(accumbuf);
365 OPAL_OUTPUT (( ompi_coll_base_framework.framework_output,
366 "ERROR_HNDL: node %d file %s line %d error %d\n",
367 rank, __FILE__, line, ret ));
368 (void)line;
369 return ret;
370 }
371
372
373
374
375
376
377
378
379 int ompi_coll_base_reduce_intra_chain( const void *sendbuf, void *recvbuf, int count,
380 ompi_datatype_t* datatype,
381 ompi_op_t* op, int root,
382 ompi_communicator_t* comm,
383 mca_coll_base_module_t *module,
384 uint32_t segsize, int fanout,
385 int max_outstanding_reqs )
386 {
387 int segcount = count;
388 size_t typelng;
389 mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
390 mca_coll_base_comm_t *data = base_module->base_data;
391
392 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_intra_chain rank %d fo %d ss %5d", ompi_comm_rank(comm), fanout, segsize));
393
394 COLL_BASE_UPDATE_CHAIN( comm, base_module, root, fanout );
395
396
397
398
399 ompi_datatype_type_size( datatype, &typelng );
400 COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
401
402 return ompi_coll_base_reduce_generic( sendbuf, recvbuf, count, datatype,
403 op, root, comm, module,
404 data->cached_chain,
405 segcount, max_outstanding_reqs );
406 }
407
408
409 int ompi_coll_base_reduce_intra_pipeline( const void *sendbuf, void *recvbuf,
410 int count, ompi_datatype_t* datatype,
411 ompi_op_t* op, int root,
412 ompi_communicator_t* comm,
413 mca_coll_base_module_t *module,
414 uint32_t segsize,
415 int max_outstanding_reqs )
416 {
417 int segcount = count;
418 size_t typelng;
419 mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
420 mca_coll_base_comm_t *data = base_module->base_data;
421
422 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_intra_pipeline rank %d ss %5d",
423 ompi_comm_rank(comm), segsize));
424
425 COLL_BASE_UPDATE_PIPELINE( comm, base_module, root );
426
427
428
429
430
431 ompi_datatype_type_size( datatype, &typelng );
432 COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
433
434 return ompi_coll_base_reduce_generic( sendbuf, recvbuf, count, datatype,
435 op, root, comm, module,
436 data->cached_pipeline,
437 segcount, max_outstanding_reqs );
438 }
439
440 int ompi_coll_base_reduce_intra_binary( const void *sendbuf, void *recvbuf,
441 int count, ompi_datatype_t* datatype,
442 ompi_op_t* op, int root,
443 ompi_communicator_t* comm,
444 mca_coll_base_module_t *module,
445 uint32_t segsize,
446 int max_outstanding_reqs )
447 {
448 int segcount = count;
449 size_t typelng;
450 mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
451 mca_coll_base_comm_t *data = base_module->base_data;
452
453 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_intra_binary rank %d ss %5d",
454 ompi_comm_rank(comm), segsize));
455
456 COLL_BASE_UPDATE_BINTREE( comm, base_module, root );
457
458
459
460
461
462 ompi_datatype_type_size( datatype, &typelng );
463 COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
464
465 return ompi_coll_base_reduce_generic( sendbuf, recvbuf, count, datatype,
466 op, root, comm, module,
467 data->cached_bintree,
468 segcount, max_outstanding_reqs );
469 }
470
471 int ompi_coll_base_reduce_intra_binomial( const void *sendbuf, void *recvbuf,
472 int count, ompi_datatype_t* datatype,
473 ompi_op_t* op, int root,
474 ompi_communicator_t* comm,
475 mca_coll_base_module_t *module,
476 uint32_t segsize,
477 int max_outstanding_reqs )
478 {
479 int segcount = count;
480 size_t typelng;
481 mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
482 mca_coll_base_comm_t *data = base_module->base_data;
483
484 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_intra_binomial rank %d ss %5d",
485 ompi_comm_rank(comm), segsize));
486
487 COLL_BASE_UPDATE_IN_ORDER_BMTREE( comm, base_module, root );
488
489
490
491
492
493 ompi_datatype_type_size( datatype, &typelng );
494 COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
495
496 return ompi_coll_base_reduce_generic( sendbuf, recvbuf, count, datatype,
497 op, root, comm, module,
498 data->cached_in_order_bmtree,
499 segcount, max_outstanding_reqs );
500 }
501
502
503
504
505
506
507
508
509 int ompi_coll_base_reduce_intra_in_order_binary( const void *sendbuf, void *recvbuf,
510 int count,
511 ompi_datatype_t* datatype,
512 ompi_op_t* op, int root,
513 ompi_communicator_t* comm,
514 mca_coll_base_module_t *module,
515 uint32_t segsize,
516 int max_outstanding_reqs )
517 {
518 int ret, rank, size, io_root, segcount = count;
519 void *use_this_sendbuf = NULL;
520 void *use_this_recvbuf = NULL;
521 char *tmpbuf_free = NULL;
522 size_t typelng;
523 mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
524 mca_coll_base_comm_t *data = base_module->base_data;
525
526 rank = ompi_comm_rank(comm);
527 size = ompi_comm_size(comm);
528 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_intra_in_order_binary rank %d ss %5d",
529 rank, segsize));
530
531 COLL_BASE_UPDATE_IN_ORDER_BINTREE( comm, base_module );
532
533
534
535
536
537 ompi_datatype_type_size( datatype, &typelng );
538 COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
539
540
541
542
543
544
545
546
547 io_root = size - 1;
548 use_this_sendbuf = (void *)sendbuf;
549 use_this_recvbuf = recvbuf;
550 if (io_root != root) {
551 ptrdiff_t dsize, gap = 0;
552 char *tmpbuf;
553
554 dsize = opal_datatype_span(&datatype->super, count, &gap);
555
556 if ((root == rank) && (MPI_IN_PLACE == sendbuf)) {
557 tmpbuf_free = (char *) malloc(dsize);
558 if (NULL == tmpbuf_free) {
559 return MPI_ERR_INTERN;
560 }
561 tmpbuf = tmpbuf_free - gap;
562 ompi_datatype_copy_content_same_ddt(datatype, count,
563 (char*)tmpbuf,
564 (char*)recvbuf);
565 use_this_sendbuf = tmpbuf;
566 } else if (io_root == rank) {
567 tmpbuf_free = (char *) malloc(dsize);
568 if (NULL == tmpbuf_free) {
569 return MPI_ERR_INTERN;
570 }
571 tmpbuf = tmpbuf_free - gap;
572 use_this_recvbuf = tmpbuf;
573 }
574 }
575
576
577 ret = ompi_coll_base_reduce_generic( use_this_sendbuf, use_this_recvbuf, count, datatype,
578 op, io_root, comm, module,
579 data->cached_in_order_bintree,
580 segcount, max_outstanding_reqs );
581 if (MPI_SUCCESS != ret) { return ret; }
582
583
584 if (io_root != root) {
585 if (root == rank) {
586
587 ret = MCA_PML_CALL(recv(recvbuf, count, datatype, io_root,
588 MCA_COLL_BASE_TAG_REDUCE, comm,
589 MPI_STATUS_IGNORE));
590 if (MPI_SUCCESS != ret) { return ret; }
591
592 } else if (io_root == rank) {
593
594 ret = MCA_PML_CALL(send(use_this_recvbuf, count, datatype, root,
595 MCA_COLL_BASE_TAG_REDUCE,
596 MCA_PML_BASE_SEND_STANDARD, comm));
597 if (MPI_SUCCESS != ret) { return ret; }
598 }
599 }
600 if (NULL != tmpbuf_free) {
601 free(tmpbuf_free);
602 }
603
604 return MPI_SUCCESS;
605 }
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626 int
627 ompi_coll_base_reduce_intra_basic_linear(const void *sbuf, void *rbuf, int count,
628 struct ompi_datatype_t *dtype,
629 struct ompi_op_t *op,
630 int root,
631 struct ompi_communicator_t *comm,
632 mca_coll_base_module_t *module)
633 {
634 int i, rank, err, size;
635 ptrdiff_t extent, dsize, gap = 0;
636 char *free_buffer = NULL;
637 char *pml_buffer = NULL;
638 char *inplace_temp_free = NULL;
639 char *inbuf;
640
641
642
643 rank = ompi_comm_rank(comm);
644 size = ompi_comm_size(comm);
645
646
647
648 if (rank != root) {
649 err = MCA_PML_CALL(send(sbuf, count, dtype, root,
650 MCA_COLL_BASE_TAG_REDUCE,
651 MCA_PML_BASE_SEND_STANDARD, comm));
652 return err;
653 }
654
655 dsize = opal_datatype_span(&dtype->super, count, &gap);
656 ompi_datatype_type_extent(dtype, &extent);
657
658 if (MPI_IN_PLACE == sbuf) {
659 sbuf = rbuf;
660 inplace_temp_free = (char*)malloc(dsize);
661 if (NULL == inplace_temp_free) {
662 return OMPI_ERR_OUT_OF_RESOURCE;
663 }
664 rbuf = inplace_temp_free - gap;
665 }
666
667 if (size > 1) {
668 free_buffer = (char*)malloc(dsize);
669 if (NULL == free_buffer) {
670 if (NULL != inplace_temp_free) {
671 free(inplace_temp_free);
672 }
673 return OMPI_ERR_OUT_OF_RESOURCE;
674 }
675 pml_buffer = free_buffer - gap;
676 }
677
678
679
680 if (rank == (size - 1)) {
681 err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
682 } else {
683 err = MCA_PML_CALL(recv(rbuf, count, dtype, size - 1,
684 MCA_COLL_BASE_TAG_REDUCE, comm,
685 MPI_STATUS_IGNORE));
686 }
687 if (MPI_SUCCESS != err) {
688 if (NULL != free_buffer) {
689 free(free_buffer);
690 }
691 return err;
692 }
693
694
695
696 for (i = size - 2; i >= 0; --i) {
697 if (rank == i) {
698 inbuf = (char*)sbuf;
699 } else {
700 err = MCA_PML_CALL(recv(pml_buffer, count, dtype, i,
701 MCA_COLL_BASE_TAG_REDUCE, comm,
702 MPI_STATUS_IGNORE));
703 if (MPI_SUCCESS != err) {
704 if (NULL != free_buffer) {
705 free(free_buffer);
706 }
707 return err;
708 }
709
710 inbuf = pml_buffer;
711 }
712
713
714
715 ompi_op_reduce(op, inbuf, rbuf, count, dtype);
716 }
717
718 if (NULL != inplace_temp_free) {
719 err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)sbuf, rbuf);
720 free(inplace_temp_free);
721 }
722 if (NULL != free_buffer) {
723 free(free_buffer);
724 }
725
726
727
728 return MPI_SUCCESS;
729 }
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791 int ompi_coll_base_reduce_intra_redscat_gather(
792 const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
793 struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
794 mca_coll_base_module_t *module)
795 {
796 int comm_size = ompi_comm_size(comm);
797 int rank = ompi_comm_rank(comm);
798
799 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
800 "coll:base:reduce_intra_redscat_gather: rank %d/%d, root %d",
801 rank, comm_size, root));
802
803
804 int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1);
805 assert(nsteps >= 0);
806 int nprocs_pof2 = 1 << nsteps;
807
808 if (nprocs_pof2 < 2 || count < nprocs_pof2 || !ompi_op_is_commute(op)) {
809 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
810 "coll:base:reduce_intra_redscat_gather: rank %d/%d count %d "
811 "switching to basic linear reduce", rank, comm_size, count));
812 return ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
813 op, root, comm, module);
814 }
815
816 int err = MPI_SUCCESS;
817 int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
818 ptrdiff_t lb, extent, dsize, gap;
819 ompi_datatype_get_extent(dtype, &lb, &extent);
820 dsize = opal_datatype_span(&dtype->super, count, &gap);
821
822
823 char *tmp_buf_raw = NULL, *rbuf_raw = NULL;
824 tmp_buf_raw = malloc(dsize);
825 if (NULL == tmp_buf_raw) {
826 err = OMPI_ERR_OUT_OF_RESOURCE;
827 goto cleanup_and_return;
828 }
829 char *tmp_buf = tmp_buf_raw - gap;
830
831 if (rank != root) {
832 rbuf_raw = malloc(dsize);
833 if (NULL == rbuf_raw) {
834 err = OMPI_ERR_OUT_OF_RESOURCE;
835 goto cleanup_and_return;
836 }
837 rbuf = rbuf_raw - gap;
838 }
839
840 if ((rank != root) || (sbuf != MPI_IN_PLACE)) {
841 err = ompi_datatype_copy_content_same_ddt(dtype, count, rbuf,
842 (char *)sbuf);
843 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
844 }
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864 int vrank, step, wsize;
865 int nprocs_rem = comm_size - nprocs_pof2;
866
867 if (rank < 2 * nprocs_rem) {
868 int count_lhalf = count / 2;
869 int count_rhalf = count - count_lhalf;
870
871 if (rank % 2 != 0) {
872
873
874
875
876
877 err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
878 MCA_COLL_BASE_TAG_REDUCE,
879 (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
880 count_rhalf, dtype, rank - 1,
881 MCA_COLL_BASE_TAG_REDUCE, comm,
882 MPI_STATUS_IGNORE, rank);
883 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
884
885
886 ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
887 (char *)rbuf + count_lhalf * extent, count_rhalf, dtype);
888
889
890 err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
891 count_rhalf, dtype, rank - 1,
892 MCA_COLL_BASE_TAG_REDUCE,
893 MCA_PML_BASE_SEND_STANDARD, comm));
894 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
895
896
897 vrank = -1;
898
899 } else {
900
901
902
903
904
905 err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
906 count_rhalf, dtype, rank + 1,
907 MCA_COLL_BASE_TAG_REDUCE,
908 tmp_buf, count_lhalf, dtype, rank + 1,
909 MCA_COLL_BASE_TAG_REDUCE, comm,
910 MPI_STATUS_IGNORE, rank);
911 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
912
913
914 ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);
915
916
917 err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
918 count_rhalf, dtype, rank + 1,
919 MCA_COLL_BASE_TAG_REDUCE, comm,
920 MPI_STATUS_IGNORE));
921 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
922
923 vrank = rank / 2;
924 }
925 } else {
926 vrank = rank - nprocs_rem;
927 }
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942 rindex = malloc(sizeof(*rindex) * nsteps);
943 sindex = malloc(sizeof(*sindex) * nsteps);
944 rcount = malloc(sizeof(*rcount) * nsteps);
945 scount = malloc(sizeof(*scount) * nsteps);
946 if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
947 err = OMPI_ERR_OUT_OF_RESOURCE;
948 goto cleanup_and_return;
949 }
950
951 if (vrank != -1) {
952 step = 0;
953 wsize = count;
954 sindex[0] = rindex[0] = 0;
955
956 for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
957
958
959
960
961 int vdest = vrank ^ mask;
962
963 int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
964
965 if (rank < dest) {
966
967
968
969
970
971 rcount[step] = wsize / 2;
972 scount[step] = wsize - rcount[step];
973 sindex[step] = rindex[step] + rcount[step];
974 } else {
975
976
977
978
979
980 scount[step] = wsize / 2;
981 rcount[step] = wsize - scount[step];
982 rindex[step] = sindex[step] + scount[step];
983 }
984
985
986 err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
987 scount[step], dtype, dest,
988 MCA_COLL_BASE_TAG_REDUCE,
989 (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
990 rcount[step], dtype, dest,
991 MCA_COLL_BASE_TAG_REDUCE, comm,
992 MPI_STATUS_IGNORE, rank);
993 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
994
995
996 ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
997 (char *)rbuf + (ptrdiff_t)rindex[step] * extent,
998 rcount[step], dtype);
999
1000
1001 if (step + 1 < nsteps) {
1002 rindex[step + 1] = rindex[step];
1003 sindex[step + 1] = rindex[step];
1004 wsize = rcount[step];
1005 step++;
1006 }
1007 }
1008 }
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021 int vroot = 0;
1022 if (root < 2 * nprocs_rem) {
1023 if (root % 2 != 0) {
1024 vroot = 0;
1025 if (rank == root) {
1026
1027
1028
1029
1030
1031 rindex[0] = 0;
1032 step = 0, wsize = count;
1033 for (int mask = 1; mask < nprocs_pof2; mask *= 2) {
1034 rcount[step] = wsize / 2;
1035 scount[step] = wsize - rcount[step];
1036 rindex[step] = 0;
1037 sindex[step] = rcount[step];
1038 step++;
1039 wsize /= 2;
1040 }
1041
1042 err = MCA_PML_CALL(recv(rbuf, rcount[nsteps - 1], dtype, 0,
1043 MCA_COLL_BASE_TAG_REDUCE, comm,
1044 MPI_STATUS_IGNORE));
1045 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1046 vrank = 0;
1047
1048 } else if (vrank == 0) {
1049
1050 err = MCA_PML_CALL(send(rbuf, rcount[nsteps - 1], dtype, root,
1051 MCA_COLL_BASE_TAG_REDUCE,
1052 MCA_PML_BASE_SEND_STANDARD, comm));
1053 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1054 vrank = -1;
1055 }
1056 } else {
1057
1058 vroot = root / 2;
1059 }
1060 } else {
1061
1062 vroot = root - nprocs_rem;
1063 }
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073 if (vrank != -1) {
1074 int vdest_tree, vroot_tree;
1075 step = nsteps - 1;
1076
1077 for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
1078 int vdest = vrank ^ mask;
1079
1080 int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
1081 if ((vdest == 0) && (root < 2 * nprocs_rem) && (root % 2 != 0))
1082 dest = root;
1083
1084 vdest_tree = vdest >> step;
1085 vdest_tree <<= step;
1086 vroot_tree = vroot >> step;
1087 vroot_tree <<= step;
1088 if (vdest_tree == vroot_tree) {
1089
1090 err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
1091 rcount[step], dtype, dest,
1092 MCA_COLL_BASE_TAG_REDUCE,
1093 MCA_PML_BASE_SEND_STANDARD, comm));
1094 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1095 break;
1096 } else {
1097
1098 err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
1099 scount[step], dtype, dest,
1100 MCA_COLL_BASE_TAG_REDUCE, comm,
1101 MPI_STATUS_IGNORE));
1102 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1103 }
1104 step--;
1105 }
1106 }
1107
1108 cleanup_and_return:
1109 if (NULL != tmp_buf_raw)
1110 free(tmp_buf_raw);
1111 if (NULL != rbuf_raw)
1112 free(rbuf_raw);
1113 if (NULL != rindex)
1114 free(rindex);
1115 if (NULL != sindex)
1116 free(sindex);
1117 if (NULL != rcount)
1118 free(rcount);
1119 if (NULL != scount)
1120 free(scount);
1121 return err;
1122 }