root/opal/mca/pmix/pmix4x/pmix/src/client/pmix_client.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. _notify_complete
  2. pmix_client_notify_recv
  3. wait_cbfunc
  4. job_data
  5. PMIx_Get_version
  6. evhandler_reg_callbk
  7. notification_fn
  8. release_info
  9. _check_for_notify
  10. client_iof_handler
  11. PMIx_Init
  12. PMIx_Initialized
  13. fin_timeout
  14. finwait_cbfunc
  15. PMIx_Finalize
  16. PMIx_Abort
  17. _putfn
  18. PMIx_Put
  19. _commitfn
  20. PMIx_Commit
  21. _resolve_peers
  22. PMIx_Resolve_peers
  23. _resolve_nodes
  24. PMIx_Resolve_nodes

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2014-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2014      Artem Y. Polyakov <artpol84@gmail.com>.
   7  *                         All rights reserved.
   8  * Copyright (c) 2016-2017 Mellanox Technologies, Inc.
   9  *                         All rights reserved.
  10  * Copyright (c) 2016-2017 IBM Corporation.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include <src/include/pmix_config.h>
  19 
  20 #include <src/include/pmix_stdint.h>
  21 #include <src/include/pmix_socket_errno.h>
  22 
  23 #include <pmix.h>
  24 #include <pmix_rename.h>
  25 
  26 #include "src/include/pmix_globals.h"
  27 
  28 #ifdef HAVE_STRING_H
  29 #include <string.h>
  30 #endif
  31 #include <fcntl.h>
  32 #ifdef HAVE_UNISTD_H
  33 #include <unistd.h>
  34 #endif
  35 #ifdef HAVE_SYS_SOCKET_H
  36 #include <sys/socket.h>
  37 #endif
  38 #ifdef HAVE_SYS_UN_H
  39 #include <sys/un.h>
  40 #endif
  41 #ifdef HAVE_SYS_UIO_H
  42 #include <sys/uio.h>
  43 #endif
  44 #ifdef HAVE_SYS_TYPES_H
  45 #include <sys/types.h>
  46 #endif
  47 
  48 #include PMIX_EVENT_HEADER
  49 #ifdef PMIX_EVENT2_THREAD_HEADER
  50 #include PMIX_EVENT2_THREAD_HEADER
  51 #endif
  52 
/* Version string handed back by PMIx_Get_version(); taken from PMIX_VERSION. */
static const char pmix_version_string[] = PMIX_VERSION;
/* Outcome of the first-time PMIx_Init attempt. It starts as PMIX_ERR_INIT,
 * is overwritten on each exit path of the first-time init, and is what
 * PMIx_Init returns when it is called again after the first call. */
static pmix_status_t pmix_init_result = PMIX_ERR_INIT;
  55 
  56 #include "src/class/pmix_list.h"
  57 #include "src/event/pmix_event.h"
  58 #include "src/util/argv.h"
  59 #include "src/util/error.h"
  60 #include "src/util/hash.h"
  61 #include "src/util/name_fns.h"
  62 #include "src/util/output.h"
  63 #include "src/runtime/pmix_progress_threads.h"
  64 #include "src/runtime/pmix_rte.h"
  65 #include "src/threads/threads.h"
  66 #include "src/mca/bfrops/base/base.h"
  67 #include "src/mca/pcompress/base/base.h"
  68 #include "src/mca/gds/base/base.h"
  69 #include "src/mca/preg/preg.h"
  70 #include "src/mca/ptl/base/base.h"
  71 #include "src/include/pmix_globals.h"
  72 #include "src/common/pmix_attributes.h"
  73 #include "src/common/pmix_iof.h"
  74 
  75 #include "pmix_client_ops.h"
  76 
/* NOTE(review): presumably an upper bound on retry attempts; no use of this
 * constant is visible in this part of the file - confirm before changing. */
#define PMIX_MAX_RETRIES 10
  78 
  79 static void _notify_complete(pmix_status_t status, void *cbdata)
  80 {
  81     pmix_event_chain_t *chain = (pmix_event_chain_t*)cbdata;
  82     PMIX_ACQUIRE_OBJECT(chain);
  83     PMIX_RELEASE(chain);
  84 }
  85 
  86 static void pmix_client_notify_recv(struct pmix_peer_t *peer,
  87                                     pmix_ptl_hdr_t *hdr,
  88                                     pmix_buffer_t *buf, void *cbdata)
  89 {
  90     pmix_status_t rc;
  91     int32_t cnt;
  92     pmix_cmd_t cmd;
  93     pmix_event_chain_t *chain;
  94     size_t ninfo;
  95 
  96     pmix_output_verbose(2, pmix_client_globals.base_output,
  97                         "pmix:client_notify_recv - processing event");
  98 
  99     /* a zero-byte buffer indicates that this recv is being
 100      * completed due to a lost connection */
 101     if (PMIX_BUFFER_IS_EMPTY(buf)) {
 102         return;
 103     }
 104 
 105     /* start the local notification chain */
 106     chain = PMIX_NEW(pmix_event_chain_t);
 107     if (NULL == chain) {
 108         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 109         return;
 110     }
 111     chain->final_cbfunc = _notify_complete;
 112     chain->final_cbdata = chain;
 113 
 114     cnt=1;
 115     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 116                        buf, &cmd, &cnt, PMIX_COMMAND);
 117     if (PMIX_SUCCESS != rc) {
 118         PMIX_ERROR_LOG(rc);
 119         PMIX_RELEASE(chain);
 120         goto error;
 121     }
 122     /* unpack the status */
 123     cnt=1;
 124     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 125                        buf, &chain->status, &cnt, PMIX_STATUS);
 126     if (PMIX_SUCCESS != rc) {
 127         PMIX_ERROR_LOG(rc);
 128         PMIX_RELEASE(chain);
 129         goto error;
 130     }
 131 
 132     /* unpack the source of the event */
 133     cnt=1;
 134     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 135                        buf, &chain->source, &cnt, PMIX_PROC);
 136     if (PMIX_SUCCESS != rc) {
 137         PMIX_ERROR_LOG(rc);
 138         PMIX_RELEASE(chain);
 139         goto error;
 140     }
 141 
 142     /* unpack the info that might have been provided */
 143     cnt=1;
 144     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 145                        buf, &ninfo, &cnt, PMIX_SIZE);
 146     if (PMIX_SUCCESS != rc) {
 147         PMIX_ERROR_LOG(rc);
 148         PMIX_RELEASE(chain);
 149         goto error;
 150     }
 151 
 152     /* we always leave space for event hdlr name and a callback object */
 153     chain->nallocated = ninfo + 2;
 154     PMIX_INFO_CREATE(chain->info, chain->nallocated);
 155     if (NULL == chain->info) {
 156         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 157         PMIX_RELEASE(chain);
 158         return;
 159     }
 160 
 161     if (0 < ninfo) {
 162         chain->ninfo = ninfo;
 163         cnt = ninfo;
 164         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 165                            buf, chain->info, &cnt, PMIX_INFO);
 166         if (PMIX_SUCCESS != rc) {
 167             PMIX_ERROR_LOG(rc);
 168             PMIX_RELEASE(chain);
 169             goto error;
 170         }
 171     }
 172     /* prep the chain for processing */
 173     pmix_prep_event_chain(chain, chain->info, ninfo, false);
 174 
 175     pmix_output_verbose(2, pmix_client_globals.base_output,
 176                         "[%s:%d] pmix:client_notify_recv - processing event %s, calling errhandler",
 177                         pmix_globals.myid.nspace, pmix_globals.myid.rank, PMIx_Error_string(chain->status));
 178 
 179     pmix_invoke_local_event_hdlr(chain);
 180     return;
 181 
 182   error:
 183     /* we always need to return */
 184     pmix_output_verbose(2, pmix_client_globals.base_output,
 185                         "pmix:client_notify_recv - unpack error status =%d, calling def errhandler", rc);
 186     chain = PMIX_NEW(pmix_event_chain_t);
 187     if (NULL == chain) {
 188         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 189         return;
 190     }
 191     chain->status = rc;
 192     pmix_invoke_local_event_hdlr(chain);
 193 }
 194 
 195 
