1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
4 * University Research and Technology
5 * Corporation. All rights reserved.
6 * Copyright (c) 2004-2017 The University of Tennessee and The University
7 * of Tennessee Research Foundation. All rights
8 * reserved.
9 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
10 * University of Stuttgart. All rights reserved.
11 * Copyright (c) 2004-2005 The Regents of the University of California.
12 * All rights reserved.
13 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
14 * Copyright (c) 2009 University of Houston. All rights reserved.
15 * Copyright (c) 2013 Los Alamos National Security, LLC. All rights
16 * reserved.
17 * Copyright (c) 2015-2016 Research Organization for Information Science
18 * and Technology (RIST). All rights reserved.
19 * $COPYRIGHT$
20 *
21 * Additional copyrights may follow
22 *
23 * $HEADER$
24 */
25
26 #include "ompi_config.h"
27
28 #include "mpi.h"
29 #include "opal/util/bit_ops.h"
30 #include "ompi/constants.h"
31 #include "ompi/datatype/ompi_datatype.h"
32 #include "ompi/communicator/communicator.h"
33 #include "ompi/mca/coll/coll.h"
34 #include "ompi/mca/coll/base/coll_tags.h"
35 #include "ompi/mca/pml/pml.h"
36 #include "ompi/op/op.h"
37 #include "ompi/mca/coll/base/coll_base_functions.h"
38 #include "coll_base_topo.h"
39 #include "coll_base_util.h"
40
41 /*******************************************************************************
42 * ompi_coll_base_reduce_scatter_intra_nonoverlapping
43 *
44 * This function just calls a reduce to rank 0, followed by an
45 * appropriate scatterv call.
46 */
47 int ompi_coll_base_reduce_scatter_intra_nonoverlapping(const void *sbuf, void *rbuf,
48 const int *rcounts,
49 struct ompi_datatype_t *dtype,
50 struct ompi_op_t *op,
51 struct ompi_communicator_t *comm,
52 mca_coll_base_module_t *module)
53 {
54 int err, i, rank, size, total_count, *displs = NULL;
55 const int root = 0;
56 char *tmprbuf = NULL, *tmprbuf_free = NULL;
57
58 rank = ompi_comm_rank(comm);
59 size = ompi_comm_size(comm);
60
61 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_scatter_intra_nonoverlapping, rank %d", rank));
62
63 for (i = 0, total_count = 0; i < size; i++) { total_count += rcounts[i]; }
64
65 /* Reduce to rank 0 (root) and scatterv */
66 tmprbuf = (char*) rbuf;
67 if (MPI_IN_PLACE == sbuf) {
68 /* rbuf on root (0) is big enough to hold whole data */
69 if (root == rank) {
70 err = comm->c_coll->coll_reduce (MPI_IN_PLACE, tmprbuf, total_count,
71 dtype, op, root, comm, comm->c_coll->coll_reduce_module);
72 } else {
73 err = comm->c_coll->coll_reduce(tmprbuf, NULL, total_count,
74 dtype, op, root, comm, comm->c_coll->coll_reduce_module);
75 }
76 } else {
77 if (root == rank) {
78 /* We must allocate temporary receive buffer on root to ensure that
79 rbuf is big enough */
80 ptrdiff_t dsize, gap = 0;
81 dsize = opal_datatype_span(&dtype->super, total_count, &gap);
82
83 tmprbuf_free = (char*) malloc(dsize);
84 tmprbuf = tmprbuf_free - gap;
85 }
86 err = comm->c_coll->coll_reduce (sbuf, tmprbuf, total_count,
87 dtype, op, root, comm, comm->c_coll->coll_reduce_module);
88 }
89 if (MPI_SUCCESS != err) {
90 if (NULL != tmprbuf_free) free(tmprbuf_free);
91 return err;
92 }
93
94 displs = (int*) malloc(size * sizeof(int));
95 displs[0] = 0;
96 for (i = 1; i < size; i++) {
97 displs[i] = displs[i-1] + rcounts[i-1];
98 }
99 if (MPI_IN_PLACE == sbuf && root == rank) {
100 err = comm->c_coll->coll_scatterv (tmprbuf, rcounts, displs, dtype,
101 MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
102 root, comm, comm->c_coll->coll_scatterv_module);
103 } else {
104 err = comm->c_coll->coll_scatterv (tmprbuf, rcounts, displs, dtype,
105 rbuf, rcounts[rank], dtype,
106 root, comm, comm->c_coll->coll_scatterv_module);
107 }
108 free(displs);
109 if (NULL != tmprbuf_free) free(tmprbuf_free);
110
111 return err;
112 }
113
114 /*
115 * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
116 * I have removed the part which handles "large" message sizes
 * (non-overlapping version of reduce_scatter).
118 */
119
120 /* copied function (with appropriate renaming) starts here */
121
122 /*
123 * reduce_scatter_intra_basic_recursivehalving
124 *
125 * Function: - reduce scatter implementation using recursive-halving
126 * algorithm
127 * Accepts: - same as MPI_Reduce_scatter()
128 * Returns: - MPI_SUCCESS or error code
129 * Limitation: - Works only for commutative operations.
130 */
131 int
132 ompi_coll_base_reduce_scatter_intra_basic_recursivehalving( const void *sbuf,
133 void *rbuf,
134 const int *rcounts,
135 struct ompi_datatype_t *dtype,
136 struct ompi_op_t *op,
137 struct ompi_communicator_t *comm,
138 mca_coll_base_module_t *module)
139 {
140 int i, rank, size, count, err = OMPI_SUCCESS;
141 int tmp_size, remain = 0, tmp_rank, *disps = NULL;
142 ptrdiff_t extent, buf_size, gap = 0;
143 char *recv_buf = NULL, *recv_buf_free = NULL;
144 char *result_buf = NULL, *result_buf_free = NULL;
145
146 /* Initialize */
147 rank = ompi_comm_rank(comm);
148 size = ompi_comm_size(comm);
149
150 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:reduce_scatter_intra_basic_recursivehalving, rank %d", rank));
151
152 /* Find displacements and the like */
153 disps = (int*) malloc(sizeof(int) * size);
154 if (NULL == disps) return OMPI_ERR_OUT_OF_RESOURCE;
155
156 disps[0] = 0;
157 for (i = 0; i < (size - 1); ++i) {
158 disps[i + 1] = disps[i] + rcounts[i];
159 }
160 count = disps[size - 1] + rcounts[size - 1];
161
162 /* short cut the trivial case */
163 if (0 == count) {
164 free(disps);
165 return OMPI_SUCCESS;
166 }
167
168 /* get datatype information */
169 ompi_datatype_type_extent(dtype, &extent);
170 buf_size = opal_datatype_span(&dtype->super, count, &gap);
171
172 /* Handle MPI_IN_PLACE */
173 if (MPI_IN_PLACE == sbuf) {
174 sbuf = rbuf;
175 }
176
177 /* Allocate temporary receive buffer. */
178 recv_buf_free = (char*) malloc(buf_size);
179 recv_buf = recv_buf_free - gap;
180 if (NULL == recv_buf_free) {
181 err = OMPI_ERR_OUT_OF_RESOURCE;
182 goto cleanup;
183 }
184
185 /* allocate temporary buffer for results */
186 result_buf_free = (char*) malloc(buf_size);
187 result_buf = result_buf_free - gap;
188
189 /* copy local buffer into the temporary results */
190 err = ompi_datatype_sndrcv(sbuf, count, dtype, result_buf, count, dtype);
191 if (OMPI_SUCCESS != err) goto cleanup;
192
193 /* figure out power of two mapping: grow until larger than
194 comm size, then go back one, to get the largest power of
195 two less than comm size */
196 tmp_size = opal_next_poweroftwo (size);
197 tmp_size >>= 1;
198 remain = size - tmp_size;
199
200 /* If comm size is not a power of two, have the first "remain"
201 procs with an even rank send to rank + 1, leaving a power of
202 two procs to do the rest of the algorithm */
203 if (rank < 2 * remain) {
204 if ((rank & 1) == 0) {
205 err = MCA_PML_CALL(send(result_buf, count, dtype, rank + 1,
206 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
207 MCA_PML_BASE_SEND_STANDARD,
208 comm));
209 if (OMPI_SUCCESS != err) goto cleanup;
210
211 /* we don't participate from here on out */
212 tmp_rank = -1;
213 } else {
214 err = MCA_PML_CALL(recv(recv_buf, count, dtype, rank - 1,
215 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
216 comm, MPI_STATUS_IGNORE));
217
218 /* integrate their results into our temp results */
219 ompi_op_reduce(op, recv_buf, result_buf, count, dtype);
220
221 /* adjust rank to be the bottom "remain" ranks */
222 tmp_rank = rank / 2;
223 }
224 } else {
225 /* just need to adjust rank to show that the bottom "even
226 remain" ranks dropped out */
227 tmp_rank = rank - remain;
228 }
229
230 /* For ranks not kicked out by the above code, perform the
231 recursive halving */
232 if (tmp_rank >= 0) {
233 int *tmp_disps = NULL, *tmp_rcounts = NULL;
234 int mask, send_index, recv_index, last_index;
235
236 /* recalculate disps and rcounts to account for the
237 special "remainder" processes that are no longer doing
238 anything */
239 tmp_rcounts = (int*) malloc(tmp_size * sizeof(int));
240 if (NULL == tmp_rcounts) {
241 err = OMPI_ERR_OUT_OF_RESOURCE;
242 goto cleanup;
243 }
244 tmp_disps = (int*) malloc(tmp_size * sizeof(int));
245 if (NULL == tmp_disps) {
246 free(tmp_rcounts);
247 err = OMPI_ERR_OUT_OF_RESOURCE;
248 goto cleanup;
249 }
250
251 for (i = 0 ; i < tmp_size ; ++i) {
252 if (i < remain) {
253 /* need to include old neighbor as well */
254 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
255 } else {
256 tmp_rcounts[i] = rcounts[i + remain];
257 }
258 }
259
260 tmp_disps[0] = 0;
261 for (i = 0; i < tmp_size - 1; ++i) {
262 tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
263 }
264
265 /* do the recursive halving communication. Don't use the
266 dimension information on the communicator because I
267 think the information is invalidated by our "shrinking"
268 of the communicator */
269 mask = tmp_size >> 1;
270 send_index = recv_index = 0;
271 last_index = tmp_size;
272 while (mask > 0) {
273 int tmp_peer, peer, send_count, recv_count;
274 struct ompi_request_t *request;
275
276 tmp_peer = tmp_rank ^ mask;
277 peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
278
279 /* figure out if we're sending, receiving, or both */
280 send_count = recv_count = 0;
281 if (tmp_rank < tmp_peer) {
282 send_index = recv_index + mask;
283 for (i = send_index ; i < last_index ; ++i) {
284 send_count += tmp_rcounts[i];
285 }
286 for (i = recv_index ; i < send_index ; ++i) {
287 recv_count += tmp_rcounts[i];
288 }
289 } else {
290 recv_index = send_index + mask;
291 for (i = send_index ; i < recv_index ; ++i) {
292 send_count += tmp_rcounts[i];
293 }
294 for (i = recv_index ; i < last_index ; ++i) {
295 recv_count += tmp_rcounts[i];
296 }
297 }
298
299 /* actual data transfer. Send from result_buf,
300 receive into recv_buf */
301 if (recv_count > 0) {
302 err = MCA_PML_CALL(irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
303 recv_count, dtype, peer,
304 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
305 comm, &request));
306 if (OMPI_SUCCESS != err) {
307 free(tmp_rcounts);
308 free(tmp_disps);
309 goto cleanup;
310 }
311 }
312 if (send_count > 0) {
313 err = MCA_PML_CALL(send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
314 send_count, dtype, peer,
315 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
316 MCA_PML_BASE_SEND_STANDARD,
317 comm));
318 if (OMPI_SUCCESS != err) {
319 free(tmp_rcounts);
320 free(tmp_disps);
321 goto cleanup;
322 }
323 }
324
325 /* if we received something on this step, push it into
326 the results buffer */
327 if (recv_count > 0) {
328 err = ompi_request_wait(&request, MPI_STATUS_IGNORE);
329 if (OMPI_SUCCESS != err) {
330 free(tmp_rcounts);
331 free(tmp_disps);
332 goto cleanup;
333 }
334
335 ompi_op_reduce(op,
336 recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
337 result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
338 recv_count, dtype);
339 }
340
341 /* update for next iteration */
342 send_index = recv_index;
343 last_index = recv_index + mask;
344 mask >>= 1;
345 }
346
347 /* copy local results from results buffer into real receive buffer */
348 if (0 != rcounts[rank]) {
349 err = ompi_datatype_sndrcv(result_buf + disps[rank] * extent,
350 rcounts[rank], dtype,
351 rbuf, rcounts[rank], dtype);
352 if (OMPI_SUCCESS != err) {
353 free(tmp_rcounts);
354 free(tmp_disps);
355 goto cleanup;
356 }
357 }
358
359 free(tmp_rcounts);
360 free(tmp_disps);
361 }
362
363 /* Now fix up the non-power of two case, by having the odd
364 procs send the even procs the proper results */
365 if (rank < (2 * remain)) {
366 if ((rank & 1) == 0) {
367 if (rcounts[rank]) {
368 err = MCA_PML_CALL(recv(rbuf, rcounts[rank], dtype, rank + 1,
369 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
370 comm, MPI_STATUS_IGNORE));
371 if (OMPI_SUCCESS != err) goto cleanup;
372 }
373 } else {
374 if (rcounts[rank - 1]) {
375 err = MCA_PML_CALL(send(result_buf + disps[rank - 1] * extent,
376 rcounts[rank - 1], dtype, rank - 1,
377 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
378 MCA_PML_BASE_SEND_STANDARD,
379 comm));
380 if (OMPI_SUCCESS != err) goto cleanup;
381 }
382 }
383 }
384
385 cleanup:
386 if (NULL != disps) free(disps);
387 if (NULL != recv_buf_free) free(recv_buf_free);
388 if (NULL != result_buf_free) free(result_buf_free);
389
390 return err;
391 }
392
393 /* copied function (with appropriate renaming) ends here */
394
395
396 /*
397 * ompi_coll_base_reduce_scatter_intra_ring
398 *
399 * Function: Ring algorithm for reduce_scatter operation
400 * Accepts: Same as MPI_Reduce_scatter()
401 * Returns: MPI_SUCCESS or error code
402 *
403 * Description: Implements ring algorithm for reduce_scatter:
404 * the block sizes defined in rcounts are exchanged and
 *              updated until they reach proper destination.
406 * Algorithm requires 2 * max(rcounts) extra buffering
407 *
408 * Limitations: The algorithm DOES NOT preserve order of operations so it
409 * can be used only for commutative operations.
410 * Example on 5 nodes:
411 * Initial state
412 * # 0 1 2 3 4
413 * [00] [10] -> [20] [30] [40]
414 * [01] [11] [21] -> [31] [41]
415 * [02] [12] [22] [32] -> [42]
416 * -> [03] [13] [23] [33] [43] --> ..
417 * [04] -> [14] [24] [34] [44]
418 *
419 * COMPUTATION PHASE
420 * Step 0: rank r sends block (r-1) to rank (r+1) and
421 * receives block (r+1) from rank (r-1) [with wraparound].
422 * # 0 1 2 3 4
423 * [00] [10] [10+20] -> [30] [40]
424 * [01] [11] [21] [21+31] -> [41]
425 * -> [02] [12] [22] [32] [32+42] -->..
426 * [43+03] -> [13] [23] [33] [43]
427 * [04] [04+14] -> [24] [34] [44]
428 *
429 * Step 1:
430 * # 0 1 2 3 4
431 * [00] [10] [10+20] [10+20+30] -> [40]
432 * -> [01] [11] [21] [21+31] [21+31+41] ->
433 * [32+42+02] -> [12] [22] [32] [32+42]
434 * [03] [43+03+13] -> [23] [33] [43]
435 * [04] [04+14] [04+14+24] -> [34] [44]
436 *
437 * Step 2:
438 * # 0 1 2 3 4
439 * -> [00] [10] [10+20] [10+20+30] [10+20+30+40] ->
440 * [21+31+41+01]-> [11] [21] [21+31] [21+31+41]
441 * [32+42+02] [32+42+02+12]-> [22] [32] [32+42]
442 * [03] [43+03+13] [43+03+13+23]-> [33] [43]
443 * [04] [04+14] [04+14+24] [04+14+24+34] -> [44]
444 *
445 * Step 3:
446 * # 0 1 2 3 4
447 * [10+20+30+40+00] [10] [10+20] [10+20+30] [10+20+30+40]
448 * [21+31+41+01] [21+31+41+01+11] [21] [21+31] [21+31+41]
449 * [32+42+02] [32+42+02+12] [32+42+02+12+22] [32] [32+42]
450 * [03] [43+03+13] [43+03+13+23] [43+03+13+23+33] [43]
451 * [04] [04+14] [04+14+24] [04+14+24+34] [04+14+24+34+44]
452 * DONE :)
453 *
454 */
int
ompi_coll_base_reduce_scatter_intra_ring( const void *sbuf, void *rbuf, const int *rcounts,
                                          struct ompi_datatype_t *dtype,
                                          struct ompi_op_t *op,
                                          struct ompi_communicator_t *comm,
                                          mca_coll_base_module_t *module)
{
    int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
    int inbi, *displs = NULL;
    char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
    char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
    ptrdiff_t extent, max_real_segsize, dsize, gap = 0;
    ompi_request_t *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};

    size = ompi_comm_size(comm);
    rank = ompi_comm_rank(comm);

    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
                 "coll:base:reduce_scatter_intra_ring rank %d, size %d",
                 rank, size));

    /* Determine the maximum number of elements per node,
       corresponding block size, and displacements array.
    */
    displs = (int*) malloc(size * sizeof(int));
    if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
    displs[0] = 0;
    total_count = rcounts[0];
    max_block_count = rcounts[0];
    for (i = 1; i < size; i++) {
        displs[i] = total_count;
        total_count += rcounts[i];
        if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
    }

    /* Special case for size == 1: reduction over one process is a copy */
    if (1 == size) {
        if (MPI_IN_PLACE != sbuf) {
            ret = ompi_datatype_copy_content_same_ddt(dtype, total_count,
                                                      (char*)rbuf, (char*)sbuf);
            if (ret < 0) { line = __LINE__; goto error_hndl; }
        }
        free(displs);
        return MPI_SUCCESS;
    }

    /* Allocate and initialize temporary buffers, we need:
       - a temporary buffer to perform reduction (size total_count) since
       rbuf can be of rcounts[rank] size.
       - up to two temporary buffers used for communication/computation overlap.
    */
    ret = ompi_datatype_type_extent(dtype, &extent);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    max_real_segsize = opal_datatype_span(&dtype->super, max_block_count, &gap);
    dsize = opal_datatype_span(&dtype->super, total_count, &gap);

    accumbuf_free = (char*)malloc(dsize);
    if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
    accumbuf = accumbuf_free - gap;

    /* Second inbuf is only needed when the pipeline has more than one
       in-flight exchange, i.e. size > 2 */
    inbuf_free[0] = (char*)malloc(max_real_segsize);
    if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
    inbuf[0] = inbuf_free[0] - gap;
    if (size > 2) {
        inbuf_free[1] = (char*)malloc(max_real_segsize);
        if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
        inbuf[1] = inbuf_free[1] - gap;
    }

    /* Handle MPI_IN_PLACE for size > 1 */
    if (MPI_IN_PLACE == sbuf) {
        sbuf = rbuf;
    }

    /* Seed the accumulation buffer with the local contribution */
    ret = ompi_datatype_copy_content_same_ddt(dtype, total_count,
                                              accumbuf, (char*)sbuf);
    if (ret < 0) { line = __LINE__; goto error_hndl; }

    /* Computation loop */

    /*
       For each of the remote nodes:
       - post irecv for block (r-2) from (r-1) with wrap around
       - send block (r-1) to (r+1)
       - in loop for every step k = 2 .. n
       - post irecv for block (r - 1 + n - k) % n
       - wait on block (r + n - k) % n to arrive
       - compute on block (r + n - k ) % n
       - send block (r + n - k) % n
       - wait on block (r)
       - compute on block (r)
       - copy block (r) to rbuf
       Note that we must be careful when computing the beginning of buffers and
       for send operations and computation we must compute the exact block size.
    */
    send_to = (rank + 1) % size;
    recv_from = (rank + size - 1) % size;

    inbi = 0;
    /* Initialize first receive from the neighbor on the left */
    ret = MCA_PML_CALL(irecv(inbuf[inbi], max_block_count, dtype, recv_from,
                             MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
                             &reqs[inbi]));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
    ret = MCA_PML_CALL(send(tmpsend, rcounts[recv_from], dtype, send_to,
                            MCA_COLL_BASE_TAG_REDUCE_SCATTER,
                            MCA_PML_BASE_SEND_STANDARD, comm));
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    for (k = 2; k < size; k++) {
        const int prevblock = (rank + size - k) % size;

        /* Flip between the two communication buffers */
        inbi = inbi ^ 0x1;

        /* Post irecv for the current block */
        ret = MCA_PML_CALL(irecv(inbuf[inbi], max_block_count, dtype, recv_from,
                                 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
                                 &reqs[inbi]));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Wait on previous block to arrive */
        ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

        /* Apply operation on previous block: result goes to accumbuf
           accumbuf[prevblock] = inbuf[inbi ^ 0x1] (op) accumbuf[prevblock]
        */
        tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
        ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, rcounts[prevblock], dtype);

        /* send previous block to send_to */
        ret = MCA_PML_CALL(send(tmprecv, rcounts[prevblock], dtype, send_to,
                                MCA_COLL_BASE_TAG_REDUCE_SCATTER,
                                MCA_PML_BASE_SEND_STANDARD, comm));
        if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
    }

    /* Wait on the last block to arrive */
    ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
    if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

    /* Apply operation on the last block (my own block)
       accumbuf[rank] = inbuf[inbi] (op) accumbuf[rank] */
    tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
    ompi_op_reduce(op, inbuf[inbi], tmprecv, rcounts[rank], dtype);

    /* Copy result from the accumulation buffer to rbuf */
    ret = ompi_datatype_copy_content_same_ddt(dtype, rcounts[rank], (char *)rbuf, tmprecv);
    if (ret < 0) { line = __LINE__; goto error_hndl; }

    if (NULL != displs) free(displs);
    if (NULL != accumbuf_free) free(accumbuf_free);
    if (NULL != inbuf_free[0]) free(inbuf_free[0]);
    if (NULL != inbuf_free[1]) free(inbuf_free[1]);

    return MPI_SUCCESS;

 error_hndl:
    OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
                 __FILE__, line, rank, ret));
    (void)line;  // silence compiler warning
    if (NULL != displs) free(displs);
    if (NULL != accumbuf_free) free(accumbuf_free);
    if (NULL != inbuf_free[0]) free(inbuf_free[0]);
    if (NULL != inbuf_free[1]) free(inbuf_free[1]);
    return ret;
}
624
625 /*
626 * ompi_sum_counts: Returns sum of counts [lo, hi]
627 * lo, hi in {0, 1, ..., nprocs_pof2 - 1}
628 */
/*
 * ompi_sum_counts: Returns the sum of counts over the inclusive block
 * range [lo, hi], where lo and hi are indices in the "shrunk" group
 * {0, 1, ..., nprocs_pof2 - 1}.
 */
