This source file includes the following definitions.
- PMI_Init
- PMI_Initialized
- PMI_Finalize
- PMI_Get_size
- PMI_Get_rank
- PMI_Get_universe_size
- PMI_Get_appnum
- PMI_Barrier
- PMI_Abort
- PMI_KVS_Get_my_name
- PMI_KVS_Get_name_length_max
- PMI_KVS_Get_key_length_max
- PMI_KVS_Get_value_length_max
- PMI_KVS_Put
- PMI_KVS_Commit
- PMI_KVS_Get
- PMI_Get_clique_size
- PMI_Get_clique_ranks
- kvs_get
- kvs_put
- cache_put_uint
- cache_put_string
- flux_init
- flux_fini
- flux_initialized
- flux_abort
- flux_spawn
- flux_put
- flux_commit
- flux_fence
- flux_get
- flux_publish
- flux_lookup
- flux_unpublish
- flux_job_connect
- flux_job_disconnect
- flux_store_local
- flux_get_nspace
- flux_register_jobid
- pmix_error
1
2
3
4
5
6
7
8
9
10
11
12
13
14 #include "opal_config.h"
15 #include "opal/constants.h"
16 #include "opal/types.h"
17
18 #include "opal_stdint.h"
19 #include "opal/mca/hwloc/base/base.h"
20 #include "opal/util/argv.h"
21 #include "opal/util/opal_environ.h"
22 #include "opal/util/output.h"
23 #include "opal/util/proc.h"
24 #include "opal/util/show_help.h"
25
26 #include <string.h>
27 #if defined (HAVE_FLUX_PMI_LIBRARY)
28 #include <pmi.h>
29 #else
30 #include <dlfcn.h>
31 #endif
32
33 #include "opal/mca/pmix/base/base.h"
34 #include "opal/mca/pmix/base/pmix_base_fns.h"
35 #include "opal/mca/pmix/base/pmix_base_hash.h"
36 #include "pmix_flux.h"
37
38 static int flux_init(opal_list_t *ilist);
39 static int flux_fini(void);
40 static int flux_initialized(void);
41 static int flux_abort(int flag, const char msg[],
42 opal_list_t *procs);
43 static int flux_commit(void);
44 static int flux_fence(opal_list_t *procs, int collect_data);
45 static int flux_put(opal_pmix_scope_t scope,
46 opal_value_t *kv);
47 static int flux_get(const opal_process_name_t *id,
48 const char *key, opal_list_t *info,
49 opal_value_t **kv);
50 static int flux_publish(opal_list_t *info);
51 static int flux_lookup(opal_list_t *data, opal_list_t *info);
52 static int flux_unpublish(char **keys, opal_list_t *info);
53 static int flux_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid);
54 static int flux_job_connect(opal_list_t *procs);
55 static int flux_job_disconnect(opal_list_t *procs);
56 static int flux_store_local(const opal_process_name_t *proc,
57 opal_value_t *val);
58 static const char *flux_get_nspace(opal_jobid_t jobid);
59 static void flux_register_jobid(opal_jobid_t jobid, const char *nspace);
60
61 const opal_pmix_base_module_t opal_pmix_flux_module = {
62 .init = flux_init,
63 .finalize = flux_fini,
64 .initialized = flux_initialized,
65 .abort = flux_abort,
66 .commit = flux_commit,
67 .fence = flux_fence,
68 .put = flux_put,
69 .get = flux_get,
70 .publish = flux_publish,
71 .lookup = flux_lookup,
72 .unpublish = flux_unpublish,
73 .spawn = flux_spawn,
74 .connect = flux_job_connect,
75 .disconnect = flux_job_disconnect,
76 .register_evhandler = opal_pmix_base_register_handler,
77 .deregister_evhandler = opal_pmix_base_deregister_handler,
78 .store_local = flux_store_local,
79 .get_nspace = flux_get_nspace,
80 .register_jobid = flux_register_jobid
81 };
82
83
84 static int pmix_init_count = 0;
85
86
87 static int pmix_kvslen_max = 0;
88 static int pmix_keylen_max = 0;
89 static int pmix_vallen_max = 0;
90 static int pmix_vallen_threshold = INT_MAX;
91
92
93 static char *pmix_kvs_name = NULL;
94 static bool flux_committed = false;
95 static char* pmix_packed_data = NULL;
96 static int pmix_packed_data_offset = 0;
97 static char* pmix_packed_encoded_data = NULL;
98 static int pmix_packed_encoded_data_offset = 0;
99 static int pmix_pack_key = 0;
100 static opal_process_name_t flux_pname;
101 static int *lranks = NULL, nlranks;
102
static char* pmix_error(int pmix_err);

