1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
4 * University Research and Technology
5 * Corporation. All rights reserved.
6 * Copyright (c) 2004-2017 The University of Tennessee and The University
7 * of Tennessee Research Foundation. All rights
8 * reserved.
9 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
10 * University of Stuttgart. All rights reserved.
11 * Copyright (c) 2004-2005 The Regents of the University of California.
12 * All rights reserved.
13 * Copyright (c) 2009 University of Houston. All rights reserved.
14 * Copyright (c) 2013 Los Alamos National Security, LLC. All Rights
15 * reserved.
16 * Copyright (c) 2015-2017 Research Organization for Information Science
17 * and Technology (RIST). All rights reserved.
18 * Copyright (c) 2018 Siberian State University of Telecommunications
19 * and Information Science. All rights reserved.
20 * $COPYRIGHT$
21 *
22 * Additional copyrights may follow
23 *
24 * $HEADER$
25 */
26
27 #include "ompi_config.h"
28
29 #include "mpi.h"
30 #include "opal/util/bit_ops.h"
31 #include "ompi/constants.h"
32 #include "ompi/datatype/ompi_datatype.h"
33 #include "ompi/communicator/communicator.h"
34 #include "ompi/mca/coll/coll.h"
35 #include "ompi/mca/coll/base/coll_tags.h"
36 #include "ompi/mca/pml/pml.h"
37 #include "ompi/op/op.h"
38 #include "ompi/mca/coll/base/coll_base_functions.h"
39 #include "coll_base_topo.h"
40 #include "coll_base_util.h"
41
/*
 * ompi_coll_base_allreduce_intra_nonoverlapping
 *
 * This function just calls a reduce followed by a broadcast.
 * Both called functions are base functions, but they complete sequentially,
 * i.e. there is no additional overlapping: if the number of segments used is
 * greater than the topology depth, then once the first segment of data is
 * fully 'reduced' it is NOT broadcast while the reduce continues
 * (cost = cost-reduce + cost-bcast + decision x 3).
 *
 */
53 int
54 ompi_coll_base_allreduce_intra_nonoverlapping(const void *sbuf, void *rbuf, int count,
55 struct ompi_datatype_t *dtype,
56 struct ompi_op_t *op,
57 struct ompi_communicator_t *comm,
58 mca_coll_base_module_t *module)
59 {
60 int err, rank;
61
62 rank = ompi_comm_rank(comm);
63
64 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:allreduce_intra_nonoverlapping rank %d", rank));
65
66 /* Reduce to 0 and broadcast. */
67
68 if (MPI_IN_PLACE == sbuf) {
69 if (0 == rank) {
70 err = comm->c_coll->coll_reduce (MPI_IN_PLACE, rbuf, count, dtype,
71 op, 0, comm, comm->c_coll->coll_reduce_module);
72 } else {
73 err = comm->c_coll->coll_reduce (rbuf, NULL, count, dtype, op, 0,
74 comm, comm->c_coll->coll_reduce_module);
75 }
76 } else {
77 err = comm->c_coll->coll_reduce (sbuf, rbuf, count, dtype, op, 0,
78 comm, comm->c_coll->coll_reduce_module);
79 }
80 if (MPI_SUCCESS != err) {
81 return err;
82 }
83
84 return comm->c_coll->coll_bcast (rbuf, count, dtype, 0, comm,
85 comm->c_coll->coll_bcast_module);
86 }
87
/*
 *   ompi_coll_base_allreduce_intra_recursivedoubling
 *
 *   Function:       Recursive doubling algorithm for allreduce operation
 *   Accepts:        Same as MPI_Allreduce()
 *   Returns:        MPI_SUCCESS or error code
 *
 *   Description:    Implements recursive doubling algorithm for allreduce.
 *                   Original (non-segmented) implementation is used in MPICH-2
 *                   for small and intermediate size messages.
 *                   The algorithm preserves order of operations so it can
 *                   be used both by commutative and non-commutative operations.
 *
 *       Example on 7 nodes:
 *       Initial state
 *       #         0       1       2       3       4       5       6
 *                 [0]     [1]     [2]     [3]     [4]     [5]     [6]
 *       Initial adjustment step for non-power of two nodes.
 *       old rank          1               3               5       6
 *       new rank          0               1               2       3
 *                         [0+1]           [2+3]           [4+5]   [6]
 *       Step 1
 *       old rank          1               3               5       6
 *       new rank          0               1               2       3
 *                         [0+1+]          [0+1+]          [4+5+]  [4+5+]
 *                         [2+3+]          [2+3+]          [6   ]  [6   ]
 *       Step 2
 *       old rank          1               3               5       6
 *       new rank          0               1               2       3
 *                         [0+1+]          [0+1+]          [0+1+]  [0+1+]
 *                         [2+3+]          [2+3+]          [2+3+]  [2+3+]
 *                         [4+5+]          [4+5+]          [4+5+]  [4+5+]
 *                         [6   ]          [6   ]          [6   ]  [6   ]
 *       Final adjustment step for non-power of two nodes
 *       #         0       1       2       3       4       5       6
 *                 [0+1+]  [0+1+]  [0+1+]  [0+1+]  [0+1+]  [0+1+]  [0+1+]
 *                 [2+3+]  [2+3+]  [2+3+]  [2+3+]  [2+3+]  [2+3+]  [2+3+]
 *                 [4+5+]  [4+5+]  [4+5+]  [4+5+]  [4+5+]  [4+5+]  [4+5+]
 *                 [6   ]  [6   ]  [6   ]  [6   ]  [6   ]  [6   ]  [6   ]
 *
 */
