root/opal/mca/pmix/s2/pmix_s2.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. kvs_put
  2. kvs_get
  3. s2_init
  4. s2_fini
  5. s2_initialized
  6. s2_abort
  7. s2_spawn
  8. s2_job_connect
  9. s2_job_disconnect
  10. s2_put
  11. s2_commit
  12. fencenb
  13. s2_fencenb
  14. fence_release
  15. s2_fence
  16. s2_get
  17. s2_publish
  18. s2_lookup
  19. s2_unpublish
  20. s2_store_local
  21. s2_get_nspace
  22. s2_register_jobid
  23. pmix_error

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2007      The Trustees of Indiana University.
   4  *                         All rights reserved.
   5  * Copyright (c) 2011-2016 Cisco Systems, Inc.  All rights reserved.
   6  * Copyright (c) 2011-2017 Los Alamos National Security, LLC. All
   7  *                         rights reserved.
   8  * Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
   9  * Copyright (c) 2014-2016 Research Organization for Information Science
  10  *                         and Technology (RIST). All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include "opal_config.h"
  19 #include "opal/constants.h"
  20 #include "opal/types.h"
  21 
  22 #include "opal_stdint.h"
  23 #include "opal/mca/hwloc/base/base.h"
  24 #include "opal/util/argv.h"
  25 #include "opal/util/opal_environ.h"
  26 #include "opal/util/output.h"
  27 #include "opal/util/proc.h"
  28 #include "opal/util/show_help.h"
  29 
  30 #include "pmi2_pmap_parser.h"
  31 
  32 #include <string.h>
  33 #include <pmi2.h>
  34 
  35 #include "opal/mca/pmix/base/base.h"
  36 #include "opal/mca/pmix/base/pmix_base_hash.h"
  37 #include "pmix_s2.h"
  38 
  39 static int s2_init(opal_list_t *ilist);
  40 static int s2_fini(void);
  41 static int s2_initialized(void);
  42 static int s2_abort(int flag, const char msg[],
  43                     opal_list_t *procs);
  44 static int s2_commit(void);
  45 static int s2_fencenb(opal_list_t *procs, int collect_data,
  46                       opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
  47 static int s2_fence(opal_list_t *procs, int collect_data);
  48 static int s2_put(opal_pmix_scope_t scope,
  49                   opal_value_t *kv);
  50 static int s2_get(const opal_process_name_t *id,
  51                   const char *key, opal_list_t *info,
  52                   opal_value_t **kv);
  53 static int s2_publish(opal_list_t *info);
  54 static int s2_lookup(opal_list_t *data, opal_list_t *info);
  55 static int s2_unpublish(char **keys, opal_list_t *info);
  56 static int s2_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid);
  57 static int s2_job_connect(opal_list_t *procs);
  58 static int s2_job_disconnect(opal_list_t *procs);
  59 static int s2_store_local(const opal_process_name_t *proc,
  60                           opal_value_t *val);
  61 static const char *s2_get_nspace(opal_jobid_t jobid);
  62 static void s2_register_jobid(opal_jobid_t jobid, const char *nspace);
  63 
  64 const opal_pmix_base_module_t opal_pmix_s2_module = {
  65     .init = s2_init,
  66     .finalize = s2_fini,
  67     .initialized = s2_initialized,
  68     .abort = s2_abort,
  69     .commit = s2_commit,
  70     .fence_nb = s2_fencenb,
  71     .fence = s2_fence,
  72     .put = s2_put,
  73     .get = s2_get,
  74     .publish = s2_publish,
  75     .lookup = s2_lookup,
  76     .unpublish = s2_unpublish,
  77     .spawn = s2_spawn,
  78     .connect = s2_job_connect,
  79     .disconnect = s2_job_disconnect,
  80     .register_evhandler = opal_pmix_base_register_handler,
  81     .deregister_evhandler = opal_pmix_base_deregister_handler,
  82     .store_local = s2_store_local,
  83     .get_nspace = s2_get_nspace,
  84     .register_jobid = s2_register_jobid
  85 };
  86 
  87 // usage accounting
  88 static int pmix_init_count = 0;
  89 
  90 // local object
  91 typedef struct {
  92     opal_object_t super;
  93     opal_event_t ev;
  94     opal_pmix_op_cbfunc_t opcbfunc;
  95     void *cbdata;
  96 } pmi_opcaddy_t;
  97 static OBJ_CLASS_INSTANCE(pmi_opcaddy_t,
  98                           opal_object_t,
  99                           NULL, NULL);
 100 
 101 // PMI constant values:
 102 static int pmix_kvslen_max = 0;
 103 static int pmix_keylen_max = 0;
 104 static int pmix_vallen_max = 0;
 105 static int pmix_vallen_threshold = INT_MAX;
 106 
 107 // Job environment description
 108 static char *pmix_kvs_name = NULL;
 109 
 110 static char* pmix_packed_data = NULL;
 111 static int pmix_packed_data_offset = 0;
 112 static char* pmix_packed_encoded_data = NULL;
 113 static int pmix_packed_encoded_data_offset = 0;
 114 static int pmix_pack_key = 0;
 115 
 116 static int s2_rank;
 117 static uint16_t s2_lrank;
 118 static uint16_t s2_nrank;
 119 static int s2_jsize;
 120 static int s2_appnum;
 121 static int s2_nlranks;
 122 static int *s2_lranks=NULL;
 123 static opal_process_name_t s2_pname;
 124 
 125 static bool got_modex_data = false;
 126 static char* pmix_error(int pmix_err);
 127 #define OPAL_PMI_ERROR(pmi_err, pmi_func)                       \
 128     do {                                                        \
 129     opal_output(0, "%s [%s:%d:%s]: %s\n",                   \
 130     pmi_func, __FILE__, __LINE__, __func__,     \
 131     pmix_error(pmi_err));                        \
 132     } while(0);
 133 
 134 static int kvs_put(const char key[], const char value[])
 135 {
 136     int rc;
 137     rc = PMI2_KVS_Put(key, value);
 138     if( PMI2_SUCCESS != rc ){
 139         OPAL_PMI_ERROR(rc, "PMI2_KVS_Put");
 140         return OPAL_ERROR;
 141     }
 142     return OPAL_SUCCESS;
 143 }
 144 
 145 static int kvs_get(const char key[], char value [], int maxvalue)
 146 {
 147     int rc;
 148     int len;
 149     rc = PMI2_KVS_Get(pmix_kvs_name, PMI2_ID_NULL, key, value, maxvalue, &len);
 150     /*
 151      * turns out the KVS can be called for keys that haven't yet
 152      * been inserted, so suppress warning message if this is the
 153      * case
 154      */
 155     if (PMI2_SUCCESS != rc) {
 156         return OPAL_ERROR;
 157     }
 158     return OPAL_SUCCESS;
 159 }
 160 
 161 static int s2_init(opal_list_t *ilist)
 162 {
 163     int spawned, size, rank, appnum;
 164     int rc, ret = OPAL_ERROR;
 165     char buf[16];
 166     int found;
 167     int my_node;
 168     uint32_t stepid;
 169     int i;
 170     opal_process_name_t ldr;
 171     opal_value_t kv;
 172     char **localranks;
 173     char *str;
 174     char nmtmp[64];
 175     opal_process_name_t wildcard_rank;
 176 
 177     if (0 < pmix_init_count) {
 178         ++pmix_init_count;
 179         return OPAL_SUCCESS;
 180     }
 181 
 182     /* if we can't startup PMI, we can't be used */
 183     if ( PMI2_Initialized () ) {
 184         return OPAL_SUCCESS;
 185     }
 186     size = -1;
 187     rank = -1;
 188     appnum = -1;
 189     // setup hash table so we always can finalize it
 190     opal_pmix_base_hash_init();
 191 
 192     if (PMI2_SUCCESS != (rc = PMI2_Init(&spawned, &size, &rank, &appnum))) {
 193         opal_show_help("help-pmix-base.txt", "pmix2-init-failed", true, rc);
 194         return OPAL_ERROR;
 195     }
 196     if( size < 0 || rank < 0 ){
 197         opal_show_help("help-pmix-base.txt", "pmix2-init-returned-bad-values", true);
 198         ret = OPAL_ERR_BAD_PARAM;
 199         goto err_exit;
 200     }
 201 
 202     s2_jsize = size;
 203     s2_rank = rank;
 204     s2_appnum = appnum;
 205 
 206     pmix_vallen_max = PMI2_MAX_VALLEN;
 207     pmix_kvslen_max = PMI2_MAX_VALLEN; // FIX ME: What to put here for versatility?
 208     pmix_keylen_max = PMI2_MAX_KEYLEN;
 209     pmix_vallen_threshold = PMI2_MAX_VALLEN * 3;
 210     pmix_vallen_threshold >>= 2;
 211 
 212     pmix_kvs_name = (char*)malloc(pmix_kvslen_max);
 213     if( pmix_kvs_name == NULL ){
 214         PMI2_Finalize();
 215         ret = OPAL_ERR_OUT_OF_RESOURCE;
 216         goto err_exit;
 217     }
 218     rc = PMI2_Job_GetId(pmix_kvs_name, pmix_kvslen_max);
 219     if( PMI2_SUCCESS != rc ) {
 220         OPAL_PMI_ERROR(rc, "PMI2_Job_GetId");
 221         free(pmix_kvs_name);
 222         ret = OPAL_ERR_BAD_PARAM;
 223         goto err_exit;
 224     }
 225 
 226     /* store our name in the opal_proc_t so that
 227      * debug messages will make sense - an upper
 228      * layer will eventually overwrite it, but that
 229      * won't do any harm */
 230     s2_pname.jobid = strtoul(pmix_kvs_name, &str, 10);
 231     s2_pname.jobid = (s2_pname.jobid << 16) & 0xffff0000;
 232     if (NULL != str) {
 233         stepid = strtoul(str+1, NULL, 10);
 234         s2_pname.jobid |= (stepid & 0x0000ffff);
 235     }
 236     s2_pname.vpid = s2_rank;
 237     opal_proc_set_name(&s2_pname);
 238     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
 239                         "%s pmix:s2: assigned tmp name",
 240                         OPAL_NAME_PRINT(s2_pname));
 241 
 242     /* setup wildcard rank*/
 243     wildcard_rank = OPAL_PROC_MY_NAME;
 244     wildcard_rank.vpid = OPAL_VPID_WILDCARD;
 245 
 246     /* Slurm PMI provides the job id as an integer followed
 247      * by a '.', followed by essentially a stepid. The first integer
 248      * defines an overall job number. The second integer is the number of
 249      * individual jobs we have run within that allocation.
 250      */
 251     OBJ_CONSTRUCT(&kv, opal_value_t);
 252     kv.key = strdup(OPAL_PMIX_JOBID);
 253     kv.type = OPAL_UINT32;
 254     kv.data.uint32 = s2_pname.jobid;
 255     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&wildcard_rank, &kv))) {
 256         OPAL_ERROR_LOG(ret);
 257         OBJ_DESTRUCT(&kv);
 258         goto err_exit;
 259     }
 260     OBJ_DESTRUCT(&kv);  // frees pmix_kvs_name
 261 
 262     /* save the job size */
 263     OBJ_CONSTRUCT(&kv, opal_value_t);
 264     kv.key = strdup(OPAL_PMIX_JOB_SIZE);
 265     kv.type = OPAL_UINT32;
 266     kv.data.uint32 = size;
 267     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&wildcard_rank, &kv))) {
 268         OPAL_ERROR_LOG(ret);
 269         OBJ_DESTRUCT(&kv);
 270         goto err_exit;
 271     }
 272     OBJ_DESTRUCT(&kv);
 273 
 274     /* save the appnum */
 275     OBJ_CONSTRUCT(&kv, opal_value_t);
 276     kv.key = strdup(OPAL_PMIX_APPNUM);
 277     kv.type = OPAL_UINT32;
 278     kv.data.uint32 = appnum;
 279     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
 280         OPAL_ERROR_LOG(ret);
 281         OBJ_DESTRUCT(&kv);
 282         goto err_exit;
 283     }
 284     OBJ_DESTRUCT(&kv);
 285 
 286     rc = PMI2_Info_GetJobAttr("universeSize", buf, 16, &found);
 287     if( PMI2_SUCCESS != rc ) {
 288         OPAL_PMI_ERROR(rc, "PMI_Get_universe_size");
 289         ret = OPAL_ERR_BAD_PARAM;
 290         goto err_exit;
 291     }
 292     /* save it */
 293     OBJ_CONSTRUCT(&kv, opal_value_t);
 294     kv.key = strdup(OPAL_PMIX_UNIV_SIZE);
 295     kv.type = OPAL_UINT32;
 296     kv.data.uint32 = atoi(buf);
 297     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&wildcard_rank, &kv))) {
 298         OPAL_ERROR_LOG(ret);
 299         OBJ_DESTRUCT(&kv);
 300         goto err_exit;
 301     }
 302     OBJ_DESTRUCT(&kv);
 303     /* push this into the dstore for subsequent fetches */
 304     OBJ_CONSTRUCT(&kv, opal_value_t);
 305     kv.key = strdup(OPAL_PMIX_MAX_PROCS);
 306     kv.type = OPAL_UINT32;
 307     kv.data.uint32 = atoi(buf);
 308     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&wildcard_rank, &kv))) {
 309         OPAL_ERROR_LOG(ret);
 310         OBJ_DESTRUCT(&kv);
 311         goto err_exit;
 312     }
 313     OBJ_DESTRUCT(&kv);
 314 
 315     char *pmapping = (char*)malloc(PMI2_MAX_VALLEN);
 316     if( pmapping == NULL ){
 317         ret = OPAL_ERR_OUT_OF_RESOURCE;
 318         OPAL_ERROR_LOG(ret);
 319         goto err_exit;
 320     }
 321 
 322     rc = PMI2_Info_GetJobAttr("PMI_process_mapping", pmapping, PMI2_MAX_VALLEN, &found);
 323     if( !found || PMI2_SUCCESS != rc ) {
 324         OPAL_PMI_ERROR(rc,"PMI2_Info_GetJobAttr");
 325         ret = OPAL_ERR_BAD_PARAM;
 326         goto err_exit;
 327     }
 328 
 329     s2_lranks = mca_common_pmi2_parse_pmap(pmapping, s2_pname.vpid, &my_node, &s2_nlranks);
 330     if (NULL == s2_lranks) {
 331         ret = OPAL_ERR_OUT_OF_RESOURCE;
 332         OPAL_ERROR_LOG(ret);
 333         goto err_exit;
 334     }
 335 
 336     free(pmapping);
 337 
 338     /* save the local size */
 339     OBJ_CONSTRUCT(&kv, opal_value_t);
 340     kv.key = strdup(OPAL_PMIX_LOCAL_SIZE);
 341     kv.type = OPAL_UINT32;
 342     kv.data.uint32 = s2_nlranks;
 343     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&wildcard_rank, &kv))) {
 344         OPAL_ERROR_LOG(ret);
 345         OBJ_DESTRUCT(&kv);
 346         goto err_exit;
 347     }
 348     OBJ_DESTRUCT(&kv);
 349 
 350     s2_lrank = 0;
 351     s2_nrank = 0;
 352     ldr.vpid = rank;
 353     localranks = NULL;
 354     if (0 < s2_nlranks && NULL != s2_lranks) {
 355         /* note the local ldr */
 356         ldr.vpid = s2_lranks[0];
 357         /* find ourselves */
 358         ldr.jobid = s2_pname.jobid;
 359         ldr.vpid = s2_pname.vpid;
 360         memset(nmtmp, 0, 64);
 361         for (i=0; i < s2_nlranks; i++) {
 362             (void)snprintf(nmtmp, 64, "%d", s2_lranks[i]);
 363             opal_argv_append_nosize(&localranks, nmtmp);
 364             if (s2_rank == s2_lranks[i]) {
 365                 s2_lrank = i;
 366                 s2_nrank = i;
 367             }
 368         }
 369         str = opal_argv_join(localranks, ',');
 370         opal_argv_free(localranks);
 371         OBJ_CONSTRUCT(&kv, opal_value_t);
 372         kv.key = strdup(OPAL_PMIX_LOCAL_PEERS);
 373         kv.type = OPAL_STRING;
 374         kv.data.string = str;
 375         if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&wildcard_rank, &kv))) {
 376             OPAL_ERROR_LOG(ret);
 377             OBJ_DESTRUCT(&kv);
 378             goto err_exit;
 379         }
 380         OBJ_DESTRUCT(&kv);
 381     }
 382 
 383     /* save the local leader */
 384     OBJ_CONSTRUCT(&kv, opal_value_t);
 385     kv.key = strdup(OPAL_PMIX_LOCALLDR);
 386     kv.type = OPAL_UINT64;
 387     kv.data.uint64 = *(uint64_t*)&ldr;
 388     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&wildcard_rank, &kv))) {
 389         OPAL_ERROR_LOG(ret);
 390         OBJ_DESTRUCT(&kv);
 391         goto err_exit;
 392     }
 393     OBJ_DESTRUCT(&kv);
 394     /* save our local rank */
 395     OBJ_CONSTRUCT(&kv, opal_value_t);
 396     kv.key = strdup(OPAL_PMIX_LOCAL_RANK);
 397     kv.type = OPAL_UINT16;
 398     kv.data.uint16 = s2_lrank;
 399     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
 400         OPAL_ERROR_LOG(ret);
 401         OBJ_DESTRUCT(&kv);
 402         goto err_exit;
 403     }
 404     OBJ_DESTRUCT(&kv);
 405     /* and our node rank */
 406     OBJ_CONSTRUCT(&kv, opal_value_t);
 407     kv.key = strdup(OPAL_PMIX_NODE_RANK);
 408     kv.type = OPAL_UINT16;
 409     kv.data.uint16 = s2_nrank;
 410     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
 411         OPAL_ERROR_LOG(ret);
 412         OBJ_DESTRUCT(&kv);
 413         goto err_exit;
 414     }
 415     OBJ_DESTRUCT(&kv);
 416 
 417     /* increment the init count */
 418     ++pmix_init_count;
 419 
 420     return OPAL_SUCCESS;
 421 err_exit:
 422     PMI2_Finalize();
 423     return ret;
 424 }
 425 
 426 static int s2_fini(void) {
 427     if (0 == pmix_init_count) {
 428         return OPAL_SUCCESS;
 429     }
 430 
 431     if (0 == --pmix_init_count) {
 432         PMI2_Finalize();
 433         if (NULL != pmix_kvs_name) {
 434             free(pmix_kvs_name);
 435             pmix_kvs_name = NULL;
 436         }
 437         if (NULL != s2_lranks) {
 438             free(s2_lranks);
 439         }
 440     }
 441     return OPAL_SUCCESS;
 442 }
 443 
 444 static int s2_initialized(void)
 445 {
 446     if (0 < pmix_init_count) {
 447         return 1;
 448     }
 449     return 0;
 450 }
 451 
 452 static int s2_abort(int flag, const char msg[],
 453                     opal_list_t *procs)
 454 {
 455     PMI2_Abort(flag, msg);
 456     return OPAL_SUCCESS;
 457 }
 458 
 459 static int s2_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid)
 460 {
 461     /*
 462     int rc;
 463     size_t preput_vector_size;
 464     const int info_keyval_sizes[1];
 465     info_keyval_sizes[0] = (int)opal_list_get_size(info_keyval_vector);
 466     //FIXME what's the size of array of lists?
 467     preput_vector_size = opal_list_get_size(preput_keyval_vector);
 468     rc = PMI2_Job_Spawn(count, cmds, argcs, argvs, maxprocs, info_keyval_sizes, info_keyval_vector, (int)preput_vector_size, preput_keyval_vector, jobId, jobIdSize, errors);
 469     if( PMI2_SUCCESS != rc ) {
 470     OPAL_PMI_ERROR(rc, "PMI2_Job_Spawn");
 471     return OPAL_ERROR;
 472     }*/
 473     return OPAL_ERR_NOT_IMPLEMENTED;
 474 }
 475 
 476 static int s2_job_connect(opal_list_t *procs)
 477 {
 478     int rc;
 479     PMI2_Connect_comm_t conn;
 480     opal_namelist_t *nm;
 481     char *jobid;
 482 
 483     if (NULL == procs || 1 < opal_list_get_size(procs)) {
 484         return OPAL_ERR_NOT_SUPPORTED;
 485     }
 486     nm = (opal_namelist_t*)opal_list_get_first(procs);
 487     (void)asprintf(&jobid, "%s", OPAL_JOBID_PRINT(nm->name.jobid));
 488 
 489     /*FIXME should change function prototype to add void* conn */
 490     rc = PMI2_Job_Connect(jobid, &conn);
 491     if( PMI2_SUCCESS != rc ){
 492         OPAL_PMI_ERROR(rc, "PMI2_Job_Connect");
 493         free(jobid);
 494         return OPAL_ERROR;
 495     }
 496     free(jobid);
 497     return OPAL_SUCCESS;
 498 }
 499 
 500 static int s2_job_disconnect(opal_list_t *procs)
 501 {
 502     int rc;
 503     opal_namelist_t *nm;
 504     char *jobid;
 505 
 506     if (NULL == procs || 1 < opal_list_get_size(procs)) {
 507         return OPAL_ERR_NOT_SUPPORTED;
 508     }
 509     nm = (opal_namelist_t*)opal_list_get_first(procs);
 510     (void)asprintf(&jobid, "%s", OPAL_JOBID_PRINT(nm->name.jobid));
 511 
 512     rc = PMI2_Job_Disconnect(jobid);
 513     if( PMI2_SUCCESS != rc ){
 514         OPAL_PMI_ERROR(rc, "PMI2_Job_Disconnect");
 515         free(jobid);
 516         return OPAL_ERROR;
 517     }
 518     free(jobid);
 519     return OPAL_SUCCESS;
 520 }
 521 
 522 static int s2_put(opal_pmix_scope_t scope,
 523                   opal_value_t *kv)
 524 {
 525     int rc;
 526 
 527     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
 528                         "%s pmix:s2 put for key %s",
 529                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key);
 530 
 531     if (OPAL_SUCCESS != (rc = opal_pmix_base_store_encoded (kv->key, (void*)&kv->data, kv->type, &pmix_packed_data, &pmix_packed_data_offset))) {
 532         OPAL_ERROR_LOG(rc);
 533         return rc;
 534     }
 535 
 536     if (pmix_packed_data_offset == 0) {
 537         /* nothing to write */
 538         return OPAL_SUCCESS;
 539     }
 540 
 541     if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
 542         /* this meta-key is still being filled,
 543      * nothing to put yet
 544      */
 545         return OPAL_SUCCESS;
 546     }
 547 
 548     rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
 549                                                &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
 550                                                pmix_vallen_max, &pmix_pack_key, kvs_put);
 551 
 552     return rc;
 553 }
 554 
 555 static int s2_commit(void)
 556 {
 557     return OPAL_SUCCESS;
 558 }
 559 
 560 static void fencenb(int sd, short args, void *cbdata)
 561 {
 562     pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata;
 563     int rc = OPAL_SUCCESS;
 564     int32_t i;
 565     opal_value_t *kp, kvn;
 566     opal_hwloc_locality_t locality;
 567     opal_process_name_t pname;
 568 
 569     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
 570                         "%s pmix:s2 called fence",
 571                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
 572 
 573     /* check if there is partially filled meta key and put them */
 574     opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
 575                                   &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
 576                                   pmix_vallen_max, &pmix_pack_key, kvs_put);
 577 
 578     /* now call fence */
 579     if (PMI2_SUCCESS != PMI2_KVS_Fence()) {
 580         rc = OPAL_ERROR;
 581         goto cleanup;
 582     }
 583 
 584     /* get the modex data from each local process and set the
 585      * localities to avoid having the MPI layer fetch data
 586      * for every process in the job */
 587     pname.jobid = OPAL_PROC_MY_NAME.jobid;
 588     if (!got_modex_data) {
 589         got_modex_data = true;
 590         /* we only need to set locality for each local rank as "not found"
 591      * equates to "non-local" */
 592         for (i=0; i < s2_nlranks; i++) {
 593             pname.vpid = s2_lranks[i];
 594             rc = opal_pmix_base_cache_keys_locally(&s2_pname, OPAL_PMIX_CPUSET,
 595                                                    &kp, pmix_kvs_name, pmix_vallen_max, kvs_get);
 596             if (OPAL_SUCCESS != rc) {
 597                 OPAL_ERROR_LOG(rc);
 598                 goto cleanup;
 599             }
 600             if (NULL == kp || NULL == kp->data.string) {
 601                 /* if we share a node, but we don't know anything more, then
 602          * mark us as on the node as this is all we know
 603          */
 604                 locality = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
 605             } else {
 606                 /* determine relative location on our node */
 607                 locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
 608                                                                  opal_process_info.cpuset,
 609                                                                  kp->data.string);
 610             }
 611             if (NULL != kp) {
 612                 OBJ_RELEASE(kp);
 613             }
 614             OPAL_OUTPUT_VERBOSE((1, opal_pmix_base_framework.framework_output,
 615                                  "%s pmix:s2 proc %s locality %s",
 616                                  OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
 617                                  OPAL_NAME_PRINT(s2_pname),
 618                                  opal_hwloc_base_print_locality(locality)));
 619 
 620             OBJ_CONSTRUCT(&kvn, opal_value_t);
 621             kvn.key = strdup(OPAL_PMIX_LOCALITY);
 622             kvn.type = OPAL_UINT16;
 623             kvn.data.uint16 = locality;
 624             opal_pmix_base_store(&pname, &kvn);
 625             OBJ_DESTRUCT(&kvn);
 626         }
 627     }
 628 
 629 cleanup:
 630     if (NULL != op->opcbfunc) {
 631         op->opcbfunc(rc, op->cbdata);
 632     }
 633     OBJ_RELEASE(op);
 634     return;
 635 }
 636 
 637 static int s2_fencenb(opal_list_t *procs, int collect_data,
 638                       opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
 639 {
 640     pmi_opcaddy_t *op;
 641 
 642     /* thread-shift this so we don't block in SLURM's barrier */
 643     op = OBJ_NEW(pmi_opcaddy_t);
 644     op->opcbfunc = cbfunc;
 645     op->cbdata = cbdata;
 646     event_assign(&op->ev, opal_pmix_base.evbase, -1,
 647                  EV_WRITE, fencenb, op);
 648     event_active(&op->ev, EV_WRITE, 1);
 649 
 650     return OPAL_SUCCESS;
 651 }
 652 
 653 #define S2_WAIT_FOR_COMPLETION(a)               \
 654     do {                                        \
 655     while ((a)) {                           \
 656     usleep(10);                         \
 657     }                                       \
 658     } while (0)
 659 
 660 struct fence_result {
 661     volatile int flag;
 662     int status;
 663 };
 664 
 665 static void fence_release(int status, void *cbdata)
 666 {
 667     struct fence_result *res = (struct fence_result*)cbdata;
 668     res->status = status;
 669     opal_atomic_wmb();
 670     res->flag = 0;
 671 }
 672 
 673 static int s2_fence(opal_list_t *procs, int collect_data)
 674 {
 675     struct fence_result result = { 1, OPAL_SUCCESS };
 676     s2_fencenb(procs, collect_data, fence_release, (void*)&result);
 677     S2_WAIT_FOR_COMPLETION(result.flag);
 678     return result.status;
 679 }
 680 
 681 
 682 static int s2_get(const opal_process_name_t *id,
 683                   const char *key, opal_list_t *info,
 684                   opal_value_t **kv)
 685 {
 686     int rc;
 687     rc = opal_pmix_base_cache_keys_locally(id, key, kv, pmix_kvs_name, pmix_vallen_max, kvs_get);
 688     return rc;
 689 }
 690 
 691 static int s2_publish(opal_list_t *info)
 692 {
 693 #if 0
 694     int rc;
 695 
 696     if (PMI2_SUCCESS != (rc = PMI2_Nameserv_publish(service_name, NULL, port))) {
 697         OPAL_PMI_ERROR(rc, "PMI2_Nameserv_publish");
 698         return OPAL_ERROR;
 699     }
 700 #endif
 701     return OPAL_ERR_NOT_IMPLEMENTED;
 702 }
 703 
 704 static int s2_lookup(opal_list_t *data, opal_list_t *info)
 705 {
 706 #if 0
 707     int rc;
 708 
 709     if (PMI2_SUCCESS != (rc = PMI2_Nameserv_lookup(service_name, NULL, port, portLen))) {
 710         OPAL_PMI_ERROR(rc, "PMI2_Nameserv_lookup");
 711         return OPAL_ERROR;
 712     }
 713 
 714 #endif
 715     return OPAL_ERR_NOT_IMPLEMENTED;
 716 }
 717 
 718 static int s2_unpublish(char **keys, opal_list_t *info)
 719 {
 720 #if 0
 721     int rc;
 722 
 723     if (PMI2_SUCCESS != (rc = PMI2_Nameserv_unpublish(service_name, NULL))) {
 724         OPAL_PMI_ERROR(rc, "PMI2_Nameserv_unpublish");
 725         return OPAL_ERROR;
 726     }
 727 #endif
 728     return OPAL_ERR_NOT_IMPLEMENTED;
 729 }
 730 
 731 static int s2_store_local(const opal_process_name_t *proc,
 732                           opal_value_t *val)
 733 {
 734     opal_pmix_base_store(proc, val);
 735 
 736     return OPAL_SUCCESS;
 737 }
 738 
 739 static const char *s2_get_nspace(opal_jobid_t jobid)
 740 {
 741     return "N/A";
 742 }
 743 static void s2_register_jobid(opal_jobid_t jobid, const char *nspace)
 744 {
 745     return;
 746 }
 747 
 748 static char* pmix_error(int pmix_err)
 749 {
 750     char * err_msg;
 751 
 752     switch(pmix_err) {
 753     case PMI2_FAIL: err_msg = "Operation failed"; break;
 754     case PMI2_ERR_INIT: err_msg = "PMI is not initialized"; break;
 755     case PMI2_ERR_NOMEM: err_msg = "Input buffer not large enough"; break;
 756     case PMI2_ERR_INVALID_ARG: err_msg = "Invalid argument"; break;
 757     case PMI2_ERR_INVALID_KEY: err_msg = "Invalid key argument"; break;
 758     case PMI2_ERR_INVALID_KEY_LENGTH: err_msg = "Invalid key length argument"; break;
 759     case PMI2_ERR_INVALID_VAL: err_msg = "Invalid value argument"; break;
 760     case PMI2_ERR_INVALID_VAL_LENGTH: err_msg = "Invalid value length argument"; break;
 761     case PMI2_ERR_INVALID_LENGTH: err_msg = "Invalid length argument"; break;
 762     case PMI2_ERR_INVALID_NUM_ARGS: err_msg = "Invalid number of arguments"; break;
 763     case PMI2_ERR_INVALID_ARGS: err_msg = "Invalid args argument"; break;
 764     case PMI2_ERR_INVALID_NUM_PARSED: err_msg = "Invalid num_parsed length argument"; break;
 765     case PMI2_ERR_INVALID_KEYVALP: err_msg = "Invalid keyvalp argument"; break;
 766     case PMI2_ERR_INVALID_SIZE: err_msg = "Invalid size argument"; break;
 767     case PMI2_SUCCESS: err_msg = "Success"; break;
 768     default: err_msg = "Unkown error";
 769     }
 770     return err_msg;
 771 }

/* [<][>][^][v][top][bottom][index][help] */