This source file includes the following definitions.
- ncon
- ndes
- _esh_session_map_clean
- _esh_dir_del
- _esh_tbls_init
- _esh_ns_map_cleanup
- _esh_sessions_cleanup
- _esh_ns_track_cleanup
- _esh_session_map
- _esh_jobuid_tbl_search
- _esh_session_tbl_add
- _esh_session_map_search_server
- _esh_session_map_search_client
- _esh_session_init
- _esh_session_release
- _set_constants_from_env
- _update_ns_elem
- _put_ns_info_to_initial_segment
- _update_initial_segment_info
- _get_ns_info_from_initial_segment
- _get_track_elem_for_namespace
- _get_rank_meta_info
- set_rank_meta_info
- _get_data_region_by_offset
- get_free_offset
- put_empty_ext_slot
- put_data_to_the_end
- pmix_sm_store
- _store_data_for_rank
- _get_univ_size
- pmix_common_dstor_cache_job_info
- pmix_common_dstor_init
- pmix_common_dstor_finalize
- _dstore_store_nolock
- pmix_common_dstor_store
- _dstore_fetch
- pmix_common_dstor_fetch
- pmix_common_dstor_setup_fork
- pmix_common_dstor_add_nspace
- pmix_common_dstor_del_nspace
- _my_client
- pmix_common_dstor_store_modex
- _dstor_store_modex_cb
- _store_job_info
- pmix_common_dstor_register_job_info
- pmix_common_dstor_store_job_info
- _client_compat_save
- _client_peer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include <src/include/pmix_config.h>
17
18 #include <stdio.h>
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <sys/file.h>
22 #include <dirent.h>
23 #include <errno.h>
24 #ifdef HAVE_UNISTD_H
25 #include <unistd.h>
26 #endif
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
29 #endif
30 #ifdef HAVE_SYS_STAT_H
31 #include <sys/stat.h>
32 #endif
33 #ifdef HAVE_FCNTL_H
34 #include <fcntl.h>
35 #endif
36 #include <time.h>
37
38 #include <pmix_common.h>
39
40 #include "src/include/pmix_globals.h"
41 #include "src/class/pmix_list.h"
42 #include "src/client/pmix_client_ops.h"
43 #include "src/server/pmix_server_ops.h"
44 #include "src/util/argv.h"
45 #include "src/mca/pcompress/pcompress.h"
46 #include "src/util/error.h"
47 #include "src/util/output.h"
48 #include "src/util/pmix_environ.h"
49 #include "src/util/hash.h"
50 #include "src/mca/preg/preg.h"
51
52 #include "src/mca/gds/base/base.h"
53 #include "src/mca/pshmem/base/base.h"
54 #include "dstore_common.h"
55 #include "dstore_base.h"
56 #include "dstore_segment.h"
57
58 #define ESH_REGION_EXTENSION "EXTENSION_SLOT"
59 #define ESH_REGION_INVALIDATED "INVALIDATED"
60 #define ESH_ENV_INITIAL_SEG_SIZE "INITIAL_SEG_SIZE"
61 #define ESH_ENV_NS_META_SEG_SIZE "NS_META_SEG_SIZE"
62 #define ESH_ENV_NS_DATA_SEG_SIZE "NS_DATA_SEG_SIZE"
63 #define ESH_ENV_LINEAR "SM_USE_LINEAR_SEARCH"
64
65 #define ESH_INIT_SESSION_TBL_SIZE 2
66 #define ESH_INIT_NS_MAP_TBL_SIZE 2
67
68 static int _store_data_for_rank(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info,
69 pmix_rank_t rank, pmix_buffer_t *buf);
70 static int _update_ns_elem(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_elem, ns_seg_info_t *info);
71 static int _put_ns_info_to_initial_segment(pmix_common_dstore_ctx_t *ds_ctx,
72 const ns_map_data_t *ns_map, pmix_pshmem_seg_t *metaseg,
73 pmix_pshmem_seg_t *dataseg);
74 static ns_seg_info_t *_get_ns_info_from_initial_segment(pmix_common_dstore_ctx_t *ds_ctx,
75 const ns_map_data_t *ns_map);
76 static ns_track_elem_t *_get_track_elem_for_namespace(pmix_common_dstore_ctx_t *ds_ctx,
77 ns_map_data_t *ns_map);
78 static rank_meta_info *_get_rank_meta_info(pmix_common_dstore_ctx_t *ds_ctx, pmix_rank_t rank,
79 pmix_dstore_seg_desc_t *segdesc);
80 static uint8_t *_get_data_region_by_offset(pmix_common_dstore_ctx_t *ds_ctx,
81 pmix_dstore_seg_desc_t *segdesc, size_t offset);
82 static void _update_initial_segment_info(pmix_common_dstore_ctx_t *ds_ctx,
83 const ns_map_data_t *ns_map);
84 static void _set_constants_from_env(pmix_common_dstore_ctx_t *ds_ctx);
85 static inline ssize_t _get_univ_size(pmix_common_dstore_ctx_t *ds_ctx, const char *nspace);
86
87 static inline ns_map_data_t * _esh_session_map_search_server(pmix_common_dstore_ctx_t *ds_ctx,
88 const char *nspace);
89 static inline ns_map_data_t * _esh_session_map_search_client(pmix_common_dstore_ctx_t *ds_ctx,
90 const char *nspace);
91 static inline ns_map_data_t * _esh_session_map(pmix_common_dstore_ctx_t *ds_ctx,
92 const char *nspace, uint32_t local_size,
93 size_t tbl_idx);
94 static inline void _esh_session_map_clean(pmix_common_dstore_ctx_t *ds_ctx, ns_map_t *m);
95 static inline int _esh_jobuid_tbl_search(pmix_common_dstore_ctx_t *ds_ctx,
96 uid_t jobuid, size_t *tbl_idx);
97 static inline int _esh_session_tbl_add(pmix_common_dstore_ctx_t *ds_ctx, size_t *tbl_idx);
98 static int _esh_session_init(pmix_common_dstore_ctx_t *ds_ctx, size_t idx, ns_map_data_t *m,
99 uint32_t local_size, size_t jobuid, int setjobuid);
100 static void _esh_session_release(pmix_common_dstore_ctx_t *ds_ctx, size_t idx);
101 static inline void _esh_ns_track_cleanup(pmix_common_dstore_ctx_t *ds_ctx);
102 static inline void _esh_sessions_cleanup(pmix_common_dstore_ctx_t *ds_ctx);
103 static inline void _esh_ns_map_cleanup(pmix_common_dstore_ctx_t *ds_ctx);
104 static inline int _esh_dir_del(const char *dirname);
105 static inline void _client_compat_save(pmix_common_dstore_ctx_t *ds_ctx, pmix_peer_t *peer);
106 static inline pmix_peer_t * _client_peer(pmix_common_dstore_ctx_t *ds_ctx);
107
108 static inline int _my_client(const char *nspace, pmix_rank_t rank);
109
110 static pmix_status_t _dstor_store_modex_cb(pmix_common_dstore_ctx_t *ds_ctx,
111 pmix_proc_t *proc,
112 pmix_gds_modex_key_fmt_t key_fmt,
113 char **kmap,
114 pmix_buffer_t *pbkt);
115
116 static pmix_status_t _dstore_store_nolock(pmix_common_dstore_ctx_t *ds_ctx,
117 ns_map_data_t *ns_map,
118 pmix_rank_t rank,
119 pmix_kval_t *kv);
120
121 static pmix_status_t _dstore_fetch(pmix_common_dstore_ctx_t *ds_ctx,
122 const char *nspace, pmix_rank_t rank,
123 const char *key, pmix_value_t **kvs);
124
125 ns_map_data_t * (*_esh_session_map_search)(const char *nspace) = NULL;
126
/*
 * Accessors for the fields of entry @tbl_idx of the session_t value array
 * @session_array.  Each expands to a parenthesized lvalue, so callers may
 * assign through it (e.g. `_ESH_SESSION_sm_seg_last(arr, i) = seg`) or take
 * its address (e.g. `&_ESH_SESSION_lock(arr, i)`).
 *
 * The locking flavour is selected at run time through ds_ctx->lock_cbs, so
 * there is a single two-argument definition of _ESH_SESSION_lock.
 */
#define _ESH_SESSION_FIELD(session_array, tbl_idx, field) \
    (PMIX_VALUE_ARRAY_GET_BASE((session_array), session_t)[(tbl_idx)].field)

#define _ESH_SESSION_lock(session_array, tbl_idx) \
    _ESH_SESSION_FIELD(session_array, tbl_idx, lock)

#define _ESH_SESSION_path(session_array, tbl_idx) \
    _ESH_SESSION_FIELD(session_array, tbl_idx, nspace_path)

#define _ESH_SESSION_lockfile(session_array, tbl_idx) \
    _ESH_SESSION_FIELD(session_array, tbl_idx, lockfile)

#define _ESH_SESSION_setjobuid(session_array, tbl_idx) \
    _ESH_SESSION_FIELD(session_array, tbl_idx, setjobuid)

#define _ESH_SESSION_jobuid(session_array, tbl_idx) \
    _ESH_SESSION_FIELD(session_array, tbl_idx, jobuid)

#define _ESH_SESSION_sm_seg_first(session_array, tbl_idx) \
    _ESH_SESSION_FIELD(session_array, tbl_idx, sm_seg_first)

#define _ESH_SESSION_sm_seg_last(session_array, tbl_idx) \
    _ESH_SESSION_FIELD(session_array, tbl_idx, sm_seg_last)

#define _ESH_SESSION_ns_info(session_array, tbl_idx) \
    _ESH_SESSION_FIELD(session_array, tbl_idx, ns_info)

/*
 * Invoke the lock callback @operation (a member of (ds_ctx)->lock_cbs) on the
 * lock of session @session_id and yield its result as a pmix_status_t.
 * Written as a plain expression (no statement-expression extension) and with
 * every argument parenthesized, so `_ESH_LOCK(&ctx, i, op)` works too.
 */
#define _ESH_LOCK(ds_ctx, session_id, operation) \
    ((pmix_status_t)((ds_ctx)->lock_cbs->operation( \
        _ESH_SESSION_lock((ds_ctx)->session_array, (session_id)))))
