root/orte/mca/sstore/central/sstore_central_local.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_sstore_central_local_snapshot_info_construct
  2. orte_sstore_central_local_snapshot_info_destruct
  3. orte_sstore_central_local_app_snapshot_info_construct
  4. orte_sstore_central_local_app_snapshot_info_destruct
  5. orte_sstore_central_local_module_init
  6. orte_sstore_central_local_module_finalize
  7. orte_sstore_central_local_request_checkpoint_handle
  8. orte_sstore_central_local_register
  9. orte_sstore_central_local_get_attr
  10. orte_sstore_central_local_set_attr
  11. orte_sstore_central_local_sync
  12. orte_sstore_central_local_remove
  13. orte_sstore_central_local_pack
  14. orte_sstore_central_local_unpack
  15. orte_sstore_central_local_recv
  16. create_new_handle_info
  17. find_handle_info
  18. append_new_app_handle_info
  19. find_app_handle_info
  20. sstore_central_local_start_listener
  21. sstore_central_local_stop_listener
  22. process_global_pull
  23. process_global_push
  24. process_app_pull
  25. process_app_push
  26. wait_all_apps_updated
  27. pull_handle_info
  28. push_handle_info

   1 /*
   2  * Copyright (c)      2011 The Trustees of Indiana University.
   3  *                         All rights reserved.
   4  * Copyright (c) 2004-2011 The University of Tennessee and The University
   5  *                         of Tennessee Research Foundation.  All rights
   6  *                         reserved.
   7  * Copyright (c) 2018      Intel, Inc.  All rights reserved.
   8  * $COPYRIGHT$
   9  *
  10  * Additional copyrights may follow
  11  *
  12  * $HEADER$
  13  */
  14 
  15 /*
  16  *
  17  */
  18 
  19 #include "orte_config.h"
  20 
  21 #include <string.h>
  22 #include <stdlib.h>
  23 #include <sys/types.h>
  24 #include <sys/stat.h>
  25 #include <sys/wait.h>
  26 #ifdef HAVE_UNISTD_H
  27 #include <unistd.h>
  28 #endif  /* HAVE_UNISTD_H */
  29 
  30 #include "orte/mca/mca.h"
  31 #include "opal/mca/base/base.h"
  32 
  33 #include "opal/mca/event/event.h"
  34 
  35 #include "orte/constants.h"
  36 #include "orte/util/show_help.h"
  37 #include "opal/util/argv.h"
  38 #include "opal/util/output.h"
  39 #include "opal/util/opal_environ.h"
  40 #include "opal/util/basename.h"
  41 
  42 #include "opal/threads/mutex.h"
  43 #include "opal/threads/condition.h"
  44 
  45 #include "orte/util/name_fns.h"
  46 #include "orte/util/proc_info.h"
  47 #include "orte/runtime/orte_globals.h"
  48 #include "orte/runtime/orte_wait.h"
  49 #include "orte/mca/errmgr/errmgr.h"
  50 #include "orte/mca/rml/rml.h"
  51 #include "orte/mca/rml/rml_types.h"
  52 #include "orte/mca/odls/odls_types.h"
  53 
  54 #include "orte/mca/sstore/sstore.h"
  55 #include "orte/mca/sstore/base/base.h"
  56 
  57 #include "sstore_central.h"
  58 
  59 /**********
  60  * Object stuff
  61  **********/
  62 #define SSTORE_LOCAL_NONE   0
  63 #define SSTORE_LOCAL_ERROR  1
  64 #define SSTORE_LOCAL_INIT   2
  65 #define SSTORE_LOCAL_READY  3
  66 #define SSTORE_LOCAL_SYNCED 4
  67 
  68 struct  orte_sstore_central_local_snapshot_info_t {
  69     /** List super object */
  70     opal_list_item_t super;
  71 
  72     /** */
  73     orte_sstore_base_handle_t id;
  74 
  75     /** Status */
  76     int status;
  77 
  78     /** Sequence Number */
  79     int seq_num;
  80 
  81     /** Global Reference Name */
  82     char * global_ref_name;
  83 
  84     /** Local Location Format String */
  85     char * location_fmt;
  86 
  87     /* Application info handles*/
  88     opal_list_t *app_info_handle;
  89 };
  90 typedef struct orte_sstore_central_local_snapshot_info_t orte_sstore_central_local_snapshot_info_t;
  91 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_central_local_snapshot_info_t);
  92 
  93 void orte_sstore_central_local_snapshot_info_construct(orte_sstore_central_local_snapshot_info_t *info);
  94 void orte_sstore_central_local_snapshot_info_destruct( orte_sstore_central_local_snapshot_info_t *info);
  95 
  96 OBJ_CLASS_INSTANCE(orte_sstore_central_local_snapshot_info_t,
  97                    opal_list_item_t,
  98                    orte_sstore_central_local_snapshot_info_construct,
  99                    orte_sstore_central_local_snapshot_info_destruct);
 100 
 101 struct  orte_sstore_central_local_app_snapshot_info_t {
 102     /** List super object */
 103     opal_list_item_t super;
 104 
 105     /** Process Name associated with this entry */
 106     orte_process_name_t name;
 107 
 108     /** Local Location (Absolute Path) */
 109     char * local_location;
 110 
 111     /** Metadata File Name (Absolute Path) */
 112     char * metadata_filename;
 113 
 114     /** CRS Component used */
 115     char * crs_comp;
 116 
 117     /** If this app. skipped the checkpoint - usually for non-migrating procs */
 118     bool ckpt_skipped;
 119 };
 120 typedef struct orte_sstore_central_local_app_snapshot_info_t orte_sstore_central_local_app_snapshot_info_t;
 121 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_central_local_app_snapshot_info_t);
 122 
 123 void orte_sstore_central_local_app_snapshot_info_construct(orte_sstore_central_local_app_snapshot_info_t *info);
 124 void orte_sstore_central_local_app_snapshot_info_destruct( orte_sstore_central_local_app_snapshot_info_t *info);
 125 
 126 OBJ_CLASS_INSTANCE(orte_sstore_central_local_app_snapshot_info_t,
 127                    opal_list_item_t,
 128                    orte_sstore_central_local_app_snapshot_info_construct,
 129                    orte_sstore_central_local_app_snapshot_info_destruct);
 130 
 131 
 132 
 133 /**********
 134  * Local Function and Variable Declarations
 135  **********/
 136 static bool is_global_listener_active = false;
 137 static int sstore_central_local_start_listener(void);
 138 static int sstore_central_local_stop_listener(void);
 139 
 140 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info);
 141 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info);
 142 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info);
 143 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info);
 144 
 145 static orte_sstore_central_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
 146 static orte_sstore_central_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
 147 
 148 static int append_new_app_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info,
 149                                       orte_process_name_t *name);
 150 static orte_sstore_central_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info,
 151                                                                            orte_process_name_t *name);
 152 
 153 static int pull_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info );
 154 static int push_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info );
 155 
 156 static int wait_all_apps_updated(orte_sstore_central_local_snapshot_info_t *handle_info);
 157 
 158 
 159 static opal_list_t *active_handles = NULL;
 160 
 161 /**********
 162  * Object stuff
 163  **********/
 164 void orte_sstore_central_local_snapshot_info_construct(orte_sstore_central_local_snapshot_info_t *info)
 165 {
 166     info->id      = 0;
 167 
 168     info->status = SSTORE_LOCAL_NONE;
 169 
 170     info->seq_num = -1;
 171 
 172     info->global_ref_name = NULL;
 173 
 174     info->location_fmt    = NULL;
 175 
 176     info->app_info_handle = OBJ_NEW(opal_list_t);
 177 }
 178 
 179 void orte_sstore_central_local_snapshot_info_destruct( orte_sstore_central_local_snapshot_info_t *info)
 180 {
 181     info->id      = 0;
 182 
 183     info->status = SSTORE_LOCAL_NONE;
 184 
 185     info->seq_num = -1;
 186 
 187     if( NULL != info->global_ref_name ) {
 188         free( info->global_ref_name );
 189         info->global_ref_name  = NULL;
 190     }
 191 
 192     if( NULL != info->location_fmt ) {
 193         free( info->location_fmt );
 194         info->location_fmt = NULL;
 195     }
 196 
 197     if( NULL != info->app_info_handle ) {
 198         OBJ_RELEASE(info->app_info_handle);
 199         info->app_info_handle = NULL;
 200     }
 201 }
 202 
 203 void orte_sstore_central_local_app_snapshot_info_construct(orte_sstore_central_local_app_snapshot_info_t *info)
 204 {
 205     info->name.jobid = ORTE_JOBID_INVALID;
 206     info->name.vpid  = ORTE_VPID_INVALID;
 207 
 208     info->local_location = NULL;
 209     info->metadata_filename = NULL;
 210     info->crs_comp = NULL;
 211     info->ckpt_skipped = false;
 212 }
 213 
 214 void orte_sstore_central_local_app_snapshot_info_destruct( orte_sstore_central_local_app_snapshot_info_t *info)
 215 {
 216     info->name.jobid = ORTE_JOBID_INVALID;
 217     info->name.vpid  = ORTE_VPID_INVALID;
 218 
 219     if( NULL != info->local_location ) {
 220         free(info->local_location);
 221         info->local_location = NULL;
 222     }
 223 
 224     if( NULL != info->metadata_filename ) {
 225         free(info->metadata_filename);
 226         info->metadata_filename = NULL;
 227     }
 228 
 229     if( NULL != info->crs_comp ) {
 230         free(info->crs_comp);
 231         info->crs_comp = NULL;
 232     }
 233 
 234     info->ckpt_skipped = false;
 235 }
 236 
 237 /******************
 238  * Local functions
 239  ******************/
 240 int orte_sstore_central_local_module_init(void)
 241 {
 242     int ret, exit_status = ORTE_SUCCESS;
 243 
 244     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 245                          "sstore:central:(local): init()"));
 246 
 247     if( NULL == active_handles ) {
 248         active_handles = OBJ_NEW(opal_list_t);
 249     }
 250 
 251     /*
 252      * Setup a listener for the HNP/Apps
 253      * We could be the HNP, in which case the listener is already registered.
 254      */
 255     if( !ORTE_PROC_IS_HNP ) {
 256         if( ORTE_SUCCESS != (ret = sstore_central_local_start_listener()) ) {
 257             ORTE_ERROR_LOG(ret);
 258             exit_status = ret;
 259             goto cleanup;
 260         }
 261     }
 262 
 263  cleanup:
 264     return exit_status;
 265 }
 266 
 267 int orte_sstore_central_local_module_finalize(void)
 268 {
 269     int ret, exit_status = ORTE_SUCCESS;
 270 
 271     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 272                          "sstore:central:(local): finalize()"));
 273 
 274     if( NULL != active_handles ) {
 275         OBJ_RELEASE(active_handles);
 276     }
 277 
 278     /*
 279      * Shutdown the listener for the HNP/Apps
 280      * We could be the HNP, in which case the listener is already deregistered.
 281      */
 282     if( !ORTE_PROC_IS_HNP ) {
 283         if( ORTE_SUCCESS != (ret = sstore_central_local_stop_listener()) ) {
 284             ORTE_ERROR_LOG(ret);
 285             exit_status = ret;
 286             goto cleanup;
 287         }
 288     }
 289 
 290  cleanup:
 291     return exit_status;
 292 }
 293 
 294 int orte_sstore_central_local_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
 295 {
 296     opal_output(0, "sstore:central:(local): request_checkpoint_handle() Not implemented!");
 297     return ORTE_ERR_NOT_IMPLEMENTED;
 298 }
 299 
 300 int orte_sstore_central_local_register(orte_sstore_base_handle_t handle)
 301 {
 302     int ret, exit_status = ORTE_SUCCESS;
 303     orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
 304 
 305     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 306                          "sstore:central:(local): register()"));
 307 
 308     /*
 309      * Create a handle
 310      */
 311     if( NULL == (handle_info = find_handle_info(handle)) ) {
 312         handle_info = create_new_handle_info(handle);
 313     }
 314 
 315     /*
 316      * Get basic information from Global SStore
 317      */
 318     if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
 319         ORTE_ERROR_LOG(ret);
 320         exit_status = ret;
 321         goto cleanup;
 322     }
 323 
 324     /*
 325      * Wait here until the pull request has been satisfied
 326      */
 327     while(SSTORE_LOCAL_READY != handle_info->status &&
 328           SSTORE_LOCAL_ERROR != handle_info->status ) {
 329         opal_progress();
 330     }
 331 
 332  cleanup:
 333     return exit_status;
 334 }
 335 
 336 int orte_sstore_central_local_get_attr(orte_sstore_base_handle_t handle,  orte_sstore_base_key_t key, char **value)
 337 {
 338     opal_output(0, "sstore:central:(local): get_attr() Not implemented!");
 339     return ORTE_ERR_NOT_IMPLEMENTED;
 340 }
 341 
 342 int orte_sstore_central_local_set_attr(orte_sstore_base_handle_t handle,  orte_sstore_base_key_t key, char *value)
 343 {
 344     opal_output(0, "sstore:central:(local): set_attr() Not implemented!");
 345     return ORTE_ERR_NOT_IMPLEMENTED;
 346 }
 347 
 348 int orte_sstore_central_local_sync(orte_sstore_base_handle_t handle)
 349 {
 350     int ret, exit_status = ORTE_SUCCESS;
 351     orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
 352 
 353     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 354                          "sstore:central:(local): sync()"));
 355 
 356     /*
 357      * Lookup the handle
 358      */
 359     handle_info = find_handle_info(handle);
 360 
 361     /*
 362      * Wait for all of the applications to update their metadata
 363      */
 364     if( ORTE_SUCCESS != (ret = wait_all_apps_updated(handle_info))) {
 365         ORTE_ERROR_LOG(ret);
 366         exit_status = ret;
 367         goto cleanup;
 368     }
 369 
 370     /*
 371      * Push information to the Global coordinator
 372      */
 373     if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
 374         ORTE_ERROR_LOG(ret);
 375         exit_status = ret;
 376         goto cleanup;
 377     }
 378 
 379     handle_info->status = SSTORE_LOCAL_SYNCED;
 380 
 381  cleanup:
 382     return exit_status;
 383 }
 384 
 385 int orte_sstore_central_local_remove(orte_sstore_base_handle_t handle)
 386 {
 387     opal_output(0, "sstore:central:(local): remove() Not implemented!");
 388     return ORTE_ERR_NOT_IMPLEMENTED;
 389 }
 390 
 391 int orte_sstore_central_local_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
 392 {
 393     int ret, exit_status = ORTE_SUCCESS;
 394 
 395     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 396                          "sstore:central:(local): pack()"));
 397 
 398     /*
 399      * Lookup the handle
 400      */
 401 
 402 
 403     /*
 404      * Pack the handle ID
 405      */
 406     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
 407         ORTE_ERROR_LOG(ret);
 408         exit_status = ret;
 409         goto cleanup;
 410     }
 411 
 412     /*
 413      * Pack any metadata
 414      */
 415 
 416  cleanup:
 417     return exit_status;
 418 }
 419 
 420 int orte_sstore_central_local_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
 421 {
 422     int ret, exit_status = ORTE_SUCCESS;
 423     orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
 424     orte_std_cntr_t count;
 425 
 426     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 427                          "sstore:central:(local): unpack()"));
 428 
 429     /*
 430      * Unpack the handle id
 431      */
 432     count = 1;
 433     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, handle, &count, ORTE_SSTORE_HANDLE))) {
 434         ORTE_ERROR_LOG(ret);
 435         exit_status = ret;
 436         goto cleanup;
 437     }
 438 
 439     /*
 440      * Lookup the handle
 441      */
 442     if( NULL == (handle_info = find_handle_info(*handle)) ) {
 443         handle_info = create_new_handle_info(*handle);
 444     }
 445 
 446     /*
 447      * Unpack the metadata piggybacked on this message
 448      */
 449     if( ORTE_SUCCESS != (ret = process_global_push(peer, buffer, handle_info))) {
 450         ORTE_ERROR_LOG(ret);
 451         exit_status = ret;
 452         goto cleanup;
 453     }
 454 
 455     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 456                          "sstore:central:(local): unpack(%d, %d, %s)",
 457                          handle_info->id,
 458                          handle_info->seq_num,
 459                          handle_info->global_ref_name));
 460 
 461  cleanup:
 462     return exit_status;
 463 }
 464 
 465 void orte_sstore_central_local_recv(int status,
 466                                     orte_process_name_t* sender,
 467                                     opal_buffer_t* buffer,
 468                                     orte_rml_tag_t tag,
 469                                     void* cbdata)
 470 {
 471     int ret;
 472     orte_sstore_central_cmd_flag_t command;
 473     orte_std_cntr_t count;
 474     orte_sstore_base_handle_t loc_id;
 475     orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
 476 
 477     if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
 478         return;
 479     }
 480 
 481     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 482                          "sstore:central:(local): process_cmd(%s)",
 483                          ORTE_NAME_PRINT(sender)));
 484 
 485     count = 1;
 486     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
 487         ORTE_ERROR_LOG(ret);
 488         goto cleanup;
 489     }
 490 
 491     count = 1;
 492     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
 493         ORTE_ERROR_LOG(ret);
 494         goto cleanup;
 495     }
 496 
 497     /*
 498      * Find the referenced handle (Create if it does not exist)
 499      */
 500     if(NULL == (handle_info = find_handle_info(loc_id)) ) {
 501         handle_info = create_new_handle_info(loc_id);
 502     }
 503 
 504     /*
 505      * Process the command
 506      */
 507     if( ORTE_SSTORE_CENTRAL_PULL == command ) {
 508         if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
 509             process_global_pull(sender, buffer, handle_info);
 510         } else {
 511             process_app_pull(sender, buffer, handle_info);
 512         }
 513     }
 514     else if( ORTE_SSTORE_CENTRAL_PUSH == command ) {
 515         if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
 516             process_global_push(sender, buffer, handle_info);
 517         } else {
 518             process_app_push(sender, buffer, handle_info);
 519         }
 520     }
 521 
 522  cleanup:
 523     return;
 524 }
 525 
 526 /**************************
 527  * Local functions
 528  **************************/
 529 static orte_sstore_central_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
 530 {
 531     orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
 532     int i;
 533     orte_proc_t *child = NULL;
 534 
 535     if( NULL == active_handles ) {
 536         active_handles = OBJ_NEW(opal_list_t);
 537     }
 538 
 539     handle_info = OBJ_NEW(orte_sstore_central_local_snapshot_info_t);
 540 
 541     handle_info->id = handle;
 542 
 543     opal_list_append(active_handles, &(handle_info->super));
 544 
 545     /*
 546      * Create a sub structure for each child
 547      */
 548     for (i=0; i < orte_local_children->size; i++) {
 549             if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
 550             continue;
 551         }
 552         append_new_app_handle_info(handle_info, &child->name);
 553     }
 554 
 555     handle_info->status = SSTORE_LOCAL_INIT;
 556 
 557     return handle_info;
 558 }
 559 
 560 static orte_sstore_central_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
 561 {
 562     orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
 563     opal_list_item_t* item = NULL;
 564 
 565     if( NULL == active_handles ) {
 566         return NULL;
 567     }
 568 
 569     for(item  = opal_list_get_first(active_handles);
 570         item != opal_list_get_end(active_handles);
 571         item  = opal_list_get_next(item) ) {
 572         handle_info = (orte_sstore_central_local_snapshot_info_t*)item;
 573 
 574         if( handle_info->id == handle ) {
 575             return handle_info;
 576         }
 577     }
 578 
 579     return NULL;
 580 }
 581 
 582 static int append_new_app_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info,
 583                                       orte_process_name_t *name)
 584 {
 585     orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
 586 
 587     app_info = OBJ_NEW(orte_sstore_central_local_app_snapshot_info_t);
 588 
 589     app_info->name.jobid = name->jobid;
 590     app_info->name.vpid  = name->vpid;
 591 
 592     opal_list_append(handle_info->app_info_handle, &(app_info->super));
 593 
 594     return ORTE_SUCCESS;
 595 }
 596 
 597 static orte_sstore_central_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info,
 598                                                                            orte_process_name_t *name)
 599 {
 600     orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
 601     opal_list_item_t* item = NULL;
 602     orte_ns_cmp_bitmask_t mask;
 603 
 604     for(item  = opal_list_get_first(handle_info->app_info_handle);
 605         item != opal_list_get_end(handle_info->app_info_handle);
 606         item  = opal_list_get_next(item) ) {
 607         app_info = (orte_sstore_central_local_app_snapshot_info_t*)item;
 608 
 609         mask = ORTE_NS_CMP_ALL;
 610 
 611         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &app_info->name, name)) {
 612             return app_info;
 613         }
 614     }
 615 
 616     return NULL;
 617 }
 618 
 619 static int sstore_central_local_start_listener(void)
 620 {
 621     if( is_global_listener_active ) {
 622         return ORTE_SUCCESS;
 623     }
 624 
 625     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
 626                             ORTE_RML_PERSISTENT, orte_sstore_central_local_recv, NULL);
 627 
 628     is_global_listener_active = true;
 629     return ORTE_SUCCESS;
 630 }
 631 
 632 static int sstore_central_local_stop_listener(void)
 633 {
 634     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
 635 
 636     is_global_listener_active = false;
 637     return ORTE_SUCCESS;
 638 }
 639 
 640 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info)
 641 {
 642     /* JJH should be as simple as calling push_handle_info() */
 643     opal_output(0, "sstore:central:(local): process_global_pull() Not implemented!");
 644     return ORTE_ERR_NOT_IMPLEMENTED;
 645 }
 646 
 647 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info)
 648 {
 649     int ret, exit_status = ORTE_SUCCESS;
 650     orte_std_cntr_t count;
 651     orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
 652     opal_list_item_t* item = NULL;
 653 
 654     count = 1;
 655     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
 656         ORTE_ERROR_LOG(ret);
 657         exit_status = ret;
 658         goto cleanup;
 659     }
 660 
 661     count = 1;
 662     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
 663         ORTE_ERROR_LOG(ret);
 664         exit_status = ret;
 665         goto cleanup;
 666     }
 667 
 668     count = 1;
 669     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->location_fmt), &count, OPAL_STRING))) {
 670         ORTE_ERROR_LOG(ret);
 671         exit_status = ret;
 672         goto cleanup;
 673     }
 674 
 675     /*
 676      * For each process we are working with
 677      */
 678     for(item  = opal_list_get_first(handle_info->app_info_handle);
 679         item != opal_list_get_end(handle_info->app_info_handle);
 680         item  = opal_list_get_next(item) ) {
 681         app_info = (orte_sstore_central_local_app_snapshot_info_t*)item;
 682 
 683         if( NULL != app_info->local_location ) {
 684             free(app_info->local_location);
 685             app_info->local_location = NULL;
 686         }
 687         opal_asprintf(&(app_info->local_location), handle_info->location_fmt, app_info->name.vpid);
 688 
 689         if( NULL != app_info->metadata_filename ) {
 690             free(app_info->metadata_filename);
 691             app_info->metadata_filename = NULL;
 692         }
 693         opal_asprintf(&(app_info->metadata_filename), "%s/%s",
 694                  app_info->local_location,
 695                  orte_sstore_base_local_metadata_filename);
 696     }
 697 
 698  cleanup:
 699     if( ORTE_SUCCESS == exit_status ) {
 700         handle_info->status = SSTORE_LOCAL_READY;
 701     } else {
 702         handle_info->status = SSTORE_LOCAL_ERROR;
 703     }
 704 
 705     return exit_status;
 706 }
 707 
 708 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info)
 709 {
 710     int ret, exit_status = ORTE_SUCCESS;
 711     opal_buffer_t *loc_buffer = NULL;
 712     orte_sstore_central_cmd_flag_t command;
 713     orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
 714 
 715     /*
 716      * Find this app's data
 717      */
 718     app_info = find_app_handle_info(handle_info, peer);
 719 
 720     /*
 721      * Push back the requested information
 722      */
 723     loc_buffer = OBJ_NEW(opal_buffer_t);
 724 
 725     command = ORTE_SSTORE_CENTRAL_PUSH;
 726     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
 727         ORTE_ERROR_LOG(ret);
 728         exit_status = ret;
 729         goto cleanup;
 730     }
 731 
 732     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
 733         ORTE_ERROR_LOG(ret);
 734         exit_status = ret;
 735         goto cleanup;
 736     }
 737 
 738     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
 739         ORTE_ERROR_LOG(ret);
 740         exit_status = ret;
 741         goto cleanup;
 742     }
 743 
 744     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
 745         ORTE_ERROR_LOG(ret);
 746         exit_status = ret;
 747         goto cleanup;
 748     }
 749 
 750     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
 751         ORTE_ERROR_LOG(ret);
 752         exit_status = ret;
 753         goto cleanup;
 754     }
 755 
 756     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
 757         ORTE_ERROR_LOG(ret);
 758         exit_status = ret;
 759         goto cleanup;
 760     }
 761 
 762     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
 763                                                        orte_rml_send_callback, NULL))) {
 764         ORTE_ERROR_LOG(ret);
 765         exit_status = ret;
 766         goto cleanup;
 767     }
 768     /* loc_buffer should not be released here; the callback releases it */
 769     loc_buffer = NULL;
 770 
 771  cleanup:
 772     if (NULL != loc_buffer) {
 773         OBJ_RELEASE(loc_buffer);
 774         loc_buffer = NULL;
 775     }
 776 
 777     return exit_status;
 778 }
 779 
 780 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info)
 781 {
 782     int ret, exit_status = ORTE_SUCCESS;
 783     orte_std_cntr_t count;
 784     orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
 785 
 786     /*
 787      * Find this app's data
 788      */
 789     app_info = find_app_handle_info(handle_info, peer);
 790 
 791     /*
 792      * Unpack the data
 793      */
 794     count = 1;
 795     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->ckpt_skipped), &count, OPAL_BOOL))) {
 796         ORTE_ERROR_LOG(ret);
 797         exit_status = ret;
 798         goto cleanup;
 799     }
 800 
 801     if( !app_info->ckpt_skipped ) {
 802         count = 1;
 803         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->crs_comp), &count, OPAL_STRING))) {
 804             ORTE_ERROR_LOG(ret);
 805             exit_status = ret;
 806             goto cleanup;
 807         }
 808     }
 809 
 810     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 811                          "sstore:central:(local): app_push(%s, skip=%s, %s)",
 812                          ORTE_NAME_PRINT(&(app_info->name)),
 813                          (app_info->ckpt_skipped ? "T" : "F"),
 814                          app_info->crs_comp));
 815 
 816  cleanup:
 817     return exit_status;
 818 }
 819 
 820 static int wait_all_apps_updated(orte_sstore_central_local_snapshot_info_t *handle_info)
 821 {
 822     orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
 823     opal_list_item_t *item = NULL;
 824     bool is_done = true;
 825 
 826     do {
 827         is_done = true;
 828         for(item  = opal_list_get_first(handle_info->app_info_handle);
 829             item != opal_list_get_end(handle_info->app_info_handle);
 830             item  = opal_list_get_next(item) ) {
 831             app_info = (orte_sstore_central_local_app_snapshot_info_t*)item;
 832 
 833             if( NULL == app_info->crs_comp && !app_info->ckpt_skipped ) {
 834                 is_done = false;
 835                 break;
 836             }
 837         }
 838 
 839         if( !is_done ) {
 840             OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
 841                                  "sstore:central:(local): Waiting for appliccation %s",
 842                                  ORTE_NAME_PRINT(&(app_info->name)) ));
 843             opal_progress();
 844         }
 845     } while(!is_done);
 846 
 847     return ORTE_SUCCESS;
 848 }
 849 
 850 static int pull_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info )
 851 {
 852     int ret, exit_status = ORTE_SUCCESS;
 853     opal_buffer_t *buffer = NULL;
 854     orte_sstore_central_cmd_flag_t command;
 855 
 856     /*
 857      * Check to see if this is necessary
 858      * (Did we get all of the info from the handle unpack?)
 859      */
 860     if( 0 <= handle_info->seq_num &&
 861         NULL != handle_info->global_ref_name &&
 862         NULL != handle_info->location_fmt ) {
 863         handle_info->status = SSTORE_LOCAL_READY;
 864         return ORTE_SUCCESS;
 865     }
 866 
 867     buffer = OBJ_NEW(opal_buffer_t);
 868 
 869     /*
 870      * Ask the daemon to send us the info that we need
 871      */
 872     command = ORTE_SSTORE_CENTRAL_PULL;
 873     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
 874         ORTE_ERROR_LOG(ret);
 875         exit_status = ret;
 876         goto cleanup;
 877     }
 878 
 879     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
 880         ORTE_ERROR_LOG(ret);
 881         exit_status = ret;
 882         goto cleanup;
 883     }
 884 
 885     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
 886                                                        ORTE_RML_TAG_SSTORE_INTERNAL,
 887                                                        orte_rml_send_callback, NULL))) {
 888         ORTE_ERROR_LOG(ret);
 889         exit_status = ret;
 890         goto cleanup;
 891     }
 892 
 893     /* buffer should not be released here; the callback releases it */
 894     buffer = NULL;
 895 
 896  cleanup:
 897     if (NULL != buffer) {
 898         OBJ_RELEASE(buffer);
 899         buffer = NULL;
 900     }
 901 
 902     return exit_status;
 903 }
 904 
 905 static int push_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info )
 906 {
 907     int ret, exit_status = ORTE_SUCCESS;
 908     opal_buffer_t *buffer = NULL;
 909     orte_sstore_central_cmd_flag_t command;
 910     orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
 911     opal_list_item_t *item = NULL;
 912     size_t list_size;
 913 
 914     buffer = OBJ_NEW(opal_buffer_t);
 915 
 916     command = ORTE_SSTORE_CENTRAL_PUSH;
 917     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
 918         ORTE_ERROR_LOG(ret);
 919         exit_status = ret;
 920         goto cleanup;
 921     }
 922 
 923     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
 924         ORTE_ERROR_LOG(ret);
 925         exit_status = ret;
 926         goto cleanup;
 927     }
 928 
 929     list_size = opal_list_get_size(handle_info->app_info_handle);
 930     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &list_size, 1, OPAL_SIZE))) {
 931         ORTE_ERROR_LOG(ret);
 932         exit_status = ret;
 933         goto cleanup;
 934     }
 935 
 936     /*
 937      * For each process we are working with
 938      */
 939     for(item  = opal_list_get_first(handle_info->app_info_handle);
 940         item != opal_list_get_end(handle_info->app_info_handle);
 941         item  = opal_list_get_next(item) ) {
 942         app_info = (orte_sstore_central_local_app_snapshot_info_t*)item;
 943 
 944         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->name), 1, ORTE_NAME))) {
 945             ORTE_ERROR_LOG(ret);
 946             exit_status = ret;
 947             goto cleanup;
 948         }
 949 
 950         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->ckpt_skipped), 1, OPAL_BOOL))) {
 951             ORTE_ERROR_LOG(ret);
 952             exit_status = ret;
 953             goto cleanup;
 954         }
 955 
 956         if( !app_info->ckpt_skipped ) {
 957             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->crs_comp), 1, OPAL_STRING))) {
 958                 ORTE_ERROR_LOG(ret);
 959                 exit_status = ret;
 960                 goto cleanup;
 961             }
 962         }
 963     }
 964 
 965     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
 966                                                        ORTE_RML_TAG_SSTORE_INTERNAL,
 967                                                        orte_rml_send_callback, NULL))) {
 968         ORTE_ERROR_LOG(ret);
 969         exit_status = ret;
 970         goto cleanup;
 971     }
 972 
 973     /* buffer should not be released here; the callback releases it */
 974     buffer = NULL;
 975 
 976  cleanup:
 977     if (NULL != buffer) {
 978         OBJ_RELEASE(buffer);
 979         buffer = NULL;
 980     }
 981 
 982     return exit_status;
 983 }

/* [<][>][^][v][top][bottom][index][help] */