This source file includes the following definitions:
- mca_topo_base_dist_graph_distribute
- mca_topo_base_dist_graph_create
- mca_topo_base_comm_dist_graph_2_2_0_construct
- mca_topo_base_comm_dist_graph_2_2_0_destruct
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include "ompi_config.h"
17
18 #include "ompi/communicator/communicator.h"
19 #include "ompi/info/info.h"
20 #include "ompi/mca/topo/base/base.h"
21 #include "ompi/datatype/ompi_datatype.h"
22 #include "ompi/mca/pml/pml.h"
23 #include "ompi/request/request.h"
24
/* Index constants for the in/out halves of an edge-count pair.
 * NOTE(review): not referenced by the code visible in this file. */
#define IN_INDEX 0
#define OUT_INDEX 1

/* Point-to-point tags used to ship edge lists to the rank that owns them:
 * one tag for in-edges, one for out-edges.  They are negative, whereas MPI
 * user tags are non-negative, so they presumably cannot clash with user
 * traffic.  The replacement lists are parenthesized so each expansion is a
 * single operand wherever the macro is used (CERT PRE02-C). */
#define MCA_TOPO_BASE_TAG_DIST_EDGE_IN  (-50)
#define MCA_TOPO_BASE_TAG_DIST_EDGE_OUT (-51)

/* A per-rank pair of edge counters (or offsets): `in` for incoming edges,
 * `out` for outgoing edges.  The distribution code reduces arrays of these
 * as consecutive MPI_INT pairs (count 2), so the struct must stay exactly
 * two ints.  The tag avoids a leading underscore, which C reserves for
 * file-scope identifiers. */
