root/opal/mca/pmix/pmix4x/pmix/src/mca/gds/hash/gds_hash.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. htcon
  2. htdes
  3. hash_init
  4. hash_finalize
  5. hash_assign_module
  6. store_map
  7. hash_cache_job_info
  8. register_info
  9. hash_register_job_info
  10. hash_store_job_info
  11. hash_store
  12. hash_store_modex
  13. _hash_store_modex
  14. hash_fetch
  15. setup_fork
  16. nspace_add
  17. nspace_del
  18. assemb_kvs_req
  19. accept_kvs_resp

   1 /*
   2  * Copyright (c) 2015-2019 Intel, Inc.  All rights reserved.
   3  * Copyright (c) 2016-2018 IBM Corporation.  All rights reserved.
   4  * Copyright (c) 2018      Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2018-2019 Mellanox Technologies, Inc.
   7  *                         All rights reserved.
   8  *
   9  * $COPYRIGHT$
  10  *
  11  * Additional copyrights may follow
  12  *
  13  * $HEADER$
  14  */
  15 
  16 #include <src/include/pmix_config.h>
  17 
  18 #include <string.h>
  19 #ifdef HAVE_UNISTD_H
  20 #include <unistd.h>
  21 #endif
  22 #ifdef HAVE_SYS_TYPES_H
  23 #include <sys/types.h>
  24 #endif
  25 #ifdef HAVE_SYS_STAT_H
  26 #include <sys/stat.h>
  27 #endif
  28 #ifdef HAVE_FCNTL_H
  29 #include <fcntl.h>
  30 #endif
  31 #include <time.h>
  32 
  33 #include <pmix_common.h>
  34 
  35 #include "src/include/pmix_globals.h"
  36 #include "src/class/pmix_list.h"
  37 #include "src/client/pmix_client_ops.h"
  38 #include "src/server/pmix_server_ops.h"
  39 #include "src/util/argv.h"
  40 #include "src/mca/pcompress/base/base.h"
  41 #include "src/util/error.h"
  42 #include "src/util/hash.h"
  43 #include "src/util/output.h"
  44 #include "src/util/pmix_environ.h"
  45 #include "src/mca/preg/preg.h"
  46 
  47 #include "src/mca/gds/base/base.h"
  48 #include "gds_hash.h"
  49 
  50 static pmix_status_t hash_init(pmix_info_t info[], size_t ninfo);
  51 static void hash_finalize(void);
  52 
  53 static pmix_status_t hash_assign_module(pmix_info_t *info, size_t ninfo,
  54                                         int *priority);
  55 
  56 static pmix_status_t hash_cache_job_info(struct pmix_namespace_t *ns,
  57                                          pmix_info_t info[], size_t ninfo);
  58 
  59 static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr,
  60                                             pmix_buffer_t *reply);
  61 
  62 static pmix_status_t hash_store_job_info(const char *nspace,
  63                                         pmix_buffer_t *buf);
  64 
  65 static pmix_status_t hash_store(const pmix_proc_t *proc,
  66                                 pmix_scope_t scope,
  67                                 pmix_kval_t *kv);
  68 
  69 static pmix_status_t hash_store_modex(struct pmix_namespace_t *ns,
  70                                       pmix_buffer_t *buff,
  71                                       void *cbdata);
  72 
  73 static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
  74                                        pmix_proc_t *proc,
  75                                        pmix_gds_modex_key_fmt_t key_fmt,
  76                                        char **kmap,
  77                                        pmix_buffer_t *pbkt);
  78 
  79 static pmix_status_t hash_fetch(const pmix_proc_t *proc,
  80                                 pmix_scope_t scope, bool copy,
  81                                 const char *key,
  82                                 pmix_info_t info[], size_t ninfo,
  83                                 pmix_list_t *kvs);
  84 
  85 static pmix_status_t setup_fork(const pmix_proc_t *peer, char ***env);
  86 
  87 static pmix_status_t nspace_add(const char *nspace,
  88                                 pmix_info_t info[],
  89                                 size_t ninfo);
  90 
  91 static pmix_status_t nspace_del(const char *nspace);
  92 
  93 static pmix_status_t assemb_kvs_req(const pmix_proc_t *proc,
  94                               pmix_list_t *kvs,
  95                               pmix_buffer_t *bo,
  96                               void *cbdata);
  97 
  98 static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf);
  99 
 100 pmix_gds_base_module_t pmix_hash_module = {
 101     .name = "hash",
 102     .is_tsafe = false,
 103     .init = hash_init,
 104     .finalize = hash_finalize,
 105     .assign_module = hash_assign_module,
 106     .cache_job_info = hash_cache_job_info,
 107     .register_job_info = hash_register_job_info,
 108     .store_job_info = hash_store_job_info,
 109     .store = hash_store,
 110     .store_modex = hash_store_modex,
 111     .fetch = hash_fetch,
 112     .setup_fork = setup_fork,
 113     .add_nspace = nspace_add,
 114     .del_nspace = nspace_del,
 115     .assemb_kvs_req = assemb_kvs_req,
 116     .accept_kvs_resp = accept_kvs_resp
 117 };
 118 
 119 typedef struct {
 120     pmix_list_item_t super;
 121     char *ns;
 122     pmix_namespace_t *nptr;
 123     pmix_hash_table_t internal;
 124     pmix_hash_table_t remote;
 125     pmix_hash_table_t local;
 126     bool gdata_added;
 127 } pmix_hash_trkr_t;
 128 
 129 static void htcon(pmix_hash_trkr_t *p)
 130 {
 131     p->ns = NULL;
 132     p->nptr = NULL;
 133     PMIX_CONSTRUCT(&p->internal, pmix_hash_table_t);
 134     pmix_hash_table_init(&p->internal, 256);
 135     PMIX_CONSTRUCT(&p->remote, pmix_hash_table_t);
 136     pmix_hash_table_init(&p->remote, 256);
 137     PMIX_CONSTRUCT(&p->local, pmix_hash_table_t);
 138     pmix_hash_table_init(&p->local, 256);
 139     p->gdata_added = false;
 140 }
 141 static void htdes(pmix_hash_trkr_t *p)
 142 {
 143     if (NULL != p->ns) {
 144         free(p->ns);
 145     }
 146     if (NULL != p->nptr) {
 147         PMIX_RELEASE(p->nptr);
 148     }
 149     pmix_hash_remove_data(&p->internal, PMIX_RANK_WILDCARD, NULL);
 150     PMIX_DESTRUCT(&p->internal);
 151     pmix_hash_remove_data(&p->remote, PMIX_RANK_WILDCARD, NULL);
 152     PMIX_DESTRUCT(&p->remote);
 153     pmix_hash_remove_data(&p->local, PMIX_RANK_WILDCARD, NULL);
 154     PMIX_DESTRUCT(&p->local);
 155 }
 156 static PMIX_CLASS_INSTANCE(pmix_hash_trkr_t,
 157                            pmix_list_item_t,
 158                            htcon, htdes);
 159 
 160 static pmix_list_t myhashes;
 161 
 162 static pmix_status_t hash_init(pmix_info_t info[], size_t ninfo)
 163 {
 164     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
 165                         "gds: hash init");
 166 
 167     PMIX_CONSTRUCT(&myhashes, pmix_list_t);
 168     return PMIX_SUCCESS;
 169 }
 170 
 171 static void hash_finalize(void)
 172 {
 173     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
 174                         "gds: hash finalize");
 175 
 176     PMIX_LIST_DESTRUCT(&myhashes);
 177 }
 178 
 179 static pmix_status_t hash_assign_module(pmix_info_t *info, size_t ninfo,
 180                                         int *priority)
 181 {
 182     size_t n, m;
 183     char **options;
 184 
 185     *priority = 10;
 186     if (NULL != info) {
 187         for (n=0; n < ninfo; n++) {
 188             if (0 == strncmp(info[n].key, PMIX_GDS_MODULE, PMIX_MAX_KEYLEN)) {
 189                 options = pmix_argv_split(info[n].value.data.string, ',');
 190                 for (m=0; NULL != options[m]; m++) {
 191                     if (0 == strcmp(options[m], "hash")) {
 192                         /* they specifically asked for us */
 193                         *priority = 100;
 194                         break;
 195                     }
 196                 }
 197                 pmix_argv_free(options);
 198                 break;
 199             }
 200         }
 201     }
 202     return PMIX_SUCCESS;
 203 }
 204 
 205 static pmix_status_t store_map(pmix_hash_table_t *ht,
 206                                char **nodes, char **ppn)
 207 {
 208     pmix_status_t rc;
 209     pmix_value_t *val;
 210     size_t m, n;
 211     pmix_info_t *iptr, *info;
 212     pmix_rank_t rank;
 213     bool updated;
 214     pmix_kval_t *kp2;
 215     char **procs;
 216 
 217     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
 218                         "[%s:%d] gds:hash:store_map",
 219                         pmix_globals.myid.nspace, pmix_globals.myid.rank);
 220 
 221     /* if the lists don't match, then that's wrong */
 222     if (pmix_argv_count(nodes) != pmix_argv_count(ppn)) {
 223         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 224         return PMIX_ERR_BAD_PARAM;
 225     }
 226 
 227     for (n=0; NULL != nodes[n]; n++) {
 228         /* check and see if we already have data for this node */
 229         val = NULL;
 230         rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, nodes[n], &val);
 231         if (PMIX_SUCCESS == rc && NULL != val) {
 232             /* already have some data. See if we have the list of local peers */
 233             if (PMIX_DATA_ARRAY != val->type ||
 234                 NULL == val->data.darray ||
 235                 PMIX_INFO != val->data.darray->type ||
 236                 0 == val->data.darray->size) {
 237                 /* something is wrong */
 238                 PMIX_VALUE_RELEASE(val);
 239                 PMIX_ERROR_LOG(PMIX_ERR_INVALID_VAL);
 240                 return PMIX_ERR_INVALID_VAL;
 241             }
 242             iptr = (pmix_info_t*)val->data.darray->array;
 243             updated = false;
 244             for (m=0; m < val->data.darray->size; m++) {
 245                 if (0 == strncmp(iptr[m].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN)) {
 246                     /* we will update this entry */
 247                     if (NULL != iptr[m].value.data.string) {
 248                         free(iptr[m].value.data.string);
 249                     }
 250                     iptr[m].value.data.string = strdup(ppn[n]);
 251                     updated = true;
 252                     break;
 253                 }
 254             }
 255             if (!updated) {
 256                 /* append this entry to the current data */
 257                 kp2 = PMIX_NEW(pmix_kval_t);
 258                 if (NULL == kp2) {
 259                     return PMIX_ERR_NOMEM;
 260                 }
 261                 kp2->key = strdup(nodes[n]);
 262                 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 263                 if (NULL == kp2->value) {
 264                     PMIX_RELEASE(kp2);
 265                     return PMIX_ERR_NOMEM;
 266                 }
 267                 kp2->value->type = PMIX_DATA_ARRAY;
 268                 kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
 269                 if (NULL == kp2->value->data.darray) {
 270                     PMIX_RELEASE(kp2);
 271                     return PMIX_ERR_NOMEM;
 272                 }
 273                 kp2->value->data.darray->type = PMIX_INFO;
 274                 kp2->value->data.darray->size = val->data.darray->size + 1;
 275                 PMIX_INFO_CREATE(info, kp2->value->data.darray->size);
 276                 if (NULL == info) {
 277                     PMIX_RELEASE(kp2);
 278                     return PMIX_ERR_NOMEM;
 279                 }
 280                 /* copy the pre-existing data across */
 281                 for (m=0; m < val->data.darray->size; m++) {
 282                     PMIX_INFO_XFER(&info[m], &iptr[m]);
 283                 }
 284                 PMIX_INFO_LOAD(&info[kp2->value->data.darray->size-1], PMIX_LOCAL_PEERS, ppn[n], PMIX_STRING);
 285                 kp2->value->data.darray->array = info;
 286                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
 287                     PMIX_ERROR_LOG(rc);
 288                     PMIX_RELEASE(kp2);
 289                     return rc;
 290                 }
 291                 PMIX_RELEASE(kp2);
 292             }
 293         } else {
 294             /* store the list as-is */
 295             kp2 = PMIX_NEW(pmix_kval_t);
 296             if (NULL == kp2) {
 297                 return PMIX_ERR_NOMEM;
 298             }
 299             kp2->key = strdup(nodes[n]);
 300             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 301             if (NULL == kp2->value) {
 302                 PMIX_RELEASE(kp2);
 303                 return PMIX_ERR_NOMEM;
 304             }
 305             kp2->value->type = PMIX_DATA_ARRAY;
 306             kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
 307             if (NULL == kp2->value->data.darray) {
 308                 PMIX_RELEASE(kp2);
 309                 return PMIX_ERR_NOMEM;
 310             }
 311             kp2->value->data.darray->type = PMIX_INFO;
 312             PMIX_INFO_CREATE(info, 1);
 313             if (NULL == info) {
 314                 PMIX_RELEASE(kp2);
 315                 return PMIX_ERR_NOMEM;
 316             }
 317             PMIX_INFO_LOAD(&info[0], PMIX_LOCAL_PEERS, ppn[n], PMIX_STRING);
 318             kp2->value->data.darray->array = info;
 319             kp2->value->data.darray->size = 1;
 320             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
 321                 PMIX_ERROR_LOG(rc);
 322                 PMIX_RELEASE(kp2);
 323                 return rc;
 324             }
 325             PMIX_RELEASE(kp2);
 326         }
 327         /* split the list of procs so we can store their
 328          * individual location data */
 329         procs = pmix_argv_split(ppn[n], ',');
 330         for (m=0; NULL != procs[m]; m++) {
 331             /* store the hostname for each proc */
 332             kp2 = PMIX_NEW(pmix_kval_t);
 333             kp2->key = strdup(PMIX_HOSTNAME);
 334             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 335             kp2->value->type = PMIX_STRING;
 336             kp2->value->data.string = strdup(nodes[n]);
 337             rank = strtol(procs[m], NULL, 10);
 338             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
 339                 PMIX_ERROR_LOG(rc);
 340                 PMIX_RELEASE(kp2);
 341                 pmix_argv_free(procs);
 342                 return rc;
 343             }
 344             PMIX_RELEASE(kp2);  // maintain acctg
 345         }
 346         pmix_argv_free(procs);
 347     }
 348 
 349     /* store the comma-delimited list of nodes hosting
 350      * procs in this nspace in case someone using PMIx v2
 351      * requests it */
 352     kp2 = PMIX_NEW(pmix_kval_t);
 353     kp2->key = strdup(PMIX_NODE_LIST);
 354     kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 355     kp2->value->type = PMIX_STRING;
 356     kp2->value->data.string = pmix_argv_join(nodes, ',');
 357     if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
 358         PMIX_ERROR_LOG(rc);
 359         PMIX_RELEASE(kp2);
 360         return rc;
 361     }
 362     PMIX_RELEASE(kp2);  // maintain acctg
 363 
 364     return PMIX_SUCCESS;
 365 }
 366 
 367 pmix_status_t hash_cache_job_info(struct pmix_namespace_t *ns,
 368                                   pmix_info_t info[], size_t ninfo)
 369 {
 370     pmix_namespace_t *nptr = (pmix_namespace_t*)ns;
 371     pmix_hash_trkr_t *trk, *t;
 372     pmix_hash_table_t *ht;
 373     pmix_kval_t *kp2, *kvptr;
 374     pmix_info_t *iptr;
 375     char **nodes=NULL, **procs=NULL;
 376     uint8_t *tmp;
 377     pmix_rank_t rank;
 378     pmix_status_t rc=PMIX_SUCCESS;
 379     size_t n, j, size, len;
 380 
 381     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
 382                         "[%s:%d] gds:hash:cache_job_info for nspace %s",
 383                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 384                         nptr->nspace);
 385 
 386     /* find the hash table for this nspace */
 387     trk = NULL;
 388     PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
 389         if (0 == strcmp(nptr->nspace, t->ns)) {
 390             trk = t;
 391             break;
 392         }
 393     }
 394     if (NULL == trk) {
 395         /* create a tracker as we will likely need it */
 396         trk = PMIX_NEW(pmix_hash_trkr_t);
 397         if (NULL == trk) {
 398             return PMIX_ERR_NOMEM;
 399         }
 400         PMIX_RETAIN(nptr);
 401         trk->nptr = nptr;
 402         trk->ns = strdup(nptr->nspace);
 403         pmix_list_append(&myhashes, &trk->super);
 404     }
 405 
 406     /* if there isn't any data, then be content with just
 407      * creating the tracker */
 408     if (NULL == info || 0 == ninfo) {
 409         return PMIX_SUCCESS;
 410     }
 411 
 412     /* cache the job info on the internal hash table for this nspace */
 413     ht = &trk->internal;
 414     for (n=0; n < ninfo; n++) {
 415         if (0 == strcmp(info[n].key, PMIX_NODE_MAP)) {
 416             /* store the node map itself since that is
 417              * what v3 uses */
 418             kp2 = PMIX_NEW(pmix_kval_t);
 419             kp2->key = strdup(PMIX_NODE_MAP);
 420             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 421             kp2->value->type = PMIX_STRING;
 422             kp2->value->data.string = strdup(info[n].value.data.string);
 423             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
 424                 PMIX_ERROR_LOG(rc);
 425                 PMIX_RELEASE(kp2);
 426                 return rc;
 427             }
 428             PMIX_RELEASE(kp2);  // maintain acctg
 429 
 430             /* parse the regex to get the argv array of node names */
 431             if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(info[n].value.data.string, &nodes))) {
 432                 PMIX_ERROR_LOG(rc);
 433                 goto release;
 434             }
 435             /* if we have already found the proc map, then parse
 436              * and store the detailed map */
 437             if (NULL != procs) {
 438                 if (PMIX_SUCCESS != (rc = store_map(ht, nodes, procs))) {
 439                     PMIX_ERROR_LOG(rc);
 440                     goto release;
 441                 }
 442             }
 443         } else if (0 == strcmp(info[n].key, PMIX_PROC_MAP)) {
 444             /* parse the regex to get the argv array containing proc ranks on each node */
 445             if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(info[n].value.data.string, &procs))) {
 446                 PMIX_ERROR_LOG(rc);
 447                 goto release;
 448             }
 449             /* if we have already recv'd the node map, then parse
 450              * and store the detailed map */
 451             if (NULL != nodes) {
 452                 if (PMIX_SUCCESS != (rc = store_map(ht, nodes, procs))) {
 453                     PMIX_ERROR_LOG(rc);
 454                     goto release;
 455                 }
 456             }
 457         } else if (0 == strcmp(info[n].key, PMIX_PROC_DATA)) {
 458             /* an array of data pertaining to a specific proc */
 459             if (PMIX_DATA_ARRAY != info[n].value.type) {
 460                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 461                 rc = PMIX_ERR_TYPE_MISMATCH;
 462                 goto release;
 463             }
 464             size = info[n].value.data.darray->size;
 465             iptr = (pmix_info_t*)info[n].value.data.darray->array;
 466             /* first element of the array must be the rank */
 467             if (0 != strcmp(iptr[0].key, PMIX_RANK) ||
 468                 PMIX_PROC_RANK != iptr[0].value.type) {
 469                 rc = PMIX_ERR_TYPE_MISMATCH;
 470                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 471                 goto release;
 472             }
 473             rank = iptr[0].value.data.rank;
 474             /* cycle thru the values for this rank and store them */
 475             for (j=1; j < size; j++) {
 476                 kp2 = PMIX_NEW(pmix_kval_t);
 477                 if (NULL == kp2) {
 478                     rc = PMIX_ERR_NOMEM;
 479                     goto release;
 480                 }
 481                 kp2->key = strdup(iptr[j].key);
 482                 PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
 483                 if (PMIX_SUCCESS != rc) {
 484                     PMIX_ERROR_LOG(rc);
 485                     PMIX_RELEASE(kp2);
 486                     goto release;
 487                 }
 488                 /* if the value contains a string that is longer than the
 489                  * limit, then compress it */
 490                 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
 491                     if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
 492                         if (NULL == tmp) {
 493                             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 494                             rc = PMIX_ERR_NOMEM;
 495                             goto release;
 496                         }
 497                         kp2->value->type = PMIX_COMPRESSED_STRING;
 498                         free(kp2->value->data.string);
 499                         kp2->value->data.bo.bytes = (char*)tmp;
 500                         kp2->value->data.bo.size = len;
 501                     }
 502                 }
 503                 /* store it in the hash_table */
 504                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
 505                     PMIX_ERROR_LOG(rc);
 506                     PMIX_RELEASE(kp2);
 507                     goto release;
 508                 }
 509                 PMIX_RELEASE(kp2);  // maintain acctg
 510             }
 511         } else {
 512             /* just a value relating to the entire job */
 513             kp2 = PMIX_NEW(pmix_kval_t);
 514             if (NULL == kp2) {
 515                 rc = PMIX_ERR_NOMEM;
 516                 goto release;
 517             }
 518             kp2->key = strdup(info[n].key);
 519             PMIX_VALUE_XFER(rc, kp2->value, &info[n].value);
 520             if (PMIX_SUCCESS != rc) {
 521                 PMIX_ERROR_LOG(rc);
 522                 PMIX_RELEASE(kp2);
 523                 goto release;
 524             }
 525             /* if the value contains a string that is longer than the
 526              * limit, then compress it */
 527             if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
 528                 if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
 529                     if (NULL == tmp) {
 530                         rc = PMIX_ERR_NOMEM;
 531                         PMIX_ERROR_LOG(rc);
 532                         PMIX_RELEASE(kp2);
 533                         goto release;
 534                     }
 535                     kp2->value->type = PMIX_COMPRESSED_STRING;
 536                     free(kp2->value->data.string);
 537                     kp2->value->data.bo.bytes = (char*)tmp;
 538                     kp2->value->data.bo.size = len;
 539                 }
 540             }
 541             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
 542                 PMIX_ERROR_LOG(rc);
 543                 PMIX_RELEASE(kp2);
 544                 goto release;
 545             }
 546             PMIX_RELEASE(kp2);  // maintain acctg
 547             /* if this is the job size, then store it */
 548             if (0 == strncmp(info[n].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN)) {
 549                 nptr->nprocs = info[n].value.data.uint32;
 550             }
 551         }
 552     }
 553 
 554     /* now add any global data that was provided */
 555     if (!trk->gdata_added) {
 556         PMIX_LIST_FOREACH(kvptr, &pmix_server_globals.gdata, pmix_kval_t) {
 557             /* sadly, the data cannot simultaneously exist on two lists,
 558              * so we must make a copy of it here */
 559             kp2 = PMIX_NEW(pmix_kval_t);
 560             if (NULL == kp2) {
 561                 rc = PMIX_ERR_NOMEM;
 562                 goto release;
 563             }
 564             kp2->key = strdup(kvptr->key);
 565             PMIX_VALUE_XFER(rc, kp2->value, kvptr->value);
 566             if (PMIX_SUCCESS != rc) {
 567                 PMIX_ERROR_LOG(rc);
 568                 PMIX_RELEASE(kp2);
 569                 goto release;
 570             }
 571             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
 572                 PMIX_ERROR_LOG(rc);
 573                 PMIX_RELEASE(kp2);
 574                 break;
 575             }
 576             PMIX_RELEASE(kp2);  // maintain acctg
 577         }
 578         trk->gdata_added = true;
 579     }
 580 
 581   release:
 582     if (NULL != nodes) {
 583         pmix_argv_free(nodes);
 584     }
 585     if (NULL != procs) {
 586         pmix_argv_free(procs);
 587     }
 588     return rc;
 589 }
 590 
 591 static pmix_status_t register_info(pmix_peer_t *peer,
 592                                    pmix_namespace_t *ns,
 593                                    pmix_buffer_t *reply)
 594 {
 595     pmix_hash_trkr_t *trk, *t;
 596     pmix_hash_table_t *ht;
 597     pmix_value_t *val, blob;
 598     pmix_status_t rc = PMIX_SUCCESS;
 599     pmix_info_t *info;
 600     size_t ninfo, n;
 601     pmix_kval_t kv;
 602     pmix_buffer_t buf;
 603     pmix_rank_t rank;
 604 
 605     trk = NULL;
 606     PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
 607         if (0 == strcmp(ns->nspace, t->ns)) {
 608             trk = t;
 609             break;
 610         }
 611     }
 612     if (NULL == trk) {
 613         return PMIX_ERR_INVALID_NAMESPACE;
 614     }
 615     /* the job data is stored on the internal hash table */
 616     ht = &trk->internal;
 617 
 618     /* fetch all values from the hash table tied to rank=wildcard */
 619     val = NULL;
 620     rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, NULL, &val);
 621     if (PMIX_SUCCESS != rc) {
 622         PMIX_ERROR_LOG(rc);
 623         if (NULL != val) {
 624             PMIX_VALUE_RELEASE(val);
 625         }
 626         return rc;
 627     }
 628 
 629     if (NULL == val || NULL == val->data.darray ||
 630         PMIX_INFO != val->data.darray->type ||
 631         0 == val->data.darray->size) {
 632         return PMIX_ERR_NOT_FOUND;
 633     }
 634     info = (pmix_info_t*)val->data.darray->array;
 635     ninfo = val->data.darray->size;
 636     for (n=0; n < ninfo; n++) {
 637         kv.key = info[n].key;
 638         kv.value = &info[n].value;
 639         PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
 640     }
 641     if (NULL != val) {
 642         PMIX_VALUE_RELEASE(val);
 643     }
 644 
 645     for (rank=0; rank < ns->nprocs; rank++) {
 646         val = NULL;
 647         rc = pmix_hash_fetch(ht, rank, NULL, &val);
 648         if (PMIX_SUCCESS != rc) {
 649             PMIX_ERROR_LOG(rc);
 650             if (NULL != val) {
 651                 PMIX_VALUE_RELEASE(val);
 652             }
 653             return rc;
 654         }
 655         if (NULL == val) {
 656             return PMIX_ERR_NOT_FOUND;
 657         }
 658         PMIX_CONSTRUCT(&buf, pmix_buffer_t);
 659         PMIX_BFROPS_PACK(rc, peer, &buf, &rank, 1, PMIX_PROC_RANK);
 660 
 661         info = (pmix_info_t*)val->data.darray->array;
 662         ninfo = val->data.darray->size;
 663         for (n=0; n < ninfo; n++) {
 664             kv.key = info[n].key;
 665             kv.value = &info[n].value;
 666             PMIX_BFROPS_PACK(rc, peer, &buf, &kv, 1, PMIX_KVAL);
 667         }
 668         kv.key = PMIX_PROC_BLOB;
 669         kv.value = &blob;
 670         blob.type = PMIX_BYTE_OBJECT;
 671         PMIX_UNLOAD_BUFFER(&buf, blob.data.bo.bytes, blob.data.bo.size);
 672         PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
 673         PMIX_VALUE_DESTRUCT(&blob);
 674         PMIX_DESTRUCT(&buf);
 675 
 676         if (NULL != val) {
 677             PMIX_VALUE_RELEASE(val);
 678         }
 679     }
 680     return rc;
 681 }
 682 
 683 /* the purpose of this function is to pack the job-level
 684  * info stored in the pmix_namespace_t into a buffer and send
 685  * it to the given client */
 686 static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr,
 687                                             pmix_buffer_t *reply)
 688 {
 689     pmix_peer_t *peer = (pmix_peer_t*)pr;
 690     pmix_namespace_t *ns = peer->nptr;
 691     char *msg;
 692     pmix_status_t rc;
 693     pmix_hash_trkr_t *trk, *t2;
 694 
 695     if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
 696         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 697         /* this function is only available on servers */
 698         PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
 699         return PMIX_ERR_NOT_SUPPORTED;
 700     }
 701 
 702     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
 703                         "[%s:%d] gds:hash:register_job_info for peer [%s:%d]",
 704                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 705                         peer->info->pname.nspace, peer->info->pname.rank);
 706 
 707     /* first see if we already have processed this data
 708      * for another peer in this nspace so we don't waste
 709      * time doing it again */
 710     if (NULL != ns->jobbkt) {
 711         /* we have packed this before - can just deliver it */
 712         PMIX_BFROPS_COPY_PAYLOAD(rc, peer, reply, ns->jobbkt);
 713         if (PMIX_SUCCESS != rc) {
 714             PMIX_ERROR_LOG(rc);
 715         }
 716         /* now see if we have delivered it to all our local
 717          * clients for this nspace */
 718         if (ns->ndelivered == ns->nlocalprocs) {
 719             /* we have, so let's get rid of the packed
 720              * copy of the data */
 721             PMIX_RELEASE(ns->jobbkt);
 722             ns->jobbkt = NULL;
 723         }
 724         return rc;
 725     }
 726 
 727     /* setup a tracker for this nspace as we will likely
 728      * need it again */
 729     trk = NULL;
 730     PMIX_LIST_FOREACH(t2, &myhashes, pmix_hash_trkr_t) {
 731         if (ns == t2->nptr) {
 732             trk = t2;
 733             if (NULL == trk->ns) {
 734                 trk->ns = strdup(ns->nspace);
 735             }
 736             break;
 737         }
 738     }
 739     if (NULL == trk) {
 740         trk = PMIX_NEW(pmix_hash_trkr_t);
 741         trk->ns = strdup(ns->nspace);
 742         PMIX_RETAIN(ns);
 743         trk->nptr = ns;
 744         pmix_list_append(&myhashes, &trk->super);
 745     }
 746 
 747     /* the job info for the specified nspace has
 748      * been given to us in the info array - pack
 749      * them for delivery */
 750     /* pack the name of the nspace */
 751     msg = ns->nspace;
 752     PMIX_BFROPS_PACK(rc, peer, reply, &msg, 1, PMIX_STRING);
 753     if (PMIX_SUCCESS != rc) {
 754         PMIX_ERROR_LOG(rc);
 755         return rc;
 756     }
 757 
 758     rc = register_info(peer, ns, reply);
 759     if (PMIX_SUCCESS == rc) {
 760         /* if we have more than one local client for this nspace,
 761          * save this packed object so we don't do this again */
 762         if (1 < ns->nlocalprocs) {
 763             PMIX_RETAIN(reply);
 764             ns->jobbkt = reply;
 765         }
 766     } else {
 767         PMIX_ERROR_LOG(rc);
 768     }
 769 
 770     return rc;
 771 }
 772 
 773 static pmix_status_t hash_store_job_info(const char *nspace,
 774                                          pmix_buffer_t *buf)
 775 {
 776     pmix_status_t rc = PMIX_SUCCESS;
 777     pmix_kval_t *kptr, *kp2, kv;
 778     pmix_value_t *val;
 779     int32_t cnt;
 780     size_t nnodes, len, n;
 781     uint32_t i, j;
 782     char **procs = NULL;
 783     uint8_t *tmp;
 784     pmix_byte_object_t *bo;
 785     pmix_buffer_t buf2;
 786     int rank;
 787     pmix_hash_trkr_t *htptr;
 788     pmix_hash_table_t *ht;
 789     char **nodelist = NULL;
 790     pmix_info_t *info, *iptr;
 791 
 792     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
 793                         "[%s:%u] pmix:gds:hash store job info for nspace %s",
 794                         pmix_globals.myid.nspace, pmix_globals.myid.rank, nspace);
 795 
 796     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
 797         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 798         /* this function is NOT available on servers */
 799         PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
 800         return PMIX_ERR_NOT_SUPPORTED;
 801     }
 802 
 803     /* check buf data */
 804     if ((NULL == buf) || (0 == buf->bytes_used)) {
 805         rc = PMIX_ERR_BAD_PARAM;
 806         PMIX_ERROR_LOG(rc);
 807         return rc;
 808     }
 809 
 810     /* see if we already have a hash table for this nspace */
 811     ht = NULL;
 812     PMIX_LIST_FOREACH(htptr, &myhashes, pmix_hash_trkr_t) {
 813         if (0 == strcmp(htptr->ns, nspace)) {
 814             ht = &htptr->internal;
 815             break;
 816         }
 817     }
 818     if (NULL == ht) {
 819         /* nope - create one */
 820         htptr = PMIX_NEW(pmix_hash_trkr_t);
 821         htptr->ns = strdup(nspace);
 822         pmix_list_append(&myhashes, &htptr->super);
 823         ht = &htptr->internal;
 824     }
 825 
 826     cnt = 1;
 827     kptr = PMIX_NEW(pmix_kval_t);
 828     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 829                        buf, kptr, &cnt, PMIX_KVAL);
 830     while (PMIX_SUCCESS == rc) {
 831         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
 832                             "[%s:%u] pmix:gds:hash store job info working key %s",
 833                             pmix_globals.myid.nspace, pmix_globals.myid.rank, kptr->key);
 834         if (0 == strcmp(kptr->key, PMIX_PROC_BLOB)) {
 835             bo = &(kptr->value->data.bo);
 836             PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
 837             PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &buf2, bo->bytes, bo->size);
 838             /* start by unpacking the rank */
 839             cnt = 1;
 840             PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 841                                &buf2, &rank, &cnt, PMIX_PROC_RANK);
 842             if (PMIX_SUCCESS != rc) {
 843                 PMIX_ERROR_LOG(rc);
 844                 PMIX_DESTRUCT(&buf2);
 845                 return rc;
 846             }
 847             /* unpack the blob and save the values for this rank */
 848             cnt = 1;
 849             kp2 = PMIX_NEW(pmix_kval_t);
 850             PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 851                                &buf2, kp2, &cnt, PMIX_KVAL);
 852             while (PMIX_SUCCESS == rc) {
 853                 /* if the value contains a string that is longer than the
 854                  * limit, then compress it */
 855                 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
 856                     if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
 857                         if (NULL == tmp) {
 858                             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 859                             rc = PMIX_ERR_NOMEM;
 860                             return rc;
 861                         }
 862                         kp2->value->type = PMIX_COMPRESSED_STRING;
 863                         free(kp2->value->data.string);
 864                         kp2->value->data.bo.bytes = (char*)tmp;
 865                         kp2->value->data.bo.size = len;
 866                     }
 867                 }
 868                 /* this is data provided by a job-level exchange, so store it
 869                  * in the job-level data hash_table */
 870                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
 871                     PMIX_ERROR_LOG(rc);
 872                     PMIX_RELEASE(kp2);
 873                     PMIX_DESTRUCT(&buf2);
 874                     return rc;
 875                 }
 876                 PMIX_RELEASE(kp2); // maintain accounting
 877                 cnt = 1;
 878                 kp2 = PMIX_NEW(pmix_kval_t);
 879                 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 880                                    &buf2, kp2, &cnt, PMIX_KVAL);
 881             }
 882             /* cleanup */
 883             PMIX_DESTRUCT(&buf2);  // releases the original kptr data
 884             PMIX_RELEASE(kp2);
 885         } else if (0 == strcmp(kptr->key, PMIX_MAP_BLOB)) {
 886             /* transfer the byte object for unpacking */
 887             bo = &(kptr->value->data.bo);
 888             PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
 889             PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &buf2, bo->bytes, bo->size);
 890             /* start by unpacking the number of nodes */
 891             cnt = 1;
 892             PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 893                                &buf2, &nnodes, &cnt, PMIX_SIZE);
 894             if (PMIX_SUCCESS != rc) {
 895                 PMIX_ERROR_LOG(rc);
 896                 PMIX_DESTRUCT(&buf2);
 897                 return rc;
 898             }
 899             /* unpack the list of procs on each node */
 900             for (i=0; i < nnodes; i++) {
 901                 cnt = 1;
 902                 PMIX_CONSTRUCT(&kv, pmix_kval_t);
 903                 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 904                                    &buf2, &kv, &cnt, PMIX_KVAL);
 905                 if (PMIX_SUCCESS != rc) {
 906                     PMIX_ERROR_LOG(rc);
 907                     PMIX_DESTRUCT(&buf2);
 908                     PMIX_DESTRUCT(&kv);
 909                     return rc;
 910                 }
 911                 /* track the nodes in this nspace */
 912                 pmix_argv_append_nosize(&nodelist, kv.key);
 913                 /* save the list of peers for this node - but first
 914                  * check to see if we already have some data for this node */
 915                 rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, kv.key, &val);
 916                 if (PMIX_SUCCESS == rc) {
 917                     /* already have some data, so we need to add to it */
 918                     kp2 = PMIX_NEW(pmix_kval_t);
 919                     kp2->key = strdup(kv.key);
 920                     kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 921                     kp2->value->type = PMIX_DATA_ARRAY;
 922                     kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
 923                     if (NULL == kp2->value->data.darray) {
 924                         PMIX_DESTRUCT(&buf2);
 925                         PMIX_DESTRUCT(&kv);
 926                         PMIX_RELEASE(kp2);
 927                         return PMIX_ERR_NOMEM;
 928                     }
 929                     kp2->value->data.darray->type = PMIX_INFO;
 930                     kp2->value->data.darray->size = val->data.darray->size + 1;
 931                     PMIX_INFO_CREATE(info, kp2->value->data.darray->size);
 932                     if (NULL == info) {
 933                         PMIX_DESTRUCT(&buf2);
 934                         PMIX_DESTRUCT(&kv);
 935                         PMIX_RELEASE(kp2);
 936                         return PMIX_ERR_NOMEM;
 937                     }
 938                     iptr = (pmix_info_t*)val->data.darray->array;
 939                     /* copy the pre-existing data across */
 940                     for (n=0; n < val->data.darray->size; n++) {
 941                         PMIX_INFO_XFER(&info[n], &iptr[n]);
 942                     }
 943                     PMIX_INFO_LOAD(&info[kp2->value->data.darray->size-1], PMIX_LOCAL_PEERS, kv.value->data.string, PMIX_STRING);
 944                     kp2->value->data.darray->array = info;
 945                     if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
 946                         PMIX_ERROR_LOG(rc);
 947                         PMIX_RELEASE(kp2);
 948                         PMIX_DESTRUCT(&kv);
 949                         PMIX_DESTRUCT(&buf2);
 950                         return rc;
 951                     }
 952                     PMIX_RELEASE(kp2);  // maintain acctg
 953                 } else {
 954                     /* nope - so add this by itself */
 955                     kp2 = PMIX_NEW(pmix_kval_t);
 956                     kp2->key = strdup(kv.key);
 957                     kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 958                     kp2->value->type = PMIX_DATA_ARRAY;
 959                     kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
 960                     if (NULL == kp2->value->data.darray) {
 961                         PMIX_DESTRUCT(&buf2);
 962                         PMIX_DESTRUCT(&kv);
 963                         PMIX_RELEASE(kp2);
 964                         return PMIX_ERR_NOMEM;
 965                     }
 966                     kp2->value->data.darray->type = PMIX_INFO;
 967                     PMIX_INFO_CREATE(info, 1);
 968                     if (NULL == info) {
 969                         PMIX_DESTRUCT(&buf2);
 970                         PMIX_DESTRUCT(&kv);
 971                         PMIX_RELEASE(kp2);
 972                         return PMIX_ERR_NOMEM;
 973                     }
 974                     PMIX_INFO_LOAD(&info[0], PMIX_LOCAL_PEERS, kv.value->data.string, PMIX_STRING);
 975                     kp2->value->data.darray->array = info;
 976                     kp2->value->data.darray->size = 1;
 977                     if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
 978                         PMIX_ERROR_LOG(rc);
 979                         PMIX_RELEASE(kp2);
 980                         PMIX_DESTRUCT(&kv);
 981                         PMIX_DESTRUCT(&buf2);
 982                         return rc;
 983                     }
 984                     PMIX_RELEASE(kp2);  // maintain acctg
 985                 }
 986                 /* split the list of procs so we can store their
 987                  * individual location data */
 988                 procs = pmix_argv_split(kv.value->data.string, ',');
 989                 for (j=0; NULL != procs[j]; j++) {
 990                     /* store the hostname for each proc - again, this is
 991                      * data obtained via a job-level exchange, so store it
 992                      * in the job-level data hash_table */
 993                     kp2 = PMIX_NEW(pmix_kval_t);
 994                     kp2->key = strdup(PMIX_HOSTNAME);
 995                     kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 996                     kp2->value->type = PMIX_STRING;
 997                     kp2->value->data.string = strdup(kv.key);
 998                     rank = strtol(procs[j], NULL, 10);
 999                     if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1000                         PMIX_ERROR_LOG(rc);
1001                         PMIX_RELEASE(kp2);
1002                         PMIX_DESTRUCT(&kv);
1003                         PMIX_DESTRUCT(&buf2);
1004                         pmix_argv_free(procs);
1005                         return rc;
1006                     }
1007                     PMIX_RELEASE(kp2);  // maintain acctg
1008                 }
1009                 pmix_argv_free(procs);
1010                 PMIX_DESTRUCT(&kv);
1011             }
1012             if (NULL != nodelist) {
1013                 /* store the comma-delimited list of nodes hosting
1014                  * procs in this nspace */
1015                 kp2 = PMIX_NEW(pmix_kval_t);
1016                 kp2->key = strdup(PMIX_NODE_LIST);
1017                 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1018                 kp2->value->type = PMIX_STRING;
1019                 kp2->value->data.string = pmix_argv_join(nodelist, ',');
1020                 pmix_argv_free(nodelist);
1021                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1022                     PMIX_ERROR_LOG(rc);
1023                     PMIX_RELEASE(kp2);
1024                     PMIX_DESTRUCT(&kv);
1025                     PMIX_DESTRUCT(&buf2);
1026                     return rc;
1027                 }
1028                 PMIX_RELEASE(kp2);  // maintain acctg
1029             }
1030             /* cleanup */
1031             PMIX_DESTRUCT(&buf2);
1032         } else {
1033             /* if the value contains a string that is longer than the
1034              * limit, then compress it */
1035             if (PMIX_STRING_SIZE_CHECK(kptr->value)) {
1036                 if (pmix_compress.compress_string(kptr->value->data.string, &tmp, &len)) {
1037                     if (NULL == tmp) {
1038                         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1039                         rc = PMIX_ERR_NOMEM;
1040                         return rc;
1041                     }
1042                     kptr->value->type = PMIX_COMPRESSED_STRING;
1043                     free(kptr->value->data.string);
1044                     kptr->value->data.bo.bytes = (char*)tmp;
1045                     kptr->value->data.bo.size = len;
1046                 }
1047             }
1048             pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1049                                 "[%s:%u] pmix:gds:hash store job info storing key %s for WILDCARD rank",
1050                                 pmix_globals.myid.nspace, pmix_globals.myid.rank, kptr->key);
1051             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kptr))) {
1052                 PMIX_ERROR_LOG(rc);
1053                 PMIX_RELEASE(kptr);
1054                 return rc;
1055             }
1056         }
1057         PMIX_RELEASE(kptr);
1058         kptr = PMIX_NEW(pmix_kval_t);
1059         cnt = 1;
1060         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1061                            buf, kptr, &cnt, PMIX_KVAL);
1062     }
1063     /* need to release the leftover kptr */
1064     PMIX_RELEASE(kptr);
1065 
1066     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1067         PMIX_ERROR_LOG(rc);
1068     } else {
1069         rc = PMIX_SUCCESS;
1070     }
1071     return rc;
1072 }
1073 
1074 static pmix_status_t hash_store(const pmix_proc_t *proc,
1075                                 pmix_scope_t scope,
1076                                 pmix_kval_t *kv)
1077 {
1078     pmix_hash_trkr_t *trk, *t;
1079     pmix_status_t rc;
1080     pmix_kval_t *kp;
1081 
1082     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1083                         "[%s:%d] gds:hash:hash_store for proc [%s:%d] key %s type %s scope %s",
1084                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
1085                         proc->nspace, proc->rank, kv->key,
1086                         PMIx_Data_type_string(kv->value->type), PMIx_Scope_string(scope));
1087 
1088     if (NULL == kv->key) {
1089         return PMIX_ERR_BAD_PARAM;
1090     }
1091 
1092     /* find the hash table for this nspace */
1093     trk = NULL;
1094     PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1095         if (0 == strcmp(proc->nspace, t->ns)) {
1096             trk = t;
1097             break;
1098         }
1099     }
1100     if (NULL == trk) {
1101         /* create one */
1102         trk = PMIX_NEW(pmix_hash_trkr_t);
1103         trk->ns = strdup(proc->nspace);
1104         pmix_list_append(&myhashes, &trk->super);
1105     }
1106 
1107     /* see if the proc is me */
1108     if (proc->rank == pmix_globals.myid.rank &&
1109         0 == strncmp(proc->nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN)) {
1110         if (PMIX_INTERNAL != scope) {
1111             /* always maintain a copy of my own info here to simplify
1112              * later retrieval */
1113             kp = PMIX_NEW(pmix_kval_t);
1114             if (NULL == kp) {
1115                 return PMIX_ERR_NOMEM;
1116             }
1117             kp->key = strdup(kv->key);
1118             kp->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1119             if (NULL == kp->value) {
1120                 PMIX_RELEASE(kp);
1121                 return PMIX_ERR_NOMEM;
1122             }
1123             PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, kp->value, kv->value);
1124             if (PMIX_SUCCESS != rc) {
1125                 PMIX_RELEASE(kp);
1126                 return rc;
1127             }
1128             if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, proc->rank, kp))) {
1129                 PMIX_ERROR_LOG(rc);
1130                 PMIX_RELEASE(kp);
1131                 return rc;
1132             }
1133             PMIX_RELEASE(kp);  // maintain accounting
1134         }
1135     }
1136 
1137     /* store it in the corresponding hash table */
1138     if (PMIX_INTERNAL == scope) {
1139         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, proc->rank, kv))) {
1140             PMIX_ERROR_LOG(rc);
1141             return rc;
1142         }
1143     } else if (PMIX_REMOTE == scope) {
1144         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
1145             PMIX_ERROR_LOG(rc);
1146             return rc;
1147         }
1148     } else if (PMIX_LOCAL == scope) {
1149         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->local, proc->rank, kv))) {
1150             PMIX_ERROR_LOG(rc);
1151             return rc;
1152         }
1153     } else if (PMIX_GLOBAL == scope) {
1154         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
1155             PMIX_ERROR_LOG(rc);
1156             return rc;
1157         }
1158         /* a pmix_kval_t can only be on one list at a time, so we
1159          * have to duplicate it here */
1160         kp = PMIX_NEW(pmix_kval_t);
1161         if (NULL == kp) {
1162             return PMIX_ERR_NOMEM;
1163         }
1164         kp->key = strdup(kv->key);
1165         kp->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1166         if (NULL == kp->value) {
1167             PMIX_RELEASE(kp);
1168             return PMIX_ERR_NOMEM;
1169         }
1170         PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, kp->value, kv->value);
1171         if (PMIX_SUCCESS != rc) {
1172             PMIX_ERROR_LOG(rc);
1173             PMIX_RELEASE(kp);
1174             return rc;
1175         }
1176         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->local, proc->rank, kp))) {
1177             PMIX_ERROR_LOG(rc);
1178             PMIX_RELEASE(kp);
1179             return rc;
1180         }
1181         PMIX_RELEASE(kp);  // maintain accounting
1182     } else {
1183         return PMIX_ERR_BAD_PARAM;
1184     }
1185 
1186     return PMIX_SUCCESS;
1187 }
1188 
1189 /* this function is only called by the PMIx server when its
1190  * host has received data from some other peer. It therefore
1191  * always contains data solely from remote procs, and we
1192  * shall store it accordingly */
1193 static pmix_status_t hash_store_modex(struct pmix_namespace_t *nspace,
1194                                       pmix_buffer_t *buf,
1195                                       void *cbdata) {
1196     return pmix_gds_base_store_modex(nspace, buf, NULL,
1197                                      _hash_store_modex, cbdata);
1198 }
1199 
1200 static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
1201                                        pmix_proc_t *proc,
1202                                        pmix_gds_modex_key_fmt_t key_fmt,
1203                                        char **kmap,
1204                                        pmix_buffer_t *pbkt)
1205 {
1206     pmix_hash_trkr_t *trk, *t;
1207     pmix_status_t rc = PMIX_SUCCESS;
1208     pmix_kval_t *kv;
1209 
1210     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1211                         "[%s:%d] gds:hash:store_modex for nspace %s",
1212                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
1213                         proc->nspace);
1214 
1215     /* find the hash table for this nspace */
1216     trk = NULL;
1217     PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1218         if (0 == strcmp(proc->nspace, t->ns)) {
1219             trk = t;
1220             break;
1221         }
1222     }
1223     if (NULL == trk) {
1224         /* create one */
1225         trk = PMIX_NEW(pmix_hash_trkr_t);
1226         trk->ns = strdup(proc->nspace);
1227         pmix_list_append(&myhashes, &trk->super);
1228     }
1229 
1230     /* this is data returned via the PMIx_Fence call when
1231      * data collection was requested, so it only contains
1232      * REMOTE/GLOBAL data. The byte object contains
1233      * the rank followed by pmix_kval_t's. The list of callbacks
1234      * contains all local participants. */
1235 
1236     /* unpack the remaining values until we hit the end of the buffer */
1237     kv = PMIX_NEW(pmix_kval_t);
1238     rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
1239 
1240     while (PMIX_SUCCESS == rc) {
1241         /* store this in the hash table */
1242         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
1243             PMIX_ERROR_LOG(rc);
1244             return rc;
1245         }
1246         PMIX_RELEASE(kv);  // maintain accounting as the hash increments the ref count
1247         /* continue along */
1248         kv = PMIX_NEW(pmix_kval_t);
1249         rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
1250     }
1251     PMIX_RELEASE(kv);  // maintain accounting
1252     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1253         PMIX_ERROR_LOG(rc);
1254     } else {
1255         rc = PMIX_SUCCESS;
1256     }
1257     return rc;
1258 }
1259 
1260 
1261 static pmix_status_t hash_fetch(const pmix_proc_t *proc,
1262                                 pmix_scope_t scope, bool copy,
1263                                 const char *key,
1264                                 pmix_info_t qualifiers[], size_t nqual,
1265                                 pmix_list_t *kvs)
1266 {
1267     pmix_hash_trkr_t *trk, *t;
1268     pmix_status_t rc;
1269     pmix_value_t *val;
1270     pmix_kval_t *kv;
1271     pmix_info_t *info;
1272     size_t n, ninfo;
1273     pmix_hash_table_t *ht;
1274 
1275     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1276                         "[%s:%u] pmix:gds:hash fetch %s for proc %s:%u on scope %s",
1277                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
1278                         (NULL == key) ? "NULL" : key,
1279                         proc->nspace, proc->rank, PMIx_Scope_string(scope));
1280 
1281     /* if the rank is wildcard and the key is NULL, then
1282      * they are asking for a complete copy of the job-level
1283      * info for this nspace - retrieve it */
1284     if (NULL == key && PMIX_RANK_WILDCARD == proc->rank) {
1285         /* see if we have a tracker for this nspace - we will
1286          * if we already cached the job info for it */
1287         trk = NULL;
1288         PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1289             if (0 == strcmp(proc->nspace, t->ns)) {
1290                 trk = t;
1291                 break;
1292             }
1293         }
1294         if (NULL == trk) {
1295             /* let the caller know */
1296             return PMIX_ERR_INVALID_NAMESPACE;
1297         }
1298         /* the job data is stored on the internal hash table */
1299         ht = &trk->internal;
1300         /* fetch all values from the hash table tied to rank=wildcard */
1301         val = NULL;
1302         rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, NULL, &val);
1303         if (PMIX_SUCCESS != rc) {
1304             if (NULL != val) {
1305                 PMIX_VALUE_RELEASE(val);
1306             }
1307             return rc;
1308         }
1309         if (NULL == val) {
1310             return PMIX_ERR_NOT_FOUND;
1311         }
1312         /* the data is returned in a pmix_data_array_t of pmix_info_t
1313          * structs. cycle thru and transfer them to the list */
1314         if (PMIX_DATA_ARRAY != val->type ||
1315             NULL == val->data.darray ||
1316             PMIX_INFO != val->data.darray->type) {
1317             PMIX_VALUE_RELEASE(val);
1318             return PMIX_ERR_INVALID_VAL;
1319         }
1320         info = (pmix_info_t*)val->data.darray->array;
1321         ninfo = val->data.darray->size;
1322         for (n=0; n < ninfo; n++) {
1323             kv = PMIX_NEW(pmix_kval_t);
1324             if (NULL == kv) {
1325                 rc = PMIX_ERR_NOMEM;
1326                 PMIX_VALUE_RELEASE(val);
1327                 return rc;
1328             }
1329             kv->key = strdup(info[n].key);
1330             PMIX_VALUE_XFER(rc, kv->value, &info[n].value);
1331             if (PMIX_SUCCESS != rc) {
1332                 PMIX_ERROR_LOG(rc);
1333                 PMIX_RELEASE(kv);
1334                 PMIX_VALUE_RELEASE(val);
1335                 return rc;
1336             }
1337             pmix_list_append(kvs, &kv->super);
1338         }
1339         PMIX_VALUE_RELEASE(val);
1340         return PMIX_SUCCESS;
1341     }
1342 
1343     /* find the hash table for this nspace */
1344     trk = NULL;
1345     PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1346         if (0 == strcmp(proc->nspace, t->ns)) {
1347             trk = t;
1348             break;
1349         }
1350     }
1351     if (NULL == trk) {
1352         return PMIX_ERR_INVALID_NAMESPACE;
1353     }
1354 
1355     /* fetch from the corresponding hash table - note that
1356      * we always provide a copy as we don't support
1357      * shared memory */
1358     if (PMIX_INTERNAL == scope ||
1359         PMIX_SCOPE_UNDEF == scope ||
1360         PMIX_GLOBAL == scope ||
1361         PMIX_RANK_WILDCARD == proc->rank) {
1362         ht = &trk->internal;
1363     } else if (PMIX_LOCAL == scope ||
1364                PMIX_GLOBAL == scope) {
1365         ht = &trk->local;
1366     } else if (PMIX_REMOTE == scope) {
1367         ht = &trk->remote;
1368     } else {
1369         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1370         return PMIX_ERR_BAD_PARAM;
1371     }
1372 
1373   doover:
1374     rc = pmix_hash_fetch(ht, proc->rank, key, &val);
1375     if (PMIX_SUCCESS == rc) {
1376         /* if the key was NULL, then all found keys will be
1377          * returned as a pmix_data_array_t in the value */
1378         if (NULL == key) {
1379             if (NULL == val->data.darray ||
1380                 PMIX_INFO != val->data.darray->type ||
1381                 0 == val->data.darray->size) {
1382                 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
1383                 return PMIX_ERR_NOT_FOUND;
1384             }
1385             info = (pmix_info_t*)val->data.darray->array;
1386             ninfo = val->data.darray->size;
1387             for (n=0; n < ninfo; n++) {
1388                 kv = PMIX_NEW(pmix_kval_t);
1389                 if (NULL == kv) {
1390                     PMIX_VALUE_RELEASE(val);
1391                     return PMIX_ERR_NOMEM;
1392                 }
1393                 kv->key = strdup(info[n].key);
1394                 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1395                 if (NULL == kv->value) {
1396                     PMIX_VALUE_RELEASE(val);
1397                     PMIX_RELEASE(kv);
1398                     return PMIX_ERR_NOMEM;
1399                 }
1400                 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
1401                                        kv->value, &info[n].value);
1402                 if (PMIX_SUCCESS != rc) {
1403                     PMIX_ERROR_LOG(rc);
1404                     PMIX_VALUE_RELEASE(val);
1405                     PMIX_RELEASE(kv);
1406                     return rc;
1407                 }
1408                 pmix_list_append(kvs, &kv->super);
1409             }
1410             PMIX_VALUE_RELEASE(val);
1411             if (PMIX_GLOBAL == scope && ht == &trk->local) {
1412                 /* need to do this again for the remote data */
1413                 ht = &trk->remote;
1414                 goto doover;
1415             }
1416             return PMIX_SUCCESS;
1417         }
1418         /* just return the value */
1419         kv = PMIX_NEW(pmix_kval_t);
1420         if (NULL == kv) {
1421             PMIX_VALUE_RELEASE(val);
1422             return PMIX_ERR_NOMEM;
1423         }
1424         kv->key = strdup(key);
1425         kv->value = val;
1426         pmix_list_append(kvs, &kv->super);
1427     } else {
1428         if (PMIX_GLOBAL == scope ||
1429             PMIX_SCOPE_UNDEF == scope) {
1430             if (ht == &trk->internal) {
1431                 /* need to also try the local data */
1432                 ht = &trk->local;
1433                 goto doover;
1434             } else if (ht == &trk->local) {
1435                 /* need to also try the remote data */
1436                 ht = &trk->remote;
1437                 goto doover;
1438             }
1439         }
1440     }
1441 
1442     return rc;
1443 }
1444 
1445 static pmix_status_t setup_fork(const pmix_proc_t *proc, char ***env)
1446 {
1447     /* we don't need to add anything */
1448     return PMIX_SUCCESS;
1449 }
1450 
1451 static pmix_status_t nspace_add(const char *nspace,
1452                                 pmix_info_t info[],
1453                                 size_t ninfo)
1454 {
1455     /* we don't need to do anything here */
1456     return PMIX_SUCCESS;
1457 }
1458 
1459 static pmix_status_t nspace_del(const char *nspace)
1460 {
1461     pmix_hash_trkr_t *t;
1462 
1463     /* find the hash table for this nspace */
1464     PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1465         if (0 == strcmp(nspace, t->ns)) {
1466             /* release it */
1467             pmix_list_remove_item(&myhashes, &t->super);
1468             PMIX_RELEASE(t);
1469             break;
1470         }
1471     }
1472     return PMIX_SUCCESS;
1473 }
1474 
1475 static pmix_status_t assemb_kvs_req(const pmix_proc_t *proc,
1476                               pmix_list_t *kvs,
1477                               pmix_buffer_t *buf,
1478                               void *cbdata)
1479 {
1480     pmix_status_t rc = PMIX_SUCCESS;
1481     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
1482     pmix_kval_t *kv;
1483 
1484     if (!PMIX_PROC_IS_V1(cd->peer)) {
1485         PMIX_BFROPS_PACK(rc, cd->peer, buf, proc, 1, PMIX_PROC);
1486         if (PMIX_SUCCESS != rc) {
1487             return rc;
1488         }
1489     }
1490     PMIX_LIST_FOREACH(kv, kvs, pmix_kval_t) {
1491         PMIX_BFROPS_PACK(rc, cd->peer, buf, kv, 1, PMIX_KVAL);
1492         if (PMIX_SUCCESS != rc) {
1493             return rc;
1494         }
1495     }
1496     return rc;
1497 }
1498 
1499 static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf)
1500 {
1501     pmix_status_t rc = PMIX_SUCCESS;
1502     int32_t cnt;
1503     pmix_byte_object_t bo;
1504     pmix_buffer_t pbkt;
1505     pmix_kval_t *kv;
1506     pmix_proc_t proct;
1507 
1508     /* the incoming payload is provided as a set of packed
1509      * byte objects, one for each rank. A pmix_proc_t is the first
1510      * entry in the byte object. If the rank=PMIX_RANK_WILDCARD,
1511      * then that byte object contains job level info
1512      * for the provided nspace. Otherwise, the byte
1513      * object contains the pmix_kval_t's that were "put" by the
1514      * referenced process */
1515     cnt = 1;
1516     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1517                        buf, &bo, &cnt, PMIX_BYTE_OBJECT);
1518     while (PMIX_SUCCESS == rc) {
1519         /* setup the byte object for unpacking */
1520         PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
1521         PMIX_LOAD_BUFFER(pmix_client_globals.myserver,
1522                          &pbkt, bo.bytes, bo.size);
1523         /* unpack the id of the providing process */
1524         cnt = 1;
1525         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1526                            &pbkt, &proct, &cnt, PMIX_PROC);
1527         if (PMIX_SUCCESS != rc) {
1528             PMIX_ERROR_LOG(rc);
1529             return rc;
1530         }
1531         cnt = 1;
1532         kv = PMIX_NEW(pmix_kval_t);
1533         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1534                            &pbkt, kv, &cnt, PMIX_KVAL);
1535         while (PMIX_SUCCESS == rc) {
1536             /* let the GDS component for this peer store it - if
1537              * the kval contains shmem connection info, then the
1538              * component will know what to do about it (or else
1539              * we selected the wrong component for this peer!) */
1540 
1541             PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &proct, PMIX_INTERNAL, kv);
1542             if (PMIX_SUCCESS != rc) {
1543                 PMIX_ERROR_LOG(rc);
1544                 PMIX_RELEASE(kv);
1545                 PMIX_DESTRUCT(&pbkt);
1546                 return rc;
1547             }
1548             PMIX_RELEASE(kv);  // maintain accounting
1549             /* get the next one */
1550             kv = PMIX_NEW(pmix_kval_t);
1551             cnt = 1;
1552             PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1553                                &pbkt, kv, &cnt, PMIX_KVAL);
1554         }
1555         PMIX_RELEASE(kv);  // maintain accounting
1556         if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1557             PMIX_ERROR_LOG(rc);
1558             PMIX_DESTRUCT(&pbkt);
1559             return rc;
1560         }
1561         PMIX_DESTRUCT(&pbkt);
1562         /* get the next one */
1563         cnt = 1;
1564         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1565                            buf, &bo, &cnt, PMIX_BYTE_OBJECT);
1566     }
1567     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1568         PMIX_ERROR_LOG(rc);
1569         return rc;
1570     }
1571     return rc;
1572 }

/* [<][>][^][v][top][bottom][index][help] */