129 int
130 ompi_coll_base_allreduce_intra_recursivedoubling(const void *sbuf, void *rbuf,
131 int count,
132 struct ompi_datatype_t *dtype,
133 struct ompi_op_t *op,
134 struct ompi_communicator_t *comm,
135 mca_coll_base_module_t *module)
136 {
137 int ret, line, rank, size, adjsize, remote, distance;
138 int newrank, newremote, extra_ranks;
139 char *tmpsend = NULL, *tmprecv = NULL, *tmpswap = NULL, *inplacebuf_free = NULL, *inplacebuf;
140 ptrdiff_t span, gap = 0;
141
142 size = ompi_comm_size(comm);
143 rank = ompi_comm_rank(comm);
144
145 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
146 "coll:base:allreduce_intra_recursivedoubling rank %d", rank));
147
148 /* Special case for size == 1 */
149 if (1 == size) {
150 if (MPI_IN_PLACE != sbuf) {
151 ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
152 if (ret < 0) { line = __LINE__; goto error_hndl; }
153 }
154 return MPI_SUCCESS;
155 }
156
157 /* Allocate and initialize temporary send buffer */
158 span = opal_datatype_span(&dtype->super, count, &gap);
159 inplacebuf_free = (char*) malloc(span);
160 if (NULL == inplacebuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
161 inplacebuf = inplacebuf_free - gap;
162
163 if (MPI_IN_PLACE == sbuf) {
164 ret = ompi_datatype_copy_content_same_ddt(dtype, count, inplacebuf, (char*)rbuf);
165 if (ret < 0) { line = __LINE__; goto error_hndl; }
166 } else {
167 ret = ompi_datatype_copy_content_same_ddt(dtype, count, inplacebuf, (char*)sbuf);
168 if (ret < 0) { line = __LINE__; goto error_hndl; }
169 }
170
171 tmpsend = (char*) inplacebuf;
172 tmprecv = (char*) rbuf;
173
174 /* Determine nearest power of two less than or equal to size */
175 adjsize = opal_next_poweroftwo (size);
176 adjsize >>= 1;
177
178 /* Handle non-power-of-two case:
179 - Even ranks less than 2 * extra_ranks send their data to (rank + 1), and
180 sets new rank to -1.
181 - Odd ranks less than 2 * extra_ranks receive data from (rank - 1),
182 apply appropriate operation, and set new rank to rank/2
183 - Everyone else sets rank to rank - extra_ranks
184 */
185 extra_ranks = size - adjsize;
186 if (rank < (2 * extra_ranks)) {
187 if (0 == (rank % 2)) {
188 ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank + 1),
189 MCA_COLL_BASE_TAG_ALLREDUCE,
190 MCA_PML_BASE_SEND_STANDARD, comm));
191 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
192 newrank = -1;
193 } else {
194 ret = MCA_PML_CALL(recv(tmprecv, count, dtype, (rank - 1),
195 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
196 MPI_STATUS_IGNORE));
197 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
198 /* tmpsend = tmprecv (op) tmpsend */
199 ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
200 newrank = rank >> 1;
201 }
202 } else {
203 newrank = rank - extra_ranks;
204 }
205
206 /* Communication/Computation loop
207 - Exchange message with remote node.
208 - Perform appropriate operation taking in account order of operations:
209 result = value (op) result
210 */
211 for (distance = 0x1; distance < adjsize; distance <<=1) {
212 if (newrank < 0) break;
213 /* Determine remote node */
214 newremote = newrank ^ distance;
215 remote = (newremote < extra_ranks)?
216 (newremote * 2 + 1):(newremote + extra_ranks);
217
218 /* Exchange the data */
219 ret = ompi_coll_base_sendrecv_actual(tmpsend, count, dtype, remote,
220 MCA_COLL_BASE_TAG_ALLREDUCE,
221 tmprecv, count, dtype, remote,
222 MCA_COLL_BASE_TAG_ALLREDUCE,
223 comm, MPI_STATUS_IGNORE);
224 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
225
226 /* Apply operation */
227 if (rank < remote) {
228 /* tmprecv = tmpsend (op) tmprecv */
229 ompi_op_reduce(op, tmpsend, tmprecv, count, dtype);
230 tmpswap = tmprecv;
231 tmprecv = tmpsend;
232 tmpsend = tmpswap;
233 } else {
234 /* tmpsend = tmprecv (op) tmpsend */
235 ompi_op_reduce(op, tmprecv, tmpsend, count, dtype);
236 }
237 }
238
239 /* Handle non-power-of-two case:
240 - Odd ranks less than 2 * extra_ranks send result from tmpsend to
241 (rank - 1)
242 - Even ranks less than 2 * extra_ranks receive result from (rank + 1)
243 */
244 if (rank < (2 * extra_ranks)) {
245 if (0 == (rank % 2)) {
246 ret = MCA_PML_CALL(recv(rbuf, count, dtype, (rank + 1),
247 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
248 MPI_STATUS_IGNORE));
249 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
250 tmpsend = (char*)rbuf;
251 } else {
252 ret = MCA_PML_CALL(send(tmpsend, count, dtype, (rank - 1),
253 MCA_COLL_BASE_TAG_ALLREDUCE,
254 MCA_PML_BASE_SEND_STANDARD, comm));
255 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
256 }
257 }
258
259 /* Ensure that the final result is in rbuf */
260 if (tmpsend != rbuf) {
261 ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, tmpsend);
262 if (ret < 0) { line = __LINE__; goto error_hndl; }
263 }
264
265 if (NULL != inplacebuf_free) free(inplacebuf_free);
266 return MPI_SUCCESS;
267
268 error_hndl:
269 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
270 __FILE__, line, rank, ret));
271 (void)line; // silence compiler warning
272 if (NULL != inplacebuf_free) free(inplacebuf_free);
273 return ret;
274 }
275
/*
 *   ompi_coll_base_allreduce_intra_ring
 *
 *   Function:       Ring algorithm for allreduce operation
 *   Accepts:        Same as MPI_Allreduce()
 *   Returns:        MPI_SUCCESS or error code
 *
 *   Description:    Implements ring algorithm for allreduce: the message is
 *                   automatically segmented to segment of size M/N.
 *                   Algorithm requires 2*(N - 1) steps.
 *
 *   Limitations:    The algorithm DOES NOT preserve order of operations so it
 *                   can be used only for commutative operations.
 *                   In addition, algorithm cannot work if the total count is
 *                   less than size (in that case the implementation switches
 *                   to recursive doubling).
 *
 *   Example on 5 nodes (an entry [xy] is block y of rank x):
 *   Initial state
 *   #     0              1              2              3              4
 *         [00]           [10]           [20]           [30]           [40]
 *         [01]           [11]           [21]           [31]           [41]
 *         [02]           [12]           [22]           [32]           [42]
 *         [03]           [13]           [23]           [33]           [43]
 *         [04]           [14]           [24]           [34]           [44]
 *
 *   COMPUTATION PHASE
 *   Step 0: rank r sends block r to rank (r+1) and receives block (r-1)
 *           from rank (r-1) [with wraparound].
 *   #     0              1              2              3              4
 *         [00]           [00+10]        [20]           [30]           [40]
 *         [01]           [11]           [11+21]        [31]           [41]
 *         [02]           [12]           [22]           [22+32]        [42]
 *         [03]           [13]           [23]           [33]           [33+43]
 *         [44+04]        [14]           [24]           [34]           [44]
 *
 *   Step 1: rank r sends block (r-1) to rank (r+1) and receives block
 *           (r-2) from rank (r-1) [with wraparound].
 *   #     0              1              2              3              4
 *         [00]           [00+10]        [00+10+20]     [30]           [40]
 *         [01]           [11]           [11+21]        [11+21+31]     [41]
 *         [02]           [12]           [22]           [22+32]        [22+32+42]
 *         [33+43+03]     [13]           [23]           [33]           [33+43]
 *         [44+04]        [44+04+14]     [24]           [34]           [44]
 *
 *   Step 2: rank r sends block (r-2) to rank (r+1) and receives block
 *           (r-3) from rank (r-1) [with wraparound].
 *   #     0              1              2              3              4
 *         [00]           [00+10]        [00+10+20]     [00+10+20+30]  [40]
 *         [01]           [11]           [11+21]        [11+21+31]     [11+21+31+41]
 *         [22+32+42+02]  [12]           [22]           [22+32]        [22+32+42]
 *         [33+43+03]     [33+43+03+13]  [23]           [33]           [33+43]
 *         [44+04]        [44+04+14]     [44+04+14+24]  [34]           [44]
 *
 *   Step 3: rank r sends block (r-3) to rank (r+1) and receives block
 *           (r-4) from rank (r-1) [with wraparound].
 *   #     0              1              2              3              4
 *         [00]           [00+10]        [00+10+20]     [00+10+20+30]  [FULL]
 *         [FULL]         [11]           [11+21]        [11+21+31]     [11+21+31+41]
 *         [22+32+42+02]  [FULL]         [22]           [22+32]        [22+32+42]
 *         [33+43+03]     [33+43+03+13]  [FULL]         [33]           [33+43]
 *         [44+04]        [44+04+14]     [44+04+14+24]  [FULL]         [44]
 *
 *   DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1.
 *
 */
