root/orte/mca/snapc/full/snapc_full_global.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. global_coord_init
  2. global_coord_finalize
  3. global_coord_setup_job
  4. global_coord_release_job
  5. global_coord_start_ckpt
  6. global_coord_end_ckpt
  7. global_init_job_structs
  8. global_refresh_job_structs
  9. snapc_full_global_start_listener
  10. snapc_full_global_stop_listener
  11. snapc_full_global_start_cmdline_listener
  12. snapc_full_global_stop_cmdline_listener
  13. snapc_full_global_cmdline_recv
  14. snapc_full_global_orted_recv
  15. snapc_full_process_request_op_cmd
  16. snapc_full_process_orted_update_cmd
  17. snapc_full_process_restart_proc_info_cmd
  18. global_coord_restart_proc_info
  19. snapc_full_process_job_update_cmd
  20. snapc_full_establish_snapshot_dir
  21. snapc_full_global_checkpoint
  22. snapc_full_global_notify_checkpoint
  23. orte_snapc_full_global_set_job_ckpt_info
  24. global_coord_job_state_update
  25. write_out_global_metadata
  26. find_orted_snapshot
  27. snapc_full_global_get_min_state
  28. orte_snapc_full_global_reset_coord
  29. snapc_full_set_time
  30. snapc_full_display_all_timers
  31. snapc_full_display_recovered_timers
  32. snapc_full_clear_timers
  33. snapc_full_get_time
  34. snapc_full_display_indv_timer_core
  35. snapc_full_report_progress

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University.
   3  *                         All rights reserved.
   4  * Copyright (c) 2004-2011 The Trustees of the University of Tennessee.
   5  *                         All rights reserved.
   6  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   7  *                         University of Stuttgart.  All rights reserved.
   8  * Copyright (c) 2004-2005 The Regents of the University of California.
   9  *                         All rights reserved.
  10  * Copyright (c) 2007      Evergrid, Inc. All rights reserved.
  11  *
  12  * Copyright (c) 2018      Cisco Systems, Inc.  All rights reserved
  13  * $COPYRIGHT$
  14  *
  15  * Additional copyrights may follow
  16  *
  17  * $HEADER$
  18  */
  19 #include "orte_config.h"
  20 
  21 #include <sys/types.h>
  22 #ifdef HAVE_UNISTD_H
  23 #include <unistd.h>
  24 #endif  /* HAVE_UNISTD_H */
  25 #include <string.h>
  26 
  27 #include "opal/include/opal/prefetch.h"
  28 #include "opal/util/output.h"
  29 #include "opal/util/opal_environ.h"
  30 #include "opal/util/basename.h"
  31 #include "opal/util/show_help.h"
  32 #include "opal/util/string_copy.h"
  33 #include "orte/mca/mca.h"
  34 #include "opal/mca/base/base.h"
  35 #include "opal/mca/crs/crs.h"
  36 #include "opal/mca/crs/base/base.h"
  37 
  38 #include "orte/util/name_fns.h"
  39 #include "orte/util/proc_info.h"
  40 #include "orte/runtime/orte_globals.h"
  41 #include "opal/dss/dss.h"
  42 #include "orte/mca/rml/rml.h"
  43 #include "orte/mca/rml/rml_types.h"
  44 #include "orte/mca/rmaps/rmaps.h"
  45 #include "orte/mca/rmaps/rmaps_types.h"
  46 #include "orte/mca/plm/plm.h"
  47 #include "orte/mca/grpcomm/grpcomm.h"
  48 #include "orte/runtime/orte_wait.h"
  49 #include "orte/mca/errmgr/errmgr.h"
  50 #include "orte/mca/errmgr/base/base.h"
  51 
  52 #include "orte/mca/snapc/snapc.h"
  53 #include "orte/mca/snapc/base/base.h"
  54 
  55 #include "snapc_full.h"
  56 
  57 #include MCA_timer_IMPLEMENTATION_HEADER
  58 
  59 /************************************
  60  * Locally Global vars & functions :)
  61  ************************************/
  62 #define INC_SEQ_NUM()                         \
  63  {                                            \
  64    if(orte_snapc_base_store_only_one_seq) {   \
  65      orte_snapc_base_snapshot_seq_number = 0; \
  66    } else {                                   \
  67      orte_snapc_base_snapshot_seq_number++;   \
  68    }                                          \
  69  }
  70 
  71 static orte_jobid_t current_global_jobid = ORTE_JOBID_INVALID;
  72 static orte_snapc_base_global_snapshot_t global_snapshot;
  73 static int current_total_orteds = 0;
  74 static bool updated_job_to_running;
  75 static int current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
  76 static bool cleanup_on_establish = false;
  77 static bool global_coord_has_local_children = false;
  78 
  79 static bool currently_migrating = false;
  80 static opal_list_t *migrating_procs = NULL;
  81 
  82 static int global_init_job_structs(void);
  83 static int global_refresh_job_structs(void);
  84 
  85 static bool snapc_orted_recv_issued = false;
  86 static bool is_orte_checkpoint_connected = false;
  87 static bool is_app_checkpointable = false;
  88 static int snapc_full_global_start_listener(void);
  89 static int snapc_full_global_stop_listener(void);
  90 static void snapc_full_global_orted_recv(int status,
  91                                          orte_process_name_t* sender,
  92                                          opal_buffer_t* buffer,
  93                                          orte_rml_tag_t tag,
  94                                          void* cbdata);
  95 
  96 static void snapc_full_process_restart_proc_info_cmd(orte_process_name_t* sender,
  97                                                      opal_buffer_t* buffer);
  98 
  99 static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
 100                                               opal_buffer_t* buffer);
 101 
 102 /*** Command Line Interactions */
 103 static orte_process_name_t orte_checkpoint_sender;
 104 static bool snapc_cmdline_recv_issued = false;
 105 static int snapc_full_global_start_cmdline_listener(void);
 106 static int snapc_full_global_stop_cmdline_listener(void);
 107 static void snapc_full_global_cmdline_recv(int status,
 108                                            orte_process_name_t* sender,
 109                                            opal_buffer_t* buffer,
 110                                            orte_rml_tag_t tag,
 111                                            void* cbdata);
 112 
 113 static int snapc_full_establish_snapshot_dir(bool empty_metadata);
 114 
 115 /*** */
 116 static int snapc_full_global_checkpoint(opal_crs_base_ckpt_options_t *options);
 117 static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
 118                                                opal_crs_base_ckpt_options_t *options);
 119 static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
 120                                                      int ckpt_state,
 121                                                      orte_sstore_base_handle_t handle,
 122                                                      bool quick,
 123                                                      opal_crs_base_ckpt_options_t *options);
 124 int global_coord_job_state_update(orte_jobid_t jobid,
 125                                   int job_ckpt_state,
 126                                   orte_sstore_base_handle_t handle,
 127                                   opal_crs_base_ckpt_options_t *options);
 128 static void snapc_full_process_job_update_cmd(orte_process_name_t* sender,
 129                                               opal_buffer_t* buffer,
 130                                               bool quick);
 131 static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
 132                                                opal_buffer_t* buffer,
 133                                                bool quick);
 134 static orte_snapc_full_orted_snapshot_t *find_orted_snapshot(orte_process_name_t *name );
 135 
 136 static int snapc_full_global_get_min_state(void);
 137 static int write_out_global_metadata(void);
 138 
 139 static int orte_snapc_full_global_reset_coord(void);
 140 
 141 /*
 142  * Timer stuff
 143  */
 144 static void snapc_full_set_time(int idx);
 145 static void snapc_full_display_all_timers(void);
 146 static void snapc_full_display_recovered_timers(void);
 147 static void snapc_full_clear_timers(void);
 148 
 149 static double snapc_full_get_time(void);
 150 static void snapc_full_display_indv_timer_core(double diff, char *str);
 151 
 152 #define SNAPC_FULL_TIMER_START     0
 153 #define SNAPC_FULL_TIMER_RUNNING   1
 154 #define SNAPC_FULL_TIMER_FIN_LOCAL 2
 155 #define SNAPC_FULL_TIMER_SS_SYNC   3
 156 #define SNAPC_FULL_TIMER_ESTABLISH 4
 157 #define SNAPC_FULL_TIMER_RECOVERED 5
 158 #define SNAPC_FULL_TIMER_MAX       6
 159 
 160 static double timer_start[SNAPC_FULL_TIMER_MAX];
 161 
 162 #define SNAPC_FULL_CLEAR_TIMERS()                                       \
 163     {                                                                   \
 164         if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) {             \
 165             snapc_full_clear_timers();                                  \
 166         }                                                               \
 167     }
 168 
 169 #define SNAPC_FULL_SET_TIMER(idx)                                       \
 170     {                                                                   \
 171         if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) {             \
 172             snapc_full_set_time(idx);                                   \
 173         }                                                               \
 174     }
 175 
 176 #define SNAPC_FULL_DISPLAY_ALL_TIMERS()                                 \
 177     {                                                                   \
 178         if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) {             \
 179             snapc_full_display_all_timers();                            \
 180         }                                                               \
 181     }
 182 #define SNAPC_FULL_DISPLAY_RECOVERED_TIMER()                            \
 183     {                                                                   \
 184         if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) {             \
 185             snapc_full_display_recovered_timers();                      \
 186         }                                                               \
 187     }
 188 
 189 /*
 190  * Progress
 191  */
 192 static void snapc_full_report_progress(orte_snapc_full_orted_snapshot_t *orted_snapshot,
 193                                        int total,
 194                                        int min_state);
 195 static int    report_progress_cur_loc_finished = 0;
 196 static double report_progress_last_reported_loc_finished = 0;
 197 #define SNAPC_FULL_REPORT_PROGRESS(orted, total, min_state)             \
 198     {                                                                   \
 199         if(OPAL_UNLIKELY(orte_snapc_full_progress_meter > 0)) {         \
 200             snapc_full_report_progress(orted, total, min_state);        \
 201         }                                                               \
 202     }
 203 
 204 /************************
 205  * Function Definitions
 206  ************************/
 207 int global_coord_init(void)
 208 {
 209     current_global_jobid = ORTE_JOBID_INVALID;
 210     orte_snapc_base_snapshot_seq_number = -1;
 211 
 212     orte_checkpoint_sender = orte_name_invalid;
 213 
 214     SNAPC_FULL_CLEAR_TIMERS();
 215 
 216     return ORTE_SUCCESS;
 217 }
 218 
 219 int global_coord_finalize(void)
 220 {
 221     current_global_jobid = ORTE_JOBID_INVALID;
 222     orte_snapc_base_snapshot_seq_number = -1;
 223 
 224     SNAPC_FULL_CLEAR_TIMERS();
 225 
 226     return ORTE_SUCCESS;
 227 }
 228 
 229 int global_coord_setup_job(orte_jobid_t jobid) {
 230     int ret, exit_status = ORTE_SUCCESS;
 231     orte_job_t *jdata = NULL;
 232 
 233     /*
 234      * Only allow one job at a time.
 235      *
 236      * It is possible to pass through this function twice since HNP may also be
 237      * a local daemon. So it may be both a global and local coordinator.
 238      *  Global: orte_plm_base_setup_job()
 239      *  Local : odls_default_module.c
 240      */
 241     /* Global Coordinator pass */
 242     if( ORTE_JOBID_INVALID == current_global_jobid ) {
 243         current_global_jobid = jobid;
 244         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 245                              "Global) Setup job %s as the Global Coordinator\n",
 246                              ORTE_JOBID_PRINT(jobid)));
 247 
 248         SNAPC_FULL_CLEAR_TIMERS();
 249         SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_START);
 250     }
 251     /* Local Coordinator pass - Always happens after global coordinator pass */
 252     else if ( jobid == current_global_jobid ) {
 253 
 254         /* look up job data object */
 255         if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
 256             ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 257             return ORTE_ERR_NOT_FOUND;
 258         }
 259 
 260         if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_RESTART)) {
 261             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 262                                  "Global) Restarting Job %s...",
 263                                  ORTE_JOBID_PRINT(jobid)));
 264             SNAPC_FULL_CLEAR_TIMERS();
 265             SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_START);
 266 
 267             if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
 268                 ORTE_ERROR_LOG(ret);
 269                 return ret;
 270             }
 271             if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
 272                 return local_coord_setup_job(jobid);
 273             }
 274             return ORTE_SUCCESS;
 275         }
 276 
 277         /* If there are no local children, do not become a local coordinator */
 278         if( !global_coord_has_local_children ) {
 279             return ORTE_SUCCESS;
 280         }
 281 
 282         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 283                              "Global) Setup job %s as the Local Coordinator\n",
 284                              ORTE_JOBID_PRINT(jobid)));
 285         orte_snapc_coord_type |= ORTE_SNAPC_LOCAL_COORD_TYPE;
 286         return local_coord_setup_job(jobid);
 287     }
 288     /* Only allow one job at a time */
 289     else {
 290         opal_output(mca_snapc_full_component.super.output_handle,
 291                     "Global) Setup of job %s Failed! Already setup job %s\n",
 292                     ORTE_JOBID_PRINT(jobid), ORTE_JOBID_PRINT(current_global_jobid));
 293         ORTE_ERROR_LOG(ORTE_ERROR);
 294         return ORTE_ERROR;
 295     }
 296 
 297     /*
 298      * Start out with a sequence number just below the first
 299      * This will be incremented when we checkpoint
 300      */
 301     orte_snapc_base_snapshot_seq_number = -1;
 302 
 303     /*
 304      * Allocate structure to track node status
 305      */
 306     if( ORTE_SUCCESS != (ret = global_init_job_structs()) ) {
 307         ORTE_ERROR_LOG(ret);
 308         exit_status = ret;
 309         goto cleanup;
 310     }
 311 
 312     /*
 313      * Setup Global Coordinator command processing listener
 314      */
 315     if( ORTE_SUCCESS != (ret = snapc_full_global_start_listener()) ) {
 316         ORTE_ERROR_LOG(ret);
 317         exit_status = ret;
 318         goto cleanup;
 319     }
 320 
 321     /*
 322      * Setup command line tool checkpoint request listener
 323      */
 324     if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener()) ) {
 325         ORTE_ERROR_LOG(ret);
 326         exit_status = ret;
 327         goto cleanup;
 328     }
 329 
 330     /*
 331      * If requested pre-establish the global snapshot directory
 332      */
 333 #if 0
 334     if(orte_snapc_base_establish_global_snapshot_dir) {
 335         opal_output(0, "Global) Error: Pre-establishment of snapshot directory currently not supported!");
 336         ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
 337 
 338         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 339                              "Global) Pre-establish the global snapshot directory\n"));
 340         if( ORTE_SUCCESS != (ret = snapc_full_establish_snapshot_dir(true))) {
 341             ORTE_ERROR_LOG(ret);
 342             exit_status = ret;
 343             goto cleanup;
 344         }
 345     }
 346 #endif
 347 
 348     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 349                          "Global) Finished setup of job %s ",
 350                          ORTE_JOBID_PRINT(jobid)));
 351 
 352  cleanup:
 353     return exit_status;
 354 }
 355 
 356 int global_coord_release_job(orte_jobid_t jobid) {
 357     int ret, exit_status = ORTE_SUCCESS;
 358 
 359     /*
 360      * Make sure we are not waiting on a checkpoint to complete
 361      */
 362     if( is_orte_checkpoint_connected ) {
 363         if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
 364                                                                                 global_snapshot.ss_handle,
 365                                                                                 ORTE_SNAPC_CKPT_STATE_ERROR)) ) {
 366             ORTE_ERROR_LOG(ret);
 367         }
 368     }
 369 
 370     /*
 371      * Clean up listeners
 372      */
 373     if( ORTE_SUCCESS != (ret = snapc_full_global_stop_cmdline_listener()) ) {
 374         ORTE_ERROR_LOG(ret);
 375         exit_status = ret;
 376     }
 377 
 378     if( ORTE_SUCCESS != (ret = snapc_full_global_stop_listener()) ) {
 379         ORTE_ERROR_LOG(ret);
 380         exit_status = ret;
 381     }
 382 
 383     OBJ_DESTRUCT(&global_snapshot);
 384 
 385     return exit_status;
 386 }
 387 
 388 int global_coord_start_ckpt(orte_snapc_base_quiesce_t *datum)
 389 {
 390     int ret, exit_status = ORTE_SUCCESS;
 391     orte_std_cntr_t i_proc;
 392     orte_proc_t *proc = NULL;
 393     orte_proc_t *new_proc = NULL;
 394     opal_list_item_t *item = NULL;
 395     opal_crs_base_ckpt_options_t *options = NULL;
 396     char *tmp_str = NULL;
 397 
 398     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 399                          "Global) Starting checkpoint (internally requested)"));
 400 
 401     orte_checkpoint_sender = orte_name_invalid;
 402 
 403     /*
 404      * If migrating
 405      */
 406     if( datum->migrating ) {
 407         currently_migrating = true;
 408         if( NULL != migrating_procs ) {
 409             while( NULL != (item = opal_list_remove_first(migrating_procs)) ) {
 410                 proc = (orte_proc_t*)item;
 411                 OBJ_RELEASE(proc);
 412             }
 413         } else {
 414             migrating_procs = OBJ_NEW(opal_list_t);
 415         }
 416 
 417         /*
 418          * Copy over the procs into a list
 419          */
 420         for(i_proc = 0; i_proc < opal_pointer_array_get_size(&(datum->migrating_procs)); ++i_proc) {
 421             proc = (orte_proc_t*)opal_pointer_array_get_item(&(datum->migrating_procs), i_proc);
 422             if( NULL == proc ) {
 423                 continue;
 424             }
 425 
 426             new_proc = OBJ_NEW(orte_proc_t);
 427             new_proc->name.jobid = proc->name.jobid;
 428             new_proc->name.vpid  = proc->name.vpid;
 429             new_proc->node = OBJ_NEW(orte_node_t);
 430             new_proc->node->name = proc->node->name;
 431             opal_list_append(migrating_procs, &new_proc->super);
 432             OBJ_RETAIN(new_proc);
 433         }
 434 
 435         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 436                              "Global) SnapC Migrating Processes: (%d procs) [Updated]\n",
 437                              (int)opal_list_get_size(migrating_procs) ));
 438         for (item  = opal_list_get_first(migrating_procs);
 439              item != opal_list_get_end(migrating_procs);
 440              item  = opal_list_get_next(item)) {
 441             new_proc = (orte_proc_t*)item;
 442             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 443                                  "\t\"%s\" [%s]\n",
 444                                  ORTE_NAME_PRINT(&new_proc->name),new_proc->node->name));
 445         }
 446     }
 447 
 448     /*************************
 449      * Kick off the checkpoint (local coord will release the processes)
 450      *************************/
 451     options = OBJ_NEW(opal_crs_base_ckpt_options_t);
 452     if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
 453         ORTE_ERROR_LOG(ret);
 454         exit_status = ret;
 455         goto cleanup;
 456     }
 457 
 458     /*
 459      * Wait for checkpoint to locally finish on all nodes
 460      */
 461     while(((currently_migrating  && current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_MIGRATING) ||
 462            (!currently_migrating && current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL)) &&
 463           current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ESTABLISHED &&
 464           current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_RECOVERED &&
 465           current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
 466           current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
 467         opal_progress();
 468     }
 469 
 470     /*
 471      * Update the quiesce structure with the handle
 472      */
 473     datum->snapshot = OBJ_NEW(orte_snapc_base_global_snapshot_t);
 474 
 475     datum->ss_handle = global_snapshot.ss_handle;
 476     datum->ss_snapshot = OBJ_NEW(orte_sstore_base_global_snapshot_info_t);
 477     if( ORTE_SUCCESS != (ret = orte_sstore.request_global_snapshot_data(&(datum->ss_handle), datum->ss_snapshot)) ) {
 478         ORTE_ERROR_LOG(ret);
 479         exit_status = ret;
 480         goto cleanup;
 481     }
 482 
 483     /* JJH Is the snapc structure useful with the sstore structure ??? */
 484     orte_sstore.get_attr(global_snapshot.ss_handle,
 485                          SSTORE_METADATA_GLOBAL_SNAP_SEQ,
 486                          &tmp_str);
 487     datum->epoch = atoi(tmp_str);
 488 
 489     if( NULL != tmp_str ) {
 490         free(tmp_str);
 491         tmp_str = NULL;
 492     }
 493 
 494  cleanup:
 495     if( NULL != options ) {
 496         OBJ_RELEASE(options);
 497         options = NULL;
 498     }
 499 
 500     return exit_status;
 501 }
 502 
 503 int global_coord_end_ckpt(orte_snapc_base_quiesce_t *datum)
 504 {
 505     int ret, exit_status = ORTE_SUCCESS;
 506     opal_list_item_t* item = NULL;
 507 
 508     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 509                          "Global) Finishing checkpoint (internally requested) [%3d]",
 510                          current_job_ckpt_state));
 511 
 512     if( currently_migrating ) {
 513         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 514                              "Global) End Ckpt: Flush the modex cached data\n"));
 515 
 516         /* TODO: You can't pass NULL as the identifier - what you'll need to do is
 517          * close all open dstore handles, and then open the ones you need
 518          */
 519 #if 0
 520         if (OPAL_SUCCESS != (ret = opal_dstore.remove(NULL, NULL))) {
 521             ORTE_ERROR_LOG(ret);
 522             exit_status = ret;
 523             goto cleanup;
 524         }
 525 #endif
 526 
 527         SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_ESTABLISH);
 528         if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
 529                                                                             ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL,
 530                                                                             global_snapshot.ss_handle,
 531                                                                             true, NULL) ) ) {
 532             ORTE_ERROR_LOG(ret);
 533             exit_status = ret;
 534             goto cleanup;
 535         }
 536     }
 537 
 538     while(current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_RECOVERED &&
 539           current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
 540           current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
 541         opal_progress();
 542     }
 543 
 544     /*
 545      * Update the job structure since processes may have moved around
 546      */
 547     if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
 548         ORTE_ERROR_LOG(ret);
 549         exit_status = ret;
 550         goto cleanup;
 551     }
 552 
 553     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 554                          "Global) Finished checkpoint (internally requested) [%d]",
 555                          current_job_ckpt_state));
 556 
 557     if( currently_migrating ) {
 558         current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
 559         cleanup_on_establish = false;
 560 
 561         report_progress_cur_loc_finished = 0;
 562         report_progress_last_reported_loc_finished = 0;
 563     }
 564 
 565  cleanup:
 566 
 567     currently_migrating = false;
 568     if( NULL != migrating_procs ) {
 569         while( NULL != (item = opal_list_remove_first(migrating_procs)) ) {
 570             OBJ_RELEASE(item);
 571         }
 572         OBJ_RELEASE(migrating_procs);
 573         migrating_procs = NULL;
 574     }
 575 
 576     return exit_status;
 577 }
 578 
 579 /******************
 580  * Local functions
 581  ******************/
 582 static int global_init_job_structs(void)
 583 {
 584     orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
 585     orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
 586     opal_list_item_t* orted_item = NULL;
 587     orte_node_t *cur_node = NULL;
 588     orte_job_map_t *map = NULL;
 589     orte_job_t *jdata = NULL;
 590     orte_proc_t **procs = NULL;
 591     orte_std_cntr_t i = 0;
 592     orte_vpid_t p = 0;
 593     orte_ns_cmp_bitmask_t mask;
 594     bool found = false;
 595 
 596     /* look up job data object */
 597     if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
 598         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 599         return ORTE_ERR_NOT_FOUND;
 600     }
 601 
 602     OBJ_CONSTRUCT(&global_snapshot, orte_snapc_base_global_snapshot_t);
 603 
 604     map = jdata->map;
 605 
 606     for (i=0; i < map->nodes->size; i++) {
 607         if (NULL == (cur_node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
 608             continue;
 609         }
 610 
 611         procs = (orte_proc_t**)cur_node->procs->addr;
 612 
 613         /*
 614          * Look out for duplicates
 615          * JJH: Should not happen, but does if rmaps get a bug in setting up the map.
 616          */
 617         found = false;
 618         for(orted_item  = opal_list_get_first(&(global_snapshot.local_snapshots));
 619             orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
 620             orted_item  = opal_list_get_next(orted_item) ) {
 621             orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
 622             /*
 623              * Is in list?
 624              */
 625             if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
 626                                                            &(cur_node->daemon->name),
 627                                                            &(orted_snapshot->process_name) )) {
 628                 found = true;
 629                 break;
 630             }
 631         }
 632         if( found ) {
 633             OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
 634                                  "Global) [%d] Found Daemon %s with %d procs - Duplicate!! - Should not happen!",
 635                                  i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
 636             continue;
 637         }
 638 
 639         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 640                              "Global) [%d] Found Daemon %s with %d procs",
 641                              i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
 642 
 643         orted_snapshot = OBJ_NEW(orte_snapc_full_orted_snapshot_t);
 644 
 645         orted_snapshot->process_name.jobid  = cur_node->daemon->name.jobid;
 646         orted_snapshot->process_name.vpid   = cur_node->daemon->name.vpid;
 647 
 648         mask = ORTE_NS_CMP_JOBID;
 649 
 650         if (OPAL_EQUAL ==
 651                 orte_util_compare_name_fields(mask, &orted_snapshot->process_name, ORTE_PROC_MY_NAME)) {
 652             global_coord_has_local_children = true;
 653         }
 654 
 655         for(p = 0; p < cur_node->num_procs; ++p) {
 656             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 657                                  "Global) \t [%d] Found Process %s on Daemon %s",
 658                                  p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(cur_node->daemon->name)) ));
 659 
 660             app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
 661 
 662             app_snapshot->process_name.jobid = procs[p]->name.jobid;
 663             app_snapshot->process_name.vpid = procs[p]->name.vpid;
 664 
 665             opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
 666         }
 667 
 668 
 669         opal_list_append(&global_snapshot.local_snapshots, &(orted_snapshot->super.super));
 670     }
 671 
 672     return ORTE_SUCCESS;
 673 }
 674 
 675 static int global_refresh_job_structs(void)
 676 {
 677     orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
 678     orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
 679     opal_list_item_t* orted_item = NULL;
 680     opal_list_item_t* app_item = NULL;
 681     opal_list_item_t* item = NULL;
 682     orte_node_t *cur_node = NULL;
 683     orte_job_map_t *map = NULL;
 684     orte_job_t *jdata = NULL;
 685     orte_proc_t **procs = NULL;
 686     orte_proc_t *new_proc = NULL;
 687     orte_std_cntr_t i = 0;
 688     orte_vpid_t p = 0;
 689     bool found = false;
 690     orte_ns_cmp_bitmask_t mask;
 691 
 692     /* look up job data object */
 693     if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
 694         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 695         return ORTE_ERR_NOT_FOUND;
 696     }
 697 
 698     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 699                          "Global) Refreshing Job Structures... [%3d]",
 700                          current_job_ckpt_state));
 701 
 702     if( NULL != migrating_procs ) {
 703         for (item  = opal_list_get_first(migrating_procs);
 704              item != opal_list_get_end(migrating_procs);
 705              item  = opal_list_get_next(item)) {
 706             new_proc = (orte_proc_t*)item;
 707 
 708             /*
 709              * Look through all daemons
 710              */
 711             found = false;
 712             for(orted_item  = opal_list_get_first(&(global_snapshot.local_snapshots));
 713                 orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
 714                 orted_item  = opal_list_get_next(orted_item) ) {
 715                 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
 716 
 717                 /*
 718                  * Look through all processes tracked by this daemon
 719                  */
 720                 for(app_item  = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
 721                     app_item != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
 722                     app_item  = opal_list_get_next(app_item) ) {
 723                     app_snapshot = (orte_snapc_base_local_snapshot_t*)app_item;
 724 
 725                     if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
 726                                                                    &(new_proc->name),
 727                                                                    &(app_snapshot->process_name) )) {
 728                         found = true;
 729                         opal_list_remove_item(&(orted_snapshot->super.local_snapshots), app_item);
 730                         break;
 731                     }
 732                 }
 733 
 734                 if( found ) {
 735                     break;
 736                 }
 737             }
 738         }
 739     }
 740 
 741     /*
 742      * First make sure that all of the orted's have the proper number of
 743      * children, if no children, then stop tracking.
 744      */
 745     map = jdata->map;
 746     for(orted_item  = opal_list_get_first(&(global_snapshot.local_snapshots));
 747         orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
 748         orted_item  = opal_list_get_next(orted_item) ) {
 749         orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
 750 
 751         /* Make sure this orted is in the map */
 752         found = false;
 753         for (i=0; i < map->nodes->size; i++) {
 754             if (NULL == (cur_node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
 755                 continue;
 756             }
 757 
 758             if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
 759                                                            &(cur_node->daemon->name),
 760                                                            &(orted_snapshot->process_name) )) {
 761                 found = true;
 762                 break;
 763             }
 764         }
 765         /* If not, then remove all processes, keep ref. we might reuse it later */
 766         if( !found ) {
 767             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 768                                  "Global) Found Empty Daemon %s not in map (Refresh)",
 769                                  ORTE_NAME_PRINT(&(orted_snapshot->process_name)) ));
 770             while( NULL != (item = opal_list_remove_first(&(orted_snapshot->super.local_snapshots))) ) {
 771                 OBJ_RELEASE(item);
 772             }
 773         }
 774     }
 775 
 776     /*
 777      * Look for new nodes
 778      */
 779     for (i=0; i < map->nodes->size; i++) {
 780         if (NULL == (cur_node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
 781             continue;
 782         }
 783 
 784         procs = (orte_proc_t**)cur_node->procs->addr;
 785 
 786         /*
 787          * See if we are already tracking it, if so refresh it
 788          * (This daemon could have been restarted, and processes migrated back to it)
 789          */
 790         found = false;
 791         for(orted_item  = opal_list_get_first(&(global_snapshot.local_snapshots));
 792             orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
 793             orted_item  = opal_list_get_next(orted_item) ) {
 794             orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
 795 
 796             if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
 797                                                            &(cur_node->daemon->name),
 798                                                            &(orted_snapshot->process_name) )) {
 799                 found = true;
 800                 break;
 801             }
 802         }
 803         if( found ) {
 804             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 805                                  "Global) [%d] Found Daemon %s with %d procs (Refresh)",
 806                                  i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
 807 
 808             /* Remove all old processes */
 809             while( NULL != (item = opal_list_remove_first(&(orted_snapshot->super.local_snapshots))) ) {
 810                 OBJ_RELEASE(item);
 811             }
 812 
 813             /* Add back new processes (a bit of overkill, sure, but it works) */
 814             for(p = 0; p < cur_node->num_procs; ++p) {
 815                 if( NULL == procs[p] ) {
 816                     continue;
 817                 }
 818 
 819                 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 820                                      "Global) \t [%d] Found Process %s on Daemon %s",
 821                                      p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(cur_node->daemon->name)) ));
 822 
 823                 app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
 824 
 825                 app_snapshot->process_name.jobid = procs[p]->name.jobid;
 826                 app_snapshot->process_name.vpid = procs[p]->name.vpid;
 827 
 828                 opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
 829             }
 830 
 831             continue;
 832         }
 833 
 834         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 835                              "Global) [%d] Found Daemon %s with %d procs",
 836                              i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
 837 
 838         orted_snapshot = OBJ_NEW(orte_snapc_full_orted_snapshot_t);
 839 
 840         orted_snapshot->process_name.jobid  = cur_node->daemon->name.jobid;
 841         orted_snapshot->process_name.vpid   = cur_node->daemon->name.vpid;
 842 
 843         mask = ORTE_NS_CMP_ALL;
 844 
 845         if (OPAL_EQUAL ==
 846                 orte_util_compare_name_fields(mask, &orted_snapshot->process_name, ORTE_PROC_MY_NAME)) {
 847             global_coord_has_local_children = true;
 848         }
 849         for(p = 0; p < cur_node->num_procs; ++p) {
 850             if( NULL == procs[p] ) {
 851                 continue;
 852             }
 853 
 854             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 855                                  "Global) \t [%d] Found Process %s on Daemon %s",
 856                                  p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(cur_node->daemon->name)) ));
 857 
 858             app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
 859 
 860             app_snapshot->process_name.jobid = procs[p]->name.jobid;
 861             app_snapshot->process_name.vpid = procs[p]->name.vpid;
 862 
 863             opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
 864         }
 865 
 866         opal_list_append(&global_snapshot.local_snapshots, &(orted_snapshot->super.super));
 867     }
 868 
 869     return ORTE_SUCCESS;
 870 }
 871 
 872 /*****************
 873  * Setup listeners
 874  *****************/
 875 static int snapc_full_global_start_listener(void)
 876 {
 877     if (snapc_orted_recv_issued && ORTE_PROC_IS_HNP) {
 878         return ORTE_SUCCESS;
 879     }
 880 
 881     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 882                          "Global) Startup Coordinator Channel"));
 883 
 884     /*
 885      * Coordinator command listener
 886      */
 887     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL,
 888                             ORTE_RML_PERSISTENT, snapc_full_global_orted_recv, NULL);
 889 
 890     snapc_orted_recv_issued = true;
 891 
 892     return ORTE_SUCCESS;
 893 }
 894 
 895 static int snapc_full_global_stop_listener(void)
 896 {
 897     if (!snapc_orted_recv_issued && ORTE_PROC_IS_HNP) {
 898         return ORTE_SUCCESS;
 899     }
 900 
 901     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 902                          "Global) Shutdown Coordinator Channel"));
 903 
 904     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL);
 905 
 906     snapc_orted_recv_issued = false;
 907     return ORTE_SUCCESS;
 908 }
 909 
 910 static int snapc_full_global_start_cmdline_listener(void)
 911 {
 912     if (snapc_cmdline_recv_issued && ORTE_PROC_IS_HNP) {
 913         return ORTE_SUCCESS;
 914     }
 915 
 916     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 917                          "Global) Startup Command Line Channel"));
 918 
 919     /*
 920      * Coordinator command listener
 921      */
 922     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_CKPT, 0,
 923                             snapc_full_global_cmdline_recv, NULL);
 924 
 925     snapc_cmdline_recv_issued = true;
 926     return ORTE_SUCCESS;
 927 }
 928 
 929 static int snapc_full_global_stop_cmdline_listener(void)
 930 {
 931     if (!snapc_cmdline_recv_issued && ORTE_PROC_IS_HNP) {
 932         return ORTE_SUCCESS;
 933     }
 934 
 935     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
 936                          "Global) Shutdown Command Line Channel"));
 937 
 938     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_CKPT);
 939 
 940     snapc_cmdline_recv_issued = false;
 941     return ORTE_SUCCESS;
 942 }
 943 
 944 /*****************
 945  * Listener Callbacks
 946  *****************/
 947 static void snapc_full_global_cmdline_recv(int status,
 948                                            orte_process_name_t* sender,
 949                                            opal_buffer_t* buffer,
 950                                            orte_rml_tag_t tag,
 951                                            void* cbdata)
 952 {
 953     int ret;
 954     orte_snapc_cmd_flag_t command;
 955     orte_std_cntr_t count = 1;
 956     orte_jobid_t jobid;
 957     opal_crs_base_ckpt_options_t *options = NULL;
 958 
 959     if( ORTE_RML_TAG_CKPT != tag ) {
 960         opal_output(mca_snapc_full_component.super.output_handle,
 961                     "Global) Error: Unknown tag: Received a command message from %s (tag = %d).",
 962                     ORTE_NAME_PRINT(sender), tag);
 963         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 964         return;
 965     }
 966 
 967     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 968                          "Global) Command Line: Start a checkpoint operation [Sender = %s]",
 969                          ORTE_NAME_PRINT(sender)));
 970 
 971     snapc_cmdline_recv_issued = false; /* Not a persistent RML message */
 972 
 973     options = OBJ_NEW(opal_crs_base_ckpt_options_t);
 974 
 975     count = 1;
 976     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_CMD))) {
 977         ORTE_ERROR_LOG(ret);
 978         goto cleanup;
 979     }
 980 
 981     /*
 982      * orte_checkpoint has requested that a checkpoint be taken
 983      */
 984     if (ORTE_SNAPC_GLOBAL_INIT_CMD == command) {
 985         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
 986                              "Global) Command line requested a checkpoint [command %d]\n",
 987                              command));
 988 
 989         /*
 990          * Unpack the buffer from the orte_checkpoint command
 991          */
 992         if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_init_cmd(sender,
 993                                                                               buffer,
 994                                                                               options,
 995                                                                               &jobid)) ) {
 996             ORTE_ERROR_LOG(ret);
 997             goto cleanup;
 998         }
 999 
