root/orte/mca/sstore/stage/sstore_stage_global.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_sstore_stage_global_snapshot_info_construct
  2. orte_sstore_stage_global_snapshot_info_destruct
  3. orte_sstore_stage_global_module_init
  4. orte_sstore_stage_global_module_finalize
  5. orte_sstore_stage_global_request_checkpoint_handle
  6. orte_sstore_stage_global_request_global_snapshot_data
  7. orte_sstore_stage_global_register
  8. orte_sstore_stage_global_get_attr
  9. orte_sstore_stage_global_set_attr
  10. orte_sstore_stage_global_sync
  11. sync_global_dir
  12. orte_sstore_stage_global_remove
  13. orte_sstore_stage_global_pack
  14. orte_sstore_stage_global_unpack
  15. create_new_handle_info
  16. find_handle_info
  17. sstore_stage_global_start_listener
  18. sstore_stage_global_stop_listener
  19. sstore_stage_global_recv
  20. process_local_pull
  21. process_local_push
  22. process_local_done
  23. init_global_snapshot_directory
  24. wait_all_filem
  25. xcast_remove_all
  26. metadata_open
  27. metadata_close
  28. metadata_write_int
  29. metadata_write_str
  30. metadata_write_timestamp
  31. orte_sstore_stage_extract_global_metadata
  32. stage_snapshot_sort_compare_fn
  33. sstore_stage_report_progress

   1 /*
   2  * Copyright (c)      2011 The Trustees of Indiana University.
   3  *                         All rights reserved.
   4  * Copyright (c) 2004-2011 The University of Tennessee and The University
   5  *                         of Tennessee Research Foundation.  All rights
   6  *                         reserved.
   7  * Copyright (c) 2018      Intel, Inc.  All rights reserved.
   8  * $COPYRIGHT$
   9  *
  10  * Additional copyrights may follow
  11  *
  12  * $HEADER$
  13  */
  14 
  15 /*
  16  *
  17  */
  18 
  19 #include "orte_config.h"
  20 
  21 #include <string.h>
  22 #include <stdlib.h>
  23 #include <sys/types.h>
  24 #include <sys/stat.h>
  25 #include <sys/wait.h>
  26 #ifdef HAVE_UNISTD_H
  27 #include <unistd.h>
  28 #endif  /* HAVE_UNISTD_H */
  29 
  30 #include "orte/mca/mca.h"
  31 #include "opal/mca/base/base.h"
  32 
  33 #include "opal/mca/event/event.h"
  34 
  35 #include "orte/constants.h"
  36 #include "opal/util/argv.h"
  37 #include "opal/util/output.h"
  38 #include "opal/util/show_help.h"
  39 #include "opal/util/opal_environ.h"
  40 #include "opal/util/basename.h"
  41 #include "opal/util/os_dirpath.h"
  42 #include "opal/util/opal_getcwd.h"
  43 
  44 #include "opal/threads/mutex.h"
  45 #include "opal/threads/condition.h"
  46 
  47 #include "orte/util/show_help.h"
  48 #include "orte/util/name_fns.h"
  49 #include "orte/util/proc_info.h"
  50 #include "orte/runtime/orte_globals.h"
  51 #include "orte/runtime/orte_wait.h"
  52 
  53 #include "orte/mca/errmgr/errmgr.h"
  54 #include "orte/mca/errmgr/base/base.h"
  55 #include "orte/mca/errmgr/base/errmgr_private.h"
  56 #include "orte/mca/ess/ess.h"
  57 #include "orte/mca/rml/rml.h"
  58 #include "orte/mca/rml/rml_types.h"
  59 #include "orte/mca/filem/filem.h"
  60 #include "orte/mca/grpcomm/grpcomm.h"
  61 #include "orte/mca/snapc/snapc.h"
  62 #include "orte/mca/snapc/base/base.h"
  63 
  64 #include "orte/mca/sstore/sstore.h"
  65 #include "orte/mca/sstore/base/base.h"
  66 
  67 #include "sstore_stage.h"
  68 
  69 #define SSTORE_HANDLE_TYPE_NONE    0  /* handle type not yet assigned (constructor/destructor value) */
  70 #define SSTORE_HANDLE_TYPE_CKPT    1  /* passed to create_new_handle_info() for a checkpoint handle request */
  71 #define SSTORE_HANDLE_TYPE_RESTART 2  /* presumably a restart handle -- not referenced in this part of the file, TODO confirm */
  72 
  73 #define SSTORE_GLOBAL_NONE    0  /* handle state set by the constructor and destructor */
  74 #define SSTORE_GLOBAL_ERROR   1  /* presumably an error state -- not referenced in this part of the file, TODO confirm */
  75 #define SSTORE_GLOBAL_INIT    2  /* presumably set once a handle is initialized -- TODO confirm against create_new_handle_info() */
  76 #define SSTORE_GLOBAL_REG     3  /* set by orte_sstore_stage_global_register() */
  77 #define SSTORE_GLOBAL_SYNCING 4  /* set on entry to orte_sstore_stage_global_sync() */
  78 #define SSTORE_GLOBAL_SYNCED  5  /* set when orte_sstore_stage_global_sync() completes successfully */
  79 
  80 /**********
  81  * Object Stuff
  82  **********/
  83 struct  orte_sstore_stage_global_snapshot_info_t {
  84     /** List super object (instances are stored on an opal_list_t) */
  85     opal_list_item_t super;
  86 
  87     /** Handle ID; assigned from next_handle_id by the constructor */
  88     orte_sstore_base_handle_t id;
  89 
  90     /** Job ID (ORTE_JOBID_INVALID until assigned) */
  91     orte_jobid_t jobid;
  92 
  93     /** State: one of the SSTORE_GLOBAL_* values */
  94     int state;
  95 
  96     /** Handle type: one of the SSTORE_HANDLE_TYPE_* values */
  97     int handle_type;
  98 
  99     /** Sequence Number (-1 until assigned) */
 100     int seq_num;
 101 
 102     /** Reference Name (heap string, freed by the destructor) */
 103     char * ref_name;
 104 
 105     /** Local Location (Relative Path to base_location) */
 106     char * local_location;
 107 
 108     /** Application location format (Global) */
 109     char * app_global_location_fmt;
 110 
 111     /** Application location format (Local) */
 112     char * app_local_location_fmt;
 113 
 114     /** Application location format (Local cache); only packed when caching is enabled */
 115     char * app_local_cache_location_fmt;
 116 
 117     /** Base location; strdup() of orte_sstore_base_global_snapshot_dir at construction */
 118     char * base_location;
 119 
 120     /** Metadata File Name */
 121     char *metadata_filename;
 122 
 123     /** Metadata file stream (a FILE*, not a descriptor); fclose()d by the destructor if still open */
 124     FILE *metadata;
 125 
 126     /** Num procs reported as locally synced */
 127     int num_procs_synced;
 128 
 129     /** Num procs reported as done */
 130     int num_procs_done;
 131 
 132     /** Num procs total in job */
 133     int num_procs_total;
 134 
 135     /** List of FileM Requests to wait upon (allocated by the constructor, released by the destructor) */
 136     opal_list_t *filem_requests;
 137 
 138     /** Is this checkpoint representing a migration? */
 139     bool migrating;
 140 
 141     /** JJH: Assume all processes are compressed the same way */
 142     char * compress_comp;
 143     char * compress_postfix;
 144 
 145     /** Progress Meter: value recorded at the last progress report (0.0 initially) */
 146     double last_progress_report;
 147 };
 148 typedef struct orte_sstore_stage_global_snapshot_info_t orte_sstore_stage_global_snapshot_info_t;
 149 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_global_snapshot_info_t);
 150 
 151 void orte_sstore_stage_global_snapshot_info_construct(orte_sstore_stage_global_snapshot_info_t *info);
 152 void orte_sstore_stage_global_snapshot_info_destruct( orte_sstore_stage_global_snapshot_info_t *info);
 153 
 154 OBJ_CLASS_INSTANCE(orte_sstore_stage_global_snapshot_info_t,
 155                    opal_list_item_t,
 156                    orte_sstore_stage_global_snapshot_info_construct,
 157                    orte_sstore_stage_global_snapshot_info_destruct);
 158 
 159 
 160 /**********
 161  * Local Function and Variable Declarations
 162  **********/
 163 static bool is_global_listener_active = false;  /* presumably tracks whether the listener below is posted -- TODO confirm in start/stop_listener */
 164 static int sstore_stage_global_start_listener(void);
 165 static int sstore_stage_global_stop_listener(void);
 166 static void sstore_stage_global_recv(int status,
 167                                        orte_process_name_t* sender,
 168                                        opal_buffer_t* buffer,
 169                                        orte_rml_tag_t tag,
 170                                        void* cbdata);
 171 
 172 static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
 173 static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
 174 static int process_local_done(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
 175 static int xcast_remove_all(orte_sstore_stage_global_snapshot_info_t *handle_info);
 176 
 177 static orte_sstore_stage_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid);
 178 static orte_sstore_stage_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
 179 
 180 static int metadata_open(orte_sstore_stage_global_snapshot_info_t * handle_info);
 181 static int metadata_close(orte_sstore_stage_global_snapshot_info_t * handle_info);
 182 static int metadata_write_int(orte_sstore_stage_global_snapshot_info_t * handle_info, char * key, int value);
 183 static int metadata_write_str(orte_sstore_stage_global_snapshot_info_t * handle_info, char * key, char *value);
 184 static int metadata_write_timestamp(orte_sstore_stage_global_snapshot_info_t * handle_info);
 185 
 186 static int init_global_snapshot_directory(orte_sstore_stage_global_snapshot_info_t *handle_info);
 187 static int stage_snapshot_sort_compare_fn(opal_list_item_t **a,
 188                                           opal_list_item_t **b);
 189 static int orte_sstore_stage_extract_global_metadata(orte_sstore_stage_global_snapshot_info_t * handle_info,
 190                                                      orte_sstore_base_global_snapshot_info_t *global_snapshot);
 191 
 192 static int wait_all_filem(orte_sstore_stage_global_snapshot_info_t *handle_info);
 193 static void sync_global_dir(orte_sstore_stage_global_snapshot_info_t *handle_info);
 194 
 195 static int next_handle_id = 1;  /* next ID handed out by the snapshot-info constructor (post-incremented there) */
 196 static opal_list_t *active_handles = NULL;  /* allocated in module_init(), released in module_finalize() */
 197 
 198 /*
 199  * Progress: the macro below calls sstore_stage_report_progress() only when orte_sstore_stage_progress_meter > 0
 200  */
 201 static void sstore_stage_report_progress(orte_sstore_stage_global_snapshot_info_t *handle_info);
 202 
 203 #define SSTORE_STAGE_REPORT_PROGRESS(handle_info)                       \
 204     {                                                                   \
 205         if(OPAL_UNLIKELY(orte_sstore_stage_progress_meter > 0)) {       \
 206             sstore_stage_report_progress(handle_info);                  \
 207         }                                                               \
 208     }
 209 
 210 /**********
 211  * Object stuff
 212  **********/
 213 void orte_sstore_stage_global_snapshot_info_construct(orte_sstore_stage_global_snapshot_info_t *info)
 214 {
 215     info->id      = next_handle_id;
 216     next_handle_id++;
 217 
 218     info->jobid = ORTE_JOBID_INVALID;
 219 
 220     info->state = SSTORE_GLOBAL_NONE;
 221 
 222     info->handle_type = SSTORE_HANDLE_TYPE_NONE;
 223 
 224     info->seq_num = -1;
 225 
 226     info->base_location  = strdup(orte_sstore_base_global_snapshot_dir);
 227 
 228     info->ref_name       = NULL;
 229     info->local_location = NULL;
 230     info->app_global_location_fmt = NULL;
 231     info->app_local_location_fmt = NULL;
 232     info->app_local_cache_location_fmt = NULL;
 233 
 234     info->metadata_filename = NULL;
 235     info->metadata = NULL;
 236 
 237     info->filem_requests = OBJ_NEW(opal_list_t);
 238 
 239     info->num_procs_synced = 0;
 240     info->num_procs_done   = 0;
 241     info->num_procs_total  = 0;
 242 
 243     info->migrating = false;
 244 
 245     info->compress_comp    = NULL;
 246     info->compress_postfix = NULL;
 247 
 248     info->last_progress_report = 0.0;
 249 }
 250 
 251 void orte_sstore_stage_global_snapshot_info_destruct( orte_sstore_stage_global_snapshot_info_t *info)
 252 {
 253     info->id      = 0;
 254     info->seq_num = -1;
 255 
 256     info->jobid = ORTE_JOBID_INVALID;
 257 
 258     info->state = SSTORE_GLOBAL_NONE;
 259 
 260     info->handle_type = SSTORE_HANDLE_TYPE_NONE;
 261 
 262     if( NULL != info->ref_name ) {
 263         free( info->ref_name );
 264         info->ref_name  = NULL;
 265     }
 266 
 267     if( NULL != info->local_location ) {
 268         free( info->local_location );
 269         info->local_location = NULL;
 270     }
 271 
 272     if( NULL != info->app_global_location_fmt ) {
 273         free( info->app_global_location_fmt );
 274         info->app_global_location_fmt = NULL;
 275     }
 276 
 277     if( NULL != info->app_local_location_fmt ) {
 278         free( info->app_local_location_fmt );
 279         info->app_local_location_fmt = NULL;
 280     }
 281 
 282     if( NULL != info->app_local_cache_location_fmt ) {
 283         free( info->app_local_cache_location_fmt );
 284         info->app_local_cache_location_fmt = NULL;
 285     }
 286 
 287     if( NULL != info->base_location ) {
 288         free( info->base_location );
 289         info->base_location = NULL;
 290     }
 291 
 292     if( NULL != info->metadata_filename ) {
 293         free( info->metadata_filename ) ;
 294         info->metadata_filename = NULL;
 295     }
 296 
 297     if( NULL != info->metadata ) {
 298         fclose(info->metadata);
 299         info->metadata = NULL;
 300     }
 301 
 302     if( NULL != info->filem_requests ) {
 303         OBJ_RELEASE(info->filem_requests);
 304         info->filem_requests = NULL;
 305     }
 306 
 307     info->num_procs_synced = 0;
 308     info->num_procs_done   = 0;
 309     info->num_procs_total  = 0;
 310 
 311     info->migrating = false;
 312 
 313     if( NULL != info->compress_comp ) {
 314         free(info->compress_comp);
 315         info->compress_comp = NULL;
 316     }
 317 
 318     if( NULL != info->compress_postfix ) {
 319         free(info->compress_postfix);
 320         info->compress_postfix = NULL;
 321     }
 322 
 323     info->last_progress_report = 0.0;
 324 }
 325 
 326 /******************
 327  * Local functions
 328  ******************/
 329 int orte_sstore_stage_global_module_init(void)
 330 {
 331     int ret, exit_status = ORTE_SUCCESS;
 332 
 333     if( NULL == active_handles ) {
 334         active_handles = OBJ_NEW(opal_list_t);
 335     }
 336 
 337     /*
 338      * If user has not enabled recovery, but enabled Caching  then caching does
 339      * not benefit the job. Continue using it, but warn the user.
 340      */
 341     if( orte_sstore_stage_enabled_caching && !orte_enable_recovery ) {
 342         opal_show_help("help-orte-sstore-stage.txt", "caching_no_recovery", true);
 343     }
 344 
 345     /*
 346      * Setup a listener for the HNP/Apps
 347      */
 348     if( ORTE_SUCCESS != (ret = sstore_stage_global_start_listener()) ) {
 349         ORTE_ERROR_LOG(ret);
 350         exit_status = ret;
 351         goto cleanup;
 352     }
 353 
 354     exit_status = orte_sstore_stage_local_module_init();
 355 
 356  cleanup:
 357     return exit_status;
 358 }
 359 
 360 int orte_sstore_stage_global_module_finalize(void)
 361 {
 362     int ret, exit_status = ORTE_SUCCESS;
 363     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 364     opal_list_item_t* item = NULL;
 365     bool done = false;
 366     int cur_time = 0, max_time = 120;
 367 
 368     /*
 369      * Wait for all active transfers to finish
 370      */
 371     done = false;
 372     while( 0 < opal_list_get_size(active_handles) && !done ) {
 373         done = true;
 374         for(item  = opal_list_get_first(active_handles);
 375             item != opal_list_get_end(active_handles);
 376             item  = opal_list_get_next(item) ) {
 377             handle_info = (orte_sstore_stage_global_snapshot_info_t*)item;
 378             if( SSTORE_GLOBAL_SYNCED != handle_info->state &&
 379                 SSTORE_GLOBAL_NONE   != handle_info->state ) {
 380                 done = false;
 381                 break;
 382             }
 383         }
 384         if( done ) {
 385             break;
 386         }
 387         else {
 388             if( cur_time != 0 && cur_time % 30 == 0 ) {
 389                 opal_output(0, "---> Waiting for sync(): %3d / %3d\n",
 390                             cur_time, max_time);
 391             }
 392 
 393             opal_progress();
 394             if( cur_time >= max_time ) {
 395                 break;
 396             } else {
 397                 sleep(1);
 398             }
 399             cur_time++;
 400         }
 401     }
 402 
 403     exit_status = orte_sstore_stage_local_module_finalize();
 404 
 405     if( NULL != active_handles ) {
 406         OBJ_RELEASE(active_handles);
 407     }
 408 
 409     /*
 410      * Shutdown the listener for the HNP/Apps
 411      */
 412     if( ORTE_SUCCESS != (ret = sstore_stage_global_stop_listener()) ) {
 413         ORTE_ERROR_LOG(ret);
 414         exit_status = ret;
 415         goto cleanup;
 416     }
 417 
 418  cleanup:
 419     return exit_status;
 420 }
 421 
 422 int orte_sstore_stage_global_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
 423 {
 424     int ret, exit_status = ORTE_SUCCESS;
 425     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 426 
 427     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 428                          "sstore:stage:(global): request_checkpoint_handle()"));
 429 
 430     /*
 431      * Construct a handle
 432      *  - Associate all of the necessary information
 433      */
 434     handle_info = create_new_handle_info(seq, SSTORE_HANDLE_TYPE_CKPT, jobid);
 435 
 436     /*
 437      * Create the global checkpoint directory
 438      */
 439     if( ORTE_SUCCESS != (ret = init_global_snapshot_directory(handle_info)) ) {
 440         ORTE_ERROR_LOG(ret);
 441         exit_status = ret;
 442         goto cleanup;
 443     }
 444 
 445     /*
 446      * Return the handle
 447      */
 448     *handle = handle_info->id;
 449 
 450  cleanup:
 451     return exit_status;
 452 }
 453 
 454 int orte_sstore_stage_global_request_global_snapshot_data(orte_sstore_base_handle_t *handle,
 455                                                           orte_sstore_base_global_snapshot_info_t *snapshot)
 456 {
 457     int ret, exit_status = ORTE_SUCCESS;
 458     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 459 
 460     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 461                          "sstore:stage:(global): request_global_snapshot_data()"));
 462 
 463     /*
 464      * Lookup the handle (if NULL, use last stable)
 465      */
 466     if( NULL != handle ) {
 467         handle_info = find_handle_info(*handle);
 468         snapshot->ss_handle = *handle;
 469     } else {
 470         handle_info = find_handle_info(orte_sstore_handle_last_stable);
 471         snapshot->ss_handle = orte_sstore_handle_last_stable;
 472     }
 473 
 474     /*
 475      * Construct the snapshot from local data, and metadata file
 476      */
 477     snapshot->seq_num   = handle_info->seq_num;
 478     snapshot->reference = strdup(handle_info->ref_name);
 479     snapshot->basedir   = strdup(handle_info->base_location);
 480     snapshot->metadata_filename = strdup(handle_info->metadata_filename);
 481 
 482     /* If this is the current checkpoint, pull data from local cache */
 483     if( orte_sstore_handle_current == snapshot->ss_handle ) {
 484         if( ORTE_SUCCESS != (ret = orte_sstore_stage_extract_global_metadata(handle_info, snapshot)) ) {
 485             ORTE_ERROR_LOG(ret);
 486             exit_status = ret;
 487             goto cleanup;
 488         }
 489     }
 490     /* Otherwise, pull from metadata */
 491     else {
 492         if( ORTE_SUCCESS != (ret = orte_sstore_base_extract_global_metadata(snapshot)) ) {
 493             ORTE_ERROR_LOG(ret);
 494             exit_status = ret;
 495             goto cleanup;
 496         }
 497     }
 498 
 499     opal_list_sort(&snapshot->local_snapshots, stage_snapshot_sort_compare_fn);
 500 
 501  cleanup:
 502     return exit_status;
 503 }
 504 
 505 int orte_sstore_stage_global_register(orte_sstore_base_handle_t handle)
 506 {
 507     int ret, exit_status = ORTE_SUCCESS;
 508     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 509 
 510     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 511                          "sstore:stage:(global): register(%d) - Global", handle));
 512 
 513     /*
 514      * Lookup the handle
 515      */
 516     handle_info = find_handle_info(handle);
 517     if( SSTORE_GLOBAL_REG != handle_info->state ) {
 518         handle_info->state = SSTORE_GLOBAL_REG;
 519     } else {
 520         return orte_sstore_stage_local_register(handle);
 521     }
 522 
 523     orte_sstore_handle_current = handle;
 524 
 525     /*
 526      * Associate the metadata
 527      */
 528     if( handle_info->migrating ) {
 529         if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
 530                                                       SSTORE_METADATA_INTERNAL_MIG_SEQ_STR,
 531                                                       handle_info->seq_num)) ) {
 532             ORTE_ERROR_LOG(ret);
 533             exit_status = ret;
 534             goto cleanup;
 535         }
 536     } else {
 537         if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
 538                                                       SSTORE_METADATA_GLOBAL_SNAP_SEQ_STR,
 539                                                       handle_info->seq_num)) ) {
 540             ORTE_ERROR_LOG(ret);
 541             exit_status = ret;
 542             goto cleanup;
 543         }
 544     }
 545 
 546     if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
 547                                                   SSTORE_METADATA_LOCAL_SNAP_REF_FMT_STR,
 548                                                   orte_sstore_base_local_snapshot_fmt)) ) {
 549         ORTE_ERROR_LOG(ret);
 550         exit_status = ret;
 551         goto cleanup;
 552     }
 553 
 554     if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
 555         ORTE_ERROR_LOG(ret);
 556         exit_status = ret;
 557         goto cleanup;
 558     }
 559 
 560  cleanup:
 561     return exit_status;
 562 }
 563 
 564 int orte_sstore_stage_global_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
 565 {
 566     int exit_status = ORTE_SUCCESS;
 567     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 568 
 569     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 570                          "sstore:stage:(global): get_attr()"));
 571 
 572     /*
 573      * Lookup the handle
 574      */
 575     handle_info = find_handle_info(handle);
 576 
 577     /*
 578      * Access metadata
 579      */
 580     /* Used by snapc */
 581     if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
 582         *value = strdup(handle_info->ref_name);
 583     }
 584     /* Used by snapc */
 585     else if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
 586         opal_asprintf(value, "%d", handle_info->seq_num);
 587     }
 588     /* Used by orte-restart and RecoS and snapc (kinda) */
 589     else if( SSTORE_METADATA_LOCAL_SNAP_LOC == key ) {
 590         opal_asprintf(value, "%s/%s/%d",
 591                  handle_info->base_location,
 592                  handle_info->ref_name,
 593                  handle_info->seq_num);
 594     }
 595     /* Used by orte-restart and RecoS */
 596     else if( SSTORE_METADATA_LOCAL_SNAP_REF_FMT == key ) {
 597         *value = strdup(orte_sstore_base_local_snapshot_fmt);
 598     }
 599     /* Used by orte-restart and RecoS */
 600     else if( SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT == key ) {
 601         opal_asprintf(value, "%s/%s/%d/%s",
 602                  handle_info->base_location,
 603                  handle_info->ref_name,
 604                  handle_info->seq_num,
 605                  orte_sstore_base_local_snapshot_fmt);
 606     }
 607     else {
 608         exit_status = ORTE_ERR_NOT_SUPPORTED;
 609     }
 610 
 611     return exit_status;
 612 }
 613 
 614 int orte_sstore_stage_global_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
 615 {
 616     int ret, exit_status = ORTE_SUCCESS;
 617     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 618     char *key_str = NULL;
 619 
 620     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 621                          "sstore:stage:(global): set_attr()"));
 622 
 623     /*
 624      * Lookup the handle
 625      */
 626     handle_info = find_handle_info(handle);
 627 
 628     /*
 629      * Process key (Access metadata)
 630      */
 631     if( key == SSTORE_METADATA_GLOBAL_MIGRATING ) {
 632         handle_info->migrating = true;
 633     }
 634     else {
 635         orte_sstore_base_convert_key_to_string(key, &key_str);
 636         if( NULL == key_str ) {
 637             ORTE_ERROR_LOG(ORTE_ERROR);
 638             exit_status = ORTE_ERROR;
 639             goto cleanup;
 640         }
 641 
 642         if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
 643             ORTE_ERROR_LOG(ret);
 644             exit_status = ret;
 645             goto cleanup;
 646         }
 647     }
 648 
 649  cleanup:
 650     if( NULL != key_str ) {
 651         free(key_str);
 652         key_str = NULL;
 653     }
 654 
 655     return exit_status;
 656 }
 657 
 658 int orte_sstore_stage_global_sync(orte_sstore_base_handle_t handle)
 659 {
 660     int ret, exit_status = ORTE_SUCCESS;
 661     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 662 
 663     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 664                          "sstore:stage:(global): sync()"));
 665 
 666     /*
 667      * Lookup the handle
 668      */
 669     handle_info = find_handle_info(handle);
 670     if( SSTORE_GLOBAL_SYNCING != handle_info->state ) {
 671         handle_info->state = SSTORE_GLOBAL_SYNCING;
 672         if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
 673             return orte_sstore_stage_local_sync(handle);
 674         }
 675     }
 676 
 677     /*
 678      * Wait for all the processes to report in before waiting on all the requests
 679      */
 680     while(handle_info->num_procs_synced < handle_info->num_procs_total) {
 681         opal_progress();
 682     }
 683 
 684     /*
 685      * Synchronize all of the files
 686      * Wait on FileM operations
 687      */
 688     if( !orte_sstore_stage_skip_filem ) {
 689         if( ORTE_SUCCESS != (ret = wait_all_filem(handle_info))) {
 690             ORTE_ERROR_LOG(ret);
 691             exit_status = ret;
 692             goto cleanup;
 693         }
 694     }
 695 
 696     /*
 697      * Finalize and close the metadata
 698      */
 699     if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
 700         ORTE_ERROR_LOG(ret);
 701         exit_status = ret;
 702         goto cleanup;
 703     }
 704 
 705     if( handle_info->migrating ) {
 706         if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
 707                                                       SSTORE_METADATA_INTERNAL_DONE_MIG_SEQ_STR,
 708                                                       handle_info->seq_num)) ) {
 709             ORTE_ERROR_LOG(ret);
 710             exit_status = ret;
 711             goto cleanup;
 712         }
 713     } else {
 714         if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
 715                                                       SSTORE_METADATA_INTERNAL_DONE_SEQ_STR,
 716                                                       handle_info->seq_num)) ) {
 717             ORTE_ERROR_LOG(ret);
 718             exit_status = ret;
 719             goto cleanup;
 720         }
 721     }
 722 
 723     if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
 724         ORTE_ERROR_LOG(ret);
 725         exit_status = ret;
 726         goto cleanup;
 727     }
 728 
 729     /* JJH: We should lock this var! */
 730     if( !handle_info->migrating ) {
 731         orte_sstore_base_is_checkpoint_available = true;
 732         orte_sstore_handle_last_stable = orte_sstore_handle_current;
 733     }
 734 
 735     handle_info->state = SSTORE_GLOBAL_SYNCED;
 736 
 737  cleanup:
 738     return exit_status;
 739 }
 740 
 741 static void sync_global_dir(orte_sstore_stage_global_snapshot_info_t *handle_info)
 742 {
 743     opal_list_item_t* item = NULL, *f_item = NULL;
 744     orte_filem_base_request_t *filem_request = NULL;
 745     orte_filem_base_file_set_t * f_set = NULL;
 746     char * fs_str = NULL;
 747     char cwd[OPAL_PATH_MAX];
 748 
 749     opal_getcwd(cwd, OPAL_PATH_MAX);
 750 
 751     /*
 752      * Sync the Sequence num dir
 753      */
 754     opal_asprintf(&fs_str, "%s/%s/%d",
 755              handle_info->base_location,
 756              handle_info->ref_name,
 757              handle_info->seq_num);
 758     OPAL_OUTPUT_VERBOSE((20, mca_sstore_stage_component.super.output_handle,
 759                          "sstore:stage:(global): sync_dir(): Sync'ing on %s",
 760                          fs_str));
 761     if( 0 != chdir(fs_str) ) {
 762         opal_output(0, "sstore:stage:(global): Failed to chdir(%s)",
 763                     fs_str);
 764         goto cleanup;
 765     }
 766     system("sync ; sync ; ls > /dev/null");
 767 
 768     /*
 769      * Sync each of the local snapshots
 770      * if compressing, then this is already covered above
 771      */
 772     if( orte_sstore_stage_enabled_compression ) {
 773         goto cleanup;
 774     }
 775 
 776     for(f_item  = opal_list_get_first(handle_info->filem_requests);
 777         f_item != opal_list_get_end(handle_info->filem_requests);
 778         f_item  = opal_list_get_next(f_item) ) {
 779         filem_request = (orte_filem_base_request_t *)f_item;
 780 
 781         for(item  = opal_list_get_first(&(filem_request->file_sets));
 782             item != opal_list_get_end(&(filem_request->file_sets));
 783             item  = opal_list_get_next(item) ) {
 784             f_set = (orte_filem_base_file_set_t *) item;
 785 
 786             if( NULL != fs_str ) {
 787                 free(fs_str);
 788                 fs_str = NULL;
 789             }
 790 
 791             if( ORTE_FILEM_TYPE_FILE != f_set->target_flag ) {
 792                 OPAL_OUTPUT_VERBOSE((20, mca_sstore_stage_component.super.output_handle,
 793                                      "sstore:stage:(global): sync_dir(): Sync'ing on %s",
 794                                      f_set->local_target));
 795                 if( 0 != chdir(f_set->local_target) ) {
 796                     opal_output(0, "sstore:stage:(global): Failed to chdir(%s)",
 797                                 f_set->local_target);
 798                 } else {
 799                     system("sync ; sync ");
 800                 }
 801             }
 802         }
 803     }
 804 
 805  cleanup:
 806     chdir(cwd);
 807 
 808     if( NULL != fs_str ) {
 809         free(fs_str);
 810         fs_str = NULL;
 811     }
 812 
 813     return;
 814 }
 815 
 816 int orte_sstore_stage_global_remove(orte_sstore_base_handle_t handle)
 817 {
 818     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 819                          "sstore:stage:(global): remove()"));
 820 
 821     /*
 822      * Lookup the handle
 823      */
 824 
 825     return ORTE_SUCCESS;
 826 }
 827 
 828 int orte_sstore_stage_global_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
 829 {
 830     int ret, exit_status = ORTE_SUCCESS;
 831     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 832 
 833     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 834                          "sstore:stage:(global): pack()"));
 835 
 836     /*
 837      * Lookup the handle
 838      */
 839     handle_info = find_handle_info(handle);
 840 
 841     /*
 842      * Pack the handle ID
 843      */
 844     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
 845         ORTE_ERROR_LOG(ret);
 846         exit_status = ret;
 847         goto cleanup;
 848     }
 849 
 850     /*
 851      * Pack any metadata
 852      */
 853     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
 854         ORTE_ERROR_LOG(ret);
 855         exit_status = ret;
 856         goto cleanup;
 857     }
 858 
 859     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
 860         ORTE_ERROR_LOG(ret);
 861         exit_status = ret;
 862         goto cleanup;
 863     }
 864 
 865     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING )) ) {
 866         ORTE_ERROR_LOG(ret);
 867         exit_status = ret;
 868         goto cleanup;
 869     }
 870 
 871     if( orte_sstore_stage_enabled_caching ) {
 872         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING )) ) {
 873             ORTE_ERROR_LOG(ret);
 874             exit_status = ret;
 875             goto cleanup;
 876         }
 877     }
 878 
 879     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->migrating), 1, OPAL_BOOL )) ) {
 880         ORTE_ERROR_LOG(ret);
 881         exit_status = ret;
 882         goto cleanup;
 883     }
 884 
 885  cleanup:
 886     return exit_status;
 887 }
 888 
 889 int orte_sstore_stage_global_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
 890 {
 891     int ret, exit_status = ORTE_SUCCESS;
 892 
 893     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 894                          "sstore:stage:(global): unpack()"));
 895 
 896     /*
 897      * Unpack the handle id
 898      */
 899     if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
 900                                                    ORTE_PROC_MY_NAME,
 901                                                    peer)) {
 902         /*
 903          * Differ to the orted version, so if we have application then they get updated too
 904          */
 905         if( ORTE_SUCCESS != (ret = orte_sstore_stage_local_unpack(peer, buffer, handle)) ) {
 906             ORTE_ERROR_LOG(ret);
 907             exit_status = ret;
 908             goto cleanup;
 909         }
 910     }
 911 
 912  cleanup:
 913     return exit_status;
 914 }
 915 
 916 /**************************
 917  * Local functions
 918  **************************/
 919 static orte_sstore_stage_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid)
 920 {
 921     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 922     orte_job_t *jdata = NULL;
 923 
 924     handle_info = OBJ_NEW(orte_sstore_stage_global_snapshot_info_t);
 925 
 926     handle_info->jobid = jobid;
 927 
 928     handle_info->state = SSTORE_GLOBAL_INIT;
 929 
 930     handle_info->handle_type = type;
 931 
 932     handle_info->seq_num = seq;
 933 
 934     orte_sstore_base_get_global_snapshot_ref(&(handle_info->ref_name), getpid());
 935 
 936     opal_asprintf(&(handle_info->local_location), "%s/%d",
 937              handle_info->ref_name, handle_info->seq_num);
 938 
 939     /* This is used by the application to establish the local directory */
 940     opal_asprintf(&(handle_info->app_local_location_fmt), "%s/%s/%s/%s",
 941              orte_sstore_stage_local_snapshot_dir,
 942              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
 943              ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME,
 944              orte_sstore_base_local_snapshot_fmt);
 945 
 946     if( orte_sstore_stage_enabled_caching ) {
 947         opal_asprintf(&(handle_info->app_local_cache_location_fmt), "%s/%s/%s/%d/%s",
 948                  orte_sstore_stage_local_snapshot_dir,
 949                  ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
 950                  ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME,
 951                  handle_info->seq_num,
 952                  orte_sstore_base_local_snapshot_fmt);
 953     }
 954 
 955     /* This is used by the HNP to remember where it should place each process */
 956     opal_asprintf(&(handle_info->app_global_location_fmt), "%s/%s/%s",
 957              handle_info->base_location,
 958              handle_info->local_location,
 959              orte_sstore_base_local_snapshot_fmt);
 960 
 961     opal_asprintf(&(handle_info->metadata_filename), "%s/%s/%s",
 962              handle_info->base_location,
 963              handle_info->ref_name,
 964              orte_sstore_base_global_metadata_filename);
 965 
 966     jdata = orte_get_job_data_object(handle_info->jobid);
 967     handle_info->num_procs_total = (int)jdata->num_procs;
 968 
 969     opal_list_append(active_handles, &(handle_info->super));
 970 
 971     return handle_info;
 972 }
 973 
 974 static orte_sstore_stage_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
 975 {
 976     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
 977     opal_list_item_t* item = NULL;
 978 
 979     for(item  = opal_list_get_first(active_handles);
 980         item != opal_list_get_end(active_handles);
 981         item  = opal_list_get_next(item) ) {
 982         handle_info = (orte_sstore_stage_global_snapshot_info_t*)item;
 983 
 984         if( handle_info->id == handle ) {
 985             return handle_info;
 986         }
 987     }
 988 
 989     return NULL;
 990 }
 991 
 992 static int sstore_stage_global_start_listener(void)
 993 {
 994     if( is_global_listener_active ) {
 995         return ORTE_SUCCESS;
 996     }
 997 
 998     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
 999                             ORTE_RML_PERSISTENT, sstore_stage_global_recv, NULL);
