root/opal/mca/pmix/flux/pmix_flux.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. PMI_Init
  2. PMI_Initialized
  3. PMI_Finalize
  4. PMI_Get_size
  5. PMI_Get_rank
  6. PMI_Get_universe_size
  7. PMI_Get_appnum
  8. PMI_Barrier
  9. PMI_Abort
  10. PMI_KVS_Get_my_name
  11. PMI_KVS_Get_name_length_max
  12. PMI_KVS_Get_key_length_max
  13. PMI_KVS_Get_value_length_max
  14. PMI_KVS_Put
  15. PMI_KVS_Commit
  16. PMI_KVS_Get
  17. PMI_Get_clique_size
  18. PMI_Get_clique_ranks
  19. kvs_get
  20. kvs_put
  21. cache_put_uint
  22. cache_put_string
  23. flux_init
  24. flux_fini
  25. flux_initialized
  26. flux_abort
  27. flux_spawn
  28. flux_put
  29. flux_commit
  30. flux_fence
  31. flux_get
  32. flux_publish
  33. flux_lookup
  34. flux_unpublish
  35. flux_job_connect
  36. flux_job_disconnect
  37. flux_store_local
  38. flux_get_nspace
  39. flux_register_jobid
  40. pmix_error

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
   4  * Copyright (c) 2014-2016 Research Organization for Information Science
   5  *                         and Technology (RIST). All rights reserved.
   6  * Copyright (c) 2016 Cisco Systems, Inc.  All rights reserved.
   7  * $COPYRIGHT$
   8  *
   9  * Additional copyrights may follow
  10  *
  11  * $HEADER$
  12  */
  13 
  14 #include "opal_config.h"
  15 #include "opal/constants.h"
  16 #include "opal/types.h"
  17 
  18 #include "opal_stdint.h"
  19 #include "opal/mca/hwloc/base/base.h"
  20 #include "opal/util/argv.h"
  21 #include "opal/util/opal_environ.h"
  22 #include "opal/util/output.h"
  23 #include "opal/util/proc.h"
  24 #include "opal/util/show_help.h"
  25 
  26 #include <string.h>
  27 #if defined (HAVE_FLUX_PMI_LIBRARY)
  28 #include <pmi.h>
  29 #else
  30 #include <dlfcn.h>
  31 #endif
  32 
  33 #include "opal/mca/pmix/base/base.h"
  34 #include "opal/mca/pmix/base/pmix_base_fns.h"
  35 #include "opal/mca/pmix/base/pmix_base_hash.h"
  36 #include "pmix_flux.h"
  37 
/* Forward declarations of the component entry points.  They are referenced
 * by the opal_pmix_flux_module table below and defined later in this file. */
static int flux_init(opal_list_t *ilist);
static int flux_fini(void);
static int flux_initialized(void);
static int flux_abort(int flag, const char msg[],
                    opal_list_t *procs);
static int flux_commit(void);
static int flux_fence(opal_list_t *procs, int collect_data);
static int flux_put(opal_pmix_scope_t scope,
                  opal_value_t *kv);
static int flux_get(const opal_process_name_t *id,
                  const char *key, opal_list_t *info,
                  opal_value_t **kv);
/* The following entry points are stubs in this component: they return
 * OPAL_ERR_NOT_SUPPORTED (OPAL_ERR_NOT_IMPLEMENTED for flux_spawn). */
static int flux_publish(opal_list_t *info);
static int flux_lookup(opal_list_t *data, opal_list_t *info);
static int flux_unpublish(char **keys, opal_list_t *info);
static int flux_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid);
static int flux_job_connect(opal_list_t *procs);
static int flux_job_disconnect(opal_list_t *procs);
static int flux_store_local(const opal_process_name_t *proc,
                          opal_value_t *val);
static const char *flux_get_nspace(opal_jobid_t jobid);
static void flux_register_jobid(opal_jobid_t jobid, const char *nspace);
  60 
/* Function table exported by this component.  Every slot listed here is
 * bound to a static function of this file, except the event-handler
 * (de)registration slots, which use the opal_pmix_base_* helpers. */
