This source file includes the following definitions.
- local_coord_init
- local_coord_finalize
- local_coord_setup_job
- local_coord_release_job
- snapc_full_local_start_hnp_listener
- snapc_full_local_stop_hnp_listener
- snapc_full_local_start_app_listener
- snapc_full_local_stop_app_listener
- snapc_full_local_app_cmd_recv
- snapc_full_local_send_restart_proc_info
- snapc_full_local_hnp_cmd_recv
- snapc_full_local_process_job_update_cmd
- local_coord_job_state_update
- local_coord_job_state_update_finished_local
- local_coord_job_state_update_finished_local_vpid
- snapc_full_local_start_checkpoint_all
- local_define_pipe_names
- snapc_full_local_update_coord
- snapc_full_local_start_ckpt_open_comm
- snapc_full_local_start_ckpt_handshake_opts
- snapc_full_local_start_ckpt_handshake
- snapc_full_local_end_ckpt_handshake
- snapc_full_local_comm_read_event
- snapc_full_get_min_state
- snapc_full_local_get_vpids
- snapc_full_local_refresh_vpids
- find_vpid_snapshot
- orte_snapc_full_local_reset_coord
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 #include "orte_config.h"
21
22 #include <sys/types.h>
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26 #include <signal.h>
27 #ifdef HAVE_FCNTL_H
28 #include <fcntl.h>
29 #endif
30 #ifdef HAVE_SYS_TYPES_H
31 #include <sys/types.h>
32 #endif
33 #ifdef HAVE_SYS_STAT_H
34 #include <sys/stat.h>
35 #endif
36 #ifdef HAVE_SYS_WAIT_H
37 #include <sys/wait.h>
38 #endif
39 #include <string.h>
40
41 #include "opal/runtime/opal_progress.h"
42 #include "opal/runtime/opal_cr.h"
43 #include "opal/util/output.h"
44 #include "opal/util/opal_environ.h"
45 #include "opal/util/os_dirpath.h"
46 #include "opal/util/basename.h"
47 #include "orte/mca/mca.h"
48 #include "opal/mca/base/base.h"
49 #include "opal/mca/crs/crs.h"
50 #include "opal/mca/crs/base/base.h"
51
52 #include "orte/util/show_help.h"
53 #include "orte/util/name_fns.h"
54 #include "orte/runtime/orte_wait.h"
55 #include "orte/runtime/orte_globals.h"
56 #include "orte/mca/rml/rml.h"
57 #include "orte/mca/rml/rml_types.h"
58 #include "orte/mca/odls/odls.h"
59 #include "orte/mca/odls/base/odls_private.h"
60 #include "orte/mca/errmgr/errmgr.h"
61 #include "orte/mca/routed/routed.h"
62 #include "orte/mca/grpcomm/grpcomm.h"
63
64 #include "orte/mca/snapc/snapc.h"
65 #include "orte/mca/snapc/base/base.h"
66
67 #include "snapc_full.h"
68
69
70
71
/*
 * File-scope state of the local snapshot coordinator.
 */

/* Job this coordinator is attached to; assigned in local_coord_setup_job().
 * ORTE_JOBID_INVALID means no job has been set up. */
static orte_jobid_t current_local_jobid = ORTE_JOBID_INVALID;
/* Constructed in local_coord_setup_job() and destructed in
 * local_coord_release_job(); its local_snapshots list holds one entry per
 * local process. */
static orte_snapc_base_global_snapshot_t local_global_snapshot;

/* Last job checkpoint state recorded by local_coord_job_state_update(). */
static int current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;

/* Set when a job-update command carries the migration flag; consulted by
 * local_coord_setup_job() when the same job is set up again. */
static bool currently_migrating = false;
/* NOTE(review): not referenced in the part of the file visible here. */
static bool flushed_modex       = false;
/* Checked when all local processes report recovery: if the sync has not
 * finished, sstore_local_procs_finished is raised instead of resetting the
 * coordinator. */
static bool sstore_local_sync_finished = false;
static bool sstore_local_procs_finished = false;

static int local_define_pipe_names(orte_snapc_full_app_snapshot_t *vpid_snapshot);

/* True while the ORTE_RML_TAG_SNAPC_FULL receive is posted. */
static bool snapc_local_hnp_recv_issued = false;
static int snapc_full_local_start_hnp_listener(void);
static int snapc_full_local_stop_hnp_listener(void);
static void snapc_full_local_hnp_cmd_recv(int status,
                                          orte_process_name_t* sender,
                                          opal_buffer_t* buffer,
                                          orte_rml_tag_t tag,
                                          void* cbdata);

/* True while the ORTE_RML_TAG_SNAPC receive is posted. */
static bool snapc_local_app_recv_issued = false;
static int snapc_full_local_start_app_listener(void);
static int snapc_full_local_stop_app_listener(void);
static void snapc_full_local_app_cmd_recv(int status,
                                          orte_process_name_t* sender,
                                          opal_buffer_t* buffer,
                                          orte_rml_tag_t tag,
                                          void* cbdata);

static orte_snapc_full_app_snapshot_t *find_vpid_snapshot(orte_process_name_t *name );
static int snapc_full_local_get_vpids(void);
static int snapc_full_local_refresh_vpids(void);

#if OPAL_ENABLE_CRDEBUG == 1
static int snapc_full_local_send_restart_proc_info(void);
#endif

static void snapc_full_local_process_job_update_cmd(orte_process_name_t* sender,
                                                    opal_buffer_t* buffer,
                                                    bool quick);

static int local_coord_job_state_update_finished_local(void);
static int local_coord_job_state_update_finished_local_vpid(orte_snapc_full_app_snapshot_t *vpid_snapshot);

/* Disabled: only referenced from an equally disabled section of
 * local_coord_setup_job(). */
#if 0
static int snapc_full_establish_dir(void);
#endif
static int snapc_full_get_min_state(void);

static int snapc_full_local_update_coord(int state, bool quick);

static int snapc_full_local_start_checkpoint_all(int ckpt_state,
                                                 opal_crs_base_ckpt_options_t *options);
static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t *vpid_snapshot);
static int snapc_full_local_start_ckpt_handshake_opts(orte_snapc_full_app_snapshot_t *vpid_snapshot,
                                                      opal_crs_base_ckpt_options_t *options);
static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot);
static int snapc_full_local_end_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot);
static void snapc_full_local_comm_read_event(int fd, short flags, void *arg);