static int ompi_sum_counts(const int *counts, int *displs, int nprocs_rem, int lo, int hi)
{
    /* Map shrunk-group indices back to real ranks: each of the first
       nprocs_rem blocks spans an excluded even rank plus its odd
       neighbor, the rest map one-to-one after the excluded ranks. */
    int real_lo = (lo < nprocs_rem) ? 2 * lo : lo + nprocs_rem;
    int real_hi = (hi < nprocs_rem) ? 2 * hi + 1 : hi + nprocs_rem;

    /* Contiguous range, so the sum is the difference of displacements
       plus the final block's count. */
    return displs[real_hi] + counts[real_hi] - displs[real_lo];
}
636
637 /*
638 * ompi_coll_base_reduce_scatter_intra_butterfly
639 *
640 * Function: Butterfly algorithm for reduce_scatter
641 * Accepts: Same as MPI_Reduce_scatter
642 * Returns: MPI_SUCCESS or error code
643 *
644 * Description: Implements butterfly algorithm for MPI_Reduce_scatter [*].
645 * The algorithm can be used both by commutative and non-commutative
646 * operations, for power-of-two and non-power-of-two number of processes.
647 *
648 * [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter
649 * with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.
650 *
651 * Time complexity: O(m\lambda + log(p)\alpha + m\beta + m\gamma),
652 * where m = sum of rcounts[], p = comm_size
653 * Memory requirements (per process): 2 * m * typesize + comm_size
654 *
655 * Example: comm_size=6, nprocs_pof2=4, nprocs_rem=2, rcounts[]=1, sbuf=[0,1,...,5]
656 * Step 1. Reduce the number of processes to 4
657 * rank 0: [0|1|2|3|4|5]: send to 1: vrank -1
658 * rank 1: [0|1|2|3|4|5]: recv from 0, op: vrank 0: [0|2|4|6|8|10]
659 * rank 2: [0|1|2|3|4|5]: send to 3: vrank -1
660 * rank 3: [0|1|2|3|4|5]: recv from 2, op: vrank 1: [0|2|4|6|8|10]
661 * rank 4: [0|1|2|3|4|5]: vrank 2: [0|1|2|3|4|5]
662 * rank 5: [0|1|2|3|4|5]: vrank 3: [0|1|2|3|4|5]
663 *
664 * Step 2. Butterfly. Buffer of 6 elements is divided into 4 blocks.
665 * Round 1 (mask=1, nblocks=2)
666 * 0: vrank -1
667 * 1: vrank 0 [0 2|4 6|8|10]: exch with 1: send [2,3], recv [0,1]: [0 4|8 12|*|*]
668 * 2: vrank -1
669 * 3: vrank 1 [0 2|4 6|8|10]: exch with 0: send [0,1], recv [2,3]: [**|**|16|20]
670 * 4: vrank 2 [0 1|2 3|4|5] : exch with 3: send [2,3], recv [0,1]: [0 2|4 6|*|*]
671 * 5: vrank 3 [0 1|2 3|4|5] : exch with 2: send [0,1], recv [2,3]: [**|**|8|10]
672 *
673 * Round 2 (mask=2, nblocks=1)
674 * 0: vrank -1
675 * 1: vrank 0 [0 4|8 12|*|*]: exch with 2: send [1], recv [0]: [0 6|**|*|*]
676 * 2: vrank -1
677 * 3: vrank 1 [**|**|16|20] : exch with 3: send [3], recv [2]: [**|**|24|*]
678 * 4: vrank 2 [0 2|4 6|*|*] : exch with 0: send [0], recv [1]: [**|12 18|*|*]
679 * 5: vrank 3 [**|**|8|10] : exch with 1: send [2], recv [3]: [**|**|*|30]
680 *
681 * Step 3. Exchange with remote process according to a mirror permutation:
682 * mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3
683 * 0: vrank -1: recv "0" from process 0
684 * 1: vrank 0 [0 6|**|*|*]: send "0" to 0, copy "6" to rbuf (mperm(0)=0)
685 * 2: vrank -1: recv result "12" from process 4
686 * 3: vrank 1 [**|**|24|*]
687 * 4: vrank 2 [**|12 18|*|*]: send "12" to 2, send "18" to 3, recv "24" from 3
688 * 5: vrank 3 [**|**|*|30]: copy "30" to rbuf (mperm(3)=3)
689 */
690 int
691 ompi_coll_base_reduce_scatter_intra_butterfly(
692 const void *sbuf, void *rbuf, const int *rcounts, struct ompi_datatype_t *dtype,
693 struct ompi_op_t *op, struct ompi_communicator_t *comm,
694 mca_coll_base_module_t *module)
695 {
696 char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
697 int *displs = NULL, index;
698 ptrdiff_t span, gap, totalcount, extent;
699 int err = MPI_SUCCESS;
700 int comm_size = ompi_comm_size(comm);
701 int rank = ompi_comm_rank(comm);
702
703 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
704 "coll:base:reduce_scatter_intra_butterfly: rank %d/%d",
705 rank, comm_size));
706 if (comm_size < 2)
707 return MPI_SUCCESS;
708
709 displs = malloc(sizeof(*displs) * comm_size);
710 if (NULL == displs) {
711 err = OMPI_ERR_OUT_OF_RESOURCE;
712 goto cleanup_and_return;
713 }
714 displs[0] = 0;
715 for (int i = 1; i < comm_size; i++) {
716 displs[i] = displs[i - 1] + rcounts[i - 1];
717 }
718 totalcount = displs[comm_size - 1] + rcounts[comm_size - 1];
719
720 ompi_datatype_type_extent(dtype, &extent);
721 span = opal_datatype_span(&dtype->super, totalcount, &gap);
722 tmpbuf[0] = malloc(span);
723 tmpbuf[1] = malloc(span);
724 if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
725 err = OMPI_ERR_OUT_OF_RESOURCE;
726 goto cleanup_and_return;
727 }
728 psend = tmpbuf[0] - gap;
729 precv = tmpbuf[1] - gap;
730
731 if (sbuf != MPI_IN_PLACE) {
732 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, (char *)sbuf);
733 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
734 } else {
735 err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, rbuf);
736 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
737 }
738
739 /*
740 * Step 1. Reduce the number of processes to the nearest lower power of two
741 * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
742 * In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
743 * the input vector to their neighbor (rank + 1) and all the odd ranks recv
744 * the input vector and perform local reduction.
745 * The odd ranks (0 to 2r - 1) contain the reduction with the input
746 * vector on their neighbors (the even ranks). The first r odd
747 * processes and the p - 2r last processes are renumbered from
748 * 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the
749 * rest of the algorithm.
750 */
751
752 /* Find nearest power-of-two less than or equal to comm_size */
753 int nprocs_pof2 = opal_next_poweroftwo(comm_size);
754 nprocs_pof2 >>= 1;
755 int nprocs_rem = comm_size - nprocs_pof2;
756 int log2_size = opal_cube_dim(nprocs_pof2);
757
758 int vrank = -1;
759 if (rank < 2 * nprocs_rem) {
760 if ((rank % 2) == 0) {
761 /* Even process */
762 err = MCA_PML_CALL(send(psend, totalcount, dtype, rank + 1,
763 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
764 MCA_PML_BASE_SEND_STANDARD, comm));
765 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
766 /* This process does not participate in the rest of the algorithm */
767 vrank = -1;
768 } else {
769 /* Odd process */
770 err = MCA_PML_CALL(recv(precv, totalcount, dtype, rank - 1,
771 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
772 comm, MPI_STATUS_IGNORE));
773 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
774 ompi_op_reduce(op, precv, psend, totalcount, dtype);
775 /* Adjust rank to be the bottom "remain" ranks */
776 vrank = rank / 2;
777 }
778 } else {
779 /* Adjust rank to show that the bottom "even remain" ranks dropped out */
780 vrank = rank - nprocs_rem;
781 }
782
783 if (vrank != -1) {
784 /*
785 * Now, psend vector of size totalcount is divided into nprocs_pof2 blocks:
786 * block 0: rcounts[0] and rcounts[1] -- for process 0 and 1
787 * block 1: rcounts[2] and rcounts[3] -- for process 2 and 3
788 * ...
789 * block r-1: rcounts[2*(r-1)] and rcounts[2*(r-1)+1]
790 * block r: rcounts[r+r]
791 * block r+1: rcounts[r+r+1]
792 * ...
793 * block nprocs_pof2 - 1: rcounts[r+nprocs_pof2-1]
794 */
795 int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
796 for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
797 int vpeer = vrank ^ mask;
798 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
799
800 nblocks /= 2;
801 if ((vrank & mask) == 0) {
802 /* Send the upper half of reduction buffer, recv the lower half */
803 send_index += nblocks;
804 } else {
805 /* Send the upper half of reduction buffer, recv the lower half */
806 recv_index += nblocks;
807 }
808
809 /* Send blocks: [send_index, send_index + nblocks - 1] */
810 int send_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
811 send_index, send_index + nblocks - 1);
812 index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
813 ptrdiff_t sdispl = displs[index];
814
815 /* Recv blocks: [recv_index, recv_index + nblocks - 1] */
816 int recv_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
817 recv_index, recv_index + nblocks - 1);
818 index = (recv_index < nprocs_rem) ? 2 * recv_index : nprocs_rem + recv_index;
819 ptrdiff_t rdispl = displs[index];
820
821 err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
822 dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER,
823 precv + (ptrdiff_t)rdispl * extent, recv_count,
824 dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER,
825 comm, MPI_STATUS_IGNORE, rank);
826 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
827
828 if (vrank < vpeer) {
829 /* precv = psend <op> precv */
830 ompi_op_reduce(op, psend + (ptrdiff_t)rdispl * extent,
831 precv + (ptrdiff_t)rdispl * extent, recv_count, dtype);
832 char *p = psend;
833 psend = precv;
834 precv = p;
835 } else {
836 /* psend = precv <op> psend */
837 ompi_op_reduce(op, precv + (ptrdiff_t)rdispl * extent,
838 psend + (ptrdiff_t)rdispl * extent, recv_count, dtype);
839 }
840 send_index = recv_index;
841 }
842 /*
843 * psend points to the result block [send_index]
844 * Exchange results with remote process according to a mirror permutation.
845 */
846 int vpeer = ompi_mirror_perm(vrank, log2_size);
847 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
848 index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
849
850 if (vpeer < nprocs_rem) {
851 /*
852 * Process has two blocks: for excluded process and own.
853 * Send the first block to excluded process.
854 */
855 err = MCA_PML_CALL(send(psend + (ptrdiff_t)displs[index] * extent,
856 rcounts[index], dtype, peer - 1,
857 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
858 MCA_PML_BASE_SEND_STANDARD, comm));
859 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
860 }
861
862 /* If process has two blocks, then send the second block (own block) */
863 if (vpeer < nprocs_rem)
864 index++;
865 if (vpeer != vrank) {
866 err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)displs[index] * extent,
867 rcounts[index], dtype, peer,
868 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
869 rbuf, rcounts[rank], dtype, peer,
870 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
871 comm, MPI_STATUS_IGNORE, rank);
872 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
873 } else {
874 err = ompi_datatype_copy_content_same_ddt(dtype, rcounts[rank], rbuf,
875 psend + (ptrdiff_t)displs[rank] * extent);
876 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
877 }
878
879 } else {
880 /* Excluded process: receive result */
881 int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
882 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
883 err = MCA_PML_CALL(recv(rbuf, rcounts[rank], dtype, peer,
884 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
885 MPI_STATUS_IGNORE));
886 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
887 }
888
889 cleanup_and_return:
890 if (displs)
891 free(displs);
892 if (tmpbuf[0])
893 free(tmpbuf[0]);
894 if (tmpbuf[1])
895 free(tmpbuf[1]);
896 return err;
897 }