/* Client-side global state (server peer, pending requests, peer array,
 * output stream ids). Zero-initialized here; the fields used by this file
 * are filled in during PMIx_Init. */
pmix_client_globals_t pmix_client_globals = {0};
 197 
 198 /* callback for wait completion */
 199 static void wait_cbfunc(struct pmix_peer_t *pr,
 200                         pmix_ptl_hdr_t *hdr,
 201                         pmix_buffer_t *buf, void *cbdata)
 202 {
 203     pmix_lock_t *lock = (pmix_lock_t*)cbdata;
 204 
 205     pmix_output_verbose(2, pmix_client_globals.base_output,
 206                         "pmix:client wait_cbfunc received");
 207     PMIX_WAKEUP_THREAD(lock);
 208 }
 209 
 210 /* callback to receive job info */
 211 static void job_data(struct pmix_peer_t *pr,
 212                      pmix_ptl_hdr_t *hdr,
 213                      pmix_buffer_t *buf, void *cbdata)
 214 {
 215     pmix_status_t rc;
 216     char *nspace;
 217     int32_t cnt = 1;
 218     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 219 
 220     /* unpack the nspace - should be same as our own */
 221     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 222                        buf, &nspace, &cnt, PMIX_STRING);
 223     if (PMIX_SUCCESS != rc) {
 224         PMIX_ERROR_LOG(rc);
 225         cb->status = PMIX_ERROR;
 226         PMIX_POST_OBJECT(cb);
 227         PMIX_WAKEUP_THREAD(&cb->lock);
 228         return;
 229     }
 230 
 231     /* decode it */
 232     PMIX_GDS_STORE_JOB_INFO(cb->status,
 233                             pmix_client_globals.myserver,
 234                             nspace, buf);
 235     free(nspace);
 236     cb->status = PMIX_SUCCESS;
 237     PMIX_POST_OBJECT(cb);
 238     PMIX_WAKEUP_THREAD(&cb->lock);
 239 }
 240 
 241 PMIX_EXPORT const char* PMIx_Get_version(void)
 242 {
 243     return pmix_version_string;
 244 }
 245 
 246 /* event handler registration callback */
 247 static void evhandler_reg_callbk(pmix_status_t status,
 248                                  size_t evhandler_ref,
 249                                  void *cbdata)
 250 {
 251     pmix_lock_t *lock = (pmix_lock_t*)cbdata;
 252 
 253     lock->status = status;
 254     PMIX_WAKEUP_THREAD(lock);
 255 }
 256 
 257 
 258 static void notification_fn(size_t evhdlr_registration_id,
 259                             pmix_status_t status,
 260                             const pmix_proc_t *source,
 261                             pmix_info_t info[], size_t ninfo,
 262                             pmix_info_t results[], size_t nresults,
 263                             pmix_event_notification_cbfunc_fn_t cbfunc,
 264                             void *cbdata)
 265 {
 266     pmix_lock_t *lock=NULL;
 267     char *name = NULL;
 268     size_t n;
 269 
 270     pmix_output_verbose(2, pmix_client_globals.base_output,
 271                         "[%s:%d] DEBUGGER RELEASE RECVD",
 272                         pmix_globals.myid.nspace, pmix_globals.myid.rank);
 273     if (NULL != info) {
 274         lock = NULL;
 275         for (n=0; n < ninfo; n++) {
 276             if (0 == strncmp(info[n].key, PMIX_EVENT_RETURN_OBJECT, PMIX_MAX_KEYLEN)) {
 277                 lock = (pmix_lock_t*)info[n].value.data.ptr;
 278             } else if (0 == strncmp(info[n].key, PMIX_EVENT_HDLR_NAME, PMIX_MAX_KEYLEN)) {
 279                 name = info[n].value.data.string;
 280             }
 281         }
 282         /* if the object wasn't returned, then that is an error */
 283         if (NULL == lock) {
 284             pmix_output_verbose(2, pmix_client_globals.base_output,
 285                                 "event handler %s failed to return object",
 286                                 (NULL == name) ? "NULL" : name);
 287             /* let the event handler progress */
 288             if (NULL != cbfunc) {
 289                 cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
 290             }
 291             return;
 292         }
 293     }
 294     if (NULL != lock) {
 295         PMIX_WAKEUP_THREAD(lock);
 296     }
 297 
 298     if (NULL != cbfunc) {
 299         cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
 300     }
 301 }
 302 
/* Callback payload used by _check_for_notify: owns the info array passed
 * to PMIx_Notify_event so that release_info can free it afterwards. */
