1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2015 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2015 Research Organization for Information Science
13 * and Technology (RIST). All rights reserved.
14 * $COPYRIGHT$
15 *
16 * Additional copyrights may follow
17 *
18 * $HEADER$
19 */
20
21 #include "ompi_config.h"
22 #include "coll_basic.h"
23
24 #include <stdio.h>
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/coll/coll.h"
29 #include "ompi/mca/coll/base/coll_tags.h"
30 #include "ompi/mca/pml/pml.h"
31 #include "ompi/op/op.h"
32
33
34 /*
35 * reduce_log_intra
36 *
37 * Function: - reduction using O(log N) algorithm
38 * Accepts: - same as MPI_Reduce()
39 * Returns: - MPI_SUCCESS or error code
40 *
41 *
42 * Performing reduction on each dimension of the hypercube.
43 * An example for 8 procs (dimensions = 3):
44 *
45 * Stage 1, reduce on X dimension, 1 -> 0, 3 -> 2, 5 -> 4, 7 -> 6
46 *
47 * 6----<---7 proc_0: 0+1
48 * /| /| proc_1: 1
49 * / | / | proc_2: 2+3
50 * / | / | proc_3: 3
51 * 4----<---5 | proc_4: 4+5
52 * | 2--< |---3 proc_5: 5
53 * | / | / proc_6: 6+7
54 * | / | / proc_7: 7
55 * |/ |/
56 * 0----<---1
57 *
58 * Stage 2, reduce on Y dimension, 2 -> 0, 6 -> 4
59 *
60 * 6--------7 proc_0: 0+1+2+3
61 * /| /| proc_1: 1
62 * v | / | proc_2: 2+3
63 * / | / | proc_3: 3
64 * 4--------5 | proc_4: 4+5+6+7
65 * | 2--- |---3 proc_5: 5
66 * | / | / proc_6: 6+7
67 * | v | / proc_7: 7
68 * |/ |/
69 * 0--------1
70 *
71 * Stage 3, reduce on Z dimension, 4 -> 0
72 *
73 * 6--------7 proc_0: 0+1+2+3+4+5+6+7
74 * /| /| proc_1: 1
75 * / | / | proc_2: 2+3
76 * / | / | proc_3: 3
77 * 4--------5 | proc_4: 4+5+6+7
78 * | 2--- |---3 proc_5: 5
79 * v / | / proc_6: 6+7
80 * | / | / proc_7: 7
81 * |/ |/
82 * 0--------1
83 *
84 *
85 */
86 int
87 mca_coll_basic_reduce_log_intra(const void *sbuf, void *rbuf, int count,
88 struct ompi_datatype_t *dtype,
89 struct ompi_op_t *op,
90 int root, struct ompi_communicator_t *comm,
91 mca_coll_base_module_t *module)
92 {
93 int i, size, rank, vrank;
94 int err, peer, dim, mask;
95 ptrdiff_t lb, extent, dsize, gap;
96 char *free_buffer = NULL;
97 char *free_rbuf = NULL;
98 char *pml_buffer = NULL;
99 char *snd_buffer = NULL;
100 char *rcv_buffer = (char*)rbuf;
101 char *inplace_temp = NULL;
102
103 /* JMS Codearound for now -- if the operations is not communative,
104 * just call the linear algorithm. Need to talk to Edgar / George
105 * about fixing this algorithm here to work with non-communative
106 * operations. */
107
108 if (!ompi_op_is_commute(op)) {
109 return ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
110 op, root, comm, module);
111 }
112
113 /* Some variables */
114 size = ompi_comm_size(comm);
115 rank = ompi_comm_rank(comm);
116 vrank = ompi_op_is_commute(op) ? (rank - root + size) % size : rank;
117 dim = comm->c_cube_dim;
118
119 /* Allocate the incoming and resulting message buffers. See lengthy
120 * rationale above. */
121
122 ompi_datatype_get_extent(dtype, &lb, &extent);
123 dsize = opal_datatype_span(&dtype->super, count, &gap);
124
125 free_buffer = (char*)malloc(dsize);
126 if (NULL == free_buffer) {
127 return OMPI_ERR_OUT_OF_RESOURCE;
128 }
129
130 pml_buffer = free_buffer - gap;
131 /* read the comment about commutative operations (few lines down
132 * the page) */
133 if (ompi_op_is_commute(op)) {
134 rcv_buffer = pml_buffer;
135 }
136
137 /* Allocate sendbuf in case the MPI_IN_PLACE option has been used. See lengthy
138 * rationale above. */
139
140 if (MPI_IN_PLACE == sbuf) {
141 inplace_temp = (char*)malloc(dsize);
142 if (NULL == inplace_temp) {
143 err = OMPI_ERR_OUT_OF_RESOURCE;
144 goto cleanup_and_return;
145 }
146 sbuf = inplace_temp - gap;
147 err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)sbuf, (char*)rbuf);
148 }
149 snd_buffer = (char*)sbuf;
150
151 if (rank != root && 0 == (vrank & 1)) {
152 /* root is the only one required to provide a valid rbuf.
153 * Assume rbuf is invalid for all other ranks, so fix it up
154 * here to be valid on all non-leaf ranks */
155 free_rbuf = (char*)malloc(dsize);
156 if (NULL == free_rbuf) {
157 err = OMPI_ERR_OUT_OF_RESOURCE;
158 goto cleanup_and_return;
159 }
160 rbuf = free_rbuf - gap;
161 }
162
163 /* Loop over cube dimensions. High processes send to low ones in the
164 * dimension. */
165
166 for (i = 0, mask = 1; i < dim; ++i, mask <<= 1) {
167
168 /* A high-proc sends to low-proc and stops. */
169 if (vrank & mask) {
170 peer = vrank & ~mask;
171 if (ompi_op_is_commute(op)) {
172 peer = (peer + root) % size;
173 }
174
175 err = MCA_PML_CALL(send(snd_buffer, count,
176 dtype, peer, MCA_COLL_BASE_TAG_REDUCE,
177 MCA_PML_BASE_SEND_STANDARD, comm));
178 if (MPI_SUCCESS != err) {
179 goto cleanup_and_return;
180 }
181 snd_buffer = (char*)rbuf;
182 break;
183 }
184
185 /* A low-proc receives, reduces, and moves to a higher
186 * dimension. */
187
188 else {
189 peer = vrank | mask;
190 if (peer >= size) {
191 continue;
192 }
193 if (ompi_op_is_commute(op)) {
194 peer = (peer + root) % size;
195 }
196
197 /* Most of the time (all except the first one for commutative
198 * operations) we receive in the user provided buffer
199 * (rbuf). But the exception is here to allow us to dont have
200 * to copy from the sbuf to a temporary location. If the
201 * operation is commutative we dont care in which order we
202 * apply the operation, so for the first time we can receive
203 * the data in the pml_buffer and then apply to operation
204 * between this buffer and the user provided data. */
205
206 err = MCA_PML_CALL(recv(rcv_buffer, count, dtype, peer,
207 MCA_COLL_BASE_TAG_REDUCE, comm,
208 MPI_STATUS_IGNORE));
209 if (MPI_SUCCESS != err) {
210 goto cleanup_and_return;
211 }
212 /* Perform the operation. The target is always the user
213 * provided buffer We do the operation only if we receive it
214 * not in the user buffer */
215 if (snd_buffer != sbuf) {
216 /* the target buffer is the locally allocated one */
217 ompi_op_reduce(op, rcv_buffer, pml_buffer, count, dtype);
218 } else {
219 /* If we're commutative, we don't care about the order of
220 * operations and we can just reduce the operations now.
221 * If we are not commutative, we have to copy the send
222 * buffer into a temp buffer (pml_buffer) and then reduce
223 * what we just received against it. */
224 if (!ompi_op_is_commute(op)) {
225 ompi_datatype_copy_content_same_ddt(dtype, count, pml_buffer,
226 (char*)sbuf);
227 ompi_op_reduce(op, rbuf, pml_buffer, count, dtype);
228 } else {
229 ompi_op_reduce(op, (void *)sbuf, pml_buffer, count, dtype);
230 }
231 /* now we have to send the buffer containing the computed data */
232 snd_buffer = pml_buffer;
233 /* starting from now we always receive in the user
234 * provided buffer */
235 rcv_buffer = (char*)rbuf;
236 }
237 }
238 }
239
240 /* Get the result to the root if needed. */
241 err = MPI_SUCCESS;
242 if (0 == vrank) {
243 if (root == rank) {
244 ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, snd_buffer);
245 } else {
246 err = MCA_PML_CALL(send(snd_buffer, count,
247 dtype, root, MCA_COLL_BASE_TAG_REDUCE,
248 MCA_PML_BASE_SEND_STANDARD, comm));
249 }
250 } else if (rank == root) {
251 err = MCA_PML_CALL(recv(rcv_buffer, count, dtype, 0,
252 MCA_COLL_BASE_TAG_REDUCE,
253 comm, MPI_STATUS_IGNORE));
254 if (rcv_buffer != rbuf) {
255 ompi_op_reduce(op, rcv_buffer, rbuf, count, dtype);
256 }
257 }
258
259 cleanup_and_return:
260 if (NULL != inplace_temp) {
261 free(inplace_temp);
262 }
263 if (NULL != free_buffer) {
264 free(free_buffer);
265 }
266 if (NULL != free_rbuf) {
267 free(free_rbuf);
268 }
269
270 /* All done */
271
272 return err;
273 }
274
275
276 /*
277 * reduce_lin_inter
278 *
279 * Function: - reduction using O(N) algorithm
280 * Accepts: - same as MPI_Reduce()
281 * Returns: - MPI_SUCCESS or error code
282 */
283 int
284 mca_coll_basic_reduce_lin_inter(const void *sbuf, void *rbuf, int count,
285 struct ompi_datatype_t *dtype,
286 struct ompi_op_t *op,
287 int root, struct ompi_communicator_t *comm,
288 mca_coll_base_module_t *module)
289 {
290 int i, err, size;
291 ptrdiff_t dsize, gap;
292 char *free_buffer = NULL;
293 char *pml_buffer = NULL;
294
295 /* Initialize */
296 size = ompi_comm_remote_size(comm);
297
298 if (MPI_PROC_NULL == root) {
299 /* do nothing */
300 err = OMPI_SUCCESS;
301 } else if (MPI_ROOT != root) {
302 /* If not root, send data to the root. */
303 err = MCA_PML_CALL(send(sbuf, count, dtype, root,
304 MCA_COLL_BASE_TAG_REDUCE,
305 MCA_PML_BASE_SEND_STANDARD, comm));
306 } else {
307 /* Root receives and reduces messages */
308 dsize = opal_datatype_span(&dtype->super, count, &gap);
309
310 free_buffer = (char*)malloc(dsize);
311 if (NULL == free_buffer) {
312 return OMPI_ERR_OUT_OF_RESOURCE;
313 }
314 pml_buffer = free_buffer - gap;
315
316
317 /* Initialize the receive buffer. */
318 err = MCA_PML_CALL(recv(rbuf, count, dtype, 0,
319 MCA_COLL_BASE_TAG_REDUCE, comm,
320 MPI_STATUS_IGNORE));
321 if (MPI_SUCCESS != err) {
322 if (NULL != free_buffer) {
323 free(free_buffer);
324 }
325 return err;
326 }
327
328 /* Loop receiving and calling reduction function (C or Fortran). */
329 for (i = 1; i < size; i++) {
330 err = MCA_PML_CALL(recv(pml_buffer, count, dtype, i,
331 MCA_COLL_BASE_TAG_REDUCE, comm,
332 MPI_STATUS_IGNORE));
333 if (MPI_SUCCESS != err) {
334 if (NULL != free_buffer) {
335 free(free_buffer);
336 }
337 return err;
338 }
339
340 /* Perform the reduction */
341 ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
342 }
343
344 if (NULL != free_buffer) {
345 free(free_buffer);
346 }
347 }
348
349 /* All done */
350 return err;
351 }
352
353
354 /*
355 * reduce_log_inter
356 *
 * Function:    - reduction using O(log N) algorithm (currently not implemented)
358 * Accepts: - same as MPI_Reduce()
359 * Returns: - MPI_SUCCESS or error code
360 */
int
mca_coll_basic_reduce_log_inter(const void *sbuf, void *rbuf, int count,
                                struct ompi_datatype_t *dtype,
                                struct ompi_op_t *op,
                                int root, struct ompi_communicator_t *comm,
                                mca_coll_base_module_t *module)
{
    /* A logarithmic inter-communicator reduce is not implemented in
     * this component; report that to the caller so another algorithm
     * (e.g. the linear one above) can be selected instead. */
    return OMPI_ERR_NOT_IMPLEMENTED;
}