This source file includes the following definitions.
- mca_coll_basic_reduce_scatter_intra
- mca_coll_basic_reduce_scatter_inter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "ompi_config.h"
27 #include "coll_basic.h"
28
29 #include <stdio.h>
30 #include <errno.h>
31
32 #include "mpi.h"
33 #include "opal/util/bit_ops.h"
34 #include "ompi/constants.h"
35 #include "ompi/mca/coll/coll.h"
36 #include "ompi/mca/coll/base/coll_tags.h"
37 #include "ompi/mca/pml/pml.h"
38 #include "ompi/datatype/ompi_datatype.h"
39 #include "coll_basic.h"
40 #include "ompi/op/op.h"
41
/*
 * Size threshold, in bytes (8 MiB), for the intra-communicator
 * reduce_scatter.  A commutative operation whose temporary buffer span is
 * strictly below this value uses the pairwise-exchange algorithm; anything
 * else falls back to a reduce to rank 0 followed by a scatterv.
 */
#define COMMUTATIVE_LONG_MSG (8 * 1024 * 1024)
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 int
67 mca_coll_basic_reduce_scatter_intra(const void *sbuf, void *rbuf, const int *rcounts,
68 struct ompi_datatype_t *dtype,
69 struct ompi_op_t *op,
70 struct ompi_communicator_t *comm,
71 mca_coll_base_module_t *module)
72 {
73 int i, rank, size, count, err = OMPI_SUCCESS;
74 ptrdiff_t extent, buf_size, gap;
75 int *disps = NULL;
76 char *recv_buf = NULL, *recv_buf_free = NULL;
77 char *result_buf = NULL, *result_buf_free = NULL;
78
79 rank = ompi_comm_rank(comm);
80 size = ompi_comm_size(comm);
81
82
83 disps = (int*) malloc(sizeof(int) * size);
84 if (NULL == disps) return OMPI_ERR_OUT_OF_RESOURCE;
85
86 disps[0] = 0;
87 for (i = 0; i < (size - 1); ++i) {
88 disps[i + 1] = disps[i] + rcounts[i];
89 }
90 count = disps[size - 1] + rcounts[size - 1];
91
92
93 if (0 == count) {
94 free(disps);
95 return OMPI_SUCCESS;
96 }
97
98
99 ompi_datatype_type_extent(dtype, &extent);
100 buf_size = opal_datatype_span(&dtype->super, count, &gap);
101
102
103 if (MPI_IN_PLACE == sbuf) {
104 sbuf = rbuf;
105 }
106
107 if ((op->o_flags & OMPI_OP_FLAGS_COMMUTE) &&
108 (buf_size < COMMUTATIVE_LONG_MSG)) {
109 int tmp_size, remain = 0, tmp_rank;
110
111
112 recv_buf_free = (char*) malloc(buf_size);
113 recv_buf = recv_buf_free - gap;
114 if (NULL == recv_buf_free) {
115 err = OMPI_ERR_OUT_OF_RESOURCE;
116 goto cleanup;
117 }
118
119
120 result_buf_free = (char*) malloc(buf_size);
121 result_buf = result_buf_free - gap;
122
123
124 err = ompi_datatype_sndrcv(sbuf, count, dtype, result_buf, count, dtype);
125 if (OMPI_SUCCESS != err) goto cleanup;
126
127
128
129
130 tmp_size = opal_next_poweroftwo(size);
131 tmp_size >>= 1;
132 remain = size - tmp_size;
133
134
135
136
137 if (rank < 2 * remain) {
138 if ((rank & 1) == 0) {
139 err = MCA_PML_CALL(send(result_buf, count, dtype, rank + 1,
140 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
141 MCA_PML_BASE_SEND_STANDARD,
142 comm));
143 if (OMPI_SUCCESS != err) goto cleanup;
144
145
146 tmp_rank = -1;
147 } else {
148 err = MCA_PML_CALL(recv(recv_buf, count, dtype, rank - 1,
149 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
150 comm, MPI_STATUS_IGNORE));
151 if (OMPI_SUCCESS != err) goto cleanup;
152
153
154 ompi_op_reduce(op, recv_buf, result_buf, count, dtype);
155
156
157 tmp_rank = rank / 2;
158 }
159 } else {
160
161
162 tmp_rank = rank - remain;
163 }
164
165
166
167 if (tmp_rank >= 0) {
168 int *tmp_disps = NULL, *tmp_rcounts = NULL;
169 int mask, send_index, recv_index, last_index;
170
171
172
173
174 tmp_rcounts = (int*) malloc(tmp_size * sizeof(int));
175 if (NULL == tmp_rcounts) {
176 err = OMPI_ERR_OUT_OF_RESOURCE;
177 goto cleanup;
178 }
179 tmp_disps = (int*) malloc(tmp_size * sizeof(int));
180 if (NULL == tmp_disps) {
181 free(tmp_rcounts);
182 err = OMPI_ERR_OUT_OF_RESOURCE;
183 goto cleanup;
184 }
185
186 for (i = 0 ; i < tmp_size ; ++i) {
187 if (i < remain) {
188
189 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
190 } else {
191 tmp_rcounts[i] = rcounts[i + remain];
192 }
193 }
194
195 tmp_disps[0] = 0;
196 for (i = 0; i < tmp_size - 1; ++i) {
197 tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
198 }
199
200
201
202
203
204 mask = tmp_size >> 1;
205 send_index = recv_index = 0;
206 last_index = tmp_size;
207 while (mask > 0) {
208 int tmp_peer, peer, send_count, recv_count;
209 struct ompi_request_t *request;
210
211 tmp_peer = tmp_rank ^ mask;
212 peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
213
214
215 send_count = recv_count = 0;
216 if (tmp_rank < tmp_peer) {
217 send_index = recv_index + mask;
218 for (i = send_index ; i < last_index ; ++i) {
219 send_count += tmp_rcounts[i];
220 }
221 for (i = recv_index ; i < send_index ; ++i) {
222 recv_count += tmp_rcounts[i];
223 }
224 } else {
225 recv_index = send_index + mask;
226 for (i = send_index ; i < recv_index ; ++i) {
227 send_count += tmp_rcounts[i];
228 }
229 for (i = recv_index ; i < last_index ; ++i) {
230 recv_count += tmp_rcounts[i];
231 }
232 }
233
234
235
236 if (recv_count > 0) {
237 err = MCA_PML_CALL(irecv(recv_buf + tmp_disps[recv_index] * extent,
238 recv_count, dtype, peer,
239 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
240 comm, &request));
241 if (OMPI_SUCCESS != err) {
242 free(tmp_rcounts);
243 free(tmp_disps);
244 goto cleanup;
245 }
246 }
247 if (send_count > 0) {
248 err = MCA_PML_CALL(send(result_buf + tmp_disps[send_index] * extent,
249 send_count, dtype, peer,
250 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
251 MCA_PML_BASE_SEND_STANDARD,
252 comm));
253 if (OMPI_SUCCESS != err) {
254 free(tmp_rcounts);
255 free(tmp_disps);
256 goto cleanup;
257 }
258 }
259
260
261
262 if (recv_count > 0) {
263 err = ompi_request_wait(&request, MPI_STATUS_IGNORE);
264 if (OMPI_SUCCESS != err) {
265 free(tmp_rcounts);
266 free(tmp_disps);
267 goto cleanup;
268 }
269
270 ompi_op_reduce(op,
271 recv_buf + tmp_disps[recv_index] * extent,
272 result_buf + tmp_disps[recv_index] * extent,
273 recv_count, dtype);
274 }
275
276
277 send_index = recv_index;
278 last_index = recv_index + mask;
279 mask >>= 1;
280 }
281
282
283 if (0 != rcounts[rank]) {
284 err = ompi_datatype_sndrcv(result_buf + disps[rank] * extent,
285 rcounts[rank], dtype,
286 rbuf, rcounts[rank], dtype);
287 if (OMPI_SUCCESS != err) {
288 free(tmp_rcounts);
289 free(tmp_disps);
290 goto cleanup;
291 }
292 }
293
294 free(tmp_rcounts);
295 free(tmp_disps);
296 }
297
298
299
300 if (rank < 2 * remain) {
301 if ((rank & 1) == 0) {
302 if (rcounts[rank]) {
303 err = MCA_PML_CALL(recv(rbuf, rcounts[rank], dtype, rank + 1,
304 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
305 comm, MPI_STATUS_IGNORE));
306 if (OMPI_SUCCESS != err) goto cleanup;
307 }
308 } else {
309 if (rcounts[rank - 1]) {
310 err = MCA_PML_CALL(send(result_buf + disps[rank - 1] * extent,
311 rcounts[rank - 1], dtype, rank - 1,
312 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
313 MCA_PML_BASE_SEND_STANDARD,
314 comm));
315 if (OMPI_SUCCESS != err) goto cleanup;
316 }
317 }
318 }
319
320 } else {
321 if (0 == rank) {
322
323
324 recv_buf_free = (char*) malloc(buf_size);
325 recv_buf = recv_buf_free - gap;
326 if (NULL == recv_buf_free) {
327 err = OMPI_ERR_OUT_OF_RESOURCE;
328 goto cleanup;
329 }
330 }
331
332
333 err =
334 comm->c_coll->coll_reduce(sbuf, recv_buf, count, dtype, op, 0,
335 comm, comm->c_coll->coll_reduce_module);
336
337
338 if (MPI_SUCCESS == err) {
339 err = comm->c_coll->coll_scatterv(recv_buf, rcounts, disps, dtype,
340 rbuf, rcounts[rank], dtype, 0,
341 comm, comm->c_coll->coll_scatterv_module);
342 }
343 }
344
345 cleanup:
346 if (NULL != disps) free(disps);
347 if (NULL != recv_buf_free) free(recv_buf_free);
348 if (NULL != result_buf_free) free(result_buf_free);
349
350 return err;
351 }
352
353
354
355
356
357
358
359
360
361 int
362 mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rcounts,
363 struct ompi_datatype_t *dtype,
364 struct ompi_op_t *op,
365 struct ompi_communicator_t *comm,
366 mca_coll_base_module_t *module)
367 {
368 int err, i, rank, root = 0, rsize, lsize, totalcounts;
369 char *tmpbuf = NULL, *tmpbuf2 = NULL, *lbuf = NULL, *buf;
370 ptrdiff_t gap, span;
371 ompi_request_t *req;
372 int *disps = NULL;
373
374 rank = ompi_comm_rank(comm);
375 rsize = ompi_comm_remote_size(comm);
376 lsize = ompi_comm_size(comm);
377
378
379 for (totalcounts = 0, i = 0; i < lsize; i++) {
380 totalcounts += rcounts[i];
381 }
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400 if (rank == root) {
401 span = opal_datatype_span(&dtype->super, totalcounts, &gap);
402
403
404 disps = (int*) malloc(sizeof(int) * lsize);
405 if (NULL == disps) {
406 return OMPI_ERR_OUT_OF_RESOURCE;
407 }
408 disps[0] = 0;
409 for (i = 0; i < (lsize - 1); ++i) {
410 disps[i + 1] = disps[i] + rcounts[i];
411 }
412
413 tmpbuf = (char *) malloc(span);
414 tmpbuf2 = (char *) malloc(span);
415 if (NULL == tmpbuf || NULL == tmpbuf2) {
416 err = OMPI_ERR_OUT_OF_RESOURCE;
417 goto exit;
418 }
419 lbuf = tmpbuf - gap;
420 buf = tmpbuf2 - gap;
421
422
423 err = MCA_PML_CALL(isend(sbuf, totalcounts, dtype, 0,
424 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
425 MCA_PML_BASE_SEND_STANDARD, comm, &req));
426 if (OMPI_SUCCESS != err) {
427 goto exit;
428 }
429
430 err = MCA_PML_CALL(recv(lbuf, totalcounts, dtype, 0,
431 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
432 MPI_STATUS_IGNORE));
433 if (OMPI_SUCCESS != err) {
434 goto exit;
435 }
436
437 err = ompi_request_wait( &req, MPI_STATUS_IGNORE);
438 if (OMPI_SUCCESS != err) {
439 goto exit;
440 }
441
442
443
444
445
446
447 for (i = 1; i < rsize; i++) {
448 char *tbuf;
449 err = MCA_PML_CALL(recv(buf, totalcounts, dtype, i,
450 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
451 MPI_STATUS_IGNORE));
452 if (MPI_SUCCESS != err) {
453 goto exit;
454 }
455
456
457 ompi_op_reduce(op, lbuf, buf, totalcounts, dtype);
458
459 tbuf = lbuf; lbuf = buf; buf = tbuf;
460 }
461 } else {
462
463 err = MCA_PML_CALL(send(sbuf, totalcounts, dtype, root,
464 MCA_COLL_BASE_TAG_REDUCE_SCATTER,
465 MCA_PML_BASE_SEND_STANDARD, comm));
466 if (OMPI_SUCCESS != err) {
467 goto exit;
468 }
469 }
470
471
472 err = comm->c_local_comm->c_coll->coll_scatterv(lbuf, rcounts, disps, dtype,
473 rbuf, rcounts[rank], dtype, 0,
474 comm->c_local_comm,
475 comm->c_local_comm->c_coll->coll_scatterv_module);
476
477 exit:
478 if (NULL != tmpbuf) {
479 free(tmpbuf);
480 }
481
482 if (NULL != tmpbuf2) {
483 free(tmpbuf2);
484 }
485
486 if (NULL != disps) {
487 free(disps);
488 }
489
490 return err;
491 }