typedef struct {
    pmix_info_t *info;   /* array created with PMIX_INFO_CREATE */
    size_t ninfo;        /* number of entries in info */
} mydata_t;
 307 
 308 static void release_info(pmix_status_t status, void *cbdata)
 309 {
 310     mydata_t *cd = (mydata_t*)cbdata;
 311     PMIX_INFO_FREE(cd->info, cd->ninfo);
 312     free(cd);
 313 }
 314 
 315 static void _check_for_notify(pmix_info_t info[], size_t ninfo)
 316 {
 317     mydata_t *cd;
 318     size_t n, m=0;
 319     pmix_info_t *model=NULL, *library=NULL, *vers=NULL, *tmod=NULL;
 320 
 321     for (n=0; n < ninfo; n++) {
 322         if (0 == strncmp(info[n].key, PMIX_PROGRAMMING_MODEL, PMIX_MAX_KEYLEN)) {
 323             /* we need to generate an event indicating that
 324              * a programming model has been declared */
 325             model = &info[n];
 326             ++m;
 327         } else if (0 == strncmp(info[n].key, PMIX_MODEL_LIBRARY_NAME, PMIX_MAX_KEYLEN)) {
 328             library = &info[n];
 329             ++m;
 330         } else if (0 == strncmp(info[n].key, PMIX_MODEL_LIBRARY_VERSION, PMIX_MAX_KEYLEN)) {
 331             vers = &info[n];
 332             ++m;
 333         } else if (0 == strncmp(info[n].key, PMIX_THREADING_MODEL, PMIX_MAX_KEYLEN)) {
 334             tmod = &info[n];
 335             ++m;
 336         }
 337     }
 338     if (0 < m) {
 339         /* notify anyone listening that a model has been declared */
 340         cd = (mydata_t*)malloc(sizeof(mydata_t));
 341         if (NULL == cd) {
 342             /* nothing we can do */
 343             return;
 344         }
 345         PMIX_INFO_CREATE(cd->info, m+1);
 346         if (NULL == cd->info) {
 347             free(cd);
 348             return;
 349         }
 350         cd->ninfo = m+1;
 351         n = 0;
 352         if (NULL != model) {
 353             PMIX_INFO_XFER(&cd->info[n], model);
 354             ++n;
 355         }
 356         if (NULL != library) {
 357             PMIX_INFO_XFER(&cd->info[n], library);
 358             ++n;
 359         }
 360         if (NULL != vers) {
 361             PMIX_INFO_XFER(&cd->info[n], vers);
 362             ++n;
 363         }
 364         if (NULL != tmod) {
 365             PMIX_INFO_XFER(&cd->info[n], tmod);
 366             ++n;
 367         }
 368         /* mark that it is not to go to any default handlers */
 369         PMIX_INFO_LOAD(&cd->info[n], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
 370         PMIx_Notify_event(PMIX_MODEL_DECLARED,
 371                           &pmix_globals.myid, PMIX_RANGE_PROC_LOCAL,
 372                           cd->info, cd->ninfo, release_info, (void*)cd);
 373     }
 374 }
 375 
 376 static void client_iof_handler(struct pmix_peer_t *pr,
 377                                pmix_ptl_hdr_t *hdr,
 378                                pmix_buffer_t *buf, void *cbdata)
 379 {
 380     pmix_peer_t *peer = (pmix_peer_t*)pr;
 381     pmix_proc_t source;
 382     pmix_iof_channel_t channel;
 383     pmix_byte_object_t bo;
 384     int32_t cnt;
 385     pmix_status_t rc;
 386 
 387     pmix_output_verbose(2, pmix_client_globals.iof_output,
 388                         "recvd IOF");
 389 
 390     /* if the buffer is empty, they are simply closing the channel */
 391     if (0 == buf->bytes_used) {
 392         return;
 393     }
 394 
 395     cnt = 1;
 396     PMIX_BFROPS_UNPACK(rc, peer, buf, &source, &cnt, PMIX_PROC);
 397     if (PMIX_SUCCESS != rc) {
 398         PMIX_ERROR_LOG(rc);
 399         return;
 400     }
 401     cnt = 1;
 402     PMIX_BFROPS_UNPACK(rc, peer, buf, &channel, &cnt, PMIX_IOF_CHANNEL);
 403     if (PMIX_SUCCESS != rc) {
 404         PMIX_ERROR_LOG(rc);
 405         return;
 406     }
 407     cnt = 1;
 408     PMIX_BFROPS_UNPACK(rc, peer, buf, &bo, &cnt, PMIX_BYTE_OBJECT);
 409     if (PMIX_SUCCESS != rc) {
 410         PMIX_ERROR_LOG(rc);
 411         return;
 412     }
 413     if (NULL != bo.bytes && 0 < bo.size) {
 414         pmix_iof_write_output(&source, channel, &bo, NULL);
 415     }
 416     PMIX_BYTE_OBJECT_DESTRUCT(&bo);
 417 }
 418 
 419 PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc,
 420                                     pmix_info_t info[], size_t ninfo)
 421 {
 422     char *evar;
 423     pmix_status_t rc = PMIX_SUCCESS;
 424     pmix_cb_t cb;
 425     pmix_buffer_t *req;
 426     pmix_cmd_t cmd = PMIX_REQ_CMD;
 427     pmix_status_t code = PMIX_ERR_DEBUGGER_RELEASE;
 428     pmix_proc_t wildcard;
 429     pmix_info_t ginfo, evinfo[2];
 430     pmix_value_t *val = NULL;
 431     pmix_lock_t reglock, releaselock;
 432     size_t n;
 433     bool found;
 434     pmix_ptl_posted_recv_t *rcv;
 435 
 436     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 437 
 438     if (0 < pmix_globals.init_cntr ||
 439         (NULL != pmix_globals.mypeer && PMIX_PROC_IS_SERVER(pmix_globals.mypeer))) {
 440         /* since we have been called before, the nspace and
 441          * rank should be known. So return them here if
 442          * requested */
 443          if (NULL != proc) {
 444             pmix_strncpy(proc->nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
 445             proc->rank = pmix_globals.myid.rank;
 446         }
 447         ++pmix_globals.init_cntr;
 448         /* we also need to check the info keys to see if something need
 449          * be done with them - e.g., to notify another library that we
 450          * also have called init */
 451         PMIX_RELEASE_THREAD(&pmix_global_lock);
 452         if (NULL != info) {
 453             _check_for_notify(info, ninfo);
 454         }
 455         return pmix_init_result;
 456     }
 457     ++pmix_globals.init_cntr;
 458 
 459     /* if we don't see the required info, then we cannot init */
 460     if (NULL == (evar = getenv("PMIX_NAMESPACE"))) {
 461         pmix_init_result = PMIX_ERR_INVALID_NAMESPACE;
 462         PMIX_RELEASE_THREAD(&pmix_global_lock);
 463         return PMIX_ERR_INVALID_NAMESPACE;
 464     }
 465 
 466     /* setup the runtime - this init's the globals,
 467      * opens and initializes the required frameworks */
 468     if (PMIX_SUCCESS != (rc = pmix_rte_init(PMIX_PROC_CLIENT, info, ninfo,
 469                                             pmix_client_notify_recv))) {
 470         PMIX_ERROR_LOG(rc);
 471         pmix_init_result = rc;
 472         PMIX_RELEASE_THREAD(&pmix_global_lock);
 473         return rc;
 474     }
 475     /* setup the IO Forwarding recv */
 476     rcv = PMIX_NEW(pmix_ptl_posted_recv_t);
 477     rcv->tag = PMIX_PTL_TAG_IOF;
 478     rcv->cbfunc = client_iof_handler;
 479     /* add it to the end of the list of recvs */
 480     pmix_list_append(&pmix_ptl_globals.posted_recvs, &rcv->super);
 481 
 482 
 483     /* setup the globals */
 484     PMIX_CONSTRUCT(&pmix_client_globals.pending_requests, pmix_list_t);
 485     PMIX_CONSTRUCT(&pmix_client_globals.peers, pmix_pointer_array_t);
 486     pmix_pointer_array_init(&pmix_client_globals.peers, 1, INT_MAX, 1);
 487     pmix_client_globals.myserver = PMIX_NEW(pmix_peer_t);
 488     if (NULL == pmix_client_globals.myserver) {
 489         pmix_init_result = PMIX_ERR_NOMEM;
 490         PMIX_RELEASE_THREAD(&pmix_global_lock);
 491         return PMIX_ERR_NOMEM;
 492     }
 493     pmix_client_globals.myserver->nptr = PMIX_NEW(pmix_namespace_t);
 494     if (NULL == pmix_client_globals.myserver->nptr) {
 495         PMIX_RELEASE(pmix_client_globals.myserver);
 496         pmix_init_result = PMIX_ERR_NOMEM;
 497         PMIX_RELEASE_THREAD(&pmix_global_lock);
 498         return PMIX_ERR_NOMEM;
 499     }
 500     pmix_client_globals.myserver->info = PMIX_NEW(pmix_rank_info_t);
 501     if (NULL == pmix_client_globals.myserver->info) {
 502         PMIX_RELEASE(pmix_client_globals.myserver);
 503         pmix_init_result = PMIX_ERR_NOMEM;
 504         PMIX_RELEASE_THREAD(&pmix_global_lock);
 505         return PMIX_ERR_NOMEM;
 506     }
 507 
 508     /* setup the base verbosity */
 509     if (0 < pmix_client_globals.base_verbose) {
 510         /* set default output */
 511         pmix_client_globals.base_output = pmix_output_open(NULL);
 512         pmix_output_set_verbosity(pmix_client_globals.base_output,
 513                                   pmix_client_globals.base_verbose);
 514     }
 515 
 516     pmix_output_verbose(2, pmix_client_globals.base_output,
 517                         "pmix: init called");
 518 
 519     /* we require our nspace */
 520     if (NULL != proc) {
 521         pmix_strncpy(proc->nspace, evar, PMIX_MAX_NSLEN);
 522     }
 523     PMIX_LOAD_NSPACE(pmix_globals.myid.nspace, evar);
 524     /* set the global pmix_namespace_t object for our peer */
 525     pmix_globals.mypeer->nptr->nspace = strdup(evar);
 526 
 527     /* we also require our rank */
 528     if (NULL == (evar = getenv("PMIX_RANK"))) {
 529         /* let the caller know that the server isn't available yet */
 530         pmix_init_result = PMIX_ERR_DATA_VALUE_NOT_FOUND;
 531         PMIX_RELEASE_THREAD(&pmix_global_lock);
 532         return PMIX_ERR_DATA_VALUE_NOT_FOUND;
 533     }
 534     pmix_globals.myid.rank = strtol(evar, NULL, 10);
 535     if (NULL != proc) {
 536         proc->rank = pmix_globals.myid.rank;
 537     }
 538     pmix_globals.pindex = -1;
 539     /* setup a rank_info object for us */
 540     pmix_globals.mypeer->info = PMIX_NEW(pmix_rank_info_t);
 541     if (NULL == pmix_globals.mypeer->info) {
 542         pmix_init_result = PMIX_ERR_NOMEM;
 543         PMIX_RELEASE_THREAD(&pmix_global_lock);
 544         return PMIX_ERR_NOMEM;
 545     }
 546     pmix_globals.mypeer->info->pname.nspace = strdup(proc->nspace);
 547     pmix_globals.mypeer->info->pname.rank = proc->rank;
 548 
 549     /* select our psec compat module - the selection will be based
 550      * on the corresponding envars that should have been passed
 551      * to us at launch */
 552     evar = getenv("PMIX_SECURITY_MODE");
 553     pmix_globals.mypeer->nptr->compat.psec = pmix_psec_base_assign_module(evar);
 554     if (NULL == pmix_globals.mypeer->nptr->compat.psec) {
 555         pmix_init_result = PMIX_ERR_INIT;
 556         PMIX_RELEASE_THREAD(&pmix_global_lock);
 557         return PMIX_ERR_INIT;
 558     }
 559     /* the server will be using the same */
 560     pmix_client_globals.myserver->nptr->compat.psec = pmix_globals.mypeer->nptr->compat.psec;
 561 
 562     /* set the buffer type - the selection will be based
 563      * on the corresponding envars that should have been passed
 564      * to us at launch */
 565     evar = getenv("PMIX_BFROP_BUFFER_TYPE");
 566     if (NULL == evar) {
 567         /* just set to our default */
 568         pmix_globals.mypeer->nptr->compat.type = pmix_bfrops_globals.default_type;
 569     } else if (0 == strcmp(evar, "PMIX_BFROP_BUFFER_FULLY_DESC")) {
 570         pmix_globals.mypeer->nptr->compat.type = PMIX_BFROP_BUFFER_FULLY_DESC;
 571     } else {
 572         pmix_globals.mypeer->nptr->compat.type = PMIX_BFROP_BUFFER_NON_DESC;
 573     }
 574     /* the server will be using the same */
 575     pmix_client_globals.myserver->nptr->compat.type = pmix_globals.mypeer->nptr->compat.type;
 576 
 577     /* select the gds compat module we will use to interact with
 578      * our server- the selection will be based
 579      * on the corresponding envars that should have been passed
 580      * to us at launch */
 581     evar = getenv("PMIX_GDS_MODULE");
 582     if (NULL != evar) {
 583         PMIX_INFO_LOAD(&ginfo, PMIX_GDS_MODULE, evar, PMIX_STRING);
 584         pmix_client_globals.myserver->nptr->compat.gds = pmix_gds_base_assign_module(&ginfo, 1);
 585         PMIX_INFO_DESTRUCT(&ginfo);
 586     } else {
 587         pmix_client_globals.myserver->nptr->compat.gds = pmix_gds_base_assign_module(NULL, 0);
 588     }
 589     if (NULL == pmix_client_globals.myserver->nptr->compat.gds) {
 590         pmix_init_result = PMIX_ERR_INIT;
 591         PMIX_RELEASE_THREAD(&pmix_global_lock);
 592         return PMIX_ERR_INIT;
 593     }
 594     /* now select a GDS module for our own internal use - the user may
 595      * have passed down a directive for this purpose. If they did, then
 596      * use it. Otherwise, we want the "hash" module */
 597     found = false;
 598     if (info != NULL) {
 599         for (n=0; n < ninfo; n++) {
 600             if (0 == strncmp(info[n].key, PMIX_GDS_MODULE, PMIX_MAX_KEYLEN)) {
 601                 PMIX_INFO_LOAD(&ginfo, PMIX_GDS_MODULE, info[n].value.data.string, PMIX_STRING);
 602                 found = true;
 603                 break;
 604             }
 605         }
 606     }
 607     if (!found) {
 608         PMIX_INFO_LOAD(&ginfo, PMIX_GDS_MODULE, "hash", PMIX_STRING);
 609     }
 610     pmix_globals.mypeer->nptr->compat.gds = pmix_gds_base_assign_module(&ginfo, 1);
 611     if (NULL == pmix_globals.mypeer->nptr->compat.gds) {
 612         PMIX_INFO_DESTRUCT(&ginfo);
 613         pmix_init_result = PMIX_ERR_INIT;
 614         PMIX_RELEASE_THREAD(&pmix_global_lock);
 615         return PMIX_ERR_INIT;
 616     }
 617     PMIX_INFO_DESTRUCT(&ginfo);
 618 
 619     /* connect to the server */
 620     rc = pmix_ptl_base_connect_to_peer((struct pmix_peer_t*)pmix_client_globals.myserver, info, ninfo);
 621     if (PMIX_SUCCESS != rc) {
 622         pmix_init_result = rc;
 623         PMIX_RELEASE_THREAD(&pmix_global_lock);
 624         return rc;
 625     }
 626     /* mark that we are using the same module as used for the server */
 627     pmix_globals.mypeer->nptr->compat.ptl = pmix_client_globals.myserver->nptr->compat.ptl;
 628 
 629     /* send a request for our job info - we do this as a non-blocking
 630      * transaction because some systems cannot handle very large
 631      * blocking operations and error out if we try them. */
 632      req = PMIX_NEW(pmix_buffer_t);
 633      PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 634                       req, &cmd, 1, PMIX_COMMAND);
 635      if (PMIX_SUCCESS != rc) {
 636         PMIX_ERROR_LOG(rc);
 637         PMIX_RELEASE(req);
 638         pmix_init_result = rc;
 639         PMIX_RELEASE_THREAD(&pmix_global_lock);
 640         return rc;
 641     }
 642     /* send to the server */
 643     PMIX_CONSTRUCT(&cb, pmix_cb_t);
 644     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 645                        req, job_data, (void*)&cb);
 646     if (PMIX_SUCCESS != rc) {
 647         pmix_init_result = rc;
 648         PMIX_RELEASE_THREAD(&pmix_global_lock);
 649         return rc;
 650     }
 651     /* wait for the data to return */
 652     PMIX_WAIT_THREAD(&cb.lock);
 653     rc = cb.status;
 654     PMIX_DESTRUCT(&cb);
 655 
 656     if (PMIX_SUCCESS == rc) {
 657         pmix_init_result = PMIX_SUCCESS;
 658     } else {
 659         pmix_init_result = rc;
 660         PMIX_RELEASE_THREAD(&pmix_global_lock);
 661         return rc;
 662     }
 663     PMIX_RELEASE_THREAD(&pmix_global_lock);
 664 
 665     /* look for a debugger attach key */
 666     pmix_strncpy(wildcard.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
 667     wildcard.rank = PMIX_RANK_WILDCARD;
 668     PMIX_INFO_LOAD(&ginfo, PMIX_OPTIONAL, NULL, PMIX_BOOL);
 669     if (PMIX_SUCCESS == PMIx_Get(&wildcard, PMIX_DEBUG_STOP_IN_INIT, &ginfo, 1, &val)) {
 670         PMIX_VALUE_FREE(val, 1); // cleanup memory
 671         /* if the value was found, then we need to wait for debugger attach here */
 672         /* register for the debugger release notification */
 673         PMIX_CONSTRUCT_LOCK(&reglock);
 674         PMIX_CONSTRUCT_LOCK(&releaselock);
 675         PMIX_INFO_LOAD(&evinfo[0], PMIX_EVENT_RETURN_OBJECT, &releaselock, PMIX_POINTER);
 676         PMIX_INFO_LOAD(&evinfo[1], PMIX_EVENT_HDLR_NAME, "WAIT-FOR-DEBUGGER", PMIX_STRING);
 677         pmix_output_verbose(2, pmix_client_globals.base_output,
 678                             "[%s:%d] WAITING IN INIT FOR DEBUGGER",
 679                             pmix_globals.myid.nspace, pmix_globals.myid.rank);
 680         PMIx_Register_event_handler(&code, 1, evinfo, 2,
 681                                     notification_fn, evhandler_reg_callbk, (void*)&reglock);
 682         /* wait for registration to complete */
 683         PMIX_WAIT_THREAD(&reglock);
 684         PMIX_DESTRUCT_LOCK(&reglock);
 685         PMIX_INFO_DESTRUCT(&evinfo[0]);
 686         PMIX_INFO_DESTRUCT(&evinfo[1]);
 687         /* wait for release to arrive */
 688         PMIX_WAIT_THREAD(&releaselock);
 689         PMIX_DESTRUCT_LOCK(&releaselock);
 690     }
 691     PMIX_INFO_DESTRUCT(&ginfo);
 692 
 693     /* check to see if we need to notify anyone */
 694     if (NULL != info) {
 695         _check_for_notify(info, ninfo);
 696     }
 697 
 698     /* register the client supported attrs */
 699     rc = pmix_register_client_attrs();
 700     return rc;
 701 }
 702 
 703 PMIX_EXPORT int PMIx_Initialized(void)
 704 {
 705     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 706 
 707     if (0 < pmix_globals.init_cntr) {
 708         PMIX_RELEASE_THREAD(&pmix_global_lock);
 709         return true;
 710     }
 711     PMIX_RELEASE_THREAD(&pmix_global_lock);
 712     return false;
 713 }
 714 
