This source file includes following definitions.
- orte_state_base_activate_job_state
- orte_state_base_add_job_state
- orte_state_base_set_job_state_callback
- orte_state_base_set_job_state_priority
- orte_state_base_remove_job_state
- orte_state_base_print_job_state_machine
- orte_state_base_activate_proc_state
- orte_state_base_add_proc_state
- orte_state_base_set_proc_state_callback
- orte_state_base_set_proc_state_priority
- orte_state_base_remove_proc_state
- orte_state_base_print_proc_state_machine
- cleanup_node
- orte_state_base_local_launch_complete
- orte_state_base_cleanup_job
- orte_state_base_report_progress
- orte_state_base_notify_data_server
- _send_notification
- orte_state_base_track_procs
- orte_state_base_check_all_complete
- orte_state_base_check_fds
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 #include "orte_config.h"
16 #include "orte/constants.h"
17
18 #if HAVE_UNISTD_H
19 #include <unistd.h>
20 #endif
21 #if HAVE_FCNTL_H
22 #include <fcntl.h>
23 #endif
24
25 #include "opal/class/opal_list.h"
26 #include "opal/mca/event/event.h"
27 #include "opal/mca/pmix/pmix.h"
28 #include "opal/util/argv.h"
29
30 #include "orte/orted/pmix/pmix_server_internal.h"
31 #include "orte/runtime/orte_data_server.h"
32 #include "orte/runtime/orte_globals.h"
33 #include "orte/runtime/orte_wait.h"
34 #include "orte/mca/errmgr/errmgr.h"
35 #include "orte/mca/grpcomm/grpcomm.h"
36 #include "orte/mca/iof/base/base.h"
37 #include "orte/mca/rmaps/rmaps_types.h"
38 #include "orte/mca/plm/plm.h"
39 #include "orte/mca/rml/rml.h"
40 #include "orte/mca/routed/routed.h"
41 #include "orte/util/session_dir.h"
42 #include "orte/util/threads.h"
43 #include "orte/util/show_help.h"
44
45 #include "orte/mca/state/base/base.h"
46 #include "orte/mca/state/base/state_private.h"
47
48 void orte_state_base_activate_job_state(orte_job_t *jdata,
49 orte_job_state_t state)
50 {
51 opal_list_item_t *itm, *any=NULL, *error=NULL;
52 orte_state_t *s;
53 orte_state_caddy_t *caddy;
54
55 for (itm = opal_list_get_first(&orte_job_states);
56 itm != opal_list_get_end(&orte_job_states);
57 itm = opal_list_get_next(itm)) {
58 s = (orte_state_t*)itm;
59 if (s->job_state == ORTE_JOB_STATE_ANY) {
60
61 any = itm;
62 }
63 if (s->job_state == ORTE_JOB_STATE_ERROR) {
64 error = itm;
65 }
66 if (s->job_state == state) {
67 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
68 "%s ACTIVATING JOB %s STATE %s PRI %d",
69 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
70 (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid),
71 orte_job_state_to_str(state), s->priority));
72 if (NULL == s->cbfunc) {
73 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
74 "%s NULL CBFUNC FOR JOB %s STATE %s",
75 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
76 (NULL == jdata) ? "ALL" : ORTE_JOBID_PRINT(jdata->jobid),
77 orte_job_state_to_str(state)));
78 return;
79 }
80 caddy = OBJ_NEW(orte_state_caddy_t);
81 if (NULL != jdata) {
82 caddy->jdata = jdata;
83 caddy->job_state = state;
84 OBJ_RETAIN(jdata);
85 }
86 ORTE_THREADSHIFT(caddy, orte_event_base, s->cbfunc, s->priority);
87 return;
88 }
89 }
90
91
92
93 if (ORTE_JOB_STATE_ERROR < state && NULL != error) {
94 s = (orte_state_t*)error;
95 } else if (NULL != any) {
96 s = (orte_state_t*)any;
97 } else {
98 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
99 "ACTIVATE: ANY STATE NOT FOUND"));
100 return;
101 }
102 if (NULL == s->cbfunc) {
103 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
104 "ACTIVATE: ANY STATE HANDLER NOT DEFINED"));
105 return;
106 }
107 caddy = OBJ_NEW(orte_state_caddy_t);
108 if (NULL != jdata) {
109 caddy->jdata = jdata;
110 caddy->job_state = state;
111 OBJ_RETAIN(jdata);
112 }
113 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
114 "%s ACTIVATING JOB %s STATE %s PRI %d",
115 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
116 (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid),
117 orte_job_state_to_str(state), s->priority));
118 ORTE_THREADSHIFT(caddy, orte_event_base, s->cbfunc, s->priority);
119 }
120
121
122 int orte_state_base_add_job_state(orte_job_state_t state,
123 orte_state_cbfunc_t cbfunc,
124 int priority)
125 {
126 opal_list_item_t *item;
127 orte_state_t *st;
128
129
130 for (item = opal_list_get_first(&orte_job_states);
131 item != opal_list_get_end(&orte_job_states);
132 item = opal_list_get_next(item)) {
133 st = (orte_state_t*)item;
134 if (st->job_state == state) {
135 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
136 "DUPLICATE STATE DEFINED: %s",
137 orte_job_state_to_str(state)));
138 return ORTE_ERR_BAD_PARAM;
139 }
140 }
141
142 st = OBJ_NEW(orte_state_t);
143 st->job_state = state;
144 st->cbfunc = cbfunc;
145 st->priority = priority;
146 opal_list_append(&orte_job_states, &(st->super));
147
148 return ORTE_SUCCESS;
149 }
150
151 int orte_state_base_set_job_state_callback(orte_job_state_t state,
152 orte_state_cbfunc_t cbfunc)
153 {
154 opal_list_item_t *item;
155 orte_state_t *st;
156
157 for (item = opal_list_get_first(&orte_job_states);
158 item != opal_list_get_end(&orte_job_states);
159 item = opal_list_get_next(item)) {
160 st = (orte_state_t*)item;
161 if (st->job_state == state) {
162 st->cbfunc = cbfunc;
163 return ORTE_SUCCESS;
164 }
165 }
166
167
168 st = OBJ_NEW(orte_state_t);
169 st->job_state = state;
170 st->cbfunc = cbfunc;
171 st->priority = ORTE_SYS_PRI;
172 opal_list_append(&orte_job_states, &(st->super));
173
174 return ORTE_SUCCESS;
175 }
176
177 int orte_state_base_set_job_state_priority(orte_job_state_t state,
178 int priority)
179 {
180 opal_list_item_t *item;
181 orte_state_t *st;
182
183 for (item = opal_list_get_first(&orte_job_states);
184 item != opal_list_get_end(&orte_job_states);
185 item = opal_list_get_next(item)) {
186 st = (orte_state_t*)item;
187 if (st->job_state == state) {
188 st->priority = priority;
189 return ORTE_SUCCESS;
190 }
191 }
192 return ORTE_ERR_NOT_FOUND;
193 }
194
195 int orte_state_base_remove_job_state(orte_job_state_t state)
196 {
197 opal_list_item_t *item;
198 orte_state_t *st;
199
200 for (item = opal_list_get_first(&orte_job_states);
201 item != opal_list_get_end(&orte_job_states);
202 item = opal_list_get_next(item)) {
203 st = (orte_state_t*)item;
204 if (st->job_state == state) {
205 opal_list_remove_item(&orte_job_states, item);
206 OBJ_RELEASE(item);
207 return ORTE_SUCCESS;
208 }
209 }
210 return ORTE_ERR_NOT_FOUND;
211 }
212
213 void orte_state_base_print_job_state_machine(void)
214 {
215 opal_list_item_t *item;
216 orte_state_t *st;
217
218 opal_output(0, "ORTE_JOB_STATE_MACHINE:");
219 for (item = opal_list_get_first(&orte_job_states);
220 item != opal_list_get_end(&orte_job_states);
221 item = opal_list_get_next(item)) {
222 st = (orte_state_t*)item;
223 opal_output(0, "\tState: %s cbfunc: %s",
224 orte_job_state_to_str(st->job_state),
225 (NULL == st->cbfunc) ? "NULL" : "DEFINED");
226 }
227 }
228
229
230
231 void orte_state_base_activate_proc_state(orte_process_name_t *proc,
232 orte_proc_state_t state)
233 {
234 opal_list_item_t *itm, *any=NULL, *error=NULL;
235 orte_state_t *s;
236 orte_state_caddy_t *caddy;
237
238 for (itm = opal_list_get_first(&orte_proc_states);
239 itm != opal_list_get_end(&orte_proc_states);
240 itm = opal_list_get_next(itm)) {
241 s = (orte_state_t*)itm;
242 if (s->proc_state == ORTE_PROC_STATE_ANY) {
243
244 any = itm;
245 }
246 if (s->proc_state == ORTE_PROC_STATE_ERROR) {
247 error = itm;
248 }
249 if (s->proc_state == state) {
250 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
251 "%s ACTIVATING PROC %s STATE %s PRI %d",
252 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
253 ORTE_NAME_PRINT(proc),
254 orte_proc_state_to_str(state), s->priority));
255 if (NULL == s->cbfunc) {
256 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
257 "%s NULL CBFUNC FOR PROC %s STATE %s",
258 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
259 ORTE_NAME_PRINT(proc),
260 orte_proc_state_to_str(state)));
261 return;
262 }
263 caddy = OBJ_NEW(orte_state_caddy_t);
264 caddy->name = *proc;
265 caddy->proc_state = state;
266 ORTE_THREADSHIFT(caddy, orte_event_base, s->cbfunc, s->priority);
267 return;
268 }
269 }
270
271
272
273 if (ORTE_PROC_STATE_ERROR < state && NULL != error) {
274 s = (orte_state_t*)error;
275 } else if (NULL != any) {
276 s = (orte_state_t*)any;
277 } else {
278 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
279 "INCREMENT: ANY STATE NOT FOUND"));
280 return;
281 }
282 if (NULL == s->cbfunc) {
283 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
284 "ACTIVATE: ANY STATE HANDLER NOT DEFINED"));
285 return;
286 }
287 caddy = OBJ_NEW(orte_state_caddy_t);
288 caddy->name = *proc;
289 caddy->proc_state = state;
290 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
291 "%s ACTIVATING PROC %s STATE %s PRI %d",
292 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
293 ORTE_NAME_PRINT(proc),
294 orte_proc_state_to_str(state), s->priority));
295 ORTE_THREADSHIFT(caddy, orte_event_base, s->cbfunc, s->priority);
296 }
297
298 int orte_state_base_add_proc_state(orte_proc_state_t state,
299 orte_state_cbfunc_t cbfunc,
300 int priority)
301 {
302 opal_list_item_t *item;
303 orte_state_t *st;
304
305
306 for (item = opal_list_get_first(&orte_proc_states);
307 item != opal_list_get_end(&orte_proc_states);
308 item = opal_list_get_next(item)) {
309 st = (orte_state_t*)item;
310 if (st->proc_state == state) {
311 OPAL_OUTPUT_VERBOSE((1, orte_state_base_framework.framework_output,
312 "DUPLICATE STATE DEFINED: %s",
313 orte_proc_state_to_str(state)));
314 return ORTE_ERR_BAD_PARAM;
315 }
316 }
317
318 st = OBJ_NEW(orte_state_t);
319 st->proc_state = state;
320 st->cbfunc = cbfunc;
321 st->priority = priority;
322 opal_list_append(&orte_proc_states, &(st->super));
323
324 return ORTE_SUCCESS;
325 }
326
327 int orte_state_base_set_proc_state_callback(orte_proc_state_t state,
328 orte_state_cbfunc_t cbfunc)
329 {
330 opal_list_item_t *item;
331 orte_state_t *st;
332
333 for (item = opal_list_get_first(&orte_proc_states);
334 item != opal_list_get_end(&orte_proc_states);
335 item = opal_list_get_next(item)) {
336 st = (orte_state_t*)item;
337 if (st->proc_state == state) {
338 st->cbfunc = cbfunc;
339 return ORTE_SUCCESS;
340 }
341 }
342 return ORTE_ERR_NOT_FOUND;
343 }
344
345 int orte_state_base_set_proc_state_priority(orte_proc_state_t state,
346 int priority)
347 {
348 opal_list_item_t *item;
349 orte_state_t *st;
350
351 for (item = opal_list_get_first(&orte_proc_states);
352 item != opal_list_get_end(&orte_proc_states);
353 item = opal_list_get_next(item)) {
354 st = (orte_state_t*)item;
355 if (st->proc_state == state) {
356 st->priority = priority;
357 return ORTE_SUCCESS;
358 }
359 }
360 return ORTE_ERR_NOT_FOUND;
361 }
362
363 int orte_state_base_remove_proc_state(orte_proc_state_t state)
364 {
365 opal_list_item_t *item;
366 orte_state_t *st;
367
368 for (item = opal_list_get_first(&orte_proc_states);
369 item != opal_list_get_end(&orte_proc_states);
370 item = opal_list_get_next(item)) {
371 st = (orte_state_t*)item;
372 if (st->proc_state == state) {
373 opal_list_remove_item(&orte_proc_states, item);
374 OBJ_RELEASE(item);
375 return ORTE_SUCCESS;
376 }
377 }
378 return ORTE_ERR_NOT_FOUND;
379 }
380
381 void orte_state_base_print_proc_state_machine(void)
382 {
383 opal_list_item_t *item;
384 orte_state_t *st;
385
386 opal_output(0, "ORTE_PROC_STATE_MACHINE:");
387 for (item = opal_list_get_first(&orte_proc_states);
388 item != opal_list_get_end(&orte_proc_states);
389 item = opal_list_get_next(item)) {
390 st = (orte_state_t*)item;
391 opal_output(0, "\tState: %s cbfunc: %s",
392 orte_proc_state_to_str(st->proc_state),
393 (NULL == st->cbfunc) ? "NULL" : "DEFINED");
394 }
395 }
396
397 static void cleanup_node(orte_proc_t *proc)
398 {
399 orte_node_t *node;
400 orte_proc_t *p;
401 int i;
402
403 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
404 "%s state:base:cleanup_node on proc %s",
405 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
406 ORTE_NAME_PRINT(&proc->name)));
407
408 if (NULL == (node = proc->node)) {
409 return;
410 }
411 if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_TOOL)) {
412 node->num_procs--;
413 node->slots_inuse--;
414 }
415 for (i=0; i < node->procs->size; i++) {
416 if (NULL == (p = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {
417 continue;
418 }
419 if (p->name.jobid == proc->name.jobid &&
420 p->name.vpid == proc->name.vpid) {
421 opal_pointer_array_set_item(node->procs, i, NULL);
422 OBJ_RELEASE(p);
423 break;
424 }
425 }
426 }
427
428 void orte_state_base_local_launch_complete(int fd, short argc, void *cbdata)
429 {
430 orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata;
431 orte_job_t *jdata = state->jdata;
432
433 if (orte_report_launch_progress) {
434 if (0 == jdata->num_daemons_reported % 100 ||
435 jdata->num_daemons_reported == orte_process_info.num_procs) {
436 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REPORT_PROGRESS);
437 }
438 }
439 OBJ_RELEASE(state);
440 }
441
442 void orte_state_base_cleanup_job(int fd, short argc, void *cbdata)
443 {
444 orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
445 orte_job_t *jdata;
446
447 ORTE_ACQUIRE_OBJECT(caddy);
448 jdata = caddy->jdata;
449
450 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
451 "%s state:base:cleanup on job %s",
452 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
453 (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid)));
454
455
456 jdata->state = ORTE_JOB_STATE_NOTIFIED;
457
458 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
459 OBJ_RELEASE(caddy);
460 }
461
462 void orte_state_base_report_progress(int fd, short argc, void *cbdata)
463 {
464 orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
465 orte_job_t *jdata;
466
467 ORTE_ACQUIRE_OBJECT(caddy);
468 jdata = caddy->jdata;
469
470 opal_output(orte_clean_output, "App launch reported: %d (out of %d) daemons - %d (out of %d) procs",
471 (int)jdata->num_daemons_reported, (int)orte_process_info.num_procs,
472 (int)jdata->num_launched, (int)jdata->num_procs);
473 OBJ_RELEASE(caddy);
474 }
475
476 void orte_state_base_notify_data_server(orte_process_name_t *target)
477 {
478 opal_buffer_t *buf;
479 int rc, room = -1;
480 uint8_t cmd = ORTE_PMIX_PURGE_PROC_CMD;
481
482
483 if (ORTE_JOBID_INVALID == orte_pmix_server_globals.server.jobid) {
484 return;
485 }
486
487 buf = OBJ_NEW(opal_buffer_t);
488
489
490 if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &room, 1, OPAL_INT))) {
491 ORTE_ERROR_LOG(rc);
492 OBJ_RELEASE(buf);
493 return;
494 }
495
496
497 if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &cmd, 1, OPAL_UINT8))) {
498 ORTE_ERROR_LOG(rc);
499 OBJ_RELEASE(buf);
500 return;
501 }
502
503
504 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, target, 1, ORTE_NAME))) {
505 ORTE_ERROR_LOG(rc);
506 OBJ_RELEASE(buf);
507 return;
508 }
509
510
511 rc = orte_rml.send_buffer_nb(&orte_pmix_server_globals.server, buf,
512 ORTE_RML_TAG_DATA_SERVER,
513 orte_rml_send_callback, NULL);
514 if (ORTE_SUCCESS != rc) {
515 OBJ_RELEASE(buf);
516 }
517 }
518
519 static void _send_notification(int status,
520 orte_proc_state_t state,
521 orte_process_name_t *proc,
522 orte_process_name_t *target)
523 {
524 opal_buffer_t *buf;
525 orte_grpcomm_signature_t sig;
526 int rc;
527 opal_value_t kv, *kvptr;
528 orte_process_name_t daemon;
529
530 buf = OBJ_NEW(opal_buffer_t);
531
532 opal_output_verbose(5, orte_state_base_framework.framework_output,
533 "%s state:base:sending notification %s proc %s target %s",
534 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
535 ORTE_ERROR_NAME(status),
536 ORTE_NAME_PRINT(proc),
537 ORTE_NAME_PRINT(target));
538
539
540 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) {
541 ORTE_ERROR_LOG(rc);
542 OBJ_RELEASE(buf);
543 return;
544 }
545
546
547 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, proc, 1, ORTE_NAME))) {
548 ORTE_ERROR_LOG(rc);
549 OBJ_RELEASE(buf);
550 return;
551 }
552
553 if (ORTE_VPID_WILDCARD == target->vpid) {
554
555 rc = 1;
556 } else {
557
558 rc = 2;
559 }
560 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) {
561 ORTE_ERROR_LOG(rc);
562 OBJ_RELEASE(buf);
563 return;
564 }
565
566
567 OBJ_CONSTRUCT(&kv, opal_value_t);
568 kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC);
569 kv.type = OPAL_NAME;
570 kv.data.name.jobid = proc->jobid;
571 kv.data.name.vpid = proc->vpid;
572 kvptr = &kv;
573 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
574 ORTE_ERROR_LOG(rc);
575 OBJ_DESTRUCT(&kv);
576 OBJ_RELEASE(buf);
577 return;
578 }
579 OBJ_DESTRUCT(&kv);
580
581 if (ORTE_VPID_WILDCARD == target->vpid) {
582
583 OBJ_CONSTRUCT(&sig, orte_grpcomm_signature_t);
584 sig.signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
585 sig.signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
586 sig.signature[0].vpid = ORTE_VPID_WILDCARD;
587 sig.sz = 1;
588
589 if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(&sig, ORTE_RML_TAG_NOTIFICATION, buf))) {
590 ORTE_ERROR_LOG(rc);
591 }
592 OBJ_DESTRUCT(&sig);
593 OBJ_RELEASE(buf);
594 } else {
595
596 OBJ_CONSTRUCT(&kv, opal_value_t);
597 kv.key = strdup(OPAL_PMIX_EVENT_CUSTOM_RANGE);
598 kv.type = OPAL_NAME;
599 kv.data.name.jobid = target->jobid;
600 kv.data.name.vpid = target->vpid;
601 kvptr = &kv;
602 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) {
603 ORTE_ERROR_LOG(rc);
604 OBJ_DESTRUCT(&kv);
605 OBJ_RELEASE(buf);
606 return;
607 }
608 OBJ_DESTRUCT(&kv);
609
610 daemon.jobid = ORTE_PROC_MY_NAME->jobid;
611 daemon.vpid = orte_get_proc_daemon_vpid(target);
612
613 opal_output_verbose(5, orte_state_base_framework.framework_output,
614 "%s state:base:sending notification %s to proc %s at daemon %s",
615 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
616 ORTE_ERROR_NAME(status),
617 ORTE_NAME_PRINT(target),
618 ORTE_NAME_PRINT(&daemon));
619 if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&daemon, buf,
620 ORTE_RML_TAG_NOTIFICATION,
621 orte_rml_send_callback, NULL))) {
622 ORTE_ERROR_LOG(rc);
623 OBJ_RELEASE(buf);
624 }
625 }
626 }
627
628 void orte_state_base_track_procs(int fd, short argc, void *cbdata)
629 {
630 orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
631 orte_process_name_t *proc;
632 orte_proc_state_t state;
633 orte_job_t *jdata;
634 orte_proc_t *pdata;
635 int i;
636 orte_process_name_t parent, target;
637
638 ORTE_ACQUIRE_OBJECT(caddy);
639 proc = &caddy->name;
640 state = caddy->proc_state;
641
642 opal_output_verbose(5, orte_state_base_framework.framework_output,
643 "%s state:base:track_procs called for proc %s state %s",
644 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
645 ORTE_NAME_PRINT(proc),
646 orte_proc_state_to_str(state));
647
648
649 if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
650 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
651 goto cleanup;
652 }
653 pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid);
654
655 if (ORTE_PROC_STATE_RUNNING == state) {
656
657 if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
658 pdata->state = state;
659 }
660 jdata->num_launched++;
661 if (jdata->num_launched == jdata->num_procs) {
662 if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
663 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_READY_FOR_DEBUGGERS);
664 } else {
665 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_RUNNING);
666 }
667 }
668 } else if (ORTE_PROC_STATE_REGISTERED == state) {
669
670 if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
671 pdata->state = state;
672 }
673 jdata->num_reported++;
674 if (jdata->num_reported == jdata->num_procs) {
675 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REGISTERED);
676 }
677 } else if (ORTE_PROC_STATE_IOF_COMPLETE == state) {
678
679 if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
680 pdata->state = state;
681 }
682
683 if (NULL != orte_iof.close) {
684 orte_iof.close(proc, ORTE_IOF_STDALL);
685 }
686 ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_IOF_COMPLETE);
687 if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_WAITPID)) {
688 ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_TERMINATED);
689 }
690 } else if (ORTE_PROC_STATE_WAITPID_FIRED == state) {
691
692 if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
693 pdata->state = state;
694 }
695 ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_WAITPID);
696 if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_IOF_COMPLETE)) {
697 ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_TERMINATED);
698 }
699 } else if (ORTE_PROC_STATE_TERMINATED == state) {
700
701 ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE);
702 if (pdata->state < ORTE_PROC_STATE_TERMINATED) {
703 pdata->state = state;
704 }
705 if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_LOCAL)) {
706
707 opal_pmix.server_deregister_client(proc, NULL, NULL);
708
709
710
711
712 orte_session_dir_finalize(proc);
713 }
714
715
716
717
718 if (orte_orteds_term_ordered &&
719 0 == orte_routed.num_routes()) {
720 for (i=0; i < orte_local_children->size; i++) {
721 if (NULL != (pdata = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) &&
722 ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_ALIVE)) {
723
724 goto cleanup;
725 }
726 }
727
728 OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
729 "%s state:base all routes and children gone - exiting",
730 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
731 ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_DAEMONS_TERMINATED);
732 goto cleanup;
733 }
734
735 cleanup_node(pdata);
736
737 jdata->num_terminated++;
738 if (jdata->num_terminated == jdata->num_procs) {
739
740 if (orte_state_base_run_fdcheck) {
741 orte_state_base_check_fds(jdata);
742 }
743
744
745 if (NULL != orte_data_server_uri) {
746 target.jobid = jdata->jobid;
747 target.vpid = ORTE_VPID_WILDCARD;
748 orte_state_base_notify_data_server(&target);
749 }
750 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
751 } else if (ORTE_PROC_STATE_TERMINATED < pdata->state &&
752 !orte_job_term_ordered) {
753
754 parent.jobid = jdata->jobid;
755 parent.vpid = ORTE_VPID_WILDCARD;
756 _send_notification(OPAL_ERR_PROC_ABORTED, pdata->state, &pdata->name, &parent);
757 }
758 }
759
760 cleanup:
761 OBJ_RELEASE(caddy);
762 }
763
764 void orte_state_base_check_all_complete(int fd, short args, void *cbdata)
765 {
766 orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
767 orte_job_t *jdata;
768 orte_proc_t *proc;
769 int i;
770 orte_std_cntr_t j;
771 orte_job_t *job;
772 orte_node_t *node;
773 orte_job_map_t *map;
774 orte_std_cntr_t index;
775 bool one_still_alive;
776 orte_vpid_t lowest=0;
777 int32_t i32, *i32ptr;
778 uint32_t u32;
779 void *nptr;
780
781 ORTE_ACQUIRE_OBJECT(caddy);
782 jdata = caddy->jdata;
783
784 opal_output_verbose(2, orte_state_base_framework.framework_output,
785 "%s state:base:check_job_complete on job %s",
786 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
787 (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid));
788
789 if (NULL == jdata || jdata->jobid == ORTE_PROC_MY_NAME->jobid) {
790
791 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
792 "%s state:base:check_job_complete - received NULL job, checking daemons",
793 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
794 goto CHECK_DAEMONS;
795 } else {
796
797
798
799 if (jdata->state < ORTE_JOB_STATE_UNTERMINATED) {
800 jdata->state = ORTE_JOB_STATE_TERMINATED;
801 }
802 }
803
804
805 if (NULL != orte_iof.complete) {
806 orte_iof.complete(jdata);
807 }
808
809
810 if (NULL != opal_pmix.server_deregister_nspace) {
811 opal_pmix.server_deregister_nspace(jdata->jobid, NULL, NULL);
812 }
813
814 i32ptr = &i32;
815 if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NUM_NONZERO_EXIT, (void**)&i32ptr, OPAL_INT32) && !orte_abort_non_zero_exit) {
816 if (!orte_report_child_jobs_separately || 1 == ORTE_LOCAL_JOBID(jdata->jobid)) {
817
818 ORTE_UPDATE_EXIT_STATUS(lowest);
819 }
820
821
822 orte_show_help("help-state-base.txt", "normal-termination-but", true,
823 (1 == ORTE_LOCAL_JOBID(jdata->jobid)) ? "the primary" : "child",
824 (1 == ORTE_LOCAL_JOBID(jdata->jobid)) ? "" : ORTE_LOCAL_JOBID_PRINT(jdata->jobid),
825 i32, (1 == i32) ? "process returned\na non-zero exit code." :
826 "processes returned\nnon-zero exit codes.");
827 }
828
829 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
830 "%s state:base:check_job_completed declared job %s terminated with state %s - checking all jobs",
831 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
832 ORTE_JOBID_PRINT(jdata->jobid),
833 orte_job_state_to_str(jdata->state)));
834
835
836
837
838 if (NULL != jdata &&
839 (orte_get_attribute(&jdata->attributes, ORTE_JOB_CONTINUOUS_OP, NULL, OPAL_BOOL) ||
840 ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_RECOVERABLE))) {
841 goto CHECK_ALIVE;
842 }
843
844
845
846
847
848
849
850
851
852
853
854 CHECK_DAEMONS:
855 if (jdata == NULL || jdata->jobid == ORTE_PROC_MY_NAME->jobid) {
856 if (0 == orte_routed.num_routes()) {
857
858 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
859 "%s orteds complete - exiting",
860 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
861 if (NULL == jdata) {
862 jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
863 }
864 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_TERMINATED);
865 OBJ_RELEASE(caddy);
866 return;
867 }
868 OBJ_RELEASE(caddy);
869 return;
870 }
871
872
873
874
875
876
877
878
879
880 if (NULL != jdata->map && jdata->state == ORTE_JOB_STATE_TERMINATED) {
881 map = jdata->map;
882 for (index = 0; index < map->nodes->size; index++) {
883 if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, index))) {
884 continue;
885 }
886 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
887 "%s releasing procs for job %s from node %s",
888 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
889 ORTE_JOBID_PRINT(jdata->jobid), node->name));
890 for (i = 0; i < node->procs->size; i++) {
891 if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {
892 continue;
893 }
894 if (proc->name.jobid != jdata->jobid) {
895
896 continue;
897 }
898 node->slots_inuse--;
899 node->num_procs--;
900 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
901 "%s releasing proc %s from node %s",
902 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
903 ORTE_NAME_PRINT(&proc->name), node->name));
904
905 opal_pointer_array_set_item(node->procs, i, NULL);
906
907 OBJ_RELEASE(proc);
908 }
909
910 opal_pointer_array_set_item(map->nodes, index, NULL);
911
912 OBJ_RELEASE(node);
913 }
914 OBJ_RELEASE(map);
915 jdata->map = NULL;
916 }
917
918 CHECK_ALIVE:
919
920
921
922 one_still_alive = false;
923 j = opal_hash_table_get_first_key_uint32(orte_job_data, &u32, (void **)&job, &nptr);
924 while (OPAL_SUCCESS == j) {
925
926 if (job->jobid == ORTE_PROC_MY_NAME->jobid ||
927 ORTE_JOB_FAMILY(job->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
928 goto next;
929 }
930
931
932
933
934
935
936
937
938
939
940
941 if (NULL != jdata && job->jobid == jdata->jobid) {
942 if (jdata->state == ORTE_JOB_STATE_TERMINATED) {
943 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
944 "%s state:base:check_job_completed state is terminated - activating notify",
945 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
946 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFY_COMPLETED);
947 one_still_alive = true;
948 } else if (jdata->state == ORTE_JOB_STATE_KILLED_BY_CMD ||
949 jdata->state == ORTE_JOB_STATE_NOTIFIED) {
950 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
951 "%s state:base:check_job_completed state is killed or notified - cleaning up",
952 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
953
954
955
956
957 if (1 < j) {
958 if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
959
960 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DEBUGGER_DETACH);
961 }
962 opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
963 OBJ_RELEASE(jdata);
964 }
965 }
966 goto next;
967 }
968
969 if (ORTE_FLAG_TEST(job, ORTE_JOB_FLAG_DO_NOT_MONITOR)) {
970 goto next;
971 }
972
973
974
975 if (ORTE_JOB_STATE_NOTIFIED != job->state) {
976
977
978
979
980 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
981 "%s state:base:check_job_completed job %s is not terminated (%d:%d)",
982 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
983 ORTE_JOBID_PRINT(job->jobid),
984 job->num_terminated, job->num_procs));
985 one_still_alive = true;
986 }
987 else {
988 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
989 "%s state:base:check_job_completed job %s is terminated (%d vs %d [%s])",
990 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
991 ORTE_JOBID_PRINT(job->jobid),
992 job->num_terminated, job->num_procs,
993 (NULL == jdata) ? "UNKNOWN" : orte_job_state_to_str(jdata->state) ));
994 }
995 next:
996 j = opal_hash_table_get_next_key_uint32(orte_job_data, &u32, (void **)&job, nptr, &nptr);
997 }
998
999
1000 if (one_still_alive) {
1001 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
1002 "%s state:base:check_job_completed at least one job is not terminated",
1003 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
1004 OBJ_RELEASE(caddy);
1005 return;
1006 }
1007
1008 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
1009 "%s state:base:check_job_completed all jobs terminated",
1010 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
1011
1012
1013 if (NULL != orte_mpiexec_timeout) {
1014 OBJ_RELEASE(orte_mpiexec_timeout);
1015 orte_mpiexec_timeout = NULL;
1016 }
1017
1018
1019
1020
1021 ORTE_UPDATE_EXIT_STATUS(0);
1022
1023
1024
1025
1026
1027 orte_plm.terminate_orteds();
1028
1029 OBJ_RELEASE(caddy);
1030 }
1031
1032
1033 void orte_state_base_check_fds(orte_job_t *jdata)
1034 {
1035 int nfds, i, fdflags, flflags;
1036 char path[1024], info[256], **list=NULL, *status, *result, *r2;
1037 ssize_t rc;
1038 struct flock fl;
1039 bool flk;
1040 int cnt = 0;
1041
1042
1043
1044 nfds = getdtablesize();
1045 result = NULL;
1046
1047 for (i=0; i < nfds; i++) {
1048 fdflags = fcntl(i, F_GETFD);
1049 if (-1 == fdflags) {
1050
1051 continue;
1052 }
1053 flflags = fcntl(i, F_GETFL);
1054 if (-1 == flflags) {
1055
1056 continue;
1057 }
1058 snprintf(path, 1024, "/proc/self/fd/%d", i);
1059 memset(info, 0, 256);
1060
1061 rc = readlink(path, info, 256);
1062 if (-1 == rc) {
1063
1064 continue;
1065 }
1066
1067 fl.l_type = F_WRLCK;
1068 fl.l_whence = 0;
1069 fl.l_start = 0;
1070 fl.l_len = 0;
1071 if (-1 == fcntl(i, F_GETLK, &fl)) {
1072 flk = false;
1073 } else {
1074 flk = true;
1075 }
1076
1077 if (fdflags & FD_CLOEXEC) {
1078 opal_argv_append_nosize(&list, "cloexec");
1079 }
1080 if (flflags & O_APPEND) {
1081 opal_argv_append_nosize(&list, "append");
1082 }
1083 if (flflags & O_NONBLOCK) {
1084 opal_argv_append_nosize(&list, "nonblock");
1085 }
1086
1087
1088
1089
1090
1091
1092 if (O_RDONLY == (flflags & 3)) {
1093 opal_argv_append_nosize(&list, "rdonly");
1094 } else if (O_WRONLY == (flflags & 3)) {
1095 opal_argv_append_nosize(&list, "wronly");
1096 } else {
1097 opal_argv_append_nosize(&list, "rdwr");
1098 }
1099 if (flk && F_UNLCK != fl.l_type) {
1100 if (F_WRLCK == fl.l_type) {
1101 opal_argv_append_nosize(&list, "wrlock");
1102 } else {
1103 opal_argv_append_nosize(&list, "rdlock");
1104 }
1105 }
1106 if (NULL != list) {
1107 status = opal_argv_join(list, ' ');
1108 opal_argv_free(list);
1109 list = NULL;
1110 if (NULL == result) {
1111 opal_asprintf(&result, " %d\t(%s)\t%s\n", i, info, status);
1112 } else {
1113 opal_asprintf(&r2, "%s %d\t(%s)\t%s\n", result, i, info, status);
1114 free(result);
1115 result = r2;
1116 }
1117 free(status);
1118 }
1119 ++cnt;
1120 }
1121 opal_asprintf(&r2, "%s: %d open file descriptors after job %d completed\n%s",
1122 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), cnt, ORTE_LOCAL_JOBID(jdata->jobid), result);
1123 opal_output(0, "%s", r2);
1124 free(result);
1125 free(r2);
1126 }