root/orte/mca/sstore/stage/sstore_stage_local.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_sstore_stage_local_snapshot_info_construct
  2. orte_sstore_stage_local_snapshot_info_destruct
  3. orte_sstore_stage_local_app_snapshot_info_construct
  4. orte_sstore_stage_local_app_snapshot_info_destruct
  5. orte_sstore_stage_local_module_init
  6. orte_sstore_stage_local_module_finalize
  7. orte_sstore_stage_local_request_checkpoint_handle
  8. orte_sstore_stage_local_register
  9. orte_sstore_stage_local_get_attr
  10. orte_sstore_stage_local_set_attr
  11. orte_sstore_stage_local_sync
  12. orte_sstore_stage_local_remove
  13. orte_sstore_stage_local_pack
  14. orte_sstore_stage_local_unpack
  15. orte_sstore_stage_local_fetch_app_deps
  16. orte_sstore_stage_local_wait_all_deps
  17. create_new_handle_info
  18. find_handle_info
  19. find_handle_info_ref
  20. append_new_app_handle_info
  21. find_app_handle_info
  22. sstore_stage_local_start_listener
  23. sstore_stage_local_stop_listener
  24. sstore_stage_local_recv
  25. orte_sstore_stage_local_process_cmd_action
  26. process_global_pull
  27. process_global_push
  28. process_global_remove
  29. process_app_pull
  30. process_app_push
  31. wait_all_apps_updated
  32. start_compression
  33. sstore_stage_local_compress_waitpid_cb
  34. wait_all_compressed
  35. pull_handle_info
  36. push_handle_info
  37. sstore_stage_create_local_dir
  38. sstore_stage_destroy_local_dir
  39. sstore_stage_create_cache
  40. sstore_stage_destroy_cache
  41. sstore_stage_update_cache
  42. orte_sstore_stage_local_preload_files

   1 /*
   2  * Copyright (c) 2010      The Trustees of Indiana University.
   3  *                         All rights reserved.
   4  * Copyright (c) 2004-2011 The University of Tennessee and The University
   5  *                         of Tennessee Research Foundation.  All rights
   6  *                         reserved.
   7  * Copyright (c) 2017      Research Organization for Information Science
   8  *                         and Technology (RIST). All rights reserved.
   9  * Copyright (c) 2018      Intel, Inc.  All rights reserved.
  10  * $COPYRIGHT$
  11  *
  12  * Additional copyrights may follow
  13  *
  14  * $HEADER$
  15  */
  16 
  17 /*
  18  *
  19  */
  20 
  21 #include "orte_config.h"
  22 
  23 #include <string.h>
  24 #include <stdlib.h>
  25 #include <sys/types.h>
  26 #include <sys/stat.h>
  27 #include <sys/wait.h>
  28 #ifdef HAVE_UNISTD_H
  29 #include <unistd.h>
  30 #endif  /* HAVE_UNISTD_H */
  31 
  32 #include "orte/mca/mca.h"
  33 #include "opal/mca/base/base.h"
  34 
  35 #include "opal/mca/event/event.h"
  36 
  37 #include "orte/constants.h"
  38 #include "orte/util/show_help.h"
  39 #include "opal/util/argv.h"
  40 #include "opal/util/output.h"
  41 #include "opal/util/opal_environ.h"
  42 #include "opal/util/basename.h"
  43 #include "opal/util/os_dirpath.h"
  44 
  45 #include "opal/mca/compress/compress.h"
  46 #include "opal/mca/compress/base/base.h"
  47 
  48 #include "opal/threads/mutex.h"
  49 #include "opal/threads/condition.h"
  50 
  51 #include "orte/util/name_fns.h"
  52 #include "orte/util/proc_info.h"
  53 #include "orte/runtime/orte_globals.h"
  54 #include "orte/runtime/orte_wait.h"
  55 #include "orte/mca/errmgr/errmgr.h"
  56 #include "orte/mca/rml/rml.h"
  57 #include "orte/mca/rml/rml_types.h"
  58 #include "orte/mca/odls/odls_types.h"
  59 #include "orte/mca/filem/filem.h"
  60 #include "orte/mca/filem/base/base.h"
  61 
  62 #include "orte/mca/sstore/sstore.h"
  63 #include "orte/mca/sstore/base/base.h"
  64 
  65 #include "sstore_stage.h"
  66 
  67 /**********
  68  * Object stuff
  69  **********/
  70 #define SSTORE_LOCAL_NONE   0
  71 #define SSTORE_LOCAL_ERROR  1
  72 #define SSTORE_LOCAL_INIT   2
  73 #define SSTORE_LOCAL_READY  3
  74 #define SSTORE_LOCAL_SYNCED 4
  75 #define SSTORE_LOCAL_DONE   5
  76 
  77 struct  orte_sstore_stage_local_snapshot_info_t {
  78     /** List super object */
  79     opal_list_item_t super;
  80 
  81     /** */
  82     orte_sstore_base_handle_t id;
  83 
  84     /** Status */
  85     int status;
  86 
  87     /** Sequence Number */
  88     int seq_num;
  89 
  90     /** Global Reference Name */
  91     char * global_ref_name;
  92 
  93     /** Local Location Format String */
  94     char * location_fmt;
  95 
  96     /** Local Cache Location Format String */
  97     char * cache_location_fmt;
  98 
  99     /* Application info handles*/
 100     opal_list_t *app_info_handle;
 101 
 102     /** Compress Component used */
 103     char * compress_comp;
 104 
 105     /** Compress Component postfix */
 106     char * compress_postfix;
 107 
 108     /** Is this checkpoint representing a migration? */
 109     bool migrating;
 110 };
 111 typedef struct orte_sstore_stage_local_snapshot_info_t orte_sstore_stage_local_snapshot_info_t;
 112 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_snapshot_info_t);
 113 
 114 void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info);
 115 void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info);
 116 
 117 OBJ_CLASS_INSTANCE(orte_sstore_stage_local_snapshot_info_t,
 118                    opal_list_item_t,
 119                    orte_sstore_stage_local_snapshot_info_construct,
 120                    orte_sstore_stage_local_snapshot_info_destruct);
 121 
 122 struct  orte_sstore_stage_local_app_snapshot_info_t {
 123     /** List super object */
 124     opal_list_item_t super;
 125 
 126     /** Process Name associated with this entry */
 127     orte_process_name_t name;
 128 
 129     /** Local Location (Absolute Path) */
 130     char * local_location;
 131 
 132     /** Compressed Local Location (Absolute Path) */
 133     char * compressed_local_location;
 134 
 135     /** Local Cache Location (Absolute Path) */
 136     char * local_cache_location;
 137 
 138     /** Metadata File Name (Absolute Path) */
 139     char * metadata_filename;
 140 
 141     /** CRS Component used */
 142     char * crs_comp;
 143 
 144     /** If this app. skipped the checkpoint - usually for non-migrating procs */
 145     bool ckpt_skipped;
 146 
 147     /** Compression PID to wait on */
 148     pid_t compress_pid;
 149 };
 150 typedef struct orte_sstore_stage_local_app_snapshot_info_t orte_sstore_stage_local_app_snapshot_info_t;
 151 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_app_snapshot_info_t);
 152 
 153 void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info);
 154 void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info);
 155 
 156 OBJ_CLASS_INSTANCE(orte_sstore_stage_local_app_snapshot_info_t,
 157                    opal_list_item_t,
 158                    orte_sstore_stage_local_app_snapshot_info_construct,
 159                    orte_sstore_stage_local_app_snapshot_info_destruct);
 160 
 161 
 162 
 163 /**********
 164  * Local Function and Variable Declarations
 165  **********/
 166 static bool is_global_listener_active = false;
 167 static int sstore_stage_local_start_listener(void);
 168 static int sstore_stage_local_stop_listener(void);
 169 static void sstore_stage_local_recv(int status,
 170                                       orte_process_name_t* sender,
 171                                       opal_buffer_t* buffer,
 172                                       orte_rml_tag_t tag,
 173                                       void* cbdata);
 174 
 175 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
 176 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
 177 static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
 178 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
 179 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
 180 
 181 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
 182 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
 183 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq);
 184 
 185 static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
 186                                       orte_process_name_t *name);
 187 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
 188                                                                            orte_process_name_t *name);
 189 
 190 static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
 191 static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
 192 
 193 static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info);
 194 
 195 static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
 196                              orte_sstore_stage_local_app_snapshot_info_t *app_info);
 197 static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata);
 198 static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info);
 199 
 200 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
 201                                                  char *global_loc, char *ref, char *postfix, int seq);
 202 
 203 static int sstore_stage_create_local_dir(void);
 204 static int sstore_stage_destroy_local_dir(void);
 205 
 206 static int sstore_stage_create_cache(void);
 207 static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info);
 208 static int sstore_stage_destroy_cache(void);
 209 
 210 static opal_list_t *active_handles = NULL;
 211 static char * sstore_stage_local_basedir = NULL;
 212 
 213 static char * sstore_stage_cache_basedir = NULL;
 214 
 215 static char * sstore_stage_cache_current_dir = NULL;
 216 static char * sstore_stage_cache_last_dir    = NULL;
 217 
 218 static opal_list_t * preload_filem_requests = NULL;
 219 
 220 /**********
 221  * Object stuff
 222  **********/
 223 void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info)
 224 {
 225     info->id      = 0;
 226 
 227     info->status = SSTORE_LOCAL_NONE;
 228 
 229     info->seq_num = -1;
 230 
 231     info->global_ref_name = NULL;
 232 
 233     info->location_fmt    = NULL;
 234 
 235     info->cache_location_fmt    = NULL;
 236 
 237     info->app_info_handle = OBJ_NEW(opal_list_t);
 238 
 239     info->compress_comp = NULL;
 240 
 241     info->compress_postfix = NULL;
 242 
 243     info->migrating = false;
 244 }
 245 
 246 void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info)
 247 {
 248     info->id      = 0;
 249 
 250     info->status = SSTORE_LOCAL_NONE;
 251 
 252     info->seq_num = -1;
 253 
 254     if( NULL != info->global_ref_name ) {
 255         free( info->global_ref_name );
 256         info->global_ref_name  = NULL;
 257     }
 258 
 259     if( NULL != info->location_fmt ) {
 260         free( info->location_fmt );
 261         info->location_fmt = NULL;
 262     }
 263 
 264     if( NULL != info->cache_location_fmt ) {
 265         free( info->cache_location_fmt );
 266         info->cache_location_fmt = NULL;
 267     }
 268 
 269     if( NULL != info->app_info_handle ) {
 270         OBJ_RELEASE(info->app_info_handle);
 271         info->app_info_handle = NULL;
 272     }
 273 
 274     if( NULL != info->compress_comp ) {
 275         free(info->compress_comp);
 276         info->compress_comp = NULL;
 277     }
 278 
 279     if( NULL != info->compress_postfix ) {
 280         free(info->compress_postfix);
 281         info->compress_postfix = NULL;
 282     }
 283 
 284     info->migrating = false;
 285 }
 286 
 287 void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info)
 288 {
 289     info->name.jobid = ORTE_JOBID_INVALID;
 290     info->name.vpid  = ORTE_VPID_INVALID;
 291 
 292     info->local_location = NULL;
 293     info->compressed_local_location = NULL;
 294     info->local_cache_location = NULL;
 295     info->metadata_filename = NULL;
 296     info->crs_comp = NULL;
 297     info->ckpt_skipped = false;
 298     info->compress_pid = 0;
 299 }
 300 
 301 void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info)
 302 {
 303     info->name.jobid = ORTE_JOBID_INVALID;
 304     info->name.vpid  = ORTE_VPID_INVALID;
 305 
 306     if( NULL != info->local_location ) {
 307         free(info->local_location);
 308         info->local_location = NULL;
 309     }
 310 
 311     if( NULL != info->compressed_local_location ) {
 312         free(info->compressed_local_location);
 313         info->compressed_local_location = NULL;
 314     }
 315 
 316     if( NULL != info->local_cache_location ) {
 317         free(info->local_cache_location);
 318         info->local_cache_location = NULL;
 319     }
 320 
 321     if( NULL != info->metadata_filename ) {
 322         free(info->metadata_filename);
 323         info->metadata_filename = NULL;
 324     }
 325 
 326     if( NULL != info->crs_comp ) {
 327         free(info->crs_comp);
 328         info->crs_comp = NULL;
 329     }
 330 
 331     info->ckpt_skipped = false;
 332 
 333     info->compress_pid = 0;
 334 }
 335 
 336 /******************
 337  * Local functions
 338  ******************/
 339 int orte_sstore_stage_local_module_init(void)
 340 {
 341     int ret, exit_status = ORTE_SUCCESS;
 342 
 343     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 344                          "sstore:stage:(local): init()"));
 345 
 346     if( NULL == active_handles ) {
 347         active_handles = OBJ_NEW(opal_list_t);
 348     }
 349 
 350     if( NULL == preload_filem_requests ) {
 351         preload_filem_requests = OBJ_NEW(opal_list_t);
 352     }
 353 
 354     /*
 355      * Create the local storage directory
 356      */
 357     opal_asprintf(&sstore_stage_local_basedir, "%s/%s/%s",
 358              orte_sstore_stage_local_snapshot_dir,
 359              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
 360              ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME);
 361     if( ORTE_SUCCESS != (ret = sstore_stage_create_local_dir()) ) {
 362         ORTE_ERROR_LOG(ret);
 363         exit_status = ret;
 364         goto cleanup;
 365     }
 366 
 367     /*
 368      * Create the local cache
 369      */
 370     if( orte_sstore_stage_enabled_caching ) {
 371         opal_asprintf(&sstore_stage_cache_basedir, "%s/%s/%s",
 372                  orte_sstore_stage_local_snapshot_dir,
 373                  ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
 374                  ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME);
 375 
 376         if( ORTE_SUCCESS != (ret = sstore_stage_create_cache()) ) {
 377             ORTE_ERROR_LOG(ret);
 378             exit_status = ret;
 379             goto cleanup;
 380         }
 381     }
 382 
 383     /*
 384      * Setup a listener for the HNP/Apps
 385      * We could be the HNP, in which case the listener is already registered.
 386      */
 387     if( !ORTE_PROC_IS_HNP ) {
 388         if( ORTE_SUCCESS != (ret = sstore_stage_local_start_listener()) ) {
 389             ORTE_ERROR_LOG(ret);
 390             exit_status = ret;
 391             goto cleanup;
 392         }
 393     }
 394 
 395  cleanup:
 396     return exit_status;
 397 }
 398 
 399 int orte_sstore_stage_local_module_finalize(void)
 400 {
 401     int ret, exit_status = ORTE_SUCCESS;
 402     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
 403     opal_list_item_t* item = NULL;
 404     bool done = false;
 405     int cur_time = 0, max_time = 120;
 406 
 407     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 408                          "sstore:stage:(local): finalize()"));
 409 
 410     /*
 411      * Wait for all active transfers to finish
 412      */
 413     if( !ORTE_PROC_IS_HNP ) {
 414         done = false;
 415         while( 0 < opal_list_get_size(active_handles) && !done ) {
 416             done = true;
 417             for(item  = opal_list_get_first(active_handles);
 418                 item != opal_list_get_end(active_handles);
 419                 item  = opal_list_get_next(item) ) {
 420                 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
 421                 if( SSTORE_LOCAL_DONE  != handle_info->status &&
 422                     SSTORE_LOCAL_NONE  != handle_info->status &&
 423                     SSTORE_LOCAL_ERROR != handle_info->status ) {
 424                     done = false;
 425                     break;
 426                 }
 427             }
 428             if( done ) {
 429                 break;
 430             }
 431             else {
 432                 if( cur_time != 0 && cur_time % 30 == 0 ) {
 433                     opal_output(0, "---> Waiting for fin(): %3d / %3d\n",
 434                                 cur_time, max_time);
 435                 }
 436 
 437                 opal_progress();
 438                 if( cur_time >= max_time ) {
 439                     break;
 440                 } else {
 441                     sleep(1);
 442                 }
 443                 cur_time++;
 444             }
 445         }
 446     }
 447 
 448     if( NULL != active_handles ) {
 449         OBJ_RELEASE(active_handles);
 450     }
 451 
 452     if( NULL != preload_filem_requests ) {
 453         OBJ_RELEASE(preload_filem_requests);
 454     }
 455 
 456     /*
 457      * Shutdown the listener for the HNP/Apps
 458      * We could be the HNP, in which case the listener is already deregistered.
 459      */
 460     if( !ORTE_PROC_IS_HNP ) {
 461         if( ORTE_SUCCESS != (ret = sstore_stage_local_stop_listener()) ) {
 462             ORTE_ERROR_LOG(ret);
 463             exit_status = ret;
 464             goto cleanup;
 465         }
 466     }
 467 
 468     /*
 469      * Destroy the local cache
 470      */
 471     if( orte_sstore_stage_enabled_caching ) {
 472         if( ORTE_SUCCESS != (ret = sstore_stage_destroy_cache()) ) {
 473             ORTE_ERROR_LOG(ret);
 474             exit_status = ret;
 475             goto cleanup;
 476         }
 477     }
 478 
 479     /*
 480      * Destroy the local storage directory
 481      */
 482     if( ORTE_SUCCESS != (ret = sstore_stage_destroy_local_dir()) ) {
 483         ORTE_ERROR_LOG(ret);
 484         exit_status = ret;
 485         goto cleanup;
 486     }
 487 
 488  cleanup:
 489     if( orte_sstore_stage_enabled_caching ) {
 490         if( NULL != sstore_stage_cache_basedir ) {
 491             free(sstore_stage_cache_basedir);
 492             sstore_stage_cache_basedir = NULL;
 493         }
 494     }
 495 
 496     if( NULL != sstore_stage_local_basedir ) {
 497         free(sstore_stage_local_basedir);
 498         sstore_stage_local_basedir = NULL;
 499     }
 500 
 501     return exit_status;
 502 }
 503 
 504 int orte_sstore_stage_local_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
 505 {
 506     opal_output(0, "sstore:stage:(local): request_checkpoint_handle() Not implemented!");
 507     return ORTE_ERR_NOT_IMPLEMENTED;
 508 }
 509 
 510 int orte_sstore_stage_local_register(orte_sstore_base_handle_t handle)
 511 {
 512     int ret, exit_status = ORTE_SUCCESS;
 513     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
 514 
 515     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 516                          "sstore:stage:(local): register()"));
 517 
 518     /*
 519      * Create a handle
 520      */
 521     if( NULL == (handle_info = find_handle_info(handle)) ) {
 522         handle_info = create_new_handle_info(handle);
 523     }
 524 
 525     /*
 526      * Get basic information from Global SStore
 527      */
 528     if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
 529         ORTE_ERROR_LOG(ret);
 530         exit_status = ret;
 531         goto cleanup;
 532     }
 533 
 534     /*
 535      * Wait here until the pull request has been satisfied
 536      */
 537     while(SSTORE_LOCAL_READY != handle_info->status &&
 538           SSTORE_LOCAL_ERROR != handle_info->status ) {
 539         opal_progress();
 540     }
 541 
 542  cleanup:
 543     return exit_status;
 544 }
 545 
 546 int orte_sstore_stage_local_get_attr(orte_sstore_base_handle_t handle,  orte_sstore_base_key_t key, char **value)
 547 {
 548     opal_output(0, "sstore:stage:(local): get_attr() Not implemented!");
 549     return ORTE_ERR_NOT_IMPLEMENTED;
 550 }
 551 
 552 int orte_sstore_stage_local_set_attr(orte_sstore_base_handle_t handle,  orte_sstore_base_key_t key, char *value)
 553 {
 554     opal_output(0, "sstore:stage:(local): set_attr() Not implemented!");
 555     return ORTE_ERR_NOT_IMPLEMENTED;
 556 }
 557 
 558 int orte_sstore_stage_local_sync(orte_sstore_base_handle_t handle)
 559 {
 560     int ret, exit_status = ORTE_SUCCESS;
 561     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
 562 
 563     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 564                          "sstore:stage:(local): sync()"));
 565 
 566     /*
 567      * Lookup the handle
 568      */
 569     handle_info = find_handle_info(handle);
 570 
 571     /*
 572      * Wait for all of the applications to update their metadata
 573      */
 574     if( ORTE_SUCCESS != (ret = wait_all_apps_updated(handle_info))) {
 575         ORTE_ERROR_LOG(ret);
 576         exit_status = ret;
 577         goto cleanup;
 578     }
 579 
 580     /*
 581      * Wait for compression to finish
 582      */
 583     if( orte_sstore_stage_enabled_compression ) {
 584         if( ORTE_SUCCESS != (ret = wait_all_compressed(handle_info))) {
 585             ORTE_ERROR_LOG(ret);
 586             exit_status = ret;
 587             goto cleanup;
 588         }
 589     }
 590 
 591     /*
 592      * Push information to the Global coordinator
 593      */
 594     if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
 595         ORTE_ERROR_LOG(ret);
 596         exit_status = ret;
 597         goto cleanup;
 598     }
 599 
 600     handle_info->status = SSTORE_LOCAL_SYNCED;
 601 
 602  cleanup:
 603     return exit_status;
 604 }
 605 
 606 int orte_sstore_stage_local_remove(orte_sstore_base_handle_t handle)
 607 {
 608     opal_output(0, "sstore:stage:(local): remove() Not implemented!");
 609     return ORTE_ERR_NOT_IMPLEMENTED;
 610 }
 611 
 612 int orte_sstore_stage_local_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
 613 {
 614     int ret, exit_status = ORTE_SUCCESS;
 615 
 616     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 617                          "sstore:stage:(local): pack()"));
 618 
 619     /*
 620      * Lookup the handle
 621      */
 622 
 623 
 624     /*
 625      * Pack the handle ID
 626      */
 627     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
 628         ORTE_ERROR_LOG(ret);
 629         exit_status = ret;
 630         goto cleanup;
 631     }
 632 
 633     /*
 634      * Pack any metadata
 635      */
 636 
 637  cleanup:
 638     return exit_status;
 639 }
 640 
 641 int orte_sstore_stage_local_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
 642 {
 643     int ret, exit_status = ORTE_SUCCESS;
 644     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
 645     orte_std_cntr_t count;
 646 
 647     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 648                          "sstore:stage:(local): unpack()"));
 649 
 650     /*
 651      * Unpack the handle id
 652      */
 653     count = 1;
 654     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, handle, &count, ORTE_SSTORE_HANDLE))) {
 655         ORTE_ERROR_LOG(ret);
 656         exit_status = ret;
 657         goto cleanup;
 658     }
 659 
 660     /*
 661      * Lookup the handle
 662      */
 663     if( NULL == (handle_info = find_handle_info(*handle)) ) {
 664         handle_info = create_new_handle_info(*handle);
 665     }
 666 
 667     /*
 668      * Unpack the metadata piggybacked on this message
 669      */
 670     if( ORTE_SUCCESS != (ret = process_global_push(peer, buffer, handle_info))) {
 671         ORTE_ERROR_LOG(ret);
 672         exit_status = ret;
 673         goto cleanup;
 674     }
 675 
 676  cleanup:
 677     return exit_status;
 678 }
 679 
 680 int orte_sstore_stage_local_fetch_app_deps(orte_app_context_t *app)
 681 {
 682     int ret, exit_status = ORTE_SUCCESS;
 683     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
 684     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
 685     char **sstore_args = NULL;
 686     char * req_snap_loc = NULL;
 687     char * req_snap_global_ref = NULL;
 688     char * req_snap_ref = NULL;
 689     char * req_snap_postfix = NULL;
 690     char * local_location = NULL;
 691     char * req_snap_compress = NULL;
 692     char * compress_local_location = NULL;
 693     char * compress_ref = NULL;
 694     char * tmp_str = NULL;
 695     int req_snap_seq = 0;
 696     int i;
 697     orte_proc_t *child = NULL;
 698     int loc_argc = 0;
 699     bool skip_xfer = false;
 700     char *sload = NULL;
 701 
 702     orte_get_attribute(&app->attributes, ORTE_APP_SSTORE_LOAD, (void **)&sload, OPAL_STRING);
 703 
 704     if(!ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) || NULL == sload) {
 705         OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
 706                              "sstore:stage:(local): fetch_app_deps(%3d): Not for this daemon (%s, %d, %s)",
 707                              app->idx, (ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) ? "T" : "F"),
 708                              (int)app->num_procs, sload));
 709         /* Nothing to do */
 710         goto cleanup;
 711     }
 712 
 713     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 714                          "sstore:stage:(local): fetch_app_deps(%3d): %s",
 715                          app->idx, sload));
 716 
 717     /*
 718      * Extract the 'ref:seq' parameter
 719      */
 720     sstore_args = opal_argv_split(sload, ':');
 721     req_snap_loc        = strdup(sstore_args[0]);
 722     req_snap_global_ref = strdup(sstore_args[1]);
 723     req_snap_ref        = strdup(sstore_args[2]);
 724     if( NULL == sstore_args[4] ) { /* Not compressed */
 725         req_snap_seq        = atoi(  sstore_args[3]);
 726     } else {
 727         req_snap_compress   = strdup(sstore_args[3]);
 728         req_snap_postfix    = strdup(sstore_args[4]);
 729         req_snap_seq        = atoi(  sstore_args[5]);
 730     }
 731 
 732     handle_info = find_handle_info_ref(req_snap_global_ref, req_snap_seq);
 733     if( NULL == handle_info ) {
 734         /* No checkpoints known, just preload the checkpoint */
 735         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 736                              "sstore:stage:(local): fetch_app_deps(%3d): No known checkpoint [%s, %d]",
 737                              app->idx,
 738                              req_snap_ref,
 739                              req_snap_seq));
 740         goto filem_preload;
 741     }
 742 
 743     /*
 744      * If caching enabled, then look to see if we have this snapshot cached
 745      * Do not cache if migrating, since checkpoints taken while migrating are
 746      * not guaranteed to be globally taken.
 747      */
 748     if( orte_sstore_stage_enabled_caching && !handle_info->migrating ) {
 749         /*
 750          * Find the process
 751          */
 752         for (i=0; i < orte_local_children->size; i++) {
 753             if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
 754                 continue;
 755             }
 756 
 757             if( app->idx == child->app_idx ) {
 758                 /*
 759                  * Find the app snapshot ref
 760                  */
 761                 app_info = find_app_handle_info(handle_info, &child->name);
 762                 break;
 763             }
 764         }
 765 
 766         if( NULL == app_info ) {
 767             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 768                                  "sstore:stage:(local): fetch_app_deps(%3d): No processes known for this app context",
 769                                  app->idx));
 770             goto filem_preload;
 771         }
 772 
 773         /*
 774          * Do we have a cached version of this file?
 775          */
 776         if( NULL != app_info->local_cache_location &&
 777             0 == (ret = access(app_info->local_cache_location, F_OK)) ) {
 778             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 779                                  "sstore:stage:(local): fetch_app_deps(%3d): Using local cache. (%s)",
 780                                  app->idx,
 781                                  app_info->local_cache_location));
 782 
 783             opal_argv_append(&loc_argc, &(app->argv), "-c");
 784             opal_argv_append(&loc_argc, &(app->argv), app_info->local_cache_location);
 785             goto cleanup;
 786         } else {
 787             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 788                                  "sstore:stage:(local): fetch_app_deps(%3d): No cache available for %s. (%s)",
 789                                  app->idx,
 790                                  ORTE_NAME_PRINT(&app_info->name),
 791                                  app_info->local_cache_location));
 792         }
 793     }
 794 
 795  filem_preload:
 796     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 797                          "sstore:stage:(local): fetch_app_deps(%3d): Fetch files from Central storage",
 798                          app->idx));
 799 
 800     /*
 801      * If we got here, then there is no cached directory, so just preload the
 802      * files, update the argument set, and carry on.
 803      */
 804     if( ORTE_SUCCESS != (ret = orte_sstore_stage_local_preload_files(&local_location,
 805                                                                      &skip_xfer,
 806                                                                      req_snap_loc,
 807                                                                      req_snap_ref,
 808                                                                      req_snap_postfix,
 809                                                                      req_snap_seq)) ) {
 810         ORTE_ERROR_LOG(ret);
 811         exit_status = ret;
 812         goto cleanup;
 813     }
 814     opal_argv_append(&loc_argc, &(app->argv), "-l");
 815     opal_argv_append(&loc_argc, &(app->argv), local_location);
 816 
 817     /*
 818      * Decompress files:
 819      *  opal-restart will do this for us on launch
 820      */
 821     if( !skip_xfer ) {
 822         if( NULL != req_snap_compress && 0 < strlen(req_snap_compress) ) {
 823             opal_argv_append(&loc_argc, &(app->argv), "-d");
 824             opal_argv_append(&loc_argc, &(app->argv), req_snap_compress);
 825         }
 826         if( NULL != req_snap_postfix && 0 < strlen(req_snap_postfix) ) {
 827             opal_argv_append(&loc_argc, &(app->argv), "-p");
 828             opal_argv_append(&loc_argc, &(app->argv), req_snap_postfix);
 829         }
 830     }
 831 
 832     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 833                          "sstore:stage:(local): fetch_app_deps(%3d): Fetching to (%s)",
 834                          app->idx,
 835                          local_location));
 836 
 837  cleanup:
 838     if( NULL != req_snap_compress ) {
 839         free(req_snap_compress);
 840         req_snap_compress = NULL;
 841     }
 842 
 843     if( NULL != tmp_str ) {
 844         free(tmp_str);
 845         tmp_str = NULL;
 846     }
 847 
 848     if( NULL != compress_local_location ) {
 849         free(compress_local_location);
 850         compress_local_location = NULL;
 851     }
 852 
 853     if( NULL != compress_ref ) {
 854         free(compress_ref);
 855         compress_ref = NULL;
 856     }
 857 
 858     if( NULL != sstore_args ) {
 859         opal_argv_free(sstore_args);
 860         sstore_args = NULL;
 861     }
 862 
 863     if( NULL != req_snap_ref ) {
 864         free(req_snap_ref);
 865         req_snap_ref = NULL;
 866     }
 867 
 868     if( NULL != req_snap_postfix ) {
 869         free(req_snap_postfix);
 870         req_snap_postfix = NULL;
 871     }
 872 
 873     if( NULL != req_snap_loc ) {
 874         free(req_snap_loc);
 875         req_snap_loc = NULL;
 876     }
 877 
 878     if( NULL != req_snap_global_ref ) {
 879         free(req_snap_global_ref);
 880         req_snap_global_ref = NULL;
 881     }
 882 
 883     return exit_status;
 884 }
 885 
 886 int orte_sstore_stage_local_wait_all_deps(void)
 887 {
 888     int ret, exit_status = ORTE_SUCCESS;
 889     opal_list_item_t* item = NULL;
 890 
 891     /* Nothing being preloaded, so just move on */
 892     if( 0 >= opal_list_get_size(preload_filem_requests) ) {
 893         return ORTE_SUCCESS;
 894     }
 895 
 896     /*
 897      * Wait for all files to move
 898      */
 899     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 900                          "sstore:stage:(local): wait_all_deps(): Waiting on %d requests",
 901                          (int)opal_list_get_size(preload_filem_requests)));
 902 
 903     if(ORTE_SUCCESS != (ret = orte_filem.wait_all(preload_filem_requests)) ) {
 904         ORTE_ERROR_LOG(ret);
 905         exit_status = ret;
 906         goto cleanup;
 907     }
 908 
 909     /*
 910      * Cache the restart files locally, so we can restart faster next time
 911      * JJH: We already check the restart directory for a local copy before
 912      *      starting the transfer. So this feels unnecessary since the
 913      *      restart directory is always used as a cache, whether or not
 914      *      caching is enabled. The extra copy to the cache directory
 915      *      does not buy us anything.
 916      */
 917 
 918     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
 919                          "sstore:stage:(local): wait_all_deps(): Finished waiting on %d requests!",
 920                          (int)opal_list_get_size(preload_filem_requests)));
 921 
 922  cleanup:
 923     while (NULL != (item = opal_list_remove_first(preload_filem_requests) ) ) {
 924         OBJ_RELEASE(item);
 925     }
 926 
 927     return exit_status;
 928 }
 929 
 930 /**************************
 931  * Local functions
 932  **************************/
 933 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
 934 {
 935     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
 936     int i;
 937     orte_proc_t *child = NULL;
 938 
 939     if( NULL == active_handles ) {
 940         active_handles = OBJ_NEW(opal_list_t);
 941     }
 942 
 943     handle_info = OBJ_NEW(orte_sstore_stage_local_snapshot_info_t);
 944 
 945     handle_info->id = handle;
 946 
 947     opal_list_append(active_handles, &(handle_info->super));
 948 
 949     /*
 950      * Create a sub structure for each child
 951      */
 952     for (i=0; i < orte_local_children->size; i++) {
 953             if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
 954             continue;
 955         }
 956         append_new_app_handle_info(handle_info, &child->name);
 957     }
 958 
 959     handle_info->status = SSTORE_LOCAL_INIT;
 960 
 961     return handle_info;
 962 }
 963 
 964 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
 965 {
 966     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
 967     opal_list_item_t* item = NULL;
 968 
 969     if( NULL == active_handles ) {
 970         return NULL;
 971     }
 972 
 973     for(item  = opal_list_get_first(active_handles);
 974         item != opal_list_get_end(active_handles);
 975         item  = opal_list_get_next(item) ) {
 976         handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
 977 
 978         if( handle_info->id == handle ) {
 979             return handle_info;
 980         }
 981     }
 982 
 983     return NULL;
 984 }
 985 
 986 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq)
 987 {
 988     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
 989     opal_list_item_t* item = NULL;
 990 
 991     if( NULL == active_handles ) {
 992         return NULL;
 993     }
 994 
 995     for(item  = opal_list_get_first(active_handles);
 996         item != opal_list_get_end(active_handles);
 997         item  = opal_list_get_next(item) ) {
 998         handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
 999 
1000         if( 0 == strncmp(handle_info->global_ref_name, ref, strlen(ref)) &&
1001             handle_info->seq_num == seq ) {
1002             return handle_info;
1003         }
1004     }
1005 
1006     return NULL;
1007 }
1008 
1009 static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1010                                       orte_process_name_t *name)
1011 {
1012     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1013 
1014     app_info = OBJ_NEW(orte_sstore_stage_local_app_snapshot_info_t);
1015 
1016     app_info->name.jobid = name->jobid;
1017     app_info->name.vpid  = name->vpid;
1018 
1019     opal_list_append(handle_info->app_info_handle, &(app_info->super));
1020 
1021     return ORTE_SUCCESS;
1022 }
1023 
1024 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1025                                                                            orte_process_name_t *name)
1026 {
1027     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1028     opal_list_item_t* item = NULL;
1029     orte_ns_cmp_bitmask_t mask;
1030 
1031     for(item  = opal_list_get_first(handle_info->app_info_handle);
1032         item != opal_list_get_end(handle_info->app_info_handle);
1033         item  = opal_list_get_next(item) ) {
1034         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1035 
1036         mask = ORTE_NS_CMP_ALL;
1037 
1038         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &app_info->name, name)) {
1039             return app_info;
1040         }
1041     }
1042 
1043     return NULL;
1044 }
1045 
1046 static int sstore_stage_local_start_listener(void)
1047 {
1048     if( is_global_listener_active ) {
1049         return ORTE_SUCCESS;
1050     }
1051 
1052     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
1053                             ORTE_RML_PERSISTENT, sstore_stage_local_recv, NULL);
1054 
1055     is_global_listener_active = true;
1056     return ORTE_SUCCESS;
1057 }
1058 
1059 static int sstore_stage_local_stop_listener(void)
1060 {
1061     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1062     is_global_listener_active = false;
1063     return ORTE_SUCCESS;
1064 }
1065 
1066 static void sstore_stage_local_recv(int status,
1067                                     orte_process_name_t* sender,
1068                                     opal_buffer_t* buffer,
1069                                     orte_rml_tag_t tag,
1070                                     void* cbdata)
1071 {
1072     int ret;
1073     orte_sstore_stage_cmd_flag_t command;
1074     orte_std_cntr_t count;
1075     orte_sstore_base_handle_t loc_id;
1076 
1077     if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
1078         return;
1079     }
1080 
1081     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1082                          "sstore:stage:(local): process_cmd(%s)",
1083                          ORTE_NAME_PRINT(sender)));
1084 
1085     count = 1;
1086     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
1087         ORTE_ERROR_LOG(ret);
1088         goto cleanup;
1089     }
1090 
1091     count = 1;
1092     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
1093         ORTE_ERROR_LOG(ret);
1094         goto cleanup;
1095     }
1096 
1097     orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
1098 
1099  cleanup:
1100     return;
1101 }
1102 
1103 int orte_sstore_stage_local_process_cmd_action(orte_process_name_t *sender,
1104                                                orte_sstore_stage_cmd_flag_t command,
1105                                                orte_sstore_base_handle_t loc_id,
1106                                                opal_buffer_t* buffer)
1107 {
1108     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
1109 
1110     /*
1111      * Find the referenced handle (Create if it does not exist)
1112      */
1113     if(NULL == (handle_info = find_handle_info(loc_id)) ) {
1114         handle_info = create_new_handle_info(loc_id);
1115     }
1116 
1117     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1118                          "sstore:stage:(local): process_cmd(%s) - Command = %s",
1119                          ORTE_NAME_PRINT(sender),
1120                          (ORTE_SSTORE_STAGE_PULL == command ? "Pull" :
1121                           (ORTE_SSTORE_STAGE_PUSH == command ? "Push" :
1122                            (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
1123                             (ORTE_SSTORE_STAGE_DONE == command ? "Done" : "Unknown")))) ));
1124 
1125     /*
1126      * Process the command
1127      */
1128     if( ORTE_SSTORE_STAGE_PULL == command ) {
1129         if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
1130             process_global_pull(sender, buffer, handle_info);
1131         } else {
1132             process_app_pull(sender, buffer, handle_info);
1133         }
1134     }
1135     else if( ORTE_SSTORE_STAGE_PUSH == command ) {
1136         if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
1137             process_global_push(sender, buffer, handle_info);
1138         } else {
1139             process_app_push(sender, buffer, handle_info);
1140         }
1141     }
1142     else if( ORTE_SSTORE_STAGE_REMOVE == command ) {
1143         /* The xcast from the root makes the 'sender' equal to this process :/
1144          * so we know it is the HNP, so just use that name */
1145         process_global_remove(ORTE_PROC_MY_HNP, buffer, handle_info);
1146     }
1147 
1148     return ORTE_SUCCESS;
1149 }
1150 
1151 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1152 {
1153     /* JJH should be as simple as calling push_handle_info() */
1154     opal_output(0, "sstore:stage:(local): process_global_pull() Not implemented!");
1155     return ORTE_ERR_NOT_IMPLEMENTED;
1156 }
1157 
1158 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1159 {
1160     int ret, exit_status = ORTE_SUCCESS;
1161     orte_std_cntr_t count;
1162     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1163     opal_list_item_t* item = NULL;
1164 
1165     count = 1;
1166     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
1167         ORTE_ERROR_LOG(ret);
1168         exit_status = ret;
1169         goto cleanup;
1170     }
1171 
1172     count = 1;
1173     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
1174         ORTE_ERROR_LOG(ret);
1175         exit_status = ret;
1176         goto cleanup;
1177     }
1178 
1179     count = 1;
1180     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->location_fmt), &count, OPAL_STRING))) {
1181         ORTE_ERROR_LOG(ret);
1182         exit_status = ret;
1183         goto cleanup;
1184     }
1185 
1186     if( orte_sstore_stage_enabled_caching ) {
1187         count = 1;
1188         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->cache_location_fmt), &count, OPAL_STRING))) {
1189             ORTE_ERROR_LOG(ret);
1190             exit_status = ret;
1191             goto cleanup;
1192         }
1193     }
1194 
1195     count = 1;
1196     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->migrating), &count, OPAL_BOOL))) {
1197         ORTE_ERROR_LOG(ret);
1198         exit_status = ret;
1199         goto cleanup;
1200     }
1201 
1202     /*
1203      * For each process we are working with
1204      */
1205     for(item  = opal_list_get_first(handle_info->app_info_handle);
1206         item != opal_list_get_end(handle_info->app_info_handle);
1207         item  = opal_list_get_next(item) ) {
1208         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1209 
1210         if( NULL != app_info->local_location ) {
1211             free(app_info->local_location);
1212             app_info->local_location = NULL;
1213         }
1214         opal_asprintf(&(app_info->local_location), handle_info->location_fmt, app_info->name.vpid);
1215 
1216         if( orte_sstore_stage_enabled_caching ) {
1217             if( NULL != app_info->local_cache_location ) {
1218                 free(app_info->local_cache_location);
1219                 app_info->local_cache_location = NULL;
1220             }
1221             opal_asprintf(&(app_info->local_cache_location), handle_info->cache_location_fmt, app_info->name.vpid);
1222         }
1223 
1224         if( NULL != app_info->metadata_filename ) {
1225             free(app_info->metadata_filename);
1226             app_info->metadata_filename = NULL;
1227         }
1228         opal_asprintf(&(app_info->metadata_filename), "%s/%s",
1229                  app_info->local_location,
1230                  orte_sstore_base_local_metadata_filename);
1231     }
1232 
1233  cleanup:
1234     if( ORTE_SUCCESS == exit_status ) {
1235         handle_info->status = SSTORE_LOCAL_READY;
1236     } else {
1237         handle_info->status = SSTORE_LOCAL_ERROR;
1238     }
1239 
1240     return exit_status;
1241 }
1242 
1243 static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1244 {
1245     int ret, exit_status = ORTE_SUCCESS;
1246     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1247     opal_list_item_t* item = NULL;
1248     opal_buffer_t *loc_buffer = NULL;
1249     orte_sstore_stage_cmd_flag_t command;
1250     size_t list_size;
1251     char * cmd = NULL;
1252 
1253     /*
1254      * If not caching, then just remove the local copy
1255      * Or if migrating, since we do not cache checkpoints generated while
1256      * migrating.
1257      */
1258     if( !orte_sstore_stage_enabled_caching || handle_info->migrating ) {
1259         for(item  = opal_list_get_first(handle_info->app_info_handle);
1260             item != opal_list_get_end(handle_info->app_info_handle);
1261             item  = opal_list_get_next(item) ) {
1262             app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1263 
1264             opal_asprintf(&cmd, "rm -rf %s", app_info->local_location);
1265             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1266                                  "sstore:stage:(local): update_cache(): Removing with command (%s)",
1267                                  cmd));
1268             system(cmd);
1269 
1270             if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
1271                 free(cmd);
1272                 cmd = NULL;
1273 
1274                 opal_asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location);
1275                 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1276                                      "sstore:stage:(local): update_cache(): Removing with command (%s)",
1277                                      cmd));
1278                 system(cmd);
1279             }
1280         }
1281     }
1282     else {
1283           /*
1284            * Update the local cache
1285            */
1286           if( ORTE_SUCCESS != (ret = sstore_stage_update_cache(handle_info)) ) {
1287               ORTE_ERROR_LOG(ret);
1288               exit_status = ret;
1289               goto cleanup;
1290           }
1291     }
1292 
1293     loc_buffer = OBJ_NEW(opal_buffer_t);
1294 
1295     command = ORTE_SSTORE_STAGE_DONE;
1296     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1297         ORTE_ERROR_LOG(ret);
1298         exit_status = ret;
1299         goto cleanup;
1300     }
1301 
1302     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1303         ORTE_ERROR_LOG(ret);
1304         exit_status = ret;
1305         goto cleanup;
1306     }
1307 
1308     list_size = opal_list_get_size(handle_info->app_info_handle);
1309     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &list_size, 1, OPAL_SIZE))) {
1310         ORTE_ERROR_LOG(ret);
1311         exit_status = ret;
1312         goto cleanup;
1313     }
1314 
1315     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1316                                                        orte_rml_send_callback, NULL))) {
1317         ORTE_ERROR_LOG(ret);
1318         exit_status = ret;
1319         goto cleanup;
1320     }
1321 
1322     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1323                          "sstore:stage:(local): remove(): Sent done for %d files to %s",
1324                          (int)list_size,
1325                          ORTE_NAME_PRINT(peer)));
1326 
1327     handle_info->status = SSTORE_LOCAL_DONE;
1328     /* loc_buffer should not be released here; the callback releases it */
1329     loc_buffer = NULL;
1330 
1331  cleanup:
1332     if( NULL != cmd ) {
1333         free(cmd);
1334         cmd = NULL;
1335     }
1336 
1337     if (NULL != loc_buffer) {
1338         OBJ_RELEASE(loc_buffer);
1339         loc_buffer = NULL;
1340     }
1341 
1342     return exit_status;
1343 }
1344 
1345 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1346 {
1347     int ret, exit_status = ORTE_SUCCESS;
1348     opal_buffer_t *loc_buffer = NULL;
1349     orte_sstore_stage_cmd_flag_t command;
1350     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1351 
1352     /*
1353      * Find this app's data
1354      */
1355     app_info = find_app_handle_info(handle_info, peer);
1356 
1357     /*
1358      * Push back the requested information
1359      */
1360     loc_buffer = OBJ_NEW(opal_buffer_t);
1361 
1362     command = ORTE_SSTORE_STAGE_PUSH;
1363     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1364         ORTE_ERROR_LOG(ret);
1365         exit_status = ret;
1366         goto cleanup;
1367     }
1368 
1369     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1370         ORTE_ERROR_LOG(ret);
1371         exit_status = ret;
1372         goto cleanup;
1373     }
1374 
1375     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
1376         ORTE_ERROR_LOG(ret);
1377         exit_status = ret;
1378         goto cleanup;
1379     }
1380 
1381     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
1382         ORTE_ERROR_LOG(ret);
1383         exit_status = ret;
1384         goto cleanup;
1385     }
1386 
1387     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
1388         ORTE_ERROR_LOG(ret);
1389         exit_status = ret;
1390         goto cleanup;
1391     }
1392 
1393     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
1394         ORTE_ERROR_LOG(ret);
1395         exit_status = ret;
1396         goto cleanup;
1397     }
1398 
1399     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1400                                                        orte_rml_send_callback, NULL))) {
1401         ORTE_ERROR_LOG(ret);
1402         exit_status = ret;
1403         goto cleanup;
1404     }
1405 
1406     /* loc_buffer should not be released here; the callback releases it */
1407     loc_buffer = NULL;
1408 
1409  cleanup:
1410     if (NULL != loc_buffer) {
1411         OBJ_RELEASE(loc_buffer);
1412         loc_buffer = NULL;
1413     }
1414 
1415     return exit_status;
1416 }
1417 
1418 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1419 {
1420     int ret, exit_status = ORTE_SUCCESS;
1421     orte_std_cntr_t count;
1422     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1423 
1424     /*
1425      * Find this app's data
1426      */
1427     app_info = find_app_handle_info(handle_info, peer);
1428 
1429     /*
1430      * Unpack the data
1431      */
1432     count = 1;
1433     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->ckpt_skipped), &count, OPAL_BOOL))) {
1434         ORTE_ERROR_LOG(ret);
1435         exit_status = ret;
1436         goto cleanup;
1437     }
1438 
1439     if( !app_info->ckpt_skipped ) {
1440         count = 1;
1441         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->crs_comp), &count, OPAL_STRING))) {
1442             ORTE_ERROR_LOG(ret);
1443             exit_status = ret;
1444             goto cleanup;
1445         }
1446     }
1447 
1448     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1449                          "sstore:stage:(local): app_push(%s, skip=%s, %s)",
1450                          ORTE_NAME_PRINT(&(app_info->name)),
1451                          (app_info->ckpt_skipped ? "T" : "F"),
1452                          app_info->crs_comp));
1453 
1454     /* Compression started on sync() */
1455 
1456  cleanup:
1457     return exit_status;
1458 }
1459 
1460 static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info)
1461 {
1462     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1463     opal_list_item_t *item = NULL;
1464     bool is_done = true;
1465 
1466     do {
1467         is_done = true;
1468         for(item  = opal_list_get_first(handle_info->app_info_handle);
1469             item != opal_list_get_end(handle_info->app_info_handle);
1470             item  = opal_list_get_next(item) ) {
1471             app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1472 
1473             if( NULL == app_info->crs_comp && !app_info->ckpt_skipped ) {
1474                 is_done = false;
1475                 break;
1476             }
1477         }
1478 
1479         if( !is_done ) {
1480             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1481                                  "sstore:stage:(local): Waiting for appliccation %s",
1482                                  ORTE_NAME_PRINT(&(app_info->name)) ));
1483             opal_progress();
1484         }
1485     } while(!is_done);
1486 
1487     return ORTE_SUCCESS;
1488 }
1489 
1490 static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
1491                              orte_sstore_stage_local_app_snapshot_info_t *app_info)
1492 {
1493     int ret, exit_status = ORTE_SUCCESS;
1494     char * postfix = NULL;
1495     orte_proc_t *proc;
1496 
1497     /* Sanity Check */
1498     if( !orte_sstore_stage_enabled_compression ) {
1499         goto cleanup;
1500     }
1501 
1502     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1503                          "sstore:stage:(local): start_compression() Starting compression for process %s of (%s)",
1504                          ORTE_NAME_PRINT(&(app_info->name)),
1505                          app_info->local_location ));
1506 
1507     /*
1508      * Start compression (nonblocking)
1509      */
1510     if( ORTE_SUCCESS != (ret = opal_compress.compress_nb(app_info->local_location,
1511                                                          &(app_info->compressed_local_location),
1512                                                          &(postfix),
1513                                                          &(app_info->compress_pid))) ) {
1514         ORTE_ERROR_LOG(ret);
1515         exit_status = ret;
1516         goto cleanup;
1517     }
1518     if( app_info->compress_pid <= 0 ) {
1519         ORTE_ERROR_LOG(ORTE_ERROR);
1520         exit_status = ORTE_ERROR;
1521         goto cleanup;
1522     }
1523 
1524     if( NULL == handle_info->compress_comp ) {
1525         handle_info->compress_comp = strdup(opal_compress_base_selected_component.base_version.mca_component_name);
1526         handle_info->compress_postfix = strdup(postfix);
1527     }
1528 
1529     /*
1530      * Setup a callback for when it is finished
1531      */
1532     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1533                          "sstore:stage:(local): start_compression() Waiting for compression (%d) for process %s",
1534                          app_info->compress_pid,
1535                          ORTE_NAME_PRINT(&(app_info->name)) ));
1536 
1537     proc = OBJ_NEW(orte_proc_t);
1538     proc->pid = app_info->compress_pid;
1539     /* be sure to mark it as alive so we don't instantly fire */
1540     ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_ALIVE);
1541 
1542     orte_wait_cb(proc, sstore_stage_local_compress_waitpid_cb, app_info);
1543 
1544  cleanup:
1545     if( NULL != postfix ) {
1546         free(postfix);
1547         postfix = NULL;
1548     }
1549 
1550     return exit_status;
1551 }
1552 
1553 static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata)
1554 {
1555     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1556     orte_wait_tracker_t *t2 = (orte_wait_tracker_t *)cbdata;
1557 
1558     app_info = (orte_sstore_stage_local_app_snapshot_info_t*)t2->cbdata;
1559 
1560     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1561                          "sstore:stage:(local): waitpid(%6d) Compression finished for Process %s",
1562                          (int)proc->pid,
1563                          ORTE_NAME_PRINT(&(app_info->name)) ));
1564 
1565     app_info->compress_pid = 0;
1566     OBJ_RELEASE(proc);
1567     OBJ_RELEASE(t2);
1568 }
1569 
1570 static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info)
1571 {
1572     int ret, exit_status = ORTE_SUCCESS;
1573     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1574     opal_list_item_t *item = NULL;
1575     bool is_done = true;
1576     int usleep_time = 1000;
1577     int s_time = 0, max_wait_time;
1578 
1579     /* Sanity Check */
1580     if( !orte_sstore_stage_enabled_compression ) {
1581         return ORTE_SUCCESS;
1582     }
1583 
1584     /*
1585      * Start all compression
1586      */
1587     if( orte_sstore_stage_compress_delay > 0 ) {
1588         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1589                              "sstore:stage:(local): Delaying %d second before starting compression...",
1590                              orte_sstore_stage_compress_delay));
1591         max_wait_time = orte_sstore_stage_compress_delay * (1000000/usleep_time);
1592         for( s_time = 0; s_time < max_wait_time; ++s_time) {
1593             opal_progress();
1594             usleep(1000);
1595         }
1596     }
1597 
1598     for(item  = opal_list_get_first(handle_info->app_info_handle);
1599         item != opal_list_get_end(handle_info->app_info_handle);
1600         item  = opal_list_get_next(item) ) {
1601         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1602 
1603         if( ORTE_SUCCESS != (ret = start_compression(handle_info, app_info)) ) {
1604             ORTE_ERROR_LOG(ret);
1605             exit_status = ret;
1606             goto cleanup;
1607         }
1608     }
1609 
1610     /*
1611      * Wait for compression to finish
1612      */
1613     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1614                          "sstore:stage:(local): Waiting for compression to finish..."));
1615     do {
1616         is_done = true;
1617         for(item  = opal_list_get_first(handle_info->app_info_handle);
1618             item != opal_list_get_end(handle_info->app_info_handle);
1619             item  = opal_list_get_next(item) ) {
1620             app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1621 
1622             if( 0 < app_info->compress_pid ) {
1623                 is_done = false;
1624                 break;
1625             }
1626         }
1627 
1628         if( !is_done ) {
1629             OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
1630                                  "sstore:stage:(local): Waiting for compression to finish for appliccation %s",
1631                                  ORTE_NAME_PRINT(&(app_info->name)) ));
1632             opal_progress();
1633         }
1634     } while(!is_done);
1635 
1636     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1637                          "sstore:stage:(local): Compression finished!"));
1638  cleanup:
1639     return exit_status;
1640 }
1641 
1642 static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
1643 {
1644     int ret, exit_status = ORTE_SUCCESS;
1645     opal_buffer_t *buffer = NULL;
1646     orte_sstore_stage_cmd_flag_t command;
1647 
1648     /*
1649      * Check to see if this is necessary
1650      * (Did we get all of the info from the handle unpack?)
1651      */
1652     if( 0 <= handle_info->seq_num &&
1653         NULL != handle_info->global_ref_name &&
1654         NULL != handle_info->location_fmt ) {
1655         handle_info->status = SSTORE_LOCAL_READY;
1656         return ORTE_SUCCESS;
1657     }
1658 
1659     buffer = OBJ_NEW(opal_buffer_t);
1660 
1661     /*
1662      * Ask the daemon to send us the info that we need
1663      */
1664     command = ORTE_SSTORE_STAGE_PULL;
1665     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1666         ORTE_ERROR_LOG(ret);
1667         exit_status = ret;
1668         goto cleanup;
1669     }
1670 
1671     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1672         ORTE_ERROR_LOG(ret);
1673         exit_status = ret;
1674         goto cleanup;
1675     }
1676 
1677     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
1678                                                        ORTE_RML_TAG_SSTORE_INTERNAL,
1679                                                        orte_rml_send_callback, NULL))) {
1680         ORTE_ERROR_LOG(ret);
1681         exit_status = ret;
1682         goto cleanup;
1683     }
1684 
1685     /* buffer should not be released here; the callback releases it */
1686     buffer = NULL;
1687 
1688  cleanup:
1689     if (NULL != buffer) {
1690         OBJ_RELEASE(buffer);
1691         buffer = NULL;
1692     }
1693 
1694     return exit_status;
1695 }
1696 
1697 static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
1698 {
1699     int ret, exit_status = ORTE_SUCCESS;
1700     opal_buffer_t *buffer = NULL;
1701     orte_sstore_stage_cmd_flag_t command;
1702     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1703     opal_list_item_t *item = NULL;
1704     size_t list_size;
1705 
1706     buffer = OBJ_NEW(opal_buffer_t);
1707 
1708     command = ORTE_SSTORE_STAGE_PUSH;
1709     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1710         ORTE_ERROR_LOG(ret);
1711         exit_status = ret;
1712         goto cleanup;
1713     }
1714 
1715     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1716         ORTE_ERROR_LOG(ret);
1717         exit_status = ret;
1718         goto cleanup;
1719     }
1720 
1721     list_size = opal_list_get_size(handle_info->app_info_handle);
1722     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &list_size, 1, OPAL_SIZE))) {
1723         ORTE_ERROR_LOG(ret);
1724         exit_status = ret;
1725         goto cleanup;
1726     }
1727 
1728     /*
1729      * For each process we are working with
1730      */
1731     for(item  = opal_list_get_first(handle_info->app_info_handle);
1732         item != opal_list_get_end(handle_info->app_info_handle);
1733         item  = opal_list_get_next(item) ) {
1734         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1735 
1736         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->name), 1, ORTE_NAME))) {
1737             ORTE_ERROR_LOG(ret);
1738             exit_status = ret;
1739             goto cleanup;
1740         }
1741 
1742         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->ckpt_skipped), 1, OPAL_BOOL))) {
1743             ORTE_ERROR_LOG(ret);
1744             exit_status = ret;
1745             goto cleanup;
1746         }
1747 
1748         if( !app_info->ckpt_skipped ) {
1749             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->crs_comp), 1, OPAL_STRING))) {
1750                 ORTE_ERROR_LOG(ret);
1751                 exit_status = ret;
1752                 goto cleanup;
1753             }
1754 
1755             if( orte_sstore_stage_enabled_compression ) {
1756                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->compress_comp), 1, OPAL_STRING))) {
1757                     ORTE_ERROR_LOG(ret);
1758                     exit_status = ret;
1759                     goto cleanup;
1760                 }
1761                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->compress_postfix), 1, OPAL_STRING))) {
1762                     ORTE_ERROR_LOG(ret);
1763                     exit_status = ret;
1764                     goto cleanup;
1765                 }
1766             }
1767         }
1768     }
1769 
1770     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1771                                                        orte_rml_send_callback, NULL))) {
1772         ORTE_ERROR_LOG(ret);
1773         exit_status = ret;
1774         goto cleanup;
1775     }
1776 
1777     /* buffer should not be released here; the callback releases it */
1778     buffer = NULL;
1779 
1780  cleanup:
1781     if (NULL != buffer) {
1782         OBJ_RELEASE(buffer);
1783         buffer = NULL;
1784     }
1785 
1786     return exit_status;
1787 }
1788 
1789 static int sstore_stage_create_local_dir(void)
1790 {
1791     int ret, exit_status = ORTE_SUCCESS;
1792     mode_t my_mode = S_IRWXU;
1793 
1794     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_local_basedir, my_mode)) ) {
1795         ORTE_ERROR_LOG(ret);
1796         exit_status = ret;
1797         goto cleanup;
1798     }
1799 
1800  cleanup:
1801     return exit_status;
1802 }
1803 
1804 static int sstore_stage_destroy_local_dir(void)
1805 {
1806     int ret, exit_status = ORTE_SUCCESS;
1807     char * basedir_root = NULL;
1808 
1809     opal_asprintf(&basedir_root, "%s/%s",
1810              orte_sstore_stage_local_snapshot_dir,
1811              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME);
1812 
1813     if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(basedir_root, true, NULL)) ) {
1814         ORTE_ERROR_LOG(ret);
1815         exit_status = ret;
1816         goto cleanup;
1817     }
1818 
1819  cleanup:
1820     if( NULL != basedir_root ) {
1821         free(basedir_root);
1822         basedir_root = NULL;
1823     }
1824 
1825     return exit_status;
1826 }
1827 
1828 static int sstore_stage_create_cache(void)
1829 {
1830     int ret, exit_status = ORTE_SUCCESS;
1831     mode_t my_mode = S_IRWXU;
1832 
1833     /* Sanity check */
1834     if( !orte_sstore_stage_enabled_caching ) {
1835         goto cleanup;
1836     }
1837 
1838     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_cache_basedir, my_mode)) ) {
1839         ORTE_ERROR_LOG(ret);
1840         exit_status = ret;
1841         goto cleanup;
1842     }
1843 
1844  cleanup:
1845     return exit_status;
1846 }
1847 
1848 static int sstore_stage_destroy_cache(void)
1849 {
1850     int ret, exit_status = ORTE_SUCCESS;
1851 
1852     /* Sanity check */
1853     if( !orte_sstore_stage_enabled_caching ) {
1854         goto cleanup;
1855     }
1856 
1857     if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(sstore_stage_cache_basedir, true, NULL)) ) {
1858         ORTE_ERROR_LOG(ret);
1859         exit_status = ret;
1860         goto cleanup;
1861     }
1862 
1863  cleanup:
1864     return exit_status;
1865 }
1866 
1867 static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info)
1868 {
1869     int ret, exit_status = ORTE_SUCCESS;
1870     char *cmd = NULL;
1871     mode_t my_mode = S_IRWXU;
1872     char *cache_dirname = NULL;
1873     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1874     opal_list_item_t* item = NULL;
1875     size_t list_size;
1876 
1877     /* Sanity Check */
1878     if( !orte_sstore_stage_enabled_caching || handle_info->migrating) {
1879         goto cleanup;
1880     }
1881 
1882     list_size = opal_list_get_size(handle_info->app_info_handle);
1883     if( 0 >= list_size ) {
1884         /* No processes on this node, skip */
1885         exit_status = ORTE_SUCCESS;
1886         goto cleanup;
1887     }
1888 
1889     app_info = (orte_sstore_stage_local_app_snapshot_info_t*)opal_list_get_first(handle_info->app_info_handle);
1890     if( NULL == app_info ) {
1891         ORTE_ERROR_LOG(ORTE_ERROR);
1892         exit_status = ORTE_ERROR;
1893         goto cleanup;
1894     }
1895 
1896     /*
1897      * Create the base cache directory
1898      */
1899     cache_dirname = opal_dirname(app_info->local_cache_location);
1900     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(cache_dirname, my_mode)) ) {
1901         ORTE_ERROR_LOG(ret);
1902         exit_status = ret;
1903         goto cleanup;
1904     }
1905 
1906     /*
1907      * For each process, move the current checkpoint to the cache directory
1908      * Cached snapshots are always stored uncompressed.
1909      */
1910     for(item  = opal_list_get_first(handle_info->app_info_handle);
1911         item != opal_list_get_end(handle_info->app_info_handle);
1912         item  = opal_list_get_next(item) ) {
1913         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1914 
1915         opal_asprintf(&cmd, "mv %s %s", app_info->local_location, cache_dirname);
1916         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1917                              "sstore:stage:(local): update_cache(): Caching snapshot for process %s [%s]",
1918                              ORTE_NAME_PRINT(&app_info->name),
1919                              cmd));
1920         system(cmd);
1921 
1922         /* (JJH) Remove the cached files */
1923         if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
1924             free(cmd);
1925             cmd = NULL;
1926 
1927             opal_asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location);
1928             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1929                                  "sstore:stage:(local): update_cache(): Removing with command (%s)",
1930                                  cmd));
1931             system(cmd);
1932         }
1933     }
1934 
1935     /*
1936      * Remove the previous cached checkpoint
1937      */
1938     if( NULL != sstore_stage_cache_last_dir ) {
1939         opal_asprintf(&cmd, "rm -rf %s", sstore_stage_cache_last_dir);
1940         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1941                              "sstore:stage:(local): update_cache(): Removing old cache dir command (%s)",
1942                              sstore_stage_cache_last_dir));
1943         system(cmd);
1944     }
1945 
1946     /*
1947      * Update 'last' cache pointer
1948      */
1949     if( NULL != sstore_stage_cache_last_dir ) {
1950         free(sstore_stage_cache_last_dir);
1951         sstore_stage_cache_last_dir = NULL;
1952     }
1953     if( NULL != sstore_stage_cache_current_dir ) {
1954         sstore_stage_cache_last_dir    = strdup(sstore_stage_cache_current_dir);
1955     }
1956 
1957     /*
1958      * Update 'current' cache pointer
1959      */
1960     if( NULL != sstore_stage_cache_current_dir ) {
1961         free(sstore_stage_cache_current_dir);
1962         sstore_stage_cache_current_dir = NULL;
1963     }
1964     sstore_stage_cache_current_dir = strdup(cache_dirname);
1965 
1966     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1967                          "sstore:stage:(local): update_cache(): Cache Pointers cur(%s), last(%s)",
1968                          sstore_stage_cache_current_dir, sstore_stage_cache_last_dir));
1969 
1970  cleanup:
1971     if( NULL != cmd ) {
1972         free(cmd);
1973         cmd = NULL;
1974     }
1975 
1976     return exit_status;
1977 }
1978 
1979 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
1980                                                  char *global_loc, char *ref, char *postfix, int seq)
1981 {
1982     int ret, exit_status = ORTE_SUCCESS;
1983     mode_t my_mode = S_IRWXU;
1984     orte_filem_base_request_t *filem_request;
1985     orte_filem_base_process_set_t *p_set = NULL;
1986     orte_filem_base_file_set_t * f_set = NULL;
1987     char * full_local_location = NULL;
1988 
1989     *skip_xfer = false;
1990 
1991     if( NULL != *local_location) {
1992         free(*local_location);
1993         *local_location = NULL;
1994     }
1995 
1996     /*
1997      * If the global directory is shared, then just reference directly
1998      *
1999      * Skip this optimization if compressing. Since decompressing on the
2000      * central storage would typically require a transfer to the local
2001      * disk to decompress, then transfer back. Eliminating all benefits
2002      * of the optimization.
2003      */
2004     /* (JJH) If we are going to use the preloaded restart files for subsequent
2005      *       restarts then we actually always want to preload the files. This
2006      *       way if we need to restart from the same checkpoint again, then
2007      *       we can from the local restart cache.
2008      */
2009 #if 0
2010     if( orte_sstore_stage_global_is_shared &&
2011         (NULL == postfix || 0 >= strlen(postfix) ) ) {
2012         *local_location = strdup(global_loc);
2013         *skip_xfer = true;
2014         goto cleanup;
2015     }
2016 #endif
2017 
2018     opal_asprintf(local_location, "%s/%s/%s/%d",
2019              orte_sstore_stage_local_snapshot_dir,
2020              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
2021              ORTE_SSTORE_LOCAL_SNAPSHOT_RESTART_DIR_NAME,
2022              seq);
2023     opal_asprintf(&full_local_location, "%s/%s",
2024              *local_location,
2025              ref);
2026 
2027     /*
2028      * If the snapshot already exists locally, just reuse it instead of
2029      * transfering it again.
2030      */
2031     if( 0 == (ret = access(full_local_location, F_OK)) ) {
2032         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
2033                              "sstore:stage:(local): preload_files() Local snapshot already exists, reuse it (%s)",
2034                              full_local_location));
2035         *skip_xfer = true;
2036         goto cleanup;
2037     }
2038 
2039     /*
2040      * Create the local restart directory
2041      */
2042     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(*local_location, my_mode)) ) {
2043         ORTE_ERROR_LOG(ret);
2044         exit_status = ret;
2045         goto cleanup;
2046     }
2047 
2048     /*
2049      * FileM request to move the checkpoint to the local directory
2050      */
2051     filem_request = OBJ_NEW(orte_filem_base_request_t);
2052 
2053     /* Define the process set */
2054     p_set = OBJ_NEW(orte_filem_base_process_set_t);
2055     if( ORTE_PROC_IS_HNP ) {
2056         /* if I am the HNP, then use me as the source */
2057         p_set->source.jobid = ORTE_PROC_MY_NAME->jobid;
2058         p_set->source.vpid  = ORTE_PROC_MY_NAME->vpid;
2059     }
2060     else {
2061         /* otherwise, set the HNP as the source */
2062         p_set->source.jobid = ORTE_PROC_MY_HNP->jobid;
2063         p_set->source.vpid  = ORTE_PROC_MY_HNP->vpid;
2064     }
2065     p_set->sink.jobid   = ORTE_PROC_MY_NAME->jobid;
2066     p_set->sink.vpid    = ORTE_PROC_MY_NAME->vpid;
2067     opal_list_append(&(filem_request->process_sets), &(p_set->super) );
2068 
2069     /* Define the file set */
2070     f_set = OBJ_NEW(orte_filem_base_file_set_t);
2071 
2072     f_set->local_target = strdup(*local_location);
2073     if( NULL != postfix && 0 < strlen(postfix) ) {
2074         opal_asprintf(&(f_set->remote_target), "%s/%s%s",
2075                  global_loc,
2076                  ref,
2077                  postfix);
2078     } else {
2079         opal_asprintf(&(f_set->remote_target), "%s/%s",
2080                  global_loc,
2081                  ref);
2082     }
2083     if( NULL != postfix && 0 < strlen(postfix) ) {
2084         f_set->target_flag = ORTE_FILEM_TYPE_FILE;
2085     } else {
2086         f_set->target_flag = ORTE_FILEM_TYPE_DIR;
2087     }
2088 
2089     opal_list_append(&(filem_request->file_sets), &(f_set->super) );
2090 
2091     /* Start getting the files */
2092     opal_list_append(preload_filem_requests, &(filem_request->super));
2093     if(ORTE_SUCCESS != (ret = orte_filem.get_nb(filem_request)) ) {
2094         ORTE_ERROR_LOG(ret);
2095         exit_status = ret;
2096         goto cleanup;
2097     }
2098 
2099  cleanup:
2100     if( NULL != full_local_location ) {
2101         free(full_local_location);
2102         full_local_location = NULL;
2103     }
2104 
2105     return exit_status;
2106 }

/* [<][>][^][v][top][bottom][index][help] */