1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2005 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * $COPYRIGHT$
13 *
14 * Additional copyrights may follow
15 *
16 * $HEADER$
17 */
18 /** @file */
19
20 #include "ompi_config.h"
21
22 #include <string.h>
23
24 #include "opal/datatype/opal_convertor.h"
25 #include "ompi/constants.h"
26 #include "ompi/communicator/communicator.h"
27 #include "ompi/datatype/ompi_datatype.h"
28 #include "ompi/mca/coll/coll.h"
29 #include "opal/sys/atomic.h"
30 #include "coll_sm.h"
31
32 /**
33 * Shared memory broadcast.
34 *
35 * For the root, the general algorithm is to wait for a set of
36 * segments to become available. Once it is, the root claims the set
37 * by writing the current operation number and the number of processes
38 * using the set to the flag. The root then loops over the set of
39 * segments; for each segment, it copies a fragment of the user's
40 * buffer into the shared data segment and then writes the data size
41 * into its childrens' control buffers. The process is repeated until
42 * all fragments have been written.
43 *
44 * For non-roots, for each set of buffers, they wait until the current
45 * operation number appears in the in-use flag (i.e., written by the
46 * root). Then for each segment, they wait for a nonzero to appear
47 * into their control buffers. If they have children, they copy the
48 * data from their parent's shared data segment into their shared data
49 * segment, and write the data size into each of their childrens'
50 * control buffers. They then copy the data from their shared [local]
51 * data segment into the user's output buffer. The process is
52 * repeated until all fragments have been received. If they do not
53 * have children, they copy the data directly from the parent's shared
54 * data segment into the user's output buffer.
55 */
56 int mca_coll_sm_bcast_intra(void *buff, int count,
57 struct ompi_datatype_t *datatype, int root,
58 struct ompi_communicator_t *comm,
59 mca_coll_base_module_t *module)
60 {
61 struct iovec iov;
62 mca_coll_sm_module_t *sm_module = (mca_coll_sm_module_t*) module;
63 mca_coll_sm_comm_t *data;
64 int i, ret, rank, size, num_children, src_rank;
65 int flag_num, segment_num, max_segment_num;
66 int parent_rank;
67 size_t total_size, max_data, bytes;
68 mca_coll_sm_in_use_flag_t *flag;
69 opal_convertor_t convertor;
70 mca_coll_sm_tree_node_t *me, *parent, **children;
71 mca_coll_sm_data_index_t *index;
72
73 /* Lazily enable the module the first time we invoke a collective
74 on it */
75 if (!sm_module->enabled) {
76 if (OMPI_SUCCESS != (ret = ompi_coll_sm_lazy_enable(module, comm))) {
77 return ret;
78 }
79 }
80 data = sm_module->sm_comm_data;
81
82 /* Setup some identities */
83
84 rank = ompi_comm_rank(comm);
85 size = ompi_comm_size(comm);
86
87 OBJ_CONSTRUCT(&convertor, opal_convertor_t);
88 iov.iov_len = mca_coll_sm_component.sm_fragment_size;
89 bytes = 0;
90
91 me = &data->mcb_tree[(rank + size - root) % size];
92 parent = me->mcstn_parent;
93 children = me->mcstn_children;
94 num_children = me->mcstn_num_children;
95
96 /* Only have one top-level decision as to whether I'm the root or
97 not. Do this at the slight expense of repeating a little logic
98 -- but it's better than a conditional branch in every loop
99 iteration. */
100
101 /*********************************************************************
102 * Root
103 *********************************************************************/
104
105 if (root == rank) {
106
107 /* The root needs a send convertor to pack from the user's
108 buffer to shared memory */
109
110 if (OMPI_SUCCESS !=
111 (ret =
112 opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor,
113 &(datatype->super),
114 count,
115 buff,
116 0,
117 &convertor))) {
118 return ret;
119 }
120 opal_convertor_get_packed_size(&convertor, &total_size);
121
122 /* Main loop over sending fragments */
123
124 do {
125 flag_num = (data->mcb_operation_count++ %
126 mca_coll_sm_component.sm_comm_num_in_use_flags);
127
128 FLAG_SETUP(flag_num, flag, data);
129 FLAG_WAIT_FOR_IDLE(flag, bcast_root_label);
130 FLAG_RETAIN(flag, size - 1, data->mcb_operation_count - 1);
131
132 /* Loop over all the segments in this set */
133
134 segment_num =
135 flag_num * mca_coll_sm_component.sm_segs_per_inuse_flag;
136 max_segment_num =
137 (flag_num + 1) * mca_coll_sm_component.sm_segs_per_inuse_flag;
138 do {
139 index = &(data->mcb_data_index[segment_num]);
140
141 /* Copy the fragment from the user buffer to my fragment
142 in the current segment */
143 max_data = mca_coll_sm_component.sm_fragment_size;
144 COPY_FRAGMENT_IN(convertor, index, rank, iov, max_data);
145 bytes += max_data;
146
147 /* Wait for the write to absolutely complete */
148 opal_atomic_wmb();
149
150 /* Tell my children that this fragment is ready */
151 PARENT_NOTIFY_CHILDREN(children, num_children, index,
152 max_data);
153
154 ++segment_num;
155 } while (bytes < total_size && segment_num < max_segment_num);
156 } while (bytes < total_size);
157 }
158
159 /*********************************************************************
160 * Non-root
161 *********************************************************************/
162
163 else {
164
165 /* Non-root processes need a receive convertor to unpack from
166 shared mmory to the user's buffer */
167
168 if (OMPI_SUCCESS !=
169 (ret =
170 opal_convertor_copy_and_prepare_for_recv(ompi_mpi_local_convertor,
171 &(datatype->super),
172 count,
173 buff,
174 0,
175 &convertor))) {
176 return ret;
177 }
178 opal_convertor_get_packed_size(&convertor, &total_size);
179
180 /* Loop over receiving (and possibly re-sending) the
181 fragments */
182
183 do {
184 flag_num = (data->mcb_operation_count %
185 mca_coll_sm_component.sm_comm_num_in_use_flags);
186
187 /* Wait for the root to mark this set of segments as
188 ours */
189 FLAG_SETUP(flag_num, flag, data);
190 FLAG_WAIT_FOR_OP(flag, data->mcb_operation_count, bcast_nonroot_label1);
191 ++data->mcb_operation_count;
192
193 /* Loop over all the segments in this set */
194
195 segment_num =
196 flag_num * mca_coll_sm_component.sm_segs_per_inuse_flag;
197 max_segment_num =
198 (flag_num + 1) * mca_coll_sm_component.sm_segs_per_inuse_flag;
199 do {
200
201 /* Pre-calculate some values */
202 parent_rank = (parent->mcstn_id + root) % size;
203 index = &(data->mcb_data_index[segment_num]);
204
205 /* Wait for my parent to tell me that the segment is ready */
206 CHILD_WAIT_FOR_NOTIFY(rank, index, max_data, bcast_nonroot_label2);
207
208 /* If I have children, send the data to them */
209 if (num_children > 0) {
210 /* Copy the fragment from the parent's portion in
211 the segment to my portion in the segment. */
212 COPY_FRAGMENT_BETWEEN(parent_rank, rank, index, max_data);
213
214 /* Wait for the write to absolutely complete */
215 opal_atomic_wmb();
216
217 /* Tell my children that this fragment is ready */
218 PARENT_NOTIFY_CHILDREN(children, num_children, index,
219 max_data);
220
221 /* Set the "copy from buffer" to be my local
222 segment buffer so that we don't potentially
223 incur a non-local memory copy from the parent's
224 fan out data segment [again] when copying to
225 the user's buffer */
226 src_rank = rank;
227 }
228
229 /* If I don't have any children, set the "copy from
230 buffer" to be my parent's fan out segment to copy
231 directly from my parent */
232
233 else {
234 src_rank = parent_rank;
235 }
236
237 /* Copy to my output buffer */
238 COPY_FRAGMENT_OUT(convertor, src_rank, index, iov, max_data);
239
240 bytes += max_data;
241 ++segment_num;
242 } while (bytes < total_size && segment_num < max_segment_num);
243
244 /* Wait for all copy-out writes to complete before I say
245 I'm done with the segments */
246 opal_atomic_wmb();
247
248 /* We're finished with this set of segments */
249 FLAG_RELEASE(flag);
250 } while (bytes < total_size);
251 }
252
253 /* Kill the convertor */
254
255 OBJ_DESTRUCT(&convertor);
256
257 /* All done */
258
259 return OMPI_SUCCESS;
260 }