1000 
1001     is_global_listener_active = true;
1002     return ORTE_SUCCESS;
1003 }
1004 
1005 static int sstore_stage_global_stop_listener(void)
1006 {
1007     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1008 
1009     is_global_listener_active = false;
1010     return ORTE_SUCCESS;
1011 }
1012 
1013 static void sstore_stage_global_recv(int status,
1014                                        orte_process_name_t* sender,
1015                                        opal_buffer_t* buffer,
1016                                        orte_rml_tag_t tag,
1017                                        void* cbdata)
1018 {
1019     int ret;
1020     orte_sstore_stage_cmd_flag_t command;
1021     orte_std_cntr_t count;
1022     orte_sstore_base_handle_t loc_id;
1023     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
1024 
1025     if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
1026         return;
1027     }
1028 
1029     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1030                          "sstore:stage:(global): process_cmd(%s)",
1031                          ORTE_NAME_PRINT(sender)));
1032 
1033     count = 1;
1034     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
1035         ORTE_ERROR_LOG(ret);
1036         goto cleanup;
1037     }
1038 
1039     count = 1;
1040     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
1041         ORTE_ERROR_LOG(ret);
1042         goto cleanup;
1043     }
1044 
1045     /*
1046      * If this was an application process contacting us, then act like an orted
1047      * instead of an HNP
1048      */
1049     if(OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
1050                                                    ORTE_PROC_MY_NAME,
1051                                                    sender)) {
1052 
1053         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1054                              "sstore:stage:(local): process_cmd(%s)",
1055                              ORTE_NAME_PRINT(sender)));
1056 
1057         orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
1058         return;
1059     }
1060 
1061     /*
1062      * Find the referenced handle
1063      */
1064     if(NULL == (handle_info = find_handle_info(loc_id)) ) {
1065         ; /* JJH big problem */
1066     }
1067 
1068     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1069                          "sstore:stage:(global): process_cmd(%s) - Command = %s",
1070                          ORTE_NAME_PRINT(sender),
1071                          (ORTE_SSTORE_STAGE_PULL == command ? "Pull" :
1072                           (ORTE_SSTORE_STAGE_PUSH == command ? "Push" :
1073                            (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
1074                             (ORTE_SSTORE_STAGE_DONE == command ? "Done" : "Unknown")))) ));
1075 
1076     /*
1077      * Process the command
1078      */
1079     if( ORTE_SSTORE_STAGE_PULL == command ) {
1080         process_local_pull(sender, buffer, handle_info);
1081     }
1082     else if( ORTE_SSTORE_STAGE_PUSH == command ) {
1083         process_local_push(sender, buffer, handle_info);
1084     }
1085     else if( ORTE_SSTORE_STAGE_REMOVE == command ) {
1086         /* This is actually intended for the local coordinator */
1087         orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
1088     }
1089     else if( ORTE_SSTORE_STAGE_DONE == command ) {
1090         process_local_done(sender, buffer, handle_info);
1091     }
1092 
1093  cleanup:
1094     return;
1095 }
1096 
1097 static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
1098 {
1099     int ret, exit_status = ORTE_SUCCESS;
1100     opal_buffer_t *loc_buffer = NULL;
1101     orte_sstore_stage_cmd_flag_t command;
1102 
1103     /*
1104      * Push back the requested information
1105      */
1106     loc_buffer = OBJ_NEW(opal_buffer_t);
1107 
1108     command = ORTE_SSTORE_STAGE_PUSH;
1109     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1110         ORTE_ERROR_LOG(ret);
1111         exit_status = ret;
1112         goto cleanup;
1113     }
1114 
1115     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1116         ORTE_ERROR_LOG(ret);
1117         exit_status = ret;
1118         goto cleanup;
1119     }
1120 
1121     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
1122         ORTE_ERROR_LOG(ret);
1123         exit_status = ret;
1124         goto cleanup;
1125     }
1126 
1127     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
1128         ORTE_ERROR_LOG(ret);
1129         exit_status = ret;
1130         goto cleanup;
1131     }
1132 
1133     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING))) {
1134         ORTE_ERROR_LOG(ret);
1135         exit_status = ret;
1136         goto cleanup;
1137     }
1138 
1139     if( orte_sstore_stage_enabled_caching ) {
1140         if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING))) {
1141             ORTE_ERROR_LOG(ret);
1142             exit_status = ret;
1143             goto cleanup;
1144         }
1145     }
1146 
1147     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->migrating), 1, OPAL_BOOL))) {
1148         ORTE_ERROR_LOG(ret);
1149         exit_status = ret;
1150         goto cleanup;
1151     }
1152 
1153     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1154                                                        orte_rml_send_callback, NULL))) {
1155         ORTE_ERROR_LOG(ret);
1156         exit_status = ret;
1157         goto cleanup;
1158     }
1159     /* loc_buffer should not be released here; the callback releases it */
1160     loc_buffer = NULL;
1161 
1162  cleanup:
1163     if (NULL != loc_buffer) {
1164         OBJ_RELEASE(loc_buffer);
1165         loc_buffer = NULL;
1166     }
1167 
1168     return exit_status;
1169 }
1170 
1171 static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
1172 {
1173     int ret, exit_status = ORTE_SUCCESS;
1174     orte_std_cntr_t count;
1175     size_t num_entries, i;
1176     orte_process_name_t name;
1177     bool ckpt_skipped = false;
1178     char * crs_comp = NULL;
1179     char * compress_comp = NULL;
1180     char * compress_postfix = NULL;
1181     char * proc_name = NULL;
1182     char * tmp_str = NULL;
1183     orte_filem_base_request_t *filem_request = NULL;
1184     orte_filem_base_process_set_t *p_set = NULL;
1185     orte_filem_base_file_set_t * f_set = NULL;
1186 
1187     if( !orte_sstore_stage_skip_filem ) {
1188         filem_request = OBJ_NEW(orte_filem_base_request_t);
1189         /*
1190          * Define the process set:
1191          * Source (daemon) -> Sink (HNP)
1192          */
1193         p_set = OBJ_NEW(orte_filem_base_process_set_t);
1194         p_set->source.jobid = peer->jobid;
1195         p_set->source.vpid  = peer->vpid;
1196         p_set->sink.jobid   = ORTE_PROC_MY_NAME->jobid;
1197         p_set->sink.vpid    = ORTE_PROC_MY_NAME->vpid;
1198         opal_list_append(&(filem_request->process_sets), &(p_set->super) );
1199     }
1200 
1201     /*
1202      * Unpack the data
1203      */
1204     count = 1;
1205     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
1206         ORTE_ERROR_LOG(ret);
1207         exit_status = ret;
1208         goto cleanup;
1209     }
1210 
1211     for(i = 0; i < num_entries; ++i ) {
1212         count = 1;
1213         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &name, &count, ORTE_NAME))) {
1214             ORTE_ERROR_LOG(ret);
1215             exit_status = ret;
1216             goto cleanup;
1217         }
1218 
1219         count = 1;
1220         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &ckpt_skipped, &count, OPAL_BOOL))) {
1221             ORTE_ERROR_LOG(ret);
1222             exit_status = ret;
1223             goto cleanup;
1224         }
1225 
1226         if( !ckpt_skipped ) {
1227             count = 1;
1228             if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crs_comp, &count, OPAL_STRING))) {
1229                 ORTE_ERROR_LOG(ret);
1230                 exit_status = ret;
1231                 goto cleanup;
1232             }
1233 
1234             if( orte_sstore_stage_enabled_compression ) {
1235                 count = 1;
1236                 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &compress_comp, &count, OPAL_STRING))) {
1237                     ORTE_ERROR_LOG(ret);
1238                     exit_status = ret;
1239                     goto cleanup;
1240                 }
1241                 if( NULL == handle_info->compress_comp ) {
1242                     handle_info->compress_comp = strdup(compress_comp);
1243                 }
1244 
1245                 count = 1;
1246                 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &compress_postfix, &count, OPAL_STRING))) {
1247                     ORTE_ERROR_LOG(ret);
1248                     exit_status = ret;
1249                     goto cleanup;
1250                 }
1251                 if( NULL == handle_info->compress_postfix ) {
1252                     handle_info->compress_postfix = strdup(compress_postfix);
1253                 }
1254             }
1255 
1256             if( !orte_sstore_stage_skip_filem ) {
1257                 /*
1258                  * Append to the file set for movement
1259                  */
1260                 f_set = OBJ_NEW(orte_filem_base_file_set_t);
1261                 if( orte_sstore_stage_enabled_compression ) {
1262                     f_set->target_flag   = ORTE_FILEM_TYPE_FILE;
1263                 } else {
1264                     f_set->target_flag   = ORTE_FILEM_TYPE_DIR;
1265                 }
1266 
1267                 if( orte_sstore_stage_enabled_compression ) {
1268                     opal_asprintf(&tmp_str,
1269                              handle_info->app_global_location_fmt,
1270                              name.vpid);
1271                     opal_asprintf(&(f_set->local_target), "%s%s",
1272                              tmp_str,
1273                              compress_postfix);
1274                 } else {
1275                     opal_asprintf(&(f_set->local_target),
1276                              handle_info->app_global_location_fmt,
1277                              name.vpid);
1278                 }
1279 
1280                 if( orte_sstore_stage_global_is_shared ) {
1281                     f_set->local_hint = ORTE_FILEM_HINT_SHARED;
1282                 }
1283 
1284                 if( orte_sstore_stage_enabled_compression ) {
1285                     opal_asprintf(&tmp_str,
1286                              handle_info->app_local_location_fmt,
1287                              name.vpid);
1288                     opal_asprintf(&(f_set->remote_target), "%s%s",
1289                              tmp_str,
1290                              compress_postfix);
1291                 } else {
1292                     opal_asprintf(&(f_set->remote_target),
1293                              handle_info->app_local_location_fmt,
1294                              name.vpid);
1295                 }
1296 
1297                 opal_list_append(&(filem_request->file_sets), &(f_set->super) );
1298 
1299                 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1300                                      "sstore:stage:(global): push(): Pulling remote file <%s> to <%s>",
1301                                      f_set->remote_target,
1302                                      f_set->local_target));
1303             }
1304 
1305             /*
1306              * Write this information to the global metadata
1307              */
1308             orte_util_convert_process_name_to_string(&proc_name, &name);
1309 
1310             metadata_write_str(handle_info,
1311                                SSTORE_METADATA_INTERNAL_PROCESS_STR,
1312                                proc_name);
1313             metadata_write_str(handle_info,
1314                                SSTORE_METADATA_LOCAL_CRS_COMP_STR,
1315                                crs_comp);
1316             if( orte_sstore_stage_enabled_compression ) {
1317                 metadata_write_str(handle_info,
1318                                    SSTORE_METADATA_LOCAL_COMPRESS_COMP_STR,
1319                                    compress_comp);
1320                 metadata_write_str(handle_info,
1321                                    SSTORE_METADATA_LOCAL_COMPRESS_POSTFIX_STR,
1322                                    compress_postfix);
1323             }
1324         }
1325 
1326         if( NULL != crs_comp ) {
1327             free(crs_comp);
1328             crs_comp = NULL;
1329         }
1330         if( NULL != compress_comp ) {
1331             free(compress_comp);
1332             compress_comp = NULL;
1333         }
1334         if( NULL != compress_postfix ) {
1335             free(compress_postfix);
1336             compress_postfix = NULL;
1337         }
1338         if( NULL != proc_name ) {
1339             free(proc_name);
1340             proc_name = NULL;
1341         }
1342         if( NULL != tmp_str ) {
1343             free(tmp_str);
1344             tmp_str = NULL;
1345         }
1346 
1347         (handle_info->num_procs_synced)++;
1348     }
1349 
1350     if( !orte_sstore_stage_skip_filem && 0 < opal_list_get_size(&(filem_request->file_sets)) ) {
1351         /*
1352          * Start to pull the files to global storage
1353          */
1354         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1355                              "sstore:stage:(global): push(): Pulling remote files from %s (%3d of %3d done)",
1356                              ORTE_NAME_PRINT(peer),
1357                              handle_info->num_procs_synced,
1358                              handle_info->num_procs_total));
1359         opal_list_append(handle_info->filem_requests, &(filem_request->super));
1360         if(ORTE_SUCCESS != (ret = orte_filem.get_nb(filem_request)) ) {
1361             ORTE_ERROR_LOG(ret);
1362             exit_status = ret;
1363             goto cleanup;
1364         }
1365     }
1366 
1367  cleanup:
1368     if( NULL != crs_comp ) {
1369         free(crs_comp);
1370         crs_comp = NULL;
1371     }
1372     if( NULL != compress_comp ) {
1373         free(compress_comp);
1374         compress_comp = NULL;
1375     }
1376     if( NULL != compress_postfix ) {
1377         free(compress_postfix);
1378         compress_postfix = NULL;
1379     }
1380     if( NULL != proc_name ) {
1381         free(proc_name);
1382         proc_name = NULL;
1383     }
1384     if( NULL != tmp_str ) {
1385         free(tmp_str);
1386         tmp_str = NULL;
1387     }
1388 
1389     return exit_status;
1390 }
1391 
1392 static int process_local_done(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
1393 {
1394     int ret, exit_status = ORTE_SUCCESS;
1395     orte_std_cntr_t count;
1396     size_t num_entries;
1397 
1398     /*
1399      * Unpack the data
1400      */
1401     count = 1;
1402     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
1403         ORTE_ERROR_LOG(ret);
1404         exit_status = ret;
1405         goto cleanup;
1406     }
1407 
1408     (handle_info->num_procs_done) += (int)num_entries;
1409 
1410     SSTORE_STAGE_REPORT_PROGRESS(handle_info);
1411 
1412     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1413                          "sstore:stage:(global): done(): [Peer %s] Moved %d files (%3d of %3d reported as done)",
1414                          ORTE_NAME_PRINT(peer),
1415                          (int)num_entries,
1416                          handle_info->num_procs_done,
1417                          handle_info->num_procs_total));
1418 
1419  cleanup:
1420     return exit_status;
1421 }
1422 
1423 static int init_global_snapshot_directory(orte_sstore_stage_global_snapshot_info_t *handle_info)
1424 {
1425     int ret, exit_status = ORTE_SUCCESS;
1426     char * dir_name = NULL;
1427     mode_t my_mode = S_IRWXU;
1428 
1429     /*
1430      * Make the snapshot directory from the uniq_global_snapshot_name
1431      */
1432     opal_asprintf(&dir_name, "%s/%s",
1433              handle_info->base_location,
1434              handle_info->local_location);
1435     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(dir_name, my_mode)) ) {
1436         ORTE_ERROR_LOG(ret);
1437         exit_status = ret;
1438         goto cleanup;
1439     }
1440 
1441     /*
1442      * Open up the metadata file
1443      */
1444     if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1445         ORTE_ERROR_LOG(ret);
1446         exit_status = ret;
1447         goto cleanup;
1448     }
1449 
1450  cleanup:
1451     if(NULL != dir_name) {
1452         free(dir_name);
1453         dir_name = NULL;
1454     }
1455 
1456     return exit_status;
1457 }
1458 
1459 static int wait_all_filem(orte_sstore_stage_global_snapshot_info_t *handle_info)
1460 {
1461     int ret, exit_status = ORTE_SUCCESS;
1462     opal_list_item_t* item = NULL;
1463 
1464     if( orte_sstore_stage_skip_filem ) {
1465         return exit_status;
1466     }
1467 
1468     /*
1469      * Wait for all the transfers to complete
1470      */
1471     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1472                          "sstore:stage:(global): wait_all_filem(): Waiting on all outstanding FileM requests (%d)",
1473                          (int)opal_list_get_size(handle_info->filem_requests) ));
1474 
1475     if(ORTE_SUCCESS != (ret = orte_filem.wait_all(handle_info->filem_requests)) ) {
1476         ORTE_ERROR_LOG(ret);
1477         exit_status = ret;
1478         goto cleanup;
1479     }
1480 
1481     /*
1482      * Remove the data on the remote side
1483      */
1484     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1485                          "sstore:stage:(global): wait_all_filem(): Removing all local files"));
1486     if( ORTE_SUCCESS != (ret = xcast_remove_all(handle_info))) {
1487         ORTE_ERROR_LOG(ret);
1488         exit_status = ret;
1489         goto cleanup;
1490     }
1491 
1492     /*
1493      * Touch all local checkpoints
1494      */
1495     sync_global_dir(handle_info);
1496 
1497     /*
1498      * Wait for the removal to complete
1499      */
1500     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1501                          "sstore:stage:(global): wait_all_filem(): Waiting for remove to finish..."));
1502     while(handle_info->num_procs_done < handle_info->num_procs_total) {
1503         opal_progress();
1504     }
1505 
1506     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1507                          "sstore:stage:(global): wait_all_filem(): All files have been transfered"));
1508 
1509  cleanup:
1510     while (NULL != (item = opal_list_remove_first(handle_info->filem_requests) ) ) {
1511         OBJ_RELEASE(item);
1512     }
1513     OBJ_DESTRUCT(handle_info->filem_requests);
1514 
1515     return exit_status;
1516 }
1517 
1518 static int xcast_remove_all(orte_sstore_stage_global_snapshot_info_t *handle_info)
1519 {
1520     int ret, exit_status = ORTE_SUCCESS;
1521     opal_buffer_t *loc_buffer = NULL;
1522     orte_sstore_stage_cmd_flag_t command;
1523     orte_grpcomm_signature_t *sig;
1524 
1525     handle_info->num_procs_done = 0;
1526 
1527     loc_buffer = OBJ_NEW(opal_buffer_t);
1528 
1529     command = ORTE_SSTORE_STAGE_REMOVE;
1530     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1531         ORTE_ERROR_LOG(ret);
1532         exit_status = ret;
1533         goto cleanup;
1534     }
1535 
1536     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1537         ORTE_ERROR_LOG(ret);
1538         exit_status = ret;
1539         goto cleanup;
1540     }
1541 
1542     /* goes to all daemons */
1543     sig = OBJ_NEW(orte_grpcomm_signature_t);
1544     sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
1545     sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
1546     sig->signature[0].vpid = ORTE_VPID_WILDCARD;
1547     if (ORTE_SUCCESS != (ret = orte_grpcomm.xcast(sig, ORTE_RML_TAG_SSTORE_INTERNAL, loc_buffer))) {
1548         ORTE_ERROR_LOG(ret);
1549         exit_status = ret;
1550         goto cleanup;
1551     }
1552 
1553     /* loc_buffer should not be released here; the callback releases it */
1554     loc_buffer = NULL;
1555 
1556  cleanup:
1557     if (NULL != loc_buffer) {
1558         OBJ_RELEASE(loc_buffer);
1559         loc_buffer = NULL;
1560     }
1561 
1562     OBJ_RELEASE(sig);
1563 
1564     return exit_status;
1565 }
1566 
1567 /**************************
1568  * Metadata functions
1569  **************************/
1570 static int metadata_open(orte_sstore_stage_global_snapshot_info_t * handle_info)
1571 {
1572     /* If already open, then just return */
1573     if( NULL != handle_info->metadata ) {
1574         return ORTE_SUCCESS;
1575     }
1576 
1577     if (NULL == (handle_info->metadata = fopen(handle_info->metadata_filename, "a")) ) {
1578         opal_output(orte_sstore_base_framework.framework_output,
1579                     "sstore:stage:(global):init_dir() Unable to open the file (%s)\n",
1580                     handle_info->metadata_filename);
1581         ORTE_ERROR_LOG(ORTE_ERROR);
1582         return ORTE_ERROR;
1583    }
1584 
1585    return ORTE_SUCCESS;
1586 }
1587 
1588 static int metadata_close(orte_sstore_stage_global_snapshot_info_t * handle_info)
1589 {
1590     /* If already closed, then just return */
1591     if( NULL == handle_info->metadata ) {
1592         return ORTE_SUCCESS;
1593     }
1594 
1595     fclose(handle_info->metadata);
1596     handle_info->metadata = NULL;
1597 
1598     return ORTE_SUCCESS;
1599 }
1600 
1601 static int metadata_write_int(orte_sstore_stage_global_snapshot_info_t * handle_info, char *key, int value)
1602 {
1603     int ret, exit_status = ORTE_SUCCESS;
1604 
1605     /* Make sure the metadata file is open */
1606     if( NULL == handle_info->metadata ) {
1607         if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1608             ORTE_ERROR_LOG(ret);
1609             exit_status = ret;
1610             goto cleanup;
1611         }
1612     }
1613 
1614     fprintf(handle_info->metadata, "%s%d\n", key, value);
1615 
1616  cleanup:
1617     return exit_status;
1618 }
1619 
1620 static int metadata_write_str(orte_sstore_stage_global_snapshot_info_t * handle_info, char *key, char *value)
1621 {
1622     int ret, exit_status = ORTE_SUCCESS;
1623 
1624     /* Make sure the metadata file is open */
1625     if( NULL == handle_info->metadata ) {
1626         if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1627             ORTE_ERROR_LOG(ret);
1628             exit_status = ret;
1629             goto cleanup;
1630         }
1631     }
1632 
1633     fprintf(handle_info->metadata, "%s%s\n", key, value);
1634 
1635  cleanup:
1636     return exit_status;
1637 }
1638 
1639 static int metadata_write_timestamp(orte_sstore_stage_global_snapshot_info_t * handle_info)
1640 {
1641     int ret, exit_status = ORTE_SUCCESS;
1642     time_t timestamp;
1643 
1644     /* Make sure the metadata file is open */
1645     if( NULL == handle_info->metadata ) {
1646         if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1647             ORTE_ERROR_LOG(ret);
1648             exit_status = ret;
1649             goto cleanup;
1650         }
1651     }
1652 
1653     timestamp = time(NULL);
1654     fprintf(handle_info->metadata, "%s%s",
1655             SSTORE_METADATA_INTERNAL_TIME_STR,
1656             ctime(&timestamp));
1657 
1658  cleanup:
1659     return exit_status;
1660 }
1661 
1662 static int orte_sstore_stage_extract_global_metadata(orte_sstore_stage_global_snapshot_info_t * handle_info,
1663                                                      orte_sstore_base_global_snapshot_info_t *global_snapshot)
1664 {
1665     int exit_status = ORTE_SUCCESS;
1666     orte_sstore_base_local_snapshot_info_t *vpid_snapshot = NULL;
1667     opal_list_item_t* item = NULL;
1668     int i = 0;
1669 
1670     /*
1671      * Cleanup the structure a bit, so we can refresh it below
1672      */
1673     while (NULL != (item = opal_list_remove_first(&global_snapshot->local_snapshots))) {
1674         OBJ_RELEASE(item);
1675     }
1676 
1677     if( NULL != global_snapshot->start_time ) {
1678         free( global_snapshot->start_time );
1679         global_snapshot->start_time = NULL;
1680     }
1681 
1682     if( NULL != global_snapshot->end_time ) {
1683         free( global_snapshot->end_time );
1684         global_snapshot->end_time = NULL;
1685     }
1686 
1687     /*
1688      * Create a structure for each application process
1689      */
1690     for(i = 0; i < handle_info->num_procs_total; ++i) {
1691         vpid_snapshot = OBJ_NEW(orte_sstore_base_local_snapshot_info_t);
1692         vpid_snapshot->ss_handle = handle_info->id;
1693 
1694         vpid_snapshot->process_name.jobid  = handle_info->jobid;
1695         vpid_snapshot->process_name.vpid   = i;
1696 
1697         /* JJH: Currently we do not have this information since we do not save
1698          * individual vpid info in the Global SStore. It is in the metadata
1699          * though.
1700          */
1701         vpid_snapshot->crs_comp      = NULL;
1702         if( NULL != handle_info->compress_comp ) {
1703             vpid_snapshot->compress_comp = strdup(handle_info->compress_comp);
1704         } else {
1705             vpid_snapshot->compress_comp = NULL;
1706         }
1707         if( NULL != handle_info->compress_postfix ) {
1708             vpid_snapshot->compress_postfix = strdup(handle_info->compress_postfix);
1709         } else {
1710             vpid_snapshot->compress_postfix = NULL;
1711         }
1712         vpid_snapshot->start_time = NULL;
1713         vpid_snapshot->end_time   = NULL;
1714 
1715         opal_list_append(&global_snapshot->local_snapshots, &(vpid_snapshot->super));
1716     }
1717 
1718     return exit_status;
1719 }
1720 
1721 static int stage_snapshot_sort_compare_fn(opal_list_item_t **a,
1722                                           opal_list_item_t **b)
1723 {
1724     orte_sstore_base_local_snapshot_info_t *snap_a, *snap_b;
1725 
1726     snap_a = (orte_sstore_base_local_snapshot_info_t*)(*a);
1727     snap_b = (orte_sstore_base_local_snapshot_info_t*)(*b);
1728 
1729     if( snap_a->process_name.vpid > snap_b->process_name.vpid ) {
1730         return 1;
1731     }
1732     else if( snap_a->process_name.vpid == snap_b->process_name.vpid ) {
1733         return 0;
1734     }
1735     else {
1736         return -1;
1737     }
1738 }
1739 
1740 static void sstore_stage_report_progress(orte_sstore_stage_global_snapshot_info_t *handle_info)
1741 {
1742     double perc_done;
1743 
1744     perc_done = (handle_info->num_procs_total - handle_info->num_procs_done);
1745     perc_done = perc_done / (1.0 * handle_info->num_procs_total);
1746     perc_done = (perc_done-1)*(-100.0);
1747 
1748     if( perc_done >= (handle_info->last_progress_report + orte_sstore_stage_progress_meter ) ||
1749         handle_info->last_progress_report == 0.0 ) {
1750         handle_info->last_progress_report = perc_done;
1751         opal_output(0, "sstore:stage: progress: %10.2f %c Finished\n",
1752                     perc_done, '%');
1753     }
1754 
1755     return;
1756 }

/* [<][>][^][v][top][bottom][index][help] */