const opal_pmix_base_module_t opal_pmix_flux_module = {
    .init = flux_init,
    .finalize = flux_fini,
    .initialized = flux_initialized,
    .abort = flux_abort,
    .commit = flux_commit,
    .fence = flux_fence,
    .put = flux_put,
    .get = flux_get,
    .publish = flux_publish,
    .lookup = flux_lookup,
    .unpublish = flux_unpublish,
    .spawn = flux_spawn,
    .connect = flux_job_connect,
    .disconnect = flux_job_disconnect,
    .register_evhandler = opal_pmix_base_register_handler,
    .deregister_evhandler = opal_pmix_base_deregister_handler,
    .store_local = flux_store_local,
    .get_nspace = flux_get_nspace,
    .register_jobid = flux_register_jobid
};
  82 
// usage accounting
// (incremented at the end of a successful flux_init(), decremented in
// flux_fini())
static int pmix_init_count = 0;

// PMI constant values:
// (the three *_max values are queried from the PMI library in flux_init())
static int pmix_kvslen_max = 0;
static int pmix_keylen_max = 0;
static int pmix_vallen_max = 0;
// set to 3/4 of pmix_vallen_max in flux_init(); not read elsewhere in this file
static int pmix_vallen_threshold = INT_MAX;

// Job environment description
// KVS name buffer, allocated and filled by flux_init()
static char *pmix_kvs_name = NULL;
// cleared by flux_put() after a partial commit; never read in this file
static bool flux_committed = false;
// Packing buffers/offsets handed by address to the opal_pmix_base_*
// encode/commit helpers in flux_put() and flux_commit()
static char* pmix_packed_data = NULL;
static int pmix_packed_data_offset = 0;
static char* pmix_packed_encoded_data = NULL;
static int pmix_packed_encoded_data_offset = 0;
static int pmix_pack_key = 0;
// our own process name: jobid from FLUX_JOB_ID, vpid from PMI_Get_rank()
static opal_process_name_t flux_pname;
// clique ranks and clique size as reported by PMI_Get_clique_ranks()/_size()
static int *lranks = NULL, nlranks;
 102 
 103 static char* pmix_error(int pmix_err);
 104 #define OPAL_PMI_ERROR(pmi_err, pmi_func)                       \
 105     do {                                                        \
 106         opal_output(0, "%s [%s:%d:%s]: %s\n",                   \
 107             pmi_func, __FILE__, __LINE__, __func__,     \
 108             pmix_error(pmi_err));                        \
 109     } while(0);
 110 
 111 
 112 #if !defined (HAVE_FLUX_PMI_LIBRARY)
//
// Wrapper functions for dlopened() PMI library.
//
// This section is compiled only when HAVE_FLUX_PMI_LIBRARY is not defined,
// i.e. when <pmi.h> is not included; the PMI return codes are therefore
// defined locally.
//
#define PMI_SUCCESS                  0
#define PMI_FAIL                    -1
#define PMI_ERR_INIT                 1
#define PMI_ERR_NOMEM                2
#define PMI_ERR_INVALID_ARG          3
#define PMI_ERR_INVALID_KEY          4
#define PMI_ERR_INVALID_KEY_LENGTH   5
#define PMI_ERR_INVALID_VAL          6
#define PMI_ERR_INVALID_VAL_LENGTH   7
#define PMI_ERR_INVALID_LENGTH       8
#define PMI_ERR_INVALID_NUM_ARGS     9
#define PMI_ERR_INVALID_ARGS        10
#define PMI_ERR_INVALID_NUM_PARSED  11
#define PMI_ERR_INVALID_KEYVALP     12
#define PMI_ERR_INVALID_SIZE        13

