This source file includes the following definitions.
- gccon
- gcdes
- orte_grpcomm_API_xcast
- allgather_stub
- orte_grpcomm_API_allgather
- orte_grpcomm_base_get_tracker
- create_dmns
- pack_xcast
- orte_grpcomm_base_mark_distance_recv
- orte_grpcomm_base_check_distance_recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 #include "orte_config.h"
32
33
34 #include "opal/dss/dss.h"
35
36 #include "opal/mca/compress/compress.h"
37 #include "orte/util/proc_info.h"
38 #include "orte/util/error_strings.h"
39 #include "orte/mca/errmgr/errmgr.h"
40 #include "orte/mca/odls/base/base.h"
41 #include "orte/mca/rmaps/rmaps_types.h"
42 #include "orte/mca/rml/rml.h"
43 #include "orte/mca/routed/routed.h"
44 #include "orte/mca/state/state.h"
45 #include "orte/util/name_fns.h"
46 #include "orte/util/threads.h"
47 #include "orte/runtime/orte_globals.h"
48
49 #include "orte/mca/grpcomm/grpcomm.h"
50 #include "orte/mca/grpcomm/base/base.h"
51
52 static int pack_xcast(orte_grpcomm_signature_t *sig,
53 opal_buffer_t *buffer,
54 opal_buffer_t *message,
55 orte_rml_tag_t tag);
56
57 static int create_dmns(orte_grpcomm_signature_t *sig,
58 orte_vpid_t **dmns, size_t *ndmns);
59
60 typedef struct {
61 opal_object_t super;
62 opal_event_t ev;
63 orte_grpcomm_signature_t *sig;
64 opal_buffer_t *buf;
65 orte_grpcomm_cbfunc_t cbfunc;
66 void *cbdata;
67 } orte_grpcomm_caddy_t;
68 static void gccon(orte_grpcomm_caddy_t *p)
69 {
70 p->sig = NULL;
71 p->buf = NULL;
72 p->cbfunc = NULL;
73 p->cbdata = NULL;
74 }
75 static void gcdes(orte_grpcomm_caddy_t *p)
76 {
77 if (NULL != p->buf) {
78 OBJ_RELEASE(p->buf);
79 }
80 }
81 static OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t,
82 opal_object_t,
83 gccon, gcdes);
84
85 int orte_grpcomm_API_xcast(orte_grpcomm_signature_t *sig,
86 orte_rml_tag_t tag,
87 opal_buffer_t *msg)
88 {
89 int rc = ORTE_ERROR;
90 opal_buffer_t *buf;
91 orte_grpcomm_base_active_t *active;
92 orte_vpid_t *dmns;
93 size_t ndmns;
94
95 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
96 "%s grpcomm:base:xcast sending %u bytes to tag %ld",
97 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
98 (NULL == msg) ? 0 : (unsigned int)msg->bytes_used, (long)tag));
99
100
101
102
103
104 buf = OBJ_NEW(opal_buffer_t);
105
106
107 if (ORTE_SUCCESS != (rc = create_dmns(sig, &dmns, &ndmns))) {
108 ORTE_ERROR_LOG(rc);
109 OBJ_RELEASE(buf);
110 return rc;
111 }
112
113
114 if (ORTE_SUCCESS != (rc = pack_xcast(sig, buf, msg, tag))) {
115 ORTE_ERROR_LOG(rc);
116 OBJ_RELEASE(buf);
117 if (NULL != dmns) {
118 free(dmns);
119 }
120 return rc;
121 }
122
123
124 OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
125 if (NULL != active->module->xcast) {
126 if (ORTE_SUCCESS == (rc = active->module->xcast(dmns, ndmns, buf))) {
127 break;
128 }
129 }
130 }
131 OBJ_RELEASE(buf);
132 if (NULL != dmns) {
133 free(dmns);
134 }
135 return rc;
136 }
137
138 static void allgather_stub(int fd, short args, void *cbdata)
139 {
140 orte_grpcomm_caddy_t *cd = (orte_grpcomm_caddy_t*)cbdata;
141 int ret = OPAL_SUCCESS;
142 int rc;
143 orte_grpcomm_base_active_t *active;
144 orte_grpcomm_coll_t *coll;
145 uint32_t *seq_number;
146
147 ORTE_ACQUIRE_OBJECT(cd);
148
149 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
150 "%s grpcomm:base:allgather stub",
151 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
152
153
154
155
156 ret = opal_hash_table_get_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void **)&seq_number);
157 if (OPAL_ERR_NOT_FOUND == ret) {
158 seq_number = (uint32_t *)malloc(sizeof(uint32_t));
159 *seq_number = 0;
160 } else if (OPAL_SUCCESS == ret) {
161 *seq_number = *seq_number + 1;
162 } else {
163 OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
164 "%s rpcomm:base:allgather cannot get signature from hash table",
165 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
166 ORTE_ERROR_LOG(ret);
167 OBJ_RELEASE(cd);
168 return;
169 }
170 ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)seq_number);
171 if (OPAL_SUCCESS != ret) {
172 OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
173 "%s rpcomm:base:allgather cannot add new signature to hash table",
174 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
175 ORTE_ERROR_LOG(ret);
176 OBJ_RELEASE(cd);
177 return;
178 }
179 coll = orte_grpcomm_base_get_tracker(cd->sig, true);
180 if (NULL == coll) {
181 OBJ_RELEASE(cd->sig);
182 OBJ_RELEASE(cd);
183 return;
184 }
185 OBJ_RELEASE(cd->sig);
186 coll->cbfunc = cd->cbfunc;
187 coll->cbdata = cd->cbdata;
188
189
190 OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
191 if (NULL != active->module->allgather) {
192 if (ORTE_SUCCESS == (rc = active->module->allgather(coll, cd->buf))) {
193 break;
194 }
195 }
196 }
197 OBJ_RELEASE(cd);
198 }
199
200 int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
201 opal_buffer_t *buf,
202 orte_grpcomm_cbfunc_t cbfunc,
203 void *cbdata)
204 {
205 orte_grpcomm_caddy_t *cd;
206
207 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
208 "%s grpcomm:base:allgather",
209 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
210
211
212
213 cd = OBJ_NEW(orte_grpcomm_caddy_t);
214
215 OBJ_RETAIN(buf);
216 opal_dss.copy((void **)&cd->sig, (void *)sig, ORTE_SIGNATURE);
217 cd->buf = buf;
218 cd->cbfunc = cbfunc;
219 cd->cbdata = cbdata;
220 opal_event_set(orte_event_base, &cd->ev, -1, OPAL_EV_WRITE, allgather_stub, cd);
221 opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
222 ORTE_POST_OBJECT(cd);
223 opal_event_active(&cd->ev, OPAL_EV_WRITE, 1);
224 return ORTE_SUCCESS;
225 }
226
227 orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create)
228 {
229 orte_grpcomm_coll_t *coll;
230 int rc;
231 orte_namelist_t *nm;
232 opal_list_t children;
233 size_t n;
234
235
236 OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
237 if (NULL == sig->signature) {
238 if (NULL == coll->sig->signature) {
239
240
241 return coll;
242 }
243
244 break;
245 }
246 if (OPAL_EQUAL == (rc = opal_dss.compare(sig, coll->sig, ORTE_SIGNATURE))) {
247 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
248 "%s grpcomm:base:returning existing collective",
249 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
250 return coll;
251 }
252 }
253
254
255 if (!create) {
256 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
257 "%s grpcomm:base: not creating new coll",
258 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
259
260 return NULL;
261 }
262 coll = OBJ_NEW(orte_grpcomm_coll_t);
263 opal_dss.copy((void **)&coll->sig, (void *)sig, ORTE_SIGNATURE);
264
265 if (1 < opal_output_get_verbosity(orte_grpcomm_base_framework.framework_output)) {
266 char *tmp=NULL;
267 (void)opal_dss.print(&tmp, NULL, coll->sig, ORTE_SIGNATURE);
268 opal_output(0, "%s grpcomm:base: creating new coll for%s",
269 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tmp);
270 free(tmp);
271 }
272
273 opal_list_append(&orte_grpcomm_base.ongoing, &coll->super);
274
275
276 if (ORTE_SUCCESS != (rc = create_dmns(sig, &coll->dmns, &coll->ndmns))) {
277 ORTE_ERROR_LOG(rc);
278 return NULL;
279 }
280
281
282
283
284 OBJ_CONSTRUCT(&children, opal_list_t);
285 orte_routed.get_routing_list(&children);
286 while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
287 for (n=0; n < coll->ndmns; n++) {
288 if (nm->name.vpid == coll->dmns[n]) {
289 coll->nexpected++;
290 break;
291 }
292 }
293 OBJ_RELEASE(nm);
294 }
295 OPAL_LIST_DESTRUCT(&children);
296
297
298
299
300 for (n=0; n < coll->ndmns; n++) {
301 if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
302 coll->nexpected++;
303 break;
304 }
305 }
306
307 return coll;
308 }
309
310 static int create_dmns(orte_grpcomm_signature_t *sig,
311 orte_vpid_t **dmns, size_t *ndmns)
312 {
313 size_t n;
314 orte_job_t *jdata;
315 orte_proc_t *proc;
316 orte_node_t *node;
317 int i;
318 opal_list_t ds;
319 orte_namelist_t *nm;
320 orte_vpid_t vpid;
321 bool found;
322 size_t nds;
323 orte_vpid_t *dns;
324
325 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
326 "%s grpcomm:base:create_dmns called with %s signature",
327 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
328 (NULL == sig->signature) ? "NULL" : "NON-NULL"));
329
330
331
332 if (NULL == sig->signature || ORTE_PROC_MY_NAME->jobid == sig->signature[0].jobid) {
333 *ndmns = orte_process_info.num_procs;
334 *dmns = NULL;
335 return ORTE_SUCCESS;
336 }
337
338 if (ORTE_VPID_WILDCARD == sig->signature[0].vpid) {
339 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
340 "%s grpcomm:base:create_dmns called for all procs in job %s",
341 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
342 ORTE_JOBID_PRINT(sig->signature[0].jobid)));
343
344 if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
345 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
346 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
347 *ndmns = 0;
348 *dmns = NULL;
349 return ORTE_ERR_NOT_FOUND;
350 }
351 if (NULL == jdata->map || 0 == jdata->map->num_nodes) {
352
353
354
355 if (ORTE_PROC_IS_HNP) {
356 dns = (orte_vpid_t*)malloc(sizeof(vpid));
357 dns[0] = ORTE_PROC_MY_NAME->vpid;
358 *ndmns = 1;
359 *dmns = dns;
360 return ORTE_SUCCESS;
361 }
362 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
363 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
364 *ndmns = 0;
365 *dmns = NULL;
366 return ORTE_ERR_NOT_FOUND;
367 }
368 dns = (orte_vpid_t*)malloc(jdata->map->num_nodes * sizeof(vpid));
369 nds = 0;
370 for (i=0; i < jdata->map->nodes->size && (int)nds < jdata->map->num_nodes; i++) {
371 if (NULL == (node = opal_pointer_array_get_item(jdata->map->nodes, i))) {
372 continue;
373 }
374 if (NULL == node->daemon) {
375
376 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
377 free(dns);
378 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
379 *ndmns = 0;
380 *dmns = NULL;
381 return ORTE_ERR_NOT_FOUND;
382 }
383 OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
384 "%s grpcomm:base:create_dmns adding daemon %s to array",
385 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
386 ORTE_NAME_PRINT(&node->daemon->name)));
387 dns[nds++] = node->daemon->name.vpid;
388 }
389 } else {
390
391
392
393 OBJ_CONSTRUCT(&ds, opal_list_t);
394 for (n=0; n < sig->sz; n++) {
395 if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
396 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
397 OPAL_LIST_DESTRUCT(&ds);
398 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
399 *ndmns = 0;
400 *dmns = NULL;
401 return ORTE_ERR_NOT_FOUND;
402 }
403 OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
404 "%s sign: GETTING PROC OBJECT FOR %s",
405 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
406 ORTE_NAME_PRINT(&sig->signature[n])));
407 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
408 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
409 OPAL_LIST_DESTRUCT(&ds);
410 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
411 *ndmns = 0;
412 *dmns = NULL;
413 return ORTE_ERR_NOT_FOUND;
414 }
415 if (NULL == proc->node || NULL == proc->node->daemon) {
416 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
417 OPAL_LIST_DESTRUCT(&ds);
418 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
419 *ndmns = 0;
420 *dmns = NULL;
421 return ORTE_ERR_NOT_FOUND;
422 }
423 vpid = proc->node->daemon->name.vpid;
424 found = false;
425 OPAL_LIST_FOREACH(nm, &ds, orte_namelist_t) {
426 if (nm->name.vpid == vpid) {
427 found = true;
428 break;
429 }
430 }
431 if (!found) {
432 nm = OBJ_NEW(orte_namelist_t);
433 nm->name.vpid = vpid;
434 opal_list_append(&ds, &nm->super);
435 }
436 }
437 if (0 == opal_list_get_size(&ds)) {
438 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
439 OPAL_LIST_DESTRUCT(&ds);
440 ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
441 *ndmns = 0;
442 *dmns = NULL;
443 return ORTE_ERR_NOT_FOUND;
444 }
445 dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
446 nds = 0;
447 while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&ds))) {
448 OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
449 "%s grpcomm:base:create_dmns adding daemon %s to array",
450 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
451 ORTE_NAME_PRINT(&nm->name)));
452 dns[nds++] = nm->name.vpid;
453 OBJ_RELEASE(nm);
454 }
455 OPAL_LIST_DESTRUCT(&ds);
456 }
457 *dmns = dns;
458 *ndmns = nds;
459 return ORTE_SUCCESS;
460 }
461
462 static int pack_xcast(orte_grpcomm_signature_t *sig,
463 opal_buffer_t *buffer,
464 opal_buffer_t *message,
465 orte_rml_tag_t tag)
466 {
467 int rc;
468 opal_buffer_t data;
469 int8_t flag;
470 uint8_t *cmpdata;
471 size_t cmplen;
472
473
474 OBJ_CONSTRUCT(&data, opal_buffer_t);
475
476
477 if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &sig, 1, ORTE_SIGNATURE))) {
478 ORTE_ERROR_LOG(rc);
479 OBJ_DESTRUCT(&data);
480 return rc;
481 }
482
483 if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &tag, 1, ORTE_RML_TAG))) {
484 ORTE_ERROR_LOG(rc);
485 OBJ_DESTRUCT(&data);
486 return rc;
487 }
488
489
490
491
492
493 if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&data, message))) {
494 ORTE_ERROR_LOG(rc);
495 OBJ_DESTRUCT(&data);
496 return rc;
497 }
498
499
500 if (opal_compress.compress_block((uint8_t*)data.base_ptr, data.bytes_used,
501 &cmpdata, &cmplen)) {
502
503 flag = 1;
504 if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
505 ORTE_ERROR_LOG(rc);
506 free(cmpdata);
507 OBJ_DESTRUCT(&data);
508 return rc;
509 }
510
511 if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
512 ORTE_ERROR_LOG(rc);
513 free(cmpdata);
514 OBJ_DESTRUCT(&data);
515 return rc;
516 }
517
518 if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
519 ORTE_ERROR_LOG(rc);
520 free(cmpdata);
521 OBJ_DESTRUCT(&data);
522 return rc;
523 }
524
525 if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
526 ORTE_ERROR_LOG(rc);
527 free(cmpdata);
528 OBJ_DESTRUCT(&data);
529 return rc;
530 }
531 OBJ_DESTRUCT(&data);
532 free(cmpdata);
533 } else {
534
535 flag = 0;
536 if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
537 ORTE_ERROR_LOG(rc);
538 OBJ_DESTRUCT(&data);
539 free(cmpdata);
540 return rc;
541 }
542
543 opal_dss.copy_payload(buffer, &data);
544 OBJ_DESTRUCT(&data);
545 }
546
547 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
548 "MSG SIZE: %lu", buffer->bytes_used));
549 return ORTE_SUCCESS;
550 }
551
552 void orte_grpcomm_base_mark_distance_recv (orte_grpcomm_coll_t *coll,
553 uint32_t distance) {
554 opal_bitmap_set_bit (&coll->distance_mask_recv, distance);
555 }
556
557 unsigned int orte_grpcomm_base_check_distance_recv (orte_grpcomm_coll_t *coll,
558 uint32_t distance) {
559 return opal_bitmap_is_set_bit (&coll->distance_mask_recv, distance);
560 }