/* State shared between PMIx_Finalize and its two completion callbacks
 * (fin_timeout and finwait_cbfunc) while waiting for the server's
 * finalize acknowledgement. */
typedef struct {
    pmix_lock_t lock;   /* PMIx_Finalize waits on this; signalled by whichever callback runs first */
    pmix_event_t ev;    /* timer event whose callback is fin_timeout */
    bool active;        /* set true before the wait; cleared by the first callback to fire */
} pmix_client_timeout_t;
 720 
 721 /* timer callback */
 722 static void fin_timeout(int sd, short args, void *cbdata)
 723 {
 724     pmix_client_timeout_t *tev;
 725     tev = (pmix_client_timeout_t*)cbdata;
 726 
 727     pmix_output_verbose(2, pmix_client_globals.base_output,
 728                         "pmix:client finwait timeout fired");
 729     if (tev->active) {
 730         tev->active = false;
 731         PMIX_WAKEUP_THREAD(&tev->lock);
 732     }
 733 }
 734 /* callback for finalize completion */
 735 static void finwait_cbfunc(struct pmix_peer_t *pr,
 736                            pmix_ptl_hdr_t *hdr,
 737                            pmix_buffer_t *buf, void *cbdata)
 738 {
 739     pmix_client_timeout_t *tev;
 740     tev = (pmix_client_timeout_t*)cbdata;
 741 
 742     pmix_output_verbose(2, pmix_client_globals.base_output,
 743                         "pmix:client finwait_cbfunc received");
 744     if (tev->active) {
 745         tev->active = false;
 746         PMIX_WAKEUP_THREAD(&tev->lock);
 747     }
 748 }
 749 
 750 PMIX_EXPORT pmix_status_t PMIx_Finalize(const pmix_info_t info[], size_t ninfo)
 751 {
 752     pmix_buffer_t *msg;
 753     pmix_cmd_t cmd = PMIX_FINALIZE_CMD;
 754     pmix_status_t rc;
 755     size_t n;
 756     pmix_client_timeout_t tev;
 757     struct timeval tv = {2, 0};
 758     pmix_peer_t *peer;
 759     int i;
 760 
 761     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 762     if (1 != pmix_globals.init_cntr) {
 763         --pmix_globals.init_cntr;
 764         PMIX_RELEASE_THREAD(&pmix_global_lock);
 765         return PMIX_SUCCESS;
 766     }
 767     pmix_globals.init_cntr = 0;
 768 
 769     pmix_output_verbose(2, pmix_client_globals.base_output,
 770                         "%s:%d pmix:client finalize called",
 771                         pmix_globals.myid.nspace, pmix_globals.myid.rank);
 772 
 773     /* mark that I called finalize */
 774     pmix_globals.mypeer->finalized = true;
 775 
 776     if ( 0 <= pmix_client_globals.myserver->sd ) {
 777         /* check to see if we are supposed to execute a
 778          * blocking fence prior to actually finalizing */
 779         if (NULL != info && 0 < ninfo) {
 780             for (n=0; n < ninfo; n++) {
 781                 if (0 == strcmp(PMIX_EMBED_BARRIER, info[n].key)) {
 782                     if (PMIX_INFO_TRUE(&info[n])) {
 783                         rc = PMIx_Fence(NULL, 0, NULL, 0);
 784                         if (PMIX_SUCCESS != rc) {
 785                             PMIX_ERROR_LOG(rc);
 786                         }
 787                     }
 788                     break;
 789                 }
 790             }
 791         }
 792 
 793         /* setup a cmd message to notify the PMIx
 794          * server that we are normally terminating */
 795         msg = PMIX_NEW(pmix_buffer_t);
 796         /* pack the cmd */
 797         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 798                          msg, &cmd, 1, PMIX_COMMAND);
 799         if (PMIX_SUCCESS != rc) {
 800             PMIX_ERROR_LOG(rc);
 801             PMIX_RELEASE(msg);
 802             PMIX_RELEASE_THREAD(&pmix_global_lock);
 803             return rc;
 804         }
 805 
 806 
 807         pmix_output_verbose(2, pmix_client_globals.base_output,
 808                              "%s:%d pmix:client sending finalize sync to server",
 809                              pmix_globals.myid.nspace, pmix_globals.myid.rank);
 810 
 811         /* setup a timer to protect ourselves should the server be unable
 812          * to answer for some reason */
 813         PMIX_CONSTRUCT_LOCK(&tev.lock);
 814         pmix_event_assign(&tev.ev, pmix_globals.evbase, -1, 0,
 815                           fin_timeout, &tev);
 816         tev.active = true;
 817         PMIX_POST_OBJECT(&tev);
 818         pmix_event_add(&tev.ev, &tv);
 819         /* send to the server */
 820         PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 821                            msg, finwait_cbfunc, (void*)&tev);
 822         if (PMIX_SUCCESS != rc) {
 823             PMIX_RELEASE_THREAD(&pmix_global_lock);
 824             return rc;
 825         }
 826 
 827         /* wait for the ack to return */
 828         PMIX_WAIT_THREAD(&tev.lock);
 829         PMIX_DESTRUCT_LOCK(&tev.lock);
 830         if (tev.active) {
 831             pmix_event_del(&tev.ev);
 832         }
 833 
 834         pmix_output_verbose(2, pmix_client_globals.base_output,
 835                              "%s:%d pmix:client finalize sync received",
 836                              pmix_globals.myid.nspace, pmix_globals.myid.rank);
 837     }
 838 
 839     if (!pmix_globals.external_evbase) {
 840         /* stop the progress thread, but leave the event base
 841          * still constructed. This will allow us to safely
 842          * tear down the infrastructure, including removal
 843          * of any events objects may be holding */
 844         (void)pmix_progress_thread_pause(NULL);
 845     }
 846 
 847     PMIX_LIST_DESTRUCT(&pmix_client_globals.pending_requests);
 848     for (i=0; i < pmix_client_globals.peers.size; i++) {
 849         if (NULL != (peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_client_globals.peers, i))) {
 850             PMIX_RELEASE(peer);
 851         }
 852     }
 853     PMIX_DESTRUCT(&pmix_client_globals.peers);
 854 
 855     if (0 <= pmix_client_globals.myserver->sd) {
 856         CLOSE_THE_SOCKET(pmix_client_globals.myserver->sd);
 857     }
 858     if (NULL != pmix_client_globals.myserver) {
 859         PMIX_RELEASE(pmix_client_globals.myserver);
 860     }
 861 
 862 
 863     pmix_rte_finalize();
 864     if (NULL != pmix_globals.mypeer) {
 865         PMIX_RELEASE(pmix_globals.mypeer);
 866     }
 867 
 868     PMIX_RELEASE_THREAD(&pmix_global_lock);
 869 
 870     /* finalize the class/object system */
 871     pmix_class_finalize();
 872 
 873     return PMIX_SUCCESS;
 874 }
 875 
 876 PMIX_EXPORT pmix_status_t PMIx_Abort(int flag, const char msg[],
 877                                      pmix_proc_t procs[], size_t nprocs)
 878 {
 879     pmix_buffer_t *bfr;
 880     pmix_cmd_t cmd = PMIX_ABORT_CMD;
 881     pmix_status_t rc;
 882     pmix_lock_t reglock;
 883 
 884     pmix_output_verbose(2, pmix_client_globals.base_output,
 885                         "pmix:client abort called");
 886 
 887     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 888     if (pmix_globals.init_cntr <= 0) {
 889         PMIX_RELEASE_THREAD(&pmix_global_lock);
 890         return PMIX_ERR_INIT;
 891     }
 892 
 893     /* if we aren't connected, don't attempt to send */
 894     if (!pmix_globals.connected) {
 895         PMIX_RELEASE_THREAD(&pmix_global_lock);
 896         return PMIX_ERR_UNREACH;
 897     }
 898     PMIX_RELEASE_THREAD(&pmix_global_lock);
 899 
 900     /* create a buffer to hold the message */
 901     bfr = PMIX_NEW(pmix_buffer_t);
 902     /* pack the cmd */
 903     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 904                      bfr, &cmd, 1, PMIX_COMMAND);
 905     if (PMIX_SUCCESS != rc) {
 906         PMIX_ERROR_LOG(rc);
 907         PMIX_RELEASE(bfr);
 908         return rc;
 909     }
 910     /* pack the status flag */
 911     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 912                      bfr, &flag, 1, PMIX_STATUS);
 913     if (PMIX_SUCCESS != rc) {
 914         PMIX_ERROR_LOG(rc);
 915         PMIX_RELEASE(bfr);
 916         return rc;
 917     }
 918     /* pack the string message - a NULL is okay */
 919     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 920                      bfr, &msg, 1, PMIX_STRING);
 921     if (PMIX_SUCCESS != rc) {
 922         PMIX_ERROR_LOG(rc);
 923         PMIX_RELEASE(bfr);
 924         return rc;
 925     }
 926     /* pack the number of procs */
 927     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 928                      bfr, &nprocs, 1, PMIX_SIZE);
 929     if (PMIX_SUCCESS != rc) {
 930         PMIX_ERROR_LOG(rc);
 931         PMIX_RELEASE(bfr);
 932         return rc;
 933     }
 934     /* pack any provided procs */
 935     if (0 < nprocs) {
 936         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 937                          bfr, procs, nprocs, PMIX_PROC);
 938         if (PMIX_SUCCESS != rc) {
 939             PMIX_ERROR_LOG(rc);
 940             PMIX_RELEASE(bfr);
 941             return rc;
 942         }
 943     }
 944 
 945     /* send to the server */
 946     PMIX_CONSTRUCT_LOCK(&reglock);
 947     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver, bfr,
 948                        wait_cbfunc, (void*)&reglock);
 949     if (PMIX_SUCCESS != rc) {
 950         PMIX_DESTRUCT_LOCK(&reglock);
 951         return rc;
 952     }
 953 
 954     /* wait for the release */
 955      PMIX_WAIT_THREAD(&reglock);
 956      PMIX_DESTRUCT_LOCK(&reglock);
 957      return PMIX_SUCCESS;
 958  }
 959 
 960 static void _putfn(int sd, short args, void *cbdata)
 961 {
 962     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 963     pmix_status_t rc;
 964     pmix_kval_t *kv = NULL;
 965     uint8_t *tmp;
 966     size_t len;
 967 
 968     /* need to acquire the cb object from its originating thread */
 969     PMIX_ACQUIRE_OBJECT(cb);
 970 
 971     /* no need to push info that starts with "pmix" as that is
 972      * info we would have been provided at startup */
 973     if (0 == strncmp(cb->key, "pmix", 4)) {
 974         rc = PMIX_SUCCESS;
 975         goto done;
 976     }
 977 
 978     /* setup to xfer the data */
 979     kv = PMIX_NEW(pmix_kval_t);
 980     kv->key = strdup(cb->key);  // need to copy as the input belongs to the user
 981     kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 982     if (PMIX_STRING_SIZE_CHECK(cb->value)) {
 983         /* compress large strings */
 984         if (pmix_compress.compress_string(cb->value->data.string, &tmp, &len)) {
 985             if (NULL == tmp) {
 986                 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 987                 rc = PMIX_ERR_NOMEM;
 988                 PMIX_ERROR_LOG(rc);
 989                 goto done;
 990             }
 991             kv->value->type = PMIX_COMPRESSED_STRING;
 992             kv->value->data.bo.bytes = (char*)tmp;
 993             kv->value->data.bo.size = len;
 994             rc = PMIX_SUCCESS;
 995         } else {
 996             PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
 997                                    kv->value, cb->value);
 998         }
 999     } else {
1000         PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
1001                                kv->value, cb->value);
1002     }
1003     if (PMIX_SUCCESS != rc) {
1004         PMIX_ERROR_LOG(rc);
1005         goto done;
1006     }
1007 
1008     /* store it */
1009     PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer,
1010                        &pmix_globals.myid,
1011                        cb->scope, kv);
1012     if (PMIX_SUCCESS != rc) {
1013         PMIX_ERROR_LOG(rc);
1014     }
1015 
1016     /* mark that fresh values have been stored so we know
1017      * to commit them later */
1018     pmix_globals.commits_pending = true;
1019 
1020   done:
1021     if (NULL != kv) {
1022         PMIX_RELEASE(kv);  // maintain accounting
1023     }
1024     cb->pstatus = rc;
1025     /* post the data so the receiving thread can acquire it */
1026     PMIX_POST_OBJECT(cb);
1027     PMIX_WAKEUP_THREAD(&cb->lock);
1028 }
1029 
1030 PMIX_EXPORT pmix_status_t PMIx_Put(pmix_scope_t scope,
1031                                    const pmix_key_t key,
1032                                    pmix_value_t *val)
1033 {
1034     pmix_cb_t *cb;
1035     pmix_status_t rc;
1036 
1037     pmix_output_verbose(2, pmix_client_globals.base_output,
1038                         "pmix: executing put for key %s type %d",
1039                         key, val->type);
1040 
1041     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1042     if (pmix_globals.init_cntr <= 0) {
1043         PMIX_RELEASE_THREAD(&pmix_global_lock);
1044         return PMIX_ERR_INIT;
1045     }
1046     PMIX_RELEASE_THREAD(&pmix_global_lock);
1047 
1048     /* create a callback object */
1049     cb = PMIX_NEW(pmix_cb_t);
1050     cb->scope = scope;
1051     cb->key = (char*)key;
1052     cb->value = val;
1053 
1054     /* pass this into the event library for thread protection */
1055     PMIX_THREADSHIFT(cb, _putfn);
1056 
1057     /* wait for the result */
1058     PMIX_WAIT_THREAD(&cb->lock);
1059     rc = cb->pstatus;
1060     PMIX_RELEASE(cb);
1061 
1062     return rc;
1063 }
1064 
1065 static void _commitfn(int sd, short args, void *cbdata)
1066 {
1067     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
1068     pmix_status_t rc;
1069     pmix_scope_t scope;
1070     pmix_buffer_t *msgout, bkt;
1071     pmix_cmd_t cmd=PMIX_COMMIT_CMD;
1072     pmix_kval_t *kv, *kvn;
1073 
1074     /* need to acquire the cb object from its originating thread */
1075     PMIX_ACQUIRE_OBJECT(cb);
1076 
1077     msgout = PMIX_NEW(pmix_buffer_t);
1078     /* pack the cmd */
1079     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
1080                      msgout, &cmd, 1, PMIX_COMMAND);
1081     if (PMIX_SUCCESS != rc) {
1082         PMIX_ERROR_LOG(rc);
1083         PMIX_RELEASE(msgout);
1084         goto error;
1085     }
1086 
1087     /* if we haven't already done it, ensure we have committed our values */
1088     if (pmix_globals.commits_pending) {
1089         /* fetch and pack the local values */
1090         scope = PMIX_LOCAL;
1091         /* allow the GDS module to pass us this info
1092          * as a local connection as this data would
1093          * only go to another local client */
1094         cb->proc = &pmix_globals.myid;
1095         cb->scope = scope;
1096         cb->copy = false;
1097         PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, cb);
1098         if (PMIX_SUCCESS == rc) {
1099             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
1100                              msgout, &scope, 1, PMIX_SCOPE);
1101             if (PMIX_SUCCESS != rc) {
1102                 PMIX_ERROR_LOG(rc);
1103                 PMIX_RELEASE(msgout);
1104                 goto error;
1105             }
1106             PMIX_CONSTRUCT(&bkt, pmix_buffer_t);
1107             PMIX_LIST_FOREACH_SAFE(kv, kvn, &cb->kvs, pmix_kval_t) {
1108                 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
1109                                  &bkt, kv, 1, PMIX_KVAL);
1110                 if (PMIX_SUCCESS != rc) {
1111                     PMIX_ERROR_LOG(rc);
1112                     PMIX_DESTRUCT(&bkt);
1113                     PMIX_RELEASE(msgout);
1114                     goto error;
1115                 }
1116                 pmix_list_remove_item(&cb->kvs, &kv->super);
1117                 PMIX_RELEASE(kv);
1118             }
1119             /* now pack the result */
1120             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
1121                              msgout, &bkt, 1, PMIX_BUFFER);
1122             PMIX_DESTRUCT(&bkt);
1123             if (PMIX_SUCCESS != rc) {
1124                 PMIX_ERROR_LOG(rc);
1125                 PMIX_RELEASE(msgout);
1126                 goto error;
1127             }
1128         }
1129 
1130         /* fetch and pack the remote values */
1131         scope = PMIX_REMOTE;
1132         /* we need real copies here as this data will
1133          * go to remote procs - so a connection will
1134          * not suffice */
1135         cb->proc = &pmix_globals.myid;
1136         cb->scope = scope;
1137         cb->copy = true;
1138         PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, cb);
1139         if (PMIX_SUCCESS == rc) {
1140             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
1141                              msgout, &scope, 1, PMIX_SCOPE);
1142             if (PMIX_SUCCESS != rc) {
1143                 PMIX_ERROR_LOG(rc);
1144                 PMIX_RELEASE(msgout);
1145                 goto error;
1146             }
1147             PMIX_CONSTRUCT(&bkt, pmix_buffer_t);
1148             PMIX_LIST_FOREACH_SAFE(kv, kvn, &cb->kvs, pmix_kval_t) {
1149                 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
1150                                  &bkt, kv, 1, PMIX_KVAL);
1151                 if (PMIX_SUCCESS != rc) {
1152                     PMIX_ERROR_LOG(rc);
1153                     PMIX_DESTRUCT(&bkt);
1154                     PMIX_RELEASE(msgout);
1155                     goto error;
1156                 }
1157                 pmix_list_remove_item(&cb->kvs, &kv->super);
1158                 PMIX_RELEASE(kv);
1159             }
1160             /* now pack the result */
1161             PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
1162                              msgout, &bkt, 1, PMIX_BUFFER);
1163             PMIX_DESTRUCT(&bkt);
1164             if (PMIX_SUCCESS != rc) {
1165                 PMIX_ERROR_LOG(rc);
1166                 PMIX_RELEASE(msgout);
1167                 goto error;
1168             }
1169         }
1170 
1171         /* record that all committed data to-date has been sent */
1172         pmix_globals.commits_pending = false;
1173     }
1174 
1175     /* always send, even if we have nothing to contribute, so the server knows
1176      * that we contributed whatever we had */
1177     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver, msgout,
1178                        wait_cbfunc, (void*)&cb->lock);
1179     if (PMIX_SUCCESS == rc) {
1180         /* we should wait for the callback, so don't
1181          * modify the active flag */
1182         cb->pstatus = PMIX_SUCCESS;
1183         return;
1184     }
1185 
1186   error:
1187     cb->pstatus = rc;
1188     /* post the data so the receiving thread can acquire it */
1189     PMIX_POST_OBJECT(cb);
1190     PMIX_WAKEUP_THREAD(&cb->lock);
1191  }
1192 
1193  PMIX_EXPORT pmix_status_t PMIx_Commit(void)
1194  {
1195     pmix_cb_t *cb;
1196     pmix_status_t rc;
1197 
1198     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1199     if (pmix_globals.init_cntr <= 0) {
1200         PMIX_RELEASE_THREAD(&pmix_global_lock);
1201         return PMIX_ERR_INIT;
1202     }
1203 
1204     /* if we are a server, or we aren't connected, don't attempt to send */
1205     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
1206         PMIX_RELEASE_THREAD(&pmix_global_lock);
1207         return PMIX_SUCCESS;  // not an error
1208     }
1209     if (!pmix_globals.connected) {
1210         PMIX_RELEASE_THREAD(&pmix_global_lock);
1211         return PMIX_ERR_UNREACH;
1212     }
1213     PMIX_RELEASE_THREAD(&pmix_global_lock);
1214 
1215     /* create a callback object */
1216     cb = PMIX_NEW(pmix_cb_t);
1217     /* pass this into the event library for thread protection */
1218     PMIX_THREADSHIFT(cb, _commitfn);
1219 
1220     /* wait for the result */
1221     PMIX_WAIT_THREAD(&cb->lock);
1222     rc = cb->pstatus;
1223     PMIX_RELEASE(cb);
1224 
1225     return rc;
1226 }
1227 
1228 static void _resolve_peers(int sd, short args, void *cbdata)
1229 {
1230     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
1231 
1232     cb->status = pmix_preg.resolve_peers(cb->key, cb->pname.nspace,
1233                                          &cb->procs, &cb->nprocs);
1234     /* post the data so the receiving thread can acquire it */
1235     PMIX_POST_OBJECT(cb);
1236     PMIX_WAKEUP_THREAD(&cb->lock);
1237 }
1238 
1239 /* need to thread-shift this request */
1240 PMIX_EXPORT pmix_status_t PMIx_Resolve_peers(const char *nodename,
1241                                              const pmix_nspace_t nspace,
1242                                              pmix_proc_t **procs, size_t *nprocs)
1243 {
1244     pmix_cb_t *cb;
1245     pmix_status_t rc;
1246     pmix_proc_t proc;
1247 
1248     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1249     if (pmix_globals.init_cntr <= 0) {
1250         PMIX_RELEASE_THREAD(&pmix_global_lock);
1251         return PMIX_ERR_INIT;
1252     }
1253     PMIX_RELEASE_THREAD(&pmix_global_lock);
1254 
1255 
1256     cb = PMIX_NEW(pmix_cb_t);
1257     cb->key = (char*)nodename;
1258     cb->pname.nspace = strdup(nspace);
1259 
1260     PMIX_THREADSHIFT(cb, _resolve_peers);
1261 
1262     /* wait for the result */
1263     PMIX_WAIT_THREAD(&cb->lock);
1264 
1265     /* if the nspace wasn't found, then we need to
1266      * ask the server for that info */
1267     if (PMIX_ERR_INVALID_NAMESPACE == cb->status) {
1268         pmix_strncpy(proc.nspace, nspace, PMIX_MAX_NSLEN);
1269         proc.rank = PMIX_RANK_WILDCARD;
1270         /* any key will suffice as it will bring down
1271          * the entire data blob */
1272         rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, NULL);
1273         if (PMIX_SUCCESS != rc) {
1274             PMIX_RELEASE(cb);
1275             return rc;
1276         }
1277         /* retry the fetch */
1278         cb->lock.active = true;
1279         PMIX_THREADSHIFT(cb, _resolve_peers);
1280         PMIX_WAIT_THREAD(&cb->lock);
1281     }
1282     *procs = cb->procs;
1283     *nprocs = cb->nprocs;
1284 
1285     rc = cb->status;
1286     PMIX_RELEASE(cb);
1287     return rc;
1288 }
1289 
1290 static void _resolve_nodes(int fd, short args, void *cbdata)
1291 {
1292     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
1293     char *regex, **names;
1294 
1295     /* get a regular expression describing the PMIX_NODE_MAP */
1296     cb->status = pmix_preg.resolve_nodes(cb->pname.nspace, &regex);
1297     if (PMIX_SUCCESS == cb->status) {
1298         /* parse it into an argv array of names */
1299         cb->status = pmix_preg.parse_nodes(regex, &names);
1300         if (PMIX_SUCCESS == cb->status) {
1301             /* assemble it into a comma-delimited list */
1302             cb->key = pmix_argv_join(names, ',');
1303             pmix_argv_free(names);
1304         } else {
1305             free(regex);
1306         }
1307     }
1308     /* post the data so the receiving thread can acquire it */
1309     PMIX_POST_OBJECT(cb);
1310     PMIX_WAKEUP_THREAD(&cb->lock);
1311 }
1312 
1313 /* need to thread-shift this request */
1314 PMIX_EXPORT pmix_status_t PMIx_Resolve_nodes(const pmix_nspace_t nspace, char **nodelist)
1315 {
1316     pmix_cb_t *cb;
1317     pmix_status_t rc;
1318     pmix_proc_t proc;
1319 
1320     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1321     if (pmix_globals.init_cntr <= 0) {
1322         PMIX_RELEASE_THREAD(&pmix_global_lock);
1323         return PMIX_ERR_INIT;
1324     }
1325     PMIX_RELEASE_THREAD(&pmix_global_lock);
1326 
1327     cb = PMIX_NEW(pmix_cb_t);
1328     cb->pname.nspace = strdup(nspace);
1329 
1330     PMIX_THREADSHIFT(cb, _resolve_nodes);
1331 
1332     /* wait for the result */
1333     PMIX_WAIT_THREAD(&cb->lock);
1334 
1335     /* if the nspace wasn't found, then we need to
1336      * ask the server for that info */
1337     if (PMIX_ERR_INVALID_NAMESPACE == cb->status) {
1338         pmix_strncpy(proc.nspace, nspace, PMIX_MAX_NSLEN);
1339         proc.rank = PMIX_RANK_WILDCARD;
1340         /* any key will suffice as it will bring down
1341          * the entire data blob */
1342         rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, NULL);
1343         if (PMIX_SUCCESS != rc) {
1344             PMIX_RELEASE(cb);
1345             return rc;
1346         }
1347         /* retry the fetch */
1348         cb->lock.active = true;
1349         PMIX_THREADSHIFT(cb, _resolve_nodes);
1350         PMIX_WAIT_THREAD(&cb->lock);
1351     }
1352     /* the string we want is in the key field */
1353     *nodelist = cb->key;
1354 
1355     rc = cb->status;
1356     PMIX_RELEASE(cb);
1357     return rc;
1358 }

/* [<][>][^][v][top][bottom][index][help] */