1000         orte_checkpoint_sender = *sender;
1001         is_orte_checkpoint_connected = true;
1002 
1003         /*
1004          * If the application is not ready for a checkpoint,
1005          * then send back an error.
1006          */
1007         if( !is_app_checkpointable ) {
1008             OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
1009                                  "Global) request_cmd(): Checkpointing currently disabled, rejecting request"));
1010             if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
1011                                                                                     0,
1012                                                                                     ORTE_SNAPC_CKPT_STATE_ERROR))) {
1013                 ORTE_ERROR_LOG(ret);
1014             }
1015 
1016             orte_checkpoint_sender = orte_name_invalid;
1017             is_orte_checkpoint_connected = false;
1018 
1019             /* Reset the listener */
1020             if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener() ) ){
1021                 ORTE_ERROR_LOG(ret);
1022             }
1023 
1024             goto cleanup;
1025         }
1026 
1027         /*
1028          * If the jobid was specified, and does not match the current job, then fail
1029          */
1030         if( ORTE_JOBID_INVALID != jobid && jobid != current_global_jobid) {
1031             opal_output(mca_snapc_full_component.super.output_handle,
1032                         "Global) Error: Jobid %s does not match the current jobid %s",
1033                         ORTE_JOBID_PRINT(jobid), ORTE_JOBID_PRINT(current_global_jobid));
1034             ORTE_ERROR_LOG(ORTE_ERROR);
1035             goto cleanup;
1036         }
1037 
1038         /*************************
1039          * Kick off the checkpoint
1040          *************************/
1041         SNAPC_FULL_CLEAR_TIMERS();
1042         SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_START);
1043 
1044         if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
1045             ORTE_ERROR_LOG(ret);
1046             goto cleanup;
1047         }
1048 
1049     }
1050     /*
1051      * Terminate the connection (Not currently implemented)
1052      */
1053     else if (ORTE_SNAPC_GLOBAL_TERM_CMD == command) {
1054         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1055                              "Global) Command line requested to terminate connection (command %d)\n",
1056                              command));
1057         ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
1058         goto cleanup;
1059     }
1060     /*
1061      * Unknown command
1062      */
1063     else {
1064         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1065                              "Global) Command line sent an unknown command (command %d)\n",
1066                              command));
1067         ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
1068         goto cleanup;
1069     }
1070 
1071  cleanup:
1072     if( NULL != options ) {
1073         OBJ_RELEASE(options);
1074         options = NULL;
1075     }
1076 
1077     return;
1078 }
1079 
1080 void snapc_full_global_orted_recv(int status,
1081                                   orte_process_name_t* sender,
1082                                   opal_buffer_t* buffer,
1083                                   orte_rml_tag_t tag,
1084                                   void* cbdata)
1085 {
1086     int ret;
1087     orte_snapc_full_cmd_flag_t command;
1088     orte_std_cntr_t count;
1089     static int num_inside = 0;
1090 
1091     if( ORTE_RML_TAG_SNAPC_FULL != tag ) {
1092         opal_output(mca_snapc_full_component.super.output_handle,
1093                     "Global) Error: Unknown tag: Received a command message from %s (tag = %d).",
1094                     ORTE_NAME_PRINT(sender), tag);
1095         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
1096         return;
1097     }
1098 
1099     /*
1100      * This is a message from a Local Coordinator
1101      */
1102     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1103                          "Global) Receive a command message from %s.",
1104                          ORTE_NAME_PRINT(sender)));
1105 
1106     count = 1;
1107     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1108         ORTE_ERROR_LOG(ret);
1109         return;
1110     }
1111 
1112     ++num_inside;
1113 
1114     switch (command) {
1115         case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD:
1116             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1117                                  "Global) Command: Job State Update (quick)"));
1118 
1119             snapc_full_process_job_update_cmd(sender, buffer, true);
1120             break;
1121 
1122         case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD:
1123             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1124                                  "Global) Command: Job State Update"));
1125 
1126             snapc_full_process_job_update_cmd(sender, buffer, false);
1127             break;
1128 
1129         case ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_QUICK_CMD:
1130             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1131                                  "Global) Command: Daemon State Update (quick)"));
1132 
1133             snapc_full_process_orted_update_cmd(sender, buffer, true);
1134             break;
1135 
1136         case ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_CMD:
1137             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1138                                  "Global) Command: Daemon State Update"));
1139 
1140             snapc_full_process_orted_update_cmd(sender, buffer, false);
1141             break;
1142 
1143         case ORTE_SNAPC_FULL_RESTART_PROC_INFO:
1144             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1145                                  "Global) Command: Update hostname/pid associations"));
1146 
1147             snapc_full_process_restart_proc_info_cmd(sender, buffer);
1148             break;
1149 
1150         case ORTE_SNAPC_FULL_REQUEST_OP_CMD:
1151             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1152                                  "Global) Command: Request Op"));
1153 
1154             snapc_full_process_request_op_cmd(sender, buffer);
1155             break;
1156 
1157         default:
1158             ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
1159     }
1160 
1161     return;
1162 }
1163 
1164 static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
1165                                               opal_buffer_t* sbuffer)
1166 {
1167     int ret;
1168     orte_std_cntr_t count = 1;
1169     orte_jobid_t jobid;
1170     int op_event, op_state;
1171     opal_crs_base_ckpt_options_t *options = NULL;
1172     opal_buffer_t *buffer = NULL;
1173     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
1174     int seq_num = -1, i;
1175     char * global_handle = NULL, *tmp_str = NULL;
1176     orte_snapc_base_request_op_t *datum = NULL;
1177 
1178     orte_checkpoint_sender = orte_name_invalid;
1179 
1180     count = 1;
1181     if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &jobid, &count, ORTE_JOBID))) {
1182         ORTE_ERROR_LOG(ret);
1183         goto cleanup;
1184     }
1185 
1186     count = 1;
1187     if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &op_event, &count, OPAL_INT))) {
1188         ORTE_ERROR_LOG(ret);
1189         goto cleanup;
1190     }
1191 
1192     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1193                          "Global) process_request_op(): Op Code %2d\n",
1194                          op_event));
1195 
1196     /************************************
1197      * Application have been initialized, and are ready for checkpointing
1198      ************************************/
1199     if( ORTE_SNAPC_OP_INIT == op_event ) {
1200         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
1201                              "Global) process_request_op(): Checkpointing Enabled (%2d)\n",
1202                              op_event));
1203         is_app_checkpointable = true;
1204     }
1205     /************************************
1206      * Application is finalizing, and no longer ready for checkpointing.
1207      ************************************/
1208     else if( ORTE_SNAPC_OP_FIN == op_event ) {
1209         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
1210                              "Global) process_request_op(): Checkpointing Disabled (%2d)\n",
1211                              op_event));
1212         is_app_checkpointable = false;
1213 
1214         /*
1215          * Wait for any ongoing checkpoints to finish
1216          */
1217         if( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1218             current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
1219             OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
1220                                  "Global) process_request_op(): Wait for ongoing checkpoint to complete..."));
1221             while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1222                    current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
1223                 opal_progress();
1224             }
1225         }
1226 
1227         /*
1228          * Tell application that it is now ok to finailze
1229          */
1230         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
1231                              "Global) process_request_op(): Send Finalize ACK to the job"));
1232 
1233         buffer = OBJ_NEW(opal_buffer_t);
1234         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1235             ORTE_ERROR_LOG(ret);
1236             goto cleanup;
1237         }
1238 
1239         op_event = ORTE_SNAPC_OP_FIN_ACK;
1240         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
1241             ORTE_ERROR_LOG(ret);
1242             goto cleanup;
1243         }
1244 
1245         if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
1246                                                            orte_rml_send_callback, NULL))) {
1247             ORTE_ERROR_LOG(ret);
1248             goto cleanup;
1249         }
1250         /* buffer should not be released here; the callback releases it */
1251         buffer = NULL;
1252     }
1253     /************************************
1254      * Start a checkpoint operation
1255      ************************************/
1256     else if( ORTE_SNAPC_OP_CHECKPOINT == op_event ) {
1257         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1258                              "Global) process_request_op(): Starting checkpoint (%2d)\n",
1259                              op_event));
1260 
1261         options = OBJ_NEW(opal_crs_base_ckpt_options_t);
1262         if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
1263             ORTE_ERROR_LOG(ret);
1264             goto cleanup;
1265         }
1266 
1267         /*
1268          * Wait for the operation to complete
1269          */
1270         while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1271                current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
1272             opal_progress();
1273         }
1274 
1275         if( ORTE_SNAPC_CKPT_STATE_ERROR == current_job_ckpt_state ) {
1276             op_state = -1;
1277         } else {
1278             op_state = 0;
1279         }
1280 
1281         /*
1282          * Tell the sender that the operation is finished
1283          */
1284         buffer = OBJ_NEW(opal_buffer_t);
1285         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1286             ORTE_ERROR_LOG(ret);
1287             goto cleanup;
1288         }
1289 
1290         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
1291             ORTE_ERROR_LOG(ret);
1292             goto cleanup;
1293         }
1294 
1295         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_state, 1, OPAL_INT))) {
1296             ORTE_ERROR_LOG(ret);
1297             goto cleanup;
1298         }
1299 
1300         if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
1301                                                            orte_rml_send_callback, NULL))) {
1302             ORTE_ERROR_LOG(ret);
1303             goto cleanup;
1304         }
1305         /* buffer should not be released here; the callback releases it */
1306         buffer = NULL;
1307     }
1308     /************************************
1309      * Start the Restart operation
1310      ************************************/
1311     else if( ORTE_SNAPC_OP_RESTART == op_event ) {
1312         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1313                              "Global) process_request_op(): Starting restart (%2d)\n",
1314                              op_event));
1315 
1316         count = 1;
1317         if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &seq_num, &count, OPAL_INT))) {
1318             ORTE_ERROR_LOG(ret);
1319             orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
1320             goto cleanup;
1321         }
1322 
1323         count = 1;
1324         if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &global_handle, &count, OPAL_STRING))) {
1325             ORTE_ERROR_LOG(ret);
1326             orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
1327             goto cleanup;
1328         }
1329 
1330         /*
1331          * Kick off the restart
1332          */
1333         if( ORTE_SUCCESS != (ret = orte_errmgr_base_restart_job(current_global_jobid, global_handle, seq_num) ) ) {
1334             ORTE_ERROR_LOG(ret);
1335             orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
1336             goto cleanup;
1337         }
1338     }
1339     /************************************
1340      * Start the Migration operation
1341      ************************************/
1342     else if( ORTE_SNAPC_OP_MIGRATE == op_event ) {
1343         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1344                              "Global) process_request_op(): Starting migration (%2d)\n",
1345                              op_event));
1346 
1347         datum = OBJ_NEW(orte_snapc_base_request_op_t);
1348 
1349         /*
1350          * Unpack migration information
1351          */
1352         count = 1;
1353         if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &(datum->mig_num), &count, OPAL_INT))) {
1354             ORTE_ERROR_LOG(ret);
1355             goto cleanup;
1356         }
1357 
1358         datum->mig_vpids = malloc(sizeof(int) * datum->mig_num);
1359         datum->mig_host_pref = malloc(sizeof(char) * datum->mig_num * OPAL_MAX_PROCESSOR_NAME);
1360         datum->mig_vpid_pref = malloc(sizeof(int) * datum->mig_num);
1361         datum->mig_off_node  = malloc(sizeof(int) * datum->mig_num);
1362 
1363         for( i = 0; i < datum->mig_num; ++i ) {
1364             (datum->mig_vpids)[i] = 0;
1365             (datum->mig_host_pref)[i][0] = '\0';
1366             (datum->mig_vpid_pref)[i] = 0;
1367             (datum->mig_off_node)[i] = (int)false;
1368         }
1369 
1370         for( i = 0; i < datum->mig_num; ++i ) {
1371             count = 1;
1372             if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &((datum->mig_vpids)[i]), &count, OPAL_INT))) {
1373                 ORTE_ERROR_LOG(ret);
1374                 goto cleanup;
1375             }
1376 
1377             if(NULL != tmp_str ) {
1378                 free(tmp_str);
1379                 tmp_str = NULL;
1380             }
1381             count = 1;
1382             if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &tmp_str, &count, OPAL_STRING))) {
1383                 ORTE_ERROR_LOG(ret);
1384                 goto cleanup;
1385             }
1386             opal_string_copy( ((datum->mig_host_pref)[i]), tmp_str, OPAL_MAX_PROCESSOR_NAME);
1387 
1388             count = 1;
1389             if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &((datum->mig_vpid_pref)[i]), &count, OPAL_INT))) {
1390                 ORTE_ERROR_LOG(ret);
1391                 goto cleanup;
1392             }
1393 
1394             count = 1;
1395             if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &((datum->mig_off_node)[i]), &count, OPAL_INT))) {
1396                 ORTE_ERROR_LOG(ret);
1397                 goto cleanup;
1398             }
1399 
1400             OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1401                                  "Global) Migration %3d/%3d: Received Rank %3d - Requested <%s> (%3d) %c\n",
1402                                  datum->mig_num, i,
1403                                  (datum->mig_vpids)[i],
1404                                  (datum->mig_host_pref)[i],
1405                                  (datum->mig_vpid_pref)[i],
1406                                  (OPAL_INT_TO_BOOL((datum->mig_off_node)[i]) ? 'T' : 'F')
1407                                  ));
1408         }
1409 
1410         /*
1411          * Kick off the migration
1412          */
1413         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1414                              "Global) ------ Kick Off Migration -----"));
1415         if( ORTE_SUCCESS != (ret = orte_errmgr_base_migrate_job(current_global_jobid, datum) ) ) {
1416             ORTE_ERROR_LOG(ret);
1417             goto cleanup;
1418         }
1419 
1420         /*
1421          * Tell the sender that the operation is finished
1422          */
1423         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1424                              "Global) ------ Finished Migration. Release processes (%15s )-----",
1425                              ORTE_NAME_PRINT(sender) ));
1426         buffer = OBJ_NEW(opal_buffer_t);
1427         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1428             ORTE_ERROR_LOG(ret);
1429             goto cleanup;
1430         }
1431 
1432         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
1433             ORTE_ERROR_LOG(ret);
1434             goto cleanup;
1435         }
1436 
1437         op_state = 0;
1438         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_state, 1, OPAL_INT))) {
1439             ORTE_ERROR_LOG(ret);
1440             goto cleanup;
1441         }
1442 
1443         if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
1444                                                            orte_rml_send_callback, NULL))) {
1445             ORTE_ERROR_LOG(ret);
1446             goto cleanup;
1447         }
1448 
1449         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1450                              "Global) ------ Finished Migration. Released processes (%15s )-----",
1451                              ORTE_NAME_PRINT(sender) ));
1452     }
1453     /************************************
1454      * Start the Quiesce operation
1455      ************************************/
1456     else if( ORTE_SNAPC_OP_QUIESCE_START == op_event) {
1457         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1458                              "Global) process_request_op(): Starting quiesce (%2d)\n",
1459                              op_event));
1460 
1461         options = OBJ_NEW(opal_crs_base_ckpt_options_t);
1462         options->inc_prep_only = true;
1463         if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
1464             ORTE_ERROR_LOG(ret);
1465             goto cleanup;
1466         }
1467 
1468         /*
1469          * Wait for quiescence
1470          */
1471         while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1472                current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_INC_PREPED ) {
1473             opal_progress();
1474         }
1475 
1476         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1477                              "Global) process_request_op(): Quiesce_start finished(%2d)\n",
1478                              op_event));
1479     }
1480     /************************************
1481      * End the Quiesce operation
1482      ************************************/
1483     else if( ORTE_SNAPC_OP_QUIESCE_END == op_event) {
1484         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1485                              "Global) process_request_op(): Ending quiesce (%2d)\n",
1486                              op_event));
1487 
1488         /*
1489          * Wait for the checkpoint operation to finish
1490          */
1491         while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1492                current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
1493             opal_progress();
1494         }
1495 
1496         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1497                              "Global) process_request_op(): Quiesce_end finished(%2d)\n",
1498                              op_event));
1499     }
1500 
1501 cleanup:
1502     if (NULL != buffer) {
1503         OBJ_RELEASE(buffer);
1504         buffer = NULL;
1505     }
1506 
1507     if( NULL != options ) {
1508         OBJ_RELEASE(options);
1509         options = NULL;
1510     }
1511 
1512     if(NULL != tmp_str ) {
1513         free(tmp_str);
1514         tmp_str = NULL;
1515     }
1516 
1517     return;
1518 }
1519 
1520 static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
1521                                                opal_buffer_t* buffer,
1522                                                bool quick)
1523 {
1524     int ret, exit_status = ORTE_SUCCESS;
1525     orte_std_cntr_t count;
1526     int remote_ckpt_state;
1527     opal_list_item_t* item = NULL;
1528     opal_list_item_t* aitem = NULL;
1529     orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
1530     orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
1531     int loc_min_state;
1532     char *state_str = NULL;
1533 
1534     orted_snapshot = find_orted_snapshot(sender);
1535     if( NULL == orted_snapshot ) {
1536         opal_output(mca_snapc_full_component.super.output_handle,
1537                     "Global) Error: Unknown Daemon %s",
1538                     ORTE_NAME_PRINT(sender) );
1539         exit_status = ORTE_ERROR;
1540         ORTE_ERROR_LOG(ORTE_ERROR);
1541         goto cleanup;
1542     }
1543 
1544     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1545                          "Global) Daemon %s: Changed state to:\n",
1546                          ORTE_NAME_PRINT(&(orted_snapshot->process_name)) ));
1547 
1548     /*
1549      * Unpack the data (quick)
1550      * - state
1551      * Unpack the data (long)
1552      * - state
1553      * - # procs
1554      * - Foreach proc
1555      *   - process name
1556      */
1557     count = 1;
1558     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &remote_ckpt_state, &count, OPAL_INT))) {
1559         ORTE_ERROR_LOG(ret);
1560         exit_status = ret;
1561         goto cleanup;
1562     }
1563     orted_snapshot->state = remote_ckpt_state;
1564     orte_snapc_ckpt_state_str(&state_str, orted_snapshot->state);
1565     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1566                          "Global)   State:        %d (%s)\n",
1567                          (int)(orted_snapshot->state), state_str));
1568     free(state_str);
1569     state_str = NULL;
1570 
1571     /* JJH: Though there is currently no additional information sent in a long
1572      *      message versus a small message, keep this logic so that in the
1573      *      future it can be easily reused without substantially modifying
1574      *      the component.
1575      */
1576     if( quick ) {
1577         exit_status = ORTE_SUCCESS;
1578         goto post_process;
1579     }
1580 
1581  post_process:
1582     loc_min_state = snapc_full_global_get_min_state();
1583 
1584     SNAPC_FULL_REPORT_PROGRESS(orted_snapshot, current_total_orteds, loc_min_state);
1585 
1586     /*
1587      * Notify the orte-checkpoint command once we have everyone running.
1588      * No need to broadcast this to everyone since they already know.
1589      */
1590     if( ORTE_SNAPC_CKPT_STATE_RUNNING == loc_min_state &&
1591         ORTE_SNAPC_CKPT_STATE_RUNNING != current_job_ckpt_state) {
1592         current_job_ckpt_state = loc_min_state;
1593 
1594         SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_RUNNING);
1595 
1596         if( is_orte_checkpoint_connected &&
1597             ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
1598                                                                                 global_snapshot.ss_handle,
1599                                                                                 current_job_ckpt_state)) ) {
1600             ORTE_ERROR_LOG(ret);
1601             exit_status = ret;
1602             goto cleanup;
1603         }
1604     }
1605 
1606     /*
1607      * If we are just prep'ing the INC, then acknowledge the state change
1608      */
1609     if( ORTE_SNAPC_CKPT_STATE_INC_PREPED == loc_min_state &&
1610         ORTE_SNAPC_CKPT_STATE_INC_PREPED > current_job_ckpt_state) {
1611         current_job_ckpt_state = loc_min_state;
1612 
1613         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1614                              "Global)    All Processes have finished the INC prep!\n"));
1615     }
1616 
1617     /*
1618      * Notify the orte-checkpoint command once we have everyone stopped.
1619      * No need to broadcast this to everyone since they already know.
1620      */
1621     if( ORTE_SNAPC_CKPT_STATE_STOPPED == loc_min_state &&
1622         ORTE_SNAPC_CKPT_STATE_STOPPED > current_job_ckpt_state) {
1623         current_job_ckpt_state = loc_min_state;
1624 
1625         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1626                              "Global)    All Processes have been stopped!\n"));
1627 
1628         if( is_orte_checkpoint_connected &&
1629             ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
1630                                                                                 global_snapshot.ss_handle,
1631                                                                                 current_job_ckpt_state)) ) {
1632             ORTE_ERROR_LOG(ret);
1633             exit_status = ret;
1634             goto cleanup;
1635         }
1636 
1637         /* orte-checkpoint detaches at this point */
1638         is_orte_checkpoint_connected = false;
1639 
1640         /*
1641          * Synchronize the checkpoint here
1642          */
1643         write_out_global_metadata();
1644     }
1645 
1646     /*
1647      * If all daemons have finished, let everyone know we are locally finished.
1648      */
1649     if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL == loc_min_state &&
1650         ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL > current_job_ckpt_state) {
1651 
1652         SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_FIN_LOCAL);
1653 
1654         if( ORTE_SNAPC_CKPT_STATE_NONE != current_job_ckpt_state ) {
1655             if( loc_min_state == current_job_ckpt_state) {
1656                 opal_output(0, "Global) JJH WARNING!!: (%d) == (%d)", loc_min_state, current_job_ckpt_state);
1657             }
1658         }
1659 
1660         if( currently_migrating ) {
1661             write_out_global_metadata();
1662             current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_MIGRATING;
1663         }
1664         else {
1665             current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL;
1666         }
1667 
1668         if( NULL != state_str ) {
1669             free(state_str);
1670         }
1671         orte_snapc_ckpt_state_str(&state_str, current_job_ckpt_state);
1672         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1673                              "Global) Job State Changed: %d (%s)\n",
1674                              (int)current_job_ckpt_state, state_str ));
1675         free(state_str);
1676         state_str = NULL;
1677 
1678         if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
1679                                                                             current_job_ckpt_state,
1680                                                                             global_snapshot.ss_handle,
1681                                                                             true, NULL) ) ) {
1682             ORTE_ERROR_LOG(ret);
1683             exit_status = ret;
1684             goto cleanup;
1685         }
1686 
1687         /*
1688          * Now that we have finished locally,
1689          * - Write out the metadata
1690          * - Sync the snapshot to SStore
1691          * if we are stopping then we have already written out this data.
1692          */
1693         if( !(global_snapshot.options->stop) && !currently_migrating ) {
1694             write_out_global_metadata();
1695         }
1696 
1697         SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_ESTABLISH);
1698 
1699         if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
1700                                                                             ORTE_SNAPC_CKPT_STATE_ESTABLISHED,
1701                                                                             global_snapshot.ss_handle,
1702                                                                             true, NULL) ) ) {
1703             ORTE_ERROR_LOG(ret);
1704             exit_status = ret;
1705             goto cleanup;
1706         }
1707     }
1708 
1709     /*
1710      * If all daemons have confirmed that their local proces are finished
1711      * and we have finished establishing the checkpoint,
1712      * then let the command line tool know and cleanup.
1713      */
1714     if( ORTE_SNAPC_CKPT_STATE_RECOVERED == loc_min_state &&
1715         ORTE_SNAPC_CKPT_STATE_RECOVERED > current_job_ckpt_state ) {
1716 
1717         /*
1718          * If this is a job restarting then we do something different
1719          */
1720         if( current_job_ckpt_state == ORTE_SNAPC_CKPT_STATE_NONE ) {
1721             OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1722                                  "Global) Job has been successfully restarted"));
1723 
1724             /*current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_RECOVERED;*/
1725             orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_RECOVERED);
1726 
1727             for(item  = opal_list_get_first(&(global_snapshot.local_snapshots));
1728                 item != opal_list_get_end(&(global_snapshot.local_snapshots));
1729                 item  = opal_list_get_next(item) ) {
1730                 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
1731 
1732                 orted_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
1733 
1734                 for(aitem  = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
1735                     aitem != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
1736                     aitem  = opal_list_get_next(aitem) ) {
1737                     app_snapshot = (orte_snapc_base_local_snapshot_t*)aitem;
1738 
1739                     app_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
1740                 }
1741             }
1742 
1743             SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_RECOVERED);
1744             SNAPC_FULL_DISPLAY_RECOVERED_TIMER();
1745             orte_snapc_base_has_recovered = true;
1746             is_app_checkpointable = true;
1747 
1748             exit_status = ORTE_SUCCESS;
1749             goto cleanup;
1750         }
1751 
1752         /*
1753          * If the checkpoint has not been established yet, then do not clear the
1754          * snapshot structure just yet.
1755          */
1756         if(ORTE_SNAPC_CKPT_STATE_ESTABLISHED != current_job_ckpt_state ) {
1757             cleanup_on_establish = true;
1758         }
1759 
1760         current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_RECOVERED;
1761 
1762         if( NULL != state_str ) {
1763             free(state_str);
1764         }
1765         orte_snapc_ckpt_state_str(&state_str, current_job_ckpt_state);
1766         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1767                              "Global) Job State Changed: %d (%s)\n",
1768                              (int)current_job_ckpt_state, state_str ));
1769         free(state_str);
1770         state_str = NULL;
1771 
1772         /*
1773          * Notify the orte-checkpoint command
1774          */
1775         if( is_orte_checkpoint_connected &&
1776             ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
1777                                                                                 global_snapshot.ss_handle,
1778                                                                                 current_job_ckpt_state)) ) {
1779             ORTE_ERROR_LOG(ret);
1780             exit_status = ret;
1781             goto cleanup;
1782         }
1783 
1784         SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_RECOVERED);
1785 
1786         /*
1787          * If the checkpoint has been established at this point, then cleanup.
1788          */
1789         if( !cleanup_on_establish && ORTE_SNAPC_CKPT_STATE_RECOVERED == current_job_ckpt_state) {
1790             if( ORTE_SUCCESS != (ret = orte_snapc_full_global_reset_coord()) ) {
1791                 ORTE_ERROR_LOG(ret);
1792                 exit_status = ret;
1793                 goto cleanup;
1794             }
1795         }
1796     }
1797 
1798  cleanup:
1799     if( NULL != state_str ) {
1800         free(state_str);
1801         state_str = NULL;
1802     }
1803 
1804     return exit_status;
1805 }
1806 
1807 static void snapc_full_process_restart_proc_info_cmd(orte_process_name_t* sender,
1808                                                      opal_buffer_t* buffer)
1809 {
1810     int ret;
1811     orte_std_cntr_t count;
1812     size_t num_vpids = 0, i;
1813     pid_t tmp_pid;
1814     char * tmp_hostname = NULL;
1815 
1816     count = 1;
1817     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &tmp_hostname, &count, OPAL_STRING))) {
1818         opal_output(mca_snapc_full_component.super.output_handle,
1819                     "Global) vpid_assoc: Failed to unpack process Hostname from peer %s\n",
1820                     ORTE_NAME_PRINT(sender));
1821         goto cleanup;
1822     }
1823 
1824     count = 1;
1825     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_vpids, &count, OPAL_SIZE))) {
1826         opal_output(mca_snapc_full_component.super.output_handle,
1827                     "Global) vpid_assoc: Failed to unpack num_vpids from peer %s\n",
1828                     ORTE_NAME_PRINT(sender));
1829         goto cleanup;
1830     }
1831 
1832     for(i = 0; i < num_vpids; ++i) {
1833         count = 1;
1834         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &tmp_pid, &count, OPAL_PID))) {
1835             opal_output(mca_snapc_full_component.super.output_handle,
1836                         "Global) vpid_assoc: Failed to unpack process PID from peer %s\n",
1837                         ORTE_NAME_PRINT(sender));
1838             goto cleanup;
1839         }
1840 
1841         global_coord_restart_proc_info(tmp_pid, tmp_hostname);
1842     }
1843 
1844     /* stdout may be buffered by the C library so it needs to be flushed so
1845      * that the debugger can read the process info.
1846      */
1847     fflush(stdout);
1848 
1849  cleanup:
1850     return;
1851 }
1852 
/*
 * Report one restarted process to an attached debugger.
 *
 * Prints "MPIR_debug_info) <hostname>:<pid>" on stdout. stdout is not
 * flushed here. Always returns 0.
 *
 * local_pid      - PID of the restarted process
 * local_hostname - host it runs on; NULL is printed as "(null)"
 */