static int orte_snapc_full_local_reset_coord(void);
134
135
136
137
138 int local_coord_init( void )
139 {
140 current_local_jobid = ORTE_JOBID_INVALID;
141 current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
142
143 return ORTE_SUCCESS;
144 }
145
146 int local_coord_finalize( void )
147 {
148 if( ORTE_JOBID_INVALID != current_local_jobid ) {
149 return local_coord_release_job(current_local_jobid);
150 }
151
152 current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
153
154 return ORTE_SUCCESS;
155 }
156
157 int local_coord_setup_job(orte_jobid_t jobid)
158 {
159 int ret, exit_status = ORTE_SUCCESS;
160 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
161 opal_list_item_t* item = NULL;
162
163
164
165
166 if( jobid == current_local_jobid ) {
167
168
169
170 if( currently_migrating ) {
171 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
172 "Local) Restarting Job %s from Migration...",
173 ORTE_JOBID_PRINT(jobid)));
174 if( ORTE_SUCCESS != (ret = snapc_full_local_refresh_vpids() ) ) {
175 ORTE_ERROR_LOG(ret);
176 exit_status = ret;
177 goto cleanup;
178 }
179 }
180 else {
181 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
182 "Local) Restarting Job %s...",
183 ORTE_JOBID_PRINT(jobid)));
184
185 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
186 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
187 item = opal_list_get_next(item) ) {
188 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
189 opal_list_remove_item(&(local_global_snapshot.local_snapshots), item);
190 }
191
192 if( ORTE_SUCCESS != (ret = snapc_full_local_get_vpids() ) ) {
193 ORTE_ERROR_LOG(ret);
194 exit_status = ret;
195 goto cleanup;
196 }
197
198 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
199 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
200 item = opal_list_get_next(item) ) {
201 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
202 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
203 "Local) Restarting Job %s: Daemon %s \t Process %s",
204 ORTE_JOBID_PRINT(jobid),
205 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
206 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name)));
207 }
208 }
209
210 exit_status = ORTE_SUCCESS;
211 goto cleanup;
212 }
213 else if( ORTE_JOBID_INVALID != current_local_jobid ) {
214 opal_output(mca_snapc_full_component.super.output_handle,
215 "Local) Setup of job %s Failed! Already setup job %s\n",
216 ORTE_JOBID_PRINT(jobid), ORTE_JOBID_PRINT(current_local_jobid));
217 exit_status = ORTE_SUCCESS;
218 goto cleanup;
219 }
220
221 current_local_jobid = jobid;
222 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
223 "Local) Setting up jobid %s\n",
224 ORTE_JOBID_PRINT(current_local_jobid)));
225
226
227
228
229 OBJ_CONSTRUCT(&local_global_snapshot, orte_snapc_base_global_snapshot_t);
230
231 if( ORTE_SUCCESS != (ret = snapc_full_local_get_vpids()) ) {
232 ORTE_ERROR_LOG(ret);
233 exit_status = ret;
234 goto cleanup;
235 }
236
237
238
239
240
241 #if 0
242 if(orte_snapc_base_establish_global_snapshot_dir) {
243 if( ORTE_SUCCESS != (ret = snapc_full_establish_dir() ) ) {
244 ORTE_ERROR_LOG(ret);
245 exit_status = ret;
246 goto cleanup;
247 }
248 }
249 #endif
250
251
252
253
254 if( ORTE_SUCCESS != (ret = snapc_full_local_start_hnp_listener() ) ) {
255 ORTE_ERROR_LOG(ret);
256 exit_status = ret;
257 goto cleanup;
258 }
259
260
261
262
263 if( ORTE_SUCCESS != (ret = snapc_full_local_start_app_listener() ) ) {
264 ORTE_ERROR_LOG(ret);
265 exit_status = ret;
266 goto cleanup;
267 }
268
269
270 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
271 "Local) Finished setup of job %s",
272 ORTE_JOBID_PRINT(current_local_jobid) ));
273
274 cleanup:
275 return exit_status;
276 }
277
278 int local_coord_release_job(orte_jobid_t jobid)
279 {
280 int ret, exit_status = ORTE_SUCCESS;
281 orte_snapc_full_app_snapshot_t *vpid_snapshot;
282 opal_list_item_t* item = NULL;
283 bool is_done = true;
284
285
286
287
288
289 do {
290 is_done = true;
291
292 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
293 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
294 item = opal_list_get_next(item) ) {
295 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
296
297 if(ORTE_SNAPC_CKPT_STATE_NONE != vpid_snapshot->super.state &&
298 ORTE_SNAPC_CKPT_STATE_ERROR != vpid_snapshot->super.state &&
299 ORTE_SNAPC_CKPT_STATE_ESTABLISHED != vpid_snapshot->super.state &&
300 ORTE_SNAPC_CKPT_STATE_RECOVERED != vpid_snapshot->super.state ) {
301 is_done = false;
302 break;
303 }
304 else {
305 opal_list_remove_item(&(local_global_snapshot.local_snapshots), item);
306 }
307 }
308 if( !is_done ) {
309 opal_progress();
310 }
311 } while(!is_done);
312
313 OBJ_DESTRUCT(&local_global_snapshot);
314
315
316
317
318 if( ORTE_SUCCESS != (ret = snapc_full_local_stop_app_listener() ) ) {
319 ORTE_ERROR_LOG(ret);
320 exit_status = ret;
321 }
322
323 if( ORTE_SUCCESS != (ret = snapc_full_local_stop_hnp_listener() ) ) {
324 ORTE_ERROR_LOG(ret);
325 exit_status = ret;
326 }
327
328 return exit_status;
329 }
330
331
332
333
334
335
336
337
338 static int snapc_full_local_start_hnp_listener(void)
339 {
340
341
342
343 if( ORTE_SNAPC_GLOBAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
344 return ORTE_SUCCESS;
345 }
346
347 if (snapc_local_hnp_recv_issued ) {
348 return ORTE_SUCCESS;
349 }
350
351 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
352 "Local) Startup Coordinator Channel"));
353
354
355
356
357 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL,
358 ORTE_RML_PERSISTENT, snapc_full_local_hnp_cmd_recv, NULL);
359
360 snapc_local_hnp_recv_issued = true;
361
362 return ORTE_SUCCESS;
363 }
364
365 static int snapc_full_local_stop_hnp_listener(void)
366 {
367
368
369
370 if( ORTE_SNAPC_GLOBAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
371 return ORTE_SUCCESS;
372 }
373
374 if (!snapc_local_hnp_recv_issued ) {
375 return ORTE_SUCCESS;
376 }
377
378 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
379 "Local) Shutdown Coordinator Channel"));
380
381 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL);
382
383 snapc_local_hnp_recv_issued = false;
384 return ORTE_SUCCESS;
385 }
386
387 static int snapc_full_local_start_app_listener(void)
388 {
389 if (snapc_local_app_recv_issued) {
390 return ORTE_SUCCESS;
391 }
392
393 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
394 "Local) Startup Application State Channel"));
395
396
397
398
399 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC,
400 ORTE_RML_PERSISTENT, snapc_full_local_app_cmd_recv,
401 NULL);
402
403 snapc_local_app_recv_issued = true;
404 return ORTE_SUCCESS;
405 }
406
407 static int snapc_full_local_stop_app_listener(void)
408 {
409 if (!snapc_local_app_recv_issued ) {
410 return ORTE_SUCCESS;
411 }
412
413 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
414 "Local) Shutdown Application State Channel"));
415
416 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC);
417
418 snapc_local_app_recv_issued = false;
419 return ORTE_SUCCESS;
420 }
421
422
423
424
425 void snapc_full_local_app_cmd_recv(int status,
426 orte_process_name_t* sender,
427 opal_buffer_t* buffer,
428 orte_rml_tag_t tag,
429 void* cbdata)
430 {
431 int ret;
432 opal_list_item_t* item = NULL;
433 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
434 orte_snapc_cmd_flag_t command;
435 orte_process_name_t proc;
436 pid_t proc_pid = 0;
437 orte_std_cntr_t count;
438 int cr_state;
439 bool is_done;
440 #if OPAL_ENABLE_CRDEBUG == 1
441 bool all_done = false;
442 bool crdebug_enabled = false;
443 #endif
444
445 if( ORTE_RML_TAG_SNAPC != tag ) {
446 opal_output(mca_snapc_full_component.super.output_handle,
447 "Local) Error: Unknown tag: Received a command message from %s (tag = %d).",
448 ORTE_NAME_PRINT(sender), tag);
449 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
450 return;
451 }
452
453
454
455
456 count = 1;
457 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_CMD))) {
458 ORTE_ERROR_LOG(ret);
459 goto cleanup;
460 }
461
462 if( ORTE_SNAPC_LOCAL_UPDATE_CMD != command &&
463 ORTE_SNAPC_LOCAL_FINISH_CMD != command ) {
464 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
465 "Local) Warning: Expected an application command (%d) but received (%d)\n",
466 ORTE_SNAPC_LOCAL_UPDATE_CMD, command));
467 goto cleanup;
468 }
469
470 if( ORTE_SNAPC_LOCAL_UPDATE_CMD == command ) {
471
472
473
474 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
475 "Local) Application: Update pid operation."));
476
477
478
479
480
481
482 count = 1;
483
484 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &count, ORTE_NAME))) {
485 ORTE_ERROR_LOG(ret);
486 goto cleanup;
487 }
488 count = 1;
489 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc_pid, &count, OPAL_PID))) {
490 ORTE_ERROR_LOG(ret);
491 goto cleanup;
492 }
493 #if OPAL_ENABLE_CRDEBUG == 1
494 count = 1;
495 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crdebug_enabled, &count, OPAL_BOOL))) {
496 ORTE_ERROR_LOG(ret);
497 goto cleanup;
498 }
499 #endif
500
501 if( NULL == (vpid_snapshot = find_vpid_snapshot(&proc)) ) {
502 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
503 goto cleanup;
504 }
505
506 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
507 "Local) Updated PID: %s : %d -> %d",
508 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), vpid_snapshot->process_pid, proc_pid));
509
510
511 vpid_snapshot->process_pid = proc_pid;
512 vpid_snapshot->finished = true;
513
514 #if OPAL_ENABLE_CRDEBUG == 1
515
516
517
518 if( crdebug_enabled ) {
519 all_done = true;
520 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
521 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
522 item = opal_list_get_next(item) ) {
523 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
524 if( !vpid_snapshot->finished ) {
525 all_done = false;
526 break;
527 }
528 }
529 if( all_done ) {
530
531 if( ORTE_SUCCESS != (ret = snapc_full_local_send_restart_proc_info() ) ) {
532 ORTE_ERROR_LOG(ret);
533 goto cleanup;
534 }
535 }
536 }
537 #endif
538
539
540
541
542
543 }
544 else if( ORTE_SNAPC_LOCAL_FINISH_CMD == command ) {
545
546
547
548
549 count = 1;
550 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &cr_state, &count, OPAL_INT))) {
551 ORTE_ERROR_LOG(ret);
552 goto cleanup;
553 }
554
555 if( NULL == (vpid_snapshot = find_vpid_snapshot(sender)) ) {
556 opal_output(0, "Local) Failed to find process %s",
557 ORTE_NAME_PRINT(sender));
558 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
559 goto cleanup;
560 }
561
562 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
563 "Local) Process %s Finished Recovery (%d)",
564 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
565 cr_state));
566
567 vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_RECOVERED;
568
569
570
571
572 is_done = true;
573 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
574 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
575 item = opal_list_get_next(item) ) {
576 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
577
578 if( ORTE_SNAPC_CKPT_STATE_RECOVERED != vpid_snapshot->super.state ) {
579 is_done = false;
580 break;
581 }
582 }
583
584 if( is_done ) {
585
586
587
588 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
589 "Local) Job Ckpt finished - Confirmed! Tell the Global Coord\n"));
590
591 if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_RECOVERED, true) ) ) {
592 ORTE_ERROR_LOG(ret);
593 goto cleanup;
594 }
595
596
597
598
599 if( !sstore_local_sync_finished ) {
600 sstore_local_procs_finished = true;
601 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
602 "Local) Job Ckpt finished - Confirmed! Not finished Syncing...\n"));
603 } else {
604
605
606
607 if( ORTE_SUCCESS != (ret = orte_snapc_full_local_reset_coord()) ) {
608 ORTE_ERROR_LOG(ret);
609 goto cleanup;
610 }
611 }
612 }
613 }
614
615 cleanup:
616 return;
617 }
618
#if OPAL_ENABLE_CRDEBUG == 1
/*
 * Report the pids of all local processes after a restart.
 *
 * When this process also has the global coordinator bit set, each pid is
 * handed directly to global_coord_restart_proc_info() together with the
 * local node name.  Otherwise a message is sent to ORTE_PROC_MY_HNP on
 * ORTE_RML_TAG_SNAPC_FULL containing, in order: the
 * ORTE_SNAPC_FULL_RESTART_PROC_INFO command, the node name, the number of
 * pids, and the pids themselves.
 *
 * The message buffer is only allocated once it is known that there is
 * something to send, so the empty-list early return no longer leaks it.
 *
 * @return ORTE_SUCCESS, or the code of the pack/send call that failed.
 */