/*
 * Report a failed PMI call through opal_output(): the name of the PMI
 * function, the source location of the report, and the text that
 * pmix_error() returns for pmi_err.
 *
 * The expansion intentionally does NOT end in ';'.  The caller's own
 * semicolon completes the do/while, so OPAL_PMI_ERROR(...); is exactly one
 * statement and can be used as the unbraced body of an if/else.
 */
#define OPAL_PMI_ERROR(pmi_err, pmi_func)                       \
    do {                                                        \
        opal_output(0, "%s [%s:%d:%s]: %s\n",                   \
                    pmi_func, __FILE__, __LINE__, __func__,     \
                    pmix_error(pmi_err));                       \
    } while(0)
110
111
112 #if !defined (HAVE_FLUX_PMI_LIBRARY)
113
114
115
/*
 * PMI status codes, defined locally because <pmi.h> is not included in
 * this configuration.  They stay preprocessor macros on purpose:
 * pmix_error() probes for optional codes with defined().
 */
#define PMI_SUCCESS                  0
#define PMI_FAIL                    -1
#define PMI_ERR_INIT                 1
#define PMI_ERR_NOMEM                2
#define PMI_ERR_INVALID_ARG          3
#define PMI_ERR_INVALID_KEY          4
#define PMI_ERR_INVALID_KEY_LENGTH   5
#define PMI_ERR_INVALID_VAL          6
#define PMI_ERR_INVALID_VAL_LENGTH   7
#define PMI_ERR_INVALID_LENGTH       8
#define PMI_ERR_INVALID_NUM_ARGS     9
#define PMI_ERR_INVALID_ARGS        10
#define PMI_ERR_INVALID_NUM_PARSED  11
#define PMI_ERR_INVALID_KEYVALP     12
#define PMI_ERR_INVALID_SIZE        13

