This source file includes following definitions.
- htcon
- htdes
- hash_init
- hash_finalize
- hash_assign_module
- store_map
- hash_cache_job_info
- register_info
- hash_register_job_info
- hash_store_job_info
- hash_store
- hash_store_modex
- _hash_store_modex
- hash_fetch
- setup_fork
- nspace_add
- nspace_del
- assemb_kvs_req
- accept_kvs_resp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include <src/include/pmix_config.h>
17
18 #include <string.h>
19 #ifdef HAVE_UNISTD_H
20 #include <unistd.h>
21 #endif
22 #ifdef HAVE_SYS_TYPES_H
23 #include <sys/types.h>
24 #endif
25 #ifdef HAVE_SYS_STAT_H
26 #include <sys/stat.h>
27 #endif
28 #ifdef HAVE_FCNTL_H
29 #include <fcntl.h>
30 #endif
31 #include <time.h>
32
33 #include <pmix_common.h>
34
35 #include "src/include/pmix_globals.h"
36 #include "src/class/pmix_list.h"
37 #include "src/client/pmix_client_ops.h"
38 #include "src/server/pmix_server_ops.h"
39 #include "src/util/argv.h"
40 #include "src/mca/pcompress/base/base.h"
41 #include "src/util/error.h"
42 #include "src/util/hash.h"
43 #include "src/util/output.h"
44 #include "src/util/pmix_environ.h"
45 #include "src/mca/preg/preg.h"
46
47 #include "src/mca/gds/base/base.h"
48 #include "gds_hash.h"
49
50 static pmix_status_t hash_init(pmix_info_t info[], size_t ninfo);
51 static void hash_finalize(void);
52
53 static pmix_status_t hash_assign_module(pmix_info_t *info, size_t ninfo,
54 int *priority);
55
56 static pmix_status_t hash_cache_job_info(struct pmix_namespace_t *ns,
57 pmix_info_t info[], size_t ninfo);
58
59 static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr,
60 pmix_buffer_t *reply);
61
62 static pmix_status_t hash_store_job_info(const char *nspace,
63 pmix_buffer_t *buf);
64
65 static pmix_status_t hash_store(const pmix_proc_t *proc,
66 pmix_scope_t scope,
67 pmix_kval_t *kv);
68
69 static pmix_status_t hash_store_modex(struct pmix_namespace_t *ns,
70 pmix_buffer_t *buff,
71 void *cbdata);
72
73 static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
74 pmix_proc_t *proc,
75 pmix_gds_modex_key_fmt_t key_fmt,
76 char **kmap,
77 pmix_buffer_t *pbkt);
78
79 static pmix_status_t hash_fetch(const pmix_proc_t *proc,
80 pmix_scope_t scope, bool copy,
81 const char *key,
82 pmix_info_t info[], size_t ninfo,
83 pmix_list_t *kvs);
84
85 static pmix_status_t setup_fork(const pmix_proc_t *peer, char ***env);
86
87 static pmix_status_t nspace_add(const char *nspace,
88 pmix_info_t info[],
89 size_t ninfo);
90
91 static pmix_status_t nspace_del(const char *nspace);
92
93 static pmix_status_t assemb_kvs_req(const pmix_proc_t *proc,
94 pmix_list_t *kvs,
95 pmix_buffer_t *bo,
96 void *cbdata);
97
98 static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf);
99
100 pmix_gds_base_module_t pmix_hash_module = {
101 .name = "hash",
102 .is_tsafe = false,
103 .init = hash_init,
104 .finalize = hash_finalize,
105 .assign_module = hash_assign_module,
106 .cache_job_info = hash_cache_job_info,
107 .register_job_info = hash_register_job_info,
108 .store_job_info = hash_store_job_info,
109 .store = hash_store,
110 .store_modex = hash_store_modex,
111 .fetch = hash_fetch,
112 .setup_fork = setup_fork,
113 .add_nspace = nspace_add,
114 .del_nspace = nspace_del,
115 .assemb_kvs_req = assemb_kvs_req,
116 .accept_kvs_resp = accept_kvs_resp
117 };
118
119 typedef struct {
120 pmix_list_item_t super;
121 char *ns;
122 pmix_namespace_t *nptr;
123 pmix_hash_table_t internal;
124 pmix_hash_table_t remote;
125 pmix_hash_table_t local;
126 bool gdata_added;
127 } pmix_hash_trkr_t;
128
129 static void htcon(pmix_hash_trkr_t *p)
130 {
131 p->ns = NULL;
132 p->nptr = NULL;
133 PMIX_CONSTRUCT(&p->internal, pmix_hash_table_t);
134 pmix_hash_table_init(&p->internal, 256);
135 PMIX_CONSTRUCT(&p->remote, pmix_hash_table_t);
136 pmix_hash_table_init(&p->remote, 256);
137 PMIX_CONSTRUCT(&p->local, pmix_hash_table_t);
138 pmix_hash_table_init(&p->local, 256);
139 p->gdata_added = false;
140 }
141 static void htdes(pmix_hash_trkr_t *p)
142 {
143 if (NULL != p->ns) {
144 free(p->ns);
145 }
146 if (NULL != p->nptr) {
147 PMIX_RELEASE(p->nptr);
148 }
149 pmix_hash_remove_data(&p->internal, PMIX_RANK_WILDCARD, NULL);
150 PMIX_DESTRUCT(&p->internal);
151 pmix_hash_remove_data(&p->remote, PMIX_RANK_WILDCARD, NULL);
152 PMIX_DESTRUCT(&p->remote);
153 pmix_hash_remove_data(&p->local, PMIX_RANK_WILDCARD, NULL);
154 PMIX_DESTRUCT(&p->local);
155 }
156 static PMIX_CLASS_INSTANCE(pmix_hash_trkr_t,
157 pmix_list_item_t,
158 htcon, htdes);
159
160 static pmix_list_t myhashes;
161
162 static pmix_status_t hash_init(pmix_info_t info[], size_t ninfo)
163 {
164 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
165 "gds: hash init");
166
167 PMIX_CONSTRUCT(&myhashes, pmix_list_t);
168 return PMIX_SUCCESS;
169 }
170
171 static void hash_finalize(void)
172 {
173 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
174 "gds: hash finalize");
175
176 PMIX_LIST_DESTRUCT(&myhashes);
177 }
178
179 static pmix_status_t hash_assign_module(pmix_info_t *info, size_t ninfo,
180 int *priority)
181 {
182 size_t n, m;
183 char **options;
184
185 *priority = 10;
186 if (NULL != info) {
187 for (n=0; n < ninfo; n++) {
188 if (0 == strncmp(info[n].key, PMIX_GDS_MODULE, PMIX_MAX_KEYLEN)) {
189 options = pmix_argv_split(info[n].value.data.string, ',');
190 for (m=0; NULL != options[m]; m++) {
191 if (0 == strcmp(options[m], "hash")) {
192
193 *priority = 100;
194 break;
195 }
196 }
197 pmix_argv_free(options);
198 break;
199 }
200 }
201 }
202 return PMIX_SUCCESS;
203 }
204
205 static pmix_status_t store_map(pmix_hash_table_t *ht,
206 char **nodes, char **ppn)
207 {
208 pmix_status_t rc;
209 pmix_value_t *val;
210 size_t m, n;
211 pmix_info_t *iptr, *info;
212 pmix_rank_t rank;
213 bool updated;
214 pmix_kval_t *kp2;
215 char **procs;
216
217 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
218 "[%s:%d] gds:hash:store_map",
219 pmix_globals.myid.nspace, pmix_globals.myid.rank);
220
221
222 if (pmix_argv_count(nodes) != pmix_argv_count(ppn)) {
223 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
224 return PMIX_ERR_BAD_PARAM;
225 }
226
227 for (n=0; NULL != nodes[n]; n++) {
228
229 val = NULL;
230 rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, nodes[n], &val);
231 if (PMIX_SUCCESS == rc && NULL != val) {
232
233 if (PMIX_DATA_ARRAY != val->type ||
234 NULL == val->data.darray ||
235 PMIX_INFO != val->data.darray->type ||
236 0 == val->data.darray->size) {
237
238 PMIX_VALUE_RELEASE(val);
239 PMIX_ERROR_LOG(PMIX_ERR_INVALID_VAL);
240 return PMIX_ERR_INVALID_VAL;
241 }
242 iptr = (pmix_info_t*)val->data.darray->array;
243 updated = false;
244 for (m=0; m < val->data.darray->size; m++) {
245 if (0 == strncmp(iptr[m].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN)) {
246
247 if (NULL != iptr[m].value.data.string) {
248 free(iptr[m].value.data.string);
249 }
250 iptr[m].value.data.string = strdup(ppn[n]);
251 updated = true;
252 break;
253 }
254 }
255 if (!updated) {
256
257 kp2 = PMIX_NEW(pmix_kval_t);
258 if (NULL == kp2) {
259 return PMIX_ERR_NOMEM;
260 }
261 kp2->key = strdup(nodes[n]);
262 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
263 if (NULL == kp2->value) {
264 PMIX_RELEASE(kp2);
265 return PMIX_ERR_NOMEM;
266 }
267 kp2->value->type = PMIX_DATA_ARRAY;
268 kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
269 if (NULL == kp2->value->data.darray) {
270 PMIX_RELEASE(kp2);
271 return PMIX_ERR_NOMEM;
272 }
273 kp2->value->data.darray->type = PMIX_INFO;
274 kp2->value->data.darray->size = val->data.darray->size + 1;
275 PMIX_INFO_CREATE(info, kp2->value->data.darray->size);
276 if (NULL == info) {
277 PMIX_RELEASE(kp2);
278 return PMIX_ERR_NOMEM;
279 }
280
281 for (m=0; m < val->data.darray->size; m++) {
282 PMIX_INFO_XFER(&info[m], &iptr[m]);
283 }
284 PMIX_INFO_LOAD(&info[kp2->value->data.darray->size-1], PMIX_LOCAL_PEERS, ppn[n], PMIX_STRING);
285 kp2->value->data.darray->array = info;
286 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
287 PMIX_ERROR_LOG(rc);
288 PMIX_RELEASE(kp2);
289 return rc;
290 }
291 PMIX_RELEASE(kp2);
292 }
293 } else {
294
295 kp2 = PMIX_NEW(pmix_kval_t);
296 if (NULL == kp2) {
297 return PMIX_ERR_NOMEM;
298 }
299 kp2->key = strdup(nodes[n]);
300 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
301 if (NULL == kp2->value) {
302 PMIX_RELEASE(kp2);
303 return PMIX_ERR_NOMEM;
304 }
305 kp2->value->type = PMIX_DATA_ARRAY;
306 kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
307 if (NULL == kp2->value->data.darray) {
308 PMIX_RELEASE(kp2);
309 return PMIX_ERR_NOMEM;
310 }
311 kp2->value->data.darray->type = PMIX_INFO;
312 PMIX_INFO_CREATE(info, 1);
313 if (NULL == info) {
314 PMIX_RELEASE(kp2);
315 return PMIX_ERR_NOMEM;
316 }
317 PMIX_INFO_LOAD(&info[0], PMIX_LOCAL_PEERS, ppn[n], PMIX_STRING);
318 kp2->value->data.darray->array = info;
319 kp2->value->data.darray->size = 1;
320 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
321 PMIX_ERROR_LOG(rc);
322 PMIX_RELEASE(kp2);
323 return rc;
324 }
325 PMIX_RELEASE(kp2);
326 }
327
328
329 procs = pmix_argv_split(ppn[n], ',');
330 for (m=0; NULL != procs[m]; m++) {
331
332 kp2 = PMIX_NEW(pmix_kval_t);
333 kp2->key = strdup(PMIX_HOSTNAME);
334 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
335 kp2->value->type = PMIX_STRING;
336 kp2->value->data.string = strdup(nodes[n]);
337 rank = strtol(procs[m], NULL, 10);
338 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
339 PMIX_ERROR_LOG(rc);
340 PMIX_RELEASE(kp2);
341 pmix_argv_free(procs);
342 return rc;
343 }
344 PMIX_RELEASE(kp2);
345 }
346 pmix_argv_free(procs);
347 }
348
349
350
351
352 kp2 = PMIX_NEW(pmix_kval_t);
353 kp2->key = strdup(PMIX_NODE_LIST);
354 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
355 kp2->value->type = PMIX_STRING;
356 kp2->value->data.string = pmix_argv_join(nodes, ',');
357 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
358 PMIX_ERROR_LOG(rc);
359 PMIX_RELEASE(kp2);
360 return rc;
361 }
362 PMIX_RELEASE(kp2);
363
364 return PMIX_SUCCESS;
365 }
366
367 pmix_status_t hash_cache_job_info(struct pmix_namespace_t *ns,
368 pmix_info_t info[], size_t ninfo)
369 {
370 pmix_namespace_t *nptr = (pmix_namespace_t*)ns;
371 pmix_hash_trkr_t *trk, *t;
372 pmix_hash_table_t *ht;
373 pmix_kval_t *kp2, *kvptr;
374 pmix_info_t *iptr;
375 char **nodes=NULL, **procs=NULL;
376 uint8_t *tmp;
377 pmix_rank_t rank;
378 pmix_status_t rc=PMIX_SUCCESS;
379 size_t n, j, size, len;
380
381 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
382 "[%s:%d] gds:hash:cache_job_info for nspace %s",
383 pmix_globals.myid.nspace, pmix_globals.myid.rank,
384 nptr->nspace);
385
386
387 trk = NULL;
388 PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
389 if (0 == strcmp(nptr->nspace, t->ns)) {
390 trk = t;
391 break;
392 }
393 }
394 if (NULL == trk) {
395
396 trk = PMIX_NEW(pmix_hash_trkr_t);
397 if (NULL == trk) {
398 return PMIX_ERR_NOMEM;
399 }
400 PMIX_RETAIN(nptr);
401 trk->nptr = nptr;
402 trk->ns = strdup(nptr->nspace);
403 pmix_list_append(&myhashes, &trk->super);
404 }
405
406
407
408 if (NULL == info || 0 == ninfo) {
409 return PMIX_SUCCESS;
410 }
411
412
413 ht = &trk->internal;
414 for (n=0; n < ninfo; n++) {
415 if (0 == strcmp(info[n].key, PMIX_NODE_MAP)) {
416
417
418 kp2 = PMIX_NEW(pmix_kval_t);
419 kp2->key = strdup(PMIX_NODE_MAP);
420 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
421 kp2->value->type = PMIX_STRING;
422 kp2->value->data.string = strdup(info[n].value.data.string);
423 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
424 PMIX_ERROR_LOG(rc);
425 PMIX_RELEASE(kp2);
426 return rc;
427 }
428 PMIX_RELEASE(kp2);
429
430
431 if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(info[n].value.data.string, &nodes))) {
432 PMIX_ERROR_LOG(rc);
433 goto release;
434 }
435
436
437 if (NULL != procs) {
438 if (PMIX_SUCCESS != (rc = store_map(ht, nodes, procs))) {
439 PMIX_ERROR_LOG(rc);
440 goto release;
441 }
442 }
443 } else if (0 == strcmp(info[n].key, PMIX_PROC_MAP)) {
444
445 if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(info[n].value.data.string, &procs))) {
446 PMIX_ERROR_LOG(rc);
447 goto release;
448 }
449
450
451 if (NULL != nodes) {
452 if (PMIX_SUCCESS != (rc = store_map(ht, nodes, procs))) {
453 PMIX_ERROR_LOG(rc);
454 goto release;
455 }
456 }
457 } else if (0 == strcmp(info[n].key, PMIX_PROC_DATA)) {
458
459 if (PMIX_DATA_ARRAY != info[n].value.type) {
460 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
461 rc = PMIX_ERR_TYPE_MISMATCH;
462 goto release;
463 }
464 size = info[n].value.data.darray->size;
465 iptr = (pmix_info_t*)info[n].value.data.darray->array;
466
467 if (0 != strcmp(iptr[0].key, PMIX_RANK) ||
468 PMIX_PROC_RANK != iptr[0].value.type) {
469 rc = PMIX_ERR_TYPE_MISMATCH;
470 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
471 goto release;
472 }
473 rank = iptr[0].value.data.rank;
474
475 for (j=1; j < size; j++) {
476 kp2 = PMIX_NEW(pmix_kval_t);
477 if (NULL == kp2) {
478 rc = PMIX_ERR_NOMEM;
479 goto release;
480 }
481 kp2->key = strdup(iptr[j].key);
482 PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
483 if (PMIX_SUCCESS != rc) {
484 PMIX_ERROR_LOG(rc);
485 PMIX_RELEASE(kp2);
486 goto release;
487 }
488
489
490 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
491 if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
492 if (NULL == tmp) {
493 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
494 rc = PMIX_ERR_NOMEM;
495 goto release;
496 }
497 kp2->value->type = PMIX_COMPRESSED_STRING;
498 free(kp2->value->data.string);
499 kp2->value->data.bo.bytes = (char*)tmp;
500 kp2->value->data.bo.size = len;
501 }
502 }
503
504 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
505 PMIX_ERROR_LOG(rc);
506 PMIX_RELEASE(kp2);
507 goto release;
508 }
509 PMIX_RELEASE(kp2);
510 }
511 } else {
512
513 kp2 = PMIX_NEW(pmix_kval_t);
514 if (NULL == kp2) {
515 rc = PMIX_ERR_NOMEM;
516 goto release;
517 }
518 kp2->key = strdup(info[n].key);
519 PMIX_VALUE_XFER(rc, kp2->value, &info[n].value);
520 if (PMIX_SUCCESS != rc) {
521 PMIX_ERROR_LOG(rc);
522 PMIX_RELEASE(kp2);
523 goto release;
524 }
525
526
527 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
528 if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
529 if (NULL == tmp) {
530 rc = PMIX_ERR_NOMEM;
531 PMIX_ERROR_LOG(rc);
532 PMIX_RELEASE(kp2);
533 goto release;
534 }
535 kp2->value->type = PMIX_COMPRESSED_STRING;
536 free(kp2->value->data.string);
537 kp2->value->data.bo.bytes = (char*)tmp;
538 kp2->value->data.bo.size = len;
539 }
540 }
541 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
542 PMIX_ERROR_LOG(rc);
543 PMIX_RELEASE(kp2);
544 goto release;
545 }
546 PMIX_RELEASE(kp2);
547
548 if (0 == strncmp(info[n].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN)) {
549 nptr->nprocs = info[n].value.data.uint32;
550 }
551 }
552 }
553
554
555 if (!trk->gdata_added) {
556 PMIX_LIST_FOREACH(kvptr, &pmix_server_globals.gdata, pmix_kval_t) {
557
558
559 kp2 = PMIX_NEW(pmix_kval_t);
560 if (NULL == kp2) {
561 rc = PMIX_ERR_NOMEM;
562 goto release;
563 }
564 kp2->key = strdup(kvptr->key);
565 PMIX_VALUE_XFER(rc, kp2->value, kvptr->value);
566 if (PMIX_SUCCESS != rc) {
567 PMIX_ERROR_LOG(rc);
568 PMIX_RELEASE(kp2);
569 goto release;
570 }
571 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
572 PMIX_ERROR_LOG(rc);
573 PMIX_RELEASE(kp2);
574 break;
575 }
576 PMIX_RELEASE(kp2);
577 }
578 trk->gdata_added = true;
579 }
580
581 release:
582 if (NULL != nodes) {
583 pmix_argv_free(nodes);
584 }
585 if (NULL != procs) {
586 pmix_argv_free(procs);
587 }
588 return rc;
589 }
590
591 static pmix_status_t register_info(pmix_peer_t *peer,
592 pmix_namespace_t *ns,
593 pmix_buffer_t *reply)
594 {
595 pmix_hash_trkr_t *trk, *t;
596 pmix_hash_table_t *ht;
597 pmix_value_t *val, blob;
598 pmix_status_t rc = PMIX_SUCCESS;
599 pmix_info_t *info;
600 size_t ninfo, n;
601 pmix_kval_t kv;
602 pmix_buffer_t buf;
603 pmix_rank_t rank;
604
605 trk = NULL;
606 PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
607 if (0 == strcmp(ns->nspace, t->ns)) {
608 trk = t;
609 break;
610 }
611 }
612 if (NULL == trk) {
613 return PMIX_ERR_INVALID_NAMESPACE;
614 }
615
616 ht = &trk->internal;
617
618
619 val = NULL;
620 rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, NULL, &val);
621 if (PMIX_SUCCESS != rc) {
622 PMIX_ERROR_LOG(rc);
623 if (NULL != val) {
624 PMIX_VALUE_RELEASE(val);
625 }
626 return rc;
627 }
628
629 if (NULL == val || NULL == val->data.darray ||
630 PMIX_INFO != val->data.darray->type ||
631 0 == val->data.darray->size) {
632 return PMIX_ERR_NOT_FOUND;
633 }
634 info = (pmix_info_t*)val->data.darray->array;
635 ninfo = val->data.darray->size;
636 for (n=0; n < ninfo; n++) {
637 kv.key = info[n].key;
638 kv.value = &info[n].value;
639 PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
640 }
641 if (NULL != val) {
642 PMIX_VALUE_RELEASE(val);
643 }
644
645 for (rank=0; rank < ns->nprocs; rank++) {
646 val = NULL;
647 rc = pmix_hash_fetch(ht, rank, NULL, &val);
648 if (PMIX_SUCCESS != rc) {
649 PMIX_ERROR_LOG(rc);
650 if (NULL != val) {
651 PMIX_VALUE_RELEASE(val);
652 }
653 return rc;
654 }
655 if (NULL == val) {
656 return PMIX_ERR_NOT_FOUND;
657 }
658 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
659 PMIX_BFROPS_PACK(rc, peer, &buf, &rank, 1, PMIX_PROC_RANK);
660
661 info = (pmix_info_t*)val->data.darray->array;
662 ninfo = val->data.darray->size;
663 for (n=0; n < ninfo; n++) {
664 kv.key = info[n].key;
665 kv.value = &info[n].value;
666 PMIX_BFROPS_PACK(rc, peer, &buf, &kv, 1, PMIX_KVAL);
667 }
668 kv.key = PMIX_PROC_BLOB;
669 kv.value = &blob;
670 blob.type = PMIX_BYTE_OBJECT;
671 PMIX_UNLOAD_BUFFER(&buf, blob.data.bo.bytes, blob.data.bo.size);
672 PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
673 PMIX_VALUE_DESTRUCT(&blob);
674 PMIX_DESTRUCT(&buf);
675
676 if (NULL != val) {
677 PMIX_VALUE_RELEASE(val);
678 }
679 }
680 return rc;
681 }
682
683
684
685
686 static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr,
687 pmix_buffer_t *reply)
688 {
689 pmix_peer_t *peer = (pmix_peer_t*)pr;
690 pmix_namespace_t *ns = peer->nptr;
691 char *msg;
692 pmix_status_t rc;
693 pmix_hash_trkr_t *trk, *t2;
694
695 if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
696 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
697
698 PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
699 return PMIX_ERR_NOT_SUPPORTED;
700 }
701
702 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
703 "[%s:%d] gds:hash:register_job_info for peer [%s:%d]",
704 pmix_globals.myid.nspace, pmix_globals.myid.rank,
705 peer->info->pname.nspace, peer->info->pname.rank);
706
707
708
709
710 if (NULL != ns->jobbkt) {
711
712 PMIX_BFROPS_COPY_PAYLOAD(rc, peer, reply, ns->jobbkt);
713 if (PMIX_SUCCESS != rc) {
714 PMIX_ERROR_LOG(rc);
715 }
716
717
718 if (ns->ndelivered == ns->nlocalprocs) {
719
720
721 PMIX_RELEASE(ns->jobbkt);
722 ns->jobbkt = NULL;
723 }
724 return rc;
725 }
726
727
728
729 trk = NULL;
730 PMIX_LIST_FOREACH(t2, &myhashes, pmix_hash_trkr_t) {
731 if (ns == t2->nptr) {
732 trk = t2;
733 if (NULL == trk->ns) {
734 trk->ns = strdup(ns->nspace);
735 }
736 break;
737 }
738 }
739 if (NULL == trk) {
740 trk = PMIX_NEW(pmix_hash_trkr_t);
741 trk->ns = strdup(ns->nspace);
742 PMIX_RETAIN(ns);
743 trk->nptr = ns;
744 pmix_list_append(&myhashes, &trk->super);
745 }
746
747
748
749
750
751 msg = ns->nspace;
752 PMIX_BFROPS_PACK(rc, peer, reply, &msg, 1, PMIX_STRING);
753 if (PMIX_SUCCESS != rc) {
754 PMIX_ERROR_LOG(rc);
755 return rc;
756 }
757
758 rc = register_info(peer, ns, reply);
759 if (PMIX_SUCCESS == rc) {
760
761
762 if (1 < ns->nlocalprocs) {
763 PMIX_RETAIN(reply);
764 ns->jobbkt = reply;
765 }
766 } else {
767 PMIX_ERROR_LOG(rc);
768 }
769
770 return rc;
771 }
772
773 static pmix_status_t hash_store_job_info(const char *nspace,
774 pmix_buffer_t *buf)
775 {
776 pmix_status_t rc = PMIX_SUCCESS;
777 pmix_kval_t *kptr, *kp2, kv;
778 pmix_value_t *val;
779 int32_t cnt;
780 size_t nnodes, len, n;
781 uint32_t i, j;
782 char **procs = NULL;
783 uint8_t *tmp;
784 pmix_byte_object_t *bo;
785 pmix_buffer_t buf2;
786 int rank;
787 pmix_hash_trkr_t *htptr;
788 pmix_hash_table_t *ht;
789 char **nodelist = NULL;
790 pmix_info_t *info, *iptr;
791
792 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
793 "[%s:%u] pmix:gds:hash store job info for nspace %s",
794 pmix_globals.myid.nspace, pmix_globals.myid.rank, nspace);
795
796 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
797 !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
798
799 PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
800 return PMIX_ERR_NOT_SUPPORTED;
801 }
802
803
804 if ((NULL == buf) || (0 == buf->bytes_used)) {
805 rc = PMIX_ERR_BAD_PARAM;
806 PMIX_ERROR_LOG(rc);
807 return rc;
808 }
809
810
811 ht = NULL;
812 PMIX_LIST_FOREACH(htptr, &myhashes, pmix_hash_trkr_t) {
813 if (0 == strcmp(htptr->ns, nspace)) {
814 ht = &htptr->internal;
815 break;
816 }
817 }
818 if (NULL == ht) {
819
820 htptr = PMIX_NEW(pmix_hash_trkr_t);
821 htptr->ns = strdup(nspace);
822 pmix_list_append(&myhashes, &htptr->super);
823 ht = &htptr->internal;
824 }
825
826 cnt = 1;
827 kptr = PMIX_NEW(pmix_kval_t);
828 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
829 buf, kptr, &cnt, PMIX_KVAL);
830 while (PMIX_SUCCESS == rc) {
831 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
832 "[%s:%u] pmix:gds:hash store job info working key %s",
833 pmix_globals.myid.nspace, pmix_globals.myid.rank, kptr->key);
834 if (0 == strcmp(kptr->key, PMIX_PROC_BLOB)) {
835 bo = &(kptr->value->data.bo);
836 PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
837 PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &buf2, bo->bytes, bo->size);
838
839 cnt = 1;
840 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
841 &buf2, &rank, &cnt, PMIX_PROC_RANK);
842 if (PMIX_SUCCESS != rc) {
843 PMIX_ERROR_LOG(rc);
844 PMIX_DESTRUCT(&buf2);
845 return rc;
846 }
847
848 cnt = 1;
849 kp2 = PMIX_NEW(pmix_kval_t);
850 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
851 &buf2, kp2, &cnt, PMIX_KVAL);
852 while (PMIX_SUCCESS == rc) {
853
854
855 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
856 if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
857 if (NULL == tmp) {
858 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
859 rc = PMIX_ERR_NOMEM;
860 return rc;
861 }
862 kp2->value->type = PMIX_COMPRESSED_STRING;
863 free(kp2->value->data.string);
864 kp2->value->data.bo.bytes = (char*)tmp;
865 kp2->value->data.bo.size = len;
866 }
867 }
868
869
870 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
871 PMIX_ERROR_LOG(rc);
872 PMIX_RELEASE(kp2);
873 PMIX_DESTRUCT(&buf2);
874 return rc;
875 }
876 PMIX_RELEASE(kp2);
877 cnt = 1;
878 kp2 = PMIX_NEW(pmix_kval_t);
879 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
880 &buf2, kp2, &cnt, PMIX_KVAL);
881 }
882
883 PMIX_DESTRUCT(&buf2);
884 PMIX_RELEASE(kp2);
885 } else if (0 == strcmp(kptr->key, PMIX_MAP_BLOB)) {
886
887 bo = &(kptr->value->data.bo);
888 PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
889 PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &buf2, bo->bytes, bo->size);
890
891 cnt = 1;
892 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
893 &buf2, &nnodes, &cnt, PMIX_SIZE);
894 if (PMIX_SUCCESS != rc) {
895 PMIX_ERROR_LOG(rc);
896 PMIX_DESTRUCT(&buf2);
897 return rc;
898 }
899
900 for (i=0; i < nnodes; i++) {
901 cnt = 1;
902 PMIX_CONSTRUCT(&kv, pmix_kval_t);
903 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
904 &buf2, &kv, &cnt, PMIX_KVAL);
905 if (PMIX_SUCCESS != rc) {
906 PMIX_ERROR_LOG(rc);
907 PMIX_DESTRUCT(&buf2);
908 PMIX_DESTRUCT(&kv);
909 return rc;
910 }
911
912 pmix_argv_append_nosize(&nodelist, kv.key);
913
914
915 rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, kv.key, &val);
916 if (PMIX_SUCCESS == rc) {
917
918 kp2 = PMIX_NEW(pmix_kval_t);
919 kp2->key = strdup(kv.key);
920 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
921 kp2->value->type = PMIX_DATA_ARRAY;
922 kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
923 if (NULL == kp2->value->data.darray) {
924 PMIX_DESTRUCT(&buf2);
925 PMIX_DESTRUCT(&kv);
926 PMIX_RELEASE(kp2);
927 return PMIX_ERR_NOMEM;
928 }
929 kp2->value->data.darray->type = PMIX_INFO;
930 kp2->value->data.darray->size = val->data.darray->size + 1;
931 PMIX_INFO_CREATE(info, kp2->value->data.darray->size);
932 if (NULL == info) {
933 PMIX_DESTRUCT(&buf2);
934 PMIX_DESTRUCT(&kv);
935 PMIX_RELEASE(kp2);
936 return PMIX_ERR_NOMEM;
937 }
938 iptr = (pmix_info_t*)val->data.darray->array;
939
940 for (n=0; n < val->data.darray->size; n++) {
941 PMIX_INFO_XFER(&info[n], &iptr[n]);
942 }
943 PMIX_INFO_LOAD(&info[kp2->value->data.darray->size-1], PMIX_LOCAL_PEERS, kv.value->data.string, PMIX_STRING);
944 kp2->value->data.darray->array = info;
945 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
946 PMIX_ERROR_LOG(rc);
947 PMIX_RELEASE(kp2);
948 PMIX_DESTRUCT(&kv);
949 PMIX_DESTRUCT(&buf2);
950 return rc;
951 }
952 PMIX_RELEASE(kp2);
953 } else {
954
955 kp2 = PMIX_NEW(pmix_kval_t);
956 kp2->key = strdup(kv.key);
957 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
958 kp2->value->type = PMIX_DATA_ARRAY;
959 kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
960 if (NULL == kp2->value->data.darray) {
961 PMIX_DESTRUCT(&buf2);
962 PMIX_DESTRUCT(&kv);
963 PMIX_RELEASE(kp2);
964 return PMIX_ERR_NOMEM;
965 }
966 kp2->value->data.darray->type = PMIX_INFO;
967 PMIX_INFO_CREATE(info, 1);
968 if (NULL == info) {
969 PMIX_DESTRUCT(&buf2);
970 PMIX_DESTRUCT(&kv);
971 PMIX_RELEASE(kp2);
972 return PMIX_ERR_NOMEM;
973 }
974 PMIX_INFO_LOAD(&info[0], PMIX_LOCAL_PEERS, kv.value->data.string, PMIX_STRING);
975 kp2->value->data.darray->array = info;
976 kp2->value->data.darray->size = 1;
977 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
978 PMIX_ERROR_LOG(rc);
979 PMIX_RELEASE(kp2);
980 PMIX_DESTRUCT(&kv);
981 PMIX_DESTRUCT(&buf2);
982 return rc;
983 }
984 PMIX_RELEASE(kp2);
985 }
986
987
988 procs = pmix_argv_split(kv.value->data.string, ',');
989 for (j=0; NULL != procs[j]; j++) {
990
991
992
993 kp2 = PMIX_NEW(pmix_kval_t);
994 kp2->key = strdup(PMIX_HOSTNAME);
995 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
996 kp2->value->type = PMIX_STRING;
997 kp2->value->data.string = strdup(kv.key);
998 rank = strtol(procs[j], NULL, 10);
999 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1000 PMIX_ERROR_LOG(rc);
1001 PMIX_RELEASE(kp2);
1002 PMIX_DESTRUCT(&kv);
1003 PMIX_DESTRUCT(&buf2);
1004 pmix_argv_free(procs);
1005 return rc;
1006 }
1007 PMIX_RELEASE(kp2);
1008 }
1009 pmix_argv_free(procs);
1010 PMIX_DESTRUCT(&kv);
1011 }
1012 if (NULL != nodelist) {
1013
1014
1015 kp2 = PMIX_NEW(pmix_kval_t);
1016 kp2->key = strdup(PMIX_NODE_LIST);
1017 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1018 kp2->value->type = PMIX_STRING;
1019 kp2->value->data.string = pmix_argv_join(nodelist, ',');
1020 pmix_argv_free(nodelist);
1021 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1022 PMIX_ERROR_LOG(rc);
1023 PMIX_RELEASE(kp2);
1024 PMIX_DESTRUCT(&kv);
1025 PMIX_DESTRUCT(&buf2);
1026 return rc;
1027 }
1028 PMIX_RELEASE(kp2);
1029 }
1030
1031 PMIX_DESTRUCT(&buf2);
1032 } else {
1033
1034
1035 if (PMIX_STRING_SIZE_CHECK(kptr->value)) {
1036 if (pmix_compress.compress_string(kptr->value->data.string, &tmp, &len)) {
1037 if (NULL == tmp) {
1038 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1039 rc = PMIX_ERR_NOMEM;
1040 return rc;
1041 }
1042 kptr->value->type = PMIX_COMPRESSED_STRING;
1043 free(kptr->value->data.string);
1044 kptr->value->data.bo.bytes = (char*)tmp;
1045 kptr->value->data.bo.size = len;
1046 }
1047 }
1048 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1049 "[%s:%u] pmix:gds:hash store job info storing key %s for WILDCARD rank",
1050 pmix_globals.myid.nspace, pmix_globals.myid.rank, kptr->key);
1051 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kptr))) {
1052 PMIX_ERROR_LOG(rc);
1053 PMIX_RELEASE(kptr);
1054 return rc;
1055 }
1056 }
1057 PMIX_RELEASE(kptr);
1058 kptr = PMIX_NEW(pmix_kval_t);
1059 cnt = 1;
1060 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1061 buf, kptr, &cnt, PMIX_KVAL);
1062 }
1063
1064 PMIX_RELEASE(kptr);
1065
1066 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1067 PMIX_ERROR_LOG(rc);
1068 } else {
1069 rc = PMIX_SUCCESS;
1070 }
1071 return rc;
1072 }
1073
1074 static pmix_status_t hash_store(const pmix_proc_t *proc,
1075 pmix_scope_t scope,
1076 pmix_kval_t *kv)
1077 {
1078 pmix_hash_trkr_t *trk, *t;
1079 pmix_status_t rc;
1080 pmix_kval_t *kp;
1081
1082 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1083 "[%s:%d] gds:hash:hash_store for proc [%s:%d] key %s type %s scope %s",
1084 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1085 proc->nspace, proc->rank, kv->key,
1086 PMIx_Data_type_string(kv->value->type), PMIx_Scope_string(scope));
1087
1088 if (NULL == kv->key) {
1089 return PMIX_ERR_BAD_PARAM;
1090 }
1091
1092
1093 trk = NULL;
1094 PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1095 if (0 == strcmp(proc->nspace, t->ns)) {
1096 trk = t;
1097 break;
1098 }
1099 }
1100 if (NULL == trk) {
1101
1102 trk = PMIX_NEW(pmix_hash_trkr_t);
1103 trk->ns = strdup(proc->nspace);
1104 pmix_list_append(&myhashes, &trk->super);
1105 }
1106
1107
1108 if (proc->rank == pmix_globals.myid.rank &&
1109 0 == strncmp(proc->nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN)) {
1110 if (PMIX_INTERNAL != scope) {
1111
1112
1113 kp = PMIX_NEW(pmix_kval_t);
1114 if (NULL == kp) {
1115 return PMIX_ERR_NOMEM;
1116 }
1117 kp->key = strdup(kv->key);
1118 kp->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1119 if (NULL == kp->value) {
1120 PMIX_RELEASE(kp);
1121 return PMIX_ERR_NOMEM;
1122 }
1123 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, kp->value, kv->value);
1124 if (PMIX_SUCCESS != rc) {
1125 PMIX_RELEASE(kp);
1126 return rc;
1127 }
1128 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, proc->rank, kp))) {
1129 PMIX_ERROR_LOG(rc);
1130 PMIX_RELEASE(kp);
1131 return rc;
1132 }
1133 PMIX_RELEASE(kp);
1134 }
1135 }
1136
1137
1138 if (PMIX_INTERNAL == scope) {
1139 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, proc->rank, kv))) {
1140 PMIX_ERROR_LOG(rc);
1141 return rc;
1142 }
1143 } else if (PMIX_REMOTE == scope) {
1144 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
1145 PMIX_ERROR_LOG(rc);
1146 return rc;
1147 }
1148 } else if (PMIX_LOCAL == scope) {
1149 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->local, proc->rank, kv))) {
1150 PMIX_ERROR_LOG(rc);
1151 return rc;
1152 }
1153 } else if (PMIX_GLOBAL == scope) {
1154 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
1155 PMIX_ERROR_LOG(rc);
1156 return rc;
1157 }
1158
1159
1160 kp = PMIX_NEW(pmix_kval_t);
1161 if (NULL == kp) {
1162 return PMIX_ERR_NOMEM;
1163 }
1164 kp->key = strdup(kv->key);
1165 kp->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1166 if (NULL == kp->value) {
1167 PMIX_RELEASE(kp);
1168 return PMIX_ERR_NOMEM;
1169 }
1170 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, kp->value, kv->value);
1171 if (PMIX_SUCCESS != rc) {
1172 PMIX_ERROR_LOG(rc);
1173 PMIX_RELEASE(kp);
1174 return rc;
1175 }
1176 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->local, proc->rank, kp))) {
1177 PMIX_ERROR_LOG(rc);
1178 PMIX_RELEASE(kp);
1179 return rc;
1180 }
1181 PMIX_RELEASE(kp);
1182 } else {
1183 return PMIX_ERR_BAD_PARAM;
1184 }
1185
1186 return PMIX_SUCCESS;
1187 }
1188
1189
1190
1191
1192
1193 static pmix_status_t hash_store_modex(struct pmix_namespace_t *nspace,
1194 pmix_buffer_t *buf,
1195 void *cbdata) {
1196 return pmix_gds_base_store_modex(nspace, buf, NULL,
1197 _hash_store_modex, cbdata);
1198 }
1199
1200 static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
1201 pmix_proc_t *proc,
1202 pmix_gds_modex_key_fmt_t key_fmt,
1203 char **kmap,
1204 pmix_buffer_t *pbkt)
1205 {
1206 pmix_hash_trkr_t *trk, *t;
1207 pmix_status_t rc = PMIX_SUCCESS;
1208 pmix_kval_t *kv;
1209
1210 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1211 "[%s:%d] gds:hash:store_modex for nspace %s",
1212 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1213 proc->nspace);
1214
1215
1216 trk = NULL;
1217 PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1218 if (0 == strcmp(proc->nspace, t->ns)) {
1219 trk = t;
1220 break;
1221 }
1222 }
1223 if (NULL == trk) {
1224
1225 trk = PMIX_NEW(pmix_hash_trkr_t);
1226 trk->ns = strdup(proc->nspace);
1227 pmix_list_append(&myhashes, &trk->super);
1228 }
1229
1230
1231
1232
1233
1234
1235
1236
1237 kv = PMIX_NEW(pmix_kval_t);
1238 rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
1239
1240 while (PMIX_SUCCESS == rc) {
1241
1242 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
1243 PMIX_ERROR_LOG(rc);
1244 return rc;
1245 }
1246 PMIX_RELEASE(kv);
1247
1248 kv = PMIX_NEW(pmix_kval_t);
1249 rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
1250 }
1251 PMIX_RELEASE(kv);
1252 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1253 PMIX_ERROR_LOG(rc);
1254 } else {
1255 rc = PMIX_SUCCESS;
1256 }
1257 return rc;
1258 }
1259
1260
1261 static pmix_status_t hash_fetch(const pmix_proc_t *proc,
1262 pmix_scope_t scope, bool copy,
1263 const char *key,
1264 pmix_info_t qualifiers[], size_t nqual,
1265 pmix_list_t *kvs)
1266 {
1267 pmix_hash_trkr_t *trk, *t;
1268 pmix_status_t rc;
1269 pmix_value_t *val;
1270 pmix_kval_t *kv;
1271 pmix_info_t *info;
1272 size_t n, ninfo;
1273 pmix_hash_table_t *ht;
1274
1275 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1276 "[%s:%u] pmix:gds:hash fetch %s for proc %s:%u on scope %s",
1277 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1278 (NULL == key) ? "NULL" : key,
1279 proc->nspace, proc->rank, PMIx_Scope_string(scope));
1280
1281
1282
1283
1284 if (NULL == key && PMIX_RANK_WILDCARD == proc->rank) {
1285
1286
1287 trk = NULL;
1288 PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1289 if (0 == strcmp(proc->nspace, t->ns)) {
1290 trk = t;
1291 break;
1292 }
1293 }
1294 if (NULL == trk) {
1295
1296 return PMIX_ERR_INVALID_NAMESPACE;
1297 }
1298
1299 ht = &trk->internal;
1300
1301 val = NULL;
1302 rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, NULL, &val);
1303 if (PMIX_SUCCESS != rc) {
1304 if (NULL != val) {
1305 PMIX_VALUE_RELEASE(val);
1306 }
1307 return rc;
1308 }
1309 if (NULL == val) {
1310 return PMIX_ERR_NOT_FOUND;
1311 }
1312
1313
1314 if (PMIX_DATA_ARRAY != val->type ||
1315 NULL == val->data.darray ||
1316 PMIX_INFO != val->data.darray->type) {
1317 PMIX_VALUE_RELEASE(val);
1318 return PMIX_ERR_INVALID_VAL;
1319 }
1320 info = (pmix_info_t*)val->data.darray->array;
1321 ninfo = val->data.darray->size;
1322 for (n=0; n < ninfo; n++) {
1323 kv = PMIX_NEW(pmix_kval_t);
1324 if (NULL == kv) {
1325 rc = PMIX_ERR_NOMEM;
1326 PMIX_VALUE_RELEASE(val);
1327 return rc;
1328 }
1329 kv->key = strdup(info[n].key);
1330 PMIX_VALUE_XFER(rc, kv->value, &info[n].value);
1331 if (PMIX_SUCCESS != rc) {
1332 PMIX_ERROR_LOG(rc);
1333 PMIX_RELEASE(kv);
1334 PMIX_VALUE_RELEASE(val);
1335 return rc;
1336 }
1337 pmix_list_append(kvs, &kv->super);
1338 }
1339 PMIX_VALUE_RELEASE(val);
1340 return PMIX_SUCCESS;
1341 }
1342
1343
1344 trk = NULL;
1345 PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1346 if (0 == strcmp(proc->nspace, t->ns)) {
1347 trk = t;
1348 break;
1349 }
1350 }
1351 if (NULL == trk) {
1352 return PMIX_ERR_INVALID_NAMESPACE;
1353 }
1354
1355
1356
1357
1358 if (PMIX_INTERNAL == scope ||
1359 PMIX_SCOPE_UNDEF == scope ||
1360 PMIX_GLOBAL == scope ||
1361 PMIX_RANK_WILDCARD == proc->rank) {
1362 ht = &trk->internal;
1363 } else if (PMIX_LOCAL == scope ||
1364 PMIX_GLOBAL == scope) {
1365 ht = &trk->local;
1366 } else if (PMIX_REMOTE == scope) {
1367 ht = &trk->remote;
1368 } else {
1369 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1370 return PMIX_ERR_BAD_PARAM;
1371 }
1372
1373 doover:
1374 rc = pmix_hash_fetch(ht, proc->rank, key, &val);
1375 if (PMIX_SUCCESS == rc) {
1376
1377
1378 if (NULL == key) {
1379 if (NULL == val->data.darray ||
1380 PMIX_INFO != val->data.darray->type ||
1381 0 == val->data.darray->size) {
1382 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
1383 return PMIX_ERR_NOT_FOUND;
1384 }
1385 info = (pmix_info_t*)val->data.darray->array;
1386 ninfo = val->data.darray->size;
1387 for (n=0; n < ninfo; n++) {
1388 kv = PMIX_NEW(pmix_kval_t);
1389 if (NULL == kv) {
1390 PMIX_VALUE_RELEASE(val);
1391 return PMIX_ERR_NOMEM;
1392 }
1393 kv->key = strdup(info[n].key);
1394 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1395 if (NULL == kv->value) {
1396 PMIX_VALUE_RELEASE(val);
1397 PMIX_RELEASE(kv);
1398 return PMIX_ERR_NOMEM;
1399 }
1400 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
1401 kv->value, &info[n].value);
1402 if (PMIX_SUCCESS != rc) {
1403 PMIX_ERROR_LOG(rc);
1404 PMIX_VALUE_RELEASE(val);
1405 PMIX_RELEASE(kv);
1406 return rc;
1407 }
1408 pmix_list_append(kvs, &kv->super);
1409 }
1410 PMIX_VALUE_RELEASE(val);
1411 if (PMIX_GLOBAL == scope && ht == &trk->local) {
1412
1413 ht = &trk->remote;
1414 goto doover;
1415 }
1416 return PMIX_SUCCESS;
1417 }
1418
1419 kv = PMIX_NEW(pmix_kval_t);
1420 if (NULL == kv) {
1421 PMIX_VALUE_RELEASE(val);
1422 return PMIX_ERR_NOMEM;
1423 }
1424 kv->key = strdup(key);
1425 kv->value = val;
1426 pmix_list_append(kvs, &kv->super);
1427 } else {
1428 if (PMIX_GLOBAL == scope ||
1429 PMIX_SCOPE_UNDEF == scope) {
1430 if (ht == &trk->internal) {
1431
1432 ht = &trk->local;
1433 goto doover;
1434 } else if (ht == &trk->local) {
1435
1436 ht = &trk->remote;
1437 goto doover;
1438 }
1439 }
1440 }
1441
1442 return rc;
1443 }
1444
1445 static pmix_status_t setup_fork(const pmix_proc_t *proc, char ***env)
1446 {
1447
1448 return PMIX_SUCCESS;
1449 }
1450
1451 static pmix_status_t nspace_add(const char *nspace,
1452 pmix_info_t info[],
1453 size_t ninfo)
1454 {
1455
1456 return PMIX_SUCCESS;
1457 }
1458
1459 static pmix_status_t nspace_del(const char *nspace)
1460 {
1461 pmix_hash_trkr_t *t;
1462
1463
1464 PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
1465 if (0 == strcmp(nspace, t->ns)) {
1466
1467 pmix_list_remove_item(&myhashes, &t->super);
1468 PMIX_RELEASE(t);
1469 break;
1470 }
1471 }
1472 return PMIX_SUCCESS;
1473 }
1474
1475 static pmix_status_t assemb_kvs_req(const pmix_proc_t *proc,
1476 pmix_list_t *kvs,
1477 pmix_buffer_t *buf,
1478 void *cbdata)
1479 {
1480 pmix_status_t rc = PMIX_SUCCESS;
1481 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
1482 pmix_kval_t *kv;
1483
1484 if (!PMIX_PROC_IS_V1(cd->peer)) {
1485 PMIX_BFROPS_PACK(rc, cd->peer, buf, proc, 1, PMIX_PROC);
1486 if (PMIX_SUCCESS != rc) {
1487 return rc;
1488 }
1489 }
1490 PMIX_LIST_FOREACH(kv, kvs, pmix_kval_t) {
1491 PMIX_BFROPS_PACK(rc, cd->peer, buf, kv, 1, PMIX_KVAL);
1492 if (PMIX_SUCCESS != rc) {
1493 return rc;
1494 }
1495 }
1496 return rc;
1497 }
1498
1499 static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf)
1500 {
1501 pmix_status_t rc = PMIX_SUCCESS;
1502 int32_t cnt;
1503 pmix_byte_object_t bo;
1504 pmix_buffer_t pbkt;
1505 pmix_kval_t *kv;
1506 pmix_proc_t proct;
1507
1508
1509
1510
1511
1512
1513
1514
1515 cnt = 1;
1516 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1517 buf, &bo, &cnt, PMIX_BYTE_OBJECT);
1518 while (PMIX_SUCCESS == rc) {
1519
1520 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
1521 PMIX_LOAD_BUFFER(pmix_client_globals.myserver,
1522 &pbkt, bo.bytes, bo.size);
1523
1524 cnt = 1;
1525 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1526 &pbkt, &proct, &cnt, PMIX_PROC);
1527 if (PMIX_SUCCESS != rc) {
1528 PMIX_ERROR_LOG(rc);
1529 return rc;
1530 }
1531 cnt = 1;
1532 kv = PMIX_NEW(pmix_kval_t);
1533 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1534 &pbkt, kv, &cnt, PMIX_KVAL);
1535 while (PMIX_SUCCESS == rc) {
1536
1537
1538
1539
1540
1541 PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &proct, PMIX_INTERNAL, kv);
1542 if (PMIX_SUCCESS != rc) {
1543 PMIX_ERROR_LOG(rc);
1544 PMIX_RELEASE(kv);
1545 PMIX_DESTRUCT(&pbkt);
1546 return rc;
1547 }
1548 PMIX_RELEASE(kv);
1549
1550 kv = PMIX_NEW(pmix_kval_t);
1551 cnt = 1;
1552 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1553 &pbkt, kv, &cnt, PMIX_KVAL);
1554 }
1555 PMIX_RELEASE(kv);
1556 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1557 PMIX_ERROR_LOG(rc);
1558 PMIX_DESTRUCT(&pbkt);
1559 return rc;
1560 }
1561 PMIX_DESTRUCT(&pbkt);
1562
1563 cnt = 1;
1564 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1565 buf, &bo, &cnt, PMIX_BYTE_OBJECT);
1566 }
1567 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1568 PMIX_ERROR_LOG(rc);
1569 return rc;
1570 }
1571 return rc;
1572 }