root/opal/mca/pmix/pmix4x/pmix/src/mca/common/dstore/dstore_base.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ncon
  2. ndes
  3. _esh_session_map_clean
  4. _esh_dir_del
  5. _esh_tbls_init
  6. _esh_ns_map_cleanup
  7. _esh_sessions_cleanup
  8. _esh_ns_track_cleanup
  9. _esh_session_map
  10. _esh_jobuid_tbl_search
  11. _esh_session_tbl_add
  12. _esh_session_map_search_server
  13. _esh_session_map_search_client
  14. _esh_session_init
  15. _esh_session_release
  16. _set_constants_from_env
  17. _update_ns_elem
  18. _put_ns_info_to_initial_segment
  19. _update_initial_segment_info
  20. _get_ns_info_from_initial_segment
  21. _get_track_elem_for_namespace
  22. _get_rank_meta_info
  23. set_rank_meta_info
  24. _get_data_region_by_offset
  25. get_free_offset
  26. put_empty_ext_slot
  27. put_data_to_the_end
  28. pmix_sm_store
  29. _store_data_for_rank
  30. _get_univ_size
  31. pmix_common_dstor_cache_job_info
  32. pmix_common_dstor_init
  33. pmix_common_dstor_finalize
  34. _dstore_store_nolock
  35. pmix_common_dstor_store
  36. _dstore_fetch
  37. pmix_common_dstor_fetch
  38. pmix_common_dstor_setup_fork
  39. pmix_common_dstor_add_nspace
  40. pmix_common_dstor_del_nspace
  41. _my_client
  42. pmix_common_dstor_store_modex
  43. _dstor_store_modex_cb
  44. _store_job_info
  45. pmix_common_dstor_register_job_info
  46. pmix_common_dstor_store_job_info
  47. _client_compat_save
  48. _client_peer

   1 /*
   2  * Copyright (c) 2015-2019 Intel, Inc.  All rights reserved.
   3  * Copyright (c) 2016-2018 IBM Corporation.  All rights reserved.
   4  * Copyright (c) 2016-2019 Mellanox Technologies, Inc.
   5  *                         All rights reserved.
   6  * Copyright (c) 2018-2019 Research Organization for Information Science
   7  *                         and Technology (RIST).  All rights reserved.
   8  *
   9  * $COPYRIGHT$
  10  *
  11  * Additional copyrights may follow
  12  *
  13  * $HEADER$
  14  */
  15 
  16 #include <src/include/pmix_config.h>
  17 
  18 #include <stdio.h>
  19 #include <sys/types.h>
  20 #include <sys/stat.h>
  21 #include <sys/file.h>
  22 #include <dirent.h>
  23 #include <errno.h>
  24 #ifdef HAVE_UNISTD_H
  25 #include <unistd.h>
  26 #endif
  27 #ifdef HAVE_SYS_TYPES_H
  28 #include <sys/types.h>
  29 #endif
  30 #ifdef HAVE_SYS_STAT_H
  31 #include <sys/stat.h>
  32 #endif
  33 #ifdef HAVE_FCNTL_H
  34 #include <fcntl.h>
  35 #endif
  36 #include <time.h>
  37 
  38 #include <pmix_common.h>
  39 
  40 #include "src/include/pmix_globals.h"
  41 #include "src/class/pmix_list.h"
  42 #include "src/client/pmix_client_ops.h"
  43 #include "src/server/pmix_server_ops.h"
  44 #include "src/util/argv.h"
  45 #include "src/mca/pcompress/pcompress.h"
  46 #include "src/util/error.h"
  47 #include "src/util/output.h"
  48 #include "src/util/pmix_environ.h"
  49 #include "src/util/hash.h"
  50 #include "src/mca/preg/preg.h"
  51 
  52 #include "src/mca/gds/base/base.h"
  53 #include "src/mca/pshmem/base/base.h"
  54 #include "dstore_common.h"
  55 #include "dstore_base.h"
  56 #include "dstore_segment.h"
  57 
  58 #define ESH_REGION_EXTENSION        "EXTENSION_SLOT"
  59 #define ESH_REGION_INVALIDATED      "INVALIDATED"
  60 #define ESH_ENV_INITIAL_SEG_SIZE    "INITIAL_SEG_SIZE"
  61 #define ESH_ENV_NS_META_SEG_SIZE    "NS_META_SEG_SIZE"
  62 #define ESH_ENV_NS_DATA_SEG_SIZE    "NS_DATA_SEG_SIZE"
  63 #define ESH_ENV_LINEAR              "SM_USE_LINEAR_SEARCH"
  64 
  65 #define ESH_INIT_SESSION_TBL_SIZE 2
  66 #define ESH_INIT_NS_MAP_TBL_SIZE  2
  67 
  68 static int _store_data_for_rank(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info,
  69                                 pmix_rank_t rank, pmix_buffer_t *buf);
  70 static int _update_ns_elem(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_elem, ns_seg_info_t *info);
  71 static int _put_ns_info_to_initial_segment(pmix_common_dstore_ctx_t *ds_ctx,
  72                                            const ns_map_data_t *ns_map, pmix_pshmem_seg_t *metaseg,
  73                                            pmix_pshmem_seg_t *dataseg);
  74 static ns_seg_info_t *_get_ns_info_from_initial_segment(pmix_common_dstore_ctx_t *ds_ctx,
  75                                                         const ns_map_data_t *ns_map);
  76 static ns_track_elem_t *_get_track_elem_for_namespace(pmix_common_dstore_ctx_t *ds_ctx,
  77                                                       ns_map_data_t *ns_map);
  78 static rank_meta_info *_get_rank_meta_info(pmix_common_dstore_ctx_t *ds_ctx, pmix_rank_t rank,
  79                                            pmix_dstore_seg_desc_t *segdesc);
  80 static uint8_t *_get_data_region_by_offset(pmix_common_dstore_ctx_t *ds_ctx,
  81                                            pmix_dstore_seg_desc_t *segdesc, size_t offset);
  82 static void _update_initial_segment_info(pmix_common_dstore_ctx_t *ds_ctx,
  83                                          const ns_map_data_t *ns_map);
  84 static void _set_constants_from_env(pmix_common_dstore_ctx_t *ds_ctx);
  85 static inline ssize_t _get_univ_size(pmix_common_dstore_ctx_t *ds_ctx, const char *nspace);
  86 
  87 static inline ns_map_data_t * _esh_session_map_search_server(pmix_common_dstore_ctx_t *ds_ctx,
  88                                                              const char *nspace);
  89 static inline ns_map_data_t * _esh_session_map_search_client(pmix_common_dstore_ctx_t *ds_ctx,
  90                                                              const char *nspace);
  91 static inline ns_map_data_t * _esh_session_map(pmix_common_dstore_ctx_t *ds_ctx,
  92                                                const char *nspace, uint32_t local_size,
  93                                                size_t tbl_idx);
  94 static inline void _esh_session_map_clean(pmix_common_dstore_ctx_t *ds_ctx, ns_map_t *m);
  95 static inline int _esh_jobuid_tbl_search(pmix_common_dstore_ctx_t *ds_ctx,
  96                                          uid_t jobuid, size_t *tbl_idx);
  97 static inline int _esh_session_tbl_add(pmix_common_dstore_ctx_t *ds_ctx, size_t *tbl_idx);
  98 static int _esh_session_init(pmix_common_dstore_ctx_t *ds_ctx, size_t idx, ns_map_data_t *m,
  99                              uint32_t local_size, size_t jobuid, int setjobuid);
 100 static void _esh_session_release(pmix_common_dstore_ctx_t *ds_ctx, size_t idx);
 101 static inline void _esh_ns_track_cleanup(pmix_common_dstore_ctx_t *ds_ctx);
 102 static inline void _esh_sessions_cleanup(pmix_common_dstore_ctx_t *ds_ctx);
 103 static inline void _esh_ns_map_cleanup(pmix_common_dstore_ctx_t *ds_ctx);
 104 static inline int _esh_dir_del(const char *dirname);
 105 static inline void _client_compat_save(pmix_common_dstore_ctx_t *ds_ctx, pmix_peer_t *peer);
 106 static inline pmix_peer_t * _client_peer(pmix_common_dstore_ctx_t *ds_ctx);
 107 
 108 static inline int _my_client(const char *nspace, pmix_rank_t rank);
 109 
 110 static pmix_status_t _dstor_store_modex_cb(pmix_common_dstore_ctx_t *ds_ctx,
 111                                            pmix_proc_t *proc,
 112                                            pmix_gds_modex_key_fmt_t key_fmt,
 113                                            char **kmap,
 114                                            pmix_buffer_t *pbkt);
 115 
 116 static pmix_status_t _dstore_store_nolock(pmix_common_dstore_ctx_t *ds_ctx,
 117                                    ns_map_data_t *ns_map,
 118                                    pmix_rank_t rank,
 119                                    pmix_kval_t *kv);
 120 
 121 static pmix_status_t _dstore_fetch(pmix_common_dstore_ctx_t *ds_ctx,
 122                                    const char *nspace, pmix_rank_t rank,
 123                                    const char *key, pmix_value_t **kvs);
 124 
 125 ns_map_data_t * (*_esh_session_map_search)(const char *nspace) = NULL;
 126 
 127 #define _ESH_SESSION_lock(session_array, tbl_idx) \
 128     (PMIX_VALUE_ARRAY_GET_BASE(session_array, session_t)[tbl_idx].lock)
 129 
 130 #define _ESH_SESSION_path(session_array, tbl_idx) \
 131     (PMIX_VALUE_ARRAY_GET_BASE(session_array, session_t)[tbl_idx].nspace_path)
 132 
 133 #define _ESH_SESSION_lockfile(session_array, tbl_idx) \
 134     (PMIX_VALUE_ARRAY_GET_BASE(session_array, session_t)[tbl_idx].lockfile)
 135 
 136 #define _ESH_SESSION_setjobuid(session_array, tbl_idx) \
 137     (PMIX_VALUE_ARRAY_GET_BASE(session_array, session_t)[tbl_idx].setjobuid)
 138 
 139 #define _ESH_SESSION_jobuid(session_array, tbl_idx) \
 140     (PMIX_VALUE_ARRAY_GET_BASE(session_array, session_t)[tbl_idx].jobuid)
 141 
 142 #define _ESH_SESSION_sm_seg_first(session_array, tbl_idx) \
 143     (PMIX_VALUE_ARRAY_GET_BASE(session_array, session_t)[tbl_idx].sm_seg_first)
 144 #define _ESH_SESSION_sm_seg_last(session_array, tbl_idx) \
 145     (PMIX_VALUE_ARRAY_GET_BASE(session_array, session_t)[tbl_idx].sm_seg_last)
 146 
 147 #define _ESH_SESSION_ns_info(session_array, tbl_idx) \
 148     (PMIX_VALUE_ARRAY_GET_BASE(session_array, session_t)[tbl_idx].ns_info)
 149 
 150 #ifdef ESH_PTHREAD_LOCK
 151 #define _ESH_SESSION_pthread_rwlock(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].rwlock)
 152 #define _ESH_SESSION_pthread_seg(tbl_idx)   (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].rwlock_seg)
 153 #define _ESH_SESSION_lock(tbl_idx)         _ESH_SESSION_pthread_rwlock(tbl_idx)
 154 #endif
 155 
 156 #ifdef ESH_FCNTL_LOCK
 157 #define _ESH_SESSION_lockfd(tbl_idx)       (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].lockfd)
 158 #define _ESH_SESSION_lock(tbl_idx)         _ESH_SESSION_lockfd(tbl_idx)
 159 #endif
 160 
 161 #define _ESH_LOCK(ds_ctx, session_id, operation)                               \
 162 __pmix_attribute_extension__ ({                                                \
 163     pmix_status_t rc = PMIX_SUCCESS;                                           \
 164     rc = ds_ctx->lock_cbs->operation(_ESH_SESSION_lock(ds_ctx->session_array,  \
 165                                                     session_id));              \
 166     rc;                                                                        \
 167 })
 168 
 169 static void ncon(ns_track_elem_t *p) {
 170     memset(&p->ns_map, 0, sizeof(p->ns_map));
 171     p->meta_seg = NULL;
 172     p->data_seg = NULL;
 173     p->num_meta_seg = 0;
 174     p->num_data_seg = 0;
 175     p->in_use = true;
 176 }
 177 
 178 static void ndes(ns_track_elem_t *p) {
 179     pmix_common_dstor_delete_sm_desc(p->meta_seg);
 180     pmix_common_dstor_delete_sm_desc(p->data_seg);
 181     memset(&p->ns_map, 0, sizeof(p->ns_map));
 182     p->in_use = false;
 183 }
 184 
 185 PMIX_CLASS_INSTANCE(ns_track_elem_t,
 186                     pmix_value_array_t,
 187                     ncon, ndes);
 188 
 189 static inline void _esh_session_map_clean(pmix_common_dstore_ctx_t *ds_ctx, ns_map_t *m) {
 190     memset(m, 0, sizeof(*m));
 191     m->data.track_idx = -1;
 192 }
 193 
 194 static inline int _esh_dir_del(const char *path)
 195 {
 196     DIR *dir;
 197     struct dirent *d_ptr;
 198     struct stat st;
 199     pmix_status_t rc = PMIX_SUCCESS;
 200 
 201     char name[PMIX_PATH_MAX];
 202 
 203     dir = opendir(path);
 204     if (NULL == dir) {
 205         rc = PMIX_ERR_BAD_PARAM;
 206         return rc;
 207     }
 208 
 209     while (NULL != (d_ptr = readdir(dir))) {
 210         snprintf(name, PMIX_PATH_MAX, "%s/%s", path, d_ptr->d_name);
 211         if ( 0 > lstat(name, &st) ){
 212             /* No fatal error here - just log this event
 213              * we will hit the error later at rmdir. Keep trying ...
 214              */
 215             PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
 216             continue;
 217         }
 218 
 219         if(S_ISDIR(st.st_mode)) {
 220             if(strcmp(d_ptr->d_name, ".") && strcmp(d_ptr->d_name, "..")) {
 221                 rc = _esh_dir_del(name);
 222                 if( PMIX_SUCCESS != rc ){
 223                     /* No fatal error here - just log this event
 224                      * we will hit the error later at rmdir. Keep trying ...
 225                      */
 226                     PMIX_ERROR_LOG(rc);
 227                 }
 228             }
 229         }
 230         else {
 231             if( 0 > unlink(name) ){
 232                 /* No fatal error here - just log this event
 233                  * we will hit the error later at rmdir. Keep trying ...
 234                  */
 235                 PMIX_ERROR_LOG(PMIX_ERR_NO_PERMISSIONS);
 236             }
 237         }
 238     }
 239     closedir(dir);
 240 
 241     /* remove the top dir */
 242     if( 0 > rmdir(path) ){
 243         rc = PMIX_ERR_NO_PERMISSIONS;
 244         PMIX_ERROR_LOG(rc);
 245     }
 246     return rc;
 247 }
 248 
 249 static inline int _esh_tbls_init(pmix_common_dstore_ctx_t *ds_ctx)
 250 {
 251     pmix_status_t rc = PMIX_SUCCESS;
 252     size_t idx;
 253 
 254     /* initial settings */
 255     ds_ctx->ns_track_array = NULL;
 256     ds_ctx->session_array = NULL;
 257     ds_ctx->ns_map_array = NULL;
 258 
 259     /* Setup namespace tracking array */
 260     if (NULL == (ds_ctx->ns_track_array = PMIX_NEW(pmix_value_array_t))) {
 261         rc = PMIX_ERR_OUT_OF_RESOURCE;
 262         PMIX_ERROR_LOG(rc);
 263         goto err_exit;
 264     }
 265     if (PMIX_SUCCESS != (rc = pmix_value_array_init(ds_ctx->ns_track_array, sizeof(ns_track_elem_t)))){
 266         PMIX_ERROR_LOG(rc);
 267         goto err_exit;
 268     }
 269 
 270     /* Setup sessions table */
 271     if (NULL == (ds_ctx->session_array = PMIX_NEW(pmix_value_array_t))){
 272         rc = PMIX_ERR_OUT_OF_RESOURCE;
 273         PMIX_ERROR_LOG(rc);
 274         goto err_exit;
 275     }
 276     if (PMIX_SUCCESS != (rc = pmix_value_array_init(ds_ctx->session_array, sizeof(session_t)))) {
 277         PMIX_ERROR_LOG(rc);
 278         goto err_exit;
 279     }
 280     if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(ds_ctx->session_array, ESH_INIT_SESSION_TBL_SIZE))) {
 281         PMIX_ERROR_LOG(rc);
 282         goto err_exit;
 283     }
 284     for (idx = 0; idx < ESH_INIT_SESSION_TBL_SIZE; idx++) {
 285         memset(pmix_value_array_get_item(ds_ctx->session_array, idx), 0, sizeof(session_t));
 286     }
 287 
 288     /* Setup namespace map array */
 289     if (NULL == (ds_ctx->ns_map_array = PMIX_NEW(pmix_value_array_t))) {
 290         rc = PMIX_ERR_OUT_OF_RESOURCE;
 291         PMIX_ERROR_LOG(rc);
 292         goto err_exit;
 293     }
 294     if (PMIX_SUCCESS != (rc = pmix_value_array_init(ds_ctx->ns_map_array, sizeof(ns_map_t)))) {
 295         PMIX_ERROR_LOG(rc);
 296         goto err_exit;
 297     }
 298     if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(ds_ctx->ns_map_array, ESH_INIT_NS_MAP_TBL_SIZE))) {
 299         PMIX_ERROR_LOG(rc);
 300         goto err_exit;
 301     }
 302     for (idx = 0; idx < ESH_INIT_NS_MAP_TBL_SIZE; idx++) {
 303         _esh_session_map_clean(ds_ctx, pmix_value_array_get_item(ds_ctx->ns_map_array, idx));
 304     }
 305 
 306     return PMIX_SUCCESS;
 307 err_exit:
 308     if (NULL != ds_ctx->ns_track_array) {
 309         PMIX_RELEASE(ds_ctx->ns_track_array);
 310     }
 311     if (NULL != ds_ctx->session_array) {
 312         PMIX_RELEASE(ds_ctx->session_array);
 313     }
 314     if (NULL != ds_ctx->ns_map_array) {
 315         PMIX_RELEASE(ds_ctx->ns_map_array);
 316     }
 317     return rc;
 318 }
 319 
 320 static inline void _esh_ns_map_cleanup(pmix_common_dstore_ctx_t *ds_ctx)
 321 {
 322     size_t idx;
 323     size_t size;
 324     ns_map_t *ns_map;
 325 
 326     if (NULL == ds_ctx->ns_map_array) {
 327         return;
 328     }
 329 
 330     size = pmix_value_array_get_size(ds_ctx->ns_map_array);
 331     ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
 332 
 333     for (idx = 0; idx < size; idx++) {
 334         if(ns_map[idx].in_use) {
 335             _esh_session_map_clean(ds_ctx, &ns_map[idx]);
 336         }
 337     }
 338 
 339     PMIX_RELEASE(ds_ctx->ns_map_array);
 340     ds_ctx->ns_map_array = NULL;
 341 }
 342 
 343 static inline void _esh_sessions_cleanup(pmix_common_dstore_ctx_t *ds_ctx)
 344 {
 345     size_t idx;
 346     size_t size;
 347     session_t *s_tbl;
 348 
 349     if (NULL == ds_ctx->session_array) {
 350         return;
 351     }
 352 
 353     size = pmix_value_array_get_size(ds_ctx->session_array);
 354     s_tbl = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->session_array, session_t);
 355 
 356     for (idx = 0; idx < size; idx++) {
 357         if(s_tbl[idx].in_use)
 358             _esh_session_release(ds_ctx, idx);
 359     }
 360 
 361     PMIX_RELEASE(ds_ctx->session_array);
 362     ds_ctx->session_array = NULL;
 363 }
 364 
 365 static inline void _esh_ns_track_cleanup(pmix_common_dstore_ctx_t *ds_ctx)
 366 {
 367     int size;
 368     ns_track_elem_t *ns_trk;
 369 
 370     if (NULL == ds_ctx->ns_track_array) {
 371         return;
 372     }
 373 
 374     size = pmix_value_array_get_size(ds_ctx->ns_track_array);
 375     ns_trk = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_track_array, ns_track_elem_t);
 376 
 377     for (int i = 0; i < size; i++) {
 378         ns_track_elem_t *trk = ns_trk + i;
 379         if (trk->in_use) {
 380             PMIX_DESTRUCT(trk);
 381         }
 382     }
 383 
 384     PMIX_RELEASE(ds_ctx->ns_track_array);
 385     ds_ctx->ns_track_array = NULL;
 386 }
 387 
 388 static inline ns_map_data_t * _esh_session_map(pmix_common_dstore_ctx_t *ds_ctx,
 389                                                const char *nspace, uint32_t local_size,
 390                                                size_t tbl_idx)
 391 {
 392     size_t map_idx;
 393     size_t size = pmix_value_array_get_size(ds_ctx->ns_map_array);
 394     ns_map_t *ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
 395     ns_map_t *new_map = NULL;
 396 
 397     if (NULL == nspace) {
 398         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 399         return NULL;
 400     }
 401 
 402     for(map_idx = 0; map_idx < size; map_idx++) {
 403         if (!ns_map[map_idx].in_use) {
 404             ns_map[map_idx].in_use = true;
 405             pmix_strncpy(ns_map[map_idx].data.name, nspace, sizeof(ns_map[map_idx].data.name)-1);
 406             ns_map[map_idx].data.tbl_idx = tbl_idx;
 407             return  &ns_map[map_idx].data;
 408         }
 409     }
 410 
 411     if (NULL == (new_map = pmix_value_array_get_item(ds_ctx->ns_map_array, map_idx))) {
 412         PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 413         return NULL;
 414     }
 415 
 416     _esh_session_map_clean(ds_ctx, new_map);
 417     new_map->in_use = true;
 418     new_map->data.tbl_idx = tbl_idx;
 419     pmix_strncpy(new_map->data.name, nspace, sizeof(new_map->data.name)-1);
 420 
 421     return  &new_map->data;
 422 }
 423 
 424 static inline int _esh_jobuid_tbl_search(pmix_common_dstore_ctx_t *ds_ctx,
 425                                          uid_t jobuid, size_t *tbl_idx)
 426 {
 427     size_t idx, size;
 428     session_t *session_tbl = NULL;
 429 
 430     size = pmix_value_array_get_size(ds_ctx->session_array);
 431     session_tbl = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->session_array, session_t);
 432 
 433     for(idx = 0; idx < size; idx++) {
 434         if (session_tbl[idx].in_use && session_tbl[idx].jobuid == jobuid) {
 435             *tbl_idx = idx;
 436             return PMIX_SUCCESS;
 437         }
 438     }
 439 
 440     return PMIX_ERR_NOT_FOUND;
 441 }
 442 
 443 static inline int _esh_session_tbl_add(pmix_common_dstore_ctx_t *ds_ctx, size_t *tbl_idx)
 444 {
 445     size_t idx;
 446     size_t size = pmix_value_array_get_size(ds_ctx->session_array);
 447     session_t *s_tbl = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->session_array, session_t);
 448     session_t *new_sesion;
 449     pmix_status_t rc = PMIX_SUCCESS;
 450 
 451     for(idx = 0; idx < size; idx ++) {
 452         if (0 == s_tbl[idx].in_use) {
 453             goto done;
 454         }
 455     }
 456 
 457     if (NULL == (new_sesion = pmix_value_array_get_item(ds_ctx->session_array, idx))) {
 458         rc = PMIX_ERR_OUT_OF_RESOURCE;
 459         PMIX_ERROR_LOG(rc);
 460         return rc;
 461     }
 462 
 463 done:
 464     s_tbl[idx].in_use = 1;
 465     *tbl_idx = idx;
 466 
 467     return PMIX_SUCCESS;
 468 }
 469 
 470 static inline ns_map_data_t * _esh_session_map_search_server(pmix_common_dstore_ctx_t *ds_ctx,
 471                                                              const char *nspace)
 472 {
 473     size_t idx, size = pmix_value_array_get_size(ds_ctx->ns_map_array);
 474     ns_map_t *ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
 475     if (NULL == nspace) {
 476         return NULL;
 477     }
 478 
 479     for (idx = 0; idx < size; idx++) {
 480         if (ns_map[idx].in_use &&
 481             (0 == strcmp(ns_map[idx].data.name, nspace))) {
 482                 return &ns_map[idx].data;
 483         }
 484     }
 485     return NULL;
 486 }
 487 
 488 static inline ns_map_data_t * _esh_session_map_search_client(pmix_common_dstore_ctx_t *ds_ctx,
 489                                                              const char *nspace)
 490 {
 491     size_t idx, size = pmix_value_array_get_size(ds_ctx->ns_map_array);
 492     ns_map_t *ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
 493 
 494     if (NULL == nspace) {
 495         return NULL;
 496     }
 497 
 498     for (idx = 0; idx < size; idx++) {
 499         if (ns_map[idx].in_use &&
 500             (0 == strcmp(ns_map[idx].data.name, nspace))) {
 501                 return &ns_map[idx].data;
 502         }
 503     }
 504     return _esh_session_map(ds_ctx, nspace, 0, 0);
 505 }
 506 
 507 static int _esh_session_init(pmix_common_dstore_ctx_t *ds_ctx, size_t idx, ns_map_data_t *m,
 508                              uint32_t local_size, size_t jobuid, int setjobuid)
 509 {
 510     pmix_dstore_seg_desc_t *seg = NULL;
 511     session_t *s = &(PMIX_VALUE_ARRAY_GET_ITEM(ds_ctx->session_array, session_t, idx));
 512     pmix_status_t rc = PMIX_SUCCESS;
 513 
 514     s->setjobuid = setjobuid;
 515     s->jobuid = jobuid;
 516     s->nspace_path = strdup(ds_ctx->base_path);
 517 
 518     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
 519         if (0 != mkdir(s->nspace_path, 0770)) {
 520             if (EEXIST != errno) {
 521                 pmix_output(0, "session init: can not create session directory \"%s\": %s",
 522                     s->nspace_path, strerror(errno));
 523                 rc = PMIX_ERROR;
 524                 PMIX_ERROR_LOG(rc);
 525                 return rc;
 526             }
 527         }
 528         if (s->setjobuid > 0){
 529             if (0 > chown(s->nspace_path, (uid_t) s->jobuid, (gid_t) -1)){
 530                 rc = PMIX_ERROR;
 531                 PMIX_ERROR_LOG(rc);
 532                 return rc;
 533             }
 534         }
 535         seg = pmix_common_dstor_create_new_segment(PMIX_DSTORE_INITIAL_SEGMENT, ds_ctx->base_path,
 536                                                    m->name, 0, ds_ctx->jobuid, ds_ctx->setjobuid);
 537         if( NULL == seg ){
 538             rc = PMIX_ERR_OUT_OF_RESOURCE;
 539             PMIX_ERROR_LOG(rc);
 540             return rc;
 541         }
 542     }
 543     else {
 544         seg = pmix_common_dstor_attach_new_segment(PMIX_DSTORE_INITIAL_SEGMENT, ds_ctx->base_path, m->name, 0);
 545         if( NULL == seg ){
 546             rc = PMIX_ERR_OUT_OF_RESOURCE;
 547             PMIX_ERROR_LOG(rc);
 548             return rc;
 549         }
 550     }
 551     s->sm_seg_first = seg;
 552     s->sm_seg_last = s->sm_seg_first;
 553 
 554     return PMIX_SUCCESS;
 555 }
 556 
 557 static void _esh_session_release(pmix_common_dstore_ctx_t *ds_ctx, size_t idx)
 558 {
 559     session_t *s = &(PMIX_VALUE_ARRAY_GET_ITEM(ds_ctx->session_array, session_t, idx));
 560 
 561     if (!s->in_use) {
 562         return;
 563     }
 564 
 565     pmix_common_dstor_delete_sm_desc(s->sm_seg_first);
 566 
 567     ds_ctx->lock_cbs->finalize(&_ESH_SESSION_lock(ds_ctx->session_array, idx));
 568 
 569     if (NULL != s->nspace_path) {
 570         if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
 571             _esh_dir_del(s->nspace_path);
 572         }
 573         free(s->nspace_path);
 574     }
 575     memset ((char *) s, 0, sizeof(*s));
 576 }
 577 
 578 static void _set_constants_from_env(pmix_common_dstore_ctx_t *ds_ctx)
 579 {
 580     char *str;
 581     int page_size = pmix_common_dstor_getpagesize();
 582 
 583     if( NULL != (str = getenv(ESH_ENV_INITIAL_SEG_SIZE)) ) {
 584         ds_ctx->initial_segment_size = strtoul(str, NULL, 10);
 585         if ((size_t)page_size > ds_ctx->initial_segment_size) {
 586             ds_ctx->initial_segment_size = (size_t)page_size;
 587         }
 588     }
 589     if (0 == ds_ctx->initial_segment_size) {
 590         ds_ctx->initial_segment_size = INITIAL_SEG_SIZE;
 591     }
 592     if( NULL != (str = getenv(ESH_ENV_NS_META_SEG_SIZE)) ) {
 593         ds_ctx->meta_segment_size = strtoul(str, NULL, 10);
 594         if ((size_t)page_size > ds_ctx->meta_segment_size) {
 595             ds_ctx->meta_segment_size = (size_t)page_size;
 596         }
 597     }
 598     if (0 == ds_ctx->meta_segment_size) {
 599         ds_ctx->meta_segment_size = NS_META_SEG_SIZE;
 600     }
 601     if( NULL != (str = getenv(ESH_ENV_NS_DATA_SEG_SIZE)) ) {
 602         ds_ctx->data_segment_size = strtoul(str, NULL, 10);
 603         if ((size_t)page_size > ds_ctx->data_segment_size) {
 604             ds_ctx->data_segment_size = (size_t)page_size;
 605         }
 606     }
 607     if (0 == ds_ctx->data_segment_size) {
 608         ds_ctx->data_segment_size = NS_DATA_SEG_SIZE;
 609     }
 610     if (NULL != (str = getenv(ESH_ENV_LINEAR))) {
 611         if (1 == strtoul(str, NULL, 10)) {
 612             ds_ctx->direct_mode = 1;
 613         }
 614     }
 615 
 616     ds_ctx->lock_segment_size = page_size;
 617     ds_ctx->max_ns_num = (ds_ctx->initial_segment_size - sizeof(size_t) * 2) / sizeof(ns_seg_info_t);
 618     ds_ctx->max_meta_elems = (ds_ctx->meta_segment_size - sizeof(size_t)) / sizeof(rank_meta_info);
 619 
 620     pmix_common_dstor_init_segment_info(ds_ctx->initial_segment_size, ds_ctx->meta_segment_size,
 621                                         ds_ctx->data_segment_size);
 622 
 623 }
 624 
 625 /* This function synchronizes the content of initial shared segment and the local track list. */
 626 static int _update_ns_elem(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_elem,
 627                            ns_seg_info_t *info)
 628 {
 629     pmix_dstore_seg_desc_t *seg, *tmp = NULL;
 630     size_t i, offs;
 631     ns_map_data_t *ns_map = NULL;
 632     pmix_status_t rc;
 633 
 634     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
 635                          "%s:%d:%s",
 636                          __FILE__, __LINE__, __func__));
 637 
 638     if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, info->ns_map.name))) {
 639         rc = PMIX_ERR_NOT_AVAILABLE;
 640         PMIX_ERROR_LOG(rc);
 641         return rc;
 642     }
 643 
 644     tmp = ns_elem->meta_seg;
 645     if (NULL != tmp) {
 646         while(NULL != tmp->next) {
 647             tmp = tmp->next;
 648         }
 649     }
 650 
 651     /* synchronize number of meta segments for the target namespace. */
 652     for (i = ns_elem->num_meta_seg; i < info->num_meta_seg; i++) {
 653         if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
 654             seg = pmix_common_dstor_create_new_segment(PMIX_DSTORE_NS_META_SEGMENT, ds_ctx->base_path,
 655                                                        info->ns_map.name, i, ds_ctx->jobuid,
 656                                                        ds_ctx->setjobuid);
 657             if (NULL == seg) {
 658                 rc = PMIX_ERR_OUT_OF_RESOURCE;
 659                 PMIX_ERROR_LOG(rc);
 660                 return rc;
 661             }
 662         } else {
 663             seg = pmix_common_dstor_attach_new_segment(PMIX_DSTORE_NS_META_SEGMENT, ds_ctx->base_path, info->ns_map.name, i);
 664             if (NULL == seg) {
 665                 rc = PMIX_ERR_NOT_AVAILABLE;
 666                 PMIX_ERROR_LOG(rc);
 667                 return rc;
 668             }
 669         }
 670 
 671         if (NULL == tmp) {
 672             ns_elem->meta_seg = seg;
 673         } else {
 674             tmp->next = seg;
 675         }
 676         tmp = seg;
 677         ns_elem->num_meta_seg++;
 678     }
 679 
 680     tmp = ns_elem->data_seg;
 681     if (NULL != tmp) {
 682         while(NULL != tmp->next) {
 683             tmp = tmp->next;
 684         }
 685     }
 686     /* synchronize number of data segments for the target namespace. */
 687     for (i = ns_elem->num_data_seg; i < info->num_data_seg; i++) {
 688         if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
 689             seg = pmix_common_dstor_create_new_segment(PMIX_DSTORE_NS_DATA_SEGMENT, ds_ctx->base_path,
 690                                                        info->ns_map.name, i, ds_ctx->jobuid,
 691                                                        ds_ctx->setjobuid);
 692             if (NULL == seg) {
 693                 rc = PMIX_ERR_OUT_OF_RESOURCE;
 694                 PMIX_ERROR_LOG(rc);
 695                 return rc;
 696             }
 697             offs = sizeof(size_t);//shift on offset field itself
 698             memcpy(seg->seg_info.seg_base_addr, &offs, sizeof(size_t));
 699         } else {
 700             seg = pmix_common_dstor_attach_new_segment(PMIX_DSTORE_NS_DATA_SEGMENT, ds_ctx->base_path, info->ns_map.name, i);
 701             if (NULL == seg) {
 702                 rc = PMIX_ERR_NOT_AVAILABLE;
 703                 PMIX_ERROR_LOG(rc);
 704                 return rc;
 705             }
 706         }
 707 
 708         if (NULL == tmp) {
 709             ns_elem->data_seg = seg;
 710         } else {
 711             tmp->next = seg;
 712         }
 713         tmp = seg;
 714         ns_elem->num_data_seg++;
 715     }
 716 
 717     return PMIX_SUCCESS;
 718 }
 719 
 720 static int _put_ns_info_to_initial_segment(pmix_common_dstore_ctx_t *ds_ctx,
 721                                            const ns_map_data_t *ns_map, pmix_pshmem_seg_t *metaseg,
 722                                            pmix_pshmem_seg_t *dataseg)
 723 {
 724     ns_seg_info_t elem;
 725     size_t num_elems;
 726     num_elems = *((size_t*)(_ESH_SESSION_sm_seg_last(ds_ctx->session_array,
 727                                                      ns_map->tbl_idx)->seg_info.seg_base_addr));
 728     pmix_dstore_seg_desc_t *last_seg = _ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx);
 729     pmix_status_t rc;
 730 
 731     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
 732                          "%s:%d:%s", __FILE__, __LINE__, __func__));
 733 
 734     if (ds_ctx->max_ns_num == num_elems) {
 735         num_elems = 0;
 736         if (NULL == (last_seg = pmix_common_dstor_extend_segment(last_seg, ds_ctx->base_path, ns_map->name,
 737                                                                  ds_ctx->jobuid, ds_ctx->setjobuid))) {
 738             rc = PMIX_ERROR;
 739             PMIX_ERROR_LOG(rc);
 740             return rc;
 741         }
 742         /* mark previous segment as full */
 743         size_t full = 1;
 744         memcpy((uint8_t*)(_ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx)->seg_info.seg_base_addr +
 745                           sizeof(size_t)), &full, sizeof(size_t));
 746         _ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx) = last_seg;
 747         memset(_ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx)->seg_info.seg_base_addr,
 748                0, ds_ctx->initial_segment_size);
 749     }
 750     memset(&elem.ns_map, 0, sizeof(elem.ns_map));
 751     pmix_strncpy(elem.ns_map.name, ns_map->name, sizeof(elem.ns_map.name)-1);
 752     elem.ns_map.tbl_idx = ns_map->tbl_idx;
 753     elem.num_meta_seg = 1;
 754     elem.num_data_seg = 1;
 755     memcpy((uint8_t*)(_ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx)->seg_info.seg_base_addr) +
 756            sizeof(size_t) * 2 + num_elems * sizeof(ns_seg_info_t), &elem, sizeof(ns_seg_info_t));
 757     num_elems++;
 758     memcpy((uint8_t*)(_ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx)->seg_info.seg_base_addr),
 759            &num_elems, sizeof(size_t));
 760     return PMIX_SUCCESS;
 761 }
 762 
 763 /* clients should sync local info with information from initial segment regularly */
 764 static void _update_initial_segment_info(pmix_common_dstore_ctx_t *ds_ctx, const ns_map_data_t *ns_map)
 765 {
 766     pmix_dstore_seg_desc_t *tmp;
 767     tmp = _ESH_SESSION_sm_seg_first(ds_ctx->session_array, ns_map->tbl_idx);
 768 
 769     PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
 770                          "%s:%d:%s", __FILE__, __LINE__, __func__));
 771 
 772     /* go through all global segments */
 773     do {
 774         /* check if current segment was marked as full but no more next segment is in the chain */
 775         if (NULL == tmp->next && 1 == *((size_t*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t)))) {
 776             tmp->next = pmix_common_dstor_attach_new_segment(PMIX_DSTORE_INITIAL_SEGMENT, ds_ctx->base_path,
 777                                                              ns_map->name, tmp->id+1);
 778         }
 779         tmp = tmp->next;
 780     }
 781     while (NULL != tmp);
 782 }
 783 
 784 /* this function will be used by clients to get ns data from the initial segment and add them to the tracker list */
 785 static ns_seg_info_t *_get_ns_info_from_initial_segment(pmix_common_dstore_ctx_t *ds_ctx,
 786                                                         const ns_map_data_t *ns_map)
 787 {
 788     pmix_status_t rc;
 789     size_t i;
 790     pmix_dstore_seg_desc_t *tmp;
 791     ns_seg_info_t *elem, *cur_elem;
 792     elem = NULL;
 793     size_t num_elems;
 794 
 795     PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
 796                          "%s:%d:%s", __FILE__, __LINE__, __func__));
 797 
 798     tmp = _ESH_SESSION_sm_seg_first(ds_ctx->session_array, ns_map->tbl_idx);
 799 
 800     rc = 1;
 801     /* go through all global segments */
 802     do {
 803         num_elems = *((size_t*)(tmp->seg_info.seg_base_addr));
 804         for (i = 0; i < num_elems; i++) {
 805             cur_elem = (ns_seg_info_t*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) * 2 + i * sizeof(ns_seg_info_t));
 806             if (0 == (rc = strncmp(cur_elem->ns_map.name, ns_map->name, strlen(ns_map->name)+1))) {
 807                 break;
 808             }
 809         }
 810         if (0 == rc) {
 811             elem = cur_elem;
 812             break;
 813         }
 814         tmp = tmp->next;
 815     }
 816     while (NULL != tmp);
 817     return elem;
 818 }
 819 
 820 static ns_track_elem_t *_get_track_elem_for_namespace(pmix_common_dstore_ctx_t *ds_ctx,
 821                                                       ns_map_data_t *ns_map)
 822 {
 823     ns_track_elem_t *new_elem = NULL;
 824     size_t size = pmix_value_array_get_size(ds_ctx->ns_track_array);
 825 
 826     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
 827                          "%s:%d:%s: nspace %s",
 828                          __FILE__, __LINE__, __func__, ns_map->name));
 829 
 830     /* check if this namespace is already being tracked to avoid duplicating data. */
 831     if (ns_map->track_idx >= 0) {
 832         if ((ns_map->track_idx + 1) > (int)size) {
 833             return NULL;
 834         }
 835         /* data for this namespace should be already stored in shared memory region. */
 836         /* so go and just put new data. */
 837         return pmix_value_array_get_item(ds_ctx->ns_track_array, ns_map->track_idx);
 838     }
 839 
 840     /* create shared memory regions for this namespace and store its info locally
 841      * to operate with address and detach/unlink afterwards. */
 842     if (NULL == (new_elem = pmix_value_array_get_item(ds_ctx->ns_track_array, size))) {
 843         return NULL;
 844     }
 845     PMIX_CONSTRUCT(new_elem, ns_track_elem_t);
 846     pmix_strncpy(new_elem->ns_map.name, ns_map->name, sizeof(new_elem->ns_map.name)-1);
 847     /* save latest track idx to info of nspace */
 848     ns_map->track_idx = size;
 849 
 850     return new_elem;
 851 }
 852 
 853 static rank_meta_info *_get_rank_meta_info(pmix_common_dstore_ctx_t *ds_ctx, pmix_rank_t rank, pmix_dstore_seg_desc_t *segdesc)
 854 {
 855     size_t i;
 856     rank_meta_info *elem = NULL;
 857     pmix_dstore_seg_desc_t *tmp = segdesc;
 858     size_t num_elems, rel_offset;
 859     int id;
 860     rank_meta_info *cur_elem;
 861 
 862     size_t rcount = rank == PMIX_RANK_WILDCARD ? 0 : rank + 1;
 863 
 864     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
 865                          "%s:%d:%s",
 866                          __FILE__, __LINE__, __func__));
 867 
 868     if (1 == ds_ctx->direct_mode) {
 869         /* do linear search to find the requested rank inside all meta segments
 870          * for this namespace. */
 871         /* go through all existing meta segments for this namespace */
 872         do {
 873             num_elems = *((size_t*)(tmp->seg_info.seg_base_addr));
 874             for (i = 0; i < num_elems; i++) {
 875                 cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) + i * sizeof(rank_meta_info));
 876                 if (rcount == cur_elem->rank) {
 877                     elem = cur_elem;
 878                     break;
 879                 }
 880             }
 881             tmp = tmp->next;
 882         }
 883         while (NULL != tmp && NULL == elem);
 884     } else {
 885         /* directly compute index of meta segment (id) and relative offset (rel_offset)
 886          * inside this segment for fast lookup a rank_meta_info object for the requested rank. */
 887         id = rcount/ds_ctx->max_meta_elems;
 888         rel_offset = (rcount % ds_ctx->max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t);
 889         /* go through all existing meta segments for this namespace.
 890          * Stop at id number if it exists. */
 891         while (NULL != tmp->next && 0 != id) {
 892             tmp = tmp->next;
 893             id--;
 894         }
 895         if (0 == id) {
 896             /* the segment is found, looking for data for the target rank. */
 897             elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + rel_offset);
 898             if ( 0 == elem->offset) {
 899                 /* offset can never be 0, it means that there is no data for this rank yet. */
 900                 elem = NULL;
 901             }
 902         }
 903     }
 904     return elem;
 905 }
 906 
 907 static int set_rank_meta_info(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info, rank_meta_info *rinfo)
 908 {
 909     /* it's claimed that there is still no meta info for this rank stored */
 910     pmix_dstore_seg_desc_t *tmp;
 911     size_t num_elems, rel_offset;
 912     int id, count;
 913     rank_meta_info *cur_elem;
 914 
 915     if (!ns_info || !rinfo) {
 916         PMIX_ERROR_LOG(PMIX_ERROR);
 917         return PMIX_ERROR;
 918     }
 919 
 920     PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
 921                          "%s:%d:%s: nspace %s, add rank %lu offset %lu count %lu meta info",
 922                          __FILE__, __LINE__, __func__,
 923                          ns_info->ns_map.name, (unsigned long)rinfo->rank,
 924                          (unsigned long)rinfo->offset, (unsigned long)rinfo->count));
 925 
 926     tmp = ns_info->meta_seg;
 927     if (1 == ds_ctx->direct_mode) {
 928         /* get the last meta segment to put new rank_meta_info at the end. */
 929         while (NULL != tmp->next) {
 930             tmp = tmp->next;
 931         }
 932         num_elems = *((size_t*)(tmp->seg_info.seg_base_addr));
 933         if (ds_ctx->max_meta_elems <= num_elems) {
 934             PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
 935                         "%s:%d:%s: extend meta segment for nspace %s",
 936                         __FILE__, __LINE__, __func__, ns_info->ns_map.name));
 937             /* extend meta segment, so create a new one */
 938             tmp = pmix_common_dstor_extend_segment(tmp, ds_ctx->base_path, ns_info->ns_map.name,
 939                                                    ds_ctx->jobuid, ds_ctx->setjobuid);
 940             if (NULL == tmp) {
 941                 PMIX_ERROR_LOG(PMIX_ERROR);
 942                 return PMIX_ERROR;
 943             }
 944             ns_info->num_meta_seg++;
 945             memset(tmp->seg_info.seg_base_addr, 0, sizeof(rank_meta_info));
 946             /* update number of meta segments for namespace in initial_segment */
 947             ns_seg_info_t *elem = _get_ns_info_from_initial_segment(ds_ctx, &ns_info->ns_map);
 948             if (NULL == elem) {
 949                 PMIX_ERROR_LOG(PMIX_ERROR);
 950                 return PMIX_ERROR;
 951             }
 952             if (ns_info->num_meta_seg != elem->num_meta_seg) {
 953                 elem->num_meta_seg = ns_info->num_meta_seg;
 954             }
 955             num_elems = 0;
 956         }
 957         cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) + num_elems * sizeof(rank_meta_info));
 958         memcpy(cur_elem, rinfo, sizeof(rank_meta_info));
 959         num_elems++;
 960         memcpy(tmp->seg_info.seg_base_addr, &num_elems, sizeof(size_t));
 961     } else {
 962         /* directly compute index of meta segment (id) and relative offset (rel_offset)
 963          * inside this segment for fast lookup a rank_meta_info object for the requested rank. */
 964         size_t rcount = rinfo->rank == PMIX_RANK_WILDCARD ? 0 : rinfo->rank + 1;
 965         id = rcount/ds_ctx->max_meta_elems;
 966         rel_offset = (rcount % ds_ctx->max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t);
 967         count = id;
 968         /* go through all existing meta segments for this namespace.
 969          * Stop at id number if it exists. */
 970         while (NULL != tmp->next && 0 != count) {
 971             tmp = tmp->next;
 972             count--;
 973         }
 974         /* if there is no segment with this id, then create all missing segments till the id number. */
 975         if ((int)ns_info->num_meta_seg < (id+1)) {
 976             while ((int)ns_info->num_meta_seg != (id+1)) {
 977                 /* extend meta segment, so create a new one */
 978                 tmp = pmix_common_dstor_extend_segment(tmp, ds_ctx->base_path, ns_info->ns_map.name,
 979                                                        ds_ctx->jobuid, ds_ctx->setjobuid);
 980                 if (NULL == tmp) {
 981                     PMIX_ERROR_LOG(PMIX_ERROR);
 982                     return PMIX_ERROR;
 983                 }
 984                 memset(tmp->seg_info.seg_base_addr, 0, sizeof(rank_meta_info));
 985                 ns_info->num_meta_seg++;
 986             }
 987             /* update number of meta segments for namespace in initial_segment */
 988             ns_seg_info_t *elem = _get_ns_info_from_initial_segment(ds_ctx, &ns_info->ns_map);
 989             if (NULL == elem) {
 990                 PMIX_ERROR_LOG(PMIX_ERROR);
 991                 return PMIX_ERROR;
 992             }
 993             if (ns_info->num_meta_seg != elem->num_meta_seg) {
 994                 elem->num_meta_seg = ns_info->num_meta_seg;
 995             }
 996         }
 997         /* store rank_meta_info object by rel_offset. */
 998         cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + rel_offset);
 999         memcpy(cur_elem, rinfo, sizeof(rank_meta_info));
