This source file includes the following definitions.
- init
- finalize
- xcast
- allgather
- allgather_recv
- xcast_recv
- barrier_release
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include "orte_config.h"
19 #include "orte/constants.h"
20 #include "orte/types.h"
21
22 #include <string.h>
23
24 #include "opal/dss/dss.h"
25 #include "opal/class/opal_list.h"
26 #include "opal/mca/pmix/pmix.h"
27 #include "opal/mca/compress/compress.h"
28
29 #include "orte/mca/errmgr/errmgr.h"
30 #include "orte/mca/rml/base/base.h"
31 #include "orte/mca/rml/base/rml_contact.h"
32 #include "orte/mca/routed/base/base.h"
33 #include "orte/mca/state/state.h"
34 #include "orte/util/name_fns.h"
35 #include "orte/util/nidmap.h"
36 #include "orte/util/proc_info.h"
37
38 #include "orte/mca/grpcomm/base/base.h"
39 #include "grpcomm_direct.h"
40
41
42
/* Forward declarations of the module entry points. They are defined later
 * in this file and referenced by the orte_grpcomm_direct_module table. */
static int init(void);
static void finalize(void);
static int xcast(orte_vpid_t *vpids,
                 size_t nprocs,
                 opal_buffer_t *buf);
static int allgather(orte_grpcomm_coll_t *coll,
                     opal_buffer_t *buf);
50
51
/* Function table exported by this component. The initializer is positional,
 * so the order of the entries must match the member order of
 * orte_grpcomm_base_module_t (declared outside this file). */
orte_grpcomm_base_module_t orte_grpcomm_direct_module = {
    init,
    finalize,
    xcast,
    allgather
};
58
59
/* Forward declarations of the RML receive callbacks that init() registers.
 * All three share the same callback signature. */
static void xcast_recv(int status, orte_process_name_t* sender,
                       opal_buffer_t* buffer, orte_rml_tag_t tag,
                       void* cbdata);
static void allgather_recv(int status, orte_process_name_t* sender,
                           opal_buffer_t* buffer, orte_rml_tag_t tag,
                           void* cbdata);
static void barrier_release(int status, orte_process_name_t* sender,
                            opal_buffer_t* buffer, orte_rml_tag_t tag,
                            void* cbdata);
69
70
/* File-local list: constructed in init() and destructed in finalize().
 * No other function in this file references it. */
