root/orte/mca/snapc/full/snapc_full_local.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. local_coord_init
  2. local_coord_finalize
  3. local_coord_setup_job
  4. local_coord_release_job
  5. snapc_full_local_start_hnp_listener
  6. snapc_full_local_stop_hnp_listener
  7. snapc_full_local_start_app_listener
  8. snapc_full_local_stop_app_listener
  9. snapc_full_local_app_cmd_recv
  10. snapc_full_local_send_restart_proc_info
  11. snapc_full_local_hnp_cmd_recv
  12. snapc_full_local_process_job_update_cmd
  13. local_coord_job_state_update
  14. local_coord_job_state_update_finished_local
  15. local_coord_job_state_update_finished_local_vpid
  16. snapc_full_local_start_checkpoint_all
  17. local_define_pipe_names
  18. snapc_full_local_update_coord
  19. snapc_full_local_start_ckpt_open_comm
  20. snapc_full_local_start_ckpt_handshake_opts
  21. snapc_full_local_start_ckpt_handshake
  22. snapc_full_local_end_ckpt_handshake
  23. snapc_full_local_comm_read_event
  24. snapc_full_get_min_state
  25. snapc_full_local_get_vpids
  26. snapc_full_local_refresh_vpids
  27. find_vpid_snapshot
  28. orte_snapc_full_local_reset_coord

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University.
   3  *                         All rights reserved.
   4  * Copyright (c) 2004-2011 The Trustees of the University of Tennessee.
   5  *                         All rights reserved.
   6  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   7  *                         University of Stuttgart.  All rights reserved.
   8  * Copyright (c) 2004-2005 The Regents of the University of California.
   9  *                         All rights reserved.
  10  * Copyright (c) 2007      Evergrid, Inc. All rights reserved.
  11  *
  12  * Copyright (c) 2018      Intel, Inc.  All rights reserved.
  13  * $COPYRIGHT$
  14  *
  15  * Additional copyrights may follow
  16  *
  17  * $HEADER$
  18  */
  19 
  20 #include "orte_config.h"
  21 
  22 #include <sys/types.h>
  23 #ifdef HAVE_UNISTD_H
  24 #include <unistd.h>
  25 #endif  /* HAVE_UNISTD_H */
  26 #include <signal.h>
  27 #ifdef HAVE_FCNTL_H
  28 #include <fcntl.h>
  29 #endif  /* HAVE_FCNTL_H */
  30 #ifdef HAVE_SYS_TYPES_H
  31 #include <sys/types.h>
  32 #endif  /* HAVE_SYS_TYPES_H */
  33 #ifdef HAVE_SYS_STAT_H
  34 #include <sys/stat.h>  /* for mkfifo */
  35 #endif  /* HAVE_SYS_STAT_H */
  36 #ifdef HAVE_SYS_WAIT_H
  37 #include <sys/wait.h>
  38 #endif
  39 #include <string.h>
  40 
  41 #include "opal/runtime/opal_progress.h"
  42 #include "opal/runtime/opal_cr.h"
  43 #include "opal/util/output.h"
  44 #include "opal/util/opal_environ.h"
  45 #include "opal/util/os_dirpath.h"
  46 #include "opal/util/basename.h"
  47 #include "orte/mca/mca.h"
  48 #include "opal/mca/base/base.h"
  49 #include "opal/mca/crs/crs.h"
  50 #include "opal/mca/crs/base/base.h"
  51 
  52 #include "orte/util/show_help.h"
  53 #include "orte/util/name_fns.h"
  54 #include "orte/runtime/orte_wait.h"
  55 #include "orte/runtime/orte_globals.h"
  56 #include "orte/mca/rml/rml.h"
  57 #include "orte/mca/rml/rml_types.h"
  58 #include "orte/mca/odls/odls.h"
  59 #include "orte/mca/odls/base/odls_private.h"
  60 #include "orte/mca/errmgr/errmgr.h"
  61 #include "orte/mca/routed/routed.h"
  62 #include "orte/mca/grpcomm/grpcomm.h"
  63 
  64 #include "orte/mca/snapc/snapc.h"
  65 #include "orte/mca/snapc/base/base.h"
  66 
  67 #include "snapc_full.h"
  68 
  69 /************************************
  70  * Locally Global vars & functions :)
  71  ************************************/
  72 static orte_jobid_t current_local_jobid = ORTE_JOBID_INVALID;
  73 static orte_snapc_base_global_snapshot_t local_global_snapshot;
  74 
  75 static int current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
  76 
  77 static bool currently_migrating = false;
  78 static bool flushed_modex = false;
  79 static bool sstore_local_sync_finished = false;
  80 static bool sstore_local_procs_finished = false;
  81 
  82 static int local_define_pipe_names(orte_snapc_full_app_snapshot_t *vpid_snapshot);
  83 
  84 static bool snapc_local_hnp_recv_issued = false;
  85 static int  snapc_full_local_start_hnp_listener(void);
  86 static int  snapc_full_local_stop_hnp_listener(void);
  87 static void snapc_full_local_hnp_cmd_recv(int status,
  88                                           orte_process_name_t* sender,
  89                                           opal_buffer_t* buffer,
  90                                           orte_rml_tag_t tag,
  91                                           void* cbdata);
  92 
  93 static bool snapc_local_app_recv_issued = false;
  94 static int  snapc_full_local_start_app_listener(void);
  95 static int  snapc_full_local_stop_app_listener(void);
  96 static void snapc_full_local_app_cmd_recv(int status,
  97                                        orte_process_name_t* sender,
  98                                        opal_buffer_t* buffer,
  99                                        orte_rml_tag_t tag,
 100                                        void* cbdata);
 101 
 102 static orte_snapc_full_app_snapshot_t *find_vpid_snapshot(orte_process_name_t *name );
 103 static int snapc_full_local_get_vpids(void);
 104 static int snapc_full_local_refresh_vpids(void);
 105 
 106 #if OPAL_ENABLE_CRDEBUG == 1
 107 static int snapc_full_local_send_restart_proc_info(void);
 108 #endif
 109 
 110 static void snapc_full_local_process_job_update_cmd(orte_process_name_t* sender,
 111                                                     opal_buffer_t* buffer,
 112                                                     bool quick);
 113 
 114 static int local_coord_job_state_update_finished_local(void);
 115 static int local_coord_job_state_update_finished_local_vpid(orte_snapc_full_app_snapshot_t *vpid_snapshot);
 116 
 117 #if 0
 118 static int snapc_full_establish_dir(void);
 119 #endif
 120 static int snapc_full_get_min_state(void);
 121 
 122 static int snapc_full_local_update_coord(int state, bool quick);
 123 
 124 static int snapc_full_local_start_checkpoint_all(int ckpt_state,
 125                                                  opal_crs_base_ckpt_options_t *options);
 126 static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t *vpid_snapshot);
 127 static int snapc_full_local_start_ckpt_handshake_opts(orte_snapc_full_app_snapshot_t *vpid_snapshot,
 128                                                       opal_crs_base_ckpt_options_t *options);
 129 static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot);
 130 static int snapc_full_local_end_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot);
 131 static void snapc_full_local_comm_read_event(int fd, short flags, void *arg);
 132 
 133 static int orte_snapc_full_local_reset_coord(void);
 134 
 135 /************************
 136  * Function Definitions
 137  ************************/
 138 int local_coord_init( void )
 139 {
 140     current_local_jobid = ORTE_JOBID_INVALID;
 141     current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
 142 
 143     return ORTE_SUCCESS;
 144 }
 145 
 146 int local_coord_finalize( void )
 147 {
 148     if( ORTE_JOBID_INVALID != current_local_jobid ) {
 149         return local_coord_release_job(current_local_jobid);
 150     }
 151 
 152     current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
 153 
 154     return ORTE_SUCCESS;
 155 }
 156 
 157 int local_coord_setup_job(orte_jobid_t jobid)
 158 {
 159     int ret, exit_status = ORTE_SUCCESS;
 160     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
 161     opal_list_item_t* item = NULL;
 162 
 163     /*
 164      * Set the jobid that we are responsible for
 165      */
 166     if( jobid == current_local_jobid ) {
 167         /* If we pass this way twice, we must be restarting.
 168          * so just refresh the vpid structure
 169          */
 170         if( currently_migrating ) {
 171             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 172                                  "Local) Restarting Job %s from Migration...",
 173                                  ORTE_JOBID_PRINT(jobid)));
 174             if( ORTE_SUCCESS != (ret = snapc_full_local_refresh_vpids() ) ) {
 175                 ORTE_ERROR_LOG(ret);
 176                 exit_status = ret;
 177                 goto cleanup;
 178             }
 179         }
 180         else {
 181             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 182                                  "Local) Restarting Job %s...",
 183                                  ORTE_JOBID_PRINT(jobid)));
 184 
 185             for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 186                 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 187                 item  = opal_list_get_next(item) ) {
 188                 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 189                 opal_list_remove_item(&(local_global_snapshot.local_snapshots), item);
 190             }
 191 
 192             if( ORTE_SUCCESS != (ret = snapc_full_local_get_vpids() ) ) {
 193                 ORTE_ERROR_LOG(ret);
 194                 exit_status = ret;
 195                 goto cleanup;
 196             }
 197 
 198             for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 199                 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 200                 item  = opal_list_get_next(item) ) {
 201                 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 202                 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 203                                      "Local) Restarting Job %s: Daemon %s \t Process %s",
 204                                      ORTE_JOBID_PRINT(jobid),
 205                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 206                                      ORTE_NAME_PRINT(&vpid_snapshot->super.process_name)));
 207             }
 208         }
 209 
 210         exit_status = ORTE_SUCCESS;
 211         goto cleanup;
 212     }
 213     else if( ORTE_JOBID_INVALID != current_local_jobid ) {
 214         opal_output(mca_snapc_full_component.super.output_handle,
 215                     "Local) Setup of job %s Failed! Already setup job %s\n",
 216                     ORTE_JOBID_PRINT(jobid), ORTE_JOBID_PRINT(current_local_jobid));
 217         exit_status = ORTE_SUCCESS;
 218         goto cleanup;
 219     }
 220 
 221     current_local_jobid = jobid;
 222     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 223                          "Local) Setting up jobid %s\n",
 224                          ORTE_JOBID_PRINT(current_local_jobid)));
 225 
 226     /*
 227      * Get the list of vpid's that we care about
 228      */
 229     OBJ_CONSTRUCT(&local_global_snapshot, orte_snapc_base_global_snapshot_t);
 230 
 231     if( ORTE_SUCCESS != (ret = snapc_full_local_get_vpids()) ) {
 232         ORTE_ERROR_LOG(ret);
 233         exit_status = ret;
 234         goto cleanup;
 235     }
 236 
 237     /*
 238      * Wait for the snapshot directory to be established before registering
 239      * the callbacks since they use the same tag.
 240      */
 241 #if 0
 242     if(orte_snapc_base_establish_global_snapshot_dir) {
 243         if( ORTE_SUCCESS != (ret = snapc_full_establish_dir() ) ) {
 244             ORTE_ERROR_LOG(ret);
 245             exit_status = ret;
 246             goto cleanup;
 247         }
 248     }
 249 #endif
 250 
 251     /*
 252      * Setup Global Coordinator listener
 253      */
 254     if( ORTE_SUCCESS != (ret = snapc_full_local_start_hnp_listener() ) ) {
 255         ORTE_ERROR_LOG(ret);
 256         exit_status = ret;
 257         goto cleanup;
 258     }
 259 
 260     /*
 261      * Setup Global Coordinator listener for Application updates
 262      */
 263     if( ORTE_SUCCESS != (ret = snapc_full_local_start_app_listener() ) ) {
 264         ORTE_ERROR_LOG(ret);
 265         exit_status = ret;
 266         goto cleanup;
 267     }
 268 
 269 
 270     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 271                          "Local) Finished setup of job %s",
 272                          ORTE_JOBID_PRINT(current_local_jobid) ));
 273 
 274  cleanup:
 275     return exit_status;
 276 }
 277 
 278 int local_coord_release_job(orte_jobid_t jobid)
 279 {
 280     int ret, exit_status = ORTE_SUCCESS;
 281     orte_snapc_full_app_snapshot_t *vpid_snapshot;
 282     opal_list_item_t* item = NULL;
 283     bool is_done = true;
 284 
 285     /*
 286      * Wait around until we hear back from the checkpoint requests that
 287      * we have outstanding.
 288      */
 289     do {
 290         is_done = true;
 291 
 292         for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 293             item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 294             item  = opal_list_get_next(item) ) {
 295             vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 296 
 297             if(ORTE_SNAPC_CKPT_STATE_NONE      != vpid_snapshot->super.state &&
 298                ORTE_SNAPC_CKPT_STATE_ERROR     != vpid_snapshot->super.state &&
 299                ORTE_SNAPC_CKPT_STATE_ESTABLISHED  != vpid_snapshot->super.state &&
 300                ORTE_SNAPC_CKPT_STATE_RECOVERED != vpid_snapshot->super.state ) {
 301                 is_done = false;
 302                 break;
 303             }
 304             else {
 305                 opal_list_remove_item(&(local_global_snapshot.local_snapshots), item);
 306             }
 307         }
 308         if( !is_done ) {
 309             opal_progress();
 310         }
 311     } while(!is_done);
 312 
 313     OBJ_DESTRUCT(&local_global_snapshot);
 314 
 315     /*
 316      * Stop Global Coordinator listeners
 317      */
 318     if( ORTE_SUCCESS != (ret = snapc_full_local_stop_app_listener() ) ) {
 319         ORTE_ERROR_LOG(ret);
 320         exit_status = ret;
 321     }
 322 
 323     if( ORTE_SUCCESS != (ret = snapc_full_local_stop_hnp_listener() ) ) {
 324         ORTE_ERROR_LOG(ret);
 325         exit_status = ret;
 326     }
 327 
 328     return exit_status;
 329 }
 330 
 331 /******************
 332  * Local functions
 333  ******************/
 334 
 335 /******************
 336  * Setup Listeners
 337  ******************/
 338 static int snapc_full_local_start_hnp_listener(void)
 339 {
 340     /*
 341      * Global Coordinator: Do not register a Local listener
 342      */
 343     if( ORTE_SNAPC_GLOBAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
 344         return ORTE_SUCCESS;
 345     }
 346 
 347     if (snapc_local_hnp_recv_issued ) {
 348         return ORTE_SUCCESS;
 349     }
 350 
 351     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 352                          "Local) Startup Coordinator Channel"));
 353 
 354     /*
 355      * Coordinator command listener
 356      */
 357     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL,
 358                             ORTE_RML_PERSISTENT, snapc_full_local_hnp_cmd_recv, NULL);
 359 
 360     snapc_local_hnp_recv_issued = true;
 361 
 362     return ORTE_SUCCESS;
 363 }
 364 
 365 static int snapc_full_local_stop_hnp_listener(void)
 366 {
 367     /*
 368      * Global Coordinator: Does not register a Local listener
 369      */
 370     if( ORTE_SNAPC_GLOBAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
 371         return ORTE_SUCCESS;
 372     }
 373 
 374     if (!snapc_local_hnp_recv_issued ) {
 375         return ORTE_SUCCESS;
 376     }
 377 
 378     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 379                          "Local) Shutdown Coordinator Channel"));
 380 
 381     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL);
 382 
 383     snapc_local_hnp_recv_issued = false;
 384     return ORTE_SUCCESS;
 385 }
 386 
 387 static int snapc_full_local_start_app_listener(void)
 388 {
 389     if (snapc_local_app_recv_issued) {
 390         return ORTE_SUCCESS;
 391     }
 392 
 393     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 394                          "Local) Startup Application State Channel"));
 395 
 396     /*
 397      * Coordinator command listener
 398      */
 399     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC,
 400                             ORTE_RML_PERSISTENT, snapc_full_local_app_cmd_recv,
 401                             NULL);
 402 
 403     snapc_local_app_recv_issued = true;
 404     return ORTE_SUCCESS;
 405 }
 406 
 407 static int snapc_full_local_stop_app_listener(void)
 408 {
 409     if (!snapc_local_app_recv_issued ) {
 410         return ORTE_SUCCESS;
 411     }
 412 
 413     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 414                          "Local) Shutdown Application State Channel"));
 415 
 416     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC);
 417 
 418     snapc_local_app_recv_issued = false;
 419     return ORTE_SUCCESS;
 420 }
 421 
 422 /******************
 423  * Listener Handlers
 424  ******************/
 425 void snapc_full_local_app_cmd_recv(int status,
 426                                    orte_process_name_t* sender,
 427                                    opal_buffer_t* buffer,
 428                                    orte_rml_tag_t tag,
 429                                    void* cbdata)
 430 {
 431     int ret;
 432     opal_list_item_t* item = NULL;
 433     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
 434     orte_snapc_cmd_flag_t command;
 435     orte_process_name_t proc;
 436     pid_t proc_pid = 0;
 437     orte_std_cntr_t count;
 438     int cr_state;
 439     bool is_done;
 440 #if OPAL_ENABLE_CRDEBUG == 1
 441     bool all_done = false;
 442     bool crdebug_enabled = false;
 443 #endif
 444 
 445     if( ORTE_RML_TAG_SNAPC != tag ) {
 446         opal_output(mca_snapc_full_component.super.output_handle,
 447                     "Local) Error: Unknown tag: Received a command message from %s (tag = %d).",
 448                     ORTE_NAME_PRINT(sender), tag);
 449         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 450         return;
 451     }
 452 
 453     /*
 454      * Verify the command
 455      */
 456     count = 1;
 457     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_CMD))) {
 458         ORTE_ERROR_LOG(ret);
 459         goto cleanup;
 460     }
 461 
 462     if( ORTE_SNAPC_LOCAL_UPDATE_CMD != command &&
 463         ORTE_SNAPC_LOCAL_FINISH_CMD != command ) {
 464         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 465                              "Local) Warning: Expected an application command (%d) but received (%d)\n",
 466                              ORTE_SNAPC_LOCAL_UPDATE_CMD, command));
 467         goto cleanup;
 468     }
 469 
 470     if( ORTE_SNAPC_LOCAL_UPDATE_CMD == command ) {
 471         /*
 472          * This is the local process contacting us with its updated pid information
 473          */
 474         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 475                              "Local) Application: Update pid operation."));
 476 
 477         /*
 478          * Unpack the data
 479          * - process name
 480          * - PID
 481          */
 482         count = 1;
 483         /* JJH CLEANUP: Do we really need this, it is equal to sender */
 484         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &count, ORTE_NAME))) {
 485             ORTE_ERROR_LOG(ret);
 486             goto cleanup;
 487         }
 488         count = 1;
 489         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc_pid, &count, OPAL_PID))) {
 490             ORTE_ERROR_LOG(ret);
 491             goto cleanup;
 492         }
 493 #if OPAL_ENABLE_CRDEBUG == 1
 494         count = 1;
 495         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crdebug_enabled, &count, OPAL_BOOL))) {
 496             ORTE_ERROR_LOG(ret);
 497             goto cleanup;
 498         }
 499 #endif
 500 
 501         if( NULL == (vpid_snapshot = find_vpid_snapshot(&proc)) ) {
 502             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 503             goto cleanup;
 504         }
 505 
 506         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 507                              "Local) Updated PID: %s : %d -> %d",
 508                              ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), vpid_snapshot->process_pid, proc_pid));
 509 
 510         /* JJH: Maybe we should save the old and the newly restarted pid? */
 511         vpid_snapshot->process_pid = proc_pid;
 512         vpid_snapshot->finished = true;
 513 
 514 #if OPAL_ENABLE_CRDEBUG == 1
 515         /*
 516          * Once we have received all updates we should send them to the Global coord
 517          */
 518         if( crdebug_enabled ) {
 519             all_done = true;
 520             for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 521                 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 522                 item  = opal_list_get_next(item) ) {
 523                 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 524                 if( !vpid_snapshot->finished ) {
 525                     all_done = false;
 526                     break;
 527                 }
 528             }
 529             if( all_done ) {
 530                 /* If C/R Debugging then send hostlist */
 531                 if( ORTE_SUCCESS != (ret = snapc_full_local_send_restart_proc_info() ) ) {
 532                     ORTE_ERROR_LOG(ret);
 533                     goto cleanup;
 534                 }
 535             }
 536         }
 537 #endif
 538 
 539         /* Note: We should not update the ORTE structure since, if the CRS uses
 540          * an intermediary restart mechanism (e.g., BLCR's cr_restart) that
 541          * forks a child, then this process cannot call waitpit() on it.
 542          */
 543     }
 544     else if( ORTE_SNAPC_LOCAL_FINISH_CMD == command ) {
 545         /*
 546          * Unpack the data
 547          * - cr_state
 548          */
 549         count = 1;
 550         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &cr_state, &count, OPAL_INT))) {
 551             ORTE_ERROR_LOG(ret);
 552             goto cleanup;
 553         }
 554 
 555         if( NULL == (vpid_snapshot = find_vpid_snapshot(sender)) ) {
 556             opal_output(0, "Local) Failed to find process %s",
 557                         ORTE_NAME_PRINT(sender));
 558             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 559             goto cleanup;
 560         }
 561 
 562         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 563                              "Local) Process %s Finished Recovery (%d)",
 564                              ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
 565                              cr_state));
 566 
 567         vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_RECOVERED;
 568 
 569         /*
 570          * Check if we are done
 571          */
 572         is_done = true;
 573         for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 574             item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 575             item  = opal_list_get_next(item) ) {
 576             vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 577 
 578             if( ORTE_SNAPC_CKPT_STATE_RECOVERED != vpid_snapshot->super.state ) {
 579                 is_done = false;
 580                 break;
 581             }
 582         }
 583 
 584         if( is_done ) {
 585             /*
 586              * Tell the Global Coordinator that all of our processes are finished
 587              */
 588             OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
 589                                  "Local) Job Ckpt finished - Confirmed! Tell the Global Coord\n"));
 590 
 591             if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_RECOVERED, true) ) ) {
 592                 ORTE_ERROR_LOG(ret);
 593                 goto cleanup;
 594             }
 595 
 596             /*
 597              * If we are not finished sync'ing then delay cleanup
 598              */
 599             if( !sstore_local_sync_finished ) {
 600                 sstore_local_procs_finished = true;
 601                 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
 602                                      "Local) Job Ckpt finished - Confirmed! Not finished Syncing...\n"));
 603             } else {
 604                 /*
 605                  * Cleanup
 606                  */
 607                 if( ORTE_SUCCESS != (ret = orte_snapc_full_local_reset_coord()) ) {
 608                     ORTE_ERROR_LOG(ret);
 609                     goto cleanup;
 610                 }
 611             }
 612         }
 613     }
 614 
 615  cleanup:
 616     return;
 617 }
 618 
 619 #if OPAL_ENABLE_CRDEBUG == 1
 620 static int snapc_full_local_send_restart_proc_info(void)
 621 {
 622     int ret, exit_status = ORTE_SUCCESS;
 623     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
 624     opal_list_item_t* item = NULL;
 625     opal_buffer_t *buffer = NULL;
 626     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_RESTART_PROC_INFO;
 627     size_t num_vpids = 0;
 628 
 629     /*
 630      * Global Coordinator: Operate locally
 631      */
 632     if( ORTE_SNAPC_GLOBAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
 633         for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 634             item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 635             item  = opal_list_get_next(item) ) {
 636             vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 637             global_coord_restart_proc_info(vpid_snapshot->process_pid, orte_process_info.nodename);
 638         }
 639         /* stdout may be buffered by the C library so it needs to be flushed so
 640          * that the debugger can read the process info.
 641          */
 642         fflush(stdout);
 643         return ORTE_SUCCESS;
 644     }
 645 
 646     buffer = OBJ_NEW(opal_buffer_t);
 647     /*
 648      * Local Coordinator: Send Global Coordinator the information
 649      * [ hostname, num_pids, {pids} ]
 650      */
 651     num_vpids = opal_list_get_size(&(local_global_snapshot.local_snapshots));
 652     if( num_vpids <= 0 ) {
 653         return exit_status;
 654     }
 655 
 656     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
 657         ORTE_ERROR_LOG(ret);
 658         exit_status = ret;
 659         goto cleanup;
 660     }
 661 
 662     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(orte_process_info.nodename), 1, OPAL_STRING))) {
 663         ORTE_ERROR_LOG(ret);
 664         exit_status = ret;
 665         goto cleanup;
 666     }
 667 
 668     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &num_vpids, 1, OPAL_SIZE))) {
 669         ORTE_ERROR_LOG(ret);
 670         exit_status = ret;
 671         goto cleanup;
 672     }
 673 
 674     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 675         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 676         item  = opal_list_get_next(item) ) {
 677         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 678 
 679         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(vpid_snapshot->process_pid), 1, OPAL_PID))) {
 680             ORTE_ERROR_LOG(ret);
 681             exit_status = ret;
 682             goto cleanup;
 683         }
 684 
 685     }
 686 
 687     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
 688                                                        orte_rml_send_callback, 0))) {
 689         ORTE_ERROR_LOG(ret);
 690         exit_status = ret;
 691         goto cleanup;
 692     }
 693 
 694     return ORTE_SUCCESS;
 695 
 696  cleanup:
 697     OBJ_RELEASE(buffer);
 698 
 699     return exit_status;
 700 }
 701 #endif
 702 
 703 void snapc_full_local_hnp_cmd_recv(int status,
 704                                    orte_process_name_t* sender,
 705                                    opal_buffer_t* buffer,
 706                                    orte_rml_tag_t tag,
 707                                    void* cbdata)
 708 {
 709     int ret;
 710     orte_snapc_full_cmd_flag_t command;
 711     orte_std_cntr_t count;
 712 
 713     if( ORTE_RML_TAG_SNAPC_FULL != tag ) {
 714         opal_output(mca_snapc_full_component.super.output_handle,
 715                     "Local) Error: Unknown tag: Received a command message from %s (tag = %d).",
 716                     ORTE_NAME_PRINT(sender), tag);
 717         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 718         return;
 719     }
 720 
 721     /*
 722      * This is a Global Coordinator message.
 723      */
 724     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 725                          "Local) Receive a command message."));
 726 
 727     count = 1;
 728     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
 729         ORTE_ERROR_LOG(ret);
 730         goto cleanup;
 731     }
 732 
 733     switch (command) {
 734         case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD:
 735             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 736                                  "Local) Command: Update Job state command (quick)"));
 737 
 738             snapc_full_local_process_job_update_cmd(sender, buffer, true);
 739             break;
 740 
 741         case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD:
 742             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 743                                  "Local) Command: Update Job state command"));
 744 
 745             snapc_full_local_process_job_update_cmd(sender, buffer, false);
 746             break;
 747 
 748         case ORTE_SNAPC_FULL_RESTART_PROC_INFO:
 749             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 750                                  "Local) Command: Update hostname/pid associations"));
 751             /* Nothing to do */
 752             break;
 753 
 754         default:
 755             ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
 756     }
 757 
 758  cleanup:
 759     return;
 760 }
 761 
 762 static void snapc_full_local_process_job_update_cmd(orte_process_name_t* sender,
 763                                                     opal_buffer_t* buffer,
 764                                                     bool quick)
 765 {
 766     int ret;
 767     orte_jobid_t jobid;
 768     int job_ckpt_state;
 769     orte_std_cntr_t count;
 770     opal_crs_base_ckpt_options_t *options = NULL;
 771     bool loc_migrating = false;
 772     size_t loc_num_procs = 0;
 773     orte_process_name_t proc_name;
 774     size_t i;
 775     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
 776     opal_list_item_t* item = NULL;
 777     orte_sstore_base_handle_t ss_handle;
 778 
 779     /*
 780      * Unpack the data (quick)
 781      * - jobid
 782      * - ckpt_state
 783      * - sstore_handle
 784      * Unpack the data (long)
 785      * - jobid
 786      * - ckpt_state
 787      * - ckpt_options
 788      */
 789     count = 1;
 790     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
 791         ORTE_ERROR_LOG(ret);
 792         return;
 793     }
 794 
 795     count = 1;
 796     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &job_ckpt_state, &count, OPAL_INT))) {
 797         ORTE_ERROR_LOG(ret);
 798         return;
 799     }
 800 
 801     if( !quick ) {
 802         if (ORTE_SUCCESS != (ret = orte_sstore.unpack_handle(sender, buffer, &ss_handle)) ) {
 803             ORTE_ERROR_LOG(ret);
 804             return;
 805         }
 806 
 807         options = OBJ_NEW(opal_crs_base_ckpt_options_t);
 808         if( ORTE_SUCCESS != (ret = orte_snapc_base_unpack_options(buffer, options)) ) {
 809             ORTE_ERROR_LOG(ret);
 810             goto cleanup;
 811         }
 812         /* In this case we want to use the current_local_options that are cached
 813          * so that we do not have to send them every time.
 814          */
 815         opal_crs_base_copy_options(options, local_global_snapshot.options);
 816 
 817         count = 1;
 818         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(loc_migrating), &count, OPAL_BOOL))) {
 819             ORTE_ERROR_LOG(ret);
 820             goto cleanup;
 821         }
 822 
 823         if( loc_migrating ) {
 824             currently_migrating = true;
 825 
 826             count = 1;
 827             if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_num_procs, &count, OPAL_SIZE))) {
 828                 ORTE_ERROR_LOG(ret);
 829                 goto cleanup;
 830             }
 831 
 832             for( i = 0; i < loc_num_procs; ++i ) {
 833                 count = 1;
 834                 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc_name, &count, ORTE_NAME))) {
 835                     ORTE_ERROR_LOG(ret);
 836                     goto cleanup;
 837                 }
 838 
 839                 /*
 840                  * See if we are watching this process
 841                  */
 842                 for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 843                     item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 844                     item  = opal_list_get_next(item) ) {
 845                     vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 846 
 847                     if( OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
 848                                                                     &(vpid_snapshot->super.process_name),
 849                                                                     &proc_name) ) {
 850                         vpid_snapshot->migrating = true;
 851                         break;
 852                     }
 853                 }
 854             }
 855         }
 856     }
 857 
 858     if( ORTE_SUCCESS != (ret = local_coord_job_state_update(jobid,
 859                                                             job_ckpt_state,
 860                                                             ss_handle,
 861                                                             local_global_snapshot.options)) ) {
 862         ORTE_ERROR_LOG(ret);
 863         return;
 864     }
 865 
 866  cleanup:
 867     if( NULL != options ) {
 868         OBJ_RELEASE(options);
 869         options = NULL;
 870     }
 871 
 872     return;
 873 }
 874 
 875 
 876 int local_coord_job_state_update(orte_jobid_t jobid,
 877                                  int    job_ckpt_state,
 878                                  orte_sstore_base_handle_t ss_handle,
 879                                  opal_crs_base_ckpt_options_t *options)
 880 {
 881     int ret, exit_status = ORTE_SUCCESS;
 882     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
 883     opal_list_item_t* item = NULL;
 884     char * state_str = NULL;
 885 
 886     /* Save Options */
 887     opal_crs_base_copy_options(options, local_global_snapshot.options);
 888 
 889     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
 890                          "Local) Job %s: Changed to state to:\n",
 891                          ORTE_JOBID_PRINT(jobid)));
 892     orte_snapc_ckpt_state_str(&state_str, job_ckpt_state);
 893     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
 894                          "Local)    Job State:        %d (%s)\n",
 895                          (int)job_ckpt_state, state_str ));
 896     free(state_str);
 897     state_str = NULL;
 898 
 899     /*
 900      * Update the vpid structure if we need to.
 901      * Really only need to if we don't have valid information (PID)
 902      * for the application.
 903      */
 904     if( ORTE_SUCCESS != (ret = snapc_full_local_get_vpids() ) ) {
 905         ORTE_ERROR_LOG(ret);
 906         exit_status = ret;
 907         goto cleanup;
 908     }
 909 
 910     current_job_ckpt_state = job_ckpt_state;
 911 
 912     /*
 913      * If we have been asked to checkpoint do so
 914      */
 915     if( ORTE_SNAPC_CKPT_STATE_PENDING      == job_ckpt_state ) {
 916         /*
 917          * Register with the SStore
 918          */
 919         local_global_snapshot.ss_handle = ss_handle;
 920         orte_sstore.register_handle(local_global_snapshot.ss_handle);
 921 
 922         /*
 923          * For each of the processes we are tasked with, start their checkpoints
 924          */
 925         for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 926             item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 927             item  = opal_list_get_next(item) ) {
 928             vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 929 
 930             vpid_snapshot->super.state = job_ckpt_state;
 931             vpid_snapshot->finished = false;
 932         }
 933 
 934         /*
 935          * Start checkpointing all local processes
 936          */
 937         if( ORTE_SUCCESS != (ret = snapc_full_local_start_checkpoint_all(job_ckpt_state, options) ) ) {
 938             ORTE_ERROR_LOG(ret);
 939             exit_status = ret;
 940             goto cleanup;
 941         }
 942     }
 943     else if( ORTE_SNAPC_CKPT_STATE_MIGRATING  == job_ckpt_state ) {
 944         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 945                              "Local) Migrating: Display a list of processes migrating"));
 946 
 947         for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
 948             item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
 949             item  = opal_list_get_next(item) ) {
 950             vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
 951             /*
 952              * If this process migrated away, then remove it from our list.
 953              */
 954             if( vpid_snapshot->migrating ) {
 955                 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 956                                      "Local) Migrating:    %s",
 957                                      ORTE_NAME_PRINT(&vpid_snapshot->super.process_name) ));
 958             }
 959         }
 960     }
 961     /*
 962      * Release all checkpointed processes now that the checkpoint is complete
 963      * If the request was to checkpoint then terminate this command will tell
 964      * the application to do so upon release.
 965      */
 966     else if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL  == job_ckpt_state ) {
 967         OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
 968                              "Local) Locally finished, release all processes\n"));
 969         if( ORTE_SUCCESS != (ret = local_coord_job_state_update_finished_local() ) ) {
 970             ORTE_ERROR_LOG(ORTE_ERROR);
 971             exit_status = ORTE_ERROR;
 972             goto cleanup;
 973         }
 974     }
 975     /*
 976      * Once we get the FINISHED state then the checkpoint is all done, and we
 977      * reset our state to NONE.
 978      */
 979     else if( ORTE_SNAPC_CKPT_STATE_ESTABLISHED  == job_ckpt_state ) {
 980         /*
 981          * Wait to cleanup until all have reported
 982          */
 983     }
 984     else {
 985         ;
 986     }
 987 
 988  cleanup:
 989     if( NULL != state_str ) {
 990         free(state_str);
 991         state_str = NULL;
 992     }
 993 
 994     return exit_status;
 995 }
 996 
 997 static int local_coord_job_state_update_finished_local(void)
 998 {
 999     int ret, exit_status = ORTE_SUCCESS;
1000     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1001     opal_list_item_t* item = NULL;
1002 
1003     OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
1004                          "Local) Job Ckpt finished tell all processes\n"));
1005 
1006     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1007         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1008         item  = opal_list_get_next(item) ) {
1009         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1010 
1011         if( ORTE_SUCCESS != (ret = local_coord_job_state_update_finished_local_vpid(vpid_snapshot) ) ) {
1012             ORTE_ERROR_LOG(ORTE_ERROR);
1013             exit_status = ORTE_ERROR;
1014             goto cleanup;
1015         }
1016 
1017         if( vpid_snapshot->migrating ) {
1018             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1019                                  "Local) Removing Migrated Process:    %s",
1020                                  ORTE_NAME_PRINT(&vpid_snapshot->super.process_name) ));
1021             opal_list_remove_item(&(local_global_snapshot.local_snapshots), item);
1022         }
1023     }
1024  cleanup:
1025     return exit_status;
1026 }
1027 
1028 static int local_coord_job_state_update_finished_local_vpid(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1029 {
1030     int ret;
1031 
1032     OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
1033                          "Local)   Tell process %s (Ckpt Finished) %s\n",
1034                          ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1035                          (vpid_snapshot->migrating ? "- Migrating, Skip" : "") ));
1036 
1037     /*
1038      * If this process is migrating, it has already been told
1039      */
1040     if( vpid_snapshot->migrating ) {
1041         return ORTE_SUCCESS;
1042     }
1043 
1044     if( ORTE_SUCCESS != (ret = snapc_full_local_end_ckpt_handshake(vpid_snapshot) ) ) {
1045         opal_output(mca_snapc_full_component.super.output_handle,
1046                     "Local) Error: Unable to finish the handshake with peer %s. %d\n",
1047                     ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1048         ORTE_ERROR_LOG(ORTE_ERROR);
1049         return ORTE_ERROR;
1050     }
1051 
1052     return ORTE_SUCCESS;
1053 }
1054 
1055 
1056 /************************
1057  * Start the checkpoint
1058  ************************/
1059 static int snapc_full_local_start_checkpoint_all(int ckpt_state,
1060                                                  opal_crs_base_ckpt_options_t *options)
1061 {
1062     int ret, exit_status = ORTE_SUCCESS;
1063     orte_snapc_full_app_snapshot_t *vpid_snapshot;
1064     opal_list_item_t* item = NULL;
1065     size_t num_stopped = 0;
1066     int waitpid_status = 0;
1067 
1068     /*
1069      * Pass 1: make sure all vpids are setup correctly
1070      * This is a sanity check. Most of the time it will not be necessary.
1071      */
1072     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1073                          "Local) start() Pass 1: Sanity check"));
1074 
1075     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1076         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1077         item  = opal_list_get_next(item) ) {
1078         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1079 
1080         /* Dummy check */
1081         if( vpid_snapshot->process_pid == 0 ) {
1082             ret = snapc_full_local_get_vpids();
1083             if( ORTE_SUCCESS != ret || vpid_snapshot->process_pid == 0 ) {
1084                 opal_output( mca_snapc_full_component.super.output_handle,
1085                              "local) Cannot checkpoint an invalid pid (%d)\n",
1086                              vpid_snapshot->process_pid);
1087                 ORTE_ERROR_LOG(ORTE_ERROR);
1088                 exit_status = ORTE_ERROR;
1089                 goto cleanup;
1090             }
1091             break;
1092         }
1093     }
1094 
1095     /*
1096      * Pass 2: Start process of opening communication channels
1097      */
1098     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1099                          "Local) start() Pass 2: Signal Procs"));
1100     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1101         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1102         item  = opal_list_get_next(item) ) {
1103         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1104 
1105         /*
1106          * Create named pipe references for this process
1107          */
1108         local_define_pipe_names(vpid_snapshot);
1109 
1110         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1111                              "Local) Signal process (%d) with signal %d\n",
1112                              (int) vpid_snapshot->process_pid,
1113                              opal_cr_entry_point_signal));
1114 
1115         /*
1116          * Signal the application
1117          */
1118         if( 0 != (ret = kill(vpid_snapshot->process_pid, opal_cr_entry_point_signal) ) ) {
1119             opal_output(mca_snapc_full_component.super.output_handle,
1120                         "local) Error: Failed to signal process %d with signal %d. %d\n",
1121                         (int) vpid_snapshot->process_pid,
1122                         opal_cr_entry_point_signal,
1123                         ret);
1124             exit_status = ret;
1125             goto cleanup;
1126         }
1127     }
1128 
1129     /*
1130      * Pass 3: Wait for channels to open up
1131      */
1132     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1133                          "Local) start() Pass 3: Open pipes"));
1134     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1135         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1136         item  = opal_list_get_next(item) ) {
1137         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1138 
1139         if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_open_comm(vpid_snapshot) ) ) {
1140             opal_output(mca_snapc_full_component.super.output_handle,
1141                         "local) Error: Unable to initiate the handshake with peer %s. %d\n",
1142                         ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1143             ORTE_ERROR_LOG(ORTE_ERROR);
1144             exit_status = ORTE_ERROR;
1145             goto cleanup;
1146         }
1147 
1148         vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_RUNNING;
1149     }
1150 
1151     /*
1152      * Pass 4: Start Handshake, send option argument set and sstore handle
1153      */
1154     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1155                          "Local) start() Pass 4: Start handshake"));
1156     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1157         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1158         item  = opal_list_get_next(item) ) {
1159         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1160 
1161         if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_handshake_opts(vpid_snapshot, options) ) ) {
1162             opal_output(mca_snapc_full_component.super.output_handle,
1163                         "local) Error: Unable to initiate the handshake with peer %s. %d\n",
1164                         ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1165             ORTE_ERROR_LOG(ORTE_ERROR);
1166             exit_status = ORTE_ERROR;
1167             goto cleanup;
1168         }
1169     }
1170 
1171     /*
1172      * Pass 5: Start checkpoint
1173      */
1174     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1175                          "Local) start() Pass 5: Start checkpoints"));
1176     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1177         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1178         item  = opal_list_get_next(item) ) {
1179         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1180 
1181         if( ORTE_SUCCESS != (ret = snapc_full_local_start_ckpt_handshake(vpid_snapshot) ) ) {
1182             opal_output(mca_snapc_full_component.super.output_handle,
1183                         "local) Error: Unable to initiate the handshake with peer %s. %d\n",
1184                         ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1185             ORTE_ERROR_LOG(ORTE_ERROR);
1186             exit_status = ORTE_ERROR;
1187             goto cleanup;
1188         }
1189     }
1190 
1191     /*
1192      * Progress Update to Global Coordinator
1193      */
1194     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1195                          "Local) start() Pass 6: Tell Global Coord that we are running now"));
1196     if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_RUNNING, true) ) ) {
1197         ORTE_ERROR_LOG(ret);
1198         exit_status = ret;
1199         goto cleanup;
1200     }
1201 
1202     /*
1203      * If stopping then wait for all processes to stop
1204      */
1205     if( options->stop ) {
1206         while( num_stopped < opal_list_get_size(&(local_global_snapshot.local_snapshots)) ) {
1207             opal_progress();
1208             sleep(1);
1209 
1210             for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1211                 item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1212                 item  = opal_list_get_next(item) ) {
1213                 vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1214 
1215                 ret = waitpid(vpid_snapshot->process_pid, &waitpid_status, WNOHANG|WUNTRACED);
1216 
1217                 if( (ret > 0) && WIFSTOPPED(waitpid_status) && (SIGSTOP == WSTOPSIG(waitpid_status)) ) {
1218                     ++num_stopped;
1219                     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1220                                          "Local) Child (%d) is stopped [total = %d]",
1221                                          vpid_snapshot->process_pid, (int)num_stopped ));
1222                 }
1223                 else if( ret < 0 ) {
1224                     if( 0 < mca_snapc_full_component.super.verbose ) {
1225                         orte_show_help("help-orte-snapc-full.txt", "waitpid_stop_fail", true,
1226                                        vpid_snapshot->process_pid, ret,
1227                                        ORTE_NAME_PRINT(&vpid_snapshot->super.process_name));
1228                     }
1229                     goto skip_wait;
1230                 }
1231             }
1232         }
1233 
1234     skip_wait:
1235         for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1236             item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1237             item  = opal_list_get_next(item) ) {
1238             vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1239 
1240             vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_STOPPED;
1241         }
1242 
1243         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1244                              "Local) All Children have now been stopped [total = %d]",
1245                              (int)num_stopped ));
1246 
1247         /*
1248          * Finish the local snapshot
1249          */
1250         orte_sstore.sync(local_global_snapshot.ss_handle);
1251 
1252         /*
1253          * Progress Update to Global Coordinator
1254          */
1255         if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_STOPPED, false) ) ) {
1256             ORTE_ERROR_LOG(ret);
1257             exit_status = ret;
1258             goto cleanup;
1259         }
1260     }
1261 
1262  cleanup:
1263     if( ORTE_SUCCESS != exit_status ) {
1264         ckpt_state = ORTE_SNAPC_CKPT_STATE_ERROR;
1265     }
1266 
1267     return exit_status;
1268 }
1269 
1270 static int local_define_pipe_names(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1271 {
1272     if( NULL != vpid_snapshot->comm_pipe_r ) {
1273         free(vpid_snapshot->comm_pipe_r);
1274         vpid_snapshot->comm_pipe_r = NULL;
1275     }
1276 
1277     if( NULL != vpid_snapshot->comm_pipe_w ) {
1278         free(vpid_snapshot->comm_pipe_w);
1279         vpid_snapshot->comm_pipe_w = NULL;
1280     }
1281 
1282     opal_asprintf(&(vpid_snapshot->comm_pipe_w),
1283              "%s/%s.%d_%d",
1284              opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R,
1285              vpid_snapshot->process_pid,
1286              vpid_snapshot->unique_pipe_id);
1287 
1288     opal_asprintf(&(vpid_snapshot->comm_pipe_r),
1289              "%s/%s.%d_%d",
1290              opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W,
1291              vpid_snapshot->process_pid,
1292              vpid_snapshot->unique_pipe_id);
1293 
1294     (vpid_snapshot->unique_pipe_id)++;
1295 
1296     return ORTE_SUCCESS;
1297 }
1298 
1299 static int snapc_full_local_update_coord(int state, bool quick)
1300 {
1301     int ret, exit_status = ORTE_SUCCESS;
1302     opal_buffer_t *buffer = NULL;
1303     orte_snapc_full_cmd_flag_t command;
1304 
1305     /*
1306      * Local Coordinator: Send Global Coordinator state information
1307      */
1308     buffer = OBJ_NEW(opal_buffer_t);
1309 
1310     if( quick ) {
1311         command = ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_QUICK_CMD;
1312     } else {
1313         command = ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_CMD;
1314     }
1315     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1316         ORTE_ERROR_LOG(ret);
1317         exit_status = ret;
1318         goto cleanup;
1319     }
1320 
1321     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &state, 1, OPAL_INT))) {
1322         ORTE_ERROR_LOG(ret);
1323         exit_status = ret;
1324         goto cleanup;
1325     }
1326 
1327     /* Optionally send only an abbreviated message to improve scalability */
1328     /* JJH: Though there is currently no additional information sent in a long
1329      *      message versus a small message, keep this logic so that in the
1330      *      future it can be easily reused without substantially modifying
1331      *      the component.
1332      */
1333     if( quick ) {
1334         goto send_data;
1335     }
1336 
1337  send_data:
1338     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
1339                                                        ORTE_RML_TAG_SNAPC_FULL,
1340                                                        orte_rml_send_callback, 0))) {
1341         ORTE_ERROR_LOG(ret);
1342         exit_status = ret;
1343         goto cleanup;
1344     }
1345 
1346     return ORTE_SUCCESS;
1347 
1348  cleanup:
1349     OBJ_RELEASE(buffer);
1350 
1351     return exit_status;
1352 }
1353 
1354 static int snapc_full_local_start_ckpt_open_comm(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1355 {
1356     int ret, exit_status = ORTE_SUCCESS;
1357     int usleep_time = 1000;
1358     int s_time = 0, max_wait_time;
1359 
1360     /* wait time before giving up on the checkpoint */
1361     max_wait_time = orte_snapc_full_max_wait_time * (1000000/usleep_time);
1362 
1363     /*
1364      * Wait for the named pipes to be created
1365      */
1366     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1367                          "Local) Waiting for process %s's pipes (%s) (%s)\n",
1368                          ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1369                          vpid_snapshot->comm_pipe_w,
1370                          vpid_snapshot->comm_pipe_r));
1371     for( s_time = 0; s_time < max_wait_time || max_wait_time <= 0; ++s_time) {
1372         /*
1373          * See if the named pipe exists yet for the PID in question
1374          */
1375         if( 0 > (ret = access(vpid_snapshot->comm_pipe_r, F_OK) )) {
1376             /* File doesn't exist yet, keep waiting */
1377             if( s_time >= max_wait_time - 5 && max_wait_time > 0 ) {
1378                 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
1379                                      "Local) WARNING: Read file does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n",
1380                                      vpid_snapshot->comm_pipe_r, ret,
1381                                      s_time/usleep_time, max_wait_time/usleep_time));
1382             }
1383             usleep(usleep_time);
1384             continue;
1385         }
1386         else if( 0 > (ret = access(vpid_snapshot->comm_pipe_w, F_OK) )) {
1387             /* File doesn't exist yet, keep waiting */
1388             if( s_time >= max_wait_time - 5 && max_wait_time > 0 ) {
1389                 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
1390                                      "Local) WARNING: Write file does not exist yet: <%s> rtn = %d (waited %d/%d sec)\n",
1391                                      vpid_snapshot->comm_pipe_w, ret,
1392                                      s_time/usleep_time, max_wait_time/usleep_time));
1393             }
1394             usleep(usleep_time);
1395             continue;
1396         }
1397         else {
1398             break;
1399         }
1400 
1401         if( max_wait_time > 0 &&
1402             (s_time == (max_wait_time/2) ||
1403              s_time == (max_wait_time/4) ||
1404              s_time == (3*max_wait_time/4) ) ) {
1405             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1406                                  "WARNING: Pid (%d) not responding [%d / %d]",
1407                                  vpid_snapshot->process_pid, s_time, max_wait_time));
1408         }
1409     }
1410 
1411     if( max_wait_time > 0 && s_time == max_wait_time ) {
1412         /* The file doesn't exist,
1413          * This means that the process didn't open up a named pipe for us
1414          * to access their checkpoint notification routine. Therefore,
1415          * the application either:
1416          *  - Doesn't exist
1417          *  - Isn't checkpointable
1418          * In either case there is nothing we can do.
1419          */
1420         orte_show_help("help-opal-checkpoint.txt", "pid_does_not_exist", true,
1421                        vpid_snapshot->process_pid,
1422                        vpid_snapshot->comm_pipe_r,
1423                        vpid_snapshot->comm_pipe_w);
1424 
1425         exit_status = OPAL_ERROR;
1426         goto cleanup;
1427     }
1428 
1429     /*
1430      * Open Pipes...
1431      *  - prog_named_write_pipe:
1432      *    prog makes this file and opens Read Only
1433      *    this app. opens it Write Only
1434      *  - prog_named_read_pipe:
1435      *    prog makes this file and opens Write Only
1436      *    this app. opens it Read Only
1437      */
1438     vpid_snapshot->comm_pipe_w_fd = open(vpid_snapshot->comm_pipe_w, O_WRONLY);
1439     if(vpid_snapshot->comm_pipe_w_fd < 0) {
1440         opal_output(mca_snapc_full_component.super.output_handle,
1441                     "local) Error: Unable to open name pipe (%s). %d\n",
1442                     vpid_snapshot->comm_pipe_w, vpid_snapshot->comm_pipe_w_fd);
1443         exit_status = OPAL_ERROR;
1444         goto cleanup;
1445     }
1446 
1447     vpid_snapshot->comm_pipe_r_fd = open(vpid_snapshot->comm_pipe_r, O_RDONLY);
1448     if(vpid_snapshot->comm_pipe_r_fd < 0) {
1449         opal_output(mca_snapc_full_component.super.output_handle,
1450                     "local) Error: Unable to open name pipe (%s). %d\n",
1451                     vpid_snapshot->comm_pipe_r, vpid_snapshot->comm_pipe_r_fd);
1452         exit_status = OPAL_ERROR;
1453         goto cleanup;
1454     }
1455 
1456  cleanup:
1457     return exit_status;
1458 }
1459 
1460 static int snapc_full_local_start_ckpt_handshake_opts(orte_snapc_full_app_snapshot_t *vpid_snapshot,
1461                                                       opal_crs_base_ckpt_options_t *options)
1462 {
1463     int ret, exit_status = ORTE_SUCCESS;
1464     int opt_rep;
1465 
1466     /*
1467      * Start the handshake:
1468      * - Send the migrating options [All, this proc]
1469      * - Send term argument
1470      * - Send stop argument
1471      */
1472     if( vpid_snapshot->migrating ) {
1473         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1474                              "Local) Tell app to MIGRATE. [%s (%d)]\n",
1475                              (vpid_snapshot->migrating ? "True" : "False"),
1476                              (int)(currently_migrating) ));
1477     }
1478     if( options->term ) {
1479         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1480                              "Local) Tell app to TERMINATE after completion of checkpoint. [%s]\n",
1481                              (options->term ? "True" : "False") ));
1482     }
1483     if( options->stop ) {
1484         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1485                              "Local) Tell app to STOP after completion of checkpoint. [%s]\n",
1486                              (options->stop ? "True" : "False") ));
1487     }
1488 
1489 
1490     opt_rep = (int)(currently_migrating);
1491     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1492         opal_output(mca_snapc_full_component.super.output_handle,
1493                     "Local) Error: Unable to write migrating (%d) to named pipe (%s), %d\n",
1494                     vpid_snapshot->migrating, vpid_snapshot->comm_pipe_w, ret);
1495         exit_status = OPAL_ERROR;
1496         goto cleanup;
1497     }
1498 
1499     opt_rep = (int)(vpid_snapshot->migrating);
1500     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1501         opal_output(mca_snapc_full_component.super.output_handle,
1502                     "Local) Error: Unable to write migrating (%d) to named pipe (%s), %d\n",
1503                     vpid_snapshot->migrating, vpid_snapshot->comm_pipe_w, ret);
1504         exit_status = OPAL_ERROR;
1505         goto cleanup;
1506     }
1507 
1508     opt_rep = (int)(options->term);
1509     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1510         opal_output(mca_snapc_full_component.super.output_handle,
1511                     "Local) Error: Unable to write term (%d) to named pipe (%s), %d\n",
1512                     options->term, vpid_snapshot->comm_pipe_w, ret);
1513         exit_status = OPAL_ERROR;
1514         goto cleanup;
1515     }
1516 
1517     opt_rep = (int)(options->stop);
1518     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1519         opal_output(mca_snapc_full_component.super.output_handle,
1520                     "Local) Error: Unable to write stop (%d) to named pipe (%s), %d\n",
1521                     options->stop, vpid_snapshot->comm_pipe_w, ret);
1522         exit_status = OPAL_ERROR;
1523         goto cleanup;
1524     }
1525 
1526     opt_rep = (int)(options->inc_prep_only);
1527     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1528         opal_output(mca_snapc_full_component.super.output_handle,
1529                     "Local) Error: Unable to write inc_prep_only (%d) to named pipe (%s), %d\n",
1530                     options->stop, vpid_snapshot->comm_pipe_w, ret);
1531         exit_status = OPAL_ERROR;
1532         goto cleanup;
1533     }
1534 
1535     opt_rep = (int)(options->inc_recover_only);
1536     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1537         opal_output(mca_snapc_full_component.super.output_handle,
1538                     "Local) Error: Unable to write inc_recover_only (%d) to named pipe (%s), %d\n",
1539                     options->stop, vpid_snapshot->comm_pipe_w, ret);
1540         exit_status = OPAL_ERROR;
1541         goto cleanup;
1542     }
1543 
1544 #if OPAL_ENABLE_CRDEBUG == 1
1545     opt_rep = (int)(options->attach_debugger);
1546     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1547         opal_output(mca_snapc_full_component.super.output_handle,
1548                     "local) Error: Unable to write attach_debugger (%d) to named pipe (%s), %d\n",
1549                     options->attach_debugger, vpid_snapshot->comm_pipe_w, ret);
1550         exit_status = OPAL_ERROR;
1551         goto cleanup;
1552     }
1553 
1554     opt_rep = (int)(options->detach_debugger);
1555     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &opt_rep, sizeof(int))) ) {
1556         opal_output(mca_snapc_full_component.super.output_handle,
1557                     "local) Error: Unable to write detach_debugger (%d) to named pipe (%s), %d\n",
1558                     options->detach_debugger, vpid_snapshot->comm_pipe_w, ret);
1559         exit_status = OPAL_ERROR;
1560         goto cleanup;
1561     }
1562 #endif
1563 
1564     /*
1565      * Send the SStore handle
1566      */
1567     if( sizeof(orte_sstore_base_handle_t) != (ret = write(vpid_snapshot->comm_pipe_w_fd,
1568                                                           &(local_global_snapshot.ss_handle), sizeof(orte_sstore_base_handle_t) )) ) {
1569         opal_output(mca_snapc_full_component.super.output_handle,
1570                     "Local) Error: Unable to write sstore handle (%d) to named pipe (%s). %d\n",
1571                     (int)(local_global_snapshot.ss_handle), vpid_snapshot->comm_pipe_w, ret);
1572         ORTE_ERROR_LOG(OPAL_ERROR);
1573         exit_status = OPAL_ERROR;
1574         goto cleanup;
1575     }
1576 
1577  cleanup:
1578     return exit_status;
1579 }
1580 
1581 static int snapc_full_local_start_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1582 {
1583     int ret, exit_status = ORTE_SUCCESS;
1584     int value;
1585 
1586     /*
1587      * Wait for the appliation to respond
1588      */
1589     if( sizeof(int) != (ret = read(vpid_snapshot->comm_pipe_r_fd, &value, sizeof(int))) ) {
1590         opal_output(mca_snapc_full_component.super.output_handle,
1591                     "Local) Error: Unable to read length from named pipe (%s). %d\n",
1592                     vpid_snapshot->comm_pipe_r, ret);
1593         exit_status = OPAL_ERROR;
1594         goto cleanup;
1595     }
1596 
1597     /* Check the response to make sure we can checkpoint this process */
1598     if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == value ) {
1599         orte_show_help("help-opal-checkpoint.txt",
1600                        "ckpt:in_progress",
1601                        true,
1602                        vpid_snapshot->process_pid);
1603         exit_status = OPAL_ERROR;
1604         goto cleanup;
1605     }
1606     else if( OPAL_CHECKPOINT_CMD_NULL == value ) {
1607         orte_show_help("help-opal-checkpoint.txt",
1608                        "ckpt:req_null",
1609                        true,
1610                        vpid_snapshot->process_pid);
1611         exit_status = OPAL_ERROR;
1612         goto cleanup;
1613     }
1614     else if ( OPAL_CHECKPOINT_CMD_ERROR == value ) {
1615         orte_show_help("help-opal-checkpoint.txt",
1616                        "ckpt:req_error",
1617                        true,
1618                        vpid_snapshot->process_pid);
1619         exit_status = OPAL_ERROR;
1620         goto cleanup;
1621     }
1622 
1623     opal_event_set(orte_event_base, &(vpid_snapshot->comm_pipe_r_eh),
1624                    vpid_snapshot->comm_pipe_r_fd,
1625                    OPAL_EV_READ|OPAL_EV_PERSIST,
1626                    snapc_full_local_comm_read_event,
1627                    vpid_snapshot);
1628     vpid_snapshot->is_eh_active = true;
1629     opal_event_add(&(vpid_snapshot->comm_pipe_r_eh), NULL);
1630 
1631     /*
1632      * Let the application know that it can proceed
1633      */
1634     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &value, sizeof(int))) ) {
1635         opal_output(mca_snapc_full_component.super.output_handle,
1636                     "Local) Error: Unable to write to named pipe (%s). %d\n",
1637                     vpid_snapshot->comm_pipe_w, ret);
1638         ORTE_ERROR_LOG(OPAL_ERROR);
1639         exit_status = OPAL_ERROR;
1640         goto cleanup;
1641     }
1642 
1643  cleanup:
1644     return exit_status;
1645 }
1646 
1647 static int snapc_full_local_end_ckpt_handshake(orte_snapc_full_app_snapshot_t *vpid_snapshot)
1648 {
1649     int ret, exit_status = ORTE_SUCCESS;
1650     int last_cmd = 0;
1651 
1652     /*
1653      * Make sure the pipe is open, so we do not try to do this twice
1654      */
1655     if( 0 > vpid_snapshot->comm_pipe_w_fd ) {
1656         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1657                              "Local) end_handshake: Process %s closed pipe. Skipping. (%d)\n",
1658                              ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1659                              vpid_snapshot->comm_pipe_w_fd));
1660         return exit_status;
1661     }
1662 
1663     /*
1664      * Finish the handshake.
1665      */
1666     if( sizeof(int) != (ret = write(vpid_snapshot->comm_pipe_w_fd, &last_cmd, sizeof(int))) ) {
1667         opal_output(mca_snapc_full_component.super.output_handle,
1668                     "Local) Error: Unable to release process %s (%d)\n",
1669                     ORTE_NAME_PRINT(&vpid_snapshot->super.process_name), ret);
1670         exit_status = OPAL_ERROR;
1671         goto cleanup;
1672     }
1673 
1674  cleanup:
1675     /*
1676      * Close all pipes
1677      */
1678     close(vpid_snapshot->comm_pipe_w_fd);
1679     close(vpid_snapshot->comm_pipe_r_fd);
1680     vpid_snapshot->comm_pipe_w_fd = -1;
1681     vpid_snapshot->comm_pipe_r_fd = -1;
1682 
1683     return exit_status;
1684 }
1685 
1686 static void snapc_full_local_comm_read_event(int fd, short flags, void *arg)
1687 {
1688     int ret;
1689     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1690     int ckpt_state;
1691     int loc_min_state;
1692     char * state_str = NULL;
1693 
1694     vpid_snapshot = (orte_snapc_full_app_snapshot_t *)arg;
1695 
1696     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1697                          "Local) Read Event: Process %s done checkpointing...\n",
1698                          ORTE_NAME_PRINT(&vpid_snapshot->super.process_name)));
1699 
1700     /*
1701      * Get the final state of the checkpoint from the checkpointing process
1702      */
1703     if( !vpid_snapshot->migrating ) {
1704         if( sizeof(int) != (ret = read(vpid_snapshot->comm_pipe_r_fd, &ckpt_state, sizeof(int))) ) {
1705             opal_output(mca_snapc_full_component.super.output_handle,
1706                         "Local) Error: Unable to read state from named pipe (%s). %d\n",
1707                         vpid_snapshot->comm_pipe_r, ret);
1708             ORTE_ERROR_LOG(ORTE_ERROR);
1709             goto cleanup;
1710         }
1711 
1712         /*
1713          * If only doing INC Prep phase, then jump out
1714          */
1715         if( local_global_snapshot.options->inc_prep_only &&
1716             OPAL_CRS_RUNNING == ckpt_state ) {
1717             /*
1718              * If all local procs are done, then tell the Global coord
1719              */
1720             vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_INC_PREPED;
1721             loc_min_state = snapc_full_get_min_state();
1722             if( loc_min_state > current_job_ckpt_state &&
1723                 ORTE_SNAPC_CKPT_STATE_INC_PREPED == loc_min_state ) {
1724                 if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_INC_PREPED, false) ) ) {
1725                     ORTE_ERROR_LOG(ret);
1726                     goto cleanup;
1727                 }
1728             }
1729 
1730             /* Just return */
1731             return;
1732         }
1733     }
1734 
1735     /*
1736      * Now that the checkpoint is finished
1737      * Update our status information
1738      */
1739     if( ckpt_state == OPAL_CRS_ERROR ) {
1740         vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_ERROR;
1741     } else {
1742         vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL;
1743     }
1744 
1745     /*
1746      * Flush GrpComm modex info if migrating
1747      */
1748     if( currently_migrating && !flushed_modex ) {
1749         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1750                              "Local) Read Event: Flush the modex cached data\n"));
1751 
1752         /* TODO: You can't pass NULL as the identifier - what you'll need to do is
1753          * close all open dstore handles, and then open the ones you need
1754          */
1755 #if 0
1756         if (OPAL_SUCCESS != (ret = opal_dstore.remove(NULL, NULL))) {
1757             ORTE_ERROR_LOG(ret);
1758             exit_status = ret;
1759             goto cleanup;
1760         }
1761 #endif
1762 
1763         flushed_modex = true;
1764     }
1765 
1766     /*
1767      * If error, then exit early
1768      */
1769     if( ORTE_SNAPC_CKPT_STATE_ERROR == vpid_snapshot->super.state ) {
1770         /* JJH - The error path needs some more work */
1771         if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(ORTE_SNAPC_CKPT_STATE_ERROR, true) ) ) {
1772             ORTE_ERROR_LOG(ret);
1773         }
1774         goto cleanup;
1775     }
1776 
1777     /*
1778      * If all processes have finished locally, notify Global Coordinator
1779      */
1780     loc_min_state = snapc_full_get_min_state();
1781     if( loc_min_state > current_job_ckpt_state &&
1782         ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL == loc_min_state ) {
1783 
1784         orte_snapc_ckpt_state_str(&state_str, loc_min_state);
1785         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1786                              "Local) Daemon State Changed: %d (%s)",
1787                              (int)loc_min_state, state_str ));
1788         free(state_str);
1789         state_str = NULL;
1790 
1791         /*
1792          * Notify the Global Coordinator
1793          */
1794         current_job_ckpt_state = loc_min_state;
1795         if( ORTE_SNAPC_GLOBAL_COORD_TYPE != (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
1796             if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(loc_min_state, false) ) ) {
1797                 ORTE_ERROR_LOG(ret);
1798                 goto cleanup;
1799             }
1800         }
1801 
1802         /*
1803          * Sync the SStore
1804          * If we stopped the process then we already did this
1805          */
1806         if( !local_global_snapshot.options->stop ) {
1807             orte_sstore.sync(local_global_snapshot.ss_handle);
1808             sstore_local_sync_finished = true;
1809             /*
1810              * If the processes finished before we finished sync'ing
1811              * then we need to cleanup.
1812              */
1813             if( sstore_local_procs_finished ) {
1814                 if( ORTE_SUCCESS != (ret = orte_snapc_full_local_reset_coord()) ) {
1815                     ORTE_ERROR_LOG(ret);
1816                     goto cleanup;
1817                 }
1818             }
1819         }
1820 
1821         /* If this process is also the global coord, then we have to update -after- we sync locally */
1822         if( ORTE_SNAPC_GLOBAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_GLOBAL_COORD_TYPE)) {
1823             if( ORTE_SUCCESS != (ret = snapc_full_local_update_coord(loc_min_state, false) ) ) {
1824                 ORTE_ERROR_LOG(ret);
1825                 goto cleanup;
1826             }
1827         }
1828     }
1829 
1830  cleanup:
1831     /*
1832      * Disable events
1833      */
1834     opal_event_del(&(vpid_snapshot->comm_pipe_r_eh));
1835     vpid_snapshot->is_eh_active = false;
1836 
1837     if( NULL != state_str ) {
1838         free(state_str);
1839         state_str = NULL;
1840     }
1841 
1842     return;
1843 }
1844 
1845 static int snapc_full_get_min_state(void)
1846 {
1847     int min_state = ORTE_SNAPC_CKPT_MAX;
1848     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1849     opal_list_item_t* item = NULL;
1850     char * state_str_a = NULL;
1851     char * state_str_b = NULL;
1852 
1853     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1854         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1855         item  = opal_list_get_next(item) ) {
1856         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
1857 
1858         if( NULL != state_str_a ) {
1859             free(state_str_a);
1860         }
1861         if( NULL != state_str_b ) {
1862             free(state_str_b);
1863         }
1864 
1865         orte_snapc_ckpt_state_str(&state_str_a, vpid_snapshot->super.state);
1866         orte_snapc_ckpt_state_str(&state_str_b, min_state);
1867 
1868         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1869                              "Local) ... %s Checking [%d %s] vs [%d %s]",
1870                              ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1871                              (int)vpid_snapshot->super.state, state_str_a,
1872                              (int)min_state, state_str_b ));
1873         if( min_state > vpid_snapshot->super.state ) {
1874             min_state = vpid_snapshot->super.state;
1875         }
1876     }
1877 
1878     if( NULL != state_str_b ) {
1879         free(state_str_b);
1880         state_str_b = NULL;
1881     }
1882     orte_snapc_ckpt_state_str(&state_str_b, min_state);
1883     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1884                          "Local) ... Min State [%d %s]",
1885                          (int)min_state, state_str_b ));
1886 
1887     if( NULL != state_str_a ) {
1888         free(state_str_a);
1889         state_str_a = NULL;
1890     }
1891     if( NULL != state_str_b ) {
1892         free(state_str_b);
1893         state_str_b = NULL;
1894     }
1895 
1896     return min_state;
1897 }
1898 
1899 static int snapc_full_local_get_vpids(void)
1900 {
1901     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1902     int i;
1903     orte_proc_t *child = NULL;
1904     size_t list_len = 0;
1905 
1906     /*
1907      * If the list is populated, and has updated pid information then
1908      * there is nothing to update.
1909      */
1910     list_len = opal_list_get_size(&(local_global_snapshot.local_snapshots));
1911     if( list_len > 0 ) {
1912         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)opal_list_get_first(&(local_global_snapshot.local_snapshots));
1913         if( 0 < vpid_snapshot->process_pid ) {
1914             return ORTE_SUCCESS;
1915         }
1916     }
1917 
1918     /*
1919      * Otherwise update or populate the list
1920      */
1921     for (i=0; i < orte_local_children->size; i++) {
1922             if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
1923                     continue;
1924             }
1925 
1926         /* if the list is empty or this child is not in the list then add it */
1927         if( 0    >= list_len ||
1928             NULL == (vpid_snapshot = find_vpid_snapshot(&child->name)) ) {
1929             vpid_snapshot = OBJ_NEW(orte_snapc_full_app_snapshot_t);
1930             opal_list_append(&(local_global_snapshot.local_snapshots), &(vpid_snapshot->super.super));
1931         }
1932 
1933         /* Only update if the PID is -not- already set */
1934         if( 0 >= vpid_snapshot->process_pid ) {
1935             vpid_snapshot->process_pid              = child->pid;
1936             vpid_snapshot->super.process_name.jobid = child->name.jobid;
1937             vpid_snapshot->super.process_name.vpid  = child->name.vpid;
1938         }
1939     }
1940 
1941     return ORTE_SUCCESS;
1942 }
1943 
1944 static int snapc_full_local_refresh_vpids(void)
1945 {
1946     opal_list_item_t *v_item = NULL;
1947     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
1948     int i;
1949     orte_proc_t *child = NULL;
1950     bool found = false;
1951 
1952     /*
1953      * First make sure that all of the vpids in the list are still our
1954      * children (they may have moved)
1955      */
1956     for(v_item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
1957         v_item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
1958         v_item  = opal_list_get_next(v_item) ) {
1959         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)v_item;
1960 
1961         found = false;
1962         for (i=0; i < orte_local_children->size; i++) {
1963             if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
1964                 continue;
1965             }
1966 
1967             if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
1968                                                            &child->name,
1969                                                            &(vpid_snapshot->super.process_name) )) {
1970                 found = true;
1971                 break;
1972             }
1973         }
1974         if( !found ) {
1975             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1976                                  "Local) Refresh List: Remove Process %s (%5d) from Daemon %s",
1977                                  ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
1978                                  vpid_snapshot->process_pid,
1979                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ));
1980             opal_list_remove_item(&(local_global_snapshot.local_snapshots), v_item);
1981         }
1982     }
1983 
1984     /*
1985      * Next pass to find new processes that we are not already tracking
1986      * (processes migrated to us).
1987      */
1988     for (i=0; i < orte_local_children->size; i++) {
1989         if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
1990             continue;
1991         }
1992 
1993         /* if the list is empty or this child is not in the list then add it */
1994         if( NULL == (vpid_snapshot = find_vpid_snapshot(&child->name)) ) {
1995             vpid_snapshot = OBJ_NEW(orte_snapc_full_app_snapshot_t);
1996 
1997             vpid_snapshot->process_pid              = child->pid;
1998             vpid_snapshot->super.process_name.jobid = child->name.jobid;
1999             vpid_snapshot->super.process_name.vpid  = child->name.vpid;
2000             /*vpid_snapshot->migrating = true;*/
2001 
2002             opal_list_append(&(local_global_snapshot.local_snapshots), &(vpid_snapshot->super.super));
2003 
2004             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2005                                  "Local) Refresh List: Add Process %s (%5d) to Daemon %s",
2006                                  ORTE_NAME_PRINT(&vpid_snapshot->super.process_name),
2007                                  vpid_snapshot->process_pid,
2008                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ));
2009         }
2010         /* Only update if the PID is -not- already set */
2011         else if( 0 >= vpid_snapshot->process_pid ) {
2012             vpid_snapshot->process_pid              = child->pid;
2013             vpid_snapshot->super.process_name.jobid = child->name.jobid;
2014             vpid_snapshot->super.process_name.vpid  = child->name.vpid;
2015         }
2016     }
2017 
2018     return ORTE_SUCCESS;
2019 }
2020 
2021 static orte_snapc_full_app_snapshot_t *find_vpid_snapshot(orte_process_name_t *name )
2022 {
2023     opal_list_item_t* item = NULL;
2024     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
2025     orte_ns_cmp_bitmask_t mask;
2026 
2027     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
2028         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
2029         item  = opal_list_get_next(item) ) {
2030         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
2031 
2032         mask = ORTE_NS_CMP_ALL;
2033 
2034         if (OPAL_EQUAL ==
2035                 orte_util_compare_name_fields(mask, name, &vpid_snapshot->super.process_name)) {
2036             return vpid_snapshot;
2037         }
2038     }
2039 
2040     return NULL;
2041 }
2042 
2043 static int orte_snapc_full_local_reset_coord(void)
2044 {
2045     int ret, exit_status = ORTE_SUCCESS;
2046     orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
2047     opal_list_item_t* item = NULL;
2048 
2049     OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
2050                          "Local) Job Ckpt finished - Cleanup\n"));
2051 
2052     for(item  = opal_list_get_first(&(local_global_snapshot.local_snapshots));
2053         item != opal_list_get_end(&(local_global_snapshot.local_snapshots));
2054         item  = opal_list_get_next(item) ) {
2055         vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
2056 
2057         /* If we forgot to close the pipes to the application, then do so
2058          * now. It is rare that this would have happened, so more of a
2059          * sanity check as part of cleanup.
2060          */
2061         if( vpid_snapshot->comm_pipe_w_fd > 0  ) {
2062             if( ORTE_SUCCESS != (ret = local_coord_job_state_update_finished_local_vpid(vpid_snapshot) ) ) {
2063                 ORTE_ERROR_LOG(ORTE_ERROR);
2064                 goto cleanup;
2065             }
2066         }
2067 
2068         vpid_snapshot->super.state = ORTE_SNAPC_CKPT_STATE_NONE;
2069     }
2070 
2071     /*
2072      * Clear globally cached options
2073      */
2074     opal_crs_base_clear_options(local_global_snapshot.options);
2075 
2076     currently_migrating = false;
2077     flushed_modex = false;
2078 
2079     sstore_local_sync_finished  = false;
2080     sstore_local_procs_finished = false;
2081 
2082  cleanup:
2083     return exit_status;
2084 }

/* [<][>][^][v][top][bottom][index][help] */