This source file includes the following definitions.
- min
- mca_coll_sm_reduce_intra
- reduce_inorder
- reduce_no_order
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #include "ompi_config.h"
23
24 #include <string.h>
25
26 #include "opal/datatype/opal_convertor.h"
27 #include "opal/sys/atomic.h"
28 #include "ompi/constants.h"
29 #include "ompi/communicator/communicator.h"
30 #include "ompi/mca/coll/coll.h"
31 #include "ompi/op/op.h"
32 #include "coll_sm.h"
33
34
35
36
37
38 static int reduce_inorder(const void *sbuf, void* rbuf, int count,
39 struct ompi_datatype_t *dtype,
40 struct ompi_op_t *op,
41 int root, struct ompi_communicator_t *comm,
42 mca_coll_base_module_t *module);
43 #define WANT_REDUCE_NO_ORDER 0
44 #if WANT_REDUCE_NO_ORDER
45 static int reduce_no_order(const void *sbuf, void* rbuf, int count,
46 struct ompi_datatype_t *dtype,
47 struct ompi_op_t *op,
48 int root, struct ompi_communicator_t *comm,
49 mca_coll_base_module_t *module);
50 #endif
51
52
53
54
#if !defined(min)
/* Smaller of two ints; when both are equal that common value is returned. */
static inline int min(int a, int b)
{
    if (b < a) {
        return b;
    }
    return a;
}
#endif
61
62
63
64
65
66
67 int mca_coll_sm_reduce_intra(const void *sbuf, void* rbuf, int count,
68 struct ompi_datatype_t *dtype,
69 struct ompi_op_t *op,
70 int root, struct ompi_communicator_t *comm,
71 mca_coll_base_module_t *module)
72 {
73 size_t size;
74 mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
75
76
77
78
79
80
81
82
83
84
85
86
87
88 ompi_datatype_type_size(dtype, &size);
89 if ((int)size > mca_coll_sm_component.sm_control_size) {
90 return sm_module->previous_reduce(sbuf, rbuf, count,
91 dtype, op, root, comm,
92 sm_module->previous_reduce_module);
93 }
94 #if WANT_REDUCE_NO_ORDER
95 else {
96
97
98 if (!sm_module->enabled) {
99 if (OMPI_SUCCESS !=
100 (ret = ompi_coll_sm_lazy_enable(module, comm))) {
101 return ret;
102 }
103 }
104
105 if (!ompi_op_is_intrinsic(op) ||
106 (ompi_op_is_intrinsic(op) && !ompi_op_is_float_assoc(op) &&
107 0 != (dtype->flags & OMPI_DATATYPE_FLAG_DATA_FLOAT))) {
108 return reduce_inorder(sbuf, rbuf, count, dtype, op,
109 root, comm, module);
110 } else {
111 return reduce_no_order(sbuf, rbuf, count, dtype, op,
112 root, comm, module);
113 }
114 }
115 #else
116 else {
117
118
119 if (!sm_module->enabled) {
120 int ret;
121
122 if (OMPI_SUCCESS !=
123 (ret = ompi_coll_sm_lazy_enable(module, comm))) {
124 return ret;
125 }
126 }
127
128 return reduce_inorder(sbuf, rbuf, count, dtype, op, root, comm, module);
129 }
130 #endif
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 static int reduce_inorder(const void *sbuf, void* rbuf, int count,
177 struct ompi_datatype_t *dtype,
178 struct ompi_op_t *op,
179 int root, struct ompi_communicator_t *comm,
180 mca_coll_base_module_t *module)
181 {
182 struct iovec iov;
183 mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
184 mca_coll_sm_comm_t *data = sm_module->sm_comm_data;
185 int ret, rank, size;
186 int flag_num, segment_num, max_segment_num;
187 size_t total_size, max_data, bytes;
188 mca_coll_sm_in_use_flag_t *flag;
189 mca_coll_sm_data_index_t *index;
190 size_t ddt_size, segsize;
191 size_t segment_ddt_count, segment_ddt_bytes, zero = 0;
192 ptrdiff_t extent, gap;
193
194
195
196 rank = ompi_comm_rank(comm);
197 size = ompi_comm_size(comm);
198
199
200
201
202
203
204
205
206 ompi_datatype_type_size(dtype, &ddt_size);
207
208 ompi_datatype_type_extent(dtype, &extent);
209 segment_ddt_count = mca_coll_sm_component.sm_fragment_size / ddt_size;
210 iov.iov_len = segment_ddt_bytes = segment_ddt_count * ddt_size;
211 total_size = ddt_size * count;
212
213 bytes = 0;
214
215
216
217
218
219
220
221
222
223
224 if (root == rank) {
225 opal_convertor_t rtb_convertor, rbuf_convertor;
226 char *reduce_temp_buffer, *free_buffer, *reduce_target;
227 char *inplace_temp;
228 int peer;
229 size_t count_left = (size_t)count;
230 int frag_num = 0;
231 bool first_operation = true;
232
233
234
235
236
237
238
239
240 if (ompi_datatype_is_contiguous_memory_layout(dtype, count)) {
241 reduce_temp_buffer = free_buffer = NULL;
242 } else {
243
244
245
246
247
248
249
250
251
252
253
254
255 OBJ_CONSTRUCT(&rtb_convertor, opal_convertor_t);
256 OBJ_CONSTRUCT(&rbuf_convertor, opal_convertor_t);
257
258
259
260
261
262
263
264
265
266 segsize = opal_datatype_span(&dtype->super, segment_ddt_count, &gap);
267
268 free_buffer = (char*)malloc(segsize);
269 if (NULL == free_buffer) {
270 return OMPI_ERR_OUT_OF_RESOURCE;
271 }
272 reduce_temp_buffer = free_buffer - gap;
273
274
275
276
277
278 if (OMPI_SUCCESS !=
279 (ret = opal_convertor_copy_and_prepare_for_recv(
280 ompi_mpi_local_convertor,
281 &(dtype->super),
282 segment_ddt_count,
283 reduce_temp_buffer,
284 0,
285 &rtb_convertor))) {
286 free(free_buffer);
287 return ret;
288 }
289
290
291 if (size - 1 != rank) {
292 if (OMPI_SUCCESS !=
293 (ret = opal_convertor_copy_and_prepare_for_recv(
294 ompi_mpi_local_convertor,
295 &(dtype->super),
296 count,
297 rbuf,
298 0,
299 &rbuf_convertor))) {
300 free(free_buffer);
301 return ret;
302 }
303 }
304 }
305
306
307
308
309
310
311
312 if (MPI_IN_PLACE == sbuf && (size - 1) != rank) {
313 segsize = opal_datatype_span(&dtype->super, count, &gap);
314 inplace_temp = (char*)malloc(segsize);
315 if (NULL == inplace_temp) {
316 if (NULL != free_buffer) {
317 free(free_buffer);
318 }
319 return OMPI_ERR_OUT_OF_RESOURCE;
320 }
321 sbuf = inplace_temp - gap;
322 ompi_datatype_copy_content_same_ddt(dtype, count, (char *)sbuf, (char *)rbuf);
323 } else {
324 inplace_temp = NULL;
325 }
326
327
328
329 do {
330 flag_num = (data->mcb_operation_count %
331 mca_coll_sm_component.sm_comm_num_in_use_flags);
332 FLAG_SETUP(flag_num, flag, data);
333 FLAG_WAIT_FOR_IDLE(flag, reduce_root_flag_label);
334 FLAG_RETAIN(flag, size, data->mcb_operation_count);
335 ++data->mcb_operation_count;
336
337
338
339 segment_num =
340 flag_num * mca_coll_sm_component.sm_segs_per_inuse_flag;
341 max_segment_num =
342 (flag_num + 1) * mca_coll_sm_component.sm_segs_per_inuse_flag;
343 reduce_target = (((char*) rbuf) + (frag_num * extent * segment_ddt_count));
344 do {
345
346
347
348
349
350 if (size - 1 == rank) {
351
352
353
354
355
356
357
358
359 if (first_operation) {
360 first_operation = false;
361 if (MPI_IN_PLACE != sbuf) {
362 ompi_datatype_copy_content_same_ddt(dtype, count,
363 reduce_target, (char*)sbuf);
364 }
365 }
366 }
367
368
369 else {
370
371
372 index = &(data->mcb_data_index[segment_num]);
373 PARENT_WAIT_FOR_NOTIFY_SPECIFIC(size - 1, rank, index, max_data, reduce_root_parent_label1);
374
375
376
377 if (NULL == free_buffer) {
378 memcpy(reduce_target, ((char*)index->mcbmi_data) +
379 (size - 1) * mca_coll_sm_component.sm_fragment_size, max_data);
380 }
381
382
383
384 else {
385 max_data = segment_ddt_bytes;
386 COPY_FRAGMENT_OUT(rbuf_convertor, size - 1, index,
387 iov, max_data);
388 }
389 }
390
391
392
393
394 for (peer = size - 2; peer >= 0; --peer) {
395
396
397
398
399
400
401
402
403 if (rank == peer) {
404 ompi_op_reduce(op,
405 ((char *) sbuf) +
406 frag_num * extent * segment_ddt_count,
407 reduce_target,
408 min(count_left, segment_ddt_count),
409 dtype);
410 }
411
412
413
414
415 else {
416 index = &(data->mcb_data_index[segment_num]);
417 PARENT_WAIT_FOR_NOTIFY_SPECIFIC(peer, rank,
418 index, max_data, reduce_root_parent_label2);
419
420
421
422
423
424 if (NULL == free_buffer) {
425 ompi_op_reduce(op,
426 (index->mcbmi_data +
427 (peer * mca_coll_sm_component.sm_fragment_size)),
428 reduce_target,
429 min(count_left, segment_ddt_count),
430 dtype);
431 }
432
433
434
435
436 else {
437
438
439 max_data = segment_ddt_bytes;
440 COPY_FRAGMENT_OUT(rtb_convertor, peer, index,
441 iov, max_data);
442 opal_convertor_set_position(&rtb_convertor, &zero);
443
444
445 ompi_op_reduce(op, reduce_temp_buffer,
446 reduce_target,
447 min(count_left, segment_ddt_count),
448 dtype);
449 }
450 }
451 }
452
453
454
455
456 count_left -= segment_ddt_count;
457 bytes += segment_ddt_bytes;
458 ++segment_num;
459 ++frag_num;
460 reduce_target += extent * segment_ddt_count;
461 } while (bytes < total_size && segment_num < max_segment_num);
462
463
464 FLAG_RELEASE(flag);
465 } while (bytes < total_size);
466
467
468
469 if (NULL != free_buffer) {
470 OBJ_DESTRUCT(&rtb_convertor);
471 OBJ_DESTRUCT(&rbuf_convertor);
472 free(free_buffer);
473 }
474 if (NULL != inplace_temp) {
475 free(inplace_temp);
476 }
477 }
478
479
480
481
482
483 else {
484
485
486
487 opal_convertor_t sbuf_convertor;
488 OBJ_CONSTRUCT(&sbuf_convertor, opal_convertor_t);
489 if (OMPI_SUCCESS !=
490 (ret =
491 opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor,
492 &(dtype->super),
493 count,
494 sbuf,
495 0,
496 &sbuf_convertor))) {
497 return ret;
498 }
499
500
501
502 do {
503 flag_num = (data->mcb_operation_count %
504 mca_coll_sm_component.sm_comm_num_in_use_flags);
505
506
507
508 FLAG_SETUP(flag_num, flag, data);
509 FLAG_WAIT_FOR_OP(flag, data->mcb_operation_count, reduce_nonroot_flag_label);
510 ++data->mcb_operation_count;
511
512
513
514 segment_num =
515 flag_num * mca_coll_sm_component.sm_segs_per_inuse_flag;
516 max_segment_num =
517 (flag_num + 1) * mca_coll_sm_component.sm_segs_per_inuse_flag;
518 do {
519 index = &(data->mcb_data_index[segment_num]);
520
521
522
523 max_data = segment_ddt_bytes;
524 COPY_FRAGMENT_IN(sbuf_convertor, index, rank, iov, max_data);
525 bytes += max_data;
526
527
528 opal_atomic_wmb();
529
530
531
532
533 CHILD_NOTIFY_PARENT(rank, root, index, max_data);
534
535 ++segment_num;
536 } while (bytes < total_size && segment_num < max_segment_num);
537
538
539 FLAG_RELEASE(flag);
540 } while (bytes < total_size);
541
542
543
544 OBJ_DESTRUCT(&sbuf_convertor);
545 }
546
547
548
549 return OMPI_SUCCESS;
550 }
551
552
#if WANT_REDUCE_NO_ORDER
/*
 * Placeholder for an unordered shared-memory reduction.
 *
 * Not implemented: every call returns OMPI_ERR_NOT_IMPLEMENTED without
 * touching any of its arguments.
 */
static int reduce_no_order(const void *sbuf, void* rbuf, int count,
                           struct ompi_datatype_t *dtype,
                           struct ompi_op_t *op,
                           int root, struct ompi_communicator_t *comm,
                           mca_coll_base_module_t *module)
{
    /* Parameters are intentionally unused until an algorithm exists. */
    (void) sbuf;
    (void) rbuf;
    (void) count;
    (void) dtype;
    (void) op;
    (void) root;
    (void) comm;
    (void) module;

    return OMPI_ERR_NOT_IMPLEMENTED;
}
#endif /* WANT_REDUCE_NO_ORDER */