This source file includes the following definitions.
- orte_routed_base_xcast_routing
- orte_routed_base_process_callback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "orte_config.h"
24 #include "orte/constants.h"
25 #include "orte/types.h"
26
27 #include "opal/dss/dss.h"
28 #include "opal/util/argv.h"
29
30 #include "orte/mca/errmgr/errmgr.h"
31 #include "orte/mca/ess/ess.h"
32 #include "orte/mca/odls/odls_types.h"
33 #include "orte/mca/rml/rml.h"
34 #include "orte/mca/state/state.h"
35 #include "orte/runtime/orte_globals.h"
36 #include "orte/runtime/orte_wait.h"
37
38 #include "orte/mca/routed/base/base.h"
39
40 void orte_routed_base_xcast_routing(opal_list_t *coll, opal_list_t *my_children)
41 {
42 orte_routed_tree_t *child;
43 orte_namelist_t *nm;
44 int i;
45 orte_proc_t *proc;
46 orte_job_t *daemons;
47
48
49
50
51 if (ORTE_PROC_IS_HNP) {
52 if (orte_abnormal_term_ordered || !orte_routing_is_enabled) {
53 daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
54 for (i=1; i < daemons->procs->size; i++) {
55 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, i))) {
56 continue;
57 }
58
59 if (ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_ALIVE)) {
60 nm = OBJ_NEW(orte_namelist_t);
61 nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
62 nm->name.vpid = proc->name.vpid;
63 opal_list_append(coll, &nm->super);
64 }
65 }
66
67 if (0 == opal_list_get_size(coll)) {
68 ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_DAEMONS_TERMINATED);
69 }
70 } else {
71
72 OPAL_LIST_FOREACH(child, my_children, orte_routed_tree_t) {
73 nm = OBJ_NEW(orte_namelist_t);
74 nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
75 nm->name.vpid = child->vpid;
76 opal_list_append(coll, &nm->super);
77 }
78 }
79 } else {
80
81 OPAL_LIST_FOREACH(child, my_children, orte_routed_tree_t) {
82 nm = OBJ_NEW(orte_namelist_t);
83 nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
84 nm->name.vpid = child->vpid;
85 opal_list_append(coll, &nm->super);
86 }
87 }
88 }
89
90 int orte_routed_base_process_callback(orte_jobid_t job, opal_buffer_t *buffer)
91 {
92 orte_proc_t *proc;
93 orte_job_t *jdata;
94 orte_std_cntr_t cnt;
95 char *rml_uri;
96 orte_vpid_t vpid;
97 int rc;
98
99
100 if (NULL == (jdata = orte_get_job_data_object(job))) {
101
102 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
103 return ORTE_ERR_NOT_FOUND;
104 }
105
106
107 cnt = 1;
108 while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &vpid, &cnt, ORTE_VPID))) {
109
110 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) {
111 ORTE_ERROR_LOG(rc);
112 continue;
113 }
114
115 OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
116 "%s routed_base:callback got uri %s for job %s rank %s",
117 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
118 (NULL == rml_uri) ? "NULL" : rml_uri,
119 ORTE_JOBID_PRINT(job), ORTE_VPID_PRINT(vpid)));
120
121 if (NULL == rml_uri) {
122
123 ORTE_ERROR_LOG(ORTE_ERR_FATAL);
124 return ORTE_ERR_FATAL;
125 }
126
127 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) {
128 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
129 continue;
130 }
131
132
133 proc->rml_uri = strdup(rml_uri);
134 free(rml_uri);
135
136 cnt = 1;
137 }
138 if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
139 ORTE_ERROR_LOG(rc);
140 return rc;
141 }
142
143 return ORTE_SUCCESS;
144 }