static opal_list_t tracker;
72
73
74
75
76 static int init(void)
77 {
78 OBJ_CONSTRUCT(&tracker, opal_list_t);
79
80
81 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
82 ORTE_RML_TAG_XCAST,
83 ORTE_RML_PERSISTENT,
84 xcast_recv, NULL);
85 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
86 ORTE_RML_TAG_ALLGATHER_DIRECT,
87 ORTE_RML_PERSISTENT,
88 allgather_recv, NULL);
89
90 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
91 ORTE_RML_TAG_COLL_RELEASE,
92 ORTE_RML_PERSISTENT,
93 barrier_release, NULL);
94
95 return OPAL_SUCCESS;
96 }
97
98
99
100
101 static void finalize(void)
102 {
103 OPAL_LIST_DESTRUCT(&tracker);
104 return;
105 }
106
107 static int xcast(orte_vpid_t *vpids,
108 size_t nprocs,
109 opal_buffer_t *buf)
110 {
111 int rc;
112
113
114 OBJ_RETAIN(buf);
115 if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_XCAST,
116 orte_rml_send_callback, NULL))) {
117 ORTE_ERROR_LOG(rc);
118 OBJ_RELEASE(buf);
119 return rc;
120 }
121 return ORTE_SUCCESS;
122 }
123
124 static int allgather(orte_grpcomm_coll_t *coll,
125 opal_buffer_t *buf)
126 {
127 int rc;
128 opal_buffer_t *relay;
129
130 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
131 "%s grpcomm:direct: allgather",
132 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
133
134
135
136
137
138 relay = OBJ_NEW(opal_buffer_t);
139
140 if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &coll->sig, 1, ORTE_SIGNATURE))) {
141 ORTE_ERROR_LOG(rc);
142 OBJ_RELEASE(relay);
143 return rc;
144 }
145
146
147 opal_dss.copy_payload(relay, buf);
148
149
150 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
151 "%s grpcomm:direct:allgather sending to ourself",
152 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
153
154
155 rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay,
156 ORTE_RML_TAG_ALLGATHER_DIRECT,
157 orte_rml_send_callback, NULL);
158 return rc;
159 }
160
161 static void allgather_recv(int status, orte_process_name_t* sender,
162 opal_buffer_t* buffer, orte_rml_tag_t tag,
163 void* cbdata)
164 {
165 int32_t cnt;
166 int rc, ret;
167 orte_grpcomm_signature_t *sig;
168 opal_buffer_t *reply;
169 orte_grpcomm_coll_t *coll;
170
171 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
172 "%s grpcomm:direct allgather recvd from %s",
173 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
174 ORTE_NAME_PRINT(sender)));
175
176
177 cnt = 1;
178 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &cnt, ORTE_SIGNATURE))) {
179 ORTE_ERROR_LOG(rc);
180 return;
181 }
182
183
184 if (NULL == (coll = orte_grpcomm_base_get_tracker(sig, true))) {
185 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
186 OBJ_RELEASE(sig);
187 return;
188 }
189
190
191 coll->nreported++;
192
193 opal_dss.copy_payload(&coll->bucket, buffer);
194
195 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
196 "%s grpcomm:direct allgather recv nexpected %d nrep %d",
197 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
198 (int)coll->nexpected, (int)coll->nreported));
199
200
201 if (coll->nreported == coll->nexpected) {
202 if (ORTE_PROC_IS_HNP) {
203 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
204 "%s grpcomm:direct allgather HNP reports complete",
205 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
206
207 reply = OBJ_NEW(opal_buffer_t);
208
209 if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
210 ORTE_ERROR_LOG(rc);
211 OBJ_RELEASE(reply);
212 OBJ_RELEASE(sig);
213 return;
214 }
215
216
217 ret = ORTE_SUCCESS;
218 if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
219 ORTE_ERROR_LOG(rc);
220 OBJ_RELEASE(reply);
221 OBJ_RELEASE(sig);
222 return;
223 }
224
225 opal_dss.copy_payload(reply, &coll->bucket);
226
227 (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
228 OBJ_RELEASE(reply);
229 } else {
230 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
231 "%s grpcomm:direct allgather rollup complete - sending to %s",
232 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
233 ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
234
235 reply = OBJ_NEW(opal_buffer_t);
236
237 if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
238 ORTE_ERROR_LOG(rc);
239 OBJ_RELEASE(reply);
240 OBJ_RELEASE(sig);
241 return;
242 }
243
244 opal_dss.copy_payload(reply, &coll->bucket);
245
246 rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_PARENT, reply,
247 ORTE_RML_TAG_ALLGATHER_DIRECT,
248 orte_rml_send_callback, NULL);
249 }
250 }
251 OBJ_RELEASE(sig);
252 }
253
254 static void xcast_recv(int status, orte_process_name_t* sender,
255 opal_buffer_t* buffer, orte_rml_tag_t tg,
256 void* cbdata)
257 {
258 opal_list_item_t *item;
259 orte_namelist_t *nm;
260 int ret, cnt;
261 opal_buffer_t *relay=NULL, *rly;
262 orte_daemon_cmd_flag_t command = ORTE_DAEMON_NULL_CMD;
263 opal_buffer_t wireup, datbuf, *data;
264 opal_byte_object_t *bo;
265 int8_t flag;
266 orte_job_t *jdata;
267 orte_proc_t *rec;
268 opal_list_t coll;
269 orte_grpcomm_signature_t *sig;
270 orte_rml_tag_t tag;
271 size_t inlen, cmplen;
272 uint8_t *packed_data, *cmpdata;
273 int32_t nvals, i;
274 opal_value_t kv, *kval;
275 orte_process_name_t dmn;
276
277 OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
278 "%s grpcomm:direct:xcast:recv: with %d bytes",
279 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
280 (int)buffer->bytes_used));
281
282
283
284 rly = OBJ_NEW(opal_buffer_t);
285 opal_dss.copy_payload(rly, buffer);
286 OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
287
288 OBJ_CONSTRUCT(&coll, opal_list_t);
289
290
291 cnt=1;
292 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT8))) {
293 ORTE_ERROR_LOG(ret);
294 ORTE_FORCED_TERMINATE(ret);
295 OBJ_DESTRUCT(&datbuf);
296 OBJ_DESTRUCT(&coll);
297 OBJ_RELEASE(rly);
298 return;
299 }
300 if (flag) {
301
302 cnt=1;
303 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &inlen, &cnt, OPAL_SIZE))) {
304 ORTE_ERROR_LOG(ret);
305 ORTE_FORCED_TERMINATE(ret);
306 OBJ_DESTRUCT(&datbuf);
307 OBJ_DESTRUCT(&coll);
308 OBJ_RELEASE(rly);
309 return;
310 }
311
312 cnt=1;
313 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &cmplen, &cnt, OPAL_SIZE))) {
314 ORTE_ERROR_LOG(ret);
315 ORTE_FORCED_TERMINATE(ret);
316 OBJ_DESTRUCT(&datbuf);
317 OBJ_DESTRUCT(&coll);
318 OBJ_RELEASE(rly);
319 return;
320 }
321
322 packed_data = (uint8_t*)malloc(inlen);
323
324 cnt = inlen;
325 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, packed_data, &cnt, OPAL_UINT8))) {
326 ORTE_ERROR_LOG(ret);
327 free(packed_data);
328 ORTE_FORCED_TERMINATE(ret);
329 OBJ_DESTRUCT(&datbuf);
330 OBJ_DESTRUCT(&coll);
331 OBJ_RELEASE(rly);
332 return;
333 }
334
335 if (opal_compress.decompress_block(&cmpdata, cmplen,
336 packed_data, inlen)) {
337
338 opal_dss.load(&datbuf, cmpdata, cmplen);
339 data = &datbuf;
340 } else {
341 data = buffer;
342 }
343 free(packed_data);
344 } else {
345 data = buffer;
346 }
347
348
349 cnt=1;
350 if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &sig, &cnt, ORTE_SIGNATURE))) {
351 ORTE_ERROR_LOG(ret);
352 OBJ_DESTRUCT(&datbuf);
353 OBJ_DESTRUCT(&coll);
354 OBJ_RELEASE(rly);
355 ORTE_FORCED_TERMINATE(ret);
356 return;
357 }
358 OBJ_RELEASE(sig);
359
360
361 cnt=1;
362 if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &tag, &cnt, ORTE_RML_TAG))) {
363 ORTE_ERROR_LOG(ret);
364 OBJ_DESTRUCT(&datbuf);
365 OBJ_DESTRUCT(&coll);
366 OBJ_RELEASE(rly);
367 ORTE_FORCED_TERMINATE(ret);
368 return;
369 }
370
371
372
373
374 if (ORTE_RML_TAG_DAEMON == tag) {
375
376 cnt=1;
377 if (ORTE_SUCCESS == (ret = opal_dss.unpack(data, &command, &cnt, ORTE_DAEMON_CMD))) {
378
379
380 if (ORTE_DAEMON_EXIT_CMD == command ||
381 ORTE_DAEMON_HALT_VM_CMD == command) {
382 orte_orteds_term_ordered = true;
383 if (ORTE_DAEMON_HALT_VM_CMD == command) {
384
385 orte_abnormal_term_ordered = true;
386 }
387
388 relay = OBJ_NEW(opal_buffer_t);
389
390 if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
391 ORTE_ERROR_LOG(ret);
392 goto relay;
393 }
394 opal_dss.copy_payload(relay, data);
395 } else if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ||
396 ORTE_DAEMON_DVM_NIDMAP_CMD == command ||
397 ORTE_DAEMON_DVM_ADD_PROCS == command) {
398
399 relay = OBJ_NEW(opal_buffer_t);
400
401 if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
402 ORTE_ERROR_LOG(ret);
403 goto relay;
404 }
405
406 cnt = 1;
407 if (OPAL_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
408 ORTE_ERROR_LOG(ret);
409 goto relay;
410 }
411 if (1 == flag) {
412 if (ORTE_SUCCESS != (ret = orte_util_decode_nidmap(data))) {
413 ORTE_ERROR_LOG(ret);
414 goto relay;
415 }
416 if (!ORTE_PROC_IS_HNP) {
417
418
419
420 orte_routed.update_routing_plan();
421 }
422
423 orte_routed_base.routing_enabled = true;
424
425
426 cnt=1;
427 if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
428 ORTE_ERROR_LOG(ret);
429 goto relay;
430 }
431 if (0 < bo->size) {
432
433 OBJ_CONSTRUCT(&wireup, opal_buffer_t);
434 opal_dss.load(&wireup, bo->bytes, bo->size);
435
436 if (opal_pmix.legacy_get()) {
437 OBJ_CONSTRUCT(&kv, opal_value_t);
438 kv.key = OPAL_PMIX_PROC_URI;
439 kv.type = OPAL_STRING;
440 cnt=1;
441 while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
442 cnt = 1;
443 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kv.data.string, &cnt, OPAL_STRING))) {
444 ORTE_ERROR_LOG(ret);
445 break;
446 }
447 if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, &kv))) {
448 ORTE_ERROR_LOG(ret);
449 free(kv.data.string);
450 break;
451 }
452 free(kv.data.string);
453 kv.data.string = NULL;
454 }
455 if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
456 ORTE_ERROR_LOG(ret);
457 }
458 } else {
459 cnt=1;
460 while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
461 cnt = 1;
462 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &nvals, &cnt, OPAL_INT32))) {
463 ORTE_ERROR_LOG(ret);
464 break;
465 }
466 for (i=0; i < nvals; i++) {
467 cnt = 1;
468 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kval, &cnt, OPAL_VALUE))) {
469 ORTE_ERROR_LOG(ret);
470 break;
471 }
472 OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
473 "%s STORING MODEX DATA FOR PROC %s KEY %s",
474 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
475 ORTE_NAME_PRINT(&dmn), kval->key));
476 if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, kval))) {
477 ORTE_ERROR_LOG(ret);
478 OBJ_RELEASE(kval);
479 break;
480 }
481 OBJ_RELEASE(kval);
482 }
483 }
484 if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
485 ORTE_ERROR_LOG(ret);
486 }
487 }
488
489 OBJ_DESTRUCT(&wireup);
490 }
491 free(bo);
492 }
493
494
495 opal_dss.copy_payload(relay, data);
496 } else {
497 relay = OBJ_NEW(opal_buffer_t);
498
499 if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
500 ORTE_ERROR_LOG(ret);
501 goto relay;
502 }
503
504 opal_dss.copy_payload(relay, data);
505 }
506 } else {
507 ORTE_ERROR_LOG(ret);
508 goto CLEANUP;
509 }
510 } else {
511
512 relay = OBJ_NEW(opal_buffer_t);
513 opal_dss.copy_payload(relay, data);
514 }
515
516 relay:
517 if (!orte_do_not_launch) {
518
519 orte_routed.get_routing_list(&coll);
520
521
522 if (opal_list_is_empty(&coll)) {
523 OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
524 "%s grpcomm:direct:send_relay - recipient list is empty!",
525 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
526 goto CLEANUP;
527 }
528
529
530 while (NULL != (item = opal_list_remove_first(&coll))) {
531 nm = (orte_namelist_t*)item;
532
533 OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
534 "%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s",
535 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)rly->bytes_used,
536 ORTE_NAME_PRINT(&nm->name)));
537 OBJ_RETAIN(rly);
538
539
540
541 jdata = orte_get_job_data_object(nm->name.jobid);
542 if (NULL == (rec = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, nm->name.vpid))) {
543 if (!orte_abnormal_term_ordered && !orte_orteds_term_ordered) {
544 opal_output(0, "%s grpcomm:direct:send_relay proc %s not found - cannot relay",
545 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
546 }
547 OBJ_RELEASE(rly);
548 OBJ_RELEASE(item);
549 ORTE_FORCED_TERMINATE(ORTE_ERR_UNREACH);
550 continue;
551 }
552 if ((ORTE_PROC_STATE_RUNNING < rec->state &&
553 ORTE_PROC_STATE_CALLED_ABORT != rec->state) ||
554 !ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE)) {
555 if (!orte_abnormal_term_ordered && !orte_orteds_term_ordered) {
556 opal_output(0, "%s grpcomm:direct:send_relay proc %s not running - cannot relay: %s ",
557 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name),
558 ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE) ? orte_proc_state_to_str(rec->state) : "NOT ALIVE");
559 }
560 OBJ_RELEASE(rly);
561 OBJ_RELEASE(item);
562 ORTE_FORCED_TERMINATE(ORTE_ERR_UNREACH);
563 continue;
564 }
565 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(&nm->name, rly, ORTE_RML_TAG_XCAST,
566 orte_rml_send_callback, NULL))) {
567 ORTE_ERROR_LOG(ret);
568 OBJ_RELEASE(rly);
569 OBJ_RELEASE(item);
570 ORTE_FORCED_TERMINATE(ORTE_ERR_UNREACH);
571 continue;
572 }
573 OBJ_RELEASE(item);
574 }
575 }
576
577 CLEANUP:
578
579 OPAL_LIST_DESTRUCT(&coll);
580 OBJ_RELEASE(rly);
581
582
583
584
585
586 if (ORTE_DAEMON_DVM_NIDMAP_CMD != command) {
587 ORTE_RML_POST_MESSAGE(ORTE_PROC_MY_NAME, tag, 1,
588 relay->base_ptr, relay->bytes_used);
589 relay->base_ptr = NULL;
590 relay->bytes_used = 0;
591 }
592 if (NULL != relay) {
593 OBJ_RELEASE(relay);
594 }
595 OBJ_DESTRUCT(&datbuf);
596 }
597
598 static void barrier_release(int status, orte_process_name_t* sender,
599 opal_buffer_t* buffer, orte_rml_tag_t tag,
600 void* cbdata)
601 {
602 int32_t cnt;
603 int rc, ret;
604 orte_grpcomm_signature_t *sig;
605 orte_grpcomm_coll_t *coll;
606
607 OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
608 "%s grpcomm:direct: barrier release called with %d bytes",
609 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buffer->bytes_used));
610
611
612 cnt = 1;
613 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &cnt, ORTE_SIGNATURE))) {
614 ORTE_ERROR_LOG(rc);
615 return;
616 }
617
618
619 cnt = 1;
620 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
621 ORTE_ERROR_LOG(rc);
622 return;
623 }
624
625
626
627
628 if (NULL == (coll = orte_grpcomm_base_get_tracker(sig, false))) {
629 OBJ_RELEASE(sig);
630 return;
631 }
632
633
634 if (NULL != coll->cbfunc) {
635 coll->cbfunc(ret, buffer, coll->cbdata);
636 }
637 opal_list_remove_item(&orte_grpcomm_base.ongoing, &coll->super);
638 OBJ_RELEASE(coll);
639 OBJ_RELEASE(sig);
640 }