root/orte/mca/sstore/central/sstore_central_global.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_sstore_central_global_snapshot_info_construct
  2. orte_sstore_central_global_snapshot_info_destruct
  3. orte_sstore_central_global_module_init
  4. orte_sstore_central_global_module_finalize
  5. orte_sstore_central_global_request_checkpoint_handle
  6. orte_sstore_central_global_request_restart_handle
  7. orte_sstore_central_global_request_global_snapshot_data
  8. orte_sstore_central_global_register
  9. orte_sstore_central_global_get_attr
  10. orte_sstore_central_global_set_attr
  11. orte_sstore_central_global_sync
  12. orte_sstore_central_global_remove
  13. orte_sstore_central_global_pack
  14. orte_sstore_central_global_unpack
  15. create_new_handle_info
  16. find_handle_info
  17. find_handle_info_from_ref
  18. sstore_central_global_start_listener
  19. sstore_central_global_stop_listener
  20. sstore_central_global_recv
  21. process_local_pull
  22. process_local_push
  23. init_global_snapshot_directory
  24. metadata_open
  25. metadata_close
  26. metadata_write_int
  27. metadata_write_str
  28. metadata_write_timestamp
  29. orte_sstore_central_extract_global_metadata
  30. central_snapshot_sort_compare_fn

   1 /*
   2  * Copyright (c)      2011 The Trustees of Indiana University.
   3  *                         All rights reserved.
   4  * Copyright (c) 2004-2011 The University of Tennessee and The University
   5  *                         of Tennessee Research Foundation.  All rights
   6  *                         reserved.
   7  * Copyright (c) 2018      Intel, Inc.  All rights reserved.
   8  * $COPYRIGHT$
   9  *
  10  * Additional copyrights may follow
  11  *
  12  * $HEADER$
  13  */
  14 
  15 /*
  16  *
  17  */
  18 
  19 #include "orte_config.h"
  20 
  21 #include <string.h>
  22 #include <stdlib.h>
  23 #include <sys/types.h>
  24 #include <sys/stat.h>
  25 #include <sys/wait.h>
  26 #ifdef HAVE_UNISTD_H
  27 #include <unistd.h>
  28 #endif  /* HAVE_UNISTD_H */
  29 
  30 #include "orte/mca/mca.h"
  31 #include "opal/mca/base/base.h"
  32 
  33 #include "opal/mca/event/event.h"
  34 
  35 #include "orte/constants.h"
  36 #include "orte/util/show_help.h"
  37 #include "opal/util/argv.h"
  38 #include "opal/util/output.h"
  39 #include "opal/util/opal_environ.h"
  40 #include "opal/util/basename.h"
  41 #include "opal/util/os_dirpath.h"
  42 
  43 #include "opal/threads/mutex.h"
  44 #include "opal/threads/condition.h"
  45 
  46 #include "orte/util/name_fns.h"
  47 #include "orte/util/proc_info.h"
  48 #include "orte/runtime/orte_globals.h"
  49 #include "orte/runtime/orte_wait.h"
  50 #include "orte/mca/errmgr/errmgr.h"
  51 #include "orte/mca/ess/ess.h"
  52 #include "orte/mca/rml/rml.h"
  53 #include "orte/mca/rml/rml_types.h"
  54 #include "orte/mca/snapc/snapc.h"
  55 #include "orte/mca/snapc/base/base.h"
  56 
  57 #include "orte/mca/sstore/sstore.h"
  58 #include "orte/mca/sstore/base/base.h"
  59 
  60 #include "sstore_central.h"
  61 
  62 #define SSTORE_HANDLE_TYPE_NONE    0
  63 #define SSTORE_HANDLE_TYPE_CKPT    1
  64 #define SSTORE_HANDLE_TYPE_RESTART 2
  65 
  66 #define SSTORE_GLOBAL_NONE    0
  67 #define SSTORE_GLOBAL_ERROR   1
  68 #define SSTORE_GLOBAL_INIT    2
  69 #define SSTORE_GLOBAL_REG     3
  70 #define SSTORE_GLOBAL_SYNCING 4
  71 #define SSTORE_GLOBAL_SYNCED  5
  72 
  73 /**********
  74  * Object Stuff
  75  **********/
  76 struct  orte_sstore_central_global_snapshot_info_t {
  77     /** List super object */
  78     opal_list_item_t super;
  79 
  80     /** */
  81     orte_sstore_base_handle_t id;
  82 
  83     /** Job ID */
  84     orte_jobid_t jobid;
  85 
  86     /** State */
  87     int state;
  88 
  89     /** Handle type */
  90     int handle_type;
  91 
  92     /** Sequence Number */
  93     int seq_num;
  94 
  95     /** Reference Name */
  96     char * ref_name;
  97 
  98     /** Local Location (Relative Path to base_location) */
  99     char * local_location;
 100 
 101     /** Application location format */
 102     char * app_location_fmt;
 103 
 104     /** Base location */
 105     char * base_location;
 106 
 107     /** Metadata File Name */
 108     char *metadata_filename;
 109 
 110     /** Metadata File Descriptor */
 111     FILE *metadata;
 112 
 113     /** Num procs in job */
 114     int num_procs_total;
 115 
 116     /** Num procs synced */
 117     int num_procs_synced;
 118 
 119     /** Is this checkpoint representing a migration? */
 120     bool migrating;
 121 };
 122 typedef struct orte_sstore_central_global_snapshot_info_t orte_sstore_central_global_snapshot_info_t;
 123 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_central_global_snapshot_info_t);
 124 
 125 void orte_sstore_central_global_snapshot_info_construct(orte_sstore_central_global_snapshot_info_t *info);
 126 void orte_sstore_central_global_snapshot_info_destruct( orte_sstore_central_global_snapshot_info_t *info);
 127 
 128 OBJ_CLASS_INSTANCE(orte_sstore_central_global_snapshot_info_t,
 129                    opal_list_item_t,
 130                    orte_sstore_central_global_snapshot_info_construct,
 131                    orte_sstore_central_global_snapshot_info_destruct);
 132 
 133 
 134 /**********
 135  * Local Function and Variable Declarations
 136  **********/
 137 static bool is_global_listener_active = false;
 138 static int sstore_central_global_start_listener(void);
 139 static int sstore_central_global_stop_listener(void);
 140 static void sstore_central_global_recv(int status,
 141                                        orte_process_name_t* sender,
 142                                        opal_buffer_t* buffer,
 143                                        orte_rml_tag_t tag,
 144                                        void* cbdata);
 145 static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info);
 146 static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info);
 147 
 148 static orte_sstore_central_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid);
 149 static orte_sstore_central_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
 150 static orte_sstore_central_global_snapshot_info_t *find_handle_info_from_ref(char *ref, int seq);
 151 
 152 static int metadata_open(orte_sstore_central_global_snapshot_info_t * handle_info);
 153 static int metadata_close(orte_sstore_central_global_snapshot_info_t * handle_info);
 154 static int metadata_write_int(orte_sstore_central_global_snapshot_info_t * handle_info, char * key, int value);
 155 static int metadata_write_str(orte_sstore_central_global_snapshot_info_t * handle_info, char * key, char *value);
 156 static int metadata_write_timestamp(orte_sstore_central_global_snapshot_info_t * handle_info);
 157 
 158 static int init_global_snapshot_directory(orte_sstore_central_global_snapshot_info_t *handle_info);
 159 static int central_snapshot_sort_compare_fn(opal_list_item_t **a,
 160                                             opal_list_item_t **b);
 161 static int orte_sstore_central_extract_global_metadata(orte_sstore_central_global_snapshot_info_t * handle_info,
 162                                                        orte_sstore_base_global_snapshot_info_t *global_snapshot);
 163 
 164 static int next_handle_id = 1;
 165 
 166 static opal_list_t *active_handles = NULL;
 167 
 168 /**********
 169  * Object stuff
 170  **********/
 171 void orte_sstore_central_global_snapshot_info_construct(orte_sstore_central_global_snapshot_info_t *info)
 172 {
 173     info->id      = next_handle_id;
 174     next_handle_id++;
 175 
 176     info->jobid = ORTE_JOBID_INVALID;
 177 
 178     info->state = SSTORE_GLOBAL_NONE;
 179 
 180     info->handle_type = SSTORE_HANDLE_TYPE_NONE;
 181 
 182     info->seq_num = -1;
 183 
 184     info->base_location  = strdup(orte_sstore_base_global_snapshot_dir);
 185 
 186     info->ref_name       = NULL;
 187     info->local_location = NULL;
 188     info->app_location_fmt = NULL;
 189 
 190     info->metadata_filename = NULL;
 191     info->metadata = NULL;
 192 
 193     info->num_procs_total = 0;
 194     info->num_procs_synced = 0;
 195 
 196     info->migrating = false;
 197 }
 198 
 199 void orte_sstore_central_global_snapshot_info_destruct( orte_sstore_central_global_snapshot_info_t *info)
 200 {
 201     info->id      = 0;
 202     info->seq_num = -1;
 203 
 204     info->jobid = ORTE_JOBID_INVALID;
 205 
 206     info->state = SSTORE_GLOBAL_NONE;
 207 
 208     info->handle_type = SSTORE_HANDLE_TYPE_NONE;
 209 
 210     if( NULL != info->ref_name ) {
 211         free( info->ref_name );
 212         info->ref_name  = NULL;
 213     }
 214 
 215     if( NULL != info->local_location ) {
 216         free( info->local_location );
 217         info->local_location = NULL;
 218     }
 219 
 220     if( NULL != info->app_location_fmt ) {
 221         free( info->app_location_fmt );
 222         info->app_location_fmt = NULL;
 223     }
 224 
 225     if( NULL != info->base_location ) {
 226         free( info->base_location );
 227         info->base_location = NULL;
 228     }
 229 
 230     if( NULL != info->metadata_filename ) {
 231         free( info->metadata_filename ) ;
 232         info->metadata_filename = NULL;
 233     }
 234 
 235     if( NULL != info->metadata ) {
 236         fclose(info->metadata);
 237         info->metadata = NULL;
 238     }
 239 
 240     info->num_procs_total = 0;
 241     info->num_procs_synced = 0;
 242 
 243     info->migrating = false;
 244 }
 245 