typedef struct mca_topo_base_dist_graph_elem {
    int in;
    int out;
} mca_topo_base_dist_graph_elem_t;
34
35 int mca_topo_base_dist_graph_distribute(mca_topo_base_module_t* module,
36 ompi_communicator_t *comm,
37 int n, const int nodes[],
38 const int degrees[], const int targets[],
39 const int weights[],
40 mca_topo_base_comm_dist_graph_2_2_0_t** ptopo)
41 {
42 int i, j, err, count, left_over, pending_reqs, current_pos, index, csize;
43 int *rin = NULL, *rout, *temp = NULL;
44 mca_topo_base_dist_graph_elem_t *pos, *cnt, *idx;
45 size_t int_size, how_much;
46 ompi_status_public_t status;
47 ompi_request_t **reqs = NULL;
48 mca_topo_base_comm_dist_graph_2_2_0_t* topo=NULL;
49
50 ompi_datatype_type_size( (ompi_datatype_t*)&ompi_mpi_int, &int_size);
51
52 csize = ompi_comm_size(comm);
53
54
55
56
57
58
59
60 cnt = (mca_topo_base_dist_graph_elem_t*)calloc(3 * csize, sizeof(mca_topo_base_dist_graph_elem_t));
61 if( NULL == cnt ) {
62 err = OMPI_ERR_OUT_OF_RESOURCE;
63 goto bail_out;
64 }
65 pos = cnt + csize;
66 idx = pos + csize;
67
68 for( index = i = 0; i < n; i++ ) {
69 cnt[nodes[i]].out += degrees[i];
70 for( j = 0; j < degrees[i]; ++j ) {
71 cnt[targets[index]].in++;
72 index++;
73 }
74 }
75
76
77
78
79
80 pos[0].in = 0;
81 pos[0].out = 0;
82 for( i = 0; i < (csize - 1); i++ ) {
83 pos[i + 1].in = pos[i].in + cnt[i].in;
84 pos[i + 1].out = pos[i].out + cnt[i].out;
85 }
86
87 rin = (int*)calloc(2 * (pos[csize - 1].in + cnt[csize - 1].in +
88 pos[csize - 1].out + cnt[csize - 1].out), sizeof(int));
89 if( NULL == rin ) {
90 err = OMPI_ERR_OUT_OF_RESOURCE;
91 goto bail_out;
92 }
93 rout = &rin[2 * (pos[csize - 1].in + cnt[csize - 1].in)];
94
95 for( index = i = 0; i < n; ++i ) {
96 for( j = 0; j < degrees[i]; ++j ) {
97 int position = pos[nodes[i]].out + idx[nodes[i]].out;
98 if( MPI_UNWEIGHTED != weights ) {
99 position *= 2;
100 rout[position + 1] = weights[index];
101 }
102 rout[position + 0] = targets[index];
103 idx[nodes[i]].out++;
104
105 position = pos[targets[index]].in + idx[targets[index]].in;
106 if( MPI_UNWEIGHTED != weights ) {
107 position *= 2;
108 rin[position + 1] = weights[index];
109 }
110 rin[position + 0] = nodes[i];
111 idx[targets[index]].in++;
112
113 index++;
114 }
115 }
116
117 err = comm->c_coll->coll_reduce_scatter_block( MPI_IN_PLACE, idx, 2,
118 (ompi_datatype_t*)&ompi_mpi_int, MPI_SUM, comm,
119 comm->c_coll->coll_reduce_scatter_block_module);
120
121
122
123
124
125 topo = OBJ_NEW(mca_topo_base_comm_dist_graph_2_2_0_t);
126 if( NULL == topo ) {
127 err = OMPI_ERR_OUT_OF_RESOURCE;
128 goto bail_out;
129 }
130 topo->indegree = idx[0].in;
131 topo->outdegree = idx[0].out;
132 topo->weighted = (weights != MPI_UNWEIGHTED);
133 if (topo->indegree > 0) {
134 topo->in = (int*)malloc(sizeof(int) * topo->indegree);
135 if (NULL == topo->in) {
136 err = OMPI_ERR_OUT_OF_RESOURCE;
137 goto bail_out;
138 }
139 if (MPI_UNWEIGHTED != weights) {
140 topo->inw = (int*)malloc(sizeof(int) * topo->indegree);
141 if (NULL == topo->inw) {
142 err = OMPI_ERR_OUT_OF_RESOURCE;
143 goto bail_out;
144 }
145 }
146 }
147 if (topo->outdegree > 0) {
148 topo->out = (int*)malloc(sizeof(int) * topo->outdegree);
149 if (NULL == topo->out) {
150 err = OMPI_ERR_OUT_OF_RESOURCE;
151 goto bail_out;
152 }
153 if (MPI_UNWEIGHTED != weights) {
154 topo->outw = (int*)malloc(sizeof(int) * topo->outdegree);
155 if (NULL == topo->outw) {
156 err = OMPI_ERR_OUT_OF_RESOURCE;
157 goto bail_out;
158 }
159 }
160 }
161
162 reqs = (ompi_request_t**)malloc(sizeof(ompi_request_t*) * 2 * csize);
163 for (pending_reqs = i = 0; i < csize; ++i) {
164 int position;
165 if( 0 != (count = cnt[i].in) ) {
166 position = pos[i].in;
167 if (MPI_UNWEIGHTED != weights) {
168 count *= 2;
169 position *= 2;
170 }
171 err = MCA_PML_CALL(isend( &rin[position], count, (ompi_datatype_t*)&ompi_mpi_int,
172 i, MCA_TOPO_BASE_TAG_DIST_EDGE_IN, MCA_PML_BASE_SEND_STANDARD,
173 comm, &reqs[pending_reqs]));
174 pending_reqs++;
175 }
176 if( 0 != (count = cnt[i].out) ) {
177 position = pos[i].out;
178 if (MPI_UNWEIGHTED != weights) {
179 count *= 2;
180 position *= 2;
181 }
182 err = MCA_PML_CALL(isend(&rout[position], count, (ompi_datatype_t*)&ompi_mpi_int,
183 i, MCA_TOPO_BASE_TAG_DIST_EDGE_OUT, MCA_PML_BASE_SEND_STANDARD,
184 comm, &reqs[pending_reqs]));
185 pending_reqs++;
186 }
187 }
188
189
190
191
192
193 count = topo->indegree;
194 temp = topo->in;
195 if (MPI_UNWEIGHTED != weights) {
196 count *= 2;
197 if (count > 0) {
198
199
200 temp = (int*)malloc(count*sizeof(int));
201 if (NULL == temp) {
202 err = OMPI_ERR_OUT_OF_RESOURCE;
203 goto bail_out;
204 }
205 }
206 }
207 for( left_over = count, current_pos = i = 0; left_over > 0; i++ ) {
208
209 MCA_PML_CALL(recv( &temp[count - left_over], left_over, (ompi_datatype_t*)&ompi_mpi_int,
210 MPI_ANY_SOURCE, MCA_TOPO_BASE_TAG_DIST_EDGE_IN,
211 comm, &status ));
212 how_much = status._ucount / int_size;
213 if (MPI_UNWEIGHTED != weights) {
214 for( j = 0; j < ((int)how_much >> 1); j++, current_pos++ ) {
215 topo->in[current_pos] = temp[2 * j + 0 + (count - left_over)];
216 topo->inw[current_pos] = temp[2 * j + 1 + (count - left_over)];
217 }
218 }
219 left_over -= how_much;
220 }
221 if (MPI_UNWEIGHTED != weights) {
222 free(temp);
223 }
224
225
226
227
228
229 count = topo->outdegree;
230 temp = topo->out;
231 if (MPI_UNWEIGHTED != weights) {
232 count *= 2;
233 if (count > 0) {
234
235
236 temp = (int*)malloc(count*sizeof(int));
237 if (NULL == temp) {
238 err = OMPI_ERR_OUT_OF_RESOURCE;
239 goto bail_out;
240 }
241 }
242 }
243 for( left_over = count, current_pos = i = 0; left_over > 0; i++ ) {
244
245 MCA_PML_CALL(recv( &temp[count - left_over], left_over, (ompi_datatype_t*)&ompi_mpi_int,
246 MPI_ANY_SOURCE, MCA_TOPO_BASE_TAG_DIST_EDGE_OUT,
247 comm, &status ));
248 how_much = status._ucount / int_size;
249
250 if (MPI_UNWEIGHTED != weights) {
251 for( j = 0; j < ((int)how_much >> 1); j++, current_pos++ ) {
252 topo->out[current_pos] = temp[2 * j + 0 + (count - left_over)];
253 topo->outw[current_pos] = temp[2 * j + 1 + (count - left_over)];
254 }
255 }
256 left_over -= how_much;
257 }
258 if (MPI_UNWEIGHTED != weights) {
259 free(temp);
260 }
261
262 err = ompi_request_wait_all(pending_reqs, reqs, MPI_STATUSES_IGNORE);
263 *ptopo = topo;
264 topo = NULL;
265
266 bail_out:
267 if( NULL != reqs ) {
268 free(reqs);
269 }
270 if( NULL != rin ) {
271 free(rin);
272 }
273 if( NULL != cnt ) {
274 free(cnt);
275 }
276 if( NULL != topo ) {
277 OBJ_RELEASE(topo);
278 }
279 return err;
280 }
281
282 int mca_topo_base_dist_graph_create(mca_topo_base_module_t* module,
283 ompi_communicator_t *comm_old,
284 int n, const int nodes[],
285 const int degrees[], const int targets[],
286 const int weights[],
287 opal_info_t *info, int reorder,
288 ompi_communicator_t **newcomm)
289 {
290 int err;
291
292 if( OMPI_SUCCESS != (err = ompi_comm_create(comm_old,
293 comm_old->c_local_group,
294 newcomm)) ) {
295 OBJ_RELEASE(module);
296 return err;
297 }
298
299
300
301 if (info && info != &(MPI_INFO_NULL->super)) {
302 ompi_communicator_t *intermediate_comm = *newcomm;
303 ompi_comm_dup_with_info (intermediate_comm, info, newcomm);
304 ompi_comm_free(&intermediate_comm);
305 }
306
307 assert(NULL == (*newcomm)->c_topo);
308 (*newcomm)->c_topo = module;
309 (*newcomm)->c_topo->reorder = reorder;
310 (*newcomm)->c_flags |= OMPI_COMM_DIST_GRAPH;
311
312 err = mca_topo_base_dist_graph_distribute(module,
313 *newcomm,
314 n, nodes,
315 degrees, targets,
316 weights,
317 &((*newcomm)->c_topo->mtc.dist_graph));
318 if( OMPI_SUCCESS != err ) {
319 ompi_comm_free(newcomm);
320 }
321 return err;
322 }
323
324 static void mca_topo_base_comm_dist_graph_2_2_0_construct(mca_topo_base_comm_dist_graph_2_2_0_t * dist_graph) {
325 dist_graph->in = NULL;
326 dist_graph->inw = NULL;
327 dist_graph->out = NULL;
328 dist_graph->outw = NULL;
329 dist_graph->indegree = 0;
330 dist_graph->outdegree = 0;
331 dist_graph->weighted = false;
332 }
333
334 static void mca_topo_base_comm_dist_graph_2_2_0_destruct(mca_topo_base_comm_dist_graph_2_2_0_t * dist_graph) {
335 if (NULL != dist_graph->in) {
336 free(dist_graph->in);
337 }
338 if (NULL != dist_graph->inw) {
339 free(dist_graph->inw);
340 }
341 if (NULL != dist_graph->out) {
342 free(dist_graph->out);
343 }
344 if (NULL != dist_graph->outw) {
345 free(dist_graph->outw);
346 }
347 }
348
/* Register the dist-graph topology class with the OPAL object system:
 * parent class opal_object_t, plus this file's constructor/destructor. */
OBJ_CLASS_INSTANCE(mca_topo_base_comm_dist_graph_2_2_0_t,
                   opal_object_t,
                   mca_topo_base_comm_dist_graph_2_2_0_construct,
                   mca_topo_base_comm_dist_graph_2_2_0_destruct);