340 int
341 ompi_coll_base_allreduce_intra_ring(const void *sbuf, void *rbuf, int count,
342 struct ompi_datatype_t *dtype,
343 struct ompi_op_t *op,
344 struct ompi_communicator_t *comm,
345 mca_coll_base_module_t *module)
346 {
347 int ret, line, rank, size, k, recv_from, send_to, block_count, inbi;
348 int early_segcount, late_segcount, split_rank, max_segcount;
349 size_t typelng;
350 char *tmpsend = NULL, *tmprecv = NULL, *inbuf[2] = {NULL, NULL};
351 ptrdiff_t true_lb, true_extent, lb, extent;
352 ptrdiff_t block_offset, max_real_segsize;
353 ompi_request_t *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
354
355 size = ompi_comm_size(comm);
356 rank = ompi_comm_rank(comm);
357
358 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
359 "coll:base:allreduce_intra_ring rank %d, count %d", rank, count));
360
361 /* Special case for size == 1 */
362 if (1 == size) {
363 if (MPI_IN_PLACE != sbuf) {
364 ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
365 if (ret < 0) { line = __LINE__; goto error_hndl; }
366 }
367 return MPI_SUCCESS;
368 }
369
370 /* Special case for count less than size - use recursive doubling */
371 if (count < size) {
372 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:allreduce_ring rank %d/%d, count %d, switching to recursive doubling", rank, size, count));
373 return (ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf,
374 count,
375 dtype, op,
376 comm, module));
377 }
378
379 /* Allocate and initialize temporary buffers */
380 ret = ompi_datatype_get_extent(dtype, &lb, &extent);
381 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
382 ret = ompi_datatype_get_true_extent(dtype, &true_lb, &true_extent);
383 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
384 ret = ompi_datatype_type_size( dtype, &typelng);
385 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
386
387 /* Determine the number of elements per block and corresponding
388 block sizes.
389 The blocks are divided into "early" and "late" ones:
390 blocks 0 .. (split_rank - 1) are "early" and
391 blocks (split_rank) .. (size - 1) are "late".
392 Early blocks are at most 1 element larger than the late ones.
393 */
394 COLL_BASE_COMPUTE_BLOCKCOUNT( count, size, split_rank,
395 early_segcount, late_segcount );
396 max_segcount = early_segcount;
397 max_real_segsize = true_extent + (max_segcount - 1) * extent;
398
399
400 inbuf[0] = (char*)malloc(max_real_segsize);
401 if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
402 if (size > 2) {
403 inbuf[1] = (char*)malloc(max_real_segsize);
404 if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
405 }
406
407 /* Handle MPI_IN_PLACE */
408 if (MPI_IN_PLACE != sbuf) {
409 ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
410 if (ret < 0) { line = __LINE__; goto error_hndl; }
411 }
412
413 /* Computation loop */
414
415 /*
416 For each of the remote nodes:
417 - post irecv for block (r-1)
418 - send block (r)
419 - in loop for every step k = 2 .. n
420 - post irecv for block (r + n - k) % n
421 - wait on block (r + n - k + 1) % n to arrive
422 - compute on block (r + n - k + 1) % n
423 - send block (r + n - k + 1) % n
424 - wait on block (r + 1)
425 - compute on block (r + 1)
426 - send block (r + 1) to rank (r + 1)
427 Note that we must be careful when computing the begining of buffers and
428 for send operations and computation we must compute the exact block size.
429 */
430 send_to = (rank + 1) % size;
431 recv_from = (rank + size - 1) % size;
432
433 inbi = 0;
434 /* Initialize first receive from the neighbor on the left */
435 ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
436 MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
437 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
438 /* Send first block (my block) to the neighbor on the right */
439 block_offset = ((rank < split_rank)?
440 ((ptrdiff_t)rank * (ptrdiff_t)early_segcount) :
441 ((ptrdiff_t)rank * (ptrdiff_t)late_segcount + split_rank));
442 block_count = ((rank < split_rank)? early_segcount : late_segcount);
443 tmpsend = ((char*)rbuf) + block_offset * extent;
444 ret = MCA_PML_CALL(send(tmpsend, block_count, dtype, send_to,
445 MCA_COLL_BASE_TAG_ALLREDUCE,
446 MCA_PML_BASE_SEND_STANDARD, comm));
447 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
448
449 for (k = 2; k < size; k++) {
450 const int prevblock = (rank + size - k + 1) % size;
451
452 inbi = inbi ^ 0x1;
453
454 /* Post irecv for the current block */
455 ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
456 MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
457 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
458
459 /* Wait on previous block to arrive */
460 ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
461 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
462
463 /* Apply operation on previous block: result goes to rbuf
464 rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
465 */
466 block_offset = ((prevblock < split_rank)?
467 ((ptrdiff_t)prevblock * early_segcount) :
468 ((ptrdiff_t)prevblock * late_segcount + split_rank));
469 block_count = ((prevblock < split_rank)? early_segcount : late_segcount);
470 tmprecv = ((char*)rbuf) + (ptrdiff_t)block_offset * extent;
471 ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, block_count, dtype);
472
473 /* send previous block to send_to */
474 ret = MCA_PML_CALL(send(tmprecv, block_count, dtype, send_to,
475 MCA_COLL_BASE_TAG_ALLREDUCE,
476 MCA_PML_BASE_SEND_STANDARD, comm));
477 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
478 }
479
480 /* Wait on the last block to arrive */
481 ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
482 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
483
484 /* Apply operation on the last block (from neighbor (rank + 1)
485 rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
486 recv_from = (rank + 1) % size;
487 block_offset = ((recv_from < split_rank)?
488 ((ptrdiff_t)recv_from * early_segcount) :
489 ((ptrdiff_t)recv_from * late_segcount + split_rank));
490 block_count = ((recv_from < split_rank)? early_segcount : late_segcount);
491 tmprecv = ((char*)rbuf) + (ptrdiff_t)block_offset * extent;
492 ompi_op_reduce(op, inbuf[inbi], tmprecv, block_count, dtype);
493
494 /* Distribution loop - variation of ring allgather */
495 send_to = (rank + 1) % size;
496 recv_from = (rank + size - 1) % size;
497 for (k = 0; k < size - 1; k++) {
498 const int recv_data_from = (rank + size - k) % size;
499 const int send_data_from = (rank + 1 + size - k) % size;
500 const int send_block_offset =
501 ((send_data_from < split_rank)?
502 ((ptrdiff_t)send_data_from * early_segcount) :
503 ((ptrdiff_t)send_data_from * late_segcount + split_rank));
504 const int recv_block_offset =
505 ((recv_data_from < split_rank)?
506 ((ptrdiff_t)recv_data_from * early_segcount) :
507 ((ptrdiff_t)recv_data_from * late_segcount + split_rank));
508 block_count = ((send_data_from < split_rank)?
509 early_segcount : late_segcount);
510
511 tmprecv = (char*)rbuf + (ptrdiff_t)recv_block_offset * extent;
512 tmpsend = (char*)rbuf + (ptrdiff_t)send_block_offset * extent;
513
514 ret = ompi_coll_base_sendrecv(tmpsend, block_count, dtype, send_to,
515 MCA_COLL_BASE_TAG_ALLREDUCE,
516 tmprecv, max_segcount, dtype, recv_from,
517 MCA_COLL_BASE_TAG_ALLREDUCE,
518 comm, MPI_STATUS_IGNORE, rank);
519 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}
520
521 }
522
523 if (NULL != inbuf[0]) free(inbuf[0]);
524 if (NULL != inbuf[1]) free(inbuf[1]);
525
526 return MPI_SUCCESS;
527
528 error_hndl:
529 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
530 __FILE__, line, rank, ret));
531 ompi_coll_base_free_reqs(reqs, 2);
532 (void)line; // silence compiler warning
533 if (NULL != inbuf[0]) free(inbuf[0]);
534 if (NULL != inbuf[1]) free(inbuf[1]);
535 return ret;
536 }
537
/*
 *   ompi_coll_base_allreduce_intra_ring_segmented
 *
 *   Function:       Pipelined ring algorithm for allreduce operation
 *   Accepts:        Same as MPI_Allreduce(), segment size
 *   Returns:        MPI_SUCCESS or error code
 *
 *   Description:    Implements pipelined ring algorithm for allreduce:
 *                   user supplies suggested segment size for the pipelining of
 *                   reduce operation.
 *                   The segment size determines the number of phases, np, for
 *                   the algorithm execution.
 *                   The message is automatically divided into blocks of
 *                   approximately (count / (np * segcount)) elements.
 *                   At the end of reduction phase, allgather like step is
 *                   executed.
 *                   Algorithm requires (np + 1)*(N - 1) steps.
 *
 *   Limitations:    The algorithm DOES NOT preserve order of operations so it
 *                   can be used only for commutative operations.
 *                   In addition, algorithm cannot work if the total size is
 *                   less than size * segment size (in that case the
 *                   implementation switches to the regular ring algorithm).
 *
 *   Example on 3 nodes with 2 phases (an entry [xyp] is segment p of
 *   block y of rank x):
 *   Initial state
 *   #     0              1              2
 *         [00a]          [10a]          [20a]
 *         [00b]          [10b]          [20b]
 *         [01a]          [11a]          [21a]
 *         [01b]          [11b]          [21b]
 *         [02a]          [12a]          [22a]
 *         [02b]          [12b]          [22b]
 *
 *   COMPUTATION PHASE 0 (a)
 *   Step 0: rank r sends block ra to rank (r+1) and receives block (r-1)a
 *           from rank (r-1) [with wraparound].
 *   #     0              1              2
 *         [00a]          [00a+10a]      [20a]
 *         [00b]          [10b]          [20b]
 *         [01a]          [11a]          [11a+21a]
 *         [01b]          [11b]          [21b]
 *         [22a+02a]      [12a]          [22a]
 *         [02b]          [12b]          [22b]
 *
 *   Step 1: rank r sends block (r-1)a to rank (r+1) and receives block
 *           (r-2)a from rank (r-1) [with wraparound].
 *   #     0              1              2
 *         [00a]          [00a+10a]      [00a+10a+20a]
 *         [00b]          [10b]          [20b]
 *         [11a+21a+01a]  [11a]          [11a+21a]
 *         [01b]          [11b]          [21b]
 *         [22a+02a]      [22a+02a+12a]  [22a]
 *         [02b]          [12b]          [22b]
 *
 *   COMPUTATION PHASE 1 (b)
 *   Step 0: rank r sends block rb to rank (r+1) and receives block (r-1)b
 *           from rank (r-1) [with wraparound].
 *   #     0              1              2
 *         [00a]          [00a+10a]      [00a+10a+20a]
 *         [00b]          [00b+10b]      [20b]
 *         [11a+21a+01a]  [11a]          [11a+21a]
 *         [01b]          [11b]          [11b+21b]
 *         [22a+02a]      [22a+02a+12a]  [22a]
 *         [22b+02b]      [12b]          [22b]
 *
 *   Step 1: rank r sends block (r-1)b to rank (r+1) and receives block
 *           (r-2)b from rank (r-1) [with wraparound].
 *   #     0              1              2
 *         [00a]          [00a+10a]      [00a+10a+20a]
 *         [00b]          [00b+10b]      [00b+10b+20b]
 *         [11a+21a+01a]  [11a]          [11a+21a]
 *         [11b+21b+01b]  [11b]          [11b+21b]
 *         [22a+02a]      [22a+02a+12a]  [22a]
 *         [22b+02b]      [22b+02b+12b]  [22b]
 *
 *
 *   DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1 (same as
 *   in regular ring algorithm).
 *
 */