1000     }
1001     return PMIX_SUCCESS;
1002 }
1003 
1004 static uint8_t *_get_data_region_by_offset(pmix_common_dstore_ctx_t *ds_ctx, pmix_dstore_seg_desc_t *segdesc, size_t offset)
1005 {
1006     pmix_dstore_seg_desc_t *tmp = segdesc;
1007     size_t rel_offset = offset;
1008     uint8_t *dataaddr = NULL;
1009 
1010     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1011                          "%s:%d:%s",
1012                          __FILE__, __LINE__, __func__));
1013 
1014     /* go through all existing data segments for this namespace */
1015     do {
1016         if (rel_offset >= ds_ctx->data_segment_size) {
1017             rel_offset -= ds_ctx->data_segment_size;
1018         } else {
1019             dataaddr = tmp->seg_info.seg_base_addr + rel_offset;
1020         }
1021         tmp = tmp->next;
1022     } while (NULL != tmp && NULL == dataaddr);
1023 
1024     return dataaddr;
1025 }
1026 
1027 static size_t get_free_offset(pmix_common_dstore_ctx_t *ds_ctx, pmix_dstore_seg_desc_t *data_seg)
1028 {
1029     size_t offset;
1030     pmix_dstore_seg_desc_t *tmp;
1031     int id = 0;
1032     tmp = data_seg;
1033     /* first find the last data segment */
1034     while (NULL != tmp->next) {
1035         tmp = tmp->next;
1036         id++;
1037     }
1038     offset = *((size_t*)(tmp->seg_info.seg_base_addr));
1039     if (0 == offset) {
1040         /* this is the first created data segment, the first 8 bytes are used to place the free offset value itself */
1041         offset = sizeof(size_t);
1042     }
1043     return (id * ds_ctx->data_segment_size + offset);
1044 }
1045 
1046 static int put_empty_ext_slot(pmix_common_dstore_ctx_t *ds_ctx, pmix_dstore_seg_desc_t *dataseg)
1047 {
1048     size_t global_offset, rel_offset, data_ended, val = 0;
1049     uint8_t *addr;
1050     pmix_status_t rc;
1051 
1052     global_offset = get_free_offset(ds_ctx, dataseg);
1053     rel_offset = global_offset % ds_ctx->data_segment_size;
1054     if (rel_offset + PMIX_DS_SLOT_SIZE(ds_ctx) > ds_ctx->data_segment_size) {
1055         PMIX_ERROR_LOG(PMIX_ERROR);
1056         return PMIX_ERROR;
1057     }
1058     addr = _get_data_region_by_offset(ds_ctx, dataseg, global_offset);
1059     PMIX_DS_PUT_KEY(rc, ds_ctx, addr, ESH_REGION_EXTENSION, (void*)&val, sizeof(size_t));
1060     if (rc != PMIX_SUCCESS) {
1061         PMIX_ERROR_LOG(rc);
1062         return rc;
1063     }
1064     /* update offset at the beginning of current segment */
1065     data_ended = rel_offset + PMIX_DS_SLOT_SIZE(ds_ctx);
1066     addr = (uint8_t*)(addr - rel_offset);
1067     memcpy(addr, &data_ended, sizeof(size_t));
1068     return PMIX_SUCCESS;
1069 }
1070 
1071 static size_t put_data_to_the_end(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info,
1072                                   pmix_dstore_seg_desc_t *dataseg, char *key, void *buffer, size_t size)
1073 {
1074     size_t offset, id = 0;
1075     pmix_dstore_seg_desc_t *tmp;
1076     size_t global_offset, data_ended;
1077     uint8_t *addr;
1078     pmix_status_t rc;
1079 
1080     PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
1081                          "%s:%d:%s: key %s",
1082                          __FILE__, __LINE__, __func__, key));
1083 
1084     tmp = dataseg;
1085     while (NULL != tmp->next) {
1086         tmp = tmp->next;
1087         id++;
1088     }
1089     global_offset = get_free_offset(ds_ctx, dataseg);
1090     offset = global_offset % ds_ctx->data_segment_size;
1091 
1092     /* We should provide additional space at the end of segment to
1093      * place EXTENSION_SLOT to have an ability to enlarge data for this rank.*/
1094     if ((sizeof(size_t) + PMIX_DS_KEY_SIZE(ds_ctx, key, size) + PMIX_DS_SLOT_SIZE(ds_ctx)) >
1095             ds_ctx->data_segment_size) {
1096         /* this is an error case: segment is so small that cannot place evem a single key-value pair.
1097          * warn a user about it and fail. */
1098         offset = 0; /* offset cannot be 0 in normal case, so we use this value to indicate a problem. */
1099         pmix_output(0, "PLEASE set NS_DATA_SEG_SIZE to value which is larger when %lu.",
1100                     (unsigned long)(sizeof(size_t) + strlen(key) + 1 + sizeof(size_t) +
1101                                     size + PMIX_DS_SLOT_SIZE(ds_ctx)));
1102         return offset;
1103     }
1104 
1105     /* check the corner case that was observed at large scales:
1106      * https://github.com/pmix/master/pull/282#issuecomment-277454198
1107      *
1108      * if last time we stopped exactly on the border of the segment
1109      * new segment wasn't allocated to us but (global_offset % _data_segment_size) == 0
1110      * so if offset is 0 here - we need to allocate the segment as well
1111      */
1112     if ( (0 == offset) || ( (offset + PMIX_DS_KEY_SIZE(ds_ctx, key, size) +
1113                              PMIX_DS_SLOT_SIZE(ds_ctx)) > ds_ctx->data_segment_size) ) {
1114         id++;
1115         /* create a new data segment. */
1116         tmp = pmix_common_dstor_extend_segment(tmp, ds_ctx->base_path, ns_info->ns_map.name,
1117                                                ds_ctx->jobuid, ds_ctx->setjobuid);
1118         if (NULL == tmp) {
1119             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1120             offset = 0; /* offset cannot be 0 in normal case, so we use this value to indicate a problem. */
1121             return offset;
1122         }
1123         ns_info->num_data_seg++;
1124         /* update_ns_info_in_initial_segment */
1125         ns_seg_info_t *elem = _get_ns_info_from_initial_segment(ds_ctx, &ns_info->ns_map);
1126         if (NULL == elem) {
1127             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1128             offset = 0; /* offset cannot be 0 in normal case, so we use this value to indicate a problem. */
1129             return offset;
1130         }
1131         elem->num_data_seg++;
1132         offset = sizeof(size_t);
1133     }
1134     global_offset = offset + id * ds_ctx->data_segment_size;
1135     addr = (uint8_t*)(tmp->seg_info.seg_base_addr)+offset;
1136     PMIX_DS_PUT_KEY(rc, ds_ctx, addr, key, buffer, size);
1137     if (rc != PMIX_SUCCESS) {
1138         PMIX_ERROR_LOG(rc);
1139         return 0;
1140     }
1141 
1142     /* update offset at the beginning of current segment */
1143     data_ended = offset + PMIX_DS_KEY_SIZE(ds_ctx, key, size);
1144     addr = (uint8_t*)(tmp->seg_info.seg_base_addr);
1145     memcpy(addr, &data_ended, sizeof(size_t));
1146     PMIX_OUTPUT_VERBOSE((1, pmix_gds_base_framework.framework_output,
1147                          "%s:%d:%s: key %s, rel start offset %lu, rel end offset %lu, abs shift %lu size %lu",
1148                          __FILE__, __LINE__, __func__,
1149                          key, (unsigned long)offset,
1150                          (unsigned long)data_ended,
1151                          (unsigned long)(id * ds_ctx->data_segment_size),
1152                          (unsigned long)size));
1153     return global_offset;
1154 }
1155 
1156 static int pmix_sm_store(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info,
1157                          pmix_rank_t rank, pmix_kval_t *kval, rank_meta_info **rinfo, int data_exist)
1158 {
1159     size_t offset, size, kval_cnt;
1160     pmix_buffer_t buffer;
1161     pmix_status_t rc;
1162     pmix_dstore_seg_desc_t *datadesc;
1163     uint8_t *addr;
1164 
1165     PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
1166                          "%s:%d:%s: for rank %u, replace flag %d",
1167                          __FILE__, __LINE__, __func__, rank, data_exist));
1168 
1169     datadesc = ns_info->data_seg;
1170     /* pack value to the buffer */
1171     PMIX_CONSTRUCT(&buffer, pmix_buffer_t);
1172     PMIX_BFROPS_PACK(rc, _client_peer(ds_ctx), &buffer, kval->value, 1, PMIX_VALUE);
1173     if (PMIX_SUCCESS != rc) {
1174         PMIX_ERROR_LOG(rc);
1175         goto exit;
1176     }
1177     size = buffer.bytes_used;
1178 
1179     if (0 == data_exist) {
1180         /* there is no data blob for this rank yet, so add it. */
1181         size_t free_offset;
1182         free_offset = get_free_offset(ds_ctx, datadesc);
1183         offset = put_data_to_the_end(ds_ctx, ns_info, datadesc, kval->key, buffer.base_ptr, size);
1184         if (0 == offset) {
1185             /* this is an error */
1186             rc = PMIX_ERROR;
1187             PMIX_ERROR_LOG(rc);
1188             goto exit;
1189         }
1190         /* if it's the first time when we put data for this rank, then *rinfo == NULL,
1191          * and even if segment was extended, and data was put into the next segment,
1192          * we don't need to extension slot at the end of previous segment.
1193          * If we try, we might overwrite other segments memory,
1194          * because previous segment is already full. */
1195         if (free_offset != offset && NULL != *rinfo) {
1196             /* here we compare previous free offset with the offset where we just put data.
1197              * It should be equal in the normal case. If it's not true, then it means that
1198              * segment was extended, and we put data to the next segment, so we now need to
1199              * put extension slot at the end of previous segment with a "reference" to a new_offset */
1200             addr = _get_data_region_by_offset(ds_ctx, datadesc, free_offset);
1201             PMIX_DS_PUT_KEY(rc, ds_ctx, addr, ESH_REGION_EXTENSION, (void*)&offset, sizeof(size_t));
1202             if (rc != PMIX_SUCCESS) {
1203                 PMIX_ERROR_LOG(rc);
1204                 return 0;
1205             }
1206         }
1207         if (NULL == *rinfo) {
1208             *rinfo = (rank_meta_info*)malloc(sizeof(rank_meta_info));
1209             (*rinfo)->rank = rank;
1210             (*rinfo)->offset = offset;
1211             (*rinfo)->count = 0;
1212         }
1213         (*rinfo)->count++;
1214     } else if (NULL != *rinfo) {
1215         /* there is data blob for this rank */
1216         addr = _get_data_region_by_offset(ds_ctx, datadesc, (*rinfo)->offset);
1217         if (NULL == addr) {
1218             rc = PMIX_ERROR;
1219             PMIX_ERROR_LOG(rc);
1220             goto exit;
1221         }
1222         /* go through previous data region and find key matches.
1223          * If one is found, then mark this kval as invalidated.
1224          * Then put a new empty offset to the next extension slot,
1225          * and add new kval by this offset.
1226          * no need to update meta info, it's still the same. */
1227         kval_cnt = (*rinfo)->count;
1228         int add_to_the_end = 1;
1229         while (0 < kval_cnt) {
1230             /* data is stored in the following format:
1231              * size_t size
1232              * key[ESH_KNAME_LEN(addr)]
1233              * byte buffer containing pmix_value, should be loaded to pmix_buffer_t and unpacked.
1234              * next kval pair
1235              * .....
1236              * extension slot which has key = EXTENSION_SLOT and a size_t value for offset to next data address for this process.
1237              */
1238             if(PMIX_DS_KEY_IS_EXTSLOT(ds_ctx, addr)) {
1239                 memcpy(&offset, PMIX_DS_DATA_PTR(ds_ctx, addr), sizeof(size_t));
1240                 if (0 < offset) {
1241                     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1242                                 "%s:%d:%s: for rank %lu, replace flag %d %s is filled with %lu value",
1243                                 __FILE__, __LINE__, __func__,
1244                                 (unsigned long)rank, data_exist,
1245                                 ESH_REGION_EXTENSION, (unsigned long)offset));
1246                     /* go to next item, updating address */
1247                     addr = _get_data_region_by_offset(ds_ctx, datadesc, offset);
1248                     if (NULL == addr) {
1249                         rc = PMIX_ERROR;
1250                         PMIX_ERROR_LOG(rc);
1251                         goto exit;
1252                     }
1253                 } else {
1254                     /* should not be, we should be out of cycle when this happens */
1255                 }
1256             } else if (0 == strncmp(PMIX_DS_KNAME_PTR(ds_ctx, addr), kval->key,
1257                                     PMIX_DS_KNAME_LEN(ds_ctx, kval->key))) {
1258                 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1259                             "%s:%d:%s: for rank %u, replace flag %d found target key %s",
1260                             __FILE__, __LINE__, __func__, rank, data_exist, kval->key));
1261                 /* target key is found, compare value sizes */
1262                 if (PMIX_DS_DATA_SIZE(ds_ctx, addr, PMIX_DS_DATA_PTR(ds_ctx, addr)) != size) {
1263                 //if (1) { /* if we want to test replacing values for existing keys. */
1264                     /* invalidate current value and store another one at the end of data region. */
1265                     PMIX_DS_KEY_SET_INVALID(ds_ctx, addr);
1266                     /* decrementing count, it will be incremented back when we add a new value for this key at the end of region. */
1267                     (*rinfo)->count--;
1268                     kval_cnt--;
1269                     /* go to next item, updating address */
1270                     addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
1271                     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1272                                 "%s:%d:%s: for rank %u, replace flag %d mark key %s regions as invalidated. put new data at the end.",
1273                                 __FILE__, __LINE__, __func__, rank, data_exist, kval->key));
1274                 } else {
1275                     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1276                                 "%s:%d:%s: for rank %u, replace flag %d replace data for key %s type %d in place",
1277                                 __FILE__, __LINE__, __func__, rank, data_exist, kval->key, kval->value->type));
1278                     /* replace old data with new one. */
1279                     memset(PMIX_DS_DATA_PTR(ds_ctx, addr), 0,
1280                            PMIX_DS_DATA_SIZE(ds_ctx, addr, PMIX_DS_DATA_PTR(ds_ctx, addr)));
1281                     memcpy(PMIX_DS_DATA_PTR(ds_ctx, addr), buffer.base_ptr, size);
1282                     addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
1283                     add_to_the_end = 0;
1284                     break;
1285                 }
1286             } else {
1287                 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1288                             "%s:%d:%s: for rank %u, replace flag %d skip %s key, look for %s key",
1289                             __FILE__, __LINE__, __func__, rank, data_exist,
1290                             PMIX_DS_KNAME_PTR(ds_ctx, addr), kval->key));
1291                 /* Skip it: key is "INVALIDATED" or key is valid but different from target one. */
1292                 if (!PMIX_DS_KEY_IS_INVALID(ds_ctx, addr)) {
1293                     /* count only valid items */
1294                     kval_cnt--;
1295                 }
1296                 /* go to next item, updating address */
1297                 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
1298             }
1299         }
1300         if (1 == add_to_the_end) {
1301             /* if we get here, it means that we want to add a new item for the target rank, or
1302              * we mark existing item with the same key as "invalidated" and want to add new item
1303              * for the same key. */
1304             size_t free_offset;
1305             (*rinfo)->count++;
1306             free_offset = get_free_offset(ds_ctx, datadesc);
1307 
1308             /*
1309              * Remove trailing extention slot if we are continuing
1310              * same ranks data.
1311              *
1312              * When keys are stored individually through _store_data_for_rank
1313              * an empty extention slot is placed every time.
1314              *
1315              * This is required because there is no information about whether or not the next key
1316              * will belong to the same rank.
1317              *
1318              * As the result EACH keys stored with _store_data_for_rank is
1319              * followed by extension slot. This slows down search and increases
1320              * the memory footprint.
1321              *
1322              * The following code tries to deal with such one-key-at-a-time
1323              * situation by:
1324              *  - checking if the last key-value for this rank is an extention
1325              *    slot
1326              *  - If this is the case - checks if this key-value pair is the
1327              *    last one at the moment and can be safely deleted.
1328              *  - if it is - current segment's offset pointer is decreased by
1329              *    the size of the extention slot key-value effectively removing
1330              *    it from the dstor
1331              */
1332             if (PMIX_DS_KEY_IS_EXTSLOT(ds_ctx, addr)){
1333                 /* Find the last data segment */
1334                 pmix_dstore_seg_desc_t *ldesc = datadesc;
1335                 uint8_t *segstart;
1336                 size_t offs_past_extslot = 0;
1337                 size_t offs_cur_segment = 0;
1338                 while (NULL != ldesc->next) {
1339                     ldesc = ldesc->next;
1340                 }
1341 
1342                 /* Calculate the offset of the end of the extension slot */
1343                 offs_cur_segment = free_offset % ds_ctx->data_segment_size;
1344                 segstart = ldesc->seg_info.seg_base_addr;
1345                 offs_past_extslot = (addr + PMIX_DS_KV_SIZE(ds_ctx, addr)) - segstart;
1346 
1347                 /* We can erase extension slot if:
1348                  *  - address of the ext slot belongs to the occupied part of the
1349                  *    last segment
1350                  *  - local offset within the segment is equal to the local
1351                  *    offset of the end of extension slot
1352                  */
1353                 if( ( (addr > segstart) && (addr < (segstart + offs_cur_segment)) )
1354                      && (offs_cur_segment == offs_past_extslot) ) {
1355                     /* Calculate a new free offset that doesn't account this
1356                      * extension slot */
1357                     size_t new_offset = addr - segstart;
1358                     /* Rewrite segment's offset information to exclude
1359                      * extension slot */
1360                     memcpy(segstart, &new_offset, sizeof(size_t));
1361                     /* Recalculate free_offset */
1362                     free_offset = get_free_offset(ds_ctx, datadesc);
1363                 }
1364             }
1365 
1366             /* add to the end */
1367             offset = put_data_to_the_end(ds_ctx, ns_info, datadesc, kval->key, buffer.base_ptr, size);
1368             if (0 == offset) {
1369                 rc = PMIX_ERROR;
1370                 PMIX_ERROR_LOG(rc);
1371                 goto exit;
1372             }
1373             /* we just reached the end of data for the target rank, and there can be two cases:
1374              * (1) - we are in the middle of data segment; data for this rank is separated from
1375              * data for different ranks, and that's why next element is EXTENSION_SLOT.
1376              * We put new data to the end of data region and just update EXTENSION_SLOT value by new offset.
1377              */
1378             if (PMIX_DS_KEY_IS_EXTSLOT(ds_ctx, addr)) {
1379                 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1380                             "%s:%d:%s: for rank %u, replace flag %d %s should be filled with offset %lu value",
1381                             __FILE__, __LINE__, __func__, rank, data_exist, ESH_REGION_EXTENSION, offset));
1382                 memcpy(PMIX_DS_DATA_PTR(ds_ctx, addr), &offset, sizeof(size_t));
1383             } else {
1384                 /* (2) - we point to the first free offset, no more data is stored further in this segment.
1385                  * There is no EXTENSION_SLOT by this addr since we continue pushing data for the same rank,
1386                  * and there is no need to split it.
1387                  * But it's possible that we reached the end of current data region and just jumped to the new region
1388                  * to put new data, in that case free_offset != offset and we must put EXTENSION_SLOT by the current addr
1389                  * forcibly and store new offset in its value. */
1390                 if (free_offset != offset) {
1391                     /* segment was extended, need to put extension slot by free_offset indicating new_offset */
1392                     PMIX_DS_PUT_KEY(rc, ds_ctx, addr, ESH_REGION_EXTENSION, (void*)&offset, sizeof(size_t));
1393                     if (rc != PMIX_SUCCESS) {
1394                         PMIX_ERROR_LOG(rc);
1395                         return 0;
1396                     }
1397                 }
1398             }
1399             PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1400                         "%s:%d:%s: for rank %u, replace flag %d item not found ext slot empty, put key %s to the end",
1401                         __FILE__, __LINE__, __func__, rank, data_exist, kval->key));
1402         }
1403     }
1404 exit:
1405     PMIX_DESTRUCT(&buffer);
1406     return rc;
1407 }
1408 
1409 static int _store_data_for_rank(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info,
1410                                 pmix_rank_t rank, pmix_buffer_t *buf)
1411 {
1412     pmix_status_t rc;
1413 
1414     pmix_kval_t *kp;
1415     pmix_dstore_seg_desc_t *metadesc, *datadesc;
1416     int32_t cnt;
1417 
1418     rank_meta_info *rinfo = NULL;
1419     size_t num_elems, free_offset, new_free_offset;
1420     int data_exist;
1421 
1422     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1423                          "%s:%d:%s: for rank %u", __FILE__, __LINE__, __func__, rank));
1424 
1425     metadesc = ns_info->meta_seg;
1426     datadesc = ns_info->data_seg;
1427 
1428     if (NULL == datadesc || NULL == metadesc) {
1429         rc = PMIX_ERR_BAD_PARAM;
1430         PMIX_ERROR_LOG(rc);
1431         return rc;
1432     }
1433 
1434     num_elems = *((size_t*)(metadesc->seg_info.seg_base_addr));
1435     data_exist = 0;
1436     /* when we don't use linear search (direct_mode == 0) we don't use num_elems field,
1437      * so anyway try to get rank_meta_info first. */
1438     if (0 < num_elems || 0 == ds_ctx->direct_mode) {
1439         /* go through all elements in meta segment and look for target rank. */
1440         rinfo = _get_rank_meta_info(ds_ctx, rank, metadesc);
1441         if (NULL != rinfo) {
1442             data_exist = 1;
1443         }
1444     }
1445     /* incoming buffer may contain several inner buffers for different scopes,
1446      * so unpack these buffers, and then unpack kvals from each modex buffer,
1447      * storing them in the shared memory dstore.
1448      */
1449     free_offset = get_free_offset(ds_ctx, datadesc);
1450     cnt = 1;
1451     kp = PMIX_NEW(pmix_kval_t);
1452     PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, buf, kp, &cnt, PMIX_KVAL);
1453     while(PMIX_SUCCESS == rc) {
1454         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1455                             "pmix: unpacked key %s", kp->key);
1456         if (PMIX_SUCCESS != (rc = pmix_sm_store(ds_ctx, ns_info, rank, kp, &rinfo, data_exist))) {
1457             PMIX_ERROR_LOG(rc);
1458             if (NULL != rinfo) {
1459                 free(rinfo);
1460             }
1461             return rc;
1462         }
1463         PMIX_RELEASE(kp); // maintain acctg - hash_store does a retain
1464         cnt = 1;
1465         kp = PMIX_NEW(pmix_kval_t);
1466         PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, buf, kp, &cnt, PMIX_KVAL);
1467     }
1468 
1469     PMIX_RELEASE(kp);
1470 
1471     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1472         PMIX_ERROR_LOG(rc);
1473         /* TODO: should we error-exit here? */
1474     } else {
1475         rc = PMIX_SUCCESS;
1476     }
1477 
1478     /* Check if new data was put at the end of data segment.
1479      * It's possible that old data just was replaced with new one,
1480      * in that case we don't reserve space for EXTENSION_SLOT, it's
1481      * already reserved.
1482      * */
1483     new_free_offset = get_free_offset(ds_ctx, datadesc);
1484     if (new_free_offset != free_offset) {
1485         /* Reserve space for EXTENSION_SLOT at the end of data blob.
1486          * We need it to split data for one rank from data for different
1487          * ranks and to allow extending data further.
1488          * We also put EXTENSION_SLOT at the end of each data segment, and
1489          * its value points to the beginning of next data segment.
1490          * */
1491         rc = put_empty_ext_slot(ds_ctx, ns_info->data_seg);
1492         if (PMIX_SUCCESS != rc) {
1493             if ((0 == data_exist) && NULL != rinfo) {
1494                 free(rinfo);
1495             }
1496             PMIX_ERROR_LOG(rc);
1497             return rc;
1498         }
1499     }
1500 
1501     /* if this is the first data posted for this rank, then
1502      * update meta info for it */
1503     if (0 == data_exist) {
1504         set_rank_meta_info(ds_ctx, ns_info, rinfo);
1505         if (NULL != rinfo) {
1506             free(rinfo);
1507         }
1508     }
1509 
1510     return rc;
1511 }
1512 
1513 static inline ssize_t _get_univ_size(pmix_common_dstore_ctx_t *ds_ctx, const char *nspace)
1514 {
1515     ssize_t nprocs = 0;
1516     pmix_value_t *val;
1517     int rc;
1518 
1519     rc = _dstore_fetch(ds_ctx, nspace, PMIX_RANK_WILDCARD, PMIX_UNIV_SIZE, &val);
1520     if( PMIX_SUCCESS != rc ) {
1521         PMIX_ERROR_LOG(rc);
1522         return rc;
1523     }
1524     if( val->type != PMIX_UINT32 ){
1525         rc = PMIX_ERR_BAD_PARAM;
1526         PMIX_ERROR_LOG(rc);
1527         return rc;
1528     }
1529     nprocs = (ssize_t)val->data.uint32;
1530     PMIX_VALUE_RELEASE(val);
1531     return nprocs;
1532 }
1533 
1534 PMIX_EXPORT pmix_status_t pmix_common_dstor_cache_job_info(pmix_common_dstore_ctx_t *ds_ctx,
1535                                 struct pmix_namespace_t *ns,
1536                                 pmix_info_t info[], size_t ninfo)
1537 {
1538     return PMIX_SUCCESS;
1539 }
1540 
1541 
1542 pmix_common_dstore_ctx_t *pmix_common_dstor_init(const char *ds_name, pmix_info_t info[], size_t ninfo,
1543                                                  pmix_common_lock_callbacks_t *lock_cb,
1544                                                  pmix_common_dstore_file_cbs_t *file_cb)
1545 {
1546     pmix_status_t rc;
1547     size_t n;
1548     char *dstor_tmpdir = NULL;
1549     size_t tbl_idx = 0;
1550     ns_map_data_t *ns_map = NULL;
1551     pmix_common_dstore_ctx_t *ds_ctx = NULL;
1552 
1553     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1554                         "pmix:gds:dstore init");
1555 
1556     ds_ctx = (pmix_common_dstore_ctx_t*) malloc(sizeof(*ds_ctx));
1557     if (NULL == ds_ctx) {
1558         PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
1559         return NULL;
1560     }
1561     memset(ds_ctx, 0, sizeof(*ds_ctx));
1562 
1563     /* assign lock callbacks */
1564     ds_ctx->lock_cbs = lock_cb;
1565     ds_ctx->file_cbs = file_cb;
1566 
1567     /* open the pshmem and select the active plugins */
1568     if( PMIX_SUCCESS != (rc = pmix_mca_base_framework_open(&pmix_pshmem_base_framework, 0)) ) {
1569         PMIX_ERROR_LOG(rc);
1570         goto err_exit;
1571     }
1572     if( PMIX_SUCCESS != (rc = pmix_pshmem_base_select()) ) {
1573         PMIX_ERROR_LOG(rc);
1574         goto err_exit;
1575     }
1576 
1577     ds_ctx->jobuid = getuid();
1578     ds_ctx->setjobuid = 0;
1579 
1580     if (PMIX_SUCCESS != (rc = _esh_tbls_init(ds_ctx))) {
1581         PMIX_ERROR_LOG(rc);
1582         goto err_exit;
1583     }
1584 
1585     rc = pmix_pshmem.init();
1586     if (PMIX_SUCCESS != rc) {
1587         PMIX_ERROR_LOG(rc);
1588         goto err_exit;
1589     }
1590 
1591     _set_constants_from_env(ds_ctx);
1592     ds_ctx->ds_name = strdup(ds_name);
1593 
1594     /* find the temp dir */
1595     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
1596         ds_ctx->session_map_search = (session_map_search_fn_t)_esh_session_map_search_server;
1597 
1598         /* scan incoming info for directives */
1599         if (NULL != info) {
1600             for (n=0; n < ninfo; n++) {
1601                 if (0 == strcmp(PMIX_USERID, info[n].key)) {
1602                     ds_ctx->jobuid = info[n].value.data.uint32;
1603                     ds_ctx->setjobuid = 1;
1604                     continue;
1605                 }
1606                 if (0 == strcmp(PMIX_DSTPATH, info[n].key)) {
1607                     /* PMIX_DSTPATH is the way for RM to customize the
1608                      * place where shared memory files are placed.
1609                      * We need this for the following reasons:
1610                      * - disk usage: files can be relatively large and the system may
1611                      *   have a small common temp directory.
1612                      * - performance: system may have a fast IO device (i.e. burst buffer)
1613                      *   for the local usage.
1614                      *
1615                      * PMIX_DSTPATH has higher priority than PMIX_SERVER_TMPDIR
1616                      */
1617                     if( PMIX_STRING != info[n].value.type ){
1618                         rc = PMIX_ERR_BAD_PARAM;
1619                         PMIX_ERROR_LOG(rc);
1620                         goto err_exit;
1621                     }
1622                     dstor_tmpdir = (char*)info[n].value.data.string;
1623                     continue;
1624                 }
1625                 if (0 == strcmp(PMIX_SERVER_TMPDIR, info[n].key)) {
1626                     if( PMIX_STRING != info[n].value.type ){
1627                         rc = PMIX_ERR_BAD_PARAM;
1628                         PMIX_ERROR_LOG(rc);
1629                         goto err_exit;
1630                     }
1631                     if (NULL == dstor_tmpdir) {
1632                         dstor_tmpdir = (char*)info[n].value.data.string;
1633                     }
1634                     continue;
1635                 }
1636             }
1637         }
1638 
1639         if (NULL == dstor_tmpdir) {
1640             if (NULL == (dstor_tmpdir = getenv("TMPDIR"))) {
1641                 if (NULL == (dstor_tmpdir = getenv("TEMP"))) {
1642                     if (NULL == (dstor_tmpdir = getenv("TMP"))) {
1643                         dstor_tmpdir = "/tmp";
1644                     }
1645                 }
1646             }
1647         }
1648 
1649         rc = asprintf(&ds_ctx->base_path, "%s/pmix_dstor_%s_%d", dstor_tmpdir,
1650                       ds_ctx->ds_name, getpid());
1651         if ((0 > rc) || (NULL == ds_ctx->base_path)) {
1652             rc = PMIX_ERR_OUT_OF_RESOURCE;
1653             PMIX_ERROR_LOG(rc);
1654             goto err_exit;
1655         }
1656 
1657         if (0 != mkdir(ds_ctx->base_path, 0770)) {
1658             if (EEXIST != errno) {
1659                 rc = PMIX_ERROR;
1660                 PMIX_ERROR_LOG(rc);
1661                 goto err_exit;
1662             }
1663         }
1664         if (ds_ctx->setjobuid > 0) {
1665             if (chown(ds_ctx->base_path, (uid_t) ds_ctx->jobuid, (gid_t) -1) < 0){
1666                 rc = PMIX_ERR_NO_PERMISSIONS;
1667                 PMIX_ERROR_LOG(rc);
1668                 goto err_exit;
1669             }
1670         }
1671         ds_ctx->session_map_search = _esh_session_map_search_server;
1672         return ds_ctx;
1673     }
1674     /* for clients */
1675     else {
1676         char *env_name = NULL;
1677         int ds_ver = 0;
1678 
1679         sscanf(ds_ctx->ds_name, "ds%d", &ds_ver);
1680         if (0 == ds_ver) {
1681             rc = PMIX_ERR_INIT;
1682             PMIX_ERROR_LOG(rc);
1683             goto err_exit;
1684         }
1685         if (0 > asprintf(&env_name, PMIX_DSTORE_VER_BASE_PATH_FMT, ds_ver)) {
1686              rc = PMIX_ERR_NOMEM;
1687              PMIX_ERROR_LOG(rc);
1688              goto err_exit;
1689         }
1690         dstor_tmpdir = getenv(env_name);
1691         free(env_name);
1692 
1693         if (NULL == dstor_tmpdir) {
1694             dstor_tmpdir = getenv(PMIX_DSTORE_ESH_BASE_PATH);
1695         }
1696         if (NULL == dstor_tmpdir){
1697             rc = PMIX_ERR_NOT_AVAILABLE; // simply disqualify ourselves
1698             goto err_exit;
1699         }
1700         if (NULL == (ds_ctx->base_path = strdup(dstor_tmpdir))) {
1701             rc = PMIX_ERR_OUT_OF_RESOURCE;
1702             PMIX_ERROR_LOG(rc);
1703             goto err_exit;
1704         }
1705         ds_ctx->session_map_search = _esh_session_map_search_client;
1706         /* init ds_ctx protect lock */
1707         if (0 != pthread_mutex_init(&ds_ctx->lock, NULL)) {
1708             rc = PMIX_ERR_INIT;
1709             PMIX_ERROR_LOG(rc);
1710             goto err_exit;
1711         }
1712     }
1713 
1714     rc = _esh_session_tbl_add(ds_ctx, &tbl_idx);
1715     if (PMIX_SUCCESS != rc) {
1716         PMIX_ERROR_LOG(rc);
1717         goto err_exit;
1718     }
1719 
1720     char *nspace = NULL;
1721     /* if we don't see the required info, then we cannot init */
1722     if (NULL == (nspace = getenv("PMIX_NAMESPACE"))) {
1723         rc = PMIX_ERR_INVALID_NAMESPACE;
1724         PMIX_ERROR_LOG(rc);
1725         goto err_exit;
1726     }
1727     /* lock init */
1728     rc = ds_ctx->lock_cbs->init(&_ESH_SESSION_lock(ds_ctx->session_array, tbl_idx), ds_ctx->base_path, nspace, 1, ds_ctx->jobuid, ds_ctx->setjobuid);
1729     if (rc != PMIX_SUCCESS) {
1730         goto err_exit;
1731     }
1732     ns_map = _esh_session_map(ds_ctx, nspace, 0, tbl_idx);
1733     if (NULL == ns_map) {
1734         rc = PMIX_ERR_OUT_OF_RESOURCE;
1735         PMIX_ERROR_LOG(rc);
1736         goto err_exit;
1737     }
1738 
1739     if (PMIX_SUCCESS != (rc =_esh_session_init(ds_ctx, tbl_idx, ns_map, 1,
1740                                                ds_ctx->jobuid, ds_ctx->setjobuid))) {
1741         PMIX_ERROR_LOG(rc);
1742         goto err_exit;
1743     }
1744 
1745     return ds_ctx;
1746 err_exit:
1747     pmix_common_dstor_finalize(ds_ctx);
1748     return NULL;
1749 }
1750 
1751 PMIX_EXPORT void pmix_common_dstor_finalize(pmix_common_dstore_ctx_t *ds_ctx)
1752 {
1753     struct stat st = {0};
1754     pmix_status_t rc = PMIX_SUCCESS;
1755 
1756     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1757                          "%s:%d:%s", __FILE__, __LINE__, __func__));
1758 
1759     _esh_sessions_cleanup(ds_ctx);
1760     _esh_ns_map_cleanup(ds_ctx);
1761     _esh_ns_track_cleanup(ds_ctx);
1762 
1763     pmix_pshmem.finalize();
1764 
1765     if (NULL != ds_ctx->base_path){
1766         if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
1767             if (lstat(ds_ctx->base_path, &st) >= 0){
1768                 if (PMIX_SUCCESS != (rc = _esh_dir_del(ds_ctx->base_path))) {
1769                     PMIX_ERROR_LOG(rc);
1770                 }
1771             }
1772         }
1773         free(ds_ctx->base_path);
1774         ds_ctx->base_path = NULL;
1775     }
1776     if (NULL != ds_ctx->clients_peer) {
1777         PMIX_RELEASE(ds_ctx->clients_peer->nptr);
1778         PMIX_RELEASE(ds_ctx->clients_peer);
1779     }
1780     /* close the pshmem framework */
1781     if( PMIX_SUCCESS != (rc = pmix_mca_base_framework_close(&pmix_pshmem_base_framework)) ) {
1782         PMIX_ERROR_LOG(rc);
1783     }
1784     free(ds_ctx->ds_name);
1785     free(ds_ctx->base_path);
1786     free(ds_ctx);
1787 }
1788 
1789 static pmix_status_t _dstore_store_nolock(pmix_common_dstore_ctx_t *ds_ctx,
1790                                    ns_map_data_t *ns_map,
1791                                    pmix_rank_t rank,
1792                                    pmix_kval_t *kv)
1793 {
1794     pmix_status_t rc = PMIX_SUCCESS;
1795     ns_track_elem_t *elem;
1796     pmix_buffer_t xfer;
1797     ns_seg_info_t ns_info;
1798 
1799     if (NULL == kv) {
1800         return PMIX_ERROR;
1801     }
1802 
1803     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1804                          "%s:%d:%s: for %s:%u",
1805                          __FILE__, __LINE__, __func__, ns_map->name, rank));
1806 
1807     /* First of all, we go through local track list (list of ns_track_elem_t structures)
1808      * and look for an element for the target namespace.
1809      * If it is there, then shared memory segments for it are created, so we take it.
1810      * Otherwise, create a new element, fill its fields, create corresponding meta
1811      * and data segments for this namespace, add it to the local track list,
1812      * and put this info (ns_seg_info_t) to the initial segment. If initial segment
1813      * if full, then extend it by creating a new one and mark previous one as full.
1814      * All this stuff is done inside _get_track_elem_for_namespace function.
1815      */
1816 
1817     elem = _get_track_elem_for_namespace(ds_ctx, ns_map);
1818     if (NULL == elem) {
1819         rc = PMIX_ERR_OUT_OF_RESOURCE;
1820         PMIX_ERROR_LOG(rc);
1821         goto exit;
1822     }
1823 
1824     /* If a new element was just created, we need to create corresponding meta and
1825      * data segments and update corresponding element's fields. */
1826     if (NULL == elem->meta_seg || NULL == elem->data_seg) {
1827         memset(&ns_info.ns_map, 0, sizeof(ns_info.ns_map));
1828         pmix_strncpy(ns_info.ns_map.name, ns_map->name, sizeof(ns_info.ns_map.name)-1);
1829         ns_info.ns_map.tbl_idx = ns_map->tbl_idx;
1830         ns_info.num_meta_seg = 1;
1831         ns_info.num_data_seg = 1;
1832         rc = _update_ns_elem(ds_ctx, elem, &ns_info);
1833         if (PMIX_SUCCESS != rc || NULL == elem->meta_seg || NULL == elem->data_seg) {
1834             PMIX_ERROR_LOG(rc);
1835             goto exit;
1836         }
1837 
1838         /* zero created shared memory segments for this namespace */
1839         memset(elem->meta_seg->seg_info.seg_base_addr, 0, ds_ctx->meta_segment_size);
1840         memset(elem->data_seg->seg_info.seg_base_addr, 0, ds_ctx->data_segment_size);
1841 
1842         /* put ns's shared segments info to the global meta segment. */
1843         rc = _put_ns_info_to_initial_segment(ds_ctx, ns_map, &elem->meta_seg->seg_info, &elem->data_seg->seg_info);
1844         if (PMIX_SUCCESS != rc) {
1845             PMIX_ERROR_LOG(rc);
1846             goto exit;
1847         }
1848     }
1849 
1850     /* Now we know info about meta segment for this namespace. If meta segment
1851      * is not empty, then we look for data for the target rank. If they present, replace it. */
1852     PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
1853     PMIX_LOAD_BUFFER(pmix_globals.mypeer, &xfer, kv->value->data.bo.bytes, kv->value->data.bo.size);
1854 
1855     rc = _store_data_for_rank(ds_ctx, elem, rank, &xfer);
1856 
1857     PMIX_DESTRUCT(&xfer);
1858 
1859     if (PMIX_SUCCESS != rc) {
1860         PMIX_ERROR_LOG(rc);
1861         goto exit;
1862     }
1863 
1864 exit:
1865     return rc;
1866 }
1867 
1868 PMIX_EXPORT pmix_status_t pmix_common_dstor_store(pmix_common_dstore_ctx_t *ds_ctx,
1869                                 const pmix_proc_t *proc,
1870                                 pmix_scope_t scope,
1871                                 pmix_kval_t *kv)
1872 {
1873     pmix_status_t rc = PMIX_SUCCESS;
1874     ns_map_data_t *ns_map;
1875     pmix_kval_t *kv2;
1876     pmix_buffer_t tmp;
1877 
1878     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1879                         "[%s:%d] gds: dstore store for key '%s' scope %d",
1880                         proc->nspace, proc->rank, kv->key, scope);
1881 
1882     if (PMIX_PROC_IS_CLIENT(pmix_globals.mypeer)) {
1883         rc = PMIX_ERR_NOT_SUPPORTED;
1884         PMIX_ERROR_LOG(rc);
1885         return rc;
1886     }
1887 
1888     kv2 = PMIX_NEW(pmix_kval_t);
1889     PMIX_VALUE_CREATE(kv2->value, 1);
1890     kv2->value->type = PMIX_BYTE_OBJECT;
1891 
1892     PMIX_CONSTRUCT(&tmp, pmix_buffer_t);
1893 
1894     PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &tmp, kv, 1, PMIX_KVAL);
1895     PMIX_UNLOAD_BUFFER(&tmp, kv2->value->data.bo.bytes, kv2->value->data.bo.size);
1896 
1897     if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, proc->nspace))) {
1898         rc = PMIX_ERROR;
1899         PMIX_ERROR_LOG(rc);
1900         goto exit;
1901     }
1902 
1903     /* set exclusive lock */
1904     rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_lock);
1905     if (PMIX_SUCCESS != rc) {
1906         PMIX_ERROR_LOG(rc);
1907         goto exit;
1908     }
1909 
1910     rc = _dstore_store_nolock(ds_ctx, ns_map, proc->rank, kv2);
1911     if (PMIX_SUCCESS != rc) {
1912         PMIX_ERROR_LOG(rc);
1913         goto exit;
1914     }
1915 
1916     /* unset lock */
1917     rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_unlock);
1918     if (PMIX_SUCCESS != rc) {
1919         PMIX_ERROR_LOG(rc);
1920         goto exit;
1921     }
1922 
1923 exit:
1924     PMIX_RELEASE(kv2);
1925     PMIX_DESTRUCT(&tmp);
1926 
1927     return rc;
1928 }
1929 
1930 static pmix_status_t _dstore_fetch(pmix_common_dstore_ctx_t *ds_ctx,
1931                                    const char *nspace, pmix_rank_t rank,
1932                                    const char *key, pmix_value_t **kvs)
1933 {
1934     ns_seg_info_t *ns_info = NULL;
1935     pmix_status_t rc = PMIX_ERROR, lock_rc;
1936     ns_track_elem_t *elem;
1937     rank_meta_info *rinfo = NULL;
1938     size_t kval_cnt = 0;
1939     pmix_dstore_seg_desc_t *meta_seg, *data_seg;
1940     uint8_t *addr;
1941     pmix_buffer_t buffer;
1942     pmix_value_t val, *kval = NULL;
1943     uint32_t nprocs;
1944     pmix_rank_t cur_rank;
1945     ns_map_data_t *ns_map = NULL;
1946     bool all_ranks_found = true;
1947     bool key_found = false;
1948     pmix_info_t *info = NULL;
1949     size_t ninfo;
1950     size_t keyhash = 0;
1951     bool lock_is_set = false;
1952 
1953     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1954                          "%s:%d:%s: for %s:%u look for key %s",
1955                          __FILE__, __LINE__, __func__, nspace, rank, key));
1956 
1957     if ((PMIX_RANK_UNDEF == rank) && (NULL == key)) {
1958         PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output,
1959                              "dstore: Does not support passed parameters"));
1960         rc = PMIX_ERR_BAD_PARAM;
1961         goto error;
1962     }
1963 
1964     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1965                          "%s:%d:%s: for %s:%u look for key %s",
1966                          __FILE__, __LINE__, __func__, nspace, rank, key));
1967 
1968     /* protect info of dstore segments before it will be updated */
1969     if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
1970         if (0 != (rc = pthread_mutex_lock(&ds_ctx->lock))) {
1971             goto error;
1972         }
1973         lock_is_set = true;
1974     }
1975 
1976     if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, nspace))) {
1977         /* This call is issued from the the client.
1978          * client must have the session, otherwise the error is fatal.
1979          */
1980         rc = PMIX_ERR_FATAL;
1981         goto error;
1982     }
1983 
1984     if (NULL == kvs) {
1985         rc = PMIX_ERR_FATAL;
1986         goto error;
1987     }
1988 
1989     if (PMIX_RANK_UNDEF == rank) {
1990         ssize_t _nprocs = _get_univ_size(ds_ctx, ns_map->name);
1991         if( 0 > _nprocs ){
1992             goto error;
1993         }
1994         nprocs = (size_t) _nprocs;
1995         cur_rank = 0;
1996     } else {
1997         nprocs = 1;
1998         cur_rank = rank;
1999     }
2000 
2001     /* grab shared lock */
2002     lock_rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, rd_lock);
2003     if (PMIX_SUCCESS != lock_rc) {
2004         /* Something wrong with the lock. The error is fatal */
2005         rc = lock_rc;
2006         goto error;
2007     }
2008 
2009     /* First of all, we go through all initial segments and look at their field.
2010      * If it's 1, then generate name of next initial segment incrementing id by one and attach to it.
2011      * We need this step to synchronize initial shared segments with our local track list.
2012      * Then we look for the target namespace in all initial segments.
2013      * If it is found, we get numbers of meta & data segments and
2014      * compare these numbers with the number of trackable meta & data
2015      * segments for this namespace in the local track list.
2016      * If the first number exceeds the last, or the local track list
2017      * doesn't track current namespace yet, then we update it (attach
2018      * to additional segments).
2019      */
2020 
2021     /* first update local information about initial segments. they can be extended, so then we need to attach to new segments. */
2022     _update_initial_segment_info(ds_ctx, ns_map);
2023 
2024     ns_info = _get_ns_info_from_initial_segment(ds_ctx, ns_map);
2025     if (NULL == ns_info) {
2026         /* no data for this namespace is found in the shared memory. */
2027         PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output,
2028                     "%s:%d:%s:  no data for ns %s is found in the shared memory.",
2029                     __FILE__, __LINE__, __func__, ns_map->name));
2030         rc = PMIX_ERR_PROC_ENTRY_NOT_FOUND;
2031         goto done;
2032     }
2033 
2034     /* get ns_track_elem_t object for the target namespace from the local track list. */
2035     elem = _get_track_elem_for_namespace(ds_ctx, ns_map);
2036     if (NULL == elem) {
2037         /* Shouldn't happen! */
2038         rc = PMIX_ERR_FATAL;
2039         PMIX_ERROR_LOG(rc);
2040         goto done;
2041     }
2042 
2043     /* need to update tracker:
2044      * attach to shared memory regions for this namespace and store its info locally
2045      * to operate with address and detach/unlink afterwards. */
2046     rc = _update_ns_elem(ds_ctx, elem, ns_info);
2047     if (PMIX_SUCCESS != rc) {
2048         PMIX_ERROR_LOG(rc);
2049         goto done;
2050     }
2051 
2052     /* Now we have the data from meta segment for this namespace. */
2053     meta_seg = elem->meta_seg;
2054     data_seg = elem->data_seg;
2055 
2056     if( NULL != key ) {
2057         keyhash = PMIX_DS_KEY_HASH(ds_ctx, key);
2058     }
2059 
2060     /* all segment data updated, ctx lock may released */
2061     if (lock_is_set) {
2062         lock_is_set = false;
2063         if (0 != (rc = pthread_mutex_unlock(&ds_ctx->lock))) {
2064             goto error;
2065         }
2066     }
2067 
2068     while (nprocs--) {
2069         /* Get the rank meta info in the shared meta segment. */
2070         rinfo = _get_rank_meta_info(ds_ctx, cur_rank, meta_seg);
2071         if (NULL == rinfo) {
2072             PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output,
2073                         "%s:%d:%s:  no data for this rank is found in the shared memory. rank %u",
2074                         __FILE__, __LINE__, __func__, cur_rank));
2075             all_ranks_found = false;
2076             continue;
2077         }
2078         addr = _get_data_region_by_offset(ds_ctx, data_seg, rinfo->offset);
2079         if (NULL == addr) {
2080             /* This means that meta-info is broken - error is fatal */
2081             rc = PMIX_ERR_FATAL;
2082             PMIX_ERROR_LOG(rc);
2083             goto done;
2084         }
2085         kval_cnt = rinfo->count;
2086 
2087         /*  Initialize array for all keys of rank */
2088         if ((NULL == key) && (kval_cnt > 0)) {
2089             kval = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2090             if (NULL == kval) {
2091                 rc = PMIX_ERR_NOMEM;
2092                 goto done;
2093             }
2094             PMIX_VALUE_CONSTRUCT(kval);
2095 
2096             ninfo = kval_cnt;
2097             PMIX_INFO_CREATE(info, ninfo);
2098             if (NULL == info) {
2099                 rc = PMIX_ERR_NOMEM;
2100                 goto done;
2101             }
2102 
2103             kval->type = PMIX_DATA_ARRAY;
2104             kval->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
2105             if (NULL == kval->data.darray) {
2106                 rc = PMIX_ERR_NOMEM;
2107                 goto done;
2108             }
2109             kval->data.darray->type = PMIX_INFO;
2110             kval->data.darray->size = ninfo;
2111             kval->data.darray->array = info;
2112             *kvs = kval;
2113         }
2114 
2115         rc = PMIX_SUCCESS;
2116         while (0 < kval_cnt) {
2117             /* data is stored in the following format:
2118              * key_val_pair {
2119              *     size_t size;
2120              *     char key[KNAME_LEN(addr)];
2121              *     byte_t byte[size]; // should be loaded to pmix_buffer_t and unpacked.
2122              * };
2123              * segment_format {
2124              *     key_val_pair kv_array[n];
2125              *     EXTENSION slot;
2126              * }
2127              * EXTENSION slot which has key = EXTENSION_SLOT and a size_t value for offset
2128              * to next data address for this process.
2129              */
2130             if (PMIX_DS_KEY_IS_INVALID(ds_ctx, addr)) {
2131                 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2132                             "%s:%d:%s: for rank %s:%u, skip %s region",
2133                             __FILE__, __LINE__, __func__, nspace, cur_rank, ESH_REGION_INVALIDATED));
2134                 /* skip it
2135                  * go to next item, updating address */
2136                 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
2137             } else if (PMIX_DS_KEY_IS_EXTSLOT(ds_ctx, addr)) {
2138                 size_t offset;
2139                 memcpy(&offset, PMIX_DS_DATA_PTR(ds_ctx, addr), sizeof(size_t));
2140                 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2141                             "%s:%d:%s: for rank %s:%u, reached %s with %lu value",
2142                             __FILE__, __LINE__, __func__, nspace, cur_rank, ESH_REGION_EXTENSION, offset));
2143                 if (0 < offset) {
2144                     /* go to next item, updating address */
2145                     addr = _get_data_region_by_offset(ds_ctx, data_seg, offset);
2146                     if (NULL == addr) {
2147                         /* This shouldn't happen - error is fatal */
2148                         rc = PMIX_ERR_FATAL;
2149                         PMIX_ERROR_LOG(rc);
2150                         goto done;
2151                     }
2152                 } else {
2153                     /* no more data for this rank */
2154                     PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output,
2155                                 "%s:%d:%s:  no more data for this rank is found in the shared memory. rank %u key %s not found",
2156                                 __FILE__, __LINE__, __func__, cur_rank, key));
2157                     break;
2158                 }
2159             } else if (NULL == key) {
2160                 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2161                             "%s:%d:%s: for rank %s:%u, found target key %s",
2162                             __FILE__, __LINE__, __func__, nspace, cur_rank, PMIX_DS_KNAME_PTR(ds_ctx, addr)));
2163 
2164                 uint8_t *data_ptr = PMIX_DS_DATA_PTR(ds_ctx, addr);
2165                 size_t data_size = PMIX_DS_DATA_SIZE(ds_ctx, addr, data_ptr);
2166                 PMIX_CONSTRUCT(&buffer, pmix_buffer_t);
2167                 PMIX_LOAD_BUFFER(_client_peer(ds_ctx), &buffer, data_ptr, data_size);
2168                 int cnt = 1;
2169                 /* unpack value for this key from the buffer. */
2170                 PMIX_VALUE_CONSTRUCT(&val);
2171                 PMIX_BFROPS_UNPACK(rc, _client_peer(ds_ctx), &buffer, &val, &cnt, PMIX_VALUE);
2172                 if (PMIX_SUCCESS != rc) {
2173                     PMIX_ERROR_LOG(rc);
2174                     goto done;
2175                 }
2176                 pmix_strncpy(info[kval_cnt - 1].key, PMIX_DS_KNAME_PTR(ds_ctx, addr),
2177                         PMIX_DS_KNAME_LEN(ds_ctx, addr));
2178                 pmix_value_xfer(&info[kval_cnt - 1].value, &val);
2179                 PMIX_VALUE_DESTRUCT(&val);
2180                 buffer.base_ptr = NULL;
2181                 buffer.bytes_used = 0;
2182                 PMIX_DESTRUCT(&buffer);
2183                 key_found = true;
2184 
2185                 kval_cnt--;
2186                 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
2187             } else if (PMIX_DS_KEY_MATCH(ds_ctx, addr, key, keyhash)) {
2188                 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2189                             "%s:%d:%s: for rank %s:%u, found target key %s",
2190                             __FILE__, __LINE__, __func__, nspace, cur_rank, key));
2191                 /* target key is found, get value */
2192                 uint8_t *data_ptr = PMIX_DS_DATA_PTR(ds_ctx, addr);
2193                 size_t data_size = PMIX_DS_DATA_SIZE(ds_ctx, addr, data_ptr);
2194                 PMIX_CONSTRUCT(&buffer, pmix_buffer_t);
2195                 PMIX_LOAD_BUFFER(_client_peer(ds_ctx), &buffer, data_ptr, data_size);
2196                 int cnt = 1;
2197                 /* unpack value for this key from the buffer. */
2198                 *kvs = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2199                 PMIX_BFROPS_UNPACK(rc, _client_peer(ds_ctx), &buffer, (void*)*kvs, &cnt, PMIX_VALUE);
2200                 if (PMIX_SUCCESS != rc) {
2201                     PMIX_ERROR_LOG(rc);
2202                     goto done;
2203                 }
2204                 buffer.base_ptr = NULL;
2205                 buffer.bytes_used = 0;
2206                 PMIX_DESTRUCT(&buffer);
2207                 key_found = true;
2208                 goto done;
2209             } else {
2210                 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2211                             "%s:%d:%s: for rank %s:%u, skip key %s look for key %s",
2212                             __FILE__, __LINE__, __func__, nspace, cur_rank,
2213                             PMIX_DS_KNAME_PTR(ds_ctx, addr), key));
2214                 /* go to next item, updating address */
2215                 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
2216                 kval_cnt--;
2217             }
2218         }
2219 
2220         if (PMIX_RANK_UNDEF == rank) {
2221             cur_rank++;
2222         }
2223     }
2224 
2225 done:
2226     /* unset lock */
2227     lock_rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, rd_unlock);
2228     if (PMIX_SUCCESS != lock_rc) {
2229         PMIX_ERROR_LOG(lock_rc);
2230     }
2231 
2232     /* unset ds_ctx lock */
2233     if (lock_is_set) {
2234         pthread_mutex_unlock(&ds_ctx->lock);
2235     }
2236 
2237     if( rc != PMIX_SUCCESS ){
2238         if ((NULL == key) && (kval_cnt > 0)) {
2239             if( NULL != info ) {
2240                 PMIX_INFO_FREE(info, ninfo);
2241             }
2242             if (NULL != kval) {
2243                 PMIX_VALUE_RELEASE(kval);
2244             }
2245         }
2246         return rc;
2247     }
2248 
2249     if( key_found ){
2250         /* the key is found - nothing to do */
2251         return PMIX_SUCCESS;
2252     }
2253 
2254     if( !all_ranks_found ){
2255         /* Not all ranks was found - need to request
2256          * all of them and search again
2257          */
2258         rc = PMIX_ERR_PROC_ENTRY_NOT_FOUND;
2259         return rc;
2260     }
2261     rc = PMIX_ERR_NOT_FOUND;
2262     return rc;
2263 
2264 error:
2265     if (lock_is_set) {
2266         pthread_mutex_unlock(&ds_ctx->lock);
2267     }
2268     PMIX_ERROR_LOG(rc);
2269     return rc;
2270 }
2271 
2272 PMIX_EXPORT pmix_status_t pmix_common_dstor_fetch(pmix_common_dstore_ctx_t *ds_ctx,
2273                                                     const pmix_proc_t *proc,
2274                                                     pmix_scope_t scope, bool copy,
2275                                                     const char *key,
2276                                                     pmix_info_t info[], size_t ninfo,
2277                                                     pmix_list_t *kvs)
2278 {
2279     pmix_kval_t *kv;
2280     pmix_value_t *val;
2281     pmix_status_t rc = PMIX_SUCCESS;
2282 
2283     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2284                         "gds: dstore fetch `%s`", key == NULL ? "NULL" : key);
2285 
2286     rc = _dstore_fetch(ds_ctx, proc->nspace, proc->rank, key, &val);
2287     if (PMIX_SUCCESS == rc) {
2288         if( NULL == key ) {
2289             pmix_info_t *info;
2290             size_t n, ninfo;
2291 
2292             if (NULL == val->data.darray ||
2293                 PMIX_INFO != val->data.darray->type ||
2294                 0 == val->data.darray->size) {
2295                 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
2296                 return PMIX_ERR_NOT_FOUND;
2297             }
2298             info = (pmix_info_t*)val->data.darray->array;
2299             ninfo = val->data.darray->size;
2300 
2301             for (n = 0; n < ninfo; n++){
2302                 kv = PMIX_NEW(pmix_kval_t);
2303                 if (NULL == kv) {
2304                     rc = PMIX_ERR_NOMEM;
2305                     PMIX_VALUE_RELEASE(val);
2306                     return rc;
2307                 }
2308                 kv->key = strdup(info[n].key);
2309                 PMIX_VALUE_XFER(rc, kv->value, &info[n].value);
2310                 if (PMIX_SUCCESS != rc) {
2311                     PMIX_ERROR_LOG(rc);
2312                     PMIX_RELEASE(kv);
2313                     PMIX_VALUE_RELEASE(val);
2314                     return rc;
2315                 }
2316                 pmix_list_append(kvs, &kv->super);
2317             }
2318 
2319             return PMIX_SUCCESS;
2320         }
2321         /* just return the value */
2322         kv = PMIX_NEW(pmix_kval_t);
2323         if (NULL == kv) {
2324             PMIX_VALUE_RELEASE(val);
2325             return PMIX_ERR_NOMEM;
2326         }
2327         kv->key = strdup(key);
2328         kv->value = val;
2329         pmix_list_append(kvs, &kv->super);
2330     }
2331     return rc;
2332 }
2333 
2334 PMIX_EXPORT pmix_status_t pmix_common_dstor_setup_fork(pmix_common_dstore_ctx_t *ds_ctx, const char *base_path_env,
2335                                            const pmix_proc_t *peer, char ***env)
2336 {
2337     pmix_status_t rc = PMIX_SUCCESS;
2338     ns_map_data_t *ns_map = NULL;
2339 
2340     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2341                         "gds: dstore setup fork");
2342 
2343     if (NULL == ds_ctx->session_map_search) {
2344         rc = PMIX_ERR_NOT_AVAILABLE;
2345         PMIX_ERROR_LOG(rc);
2346         return rc;
2347     }
2348 
2349     if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, peer->nspace))) {
2350         rc = PMIX_ERR_NOT_AVAILABLE;
2351         PMIX_ERROR_LOG(rc);
2352         return rc;
2353     }
2354 
2355     if ((NULL == ds_ctx->base_path) || (strlen(ds_ctx->base_path) == 0)){
2356         rc = PMIX_ERR_NOT_AVAILABLE;
2357         PMIX_ERROR_LOG(rc);
2358         return rc;
2359     }
2360 
2361     if(PMIX_SUCCESS != (rc = pmix_setenv(base_path_env,
2362                                         _ESH_SESSION_path(ds_ctx->session_array, ns_map->tbl_idx),
2363                                          true, env))){
2364         PMIX_ERROR_LOG(rc);
2365     }
2366 
2367     return rc;
2368 }
2369 
2370 PMIX_EXPORT pmix_status_t pmix_common_dstor_add_nspace(pmix_common_dstore_ctx_t *ds_ctx,
2371                                 const char *nspace, pmix_info_t info[], size_t ninfo)
2372 {
2373     pmix_status_t rc = PMIX_SUCCESS;
2374     size_t tbl_idx=0;
2375     uid_t jobuid = ds_ctx->jobuid;
2376     char setjobuid = ds_ctx->setjobuid;
2377     size_t n;
2378     ns_map_data_t *ns_map = NULL;
2379     uint32_t local_size = 0;
2380 
2381     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2382                         "gds: dstore add nspace");
2383 
2384     if (NULL != info) {
2385         for (n=0; n < ninfo; n++) {
2386             if (0 == strcmp(PMIX_USERID, info[n].key)) {
2387                 jobuid = info[n].value.data.uint32;
2388                 setjobuid = 1;
2389                 continue;
2390             }
2391             if (0 == strcmp(PMIX_LOCAL_SIZE, info[n].key)) {
2392                 local_size = info[n].value.data.uint32;
2393                 continue;
2394             }
2395         }
2396     }
2397 
2398     if (PMIX_SUCCESS != _esh_jobuid_tbl_search(ds_ctx, jobuid, &tbl_idx)) {
2399 
2400         rc = _esh_session_tbl_add(ds_ctx, &tbl_idx);
2401         if (PMIX_SUCCESS != rc) {
2402             PMIX_ERROR_LOG(rc);
2403             return rc;
2404         }
2405         ns_map = _esh_session_map(ds_ctx, nspace, local_size, tbl_idx);
2406         if (NULL == ns_map) {
2407             rc = PMIX_ERROR;
2408             PMIX_ERROR_LOG(rc);
2409             return rc;
2410         }
2411 
2412         if (PMIX_SUCCESS != (rc =_esh_session_init(ds_ctx, tbl_idx, ns_map,
2413                                                    local_size, jobuid, setjobuid))) {
2414             rc = PMIX_ERROR;
2415             PMIX_ERROR_LOG(rc);
2416             return rc;
2417         }
2418     }
2419     else {
2420         ns_map = _esh_session_map(ds_ctx, nspace, local_size, tbl_idx);
2421         if (NULL == ns_map) {
2422             rc = PMIX_ERROR;
2423             PMIX_ERROR_LOG(rc);
2424             return rc;
2425         }
2426     }
2427 
2428     /* lock init */
2429     ds_ctx->lock_cbs->init(&_ESH_SESSION_lock(ds_ctx->session_array, tbl_idx),
2430                            ds_ctx->base_path, nspace, local_size, ds_ctx->jobuid,
2431                            ds_ctx->setjobuid);
2432     if (NULL == _ESH_SESSION_lock(ds_ctx->session_array, tbl_idx)) {
2433         PMIX_ERROR_LOG(rc);
2434         return rc;
2435     }
2436 
2437     return PMIX_SUCCESS;
2438 }
2439 
2440 PMIX_EXPORT pmix_status_t pmix_common_dstor_del_nspace(pmix_common_dstore_ctx_t *ds_ctx, const char* nspace)
2441 {
2442     pmix_status_t rc = PMIX_SUCCESS;
2443     size_t map_idx, size;
2444     int in_use = 0;
2445     ns_map_data_t *ns_map_data = NULL;
2446     ns_map_t *ns_map;
2447     session_t *session_tbl = NULL;
2448     ns_track_elem_t *trk = NULL;
2449     int dstor_track_idx;
2450     size_t session_tbl_idx;
2451 
2452     PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2453         "%s:%d:%s delete nspace `%s`", __FILE__, __LINE__, __func__, nspace));
2454 
2455     if (NULL == (ns_map_data = ds_ctx->session_map_search(ds_ctx, nspace))) {
2456         rc = PMIX_ERR_NOT_AVAILABLE;
2457         return rc;
2458     }
2459     dstor_track_idx = ns_map_data->track_idx;
2460     session_tbl_idx = ns_map_data->tbl_idx;
2461     size = pmix_value_array_get_size(ds_ctx->ns_map_array);
2462     ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
2463 
2464     for (map_idx = 0; map_idx < size; map_idx++){
2465         if (ns_map[map_idx].in_use &&
2466                         (ns_map[map_idx].data.tbl_idx == ns_map_data->tbl_idx)) {
2467             if (0 == strcmp(ns_map[map_idx].data.name, nspace)) {
2468                 _esh_session_map_clean(ds_ctx, &ns_map[map_idx]);
2469                 continue;
2470             }
2471             in_use++;
2472         }
2473     }
2474 
2475     /* A lot of nspaces may be using same session info
2476      * session record can only be deleted once all references are gone */
2477     if (!in_use) {
2478         session_tbl = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->session_array, session_t);
2479         PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2480                              "%s:%d:%s delete session for jobuid: %d",
2481                              __FILE__, __LINE__, __func__, session_tbl[session_tbl_idx].jobuid));
2482         size = pmix_value_array_get_size(ds_ctx->ns_track_array);
2483         if (size && (dstor_track_idx >= 0)) {
2484             if((dstor_track_idx + 1) > (int)size) {
2485                 rc = PMIX_ERR_VALUE_OUT_OF_BOUNDS;
2486                 PMIX_ERROR_LOG(rc);
2487                 goto exit;
2488             }
2489             trk = pmix_value_array_get_item(ds_ctx->ns_track_array, dstor_track_idx);
2490             if (true == trk->in_use) {
2491                 PMIX_DESTRUCT(trk);
2492                 pmix_value_array_remove_item(ds_ctx->ns_track_array, dstor_track_idx);
2493             }
2494         }
2495         _esh_session_release(ds_ctx, session_tbl_idx);
2496      }
2497 exit:
2498     return rc;
2499 }
2500 
2501 static inline int _my_client(const char *nspace, pmix_rank_t rank)
2502 {
2503     pmix_peer_t *peer;
2504     int i;
2505     int local = 0;
2506 
2507     for (i = 0; i < pmix_server_globals.clients.size; i++) {
2508         if (NULL != (peer = (pmix_peer_t *)pmix_pointer_array_get_item(&pmix_server_globals.clients, i))) {
2509             if (0 == strcmp(peer->info->pname.nspace, nspace) && peer->info->pname.rank == rank) {
2510                 local = 1;
2511                 break;
2512             }
2513         }
2514     }
2515 
2516     return local;
2517 }
2518 
2519 /* this function is only called by the PMIx server when its
2520  * host has received data from some other peer. It therefore
2521  * always contains data solely from remote procs, and we
2522  * shall store it accordingly */
2523 PMIX_EXPORT pmix_status_t pmix_common_dstor_store_modex(pmix_common_dstore_ctx_t *ds_ctx,
2524                                                         struct pmix_namespace_t *nspace,
2525                                                         pmix_buffer_t *buf,
2526                                                         void *cbdata)
2527 {
2528     pmix_status_t rc = PMIX_SUCCESS;
2529     pmix_status_t rc1 = PMIX_SUCCESS;
2530     pmix_namespace_t *ns = (pmix_namespace_t*)nspace;
2531     ns_map_data_t *ns_map;
2532 
2533     if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, ns->nspace))) {
2534         rc = PMIX_ERROR;
2535         PMIX_ERROR_LOG(rc);
2536         return rc;
2537     }
2538 
2539     /* set exclusive lock */
2540     rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_lock);
2541     if (PMIX_SUCCESS != rc) {
2542         PMIX_ERROR_LOG(rc);
2543         return rc;
2544     }
2545 
2546     rc = pmix_gds_base_store_modex(nspace, buf, ds_ctx,
2547                     (pmix_gds_base_store_modex_cb_fn_t)_dstor_store_modex_cb,
2548                     cbdata);
2549     if (PMIX_SUCCESS != rc) {
2550         PMIX_ERROR_LOG(rc);
2551     }
2552 
2553     /* unset lock */
2554     rc1 = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_unlock);
2555     if (PMIX_SUCCESS != rc1) {
2556         PMIX_ERROR_LOG(rc1);
2557         if (PMIX_SUCCESS == rc) {
2558             rc = rc1;
2559         }
2560     }
2561 
2562     return rc;
2563 }
2564 
2565 static pmix_status_t _dstor_store_modex_cb(pmix_common_dstore_ctx_t *ds_ctx,
2566                                            pmix_proc_t *proc,
2567                                            pmix_gds_modex_key_fmt_t key_fmt,
2568                                            char **kmap,
2569                                            pmix_buffer_t *pbkt)
2570 {
2571     pmix_status_t rc = PMIX_SUCCESS;
2572     pmix_kval_t *kv;
2573     ns_map_data_t *ns_map;
2574     pmix_buffer_t tmp;
2575 
2576     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2577                         "[%s:%d] gds:dstore:store_modex for nspace %s",
2578                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
2579                         proc->nspace);
2580 
2581     /* NOTE: THE BYTE OBJECT DELIVERED HERE WAS CONSTRUCTED
2582      * BY A SERVER, AND IS THEREFORE PACKED USING THE SERVER'S
2583      * PEER OBJECT (WHICH IS REQUIRED TO BE THE SAME AS OUR OWN) */
2584 
2585     /* this is data returned via the PMIx_Fence call when
2586      * data collection was requested, so it only contains
2587      * REMOTE/GLOBAL data. The byte object contains
2588      * the rank followed by pmix_kval_t's. The list of callbacks
2589      * contains all local participants. */
2590 
2591     /* don't store blobs to the sm dstore from local clients */
2592     if (_my_client(proc->nspace, proc->rank)) {
2593         return PMIX_SUCCESS;
2594     }
2595 
2596     /* Prepare a buffer to be provided to the dstor store primitive */
2597     PMIX_CONSTRUCT(&tmp, pmix_buffer_t);
2598 
2599     /* unpack the remaining values until we hit the end of the buffer */
2600     kv = PMIX_NEW(pmix_kval_t);
2601     rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
2602 
2603     while (PMIX_SUCCESS == rc) {
2604         /* store this in the hash table */
2605         PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, proc, PMIX_REMOTE, kv);
2606         if (PMIX_SUCCESS != rc) {
2607             PMIX_ERROR_LOG(rc);
2608             return rc;
2609         }
2610 
2611         /* place the key to the to be provided to _dstore_store_nolock */
2612         PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &tmp, kv, 1, PMIX_KVAL);
2613 
2614         /* Release the kv to maintain accounting
2615          * as the hash increments the ref count */
2616         PMIX_RELEASE(kv);
2617 
2618         /* proceed to the next element */
2619         kv = PMIX_NEW(pmix_kval_t);
2620         rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
2621         if (PMIX_SUCCESS != rc) {
2622             break;
2623         }
2624     }
2625 
2626     /* Release the kv that didn't received the value
2627      * because input buffer was exhausted */
2628     PMIX_RELEASE(kv);
2629     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
2630         PMIX_ERROR_LOG(rc);
2631     } else {
2632         rc = PMIX_SUCCESS;
2633     }
2634 
2635     /* Create a key-value pair with the buffer
2636      * to be passed to _dstore_store_nolock */
2637     kv = PMIX_NEW(pmix_kval_t);
2638     PMIX_VALUE_CREATE(kv->value, 1);
2639     kv->value->type = PMIX_BYTE_OBJECT;
2640     PMIX_UNLOAD_BUFFER(&tmp, kv->value->data.bo.bytes, kv->value->data.bo.size);
2641 
2642     /* Get the namespace map element for the process "proc" */
2643     if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, proc->nspace))) {
2644         rc = PMIX_ERROR;
2645         PMIX_ERROR_LOG(rc);
2646         return rc;
2647     }
2648 
2649     /* Store all keys at once */
2650     rc = _dstore_store_nolock(ds_ctx, ns_map, proc->rank, kv);
2651     if (PMIX_SUCCESS != rc) {
2652         PMIX_ERROR_LOG(rc);
2653     }
2654 
2655     /* Release all resources */
2656     PMIX_RELEASE(kv);
2657     PMIX_DESTRUCT(&tmp);
2658 
2659     return rc;
2660 }
2661 
2662 static pmix_status_t _store_job_info(pmix_common_dstore_ctx_t *ds_ctx, ns_map_data_t *ns_map,
2663                                      pmix_proc_t *proc)
2664 {
2665     pmix_cb_t cb;
2666     pmix_kval_t *kv;
2667     pmix_buffer_t buf;
2668     pmix_kval_t *kv2 = NULL, *kvp;
2669     pmix_status_t rc = PMIX_SUCCESS;
2670 
2671     PMIX_CONSTRUCT(&cb, pmix_cb_t);
2672     PMIX_CONSTRUCT(&buf, pmix_buffer_t);
2673     kvp = PMIX_NEW(pmix_kval_t);
2674     PMIX_VALUE_CREATE(kvp->value, 1);
2675     kvp->value->type = PMIX_BYTE_OBJECT;
2676 
2677     cb.proc = proc;
2678     cb.scope = PMIX_INTERNAL;
2679     cb.copy = false;
2680 
2681     PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
2682     if (PMIX_SUCCESS != rc) {
2683         if (rc == PMIX_ERR_PROC_ENTRY_NOT_FOUND) {
2684             /* there is no error if no data for job info */
2685             rc = PMIX_SUCCESS;
2686         }
2687         goto exit;
2688     }
2689 
2690     PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
2691       if ((PMIX_PROC_IS_V1(_client_peer(ds_ctx)) || PMIX_PROC_IS_V20(_client_peer(ds_ctx))) &&
2692            0 != strncmp("pmix.", kv->key, 4) &&
2693            kv->value->type == PMIX_DATA_ARRAY) {
2694             pmix_info_t *info;
2695             size_t size, i;
2696             info = kv->value->data.darray->array;
2697             size = kv->value->data.darray->size;
2698 
2699             for (i = 0; i < size; i++) {
2700                 if (0 == strcmp(PMIX_LOCAL_PEERS, info[i].key)) {
2701                     kv2 = PMIX_NEW(pmix_kval_t);
2702                     kv2->key = strdup(kv->key);
2703                     PMIX_VALUE_XFER(rc, kv2->value, &info[i].value);
2704                     if (PMIX_SUCCESS != rc) {
2705                         PMIX_ERROR_LOG(rc);
2706                         PMIX_RELEASE(kv2);
2707                         goto exit;
2708                     }
2709                     PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv2, 1, PMIX_KVAL);
2710                     if (PMIX_SUCCESS != rc) {
2711                         PMIX_ERROR_LOG(rc);
2712                         PMIX_RELEASE(kv2);
2713                         goto exit;
2714                     }
2715                     PMIX_RELEASE(kv2);
2716                 }
2717             }
2718         } else {
2719             PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv, 1, PMIX_KVAL);
2720             if (PMIX_SUCCESS != rc) {
2721                 PMIX_ERROR_LOG(rc);
2722                 goto exit;
2723             }
2724         }
2725     }
2726 
2727     PMIX_UNLOAD_BUFFER(&buf, kvp->value->data.bo.bytes, kvp->value->data.bo.size);
2728     if (PMIX_SUCCESS != (rc = _dstore_store_nolock(ds_ctx, ns_map, proc->rank, kvp))) {
2729         PMIX_ERROR_LOG(rc);
2730         goto exit;
2731     }
2732 
2733 exit:
2734     PMIX_RELEASE(kvp);
2735     PMIX_DESTRUCT(&cb);
2736     PMIX_DESTRUCT(&buf);
2737     return rc;
2738 }
2739 
2740 PMIX_EXPORT pmix_status_t pmix_common_dstor_register_job_info(pmix_common_dstore_ctx_t *ds_ctx,
2741                                 struct pmix_peer_t *pr,
2742                                 pmix_buffer_t *reply)
2743 {
2744     pmix_peer_t *peer = (pmix_peer_t*)pr;
2745     pmix_namespace_t *ns = peer->nptr;
2746     char *msg;
2747     pmix_status_t rc;
2748     pmix_proc_t proc;
2749     pmix_rank_t rank;
2750 
2751     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2752                         "[%s:%d] gds:dstore:register_job_info for peer [%s:%d]",
2753                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
2754                         peer->info->pname.nspace, peer->info->pname.rank);
2755 
2756     if (0 == ns->ndelivered) { // don't store twice
2757         ns_map_data_t *ns_map;
2758 
2759         _client_compat_save(ds_ctx, peer);
2760         pmix_strncpy(proc.nspace, ns->nspace, PMIX_MAX_NSLEN);
2761         proc.rank = PMIX_RANK_WILDCARD;
2762         if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, proc.nspace))) {
2763             rc = PMIX_ERROR;
2764             PMIX_ERROR_LOG(rc);
2765             return rc;
2766         }
2767 
2768         /* set exclusive lock */
2769         rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_lock);
2770         if (PMIX_SUCCESS != rc) {
2771             PMIX_ERROR_LOG(rc);
2772             return rc;
2773         }
2774 
2775         rc = _store_job_info(ds_ctx, ns_map, &proc);
2776         if (PMIX_SUCCESS != rc) {
2777             PMIX_ERROR_LOG(rc);
2778             return rc;
2779         }
2780 
2781         for (rank=0; rank < ns->nprocs; rank++) {
2782             proc.rank = rank;
2783             rc = _store_job_info(ds_ctx, ns_map, &proc);
2784             if (PMIX_SUCCESS != rc) {
2785                 PMIX_ERROR_LOG(rc);
2786                 return rc;
2787             }
2788         }
2789         /* unset lock */
2790         rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_unlock);
2791         if (PMIX_SUCCESS != rc) {
2792             PMIX_ERROR_LOG(rc);
2793             return rc;
2794         }
2795     }
2796 
2797     /* answer to client */
2798     msg = ns->nspace;
2799     PMIX_BFROPS_PACK(rc, peer, reply, &msg, 1, PMIX_STRING);
2800     if (PMIX_SUCCESS != rc) {
2801         PMIX_ERROR_LOG(rc);
2802         return rc;
2803     }
2804 
2805     return rc;
2806 }
2807 
2808 PMIX_EXPORT pmix_status_t pmix_common_dstor_store_job_info(pmix_common_dstore_ctx_t *ds_ctx,
2809                                 const char *nspace,
2810                                 pmix_buffer_t *job_data)
2811 {
2812     pmix_status_t rc = PMIX_SUCCESS;
2813 
2814     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2815                         "[%s:%u] pmix:gds:dstore store job info for nspace %s",
2816                         pmix_globals.myid.nspace, pmix_globals.myid.rank, nspace);
2817 
2818     /* check buf data */
2819     if ((NULL == job_data) || (0 == job_data->bytes_used)) {
2820         rc = PMIX_ERR_BAD_PARAM;
2821         PMIX_ERROR_LOG(rc);
2822         return rc;
2823     }
2824     return rc;
2825 }
2826 
2827 static void _client_compat_save(pmix_common_dstore_ctx_t *ds_ctx, pmix_peer_t *peer)
2828 {
2829     pmix_namespace_t *nptr = NULL;
2830 
2831     if (NULL == ds_ctx->clients_peer) {
2832         ds_ctx->clients_peer = PMIX_NEW(pmix_peer_t);
2833         nptr = PMIX_NEW(pmix_namespace_t);
2834         ds_ctx->clients_peer->nptr = nptr;
2835     }
2836     ds_ctx->clients_peer->nptr->compat = peer->nptr->compat;
2837     ds_ctx->clients_peer->proc_type = peer->proc_type;
2838 }
2839 
2840 static inline pmix_peer_t * _client_peer(pmix_common_dstore_ctx_t *ds_ctx)
2841 {
2842     if (NULL == ds_ctx->clients_peer) {
2843         return pmix_globals.mypeer;
2844     }
2845     return ds_ctx->clients_peer;
2846 }

/* [<][>][^][v][top][bottom][index][help] */