This source file includes the following definitions.
- init
- finalize
- track_jobs
- track_procs
- pack_state_for_proc
- pack_state_update
1
2
3
4
5
6
7
8
9
10
11
12 #include "orte_config.h"
13
14 #include <sys/types.h>
15 #ifdef HAVE_UNISTD_H
16 #include <unistd.h>
17 #endif
18 #include <string.h>
19
20 #include "opal/util/output.h"
21 #include "opal/dss/dss.h"
22 #include "opal/mca/pmix/pmix.h"
23
24 #include "orte/mca/errmgr/errmgr.h"
25 #include "orte/mca/iof/base/base.h"
26 #include "orte/mca/odls/base/base.h"
27 #include "orte/mca/rmaps/rmaps_types.h"
28 #include "orte/mca/rml/rml.h"
29 #include "orte/mca/routed/routed.h"
30 #include "orte/util/session_dir.h"
31 #include "orte/util/threads.h"
32 #include "orte/orted/pmix/pmix_server_internal.h"
33 #include "orte/runtime/orte_data_server.h"
34 #include "orte/runtime/orte_quit.h"
35
36 #include "orte/mca/state/state.h"
37 #include "orte/mca/state/base/base.h"
38 #include "orte/mca/state/base/state_private.h"
39 #include "state_orted.h"
40
41
42
43
44 static int init(void);
45 static int finalize(void);
46
47
48
49
50 orte_state_base_module_t orte_state_orted_module = {
51 init,
52 finalize,
53 orte_state_base_activate_job_state,
54 orte_state_base_add_job_state,
55 orte_state_base_set_job_state_callback,
56 orte_state_base_set_job_state_priority,
57 orte_state_base_remove_job_state,
58 orte_state_base_activate_proc_state,
59 orte_state_base_add_proc_state,
60 orte_state_base_set_proc_state_callback,
61 orte_state_base_set_proc_state_priority,
62 orte_state_base_remove_proc_state
63 };
64
65
66 static void track_jobs(int fd, short argc, void *cbdata);
67 static void track_procs(int fd, short argc, void *cbdata);
68 static int pack_state_update(opal_buffer_t *buf, orte_job_t *jdata);
69
70
71 static orte_job_state_t job_states[] = {
72 ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE,
73 };
74 static orte_state_cbfunc_t job_callbacks[] = {
75 track_jobs
76 };
77
78 static orte_proc_state_t proc_states[] = {
79 ORTE_PROC_STATE_RUNNING,
80 ORTE_PROC_STATE_REGISTERED,
81 ORTE_PROC_STATE_IOF_COMPLETE,
82 ORTE_PROC_STATE_WAITPID_FIRED,
83 ORTE_PROC_STATE_TERMINATED
84 };
85 static orte_state_cbfunc_t proc_callbacks[] = {
86 track_procs,
87 track_procs,
88 track_procs,
89 track_procs,
90 track_procs
91 };
92
93
94
95
96 static int init(void)
97 {
98 int num_states, i, rc;
99
100
101 OBJ_CONSTRUCT(&orte_job_states, opal_list_t);
102 OBJ_CONSTRUCT(&orte_proc_states, opal_list_t);
103
104 num_states = sizeof(job_states) / sizeof(orte_job_state_t);
105 for (i=0; i < num_states; i++) {
106 if (ORTE_SUCCESS != (rc = orte_state.add_job_state(job_states[i],
107 job_callbacks[i],
108 ORTE_SYS_PRI))) {
109 ORTE_ERROR_LOG(rc);
110 }
111 }
112
113 if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT,
114 orte_quit, ORTE_ERROR_PRI))) {
115 ORTE_ERROR_LOG(rc);
116 }
117
118 if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_DAEMONS_TERMINATED,
119 orte_quit, ORTE_SYS_PRI))) {
120 ORTE_ERROR_LOG(rc);
121 }
122 if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
123 orte_state_base_print_job_state_machine();
124 }
125
126
127
128
129 num_states = sizeof(proc_states) / sizeof(orte_proc_state_t);
130 for (i=0; i < num_states; i++) {
131 if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i],
132 proc_callbacks[i],
133 ORTE_SYS_PRI))) {
134 ORTE_ERROR_LOG(rc);
135 }
136 }
137 if (5 < opal_output_get_verbosity(orte_state_base_framework.framework_output)) {
138 orte_state_base_print_proc_state_machine();
139 }
140 return ORTE_SUCCESS;
141 }
142
143 static int finalize(void)
144 {
145 opal_list_item_t *item;
146
147
148 while (NULL != (item = opal_list_remove_first(&orte_job_states))) {
149 OBJ_RELEASE(item);
150 }
151 OBJ_DESTRUCT(&orte_job_states);
152 while (NULL != (item = opal_list_remove_first(&orte_proc_states))) {
153 OBJ_RELEASE(item);
154 }
155 OBJ_DESTRUCT(&orte_proc_states);
156
157 return ORTE_SUCCESS;
158 }
159
160 static void track_jobs(int fd, short argc, void *cbdata)
161 {
162 orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
163 opal_buffer_t *alert;
164 orte_plm_cmd_flag_t cmd;
165 int rc, i;
166 orte_proc_state_t running = ORTE_PROC_STATE_RUNNING;
167 orte_proc_t *child;
168 orte_vpid_t null=ORTE_VPID_INVALID;
169
170 ORTE_ACQUIRE_OBJECT(caddy);
171
172 if (ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE == caddy->job_state) {
173 OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
174 "%s state:orted:track_jobs sending local launch complete for job %s",
175 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
176 ORTE_JOBID_PRINT(caddy->jdata->jobid)));
177
178 alert = OBJ_NEW(opal_buffer_t);
179
180 cmd = ORTE_PLM_UPDATE_PROC_STATE;
181 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd, 1, ORTE_PLM_CMD))) {
182 ORTE_ERROR_LOG(rc);
183 OBJ_RELEASE(alert);
184 goto cleanup;
185 }
186
187 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &caddy->jdata->jobid, 1, ORTE_JOBID))) {
188 ORTE_ERROR_LOG(rc);
189 OBJ_RELEASE(alert);
190 goto cleanup;
191 }
192 for (i=0; i < orte_local_children->size; i++) {
193 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
194 continue;
195 }
196
197 if (child->name.jobid == caddy->jdata->jobid) {
198
199 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &(child->name.vpid), 1, ORTE_VPID))) {
200 ORTE_ERROR_LOG(rc);
201 OBJ_RELEASE(alert);
202 goto cleanup;
203 }
204
205 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->pid, 1, OPAL_PID))) {
206 ORTE_ERROR_LOG(rc);
207 OBJ_RELEASE(alert);
208 goto cleanup;
209 }
210
211 if (ORTE_PROC_STATE_UNTERMINATED < child->state) {
212 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->state, 1, ORTE_PROC_STATE))) {
213 ORTE_ERROR_LOG(rc);
214 OBJ_RELEASE(alert);
215 goto cleanup;
216 }
217 } else {
218
219 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &running, 1, ORTE_PROC_STATE))) {
220 ORTE_ERROR_LOG(rc);
221 OBJ_RELEASE(alert);
222 goto cleanup;
223 }
224 }
225
226 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->exit_code, 1, ORTE_EXIT_CODE))) {
227 ORTE_ERROR_LOG(rc);
228 OBJ_RELEASE(alert);
229 goto cleanup;
230 }
231 }
232 }
233
234
235 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &null, 1, ORTE_VPID))) {
236 ORTE_ERROR_LOG(rc);
237 OBJ_RELEASE(alert);
238 goto cleanup;
239 }
240
241
242 if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
243 ORTE_RML_TAG_PLM,
244 orte_rml_send_callback, NULL))) {
245 ORTE_ERROR_LOG(rc);
246 OBJ_RELEASE(alert);
247 }
248 }
249
250 cleanup:
251 OBJ_RELEASE(caddy);
252 }
253
254 static void track_procs(int fd, short argc, void *cbdata)
255 {
256 orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
257 orte_process_name_t *proc;
258 orte_proc_state_t state;
259 orte_job_t *jdata;
260 orte_proc_t *pdata, *pptr;
261 opal_buffer_t *alert;
262 int rc, i;
263 orte_plm_cmd_flag_t cmd;
264 orte_std_cntr_t index;
265 orte_job_map_t *map;
266 orte_node_t *node;
267 orte_process_name_t target;
268
269 ORTE_ACQUIRE_OBJECT(caddy);
270 proc = &caddy->name;
271 state = caddy->proc_state;
272
273 OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
274 "%s state:orted:track_procs called for proc %s state %s",
275 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
276 ORTE_NAME_PRINT(proc),
277 orte_proc_state_to_str(state)));
278
279
280 if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
281 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
282 goto cleanup;
283 }
284 pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid);
285
286 if (ORTE_PROC_STATE_RUNNING == state) {
287
288 pdata->state = state;
289 jdata->num_launched++;
290 if (jdata->num_launched == jdata->num_local_procs) {
291
292
293
294
295 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE);
296 }
297
298 } else if (ORTE_PROC_STATE_REGISTERED == state) {
299
300 pdata->state = state;
301 jdata->num_reported++;
302 if (jdata->num_reported == jdata->num_local_procs) {
303
304
305 OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
306 "%s state:orted: notifying HNP all local registered",
307 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
308
309 alert = OBJ_NEW(opal_buffer_t);
310
311 cmd = ORTE_PLM_REGISTERED_CMD;
312 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd, 1, ORTE_PLM_CMD))) {
313 ORTE_ERROR_LOG(rc);
314 goto cleanup;
315 }
316
317 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &proc->jobid, 1, ORTE_JOBID))) {
318 ORTE_ERROR_LOG(rc);
319 goto cleanup;
320 }
321
322 for (i=0; i < orte_local_children->size; i++) {
323 if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
324 continue;
325 }
326 if (pptr->name.jobid == proc->jobid) {
327 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &pptr->name.vpid, 1, ORTE_VPID))) {
328 ORTE_ERROR_LOG(rc);
329 goto cleanup;
330 }
331 }
332 }
333
334 if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
335 ORTE_RML_TAG_PLM,
336 orte_rml_send_callback, NULL))) {
337 ORTE_ERROR_LOG(rc);
338 } else {
339 rc = ORTE_SUCCESS;
340 }
341 }
342 } else if (ORTE_PROC_STATE_IOF_COMPLETE == state) {
343
344
345
346
347 ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_IOF_COMPLETE);
348
349
350
351
352
353
354
355 if (NULL != orte_iof.close) {
356 orte_iof.close(proc, ORTE_IOF_STDALL);
357 }
358 if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_WAITPID) &&
359 !ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_RECORDED)) {
360 ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_TERMINATED);
361 }
362 } else if (ORTE_PROC_STATE_WAITPID_FIRED == state) {
363
364
365
366
367 ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_WAITPID);
368 if (ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_IOF_COMPLETE) &&
369 !ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_RECORDED)) {
370 ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_TERMINATED);
371 }
372 } else if (ORTE_PROC_STATE_TERMINATED == state) {
373
374
375 if (!ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_RECORDED)) {
376 jdata->num_terminated++;
377 }
378
379 ORTE_FLAG_SET(pdata, ORTE_PROC_FLAG_RECORDED);
380 ORTE_FLAG_UNSET(pdata, ORTE_PROC_FLAG_ALIVE);
381 pdata->state = state;
382
383
384
385
386 orte_session_dir_finalize(proc);
387
388
389
390
391 if (orte_orteds_term_ordered &&
392 0 == orte_routed.num_routes()) {
393 for (i=0; i < orte_local_children->size; i++) {
394 if (NULL != (pdata = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) &&
395 ORTE_FLAG_TEST(pdata, ORTE_PROC_FLAG_ALIVE)) {
396
397 OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
398 "%s state:orted all routes gone but proc %s still alive",
399 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
400 ORTE_NAME_PRINT(&pdata->name)));
401 goto cleanup;
402 }
403 }
404
405 OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
406 "%s state:orted all routes and children gone - exiting",
407 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
408 ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_DAEMONS_TERMINATED);
409 goto cleanup;
410 }
411
412 if (jdata->num_terminated == jdata->num_local_procs &&
413 !orte_get_attribute(&jdata->attributes, ORTE_JOB_TERM_NOTIFIED, NULL, OPAL_BOOL)) {
414
415 cmd = ORTE_PLM_UPDATE_PROC_STATE;
416 alert = OBJ_NEW(opal_buffer_t);
417 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd, 1, ORTE_PLM_CMD))) {
418 ORTE_ERROR_LOG(rc);
419 goto cleanup;
420 }
421
422 if (ORTE_SUCCESS != (rc = pack_state_update(alert, jdata))) {
423 ORTE_ERROR_LOG(rc);
424 }
425
426 OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
427 "%s state:orted: SENDING JOB LOCAL TERMINATION UPDATE FOR JOB %s",
428 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
429 ORTE_JOBID_PRINT(jdata->jobid)));
430 if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
431 ORTE_RML_TAG_PLM,
432 orte_rml_send_callback, NULL))) {
433 ORTE_ERROR_LOG(rc);
434 }
435
436 orte_set_attribute(&jdata->attributes, ORTE_JOB_TERM_NOTIFIED, ORTE_ATTR_LOCAL, NULL, OPAL_BOOL);
437
438 for (i=0; i < orte_local_children->size; i++) {
439 if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
440 continue;
441 }
442
443 if (pptr->name.jobid == jdata->jobid) {
444
445 opal_pointer_array_set_item(orte_local_children, i, NULL);
446 OBJ_RELEASE(pptr);
447 }
448 }
449
450 if (NULL != orte_iof.complete) {
451 orte_iof.complete(jdata);
452 }
453
454
455 if (NULL != opal_pmix.server_deregister_nspace) {
456 opal_pmix.server_deregister_nspace(jdata->jobid, NULL, NULL);
457 }
458
459
460 if (NULL != jdata->map) {
461 map = jdata->map;
462 for (index = 0; index < map->nodes->size; index++) {
463 if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, index))) {
464 continue;
465 }
466 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
467 "%s state:orted releasing procs from node %s",
468 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
469 node->name));
470 for (i = 0; i < node->procs->size; i++) {
471 if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {
472 continue;
473 }
474 if (pptr->name.jobid != jdata->jobid) {
475
476 continue;
477 }
478 if (!ORTE_FLAG_TEST(pptr, ORTE_PROC_FLAG_TOOL)) {
479 node->slots_inuse--;
480 node->num_procs--;
481 }
482 OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
483 "%s state:orted releasing proc %s from node %s",
484 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
485 ORTE_NAME_PRINT(&pptr->name), node->name));
486
487 opal_pointer_array_set_item(node->procs, i, NULL);
488
489 OBJ_RELEASE(pptr);
490 }
491
492 opal_pointer_array_set_item(map->nodes, index, NULL);
493
494 OBJ_RELEASE(node);
495
496 ORTE_FLAG_UNSET(node, ORTE_NODE_FLAG_MAPPED);
497 }
498 OBJ_RELEASE(map);
499 jdata->map = NULL;
500 }
501
502
503 if (orte_state_base_run_fdcheck) {
504 orte_state_base_check_fds(jdata);
505 }
506
507
508
509 if (NULL != orte_data_server_uri) {
510 target.jobid = jdata->jobid;
511 target.vpid = ORTE_VPID_WILDCARD;
512 orte_state_base_notify_data_server(&target);
513 }
514
515
516 opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
517 OBJ_RELEASE(jdata);
518 }
519 }
520
521 cleanup:
522 OBJ_RELEASE(caddy);
523 }
524
525 static int pack_state_for_proc(opal_buffer_t *alert, orte_proc_t *child)
526 {
527 int rc;
528
529
530 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &(child->name.vpid), 1, ORTE_VPID))) {
531 ORTE_ERROR_LOG(rc);
532 return rc;
533 }
534
535 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->pid, 1, OPAL_PID))) {
536 ORTE_ERROR_LOG(rc);
537 return rc;
538 }
539
540 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->state, 1, ORTE_PROC_STATE))) {
541 ORTE_ERROR_LOG(rc);
542 return rc;
543 }
544
545 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &child->exit_code, 1, ORTE_EXIT_CODE))) {
546 ORTE_ERROR_LOG(rc);
547 return rc;
548 }
549
550 return ORTE_SUCCESS;
551 }
552
553 static int pack_state_update(opal_buffer_t *alert, orte_job_t *jdata)
554 {
555 int i, rc;
556 orte_proc_t *child;
557 orte_vpid_t null=ORTE_VPID_INVALID;
558
559
560 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &jdata->jobid, 1, ORTE_JOBID))) {
561 ORTE_ERROR_LOG(rc);
562 return rc;
563 }
564 for (i=0; i < orte_local_children->size; i++) {
565 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
566 continue;
567 }
568
569 if (child->name.jobid == jdata->jobid) {
570 if (ORTE_SUCCESS != (rc = pack_state_for_proc(alert, child))) {
571 ORTE_ERROR_LOG(rc);
572 return rc;
573 }
574 }
575 }
576
577 if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &null, 1, ORTE_VPID))) {
578 ORTE_ERROR_LOG(rc);
579 return rc;
580 }
581
582 return ORTE_SUCCESS;
583 }