// dlopen() handle of the PMI library; set by PMI_Init(), NULL while the
// library has not been loaded.
static void *dso = NULL;
 133 
 134 static int PMI_Init (int *spawned)
 135 {
 136     int (*f)(int *);
 137     if (!dso) {
 138         const char *path;
 139         if ((path = getenv ("FLUX_PMI_LIBRARY_PATH")))
 140             dso = dlopen (path, RTLD_NOW | RTLD_GLOBAL);
 141         if (!dso)
 142             return PMI_FAIL;
 143     }
 144     *(void **)(&f) = dlsym (dso, "PMI_Init");
 145     return f ? f (spawned) : PMI_FAIL;
 146 }
 147 
 148 static int PMI_Initialized (int *initialized)
 149 {
 150     int (*f)(int *);
 151     if (!dso) {
 152         if (initialized)
 153             *initialized = 0;
 154         return PMI_SUCCESS;
 155     }
 156     *(void **)(&f) = dlsym (dso, "PMI_Initialized");
 157     return f ? f (initialized) : PMI_FAIL;
 158 }
 159 
 160 static int PMI_Finalize (void)
 161 {
 162     int (*f)(void);
 163     int rc;
 164     if (!dso)
 165         return PMI_SUCCESS;
 166     *(void **)(&f) = dlsym (dso, "PMI_Finalize");
 167     rc = f ? f () : PMI_FAIL;
 168     dlclose (dso);
 169     return rc;
 170 }
 171 
 172 static int PMI_Get_size (int *size)
 173 {
 174     int (*f)(int *);
 175     *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_size") : NULL;
 176     return f ? f (size) : PMI_FAIL;
 177 }
 178 
 179 static int PMI_Get_rank (int *rank)
 180 {
 181     int (*f)(int *);
 182     *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_rank") : NULL;
 183     return f ? f (rank) : PMI_FAIL;
 184 }
 185 
 186 static int PMI_Get_universe_size (int *size)
 187 {
 188     int (*f)(int *);
 189     *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_universe_size") : NULL;
 190     return f ? f (size) : PMI_FAIL;
 191 }
 192 
 193 static int PMI_Get_appnum (int *appnum)
 194 {
 195     int (*f)(int *);
 196     *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_appnum") : NULL;
 197     return f ? f (appnum) : PMI_FAIL;
 198 }
 199 
 200 static int PMI_Barrier (void)
 201 {
 202     int (*f)(void);
 203     *(void **)(&f) = dso ? dlsym (dso, "PMI_Barrier") : NULL;
 204     return f ? f () : PMI_FAIL;
 205 }
 206 
 207 static int PMI_Abort (int exit_code, const char *error_msg)
 208 {
 209     int (*f)(int, const char *);
 210     *(void **)(&f) = dso ? dlsym (dso, "PMI_Abort") : NULL;
 211     return f ? f (exit_code, error_msg) : PMI_FAIL;
 212 }
 213 
 214 static int PMI_KVS_Get_my_name (char *kvsname, int length)
 215 {
 216     int (*f)(char *, int);
 217     *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_my_name") : NULL;
 218     return f ? f (kvsname, length) : PMI_FAIL;
 219 }
 220 
 221 static int PMI_KVS_Get_name_length_max (int *length)
 222 {
 223     int (*f)(int *);
 224     *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_name_length_max") : NULL;
 225     return f ? f (length) : PMI_FAIL;
 226 }
 227 
 228 static int PMI_KVS_Get_key_length_max (int *length)
 229 {
 230     int (*f)(int *);
 231     *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_key_length_max") : NULL;
 232     return f ? f (length) : PMI_FAIL;
 233 }
 234 
 235 static int PMI_KVS_Get_value_length_max (int *length)
 236 {
 237     int (*f)(int *);
 238     *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_value_length_max") : NULL;
 239     return f ? f (length) : PMI_FAIL;
 240 }
 241 
 242 static int PMI_KVS_Put (const char *kvsname, const char *key, const char *value)
 243 {
 244     int (*f)(const char *, const char *, const char *);
 245     *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Put") : NULL;
 246     return f ? f (kvsname, key, value) : PMI_FAIL;
 247 }
 248 
 249 static int PMI_KVS_Commit (const char *kvsname)
 250 {
 251     int (*f)(const char *);
 252     *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Commit") : NULL;
 253     return f ? f (kvsname) : PMI_FAIL;
 254 }
 255 
 256 static int PMI_KVS_Get (const char *kvsname, const char *key,
 257                         char *value, int len)
 258 {
 259     int (*f)(const char *, const char *, char *, int);
 260     *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get") : NULL;
 261     return f ? f (kvsname, key, value, len) : PMI_FAIL;
 262 }
 263 
 264 static int PMI_Get_clique_size (int *size)
 265 {
 266     int (*f)(int *);
 267     *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_clique_size") : NULL;
 268     return f ? f (size) : PMI_FAIL;
 269 }
 270 
 271 static int PMI_Get_clique_ranks (int *ranks, int length)
 272 {
 273     int (*f)(int *, int);
 274     *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_clique_ranks") : NULL;
 275     return f ? f (ranks, length) : PMI_FAIL;
 276 }
 277 
 278 #endif /* !HAVE_FLUX_PMI_LIBRARY */
 279 
 280 static int kvs_get(const char key[], char value [], int maxvalue)
 281 {
 282     int rc;
 283     rc = PMI_KVS_Get(pmix_kvs_name, key, value, maxvalue);
 284     if( PMI_SUCCESS != rc ){
 285         /* silently return an error - might be okay */
 286         return OPAL_ERROR;
 287     }
 288     return OPAL_SUCCESS;
 289 }
 290 
 291 static int kvs_put(const char key[], const char value[])
 292 {
 293     int rc;
 294     rc = PMI_KVS_Put(pmix_kvs_name, key, value);
 295     if( PMI_SUCCESS != rc ){
 296         OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
 297         return OPAL_ERROR;
 298     }
 299     return rc;
 300 }
 301 
 302 static int cache_put_uint(opal_process_name_t *id, int type,
 303                           const char key[], uint64_t val)
 304 {
 305     char *cpy;
 306     opal_value_t kv;
 307     int ret;
 308 
 309     if (!(cpy = strdup (key))) {
 310         ret = OPAL_ERR_OUT_OF_RESOURCE;
 311         goto done;
 312     }
 313     OBJ_CONSTRUCT(&kv, opal_value_t);
 314     kv.key = cpy;
 315     kv.type = type;
 316     switch (type) {
 317         case OPAL_UINT16:
 318             kv.data.uint16 = val;
 319             break;
 320         case OPAL_UINT32:
 321             kv.data.uint32 = val;
 322             break;
 323         case OPAL_UINT64:
 324             kv.data.uint64 = val;
 325             break;
 326         default:
 327             ret = OPAL_ERROR;
 328             goto done_free;
 329     }
 330     ret = opal_pmix_base_store(id, &kv);
 331 done_free:
 332     OBJ_DESTRUCT(&kv);
 333 done:
 334     if (OPAL_SUCCESS != ret)
 335         OPAL_ERROR_LOG(ret);
 336     return ret;
 337 }
 338 
 339 static int cache_put_string (opal_process_name_t *id,
 340                              const char key[], char *val)
 341 {
 342     char *cpy;
 343     opal_value_t kv;
 344     int ret;
 345 
 346     if (!(cpy = strdup (key))) {
 347         ret = OPAL_ERR_OUT_OF_RESOURCE;
 348         goto done;
 349     }
 350     OBJ_CONSTRUCT(&kv, opal_value_t);
 351     kv.key = cpy;
 352     kv.type = OPAL_STRING;
 353     kv.data.string = val;
 354     ret = opal_pmix_base_store(id, &kv);
 355     OBJ_DESTRUCT(&kv);
 356 done:
 357     if (OPAL_SUCCESS != ret)
 358         OPAL_ERROR_LOG(ret);
 359     return ret;
 360 }
 361 
 362 static int flux_init(opal_list_t *ilist)
 363 {
 364     int initialized;
 365     int spawned;
 366     int rc, ret = OPAL_ERROR;
 367     int i, rank, lrank, nrank;
 368     char tmp[64];
 369     const char *jobid;
 370     opal_process_name_t ldr;
 371     char **localranks=NULL;
 372     opal_process_name_t wildcard_rank;
 373     char *str;
 374 
 375     if (0 < pmix_init_count) {
 376         return OPAL_SUCCESS;
 377     }
 378 
 379     if (PMI_SUCCESS != (rc = PMI_Initialized(&initialized))) {
 380         OPAL_PMI_ERROR(rc, "PMI_Initialized");
 381         return OPAL_ERROR;
 382     }
 383 
 384     if (!initialized && PMI_SUCCESS != (rc = PMI_Init(&spawned))) {
 385         OPAL_PMI_ERROR(rc, "PMI_Init");
 386         return OPAL_ERROR;
 387     }
 388 
 389     // setup hash table
 390     opal_pmix_base_hash_init();
 391 
 392     // Initialize space demands
 393     rc = PMI_KVS_Get_value_length_max(&pmix_vallen_max);
 394     if (PMI_SUCCESS != rc) {
 395         OPAL_PMI_ERROR(rc, "PMI_KVS_Get_value_length_max");
 396         goto err_exit;
 397     }
 398     pmix_vallen_threshold = pmix_vallen_max * 3;
 399     pmix_vallen_threshold >>= 2;
 400 
 401     rc = PMI_KVS_Get_name_length_max(&pmix_kvslen_max);
 402     if (PMI_SUCCESS != rc) {
 403         OPAL_PMI_ERROR(rc, "PMI_KVS_Get_name_length_max");
 404         goto err_exit;
 405     }
 406 
 407     rc = PMI_KVS_Get_key_length_max(&pmix_keylen_max);
 408     if (PMI_SUCCESS != rc) {
 409         OPAL_PMI_ERROR(rc, "PMI_KVS_Get_key_length_max");
 410         goto err_exit;
 411     }
 412 
 413     /* get our rank */
 414     rc = PMI_Get_rank(&rank);
 415     if (PMI_SUCCESS != rc) {
 416         OPAL_PMI_ERROR(rc, "PMI_Get_rank");
 417         goto err_exit;
 418     }
 419 
 420     /* get integer job id */
 421     if (!(jobid = getenv ("FLUX_JOB_ID"))) {
 422         opal_output(0, "getenv FLUX_JOB_ID [%s:%d:%s]: failed\n",
 423                     __FILE__, __LINE__, __func__);
 424         goto err_exit;
 425     }
 426     flux_pname.jobid = strtoul(jobid, NULL, 10);
 427     ldr.jobid = flux_pname.jobid;
 428     flux_pname.vpid = rank;
 429     /* store our name in the opal_proc_t so that
 430      * debug messages will make sense - an upper
 431      * layer will eventually overwrite it, but that
 432      * won't do any harm */
 433     opal_proc_set_name(&flux_pname);
 434     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
 435                         "%s pmix:flux: assigned tmp name",
 436                         OPAL_NAME_PRINT(flux_pname));
 437 
 438     /* setup wildcard rank*/
 439     wildcard_rank = OPAL_PROC_MY_NAME;
 440     wildcard_rank.vpid = OPAL_VPID_WILDCARD;
 441 
 442     if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
 443                                                OPAL_UINT32,
 444                                                OPAL_PMIX_JOBID,
 445                                                flux_pname.jobid)))
 446         goto err_exit;
 447     if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
 448                                                OPAL_UINT32,
 449                                                OPAL_PMIX_RANK,
 450                                                rank)))
 451         goto err_exit;
 452 
 453     pmix_kvs_name = (char*)malloc(pmix_kvslen_max);
 454     if (pmix_kvs_name == NULL) {
 455         ret = OPAL_ERR_OUT_OF_RESOURCE;
 456         goto err_exit;
 457     }
 458 
 459     rc = PMI_KVS_Get_my_name(pmix_kvs_name, pmix_kvslen_max);
 460     if (PMI_SUCCESS != rc) {
 461         OPAL_PMI_ERROR(rc, "PMI_KVS_Get_my_name");
 462         goto err_exit;
 463     }
 464 
 465     /* get our local proc info to find our local rank */
 466     if (PMI_SUCCESS != (rc = PMI_Get_clique_size(&nlranks))) {
 467         OPAL_PMI_ERROR(rc, "PMI_Get_clique_size");
 468         goto err_exit;
 469     }
 470     /* save the local size */
 471     if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
 472                                                OPAL_UINT32,
 473                                                OPAL_PMIX_LOCAL_SIZE,
 474                                                nlranks)))
 475         goto err_exit;
 476     lrank = 0;
 477     nrank = 0;
 478     ldr.vpid = rank;
 479     if (0 < nlranks) {
 480         /* now get the specific ranks */
 481         lranks = (int*)calloc(nlranks, sizeof(int));
 482         if (NULL == lranks) {
 483             ret = OPAL_ERR_OUT_OF_RESOURCE;
 484             OPAL_ERROR_LOG(rc);
 485             goto err_exit;
 486         }
 487         if (PMI_SUCCESS != (rc = PMI_Get_clique_ranks(lranks, nlranks))) {
 488             OPAL_PMI_ERROR(rc, "PMI_Get_clique_ranks");
 489             free(lranks);
 490             goto err_exit;
 491         }
 492         /* note the local ldr */
 493         ldr.vpid = lranks[0];
 494         /* save this */
 495         memset(tmp, 0, 64);
 496         for (i=0; i < nlranks; i++) {
 497             (void)snprintf(tmp, 64, "%d", lranks[i]);
 498             opal_argv_append_nosize(&localranks, tmp);
 499             if (rank == lranks[i]) {
 500                 lrank = i;
 501                 nrank = i;
 502             }
 503         }
 504         str = opal_argv_join(localranks, ',');
 505         opal_argv_free(localranks);
 506         if (OPAL_SUCCESS != (ret = cache_put_string (&wildcard_rank,
 507                                                      OPAL_PMIX_LOCAL_PEERS,
 508                                                      str)))
 509             goto err_exit;
 510     }
 511 
 512     /* save the local leader */
 513     if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
 514                                                OPAL_UINT64,
 515                                                OPAL_PMIX_LOCALLDR,
 516                                                *(uint64_t*)&ldr)))
 517         goto err_exit;
 518     /* save our local rank */
 519     if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
 520                                                OPAL_UINT16,
 521                                                OPAL_PMIX_LOCAL_RANK,
 522                                                lrank)))
 523         goto err_exit;
 524     /* and our node rank */
 525     if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
 526                                                OPAL_UINT16,
 527                                                OPAL_PMIX_NODE_RANK,
 528                                                nrank)))
 529         goto err_exit;
 530     /* get universe size */
 531     rc = PMI_Get_universe_size(&i);
 532     if (PMI_SUCCESS != rc) {
 533         OPAL_PMI_ERROR(rc, "PMI_Get_universe_size");
 534         goto err_exit;
 535     }
 536     /* push this into the dstore for subsequent fetches */
 537     if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
 538                                                OPAL_UINT32,
 539                                                OPAL_PMIX_UNIV_SIZE,
 540                                                i)))
 541         goto err_exit;
 542     if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
 543                                                OPAL_UINT32,
 544                                                OPAL_PMIX_MAX_PROCS,
 545                                                i)))
 546         goto err_exit;
 547     /* get job size */
 548     rc = PMI_Get_size(&i);
 549     if (PMI_SUCCESS != rc) {
 550         OPAL_PMI_ERROR(rc, "PMI_Get_size");
 551         goto err_exit;
 552     }
 553     if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
 554                                                OPAL_UINT32,
 555                                                OPAL_PMIX_JOB_SIZE,
 556                                                i)))
 557         goto err_exit;
 558 
 559     /* get appnum */
 560     rc = PMI_Get_appnum(&i);
 561     if (PMI_SUCCESS != rc) {
 562         OPAL_PMI_ERROR(rc, "PMI_Get_appnum");
 563         goto err_exit;
 564     }
 565     if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
 566                                                OPAL_UINT32,
 567                                                OPAL_PMIX_APPNUM,
 568                                                i)))
 569         goto err_exit;
 570 
 571     /* increment the init count */
 572     ++pmix_init_count;
 573 
 574     return OPAL_SUCCESS;
 575 
 576 err_exit:
 577     PMI_Finalize();
 578     return (ret == OPAL_SUCCESS ? OPAL_ERROR : ret);
 579 }
 580 
 581 static int flux_fini(void) {
 582     if (0 == pmix_init_count) {
 583         return OPAL_SUCCESS;
 584     }
 585 
 586     if (0 == --pmix_init_count) {
 587         PMI_Finalize ();
 588     }
 589 
 590     // teardown hash table
 591     opal_pmix_base_hash_finalize();
 592 
 593     return OPAL_SUCCESS;
 594 }
 595 
 596 static int flux_initialized(void)
 597 {
 598     if (0 < pmix_init_count) {
 599         return 1;
 600     }
 601     return 0;
 602 }
 603 
 604 static int flux_abort(int flag, const char msg[],
 605                     opal_list_t *procs)
 606 {
 607     PMI_Abort(flag, msg);
 608     return OPAL_SUCCESS;
 609 }
 610 
