1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2015 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2009-2013 Cisco Systems, Inc. All rights reserved.
13 * Copyright (c) 2015 Research Organization for Information Science
14 * and Technology (RIST). All rights reserved.
15 * $COPYRIGHT$
16 *
17 * Additional copyrights may follow
18 *
19 * $HEADER$
20 */
21
22 #include "ompi_config.h"
23
24 #include <string.h>
25
26 #include "opal/datatype/opal_convertor.h"
27 #include "opal/sys/atomic.h"
28 #include "ompi/constants.h"
29 #include "ompi/communicator/communicator.h"
30 #include "ompi/mca/coll/coll.h"
31 #include "ompi/op/op.h"
32 #include "coll_sm.h"
33
34
35 /*
36 * Local functions
37 */
38 static int reduce_inorder(const void *sbuf, void* rbuf, int count,
39 struct ompi_datatype_t *dtype,
40 struct ompi_op_t *op,
41 int root, struct ompi_communicator_t *comm,
42 mca_coll_base_module_t *module);
43 #define WANT_REDUCE_NO_ORDER 0
44 #if WANT_REDUCE_NO_ORDER
45 static int reduce_no_order(const void *sbuf, void* rbuf, int count,
46 struct ompi_datatype_t *dtype,
47 struct ompi_op_t *op,
48 int root, struct ompi_communicator_t *comm,
49 mca_coll_base_module_t *module);
50 #endif
51
52 /*
53 * Useful utility routine
54 */
55 #if !defined(min)
static inline int min(int a, int b)
{
    /* Return the smaller of the two arguments. */
    if (a < b) {
        return a;
    }
    return b;
}
60 #endif
61
62 /**
63 * Shared memory reduction.
64 *
65 * Simply farms out to the associative or non-associative functions.
66 */
67 int mca_coll_sm_reduce_intra(const void *sbuf, void* rbuf, int count,
68 struct ompi_datatype_t *dtype,
69 struct ompi_op_t *op,
70 int root, struct ompi_communicator_t *comm,
71 mca_coll_base_module_t *module)
72 {
73 size_t size;
74 mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
75
76 /* There are several possibilities:
77 *
78 * 0. If the datatype is larger than a segment, fall back to
79 * underlying module
80 * 1. If the op is user-defined, use the strict order
81 * 2. If the op is intrinsic:
82 * a. If the op is float-associative, use the unordered
83 * b. If the op is not float-associative:
84 * i. if the data is floating point, use the strict order
85 * ii. if the data is not floating point, use the unordered
86 */
87
88 ompi_datatype_type_size(dtype, &size);
89 if ((int)size > mca_coll_sm_component.sm_control_size) {
90 return sm_module->previous_reduce(sbuf, rbuf, count,
91 dtype, op, root, comm,
92 sm_module->previous_reduce_module);
93 }
94 #if WANT_REDUCE_NO_ORDER
95 else {
96 /* Lazily enable the module the first time we invoke a
97 collective on it */
98 if (!sm_module->enabled) {
99 if (OMPI_SUCCESS !=
100 (ret = ompi_coll_sm_lazy_enable(module, comm))) {
101 return ret;
102 }
103 }
104
105 if (!ompi_op_is_intrinsic(op) ||
106 (ompi_op_is_intrinsic(op) && !ompi_op_is_float_assoc(op) &&
107 0 != (dtype->flags & OMPI_DATATYPE_FLAG_DATA_FLOAT))) {
108 return reduce_inorder(sbuf, rbuf, count, dtype, op,
109 root, comm, module);
110 } else {
111 return reduce_no_order(sbuf, rbuf, count, dtype, op,
112 root, comm, module);
113 }
114 }
115 #else
116 else {
117 /* Lazily enable the module the first time we invoke a
118 collective on it */
119 if (!sm_module->enabled) {
120 int ret;
121
122 if (OMPI_SUCCESS !=
123 (ret = ompi_coll_sm_lazy_enable(module, comm))) {
124 return ret;
125 }
126 }
127
128 return reduce_inorder(sbuf, rbuf, count, dtype, op, root, comm, module);
129 }
130 #endif
131 }
132
133
134 /**
135 * In-order shared memory reduction.
136 *
137 * This function performs the reduction in order -- combining elements
138 * starting with (0 operation 1), then (result operation 2), then
139 * (result operation 3), etc.
140 *
141 * Root's algorithm:
142 *
143 * If our datatype is "friendly" (i.e., the representation of the
144 * buffer is the same packed as it is unpacked), then the root doesn't
145 * need a temporary buffer -- we can combine the operands directly
146 * from the shared memory segments to the root's rbuf. Otherwise, we
147 * need a receive convertor and receive each fragment into a temporary
 * buffer where we can combine that operand with the root's rbuf.
149 *
150 * In general, there are two loops:
151 *
152 * 1. loop over all fragments (which must be done in units of an
153 * integer number of datatypes -- remember that if this function is
 * called, we know that the datatype is smaller than the max size of
155 * a fragment, so this is definitely possible)
156 *
157 * 2. loop over all the processes -- 0 to (comm_size-1).
158 * For process 0:
159 * - if the root==0, copy the *entire* buffer (i.e., don't copy
160 * fragment by fragment -- might as well copy the entire thing) the
161 * first time through the algorithm, and no-op every other time
162 * - else, copy from the shmem fragment to the out buffer
 * For all other processes:
164 * - if root==i, combine the relevant fragment from the sbuf to the
165 * relevant fragment on the rbuf
166 * - else, if the datatype is friendly, combine relevant fragment from
167 * the shmem segment to the relevant fragment in the rbuf. Otherwise,
168 * use the convertor to copy the fragment out of shmem into a temp
169 * buffer and do the combination from there to the rbuf.
170 *
171 * If we don't have a friendly datatype, then free the temporary
172 * buffer at the end.
173 */
174
175
176 static int reduce_inorder(const void *sbuf, void* rbuf, int count,
177 struct ompi_datatype_t *dtype,
178 struct ompi_op_t *op,
179 int root, struct ompi_communicator_t *comm,
180 mca_coll_base_module_t *module)
181 {
182 struct iovec iov;
183 mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
184 mca_coll_sm_comm_t *data = sm_module->sm_comm_data;
185 int ret, rank, size;
186 int flag_num, segment_num, max_segment_num;
187 size_t total_size, max_data, bytes;
188 mca_coll_sm_in_use_flag_t *flag;
189 mca_coll_sm_data_index_t *index;
190 size_t ddt_size, segsize;
191 size_t segment_ddt_count, segment_ddt_bytes, zero = 0;
192 ptrdiff_t extent, gap;
193
194 /* Setup some identities */
195
196 rank = ompi_comm_rank(comm);
197 size = ompi_comm_size(comm);
198
199 /* Figure out how much we should have the convertor copy. We need
200 to have it be in units of a datatype -- i.e., we only want to
201 copy a whole datatype worth of data or none at all (we've
202 already guaranteed above that the datatype is not larger than a
203 segment, so we'll at least get 1). */
204
205 /* ddt_size is the packed size (e.g., MPI_SHORT_INT is 6) */
206 ompi_datatype_type_size(dtype, &ddt_size);
207 /* extent is from lb to ub (e.g., MPI_SHORT_INT is 8) */
208 ompi_datatype_type_extent(dtype, &extent);
209 segment_ddt_count = mca_coll_sm_component.sm_fragment_size / ddt_size;
210 iov.iov_len = segment_ddt_bytes = segment_ddt_count * ddt_size;
211 total_size = ddt_size * count;
212
213 bytes = 0;
214
215 /* Only have one top-level decision as to whether I'm the root or
216 not. Do this at the slight expense of repeating a little logic
217 -- but it's better than a conditional branch in every loop
218 iteration. */
219
220 /*********************************************************************
221 * Root
222 *********************************************************************/
223
224 if (root == rank) {
225 opal_convertor_t rtb_convertor, rbuf_convertor;
226 char *reduce_temp_buffer, *free_buffer, *reduce_target;
227 char *inplace_temp;
228 int peer;
229 size_t count_left = (size_t)count;
230 int frag_num = 0;
231 bool first_operation = true;
232
233 /* If the datatype is the same packed as it is unpacked, we
234 can save a memory copy and just do the reduction operation
235 directly from the shared memory segment. However, if the
236 representation is not the same, then we need to get a
237 receive convertor and a temporary buffer to receive
238 into. */
239
240 if (ompi_datatype_is_contiguous_memory_layout(dtype, count)) {
241 reduce_temp_buffer = free_buffer = NULL;
242 } else {
243 /* When we have a non-contiguous datatype, we need one or
244 * two convertors:
245 *
246 * rtb_convertor: unpacking from the shmem to the
247 * reduce_temp_buffer (where we can then apply the
248 * reduction).
249 *
250 * rbuf_convertor: unpacking from the shmem directly to the
251 * rbuf (no need to go to the reduce_temp_buffer first and
252 * then apply the reduction -- just copy straight to the
253 * target buffer).
254 */
255 OBJ_CONSTRUCT(&rtb_convertor, opal_convertor_t);
256 OBJ_CONSTRUCT(&rbuf_convertor, opal_convertor_t);
257
258 /* See lengthy comment in coll basic reduce about
259 explanation for how to malloc the extra buffer. Note
260 that we do not need a buffer big enough to hold "count"
261 instances of the datatype (i.e., big enough to hold the
262 entire user buffer) -- we only need to be able to hold
263 "segment_ddt_count" instances (i.e., the number of
264 instances that can be held in a single fragment) */
265
266 segsize = opal_datatype_span(&dtype->super, segment_ddt_count, &gap);
267
268 free_buffer = (char*)malloc(segsize);
269 if (NULL == free_buffer) {
270 return OMPI_ERR_OUT_OF_RESOURCE;
271 }
272 reduce_temp_buffer = free_buffer - gap;
273
274 /* Trickery here: we use a potentially smaller count than
275 the user count -- use the largest count that is <=
276 user's count that will fit within a single segment. */
277
278 if (OMPI_SUCCESS !=
279 (ret = opal_convertor_copy_and_prepare_for_recv(
280 ompi_mpi_local_convertor,
281 &(dtype->super),
282 segment_ddt_count,
283 reduce_temp_buffer,
284 0,
285 &rtb_convertor))) {
286 free(free_buffer);
287 return ret;
288 }
289
290 /* See if we need the rbuf_convertor */
291 if (size - 1 != rank) {
292 if (OMPI_SUCCESS !=
293 (ret = opal_convertor_copy_and_prepare_for_recv(
294 ompi_mpi_local_convertor,
295 &(dtype->super),
296 count,
297 rbuf,
298 0,
299 &rbuf_convertor))) {
300 free(free_buffer);
301 return ret;
302 }
303 }
304 }
305
306 /* If we're a) doing MPI_IN_PLACE (which means we're the root
307 -- wouldn't have gotten down here with MPI_IN_PLACE if we
308 weren't the root), and b) we're not rank (size-1), then we
309 need to copy the rbuf into a temporary buffer and use that
310 as the sbuf */
311
312 if (MPI_IN_PLACE == sbuf && (size - 1) != rank) {
313 segsize = opal_datatype_span(&dtype->super, count, &gap);
314 inplace_temp = (char*)malloc(segsize);
315 if (NULL == inplace_temp) {
316 if (NULL != free_buffer) {
317 free(free_buffer);
318 }
319 return OMPI_ERR_OUT_OF_RESOURCE;
320 }
321 sbuf = inplace_temp - gap;
322 ompi_datatype_copy_content_same_ddt(dtype, count, (char *)sbuf, (char *)rbuf);
323 } else {
324 inplace_temp = NULL;
325 }
326
327 /* Main loop over receiving / reducing fragments */
328
329 do {
330 flag_num = (data->mcb_operation_count %
331 mca_coll_sm_component.sm_comm_num_in_use_flags);
332 FLAG_SETUP(flag_num, flag, data);
333 FLAG_WAIT_FOR_IDLE(flag, reduce_root_flag_label);
334 FLAG_RETAIN(flag, size, data->mcb_operation_count);
335 ++data->mcb_operation_count;
336
337 /* Loop over all the segments in this set */
338
339 segment_num =
340 flag_num * mca_coll_sm_component.sm_segs_per_inuse_flag;
341 max_segment_num =
342 (flag_num + 1) * mca_coll_sm_component.sm_segs_per_inuse_flag;
343 reduce_target = (((char*) rbuf) + (frag_num * extent * segment_ddt_count));
344 do {
345
346 /* Note that all the other coll modules reduce from
347 process (size-1) to 0, so that's the order we'll do
348 it here. */
349 /* Process (size-1) is the root (special case) */
350 if (size - 1 == rank) {
351 /* If we're the root *and* the first process to be
352 combined *and* this is the first segment in the
353 entire algorithm, then just copy the whole sbuf
354 to rbuf. That way, we never need to copy from
355 my sbuf again (i.e., do the copy all at once
356 since all the data is local, and then don't
357 worry about it for the rest of the
358 algorithm) */
359 if (first_operation) {
360 first_operation = false;
361 if (MPI_IN_PLACE != sbuf) {
362 ompi_datatype_copy_content_same_ddt(dtype, count,
363 reduce_target, (char*)sbuf);
364 }
365 }
366 }
367
368 /* Process (size-1) is not the root */
369 else {
370 /* Wait for the data to be copied into shmem, just
371 like any other non-root process */
372 index = &(data->mcb_data_index[segment_num]);
373 PARENT_WAIT_FOR_NOTIFY_SPECIFIC(size - 1, rank, index, max_data, reduce_root_parent_label1);
374
375 /* If the datatype is contiguous, just copy it
376 straight to the reduce_target */
377 if (NULL == free_buffer) {
378 memcpy(reduce_target, ((char*)index->mcbmi_data) +
379 (size - 1) * mca_coll_sm_component.sm_fragment_size, max_data);
380 }
381 /* If the datatype is noncontiguous, use the
382 rbuf_convertor to unpack it straight to the
383 rbuf */
384 else {
385 max_data = segment_ddt_bytes;
386 COPY_FRAGMENT_OUT(rbuf_convertor, size - 1, index,
387 iov, max_data);
388 }
389 }
390
391 /* Loop over all the remaining processes, receiving
392 and reducing them in order */
393
394 for (peer = size - 2; peer >= 0; --peer) {
395
396 /* Handle the case where the source is this
397 process (which, by definition, excludes the
398 sbuf_copied_to_rbuf case because that can
399 *only* happen when root==0). In this case, we
400 don't need to wait for the peer (i.e., me) to
401 copy into shmem -- just reduce directly from my
402 sbuf. */
403 if (rank == peer) {
404 ompi_op_reduce(op,
405 ((char *) sbuf) +
406 frag_num * extent * segment_ddt_count,
407 reduce_target,
408 min(count_left, segment_ddt_count),
409 dtype);
410 }
411
412 /* Now handle the case where the source is not
413 this process. Wait for the process to copy to
414 the segment into shmem. */
415 else {
416 index = &(data->mcb_data_index[segment_num]);
417 PARENT_WAIT_FOR_NOTIFY_SPECIFIC(peer, rank,
418 index, max_data, reduce_root_parent_label2);
419
420 /* If we don't need an extra buffer, then do the
421 reduction operation on the fragment straight
422 from the shmem. */
423
424 if (NULL == free_buffer) {
425 ompi_op_reduce(op,
426 (index->mcbmi_data +
427 (peer * mca_coll_sm_component.sm_fragment_size)),
428 reduce_target,
429 min(count_left, segment_ddt_count),
430 dtype);
431 }
432
433 /* Otherwise, unpack the fragment to the temporary
434 buffer and then do the reduction from there */
435
436 else {
437 /* Unpack the fragment into my temporary
438 buffer */
439 max_data = segment_ddt_bytes;
440 COPY_FRAGMENT_OUT(rtb_convertor, peer, index,
441 iov, max_data);
442 opal_convertor_set_position(&rtb_convertor, &zero);
443
444 /* Do the reduction on this fragment */
445 ompi_op_reduce(op, reduce_temp_buffer,
446 reduce_target,
447 min(count_left, segment_ddt_count),
448 dtype);
449 }
450 } /* whether this process was me or not */
451 } /* loop over all proceses */
452
453 /* We've iterated through all the processes -- now we
454 move on to the next segment */
455
456 count_left -= segment_ddt_count;
457 bytes += segment_ddt_bytes;
458 ++segment_num;
459 ++frag_num;
460 reduce_target += extent * segment_ddt_count;
461 } while (bytes < total_size && segment_num < max_segment_num);
462
463 /* Root is now done with this set of segments */
464 FLAG_RELEASE(flag);
465 } while (bytes < total_size);
466
467 /* Kill the convertor, if we had one */
468
469 if (NULL != free_buffer) {
470 OBJ_DESTRUCT(&rtb_convertor);
471 OBJ_DESTRUCT(&rbuf_convertor);
472 free(free_buffer);
473 }
474 if (NULL != inplace_temp) {
475 free(inplace_temp);
476 }
477 }
478
479 /*********************************************************************
480 * Non-root
481 *********************************************************************/
482
483 else {
484 /* Here we get a convertor for the full count that the user
485 provided (as opposed to the convertor that the root got) */
486
487 opal_convertor_t sbuf_convertor;
488 OBJ_CONSTRUCT(&sbuf_convertor, opal_convertor_t);
489 if (OMPI_SUCCESS !=
490 (ret =
491 opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor,
492 &(dtype->super),
493 count,
494 sbuf,
495 0,
496 &sbuf_convertor))) {
497 return ret;
498 }
499
500 /* Loop over sending fragments to the root */
501
502 do {
503 flag_num = (data->mcb_operation_count %
504 mca_coll_sm_component.sm_comm_num_in_use_flags);
505
506 /* Wait for the root to mark this set of segments as
507 ours */
508 FLAG_SETUP(flag_num, flag, data);
509 FLAG_WAIT_FOR_OP(flag, data->mcb_operation_count, reduce_nonroot_flag_label);
510 ++data->mcb_operation_count;
511
512 /* Loop over all the segments in this set */
513
514 segment_num =
515 flag_num * mca_coll_sm_component.sm_segs_per_inuse_flag;
516 max_segment_num =
517 (flag_num + 1) * mca_coll_sm_component.sm_segs_per_inuse_flag;
518 do {
519 index = &(data->mcb_data_index[segment_num]);
520
521 /* Copy from the user's buffer to my shared mem
522 segment */
523 max_data = segment_ddt_bytes;
524 COPY_FRAGMENT_IN(sbuf_convertor, index, rank, iov, max_data);
525 bytes += max_data;
526
527 /* Wait for the write to absolutely complete */
528 opal_atomic_wmb();
529
530 /* Tell my parent (always the reduction root -- we're
531 ignoring the mcb_tree parent/child relationships
532 here) that this fragment is ready */
533 CHILD_NOTIFY_PARENT(rank, root, index, max_data);
534
535 ++segment_num;
536 } while (bytes < total_size && segment_num < max_segment_num);
537
538 /* We're finished with this set of segments */
539 FLAG_RELEASE(flag);
540 } while (bytes < total_size);
541
542 /* Kill the convertor */
543
544 OBJ_DESTRUCT(&sbuf_convertor);
545 }
546
547 /* All done */
548
549 return OMPI_SUCCESS;
550 }
551
552
553 #if WANT_REDUCE_NO_ORDER
554 /**
555 * Unordered shared memory reduction.
556 *
557 * This function performs the reduction in whatever order the operands
558 * arrive.
559 */
560 static int reduce_no_order(const void *sbuf, void* rbuf, int count,
561 struct ompi_datatype_t *dtype,
562 struct ompi_op_t *op,
563 int root, struct ompi_communicator_t *comm,
564 mca_coll_base_module_t *module)
565 {
566 return OMPI_ERR_NOT_IMPLEMENTED;
567 }
568 #endif