This source file includes the following definitions:
- orte_plm_base_comm_start
- orte_plm_base_comm_stop
- orte_plm_base_recv
- orte_plm_base_receive_process_msg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include "orte_config.h"

#include <assert.h>
#include <string.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include "orte/mca/mca.h"
#include "opal/dss/dss.h"
#include "opal/threads/threads.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"

#include "orte/constants.h"
#include "orte/types.h"
#include "orte/util/proc_info.h"
#include "orte/util/error_strings.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/ras/base/base.h"
#include "orte/util/name_fns.h"
#include "orte/mca/state/state.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_quit.h"

#include "orte/mca/plm/plm_types.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/plm/base/base.h"
63
/*
 * True while the persistent PLM receives posted by orte_plm_base_comm_start()
 * are active; lets repeated start/stop calls return early without re-posting
 * or re-cancelling anything.
 */
static bool recv_issued = false;
65
66 int orte_plm_base_comm_start(void)
67 {
68 if (recv_issued) {
69 return ORTE_SUCCESS;
70 }
71
72 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
73 "%s plm:base:receive start comm",
74 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
75
76 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
77 ORTE_RML_TAG_PLM,
78 ORTE_RML_PERSISTENT,
79 orte_plm_base_recv,
80 NULL);
81 if (ORTE_PROC_IS_HNP) {
82 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
83 ORTE_RML_TAG_ORTED_CALLBACK,
84 ORTE_RML_PERSISTENT,
85 orte_plm_base_daemon_callback, NULL);
86 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
87 ORTE_RML_TAG_REPORT_REMOTE_LAUNCH,
88 ORTE_RML_PERSISTENT,
89 orte_plm_base_daemon_failed, NULL);
90 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
91 ORTE_RML_TAG_TOPOLOGY_REPORT,
92 ORTE_RML_PERSISTENT,
93 orte_plm_base_daemon_topology, NULL);
94 }
95 recv_issued = true;
96
97 return ORTE_SUCCESS;
98 }
99
100
101 int orte_plm_base_comm_stop(void)
102 {
103 if (!recv_issued) {
104 return ORTE_SUCCESS;
105 }
106
107 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
108 "%s plm:base:receive stop comm",
109 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
110
111 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLM);
112 if (ORTE_PROC_IS_HNP) {
113 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ORTED_CALLBACK);
114 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_REPORT_REMOTE_LAUNCH);
115 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_TOPOLOGY_REPORT);
116 }
117 recv_issued = false;
118
119 return ORTE_SUCCESS;
120 }
121
122
123
124 void orte_plm_base_recv(int status, orte_process_name_t* sender,
125 opal_buffer_t* buffer, orte_rml_tag_t tag,
126 void* cbdata)
127 {
128 orte_plm_cmd_flag_t command;
129 orte_std_cntr_t count;
130 orte_jobid_t job;
131 orte_job_t *jdata, *parent;
132 opal_buffer_t *answer;
133 orte_vpid_t vpid;
134 orte_proc_t *proc;
135 orte_proc_state_t state;
136 orte_exit_code_t exit_code;
137 int32_t rc=ORTE_SUCCESS, ret;
138 orte_app_context_t *app, *child_app;
139 orte_process_name_t name, *nptr;
140 pid_t pid;
141 bool running;
142 int i, room;
143 char **env;
144 char *prefix_dir;
145
146 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
147 "%s plm:base:receive processing msg",
148 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
149
150 count = 1;
151 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_PLM_CMD))) {
152 ORTE_ERROR_LOG(rc);
153 goto CLEANUP;
154 }
155
156 switch (command) {
157 case ORTE_PLM_LAUNCH_JOB_CMD:
158 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
159 "%s plm:base:receive job launch command from %s",
160 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
161 ORTE_NAME_PRINT(sender)));
162
163
164 count = 1;
165 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jdata, &count, ORTE_JOB))) {
166 ORTE_ERROR_LOG(rc);
167 goto ANSWER_LAUNCH;
168 }
169
170
171 jdata->originator.jobid = sender->jobid;
172 jdata->originator.vpid = sender->vpid;
173
174
175
176 nptr = &name;
177 if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&nptr, OPAL_NAME)) {
178 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
179 rc = ORTE_ERR_NOT_FOUND;
180 goto ANSWER_LAUNCH;
181 }
182
183
184 if (NULL != (parent = orte_get_job_data_object(name.jobid))) {
185
186
187
188
189
190
191
192 app = (orte_app_context_t*)opal_pointer_array_get_item(parent->apps, 0);
193 child_app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, 0);
194 prefix_dir = NULL;
195 if (orte_get_attribute(&app->attributes, ORTE_APP_PREFIX_DIR, (void**)&prefix_dir, OPAL_STRING) &&
196 !orte_get_attribute(&child_app->attributes, ORTE_APP_PREFIX_DIR, NULL, OPAL_STRING)) {
197 orte_set_attribute(&child_app->attributes, ORTE_APP_PREFIX_DIR, ORTE_ATTR_GLOBAL, prefix_dir, OPAL_STRING);
198 }
199 if (NULL != prefix_dir) {
200 free(prefix_dir);
201 }
202 }
203
204
205
206
207 if (NULL != orte_forwarded_envars) {
208 for (i=0; i < jdata->apps->size; i++) {
209 if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
210 continue;
211 }
212 env = opal_environ_merge(orte_forwarded_envars, app->env);
213 opal_argv_free(app->env);
214 app->env = env;
215 }
216 }
217
218 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
219 "%s plm:base:receive adding hosts",
220 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
221
222
223 if (ORTE_SUCCESS != (rc = orte_ras_base_add_hosts(jdata))) {
224 ORTE_ERROR_LOG(rc);
225 goto ANSWER_LAUNCH;
226 }
227
228 if (NULL != parent) {
229 if (NULL == parent->bookmark) {
230
231 if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(parent->procs, sender->vpid))) {
232
233
234
235
236
237 jdata->bookmark = proc->node;
238 }
239 } else {
240 jdata->bookmark = parent->bookmark;
241 }
242
243 jdata->bkmark_obj = parent->bkmark_obj;
244 }
245
246
247 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
248 "%s plm:base:receive calling spawn",
249 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
250 if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
251 ORTE_ERROR_LOG(rc);
252 goto ANSWER_LAUNCH;
253 }
254 break;
255 ANSWER_LAUNCH:
256 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
257 "%s plm:base:receive - error on launch: %d",
258 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rc));
259
260
261 answer = OBJ_NEW(opal_buffer_t);
262
263
264 if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT32))) {
265 ORTE_ERROR_LOG(ret);
266 }
267
268
269 job = ORTE_JOBID_INVALID;
270 if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &job, 1, ORTE_JOBID))) {
271 ORTE_ERROR_LOG(ret);
272 }
273
274 if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ROOM_NUM, (void**)&room, OPAL_INT)) {
275 if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &room, 1, OPAL_INT))) {
276 ORTE_ERROR_LOG(ret);
277 }
278 }
279
280
281 if (0 > (ret = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_LAUNCH_RESP,
282 orte_rml_send_callback, NULL))) {
283 ORTE_ERROR_LOG(ret);
284 OBJ_RELEASE(answer);
285 }
286 break;
287
288 case ORTE_PLM_UPDATE_PROC_STATE:
289 opal_output_verbose(5, orte_plm_base_framework.framework_output,
290 "%s plm:base:receive update proc state command from %s",
291 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
292 ORTE_NAME_PRINT(sender));
293 count = 1;
294 while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &job, &count, ORTE_JOBID))) {
295
296 opal_output_verbose(5, orte_plm_base_framework.framework_output,
297 "%s plm:base:receive got update_proc_state for job %s",
298 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
299 ORTE_JOBID_PRINT(job));
300
301 name.jobid = job;
302 running = false;
303
304 jdata = orte_get_job_data_object(job);
305 count = 1;
306 while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &vpid, &count, ORTE_VPID))) {
307 if (ORTE_VPID_INVALID == vpid) {
308
309 break;
310 }
311 name.vpid = vpid;
312
313 count = 1;
314 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &pid, &count, OPAL_PID))) {
315 ORTE_ERROR_LOG(rc);
316 goto CLEANUP;
317 }
318
319 count = 1;
320 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &state, &count, ORTE_PROC_STATE))) {
321 ORTE_ERROR_LOG(rc);
322 goto CLEANUP;
323 }
324 if (ORTE_PROC_STATE_RUNNING == state) {
325 running = true;
326 }
327
328 count = 1;
329 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &exit_code, &count, ORTE_EXIT_CODE))) {
330 ORTE_ERROR_LOG(rc);
331 goto CLEANUP;
332 }
333
334 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
335 "%s plm:base:receive got update_proc_state for vpid %lu state %s exit_code %d",
336 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
337 (unsigned long)vpid, orte_proc_state_to_str(state), (int)exit_code));
338
339 if (NULL != jdata) {
340
341 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) {
342 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
343 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
344 goto CLEANUP;
345 }
346
347
348
349 proc->pid = pid;
350 proc->exit_code = exit_code;
351 ORTE_ACTIVATE_PROC_STATE(&name, state);
352 }
353 }
354
355 if (running && NULL != jdata) {
356 jdata->num_daemons_reported++;
357 if (orte_report_launch_progress) {
358 if (0 == jdata->num_daemons_reported % 100 ||
359 jdata->num_daemons_reported == orte_process_info.num_procs) {
360 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REPORT_PROGRESS);
361 }
362 }
363 }
364
365 count = 1;
366 }
367 if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
368 ORTE_ERROR_LOG(rc);
369 } else {
370 rc = ORTE_SUCCESS;
371 }
372 break;
373
374 case ORTE_PLM_REGISTERED_CMD:
375 count=1;
376 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &job, &count, ORTE_JOBID))) {
377 ORTE_ERROR_LOG(rc);
378 goto CLEANUP;
379 }
380 name.jobid = job;
381
382 if (NULL == (jdata = orte_get_job_data_object(job))) {
383 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
384 rc = ORTE_ERR_NOT_FOUND;
385 goto CLEANUP;
386 }
387 count=1;
388 while (ORTE_SUCCESS == opal_dss.unpack(buffer, &vpid, &count, ORTE_VPID)) {
389 name.vpid = vpid;
390 ORTE_ACTIVATE_PROC_STATE(&name, ORTE_PROC_STATE_REGISTERED);
391 count=1;
392 }
393 break;
394
395 default:
396 ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
397 rc = ORTE_ERR_VALUE_OUT_OF_BOUNDS;
398 break;
399 }
400
401 CLEANUP:
402
403 if (ORTE_PROC_IS_HNP && ORTE_SUCCESS != rc) {
404 jdata = NULL;
405 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
406 }
407
408 OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
409 "%s plm:base:receive done processing commands",
410 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
411 }
412
413
/**
 * Legacy event-handler entry point for PLM messages.
 *
 * PLM messages are handled by orte_plm_base_recv(), which is posted as a
 * persistent RML receive in orte_plm_base_comm_start(). This handler asserts
 * unconditionally, so reaching it is treated as a programming error.
 * NOTE(review): assert() expands to nothing when NDEBUG is defined, so in
 * such builds this function silently returns.
 *
 * @param fd     unused.
 * @param event  unused.
 * @param data   unused.
 */
void orte_plm_base_receive_process_msg(int fd, short event, void *data)
{
    /* parameters exist only to match the event-callback signature */
    (void)fd;
    (void)event;
    (void)data;
    assert(0);
}