int global_coord_restart_proc_info(pid_t local_pid, char * local_hostname)
{
    /* pid_t is only guaranteed to be a signed integer type, not int, so
     * widen it explicitly to match the conversion specifier. Passing NULL
     * to %s is undefined behavior, so substitute a placeholder. */
    printf("MPIR_debug_info) %s:%ld\n",
           (NULL != local_hostname) ? local_hostname : "(null)",
           (long)local_pid);
    return 0;
}
1858 
1859 static void snapc_full_process_job_update_cmd(orte_process_name_t* sender,
1860                                               opal_buffer_t* buffer,
1861                                               bool quick)
1862 {
1863     int ret;
1864     orte_std_cntr_t count;
1865     orte_jobid_t jobid;
1866     int   job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
1867     opal_crs_base_ckpt_options_t *options = NULL;
1868     bool loc_migrating = false;
1869     size_t loc_num_procs = 0;
1870     orte_proc_t *proc = NULL;
1871     size_t i;
1872     orte_sstore_base_handle_t ss_handle;
1873 
1874     /*
1875      * Unpack the data (quick)
1876      * - jobid
1877      * - ckpt_state
1878      * - sstore_handle
1879      * Unpack the data (long)
1880      * - jobid
1881      * - ckpt_state
1882      * - ckpt_options
1883      */
1884     count = 1;
1885     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
1886         ORTE_ERROR_LOG(ret);
1887         return;
1888     }
1889 
1890     count = 1;
1891     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &job_ckpt_state, &count, OPAL_INT))) {
1892         ORTE_ERROR_LOG(ret);
1893         return;
1894     }
1895 
1896     if( !quick ) {
1897         if (ORTE_SUCCESS != (ret = orte_sstore.unpack_handle(sender, buffer, &ss_handle)) ) {
1898             ORTE_ERROR_LOG(ret);
1899             return;
1900         }
1901 
1902         options = OBJ_NEW(opal_crs_base_ckpt_options_t);
1903         if( ORTE_SUCCESS != (ret = orte_snapc_base_unpack_options(buffer, options)) ) {
1904             ORTE_ERROR_LOG(ret);
1905             return;
1906         }
1907         /* In this case we want to use the current_options that are cached
1908          * so that we do not have to send them every time.
1909          */
1910         opal_crs_base_copy_options(options, global_snapshot.options);
1911 
1912         count = 1;
1913         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(loc_migrating), &count, OPAL_BOOL))) {
1914             ORTE_ERROR_LOG(ret);
1915             goto cleanup;
1916         }
1917 
1918         if( loc_migrating ) {
1919             count = 1;
1920             if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_num_procs, &count, OPAL_SIZE))) {
1921                 ORTE_ERROR_LOG(ret);
1922                 goto cleanup;
1923             }
1924 
1925             for( i = 0; i < loc_num_procs; ++i ) {
1926                 count = 1;
1927                 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &count, ORTE_NAME))) {
1928                     ORTE_ERROR_LOG(ret);
1929                     goto cleanup;
1930                 }
1931                 /* JJH: Update local info as needed */
1932             }
1933         }
1934     }
1935 
1936     if( ORTE_SUCCESS != (ret = global_coord_job_state_update(jobid,
1937                                                              job_ckpt_state,
1938                                                              ss_handle,
1939                                                              global_snapshot.options) ) ) {
1940         ORTE_ERROR_LOG(ret);
1941     }
1942 
1943  cleanup:
1944     if( NULL != options ) {
1945         OBJ_RELEASE(options);
1946         options = NULL;
1947     }
1948 
1949     return;
1950 }
1951 
1952 static int snapc_full_establish_snapshot_dir(bool empty_metadata)
1953 {
1954     char **value = NULL;
1955     int idx = 0;
1956 
1957     /*********************
1958      * Contact the Stable Storage Framework to setup the storage directory
1959      *********************/
1960     INC_SEQ_NUM();
1961     orte_sstore.request_checkpoint_handle(&(global_snapshot.ss_handle),
1962                                           orte_snapc_base_snapshot_seq_number,
1963                                           current_global_jobid);
1964     if( currently_migrating ) {
1965         orte_sstore.set_attr(global_snapshot.ss_handle,
1966                              SSTORE_METADATA_GLOBAL_MIGRATING,
1967                              "1");
1968     }
1969     orte_sstore.register_handle(global_snapshot.ss_handle);
1970 
1971     /*
1972      * Save the AMCA parameter used into the metadata file
1973      */
1974     if( 0 > (idx = mca_base_var_find("opal", "mca", "base", "param_file_prefix")) ) {
1975         opal_show_help("help-orte-restart.txt", "amca_param_not_found", true);
1976     }
1977     if( 0 < idx ) {
1978         mca_base_var_get_value (idx, &value, NULL, NULL);
1979 
1980         if (*value) {
1981             orte_sstore.set_attr(global_snapshot.ss_handle,
1982                                  SSTORE_METADATA_GLOBAL_AMCA_PARAM,
1983                                  *value);
1984 
1985             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1986                                  "Global) AMCA Parameter Preserved: %s",
1987                                  *value));
1988         }
1989     }
1990 
1991     /*
1992      * Save the TUNE parameter used into the metadata file
1993      */
1994     if( 0 > (idx = mca_base_var_find("opal", "mca", "base", "envar_file_prefix")) ) {
1995         opal_show_help("help-orte-restart.txt", "tune_param_not_found", true);
1996     }
1997     if( 0 < idx ) {
1998         mca_base_var_get_value (idx, &value, NULL, NULL);
1999 
2000         if (*value) {
2001             orte_sstore.set_attr(global_snapshot.ss_handle,
2002                                  SSTORE_METADATA_GLOBAL_TUNE_PARAM,
2003                                  *value);
2004 
2005             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2006                                  "Global) TUNE Parameter Preserved: %s",
2007                                  *value));
2008         }
2009     }
2010 
2011     return ORTE_SUCCESS;
2012 }
2013 
2014 static int snapc_full_global_checkpoint(opal_crs_base_ckpt_options_t *options)
2015 {
2016     int ret, exit_status = ORTE_SUCCESS;
2017 
2018     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2019                          "Global) Checkpoint of job %s has been requested\n",
2020                          ORTE_JOBID_PRINT(current_global_jobid)));
2021 
2022     /* opal_output(0, "================> JJH Checkpoint Started"); */
2023 
2024     current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_REQUEST;
2025 
2026     /*********************
2027      * Generate the global snapshot directory, and unique global snapshot handle
2028      *********************/
2029     if( ORTE_SUCCESS != (ret = snapc_full_establish_snapshot_dir(false))) {
2030         ORTE_ERROR_LOG(ret);
2031         exit_status = ret;
2032         goto cleanup;
2033     }
2034 
2035     /***********************************
2036      * Do an update handshake with the orte_checkpoint command
2037      ***********************************/
2038     updated_job_to_running = false;
2039     if( is_orte_checkpoint_connected &&
2040         ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
2041                                                                             global_snapshot.ss_handle,
2042                                                                             current_job_ckpt_state) ) ) {
2043         ORTE_ERROR_LOG(ret);
2044         exit_status = ret;
2045         goto cleanup;
2046     }
2047 
2048     /**********************
2049      * Notify the Local Snapshot Coordinators of the checkpoint request
2050      **********************/
2051     OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
2052                          "Global) Notifying the Local Coordinators\n"));
2053 
2054     if( ORTE_SUCCESS != (ret = snapc_full_global_notify_checkpoint(current_global_jobid, options)) ) {
2055         ORTE_ERROR_LOG(ret);
2056         exit_status = ret;
2057         goto cleanup;
2058     }
2059 
2060  cleanup:
2061     return exit_status;
2062 }
2063 
2064 static int  snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
2065                                                 opal_crs_base_ckpt_options_t *options)
2066 {
2067     int ret, exit_status = ORTE_SUCCESS;
2068     orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2069     opal_list_item_t* item = NULL;
2070     int ckpt_state;
2071 
2072     ckpt_state = ORTE_SNAPC_CKPT_STATE_PENDING;
2073 
2074     /*
2075      * Copy over the options
2076      */
2077     opal_crs_base_copy_options(options, global_snapshot.options);
2078 
2079     /*
2080      * Update the global structure
2081      */
2082     for(item  = opal_list_get_first(&global_snapshot.local_snapshots);
2083         item != opal_list_get_end(&global_snapshot.local_snapshots);
2084         item  = opal_list_get_next(item) ) {
2085         orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2086 
2087         orted_snapshot->state   = ckpt_state;
2088     }
2089 
2090     /*
2091      * Update the job state, and broadcast to all local daemons
2092      */
2093     if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(jobid,
2094                                                                         ckpt_state,
2095                                                                         global_snapshot.ss_handle,
2096                                                                         false, options) ) ) {
2097         ORTE_ERROR_LOG(ret);
2098         exit_status = ret;
2099         goto cleanup;
2100     }
2101 
2102  cleanup:
2103     return exit_status;
2104 }
2105 
2106 /**********************************
2107  * Job/Proc State Set/Get Routines
2108  **********************************/
2109 static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
2110                                                      int    ckpt_state,
2111                                                      orte_sstore_base_handle_t handle,
2112                                                      bool quick,
2113                                                      opal_crs_base_ckpt_options_t *options)
2114 {
2115     int ret, exit_status = ORTE_SUCCESS;
2116     orte_snapc_full_cmd_flag_t command;
2117     opal_buffer_t *buffer = NULL;
2118     char * state_str = NULL;
2119     orte_proc_t *proc = NULL;
2120     opal_list_item_t *item = NULL;
2121     size_t num_procs;
2122     orte_grpcomm_signature_t *sig;
2123 
2124     /*
2125      * Update all Local Coordinators (broadcast operation)
2126      */
2127     buffer = OBJ_NEW(opal_buffer_t);
2128 
2129     if( quick ) {
2130         command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD;
2131     } else {
2132         command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD;
2133     }
2134 
2135     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
2136         ORTE_ERROR_LOG(ret);
2137         exit_status = ret;
2138         goto cleanup;
2139     }
2140 
2141     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &jobid, 1, ORTE_JOBID))) {
2142         ORTE_ERROR_LOG(ret);
2143         exit_status = ret;
2144         goto cleanup;
2145     }
2146 
2147     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &ckpt_state, 1, OPAL_INT))) {
2148         ORTE_ERROR_LOG(ret);
2149         exit_status = ret;
2150         goto cleanup;
2151     }
2152 
2153     if( quick ) {
2154         goto process_msg;
2155     }
2156 
2157     if (ORTE_SUCCESS != (ret = orte_sstore.pack_handle(NULL, buffer, handle))) {
2158         ORTE_ERROR_LOG(ret);
2159         exit_status = ret;
2160         goto cleanup;
2161     }
2162 
2163     if(ORTE_SUCCESS != (ret = orte_snapc_base_pack_options(buffer, options))) {
2164         ORTE_ERROR_LOG(ret);
2165         exit_status = ret;
2166         goto cleanup;
2167     }
2168 
2169     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(currently_migrating), 1, OPAL_BOOL))) {
2170         ORTE_ERROR_LOG(ret);
2171         exit_status = ret;
2172         goto cleanup;
2173     }
2174 
2175     if( currently_migrating ) {
2176         num_procs = opal_list_get_size(migrating_procs);
2177 
2178         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &num_procs, 1, OPAL_SIZE))) {
2179             ORTE_ERROR_LOG(ret);
2180             exit_status = ret;
2181             goto cleanup;
2182         }
2183 
2184         for (item  = opal_list_get_first(migrating_procs);
2185              item != opal_list_get_end(migrating_procs);
2186              item  = opal_list_get_next(item)) {
2187             proc = (orte_proc_t*)item;
2188             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(proc->name), 1, ORTE_NAME))) {
2189                 ORTE_ERROR_LOG(ret);
2190                 exit_status = ret;
2191                 goto cleanup;
2192             }
2193         }
2194     }
2195 
2196  process_msg:
2197     orte_snapc_ckpt_state_str(&state_str, ckpt_state);
2198     OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
2199                          "Global) Notify Local Coordinators of job %s state change to %d (%s)\n",
2200                          ORTE_JOBID_PRINT(jobid), (int)ckpt_state, state_str ));
2201     free(state_str);
2202     state_str = NULL;
2203 
2204     /* goes to all daemons */
2205     sig = OBJ_NEW(orte_grpcomm_signature_t);
2206     sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
2207     sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
2208     sig->signature[0].vpid = ORTE_VPID_WILDCARD;
2209     if (ORTE_SUCCESS != (ret = orte_grpcomm.xcast(sig, ORTE_RML_TAG_SNAPC_FULL, buffer))) {
2210         ORTE_ERROR_LOG(ret);
2211         exit_status = ret;
2212         goto cleanup;
2213     }
2214 
2215     /*
2216      * We will also receive the job update, and process in the RML callback
2217      */
2218 
2219  cleanup:
2220     if( NULL != state_str ) {
2221         free(state_str);
2222         state_str = NULL;
2223     }
2224 
2225     OBJ_RELEASE(buffer);
2226     OBJ_RELEASE(sig);
2227 
2228     return exit_status;
2229 }
2230 
2231 int global_coord_job_state_update(orte_jobid_t jobid,
2232                                   int    job_ckpt_state,
2233                                   orte_sstore_base_handle_t ss_handle,
2234                                   opal_crs_base_ckpt_options_t *options)
2235 {
2236     int ret, exit_status = ORTE_SUCCESS;
2237     char * state_str = NULL;
2238 
2239     orte_snapc_ckpt_state_str(&state_str, job_ckpt_state);
2240     OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
2241                          "Global) Job update command: jobid %s -> state %d (%s)\n",
2242                          ORTE_JOBID_PRINT(jobid), (int)job_ckpt_state, state_str ));
2243     free(state_str);
2244     state_str = NULL;
2245 
2246     /************************
2247      * Update the orte_checkpoint command
2248      ************************/
2249     current_job_ckpt_state = job_ckpt_state;
2250     if( is_orte_checkpoint_connected &&
2251         ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
2252                                                                             global_snapshot.ss_handle,
2253                                                                             current_job_ckpt_state)) ) {
2254         ORTE_ERROR_LOG(ret);
2255         exit_status = ret;
2256         goto cleanup;
2257     }
2258 
2259     /*
2260      * Global Coordinator: If also a Local coordinator then act locally before globally
2261      */
2262     if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
2263         if( ORTE_SUCCESS != (ret = local_coord_job_state_update(jobid,
2264                                                                 job_ckpt_state,
2265                                                                 ss_handle,
2266                                                                 options)) ) {
2267             ORTE_ERROR_LOG(ret);
2268             exit_status = ret;
2269             goto cleanup;
2270         }
2271     }
2272 
2273     /*
2274      * Process the cmd
2275      */
2276     if(ORTE_SNAPC_CKPT_STATE_ESTABLISHED == job_ckpt_state ) {
2277         /*
2278          * If the processes recovered before the checkpoint was established,
2279          * then we need to cleanup here instead of in the recovery block
2280          */
2281         if( cleanup_on_establish ) {
2282             if( ORTE_SUCCESS != (ret = orte_snapc_full_global_reset_coord()) ) {
2283                 ORTE_ERROR_LOG(ret);
2284                 exit_status = ret;
2285                 goto cleanup;
2286             }
2287         }
2288     }
2289     else if(ORTE_SNAPC_CKPT_STATE_ERROR     == job_ckpt_state ) {
2290         opal_output(mca_snapc_full_component.super.output_handle,
2291                     "Error: Checkpoint failed!");
2292     }
2293     /*
2294      * This should not happen, since this state is always handled locally
2295      */
2296     else if(ORTE_SNAPC_CKPT_STATE_STOPPED == job_ckpt_state ) {
2297         ;
2298     }
2299     /*
2300      * This should not happen, since we do not handle this case
2301      */
2302     else if(ORTE_SNAPC_CKPT_STATE_REQUEST == job_ckpt_state ) {
2303         opal_output(mca_snapc_full_component.super.output_handle,
2304                     "ERROR: Internal Checkpoint request not implemented.");
2305         ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
2306     }
2307 
2308  cleanup:
2309     if( NULL != state_str) {
2310         free(state_str);
2311         state_str = NULL;
2312     }
2313 
2314     return exit_status;
2315 }
2316 
2317 static int write_out_global_metadata(void)
2318 {
2319     orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2320     opal_list_item_t* orted_item = NULL;
2321 
2322     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
2323                          "Global) Updating Metadata"));
2324 
2325     /*
2326      * Check for an error
2327      * JJH CLEANUP: Check might be good, but mostly unnecessary
2328      * JJH: Do we want to pass this along to the SStore? Probably
2329      */
2330     for(orted_item  = opal_list_get_first(&(global_snapshot.local_snapshots));
2331         orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
2332         orted_item  = opal_list_get_next(orted_item) ) {
2333         orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
2334 
2335         if( ORTE_SNAPC_CKPT_STATE_ERROR == orted_snapshot->state ) {
2336             return ORTE_ERROR;
2337         }
2338     }
2339 
2340     /*
2341      * Sync the stable storage
2342      */
2343     orte_sstore.sync(global_snapshot.ss_handle);
2344 
2345     SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_SS_SYNC);
2346 
2347     return ORTE_SUCCESS;
2348 }
2349 
2350 static orte_snapc_full_orted_snapshot_t *find_orted_snapshot(orte_process_name_t *name )
2351 {
2352     int ret;
2353 
2354     orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2355     opal_list_item_t* item = NULL;
2356     orte_ns_cmp_bitmask_t mask;
2357 
2358     for(item  = opal_list_get_first(&(global_snapshot.local_snapshots));
2359         item != opal_list_get_end(&(global_snapshot.local_snapshots));
2360         item  = opal_list_get_next(item) ) {
2361         orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2362 
2363         mask = ORTE_NS_CMP_ALL;
2364 
2365         if (OPAL_EQUAL ==
2366                 orte_util_compare_name_fields(mask, name, &orted_snapshot->process_name)) {
2367             return orted_snapshot;
2368         }
2369     }
2370 
2371     /*
2372      * Refresh the job structure, and try again
2373      */
2374     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
2375                          "Global) find_orted(%s) failed. Refreshing and trying again...",
2376                          ORTE_NAME_PRINT(name) ));
2377 
2378     if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
2379         ORTE_ERROR_LOG(ret);
2380         return NULL;
2381     }
2382 
2383     for(item  = opal_list_get_first(&(global_snapshot.local_snapshots));
2384         item != opal_list_get_end(&(global_snapshot.local_snapshots));
2385         item  = opal_list_get_next(item) ) {
2386         orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2387 
2388         mask = ORTE_NS_CMP_ALL;
2389 
2390         if (OPAL_EQUAL ==
2391                 orte_util_compare_name_fields(mask, name, &orted_snapshot->process_name)) {
2392             return orted_snapshot;
2393         }
2394     }
2395 
2396     return NULL;
2397 }
2398 
2399 static int snapc_full_global_get_min_state(void)
2400 {
2401     int min_state = ORTE_SNAPC_CKPT_MAX;
2402     orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2403     opal_list_item_t* item = NULL;
2404     char * state_str_a = NULL;
2405     char * state_str_b = NULL;
2406 
2407     current_total_orteds = 0;
2408 
2409     for(item  = opal_list_get_first(&(global_snapshot.local_snapshots));
2410         item != opal_list_get_end(&(global_snapshot.local_snapshots));
2411         item  = opal_list_get_next(item) ) {
2412         orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2413 
2414         /* Ignore orteds with no processes */
2415         if( 0 >= opal_list_get_size(&(orted_snapshot->super.local_snapshots)) ) {
2416             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2417                                  "Global) ... %s Skipping - (no children)",
2418                                  ORTE_NAME_PRINT(&orted_snapshot->process_name) ));
2419             continue;
2420         }
2421 
2422         current_total_orteds++;
2423 
2424         if( NULL != state_str_a ) {
2425             free(state_str_a);
2426             state_str_a = NULL;
2427         }
2428         if( NULL != state_str_b ) {
2429             free(state_str_b);
2430             state_str_b = NULL;
2431         }
2432 
2433         orte_snapc_ckpt_state_str(&state_str_a, orted_snapshot->state);
2434         orte_snapc_ckpt_state_str(&state_str_b, min_state);
2435 
2436         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2437                              "Global) ... %s Checking [%d %s] vs [%d %s]",
2438                              ORTE_NAME_PRINT(&orted_snapshot->process_name),
2439                              (int)orted_snapshot->state, state_str_a,
2440                              min_state, state_str_b ));
2441 
2442         if( (int)min_state > (int)orted_snapshot->state ) {
2443             min_state = orted_snapshot->state;
2444 
2445             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2446                                  "Global) ... %s Update  --> Min State [%d %s]",
2447                                  ORTE_NAME_PRINT(&orted_snapshot->process_name),
2448                                  (int)min_state, state_str_a ));
2449         }
2450     }
2451 
2452     if( NULL != state_str_b ) {
2453         free(state_str_b);
2454         state_str_b = NULL;
2455     }
2456     orte_snapc_ckpt_state_str(&state_str_b, min_state);
2457     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2458                          "Global) ... Min State [%d %s]",
2459                          (int)min_state, state_str_b ));
2460 
2461     if( NULL != state_str_a ) {
2462         free(state_str_a);
2463         state_str_a = NULL;
2464     }
2465     if( NULL != state_str_b ) {
2466         free(state_str_b);
2467         state_str_b = NULL;
2468     }
2469 
2470     return min_state;
2471 }
2472 
2473 static int orte_snapc_full_global_reset_coord(void)
2474 {
2475     int ret, exit_status = ORTE_SUCCESS;
2476     opal_list_item_t* item = NULL;
2477     opal_list_item_t* aitem = NULL;
2478     orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2479     orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
2480 
2481 
2482     /********************************
2483      * Terminate the job if requested
2484      * At this point the application should have already exited, but do this
2485      * just to make doubly sure that the job is terminated.
2486      *********************************/
2487     if( global_snapshot.options->term ) {
2488         SNAPC_FULL_DISPLAY_ALL_TIMERS();
2489         orte_plm.terminate_job(current_global_jobid);
2490     } else {
2491         SNAPC_FULL_DISPLAY_ALL_TIMERS();
2492     }
2493 
2494     /*
2495      * Just cleanup, do not need to send out another message
2496      */
2497     opal_crs_base_clear_options(global_snapshot.options);
2498 
2499     /*
2500      * Reset global data structures
2501      */
2502     for(item  = opal_list_get_first(&(global_snapshot.local_snapshots));
2503         item != opal_list_get_end(&(global_snapshot.local_snapshots));
2504         item  = opal_list_get_next(item) ) {
2505         orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2506 
2507         orted_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
2508 
2509         for(aitem  = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
2510             aitem != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
2511             aitem  = opal_list_get_next(aitem) ) {
2512             app_snapshot = (orte_snapc_base_local_snapshot_t*)aitem;
2513 
2514             app_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
2515         }
2516     }
2517 
2518     /************************
2519      * Set up the Command Line listener again
2520      *************************/
2521     is_orte_checkpoint_connected = false;
2522     if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener() ) ){
2523         ORTE_ERROR_LOG(ret);
2524         exit_status = ret;
2525     }
2526 
2527     current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
2528     cleanup_on_establish = false;
2529 
2530     report_progress_cur_loc_finished = 0;
2531     report_progress_last_reported_loc_finished = 0;
2532 
2533     return exit_status;
2534 }
2535 
2536 /************************
2537  * Timing
2538  ************************/
2539 static void snapc_full_set_time(int idx)
2540 {
2541     if(idx < SNAPC_FULL_TIMER_MAX ) {
2542         if( timer_start[idx] <= 0.0 ) {
2543             timer_start[idx] = snapc_full_get_time();
2544         }
2545     }
2546 }
2547 
2548 static void snapc_full_display_all_timers(void)
2549 {
2550     double diff = 0.0;
2551     char * label = NULL;
2552 
2553     opal_output(0, "Snapshot Coordination Timing: ******************** Summary Begin\n");
2554 
2555     /********** Startup time **********/
2556     label = strdup("Running");
2557     diff = timer_start[SNAPC_FULL_TIMER_RUNNING]   - timer_start[SNAPC_FULL_TIMER_START];
2558     snapc_full_display_indv_timer_core(diff, label);
2559     free(label);
2560 
2561     /********** Time to finish locally **********/
2562     label = strdup("Finish Locally");
2563     diff = timer_start[SNAPC_FULL_TIMER_FIN_LOCAL] - timer_start[SNAPC_FULL_TIMER_RUNNING];
2564     snapc_full_display_indv_timer_core(diff, label);
2565     free(label);
2566 
2567     if( timer_start[SNAPC_FULL_TIMER_SS_SYNC] <= timer_start[SNAPC_FULL_TIMER_RECOVERED] ) {
2568         /********** SStore Sync **********/
2569         label = strdup("SStore Sync");
2570         diff = timer_start[SNAPC_FULL_TIMER_SS_SYNC]   - timer_start[SNAPC_FULL_TIMER_FIN_LOCAL];
2571         snapc_full_display_indv_timer_core(diff, label);
2572         free(label);
2573 
2574         /********** Establish Ckpt **********/
2575         label = strdup("Establish");
2576         diff = timer_start[SNAPC_FULL_TIMER_ESTABLISH]   - timer_start[SNAPC_FULL_TIMER_SS_SYNC];
2577         snapc_full_display_indv_timer_core(diff, label);
2578         free(label);
2579 
2580         /********** Recover **********/
2581         label = strdup("Continue/Recover");
2582         diff = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_ESTABLISH];
2583         snapc_full_display_indv_timer_core(diff, label);
2584         free(label);
2585     } else { /* Established after procs recovered */
2586         /********** SStore Sync **********/
2587         label = strdup("SStore Sync*");
2588         diff = timer_start[SNAPC_FULL_TIMER_SS_SYNC]   - timer_start[SNAPC_FULL_TIMER_RECOVERED];
2589         snapc_full_display_indv_timer_core(diff, label);
2590         free(label);
2591 
2592         /********** Establish Ckpt **********/
2593         label = strdup("Establish*");
2594         diff = timer_start[SNAPC_FULL_TIMER_ESTABLISH]   - timer_start[SNAPC_FULL_TIMER_SS_SYNC];
2595         snapc_full_display_indv_timer_core(diff, label);
2596         free(label);
2597 
2598         /********** Recover **********/
2599         label = strdup("Continue/Recover*");
2600         diff = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_FIN_LOCAL];
2601         snapc_full_display_indv_timer_core(diff, label);
2602         free(label);
2603     }
2604 
2605     opal_output(0, "Snapshot Coordination Timing: ******************** Summary End\n");
2606 }
2607 
2608 static void snapc_full_display_recovered_timers(void)
2609 {
2610     double diff = 0.0;
2611     char * label = NULL;
2612 
2613     opal_output(0, "Snapshot Coordination Timing: ******************** Summary Begin\n");
2614 
2615     /********** Recover **********/
2616     label = strdup("Recover");
2617     diff = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_START];
2618     snapc_full_display_indv_timer_core(diff, label);
2619     free(label);
2620 
2621     opal_output(0, "Snapshot Coordination Timing: ******************** Summary End\n");
2622 }
2623 
2624 static void snapc_full_clear_timers(void)
2625 {
2626     int i;
2627     for(i = 0; i < SNAPC_FULL_TIMER_MAX; ++i) {
2628         timer_start[i] = 0.0;
2629     }
2630 }
2631 
/*
 * Current wall-clock time in seconds, as a double.
 *
 * Uses the OPAL microsecond timer when OPAL_TIMER_USEC_NATIVE is set.
 * Otherwise it falls back to gettimeofday(), combining seconds and
 * microseconds.
 */