/******************
 * Module API functions
 ******************/
 249 int orte_sstore_central_global_module_init(void)
 250 {
 251     int ret, exit_status = ORTE_SUCCESS;
 252 
 253     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 254                          "sstore:central:(global): init()"));
 255 
 256     if( NULL == active_handles ) {
 257         active_handles = OBJ_NEW(opal_list_t);
 258     }
 259 
 260     /*
 261      * Setup a listener for the HNP/Apps
 262      */
 263     if( ORTE_SUCCESS != (ret = sstore_central_global_start_listener()) ) {
 264         ORTE_ERROR_LOG(ret);
 265         exit_status = ret;
 266         goto cleanup;
 267     }
 268 
 269     exit_status = orte_sstore_central_local_module_init();
 270 
 271  cleanup:
 272     return exit_status;
 273 }
 274 
 275 int orte_sstore_central_global_module_finalize(void)
 276 {
 277     int ret, exit_status = ORTE_SUCCESS;
 278 
 279     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 280                          "sstore:central:(global): finalize()"));
 281 
 282     exit_status = orte_sstore_central_local_module_finalize();
 283 
 284     if( NULL != active_handles ) {
 285         OBJ_RELEASE(active_handles);
 286     }
 287 
 288     /*
 289      * Shutdown the listener for the HNP/Apps
 290      */
 291     if( ORTE_SUCCESS != (ret = sstore_central_global_stop_listener()) ) {
 292         ORTE_ERROR_LOG(ret);
 293         exit_status = ret;
 294         goto cleanup;
 295     }
 296 
 297  cleanup:
 298     return exit_status;
 299 }
 300 
 301 int orte_sstore_central_global_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
 302 {
 303     int ret, exit_status = ORTE_SUCCESS;
 304     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 305 
 306     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 307                          "sstore:central:(global): request_checkpoint_handle()"));
 308 
 309     /*
 310      * Construct a handle
 311      *  - Associate all of the necessary information
 312      */
 313     handle_info = create_new_handle_info(seq, SSTORE_HANDLE_TYPE_CKPT, jobid);
 314 
 315     /*
 316      * Create the global checkpoint directory
 317      */
 318     if( ORTE_SUCCESS != (ret = init_global_snapshot_directory(handle_info)) ) {
 319         ORTE_ERROR_LOG(ret);
 320         exit_status = ret;
 321         goto cleanup;
 322     }
 323 
 324     /*
 325      * Return the handle
 326      */
 327     *handle = handle_info->id;
 328 
 329  cleanup:
 330     return exit_status;
 331 }
 332 
 333 int orte_sstore_central_global_request_restart_handle(orte_sstore_base_handle_t *handle, char *basedir,
 334                                                       char *ref, int seq,
 335                                                       orte_sstore_base_global_snapshot_info_t *snapshot)
 336 {
 337     int ret, exit_status = ORTE_SUCCESS;
 338     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 339 
 340     handle_info = find_handle_info_from_ref(ref, seq);
 341     if( NULL == handle_info ) {
 342         ret = ORTE_ERROR;
 343         ORTE_ERROR_LOG(ret);
 344         exit_status = ret;
 345         goto cleanup;
 346     }
 347 
 348     *handle = handle_info->id;
 349 
 350  cleanup:
 351     return exit_status;
 352 }
 353 
 354 int orte_sstore_central_global_request_global_snapshot_data(orte_sstore_base_handle_t *handle,
 355                                                             orte_sstore_base_global_snapshot_info_t *snapshot)
 356 {
 357     int ret, exit_status = ORTE_SUCCESS;
 358     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 359 
 360     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 361                          "sstore:central:(global): request_global_snapshot_data()"));
 362 
 363     /*
 364      * Lookup the handle (if NULL, use last stable)
 365      */
 366     if( NULL != handle ) {
 367         handle_info = find_handle_info(*handle);
 368         snapshot->ss_handle = *handle;
 369     } else {
 370         handle_info = find_handle_info(orte_sstore_handle_last_stable);
 371         snapshot->ss_handle = orte_sstore_handle_last_stable;
 372     }
 373 
 374     /*
 375      * Construct the snapshot from local data, and metadata file
 376      */
 377     snapshot->seq_num   = handle_info->seq_num;
 378     snapshot->reference = strdup(handle_info->ref_name);
 379     snapshot->basedir   = strdup(handle_info->base_location);
 380     snapshot->metadata_filename = strdup(handle_info->metadata_filename);
 381 
 382     /* If this is the current checkpoint, pull data from local cache */
 383     if( orte_sstore_handle_current == snapshot->ss_handle ) {
 384         if( ORTE_SUCCESS != (ret = orte_sstore_central_extract_global_metadata(handle_info, snapshot)) ) {
 385             ORTE_ERROR_LOG(ret);
 386             exit_status = ret;
 387             goto cleanup;
 388         }
 389     }
 390     /* Otherwise, pull from metadata */
 391     else {
 392         if( ORTE_SUCCESS != (ret = orte_sstore_base_extract_global_metadata(snapshot)) ) {
 393             ORTE_ERROR_LOG(ret);
 394             exit_status = ret;
 395             goto cleanup;
 396         }
 397     }
 398 
 399     opal_list_sort(&snapshot->local_snapshots, central_snapshot_sort_compare_fn);
 400 
 401  cleanup:
 402     return exit_status;
 403 }
 404 
 405 int orte_sstore_central_global_register(orte_sstore_base_handle_t handle)
 406 {
 407     int ret, exit_status = ORTE_SUCCESS;
 408     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 409 
 410     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 411                          "sstore:central:(global): register(%d) - Global", handle));
 412 
 413     /*
 414      * Lookup the handle
 415      */
 416     handle_info = find_handle_info(handle);
 417     if( SSTORE_GLOBAL_REG != handle_info->state ) {
 418         handle_info->state = SSTORE_GLOBAL_REG;
 419     } else {
 420         return orte_sstore_central_local_register(handle);
 421     }
 422 
 423     orte_sstore_handle_current = handle;
 424 
 425     /*
 426      * Associate the metadata
 427      */
 428     if( handle_info->migrating ) {
 429         if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
 430                                                       SSTORE_METADATA_INTERNAL_MIG_SEQ_STR,
 431                                                       handle_info->seq_num)) ) {
 432             ORTE_ERROR_LOG(ret);
 433             exit_status = ret;
 434             goto cleanup;
 435         }
 436     } else {
 437         if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
 438                                                       SSTORE_METADATA_GLOBAL_SNAP_SEQ_STR,
 439                                                       handle_info->seq_num)) ) {
 440             ORTE_ERROR_LOG(ret);
 441             exit_status = ret;
 442             goto cleanup;
 443         }
 444     }
 445 
 446     if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
 447                                                   SSTORE_METADATA_LOCAL_SNAP_REF_FMT_STR,
 448                                                   orte_sstore_base_local_snapshot_fmt)) ) {
 449         ORTE_ERROR_LOG(ret);
 450         exit_status = ret;
 451         goto cleanup;
 452     }
 453 
 454     if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
 455         ORTE_ERROR_LOG(ret);
 456         exit_status = ret;
 457         goto cleanup;
 458     }
 459 
 460  cleanup:
 461     return exit_status;
 462 }
 463 
 464 int orte_sstore_central_global_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
 465 {
 466     int exit_status = ORTE_SUCCESS;
 467     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 468 
 469     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 470                          "sstore:central:(global): get_attr()"));
 471 
 472     /*
 473      * Lookup the handle
 474      */
 475     handle_info = find_handle_info(handle);
 476 
 477     /*
 478      * Access metadata
 479      */
 480     if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
 481         *value = strdup(handle_info->ref_name);
 482     }
 483     else if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
 484         opal_asprintf(value, "%d", handle_info->seq_num);
 485     }
 486     else if( SSTORE_METADATA_LOCAL_SNAP_REF_FMT == key ) {
 487         *value = strdup(orte_sstore_base_local_snapshot_fmt);
 488     }
 489     /* 'central' does not cache, so these are the same */
 490     else if( SSTORE_METADATA_LOCAL_SNAP_LOC       == key ) {
 491         opal_asprintf(value, "%s/%s/%d",
 492                  handle_info->base_location,
 493                  handle_info->ref_name,
 494                  handle_info->seq_num);
 495     }
 496     else if( SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT == key ) {
 497         opal_asprintf(value, "%s/%s/%d/%s",
 498                  handle_info->base_location,
 499                  handle_info->ref_name,
 500                  handle_info->seq_num,
 501                  orte_sstore_base_local_snapshot_fmt);
 502     }
 503     else {
 504         exit_status = ORTE_ERR_NOT_SUPPORTED;
 505     }
 506 
 507     return exit_status;
 508 }
 509 
 510 int orte_sstore_central_global_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
 511 {
 512     int ret, exit_status = ORTE_SUCCESS;
 513     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 514     char *key_str = NULL;
 515 
 516     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 517                          "sstore:central:(global): set_attr()"));
 518 
 519     /*
 520      * Lookup the handle
 521      */
 522     handle_info = find_handle_info(handle);
 523 
 524     /*
 525      * Process key (Access metadata)
 526      */
 527     if( key == SSTORE_METADATA_GLOBAL_MIGRATING ) {
 528         handle_info->migrating = true;
 529     }
 530     else {
 531         orte_sstore_base_convert_key_to_string(key, &key_str);
 532         if( NULL == key_str ) {
 533             ORTE_ERROR_LOG(ORTE_ERROR);
 534             exit_status = ORTE_ERROR;
 535             goto cleanup;
 536         }
 537 
 538         if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
 539             ORTE_ERROR_LOG(ret);
 540             exit_status = ret;
 541             goto cleanup;
 542         }
 543     }
 544 
 545  cleanup:
 546     if( NULL != key_str ) {
 547         free(key_str);
 548         key_str = NULL;
 549     }
 550 
 551     return exit_status;
 552 }
 553 
 554 int orte_sstore_central_global_sync(orte_sstore_base_handle_t handle)
 555 {
 556     int ret, exit_status = ORTE_SUCCESS;
 557     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 558 
 559     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 560                          "sstore:central:(global): sync()"));
 561 
 562     /*
 563      * Lookup the handle
 564      */
 565     handle_info = find_handle_info(handle);
 566     if( SSTORE_GLOBAL_SYNCING != handle_info->state ) {
 567         handle_info->state = SSTORE_GLOBAL_SYNCING;
 568         if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
 569             return orte_sstore_central_local_sync(handle);
 570         }
 571     }
 572 
 573     /*
 574      * Synchronize all of the files
 575      */
 576     while(handle_info->num_procs_synced < handle_info->num_procs_total) {
 577         opal_progress();
 578     }
 579 
 580     /*
 581      * Finalize and close the metadata
 582      */
 583     if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
 584         ORTE_ERROR_LOG(ret);
 585         exit_status = ret;
 586         goto cleanup;
 587     }
 588 
 589     if( handle_info->migrating ) {
 590         if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
 591                                                       SSTORE_METADATA_INTERNAL_DONE_MIG_SEQ_STR,
 592                                                       handle_info->seq_num)) ) {
 593             ORTE_ERROR_LOG(ret);
 594             exit_status = ret;
 595             goto cleanup;
 596         }
 597     } else {
 598         if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
 599                                                       SSTORE_METADATA_INTERNAL_DONE_SEQ_STR,
 600                                                       handle_info->seq_num)) ) {
 601             ORTE_ERROR_LOG(ret);
 602             exit_status = ret;
 603             goto cleanup;
 604         }
 605     }
 606 
 607     if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
 608         ORTE_ERROR_LOG(ret);
 609         exit_status = ret;
 610         goto cleanup;
 611     }
 612 
 613     /* JJH: We should lock this var! */
 614     if( !handle_info->migrating ) {
 615         orte_sstore_base_is_checkpoint_available = true;
 616         orte_sstore_handle_last_stable = orte_sstore_handle_current;
 617     }
 618 
 619  cleanup:
 620     return exit_status;
 621 }
 622 
 623 int orte_sstore_central_global_remove(orte_sstore_base_handle_t handle)
 624 {
 625     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 626                          "sstore:central:(global): remove()"));
 627 
 628     /*
 629      * Lookup the handle
 630      */
 631 
 632     return ORTE_SUCCESS;
 633 }
 634 
 635 int orte_sstore_central_global_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
 636 {
 637     int ret, exit_status = ORTE_SUCCESS;
 638     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 639 
 640     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 641                          "sstore:central:(global): pack()"));
 642 
 643     /*
 644      * Lookup the handle
 645      */
 646     handle_info = find_handle_info(handle);
 647 
 648     /*
 649      * Pack the handle ID
 650      */
 651     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
 652         ORTE_ERROR_LOG(ret);
 653         exit_status = ret;
 654         goto cleanup;
 655     }
 656 
 657     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 658                          "sstore:central:(global): pack(%d, %d, %s)",
 659                          handle_info->id,
 660                          handle_info->seq_num,
 661                          handle_info->ref_name));
 662 
 663     /*
 664      * Pack any metadata
 665      */
 666     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
 667         ORTE_ERROR_LOG(ret);
 668         exit_status = ret;
 669         goto cleanup;
 670     }
 671 
 672     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
 673         ORTE_ERROR_LOG(ret);
 674         exit_status = ret;
 675         goto cleanup;
 676     }
 677 
 678     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING )) ) {
 679         ORTE_ERROR_LOG(ret);
 680         exit_status = ret;
 681         goto cleanup;
 682     }
 683 
 684  cleanup:
 685     return exit_status;
 686 }
 687 
 688 int orte_sstore_central_global_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
 689 {
 690     int ret, exit_status = ORTE_SUCCESS;
 691 
 692     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 693                          "sstore:central:(global): unpack()"));
 694 
 695     /*
 696      * Unpack the handle id
 697      */
 698     if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
 699                                                    ORTE_PROC_MY_NAME,
 700                                                    peer)) {
 701         /*
 702          * Differ to the orted version, so if we have application then they get updated too
 703          */
 704         if( ORTE_SUCCESS != (ret = orte_sstore_central_local_unpack(peer, buffer, handle)) ) {
 705             ORTE_ERROR_LOG(ret);
 706             exit_status = ret;
 707             goto cleanup;
 708         }
 709     }
 710 
 711  cleanup:
 712     return exit_status;
 713 }
 714 
 715 /**************************
 716  * Local functions
 717  **************************/
 718 static orte_sstore_central_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid)
 719 {
 720     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 721     orte_job_t *jdata = NULL;
 722 
 723     handle_info = OBJ_NEW(orte_sstore_central_global_snapshot_info_t);
 724 
 725     handle_info->jobid = jobid;
 726 
 727     handle_info->state = SSTORE_GLOBAL_INIT;
 728 
 729     handle_info->handle_type = type;
 730 
 731     handle_info->seq_num = seq;
 732 
 733     orte_sstore_base_get_global_snapshot_ref(&(handle_info->ref_name), getpid());
 734 
 735     opal_asprintf(&(handle_info->local_location), "%s/%d",
 736              handle_info->ref_name, handle_info->seq_num);
 737 
 738     opal_asprintf(&(handle_info->app_location_fmt), "%s/%s/%s",
 739              handle_info->base_location,
 740              handle_info->local_location,
 741              orte_sstore_base_local_snapshot_fmt);
 742 
 743     opal_asprintf(&(handle_info->metadata_filename), "%s/%s/%s",
 744              handle_info->base_location,
 745              handle_info->ref_name,
 746              orte_sstore_base_global_metadata_filename);
 747 
 748     jdata = orte_get_job_data_object(handle_info->jobid);
 749     handle_info->num_procs_total = (int)jdata->num_procs;
 750 
 751     opal_list_append(active_handles, &(handle_info->super));
 752 
 753     return handle_info;
 754 }
 755 
 756 static orte_sstore_central_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
 757 {
 758     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 759     opal_list_item_t* item = NULL;
 760 
 761     for(item  = opal_list_get_first(active_handles);
 762         item != opal_list_get_end(active_handles);
 763         item  = opal_list_get_next(item) ) {
 764         handle_info = (orte_sstore_central_global_snapshot_info_t*)item;
 765 
 766         if( handle_info->id == handle ) {
 767             return handle_info;
 768         }
 769     }
 770 
 771     return NULL;
 772 }
 773 
 774 static orte_sstore_central_global_snapshot_info_t *find_handle_info_from_ref(char *ref, int seq)
 775 {
 776     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 777     opal_list_item_t* item = NULL;
 778 
 779     for(item  = opal_list_get_first(active_handles);
 780         item != opal_list_get_end(active_handles);
 781         item  = opal_list_get_next(item) ) {
 782         handle_info = (orte_sstore_central_global_snapshot_info_t*)item;
 783 
 784         if( handle_info->seq_num == seq ) {
 785             if( NULL != ref &&
 786                 strncmp(handle_info->ref_name, ref, strlen(ref)) ) {
 787                 return handle_info;
 788             } else {
 789                 return handle_info;
 790             }
 791         }
 792     }
 793 
 794     return NULL;
 795 }
 796 
 797 static int sstore_central_global_start_listener(void)
 798 {
 799     if( is_global_listener_active ) {
 800         return ORTE_SUCCESS;
 801     }
 802 
 803     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
 804                             ORTE_RML_PERSISTENT, sstore_central_global_recv, NULL);
 805 
 806     is_global_listener_active = true;
 807     return ORTE_SUCCESS;
 808 }
 809 
 810 static int sstore_central_global_stop_listener(void)
 811 {
 812     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
 813 
 814     is_global_listener_active = false;
 815     return ORTE_SUCCESS;
 816 }
 817 
 818 static void sstore_central_global_recv(int status,
 819                                        orte_process_name_t* sender,
 820                                        opal_buffer_t* buffer,
 821                                        orte_rml_tag_t tag,
 822                                        void* cbdata)
 823 {
 824     int ret;
 825     orte_sstore_central_cmd_flag_t command;
 826     orte_std_cntr_t count;
 827     orte_sstore_base_handle_t loc_id;
 828     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
 829 
 830     if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
 831         return;
 832     }
 833 
 834     /*
 835      * If this was an application process contacting us, then act like an orted
 836      * instead of an HNP
 837      */
 838     if(OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
 839                                                    ORTE_PROC_MY_NAME,
 840                                                    sender)) {
 841         orte_sstore_central_local_recv(status, sender, buffer, tag, cbdata);
 842         return;
 843     }
 844 
 845 
 846     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 847                          "sstore:central:(global): process_cmd(%s)",
 848                          ORTE_NAME_PRINT(sender)));
 849 
 850     count = 1;
 851     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
 852         ORTE_ERROR_LOG(ret);
 853         goto cleanup;
 854     }
 855 
 856     count = 1;
 857     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
 858         ORTE_ERROR_LOG(ret);
 859         goto cleanup;
 860     }
 861 
 862     /*
 863      * Find the referenced handle
 864      */
 865     if(NULL == (handle_info = find_handle_info(loc_id)) ) {
 866         ; /* JJH big problem */
 867     }
 868 
 869     /*
 870      * Process the command
 871      */
 872     if( ORTE_SSTORE_CENTRAL_PULL == command ) {
 873         process_local_pull(sender, buffer, handle_info);
 874     }
 875     else if( ORTE_SSTORE_CENTRAL_PUSH == command ) {
 876         process_local_push(sender, buffer, handle_info);
 877     }
 878 
 879  cleanup:
 880     return;
 881 }
 882 
 883 static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info)
 884 {
 885     int ret, exit_status = ORTE_SUCCESS;
 886     opal_buffer_t *loc_buffer = NULL;
 887     orte_sstore_central_cmd_flag_t command;
 888 
 889     /*
 890      * Push back the requested information
 891      */
 892     loc_buffer = OBJ_NEW(opal_buffer_t);
 893 
 894     command = ORTE_SSTORE_CENTRAL_PUSH;
 895     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
 896         ORTE_ERROR_LOG(ret);
 897         exit_status = ret;
 898         goto cleanup;
 899     }
 900 
 901     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
 902         ORTE_ERROR_LOG(ret);
 903         exit_status = ret;
 904         goto cleanup;
 905     }
 906 
 907     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
 908         ORTE_ERROR_LOG(ret);
 909         exit_status = ret;
 910         goto cleanup;
 911     }
 912 
 913     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
 914         ORTE_ERROR_LOG(ret);
 915         exit_status = ret;
 916         goto cleanup;
 917     }
 918 
 919     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING))) {
 920         ORTE_ERROR_LOG(ret);
 921         exit_status = ret;
 922         goto cleanup;
 923     }
 924 
 925     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
 926                                                        orte_rml_send_callback, NULL))) {
 927         ORTE_ERROR_LOG(ret);
 928         exit_status = ret;
 929         goto cleanup;
 930     }
 931     /* loc_buffer should not be released here; the callback releases it */
 932     loc_buffer = NULL;
 933 
 934  cleanup:
 935     if (NULL != loc_buffer) {
 936         OBJ_RELEASE(loc_buffer);
 937         loc_buffer = NULL;
 938     }
 939 
 940     return exit_status;
 941 }
 942 
 943 static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info)
 944 {
 945     int ret, exit_status = ORTE_SUCCESS;
 946     orte_std_cntr_t count;
 947     size_t num_entries, i;
 948     orte_process_name_t name;
 949     bool ckpt_skipped = false;
 950     char * crs_comp = NULL;
 951     char * proc_name = NULL;
 952 
 953     /*
 954      * Unpack the data
 955      */
 956     count = 1;
 957     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
 958         ORTE_ERROR_LOG(ret);
 959         exit_status = ret;
 960         goto cleanup;
 961     }
 962 
 963     for(i = 0; i < num_entries; ++i ) {
 964         count = 1;
 965         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &name, &count, ORTE_NAME))) {
 966             ORTE_ERROR_LOG(ret);
 967             exit_status = ret;
 968             goto cleanup;
 969         }
 970 
 971         count = 1;
 972         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &ckpt_skipped, &count, OPAL_BOOL))) {
 973             ORTE_ERROR_LOG(ret);
 974             exit_status = ret;
 975             goto cleanup;
 976         }
 977 
 978         if( !ckpt_skipped ) {
 979             count = 1;
 980             if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crs_comp, &count, OPAL_STRING))) {
 981                 ORTE_ERROR_LOG(ret);
 982                 exit_status = ret;
 983                 goto cleanup;
 984             }
 985 
 986             /*
 987              * Write this information to the global metadata
 988              */
 989             orte_util_convert_process_name_to_string(&proc_name, &name);
 990 
 991             metadata_write_str(handle_info,
 992                                SSTORE_METADATA_INTERNAL_PROCESS_STR,
 993                                proc_name);
 994             metadata_write_str(handle_info,
 995                                SSTORE_METADATA_LOCAL_CRS_COMP_STR,
 996                                crs_comp);
 997         }
 998 
 999         if( NULL != crs_comp ) {
1000             free(crs_comp);
1001             crs_comp = NULL;
1002         }
1003         if( NULL != proc_name ) {
1004             free(proc_name);
1005             proc_name = NULL;
1006         }
1007 
1008         (handle_info->num_procs_synced)++;
1009     }
1010 
1011  cleanup:
1012     if( NULL != crs_comp ) {
1013         free(crs_comp);
1014         crs_comp = NULL;
1015     }
1016     if( NULL != proc_name ) {
1017         free(proc_name);
1018         proc_name = NULL;
1019     }
1020 
1021     return exit_status;
1022 }
1023 
1024 static int init_global_snapshot_directory(orte_sstore_central_global_snapshot_info_t *handle_info)
1025 {
1026     int ret, exit_status = ORTE_SUCCESS;
1027     char * dir_name = NULL;
1028     mode_t my_mode = S_IRWXU;
1029 
1030     /*
1031      * Make the snapshot directory from the uniq_global_snapshot_name
1032      */
1033     opal_asprintf(&dir_name, "%s/%s",
1034              handle_info->base_location,
1035              handle_info->local_location);
1036     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(dir_name, my_mode)) ) {
1037         ORTE_ERROR_LOG(ret);
1038         exit_status = ret;
1039         goto cleanup;
1040     }
1041 
1042     /*
1043      * Open up the metadata file
1044      */
1045     if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1046         ORTE_ERROR_LOG(ret);
1047         exit_status = ret;
1048         goto cleanup;
1049     }
1050 
1051  cleanup:
1052     if(NULL != dir_name) {
1053         free(dir_name);
1054         dir_name = NULL;
1055     }
1056 
1057     return exit_status;
1058 }
1059 
1060 /**************************
1061  * Metadata functions
1062  **************************/
1063 static int metadata_open(orte_sstore_central_global_snapshot_info_t * handle_info)
1064 {
1065     /* If already open, then just return */
1066     if( NULL != handle_info->metadata ) {
1067         return ORTE_SUCCESS;
1068     }
1069 
1070     if (NULL == (handle_info->metadata = fopen(handle_info->metadata_filename, "a")) ) {
1071         opal_output(orte_sstore_base_framework.framework_output,
1072                     "sstore:central:(global):init_dir() Unable to open the file (%s)\n",
1073                     handle_info->metadata_filename);
1074         ORTE_ERROR_LOG(ORTE_ERROR);
1075         return ORTE_ERROR;
1076    }
1077 
1078    return ORTE_SUCCESS;
1079 }
1080 
1081 static int metadata_close(orte_sstore_central_global_snapshot_info_t * handle_info)
1082 {
1083     /* If already closed, then just return */
1084     if( NULL == handle_info->metadata ) {
1085         return ORTE_SUCCESS;
1086     }
1087 
1088     fclose(handle_info->metadata);
1089     handle_info->metadata = NULL;
1090 
1091     return ORTE_SUCCESS;
1092 }
1093 
1094 static int metadata_write_int(orte_sstore_central_global_snapshot_info_t * handle_info, char *key, int value)
1095 {
1096     int ret, exit_status = ORTE_SUCCESS;
1097 
1098     /* Make sure the metadata file is open */
1099     if( NULL == handle_info->metadata ) {
1100         if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1101             ORTE_ERROR_LOG(ret);
1102             exit_status = ret;
1103             goto cleanup;
1104         }
1105     }
1106 
1107     fprintf(handle_info->metadata, "%s%d\n", key, value);
1108 
1109  cleanup:
1110     return exit_status;
1111 }
1112 
1113 static int metadata_write_str(orte_sstore_central_global_snapshot_info_t * handle_info, char *key, char *value)
1114 {
1115     int ret, exit_status = ORTE_SUCCESS;
1116 
1117     /* Make sure the metadata file is open */
1118     if( NULL == handle_info->metadata ) {
1119         if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1120             ORTE_ERROR_LOG(ret);
1121             exit_status = ret;
1122             goto cleanup;
1123         }
1124     }
1125 
1126     fprintf(handle_info->metadata, "%s%s\n", key, value);
1127 
1128  cleanup:
1129     return exit_status;
1130 }
1131 
1132 static int metadata_write_timestamp(orte_sstore_central_global_snapshot_info_t * handle_info)
1133 {
1134     int ret, exit_status = ORTE_SUCCESS;
1135     time_t timestamp;
1136 
1137     /* Make sure the metadata file is open */
1138     if( NULL == handle_info->metadata ) {
1139         if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1140             ORTE_ERROR_LOG(ret);
1141             exit_status = ret;
1142             goto cleanup;
1143         }
1144     }
1145 
1146     timestamp = time(NULL);
1147     fprintf(handle_info->metadata, "%s%s",
1148             SSTORE_METADATA_INTERNAL_TIME_STR,
1149             ctime(&timestamp));
1150 
1151  cleanup:
1152     return exit_status;
1153 }
1154 
1155 static int orte_sstore_central_extract_global_metadata(orte_sstore_central_global_snapshot_info_t * handle_info,
1156                                                        orte_sstore_base_global_snapshot_info_t *global_snapshot)
1157 {
1158     int exit_status = ORTE_SUCCESS;
1159     orte_sstore_base_local_snapshot_info_t *vpid_snapshot = NULL;
1160     opal_list_item_t* item = NULL;
1161     int i = 0;
1162 
1163     /*
1164      * Cleanup the structure a bit, so we can refresh it below
1165      */
1166     while (NULL != (item = opal_list_remove_first(&global_snapshot->local_snapshots))) {
1167         OBJ_RELEASE(item);
1168     }
1169 
1170     if( NULL != global_snapshot->start_time ) {
1171         free( global_snapshot->start_time );
1172         global_snapshot->start_time = NULL;
1173     }
1174 
1175     if( NULL != global_snapshot->end_time ) {
1176         free( global_snapshot->end_time );
1177         global_snapshot->end_time = NULL;
1178     }
1179 
1180     /*
1181      * Create a structure for each application process
1182      */
1183     for(i = 0; i < handle_info->num_procs_total; ++i) {
1184         vpid_snapshot = OBJ_NEW(orte_sstore_base_local_snapshot_info_t);
1185         vpid_snapshot->ss_handle = handle_info->id;
1186 
1187         vpid_snapshot->process_name.jobid  = handle_info->jobid;
1188         vpid_snapshot->process_name.vpid   = i;
1189 
1190         vpid_snapshot->crs_comp     = NULL;
1191         global_snapshot->start_time = NULL;
1192         global_snapshot->end_time   = NULL;
1193 
1194         opal_list_append(&global_snapshot->local_snapshots, &(vpid_snapshot->super));
1195     }
1196 
1197     return exit_status;
1198 }
1199 
1200 static int central_snapshot_sort_compare_fn(opal_list_item_t **a,
1201                                             opal_list_item_t **b)
1202 {
1203     orte_sstore_base_local_snapshot_info_t *snap_a, *snap_b;
1204 
1205     snap_a = (orte_sstore_base_local_snapshot_info_t*)(*a);
1206     snap_b = (orte_sstore_base_local_snapshot_info_t*)(*b);
1207 
1208     if( snap_a->process_name.vpid > snap_b->process_name.vpid ) {
1209         return 1;
1210     }
1211     else if( snap_a->process_name.vpid == snap_b->process_name.vpid ) {
1212         return 0;
1213     }
1214     else {
1215         return -1;
1216     }
1217 }

/* [<][>][^][v][top][bottom][index][help] */