168
169 static void ncon(ns_track_elem_t *p) {
170 memset(&p->ns_map, 0, sizeof(p->ns_map));
171 p->meta_seg = NULL;
172 p->data_seg = NULL;
173 p->num_meta_seg = 0;
174 p->num_data_seg = 0;
175 p->in_use = true;
176 }
177
178 static void ndes(ns_track_elem_t *p) {
179 pmix_common_dstor_delete_sm_desc(p->meta_seg);
180 pmix_common_dstor_delete_sm_desc(p->data_seg);
181 memset(&p->ns_map, 0, sizeof(p->ns_map));
182 p->in_use = false;
183 }
184
185 PMIX_CLASS_INSTANCE(ns_track_elem_t,
186 pmix_value_array_t,
187 ncon, ndes);
188
189 static inline void _esh_session_map_clean(pmix_common_dstore_ctx_t *ds_ctx, ns_map_t *m) {
190 memset(m, 0, sizeof(*m));
191 m->data.track_idx = -1;
192 }
193
194 static inline int _esh_dir_del(const char *path)
195 {
196 DIR *dir;
197 struct dirent *d_ptr;
198 struct stat st;
199 pmix_status_t rc = PMIX_SUCCESS;
200
201 char name[PMIX_PATH_MAX];
202
203 dir = opendir(path);
204 if (NULL == dir) {
205 rc = PMIX_ERR_BAD_PARAM;
206 return rc;
207 }
208
209 while (NULL != (d_ptr = readdir(dir))) {
210 snprintf(name, PMIX_PATH_MAX, "%s/%s", path, d_ptr->d_name);
211 if ( 0 > lstat(name, &st) ){
212
213
214
215 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
216 continue;
217 }
218
219 if(S_ISDIR(st.st_mode)) {
220 if(strcmp(d_ptr->d_name, ".") && strcmp(d_ptr->d_name, "..")) {
221 rc = _esh_dir_del(name);
222 if( PMIX_SUCCESS != rc ){
223
224
225
226 PMIX_ERROR_LOG(rc);
227 }
228 }
229 }
230 else {
231 if( 0 > unlink(name) ){
232
233
234
235 PMIX_ERROR_LOG(PMIX_ERR_NO_PERMISSIONS);
236 }
237 }
238 }
239 closedir(dir);
240
241
242 if( 0 > rmdir(path) ){
243 rc = PMIX_ERR_NO_PERMISSIONS;
244 PMIX_ERROR_LOG(rc);
245 }
246 return rc;
247 }
248
249 static inline int _esh_tbls_init(pmix_common_dstore_ctx_t *ds_ctx)
250 {
251 pmix_status_t rc = PMIX_SUCCESS;
252 size_t idx;
253
254
255 ds_ctx->ns_track_array = NULL;
256 ds_ctx->session_array = NULL;
257 ds_ctx->ns_map_array = NULL;
258
259
260 if (NULL == (ds_ctx->ns_track_array = PMIX_NEW(pmix_value_array_t))) {
261 rc = PMIX_ERR_OUT_OF_RESOURCE;
262 PMIX_ERROR_LOG(rc);
263 goto err_exit;
264 }
265 if (PMIX_SUCCESS != (rc = pmix_value_array_init(ds_ctx->ns_track_array, sizeof(ns_track_elem_t)))){
266 PMIX_ERROR_LOG(rc);
267 goto err_exit;
268 }
269
270
271 if (NULL == (ds_ctx->session_array = PMIX_NEW(pmix_value_array_t))){
272 rc = PMIX_ERR_OUT_OF_RESOURCE;
273 PMIX_ERROR_LOG(rc);
274 goto err_exit;
275 }
276 if (PMIX_SUCCESS != (rc = pmix_value_array_init(ds_ctx->session_array, sizeof(session_t)))) {
277 PMIX_ERROR_LOG(rc);
278 goto err_exit;
279 }
280 if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(ds_ctx->session_array, ESH_INIT_SESSION_TBL_SIZE))) {
281 PMIX_ERROR_LOG(rc);
282 goto err_exit;
283 }
284 for (idx = 0; idx < ESH_INIT_SESSION_TBL_SIZE; idx++) {
285 memset(pmix_value_array_get_item(ds_ctx->session_array, idx), 0, sizeof(session_t));
286 }
287
288
289 if (NULL == (ds_ctx->ns_map_array = PMIX_NEW(pmix_value_array_t))) {
290 rc = PMIX_ERR_OUT_OF_RESOURCE;
291 PMIX_ERROR_LOG(rc);
292 goto err_exit;
293 }
294 if (PMIX_SUCCESS != (rc = pmix_value_array_init(ds_ctx->ns_map_array, sizeof(ns_map_t)))) {
295 PMIX_ERROR_LOG(rc);
296 goto err_exit;
297 }
298 if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(ds_ctx->ns_map_array, ESH_INIT_NS_MAP_TBL_SIZE))) {
299 PMIX_ERROR_LOG(rc);
300 goto err_exit;
301 }
302 for (idx = 0; idx < ESH_INIT_NS_MAP_TBL_SIZE; idx++) {
303 _esh_session_map_clean(ds_ctx, pmix_value_array_get_item(ds_ctx->ns_map_array, idx));
304 }
305
306 return PMIX_SUCCESS;
307 err_exit:
308 if (NULL != ds_ctx->ns_track_array) {
309 PMIX_RELEASE(ds_ctx->ns_track_array);
310 }
311 if (NULL != ds_ctx->session_array) {
312 PMIX_RELEASE(ds_ctx->session_array);
313 }
314 if (NULL != ds_ctx->ns_map_array) {
315 PMIX_RELEASE(ds_ctx->ns_map_array);
316 }
317 return rc;
318 }
319
320 static inline void _esh_ns_map_cleanup(pmix_common_dstore_ctx_t *ds_ctx)
321 {
322 size_t idx;
323 size_t size;
324 ns_map_t *ns_map;
325
326 if (NULL == ds_ctx->ns_map_array) {
327 return;
328 }
329
330 size = pmix_value_array_get_size(ds_ctx->ns_map_array);
331 ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
332
333 for (idx = 0; idx < size; idx++) {
334 if(ns_map[idx].in_use) {
335 _esh_session_map_clean(ds_ctx, &ns_map[idx]);
336 }
337 }
338
339 PMIX_RELEASE(ds_ctx->ns_map_array);
340 ds_ctx->ns_map_array = NULL;
341 }
342
343 static inline void _esh_sessions_cleanup(pmix_common_dstore_ctx_t *ds_ctx)
344 {
345 size_t idx;
346 size_t size;
347 session_t *s_tbl;
348
349 if (NULL == ds_ctx->session_array) {
350 return;
351 }
352
353 size = pmix_value_array_get_size(ds_ctx->session_array);
354 s_tbl = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->session_array, session_t);
355
356 for (idx = 0; idx < size; idx++) {
357 if(s_tbl[idx].in_use)
358 _esh_session_release(ds_ctx, idx);
359 }
360
361 PMIX_RELEASE(ds_ctx->session_array);
362 ds_ctx->session_array = NULL;
363 }
364
365 static inline void _esh_ns_track_cleanup(pmix_common_dstore_ctx_t *ds_ctx)
366 {
367 int size;
368 ns_track_elem_t *ns_trk;
369
370 if (NULL == ds_ctx->ns_track_array) {
371 return;
372 }
373
374 size = pmix_value_array_get_size(ds_ctx->ns_track_array);
375 ns_trk = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_track_array, ns_track_elem_t);
376
377 for (int i = 0; i < size; i++) {
378 ns_track_elem_t *trk = ns_trk + i;
379 if (trk->in_use) {
380 PMIX_DESTRUCT(trk);
381 }
382 }
383
384 PMIX_RELEASE(ds_ctx->ns_track_array);
385 ds_ctx->ns_track_array = NULL;
386 }
387
388 static inline ns_map_data_t * _esh_session_map(pmix_common_dstore_ctx_t *ds_ctx,
389 const char *nspace, uint32_t local_size,
390 size_t tbl_idx)
391 {
392 size_t map_idx;
393 size_t size = pmix_value_array_get_size(ds_ctx->ns_map_array);
394 ns_map_t *ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
395 ns_map_t *new_map = NULL;
396
397 if (NULL == nspace) {
398 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
399 return NULL;
400 }
401
402 for(map_idx = 0; map_idx < size; map_idx++) {
403 if (!ns_map[map_idx].in_use) {
404 ns_map[map_idx].in_use = true;
405 pmix_strncpy(ns_map[map_idx].data.name, nspace, sizeof(ns_map[map_idx].data.name)-1);
406 ns_map[map_idx].data.tbl_idx = tbl_idx;
407 return &ns_map[map_idx].data;
408 }
409 }
410
411 if (NULL == (new_map = pmix_value_array_get_item(ds_ctx->ns_map_array, map_idx))) {
412 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
413 return NULL;
414 }
415
416 _esh_session_map_clean(ds_ctx, new_map);
417 new_map->in_use = true;
418 new_map->data.tbl_idx = tbl_idx;
419 pmix_strncpy(new_map->data.name, nspace, sizeof(new_map->data.name)-1);
420
421 return &new_map->data;
422 }
423
424 static inline int _esh_jobuid_tbl_search(pmix_common_dstore_ctx_t *ds_ctx,
425 uid_t jobuid, size_t *tbl_idx)
426 {
427 size_t idx, size;
428 session_t *session_tbl = NULL;
429
430 size = pmix_value_array_get_size(ds_ctx->session_array);
431 session_tbl = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->session_array, session_t);
432
433 for(idx = 0; idx < size; idx++) {
434 if (session_tbl[idx].in_use && session_tbl[idx].jobuid == jobuid) {
435 *tbl_idx = idx;
436 return PMIX_SUCCESS;
437 }
438 }
439
440 return PMIX_ERR_NOT_FOUND;
441 }
442
443 static inline int _esh_session_tbl_add(pmix_common_dstore_ctx_t *ds_ctx, size_t *tbl_idx)
444 {
445 size_t idx;
446 size_t size = pmix_value_array_get_size(ds_ctx->session_array);
447 session_t *s_tbl = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->session_array, session_t);
448 session_t *new_sesion;
449 pmix_status_t rc = PMIX_SUCCESS;
450
451 for(idx = 0; idx < size; idx ++) {
452 if (0 == s_tbl[idx].in_use) {
453 goto done;
454 }
455 }
456
457 if (NULL == (new_sesion = pmix_value_array_get_item(ds_ctx->session_array, idx))) {
458 rc = PMIX_ERR_OUT_OF_RESOURCE;
459 PMIX_ERROR_LOG(rc);
460 return rc;
461 }
462
463 done:
464 s_tbl[idx].in_use = 1;
465 *tbl_idx = idx;
466
467 return PMIX_SUCCESS;
468 }
469
470 static inline ns_map_data_t * _esh_session_map_search_server(pmix_common_dstore_ctx_t *ds_ctx,
471 const char *nspace)
472 {
473 size_t idx, size = pmix_value_array_get_size(ds_ctx->ns_map_array);
474 ns_map_t *ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
475 if (NULL == nspace) {
476 return NULL;
477 }
478
479 for (idx = 0; idx < size; idx++) {
480 if (ns_map[idx].in_use &&
481 (0 == strcmp(ns_map[idx].data.name, nspace))) {
482 return &ns_map[idx].data;
483 }
484 }
485 return NULL;
486 }
487
488 static inline ns_map_data_t * _esh_session_map_search_client(pmix_common_dstore_ctx_t *ds_ctx,
489 const char *nspace)
490 {
491 size_t idx, size = pmix_value_array_get_size(ds_ctx->ns_map_array);
492 ns_map_t *ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
493
494 if (NULL == nspace) {
495 return NULL;
496 }
497
498 for (idx = 0; idx < size; idx++) {
499 if (ns_map[idx].in_use &&
500 (0 == strcmp(ns_map[idx].data.name, nspace))) {
501 return &ns_map[idx].data;
502 }
503 }
504 return _esh_session_map(ds_ctx, nspace, 0, 0);
505 }
506
507 static int _esh_session_init(pmix_common_dstore_ctx_t *ds_ctx, size_t idx, ns_map_data_t *m,
508 uint32_t local_size, size_t jobuid, int setjobuid)
509 {
510 pmix_dstore_seg_desc_t *seg = NULL;
511 session_t *s = &(PMIX_VALUE_ARRAY_GET_ITEM(ds_ctx->session_array, session_t, idx));
512 pmix_status_t rc = PMIX_SUCCESS;
513
514 s->setjobuid = setjobuid;
515 s->jobuid = jobuid;
516 s->nspace_path = strdup(ds_ctx->base_path);
517
518 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
519 if (0 != mkdir(s->nspace_path, 0770)) {
520 if (EEXIST != errno) {
521 pmix_output(0, "session init: can not create session directory \"%s\": %s",
522 s->nspace_path, strerror(errno));
523 rc = PMIX_ERROR;
524 PMIX_ERROR_LOG(rc);
525 return rc;
526 }
527 }
528 if (s->setjobuid > 0){
529 if (0 > chown(s->nspace_path, (uid_t) s->jobuid, (gid_t) -1)){
530 rc = PMIX_ERROR;
531 PMIX_ERROR_LOG(rc);
532 return rc;
533 }
534 }
535 seg = pmix_common_dstor_create_new_segment(PMIX_DSTORE_INITIAL_SEGMENT, ds_ctx->base_path,
536 m->name, 0, ds_ctx->jobuid, ds_ctx->setjobuid);
537 if( NULL == seg ){
538 rc = PMIX_ERR_OUT_OF_RESOURCE;
539 PMIX_ERROR_LOG(rc);
540 return rc;
541 }
542 }
543 else {
544 seg = pmix_common_dstor_attach_new_segment(PMIX_DSTORE_INITIAL_SEGMENT, ds_ctx->base_path, m->name, 0);
545 if( NULL == seg ){
546 rc = PMIX_ERR_OUT_OF_RESOURCE;
547 PMIX_ERROR_LOG(rc);
548 return rc;
549 }
550 }
551 s->sm_seg_first = seg;
552 s->sm_seg_last = s->sm_seg_first;
553
554 return PMIX_SUCCESS;
555 }
556
557 static void _esh_session_release(pmix_common_dstore_ctx_t *ds_ctx, size_t idx)
558 {
559 session_t *s = &(PMIX_VALUE_ARRAY_GET_ITEM(ds_ctx->session_array, session_t, idx));
560
561 if (!s->in_use) {
562 return;
563 }
564
565 pmix_common_dstor_delete_sm_desc(s->sm_seg_first);
566
567 ds_ctx->lock_cbs->finalize(&_ESH_SESSION_lock(ds_ctx->session_array, idx));
568
569 if (NULL != s->nspace_path) {
570 if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
571 _esh_dir_del(s->nspace_path);
572 }
573 free(s->nspace_path);
574 }
575 memset ((char *) s, 0, sizeof(*s));
576 }
577
578 static void _set_constants_from_env(pmix_common_dstore_ctx_t *ds_ctx)
579 {
580 char *str;
581 int page_size = pmix_common_dstor_getpagesize();
582
583 if( NULL != (str = getenv(ESH_ENV_INITIAL_SEG_SIZE)) ) {
584 ds_ctx->initial_segment_size = strtoul(str, NULL, 10);
585 if ((size_t)page_size > ds_ctx->initial_segment_size) {
586 ds_ctx->initial_segment_size = (size_t)page_size;
587 }
588 }
589 if (0 == ds_ctx->initial_segment_size) {
590 ds_ctx->initial_segment_size = INITIAL_SEG_SIZE;
591 }
592 if( NULL != (str = getenv(ESH_ENV_NS_META_SEG_SIZE)) ) {
593 ds_ctx->meta_segment_size = strtoul(str, NULL, 10);
594 if ((size_t)page_size > ds_ctx->meta_segment_size) {
595 ds_ctx->meta_segment_size = (size_t)page_size;
596 }
597 }
598 if (0 == ds_ctx->meta_segment_size) {
599 ds_ctx->meta_segment_size = NS_META_SEG_SIZE;
600 }
601 if( NULL != (str = getenv(ESH_ENV_NS_DATA_SEG_SIZE)) ) {
602 ds_ctx->data_segment_size = strtoul(str, NULL, 10);
603 if ((size_t)page_size > ds_ctx->data_segment_size) {
604 ds_ctx->data_segment_size = (size_t)page_size;
605 }
606 }
607 if (0 == ds_ctx->data_segment_size) {
608 ds_ctx->data_segment_size = NS_DATA_SEG_SIZE;
609 }
610 if (NULL != (str = getenv(ESH_ENV_LINEAR))) {
611 if (1 == strtoul(str, NULL, 10)) {
612 ds_ctx->direct_mode = 1;
613 }
614 }
615
616 ds_ctx->lock_segment_size = page_size;
617 ds_ctx->max_ns_num = (ds_ctx->initial_segment_size - sizeof(size_t) * 2) / sizeof(ns_seg_info_t);
618 ds_ctx->max_meta_elems = (ds_ctx->meta_segment_size - sizeof(size_t)) / sizeof(rank_meta_info);
619
620 pmix_common_dstor_init_segment_info(ds_ctx->initial_segment_size, ds_ctx->meta_segment_size,
621 ds_ctx->data_segment_size);
622
623 }
624
625
626 static int _update_ns_elem(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_elem,
627 ns_seg_info_t *info)
628 {
629 pmix_dstore_seg_desc_t *seg, *tmp = NULL;
630 size_t i, offs;
631 ns_map_data_t *ns_map = NULL;
632 pmix_status_t rc;
633
634 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
635 "%s:%d:%s",
636 __FILE__, __LINE__, __func__));
637
638 if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, info->ns_map.name))) {
639 rc = PMIX_ERR_NOT_AVAILABLE;
640 PMIX_ERROR_LOG(rc);
641 return rc;
642 }
643
644 tmp = ns_elem->meta_seg;
645 if (NULL != tmp) {
646 while(NULL != tmp->next) {
647 tmp = tmp->next;
648 }
649 }
650
651
652 for (i = ns_elem->num_meta_seg; i < info->num_meta_seg; i++) {
653 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
654 seg = pmix_common_dstor_create_new_segment(PMIX_DSTORE_NS_META_SEGMENT, ds_ctx->base_path,
655 info->ns_map.name, i, ds_ctx->jobuid,
656 ds_ctx->setjobuid);
657 if (NULL == seg) {
658 rc = PMIX_ERR_OUT_OF_RESOURCE;
659 PMIX_ERROR_LOG(rc);
660 return rc;
661 }
662 } else {
663 seg = pmix_common_dstor_attach_new_segment(PMIX_DSTORE_NS_META_SEGMENT, ds_ctx->base_path, info->ns_map.name, i);
664 if (NULL == seg) {
665 rc = PMIX_ERR_NOT_AVAILABLE;
666 PMIX_ERROR_LOG(rc);
667 return rc;
668 }
669 }
670
671 if (NULL == tmp) {
672 ns_elem->meta_seg = seg;
673 } else {
674 tmp->next = seg;
675 }
676 tmp = seg;
677 ns_elem->num_meta_seg++;
678 }
679
680 tmp = ns_elem->data_seg;
681 if (NULL != tmp) {
682 while(NULL != tmp->next) {
683 tmp = tmp->next;
684 }
685 }
686
687 for (i = ns_elem->num_data_seg; i < info->num_data_seg; i++) {
688 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
689 seg = pmix_common_dstor_create_new_segment(PMIX_DSTORE_NS_DATA_SEGMENT, ds_ctx->base_path,
690 info->ns_map.name, i, ds_ctx->jobuid,
691 ds_ctx->setjobuid);
692 if (NULL == seg) {
693 rc = PMIX_ERR_OUT_OF_RESOURCE;
694 PMIX_ERROR_LOG(rc);
695 return rc;
696 }
697 offs = sizeof(size_t);
698 memcpy(seg->seg_info.seg_base_addr, &offs, sizeof(size_t));
699 } else {
700 seg = pmix_common_dstor_attach_new_segment(PMIX_DSTORE_NS_DATA_SEGMENT, ds_ctx->base_path, info->ns_map.name, i);
701 if (NULL == seg) {
702 rc = PMIX_ERR_NOT_AVAILABLE;
703 PMIX_ERROR_LOG(rc);
704 return rc;
705 }
706 }
707
708 if (NULL == tmp) {
709 ns_elem->data_seg = seg;
710 } else {
711 tmp->next = seg;
712 }
713 tmp = seg;
714 ns_elem->num_data_seg++;
715 }
716
717 return PMIX_SUCCESS;
718 }
719
720 static int _put_ns_info_to_initial_segment(pmix_common_dstore_ctx_t *ds_ctx,
721 const ns_map_data_t *ns_map, pmix_pshmem_seg_t *metaseg,
722 pmix_pshmem_seg_t *dataseg)
723 {
724 ns_seg_info_t elem;
725 size_t num_elems;
726 num_elems = *((size_t*)(_ESH_SESSION_sm_seg_last(ds_ctx->session_array,
727 ns_map->tbl_idx)->seg_info.seg_base_addr));
728 pmix_dstore_seg_desc_t *last_seg = _ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx);
729 pmix_status_t rc;
730
731 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
732 "%s:%d:%s", __FILE__, __LINE__, __func__));
733
734 if (ds_ctx->max_ns_num == num_elems) {
735 num_elems = 0;
736 if (NULL == (last_seg = pmix_common_dstor_extend_segment(last_seg, ds_ctx->base_path, ns_map->name,
737 ds_ctx->jobuid, ds_ctx->setjobuid))) {
738 rc = PMIX_ERROR;
739 PMIX_ERROR_LOG(rc);
740 return rc;
741 }
742
743 size_t full = 1;
744 memcpy((uint8_t*)(_ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx)->seg_info.seg_base_addr +
745 sizeof(size_t)), &full, sizeof(size_t));
746 _ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx) = last_seg;
747 memset(_ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx)->seg_info.seg_base_addr,
748 0, ds_ctx->initial_segment_size);
749 }
750 memset(&elem.ns_map, 0, sizeof(elem.ns_map));
751 pmix_strncpy(elem.ns_map.name, ns_map->name, sizeof(elem.ns_map.name)-1);
752 elem.ns_map.tbl_idx = ns_map->tbl_idx;
753 elem.num_meta_seg = 1;
754 elem.num_data_seg = 1;
755 memcpy((uint8_t*)(_ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx)->seg_info.seg_base_addr) +
756 sizeof(size_t) * 2 + num_elems * sizeof(ns_seg_info_t), &elem, sizeof(ns_seg_info_t));
757 num_elems++;
758 memcpy((uint8_t*)(_ESH_SESSION_sm_seg_last(ds_ctx->session_array, ns_map->tbl_idx)->seg_info.seg_base_addr),
759 &num_elems, sizeof(size_t));
760 return PMIX_SUCCESS;
761 }
762
763
764 static void _update_initial_segment_info(pmix_common_dstore_ctx_t *ds_ctx, const ns_map_data_t *ns_map)
765 {
766 pmix_dstore_seg_desc_t *tmp;
767 tmp = _ESH_SESSION_sm_seg_first(ds_ctx->session_array, ns_map->tbl_idx);
768
769 PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
770 "%s:%d:%s", __FILE__, __LINE__, __func__));
771
772
773 do {
774
775 if (NULL == tmp->next && 1 == *((size_t*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t)))) {
776 tmp->next = pmix_common_dstor_attach_new_segment(PMIX_DSTORE_INITIAL_SEGMENT, ds_ctx->base_path,
777 ns_map->name, tmp->id+1);
778 }
779 tmp = tmp->next;
780 }
781 while (NULL != tmp);
782 }
783
784
785 static ns_seg_info_t *_get_ns_info_from_initial_segment(pmix_common_dstore_ctx_t *ds_ctx,
786 const ns_map_data_t *ns_map)
787 {
788 pmix_status_t rc;
789 size_t i;
790 pmix_dstore_seg_desc_t *tmp;
791 ns_seg_info_t *elem, *cur_elem;
792 elem = NULL;
793 size_t num_elems;
794
795 PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
796 "%s:%d:%s", __FILE__, __LINE__, __func__));
797
798 tmp = _ESH_SESSION_sm_seg_first(ds_ctx->session_array, ns_map->tbl_idx);
799
800 rc = 1;
801
802 do {
803 num_elems = *((size_t*)(tmp->seg_info.seg_base_addr));
804 for (i = 0; i < num_elems; i++) {
805 cur_elem = (ns_seg_info_t*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) * 2 + i * sizeof(ns_seg_info_t));
806 if (0 == (rc = strncmp(cur_elem->ns_map.name, ns_map->name, strlen(ns_map->name)+1))) {
807 break;
808 }
809 }
810 if (0 == rc) {
811 elem = cur_elem;
812 break;
813 }
814 tmp = tmp->next;
815 }
816 while (NULL != tmp);
817 return elem;
818 }
819
820 static ns_track_elem_t *_get_track_elem_for_namespace(pmix_common_dstore_ctx_t *ds_ctx,
821 ns_map_data_t *ns_map)
822 {
823 ns_track_elem_t *new_elem = NULL;
824 size_t size = pmix_value_array_get_size(ds_ctx->ns_track_array);
825
826 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
827 "%s:%d:%s: nspace %s",
828 __FILE__, __LINE__, __func__, ns_map->name));
829
830
831 if (ns_map->track_idx >= 0) {
832 if ((ns_map->track_idx + 1) > (int)size) {
833 return NULL;
834 }
835
836
837 return pmix_value_array_get_item(ds_ctx->ns_track_array, ns_map->track_idx);
838 }
839
840
841
842 if (NULL == (new_elem = pmix_value_array_get_item(ds_ctx->ns_track_array, size))) {
843 return NULL;
844 }
845 PMIX_CONSTRUCT(new_elem, ns_track_elem_t);
846 pmix_strncpy(new_elem->ns_map.name, ns_map->name, sizeof(new_elem->ns_map.name)-1);
847
848 ns_map->track_idx = size;
849
850 return new_elem;
851 }
852
853 static rank_meta_info *_get_rank_meta_info(pmix_common_dstore_ctx_t *ds_ctx, pmix_rank_t rank, pmix_dstore_seg_desc_t *segdesc)
854 {
855 size_t i;
856 rank_meta_info *elem = NULL;
857 pmix_dstore_seg_desc_t *tmp = segdesc;
858 size_t num_elems, rel_offset;
859 int id;
860 rank_meta_info *cur_elem;
861
862 size_t rcount = rank == PMIX_RANK_WILDCARD ? 0 : rank + 1;
863
864 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
865 "%s:%d:%s",
866 __FILE__, __LINE__, __func__));
867
868 if (1 == ds_ctx->direct_mode) {
869
870
871
872 do {
873 num_elems = *((size_t*)(tmp->seg_info.seg_base_addr));
874 for (i = 0; i < num_elems; i++) {
875 cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) + i * sizeof(rank_meta_info));
876 if (rcount == cur_elem->rank) {
877 elem = cur_elem;
878 break;
879 }
880 }
881 tmp = tmp->next;
882 }
883 while (NULL != tmp && NULL == elem);
884 } else {
885
886
887 id = rcount/ds_ctx->max_meta_elems;
888 rel_offset = (rcount % ds_ctx->max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t);
889
890
891 while (NULL != tmp->next && 0 != id) {
892 tmp = tmp->next;
893 id--;
894 }
895 if (0 == id) {
896
897 elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + rel_offset);
898 if ( 0 == elem->offset) {
899
900 elem = NULL;
901 }
902 }
903 }
904 return elem;
905 }
906
907 static int set_rank_meta_info(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info, rank_meta_info *rinfo)
908 {
909
910 pmix_dstore_seg_desc_t *tmp;
911 size_t num_elems, rel_offset;
912 int id, count;
913 rank_meta_info *cur_elem;
914
915 if (!ns_info || !rinfo) {
916 PMIX_ERROR_LOG(PMIX_ERROR);
917 return PMIX_ERROR;
918 }
919
920 PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
921 "%s:%d:%s: nspace %s, add rank %lu offset %lu count %lu meta info",
922 __FILE__, __LINE__, __func__,
923 ns_info->ns_map.name, (unsigned long)rinfo->rank,
924 (unsigned long)rinfo->offset, (unsigned long)rinfo->count));
925
926 tmp = ns_info->meta_seg;
927 if (1 == ds_ctx->direct_mode) {
928
929 while (NULL != tmp->next) {
930 tmp = tmp->next;
931 }
932 num_elems = *((size_t*)(tmp->seg_info.seg_base_addr));
933 if (ds_ctx->max_meta_elems <= num_elems) {
934 PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
935 "%s:%d:%s: extend meta segment for nspace %s",
936 __FILE__, __LINE__, __func__, ns_info->ns_map.name));
937
938 tmp = pmix_common_dstor_extend_segment(tmp, ds_ctx->base_path, ns_info->ns_map.name,
939 ds_ctx->jobuid, ds_ctx->setjobuid);
940 if (NULL == tmp) {
941 PMIX_ERROR_LOG(PMIX_ERROR);
942 return PMIX_ERROR;
943 }
944 ns_info->num_meta_seg++;
945 memset(tmp->seg_info.seg_base_addr, 0, sizeof(rank_meta_info));
946
947 ns_seg_info_t *elem = _get_ns_info_from_initial_segment(ds_ctx, &ns_info->ns_map);
948 if (NULL == elem) {
949 PMIX_ERROR_LOG(PMIX_ERROR);
950 return PMIX_ERROR;
951 }
952 if (ns_info->num_meta_seg != elem->num_meta_seg) {
953 elem->num_meta_seg = ns_info->num_meta_seg;
954 }
955 num_elems = 0;
956 }
957 cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) + num_elems * sizeof(rank_meta_info));
958 memcpy(cur_elem, rinfo, sizeof(rank_meta_info));
959 num_elems++;
960 memcpy(tmp->seg_info.seg_base_addr, &num_elems, sizeof(size_t));
961 } else {
962
963
964 size_t rcount = rinfo->rank == PMIX_RANK_WILDCARD ? 0 : rinfo->rank + 1;
965 id = rcount/ds_ctx->max_meta_elems;
966 rel_offset = (rcount % ds_ctx->max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t);
967 count = id;
968
969
970 while (NULL != tmp->next && 0 != count) {
971 tmp = tmp->next;
972 count--;
973 }
974
975 if ((int)ns_info->num_meta_seg < (id+1)) {
976 while ((int)ns_info->num_meta_seg != (id+1)) {
977
978 tmp = pmix_common_dstor_extend_segment(tmp, ds_ctx->base_path, ns_info->ns_map.name,
979 ds_ctx->jobuid, ds_ctx->setjobuid);
980 if (NULL == tmp) {
981 PMIX_ERROR_LOG(PMIX_ERROR);
982 return PMIX_ERROR;
983 }
984 memset(tmp->seg_info.seg_base_addr, 0, sizeof(rank_meta_info));
985 ns_info->num_meta_seg++;
986 }
987
988 ns_seg_info_t *elem = _get_ns_info_from_initial_segment(ds_ctx, &ns_info->ns_map);
989 if (NULL == elem) {
990 PMIX_ERROR_LOG(PMIX_ERROR);
991 return PMIX_ERROR;
992 }
993 if (ns_info->num_meta_seg != elem->num_meta_seg) {
994 elem->num_meta_seg = ns_info->num_meta_seg;
995 }
996 }
997
998 cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + rel_offset);
999 memcpy(cur_elem, rinfo, sizeof(rank_meta_info));
1000 }
1001 return PMIX_SUCCESS;
1002 }
1003
1004 static uint8_t *_get_data_region_by_offset(pmix_common_dstore_ctx_t *ds_ctx, pmix_dstore_seg_desc_t *segdesc, size_t offset)
1005 {
1006 pmix_dstore_seg_desc_t *tmp = segdesc;
1007 size_t rel_offset = offset;
1008 uint8_t *dataaddr = NULL;
1009
1010 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1011 "%s:%d:%s",
1012 __FILE__, __LINE__, __func__));
1013
1014
1015 do {
1016 if (rel_offset >= ds_ctx->data_segment_size) {
1017 rel_offset -= ds_ctx->data_segment_size;
1018 } else {
1019 dataaddr = tmp->seg_info.seg_base_addr + rel_offset;
1020 }
1021 tmp = tmp->next;
1022 } while (NULL != tmp && NULL == dataaddr);
1023
1024 return dataaddr;
1025 }
1026
1027 static size_t get_free_offset(pmix_common_dstore_ctx_t *ds_ctx, pmix_dstore_seg_desc_t *data_seg)
1028 {
1029 size_t offset;
1030 pmix_dstore_seg_desc_t *tmp;
1031 int id = 0;
1032 tmp = data_seg;
1033
1034 while (NULL != tmp->next) {
1035 tmp = tmp->next;
1036 id++;
1037 }
1038 offset = *((size_t*)(tmp->seg_info.seg_base_addr));
1039 if (0 == offset) {
1040
1041 offset = sizeof(size_t);
1042 }
1043 return (id * ds_ctx->data_segment_size + offset);
1044 }
1045
1046 static int put_empty_ext_slot(pmix_common_dstore_ctx_t *ds_ctx, pmix_dstore_seg_desc_t *dataseg)
1047 {
1048 size_t global_offset, rel_offset, data_ended, val = 0;
1049 uint8_t *addr;
1050 pmix_status_t rc;
1051
1052 global_offset = get_free_offset(ds_ctx, dataseg);
1053 rel_offset = global_offset % ds_ctx->data_segment_size;
1054 if (rel_offset + PMIX_DS_SLOT_SIZE(ds_ctx) > ds_ctx->data_segment_size) {
1055 PMIX_ERROR_LOG(PMIX_ERROR);
1056 return PMIX_ERROR;
1057 }
1058 addr = _get_data_region_by_offset(ds_ctx, dataseg, global_offset);
1059 PMIX_DS_PUT_KEY(rc, ds_ctx, addr, ESH_REGION_EXTENSION, (void*)&val, sizeof(size_t));
1060 if (rc != PMIX_SUCCESS) {
1061 PMIX_ERROR_LOG(rc);
1062 return rc;
1063 }
1064
1065 data_ended = rel_offset + PMIX_DS_SLOT_SIZE(ds_ctx);
1066 addr = (uint8_t*)(addr - rel_offset);
1067 memcpy(addr, &data_ended, sizeof(size_t));
1068 return PMIX_SUCCESS;
1069 }
1070
1071 static size_t put_data_to_the_end(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info,
1072 pmix_dstore_seg_desc_t *dataseg, char *key, void *buffer, size_t size)
1073 {
1074 size_t offset, id = 0;
1075 pmix_dstore_seg_desc_t *tmp;
1076 size_t global_offset, data_ended;
1077 uint8_t *addr;
1078 pmix_status_t rc;
1079
1080 PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
1081 "%s:%d:%s: key %s",
1082 __FILE__, __LINE__, __func__, key));
1083
1084 tmp = dataseg;
1085 while (NULL != tmp->next) {
1086 tmp = tmp->next;
1087 id++;
1088 }
1089 global_offset = get_free_offset(ds_ctx, dataseg);
1090 offset = global_offset % ds_ctx->data_segment_size;
1091
1092
1093
1094 if ((sizeof(size_t) + PMIX_DS_KEY_SIZE(ds_ctx, key, size) + PMIX_DS_SLOT_SIZE(ds_ctx)) >
1095 ds_ctx->data_segment_size) {
1096
1097
1098 offset = 0;
1099 pmix_output(0, "PLEASE set NS_DATA_SEG_SIZE to value which is larger when %lu.",
1100 (unsigned long)(sizeof(size_t) + strlen(key) + 1 + sizeof(size_t) +
1101 size + PMIX_DS_SLOT_SIZE(ds_ctx)));
1102 return offset;
1103 }
1104
1105
1106
1107
1108
1109
1110
1111
1112 if ( (0 == offset) || ( (offset + PMIX_DS_KEY_SIZE(ds_ctx, key, size) +
1113 PMIX_DS_SLOT_SIZE(ds_ctx)) > ds_ctx->data_segment_size) ) {
1114 id++;
1115
1116 tmp = pmix_common_dstor_extend_segment(tmp, ds_ctx->base_path, ns_info->ns_map.name,
1117 ds_ctx->jobuid, ds_ctx->setjobuid);
1118 if (NULL == tmp) {
1119 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1120 offset = 0;
1121 return offset;
1122 }
1123 ns_info->num_data_seg++;
1124
1125 ns_seg_info_t *elem = _get_ns_info_from_initial_segment(ds_ctx, &ns_info->ns_map);
1126 if (NULL == elem) {
1127 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1128 offset = 0;
1129 return offset;
1130 }
1131 elem->num_data_seg++;
1132 offset = sizeof(size_t);
1133 }
1134 global_offset = offset + id * ds_ctx->data_segment_size;
1135 addr = (uint8_t*)(tmp->seg_info.seg_base_addr)+offset;
1136 PMIX_DS_PUT_KEY(rc, ds_ctx, addr, key, buffer, size);
1137 if (rc != PMIX_SUCCESS) {
1138 PMIX_ERROR_LOG(rc);
1139 return 0;
1140 }
1141
1142
1143 data_ended = offset + PMIX_DS_KEY_SIZE(ds_ctx, key, size);
1144 addr = (uint8_t*)(tmp->seg_info.seg_base_addr);
1145 memcpy(addr, &data_ended, sizeof(size_t));
1146 PMIX_OUTPUT_VERBOSE((1, pmix_gds_base_framework.framework_output,
1147 "%s:%d:%s: key %s, rel start offset %lu, rel end offset %lu, abs shift %lu size %lu",
1148 __FILE__, __LINE__, __func__,
1149 key, (unsigned long)offset,
1150 (unsigned long)data_ended,
1151 (unsigned long)(id * ds_ctx->data_segment_size),
1152 (unsigned long)size));
1153 return global_offset;
1154 }
1155
1156 static int pmix_sm_store(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info,
1157 pmix_rank_t rank, pmix_kval_t *kval, rank_meta_info **rinfo, int data_exist)
1158 {
1159 size_t offset, size, kval_cnt;
1160 pmix_buffer_t buffer;
1161 pmix_status_t rc;
1162 pmix_dstore_seg_desc_t *datadesc;
1163 uint8_t *addr;
1164
1165 PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output,
1166 "%s:%d:%s: for rank %u, replace flag %d",
1167 __FILE__, __LINE__, __func__, rank, data_exist));
1168
1169 datadesc = ns_info->data_seg;
1170
1171 PMIX_CONSTRUCT(&buffer, pmix_buffer_t);
1172 PMIX_BFROPS_PACK(rc, _client_peer(ds_ctx), &buffer, kval->value, 1, PMIX_VALUE);
1173 if (PMIX_SUCCESS != rc) {
1174 PMIX_ERROR_LOG(rc);
1175 goto exit;
1176 }
1177 size = buffer.bytes_used;
1178
1179 if (0 == data_exist) {
1180
1181 size_t free_offset;
1182 free_offset = get_free_offset(ds_ctx, datadesc);
1183 offset = put_data_to_the_end(ds_ctx, ns_info, datadesc, kval->key, buffer.base_ptr, size);
1184 if (0 == offset) {
1185
1186 rc = PMIX_ERROR;
1187 PMIX_ERROR_LOG(rc);
1188 goto exit;
1189 }
1190
1191
1192
1193
1194
1195 if (free_offset != offset && NULL != *rinfo) {
1196
1197
1198
1199
1200 addr = _get_data_region_by_offset(ds_ctx, datadesc, free_offset);
1201 PMIX_DS_PUT_KEY(rc, ds_ctx, addr, ESH_REGION_EXTENSION, (void*)&offset, sizeof(size_t));
1202 if (rc != PMIX_SUCCESS) {
1203 PMIX_ERROR_LOG(rc);
1204 return 0;
1205 }
1206 }
1207 if (NULL == *rinfo) {
1208 *rinfo = (rank_meta_info*)malloc(sizeof(rank_meta_info));
1209 (*rinfo)->rank = rank;
1210 (*rinfo)->offset = offset;
1211 (*rinfo)->count = 0;
1212 }
1213 (*rinfo)->count++;
1214 } else if (NULL != *rinfo) {
1215
1216 addr = _get_data_region_by_offset(ds_ctx, datadesc, (*rinfo)->offset);
1217 if (NULL == addr) {
1218 rc = PMIX_ERROR;
1219 PMIX_ERROR_LOG(rc);
1220 goto exit;
1221 }
1222
1223
1224
1225
1226
1227 kval_cnt = (*rinfo)->count;
1228 int add_to_the_end = 1;
1229 while (0 < kval_cnt) {
1230
1231
1232
1233
1234
1235
1236
1237
1238 if(PMIX_DS_KEY_IS_EXTSLOT(ds_ctx, addr)) {
1239 memcpy(&offset, PMIX_DS_DATA_PTR(ds_ctx, addr), sizeof(size_t));
1240 if (0 < offset) {
1241 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1242 "%s:%d:%s: for rank %lu, replace flag %d %s is filled with %lu value",
1243 __FILE__, __LINE__, __func__,
1244 (unsigned long)rank, data_exist,
1245 ESH_REGION_EXTENSION, (unsigned long)offset));
1246
1247 addr = _get_data_region_by_offset(ds_ctx, datadesc, offset);
1248 if (NULL == addr) {
1249 rc = PMIX_ERROR;
1250 PMIX_ERROR_LOG(rc);
1251 goto exit;
1252 }
1253 } else {
1254
1255 }
1256 } else if (0 == strncmp(PMIX_DS_KNAME_PTR(ds_ctx, addr), kval->key,
1257 PMIX_DS_KNAME_LEN(ds_ctx, kval->key))) {
1258 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1259 "%s:%d:%s: for rank %u, replace flag %d found target key %s",
1260 __FILE__, __LINE__, __func__, rank, data_exist, kval->key));
1261
1262 if (PMIX_DS_DATA_SIZE(ds_ctx, addr, PMIX_DS_DATA_PTR(ds_ctx, addr)) != size) {
1263
1264
1265 PMIX_DS_KEY_SET_INVALID(ds_ctx, addr);
1266
1267 (*rinfo)->count--;
1268 kval_cnt--;
1269
1270 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
1271 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1272 "%s:%d:%s: for rank %u, replace flag %d mark key %s regions as invalidated. put new data at the end.",
1273 __FILE__, __LINE__, __func__, rank, data_exist, kval->key));
1274 } else {
1275 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1276 "%s:%d:%s: for rank %u, replace flag %d replace data for key %s type %d in place",
1277 __FILE__, __LINE__, __func__, rank, data_exist, kval->key, kval->value->type));
1278
1279 memset(PMIX_DS_DATA_PTR(ds_ctx, addr), 0,
1280 PMIX_DS_DATA_SIZE(ds_ctx, addr, PMIX_DS_DATA_PTR(ds_ctx, addr)));
1281 memcpy(PMIX_DS_DATA_PTR(ds_ctx, addr), buffer.base_ptr, size);
1282 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
1283 add_to_the_end = 0;
1284 break;
1285 }
1286 } else {
1287 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1288 "%s:%d:%s: for rank %u, replace flag %d skip %s key, look for %s key",
1289 __FILE__, __LINE__, __func__, rank, data_exist,
1290 PMIX_DS_KNAME_PTR(ds_ctx, addr), kval->key));
1291
1292 if (!PMIX_DS_KEY_IS_INVALID(ds_ctx, addr)) {
1293
1294 kval_cnt--;
1295 }
1296
1297 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
1298 }
1299 }
1300 if (1 == add_to_the_end) {
1301
1302
1303
1304 size_t free_offset;
1305 (*rinfo)->count++;
1306 free_offset = get_free_offset(ds_ctx, datadesc);
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332 if (PMIX_DS_KEY_IS_EXTSLOT(ds_ctx, addr)){
1333
1334 pmix_dstore_seg_desc_t *ldesc = datadesc;
1335 uint8_t *segstart;
1336 size_t offs_past_extslot = 0;
1337 size_t offs_cur_segment = 0;
1338 while (NULL != ldesc->next) {
1339 ldesc = ldesc->next;
1340 }
1341
1342
1343 offs_cur_segment = free_offset % ds_ctx->data_segment_size;
1344 segstart = ldesc->seg_info.seg_base_addr;
1345 offs_past_extslot = (addr + PMIX_DS_KV_SIZE(ds_ctx, addr)) - segstart;
1346
1347
1348
1349
1350
1351
1352
1353 if( ( (addr > segstart) && (addr < (segstart + offs_cur_segment)) )
1354 && (offs_cur_segment == offs_past_extslot) ) {
1355
1356
1357 size_t new_offset = addr - segstart;
1358
1359
1360 memcpy(segstart, &new_offset, sizeof(size_t));
1361
1362 free_offset = get_free_offset(ds_ctx, datadesc);
1363 }
1364 }
1365
1366
1367 offset = put_data_to_the_end(ds_ctx, ns_info, datadesc, kval->key, buffer.base_ptr, size);
1368 if (0 == offset) {
1369 rc = PMIX_ERROR;
1370 PMIX_ERROR_LOG(rc);
1371 goto exit;
1372 }
1373
1374
1375
1376
1377
1378 if (PMIX_DS_KEY_IS_EXTSLOT(ds_ctx, addr)) {
1379 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1380 "%s:%d:%s: for rank %u, replace flag %d %s should be filled with offset %lu value",
1381 __FILE__, __LINE__, __func__, rank, data_exist, ESH_REGION_EXTENSION, offset));
1382 memcpy(PMIX_DS_DATA_PTR(ds_ctx, addr), &offset, sizeof(size_t));
1383 } else {
1384
1385
1386
1387
1388
1389
1390 if (free_offset != offset) {
1391
1392 PMIX_DS_PUT_KEY(rc, ds_ctx, addr, ESH_REGION_EXTENSION, (void*)&offset, sizeof(size_t));
1393 if (rc != PMIX_SUCCESS) {
1394 PMIX_ERROR_LOG(rc);
1395 return 0;
1396 }
1397 }
1398 }
1399 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1400 "%s:%d:%s: for rank %u, replace flag %d item not found ext slot empty, put key %s to the end",
1401 __FILE__, __LINE__, __func__, rank, data_exist, kval->key));
1402 }
1403 }
1404 exit:
1405 PMIX_DESTRUCT(&buffer);
1406 return rc;
1407 }
1408
1409 static int _store_data_for_rank(pmix_common_dstore_ctx_t *ds_ctx, ns_track_elem_t *ns_info,
1410 pmix_rank_t rank, pmix_buffer_t *buf)
1411 {
1412 pmix_status_t rc;
1413
1414 pmix_kval_t *kp;
1415 pmix_dstore_seg_desc_t *metadesc, *datadesc;
1416 int32_t cnt;
1417
1418 rank_meta_info *rinfo = NULL;
1419 size_t num_elems, free_offset, new_free_offset;
1420 int data_exist;
1421
1422 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1423 "%s:%d:%s: for rank %u", __FILE__, __LINE__, __func__, rank));
1424
1425 metadesc = ns_info->meta_seg;
1426 datadesc = ns_info->data_seg;
1427
1428 if (NULL == datadesc || NULL == metadesc) {
1429 rc = PMIX_ERR_BAD_PARAM;
1430 PMIX_ERROR_LOG(rc);
1431 return rc;
1432 }
1433
1434 num_elems = *((size_t*)(metadesc->seg_info.seg_base_addr));
1435 data_exist = 0;
1436
1437
1438 if (0 < num_elems || 0 == ds_ctx->direct_mode) {
1439
1440 rinfo = _get_rank_meta_info(ds_ctx, rank, metadesc);
1441 if (NULL != rinfo) {
1442 data_exist = 1;
1443 }
1444 }
1445
1446
1447
1448
1449 free_offset = get_free_offset(ds_ctx, datadesc);
1450 cnt = 1;
1451 kp = PMIX_NEW(pmix_kval_t);
1452 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, buf, kp, &cnt, PMIX_KVAL);
1453 while(PMIX_SUCCESS == rc) {
1454 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1455 "pmix: unpacked key %s", kp->key);
1456 if (PMIX_SUCCESS != (rc = pmix_sm_store(ds_ctx, ns_info, rank, kp, &rinfo, data_exist))) {
1457 PMIX_ERROR_LOG(rc);
1458 if (NULL != rinfo) {
1459 free(rinfo);
1460 }
1461 return rc;
1462 }
1463 PMIX_RELEASE(kp);
1464 cnt = 1;
1465 kp = PMIX_NEW(pmix_kval_t);
1466 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, buf, kp, &cnt, PMIX_KVAL);
1467 }
1468
1469 PMIX_RELEASE(kp);
1470
1471 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1472 PMIX_ERROR_LOG(rc);
1473
1474 } else {
1475 rc = PMIX_SUCCESS;
1476 }
1477
1478
1479
1480
1481
1482
1483 new_free_offset = get_free_offset(ds_ctx, datadesc);
1484 if (new_free_offset != free_offset) {
1485
1486
1487
1488
1489
1490
1491 rc = put_empty_ext_slot(ds_ctx, ns_info->data_seg);
1492 if (PMIX_SUCCESS != rc) {
1493 if ((0 == data_exist) && NULL != rinfo) {
1494 free(rinfo);
1495 }
1496 PMIX_ERROR_LOG(rc);
1497 return rc;
1498 }
1499 }
1500
1501
1502
1503 if (0 == data_exist) {
1504 set_rank_meta_info(ds_ctx, ns_info, rinfo);
1505 if (NULL != rinfo) {
1506 free(rinfo);
1507 }
1508 }
1509
1510 return rc;
1511 }
1512
1513 static inline ssize_t _get_univ_size(pmix_common_dstore_ctx_t *ds_ctx, const char *nspace)
1514 {
1515 ssize_t nprocs = 0;
1516 pmix_value_t *val;
1517 int rc;
1518
1519 rc = _dstore_fetch(ds_ctx, nspace, PMIX_RANK_WILDCARD, PMIX_UNIV_SIZE, &val);
1520 if( PMIX_SUCCESS != rc ) {
1521 PMIX_ERROR_LOG(rc);
1522 return rc;
1523 }
1524 if( val->type != PMIX_UINT32 ){
1525 rc = PMIX_ERR_BAD_PARAM;
1526 PMIX_ERROR_LOG(rc);
1527 return rc;
1528 }
1529 nprocs = (ssize_t)val->data.uint32;
1530 PMIX_VALUE_RELEASE(val);
1531 return nprocs;
1532 }
1533
1534 PMIX_EXPORT pmix_status_t pmix_common_dstor_cache_job_info(pmix_common_dstore_ctx_t *ds_ctx,
1535 struct pmix_namespace_t *ns,
1536 pmix_info_t info[], size_t ninfo)
1537 {
1538 return PMIX_SUCCESS;
1539 }
1540
1541
1542 pmix_common_dstore_ctx_t *pmix_common_dstor_init(const char *ds_name, pmix_info_t info[], size_t ninfo,
1543 pmix_common_lock_callbacks_t *lock_cb,
1544 pmix_common_dstore_file_cbs_t *file_cb)
1545 {
1546 pmix_status_t rc;
1547 size_t n;
1548 char *dstor_tmpdir = NULL;
1549 size_t tbl_idx = 0;
1550 ns_map_data_t *ns_map = NULL;
1551 pmix_common_dstore_ctx_t *ds_ctx = NULL;
1552
1553 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1554 "pmix:gds:dstore init");
1555
1556 ds_ctx = (pmix_common_dstore_ctx_t*) malloc(sizeof(*ds_ctx));
1557 if (NULL == ds_ctx) {
1558 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
1559 return NULL;
1560 }
1561 memset(ds_ctx, 0, sizeof(*ds_ctx));
1562
1563
1564 ds_ctx->lock_cbs = lock_cb;
1565 ds_ctx->file_cbs = file_cb;
1566
1567
1568 if( PMIX_SUCCESS != (rc = pmix_mca_base_framework_open(&pmix_pshmem_base_framework, 0)) ) {
1569 PMIX_ERROR_LOG(rc);
1570 goto err_exit;
1571 }
1572 if( PMIX_SUCCESS != (rc = pmix_pshmem_base_select()) ) {
1573 PMIX_ERROR_LOG(rc);
1574 goto err_exit;
1575 }
1576
1577 ds_ctx->jobuid = getuid();
1578 ds_ctx->setjobuid = 0;
1579
1580 if (PMIX_SUCCESS != (rc = _esh_tbls_init(ds_ctx))) {
1581 PMIX_ERROR_LOG(rc);
1582 goto err_exit;
1583 }
1584
1585 rc = pmix_pshmem.init();
1586 if (PMIX_SUCCESS != rc) {
1587 PMIX_ERROR_LOG(rc);
1588 goto err_exit;
1589 }
1590
1591 _set_constants_from_env(ds_ctx);
1592 ds_ctx->ds_name = strdup(ds_name);
1593
1594
1595 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
1596 ds_ctx->session_map_search = (session_map_search_fn_t)_esh_session_map_search_server;
1597
1598
1599 if (NULL != info) {
1600 for (n=0; n < ninfo; n++) {
1601 if (0 == strcmp(PMIX_USERID, info[n].key)) {
1602 ds_ctx->jobuid = info[n].value.data.uint32;
1603 ds_ctx->setjobuid = 1;
1604 continue;
1605 }
1606 if (0 == strcmp(PMIX_DSTPATH, info[n].key)) {
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617 if( PMIX_STRING != info[n].value.type ){
1618 rc = PMIX_ERR_BAD_PARAM;
1619 PMIX_ERROR_LOG(rc);
1620 goto err_exit;
1621 }
1622 dstor_tmpdir = (char*)info[n].value.data.string;
1623 continue;
1624 }
1625 if (0 == strcmp(PMIX_SERVER_TMPDIR, info[n].key)) {
1626 if( PMIX_STRING != info[n].value.type ){
1627 rc = PMIX_ERR_BAD_PARAM;
1628 PMIX_ERROR_LOG(rc);
1629 goto err_exit;
1630 }
1631 if (NULL == dstor_tmpdir) {
1632 dstor_tmpdir = (char*)info[n].value.data.string;
1633 }
1634 continue;
1635 }
1636 }
1637 }
1638
1639 if (NULL == dstor_tmpdir) {
1640 if (NULL == (dstor_tmpdir = getenv("TMPDIR"))) {
1641 if (NULL == (dstor_tmpdir = getenv("TEMP"))) {
1642 if (NULL == (dstor_tmpdir = getenv("TMP"))) {
1643 dstor_tmpdir = "/tmp";
1644 }
1645 }
1646 }
1647 }
1648
1649 rc = asprintf(&ds_ctx->base_path, "%s/pmix_dstor_%s_%d", dstor_tmpdir,
1650 ds_ctx->ds_name, getpid());
1651 if ((0 > rc) || (NULL == ds_ctx->base_path)) {
1652 rc = PMIX_ERR_OUT_OF_RESOURCE;
1653 PMIX_ERROR_LOG(rc);
1654 goto err_exit;
1655 }
1656
1657 if (0 != mkdir(ds_ctx->base_path, 0770)) {
1658 if (EEXIST != errno) {
1659 rc = PMIX_ERROR;
1660 PMIX_ERROR_LOG(rc);
1661 goto err_exit;
1662 }
1663 }
1664 if (ds_ctx->setjobuid > 0) {
1665 if (chown(ds_ctx->base_path, (uid_t) ds_ctx->jobuid, (gid_t) -1) < 0){
1666 rc = PMIX_ERR_NO_PERMISSIONS;
1667 PMIX_ERROR_LOG(rc);
1668 goto err_exit;
1669 }
1670 }
1671 ds_ctx->session_map_search = _esh_session_map_search_server;
1672 return ds_ctx;
1673 }
1674
1675 else {
1676 char *env_name = NULL;
1677 int ds_ver = 0;
1678
1679 sscanf(ds_ctx->ds_name, "ds%d", &ds_ver);
1680 if (0 == ds_ver) {
1681 rc = PMIX_ERR_INIT;
1682 PMIX_ERROR_LOG(rc);
1683 goto err_exit;
1684 }
1685 if (0 > asprintf(&env_name, PMIX_DSTORE_VER_BASE_PATH_FMT, ds_ver)) {
1686 rc = PMIX_ERR_NOMEM;
1687 PMIX_ERROR_LOG(rc);
1688 goto err_exit;
1689 }
1690 dstor_tmpdir = getenv(env_name);
1691 free(env_name);
1692
1693 if (NULL == dstor_tmpdir) {
1694 dstor_tmpdir = getenv(PMIX_DSTORE_ESH_BASE_PATH);
1695 }
1696 if (NULL == dstor_tmpdir){
1697 rc = PMIX_ERR_NOT_AVAILABLE;
1698 goto err_exit;
1699 }
1700 if (NULL == (ds_ctx->base_path = strdup(dstor_tmpdir))) {
1701 rc = PMIX_ERR_OUT_OF_RESOURCE;
1702 PMIX_ERROR_LOG(rc);
1703 goto err_exit;
1704 }
1705 ds_ctx->session_map_search = _esh_session_map_search_client;
1706
1707 if (0 != pthread_mutex_init(&ds_ctx->lock, NULL)) {
1708 rc = PMIX_ERR_INIT;
1709 PMIX_ERROR_LOG(rc);
1710 goto err_exit;
1711 }
1712 }
1713
1714 rc = _esh_session_tbl_add(ds_ctx, &tbl_idx);
1715 if (PMIX_SUCCESS != rc) {
1716 PMIX_ERROR_LOG(rc);
1717 goto err_exit;
1718 }
1719
1720 char *nspace = NULL;
1721
1722 if (NULL == (nspace = getenv("PMIX_NAMESPACE"))) {
1723 rc = PMIX_ERR_INVALID_NAMESPACE;
1724 PMIX_ERROR_LOG(rc);
1725 goto err_exit;
1726 }
1727
1728 rc = ds_ctx->lock_cbs->init(&_ESH_SESSION_lock(ds_ctx->session_array, tbl_idx), ds_ctx->base_path, nspace, 1, ds_ctx->jobuid, ds_ctx->setjobuid);
1729 if (rc != PMIX_SUCCESS) {
1730 goto err_exit;
1731 }
1732 ns_map = _esh_session_map(ds_ctx, nspace, 0, tbl_idx);
1733 if (NULL == ns_map) {
1734 rc = PMIX_ERR_OUT_OF_RESOURCE;
1735 PMIX_ERROR_LOG(rc);
1736 goto err_exit;
1737 }
1738
1739 if (PMIX_SUCCESS != (rc =_esh_session_init(ds_ctx, tbl_idx, ns_map, 1,
1740 ds_ctx->jobuid, ds_ctx->setjobuid))) {
1741 PMIX_ERROR_LOG(rc);
1742 goto err_exit;
1743 }
1744
1745 return ds_ctx;
1746 err_exit:
1747 pmix_common_dstor_finalize(ds_ctx);
1748 return NULL;
1749 }
1750
1751 PMIX_EXPORT void pmix_common_dstor_finalize(pmix_common_dstore_ctx_t *ds_ctx)
1752 {
1753 struct stat st = {0};
1754 pmix_status_t rc = PMIX_SUCCESS;
1755
1756 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1757 "%s:%d:%s", __FILE__, __LINE__, __func__));
1758
1759 _esh_sessions_cleanup(ds_ctx);
1760 _esh_ns_map_cleanup(ds_ctx);
1761 _esh_ns_track_cleanup(ds_ctx);
1762
1763 pmix_pshmem.finalize();
1764
1765 if (NULL != ds_ctx->base_path){
1766 if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
1767 if (lstat(ds_ctx->base_path, &st) >= 0){
1768 if (PMIX_SUCCESS != (rc = _esh_dir_del(ds_ctx->base_path))) {
1769 PMIX_ERROR_LOG(rc);
1770 }
1771 }
1772 }
1773 free(ds_ctx->base_path);
1774 ds_ctx->base_path = NULL;
1775 }
1776 if (NULL != ds_ctx->clients_peer) {
1777 PMIX_RELEASE(ds_ctx->clients_peer->nptr);
1778 PMIX_RELEASE(ds_ctx->clients_peer);
1779 }
1780
1781 if( PMIX_SUCCESS != (rc = pmix_mca_base_framework_close(&pmix_pshmem_base_framework)) ) {
1782 PMIX_ERROR_LOG(rc);
1783 }
1784 free(ds_ctx->ds_name);
1785 free(ds_ctx->base_path);
1786 free(ds_ctx);
1787 }
1788
1789 static pmix_status_t _dstore_store_nolock(pmix_common_dstore_ctx_t *ds_ctx,
1790 ns_map_data_t *ns_map,
1791 pmix_rank_t rank,
1792 pmix_kval_t *kv)
1793 {
1794 pmix_status_t rc = PMIX_SUCCESS;
1795 ns_track_elem_t *elem;
1796 pmix_buffer_t xfer;
1797 ns_seg_info_t ns_info;
1798
1799 if (NULL == kv) {
1800 return PMIX_ERROR;
1801 }
1802
1803 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1804 "%s:%d:%s: for %s:%u",
1805 __FILE__, __LINE__, __func__, ns_map->name, rank));
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817 elem = _get_track_elem_for_namespace(ds_ctx, ns_map);
1818 if (NULL == elem) {
1819 rc = PMIX_ERR_OUT_OF_RESOURCE;
1820 PMIX_ERROR_LOG(rc);
1821 goto exit;
1822 }
1823
1824
1825
1826 if (NULL == elem->meta_seg || NULL == elem->data_seg) {
1827 memset(&ns_info.ns_map, 0, sizeof(ns_info.ns_map));
1828 pmix_strncpy(ns_info.ns_map.name, ns_map->name, sizeof(ns_info.ns_map.name)-1);
1829 ns_info.ns_map.tbl_idx = ns_map->tbl_idx;
1830 ns_info.num_meta_seg = 1;
1831 ns_info.num_data_seg = 1;
1832 rc = _update_ns_elem(ds_ctx, elem, &ns_info);
1833 if (PMIX_SUCCESS != rc || NULL == elem->meta_seg || NULL == elem->data_seg) {
1834 PMIX_ERROR_LOG(rc);
1835 goto exit;
1836 }
1837
1838
1839 memset(elem->meta_seg->seg_info.seg_base_addr, 0, ds_ctx->meta_segment_size);
1840 memset(elem->data_seg->seg_info.seg_base_addr, 0, ds_ctx->data_segment_size);
1841
1842
1843 rc = _put_ns_info_to_initial_segment(ds_ctx, ns_map, &elem->meta_seg->seg_info, &elem->data_seg->seg_info);
1844 if (PMIX_SUCCESS != rc) {
1845 PMIX_ERROR_LOG(rc);
1846 goto exit;
1847 }
1848 }
1849
1850
1851
1852 PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
1853 PMIX_LOAD_BUFFER(pmix_globals.mypeer, &xfer, kv->value->data.bo.bytes, kv->value->data.bo.size);
1854
1855 rc = _store_data_for_rank(ds_ctx, elem, rank, &xfer);
1856
1857 PMIX_DESTRUCT(&xfer);
1858
1859 if (PMIX_SUCCESS != rc) {
1860 PMIX_ERROR_LOG(rc);
1861 goto exit;
1862 }
1863
1864 exit:
1865 return rc;
1866 }
1867
1868 PMIX_EXPORT pmix_status_t pmix_common_dstor_store(pmix_common_dstore_ctx_t *ds_ctx,
1869 const pmix_proc_t *proc,
1870 pmix_scope_t scope,
1871 pmix_kval_t *kv)
1872 {
1873 pmix_status_t rc = PMIX_SUCCESS;
1874 ns_map_data_t *ns_map;
1875 pmix_kval_t *kv2;
1876 pmix_buffer_t tmp;
1877
1878 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1879 "[%s:%d] gds: dstore store for key '%s' scope %d",
1880 proc->nspace, proc->rank, kv->key, scope);
1881
1882 if (PMIX_PROC_IS_CLIENT(pmix_globals.mypeer)) {
1883 rc = PMIX_ERR_NOT_SUPPORTED;
1884 PMIX_ERROR_LOG(rc);
1885 return rc;
1886 }
1887
1888 kv2 = PMIX_NEW(pmix_kval_t);
1889 PMIX_VALUE_CREATE(kv2->value, 1);
1890 kv2->value->type = PMIX_BYTE_OBJECT;
1891
1892 PMIX_CONSTRUCT(&tmp, pmix_buffer_t);
1893
1894 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &tmp, kv, 1, PMIX_KVAL);
1895 PMIX_UNLOAD_BUFFER(&tmp, kv2->value->data.bo.bytes, kv2->value->data.bo.size);
1896
1897 if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, proc->nspace))) {
1898 rc = PMIX_ERROR;
1899 PMIX_ERROR_LOG(rc);
1900 goto exit;
1901 }
1902
1903
1904 rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_lock);
1905 if (PMIX_SUCCESS != rc) {
1906 PMIX_ERROR_LOG(rc);
1907 goto exit;
1908 }
1909
1910 rc = _dstore_store_nolock(ds_ctx, ns_map, proc->rank, kv2);
1911 if (PMIX_SUCCESS != rc) {
1912 PMIX_ERROR_LOG(rc);
1913 goto exit;
1914 }
1915
1916
1917 rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_unlock);
1918 if (PMIX_SUCCESS != rc) {
1919 PMIX_ERROR_LOG(rc);
1920 goto exit;
1921 }
1922
1923 exit:
1924 PMIX_RELEASE(kv2);
1925 PMIX_DESTRUCT(&tmp);
1926
1927 return rc;
1928 }
1929
1930 static pmix_status_t _dstore_fetch(pmix_common_dstore_ctx_t *ds_ctx,
1931 const char *nspace, pmix_rank_t rank,
1932 const char *key, pmix_value_t **kvs)
1933 {
1934 ns_seg_info_t *ns_info = NULL;
1935 pmix_status_t rc = PMIX_ERROR, lock_rc;
1936 ns_track_elem_t *elem;
1937 rank_meta_info *rinfo = NULL;
1938 size_t kval_cnt = 0;
1939 pmix_dstore_seg_desc_t *meta_seg, *data_seg;
1940 uint8_t *addr;
1941 pmix_buffer_t buffer;
1942 pmix_value_t val, *kval = NULL;
1943 uint32_t nprocs;
1944 pmix_rank_t cur_rank;
1945 ns_map_data_t *ns_map = NULL;
1946 bool all_ranks_found = true;
1947 bool key_found = false;
1948 pmix_info_t *info = NULL;
1949 size_t ninfo;
1950 size_t keyhash = 0;
1951 bool lock_is_set = false;
1952
1953 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1954 "%s:%d:%s: for %s:%u look for key %s",
1955 __FILE__, __LINE__, __func__, nspace, rank, key));
1956
1957 if ((PMIX_RANK_UNDEF == rank) && (NULL == key)) {
1958 PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output,
1959 "dstore: Does not support passed parameters"));
1960 rc = PMIX_ERR_BAD_PARAM;
1961 goto error;
1962 }
1963
1964 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
1965 "%s:%d:%s: for %s:%u look for key %s",
1966 __FILE__, __LINE__, __func__, nspace, rank, key));
1967
1968
1969 if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
1970 if (0 != (rc = pthread_mutex_lock(&ds_ctx->lock))) {
1971 goto error;
1972 }
1973 lock_is_set = true;
1974 }
1975
1976 if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, nspace))) {
1977
1978
1979
1980 rc = PMIX_ERR_FATAL;
1981 goto error;
1982 }
1983
1984 if (NULL == kvs) {
1985 rc = PMIX_ERR_FATAL;
1986 goto error;
1987 }
1988
1989 if (PMIX_RANK_UNDEF == rank) {
1990 ssize_t _nprocs = _get_univ_size(ds_ctx, ns_map->name);
1991 if( 0 > _nprocs ){
1992 goto error;
1993 }
1994 nprocs = (size_t) _nprocs;
1995 cur_rank = 0;
1996 } else {
1997 nprocs = 1;
1998 cur_rank = rank;
1999 }
2000
2001
2002 lock_rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, rd_lock);
2003 if (PMIX_SUCCESS != lock_rc) {
2004
2005 rc = lock_rc;
2006 goto error;
2007 }
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022 _update_initial_segment_info(ds_ctx, ns_map);
2023
2024 ns_info = _get_ns_info_from_initial_segment(ds_ctx, ns_map);
2025 if (NULL == ns_info) {
2026
2027 PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output,
2028 "%s:%d:%s: no data for ns %s is found in the shared memory.",
2029 __FILE__, __LINE__, __func__, ns_map->name));
2030 rc = PMIX_ERR_PROC_ENTRY_NOT_FOUND;
2031 goto done;
2032 }
2033
2034
2035 elem = _get_track_elem_for_namespace(ds_ctx, ns_map);
2036 if (NULL == elem) {
2037
2038 rc = PMIX_ERR_FATAL;
2039 PMIX_ERROR_LOG(rc);
2040 goto done;
2041 }
2042
2043
2044
2045
2046 rc = _update_ns_elem(ds_ctx, elem, ns_info);
2047 if (PMIX_SUCCESS != rc) {
2048 PMIX_ERROR_LOG(rc);
2049 goto done;
2050 }
2051
2052
2053 meta_seg = elem->meta_seg;
2054 data_seg = elem->data_seg;
2055
2056 if( NULL != key ) {
2057 keyhash = PMIX_DS_KEY_HASH(ds_ctx, key);
2058 }
2059
2060
2061 if (lock_is_set) {
2062 lock_is_set = false;
2063 if (0 != (rc = pthread_mutex_unlock(&ds_ctx->lock))) {
2064 goto error;
2065 }
2066 }
2067
2068 while (nprocs--) {
2069
2070 rinfo = _get_rank_meta_info(ds_ctx, cur_rank, meta_seg);
2071 if (NULL == rinfo) {
2072 PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output,
2073 "%s:%d:%s: no data for this rank is found in the shared memory. rank %u",
2074 __FILE__, __LINE__, __func__, cur_rank));
2075 all_ranks_found = false;
2076 continue;
2077 }
2078 addr = _get_data_region_by_offset(ds_ctx, data_seg, rinfo->offset);
2079 if (NULL == addr) {
2080
2081 rc = PMIX_ERR_FATAL;
2082 PMIX_ERROR_LOG(rc);
2083 goto done;
2084 }
2085 kval_cnt = rinfo->count;
2086
2087
2088 if ((NULL == key) && (kval_cnt > 0)) {
2089 kval = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2090 if (NULL == kval) {
2091 rc = PMIX_ERR_NOMEM;
2092 goto done;
2093 }
2094 PMIX_VALUE_CONSTRUCT(kval);
2095
2096 ninfo = kval_cnt;
2097 PMIX_INFO_CREATE(info, ninfo);
2098 if (NULL == info) {
2099 rc = PMIX_ERR_NOMEM;
2100 goto done;
2101 }
2102
2103 kval->type = PMIX_DATA_ARRAY;
2104 kval->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
2105 if (NULL == kval->data.darray) {
2106 rc = PMIX_ERR_NOMEM;
2107 goto done;
2108 }
2109 kval->data.darray->type = PMIX_INFO;
2110 kval->data.darray->size = ninfo;
2111 kval->data.darray->array = info;
2112 *kvs = kval;
2113 }
2114
2115 rc = PMIX_SUCCESS;
2116 while (0 < kval_cnt) {
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130 if (PMIX_DS_KEY_IS_INVALID(ds_ctx, addr)) {
2131 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2132 "%s:%d:%s: for rank %s:%u, skip %s region",
2133 __FILE__, __LINE__, __func__, nspace, cur_rank, ESH_REGION_INVALIDATED));
2134
2135
2136 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
2137 } else if (PMIX_DS_KEY_IS_EXTSLOT(ds_ctx, addr)) {
2138 size_t offset;
2139 memcpy(&offset, PMIX_DS_DATA_PTR(ds_ctx, addr), sizeof(size_t));
2140 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2141 "%s:%d:%s: for rank %s:%u, reached %s with %lu value",
2142 __FILE__, __LINE__, __func__, nspace, cur_rank, ESH_REGION_EXTENSION, offset));
2143 if (0 < offset) {
2144
2145 addr = _get_data_region_by_offset(ds_ctx, data_seg, offset);
2146 if (NULL == addr) {
2147
2148 rc = PMIX_ERR_FATAL;
2149 PMIX_ERROR_LOG(rc);
2150 goto done;
2151 }
2152 } else {
2153
2154 PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output,
2155 "%s:%d:%s: no more data for this rank is found in the shared memory. rank %u key %s not found",
2156 __FILE__, __LINE__, __func__, cur_rank, key));
2157 break;
2158 }
2159 } else if (NULL == key) {
2160 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2161 "%s:%d:%s: for rank %s:%u, found target key %s",
2162 __FILE__, __LINE__, __func__, nspace, cur_rank, PMIX_DS_KNAME_PTR(ds_ctx, addr)));
2163
2164 uint8_t *data_ptr = PMIX_DS_DATA_PTR(ds_ctx, addr);
2165 size_t data_size = PMIX_DS_DATA_SIZE(ds_ctx, addr, data_ptr);
2166 PMIX_CONSTRUCT(&buffer, pmix_buffer_t);
2167 PMIX_LOAD_BUFFER(_client_peer(ds_ctx), &buffer, data_ptr, data_size);
2168 int cnt = 1;
2169
2170 PMIX_VALUE_CONSTRUCT(&val);
2171 PMIX_BFROPS_UNPACK(rc, _client_peer(ds_ctx), &buffer, &val, &cnt, PMIX_VALUE);
2172 if (PMIX_SUCCESS != rc) {
2173 PMIX_ERROR_LOG(rc);
2174 goto done;
2175 }
2176 pmix_strncpy(info[kval_cnt - 1].key, PMIX_DS_KNAME_PTR(ds_ctx, addr),
2177 PMIX_DS_KNAME_LEN(ds_ctx, addr));
2178 pmix_value_xfer(&info[kval_cnt - 1].value, &val);
2179 PMIX_VALUE_DESTRUCT(&val);
2180 buffer.base_ptr = NULL;
2181 buffer.bytes_used = 0;
2182 PMIX_DESTRUCT(&buffer);
2183 key_found = true;
2184
2185 kval_cnt--;
2186 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
2187 } else if (PMIX_DS_KEY_MATCH(ds_ctx, addr, key, keyhash)) {
2188 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2189 "%s:%d:%s: for rank %s:%u, found target key %s",
2190 __FILE__, __LINE__, __func__, nspace, cur_rank, key));
2191
2192 uint8_t *data_ptr = PMIX_DS_DATA_PTR(ds_ctx, addr);
2193 size_t data_size = PMIX_DS_DATA_SIZE(ds_ctx, addr, data_ptr);
2194 PMIX_CONSTRUCT(&buffer, pmix_buffer_t);
2195 PMIX_LOAD_BUFFER(_client_peer(ds_ctx), &buffer, data_ptr, data_size);
2196 int cnt = 1;
2197
2198 *kvs = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2199 PMIX_BFROPS_UNPACK(rc, _client_peer(ds_ctx), &buffer, (void*)*kvs, &cnt, PMIX_VALUE);
2200 if (PMIX_SUCCESS != rc) {
2201 PMIX_ERROR_LOG(rc);
2202 goto done;
2203 }
2204 buffer.base_ptr = NULL;
2205 buffer.bytes_used = 0;
2206 PMIX_DESTRUCT(&buffer);
2207 key_found = true;
2208 goto done;
2209 } else {
2210 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2211 "%s:%d:%s: for rank %s:%u, skip key %s look for key %s",
2212 __FILE__, __LINE__, __func__, nspace, cur_rank,
2213 PMIX_DS_KNAME_PTR(ds_ctx, addr), key));
2214
2215 addr += PMIX_DS_KV_SIZE(ds_ctx, addr);
2216 kval_cnt--;
2217 }
2218 }
2219
2220 if (PMIX_RANK_UNDEF == rank) {
2221 cur_rank++;
2222 }
2223 }
2224
2225 done:
2226
2227 lock_rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, rd_unlock);
2228 if (PMIX_SUCCESS != lock_rc) {
2229 PMIX_ERROR_LOG(lock_rc);
2230 }
2231
2232
2233 if (lock_is_set) {
2234 pthread_mutex_unlock(&ds_ctx->lock);
2235 }
2236
2237 if( rc != PMIX_SUCCESS ){
2238 if ((NULL == key) && (kval_cnt > 0)) {
2239 if( NULL != info ) {
2240 PMIX_INFO_FREE(info, ninfo);
2241 }
2242 if (NULL != kval) {
2243 PMIX_VALUE_RELEASE(kval);
2244 }
2245 }
2246 return rc;
2247 }
2248
2249 if( key_found ){
2250
2251 return PMIX_SUCCESS;
2252 }
2253
2254 if( !all_ranks_found ){
2255
2256
2257
2258 rc = PMIX_ERR_PROC_ENTRY_NOT_FOUND;
2259 return rc;
2260 }
2261 rc = PMIX_ERR_NOT_FOUND;
2262 return rc;
2263
2264 error:
2265 if (lock_is_set) {
2266 pthread_mutex_unlock(&ds_ctx->lock);
2267 }
2268 PMIX_ERROR_LOG(rc);
2269 return rc;
2270 }
2271
2272 PMIX_EXPORT pmix_status_t pmix_common_dstor_fetch(pmix_common_dstore_ctx_t *ds_ctx,
2273 const pmix_proc_t *proc,
2274 pmix_scope_t scope, bool copy,
2275 const char *key,
2276 pmix_info_t info[], size_t ninfo,
2277 pmix_list_t *kvs)
2278 {
2279 pmix_kval_t *kv;
2280 pmix_value_t *val;
2281 pmix_status_t rc = PMIX_SUCCESS;
2282
2283 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2284 "gds: dstore fetch `%s`", key == NULL ? "NULL" : key);
2285
2286 rc = _dstore_fetch(ds_ctx, proc->nspace, proc->rank, key, &val);
2287 if (PMIX_SUCCESS == rc) {
2288 if( NULL == key ) {
2289 pmix_info_t *info;
2290 size_t n, ninfo;
2291
2292 if (NULL == val->data.darray ||
2293 PMIX_INFO != val->data.darray->type ||
2294 0 == val->data.darray->size) {
2295 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
2296 return PMIX_ERR_NOT_FOUND;
2297 }
2298 info = (pmix_info_t*)val->data.darray->array;
2299 ninfo = val->data.darray->size;
2300
2301 for (n = 0; n < ninfo; n++){
2302 kv = PMIX_NEW(pmix_kval_t);
2303 if (NULL == kv) {
2304 rc = PMIX_ERR_NOMEM;
2305 PMIX_VALUE_RELEASE(val);
2306 return rc;
2307 }
2308 kv->key = strdup(info[n].key);
2309 PMIX_VALUE_XFER(rc, kv->value, &info[n].value);
2310 if (PMIX_SUCCESS != rc) {
2311 PMIX_ERROR_LOG(rc);
2312 PMIX_RELEASE(kv);
2313 PMIX_VALUE_RELEASE(val);
2314 return rc;
2315 }
2316 pmix_list_append(kvs, &kv->super);
2317 }
2318
2319 return PMIX_SUCCESS;
2320 }
2321
2322 kv = PMIX_NEW(pmix_kval_t);
2323 if (NULL == kv) {
2324 PMIX_VALUE_RELEASE(val);
2325 return PMIX_ERR_NOMEM;
2326 }
2327 kv->key = strdup(key);
2328 kv->value = val;
2329 pmix_list_append(kvs, &kv->super);
2330 }
2331 return rc;
2332 }
2333
2334 PMIX_EXPORT pmix_status_t pmix_common_dstor_setup_fork(pmix_common_dstore_ctx_t *ds_ctx, const char *base_path_env,
2335 const pmix_proc_t *peer, char ***env)
2336 {
2337 pmix_status_t rc = PMIX_SUCCESS;
2338 ns_map_data_t *ns_map = NULL;
2339
2340 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2341 "gds: dstore setup fork");
2342
2343 if (NULL == ds_ctx->session_map_search) {
2344 rc = PMIX_ERR_NOT_AVAILABLE;
2345 PMIX_ERROR_LOG(rc);
2346 return rc;
2347 }
2348
2349 if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, peer->nspace))) {
2350 rc = PMIX_ERR_NOT_AVAILABLE;
2351 PMIX_ERROR_LOG(rc);
2352 return rc;
2353 }
2354
2355 if ((NULL == ds_ctx->base_path) || (strlen(ds_ctx->base_path) == 0)){
2356 rc = PMIX_ERR_NOT_AVAILABLE;
2357 PMIX_ERROR_LOG(rc);
2358 return rc;
2359 }
2360
2361 if(PMIX_SUCCESS != (rc = pmix_setenv(base_path_env,
2362 _ESH_SESSION_path(ds_ctx->session_array, ns_map->tbl_idx),
2363 true, env))){
2364 PMIX_ERROR_LOG(rc);
2365 }
2366
2367 return rc;
2368 }
2369
2370 PMIX_EXPORT pmix_status_t pmix_common_dstor_add_nspace(pmix_common_dstore_ctx_t *ds_ctx,
2371 const char *nspace, pmix_info_t info[], size_t ninfo)
2372 {
2373 pmix_status_t rc = PMIX_SUCCESS;
2374 size_t tbl_idx=0;
2375 uid_t jobuid = ds_ctx->jobuid;
2376 char setjobuid = ds_ctx->setjobuid;
2377 size_t n;
2378 ns_map_data_t *ns_map = NULL;
2379 uint32_t local_size = 0;
2380
2381 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2382 "gds: dstore add nspace");
2383
2384 if (NULL != info) {
2385 for (n=0; n < ninfo; n++) {
2386 if (0 == strcmp(PMIX_USERID, info[n].key)) {
2387 jobuid = info[n].value.data.uint32;
2388 setjobuid = 1;
2389 continue;
2390 }
2391 if (0 == strcmp(PMIX_LOCAL_SIZE, info[n].key)) {
2392 local_size = info[n].value.data.uint32;
2393 continue;
2394 }
2395 }
2396 }
2397
2398 if (PMIX_SUCCESS != _esh_jobuid_tbl_search(ds_ctx, jobuid, &tbl_idx)) {
2399
2400 rc = _esh_session_tbl_add(ds_ctx, &tbl_idx);
2401 if (PMIX_SUCCESS != rc) {
2402 PMIX_ERROR_LOG(rc);
2403 return rc;
2404 }
2405 ns_map = _esh_session_map(ds_ctx, nspace, local_size, tbl_idx);
2406 if (NULL == ns_map) {
2407 rc = PMIX_ERROR;
2408 PMIX_ERROR_LOG(rc);
2409 return rc;
2410 }
2411
2412 if (PMIX_SUCCESS != (rc =_esh_session_init(ds_ctx, tbl_idx, ns_map,
2413 local_size, jobuid, setjobuid))) {
2414 rc = PMIX_ERROR;
2415 PMIX_ERROR_LOG(rc);
2416 return rc;
2417 }
2418 }
2419 else {
2420 ns_map = _esh_session_map(ds_ctx, nspace, local_size, tbl_idx);
2421 if (NULL == ns_map) {
2422 rc = PMIX_ERROR;
2423 PMIX_ERROR_LOG(rc);
2424 return rc;
2425 }
2426 }
2427
2428
2429 ds_ctx->lock_cbs->init(&_ESH_SESSION_lock(ds_ctx->session_array, tbl_idx),
2430 ds_ctx->base_path, nspace, local_size, ds_ctx->jobuid,
2431 ds_ctx->setjobuid);
2432 if (NULL == _ESH_SESSION_lock(ds_ctx->session_array, tbl_idx)) {
2433 PMIX_ERROR_LOG(rc);
2434 return rc;
2435 }
2436
2437 return PMIX_SUCCESS;
2438 }
2439
2440 PMIX_EXPORT pmix_status_t pmix_common_dstor_del_nspace(pmix_common_dstore_ctx_t *ds_ctx, const char* nspace)
2441 {
2442 pmix_status_t rc = PMIX_SUCCESS;
2443 size_t map_idx, size;
2444 int in_use = 0;
2445 ns_map_data_t *ns_map_data = NULL;
2446 ns_map_t *ns_map;
2447 session_t *session_tbl = NULL;
2448 ns_track_elem_t *trk = NULL;
2449 int dstor_track_idx;
2450 size_t session_tbl_idx;
2451
2452 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2453 "%s:%d:%s delete nspace `%s`", __FILE__, __LINE__, __func__, nspace));
2454
2455 if (NULL == (ns_map_data = ds_ctx->session_map_search(ds_ctx, nspace))) {
2456 rc = PMIX_ERR_NOT_AVAILABLE;
2457 return rc;
2458 }
2459 dstor_track_idx = ns_map_data->track_idx;
2460 session_tbl_idx = ns_map_data->tbl_idx;
2461 size = pmix_value_array_get_size(ds_ctx->ns_map_array);
2462 ns_map = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->ns_map_array, ns_map_t);
2463
2464 for (map_idx = 0; map_idx < size; map_idx++){
2465 if (ns_map[map_idx].in_use &&
2466 (ns_map[map_idx].data.tbl_idx == ns_map_data->tbl_idx)) {
2467 if (0 == strcmp(ns_map[map_idx].data.name, nspace)) {
2468 _esh_session_map_clean(ds_ctx, &ns_map[map_idx]);
2469 continue;
2470 }
2471 in_use++;
2472 }
2473 }
2474
2475
2476
2477 if (!in_use) {
2478 session_tbl = PMIX_VALUE_ARRAY_GET_BASE(ds_ctx->session_array, session_t);
2479 PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output,
2480 "%s:%d:%s delete session for jobuid: %d",
2481 __FILE__, __LINE__, __func__, session_tbl[session_tbl_idx].jobuid));
2482 size = pmix_value_array_get_size(ds_ctx->ns_track_array);
2483 if (size && (dstor_track_idx >= 0)) {
2484 if((dstor_track_idx + 1) > (int)size) {
2485 rc = PMIX_ERR_VALUE_OUT_OF_BOUNDS;
2486 PMIX_ERROR_LOG(rc);
2487 goto exit;
2488 }
2489 trk = pmix_value_array_get_item(ds_ctx->ns_track_array, dstor_track_idx);
2490 if (true == trk->in_use) {
2491 PMIX_DESTRUCT(trk);
2492 pmix_value_array_remove_item(ds_ctx->ns_track_array, dstor_track_idx);
2493 }
2494 }
2495 _esh_session_release(ds_ctx, session_tbl_idx);
2496 }
2497 exit:
2498 return rc;
2499 }
2500
2501 static inline int _my_client(const char *nspace, pmix_rank_t rank)
2502 {
2503 pmix_peer_t *peer;
2504 int i;
2505 int local = 0;
2506
2507 for (i = 0; i < pmix_server_globals.clients.size; i++) {
2508 if (NULL != (peer = (pmix_peer_t *)pmix_pointer_array_get_item(&pmix_server_globals.clients, i))) {
2509 if (0 == strcmp(peer->info->pname.nspace, nspace) && peer->info->pname.rank == rank) {
2510 local = 1;
2511 break;
2512 }
2513 }
2514 }
2515
2516 return local;
2517 }
2518
2519
2520
2521
2522
2523 PMIX_EXPORT pmix_status_t pmix_common_dstor_store_modex(pmix_common_dstore_ctx_t *ds_ctx,
2524 struct pmix_namespace_t *nspace,
2525 pmix_buffer_t *buf,
2526 void *cbdata)
2527 {
2528 pmix_status_t rc = PMIX_SUCCESS;
2529 pmix_status_t rc1 = PMIX_SUCCESS;
2530 pmix_namespace_t *ns = (pmix_namespace_t*)nspace;
2531 ns_map_data_t *ns_map;
2532
2533 if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, ns->nspace))) {
2534 rc = PMIX_ERROR;
2535 PMIX_ERROR_LOG(rc);
2536 return rc;
2537 }
2538
2539
2540 rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_lock);
2541 if (PMIX_SUCCESS != rc) {
2542 PMIX_ERROR_LOG(rc);
2543 return rc;
2544 }
2545
2546 rc = pmix_gds_base_store_modex(nspace, buf, ds_ctx,
2547 (pmix_gds_base_store_modex_cb_fn_t)_dstor_store_modex_cb,
2548 cbdata);
2549 if (PMIX_SUCCESS != rc) {
2550 PMIX_ERROR_LOG(rc);
2551 }
2552
2553
2554 rc1 = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_unlock);
2555 if (PMIX_SUCCESS != rc1) {
2556 PMIX_ERROR_LOG(rc1);
2557 if (PMIX_SUCCESS == rc) {
2558 rc = rc1;
2559 }
2560 }
2561
2562 return rc;
2563 }
2564
2565 static pmix_status_t _dstor_store_modex_cb(pmix_common_dstore_ctx_t *ds_ctx,
2566 pmix_proc_t *proc,
2567 pmix_gds_modex_key_fmt_t key_fmt,
2568 char **kmap,
2569 pmix_buffer_t *pbkt)
2570 {
2571 pmix_status_t rc = PMIX_SUCCESS;
2572 pmix_kval_t *kv;
2573 ns_map_data_t *ns_map;
2574 pmix_buffer_t tmp;
2575
2576 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2577 "[%s:%d] gds:dstore:store_modex for nspace %s",
2578 pmix_globals.myid.nspace, pmix_globals.myid.rank,
2579 proc->nspace);
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592 if (_my_client(proc->nspace, proc->rank)) {
2593 return PMIX_SUCCESS;
2594 }
2595
2596
2597 PMIX_CONSTRUCT(&tmp, pmix_buffer_t);
2598
2599
2600 kv = PMIX_NEW(pmix_kval_t);
2601 rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
2602
2603 while (PMIX_SUCCESS == rc) {
2604
2605 PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, proc, PMIX_REMOTE, kv);
2606 if (PMIX_SUCCESS != rc) {
2607 PMIX_ERROR_LOG(rc);
2608 return rc;
2609 }
2610
2611
2612 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &tmp, kv, 1, PMIX_KVAL);
2613
2614
2615
2616 PMIX_RELEASE(kv);
2617
2618
2619 kv = PMIX_NEW(pmix_kval_t);
2620 rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
2621 if (PMIX_SUCCESS != rc) {
2622 break;
2623 }
2624 }
2625
2626
2627
2628 PMIX_RELEASE(kv);
2629 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
2630 PMIX_ERROR_LOG(rc);
2631 } else {
2632 rc = PMIX_SUCCESS;
2633 }
2634
2635
2636
2637 kv = PMIX_NEW(pmix_kval_t);
2638 PMIX_VALUE_CREATE(kv->value, 1);
2639 kv->value->type = PMIX_BYTE_OBJECT;
2640 PMIX_UNLOAD_BUFFER(&tmp, kv->value->data.bo.bytes, kv->value->data.bo.size);
2641
2642
2643 if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, proc->nspace))) {
2644 rc = PMIX_ERROR;
2645 PMIX_ERROR_LOG(rc);
2646 return rc;
2647 }
2648
2649
2650 rc = _dstore_store_nolock(ds_ctx, ns_map, proc->rank, kv);
2651 if (PMIX_SUCCESS != rc) {
2652 PMIX_ERROR_LOG(rc);
2653 }
2654
2655
2656 PMIX_RELEASE(kv);
2657 PMIX_DESTRUCT(&tmp);
2658
2659 return rc;
2660 }
2661
2662 static pmix_status_t _store_job_info(pmix_common_dstore_ctx_t *ds_ctx, ns_map_data_t *ns_map,
2663 pmix_proc_t *proc)
2664 {
2665 pmix_cb_t cb;
2666 pmix_kval_t *kv;
2667 pmix_buffer_t buf;
2668 pmix_kval_t *kv2 = NULL, *kvp;
2669 pmix_status_t rc = PMIX_SUCCESS;
2670
2671 PMIX_CONSTRUCT(&cb, pmix_cb_t);
2672 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
2673 kvp = PMIX_NEW(pmix_kval_t);
2674 PMIX_VALUE_CREATE(kvp->value, 1);
2675 kvp->value->type = PMIX_BYTE_OBJECT;
2676
2677 cb.proc = proc;
2678 cb.scope = PMIX_INTERNAL;
2679 cb.copy = false;
2680
2681 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
2682 if (PMIX_SUCCESS != rc) {
2683 if (rc == PMIX_ERR_PROC_ENTRY_NOT_FOUND) {
2684
2685 rc = PMIX_SUCCESS;
2686 }
2687 goto exit;
2688 }
2689
2690 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
2691 if ((PMIX_PROC_IS_V1(_client_peer(ds_ctx)) || PMIX_PROC_IS_V20(_client_peer(ds_ctx))) &&
2692 0 != strncmp("pmix.", kv->key, 4) &&
2693 kv->value->type == PMIX_DATA_ARRAY) {
2694 pmix_info_t *info;
2695 size_t size, i;
2696 info = kv->value->data.darray->array;
2697 size = kv->value->data.darray->size;
2698
2699 for (i = 0; i < size; i++) {
2700 if (0 == strcmp(PMIX_LOCAL_PEERS, info[i].key)) {
2701 kv2 = PMIX_NEW(pmix_kval_t);
2702 kv2->key = strdup(kv->key);
2703 PMIX_VALUE_XFER(rc, kv2->value, &info[i].value);
2704 if (PMIX_SUCCESS != rc) {
2705 PMIX_ERROR_LOG(rc);
2706 PMIX_RELEASE(kv2);
2707 goto exit;
2708 }
2709 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv2, 1, PMIX_KVAL);
2710 if (PMIX_SUCCESS != rc) {
2711 PMIX_ERROR_LOG(rc);
2712 PMIX_RELEASE(kv2);
2713 goto exit;
2714 }
2715 PMIX_RELEASE(kv2);
2716 }
2717 }
2718 } else {
2719 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv, 1, PMIX_KVAL);
2720 if (PMIX_SUCCESS != rc) {
2721 PMIX_ERROR_LOG(rc);
2722 goto exit;
2723 }
2724 }
2725 }
2726
2727 PMIX_UNLOAD_BUFFER(&buf, kvp->value->data.bo.bytes, kvp->value->data.bo.size);
2728 if (PMIX_SUCCESS != (rc = _dstore_store_nolock(ds_ctx, ns_map, proc->rank, kvp))) {
2729 PMIX_ERROR_LOG(rc);
2730 goto exit;
2731 }
2732
2733 exit:
2734 PMIX_RELEASE(kvp);
2735 PMIX_DESTRUCT(&cb);
2736 PMIX_DESTRUCT(&buf);
2737 return rc;
2738 }
2739
2740 PMIX_EXPORT pmix_status_t pmix_common_dstor_register_job_info(pmix_common_dstore_ctx_t *ds_ctx,
2741 struct pmix_peer_t *pr,
2742 pmix_buffer_t *reply)
2743 {
2744 pmix_peer_t *peer = (pmix_peer_t*)pr;
2745 pmix_namespace_t *ns = peer->nptr;
2746 char *msg;
2747 pmix_status_t rc;
2748 pmix_proc_t proc;
2749 pmix_rank_t rank;
2750
2751 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2752 "[%s:%d] gds:dstore:register_job_info for peer [%s:%d]",
2753 pmix_globals.myid.nspace, pmix_globals.myid.rank,
2754 peer->info->pname.nspace, peer->info->pname.rank);
2755
2756 if (0 == ns->ndelivered) {
2757 ns_map_data_t *ns_map;
2758
2759 _client_compat_save(ds_ctx, peer);
2760 pmix_strncpy(proc.nspace, ns->nspace, PMIX_MAX_NSLEN);
2761 proc.rank = PMIX_RANK_WILDCARD;
2762 if (NULL == (ns_map = ds_ctx->session_map_search(ds_ctx, proc.nspace))) {
2763 rc = PMIX_ERROR;
2764 PMIX_ERROR_LOG(rc);
2765 return rc;
2766 }
2767
2768
2769 rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_lock);
2770 if (PMIX_SUCCESS != rc) {
2771 PMIX_ERROR_LOG(rc);
2772 return rc;
2773 }
2774
2775 rc = _store_job_info(ds_ctx, ns_map, &proc);
2776 if (PMIX_SUCCESS != rc) {
2777 PMIX_ERROR_LOG(rc);
2778 return rc;
2779 }
2780
2781 for (rank=0; rank < ns->nprocs; rank++) {
2782 proc.rank = rank;
2783 rc = _store_job_info(ds_ctx, ns_map, &proc);
2784 if (PMIX_SUCCESS != rc) {
2785 PMIX_ERROR_LOG(rc);
2786 return rc;
2787 }
2788 }
2789
2790 rc = _ESH_LOCK(ds_ctx, ns_map->tbl_idx, wr_unlock);
2791 if (PMIX_SUCCESS != rc) {
2792 PMIX_ERROR_LOG(rc);
2793 return rc;
2794 }
2795 }
2796
2797
2798 msg = ns->nspace;
2799 PMIX_BFROPS_PACK(rc, peer, reply, &msg, 1, PMIX_STRING);
2800 if (PMIX_SUCCESS != rc) {
2801 PMIX_ERROR_LOG(rc);
2802 return rc;
2803 }
2804
2805 return rc;
2806 }
2807
2808 PMIX_EXPORT pmix_status_t pmix_common_dstor_store_job_info(pmix_common_dstore_ctx_t *ds_ctx,
2809 const char *nspace,
2810 pmix_buffer_t *job_data)
2811 {
2812 pmix_status_t rc = PMIX_SUCCESS;
2813
2814 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2815 "[%s:%u] pmix:gds:dstore store job info for nspace %s",
2816 pmix_globals.myid.nspace, pmix_globals.myid.rank, nspace);
2817
2818
2819 if ((NULL == job_data) || (0 == job_data->bytes_used)) {
2820 rc = PMIX_ERR_BAD_PARAM;
2821 PMIX_ERROR_LOG(rc);
2822 return rc;
2823 }
2824 return rc;
2825 }
2826
2827 static void _client_compat_save(pmix_common_dstore_ctx_t *ds_ctx, pmix_peer_t *peer)
2828 {
2829 pmix_namespace_t *nptr = NULL;
2830
2831 if (NULL == ds_ctx->clients_peer) {
2832 ds_ctx->clients_peer = PMIX_NEW(pmix_peer_t);
2833 nptr = PMIX_NEW(pmix_namespace_t);
2834 ds_ctx->clients_peer->nptr = nptr;
2835 }
2836 ds_ctx->clients_peer->nptr->compat = peer->nptr->compat;
2837 ds_ctx->clients_peer->proc_type = peer->proc_type;
2838 }
2839
2840 static inline pmix_peer_t * _client_peer(pmix_common_dstore_ctx_t *ds_ctx)
2841 {
2842 if (NULL == ds_ctx->clients_peer) {
2843 return pmix_globals.mypeer;
2844 }
2845 return ds_ctx->clients_peer;
2846 }