static int snapc_full_local_send_restart_proc_info(void)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
    opal_list_item_t* item = NULL;
    opal_buffer_t *buffer = NULL;
    orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_RESTART_PROC_INFO;
    size_t num_vpids = 0;

    /*
     * Local shortcut: no message needed when we are also the global
     * coordinator.
     */
    if( ORTE_SNAPC_GLOBAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
        for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
            item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
            item  = opal_list_get_next(item) ) {
            vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
            global_coord_restart_proc_info(vpid_snapshot->process_pid, orte_process_info.nodename);
        }

        fflush(stdout);
        return ORTE_SUCCESS;
    }

    /*
     * Nothing to report: bail out before allocating anything.
     */
    num_vpids = opal_list_get_size(&(local_global_snapshot.local_snapshots));
    if( 0 == num_vpids ) {
        return exit_status;
    }

    buffer = OBJ_NEW(opal_buffer_t);

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(orte_process_info.nodename), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &num_vpids, 1, OPAL_SIZE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
        item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
        item  = opal_list_get_next(item) ) {
        vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;

        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(vpid_snapshot->process_pid), 1, OPAL_PID))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
                                                       orte_rml_send_callback, 0))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* On success the buffer is left to the send path (orte_rml_send_callback
     * is passed as its completion callback), so it is not released here. */
    return ORTE_SUCCESS;

 cleanup:
    OBJ_RELEASE(buffer);

    return exit_status;
}
#endif
702
703 void snapc_full_local_hnp_cmd_recv(int status,
704 orte_process_name_t* sender,
705 opal_buffer_t* buffer,
706 orte_rml_tag_t tag,
707 void* cbdata)
708 {
709 int ret;
710 orte_snapc_full_cmd_flag_t command;
711 orte_std_cntr_t count;
712
713 if( ORTE_RML_TAG_SNAPC_FULL != tag ) {
714 opal_output(mca_snapc_full_component.super.output_handle,
715 "Local) Error: Unknown tag: Received a command message from %s (tag = %d).",
716 ORTE_NAME_PRINT(sender), tag);
717 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
718 return;
719 }
720
721
722
723
724 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
725 "Local) Receive a command message."));
726
727 count = 1;
728 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
729 ORTE_ERROR_LOG(ret);
730 goto cleanup;
731 }
732
733 switch (command) {
734 case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD:
735 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
736 "Local) Command: Update Job state command (quick)"));
737
738 snapc_full_local_process_job_update_cmd(sender, buffer, true);
739 break;
740
741 case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD:
742 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
743 "Local) Command: Update Job state command"));
744
745 snapc_full_local_process_job_update_cmd(sender, buffer, false);
746 break;
747
748 case ORTE_SNAPC_FULL_RESTART_PROC_INFO:
749 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
750 "Local) Command: Update hostname/pid associations"));
751
752 break;
753
754 default:
755 ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
756 }
757
758 cleanup:
759 return;
760 }
761
762 static void snapc_full_local_process_job_update_cmd(orte_process_name_t* sender,
763 opal_buffer_t* buffer,
764 bool quick)
765 {
766 int ret;
767 orte_jobid_t jobid;
768 int job_ckpt_state;
769 orte_std_cntr_t count;
770 opal_crs_base_ckpt_options_t *options = NULL;
771 bool loc_migrating = false;
772 size_t loc_num_procs = 0;
773 orte_process_name_t proc_name;
774 size_t i;
775 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
776 opal_list_item_t* item = NULL;
777 orte_sstore_base_handle_t ss_handle;
778
779
780
781
782
783
784
785
786
787
788
789 count = 1;
790 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
791 ORTE_ERROR_LOG(ret);
792 return;
793 }
794
795 count = 1;
796 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &job_ckpt_state, &count, OPAL_INT))) {
797 ORTE_ERROR_LOG(ret);
798 return;
799 }
800
801 if( !quick ) {
802 if (ORTE_SUCCESS != (ret = orte_sstore.unpack_handle(sender, buffer, &ss_handle)) ) {
803 ORTE_ERROR_LOG(ret);
804 return;
805 }
806
807 options = OBJ_NEW(opal_crs_base_ckpt_options_t);
808 if( ORTE_SUCCESS != (ret = orte_snapc_base_unpack_options(buffer, options)) ) {
809 ORTE_ERROR_LOG(ret);
810 goto cleanup;
811 }
812
813
814
815 opal_crs_base_copy_options(options, local_global_snapshot.options);
816
817 count = 1;
818 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(loc_migrating), &count, OPAL_BOOL))) {
819 ORTE_ERROR_LOG(ret);
820 goto cleanup;
821 }
822
823 if( loc_migrating ) {
824 currently_migrating = true;
825
826 count = 1;
827 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_num_procs, &count, OPAL_SIZE))) {
828 ORTE_ERROR_LOG(ret);
829 goto cleanup;
830 }
831
832 for( i = 0; i < loc_num_procs; ++i ) {
833 count = 1;
834 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc_name, &count, ORTE_NAME))) {
835 ORTE_ERROR_LOG(ret);
836 goto cleanup;
837 }
838
839
840
841
842 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
843 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
844 item = opal_list_get_next(item) ) {
845 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
846
847 if( OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
848 &(vpid_snapshot->super.process_name),
849 &proc_name) ) {
850 vpid_snapshot->migrating = true;
851 break;
852 }
853 }
854 }
855 }
856 }
857
858 if( ORTE_SUCCESS != (ret = local_coord_job_state_update(jobid,
859 job_ckpt_state,
860 ss_handle,
861 local_global_snapshot.options)) ) {
862 ORTE_ERROR_LOG(ret);
863 return;
864 }
865
866 cleanup:
867 if( NULL != options ) {
868 OBJ_RELEASE(options);
869 options = NULL;
870 }
871
872 return;
873 }
874
875
876 int local_coord_job_state_update(orte_jobid_t jobid,
877 int job_ckpt_state,
878 orte_sstore_base_handle_t ss_handle,
879 opal_crs_base_ckpt_options_t *options)
880 {
881 int ret, exit_status = ORTE_SUCCESS;
882 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
883 opal_list_item_t* item = NULL;
884 char * state_str = NULL;
885
886
887 opal_crs_base_copy_options(options, local_global_snapshot.options);
888
889 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
890 "Local) Job %s: Changed to state to:\n",
891 ORTE_JOBID_PRINT(jobid)));
892 orte_snapc_ckpt_state_str(&state_str, job_ckpt_state);
893 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
894 "Local) Job State: %d (%s)\n",
895 (int)job_ckpt_state, state_str ));
896 free(state_str);
897 state_str = NULL;
898
899
900
901
902
903
904 if( ORTE_SUCCESS != (ret = snapc_full_local_get_vpids() ) ) {
905 ORTE_ERROR_LOG(ret);
906 exit_status = ret;
907 goto cleanup;
908 }
909
910 current_job_ckpt_state = job_ckpt_state;
911
912
913
914
915 if( ORTE_SNAPC_CKPT_STATE_PENDING == job_ckpt_state ) {
916
917
918
919 local_global_snapshot.ss_handle = ss_handle;
920 orte_sstore.register_handle(local_global_snapshot.ss_handle);
921
922
923
924
925 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
926 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
927 item = opal_list_get_next(item) ) {
928 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
929
930 vpid_snapshot->super.state = job_ckpt_state;
931 vpid_snapshot->finished = false;
932 }
933
934
935
936
937 if( ORTE_SUCCESS != (ret = snapc_full_local_start_checkpoint_all(job_ckpt_state, options) ) ) {
938 ORTE_ERROR_LOG(ret);
939 exit_status = ret;
940 goto cleanup;
941 }
942 }
943 else if( ORTE_SNAPC_CKPT_STATE_MIGRATING == job_ckpt_state ) {
944 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
945 "Local) Migrating: Display a list of processes migrating"));
946
947 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
948 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
949 item = opal_list_get_next(item) ) {
950 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
951
952
953
954 if( vpid_snapshot->migrating ) {
955 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
956 "Local) Migrating: %s",
957 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name) ));
958 }
959 }
960 }
961
962
963
964
965
966 else if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL == job_ckpt_state ) {
967 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
968 "Local) Locally finished, release all processes\n"));
969 if( ORTE_SUCCESS != (ret = local_coord_job_state_update_finished_local() ) ) {
970 ORTE_ERROR_LOG(ORTE_ERROR);
971 exit_status = ORTE_ERROR;
972 goto cleanup;
973 }
974 }
975
976
977
978
979 else if( ORTE_SNAPC_CKPT_STATE_ESTABLISHED == job_ckpt_state ) {
980
981
982
983 }
984 else {
985 ;
986 }
987
988 cleanup:
989 if( NULL != state_str ) {
990 free(state_str);
991 state_str = NULL;
992 }
993
994 return exit_status;
995 }
996
997 static int local_coord_job_state_update_finished_local(void)
998 {
999 int ret, exit_status = ORTE_SUCCESS;
1000 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1001 opal_list_item_t* item = NULL;
1002
1003 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
1004 "Local) Job Ckpt finished tell all processes\n"));
1005
1006 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1007 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1008 item = opal_list_get_next(item) ) {
1009 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1010
1011 if( ORTE_SUCCESS != (ret = local_coord_job_state_update_finished_local_vpid(vpid_snapshot) ) ) {
1012 ORTE_ERROR_LOG(ORTE_ERROR);
1013 exit_status = ORTE_ERROR;
1014 goto cleanup;
1015 }
1016
1017 if( vpid_snapshot->migrating ) {
1018 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1019 "Local) Removing Migrated Process: %s",
1020 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name) ));
1021 opal_list_remove_item(&(local_global_snapshot.local_snapshots), item);
1022 }
1023 }
1024 cleanup:
1025 return exit_status;
1026 }
1027
1028 static int local_coord_job_state_update_finished_local_vpid(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1029 {
1030 int ret;
1031
1032 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
1033 "Local) Tell process %s (Ckpt Finished) %s\n",
1034 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1035 (vpid_snapshot->migrating ? "- Migrating, Skip" : "") ));
1036
1037
1038
1039
1040 if( vpid_snapshot->migrating ) {
1041 return ORTE_SUCCESS;
1042 }
1043
1044 if( ORTE_SUCCESS != (ret = snapc_full_local_end_ckpt_handshake(vpid_snapshot) ) ) {
1045 opal_output(mca_snapc_full_component.super.output_handle,
1046 "Local) Error: Unable to finish the handshake with peer %s. %d\n",
1047 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1048 ORTE_ERROR_LOG(ORTE_ERROR);
1049 return ORTE_ERROR;
1050 }
1051
1052 return ORTE_SUCCESS;
1053 }
1054
1055
1056
1057
1058
1059 static int snapc_full_local_start_checkpoint_all(int ckpt_state,
1060 opal_crs_base_ckpt_options_t *options)
1061 {
1062 int ret, exit_status = ORTE_SUCCESS;
1063 orte_snapc_full_app_snapshot_t *vpid_snapshot;
1064 opal_list_item_t* item = NULL;
1065 size_t num_stopped = 0;
1066 int waitpid_status = 0;
1067
1068
1069
1070
1071
1072 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1073 "Local) start() Pass 1: Sanity check"));
1074
1075 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1076 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1077 item = opal_list_get_next(item) ) {
1078 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1079
1080
1081 if( vpid_snapshot->process_pid == 0 ) {
1082 ret = snapc_full_local_get_vpids();
1083 if( ORTE_SUCCESS != ret || vpid_snapshot->process_pid == 0 ) {
1084 opal_output( mca_snapc_full_component.super.output_handle,
1085 "local) Cannot checkpoint an invalid pid (%d)\n",
1086 vpid_snapshot->process_pid);
1087 ORTE_ERROR_LOG(ORTE_ERROR);
1088 exit_status = ORTE_ERROR;
1089 goto cleanup;
1090 }
1091 break;
1092 }
1093 }
1094
1095
1096
1097
1098 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1099 "Local) start() Pass 2: Signal Procs"));
1100 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1101 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1102 item = opal_list_get_next(item) ) {
1103 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1104
1105
1106
1107
1108 local_define_pipe_names(vpid_snapshot);
1109
1110 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1111 "Local) Signal process (%d) with signal %d\n",
1112 (int) vpid_snapshot->process_pid,
1113 opal_cr_entry_point_signal));
1114
1115
1116
1117
1118 if( 0 != (ret = kill(vpid_snapshot->process_pid, opal_cr_entry_point_signal) ) ) {
1119 opal_output(mca_snapc_full_component.super.output_handle,
1120 "local) Error: Failed to signal process %d with signal %d. %d\n",
1121 (int) vpid_snapshot->process_pid,
1122 opal_cr_entry_point_signal,
1123 ret);
1124 exit_status = ret;
1125 goto cleanup;
1126 }
1127 }
1128
1129
1130
1131
1132 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1133 "Local) start() Pass 3: Open pipes"));
1134 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1135 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1136 item = opal_list_get_next(item) ) {
1137 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1138
1139 if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_open_comm(vpid_snapshot) ) ) {
1140 opal_output(mca_snapc_full_component.super.output_handle,
1141 "local) Error: Unable to initiate the handshake with peer %s. %d\n",
1142 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1143 ORTE_ERROR_LOG(ORTE_ERROR);
1144 exit_status = ORTE_ERROR;
1145 goto cleanup;
1146 }
1147
1148 vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_RUNNING;
1149 }
1150
1151
1152
1153
1154 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1155 "Local) start() Pass 4: Start handshake"));
1156 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1157 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1158 item = opal_list_get_next(item) ) {
1159 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1160
1161 if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_handshake_opts(vpid_snapshot, options) ) ) {
1162 opal_output(mca_snapc_full_component.super.output_handle,
1163 "local) Error: Unable to initiate the handshake with peer %s. %d\n",
1164 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1165 ORTE_ERROR_LOG(ORTE_ERROR);
1166 exit_status = ORTE_ERROR;
1167 goto cleanup;
1168 }
1169 }
1170
1171
1172
1173
1174 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1175 "Local) start() Pass 5: Start checkpoints"));
1176 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1177 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1178 item = opal_list_get_next(item) ) {
1179 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1180
1181 if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_handshake(vpid_snapshot) ) ) {
1182 opal_output(mca_snapc_full_component.super.output_handle,
1183 "local) Error: Unable to initiate the handshake with peer %s. %d\n",
1184 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1185 ORTE_ERROR_LOG(ORTE_ERROR);
1186 exit_status = ORTE_ERROR;
1187 goto cleanup;
1188 }
1189 }
1190
1191
1192
1193
1194 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1195 "Local) start() Pass 6: Tell Global Coord that we are running now"));
1196 if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_RUNNING, true) ) ) {
1197 ORTE_ERROR_LOG(ret);
1198 exit_status = ret;
1199 goto cleanup;
1200 }
1201
1202
1203
1204
1205 if( options->stop ) {
1206 while( num_stopped < opal_list_get_size(&(local_global_snapshot.local_snapshots)) ) {
1207 opal_progress();
1208 sleep(1);
1209
1210 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1211 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1212 item = opal_list_get_next(item) ) {
1213 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1214
1215 ret = waitpid(vpid_snapshot->process_pid, &waitpid_status, WNOHANG|WUNTRACED);
1216
1217 if( (ret > 0) && WIFSTOPPED(waitpid_status) && (SIGSTOP == WSTOPSIG(waitpid_status)) ) {
1218 ++num_stopped;
1219 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1220 "Local) Child (%d) is stopped [total = %d]",
1221 vpid_snapshot->process_pid, (int)num_stopped ));
1222 }
1223 else if( ret < 0 ) {
1224 if( 0 < mca_snapc_full_component.super.verbose ) {
1225 orte_show_help("help-orte-snapc-full.txt", "waitpid_stop_fail", true,
1226 vpid_snapshot->process_pid, ret,
1227 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name));
1228 }
1229 goto skip_wait;
1230 }
1231 }
1232 }
1233
1234 skip_wait:
1235 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1236 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1237 item = opal_list_get_next(item) ) {
1238 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1239
1240 vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_STOPPED;
1241 }
1242
1243 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1244 "Local) All Children have now been stopped [total = %d]",
1245 (int)num_stopped ));
1246
1247
1248
1249
1250 orte_sstore.sync(local_global_snapshot.ss_handle);
1251
1252
1253
1254
1255 if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_STOPPED, false) ) ) {
1256 ORTE_ERROR_LOG(ret);
1257 exit_status = ret;
1258 goto cleanup;
1259 }
1260 }
1261
1262 cleanup:
1263 if( ORTE_SUCCESS != exit_status ) {
1264 ckpt_state = ORTE_SNAPC_CKPT_STATE_ERROR;
1265 }
1266
1267 return exit_status;
1268 }
1269
1270 static int local_define_pipe_names(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1271 {
1272 if( NULL != vpid_snapshot->comm_pipe_r ) {
1273 free(vpid_snapshot->comm_pipe_r);
1274 vpid_snapshot->comm_pipe_r = NULL;
1275 }
1276
1277 if( NULL != vpid_snapshot->comm_pipe_w ) {
1278 free(vpid_snapshot->comm_pipe_w);
1279 vpid_snapshot->comm_pipe_w = NULL;
1280 }
1281
1282 opal_asprintf(&(vpid_snapshot->comm_pipe_w),
1283 "%s/%s.%d_%d",
1284 opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R,
1285 vpid_snapshot->process_pid,
1286 vpid_snapshot->unique_pipe_id);
1287
1288 opal_asprintf(&(vpid_snapshot->comm_pipe_r),
1289 "%s/%s.%d_%d",
1290 opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W,
1291 vpid_snapshot->process_pid,
1292 vpid_snapshot->unique_pipe_id);
1293
1294 (vpid_snapshot->unique_pipe_id)++;
1295
1296 return ORTE_SUCCESS;
1297 }
1298
1299 static int snapc_full_local_update_coord(int state, bool quick)
1300 {
1301 int ret, exit_status = ORTE_SUCCESS;
1302 opal_buffer_t *buffer = NULL;
1303 orte_snapc_full_cmd_flag_t command;
1304
1305
1306
1307
1308 buffer = OBJ_NEW(opal_buffer_t);
1309
1310 if( quick ) {
1311 command = ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_QUICK_CMD;
1312 } else {
1313 command = ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_CMD;
1314 }
1315 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1316 ORTE_ERROR_LOG(ret);
1317 exit_status = ret;
1318 goto cleanup;
1319 }
1320
1321 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &state, 1, OPAL_INT))) {
1322 ORTE_ERROR_LOG(ret);
1323 exit_status = ret;
1324 goto cleanup;
1325 }
1326
1327
1328
1329
1330
1331
1332
1333 if( quick ) {
1334 goto send_data;
1335 }
1336
1337 send_data:
1338 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
1339 ORTE_RML_TAG_SNAPC_FULL,
1340 orte_rml_send_callback, 0))) {
1341 ORTE_ERROR_LOG(ret);
1342 exit_status = ret;
1343 goto cleanup;
1344 }
1345
1346 return ORTE_SUCCESS;
1347
1348 cleanup:
1349 OBJ_RELEASE(buffer);
1350
1351 return exit_status;
1352 }
1353
1354 static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1355 {
1356 int ret, exit_status = ORTE_SUCCESS;
1357 int usleep_time = 1000;
1358 int s_time = 0, max_wait_time;
1359
1360
1361 max_wait_time = orte_snapc_full_max_wait_time * (1000000/usleep_time);
1362
1363
1364
1365
1366 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1367 "Local) Waiting for process %s's pipes (%s) (%s)\n",
1368 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1369 vpid_snapshot->comm_pipe_w,
1370 vpid_snapshot->comm_pipe_r));
1371 for( s_time = 0; s_time < max_wait_time || max_wait_time <= 0; ++s_time) {
1372
1373
1374
1375 if( 0 > (ret = access(vpid_snapshot->comm_pipe_r, F_OK) )) {
1376
1377 if( s_time >= max_wait_time - 5 && max_wait_time > 0 ) {
1378 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
1379 "Local) WARNING: Read file does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n",
1380 vpid_snapshot->comm_pipe_r, ret,
1381 s_time/usleep_time, max_wait_time/usleep_time));
1382 }
1383 usleep(usleep_time);
1384 continue;
1385 }
1386 else if( 0 > (ret = access(vpid_snapshot->comm_pipe_w, F_OK) )) {
1387
1388 if( s_time >= max_wait_time - 5 && max_wait_time > 0 ) {
1389 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
1390 "Local) WARNING: Write file does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n",
1391 vpid_snapshot->comm_pipe_w, ret,
1392 s_time/usleep_time, max_wait_time/usleep_time));
1393 }
1394 usleep(usleep_time);
1395 continue;
1396 }
1397 else {
1398 break;
1399 }
1400
1401 if( max_wait_time > 0 &&
1402 (s_time == (max_wait_time/2) ||
1403 s_time == (max_wait_time/4) ||
1404 s_time == (3*max_wait_time/4) ) ) {
1405 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1406 "WARNING: Pid (%d) not responding [%d / %d]",
1407 vpid_snapshot->process_pid, s_time, max_wait_time));
1408 }
1409 }
1410
1411 if( max_wait_time > 0 && s_time == max_wait_time ) {
1412
1413
1414
1415
1416
1417
1418
1419
1420 orte_show_help("help-opal-checkpoint.txt", "pid_does_not_exist", true,
1421 vpid_snapshot->process_pid,
1422 vpid_snapshot->comm_pipe_r,
1423 vpid_snapshot->comm_pipe_w);
1424
1425 exit_status = OPAL_ERROR;
1426 goto cleanup;
1427 }
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438 vpid_snapshot->comm_pipe_w_fd = open(vpid_snapshot->comm_pipe_w, O_WRONLY);
1439 if(vpid_snapshot->comm_pipe_w_fd < 0) {
1440 opal_output(mca_snapc_full_component.super.output_handle,
1441 "local) Error: Unable to open name pipe (%s). %d\n",
1442 vpid_snapshot->comm_pipe_w, vpid_snapshot->comm_pipe_w_fd);
1443 exit_status = OPAL_ERROR;
1444 goto cleanup;
1445 }
1446
1447 vpid_snapshot->comm_pipe_r_fd = open(vpid_snapshot->comm_pipe_r, O_RDONLY);
1448 if(vpid_snapshot->comm_pipe_r_fd < 0) {
1449 opal_output(mca_snapc_full_component.super.output_handle,
1450 "local) Error: Unable to open name pipe (%s). %d\n",
1451 vpid_snapshot->comm_pipe_r, vpid_snapshot->comm_pipe_r_fd);
1452 exit_status = OPAL_ERROR;
1453 goto cleanup;
1454 }
1455
1456 cleanup:
1457 return exit_status;
1458 }
1459
1460 static int snapc_full_local_start_ckpt_handshake_opts(orte_snapc_full_app_snapshot_t *vpid_snapshot,
1461 opal_crs_base_ckpt_options_t *options)
1462 {
1463 int ret, exit_status = ORTE_SUCCESS;
1464 int opt_rep;
1465
1466
1467
1468
1469
1470
1471
1472 if( vpid_snapshot->migrating ) {
1473 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1474 "Local) Tell app to MIGRATE. [%s (%d)]\n",
1475 (vpid_snapshot->migrating ? "True" : "False"),
1476 (int)(currently_migrating) ));
1477 }
1478 if( options->term ) {
1479 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1480 "Local) Tell app to TERMINATE after completion of checkpoint. [%s]\n",
1481 (options->term ? "True" : "False") ));
1482 }
1483 if( options->stop ) {
1484 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1485 "Local) Tell app to STOP after completion of checkpoint. [%s]\n",
1486 (options->stop ? "True" : "False") ));
1487 }
1488
1489
1490 opt_rep = (int)(currently_migrating);
1491 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1492 opal_output(mca_snapc_full_component.super.output_handle,
1493 "Local) Error: Unable to write migrating (%d) to named pipe (%s), %d\n",
1494 vpid_snapshot->migrating, vpid_snapshot->comm_pipe_w, ret);
1495 exit_status = OPAL_ERROR;
1496 goto cleanup;
1497 }
1498
1499 opt_rep = (int)(vpid_snapshot->migrating);
1500 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1501 opal_output(mca_snapc_full_component.super.output_handle,
1502 "Local) Error: Unable to write migrating (%d) to named pipe (%s), %d\n",
1503 vpid_snapshot->migrating, vpid_snapshot->comm_pipe_w, ret);
1504 exit_status = OPAL_ERROR;
1505 goto cleanup;
1506 }
1507
1508 opt_rep = (int)(options->term);
1509 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1510 opal_output(mca_snapc_full_component.super.output_handle,
1511 "Local) Error: Unable to write term (%d) to named pipe (%s), %d\n",
1512 options->term, vpid_snapshot->comm_pipe_w, ret);
1513 exit_status = OPAL_ERROR;
1514 goto cleanup;
1515 }
1516
1517 opt_rep = (int)(options->stop);
1518 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1519 opal_output(mca_snapc_full_component.super.output_handle,
1520 "Local) Error: Unable to write stop (%d) to named pipe (%s), %d\n",
1521 options->stop, vpid_snapshot->comm_pipe_w, ret);
1522 exit_status = OPAL_ERROR;
1523 goto cleanup;
1524 }
1525
1526 opt_rep = (int)(options->inc_prep_only);
1527 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1528 opal_output(mca_snapc_full_component.super.output_handle,
1529 "Local) Error: Unable to write inc_prep_only (%d) to named pipe (%s), %d\n",
1530 options->stop, vpid_snapshot->comm_pipe_w, ret);
1531 exit_status = OPAL_ERROR;
1532 goto cleanup;
1533 }
1534
1535 opt_rep = (int)(options->inc_recover_only);
1536 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1537 opal_output(mca_snapc_full_component.super.output_handle,
1538 "Local) Error: Unable to write inc_recover_only (%d) to named pipe (%s), %d\n",
1539 options->stop, vpid_snapshot->comm_pipe_w, ret);
1540 exit_status = OPAL_ERROR;
1541 goto cleanup;
1542 }
1543
1544 #if OPAL_ENABLE_CRDEBUG == 1
1545 opt_rep = (int)(options->attach_debugger);
1546 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1547 opal_output(mca_snapc_full_component.super.output_handle,
1548 "local) Error: Unable to write attach_debugger (%d) to named pipe (%s), %d\n",
1549 options->attach_debugger, vpid_snapshot->comm_pipe_w, ret);
1550 exit_status = OPAL_ERROR;
1551 goto cleanup;
1552 }
1553
1554 opt_rep = (int)(options->detach_debugger);
1555 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1556 opal_output(mca_snapc_full_component.super.output_handle,
1557 "local) Error: Unable to write detach_debugger (%d) to named pipe (%s), %d\n",
1558 options->detach_debugger, vpid_snapshot->comm_pipe_w, ret);
1559 exit_status = OPAL_ERROR;
1560 goto cleanup;
1561 }
1562 #endif
1563
1564
1565
1566
1567 if( sizeof(orte_sstore_base_handle_t) != (ret = write(vpid_snapshot->comm_pipe_w_fd,
1568 &(local_global_snapshot.ss_handle), sizeof(orte_sstore_base_handle_t) )) ) {
1569 opal_output(mca_snapc_full_component.super.output_handle,
1570 "Local) Error: Unable to write sstore handle (%d) to named pipe (%s). %d\n",
1571 (int)(local_global_snapshot.ss_handle), vpid_snapshot->comm_pipe_w, ret);
1572 ORTE_ERROR_LOG(OPAL_ERROR);
1573 exit_status = OPAL_ERROR;
1574 goto cleanup;
1575 }
1576
1577 cleanup:
1578 return exit_status;
1579 }
1580
1581 static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1582 {
1583 int ret, exit_status = ORTE_SUCCESS;
1584 int value;
1585
1586
1587
1588
1589 if( sizeof(int) != (ret = read(vpid_snapshot->comm_pipe_r_fd, &value, sizeof(int))) ) {
1590 opal_output(mca_snapc_full_component.super.output_handle,
1591 "Local) Error: Unable to read length from named pipe (%s). %d\n",
1592 vpid_snapshot->comm_pipe_r, ret);
1593 exit_status = OPAL_ERROR;
1594 goto cleanup;
1595 }
1596
1597
1598 if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == value ) {
1599 orte_show_help("help-opal-checkpoint.txt",
1600 "ckpt:in_progress",
1601 true,
1602 vpid_snapshot->process_pid);
1603 exit_status = OPAL_ERROR;
1604 goto cleanup;
1605 }
1606 else if( OPAL_CHECKPOINT_CMD_NULL == value ) {
1607 orte_show_help("help-opal-checkpoint.txt",
1608 "ckpt:req_null",
1609 true,
1610 vpid_snapshot->process_pid);
1611 exit_status = OPAL_ERROR;
1612 goto cleanup;
1613 }
1614 else if ( OPAL_CHECKPOINT_CMD_ERROR == value ) {
1615 orte_show_help("help-opal-checkpoint.txt",
1616 "ckpt:req_error",
1617 true,
1618 vpid_snapshot->process_pid);
1619 exit_status = OPAL_ERROR;
1620 goto cleanup;
1621 }
1622
1623 opal_event_set(orte_event_base, &(vpid_snapshot->comm_pipe_r_eh),
1624 vpid_snapshot->comm_pipe_r_fd,
1625 OPAL_EV_READ|OPAL_EV_PERSIST,
1626 snapc_full_local_comm_read_event,
1627 vpid_snapshot);
1628 vpid_snapshot->is_eh_active = true;
1629 opal_event_add(&(vpid_snapshot->comm_pipe_r_eh), NULL);
1630
1631
1632
1633
1634 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &value, sizeof(int))) ) {
1635 opal_output(mca_snapc_full_component.super.output_handle,
1636 "Local) Error: Unable to write to named pipe (%s). %d\n",
1637 vpid_snapshot->comm_pipe_w, ret);
1638 ORTE_ERROR_LOG(OPAL_ERROR);
1639 exit_status = OPAL_ERROR;
1640 goto cleanup;
1641 }
1642
1643 cleanup:
1644 return exit_status;
1645 }
1646
1647 static int snapc_full_local_end_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1648 {
1649 int ret, exit_status = ORTE_SUCCESS;
1650 int last_cmd = 0;
1651
1652
1653
1654
1655 if( 0 > vpid_snapshot->comm_pipe_w_fd ) {
1656 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1657 "Local) end_handshake: Process %s closed pipe. Skipping. (%d)\n",
1658 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1659 vpid_snapshot->comm_pipe_w_fd));
1660 return exit_status;
1661 }
1662
1663
1664
1665
1666 if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &last_cmd, sizeof(int))) ) {
1667 opal_output(mca_snapc_full_component.super.output_handle,
1668 "Local) Error: Unable to release process %s (%d)\n",
1669 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1670 exit_status = OPAL_ERROR;
1671 goto cleanup;
1672 }
1673
1674 cleanup:
1675
1676
1677
1678 close(vpid_snapshot->comm_pipe_w_fd);
1679 close(vpid_snapshot->comm_pipe_r_fd);
1680 vpid_snapshot->comm_pipe_w_fd = -1;
1681 vpid_snapshot->comm_pipe_r_fd = -1;
1682
1683 return exit_status;
1684 }
1685
1686 static void snapc_full_local_comm_read_event(int fd, short flags, void *arg)
1687 {
1688 int ret;
1689 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1690 int ckpt_state;
1691 int loc_min_state;
1692 char * state_str = NULL;
1693
1694 vpid_snapshot = (orte_snapc_full_app_snapshot_t *)arg;
1695
1696 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1697 "Local) Read Event: Process %s done checkpointing...\n",
1698 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name)));
1699
1700
1701
1702
1703 if( !vpid_snapshot->migrating ) {
1704 if( sizeof(int) != (ret = read(vpid_snapshot->comm_pipe_r_fd, &ckpt_state, sizeof(int))) ) {
1705 opal_output(mca_snapc_full_component.super.output_handle,
1706 "Local) Error: Unable to read state from named pipe (%s). %d\n",
1707 vpid_snapshot->comm_pipe_r, ret);
1708 ORTE_ERROR_LOG(ORTE_ERROR);
1709 goto cleanup;
1710 }
1711
1712
1713
1714
1715 if( local_global_snapshot.options->inc_prep_only &&
1716 OPAL_CRS_RUNNING == ckpt_state ) {
1717
1718
1719
1720 vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_INC_PREPED;
1721 loc_min_state = snapc_full_get_min_state();
1722 if( loc_min_state > current_job_ckpt_state &&
1723 ORTE_SNAPC_CKPT_STATE_INC_PREPED == loc_min_state ) {
1724 if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_INC_PREPED, false) ) ) {
1725 ORTE_ERROR_LOG(ret);
1726 goto cleanup;
1727 }
1728 }
1729
1730
1731 return;
1732 }
1733 }
1734
1735
1736
1737
1738
1739 if( ckpt_state == OPAL_CRS_ERROR ) {
1740 vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_ERROR;
1741 } else {
1742 vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL;
1743 }
1744
1745
1746
1747
1748 if( currently_migrating && !flushed_modex ) {
1749 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1750 "Local) Read Event: Flush the modex cached data\n"));
1751
1752
1753
1754
1755 #if 0
1756 if (OPAL_SUCCESS != (ret = opal_dstore.remove(NULL, NULL))) {
1757 ORTE_ERROR_LOG(ret);
1758 exit_status = ret;
1759 goto cleanup;
1760 }
1761 #endif
1762
1763 flushed_modex = true;
1764 }
1765
1766
1767
1768
1769 if( ORTE_SNAPC_CKPT_STATE_ERROR == vpid_snapshot->super.state ) {
1770
1771 if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_ERROR, true) ) ) {
1772 ORTE_ERROR_LOG(ret);
1773 }
1774 goto cleanup;
1775 }
1776
1777
1778
1779
1780 loc_min_state = snapc_full_get_min_state();
1781 if( loc_min_state > current_job_ckpt_state &&
1782 ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL == loc_min_state ) {
1783
1784 orte_snapc_ckpt_state_str(&state_str, loc_min_state);
1785 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1786 "Local) Daemon State Changed: %d (%s)",
1787 (int)loc_min_state, state_str ));
1788 free(state_str);
1789 state_str = NULL;
1790
1791
1792
1793
1794 current_job_ckpt_state = loc_min_state;
1795 if( ORTE_SNAPC_GLOBAL_COORD_TYPE != (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
1796 if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(loc_min_state, false) ) ) {
1797 ORTE_ERROR_LOG(ret);
1798 goto cleanup;
1799 }
1800 }
1801
1802
1803
1804
1805
1806 if( !local_global_snapshot.options->stop ) {
1807 orte_sstore.sync(local_global_snapshot.ss_handle);
1808 sstore_local_sync_finished = true;
1809
1810
1811
1812
1813 if( sstore_local_procs_finished ) {
1814 if( ORTE_SUCCESS != (ret = orte_snapc_full_local_reset_coord()) ) {
1815 ORTE_ERROR_LOG(ret);
1816 goto cleanup;
1817 }
1818 }
1819 }
1820
1821
1822 if( ORTE_SNAPC_GLOBAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
1823 if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(loc_min_state, false) ) ) {
1824 ORTE_ERROR_LOG(ret);
1825 goto cleanup;
1826 }
1827 }
1828 }
1829
1830 cleanup:
1831
1832
1833
1834 opal_event_del(&(vpid_snapshot->comm_pipe_r_eh));
1835 vpid_snapshot->is_eh_active = false;
1836
1837 if( NULL != state_str ) {
1838 free(state_str);
1839 state_str = NULL;
1840 }
1841
1842 return;
1843 }
1844
1845 static int snapc_full_get_min_state(void)
1846 {
1847 int min_state = ORTE_SNAPC_CKPT_MAX;
1848 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1849 opal_list_item_t* item = NULL;
1850 char * state_str_a = NULL;
1851 char * state_str_b = NULL;
1852
1853 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1854 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1855 item = opal_list_get_next(item) ) {
1856 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1857
1858 if( NULL != state_str_a ) {
1859 free(state_str_a);
1860 }
1861 if( NULL != state_str_b ) {
1862 free(state_str_b);
1863 }
1864
1865 orte_snapc_ckpt_state_str(&state_str_a, vpid_snapshot->super.state);
1866 orte_snapc_ckpt_state_str(&state_str_b, min_state);
1867
1868 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1869 "Local) ... %s Checking [%d %s] vs [%d %s]",
1870 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1871 (int)vpid_snapshot->super.state, state_str_a,
1872 (int)min_state, state_str_b ));
1873 if( min_state > vpid_snapshot->super.state ) {
1874 min_state = vpid_snapshot->super.state;
1875 }
1876 }
1877
1878 if( NULL != state_str_b ) {
1879 free(state_str_b);
1880 state_str_b = NULL;
1881 }
1882 orte_snapc_ckpt_state_str(&state_str_b, min_state);
1883 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1884 "Local) ... Min State [%d %s]",
1885 (int)min_state, state_str_b ));
1886
1887 if( NULL != state_str_a ) {
1888 free(state_str_a);
1889 state_str_a = NULL;
1890 }
1891 if( NULL != state_str_b ) {
1892 free(state_str_b);
1893 state_str_b = NULL;
1894 }
1895
1896 return min_state;
1897 }
1898
1899 static int snapc_full_local_get_vpids(void)
1900 {
1901 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1902 int i;
1903 orte_proc_t *child = NULL;
1904 size_t list_len = 0;
1905
1906
1907
1908
1909
1910 list_len = opal_list_get_size(&(local_global_snapshot.local_snapshots));
1911 if( list_len > 0 ) {
1912 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)opal_list_get_first(&(local_global_snapshot.local_snapshots));
1913 if( 0 < vpid_snapshot->process_pid ) {
1914 return ORTE_SUCCESS;
1915 }
1916 }
1917
1918
1919
1920
1921 for (i=0; i < orte_local_children->size; i++) {
1922 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
1923 continue;
1924 }
1925
1926
1927 if( 0 >= list_len ||
1928 NULL == (vpid_snapshot = find_vpid_snapshot(&child->name)) ) {
1929 vpid_snapshot = OBJ_NEW(orte_snapc_full_app_snapshot_t);
1930 opal_list_append(&(local_global_snapshot.local_snapshots), &(vpid_snapshot->super.super));
1931 }
1932
1933
1934 if( 0 >= vpid_snapshot->process_pid ) {
1935 vpid_snapshot->process_pid = child->pid;
1936 vpid_snapshot->super.process_name.jobid = child->name.jobid;
1937 vpid_snapshot->super.process_name.vpid = child->name.vpid;
1938 }
1939 }
1940
1941 return ORTE_SUCCESS;
1942 }
1943
1944 static int snapc_full_local_refresh_vpids(void)
1945 {
1946 opal_list_item_t *v_item = NULL;
1947 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1948 int i;
1949 orte_proc_t *child = NULL;
1950 bool found = false;
1951
1952
1953
1954
1955
1956 for(v_item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1957 v_item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1958 v_item = opal_list_get_next(v_item) ) {
1959 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)v_item;
1960
1961 found = false;
1962 for (i=0; i < orte_local_children->size; i++) {
1963 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
1964 continue;
1965 }
1966
1967 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
1968 &child->name,
1969 &(vpid_snapshot->super.process_name) )) {
1970 found = true;
1971 break;
1972 }
1973 }
1974 if( !found ) {
1975 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1976 "Local) Refresh List: Remove Process %s (%5d) from Daemon %s",
1977 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1978 vpid_snapshot->process_pid,
1979 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ));
1980 opal_list_remove_item(&(local_global_snapshot.local_snapshots), v_item);
1981 }
1982 }
1983
1984
1985
1986
1987
1988 for (i=0; i < orte_local_children->size; i++) {
1989 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
1990 continue;
1991 }
1992
1993
1994 if( NULL == (vpid_snapshot = find_vpid_snapshot(&child->name)) ) {
1995 vpid_snapshot = OBJ_NEW(orte_snapc_full_app_snapshot_t);
1996
1997 vpid_snapshot->process_pid = child->pid;
1998 vpid_snapshot->super.process_name.jobid = child->name.jobid;
1999 vpid_snapshot->super.process_name.vpid = child->name.vpid;
2000
2001
2002 opal_list_append(&(local_global_snapshot.local_snapshots), &(vpid_snapshot->super.super));
2003
2004 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2005 "Local) Refresh List: Add Process %s (%5d) to Daemon %s",
2006 ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
2007 vpid_snapshot->process_pid,
2008 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ));
2009 }
2010
2011 else if( 0 >= vpid_snapshot->process_pid ) {
2012 vpid_snapshot->process_pid = child->pid;
2013 vpid_snapshot->super.process_name.jobid = child->name.jobid;
2014 vpid_snapshot->super.process_name.vpid = child->name.vpid;
2015 }
2016 }
2017
2018 return ORTE_SUCCESS;
2019 }
2020
2021 static orte_snapc_full_app_snapshot_t *find_vpid_snapshot(orte_process_name_t *name )
2022 {
2023 opal_list_item_t* item = NULL;
2024 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
2025 orte_ns_cmp_bitmask_t mask;
2026
2027 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
2028 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
2029 item = opal_list_get_next(item) ) {
2030 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
2031
2032 mask = ORTE_NS_CMP_ALL;
2033
2034 if (OPAL_EQUAL ==
2035 orte_util_compare_name_fields(mask, name, &vpid_snapshot->super.process_name)) {
2036 return vpid_snapshot;
2037 }
2038 }
2039
2040 return NULL;
2041 }
2042
2043 static int orte_snapc_full_local_reset_coord(void)
2044 {
2045 int ret, exit_status = ORTE_SUCCESS;
2046 orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
2047 opal_list_item_t* item = NULL;
2048
2049 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
2050 "Local) Job Ckpt finished - Cleanup\n"));
2051
2052 for(item = opal_list_get_first(&(local_global_snapshot.local_snapshots));
2053 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
2054 item = opal_list_get_next(item) ) {
2055 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
2056
2057
2058
2059
2060
2061 if( vpid_snapshot->comm_pipe_w_fd > 0 ) {
2062 if( ORTE_SUCCESS != (ret = local_coord_job_state_update_finished_local_vpid(vpid_snapshot) ) ) {
2063 ORTE_ERROR_LOG(ORTE_ERROR);
2064 goto cleanup;
2065 }
2066 }
2067
2068 vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_NONE;
2069 }
2070
2071
2072
2073
2074 opal_crs_base_clear_options(local_global_snapshot.options);
2075
2076 currently_migrating = false;
2077 flushed_modex = false;
2078
2079 sstore_local_sync_finished = false;
2080 sstore_local_procs_finished = false;
2081
2082 cleanup:
2083 return exit_status;
2084 }