617 int
618 ompi_coll_base_allreduce_intra_ring_segmented(const void *sbuf, void *rbuf, int count,
619 struct ompi_datatype_t *dtype,
620 struct ompi_op_t *op,
621 struct ompi_communicator_t *comm,
622 mca_coll_base_module_t *module,
623 uint32_t segsize)
624 {
625 int ret, line, rank, size, k, recv_from, send_to;
626 int early_blockcount, late_blockcount, split_rank;
627 int segcount, max_segcount, num_phases, phase, block_count, inbi;
628 size_t typelng;
629 char *tmpsend = NULL, *tmprecv = NULL, *inbuf[2] = {NULL, NULL};
630 ptrdiff_t block_offset, max_real_segsize;
631 ompi_request_t *reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
632 ptrdiff_t lb, extent, gap;
633
634 size = ompi_comm_size(comm);
635 rank = ompi_comm_rank(comm);
636
637 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
638 "coll:base:allreduce_intra_ring_segmented rank %d, count %d", rank, count));
639
640 /* Special case for size == 1 */
641 if (1 == size) {
642 if (MPI_IN_PLACE != sbuf) {
643 ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
644 if (ret < 0) { line = __LINE__; goto error_hndl; }
645 }
646 return MPI_SUCCESS;
647 }
648
649 /* Determine segment count based on the suggested segment size */
650 ret = ompi_datatype_type_size( dtype, &typelng);
651 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
652 segcount = count;
653 COLL_BASE_COMPUTED_SEGCOUNT(segsize, typelng, segcount)
654
655 /* Special case for count less than size * segcount - use regular ring */
656 if (count < (size * segcount)) {
657 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:allreduce_ring_segmented rank %d/%d, count %d, switching to regular ring", rank, size, count));
658 return (ompi_coll_base_allreduce_intra_ring(sbuf, rbuf, count, dtype, op,
659 comm, module));
660 }
661
662 /* Determine the number of phases of the algorithm */
663 num_phases = count / (size * segcount);
664 if ((count % (size * segcount) >= size) &&
665 (count % (size * segcount) > ((size * segcount) / 2))) {
666 num_phases++;
667 }
668
669 /* Determine the number of elements per block and corresponding
670 block sizes.
671 The blocks are divided into "early" and "late" ones:
672 blocks 0 .. (split_rank - 1) are "early" and
673 blocks (split_rank) .. (size - 1) are "late".
674 Early blocks are at most 1 element larger than the late ones.
675 Note, these blocks will be split into num_phases segments,
676 out of the largest one will have max_segcount elements.
677 */
678 COLL_BASE_COMPUTE_BLOCKCOUNT( count, size, split_rank,
679 early_blockcount, late_blockcount );
680 COLL_BASE_COMPUTE_BLOCKCOUNT( early_blockcount, num_phases, inbi,
681 max_segcount, k);
682
683 ret = ompi_datatype_get_extent(dtype, &lb, &extent);
684 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
685 max_real_segsize = opal_datatype_span(&dtype->super, max_segcount, &gap);
686
687 /* Allocate and initialize temporary buffers */
688 inbuf[0] = (char*)malloc(max_real_segsize);
689 if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
690 if (size > 2) {
691 inbuf[1] = (char*)malloc(max_real_segsize);
692 if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
693 }
694
695 /* Handle MPI_IN_PLACE */
696 if (MPI_IN_PLACE != sbuf) {
697 ret = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
698 if (ret < 0) { line = __LINE__; goto error_hndl; }
699 }
700
701 /* Computation loop: for each phase, repeat ring allreduce computation loop */
702 for (phase = 0; phase < num_phases; phase ++) {
703 ptrdiff_t phase_offset;
704 int early_phase_segcount, late_phase_segcount, split_phase, phase_count;
705
706 /*
707 For each of the remote nodes:
708 - post irecv for block (r-1)
709 - send block (r)
710 To do this, first compute block offset and count, and use block offset
711 to compute phase offset.
712 - in loop for every step k = 2 .. n
713 - post irecv for block (r + n - k) % n
714 - wait on block (r + n - k + 1) % n to arrive
715 - compute on block (r + n - k + 1) % n
716 - send block (r + n - k + 1) % n
717 - wait on block (r + 1)
718 - compute on block (r + 1)
719 - send block (r + 1) to rank (r + 1)
720 Note that we must be careful when computing the begining of buffers and
721 for send operations and computation we must compute the exact block size.
722 */
723 send_to = (rank + 1) % size;
724 recv_from = (rank + size - 1) % size;
725
726 inbi = 0;
727 /* Initialize first receive from the neighbor on the left */
728 ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
729 MCA_COLL_BASE_TAG_ALLREDUCE, comm, &reqs[inbi]));
730 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
731 /* Send first block (my block) to the neighbor on the right:
732 - compute my block and phase offset
733 - send data */
734 block_offset = ((rank < split_rank)?
735 ((ptrdiff_t)rank * (ptrdiff_t)early_blockcount) :
736 ((ptrdiff_t)rank * (ptrdiff_t)late_blockcount + split_rank));
737 block_count = ((rank < split_rank)? early_blockcount : late_blockcount);
738 COLL_BASE_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
739 early_phase_segcount, late_phase_segcount)
740 phase_count = ((phase < split_phase)?
741 (early_phase_segcount) : (late_phase_segcount));
742 phase_offset = ((phase < split_phase)?
743 ((ptrdiff_t)phase * (ptrdiff_t)early_phase_segcount) :
744 ((ptrdiff_t)phase * (ptrdiff_t)late_phase_segcount + split_phase));
745 tmpsend = ((char*)rbuf) + (ptrdiff_t)(block_offset + phase_offset) * extent;
746 ret = MCA_PML_CALL(send(tmpsend, phase_count, dtype, send_to,
747 MCA_COLL_BASE_TAG_ALLREDUCE,
748 MCA_PML_BASE_SEND_STANDARD, comm));
749 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
750
751 for (k = 2; k < size; k++) {
752 const int prevblock = (rank + size - k + 1) % size;
753
754 inbi = inbi ^ 0x1;
755
756 /* Post irecv for the current block */
757 ret = MCA_PML_CALL(irecv(inbuf[inbi], max_segcount, dtype, recv_from,
758 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
759 &reqs[inbi]));
760 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
761
762 /* Wait on previous block to arrive */
763 ret = ompi_request_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
764 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
765
766 /* Apply operation on previous block: result goes to rbuf
767 rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
768 */
769 block_offset = ((prevblock < split_rank)?
770 ((ptrdiff_t)prevblock * (ptrdiff_t)early_blockcount) :
771 ((ptrdiff_t)prevblock * (ptrdiff_t)late_blockcount + split_rank));
772 block_count = ((prevblock < split_rank)?
773 early_blockcount : late_blockcount);
774 COLL_BASE_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
775 early_phase_segcount, late_phase_segcount)
776 phase_count = ((phase < split_phase)?
777 (early_phase_segcount) : (late_phase_segcount));
778 phase_offset = ((phase < split_phase)?
779 ((ptrdiff_t)phase * (ptrdiff_t)early_phase_segcount) :
780 ((ptrdiff_t)phase * (ptrdiff_t)late_phase_segcount + split_phase));
781 tmprecv = ((char*)rbuf) + (ptrdiff_t)(block_offset + phase_offset) * extent;
782 ompi_op_reduce(op, inbuf[inbi ^ 0x1], tmprecv, phase_count, dtype);
783
784 /* send previous block to send_to */
785 ret = MCA_PML_CALL(send(tmprecv, phase_count, dtype, send_to,
786 MCA_COLL_BASE_TAG_ALLREDUCE,
787 MCA_PML_BASE_SEND_STANDARD, comm));
788 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
789 }
790
791 /* Wait on the last block to arrive */
792 ret = ompi_request_wait(&reqs[inbi], MPI_STATUS_IGNORE);
793 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
794
795 /* Apply operation on the last block (from neighbor (rank + 1)
796 rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
797 recv_from = (rank + 1) % size;
798 block_offset = ((recv_from < split_rank)?
799 ((ptrdiff_t)recv_from * (ptrdiff_t)early_blockcount) :
800 ((ptrdiff_t)recv_from * (ptrdiff_t)late_blockcount + split_rank));
801 block_count = ((recv_from < split_rank)?
802 early_blockcount : late_blockcount);
803 COLL_BASE_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
804 early_phase_segcount, late_phase_segcount)
805 phase_count = ((phase < split_phase)?
806 (early_phase_segcount) : (late_phase_segcount));
807 phase_offset = ((phase < split_phase)?
808 ((ptrdiff_t)phase * (ptrdiff_t)early_phase_segcount) :
809 ((ptrdiff_t)phase * (ptrdiff_t)late_phase_segcount + split_phase));
810 tmprecv = ((char*)rbuf) + (ptrdiff_t)(block_offset + phase_offset) * extent;
811 ompi_op_reduce(op, inbuf[inbi], tmprecv, phase_count, dtype);
812 }
813
814 /* Distribution loop - variation of ring allgather */
815 send_to = (rank + 1) % size;
816 recv_from = (rank + size - 1) % size;
817 for (k = 0; k < size - 1; k++) {
818 const int recv_data_from = (rank + size - k) % size;
819 const int send_data_from = (rank + 1 + size - k) % size;
820 const int send_block_offset =
821 ((send_data_from < split_rank)?
822 ((ptrdiff_t)send_data_from * (ptrdiff_t)early_blockcount) :
823 ((ptrdiff_t)send_data_from * (ptrdiff_t)late_blockcount + split_rank));
824 const int recv_block_offset =
825 ((recv_data_from < split_rank)?
826 ((ptrdiff_t)recv_data_from * (ptrdiff_t)early_blockcount) :
827 ((ptrdiff_t)recv_data_from * (ptrdiff_t)late_blockcount + split_rank));
828 block_count = ((send_data_from < split_rank)?
829 early_blockcount : late_blockcount);
830
831 tmprecv = (char*)rbuf + (ptrdiff_t)recv_block_offset * extent;
832 tmpsend = (char*)rbuf + (ptrdiff_t)send_block_offset * extent;
833
834 ret = ompi_coll_base_sendrecv(tmpsend, block_count, dtype, send_to,
835 MCA_COLL_BASE_TAG_ALLREDUCE,
836 tmprecv, early_blockcount, dtype, recv_from,
837 MCA_COLL_BASE_TAG_ALLREDUCE,
838 comm, MPI_STATUS_IGNORE, rank);
839 if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl;}
840
841 }
842
843 if (NULL != inbuf[0]) free(inbuf[0]);
844 if (NULL != inbuf[1]) free(inbuf[1]);
845
846 return MPI_SUCCESS;
847
848 error_hndl:
849 OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "%s:%4d\tRank %d Error occurred %d\n",
850 __FILE__, line, rank, ret));
851 ompi_coll_base_free_reqs(reqs, 2);
852 (void)line; // silence compiler warning
853 if (NULL != inbuf[0]) free(inbuf[0]);
854 if (NULL != inbuf[1]) free(inbuf[1]);
855 return ret;
856 }
857
858 /*
859 * Linear functions are copied from the BASIC coll module
860 * they do not segment the message and are simple implementations
861 * but for some small number of nodes and/or small data sizes they
862 * are just as fast as base/tree based segmenting operations
863 * and as such may be selected by the decision functions
864 * These are copied into this module due to the way we select modules
865 * in V1. i.e. in V2 we will handle this differently and so will not
866 * have to duplicate code.
867 * GEF Oct05 after asking Jeff.
868 */
869
870 /* copied function (with appropriate renaming) starts here */
871
872
873 /*
874 * allreduce_intra
875 *
876 * Function: - allreduce using other MPI collectives
877 * Accepts: - same as MPI_Allreduce()
878 * Returns: - MPI_SUCCESS or error code
879 */
880 int
881 ompi_coll_base_allreduce_intra_basic_linear(const void *sbuf, void *rbuf, int count,
882 struct ompi_datatype_t *dtype,
883 struct ompi_op_t *op,
884 struct ompi_communicator_t *comm,
885 mca_coll_base_module_t *module)
886 {
887 int err, rank;
888
889 rank = ompi_comm_rank(comm);
890
891 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,"coll:base:allreduce_intra_basic_linear rank %d", rank));
892
893 /* Reduce to 0 and broadcast. */
894
895 if (MPI_IN_PLACE == sbuf) {
896 if (0 == rank) {
897 err = ompi_coll_base_reduce_intra_basic_linear (MPI_IN_PLACE, rbuf, count, dtype,
898 op, 0, comm, module);
899 } else {
900 err = ompi_coll_base_reduce_intra_basic_linear(rbuf, NULL, count, dtype,
901 op, 0, comm, module);
902 }
903 } else {
904 err = ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
905 op, 0, comm, module);
906 }
907 if (MPI_SUCCESS != err) {
908 return err;
909 }
910
911 return ompi_coll_base_bcast_intra_basic_linear(rbuf, count, dtype, 0, comm, module);
912 }
913
914 /*
915 * ompi_coll_base_allreduce_intra_redscat_allgather
916 *
917 * Function: Allreduce using Rabenseifner's algorithm.
918 * Accepts: Same arguments as MPI_Allreduce
919 * Returns: MPI_SUCCESS or error code
920 *
921 * Description: an implementation of Rabenseifner's allreduce algorithm [1, 2].
922 * [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
923 * Optimization of Collective Communication Operations in MPICH //
924 * The Int. Journal of High Performance Computing Applications. Vol 19,
925 * Issue 1, pp. 49--66.
926 * [2] http://www.hlrs.de/mpi/myreduce.html.
927 *
928 * This algorithm is a combination of a reduce-scatter implemented with
929 * recursive vector halving and recursive distance doubling, followed either
930 * by an allgather implemented with recursive doubling [1].
931 *
932 * Step 1. If the number of processes is not a power of two, reduce it to
933 * the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
934 * by removing r = p - p' extra processes as follows. In the first 2r processes
935 * (ranks 0 to 2r - 1), all the even ranks send the second half of the input
936 * vector to their right neighbor (rank + 1), and all the odd ranks send
937 * the first half of the input vector to their left neighbor (rank - 1).
938 * The even ranks compute the reduction on the first half of the vector and
939 * the odd ranks compute the reduction on the second half. The odd ranks then
940 * send the result to their left neighbors (the even ranks). As a result,
941 * the even ranks among the first 2r processes now contain the reduction with
942 * the input vector on their right neighbors (the odd ranks). These odd ranks
943 * do not participate in the rest of the algorithm, which leaves behind
944 * a power-of-two number of processes. The first r even-ranked processes and
945 * the last p - 2r processes are now renumbered from 0 to p' - 1.
946 *
947 * Step 2. The remaining processes now perform a reduce-scatter by using
948 * recursive vector halving and recursive distance doubling. The even-ranked
949 * processes send the second half of their buffer to rank + 1 and the odd-ranked
950 * processes send the first half of their buffer to rank - 1. All processes
951 * then compute the reduction between the local buffer and the received buffer.
952 * In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
953 * distance is doubled. At the end, each of the p' processes has 1 / p' of the
954 * total reduction result.
955 *
956 * Step 3. An allgather is performed by using recursive vector doubling and
957 * distance halving. All exchanges are executed in reverse order relative
958 * to recursive doubling on previous step. If the number of processes is not
959 * a power of two, the total result vector must be sent to the r processes
960 * that were removed in the first step.
961 *
962 * Limitations:
963 * count >= 2^{\floor{\log_2 p}}
964 * commutative operations only
965 * intra-communicators only
966 *
967 * Memory requirements (per process):
968 * count * typesize + 4 * \log_2(p) * sizeof(int) = O(count)
969 */
970 int ompi_coll_base_allreduce_intra_redscat_allgather(
971 const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
972 struct ompi_op_t *op, struct ompi_communicator_t *comm,
973 mca_coll_base_module_t *module)
974 {
975 int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
976
977 int comm_size = ompi_comm_size(comm);
978 int rank = ompi_comm_rank(comm);
979 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
980 "coll:base:allreduce_intra_redscat_allgather: rank %d/%d",
981 rank, comm_size));
982
983 /* Find nearest power-of-two less than or equal to comm_size */
984 int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1); /* ilog2(comm_size) */
985 assert(nsteps >= 0);
986 int nprocs_pof2 = 1 << nsteps; /* flp2(comm_size) */
987
988 if (count < nprocs_pof2 || !ompi_op_is_commute(op)) {
989 OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
990 "coll:base:allreduce_intra_redscat_allgather: rank %d/%d "
991 "count %d switching to basic linear allreduce",
992 rank, comm_size, count));
993 return ompi_coll_base_allreduce_intra_basic_linear(sbuf, rbuf, count, dtype,
994 op, comm, module);
995 }
996
997 int err = MPI_SUCCESS;
998 ptrdiff_t lb, extent, dsize, gap = 0;
999 ompi_datatype_get_extent(dtype, &lb, &extent);
1000 dsize = opal_datatype_span(&dtype->super, count, &gap);
1001
1002 /* Temporary buffer for receiving messages */
1003 char *tmp_buf = NULL;
1004 char *tmp_buf_raw = (char *)malloc(dsize);
1005 if (NULL == tmp_buf_raw)
1006 return OMPI_ERR_OUT_OF_RESOURCE;
1007 tmp_buf = tmp_buf_raw - gap;
1008
1009 if (sbuf != MPI_IN_PLACE) {
1010 err = ompi_datatype_copy_content_same_ddt(dtype, count, (char *)rbuf,
1011 (char *)sbuf);
1012 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1013 }
1014
1015 /*
1016 * Step 1. Reduce the number of processes to the nearest lower power of two
1017 * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
1018 * 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
1019 * the second half of the input vector to their right neighbor (rank + 1)
1020 * and all the odd ranks send the first half of the input vector to their
1021 * left neighbor (rank - 1).
1022 * 2. All 2r processes compute the reduction on their half.
1023 * 3. The odd ranks then send the result to their left neighbors
1024 * (the even ranks).
1025 *
1026 * The even ranks (0 to 2r - 1) now contain the reduction with the input
1027 * vector on their right neighbors (the odd ranks). The first r even
1028 * processes and the p - 2r last processes are renumbered from
1029 * 0 to 2^{\floor{\log_2 p}} - 1.
1030 */
1031
1032 int vrank, step, wsize;
1033 int nprocs_rem = comm_size - nprocs_pof2;
1034
1035 if (rank < 2 * nprocs_rem) {
1036 int count_lhalf = count / 2;
1037 int count_rhalf = count - count_lhalf;
1038
1039 if (rank % 2 != 0) {
1040 /*
1041 * Odd process -- exchange with rank - 1
1042 * Send the left half of the input vector to the left neighbor,
1043 * Recv the right half of the input vector from the left neighbor
1044 */
1045 err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
1046 MCA_COLL_BASE_TAG_ALLREDUCE,
1047 (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
1048 count_rhalf, dtype, rank - 1,
1049 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1050 MPI_STATUS_IGNORE, rank);
1051 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1052
1053 /* Reduce on the right half of the buffers (result in rbuf) */
1054 ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
1055 (char *)rbuf + count_lhalf * extent, count_rhalf, dtype);
1056
1057 /* Send the right half to the left neighbor */
1058 err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
1059 count_rhalf, dtype, rank - 1,
1060 MCA_COLL_BASE_TAG_ALLREDUCE,
1061 MCA_PML_BASE_SEND_STANDARD, comm));
1062 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1063
1064 /* This process does not pariticipate in recursive doubling phase */
1065 vrank = -1;
1066
1067 } else {
1068 /*
1069 * Even process -- exchange with rank + 1
1070 * Send the right half of the input vector to the right neighbor,
1071 * Recv the left half of the input vector from the right neighbor
1072 */
1073 err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
1074 count_rhalf, dtype, rank + 1,
1075 MCA_COLL_BASE_TAG_ALLREDUCE,
1076 tmp_buf, count_lhalf, dtype, rank + 1,
1077 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1078 MPI_STATUS_IGNORE, rank);
1079 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1080
1081 /* Reduce on the right half of the buffers (result in rbuf) */
1082 ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);
1083
1084 /* Recv the right half from the right neighbor */
1085 err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
1086 count_rhalf, dtype, rank + 1,
1087 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1088 MPI_STATUS_IGNORE));
1089 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1090
1091 vrank = rank / 2;
1092 }
1093 } else { /* rank >= 2 * nprocs_rem */
1094 vrank = rank - nprocs_rem;
1095 }
1096
1097 /*
1098 * Step 2. Reduce-scatter implemented with recursive vector halving and
1099 * recursive distance doubling. We have p' = 2^{\floor{\log_2 p}}
1100 * power-of-two number of processes with new ranks (vrank) and result in rbuf.
1101 *
1102 * The even-ranked processes send the right half of their buffer to rank + 1
1103 * and the odd-ranked processes send the left half of their buffer to
1104 * rank - 1. All processes then compute the reduction between the local
1105 * buffer and the received buffer. In the next \log_2(p') - 1 steps, the
1106 * buffers are recursively halved, and the distance is doubled. At the end,
1107 * each of the p' processes has 1 / p' of the total reduction result.
1108 */
1109 rindex = malloc(sizeof(*rindex) * nsteps);
1110 sindex = malloc(sizeof(*sindex) * nsteps);
1111 rcount = malloc(sizeof(*rcount) * nsteps);
1112 scount = malloc(sizeof(*scount) * nsteps);
1113 if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
1114 err = OMPI_ERR_OUT_OF_RESOURCE;
1115 goto cleanup_and_return;
1116 }
1117
1118 if (vrank != -1) {
1119 step = 0;
1120 wsize = count;
1121 sindex[0] = rindex[0] = 0;
1122
1123 for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
1124 /*
1125 * On each iteration: rindex[step] = sindex[step] -- begining of the
1126 * current window. Length of the current window is storded in wsize.
1127 */
1128 int vdest = vrank ^ mask;
1129 /* Translate vdest virtual rank to real rank */
1130 int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
1131
1132 if (rank < dest) {
1133 /*
1134 * Recv into the left half of the current window, send the right
1135 * half of the window to the peer (perform reduce on the left
1136 * half of the current window)
1137 */
1138 rcount[step] = wsize / 2;
1139 scount[step] = wsize - rcount[step];
1140 sindex[step] = rindex[step] + rcount[step];
1141 } else {
1142 /*
1143 * Recv into the right half of the current window, send the left
1144 * half of the window to the peer (perform reduce on the right
1145 * half of the current window)
1146 */
1147 scount[step] = wsize / 2;
1148 rcount[step] = wsize - scount[step];
1149 rindex[step] = sindex[step] + scount[step];
1150 }
1151
1152 /* Send part of data from the rbuf, recv into the tmp_buf */
1153 err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
1154 scount[step], dtype, dest,
1155 MCA_COLL_BASE_TAG_ALLREDUCE,
1156 (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
1157 rcount[step], dtype, dest,
1158 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1159 MPI_STATUS_IGNORE, rank);
1160 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1161
1162 /* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
1163 ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
1164 (char *)rbuf + (ptrdiff_t)rindex[step] * extent,
1165 rcount[step], dtype);
1166
1167 /* Move the current window to the received message */
1168 if (step + 1 < nsteps) {
1169 rindex[step + 1] = rindex[step];
1170 sindex[step + 1] = rindex[step];
1171 wsize = rcount[step];
1172 step++;
1173 }
1174 }
1175 /*
1176 * Assertion: each process has 1 / p' of the total reduction result:
1177 * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
1178 */
1179
1180 /*
1181 * Step 3. Allgather by the recursive doubling algorithm.
1182 * Each process has 1 / p' of the total reduction result:
1183 * rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
1184 * All exchanges are executed in reverse order relative
1185 * to recursive doubling (previous step).
1186 */
1187
1188 step = nsteps - 1;
1189
1190 for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
1191 int vdest = vrank ^ mask;
1192 /* Translate vdest virtual rank to real rank */
1193 int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
1194
1195 /*
1196 * Send rcount[step] elements from rbuf[rindex[step]...]
1197 * Recv scount[step] elements to rbuf[sindex[step]...]
1198 */
1199 err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
1200 rcount[step], dtype, dest,
1201 MCA_COLL_BASE_TAG_ALLREDUCE,
1202 (char *)rbuf + (ptrdiff_t)sindex[step] * extent,
1203 scount[step], dtype, dest,
1204 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1205 MPI_STATUS_IGNORE, rank);
1206 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1207 step--;
1208 }
1209 }
1210
1211 /*
1212 * Step 4. Send total result to excluded odd ranks.
1213 */
1214 if (rank < 2 * nprocs_rem) {
1215 if (rank % 2 != 0) {
1216 /* Odd process -- recv result from rank - 1 */
1217 err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
1218 MCA_COLL_BASE_TAG_ALLREDUCE, comm,
1219 MPI_STATUS_IGNORE));
1220 if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
1221
1222 } else {
1223 /* Even process -- send result to rank + 1 */
1224 err = MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
1225 MCA_COLL_BASE_TAG_ALLREDUCE,
1226 MCA_PML_BASE_SEND_STANDARD, comm));
1227 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
1228 }
1229 }
1230
1231 cleanup_and_return:
1232 if (NULL != tmp_buf_raw)
1233 free(tmp_buf_raw);
1234 if (NULL != rindex)
1235 free(rindex);
1236 if (NULL != sindex)
1237 free(sindex);
1238 if (NULL != rcount)
1239 free(rcount);
1240 if (NULL != scount)
1241 free(scount);
1242 return err;
1243 }
1244
1245 /* copied function (with appropriate renaming) ends here */