/* Handle of the PMI library opened by PMI_Init(); NULL while not loaded. */
static void *dso = NULL;
133
134 static int PMI_Init (int *spawned)
135 {
136 int (*f)(int *);
137 if (!dso) {
138 const char *path;
139 if ((path = getenv ("FLUX_PMI_LIBRARY_PATH")))
140 dso = dlopen (path, RTLD_NOW | RTLD_GLOBAL);
141 if (!dso)
142 return PMI_FAIL;
143 }
144 *(void **)(&f) = dlsym (dso, "PMI_Init");
145 return f ? f (spawned) : PMI_FAIL;
146 }
147
148 static int PMI_Initialized (int *initialized)
149 {
150 int (*f)(int *);
151 if (!dso) {
152 if (initialized)
153 *initialized = 0;
154 return PMI_SUCCESS;
155 }
156 *(void **)(&f) = dlsym (dso, "PMI_Initialized");
157 return f ? f (initialized) : PMI_FAIL;
158 }
159
160 static int PMI_Finalize (void)
161 {
162 int (*f)(void);
163 int rc;
164 if (!dso)
165 return PMI_SUCCESS;
166 *(void **)(&f) = dlsym (dso, "PMI_Finalize");
167 rc = f ? f () : PMI_FAIL;
168 dlclose (dso);
169 return rc;
170 }
171
172 static int PMI_Get_size (int *size)
173 {
174 int (*f)(int *);
175 *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_size") : NULL;
176 return f ? f (size) : PMI_FAIL;
177 }
178
179 static int PMI_Get_rank (int *rank)
180 {
181 int (*f)(int *);
182 *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_rank") : NULL;
183 return f ? f (rank) : PMI_FAIL;
184 }
185
186 static int PMI_Get_universe_size (int *size)
187 {
188 int (*f)(int *);
189 *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_universe_size") : NULL;
190 return f ? f (size) : PMI_FAIL;
191 }
192
193 static int PMI_Get_appnum (int *appnum)
194 {
195 int (*f)(int *);
196 *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_appnum") : NULL;
197 return f ? f (appnum) : PMI_FAIL;
198 }
199
200 static int PMI_Barrier (void)
201 {
202 int (*f)(void);
203 *(void **)(&f) = dso ? dlsym (dso, "PMI_Barrier") : NULL;
204 return f ? f () : PMI_FAIL;
205 }
206
207 static int PMI_Abort (int exit_code, const char *error_msg)
208 {
209 int (*f)(int, const char *);
210 *(void **)(&f) = dso ? dlsym (dso, "PMI_Abort") : NULL;
211 return f ? f (exit_code, error_msg) : PMI_FAIL;
212 }
213
214 static int PMI_KVS_Get_my_name (char *kvsname, int length)
215 {
216 int (*f)(char *, int);
217 *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_my_name") : NULL;
218 return f ? f (kvsname, length) : PMI_FAIL;
219 }
220
221 static int PMI_KVS_Get_name_length_max (int *length)
222 {
223 int (*f)(int *);
224 *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_name_length_max") : NULL;
225 return f ? f (length) : PMI_FAIL;
226 }
227
228 static int PMI_KVS_Get_key_length_max (int *length)
229 {
230 int (*f)(int *);
231 *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_key_length_max") : NULL;
232 return f ? f (length) : PMI_FAIL;
233 }
234
235 static int PMI_KVS_Get_value_length_max (int *length)
236 {
237 int (*f)(int *);
238 *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_value_length_max") : NULL;
239 return f ? f (length) : PMI_FAIL;
240 }
241
242 static int PMI_KVS_Put (const char *kvsname, const char *key, const char *value)
243 {
244 int (*f)(const char *, const char *, const char *);
245 *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Put") : NULL;
246 return f ? f (kvsname, key, value) : PMI_FAIL;
247 }
248
249 static int PMI_KVS_Commit (const char *kvsname)
250 {
251 int (*f)(const char *);
252 *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Commit") : NULL;
253 return f ? f (kvsname) : PMI_FAIL;
254 }
255
256 static int PMI_KVS_Get (const char *kvsname, const char *key,
257 char *value, int len)
258 {
259 int (*f)(const char *, const char *, char *, int);
260 *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get") : NULL;
261 return f ? f (kvsname, key, value, len) : PMI_FAIL;
262 }
263
264 static int PMI_Get_clique_size (int *size)
265 {
266 int (*f)(int *);
267 *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_clique_size") : NULL;
268 return f ? f (size) : PMI_FAIL;
269 }
270
271 static int PMI_Get_clique_ranks (int *ranks, int length)
272 {
273 int (*f)(int *, int);
274 *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_clique_ranks") : NULL;
275 return f ? f (ranks, length) : PMI_FAIL;
276 }
277
278 #endif
279
280 static int kvs_get(const char key[], char value [], int maxvalue)
281 {
282 int rc;
283 rc = PMI_KVS_Get(pmix_kvs_name, key, value, maxvalue);
284 if( PMI_SUCCESS != rc ){
285
286 return OPAL_ERROR;
287 }
288 return OPAL_SUCCESS;
289 }
290
291 static int kvs_put(const char key[], const char value[])
292 {
293 int rc;
294 rc = PMI_KVS_Put(pmix_kvs_name, key, value);
295 if( PMI_SUCCESS != rc ){
296 OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
297 return OPAL_ERROR;
298 }
299 return rc;
300 }
301
302 static int cache_put_uint(opal_process_name_t *id, int type,
303 const char key[], uint64_t val)
304 {
305 char *cpy;
306 opal_value_t kv;
307 int ret;
308
309 if (!(cpy = strdup (key))) {
310 ret = OPAL_ERR_OUT_OF_RESOURCE;
311 goto done;
312 }
313 OBJ_CONSTRUCT(&kv, opal_value_t);
314 kv.key = cpy;
315 kv.type = type;
316 switch (type) {
317 case OPAL_UINT16:
318 kv.data.uint16 = val;
319 break;
320 case OPAL_UINT32:
321 kv.data.uint32 = val;
322 break;
323 case OPAL_UINT64:
324 kv.data.uint64 = val;
325 break;
326 default:
327 ret = OPAL_ERROR;
328 goto done_free;
329 }
330 ret = opal_pmix_base_store(id, &kv);
331 done_free:
332 OBJ_DESTRUCT(&kv);
333 done:
334 if (OPAL_SUCCESS != ret)
335 OPAL_ERROR_LOG(ret);
336 return ret;
337 }
338
339 static int cache_put_string (opal_process_name_t *id,
340 const char key[], char *val)
341 {
342 char *cpy;
343 opal_value_t kv;
344 int ret;
345
346 if (!(cpy = strdup (key))) {
347 ret = OPAL_ERR_OUT_OF_RESOURCE;
348 goto done;
349 }
350 OBJ_CONSTRUCT(&kv, opal_value_t);
351 kv.key = cpy;
352 kv.type = OPAL_STRING;
353 kv.data.string = val;
354 ret = opal_pmix_base_store(id, &kv);
355 OBJ_DESTRUCT(&kv);
356 done:
357 if (OPAL_SUCCESS != ret)
358 OPAL_ERROR_LOG(ret);
359 return ret;
360 }
361
362 static int flux_init(opal_list_t *ilist)
363 {
364 int initialized;
365 int spawned;
366 int rc, ret = OPAL_ERROR;
367 int i, rank, lrank, nrank;
368 char tmp[64];
369 const char *jobid;
370 opal_process_name_t ldr;
371 char **localranks=NULL;
372 opal_process_name_t wildcard_rank;
373 char *str;
374
375 if (0 < pmix_init_count) {
376 return OPAL_SUCCESS;
377 }
378
379 if (PMI_SUCCESS != (rc = PMI_Initialized(&initialized))) {
380 OPAL_PMI_ERROR(rc, "PMI_Initialized");
381 return OPAL_ERROR;
382 }
383
384 if (!initialized && PMI_SUCCESS != (rc = PMI_Init(&spawned))) {
385 OPAL_PMI_ERROR(rc, "PMI_Init");
386 return OPAL_ERROR;
387 }
388
389
390 opal_pmix_base_hash_init();
391
392
393 rc = PMI_KVS_Get_value_length_max(&pmix_vallen_max);
394 if (PMI_SUCCESS != rc) {
395 OPAL_PMI_ERROR(rc, "PMI_KVS_Get_value_length_max");
396 goto err_exit;
397 }
398 pmix_vallen_threshold = pmix_vallen_max * 3;
399 pmix_vallen_threshold >>= 2;
400
401 rc = PMI_KVS_Get_name_length_max(&pmix_kvslen_max);
402 if (PMI_SUCCESS != rc) {
403 OPAL_PMI_ERROR(rc, "PMI_KVS_Get_name_length_max");
404 goto err_exit;
405 }
406
407 rc = PMI_KVS_Get_key_length_max(&pmix_keylen_max);
408 if (PMI_SUCCESS != rc) {
409 OPAL_PMI_ERROR(rc, "PMI_KVS_Get_key_length_max");
410 goto err_exit;
411 }
412
413
414 rc = PMI_Get_rank(&rank);
415 if (PMI_SUCCESS != rc) {
416 OPAL_PMI_ERROR(rc, "PMI_Get_rank");
417 goto err_exit;
418 }
419
420
421 if (!(jobid = getenv ("FLUX_JOB_ID"))) {
422 opal_output(0, "getenv FLUX_JOB_ID [%s:%d:%s]: failed\n",
423 __FILE__, __LINE__, __func__);
424 goto err_exit;
425 }
426 flux_pname.jobid = strtoul(jobid, NULL, 10);
427 ldr.jobid = flux_pname.jobid;
428 flux_pname.vpid = rank;
429
430
431
432
433 opal_proc_set_name(&flux_pname);
434 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
435 "%s pmix:flux: assigned tmp name",
436 OPAL_NAME_PRINT(flux_pname));
437
438
439 wildcard_rank = OPAL_PROC_MY_NAME;
440 wildcard_rank.vpid = OPAL_VPID_WILDCARD;
441
442 if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
443 OPAL_UINT32,
444 OPAL_PMIX_JOBID,
445 flux_pname.jobid)))
446 goto err_exit;
447 if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
448 OPAL_UINT32,
449 OPAL_PMIX_RANK,
450 rank)))
451 goto err_exit;
452
453 pmix_kvs_name = (char*)malloc(pmix_kvslen_max);
454 if (pmix_kvs_name == NULL) {
455 ret = OPAL_ERR_OUT_OF_RESOURCE;
456 goto err_exit;
457 }
458
459 rc = PMI_KVS_Get_my_name(pmix_kvs_name, pmix_kvslen_max);
460 if (PMI_SUCCESS != rc) {
461 OPAL_PMI_ERROR(rc, "PMI_KVS_Get_my_name");
462 goto err_exit;
463 }
464
465
466 if (PMI_SUCCESS != (rc = PMI_Get_clique_size(&nlranks))) {
467 OPAL_PMI_ERROR(rc, "PMI_Get_clique_size");
468 goto err_exit;
469 }
470
471 if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
472 OPAL_UINT32,
473 OPAL_PMIX_LOCAL_SIZE,
474 nlranks)))
475 goto err_exit;
476 lrank = 0;
477 nrank = 0;
478 ldr.vpid = rank;
479 if (0 < nlranks) {
480
481 lranks = (int*)calloc(nlranks, sizeof(int));
482 if (NULL == lranks) {
483 ret = OPAL_ERR_OUT_OF_RESOURCE;
484 OPAL_ERROR_LOG(rc);
485 goto err_exit;
486 }
487 if (PMI_SUCCESS != (rc = PMI_Get_clique_ranks(lranks, nlranks))) {
488 OPAL_PMI_ERROR(rc, "PMI_Get_clique_ranks");
489 free(lranks);
490 goto err_exit;
491 }
492
493 ldr.vpid = lranks[0];
494
495 memset(tmp, 0, 64);
496 for (i=0; i < nlranks; i++) {
497 (void)snprintf(tmp, 64, "%d", lranks[i]);
498 opal_argv_append_nosize(&localranks, tmp);
499 if (rank == lranks[i]) {
500 lrank = i;
501 nrank = i;
502 }
503 }
504 str = opal_argv_join(localranks, ',');
505 opal_argv_free(localranks);
506 if (OPAL_SUCCESS != (ret = cache_put_string (&wildcard_rank,
507 OPAL_PMIX_LOCAL_PEERS,
508 str)))
509 goto err_exit;
510 }
511
512
513 if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
514 OPAL_UINT64,
515 OPAL_PMIX_LOCALLDR,
516 *(uint64_t*)&ldr)))
517 goto err_exit;
518
519 if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
520 OPAL_UINT16,
521 OPAL_PMIX_LOCAL_RANK,
522 lrank)))
523 goto err_exit;
524
525 if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
526 OPAL_UINT16,
527 OPAL_PMIX_NODE_RANK,
528 nrank)))
529 goto err_exit;
530
531 rc = PMI_Get_universe_size(&i);
532 if (PMI_SUCCESS != rc) {
533 OPAL_PMI_ERROR(rc, "PMI_Get_universe_size");
534 goto err_exit;
535 }
536
537 if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
538 OPAL_UINT32,
539 OPAL_PMIX_UNIV_SIZE,
540 i)))
541 goto err_exit;
542 if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
543 OPAL_UINT32,
544 OPAL_PMIX_MAX_PROCS,
545 i)))
546 goto err_exit;
547
548 rc = PMI_Get_size(&i);
549 if (PMI_SUCCESS != rc) {
550 OPAL_PMI_ERROR(rc, "PMI_Get_size");
551 goto err_exit;
552 }
553 if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
554 OPAL_UINT32,
555 OPAL_PMIX_JOB_SIZE,
556 i)))
557 goto err_exit;
558
559
560 rc = PMI_Get_appnum(&i);
561 if (PMI_SUCCESS != rc) {
562 OPAL_PMI_ERROR(rc, "PMI_Get_appnum");
563 goto err_exit;
564 }
565 if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
566 OPAL_UINT32,
567 OPAL_PMIX_APPNUM,
568 i)))
569 goto err_exit;
570
571
572 ++pmix_init_count;
573
574 return OPAL_SUCCESS;
575
576 err_exit:
577 PMI_Finalize();
578 return (ret == OPAL_SUCCESS ? OPAL_ERROR : ret);
579 }
580
581 static int flux_fini(void) {
582 if (0 == pmix_init_count) {
583 return OPAL_SUCCESS;
584 }
585
586 if (0 == --pmix_init_count) {
587 PMI_Finalize ();
588 }
589
590
591 opal_pmix_base_hash_finalize();
592
593 return OPAL_SUCCESS;
594 }
595
596 static int flux_initialized(void)
597 {
598 if (0 < pmix_init_count) {
599 return 1;
600 }
601 return 0;
602 }
603
604 static int flux_abort(int flag, const char msg[],
605 opal_list_t *procs)
606 {
607 PMI_Abort(flag, msg);
608 return OPAL_SUCCESS;
609 }
610
611 static int flux_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid)
612 {
613
614
615
616
617
618
619
620
621
622
623
624
625 return OPAL_ERR_NOT_IMPLEMENTED;
626 }
627
628 static int flux_put(opal_pmix_scope_t scope,
629 opal_value_t *kv)
630 {
631 int rc;
632
633 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
634 "%s pmix:flux put for key %s",
635 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key);
636
637 if (OPAL_SUCCESS != (rc = opal_pmix_base_store_encoded (kv->key, (void*)&kv->data, kv->type, &pmix_packed_data, &pmix_packed_data_offset))) {
638 OPAL_ERROR_LOG(rc);
639 return rc;
640 }
641
642 if (pmix_packed_data_offset == 0) {
643
644 return OPAL_SUCCESS;
645 }
646
647 if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
648
649
650
651 return OPAL_SUCCESS;
652 }
653
654 rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
655 &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
656 pmix_vallen_max, &pmix_pack_key, kvs_put);
657
658 flux_committed = false;
659 return rc;
660 }
661
662 static int flux_commit(void)
663 {
664 int rc;
665
666
667 opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
668 &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
669 pmix_vallen_max, &pmix_pack_key, kvs_put);
670
671 if (PMI_SUCCESS != (rc = PMI_KVS_Commit(pmix_kvs_name))) {
672 OPAL_PMI_ERROR(rc, "PMI_KVS_Commit");
673 return OPAL_ERROR;
674 }
675 return OPAL_SUCCESS;
676 }
677
678 static int flux_fence(opal_list_t *procs, int collect_data)
679 {
680 int rc;
681 if (PMI_SUCCESS != (rc = PMI_Barrier())) {
682 OPAL_PMI_ERROR(rc, "PMI_Barrier");
683 return OPAL_ERROR;
684 }
685 return OPAL_SUCCESS;
686 }
687
688 static int flux_get(const opal_process_name_t *id,
689 const char *key, opal_list_t *info,
690 opal_value_t **kv)
691 {
692 int rc;
693 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
694 "%s pmix:flux called get for key %s",
695 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), key);
696
697
698
699 if (id->vpid == OPAL_VPID_WILDCARD) {
700 opal_list_t values;
701 OBJ_CONSTRUCT(&values, opal_list_t);
702 rc = opal_pmix_base_fetch (id, key, &values);
703 OPAL_LIST_DESTRUCT(&values);
704 if (OPAL_SUCCESS != rc) {
705 return rc;
706 }
707 }
708
709 rc = opal_pmix_base_cache_keys_locally(id, key, kv, pmix_kvs_name, pmix_vallen_max, kvs_get);
710 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
711 "%s pmix:flux got key %s",
712 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), key);
713
714 return rc;
715 }
716
717 static int flux_publish(opal_list_t *info)
718 {
719 return OPAL_ERR_NOT_SUPPORTED;
720 }
721
722 static int flux_lookup(opal_list_t *data, opal_list_t *info)
723 {
724
725
726 return OPAL_ERR_NOT_SUPPORTED;
727 }
728
729 static int flux_unpublish(char **keys, opal_list_t *info)
730 {
731 return OPAL_ERR_NOT_SUPPORTED;
732 }
733
734 static int flux_job_connect(opal_list_t *procs)
735 {
736 return OPAL_ERR_NOT_SUPPORTED;
737 }
738
739 static int flux_job_disconnect(opal_list_t *procs)
740 {
741 return OPAL_ERR_NOT_SUPPORTED;
742 }
743
744 static int flux_store_local(const opal_process_name_t *proc,
745 opal_value_t *val)
746 {
747 opal_pmix_base_store(proc, val);
748
749 return OPAL_SUCCESS;
750 }
751
752 static const char *flux_get_nspace(opal_jobid_t jobid)
753 {
754 return "N/A";
755 }
756 static void flux_register_jobid(opal_jobid_t jobid, const char *nspace)
757 {
758 return;
759 }
760
761 static char* pmix_error(int pmix_err)
762 {
763 char * err_msg;
764
765 switch(pmix_err) {
766 case PMI_FAIL: err_msg = "Operation failed"; break;
767 case PMI_ERR_INIT: err_msg = "PMI is not initialized"; break;
768 case PMI_ERR_NOMEM: err_msg = "Input buffer not large enough"; break;
769 case PMI_ERR_INVALID_ARG: err_msg = "Invalid argument"; break;
770 case PMI_ERR_INVALID_KEY: err_msg = "Invalid key argument"; break;
771 case PMI_ERR_INVALID_KEY_LENGTH: err_msg = "Invalid key length argument"; break;
772 case PMI_ERR_INVALID_VAL: err_msg = "Invalid value argument"; break;
773 case PMI_ERR_INVALID_VAL_LENGTH: err_msg = "Invalid value length argument"; break;
774 case PMI_ERR_INVALID_LENGTH: err_msg = "Invalid length argument"; break;
775 case PMI_ERR_INVALID_NUM_ARGS: err_msg = "Invalid number of arguments"; break;
776 case PMI_ERR_INVALID_ARGS: err_msg = "Invalid args argument"; break;
777 case PMI_ERR_INVALID_NUM_PARSED: err_msg = "Invalid num_parsed length argument"; break;
778 case PMI_ERR_INVALID_KEYVALP: err_msg = "Invalid keyvalp argument"; break;
779 case PMI_ERR_INVALID_SIZE: err_msg = "Invalid size argument"; break;
780 #if defined(PMI_ERR_INVALID_KVS)
781
782 case PMI_ERR_INVALID_KVS: err_msg = "Invalid kvs argument"; break;
783 #endif
784 case PMI_SUCCESS: err_msg = "Success"; break;
785 default: err_msg = "Unkown error";
786 }
787 return err_msg;
788 }