/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2013      Los Alamos National Security, LLC.  All Rights
 *                         reserved.
 * Copyright (c) 2015-2016 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2016-2017 IBM Corporation.  All rights reserved.
 * Copyright (c) 2018      Siberian State University of Telecommunications
 *                         and Information Science. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "mpi.h"
#include "opal/util/bit_ops.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "coll_base_topo.h"
#include "coll_base_util.h"

int mca_coll_base_reduce_local(const void *inbuf, void *inoutbuf, int count,
                               struct ompi_datatype_t *dtype, struct ompi_op_t *op,
                               mca_coll_base_module_t *module)
{
    /* XXX -- CONST -- do not cast away const -- update ompi/op/op.h */
    ompi_op_reduce(op, (void *)inbuf, inoutbuf, count, dtype);
    return OMPI_SUCCESS;
}
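
/* A quick illustration of the accumulate semantics above (not part of the
 * implementation; it assumes the usual internal predefined handles
 * ompi_mpi_int.dt and ompi_mpi_op_sum.op, and passes NULL for the unused
 * module argument): ompi_op_reduce(op, source, target, count, dtype)
 * computes target[i] = source[i] (op) target[i] for each element, so:
 *
 *   int in[3]    = {1, 2, 3};
 *   int inout[3] = {10, 20, 30};
 *   mca_coll_base_reduce_local(in, inout, 3, &ompi_mpi_int.dt,
 *                              &ompi_mpi_op_sum.op, NULL);
 *   // inout is now {11, 22, 33}
 */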

/**
 * This is a generic implementation of the reduce protocol. It uses the tree
 * provided as an argument and executes all operations in segments of
 * count_by_segment datatypes. For the last communication it updates the
 * count in order to limit the total number of datatypes to the original
 * count (original_count).
 *
 * Note that for non-commutative operations we cannot save the memory copy
 * for the first block: thus we must copy sendbuf to accumbuf on intermediate
 * nodes to keep the optimized loop happy.
 */
int ompi_coll_base_reduce_generic( const void* sendbuf, void* recvbuf, int original_count,
                                   ompi_datatype_t* datatype, ompi_op_t* op,
                                   int root, ompi_communicator_t* comm,
                                   mca_coll_base_module_t *module,
                                   ompi_coll_tree_t* tree, int count_by_segment,
                                   int max_outstanding_reqs )
{
    char *inbuf[2] = {NULL, NULL}, *inbuf_free[2] = {NULL, NULL};
    char *accumbuf = NULL, *accumbuf_free = NULL;
    char *local_op_buffer = NULL, *sendtmpbuf = NULL;
    ptrdiff_t extent, size, gap = 0, segment_increment;
    ompi_request_t **sreq = NULL, *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
    int num_segments, line, ret, segindex, i, rank;
    int recvcount, prevcount, inbi;

    /**
     * Determine number of segments and number of elements
     * sent per operation
     */
    ompi_datatype_type_extent( datatype, &extent );
    num_segments = (int)(((size_t)original_count + (size_t)count_by_segment - (size_t)1) / (size_t)count_by_segment);
    segment_increment = (ptrdiff_t)count_by_segment * extent;
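    /* A worked example of the segmentation arithmetic (illustrative values,
     * not taken from any particular run): with original_count = 1000,
     * count_by_segment = 384 and extent = 4 bytes, we get
     * num_segments = ceil(1000 / 384) = 3 and segment_increment = 1536 bytes;
     * the first two segments carry 384 elements each and the last carries
     * 1000 - 2 * 384 = 232 elements, which is exactly what the recvcount
     * adjustment in the segment loop below computes. */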

    sendtmpbuf = (char*) sendbuf;
    if( sendbuf == MPI_IN_PLACE ) {
        sendtmpbuf = (char *)recvbuf;
    }

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:reduce_generic count %d, msg size %ld, segsize %ld, max_requests %d",
                 original_count, (unsigned long)((ptrdiff_t)num_segments * (ptrdiff_t)segment_increment),
                 (unsigned long)segment_increment, max_outstanding_reqs));

    rank = ompi_comm_rank(comm);

    /* non-leaf nodes - wait for children to send me data & forward up
       (if needed) */
    if( tree->tree_nextsize > 0 ) {
        ptrdiff_t real_segment_size;

        /* handle a non-existent recv buffer (i.e. it is NULL) and
           protect the recv buffer on non-root nodes */
        accumbuf = (char*)recvbuf;
        if( (NULL == accumbuf) || (root != rank) ) {
            /* Allocate temporary accumulator buffer. */
            size = opal_datatype_span(&datatype->super, original_count, &gap);
            accumbuf_free = (char*)malloc(size);
            if (accumbuf_free == NULL) {
                line = __LINE__; ret = -1; goto error_hndl;
            }
            accumbuf = accumbuf_free - gap;
        }

        /* If this is a non-commutative operation we must copy
           sendbuf to the accumbuf, in order to simplify the loops */
        if (!ompi_op_is_commute(op) && MPI_IN_PLACE != sendbuf) {
            ompi_datatype_copy_content_same_ddt(datatype, original_count,
                                                (char*)accumbuf,
                                                (char*)sendtmpbuf);
        }
        /* Allocate two buffers for incoming segments */
        real_segment_size = opal_datatype_span(&datatype->super, count_by_segment, &gap);
        inbuf_free[0] = (char*) malloc(real_segment_size);
        if( inbuf_free[0] == NULL ) {
            line = __LINE__; ret = -1; goto error_hndl;
        }
        inbuf[0] = inbuf_free[0] - gap;
        /* if there is a chance to overlap communication -
           allocate a second buffer */
        if( (num_segments > 1) || (tree->tree_nextsize > 1) ) {
            inbuf_free[1] = (char*) malloc(real_segment_size);
            if( inbuf_free[1] == NULL ) {
                line = __LINE__; ret = -1; goto error_hndl;
            }
            inbuf[1] = inbuf_free[1] - gap;
        }
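
        /* How the double buffering below plays out, sketched for a small
         * case (hypothetical trace, 2 segments and 1 child): the first
         * iteration posts an irecv for segment 0 into inbuf[0] and then
         * waits on reqs[1], which is still MPI_REQUEST_NULL so the wait
         * returns immediately; the second iteration posts the irecv for
         * segment 1 into inbuf[1] and waits on reqs[0] (segment 0),
         * reducing segment 0 while segment 1 is in flight; the final,
         * extra iteration posts nothing and only drains and reduces
         * segment 1. Flipping inbi each pass is what alternates the two
         * buffers. */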

        /* reset input buffer index and receive count */
        inbi = 0;
        recvcount = 0;
        /* for each segment */
        for( segindex = 0; segindex <= num_segments; segindex++ ) {
            prevcount = recvcount;
            /* recvcount - number of elements in current segment */
            recvcount = count_by_segment;
            if( segindex == (num_segments-1) )
                recvcount = original_count - (ptrdiff_t)count_by_segment * (ptrdiff_t)segindex;

            /* for each child */
            for( i = 0; i < tree->tree_nextsize; i++ ) {
                /**
                 * We try to overlap communication:
                 * either with the next segment or with the next child
                 */
                /* post irecv for current segindex on current child */
                if( segindex < num_segments ) {
                    void* local_recvbuf = inbuf[inbi];
                    if( 0 == i ) {
                        /* for the first step (1st child per segment) and
                         * commutative operations we might be able to irecv
                         * directly into the accumulate buffer so that we can
                         * reduce(op) this with our sendbuf in one step as
                         * ompi_op_reduce only has two buffer pointers,
                         * this avoids an extra memory copy.
                         *
                         * BUT if the operation is non-commutative or
                         * we are root and are USING MPI_IN_PLACE this is wrong!
                         */
                        if( (ompi_op_is_commute(op)) &&
                            !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
                            local_recvbuf = accumbuf + (ptrdiff_t)segindex * (ptrdiff_t)segment_increment;
                        }
                    }

                    ret = MCA_PML_CALL(irecv(local_recvbuf, recvcount, datatype,
                                             tree->tree_next[i],
                                             MCA_COLL_BASE_TAG_REDUCE, comm,
                                             &reqs[inbi]));
                    if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
                }
                /* wait for the previous req to complete, if any.
                   if there are no requests, reqs[inbi ^ 1] will be
                   MPI_REQUEST_NULL. */
                /* wait on data from last child for previous segment */
                ret = ompi_request_wait(&reqs[inbi ^ 1],
                                        MPI_STATUS_IGNORE );
                if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
                local_op_buffer = inbuf[inbi ^ 1];
                if( i > 0 ) {
                    /* our first operation is to combine our own [sendbuf] data
                     * with the data we received from downstream (but only if
                     * the operation is commutative and we are not the root
                     * using MPI_IN_PLACE)
                     */
                    if( 1 == i ) {
                        if( (ompi_op_is_commute(op)) &&
                            !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
                            local_op_buffer = sendtmpbuf + (ptrdiff_t)segindex * (ptrdiff_t)segment_increment;
                        }
                    }
                    /* apply operation */
                    ompi_op_reduce(op, local_op_buffer,
                                   accumbuf + (ptrdiff_t)segindex * (ptrdiff_t)segment_increment,
                                   recvcount, datatype );
                } else if ( segindex > 0 ) {
                    void* accumulator = accumbuf + (ptrdiff_t)(segindex-1) * (ptrdiff_t)segment_increment;
                    if( tree->tree_nextsize <= 1 ) {
                        if( (ompi_op_is_commute(op)) &&
                            !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
                            local_op_buffer = sendtmpbuf + (ptrdiff_t)(segindex-1) * (ptrdiff_t)segment_increment;
                        }
                    }
                    ompi_op_reduce(op, local_op_buffer, accumulator, prevcount,
                                   datatype );

                    /* all reduces on the available data for this step (i) are
                     * complete; pass the result to the next process unless
                     * you are the root.
                     */
                    if (rank != tree->tree_root) {
                        /* send combined/accumulated data to parent */
                        ret = MCA_PML_CALL( send( accumulator, prevcount,
                                                  datatype, tree->tree_prev,
                                                  MCA_COLL_BASE_TAG_REDUCE,
                                                  MCA_PML_BASE_SEND_STANDARD,
                                                  comm) );
                        if (ret != MPI_SUCCESS) {
                            line = __LINE__; goto error_hndl;
                        }
                    }

                    /* we stop when segindex == the number of segments
                       (i.e. we do num_segments + 1 steps to allow for pipelining) */
                    if (segindex == num_segments) break;
                }

                /* update input buffer index */
                inbi = inbi ^ 1;
            } /* end of for each child */
        } /* end of for each segment */

        /* clean up */
        if( inbuf_free[0] != NULL) free(inbuf_free[0]);
        if( inbuf_free[1] != NULL) free(inbuf_free[1]);
        if( accumbuf_free != NULL ) free(accumbuf_free);
    }

    /* leaf nodes
       Depending on the value of max_outstanding_reqs and
       the number of segments we have two options:
       - send all segments using blocking send to the parent, or
       - avoid flooding the parent nodes by limiting the number of
         outstanding requests to max_outstanding_reqs.
       TODO/POSSIBLE IMPROVEMENT: If there is a way to determine the eager size
       for the current communication, synchronization should be used only
       when the message/segment size is smaller than the eager size.
    */
    else {

        /* If the number of segments is less than the maximum number of
           outstanding requests, or there is no limit on the maximum number of
           outstanding requests, we send data to the parent using blocking send */
        if ((0 == max_outstanding_reqs) ||
            (num_segments <= max_outstanding_reqs)) {

            segindex = 0;
            while ( original_count > 0 ) {
                if (original_count < count_by_segment) {
                    count_by_segment = original_count;
                }
                ret = MCA_PML_CALL( send((char*)sendbuf +
                                         (ptrdiff_t)segindex * (ptrdiff_t)segment_increment,
                                         count_by_segment, datatype,
                                         tree->tree_prev,
                                         MCA_COLL_BASE_TAG_REDUCE,
                                         MCA_PML_BASE_SEND_STANDARD,
                                         comm) );
                if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
                segindex++;
                original_count -= count_by_segment;
            }
        }

        /* Otherwise, introduce flow control:
           - post max_outstanding_reqs non-blocking synchronous sends,
           - for the remaining segments
             - wait for an ssend to complete, and post the next one.
           - wait for all outstanding sends to complete.
        */
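        /* A hypothetical trace of this flow control, for illustration: with
         * num_segments = 10 and max_outstanding_reqs = 4, the code below
         * first posts 4 synchronous isends (segments 0-3); then, for each of
         * segments 4-9, it waits for the oldest outstanding request (cycling
         * creq through slots 0..3) and immediately posts the next isend in
         * the freed slot; finally it waits on all 4 slots. At most 4
         * segments are ever in flight, so the parent never has to buffer
         * more unexpected messages than that. */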
        else {

            int creq = 0;

            sreq = ompi_coll_base_comm_get_reqs(module->base_data, max_outstanding_reqs);
            if (NULL == sreq) { line = __LINE__; ret = -1; goto error_hndl; }

            /* post first group of requests */
            for (segindex = 0; segindex < max_outstanding_reqs; segindex++) {
                ret = MCA_PML_CALL( isend((char*)sendbuf +
                                          (ptrdiff_t)segindex * (ptrdiff_t)segment_increment,
                                          count_by_segment, datatype,
                                          tree->tree_prev,
                                          MCA_COLL_BASE_TAG_REDUCE,
                                          MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
                                          &sreq[segindex]) );
                if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
                original_count -= count_by_segment;
            }

            creq = 0;
            while ( original_count > 0 ) {
                /* wait on a posted request to complete */
                ret = ompi_request_wait(&sreq[creq], MPI_STATUS_IGNORE);
                if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }

                if( original_count < count_by_segment ) {
                    count_by_segment = original_count;
                }
                ret = MCA_PML_CALL( isend((char*)sendbuf +
                                          (ptrdiff_t)segindex * (ptrdiff_t)segment_increment,
                                          count_by_segment, datatype,
                                          tree->tree_prev,
                                          MCA_COLL_BASE_TAG_REDUCE,
                                          MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
                                          &sreq[creq]) );
                if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
                creq = (creq + 1) % max_outstanding_reqs;
                segindex++;
                original_count -= count_by_segment;
            }

            /* Wait on the remaining requests to complete */
            ret = ompi_request_wait_all( max_outstanding_reqs, sreq,
                                         MPI_STATUSES_IGNORE );
            if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
        }
    }
    return OMPI_SUCCESS;

 error_hndl:  /* error handler */
    /* find a real error code */
    if (MPI_ERR_IN_STATUS == ret) {
        for( i = 0; i < 2; i++ ) {
            if (MPI_REQUEST_NULL == reqs[i]) continue;
            if (MPI_ERR_PENDING == reqs[i]->req_status.MPI_ERROR) continue;
            ret = reqs[i]->req_status.MPI_ERROR;
            break;
        }
    }
    ompi_coll_base_free_reqs(reqs, 2);
    if( NULL != sreq ) {
        if (MPI_ERR_IN_STATUS == ret) {
            for( i = 0; i < max_outstanding_reqs; i++ ) {
                if (MPI_REQUEST_NULL == sreq[i]) continue;
                if (MPI_ERR_PENDING == sreq[i]->req_status.MPI_ERROR) continue;
                ret = sreq[i]->req_status.MPI_ERROR;
                break;
            }
        }
        ompi_coll_base_free_reqs(sreq, max_outstanding_reqs);
    }
    if( inbuf_free[0] != NULL ) free(inbuf_free[0]);
    if( inbuf_free[1] != NULL ) free(inbuf_free[1]);
    if( accumbuf_free != NULL ) free(accumbuf_free);
    OPAL_OUTPUT (( ompi_coll_base_framework.framework_output,
                   "ERROR_HNDL: node %d file %s line %d error %d\n",
                   rank, __FILE__, line, ret ));
    (void)line;  // silence compiler warning
    return ret;
}

/* Attention: this version of the reduce operations does not
   work for:
   - non-commutative operations
   - segment sizes which are not multiples of the extent of the datatype
   meaning that at least one datatype must fit in the segment !
*/

int ompi_coll_base_reduce_intra_chain( const void *sendbuf, void *recvbuf, int count,
                                       ompi_datatype_t* datatype,
                                       ompi_op_t* op, int root,
                                       ompi_communicator_t* comm,
                                       mca_coll_base_module_t *module,
                                       uint32_t segsize, int fanout,
                                       int max_outstanding_reqs )
{
    int segcount = count;
    size_t typelng;
    mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
    mca_coll_base_comm_t *data = base_module->base_data;

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:reduce_intra_chain rank %d fo %d ss %5d", ompi_comm_rank(comm), fanout, segsize));

    COLL_BASE_UPDATE_CHAIN( comm, base_module, root, fanout );
    /**
     * Determine number of segments and number of elements
     * sent per operation
     */
    ompi_datatype_type_size( datatype, &typelng );
    COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
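
    /* Rough illustration of the segcount computation (assuming the macro's
     * usual behavior of turning a segment size in bytes into a number of
     * elements): with segsize = 32768 bytes and an 8-byte datatype
     * (typelng = 8), segcount becomes 32768 / 8 = 4096 elements per segment;
     * with segsize = 0 the condition inside the macro fails and segcount
     * keeps its initial value of count, i.e. the message is not segmented
     * at all. */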

    return ompi_coll_base_reduce_generic( sendbuf, recvbuf, count, datatype,
                                          op, root, comm, module,
                                          data->cached_chain,
                                          segcount, max_outstanding_reqs );
}


int ompi_coll_base_reduce_intra_pipeline( const void *sendbuf, void *recvbuf,
                                          int count, ompi_datatype_t* datatype,
                                          ompi_op_t* op, int root,
                                          ompi_communicator_t* comm,
                                          mca_coll_base_module_t *module,
                                          uint32_t segsize,
                                          int max_outstanding_reqs )
{
    int segcount = count;
    size_t typelng;
    mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
    mca_coll_base_comm_t *data = base_module->base_data;

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:reduce_intra_pipeline rank %d ss %5d",
                 ompi_comm_rank(comm), segsize));

    COLL_BASE_UPDATE_PIPELINE( comm, base_module, root );

    /**
     * Determine number of segments and number of elements
     * sent per operation
     */
    ompi_datatype_type_size( datatype, &typelng );
    COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );

    return ompi_coll_base_reduce_generic( sendbuf, recvbuf, count, datatype,
                                          op, root, comm, module,
                                          data->cached_pipeline,
                                          segcount, max_outstanding_reqs );
}

int ompi_coll_base_reduce_intra_binary( const void *sendbuf, void *recvbuf,
                                        int count, ompi_datatype_t* datatype,
                                        ompi_op_t* op, int root,
                                        ompi_communicator_t* comm,
                                        mca_coll_base_module_t *module,
                                        uint32_t segsize,
                                        int max_outstanding_reqs )
{
    int segcount = count;
    size_t typelng;
    mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
    mca_coll_base_comm_t *data = base_module->base_data;

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:reduce_intra_binary rank %d ss %5d",
                 ompi_comm_rank(comm), segsize));

    COLL_BASE_UPDATE_BINTREE( comm, base_module, root );

    /**
     * Determine number of segments and number of elements
     * sent per operation
     */
    ompi_datatype_type_size( datatype, &typelng );
    COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );

    return ompi_coll_base_reduce_generic( sendbuf, recvbuf, count, datatype,
                                          op, root, comm, module,
                                          data->cached_bintree,
                                          segcount, max_outstanding_reqs );
}

int ompi_coll_base_reduce_intra_binomial( const void *sendbuf, void *recvbuf,
                                          int count, ompi_datatype_t* datatype,
                                          ompi_op_t* op, int root,
                                          ompi_communicator_t* comm,
                                          mca_coll_base_module_t *module,
                                          uint32_t segsize,
                                          int max_outstanding_reqs )
{
    int segcount = count;
    size_t typelng;
    mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
    mca_coll_base_comm_t *data = base_module->base_data;

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:reduce_intra_binomial rank %d ss %5d",
                 ompi_comm_rank(comm), segsize));

    COLL_BASE_UPDATE_IN_ORDER_BMTREE( comm, base_module, root );

    /**
     * Determine number of segments and number of elements
     * sent per operation
     */
    ompi_datatype_type_size( datatype, &typelng );
    COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );

    return ompi_coll_base_reduce_generic( sendbuf, recvbuf, count, datatype,
                                          op, root, comm, module,
                                          data->cached_in_order_bmtree,
                                          segcount, max_outstanding_reqs );
}

/*
 * reduce_intra_in_order_binary
 *
 * Function:    Logarithmic reduce operation for non-commutative operations.
 * Accepts:     same as MPI_Reduce()
 * Returns:     MPI_SUCCESS or error code
 */
int ompi_coll_base_reduce_intra_in_order_binary( const void *sendbuf, void *recvbuf,
                                                 int count,
                                                 ompi_datatype_t* datatype,
                                                 ompi_op_t* op, int root,
                                                 ompi_communicator_t* comm,
                                                 mca_coll_base_module_t *module,
                                                 uint32_t segsize,
                                                 int max_outstanding_reqs )
{
    int ret, rank, size, io_root, segcount = count;
    void *use_this_sendbuf = NULL;
    void *use_this_recvbuf = NULL;
    char *tmpbuf_free = NULL;
    size_t typelng;
    mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
    mca_coll_base_comm_t *data = base_module->base_data;

    rank = ompi_comm_rank(comm);
    size = ompi_comm_size(comm);
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:reduce_intra_in_order_binary rank %d ss %5d",
                 rank, segsize));

    COLL_BASE_UPDATE_IN_ORDER_BINTREE( comm, base_module );

    /**
     * Determine number of segments and number of elements
     * sent per operation
     */
    ompi_datatype_type_size( datatype, &typelng );
    COLL_BASE_COMPUTED_SEGCOUNT( segsize, typelng, segcount );

    /* An in-order binary tree must use root (size-1) to preserve the order of
       operations. Thus, if the root is not rank (size - 1), then we must
       1. handle the MPI_IN_PLACE option on the real root, and
       2. allocate a temporary recvbuf on rank (size - 1).
       Note that the generic function must be careful not to switch the order
       of operations for non-commutative ops.
    */
    io_root = size - 1;
    use_this_sendbuf = (void *)sendbuf;
    use_this_recvbuf = recvbuf;
    if (io_root != root) {
        ptrdiff_t dsize, gap = 0;
        char *tmpbuf;

        dsize = opal_datatype_span(&datatype->super, count, &gap);

        if ((root == rank) && (MPI_IN_PLACE == sendbuf)) {
            tmpbuf_free = (char *) malloc(dsize);
            if (NULL == tmpbuf_free) {
                return MPI_ERR_INTERN;
            }
            tmpbuf = tmpbuf_free - gap;
            ompi_datatype_copy_content_same_ddt(datatype, count,
                                                (char*)tmpbuf,
                                                (char*)recvbuf);
            use_this_sendbuf = tmpbuf;
        } else if (io_root == rank) {
            tmpbuf_free = (char *) malloc(dsize);
            if (NULL == tmpbuf_free) {
                return MPI_ERR_INTERN;
            }
            tmpbuf = tmpbuf_free - gap;
            use_this_recvbuf = tmpbuf;
        }
    }
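
    /* To make the root shuffle above concrete, a small hypothetical example:
     * on a communicator of size 4 with root = 1, io_root is 3. Rank 1 (the
     * real root), if it passed MPI_IN_PLACE, copies its recvbuf into a
     * temporary buffer and contributes that as its send buffer; rank 3
     * allocates a temporary recvbuf to accumulate into. Once the generic
     * reduce rooted at rank 3 completes, the post-processing below ships the
     * final result from rank 3 back to rank 1. */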

    /* Use generic reduce with in-order binary tree topology and io_root */
    ret = ompi_coll_base_reduce_generic( use_this_sendbuf, use_this_recvbuf, count, datatype,
                                         op, io_root, comm, module,
                                         data->cached_in_order_bintree,
                                         segcount, max_outstanding_reqs );
    if (MPI_SUCCESS != ret) { return ret; }

    /* Clean up */
    if (io_root != root) {
        if (root == rank) {
            /* Receive result from rank io_root to recvbuf */
            ret = MCA_PML_CALL(recv(recvbuf, count, datatype, io_root,
                                    MCA_COLL_BASE_TAG_REDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != ret) { return ret; }

        } else if (io_root == rank) {
            /* Send result from use_this_recvbuf to root */
            ret = MCA_PML_CALL(send(use_this_recvbuf, count, datatype, root,
                                    MCA_COLL_BASE_TAG_REDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != ret) { return ret; }
        }
    }
    if (NULL != tmpbuf_free) {
        free(tmpbuf_free);
    }

    return MPI_SUCCESS;
}

/*
 * Linear functions are copied from the BASIC coll module; they do not
 * segment the message and are simple implementations, but for some small
 * number of nodes and/or small data sizes they are just as fast as
 * base/tree based segmenting operations, and as such may be selected by
 * the decision functions. These are copied into this module due to the way
 * we select modules in V1, i.e. in V2 we will handle this differently and
 * so will not have to duplicate code.
 * GEF Oct05 after asking Jeff.
 */

/*
 * reduce_lin_intra
 *
 * Function:   - reduction using O(N) algorithm
 * Accepts:    - same as MPI_Reduce()
 * Returns:    - MPI_SUCCESS or error code
 */
int
ompi_coll_base_reduce_intra_basic_linear(const void *sbuf, void *rbuf, int count,
                                         struct ompi_datatype_t *dtype,
                                         struct ompi_op_t *op,
                                         int root,
                                         struct ompi_communicator_t *comm,
                                         mca_coll_base_module_t *module)
{
    int i, rank, err, size;
    ptrdiff_t extent, dsize, gap = 0;
    char *free_buffer = NULL;
    char *pml_buffer = NULL;
    char *inplace_temp_free = NULL;
    char *inbuf;

    /* Initialize */

    rank = ompi_comm_rank(comm);
    size = ompi_comm_size(comm);

    /* If not root, send data to the root. */

    if (rank != root) {
        err = MCA_PML_CALL(send(sbuf, count, dtype, root,
                                MCA_COLL_BASE_TAG_REDUCE,
                                MCA_PML_BASE_SEND_STANDARD, comm));
        return err;
    }

    dsize = opal_datatype_span(&dtype->super, count, &gap);
    ompi_datatype_type_extent(dtype, &extent);

    if (MPI_IN_PLACE == sbuf) {
        sbuf = rbuf;
        inplace_temp_free = (char*)malloc(dsize);
        if (NULL == inplace_temp_free) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        rbuf = inplace_temp_free - gap;
    }

    if (size > 1) {
        free_buffer = (char*)malloc(dsize);
        if (NULL == free_buffer) {
            if (NULL != inplace_temp_free) {
                free(inplace_temp_free);
            }
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        pml_buffer = free_buffer - gap;
    }

    /* Initialize the receive buffer. */

    if (rank == (size - 1)) {
        err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
    } else {
        err = MCA_PML_CALL(recv(rbuf, count, dtype, size - 1,
                                MCA_COLL_BASE_TAG_REDUCE, comm,
                                MPI_STATUS_IGNORE));
    }
    if (MPI_SUCCESS != err) {
        if (NULL != free_buffer) {
            free(free_buffer);
        }
        return err;
    }

    /* Loop receiving and calling reduction function (C or Fortran). */

    for (i = size - 2; i >= 0; --i) {
        if (rank == i) {
            inbuf = (char*)sbuf;
        } else {
            err = MCA_PML_CALL(recv(pml_buffer, count, dtype, i,
                                    MCA_COLL_BASE_TAG_REDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != err) {
                if (NULL != free_buffer) {
                    free(free_buffer);
                }
                return err;
            }

            inbuf = pml_buffer;
        }

        /* Perform the reduction */

        ompi_op_reduce(op, inbuf, rbuf, count, dtype);
    }
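
    /* Why the loop runs from rank size-2 down to rank 0 (illustrative note,
     * with a_i denoting rank i's send buffer): ompi_op_reduce(op, in, inout,
     * ...) computes inout = in (op) inout, and rbuf was seeded with
     * a_{size-1}, so the loop builds
     *     rbuf = a_0 (op) (a_1 (op) (... (op) a_{size-1})),
     * i.e. the operands appear in rank order and only associativity is
     * assumed, which is why this linear variant is safe for non-commutative
     * operations. */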

    if (NULL != inplace_temp_free) {
        err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)sbuf, rbuf);
        free(inplace_temp_free);
    }
    if (NULL != free_buffer) {
        free(free_buffer);
    }

    /* All done */

    return MPI_SUCCESS;
}

/*
 * ompi_coll_base_reduce_intra_redscat_gather
 *
 * Function:    Reduce using Rabenseifner's algorithm.
 * Accepts:     Same arguments as MPI_Reduce
 * Returns:     MPI_SUCCESS or error code
 *
 * Description: an implementation of Rabenseifner's reduce algorithm [1, 2].
 *   [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
 *       Optimization of Collective Communication Operations in MPICH //
 *       The Int. Journal of High Performance Computing Applications. Vol 19,
 *       Issue 1, pp. 49--66.
 *   [2] http://www.hlrs.de/mpi/myreduce.html.
 *
 * This algorithm is a combination of a reduce-scatter implemented with
 * recursive vector halving and recursive distance doubling, followed
 * by a binomial tree gather [1].
 *
 * Step 1. If the number of processes is not a power of two, reduce it to
 * the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
 * by removing r = p - p' extra processes as follows. In the first 2r processes
 * (ranks 0 to 2r - 1), all the even ranks send the second half of the input
 * vector to their right neighbor (rank + 1), and all the odd ranks send
 * the first half of the input vector to their left neighbor (rank - 1).
 * The even ranks compute the reduction on the first half of the vector and
 * the odd ranks compute the reduction on the second half. The odd ranks then
 * send the result to their left neighbors (the even ranks). As a result,
 * the even ranks among the first 2r processes now contain the reduction with
 * the input vector on their right neighbors (the odd ranks). These odd ranks
 * do not participate in the rest of the algorithm, which leaves behind
 * a power-of-two number of processes. The first r even-ranked processes and
 * the last p - 2r processes are now renumbered from 0 to p' - 1.
 *
 * Step 2. The remaining processes now perform a reduce-scatter by using
 * recursive vector halving and recursive distance doubling. The even-ranked
 * processes send the second half of their buffer to rank + 1 and the odd-ranked
 * processes send the first half of their buffer to rank - 1. All processes
 * then compute the reduction between the local buffer and the received buffer.
 * In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
 * distance is doubled. At the end, each of the p' processes has 1 / p' of the
 * total reduction result.
 *
 * Step 3. A binomial tree gather is performed by using recursive vector
 * doubling and distance halving. In the non-power-of-two case, if the root
 * happens to be one of those odd-ranked processes that would normally
 * be removed in the first step, then the role of this process and process 0
 * are interchanged.
 *
 * Limitations:
 *   count >= 2^{\floor{\log_2 p}}
 *   commutative operations only
 *   intra-communicators only
 *
 * Memory requirements (per process):
 *   rank != root: 2 * count * typesize + 4 * \log_2(p) * sizeof(int) = O(count)
 *   rank == root: count * typesize + 4 * \log_2(p) * sizeof(int) = O(count)
 *
 * Recommendation: use root = 0; otherwise additional steps are required at
 * the root process.
 */
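/* A small worked example of the Step 1 renumbering (illustrative only):
 * with p = 6 processes, p' = 4 and r = 2, so ranks 0..3 pair up. Rank 0
 * reduces the first half with rank 1, rank 2 reduces the first half with
 * rank 3, and the odd ranks 1 and 3 return their second-half results and
 * then drop out. The survivors are renumbered as
 *     rank 0 -> vrank 0, rank 2 -> vrank 1, rank 4 -> vrank 2, rank 5 -> vrank 3,
 * which matches the vrank assignment in the code below (rank / 2 for the
 * paired even ranks, rank - r for the rest). */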
int ompi_coll_base_reduce_intra_redscat_gather(
    const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
    struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    int comm_size = ompi_comm_size(comm);
    int rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:reduce_intra_redscat_gather: rank %d/%d, root %d",
                 rank, comm_size, root));

    /* Find nearest power-of-two less than or equal to comm_size */
    int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1);   /* ilog2(comm_size) */
    assert(nsteps >= 0);
    int nprocs_pof2 = 1 << nsteps;                              /* flp2(comm_size) */
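
    /* For instance (hypothetical values): comm_size = 6 is binary 110, so
     * the highest set bit is at index 2, giving nsteps = 2 and
     * nprocs_pof2 = 1 << 2 = 4, the nearest power of two not exceeding 6. */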

    if (nprocs_pof2 < 2 || count < nprocs_pof2 || !ompi_op_is_commute(op)) {
        OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                     "coll:base:reduce_intra_redscat_gather: rank %d/%d count %d "
                     "switching to basic linear reduce", rank, comm_size, count));
        return ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
                                                        op, root, comm, module);
    }

    int err = MPI_SUCCESS;
    int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
    ptrdiff_t lb, extent, dsize, gap;
    ompi_datatype_get_extent(dtype, &lb, &extent);
    dsize = opal_datatype_span(&dtype->super, count, &gap);

    /* Temporary buffers */
    char *tmp_buf_raw = NULL, *rbuf_raw = NULL;
    tmp_buf_raw = malloc(dsize);
    if (NULL == tmp_buf_raw) {
        err = OMPI_ERR_OUT_OF_RESOURCE;
        goto cleanup_and_return;
    }
    char *tmp_buf = tmp_buf_raw - gap;

    if (rank != root) {
        rbuf_raw = malloc(dsize);
        if (NULL == rbuf_raw) {
            err = OMPI_ERR_OUT_OF_RESOURCE;
            goto cleanup_and_return;
        }
        rbuf = rbuf_raw - gap;
    }

    if ((rank != root) || (sbuf != MPI_IN_PLACE)) {
        err = ompi_datatype_copy_content_same_ddt(dtype, count, (char *)rbuf,
                                                  (char *)sbuf);
        if (MPI_SUCCESS != err) { goto cleanup_and_return; }
    }

    /*
     * Step 1. Reduce the number of processes to the nearest lower power of two
     * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
     * 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
     *    the second half of the input vector to their right neighbor (rank + 1)
     *    and all the odd ranks send the first half of the input vector to their
     *    left neighbor (rank - 1).
     * 2. All 2r processes compute the reduction on their half.
     * 3. The odd ranks then send the result to their left neighbors
     *    (the even ranks).
     *
     * The even ranks (0 to 2r - 1) now contain the reduction with the input
     * vector on their right neighbors (the odd ranks). The first r even
     * processes and the p - 2r last processes are renumbered from
     * 0 to 2^{\floor{\log_2 p}} - 1. These odd ranks do not participate in the
     * rest of the algorithm.
     */

    int vrank, step, wsize;
    int nprocs_rem = comm_size - nprocs_pof2;

    if (rank < 2 * nprocs_rem) {
        int count_lhalf = count / 2;
        int count_rhalf = count - count_lhalf;

        if (rank % 2 != 0) {
            /*
             * Odd process -- exchange with rank - 1
             * Send the left half of the input vector to the left neighbor,
             * Recv the right half of the input vector from the left neighbor
             */
            err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
                                          MCA_COLL_BASE_TAG_REDUCE,
                                          (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
                                          count_rhalf, dtype, rank - 1,
                                          MCA_COLL_BASE_TAG_REDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* Reduce on the right half of the buffers (result in rbuf) */
            ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
                           (char *)rbuf + (ptrdiff_t)count_lhalf * extent, count_rhalf, dtype);

            /* Send the right half to the left neighbor */
            err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                                    count_rhalf, dtype, rank - 1,
                                    MCA_COLL_BASE_TAG_REDUCE,
                                    MCA_PML_BASE_SEND_STANDARD, comm));
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* This process does not participate in the recursive doubling phase */
            vrank = -1;

        } else {
            /*
             * Even process -- exchange with rank + 1
             * Send the right half of the input vector to the right neighbor,
             * Recv the left half of the input vector from the right neighbor
             */
            err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                                          count_rhalf, dtype, rank + 1,
                                          MCA_COLL_BASE_TAG_REDUCE,
                                          tmp_buf, count_lhalf, dtype, rank + 1,
                                          MCA_COLL_BASE_TAG_REDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* Reduce on the left half of the buffers (result in rbuf) */
            ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);

            /* Recv the right half from the right neighbor */
            err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
                                    count_rhalf, dtype, rank + 1,
                                    MCA_COLL_BASE_TAG_REDUCE, comm,
                                    MPI_STATUS_IGNORE));
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            vrank = rank / 2;
        }
    } else { /* rank >= 2 * nprocs_rem */
        vrank = rank - nprocs_rem;
    }

    /*
     * Step 2. Reduce-scatter implemented with recursive vector halving and
     * recursive distance doubling. We have p' = 2^{\floor{\log_2 p}}
     * power-of-two number of processes with new ranks (vrank) and result in rbuf.
     *
     * The even-ranked processes send the right half of their buffer to rank + 1
     * and the odd-ranked processes send the left half of their buffer to
     * rank - 1. All processes then compute the reduction between the local
     * buffer and the received buffer. In the next \log_2(p') - 1 steps, the
     * buffers are recursively halved, and the distance is doubled. At the end,
     * each of the p' processes has 1 / p' of the total reduction result.
     */

    rindex = malloc(sizeof(*rindex) * nsteps);    /* O(\log_2(p)) */
    sindex = malloc(sizeof(*sindex) * nsteps);
    rcount = malloc(sizeof(*rcount) * nsteps);
    scount = malloc(sizeof(*scount) * nsteps);
    if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
        err = OMPI_ERR_OUT_OF_RESOURCE;
        goto cleanup_and_return;
    }

    if (vrank != -1) {
        step = 0;
        wsize = count;
        sindex[0] = rindex[0] = 0;

        for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
            /*
             * On each iteration: rindex[step] = sindex[step] -- the beginning
             * of the current window. The length of the current window is
             * stored in wsize.
             */
            int vdest = vrank ^ mask;
            /* Translate vdest virtual rank to real rank */
            int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;

            if (rank < dest) {
                /*
                 * Recv into the left half of the current window, send the right
                 * half of the window to the peer (perform reduce on the left
                 * half of the current window)
                 */
                rcount[step] = wsize / 2;
                scount[step] = wsize - rcount[step];
                sindex[step] = rindex[step] + rcount[step];
            } else {
                /*
                 * Recv into the right half of the current window, send the left
                 * half of the window to the peer (perform reduce on the right
                 * half of the current window)
                 */
                scount[step] = wsize / 2;
                rcount[step] = wsize - scount[step];
                rindex[step] = sindex[step] + scount[step];
            }

            /* Send part of data from the rbuf, recv into the tmp_buf */
            err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
                                          scount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_REDUCE,
                                          (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
                                          rcount[step], dtype, dest,
                                          MCA_COLL_BASE_TAG_REDUCE, comm,
                                          MPI_STATUS_IGNORE, rank);
            if (MPI_SUCCESS != err) { goto cleanup_and_return; }

            /* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
            ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
                           (char *)rbuf + (ptrdiff_t)rindex[step] * extent,
                           rcount[step], dtype);

            /* Move the current window to the received message */
            if (step + 1 < nsteps) {
                rindex[step + 1] = rindex[step];
                sindex[step + 1] = rindex[step];
                wsize = rcount[step];
                step++;
            }
        }
    }
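
    /* An illustrative trace of the loop above for p' = 4 and count = 8, seen
     * from vrank 0 (hypothetical values): at mask = 1 the window is [0, 8);
     * vrank 0 keeps the left half, so rcount[0] = 4, scount[0] = 4 and
     * sindex[0] = 4 -- it sends rbuf[4..8), receives into tmp_buf[0..4) and
     * reduces into rbuf[0..4). At mask = 2 the window shrinks to [0, 4);
     * rcount[1] = 2, so vrank 0 ends up owning rbuf[0..2), i.e. exactly
     * count / p' = 2 elements, as the assertion below states. */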
    /*
     * Assertion: each process has 1 / p' of the total reduction result:
     * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
     */

    /*
     * Setup the root process for the gather operation.
     * Case 1: root < 2r and root is odd -- the root process was excluded in
     *         Step 1. Recv data from process 0, vroot = 0, vrank = 0
     * Case 2: root < 2r and root is even: vroot = root / 2
     * Case 3: root >= 2r: vroot = root - r
     */
    int vroot = 0;
    if (root < 2 * nprocs_rem) {
        if (root % 2 != 0) {
            vroot = 0;
            if (rank == root) {
                /*
                 * Case 1: root < 2r and root is odd -- the root process was
                 * excluded in Step 1 (newrank == -1).
                 * Recv data from process 0.
                 */
                rindex[0] = 0;
                step = 0, wsize = count;
                for (int mask = 1; mask < nprocs_pof2; mask *= 2) {
                    rcount[step] = wsize / 2;
                    scount[step] = wsize - rcount[step];
                    rindex[step] = 0;
                    sindex[step] = rcount[step];
                    step++;
                    wsize /= 2;
                }

                err = MCA_PML_CALL(recv(rbuf, rcount[nsteps - 1], dtype, 0,
                                        MCA_COLL_BASE_TAG_REDUCE, comm,
                                        MPI_STATUS_IGNORE));
                if (MPI_SUCCESS != err) { goto cleanup_and_return; }
                vrank = 0;

            } else if (vrank == 0) {
                /* Send data to the root */
                err = MCA_PML_CALL(send(rbuf, rcount[nsteps - 1], dtype, root,
                                        MCA_COLL_BASE_TAG_REDUCE,
                                        MCA_PML_BASE_SEND_STANDARD, comm));
                if (MPI_SUCCESS != err) { goto cleanup_and_return; }
                vrank = -1;
            }
        } else {
            /* Case 2: root < 2r and root is even: vroot = root / 2 */
            vroot = root / 2;
        }
    } else {
        /* Case 3: root >= 2r: vroot = root - r */
        vroot = root - nprocs_rem;
    }

    /*
     * Step 3. Gather result at the vroot by the binomial tree algorithm.
     * Each process has 1 / p' of the total reduction result:
     * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
     * All exchanges are executed in reverse order relative
     * to recursive doubling (previous step).
     */

    if (vrank != -1) {
        int vdest_tree, vroot_tree;
        step = nsteps - 1; /* step = ilog2(p') - 1 */

        for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
            int vdest = vrank ^ mask;
            /* Translate vdest virtual rank to real rank */
            int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
            if ((vdest == 0) && (root < 2 * nprocs_rem) && (root % 2 != 0))
                dest = root;

            vdest_tree = vdest >> step;
            vdest_tree <<= step;
            vroot_tree = vroot >> step;
            vroot_tree <<= step;
            if (vdest_tree == vroot_tree) {
                /* Send data from rbuf and exit */
                err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
                                        rcount[step], dtype, dest,
                                        MCA_COLL_BASE_TAG_REDUCE,
                                        MCA_PML_BASE_SEND_STANDARD, comm));
                if (MPI_SUCCESS != err) { goto cleanup_and_return; }
                break;
            } else {
                /* Recv and continue */
                err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
                                        scount[step], dtype, dest,
                                        MCA_COLL_BASE_TAG_REDUCE, comm,
                                        MPI_STATUS_IGNORE));
                if (MPI_SUCCESS != err) { goto cleanup_and_return; }
            }
            step--;
        }
    }

cleanup_and_return:
    if (NULL != tmp_buf_raw)
        free(tmp_buf_raw);
    if (NULL != rbuf_raw)
        free(rbuf_raw);
    if (NULL != rindex)
        free(rindex);
    if (NULL != sindex)
        free(sindex);
    if (NULL != rcount)
        free(rcount);
    if (NULL != scount)
        free(scount);
    return err;
}