/* Spawn is not implemented by this component: every call returns
 * OPAL_ERR_NOT_IMPLEMENTED and none of the arguments is used. */
static int flux_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid)
{
    /* Disabled sketch of a PMI_Spawn_multiple()-based implementation.  It
     * refers to names that are not parameters of this function, so it
     * cannot be enabled as written. */
    /*
    int rc;
    size_t preput_vector_size;
    const int info_keyval_sizes[1];
    info_keyval_sizes[0] = (int)opal_list_get_size(info_keyval_vector);
    //FIXME what's the size of array of lists?
    preput_vector_size = opal_list_get_size(preput_keyval_vector);
    rc = PMI_Spawn_multiple(count, cmds, argcs, argvs, maxprocs, info_keyval_sizes, info_keyval_vector, (int)preput_vector_size, preput_keyval_vector);
    if( PMI_SUCCESS != rc ) {
        OPAL_PMI_ERROR(rc, "PMI_Spawn_multiple");
        return OPAL_ERROR;
    }*/
    return OPAL_ERR_NOT_IMPLEMENTED;
}
 627 
 628 static int flux_put(opal_pmix_scope_t scope,
 629                   opal_value_t *kv)
 630 {
 631     int rc;
 632 
 633     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
 634                         "%s pmix:flux put for key %s",
 635                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key);
 636 
 637     if (OPAL_SUCCESS != (rc = opal_pmix_base_store_encoded (kv->key, (void*)&kv->data, kv->type, &pmix_packed_data, &pmix_packed_data_offset))) {
 638         OPAL_ERROR_LOG(rc);
 639         return rc;
 640     }
 641 
 642     if (pmix_packed_data_offset == 0) {
 643         /* nothing to write */
 644         return OPAL_SUCCESS;
 645     }
 646 
 647     if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
 648         /* this meta-key is still being filled,
 649          * nothing to put yet
 650          */
 651         return OPAL_SUCCESS;
 652     }
 653 
 654     rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
 655                                                &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
 656                                                pmix_vallen_max, &pmix_pack_key, kvs_put);
 657 
 658     flux_committed = false;
 659     return rc;
 660 }
 661 
 662 static int flux_commit(void)
 663 {
 664     int rc;
 665 
 666     /* check if there is partially filled meta key and put them */
 667     opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
 668                                   &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
 669                                   pmix_vallen_max, &pmix_pack_key, kvs_put);
 670 
 671     if (PMI_SUCCESS != (rc = PMI_KVS_Commit(pmix_kvs_name))) {
 672         OPAL_PMI_ERROR(rc, "PMI_KVS_Commit");
 673         return OPAL_ERROR;
 674     }
 675     return OPAL_SUCCESS;
 676 }
 677 
 678 static int flux_fence(opal_list_t *procs, int collect_data)
 679 {
 680     int rc;
 681     if (PMI_SUCCESS != (rc = PMI_Barrier())) {
 682         OPAL_PMI_ERROR(rc, "PMI_Barrier");
 683         return OPAL_ERROR;
 684     }
 685     return OPAL_SUCCESS;
 686 }
 687 
 688 static int flux_get(const opal_process_name_t *id,
 689                   const char *key, opal_list_t *info,
 690                   opal_value_t **kv)
 691 {
 692     int rc;
 693     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
 694                         "%s pmix:flux called get for key %s",
 695                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), key);
 696 
 697     /* Keys presumed stored directly to cache by flux_init() under the
 698      * wildcard rank must not trigger PMI_KVS_Get() if not found. */
 699     if (id->vpid == OPAL_VPID_WILDCARD) {
 700         opal_list_t values;
 701         OBJ_CONSTRUCT(&values, opal_list_t);
 702         rc = opal_pmix_base_fetch (id, key, &values);
 703         OPAL_LIST_DESTRUCT(&values);
 704         if (OPAL_SUCCESS != rc) {
 705             return rc;
 706         }
 707     }
 708 
 709     rc = opal_pmix_base_cache_keys_locally(id, key, kv, pmix_kvs_name, pmix_vallen_max, kvs_get);
 710     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
 711                         "%s pmix:flux got key %s",
 712                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), key);
 713 
 714     return rc;
 715 }
 716 
 717 static int flux_publish(opal_list_t *info)
 718 {
 719     return OPAL_ERR_NOT_SUPPORTED;
 720 }
 721 
 722 static int flux_lookup(opal_list_t *data, opal_list_t *info)
 723 {
 724     // Allocate mem for port here? Otherwise we won't get success!
 725 
 726     return OPAL_ERR_NOT_SUPPORTED;
 727 }
 728 
 729 static int flux_unpublish(char **keys, opal_list_t *info)
 730 {
 731     return OPAL_ERR_NOT_SUPPORTED;
 732 }
 733 
 734 static int flux_job_connect(opal_list_t *procs)
 735 {
 736     return OPAL_ERR_NOT_SUPPORTED;
 737 }
 738 
 739 static int flux_job_disconnect(opal_list_t *procs)
 740 {
 741     return OPAL_ERR_NOT_SUPPORTED;
 742 }
 743 
 744 static int flux_store_local(const opal_process_name_t *proc,
 745                           opal_value_t *val)
 746 {
 747     opal_pmix_base_store(proc, val);
 748 
 749     return OPAL_SUCCESS;
 750 }
 751 
 752 static const char *flux_get_nspace(opal_jobid_t jobid)
 753 {
 754     return "N/A";
 755 }
 756 static void flux_register_jobid(opal_jobid_t jobid, const char *nspace)
 757 {
 758     return;
 759 }
 760 
 761 static char* pmix_error(int pmix_err)
 762 {
 763     char * err_msg;
 764 
 765     switch(pmix_err) {
 766     case PMI_FAIL: err_msg = "Operation failed"; break;
 767     case PMI_ERR_INIT: err_msg = "PMI is not initialized"; break;
 768     case PMI_ERR_NOMEM: err_msg = "Input buffer not large enough"; break;
 769     case PMI_ERR_INVALID_ARG: err_msg = "Invalid argument"; break;
 770     case PMI_ERR_INVALID_KEY: err_msg = "Invalid key argument"; break;
 771     case PMI_ERR_INVALID_KEY_LENGTH: err_msg = "Invalid key length argument"; break;
 772     case PMI_ERR_INVALID_VAL: err_msg = "Invalid value argument"; break;
 773     case PMI_ERR_INVALID_VAL_LENGTH: err_msg = "Invalid value length argument"; break;
 774     case PMI_ERR_INVALID_LENGTH: err_msg = "Invalid length argument"; break;
 775     case PMI_ERR_INVALID_NUM_ARGS: err_msg = "Invalid number of arguments"; break;
 776     case PMI_ERR_INVALID_ARGS: err_msg = "Invalid args argument"; break;
 777     case PMI_ERR_INVALID_NUM_PARSED: err_msg = "Invalid num_parsed length argument"; break;
 778     case PMI_ERR_INVALID_KEYVALP: err_msg = "Invalid keyvalp argument"; break;
 779     case PMI_ERR_INVALID_SIZE: err_msg = "Invalid size argument"; break;
 780 #if defined(PMI_ERR_INVALID_KVS)
 781         /* pmix.h calls this a valid return code but mpich doesn't define it */
 782     case PMI_ERR_INVALID_KVS: err_msg = "Invalid kvs argument"; break;
 783 #endif
 784     case PMI_SUCCESS: err_msg = "Success"; break;
 785     default: err_msg = "Unkown error";
 786     }
 787     return err_msg;
 788 }

/* [<][>][^][v][top][bottom][index][help] */