root/orte/mca/snapc/full/snapc_full_app.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. app_coord_init
  2. app_coord_finalize
  3. snapc_full_app_signal_handler
  4. snapc_full_app_notify_response
  5. app_notify_resp_stage_1
  6. app_notify_resp_inc_prep_only
  7. app_notify_resp_stage_2
  8. app_define_pipe_names
  9. app_notify_resp_stage_3
  10. snapc_full_app_finished_msg
  11. snapc_full_app_notify_reopen_files
  12. snapc_full_app_ckpt_handshake_start
  13. snapc_full_app_ckpt_handshake_end
  14. app_coord_ft_event
  15. snapc_full_app_ft_event_update_process_info
  16. app_coord_request_op

   1 /*
   2  * Copyright (c) 2004-2012 The Trustees of Indiana University.
   3  *                         All rights reserved.
   4  * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
   5  *                         All rights reserved.
   6  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   7  *                         University of Stuttgart.  All rights reserved.
   8  * Copyright (c) 2004-2005 The Regents of the University of California.
   9  *                         All rights reserved.
  10  * Copyright (c) 2018      Intel, Inc.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include "orte_config.h"
  19 
  20 #include <errno.h>
  21 #include <sys/types.h>
  22 #ifdef HAVE_UNISTD_H
  23 #include <unistd.h>
  24 #endif  /* HAVE_UNISTD_H */
  25 #ifdef HAVE_FCNTL_H
  26 #include <fcntl.h>
  27 #endif  /* HAVE_FCNTL_H */
  28 #ifdef HAVE_SYS_TYPES_H
  29 #include <sys/types.h>
  30 #endif  /* HAVE_SYS_TYPES_H */
  31 #ifdef HAVE_SYS_STAT_H
  32 #include <sys/stat.h>  /* for mkfifo */
  33 #endif  /* HAVE_SYS_STAT_H */
  34 #include <signal.h>
  35 #include <string.h>
  36 
  37 #include "orte/runtime/orte_cr.h"
  38 #include "orte/runtime/orte_globals.h"
  39 #include "orte/runtime/orte_wait.h"
  40 #include "opal/runtime/opal_cr.h"
  41 #include "opal/util/output.h"
  42 #include "opal/mca/event/event.h"
  43 #include "opal/util/opal_environ.h"
  44 #include "orte/mca/mca.h"
  45 #include "opal/mca/base/base.h"
  46 #include "opal/mca/crs/crs.h"
  47 #include "opal/mca/crs/base/base.h"
  48 
  49 #include "orte/util/name_fns.h"
  50 #include "opal/mca/pmix/pmix.h"
  51 #include "orte/mca/snapc/snapc.h"
  52 #include "orte/mca/snapc/base/base.h"
  53 #include "orte/mca/errmgr/errmgr.h"
  54 #include "orte/mca/grpcomm/grpcomm.h"
  55 #include "orte/mca/rml/rml.h"
  56 #include "orte/mca/rml/rml_types.h"
  57 #include "orte/mca/routed/routed.h"
  58 #include "orte/mca/routed/base/base.h"
  59 
  60 #include "snapc_full.h"
  61 
  62 /************************************
  63  * Locally Global vars & functions :)
  64  ************************************/
  65 static void snapc_full_app_signal_handler (int signo);
  66 static int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp);
  67 static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp);
  68 static int app_notify_resp_stage_2(int cr_state );
  69 static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg);
  70 static int app_define_pipe_names(void);
  71 static int snapc_full_app_notify_reopen_files(void);
  72 static int snapc_full_app_ckpt_handshake_start(opal_cr_ckpt_cmd_state_t resp);
  73 static int snapc_full_app_ckpt_handshake_end(int cr_state);
  74 
  75 static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t pid);
  76 static int snapc_full_app_finished_msg(int cr_state);
  77 
  78 static int app_notify_resp_inc_prep_only(int cr_state);
  79 
  80 static char *app_comm_pipe_r = NULL;
  81 static char *app_comm_pipe_w = NULL;
  82 static int   app_comm_pipe_r_fd = -1;
  83 static int   app_comm_pipe_w_fd = -1;
  84 
  85 static opal_crs_base_snapshot_t *local_snapshot = NULL;
  86 
  87 static bool app_notif_processed = false;
  88 
  89 static bool currently_migrating = false;
  90 static bool currently_all_migrating = false;
  91 
  92 static bool currently_checkpointing = false;
  93 static int  current_unique_id = 0;
  94 
  95 static int current_cr_state = OPAL_CRS_NONE;
  96 
  97 static orte_sstore_base_handle_t current_ss_handle = ORTE_SSTORE_HANDLE_INVALID, last_ss_handle = ORTE_SSTORE_HANDLE_INVALID;
  98 static opal_crs_base_ckpt_options_t *current_options = NULL;
  99 
 100 /************************
 101  * Function Definitions
 102  ************************/
 103 
 104 int app_coord_init()
 105 {
 106     int ret, exit_status  = ORTE_SUCCESS;
 107     opal_cr_notify_callback_fn_t prev_notify_func;
 108     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
 109     orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_INIT;
 110     opal_buffer_t *buffer = NULL;
 111 
 112     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
 113                          "App) Initalized for Application %s\n",
 114                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 115 
 116     /*
 117      * Register the INC notification callback
 118      */
 119     opal_cr_reg_notify_callback(snapc_full_app_notify_response, &prev_notify_func);
 120 
 121     /*
 122      * Set the pipe names
 123      */
 124     current_unique_id = 0;
 125     app_define_pipe_names();
 126 
 127     /*
 128      * Setup a signal handler to catch and start the proper thread
 129      * to handle the checkpoint
 130      */
 131     if( SIG_ERR == signal(opal_cr_entry_point_signal, snapc_full_app_signal_handler) ) {
 132         opal_output(mca_snapc_full_component.super.output_handle,
 133                     "App) init: Error: Failed to register signal %d\n",
 134                     opal_cr_entry_point_signal);
 135         ORTE_ERROR_LOG(OPAL_ERROR);
 136         exit_status = OPAL_ERROR;
 137         goto cleanup;
 138     }
 139 
 140     OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
 141                          "app) Named Pipes (%s) (%s), Signal (%d)",
 142                          app_comm_pipe_r, app_comm_pipe_w, opal_cr_entry_point_signal));
 143 
 144     /*
 145      * All processes must sync here, so the Global coordinator can know that
 146      * it is safe to checkpoint now.
 147      * Rank 0: Sends confirmation message to the Global Coordinator
 148      */
 149     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
 150         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
 151                              "app) Startup Barrier..."));
 152     }
 153 
 154     if (ORTE_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
 155         ORTE_ERROR_LOG(ret);
 156         exit_status = ret;
 157         goto cleanup;
 158     }
 159 
 160     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
 161         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
 162                              "app) Startup Barrier: Send INIT to HNP...!"));
 163 
 164         buffer = OBJ_NEW(opal_buffer_t);
 165 
 166         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
 167             ORTE_ERROR_LOG(ret);
 168             exit_status = ret;
 169             OBJ_RELEASE(buffer);
 170             return ORTE_ERROR;
 171         }
 172         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
 173             ORTE_ERROR_LOG(ret);
 174             exit_status = ret;
 175             OBJ_RELEASE(buffer);
 176             return ORTE_ERROR;
 177         }
 178 
 179         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(op_event), 1, OPAL_INT))) {
 180             ORTE_ERROR_LOG(ret);
 181             exit_status = ret;
 182             OBJ_RELEASE(buffer);
 183             goto cleanup;
 184         }
 185 
 186         if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
 187                                                            ORTE_RML_TAG_SNAPC_FULL,
 188                                                            orte_rml_send_callback, 0))) {
 189             ORTE_ERROR_LOG(ret);
 190             exit_status = ret;
 191             OBJ_RELEASE(buffer);
 192             return ORTE_ERROR;
 193         }
 194     }
 195 
 196     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
 197         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
 198                              "app) Startup Barrier: Done!"));
 199     }
 200 
 201  cleanup:
 202     return exit_status;
 203 }
 204 
 205 int app_coord_finalize()
 206 {
 207     int ret, exit_status = ORTE_SUCCESS;
 208     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
 209     orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_FIN;
 210     opal_buffer_t *buffer = NULL;
 211     orte_std_cntr_t count;
 212     orte_rml_recv_cb_t *rb = NULL;
 213 
 214     /*
 215      * All processes must sync here, so the Global coordinator can know that
 216      * it is no longer safe to checkpoint.
 217      * Rank 0: Sends confirmation message to the Global Coordinator
 218      */
 219     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
 220         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
 221                              "app) Shutdown Barrier..."));
 222     }
 223 
 224     if (ORTE_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
 225         ORTE_ERROR_LOG(ret);
 226         exit_status = ret;
 227         goto cleanup;
 228     }
 229 
 230     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
 231         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
 232                              "app) Shutdown Barrier: Send FIN to HNP...!"));
 233 
 234         /* Tell HNP that we are finalizing */
 235         buffer = OBJ_NEW(opal_buffer_t);
 236 
 237         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
 238             ORTE_ERROR_LOG(ret);
 239             exit_status = ret;
 240             OBJ_RELEASE(buffer);
 241             goto cleanup;
 242         }
 243         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
 244             ORTE_ERROR_LOG(ret);
 245             exit_status = ret;
 246             OBJ_RELEASE(buffer);
 247             goto cleanup;
 248         }
 249 
 250         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(op_event), 1, OPAL_INT))) {
 251             ORTE_ERROR_LOG(ret);
 252             exit_status = ret;
 253             OBJ_RELEASE(buffer);
 254             goto cleanup;
 255         }
 256 
 257         if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
 258                                                            ORTE_RML_TAG_SNAPC_FULL,
 259                                                            orte_rml_send_callback, 0))) {
 260             ORTE_ERROR_LOG(ret);
 261             exit_status = ret;
 262             OBJ_RELEASE(buffer);
 263             goto cleanup;
 264         }
 265 
 266         /* buffer should not be released here; the callback releases it */
 267         buffer = NULL;
 268 
 269         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
 270                              "app) Shutdown Barrier: Waiting on FIN_ACK...!"));
 271 
 272         /* Wait for HNP to tell us that it is ok to finish finalization.
 273          * We could have been checkpointing just as we entered finalize, so we
 274          * need to wait until the checkpoint is finished before finishing.
 275          */
 276         rb = OBJ_NEW(orte_rml_recv_cb_t);
 277         rb->active = true;
 278         orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
 279         ORTE_WAIT_FOR_COMPLETION(rb->active);
 280 
 281         count = 1;
 282         if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
 283             ORTE_ERROR_LOG(ret);
 284             exit_status = ret;
 285             goto cleanup;
 286         }
 287 
 288         count = 1;
 289         if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
 290             ORTE_ERROR_LOG(ret);
 291             exit_status = ret;
 292             goto cleanup;
 293         }
 294 
 295         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
 296                              "app) Shutdown Barrier: Waiting on barrier...!"));
 297     }
 298 
 299     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
 300         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
 301                              "app) Shutdown Barrier, Done!"));
 302     }
 303 
 304  cleanup:
 305     /* cleanup */
 306     if (NULL != buffer) {
 307         OBJ_RELEASE(buffer);
 308         buffer = NULL;
 309     }
 310     if (NULL != rb) {
 311         OBJ_RELEASE(rb);
 312         rb = NULL;
 313     }
 314 
 315     /*
 316      * Cleanup named pipes
 317      */
 318     if( NULL != app_comm_pipe_r) {
 319         free(app_comm_pipe_r);
 320         app_comm_pipe_r = NULL;
 321     }
 322 
 323     if( NULL != app_comm_pipe_w) {
 324         free(app_comm_pipe_w);
 325         app_comm_pipe_w = NULL;
 326     }
 327 
 328     return exit_status;
 329 }
 330 
 331 /******************
 332  * Local functions
 333  ******************/
 334 static void snapc_full_app_signal_handler (int signo)
 335 {
 336     if( opal_cr_entry_point_signal != signo ) {
 337         OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
 338                              "App) signal_handler: Received unknown signal %d",
 339                              signo));
 340         /* Not our signal */
 341         return;
 342     }
 343     if( currently_checkpointing ) {
 344         opal_output(0, "snapc:full:(app) Error: Received a signal to checkpoint, but Already checkpointing. Ignoring request!");
 345     }
 346     else {
 347         currently_checkpointing = true;
 348         /*
 349          * Signal thread to start checkpoint handshake
 350          */
 351         opal_cr_checkpoint_request   = OPAL_CR_STATUS_REQUESTED;
 352 
 353         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 354                              "App) signal_handler: Receive Checkpoint Request."));
 355     }
 356 }
 357 
 358 /*
 359  * Respond to an asynchronous checkpoint request
 360  */
 361 int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
 362 {
 363     static int cr_state;
 364     int app_pid;
 365     int ret, exit_status = ORTE_SUCCESS;
 366 
 367     /*
 368      * Clear the options set
 369      */
 370     if( NULL == current_options ) {
 371         current_options = OBJ_NEW(opal_crs_base_ckpt_options_t);
 372     }
 373 
 374     if( opal_cr_currently_stalled ) {
 375         goto STAGE_1;
 376     }
 377 
 378     /* Default: use the fast way */
 379     opal_cr_continue_like_restart = false;
 380     orte_cr_flush_restart_files   = true;
 381 
 382     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 383                          "App) notify_response: Stage 1..."));
 384     if( ORTE_SUCCESS != (ret = app_notify_resp_stage_1(resp) ) ) {
 385         ORTE_ERROR_LOG(ret);
 386         exit_status = ret;
 387         goto ckpt_cleanup;
 388     }
 389 
 390     cr_state = OPAL_CRS_RUNNING;
 391     current_cr_state = cr_state;
 392 
 393 #if OPAL_ENABLE_CRDEBUG == 1
 394     if( current_options->attach_debugger ) {
 395         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 396                              "App) notify_response: C/R Debug: Wait for debugger..."));
 397         MPIR_debug_with_checkpoint = true;
 398     }
 399     if( current_options->detach_debugger ) {
 400         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 401                              "App) notify_response: C/R Debug: Do not wait for debugger..."));
 402         MPIR_debug_with_checkpoint = false;
 403     }
 404 #endif
 405 
 406     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 407                          "App) notify_response: Start checkpoint..."));
 408  STAGE_1:
 409     opal_cr_currently_stalled = false;
 410 
 411     app_pid = getpid();
 412     if( orte_snapc_full_skip_app ) {
 413         OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle,
 414                              "App) notify_response: Skipping App. (%d)\n",
 415                              app_pid));
 416         ret = ORTE_SUCCESS;
 417         cr_state = OPAL_CRS_CONTINUE;
 418     }
 419     else {
 420         /*
 421          * INC: Prepare stack using the registered coordination routine
 422          */
 423         if(OPAL_SUCCESS != (ret = opal_cr_inc_core_prep() ) ) {
 424             if( OPAL_EXISTS == ret ) {
 425                 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 426                                      "App) notify_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n",
 427                                      app_pid));
 428                 opal_cr_currently_stalled = true;
 429                 return exit_status;
 430             }
 431             else {
 432                 opal_output(mca_snapc_full_component.super.output_handle,
 433                             "App) notify_response: Error: checkpoint notification failed. %d\n", ret);
 434                 ORTE_ERROR_LOG(ret);
 435                 exit_status = ret;
 436                 goto ckpt_cleanup;
 437             }
 438         }
 439 
 440         /*
 441          * If this is a quiesce_start operation then we can stop here after calling
 442          * the INC prep. Need to keep the connection open for the quiesce_end()
 443          * operation though.
 444          */
 445         if( current_options->inc_prep_only ) {
 446             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 447                                  "App) notify_response: INC Prep Only..."));
 448             return app_notify_resp_inc_prep_only(cr_state);
 449         } else {
 450             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 451                                  "App) notify_response: Normal operation..."));
 452         }
 453 
 454         /*
 455          * INC: Take the checkpoint
 456          *
 457          * If migrating, only checkpoint if you are the target process
 458          * otherwise just continue.
 459          */
 460         if( currently_all_migrating ) {
 461             opal_cr_continue_like_restart = true;
 462             orte_cr_flush_restart_files   = false;
 463         }
 464         if( !currently_migrating && currently_all_migrating ) {
 465             OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle,
 466                                  "App) notify_response: Skipping App. (%d) - This process is not migrating \n",
 467                                  app_pid));
 468             ret = ORTE_SUCCESS;
 469             cr_state = OPAL_CRS_CONTINUE;
 470         }
 471         else {
 472             ret = opal_cr_inc_core_ckpt(app_pid, local_snapshot, current_options, &cr_state);
 473         }
 474         current_cr_state = cr_state;
 475 
 476         /*
 477          * Tell Local Coordinator that we are done with local checkpoint
 478          * (only if not restarting, on restart we are not attached to the Local
 479          *  Coordinator. )
 480          */
 481         if( OPAL_CRS_RESTART != cr_state ) {
 482             OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 483                                  "App) notify_response: Stage 2..."));
 484             if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
 485                 ORTE_ERROR_LOG(ret);
 486                 exit_status = ret;
 487                 goto ckpt_cleanup;
 488             }
 489         }
 490 
 491         /*
 492          * INC: Recover stack using the registered coordination routine
 493          */
 494         if( !currently_all_migrating ) {
 495             if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state)) ) {
 496                 ORTE_ERROR_LOG(ret);
 497                 exit_status = ret;
 498                 goto ckpt_cleanup;
 499             }
 500         }
 501         /*
 502          * If this is a migrating target process, then do not recover the stack, but terminate.
 503          * All non-migrating processes will wait in the recovery until the target processes are
 504          * restarted on the target nodes.
 505          */
 506         else {
 507             /*
 508              * If we are one of the processes migrating, then terminate after checkpointing
 509              */
 510             if( currently_migrating ) {
 511                 if( OPAL_CRS_RESTART != cr_state ) {
 512                     current_options->term = true;
 513                 }
 514                 else {
 515                     if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state)) ) {
 516                         ORTE_ERROR_LOG(ret);
 517                         exit_status = ret;
 518                         goto ckpt_cleanup;
 519                     }
 520                 }
 521             }
 522             /*
 523              * If we are not one of the processes migrating, then wait for release.
 524              * Need to act like we are restarting during recovery, since the migrating processes
 525              * will expect this logic.
 526              */
 527             else {
 528                 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(OPAL_CRS_RESTART)) ) {
 529                     ORTE_ERROR_LOG(ret);
 530                     exit_status = ret;
 531                     goto ckpt_cleanup;
 532                 }
 533             }
 534         }
 535     }
 536 
 537     /* Don't stall any longer */
 538     opal_cr_stall_check = false;
 539 
 540     if(OPAL_CRS_RESTART == cr_state) {
 541         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 542                              "App) notify_response: Restarting... (%s : %d)\n",
 543                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app_pid));
 544 
 545         current_options->term = false;
 546         /* Do not respond to the non-existent command line tool */
 547         goto ckpt_cleanup;
 548     }
 549     else if(cr_state == OPAL_CRS_CONTINUE) {
 550         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 551                              "App) notify_response: Continuing...(%s : %d)\n",
 552                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app_pid));
 553         ; /* Don't need to do anything here */
 554     }
 555     else if(cr_state == OPAL_CRS_TERM ) {
 556         ; /* Don't need to do anything here */
 557     }
 558     else {
 559         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 560                              "App) notify_response: Unknown cr_state(%d) [%d]",
 561                              cr_state, app_pid));
 562     }
 563 
 564  ckpt_cleanup:
 565     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 566                          "App) notify_response: Stage 3..."));
 567     if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(cr_state, false) )) {
 568         ORTE_ERROR_LOG(ret);
 569         exit_status = ret;
 570         goto ckpt_cleanup;
 571     }
 572 
 573     if( current_options->term ) {
 574         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 575                              "App) notify_response: User has asked to terminate the application"));
 576         /* Wait here for termination.
 577          * If we call 'exit' then the job will fail in an ugly way, instead just
 578          * wait for the Global coordinator to terminate us.
 579          */
 580         while(1) {
 581             opal_progress();
 582             sleep(1);
 583         }
 584     }
 585 
 586     if( NULL != current_options ) {
 587         OBJ_RELEASE(current_options);
 588         current_options = NULL;
 589     }
 590 
 591     currently_checkpointing = false;
 592 
 593     return exit_status;
 594 }
 595 
 596 static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp)
 597 {
 598     int ret, exit_status = ORTE_SUCCESS;
 599 
 600     OPAL_CR_CLEAR_TIMERS();
 601     opal_cr_timing_my_rank = ORTE_PROC_MY_NAME->vpid;
 602     OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY0);
 603 
 604     /*
 605      * Open communication channels
 606      */
 607     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 608                          "App) notify_response: Open Communication Channels."));
 609     if (ORTE_SUCCESS != (ret = snapc_full_app_notify_reopen_files())) {
 610         ORTE_ERROR_LOG(ret);
 611         exit_status = ret;
 612         goto cleanup;
 613     }
 614 
 615     /*
 616      * Initial Handshake
 617      */
 618     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 619                          "App) notify_response: Initial Handshake."));
 620     if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_start(resp) ) ) {
 621         ORTE_ERROR_LOG(ret);
 622         exit_status = ret;
 623         goto cleanup;
 624     }
 625 
 626     OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY1);
 627 
 628     /*
 629      * Register with SStore
 630      */
 631     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 632                          "App) notify_response: Register with SStore..."));
 633     if( OPAL_SUCCESS != (ret = orte_sstore.register_handle(current_ss_handle)) ) {
 634         ORTE_ERROR_LOG(ret);
 635         exit_status = ret;
 636         goto cleanup;
 637     }
 638 
 639     local_snapshot = OBJ_NEW(opal_crs_base_snapshot_t);
 640 
 641     if( !currently_migrating && currently_all_migrating ) {
 642         orte_sstore.set_attr(current_ss_handle,
 643                              SSTORE_METADATA_LOCAL_SKIP_CKPT,
 644                              "1");
 645     }
 646 
 647     orte_sstore.get_attr(current_ss_handle,
 648                          SSTORE_METADATA_LOCAL_SNAP_LOC,
 649                          &(local_snapshot->snapshot_directory));
 650     orte_sstore.get_attr(current_ss_handle,
 651                          SSTORE_METADATA_LOCAL_SNAP_META,
 652                          &(local_snapshot->metadata_filename));
 653 
 654     OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY2);
 655 
 656     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 657                          "App) notify_response: Start checkpoint... (%d)", (int)current_ss_handle));
 658 
 659  cleanup:
 660     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 661                          "App) notify_response: Are we migrating [%5s]. Am I migrating [%5s]",
 662                          (currently_all_migrating ? "True" : "False"),
 663                          (currently_migrating ? "True" : "False") ));
 664 
 665     return exit_status;
 666 }
 667 
 668 static int app_notify_resp_inc_prep_only(int cr_state)
 669 {
 670     int ret, exit_status = ORTE_SUCCESS;
 671 
 672     /*
 673      * Tell the local coordinator that we are done with the INC prep
 674      */
 675     if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
 676         opal_output(mca_snapc_full_component.super.output_handle,
 677                     "App) notify_response: Error: Unable to write cr_state to named pipe (%s).\n",
 678                     app_comm_pipe_w);
 679         ORTE_ERROR_LOG(ret);
 680         exit_status = ret;
 681         goto cleanup;
 682     }
 683 
 684     app_notif_processed = true;
 685 
 686  cleanup:
 687     return exit_status;
 688 }
 689 
 690 static int app_notify_resp_stage_2(int cr_state )
 691 {
 692     int ret;
 693 
 694     OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY3);
 695 
 696     /*
 697      * Sync SStore
 698      * If we stopped the process, then we already did this
 699      */
 700     if( !(current_options->stop) ) {
 701         if( currently_migrating || !currently_all_migrating ) {
 702             orte_sstore.set_attr(current_ss_handle,
 703                                  SSTORE_METADATA_LOCAL_CRS_COMP,
 704                                  local_snapshot->component_name);
 705         }
 706 
 707         orte_sstore.sync(current_ss_handle);
 708     }
 709     last_ss_handle = current_ss_handle;
 710     current_ss_handle = 0;
 711 
 712     /*
 713      * Final Handshake
 714      */
 715     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 716                          "App) notify_response: Waiting for final handshake."));
 717     if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_end(cr_state ) ) ) {
 718         ORTE_ERROR_LOG(ret);
 719         return ret;
 720     }
 721 
 722     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 723                          "App) notify_response: Final Handshake complete."));
 724 
 725     return ORTE_SUCCESS;
 726 }
 727 
 728 static int app_define_pipe_names(void)
 729 {
 730     if( NULL != app_comm_pipe_r ) {
 731         free(app_comm_pipe_r);
 732         app_comm_pipe_r = NULL;
 733     }
 734 
 735     if( NULL != app_comm_pipe_w ) {
 736         free(app_comm_pipe_w);
 737         app_comm_pipe_w = NULL;
 738     }
 739 
 740     opal_asprintf(&app_comm_pipe_r, "%s/%s.%d_%d",
 741              opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R,
 742              (int)getpid(), current_unique_id);
 743     opal_asprintf(&app_comm_pipe_w, "%s/%s.%d_%d",
 744              opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W,
 745              (int)getpid(), current_unique_id);
 746 
 747     ++current_unique_id;
 748 
 749     return ORTE_SUCCESS;
 750 }
 751 
 752 static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg)
 753 {
 754     /*
 755      * Send a message to the local daemon letting it know that we are done
 756      */
 757     if( !skip_fin_msg ) {
 758         snapc_full_app_finished_msg(cr_state);
 759     }
 760 
 761     /*
 762      * Close and cleanup pipes
 763      */
 764     if( 0 <= app_comm_pipe_r_fd ) {
 765         close(app_comm_pipe_r_fd);
 766         app_comm_pipe_r_fd = -1;
 767     }
 768     if( 0 <= app_comm_pipe_w_fd ) {
 769         close(app_comm_pipe_w_fd);
 770         app_comm_pipe_w_fd = -1;
 771     }
 772 
 773     remove(app_comm_pipe_r);
 774     remove(app_comm_pipe_w);
 775 
 776     app_comm_pipe_r_fd = -1;
 777     app_comm_pipe_w_fd = -1;
 778 
 779     if( OPAL_CRS_RESTART == cr_state ) {
 780         current_unique_id = 0;
 781     }
 782 
 783     app_define_pipe_names();
 784 
 785     /* Prepare to wait for another checkpoint action */
 786     opal_cr_checkpointing_state = OPAL_CR_STATUS_NONE;
 787     opal_cr_currently_stalled   = false;
 788 
 789     currently_all_migrating = false;
 790     currently_migrating     = false;
 791 
 792     OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY4);
 793     if(OPAL_CRS_RESTART != cr_state) {
 794         OPAL_CR_DISPLAY_ALL_TIMERS();
 795     }
 796 
 797     return ORTE_SUCCESS;
 798 }
 799 
 800 static int snapc_full_app_finished_msg(int cr_state) {
 801     int ret, exit_status = ORTE_SUCCESS;
 802     opal_buffer_t *buffer = NULL;
 803     orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_FINISH_CMD;
 804 
 805     buffer = OBJ_NEW(opal_buffer_t);
 806 
 807     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD))) {
 808         ORTE_ERROR_LOG(ret);
 809         exit_status = ret;
 810         goto cleanup;
 811     }
 812 
 813     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cr_state, 1, OPAL_INT))) {
 814         ORTE_ERROR_LOG(ret);
 815         exit_status = ret;
 816         goto cleanup;
 817     }
 818 
 819     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
 820                                                        ORTE_RML_TAG_SNAPC,
 821                                                        orte_rml_send_callback, 0))) {
 822         ORTE_ERROR_LOG(ret);
 823         exit_status = ret;
 824         goto cleanup;
 825     }
 826 
 827     return ORTE_SUCCESS;
 828  cleanup:
 829     OBJ_RELEASE(buffer);
 830 
 831     return exit_status;
 832 }
 833 
 834 static int snapc_full_app_notify_reopen_files(void)
 835 {
 836     int ret = OPAL_ERR_NOT_IMPLEMENTED;
 837 
 838 #ifndef HAVE_MKFIFO
 839     return ret;
 840 #else
 841     /*
 842      * Open up the read pipe
 843      */
 844     if( (ret = mkfifo(app_comm_pipe_r, 0660)) < 0) {
 845         if(EEXIST == ret || -1 == ret ) {
 846             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 847                                  "App) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
 848                                  app_comm_pipe_r, ret));
 849         }
 850         else {
 851             opal_output(mca_snapc_full_component.super.output_handle,
 852                         "App) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
 853                         app_comm_pipe_r, ret);
 854             return ORTE_ERROR;
 855         }
 856     }
 857 
 858     app_comm_pipe_r_fd = open(app_comm_pipe_r, O_RDWR);
 859     if(app_comm_pipe_r_fd < 0) {
 860         opal_output(mca_snapc_full_component.super.output_handle,
 861                     "App) init: Error: open failed to open the named pipe (%s). %d\n",
 862                     app_comm_pipe_r, app_comm_pipe_r_fd);
 863         return ORTE_ERROR;
 864     }
 865 
 866     /*
 867      * Open up the write pipe
 868      */
 869     if( (ret = mkfifo(app_comm_pipe_w, 0660)) < 0) {
 870         if(EEXIST == ret || -1 == ret ) {
 871             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 872                                  "App) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
 873                                  app_comm_pipe_w, ret));
 874         }
 875         else {
 876             opal_output(mca_snapc_full_component.super.output_handle,
 877                         "App) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
 878                         app_comm_pipe_w, ret);
 879             return ORTE_ERROR;
 880         }
 881     }
 882 
 883     app_comm_pipe_w_fd = open(app_comm_pipe_w, O_WRONLY);
 884     if(app_comm_pipe_w_fd < 0) {
 885         opal_output(mca_snapc_full_component.super.output_handle,
 886                     "App) notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n",
 887                     app_comm_pipe_w, app_comm_pipe_w_fd);
 888         return ORTE_ERROR;
 889     }
 890 
 891     return ORTE_SUCCESS;
 892 #endif  /* HAVE_MKFIFO */
 893 }
 894 
 895 static int snapc_full_app_ckpt_handshake_start(opal_cr_ckpt_cmd_state_t resp)
 896 {
 897     int ret, exit_status = ORTE_SUCCESS;
 898     int tmp_resp, opt_rep;
 899 
 900     /*
 901      * Get the initial handshake command:
 902      * - Migrating option [all, me]
 903      * - Term argument
 904      * - Stop argument
 905      */
 906     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
 907         opal_output(mca_snapc_full_component.super.output_handle,
 908                     "App) notify_response: Error: Unable to read the all_migrating option from named pipe (%s). %d\n",
 909                     app_comm_pipe_r, ret);
 910         ORTE_ERROR_LOG(ret);
 911         goto cleanup;
 912     }
 913     currently_all_migrating = OPAL_INT_TO_BOOL(opt_rep);
 914 
 915     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
 916         opal_output(mca_snapc_full_component.super.output_handle,
 917                     "App) notify_response: Error: Unable to read the migrating option from named pipe (%s). %d\n",
 918                     app_comm_pipe_r, ret);
 919         ORTE_ERROR_LOG(ret);
 920         goto cleanup;
 921     }
 922     currently_migrating = OPAL_INT_TO_BOOL(opt_rep);
 923 
 924     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
 925         opal_output(mca_snapc_full_component.super.output_handle,
 926                     "App) notify_response: Error: Unable to read the 'term' from named pipe (%s). %d\n",
 927                     app_comm_pipe_r, ret);
 928         ORTE_ERROR_LOG(ret);
 929         goto cleanup;
 930     }
 931     current_options->term = OPAL_INT_TO_BOOL(opt_rep);
 932 
 933     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
 934         opal_output(mca_snapc_full_component.super.output_handle,
 935                     "App) notify_response: Error: Unable to read the 'stop' from named pipe (%s). %d\n",
 936                     app_comm_pipe_r, ret);
 937         ORTE_ERROR_LOG(ret);
 938         goto cleanup;
 939     }
 940     current_options->stop = OPAL_INT_TO_BOOL(opt_rep);
 941 
 942     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
 943         opal_output(mca_snapc_full_component.super.output_handle,
 944                     "App) notify_response: Error: Unable to read the 'inc_prep_only' from named pipe (%s). %d\n",
 945                     app_comm_pipe_r, ret);
 946         ORTE_ERROR_LOG(ret);
 947         goto cleanup;
 948     }
 949     current_options->inc_prep_only = OPAL_INT_TO_BOOL(opt_rep);
 950 
 951     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
 952         opal_output(mca_snapc_full_component.super.output_handle,
 953                     "App) notify_response: Error: Unable to read the 'inc_recover_only' from named pipe (%s). %d\n",
 954                     app_comm_pipe_r, ret);
 955         ORTE_ERROR_LOG(ret);
 956         goto cleanup;
 957     }
 958     current_options->inc_recover_only = OPAL_INT_TO_BOOL(opt_rep);
 959 
 960 #if OPAL_ENABLE_CRDEBUG == 1
 961     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
 962         opal_output(mca_snapc_full_component.super.output_handle,
 963                     "App) notify_response: Error: Unable to read the 'attach_debugger' from named pipe (%s). %d\n",
 964                     app_comm_pipe_r, ret);
 965         ORTE_ERROR_LOG(ret);
 966         goto cleanup;
 967     }
 968     current_options->attach_debugger = OPAL_INT_TO_BOOL(opt_rep);
 969 
 970     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
 971         opal_output(mca_snapc_full_component.super.output_handle,
 972                     "App) notify_response: Error: Unable to read the 'detach_debugger' from named pipe (%s). %d\n",
 973                     app_comm_pipe_r, ret);
 974         ORTE_ERROR_LOG(ret);
 975         goto cleanup;
 976     }
 977     current_options->detach_debugger = OPAL_INT_TO_BOOL(opt_rep);
 978 #endif
 979 
 980     /*
 981      * Get SStore Handle
 982      */
 983     if( sizeof(orte_sstore_base_handle_t) != (ret = read(app_comm_pipe_r_fd, &current_ss_handle, sizeof(orte_sstore_base_handle_t))) ) {
 984         opal_output(mca_snapc_full_component.super.output_handle,
 985                     "App) notify_response: Error: Unable to read the sstore handle from named pipe (%s). %d\n",
 986                     app_comm_pipe_r, ret);
 987         ORTE_ERROR_LOG(ret);
 988         goto cleanup;
 989     }
 990 
 991     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 992                          "App) %s Received Options... Responding with %d\n",
 993                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)resp));
 994 
 995     /*
 996      * Write back the response to the request (message printed below)
 997      */
 998     tmp_resp = (int)resp;
 999     if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &tmp_resp, sizeof(int)) ) ) {
1000         opal_output(mca_snapc_full_component.super.output_handle,
1001                     "App) notify_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n",
1002                     tmp_resp, app_comm_pipe_w, ret, __LINE__);
1003         ORTE_ERROR_LOG(ret);
1004         goto cleanup;
1005     }
1006 
1007     /*
1008      * Respond that the checkpoint is currently in progress
1009      */
1010     if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) {
1011         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1012                              "App) notify_response: Checkpoint in progress, cannot start (%d)",
1013                              getpid()));
1014         ORTE_ERROR_LOG(ret);
1015         goto cleanup;
1016     }
1017     /*
1018      * Respond that the application is unable to be checkpointed
1019      */
1020     else if( OPAL_CHECKPOINT_CMD_NULL == resp ) {
1021         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1022                              "App) notify_response: Non-checkpointable application, cannot start (%d)",
1023                              getpid()));
1024         ORTE_ERROR_LOG(ret);
1025         goto cleanup;
1026     }
1027     /*
1028      * Respond that some error has occurred such that the application is
1029      * not able to be checkpointed
1030      */
1031     else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) {
1032         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1033                              "App) notify_response: Error generated, cannot start (%d)",
1034                              getpid()));
1035         ORTE_ERROR_LOG(ret);
1036         goto cleanup;
1037     }
1038 
1039     /*
1040      * Respond signalng that we wish to respond to this request
1041      */
1042     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1043                          "App) notify_response: Starting checkpoint request (%d)",
1044                          getpid()));
1045 
1046     /*
1047      * Get the sentinel value indicating that we can start now
1048      * JJH: Check for an error here indicating that even though this process is
1049      *      OK to checkpoint others might not be in which case we should cleanup
1050      *      properly.
1051      */
1052     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
1053         opal_output(mca_snapc_full_component.super.output_handle,
1054                     "App) notify_response: Error: Unable to read from named pipe (%s). %d\n",
1055                     app_comm_pipe_r, ret);
1056         ORTE_ERROR_LOG(ret);
1057         goto cleanup;
1058     }
1059 
1060  cleanup:
1061     return exit_status;
1062 }
1063 
1064 static int snapc_full_app_ckpt_handshake_end(int cr_state)
1065 {
1066     int ret, exit_status = ORTE_SUCCESS;
1067     int last_cmd = 0;
1068     int err;
1069 
1070     /*
1071      * Return the final checkpoint state to the local coordinator
1072      */
1073     if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
1074         err = errno;
1075         opal_output(mca_snapc_full_component.super.output_handle,
1076                     "App) notify_response: Error: Unable to write cr_state to named pipe (%s). %d/%d/%s\n",
1077                     app_comm_pipe_w, ret, err, strerror(err));
1078         ORTE_ERROR_LOG(ret);
1079         exit_status = ret;
1080         goto cleanup;
1081     }
1082 
1083     if( currently_all_migrating && currently_migrating ) {
1084         app_notify_resp_stage_3(cr_state, true);
1085         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1086                              "App) handshake_end: Waiting for termination (%d)",
1087                              getpid()));
1088         /* Wait here for termination, do not terminate ourselves.
1089          * JJH: We cannot terminate ourselves without killing the job...
1090          */
1091         while(1) {
1092             opal_progress();
1093             sleep(1);
1094         }
1095     }
1096 
1097     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1098                          "App) handshake_end: Waiting for release (%d)",
1099                          getpid()));
1100 
1101     /*
1102      * Wait for the local coordinator to release us
1103      */
1104     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &last_cmd, sizeof(int))) ) {
1105         opal_output(mca_snapc_full_component.super.output_handle,
1106                     "App) notify_response: Error: Unable to read the 'last_cmd' from named pipe (%s). %d\n",
1107                     app_comm_pipe_r, ret);
1108         ORTE_ERROR_LOG(ret);
1109         exit_status = ret;
1110         goto cleanup;
1111     }
1112 
1113     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1114                          "App) handshake_end: Released... (%d)",
1115                          getpid()));
1116 
1117  cleanup:
1118     return exit_status;
1119 }
1120 
1121 int app_coord_ft_event(int state) {
1122     int ret, exit_status = ORTE_SUCCESS;
1123 
1124     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1125                          "App) In ft_event(%d)", state));
1126 
1127     /******** Checkpoint Prep ********/
1128     if(OPAL_CRS_CHECKPOINT == state) {
1129         /*
1130          * Record the job session directory
1131          * This way we will recreate it on restart so that any components that
1132          * have old references to it (like btl/sm) can reference their files
1133          * (to close the fd's to them) on restart. We will remove it before we
1134          * create the new session directory.
1135          */
1136         orte_sstore.set_attr(orte_sstore_handle_current,
1137                              SSTORE_METADATA_LOCAL_MKDIR,
1138                              orte_process_info.job_session_dir);
1139 
1140         /*
1141          * If stopping then sync early
1142          */
1143         if( current_options->stop ) {
1144             orte_sstore.set_attr(current_ss_handle,
1145                                  SSTORE_METADATA_LOCAL_CRS_COMP,
1146                                  opal_crs_base_selected_component.base_version.mca_component_name);
1147 
1148             orte_sstore.sync(current_ss_handle);
1149         }
1150     }
1151     /******** Continue Recovery ********/
1152     else if (OPAL_CRS_CONTINUE == state ) {
1153 #if OPAL_ENABLE_CRDEBUG == 1
1154         /*
1155          * Send PID to HNP/daemon if debugging as an indicator that we have
1156          * finished the checkpoint operation.
1157          */
1158         if( ORTE_SUCCESS != (ret = snapc_full_app_ft_event_update_process_info(orte_process_info.my_name, getpid())) ) {
1159             ORTE_ERROR_LOG(ret);
1160             exit_status = ret;
1161             goto cleanup;
1162         }
1163 #endif
1164         ; /* Nothing */
1165     }
1166     /******** Restart Pre-Recovery ********/
1167     else if (OPAL_CRS_RESTART_PRE == state ) {
1168         ; /* Nothing */
1169     }
1170     /******** Restart Recovery ********/
1171     else if (OPAL_CRS_RESTART == state ) {
1172         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1173                              "App) Initalized for Application %s (Restart) (%5d)\n",
1174                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), getpid()));
1175 
1176         /*
1177          * Send new PID to HNP/daemon
1178          * The checkpointer could have used a proxy program to boot us
1179          * so the pid that the orted got from fork() may not be the
1180          * PID of this application.
1181          * - Note: BLCR does this because it tries to preseve the PID
1182          *         of the program across checkpointes
1183          */
1184         if( ORTE_SUCCESS != (ret = snapc_full_app_ft_event_update_process_info(orte_process_info.my_name, getpid())) ) {
1185             ORTE_ERROR_LOG(ret);
1186             exit_status = ret;
1187             goto cleanup;
1188         }
1189 
1190         /*
1191          * JJH: Optionally the non-migrating processes can wait here in stage_2
1192          * JJH: This will delay the initial checkpoint, but potentially speed up
1193          * JJH: restart.
1194          */
1195     }
1196     /******** Termination ********/
1197     else if (OPAL_CRS_TERM == state ) {
1198         ; /* Nothing */
1199     }
1200     /******** Error State ********/
1201     else {
1202         ; /* Nothing */
1203     }
1204 
1205  cleanup:
1206     return exit_status;
1207 }
1208 
1209 static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t proc_pid)
1210 {
1211     int ret, exit_status = ORTE_SUCCESS;
1212     opal_buffer_t *buffer = NULL;
1213     orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_UPDATE_CMD;
1214 
1215     buffer = OBJ_NEW(opal_buffer_t);
1216 
1217     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD))) {
1218         ORTE_ERROR_LOG(ret);
1219         exit_status = ret;
1220         goto cleanup;
1221     }
1222 
1223     /* JJH CLEANUP: Do we really need this, it is equal to sender */
1224     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &proc, 1, ORTE_NAME))) {
1225         ORTE_ERROR_LOG(ret);
1226         exit_status = ret;
1227         goto cleanup;
1228     }
1229 
1230     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &proc_pid, 1, OPAL_PID))) {
1231         ORTE_ERROR_LOG(ret);
1232         exit_status = ret;
1233         goto cleanup;
1234     }
1235 
1236 #if OPAL_ENABLE_CRDEBUG == 1
1237     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &MPIR_debug_with_checkpoint, 1, OPAL_BOOL))) {
1238         ORTE_ERROR_LOG(ret);
1239         exit_status = ret;
1240         goto cleanup;
1241     }
1242 #endif
1243 
1244     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
1245                                                        ORTE_RML_TAG_SNAPC,
1246                                                        orte_rml_send_callback, 0))) {
1247         ORTE_ERROR_LOG(ret);
1248         exit_status = ret;
1249         goto cleanup;
1250     }
1251 
1252     return ORTE_SUCCESS;
1253  cleanup:
1254     OBJ_RELEASE(buffer);
1255 
1256     return exit_status;
1257 }
1258 
1259 int app_coord_request_op(orte_snapc_base_request_op_t *datum)
1260 {
1261     int ret, exit_status = ORTE_SUCCESS;
1262     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
1263     opal_buffer_t *buffer = NULL;
1264     orte_std_cntr_t count;
1265     orte_rml_recv_cb_t *rb = NULL;
1266     int op_event, op_state;
1267     char *seq_str = NULL, *tmp_str = NULL;
1268     int cr_state = OPAL_CRS_CONTINUE;
1269     int app_pid, i;
1270 
1271     /*
1272      * Quiesce_end recovers the library before talking to the Global coord.
1273      */
1274     if( ORTE_SNAPC_OP_QUIESCE_END == datum->event) {
1275         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1276                              "App) Quiesce_end: Recovering the stack..."));
1277 
1278         /*
1279          * INC: Recover the stack
1280          */
1281         if( NULL == local_snapshot->component_name ) {
1282             local_snapshot->component_name = strdup("");
1283         }
1284         if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
1285             exit_status = ret;
1286             ORTE_ERROR_LOG(ret);
1287             goto cleanup;
1288         }
1289 
1290         if(OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state) ) ) {
1291             exit_status = ret;
1292             ORTE_ERROR_LOG(ret);
1293             goto cleanup;
1294         }
1295 
1296         if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(cr_state, false) )) {
1297             exit_status = ret;
1298             ORTE_ERROR_LOG(ret);
1299             goto cleanup;
1300         }
1301 
1302         currently_checkpointing = false;
1303         app_notif_processed = false;
1304 
1305         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1306                              "App) Quiesce_end: Recovered."));
1307     }
1308     else if( ORTE_SNAPC_OP_QUIESCE_CHECKPOINT == datum->event) {
1309         app_pid = getpid();
1310         cr_state = OPAL_CRS_RUNNING;
1311         if( OPAL_SUCCESS != (ret = opal_cr_inc_core_ckpt(app_pid, local_snapshot, current_options, &cr_state)) ) {
1312             ORTE_ERROR_LOG(ret);
1313             exit_status = ret;
1314         }
1315 
1316         if( OPAL_CRS_RESTART != cr_state ) {
1317             orte_sstore.sync(current_ss_handle);
1318         }
1319 
1320         orte_sstore.get_attr(current_ss_handle,
1321                              SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1322                              &seq_str);
1323         if( NULL != seq_str ) {
1324             datum->seq_num = atoi(seq_str);
1325         } else {
1326             datum->seq_num = -1;
1327         }
1328 
1329         orte_sstore.get_attr(current_ss_handle,
1330                              SSTORE_METADATA_GLOBAL_SNAP_REF,
1331                              &(datum->global_handle));
1332         if( NULL == datum->global_handle ) {
1333             datum->global_handle = strdup("Unknown");
1334         }
1335 
1336         return exit_status;
1337     }
1338 
1339     /*
1340      * Leader: Send the info to the head node
1341      */
1342     if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1343         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1344                              "App) Request_op: Sending request (%3d)...",
1345                              datum->event));
1346         /*
1347          * Send request to HNP
1348          */
1349         buffer = OBJ_NEW(opal_buffer_t);
1350 
1351         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1352             ORTE_ERROR_LOG(ret);
1353             exit_status = ret;
1354             goto cleanup;
1355         }
1356         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
1357             ORTE_ERROR_LOG(ret);
1358             exit_status = ret;
1359             goto cleanup;
1360         }
1361 
1362         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->event), 1, OPAL_INT))) {
1363             ORTE_ERROR_LOG(ret);
1364             exit_status = ret;
1365             goto cleanup;
1366         }
1367 
1368         if( ORTE_SNAPC_OP_RESTART == datum->event) {
1369             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->seq_num), 1, OPAL_INT))) {
1370                 ORTE_ERROR_LOG(ret);
1371                 exit_status = ret;
1372                 goto cleanup;
1373             }
1374             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->global_handle), 1, OPAL_STRING))) {
1375                 ORTE_ERROR_LOG(ret);
1376                 exit_status = ret;
1377                 goto cleanup;
1378             }
1379         }
1380         else if( ORTE_SNAPC_OP_MIGRATE == datum->event) {
1381             /*
1382              * Check information
1383              *  Rank  | Hostname  | cr_off_node  | Meaning
1384              * -------+-----------+--------------+---------
1385              *  self  | home/same | false        | Do not move this process
1386              *        |           | true         | ERROR
1387              *        | NULL      | false        | Move wherever
1388              *        |           | true         | Move off of this node
1389              *        | other     | false/true   | Move to the 'other' node
1390              * -------+-----------+--------------+---------
1391              *  peer  | home/same | false        | Move 'peer' to me
1392              *        |           | true         | ERROR
1393              *        | NULL      | false        | Move wherever (Default: Move 'peer' to me)
1394              *        |           | true         | Move with peer to some other node
1395              *        | other     | false/true   | Move with peer to 'other' node
1396              * -------+-----------+--------------+---------
1397              * If 'rank' is set to a peer other than self, and the peer sets
1398              * conflicting 'hostname' or 'cr_off_node' preferences, then that
1399              * is an error. In which case the migration should fail.
1400              */
1401             currently_all_migrating = true;
1402 
1403             /*
1404              * Send information
1405              */
1406             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->mig_num), 1, OPAL_INT))) {
1407                 ORTE_ERROR_LOG(ret);
1408                 exit_status = ret;
1409                 goto cleanup;
1410             }
1411 
1412             for( i = 0; i < datum->mig_num; ++i ) {
1413                 OPAL_OUTPUT_VERBOSE((30, mca_snapc_full_component.super.output_handle,
1414                                      "App) Migration %3d/%3d: Sending Rank %3d - Requested <%s> (%3d) %c\n",
1415                                      datum->mig_num, i,
1416                                      (datum->mig_vpids)[i],
1417                                      (datum->mig_host_pref)[i],
1418                                      (datum->mig_vpid_pref)[i],
1419                                      (OPAL_INT_TO_BOOL((datum->mig_off_node)[i]) ? 'T' : 'F')
1420                                      ));
1421 
1422                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpids)[i]), 1, OPAL_INT))) {
1423                     ORTE_ERROR_LOG(ret);
1424                     exit_status = ret;
1425                     goto cleanup;
1426                 }
1427                 tmp_str = strdup((datum->mig_host_pref)[i]);
1428                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &tmp_str, 1, OPAL_STRING))) {
1429                     ORTE_ERROR_LOG(ret);
1430                     exit_status = ret;
1431                     goto cleanup;
1432                 }
1433                 if( NULL != tmp_str ) {
1434                     free(tmp_str);
1435                     tmp_str = NULL;
1436                 }
1437 
1438                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpid_pref)[i]), 1, OPAL_INT))) {
1439                     ORTE_ERROR_LOG(ret);
1440                     exit_status = ret;
1441                     goto cleanup;
1442                 }
1443                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_off_node)[i]), 1, OPAL_INT))) {
1444                     ORTE_ERROR_LOG(ret);
1445                     exit_status = ret;
1446                     goto cleanup;
1447                 }
1448             }
1449         }
1450 
1451         if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
1452                                                            orte_rml_send_callback, 0))) {
1453             ORTE_ERROR_LOG(ret);
1454             exit_status = ret;
1455             goto cleanup;
1456         }
1457         /* buffer should not be released here; the callback releases it */
1458         buffer = NULL;
1459     }
1460 
1461     /*
1462      * Wait for the response
1463      */
1464     if( ORTE_SNAPC_OP_CHECKPOINT == datum->event) {
1465         if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1466             /*
1467              * Wait for local completion (need to check to see if we are restarting)
1468              */
1469             while(OPAL_CRS_CONTINUE != current_cr_state &&
1470                   OPAL_CRS_RESTART  != current_cr_state &&
1471                   OPAL_CRS_ERROR    != current_cr_state ) {
1472                 opal_progress();
1473                 OPAL_CR_TEST_CHECKPOINT_READY();
1474             }
1475 
1476             /* Do not wait for a response if we are restarting (it will never arrive) */
1477             if( OPAL_CRS_RESTART == current_cr_state ) {
1478                 orte_sstore.get_attr(current_ss_handle,
1479                                      SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1480                                      &seq_str);
1481                 if( NULL != seq_str ) {
1482                     datum->seq_num = atoi(seq_str);
1483                 } else {
1484                     datum->seq_num = -1;
1485                 }
1486 
1487                 orte_sstore.get_attr(current_ss_handle,
1488                                      SSTORE_METADATA_GLOBAL_SNAP_REF,
1489                                      &(datum->global_handle));
1490                 if( NULL == datum->global_handle ) {
1491                     datum->global_handle = strdup("Unknown");
1492                 }
1493 
1494                 current_cr_state = OPAL_CRS_NONE;
1495 
1496                 exit_status = ORTE_SUCCESS;
1497                 goto cleanup;
1498             }
1499 
1500             /*
1501              * Wait for a response regarding completion
1502              */
1503             rb = OBJ_NEW(orte_rml_recv_cb_t);
1504             rb->active = true;
1505             orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
1506             ORTE_WAIT_FOR_COMPLETION(rb->active);
1507 
1508             count = 1;
1509             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1510                 ORTE_ERROR_LOG(ret);
1511                 exit_status = ret;
1512                 goto cleanup;
1513             }
1514 
1515             count = 1;
1516             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
1517                 ORTE_ERROR_LOG(ret);
1518                 exit_status = ret;
1519                 goto cleanup;
1520             }
1521 
1522             count = 1;
1523             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
1524                 ORTE_ERROR_LOG(ret);
1525                 exit_status = ret;
1526                 goto cleanup;
1527             }
1528 
1529             orte_sstore.get_attr(last_ss_handle,
1530                                  SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1531                                  &seq_str);
1532             datum->seq_num = atoi(seq_str);
1533 
1534             orte_sstore.get_attr(last_ss_handle,
1535                                  SSTORE_METADATA_GLOBAL_SNAP_REF,
1536                                  &(datum->global_handle));
1537         }
1538     }
1539     /*
1540      * Restart will terminate this process, so just wait...
1541      */
1542     else if( ORTE_SNAPC_OP_RESTART == datum->event) {
1543         while( 1 ) {
1544             opal_progress();
1545             OPAL_CR_TEST_CHECKPOINT_READY();
1546             sleep(1);
1547         }
1548     }
1549     /*
1550      * Leader waits for response
1551      */
1552     else if( ORTE_SNAPC_OP_MIGRATE == datum->event) {
1553         if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1554             while( currently_all_migrating ) {
1555                 opal_progress();
1556                 OPAL_CR_TEST_CHECKPOINT_READY();
1557                 sleep(1);
1558             }
1559 
1560             OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1561                                  "App) Request_op: Leader waiting for Migrate release (%3d)...",
1562                                  datum->event));
1563 
1564 
1565             /*
1566              * Wait for a response regarding completion
1567              */
1568             rb = OBJ_NEW(orte_rml_recv_cb_t);
1569             rb->active = true;
1570             orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
1571             ORTE_WAIT_FOR_COMPLETION(rb->active);
1572 
1573             count = 1;
1574             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1575                 ORTE_ERROR_LOG(ret);
1576                 exit_status = ret;
1577                 goto cleanup;
1578             }
1579 
1580             count = 1;
1581             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
1582                 ORTE_ERROR_LOG(ret);
1583                 exit_status = ret;
1584                 goto cleanup;
1585             }
1586 
1587             count = 1;
1588             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
1589                 ORTE_ERROR_LOG(ret);
1590                 exit_status = ret;
1591                 goto cleanup;
1592             }
1593 
1594             OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1595                                  "App) Request_op: Leader continuing from Migration (%3d)...",
1596                                  datum->event));
1597         }
1598     }
1599     /*
1600      * Everyone waits here for completion of Quiesce start
1601      */
1602     else if( ORTE_SNAPC_OP_QUIESCE_START == datum->event) {
1603         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1604                              "App) Quiesce_start: Waiting for release..."));
1605 
1606         while( !app_notif_processed ) {
1607             opal_progress();
1608             OPAL_CR_TEST_CHECKPOINT_READY();
1609         }
1610 
1611         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1612                              "App) Quiesce_start: Released"));
1613     }
1614     /*
1615      * No waiting for Quiesce end (barrier occurs in protocol)
1616      */
1617     else if( ORTE_SNAPC_OP_QUIESCE_END == datum->event) {
1618         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1619                              "App) Quiesce_end: Waiting for release..."));
1620 
1621         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1622                              "App) Quiesce_end: Released"));
1623     }
1624 
1625 
1626  cleanup:
1627     if (NULL != buffer) {
1628         OBJ_RELEASE(buffer);
1629         buffer = NULL;
1630     }
1631     if (NULL != rb) {
1632         OBJ_RELEASE(rb);
1633         rb = NULL;
1634     }
1635 
1636     if( NULL != seq_str ) {
1637         free(seq_str);
1638         seq_str = NULL;
1639     }
1640 
1641     if( NULL != tmp_str ) {
1642         free(tmp_str);
1643         tmp_str = NULL;
1644     }
1645 
1646     return exit_status;
1647 }

/* [<][>][^][v][top][bottom][index][help] */