This source file includes the following definitions.
- app_coord_init
- app_coord_finalize
- snapc_full_app_signal_handler
- snapc_full_app_notify_response
- app_notify_resp_stage_1
- app_notify_resp_inc_prep_only
- app_notify_resp_stage_2
- app_define_pipe_names
- app_notify_resp_stage_3
- snapc_full_app_finished_msg
- snapc_full_app_notify_reopen_files
- snapc_full_app_ckpt_handshake_start
- snapc_full_app_ckpt_handshake_end
- app_coord_ft_event
- snapc_full_app_ft_event_update_process_info
- app_coord_request_op
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include "orte_config.h"
19
20 #include <errno.h>
21 #include <sys/types.h>
22 #ifdef HAVE_UNISTD_H
23 #include <unistd.h>
24 #endif
25 #ifdef HAVE_FCNTL_H
26 #include <fcntl.h>
27 #endif
28 #ifdef HAVE_SYS_TYPES_H
29 #include <sys/types.h>
30 #endif
31 #ifdef HAVE_SYS_STAT_H
32 #include <sys/stat.h>
33 #endif
34 #include <signal.h>
35 #include <string.h>
36
37 #include "orte/runtime/orte_cr.h"
38 #include "orte/runtime/orte_globals.h"
39 #include "orte/runtime/orte_wait.h"
40 #include "opal/runtime/opal_cr.h"
41 #include "opal/util/output.h"
42 #include "opal/mca/event/event.h"
43 #include "opal/util/opal_environ.h"
44 #include "orte/mca/mca.h"
45 #include "opal/mca/base/base.h"
46 #include "opal/mca/crs/crs.h"
47 #include "opal/mca/crs/base/base.h"
48
49 #include "orte/util/name_fns.h"
50 #include "opal/mca/pmix/pmix.h"
51 #include "orte/mca/snapc/snapc.h"
52 #include "orte/mca/snapc/base/base.h"
53 #include "orte/mca/errmgr/errmgr.h"
54 #include "orte/mca/grpcomm/grpcomm.h"
55 #include "orte/mca/rml/rml.h"
56 #include "orte/mca/rml/rml_types.h"
57 #include "orte/mca/routed/routed.h"
58 #include "orte/mca/routed/base/base.h"
59
60 #include "snapc_full.h"
61
62
63
64
65 static void snapc_full_app_signal_handler (int signo);
66 static int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp);
67 static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp);
68 static int app_notify_resp_stage_2(int cr_state );
69 static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg);
70 static int app_define_pipe_names(void);
71 static int snapc_full_app_notify_reopen_files(void);
72 static int snapc_full_app_ckpt_handshake_start(opal_cr_ckpt_cmd_state_t resp);
73 static int snapc_full_app_ckpt_handshake_end(int cr_state);
74
75 static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t pid);
76 static int snapc_full_app_finished_msg(int cr_state);
77
78 static int app_notify_resp_inc_prep_only(int cr_state);
79
80 static char *app_comm_pipe_r = NULL;
81 static char *app_comm_pipe_w = NULL;
82 static int app_comm_pipe_r_fd = -1;
83 static int app_comm_pipe_w_fd = -1;
84
85 static opal_crs_base_snapshot_t *local_snapshot = NULL;
86
87 static bool app_notif_processed = false;
88
89 static bool currently_migrating = false;
90 static bool currently_all_migrating = false;
91
92 static bool currently_checkpointing = false;
93 static int current_unique_id = 0;
94
95 static int current_cr_state = OPAL_CRS_NONE;
96
97 static orte_sstore_base_handle_t current_ss_handle = ORTE_SSTORE_HANDLE_INVALID, last_ss_handle = ORTE_SSTORE_HANDLE_INVALID;
98 static opal_crs_base_ckpt_options_t *current_options = NULL;
99
100
101
102
103
104 int app_coord_init()
105 {
106 int ret, exit_status = ORTE_SUCCESS;
107 opal_cr_notify_callback_fn_t prev_notify_func;
108 orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
109 orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_INIT;
110 opal_buffer_t *buffer = NULL;
111
112 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
113 "App) Initalized for Application %s\n",
114 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
115
116
117
118
119 opal_cr_reg_notify_callback(snapc_full_app_notify_response, &prev_notify_func);
120
121
122
123
124 current_unique_id = 0;
125 app_define_pipe_names();
126
127
128
129
130
131 if( SIG_ERR == signal(opal_cr_entry_point_signal, snapc_full_app_signal_handler) ) {
132 opal_output(mca_snapc_full_component.super.output_handle,
133 "App) init: Error: Failed to register signal %d\n",
134 opal_cr_entry_point_signal);
135 ORTE_ERROR_LOG(OPAL_ERROR);
136 exit_status = OPAL_ERROR;
137 goto cleanup;
138 }
139
140 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
141 "app) Named Pipes (%s) (%s), Signal (%d)",
142 app_comm_pipe_r, app_comm_pipe_w, opal_cr_entry_point_signal));
143
144
145
146
147
148
149 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
150 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
151 "app) Startup Barrier..."));
152 }
153
154 if (ORTE_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
155 ORTE_ERROR_LOG(ret);
156 exit_status = ret;
157 goto cleanup;
158 }
159
160 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
161 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
162 "app) Startup Barrier: Send INIT to HNP...!"));
163
164 buffer = OBJ_NEW(opal_buffer_t);
165
166 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
167 ORTE_ERROR_LOG(ret);
168 exit_status = ret;
169 OBJ_RELEASE(buffer);
170 return ORTE_ERROR;
171 }
172 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
173 ORTE_ERROR_LOG(ret);
174 exit_status = ret;
175 OBJ_RELEASE(buffer);
176 return ORTE_ERROR;
177 }
178
179 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(op_event), 1, OPAL_INT))) {
180 ORTE_ERROR_LOG(ret);
181 exit_status = ret;
182 OBJ_RELEASE(buffer);
183 goto cleanup;
184 }
185
186 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
187 ORTE_RML_TAG_SNAPC_FULL,
188 orte_rml_send_callback, 0))) {
189 ORTE_ERROR_LOG(ret);
190 exit_status = ret;
191 OBJ_RELEASE(buffer);
192 return ORTE_ERROR;
193 }
194 }
195
196 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
197 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
198 "app) Startup Barrier: Done!"));
199 }
200
201 cleanup:
202 return exit_status;
203 }
204
205 int app_coord_finalize()
206 {
207 int ret, exit_status = ORTE_SUCCESS;
208 orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
209 orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_FIN;
210 opal_buffer_t *buffer = NULL;
211 orte_std_cntr_t count;
212 orte_rml_recv_cb_t *rb = NULL;
213
214
215
216
217
218
219 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
220 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
221 "app) Shutdown Barrier..."));
222 }
223
224 if (ORTE_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
225 ORTE_ERROR_LOG(ret);
226 exit_status = ret;
227 goto cleanup;
228 }
229
230 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
231 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
232 "app) Shutdown Barrier: Send FIN to HNP...!"));
233
234
235 buffer = OBJ_NEW(opal_buffer_t);
236
237 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
238 ORTE_ERROR_LOG(ret);
239 exit_status = ret;
240 OBJ_RELEASE(buffer);
241 goto cleanup;
242 }
243 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
244 ORTE_ERROR_LOG(ret);
245 exit_status = ret;
246 OBJ_RELEASE(buffer);
247 goto cleanup;
248 }
249
250 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(op_event), 1, OPAL_INT))) {
251 ORTE_ERROR_LOG(ret);
252 exit_status = ret;
253 OBJ_RELEASE(buffer);
254 goto cleanup;
255 }
256
257 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
258 ORTE_RML_TAG_SNAPC_FULL,
259 orte_rml_send_callback, 0))) {
260 ORTE_ERROR_LOG(ret);
261 exit_status = ret;
262 OBJ_RELEASE(buffer);
263 goto cleanup;
264 }
265
266
267 buffer = NULL;
268
269 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
270 "app) Shutdown Barrier: Waiting on FIN_ACK...!"));
271
272
273
274
275
276 rb = OBJ_NEW(orte_rml_recv_cb_t);
277 rb->active = true;
278 orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
279 ORTE_WAIT_FOR_COMPLETION(rb->active);
280
281 count = 1;
282 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
283 ORTE_ERROR_LOG(ret);
284 exit_status = ret;
285 goto cleanup;
286 }
287
288 count = 1;
289 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
290 ORTE_ERROR_LOG(ret);
291 exit_status = ret;
292 goto cleanup;
293 }
294
295 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
296 "app) Shutdown Barrier: Waiting on barrier...!"));
297 }
298
299 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
300 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
301 "app) Shutdown Barrier, Done!"));
302 }
303
304 cleanup:
305
306 if (NULL != buffer) {
307 OBJ_RELEASE(buffer);
308 buffer = NULL;
309 }
310 if (NULL != rb) {
311 OBJ_RELEASE(rb);
312 rb = NULL;
313 }
314
315
316
317
318 if( NULL != app_comm_pipe_r) {
319 free(app_comm_pipe_r);
320 app_comm_pipe_r = NULL;
321 }
322
323 if( NULL != app_comm_pipe_w) {
324 free(app_comm_pipe_w);
325 app_comm_pipe_w = NULL;
326 }
327
328 return exit_status;
329 }
330
331
332
333
334 static void snapc_full_app_signal_handler (int signo)
335 {
336 if( opal_cr_entry_point_signal != signo ) {
337 OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
338 "App) signal_handler: Received unknown signal %d",
339 signo));
340
341 return;
342 }
343 if( currently_checkpointing ) {
344 opal_output(0, "snapc:full:(app) Error: Received a signal to checkpoint, but Already checkpointing. Ignoring request!");
345 }
346 else {
347 currently_checkpointing = true;
348
349
350
351 opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED;
352
353 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
354 "App) signal_handler: Receive Checkpoint Request."));
355 }
356 }
357
358
359
360
361 int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
362 {
363 static int cr_state;
364 int app_pid;
365 int ret, exit_status = ORTE_SUCCESS;
366
367
368
369
370 if( NULL == current_options ) {
371 current_options = OBJ_NEW(opal_crs_base_ckpt_options_t);
372 }
373
374 if( opal_cr_currently_stalled ) {
375 goto STAGE_1;
376 }
377
378
379 opal_cr_continue_like_restart = false;
380 orte_cr_flush_restart_files = true;
381
382 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
383 "App) notify_response: Stage 1..."));
384 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_1(resp) ) ) {
385 ORTE_ERROR_LOG(ret);
386 exit_status = ret;
387 goto ckpt_cleanup;
388 }
389
390 cr_state = OPAL_CRS_RUNNING;
391 current_cr_state = cr_state;
392
393 #if OPAL_ENABLE_CRDEBUG == 1
394 if( current_options->attach_debugger ) {
395 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
396 "App) notify_response: C/R Debug: Wait for debugger..."));
397 MPIR_debug_with_checkpoint = true;
398 }
399 if( current_options->detach_debugger ) {
400 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
401 "App) notify_response: C/R Debug: Do not wait for debugger..."));
402 MPIR_debug_with_checkpoint = false;
403 }
404 #endif
405
406 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
407 "App) notify_response: Start checkpoint..."));
408 STAGE_1:
409 opal_cr_currently_stalled = false;
410
411 app_pid = getpid();
412 if( orte_snapc_full_skip_app ) {
413 OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle,
414 "App) notify_response: Skipping App. (%d)\n",
415 app_pid));
416 ret = ORTE_SUCCESS;
417 cr_state = OPAL_CRS_CONTINUE;
418 }
419 else {
420
421
422
423 if(OPAL_SUCCESS != (ret = opal_cr_inc_core_prep() ) ) {
424 if( OPAL_EXISTS == ret ) {
425 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
426 "App) notify_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n",
427 app_pid));
428 opal_cr_currently_stalled = true;
429 return exit_status;
430 }
431 else {
432 opal_output(mca_snapc_full_component.super.output_handle,
433 "App) notify_response: Error: checkpoint notification failed. %d\n", ret);
434 ORTE_ERROR_LOG(ret);
435 exit_status = ret;
436 goto ckpt_cleanup;
437 }
438 }
439
440
441
442
443
444
445 if( current_options->inc_prep_only ) {
446 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
447 "App) notify_response: INC Prep Only..."));
448 return app_notify_resp_inc_prep_only(cr_state);
449 } else {
450 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
451 "App) notify_response: Normal operation..."));
452 }
453
454
455
456
457
458
459
460 if( currently_all_migrating ) {
461 opal_cr_continue_like_restart = true;
462 orte_cr_flush_restart_files = false;
463 }
464 if( !currently_migrating && currently_all_migrating ) {
465 OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle,
466 "App) notify_response: Skipping App. (%d) - This process is not migrating \n",
467 app_pid));
468 ret = ORTE_SUCCESS;
469 cr_state = OPAL_CRS_CONTINUE;
470 }
471 else {
472 ret = opal_cr_inc_core_ckpt(app_pid, local_snapshot, current_options, &cr_state);
473 }
474 current_cr_state = cr_state;
475
476
477
478
479
480
481 if( OPAL_CRS_RESTART != cr_state ) {
482 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
483 "App) notify_response: Stage 2..."));
484 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
485 ORTE_ERROR_LOG(ret);
486 exit_status = ret;
487 goto ckpt_cleanup;
488 }
489 }
490
491
492
493
494 if( !currently_all_migrating ) {
495 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state)) ) {
496 ORTE_ERROR_LOG(ret);
497 exit_status = ret;
498 goto ckpt_cleanup;
499 }
500 }
501
502
503
504
505
506 else {
507
508
509
510 if( currently_migrating ) {
511 if( OPAL_CRS_RESTART != cr_state ) {
512 current_options->term = true;
513 }
514 else {
515 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state)) ) {
516 ORTE_ERROR_LOG(ret);
517 exit_status = ret;
518 goto ckpt_cleanup;
519 }
520 }
521 }
522
523
524
525
526
527 else {
528 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(OPAL_CRS_RESTART)) ) {
529 ORTE_ERROR_LOG(ret);
530 exit_status = ret;
531 goto ckpt_cleanup;
532 }
533 }
534 }
535 }
536
537
538 opal_cr_stall_check = false;
539
540 if(OPAL_CRS_RESTART == cr_state) {
541 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
542 "App) notify_response: Restarting... (%s : %d)\n",
543 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app_pid));
544
545 current_options->term = false;
546
547 goto ckpt_cleanup;
548 }
549 else if(cr_state == OPAL_CRS_CONTINUE) {
550 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
551 "App) notify_response: Continuing...(%s : %d)\n",
552 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app_pid));
553 ;
554 }
555 else if(cr_state == OPAL_CRS_TERM ) {
556 ;
557 }
558 else {
559 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
560 "App) notify_response: Unknown cr_state(%d) [%d]",
561 cr_state, app_pid));
562 }
563
564 ckpt_cleanup:
565 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
566 "App) notify_response: Stage 3..."));
567 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(cr_state, false) )) {
568 ORTE_ERROR_LOG(ret);
569 exit_status = ret;
570 goto ckpt_cleanup;
571 }
572
573 if( current_options->term ) {
574 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
575 "App) notify_response: User has asked to terminate the application"));
576
577
578
579
580 while(1) {
581 opal_progress();
582 sleep(1);
583 }
584 }
585
586 if( NULL != current_options ) {
587 OBJ_RELEASE(current_options);
588 current_options = NULL;
589 }
590
591 currently_checkpointing = false;
592
593 return exit_status;
594 }
595
596 static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp)
597 {
598 int ret, exit_status = ORTE_SUCCESS;
599
600 OPAL_CR_CLEAR_TIMERS();
601 opal_cr_timing_my_rank = ORTE_PROC_MY_NAME->vpid;
602 OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY0);
603
604
605
606
607 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
608 "App) notify_response: Open Communication Channels."));
609 if (ORTE_SUCCESS != (ret = snapc_full_app_notify_reopen_files())) {
610 ORTE_ERROR_LOG(ret);
611 exit_status = ret;
612 goto cleanup;
613 }
614
615
616
617
618 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
619 "App) notify_response: Initial Handshake."));
620 if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_start(resp) ) ) {
621 ORTE_ERROR_LOG(ret);
622 exit_status = ret;
623 goto cleanup;
624 }
625
626 OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY1);
627
628
629
630
631 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
632 "App) notify_response: Register with SStore..."));
633 if( OPAL_SUCCESS != (ret = orte_sstore.register_handle(current_ss_handle)) ) {
634 ORTE_ERROR_LOG(ret);
635 exit_status = ret;
636 goto cleanup;
637 }
638
639 local_snapshot = OBJ_NEW(opal_crs_base_snapshot_t);
640
641 if( !currently_migrating && currently_all_migrating ) {
642 orte_sstore.set_attr(current_ss_handle,
643 SSTORE_METADATA_LOCAL_SKIP_CKPT,
644 "1");
645 }
646
647 orte_sstore.get_attr(current_ss_handle,
648 SSTORE_METADATA_LOCAL_SNAP_LOC,
649 &(local_snapshot->snapshot_directory));
650 orte_sstore.get_attr(current_ss_handle,
651 SSTORE_METADATA_LOCAL_SNAP_META,
652 &(local_snapshot->metadata_filename));
653
654 OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY2);
655
656 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
657 "App) notify_response: Start checkpoint... (%d)", (int)current_ss_handle));
658
659 cleanup:
660 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
661 "App) notify_response: Are we migrating [%5s]. Am I migrating [%5s]",
662 (currently_all_migrating ? "True" : "False"),
663 (currently_migrating ? "True" : "False") ));
664
665 return exit_status;
666 }
667
668 static int app_notify_resp_inc_prep_only(int cr_state)
669 {
670 int ret, exit_status = ORTE_SUCCESS;
671
672
673
674
675 if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
676 opal_output(mca_snapc_full_component.super.output_handle,
677 "App) notify_response: Error: Unable to write cr_state to named pipe (%s).\n",
678 app_comm_pipe_w);
679 ORTE_ERROR_LOG(ret);
680 exit_status = ret;
681 goto cleanup;
682 }
683
684 app_notif_processed = true;
685
686 cleanup:
687 return exit_status;
688 }
689
690 static int app_notify_resp_stage_2(int cr_state )
691 {
692 int ret;
693
694 OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY3);
695
696
697
698
699
700 if( !(current_options->stop) ) {
701 if( currently_migrating || !currently_all_migrating ) {
702 orte_sstore.set_attr(current_ss_handle,
703 SSTORE_METADATA_LOCAL_CRS_COMP,
704 local_snapshot->component_name);
705 }
706
707 orte_sstore.sync(current_ss_handle);
708 }
709 last_ss_handle = current_ss_handle;
710 current_ss_handle = 0;
711
712
713
714
715 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
716 "App) notify_response: Waiting for final handshake."));
717 if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_end(cr_state ) ) ) {
718 ORTE_ERROR_LOG(ret);
719 return ret;
720 }
721
722 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
723 "App) notify_response: Final Handshake complete."));
724
725 return ORTE_SUCCESS;
726 }
727
728 static int app_define_pipe_names(void)
729 {
730 if( NULL != app_comm_pipe_r ) {
731 free(app_comm_pipe_r);
732 app_comm_pipe_r = NULL;
733 }
734
735 if( NULL != app_comm_pipe_w ) {
736 free(app_comm_pipe_w);
737 app_comm_pipe_w = NULL;
738 }
739
740 opal_asprintf(&app_comm_pipe_r, "%s/%s.%d_%d",
741 opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R,
742 (int)getpid(), current_unique_id);
743 opal_asprintf(&app_comm_pipe_w, "%s/%s.%d_%d",
744 opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W,
745 (int)getpid(), current_unique_id);
746
747 ++current_unique_id;
748
749 return ORTE_SUCCESS;
750 }
751
752 static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg)
753 {
754
755
756
757 if( !skip_fin_msg ) {
758 snapc_full_app_finished_msg(cr_state);
759 }
760
761
762
763
764 if( 0 <= app_comm_pipe_r_fd ) {
765 close(app_comm_pipe_r_fd);
766 app_comm_pipe_r_fd = -1;
767 }
768 if( 0 <= app_comm_pipe_w_fd ) {
769 close(app_comm_pipe_w_fd);
770 app_comm_pipe_w_fd = -1;
771 }
772
773 remove(app_comm_pipe_r);
774 remove(app_comm_pipe_w);
775
776 app_comm_pipe_r_fd = -1;
777 app_comm_pipe_w_fd = -1;
778
779 if( OPAL_CRS_RESTART == cr_state ) {
780 current_unique_id = 0;
781 }
782
783 app_define_pipe_names();
784
785
786 opal_cr_checkpointing_state = OPAL_CR_STATUS_NONE;
787 opal_cr_currently_stalled = false;
788
789 currently_all_migrating = false;
790 currently_migrating = false;
791
792 OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY4);
793 if(OPAL_CRS_RESTART != cr_state) {
794 OPAL_CR_DISPLAY_ALL_TIMERS();
795 }
796
797 return ORTE_SUCCESS;
798 }
799
800 static int snapc_full_app_finished_msg(int cr_state) {
801 int ret, exit_status = ORTE_SUCCESS;
802 opal_buffer_t *buffer = NULL;
803 orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_FINISH_CMD;
804
805 buffer = OBJ_NEW(opal_buffer_t);
806
807 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD))) {
808 ORTE_ERROR_LOG(ret);
809 exit_status = ret;
810 goto cleanup;
811 }
812
813 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cr_state, 1, OPAL_INT))) {
814 ORTE_ERROR_LOG(ret);
815 exit_status = ret;
816 goto cleanup;
817 }
818
819 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
820 ORTE_RML_TAG_SNAPC,
821 orte_rml_send_callback, 0))) {
822 ORTE_ERROR_LOG(ret);
823 exit_status = ret;
824 goto cleanup;
825 }
826
827 return ORTE_SUCCESS;
828 cleanup:
829 OBJ_RELEASE(buffer);
830
831 return exit_status;
832 }
833
834 static int snapc_full_app_notify_reopen_files(void)
835 {
836 int ret = OPAL_ERR_NOT_IMPLEMENTED;
837
838 #ifndef HAVE_MKFIFO
839 return ret;
840 #else
841
842
843
844 if( (ret = mkfifo(app_comm_pipe_r, 0660)) < 0) {
845 if(EEXIST == ret || -1 == ret ) {
846 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
847 "App) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
848 app_comm_pipe_r, ret));
849 }
850 else {
851 opal_output(mca_snapc_full_component.super.output_handle,
852 "App) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
853 app_comm_pipe_r, ret);
854 return ORTE_ERROR;
855 }
856 }
857
858 app_comm_pipe_r_fd = open(app_comm_pipe_r, O_RDWR);
859 if(app_comm_pipe_r_fd < 0) {
860 opal_output(mca_snapc_full_component.super.output_handle,
861 "App) init: Error: open failed to open the named pipe (%s). %d\n",
862 app_comm_pipe_r, app_comm_pipe_r_fd);
863 return ORTE_ERROR;
864 }
865
866
867
868
869 if( (ret = mkfifo(app_comm_pipe_w, 0660)) < 0) {
870 if(EEXIST == ret || -1 == ret ) {
871 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
872 "App) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
873 app_comm_pipe_w, ret));
874 }
875 else {
876 opal_output(mca_snapc_full_component.super.output_handle,
877 "App) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
878 app_comm_pipe_w, ret);
879 return ORTE_ERROR;
880 }
881 }
882
883 app_comm_pipe_w_fd = open(app_comm_pipe_w, O_WRONLY);
884 if(app_comm_pipe_w_fd < 0) {
885 opal_output(mca_snapc_full_component.super.output_handle,
886 "App) notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n",
887 app_comm_pipe_w, app_comm_pipe_w_fd);
888 return ORTE_ERROR;
889 }
890
891 return ORTE_SUCCESS;
892 #endif
893 }
894
895 static int snapc_full_app_ckpt_handshake_start(opal_cr_ckpt_cmd_state_t resp)
896 {
897 int ret, exit_status = ORTE_SUCCESS;
898 int tmp_resp, opt_rep;
899
900
901
902
903
904
905
906 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
907 opal_output(mca_snapc_full_component.super.output_handle,
908 "App) notify_response: Error: Unable to read the all_migrating option from named pipe (%s). %d\n",
909 app_comm_pipe_r, ret);
910 ORTE_ERROR_LOG(ret);
911 goto cleanup;
912 }
913 currently_all_migrating = OPAL_INT_TO_BOOL(opt_rep);
914
915 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
916 opal_output(mca_snapc_full_component.super.output_handle,
917 "App) notify_response: Error: Unable to read the migrating option from named pipe (%s). %d\n",
918 app_comm_pipe_r, ret);
919 ORTE_ERROR_LOG(ret);
920 goto cleanup;
921 }
922 currently_migrating = OPAL_INT_TO_BOOL(opt_rep);
923
924 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
925 opal_output(mca_snapc_full_component.super.output_handle,
926 "App) notify_response: Error: Unable to read the 'term' from named pipe (%s). %d\n",
927 app_comm_pipe_r, ret);
928 ORTE_ERROR_LOG(ret);
929 goto cleanup;
930 }
931 current_options->term = OPAL_INT_TO_BOOL(opt_rep);
932
933 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
934 opal_output(mca_snapc_full_component.super.output_handle,
935 "App) notify_response: Error: Unable to read the 'stop' from named pipe (%s). %d\n",
936 app_comm_pipe_r, ret);
937 ORTE_ERROR_LOG(ret);
938 goto cleanup;
939 }
940 current_options->stop = OPAL_INT_TO_BOOL(opt_rep);
941
942 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
943 opal_output(mca_snapc_full_component.super.output_handle,
944 "App) notify_response: Error: Unable to read the 'inc_prep_only' from named pipe (%s). %d\n",
945 app_comm_pipe_r, ret);
946 ORTE_ERROR_LOG(ret);
947 goto cleanup;
948 }
949 current_options->inc_prep_only = OPAL_INT_TO_BOOL(opt_rep);
950
951 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
952 opal_output(mca_snapc_full_component.super.output_handle,
953 "App) notify_response: Error: Unable to read the 'inc_recover_only' from named pipe (%s). %d\n",
954 app_comm_pipe_r, ret);
955 ORTE_ERROR_LOG(ret);
956 goto cleanup;
957 }
958 current_options->inc_recover_only = OPAL_INT_TO_BOOL(opt_rep);
959
960 #if OPAL_ENABLE_CRDEBUG == 1
961 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
962 opal_output(mca_snapc_full_component.super.output_handle,
963 "App) notify_response: Error: Unable to read the 'attach_debugger' from named pipe (%s). %d\n",
964 app_comm_pipe_r, ret);
965 ORTE_ERROR_LOG(ret);
966 goto cleanup;
967 }
968 current_options->attach_debugger = OPAL_INT_TO_BOOL(opt_rep);
969
970 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
971 opal_output(mca_snapc_full_component.super.output_handle,
972 "App) notify_response: Error: Unable to read the 'detach_debugger' from named pipe (%s). %d\n",
973 app_comm_pipe_r, ret);
974 ORTE_ERROR_LOG(ret);
975 goto cleanup;
976 }
977 current_options->detach_debugger = OPAL_INT_TO_BOOL(opt_rep);
978 #endif
979
980
981
982
983 if( sizeof(orte_sstore_base_handle_t) != (ret = read(app_comm_pipe_r_fd, ¤t_ss_handle, sizeof(orte_sstore_base_handle_t))) ) {
984 opal_output(mca_snapc_full_component.super.output_handle,
985 "App) notify_response: Error: Unable to read the sstore handle from named pipe (%s). %d\n",
986 app_comm_pipe_r, ret);
987 ORTE_ERROR_LOG(ret);
988 goto cleanup;
989 }
990
991 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
992 "App) %s Received Options... Responding with %d\n",
993 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)resp));
994
995
996
997
998 tmp_resp = (int)resp;
999 if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &tmp_resp, sizeof(int)) ) ) {
1000 opal_output(mca_snapc_full_component.super.output_handle,
1001 "App) notify_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n",
1002 tmp_resp, app_comm_pipe_w, ret, __LINE__);
1003 ORTE_ERROR_LOG(ret);
1004 goto cleanup;
1005 }
1006
1007
1008
1009
1010 if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) {
1011 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1012 "App) notify_response: Checkpoint in progress, cannot start (%d)",
1013 getpid()));
1014 ORTE_ERROR_LOG(ret);
1015 goto cleanup;
1016 }
1017
1018
1019
1020 else if( OPAL_CHECKPOINT_CMD_NULL == resp ) {
1021 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1022 "App) notify_response: Non-checkpointable application, cannot start (%d)",
1023 getpid()));
1024 ORTE_ERROR_LOG(ret);
1025 goto cleanup;
1026 }
1027
1028
1029
1030
1031 else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) {
1032 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1033 "App) notify_response: Error generated, cannot start (%d)",
1034 getpid()));
1035 ORTE_ERROR_LOG(ret);
1036 goto cleanup;
1037 }
1038
1039
1040
1041
1042 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1043 "App) notify_response: Starting checkpoint request (%d)",
1044 getpid()));
1045
1046
1047
1048
1049
1050
1051
1052 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
1053 opal_output(mca_snapc_full_component.super.output_handle,
1054 "App) notify_response: Error: Unable to read from named pipe (%s). %d\n",
1055 app_comm_pipe_r, ret);
1056 ORTE_ERROR_LOG(ret);
1057 goto cleanup;
1058 }
1059
1060 cleanup:
1061 return exit_status;
1062 }
1063
1064 static int snapc_full_app_ckpt_handshake_end(int cr_state)
1065 {
1066 int ret, exit_status = ORTE_SUCCESS;
1067 int last_cmd = 0;
1068 int err;
1069
1070
1071
1072
1073 if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
1074 err = errno;
1075 opal_output(mca_snapc_full_component.super.output_handle,
1076 "App) notify_response: Error: Unable to write cr_state to named pipe (%s). %d/%d/%s\n",
1077 app_comm_pipe_w, ret, err, strerror(err));
1078 ORTE_ERROR_LOG(ret);
1079 exit_status = ret;
1080 goto cleanup;
1081 }
1082
1083 if( currently_all_migrating && currently_migrating ) {
1084 app_notify_resp_stage_3(cr_state, true);
1085 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1086 "App) handshake_end: Waiting for termination (%d)",
1087 getpid()));
1088
1089
1090
1091 while(1) {
1092 opal_progress();
1093 sleep(1);
1094 }
1095 }
1096
1097 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1098 "App) handshake_end: Waiting for release (%d)",
1099 getpid()));
1100
1101
1102
1103
1104 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &last_cmd, sizeof(int))) ) {
1105 opal_output(mca_snapc_full_component.super.output_handle,
1106 "App) notify_response: Error: Unable to read the 'last_cmd' from named pipe (%s). %d\n",
1107 app_comm_pipe_r, ret);
1108 ORTE_ERROR_LOG(ret);
1109 exit_status = ret;
1110 goto cleanup;
1111 }
1112
1113 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1114 "App) handshake_end: Released... (%d)",
1115 getpid()));
1116
1117 cleanup:
1118 return exit_status;
1119 }
1120
1121 int app_coord_ft_event(int state) {
1122 int ret, exit_status = ORTE_SUCCESS;
1123
1124 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1125 "App) In ft_event(%d)", state));
1126
1127
1128 if(OPAL_CRS_CHECKPOINT == state) {
1129
1130
1131
1132
1133
1134
1135
1136 orte_sstore.set_attr(orte_sstore_handle_current,
1137 SSTORE_METADATA_LOCAL_MKDIR,
1138 orte_process_info.job_session_dir);
1139
1140
1141
1142
1143 if( current_options->stop ) {
1144 orte_sstore.set_attr(current_ss_handle,
1145 SSTORE_METADATA_LOCAL_CRS_COMP,
1146 opal_crs_base_selected_component.base_version.mca_component_name);
1147
1148 orte_sstore.sync(current_ss_handle);
1149 }
1150 }
1151
1152 else if (OPAL_CRS_CONTINUE == state ) {
1153 #if OPAL_ENABLE_CRDEBUG == 1
1154
1155
1156
1157
1158 if( ORTE_SUCCESS != (ret = snapc_full_app_ft_event_update_process_info(orte_process_info.my_name, getpid())) ) {
1159 ORTE_ERROR_LOG(ret);
1160 exit_status = ret;
1161 goto cleanup;
1162 }
1163 #endif
1164 ;
1165 }
1166
1167 else if (OPAL_CRS_RESTART_PRE == state ) {
1168 ;
1169 }
1170
1171 else if (OPAL_CRS_RESTART == state ) {
1172 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1173 "App) Initalized for Application %s (Restart) (%5d)\n",
1174 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), getpid()));
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184 if( ORTE_SUCCESS != (ret = snapc_full_app_ft_event_update_process_info(orte_process_info.my_name, getpid())) ) {
1185 ORTE_ERROR_LOG(ret);
1186 exit_status = ret;
1187 goto cleanup;
1188 }
1189
1190
1191
1192
1193
1194
1195 }
1196
1197 else if (OPAL_CRS_TERM == state ) {
1198 ;
1199 }
1200
1201 else {
1202 ;
1203 }
1204
1205 cleanup:
1206 return exit_status;
1207 }
1208
1209 static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t proc_pid)
1210 {
1211 int ret, exit_status = ORTE_SUCCESS;
1212 opal_buffer_t *buffer = NULL;
1213 orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_UPDATE_CMD;
1214
1215 buffer = OBJ_NEW(opal_buffer_t);
1216
1217 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD))) {
1218 ORTE_ERROR_LOG(ret);
1219 exit_status = ret;
1220 goto cleanup;
1221 }
1222
1223
1224 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &proc, 1, ORTE_NAME))) {
1225 ORTE_ERROR_LOG(ret);
1226 exit_status = ret;
1227 goto cleanup;
1228 }
1229
1230 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &proc_pid, 1, OPAL_PID))) {
1231 ORTE_ERROR_LOG(ret);
1232 exit_status = ret;
1233 goto cleanup;
1234 }
1235
1236 #if OPAL_ENABLE_CRDEBUG == 1
1237 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &MPIR_debug_with_checkpoint, 1, OPAL_BOOL))) {
1238 ORTE_ERROR_LOG(ret);
1239 exit_status = ret;
1240 goto cleanup;
1241 }
1242 #endif
1243
1244 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
1245 ORTE_RML_TAG_SNAPC,
1246 orte_rml_send_callback, 0))) {
1247 ORTE_ERROR_LOG(ret);
1248 exit_status = ret;
1249 goto cleanup;
1250 }
1251
1252 return ORTE_SUCCESS;
1253 cleanup:
1254 OBJ_RELEASE(buffer);
1255
1256 return exit_status;
1257 }
1258
1259 int app_coord_request_op(orte_snapc_base_request_op_t *datum)
1260 {
1261 int ret, exit_status = ORTE_SUCCESS;
1262 orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
1263 opal_buffer_t *buffer = NULL;
1264 orte_std_cntr_t count;
1265 orte_rml_recv_cb_t *rb = NULL;
1266 int op_event, op_state;
1267 char *seq_str = NULL, *tmp_str = NULL;
1268 int cr_state = OPAL_CRS_CONTINUE;
1269 int app_pid, i;
1270
1271
1272
1273
1274 if( ORTE_SNAPC_OP_QUIESCE_END == datum->event) {
1275 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1276 "App) Quiesce_end: Recovering the stack..."));
1277
1278
1279
1280
1281 if( NULL == local_snapshot->component_name ) {
1282 local_snapshot->component_name = strdup("");
1283 }
1284 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
1285 exit_status = ret;
1286 ORTE_ERROR_LOG(ret);
1287 goto cleanup;
1288 }
1289
1290 if(OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state) ) ) {
1291 exit_status = ret;
1292 ORTE_ERROR_LOG(ret);
1293 goto cleanup;
1294 }
1295
1296 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(cr_state, false) )) {
1297 exit_status = ret;
1298 ORTE_ERROR_LOG(ret);
1299 goto cleanup;
1300 }
1301
1302 currently_checkpointing = false;
1303 app_notif_processed = false;
1304
1305 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1306 "App) Quiesce_end: Recovered."));
1307 }
1308 else if( ORTE_SNAPC_OP_QUIESCE_CHECKPOINT == datum->event) {
1309 app_pid = getpid();
1310 cr_state = OPAL_CRS_RUNNING;
1311 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_ckpt(app_pid, local_snapshot, current_options, &cr_state)) ) {
1312 ORTE_ERROR_LOG(ret);
1313 exit_status = ret;
1314 }
1315
1316 if( OPAL_CRS_RESTART != cr_state ) {
1317 orte_sstore.sync(current_ss_handle);
1318 }
1319
1320 orte_sstore.get_attr(current_ss_handle,
1321 SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1322 &seq_str);
1323 if( NULL != seq_str ) {
1324 datum->seq_num = atoi(seq_str);
1325 } else {
1326 datum->seq_num = -1;
1327 }
1328
1329 orte_sstore.get_attr(current_ss_handle,
1330 SSTORE_METADATA_GLOBAL_SNAP_REF,
1331 &(datum->global_handle));
1332 if( NULL == datum->global_handle ) {
1333 datum->global_handle = strdup("Unknown");
1334 }
1335
1336 return exit_status;
1337 }
1338
1339
1340
1341
1342 if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1343 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1344 "App) Request_op: Sending request (%3d)...",
1345 datum->event));
1346
1347
1348
1349 buffer = OBJ_NEW(opal_buffer_t);
1350
1351 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1352 ORTE_ERROR_LOG(ret);
1353 exit_status = ret;
1354 goto cleanup;
1355 }
1356 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
1357 ORTE_ERROR_LOG(ret);
1358 exit_status = ret;
1359 goto cleanup;
1360 }
1361
1362 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->event), 1, OPAL_INT))) {
1363 ORTE_ERROR_LOG(ret);
1364 exit_status = ret;
1365 goto cleanup;
1366 }
1367
1368 if( ORTE_SNAPC_OP_RESTART == datum->event) {
1369 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->seq_num), 1, OPAL_INT))) {
1370 ORTE_ERROR_LOG(ret);
1371 exit_status = ret;
1372 goto cleanup;
1373 }
1374 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->global_handle), 1, OPAL_STRING))) {
1375 ORTE_ERROR_LOG(ret);
1376 exit_status = ret;
1377 goto cleanup;
1378 }
1379 }
1380 else if( ORTE_SNAPC_OP_MIGRATE == datum->event) {
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401 currently_all_migrating = true;
1402
1403
1404
1405
1406 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->mig_num), 1, OPAL_INT))) {
1407 ORTE_ERROR_LOG(ret);
1408 exit_status = ret;
1409 goto cleanup;
1410 }
1411
1412 for( i = 0; i < datum->mig_num; ++i ) {
1413 OPAL_OUTPUT_VERBOSE((30, mca_snapc_full_component.super.output_handle,
1414 "App) Migration %3d/%3d: Sending Rank %3d - Requested <%s> (%3d) %c\n",
1415 datum->mig_num, i,
1416 (datum->mig_vpids)[i],
1417 (datum->mig_host_pref)[i],
1418 (datum->mig_vpid_pref)[i],
1419 (OPAL_INT_TO_BOOL((datum->mig_off_node)[i]) ? 'T' : 'F')
1420 ));
1421
1422 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpids)[i]), 1, OPAL_INT))) {
1423 ORTE_ERROR_LOG(ret);
1424 exit_status = ret;
1425 goto cleanup;
1426 }
1427 tmp_str = strdup((datum->mig_host_pref)[i]);
1428 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &tmp_str, 1, OPAL_STRING))) {
1429 ORTE_ERROR_LOG(ret);
1430 exit_status = ret;
1431 goto cleanup;
1432 }
1433 if( NULL != tmp_str ) {
1434 free(tmp_str);
1435 tmp_str = NULL;
1436 }
1437
1438 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpid_pref)[i]), 1, OPAL_INT))) {
1439 ORTE_ERROR_LOG(ret);
1440 exit_status = ret;
1441 goto cleanup;
1442 }
1443 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_off_node)[i]), 1, OPAL_INT))) {
1444 ORTE_ERROR_LOG(ret);
1445 exit_status = ret;
1446 goto cleanup;
1447 }
1448 }
1449 }
1450
1451 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
1452 orte_rml_send_callback, 0))) {
1453 ORTE_ERROR_LOG(ret);
1454 exit_status = ret;
1455 goto cleanup;
1456 }
1457
1458 buffer = NULL;
1459 }
1460
1461
1462
1463
1464 if( ORTE_SNAPC_OP_CHECKPOINT == datum->event) {
1465 if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1466
1467
1468
1469 while(OPAL_CRS_CONTINUE != current_cr_state &&
1470 OPAL_CRS_RESTART != current_cr_state &&
1471 OPAL_CRS_ERROR != current_cr_state ) {
1472 opal_progress();
1473 OPAL_CR_TEST_CHECKPOINT_READY();
1474 }
1475
1476
1477 if( OPAL_CRS_RESTART == current_cr_state ) {
1478 orte_sstore.get_attr(current_ss_handle,
1479 SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1480 &seq_str);
1481 if( NULL != seq_str ) {
1482 datum->seq_num = atoi(seq_str);
1483 } else {
1484 datum->seq_num = -1;
1485 }
1486
1487 orte_sstore.get_attr(current_ss_handle,
1488 SSTORE_METADATA_GLOBAL_SNAP_REF,
1489 &(datum->global_handle));
1490 if( NULL == datum->global_handle ) {
1491 datum->global_handle = strdup("Unknown");
1492 }
1493
1494 current_cr_state = OPAL_CRS_NONE;
1495
1496 exit_status = ORTE_SUCCESS;
1497 goto cleanup;
1498 }
1499
1500
1501
1502
1503 rb = OBJ_NEW(orte_rml_recv_cb_t);
1504 rb->active = true;
1505 orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
1506 ORTE_WAIT_FOR_COMPLETION(rb->active);
1507
1508 count = 1;
1509 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1510 ORTE_ERROR_LOG(ret);
1511 exit_status = ret;
1512 goto cleanup;
1513 }
1514
1515 count = 1;
1516 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
1517 ORTE_ERROR_LOG(ret);
1518 exit_status = ret;
1519 goto cleanup;
1520 }
1521
1522 count = 1;
1523 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
1524 ORTE_ERROR_LOG(ret);
1525 exit_status = ret;
1526 goto cleanup;
1527 }
1528
1529 orte_sstore.get_attr(last_ss_handle,
1530 SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1531 &seq_str);
1532 datum->seq_num = atoi(seq_str);
1533
1534 orte_sstore.get_attr(last_ss_handle,
1535 SSTORE_METADATA_GLOBAL_SNAP_REF,
1536 &(datum->global_handle));
1537 }
1538 }
1539
1540
1541
1542 else if( ORTE_SNAPC_OP_RESTART == datum->event) {
1543 while( 1 ) {
1544 opal_progress();
1545 OPAL_CR_TEST_CHECKPOINT_READY();
1546 sleep(1);
1547 }
1548 }
1549
1550
1551
1552 else if( ORTE_SNAPC_OP_MIGRATE == datum->event) {
1553 if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1554 while( currently_all_migrating ) {
1555 opal_progress();
1556 OPAL_CR_TEST_CHECKPOINT_READY();
1557 sleep(1);
1558 }
1559
1560 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1561 "App) Request_op: Leader waiting for Migrate release (%3d)...",
1562 datum->event));
1563
1564
1565
1566
1567
1568 rb = OBJ_NEW(orte_rml_recv_cb_t);
1569 rb->active = true;
1570 orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
1571 ORTE_WAIT_FOR_COMPLETION(rb->active);
1572
1573 count = 1;
1574 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1575 ORTE_ERROR_LOG(ret);
1576 exit_status = ret;
1577 goto cleanup;
1578 }
1579
1580 count = 1;
1581 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
1582 ORTE_ERROR_LOG(ret);
1583 exit_status = ret;
1584 goto cleanup;
1585 }
1586
1587 count = 1;
1588 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
1589 ORTE_ERROR_LOG(ret);
1590 exit_status = ret;
1591 goto cleanup;
1592 }
1593
1594 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1595 "App) Request_op: Leader continuing from Migration (%3d)...",
1596 datum->event));
1597 }
1598 }
1599
1600
1601
1602 else if( ORTE_SNAPC_OP_QUIESCE_START == datum->event) {
1603 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1604 "App) Quiesce_start: Waiting for release..."));
1605
1606 while( !app_notif_processed ) {
1607 opal_progress();
1608 OPAL_CR_TEST_CHECKPOINT_READY();
1609 }
1610
1611 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1612 "App) Quiesce_start: Released"));
1613 }
1614
1615
1616
1617 else if( ORTE_SNAPC_OP_QUIESCE_END == datum->event) {
1618 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1619 "App) Quiesce_end: Waiting for release..."));
1620
1621 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1622 "App) Quiesce_end: Released"));
1623 }
1624
1625
1626 cleanup:
1627 if (NULL != buffer) {
1628 OBJ_RELEASE(buffer);
1629 buffer = NULL;
1630 }
1631 if (NULL != rb) {
1632 OBJ_RELEASE(rb);
1633 rb = NULL;
1634 }
1635
1636 if( NULL != seq_str ) {
1637 free(seq_str);
1638 seq_str = NULL;
1639 }
1640
1641 if( NULL != tmp_str ) {
1642 free(tmp_str);
1643 tmp_str = NULL;
1644 }
1645
1646 return exit_status;
1647 }