static double snapc_full_get_time(void)
{
#if OPAL_TIMER_USEC_NATIVE
    return (double)opal_timer_base_get_usec() / 1000000.0;
#else
    struct timeval now;

    gettimeofday(&now, NULL);
    return (double)now.tv_sec + (double)now.tv_usec / 1000000.0;
#endif
}
2647 
2648 static void snapc_full_display_indv_timer_core(double diff, char *str)
2649 {
2650     double total = 0;
2651     double perc  = 0;
2652 
2653     if( timer_start[SNAPC_FULL_TIMER_SS_SYNC] <= timer_start[SNAPC_FULL_TIMER_RECOVERED] ) {
2654         total = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_START];
2655     } else {
2656         total = timer_start[SNAPC_FULL_TIMER_ESTABLISH] - timer_start[SNAPC_FULL_TIMER_START];
2657     }
2658     perc = (diff/total) * 100;
2659 
2660     opal_output(0,
2661                 "snapc_full: timing: %-20s = %10.2f s\t%10.2f s\t%6.2f\n",
2662                 str,
2663                 diff,
2664                 total,
2665                 perc);
2666     return;
2667 }
2668 
2669 static void snapc_full_report_progress(orte_snapc_full_orted_snapshot_t *orted_snapshot, int total, int min_state)
2670 {
2671     orte_snapc_full_orted_snapshot_t *loc_orted_snapshot = NULL;
2672     opal_list_item_t* item = NULL;
2673     double perc_done;
2674 
2675     if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL != orted_snapshot->state ) {
2676         return;
2677     }
2678 
2679     report_progress_cur_loc_finished++;
2680     perc_done = (total-report_progress_cur_loc_finished)/(total*1.0);
2681     perc_done = (perc_done-1)*(-100.0);
2682 
2683     if( perc_done >= (report_progress_last_reported_loc_finished + orte_snapc_full_progress_meter) ||
2684         report_progress_last_reported_loc_finished == 0.0 ) {
2685         report_progress_last_reported_loc_finished = perc_done;
2686         opal_output(0, "snapc_full: progress:   %10.2f %c Locally Finished\n",
2687                     perc_done, '%');
2688     }
2689 
2690     if( perc_done > 95.0 ) {
2691         opal_output(0, "snapc_full: progress:   Waiting on the following daemons (%10.2f %c):", perc_done, '%');
2692 
2693         for(item  = opal_list_get_first(&(global_snapshot.local_snapshots));
2694             item != opal_list_get_end(&(global_snapshot.local_snapshots));
2695             item  = opal_list_get_next(item) ) {
2696             loc_orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2697 
2698             if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL != loc_orted_snapshot->state ) {
2699                 opal_output(0, "snapc_full: progress:        Daemon %s",
2700                             ORTE_NAME_PRINT(&loc_orted_snapshot->process_name));
2701             }
2702         }
2703     }
2704 
2705     return;
2706 }

/* [<][>][^][v][top][bottom][index][help] */