This source file includes the following definitions.
- mca_coll_basic_reduce_log_intra
- mca_coll_basic_reduce_lin_inter
- mca_coll_basic_reduce_log_inter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 #include "ompi_config.h"
22 #include "coll_basic.h"
23
24 #include <stdio.h>
25
26 #include "mpi.h"
27 #include "ompi/constants.h"
28 #include "ompi/mca/coll/coll.h"
29 #include "ompi/mca/coll/base/coll_tags.h"
30 #include "ompi/mca/pml/pml.h"
31 #include "ompi/op/op.h"
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 int
87 mca_coll_basic_reduce_log_intra(const void *sbuf, void *rbuf, int count,
88 struct ompi_datatype_t *dtype,
89 struct ompi_op_t *op,
90 int root, struct ompi_communicator_t *comm,
91 mca_coll_base_module_t *module)
92 {
93 int i, size, rank, vrank;
94 int err, peer, dim, mask;
95 ptrdiff_t lb, extent, dsize, gap;
96 char *free_buffer = NULL;
97 char *free_rbuf = NULL;
98 char *pml_buffer = NULL;
99 char *snd_buffer = NULL;
100 char *rcv_buffer = (char*)rbuf;
101 char *inplace_temp = NULL;
102
103
104
105
106
107
108 if (!ompi_op_is_commute(op)) {
109 return ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
110 op, root, comm, module);
111 }
112
113
114 size = ompi_comm_size(comm);
115 rank = ompi_comm_rank(comm);
116 vrank = ompi_op_is_commute(op) ? (rank - root + size) % size : rank;
117 dim = comm->c_cube_dim;
118
119
120
121
122 ompi_datatype_get_extent(dtype, &lb, &extent);
123 dsize = opal_datatype_span(&dtype->super, count, &gap);
124
125 free_buffer = (char*)malloc(dsize);
126 if (NULL == free_buffer) {
127 return OMPI_ERR_OUT_OF_RESOURCE;
128 }
129
130 pml_buffer = free_buffer - gap;
131
132
133 if (ompi_op_is_commute(op)) {
134 rcv_buffer = pml_buffer;
135 }
136
137
138
139
140 if (MPI_IN_PLACE == sbuf) {
141 inplace_temp = (char*)malloc(dsize);
142 if (NULL == inplace_temp) {
143 err = OMPI_ERR_OUT_OF_RESOURCE;
144 goto cleanup_and_return;
145 }
146 sbuf = inplace_temp - gap;
147 err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)sbuf, (char*)rbuf);
148 }
149 snd_buffer = (char*)sbuf;
150
151 if (rank != root && 0 == (vrank & 1)) {
152
153
154
155 free_rbuf = (char*)malloc(dsize);
156 if (NULL == free_rbuf) {
157 err = OMPI_ERR_OUT_OF_RESOURCE;
158 goto cleanup_and_return;
159 }
160 rbuf = free_rbuf - gap;
161 }
162
163
164
165
166 for (i = 0, mask = 1; i < dim; ++i, mask <<= 1) {
167
168
169 if (vrank & mask) {
170 peer = vrank & ~mask;
171 if (ompi_op_is_commute(op)) {
172 peer = (peer + root) % size;
173 }
174
175 err = MCA_PML_CALL(send(snd_buffer, count,
176 dtype, peer, MCA_COLL_BASE_TAG_REDUCE,
177 MCA_PML_BASE_SEND_STANDARD, comm));
178 if (MPI_SUCCESS != err) {
179 goto cleanup_and_return;
180 }
181 snd_buffer = (char*)rbuf;
182 break;
183 }
184
185
186
187
188 else {
189 peer = vrank | mask;
190 if (peer >= size) {
191 continue;
192 }
193 if (ompi_op_is_commute(op)) {
194 peer = (peer + root) % size;
195 }
196
197
198
199
200
201
202
203
204
205
206 err = MCA_PML_CALL(recv(rcv_buffer, count, dtype, peer,
207 MCA_COLL_BASE_TAG_REDUCE, comm,
208 MPI_STATUS_IGNORE));
209 if (MPI_SUCCESS != err) {
210 goto cleanup_and_return;
211 }
212
213
214
215 if (snd_buffer != sbuf) {
216
217 ompi_op_reduce(op, rcv_buffer, pml_buffer, count, dtype);
218 } else {
219
220
221
222
223
224 if (!ompi_op_is_commute(op)) {
225 ompi_datatype_copy_content_same_ddt(dtype, count, pml_buffer,
226 (char*)sbuf);
227 ompi_op_reduce(op, rbuf, pml_buffer, count, dtype);
228 } else {
229 ompi_op_reduce(op, (void *)sbuf, pml_buffer, count, dtype);
230 }
231
232 snd_buffer = pml_buffer;
233
234
235 rcv_buffer = (char*)rbuf;
236 }
237 }
238 }
239
240
241 err = MPI_SUCCESS;
242 if (0 == vrank) {
243 if (root == rank) {
244 ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, snd_buffer);
245 } else {
246 err = MCA_PML_CALL(send(snd_buffer, count,
247 dtype, root, MCA_COLL_BASE_TAG_REDUCE,
248 MCA_PML_BASE_SEND_STANDARD, comm));
249 }
250 } else if (rank == root) {
251 err = MCA_PML_CALL(recv(rcv_buffer, count, dtype, 0,
252 MCA_COLL_BASE_TAG_REDUCE,
253 comm, MPI_STATUS_IGNORE));
254 if (rcv_buffer != rbuf) {
255 ompi_op_reduce(op, rcv_buffer, rbuf, count, dtype);
256 }
257 }
258
259 cleanup_and_return:
260 if (NULL != inplace_temp) {
261 free(inplace_temp);
262 }
263 if (NULL != free_buffer) {
264 free(free_buffer);
265 }
266 if (NULL != free_rbuf) {
267 free(free_rbuf);
268 }
269
270
271
272 return err;
273 }
274
275
276
277
278
279
280
281
282
283 int
284 mca_coll_basic_reduce_lin_inter(const void *sbuf, void *rbuf, int count,
285 struct ompi_datatype_t *dtype,
286 struct ompi_op_t *op,
287 int root, struct ompi_communicator_t *comm,
288 mca_coll_base_module_t *module)
289 {
290 int i, err, size;
291 ptrdiff_t dsize, gap;
292 char *free_buffer = NULL;
293 char *pml_buffer = NULL;
294
295
296 size = ompi_comm_remote_size(comm);
297
298 if (MPI_PROC_NULL == root) {
299
300 err = OMPI_SUCCESS;
301 } else if (MPI_ROOT != root) {
302
303 err = MCA_PML_CALL(send(sbuf, count, dtype, root,
304 MCA_COLL_BASE_TAG_REDUCE,
305 MCA_PML_BASE_SEND_STANDARD, comm));
306 } else {
307
308 dsize = opal_datatype_span(&dtype->super, count, &gap);
309
310 free_buffer = (char*)malloc(dsize);
311 if (NULL == free_buffer) {
312 return OMPI_ERR_OUT_OF_RESOURCE;
313 }
314 pml_buffer = free_buffer - gap;
315
316
317
318 err = MCA_PML_CALL(recv(rbuf, count, dtype, 0,
319 MCA_COLL_BASE_TAG_REDUCE, comm,
320 MPI_STATUS_IGNORE));
321 if (MPI_SUCCESS != err) {
322 if (NULL != free_buffer) {
323 free(free_buffer);
324 }
325 return err;
326 }
327
328
329 for (i = 1; i < size; i++) {
330 err = MCA_PML_CALL(recv(pml_buffer, count, dtype, i,
331 MCA_COLL_BASE_TAG_REDUCE, comm,
332 MPI_STATUS_IGNORE));
333 if (MPI_SUCCESS != err) {
334 if (NULL != free_buffer) {
335 free(free_buffer);
336 }
337 return err;
338 }
339
340
341 ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
342 }
343
344 if (NULL != free_buffer) {
345 free(free_buffer);
346 }
347 }
348
349
350 return err;
351 }
352
353
354
355
356
357
358
359
360
361 int
362 mca_coll_basic_reduce_log_inter(const void *sbuf, void *rbuf, int count,
363 struct ompi_datatype_t *dtype,
364 struct ompi_op_t *op,
365 int root, struct ompi_communicator_t *comm,
366 mca_coll_base_module_t *module)
367 {
368 return OMPI_ERR_NOT_IMPLEMENTED;
369 }