root/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. pmix_server_initialize
  2. PMIx_server_init
  3. PMIx_server_finalize
  4. _register_nspace
  5. PMIx_server_register_nspace
  6. pmix_server_purge_events
  7. _deregister_nspace
  8. PMIx_server_deregister_nspace
  9. pmix_server_execute_collective
  10. _register_client
  11. PMIx_server_register_client
  12. _deregister_client
  13. PMIx_server_deregister_client
  14. PMIx_server_setup_fork
  15. _dmodex_req
  16. PMIx_server_dmodex_request
  17. _store_internal
  18. PMIx_Store_internal
  19. PMIx_generate_regex
  20. PMIx_generate_ppn
  21. _setup_op
  22. _setup_app
  23. PMIx_server_setup_application
  24. _setup_local_support
  25. PMIx_server_setup_local_support
  26. _iofdeliver
  27. PMIx_server_IOF_deliver
  28. cirelease
  29. clct_complete
  30. clct
  31. PMIx_server_collect_inventory
  32. dlinv_complete
  33. dlinv
  34. PMIx_server_deliver_inventory
  35. op_cbfunc
  36. connection_cleanup
  37. op_cbfunc2
  38. _spcb
  39. spawn_cbfunc
  40. lookup_cbfunc
  41. _mdxcbfunc
  42. modex_cbfunc
  43. get_cbfunc
  44. _cnct
  45. cnct_cbfunc
  46. _discnct
  47. discnct_cbfunc
  48. regevents_cbfunc
  49. notifyerror_cbfunc
  50. alloc_cbfunc
  51. query_cbfunc
  52. jctrl_cbfunc
  53. monitor_cbfunc
  54. cred_cbfunc
  55. validate_cbfunc
  56. _iofreg
  57. iof_cbfunc
  58. server_switchyard
  59. pmix_server_message_handler

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2014-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
   7  *                         All rights reserved.
   8  * Copyright (c) 2016-2019 Mellanox Technologies, Inc.
   9  *                         All rights reserved.
  10  * Copyright (c) 2016-2018 IBM Corporation.  All rights reserved.
  11  * Copyright (c) 2018      Cisco Systems, Inc.  All rights reserved
  12  * $COPYRIGHT$
  13  *
  14  * Additional copyrights may follow
  15  *
  16  * $HEADER$
  17  */
  18 
  19 #include <src/include/pmix_config.h>
  20 
  21 #include <src/include/pmix_stdint.h>
  22 #include <src/include/pmix_socket_errno.h>
  23 
  24 #include <pmix_server.h>
  25 #include <pmix_common.h>
  26 #include <pmix_rename.h>
  27 
  28 #include "src/include/pmix_globals.h"
  29 
  30 #ifdef HAVE_STRING_H
  31 #include <string.h>
  32 #endif
  33 #include <fcntl.h>
  34 #ifdef HAVE_UNISTD_H
  35 #include <unistd.h>
  36 #endif
  37 #ifdef HAVE_SYS_SOCKET_H
  38 #include <sys/socket.h>
  39 #endif
  40 #ifdef HAVE_SYS_UN_H
  41 #include <sys/un.h>
  42 #endif
  43 #ifdef HAVE_SYS_UIO_H
  44 #include <sys/uio.h>
  45 #endif
  46 #ifdef HAVE_SYS_TYPES_H
  47 #include <sys/types.h>
  48 #endif
  49 #include <ctype.h>
  50 #include <sys/stat.h>
  51 
  52 
  53 #include "src/common/pmix_attributes.h"
  54 #include "src/util/argv.h"
  55 #include "src/util/error.h"
  56 #include "src/util/name_fns.h"
  57 #include "src/util/output.h"
  58 #include "src/util/pmix_environ.h"
  59 #include "src/util/show_help.h"
  60 #include "src/mca/base/base.h"
  61 #include "src/mca/base/pmix_mca_base_var.h"
  62 #include "src/mca/pinstalldirs/base/base.h"
  63 #include "src/mca/pnet/base/base.h"
  64 #include "src/runtime/pmix_progress_threads.h"
  65 #include "src/runtime/pmix_rte.h"
  66 #include "src/mca/bfrops/base/base.h"
  67 #include "src/mca/gds/base/base.h"
  68 #include "src/mca/preg/preg.h"
  69 #include "src/mca/psensor/base/base.h"
  70 #include "src/mca/ptl/base/base.h"
  71 #include "src/hwloc/hwloc-internal.h"
  72 
  73 /* the server also needs access to client operations
  74  * as it can, and often does, behave as a client */
  75 #include "src/client/pmix_client_ops.h"
  76 #include "pmix_server_ops.h"
  77 
  78 // global variables
  79 pmix_server_globals_t pmix_server_globals = {{{0}}};
  80 
  81 // local variables
  82 static char *security_mode = NULL;
  83 static char *ptl_mode = NULL;
  84 static char *bfrops_mode = NULL;
  85 static char *gds_mode = NULL;
  86 static pid_t mypid;
  87 
  88 // local functions for connection support
  89 pmix_status_t pmix_server_initialize(void)
  90 {
  91     /* setup the server-specific globals */
  92     PMIX_CONSTRUCT(&pmix_server_globals.clients, pmix_pointer_array_t);
  93     pmix_pointer_array_init(&pmix_server_globals.clients, 1, INT_MAX, 1);
  94     PMIX_CONSTRUCT(&pmix_server_globals.collectives, pmix_list_t);
  95     PMIX_CONSTRUCT(&pmix_server_globals.remote_pnd, pmix_list_t);
  96     PMIX_CONSTRUCT(&pmix_server_globals.gdata, pmix_list_t);
  97     PMIX_CONSTRUCT(&pmix_server_globals.events, pmix_list_t);
  98     PMIX_CONSTRUCT(&pmix_server_globals.local_reqs, pmix_list_t);
  99     PMIX_CONSTRUCT(&pmix_server_globals.nspaces, pmix_list_t);
 100     PMIX_CONSTRUCT(&pmix_server_globals.groups, pmix_list_t);
 101     PMIX_CONSTRUCT(&pmix_server_globals.iof, pmix_list_t);
 102 
 103     pmix_output_verbose(2, pmix_server_globals.base_output,
 104                         "pmix:server init called");
 105 
 106     /* setup the server verbosities */
 107     if (0 < pmix_server_globals.get_verbose) {
 108         /* set default output */
 109         pmix_server_globals.get_output = pmix_output_open(NULL);
 110         pmix_output_set_verbosity(pmix_server_globals.get_output,
 111                                   pmix_server_globals.get_verbose);
 112     }
 113     if (0 < pmix_server_globals.connect_verbose) {
 114         /* set default output */
 115         pmix_server_globals.connect_output = pmix_output_open(NULL);
 116         pmix_output_set_verbosity(pmix_server_globals.connect_output,
 117                                   pmix_server_globals.connect_verbose);
 118     }
 119     if (0 < pmix_server_globals.fence_verbose) {
 120         /* set default output */
 121         pmix_server_globals.fence_output = pmix_output_open(NULL);
 122         pmix_output_set_verbosity(pmix_server_globals.fence_output,
 123                                   pmix_server_globals.fence_verbose);
 124     }
 125     if (0 < pmix_server_globals.pub_verbose) {
 126         /* set default output */
 127         pmix_server_globals.pub_output = pmix_output_open(NULL);
 128         pmix_output_set_verbosity(pmix_server_globals.pub_output,
 129                                   pmix_server_globals.pub_verbose);
 130     }
 131     if (0 < pmix_server_globals.spawn_verbose) {
 132         /* set default output */
 133         pmix_server_globals.spawn_output = pmix_output_open(NULL);
 134         pmix_output_set_verbosity(pmix_server_globals.spawn_output,
 135                                   pmix_server_globals.spawn_verbose);
 136     }
 137     if (0 < pmix_server_globals.event_verbose) {
 138         /* set default output */
 139         pmix_server_globals.event_output = pmix_output_open(NULL);
 140         pmix_output_set_verbosity(pmix_server_globals.event_output,
 141                                   pmix_server_globals.event_verbose);
 142     }
 143     if (0 < pmix_server_globals.iof_verbose) {
 144         /* set default output */
 145         pmix_server_globals.iof_output = pmix_output_open(NULL);
 146         pmix_output_set_verbosity(pmix_server_globals.iof_output,
 147                                   pmix_server_globals.iof_verbose);
 148     }
 149     /* setup the base verbosity */
 150     if (0 < pmix_server_globals.base_verbose) {
 151         /* set default output */
 152         pmix_server_globals.base_output = pmix_output_open(NULL);
 153         pmix_output_set_verbosity(pmix_server_globals.base_output,
 154                                   pmix_server_globals.base_verbose);
 155     }
 156 
 157     return PMIX_SUCCESS;
 158 }
 159 
 160 PMIX_EXPORT pmix_status_t PMIx_server_init(pmix_server_module_t *module,
 161                                            pmix_info_t info[], size_t ninfo)
 162 {
 163     pmix_ptl_posted_recv_t *req;
 164     pmix_status_t rc;
 165     size_t n, m;
 166     pmix_kval_t *kv;
 167     bool protect, nspace_given = false, rank_given = false;
 168     pmix_info_t ginfo;
 169     char *protected[] = {
 170         PMIX_USERID,
 171         PMIX_GRPID,
 172         PMIX_SOCKET_MODE,
 173         PMIX_SERVER_TOOL_SUPPORT,
 174         PMIX_SERVER_SYSTEM_SUPPORT,
 175         PMIX_SERVER_GATEWAY,
 176         NULL
 177     };
 178     char *evar;
 179     pmix_rank_info_t *rinfo;
 180     pmix_proc_type_t ptype = PMIX_PROC_SERVER;
 181 
 182     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 183 
 184     pmix_output_verbose(2, pmix_server_globals.base_output,
 185                         "pmix:server init called");
 186 
 187     /* setup the function pointers */
 188     pmix_host_server = *module;
 189 
 190     if (NULL != info) {
 191         for (n=0; n < ninfo; n++) {
 192             if (0 == strncmp(info[n].key, PMIX_SERVER_GATEWAY, PMIX_MAX_KEYLEN)) {
 193                 if (PMIX_INFO_TRUE(&info[n])) {
 194                     ptype |= PMIX_PROC_GATEWAY;
 195                 }
 196             } else if (0 == strncmp(info[n].key, PMIX_SERVER_TMPDIR, PMIX_MAX_KEYLEN)) {
 197                 pmix_server_globals.tmpdir = strdup(info[n].value.data.string);
 198             } else if (0 == strncmp(info[n].key, PMIX_SYSTEM_TMPDIR, PMIX_MAX_KEYLEN)) {
 199                 pmix_server_globals.system_tmpdir = strdup(info[n].value.data.string);
 200             }
 201         }
 202     }
 203     if (NULL == pmix_server_globals.tmpdir) {
 204         if (NULL == (evar = getenv("PMIX_SERVER_TMPDIR"))) {
 205             pmix_server_globals.tmpdir = strdup(pmix_tmp_directory());
 206         } else {
 207             pmix_server_globals.tmpdir = strdup(evar);
 208         }
 209     }
 210     if (NULL == pmix_server_globals.system_tmpdir) {
 211         if (NULL == (evar = getenv("PMIX_SYSTEM_TMPDIR"))) {
 212             pmix_server_globals.system_tmpdir = strdup(pmix_tmp_directory());
 213         } else {
 214             pmix_server_globals.system_tmpdir = strdup(evar);
 215         }
 216     }
 217 
 218     /* setup the runtime - this init's the globals,
 219      * opens and initializes the required frameworks */
 220     if (PMIX_SUCCESS != (rc = pmix_rte_init(ptype, info, ninfo, NULL))) {
 221         PMIX_ERROR_LOG(rc);
 222         PMIX_RELEASE_THREAD(&pmix_global_lock);
 223         return rc;
 224     }
 225 
 226     /* setup the server-specific globals */
 227     if (PMIX_SUCCESS != (rc = pmix_server_initialize())) {
 228         PMIX_ERROR_LOG(rc);
 229         PMIX_RELEASE_THREAD(&pmix_global_lock);
 230         return rc;
 231     }
 232 
 233     /* assign our internal bfrops module */
 234     pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module(NULL);
 235     if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
 236         PMIX_ERROR_LOG(rc);
 237         PMIX_RELEASE_THREAD(&pmix_global_lock);
 238         return rc;
 239     }
 240     /* and set our buffer type */
 241     pmix_globals.mypeer->nptr->compat.type = pmix_bfrops_globals.default_type;
 242 
 243     /* assign our internal security module */
 244     pmix_globals.mypeer->nptr->compat.psec = pmix_psec_base_assign_module(NULL);
 245     if (NULL == pmix_globals.mypeer->nptr->compat.psec) {
 246         PMIX_ERROR_LOG(rc);
 247         PMIX_RELEASE_THREAD(&pmix_global_lock);
 248         return rc;
 249     }
 250 
 251     /* assign our internal ptl module */
 252     pmix_globals.mypeer->nptr->compat.ptl = pmix_ptl_base_assign_module();
 253     if (NULL == pmix_globals.mypeer->nptr->compat.ptl) {
 254         PMIX_ERROR_LOG(rc);
 255         PMIX_RELEASE_THREAD(&pmix_global_lock);
 256         return rc;
 257     }
 258 
 259     /* assign our internal gds module */
 260     PMIX_INFO_LOAD(&ginfo, PMIX_GDS_MODULE, "hash", PMIX_STRING);
 261     pmix_globals.mypeer->nptr->compat.gds = pmix_gds_base_assign_module(&ginfo, 1);
 262     if (NULL == pmix_globals.mypeer->nptr->compat.gds) {
 263         PMIX_ERROR_LOG(rc);
 264         PMIX_RELEASE_THREAD(&pmix_global_lock);
 265         return rc;
 266     }
 267     PMIX_INFO_DESTRUCT(&ginfo);
 268 
 269     /* copy needed parts over to the client_globals.myserver field
 270      * so that calls into client-side functions will use our peer */
 271     pmix_client_globals.myserver = PMIX_NEW(pmix_peer_t);
 272     PMIX_RETAIN(pmix_globals.mypeer->nptr);
 273     pmix_client_globals.myserver->nptr = pmix_globals.mypeer->nptr;
 274 
 275     /* get our available security modules */
 276     security_mode = pmix_psec_base_get_available_modules();
 277 
 278     /* get our available ptl modules */
 279     ptl_mode = pmix_ptl_base_get_available_modules();
 280 
 281     /* get our available bfrop modules */
 282     bfrops_mode = pmix_bfrops_base_get_available_modules();
 283 
 284     /* get available gds modules */
 285     gds_mode = pmix_gds_base_get_available_modules();
 286 
 287     /* check the info keys for info we
 288      * need to provide to every client and
 289      * directives aimed at us */
 290     if (NULL != info) {
 291         for (n=0; n < ninfo; n++) {
 292             if (0 == strncmp(info[n].key, PMIX_SERVER_NSPACE, PMIX_MAX_KEYLEN)) {
 293                 pmix_strncpy(pmix_globals.myid.nspace, info[n].value.data.string, PMIX_MAX_NSLEN);
 294                 nspace_given = true;
 295             } else if (0 == strncmp(info[n].key, PMIX_SERVER_RANK, PMIX_MAX_KEYLEN)) {
 296                 pmix_globals.myid.rank = info[n].value.data.rank;
 297                 rank_given = true;
 298             } else {
 299                 /* check the list of protected keys */
 300                 protect = false;
 301                 for (m=0; NULL != protected[m]; m++) {
 302                     if (0 == strcmp(info[n].key, protected[m])) {
 303                         protect = true;
 304                         break;
 305                     }
 306                 }
 307                 if (protect) {
 308                     continue;
 309                 }
 310                 /* store and pass along to every client */
 311                 kv = PMIX_NEW(pmix_kval_t);
 312                 kv->key = strdup(info[n].key);
 313                 PMIX_VALUE_CREATE(kv->value, 1);
 314                 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
 315                                        kv->value, &info[n].value);
 316                 if (PMIX_SUCCESS != rc) {
 317                     PMIX_RELEASE(kv);
 318                     PMIX_ERROR_LOG(rc);
 319                     PMIX_RELEASE_THREAD(&pmix_global_lock);
 320                     return rc;
 321                 }
 322                 pmix_list_append(&pmix_server_globals.gdata, &kv->super);
 323             }
 324         }
 325     }
 326 
 327     if (!nspace_given) {
 328         /* look for our namespace, if one was given */
 329         if (NULL == (evar = getenv("PMIX_SERVER_NAMESPACE"))) {
 330             /* use a fake namespace */
 331             pmix_strncpy(pmix_globals.myid.nspace, "pmix-server", PMIX_MAX_NSLEN);
 332         } else {
 333             pmix_strncpy(pmix_globals.myid.nspace, evar, PMIX_MAX_NSLEN);
 334         }
 335     }
 336     if (!rank_given) {
 337         /* look for our rank, if one was given */
 338         mypid = getpid();
 339         if (NULL == (evar = getenv("PMIX_SERVER_RANK"))) {
 340             /* use our pid */
 341             pmix_globals.myid.rank = mypid;
 342         } else {
 343             pmix_globals.myid.rank = strtol(evar, NULL, 10);
 344         }
 345     }
 346 
 347     /* copy it into mypeer entries */
 348     if (NULL == pmix_globals.mypeer->info) {
 349         rinfo = PMIX_NEW(pmix_rank_info_t);
 350         pmix_globals.mypeer->info = rinfo;
 351     } else {
 352         rinfo = pmix_globals.mypeer->info;
 353     }
 354     if (NULL == pmix_globals.mypeer->nptr) {
 355         pmix_globals.mypeer->nptr = PMIX_NEW(pmix_namespace_t);
 356         /* ensure our own nspace is first on the list */
 357         PMIX_RETAIN(pmix_globals.mypeer->nptr);
 358         pmix_list_prepend(&pmix_server_globals.nspaces, &pmix_globals.mypeer->nptr->super);
 359     }
 360     pmix_globals.mypeer->nptr->nspace = strdup(pmix_globals.myid.nspace);
 361     rinfo->pname.nspace = strdup(pmix_globals.mypeer->nptr->nspace);
 362     rinfo->pname.rank = pmix_globals.myid.rank;
 363     rinfo->uid = pmix_globals.uid;
 364     rinfo->gid = pmix_globals.gid;
 365     PMIX_RETAIN(pmix_globals.mypeer->info);
 366     pmix_client_globals.myserver->info = pmix_globals.mypeer->info;
 367 
 368     /* open the pnet framework and select the active modules for this environment */
 369     if (PMIX_SUCCESS != (rc = pmix_mca_base_framework_open(&pmix_pnet_base_framework, 0))) {
 370         PMIX_RELEASE_THREAD(&pmix_global_lock);
 371         return rc;
 372     }
 373     if (PMIX_SUCCESS != (rc = pmix_pnet_base_select())) {
 374         PMIX_RELEASE_THREAD(&pmix_global_lock);
 375         return rc;
 376     }
 377 
 378     /* if requested, setup the topology */
 379     if (PMIX_SUCCESS != (rc = pmix_hwloc_get_topology(info, ninfo))) {
 380         PMIX_RELEASE_THREAD(&pmix_global_lock);
 381         return rc;
 382     }
 383 
 384     /* open the psensor framework */
 385     if (PMIX_SUCCESS != (rc = pmix_mca_base_framework_open(&pmix_psensor_base_framework, 0))) {
 386         PMIX_RELEASE_THREAD(&pmix_global_lock);
 387         return rc;
 388     }
 389     if (PMIX_SUCCESS != (rc = pmix_psensor_base_select())) {
 390         PMIX_RELEASE_THREAD(&pmix_global_lock);
 391         return rc;
 392     }
 393 
 394     /* setup the wildcard recv for inbound messages from clients */
 395     req = PMIX_NEW(pmix_ptl_posted_recv_t);
 396     req->tag = UINT32_MAX;
 397     req->cbfunc = pmix_server_message_handler;
 398     /* add it to the end of the list of recvs */
 399     pmix_list_append(&pmix_ptl_globals.posted_recvs, &req->super);
 400 
 401     /* if we are a gateway, setup our IOF events */
 402     if (PMIX_PROC_IS_GATEWAY(pmix_globals.mypeer)) {
 403         /* setup IOF */
 404         PMIX_IOF_SINK_DEFINE(&pmix_client_globals.iof_stdout, &pmix_globals.myid,
 405                              1, PMIX_FWD_STDOUT_CHANNEL, pmix_iof_write_handler);
 406         PMIX_IOF_SINK_DEFINE(&pmix_client_globals.iof_stderr, &pmix_globals.myid,
 407                              2, PMIX_FWD_STDERR_CHANNEL, pmix_iof_write_handler);
 408     }
 409 
 410     /* register our attributes */
 411     if (PMIX_SUCCESS != (rc = pmix_register_server_attrs())) {
 412         PMIX_RELEASE_THREAD(&pmix_global_lock);
 413         return rc;
 414     }
 415 
 416     /* start listening for connections */
 417     if (PMIX_SUCCESS != pmix_ptl_base_start_listening(info, ninfo)) {
 418         pmix_show_help("help-pmix-server.txt", "listener-thread-start", true);
 419         PMIX_RELEASE_THREAD(&pmix_global_lock);
 420         PMIx_server_finalize();
 421         return PMIX_ERR_INIT;
 422     }
 423 
 424     ++pmix_globals.init_cntr;
 425     PMIX_RELEASE_THREAD(&pmix_global_lock);
 426 
 427     return PMIX_SUCCESS;
 428 }
 429 
 430 PMIX_EXPORT pmix_status_t PMIx_server_finalize(void)
 431 {
 432     int i;
 433     pmix_peer_t *peer;
 434     pmix_namespace_t *ns;
 435 
 436     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 437     if (pmix_globals.init_cntr <= 0) {
 438         PMIX_RELEASE_THREAD(&pmix_global_lock);
 439         return PMIX_ERR_INIT;
 440     }
 441 
 442     if (1 != pmix_globals.init_cntr) {
 443         --pmix_globals.init_cntr;
 444         PMIX_RELEASE_THREAD(&pmix_global_lock);
 445         return PMIX_SUCCESS;
 446     }
 447     pmix_globals.init_cntr = 0;
 448 
 449     pmix_output_verbose(2, pmix_server_globals.base_output,
 450                         "pmix:server finalize called");
 451 
 452     if (!pmix_globals.external_evbase) {
 453         /* stop the progress thread, but leave the event base
 454          * still constructed. This will allow us to safely
 455          * tear down the infrastructure, including removal
 456          * of any events objects may be holding */
 457         (void)pmix_progress_thread_pause(NULL);
 458     }
 459 
 460     pmix_ptl_base_stop_listening();
 461 
 462     for (i=0; i < pmix_server_globals.clients.size; i++) {
 463         if (NULL != (peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, i))) {
 464             /* ensure that we do the specified cleanup - if this is an
 465              * abnormal termination, then the peer object may not be
 466              * at zero refcount */
 467             pmix_execute_epilog(&peer->epilog);
 468             PMIX_RELEASE(peer);
 469         }
 470     }
 471     PMIX_DESTRUCT(&pmix_server_globals.clients);
 472     PMIX_LIST_DESTRUCT(&pmix_server_globals.collectives);
 473     PMIX_LIST_DESTRUCT(&pmix_server_globals.remote_pnd);
 474     PMIX_LIST_DESTRUCT(&pmix_server_globals.local_reqs);
 475     PMIX_LIST_DESTRUCT(&pmix_server_globals.gdata);
 476     PMIX_LIST_DESTRUCT(&pmix_server_globals.events);
 477     PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
 478         /* ensure that we do the specified cleanup - if this is an
 479          * abnormal termination, then the nspace object may not be
 480          * at zero refcount */
 481         pmix_execute_epilog(&ns->epilog);
 482     }
 483     PMIX_LIST_DESTRUCT(&pmix_server_globals.nspaces);
 484     PMIX_LIST_DESTRUCT(&pmix_server_globals.groups);
 485     PMIX_LIST_DESTRUCT(&pmix_server_globals.iof);
 486 
 487     pmix_hwloc_cleanup();
 488 
 489     if (NULL != security_mode) {
 490         free(security_mode);
 491     }
 492 
 493     if (NULL != ptl_mode) {
 494         free(ptl_mode);
 495     }
 496 
 497     if (NULL != bfrops_mode) {
 498         free(bfrops_mode);
 499     }
 500 
 501     if (NULL != gds_mode) {
 502         free(gds_mode);
 503     }
 504     if (NULL != pmix_server_globals.tmpdir) {
 505         free(pmix_server_globals.tmpdir);
 506     }
 507     /* close the psensor framework */
 508     (void)pmix_mca_base_framework_close(&pmix_psensor_base_framework);
 509     /* close the pnet framework */
 510     (void)pmix_mca_base_framework_close(&pmix_pnet_base_framework);
 511 
 512     PMIX_RELEASE_THREAD(&pmix_global_lock);
 513     PMIX_DESTRUCT_LOCK(&pmix_global_lock);
 514 
 515     pmix_rte_finalize();
 516     if (NULL != pmix_globals.mypeer) {
 517         PMIX_RELEASE(pmix_globals.mypeer);
 518     }
 519 
 520     pmix_output_verbose(2, pmix_server_globals.base_output,
 521                         "pmix:server finalize complete");
 522 
 523     /* finalize the class/object system */
 524     pmix_class_finalize();
 525 
 526     return PMIX_SUCCESS;
 527 }
 528 
 529 static void _register_nspace(int sd, short args, void *cbdata)
 530 {
 531     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
 532     pmix_namespace_t *nptr, *tmp;
 533     pmix_status_t rc;
 534     size_t i;
 535 
 536     PMIX_ACQUIRE_OBJECT(caddy);
 537 
 538     pmix_output_verbose(2, pmix_server_globals.base_output,
 539                         "pmix:server _register_nspace %s", cd->proc.nspace);
 540 
 541     /* see if we already have this nspace */
 542     nptr = NULL;
 543     PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
 544         if (0 == strcmp(tmp->nspace, cd->proc.nspace)) {
 545             nptr = tmp;
 546             break;
 547         }
 548     }
 549     if (NULL == nptr) {
 550         nptr = PMIX_NEW(pmix_namespace_t);
 551         if (NULL == nptr) {
 552             rc = PMIX_ERR_NOMEM;
 553             goto release;
 554         }
 555         nptr->nspace = strdup(cd->proc.nspace);
 556         pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
 557     }
 558     nptr->nlocalprocs = cd->nlocalprocs;
 559 
 560     /* see if we have everyone */
 561     if (nptr->nlocalprocs == pmix_list_get_size(&nptr->ranks)) {
 562         nptr->all_registered = true;
 563     }
 564 
 565     /* check info directives to see if we want to store this info */
 566     for (i=0; i < cd->ninfo; i++) {
 567         if (0 == strcmp(cd->info[i].key, PMIX_REGISTER_NODATA)) {
 568             /* nope - so we are done */
 569             rc = PMIX_SUCCESS;
 570             goto release;
 571         }
 572     }
 573 
 574     /* register nspace for each activate components */
 575     PMIX_GDS_ADD_NSPACE(rc, nptr->nspace, cd->info, cd->ninfo);
 576     if (PMIX_SUCCESS != rc) {
 577         goto release;
 578     }
 579 
 580     /* store this data in our own GDS module - we will retrieve
 581      * it later so it can be passed down to the launched procs
 582      * once they connect to us and we know what GDS module they
 583      * are using */
 584     PMIX_GDS_CACHE_JOB_INFO(rc, pmix_globals.mypeer, nptr,
 585                             cd->info, cd->ninfo);
 586 
 587   release:
 588     if (NULL != cd->opcbfunc) {
 589         cd->opcbfunc(rc, cd->cbdata);
 590     }
 591     PMIX_RELEASE(cd);
 592 }
 593 
 594 /* setup the data for a job */
 595 PMIX_EXPORT pmix_status_t PMIx_server_register_nspace(const pmix_nspace_t nspace, int nlocalprocs,
 596                                                       pmix_info_t info[], size_t ninfo,
 597                                                       pmix_op_cbfunc_t cbfunc, void *cbdata)
 598 {
 599     pmix_setup_caddy_t *cd;
 600 
 601     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 602     if (pmix_globals.init_cntr <= 0) {
 603         PMIX_RELEASE_THREAD(&pmix_global_lock);
 604         return PMIX_ERR_INIT;
 605     }
 606     PMIX_RELEASE_THREAD(&pmix_global_lock);
 607 
 608     cd = PMIX_NEW(pmix_setup_caddy_t);
 609     pmix_strncpy(cd->proc.nspace, nspace, PMIX_MAX_NSLEN);
 610     cd->nlocalprocs = nlocalprocs;
 611     cd->opcbfunc = cbfunc;
 612     cd->cbdata = cbdata;
 613     /* copy across the info array, if given */
 614     if (0 < ninfo) {
 615         cd->ninfo = ninfo;
 616         cd->info = info;
 617     }
 618 
 619     /* we have to push this into our event library to avoid
 620      * potential threading issues */
 621     PMIX_THREADSHIFT(cd, _register_nspace);
 622     return PMIX_SUCCESS;
 623 }
 624 
 625 void pmix_server_purge_events(pmix_peer_t *peer,
 626                               pmix_proc_t *proc)
 627 {
 628     pmix_regevents_info_t *reginfo, *regnext;
 629     pmix_peer_events_info_t *prev, *pnext;
 630     pmix_iof_req_t *req, *nxt;
 631     int i;
 632     pmix_notify_caddy_t *ncd;
 633     size_t n, m, p, ntgs;
 634     pmix_proc_t *tgs, *tgt;
 635     pmix_dmdx_local_t *dlcd, *dnxt;
 636 
 637     /* since the client is finalizing, remove them from any event
 638      * registrations they may still have on our list */
 639     PMIX_LIST_FOREACH_SAFE(reginfo, regnext, &pmix_server_globals.events, pmix_regevents_info_t) {
 640         PMIX_LIST_FOREACH_SAFE(prev, pnext, &reginfo->peers, pmix_peer_events_info_t) {
 641             if ((NULL != peer && prev->peer == peer) ||
 642                 (NULL != proc && PMIX_CHECK_PROCID(proc, &prev->peer->info->pname))) {
 643                 pmix_list_remove_item(&reginfo->peers, &prev->super);
 644                 PMIX_RELEASE(prev);
 645                 if (0 == pmix_list_get_size(&reginfo->peers)) {
 646                     pmix_list_remove_item(&pmix_server_globals.events, &reginfo->super);
 647                     PMIX_RELEASE(reginfo);
 648                     break;
 649                 }
 650             }
 651         }
 652     }
 653 
 654     /* since the client is finalizing, remove them from any IOF
 655      * registrations they may still have on our list */
 656     PMIX_LIST_FOREACH_SAFE(req, nxt, &pmix_globals.iof_requests, pmix_iof_req_t) {
 657         if ((NULL != peer && PMIX_CHECK_PROCID(&req->peer->info->pname, &peer->info->pname)) ||
 658             (NULL != proc && PMIX_CHECK_PROCID(&req->peer->info->pname, proc))) {
 659             pmix_list_remove_item(&pmix_globals.iof_requests, &req->super);
 660             PMIX_RELEASE(req);
 661         }
 662     }
 663 
 664     /* see if this proc is involved in any direct modex requests */
 665     PMIX_LIST_FOREACH_SAFE(dlcd, dnxt, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
 666         if ((NULL != peer && PMIX_CHECK_PROCID(&peer->info->pname, &dlcd->proc)) ||
 667             (NULL != proc && PMIX_CHECK_PROCID(proc, &dlcd->proc))) {
 668                 /* cleanup this request */
 669             pmix_list_remove_item(&pmix_server_globals.local_reqs, &dlcd->super);
 670                 /* we can release the dlcd item here because we are not
 671                  * releasing the tracker held by the host - we are only
 672                  * releasing one item on that tracker */
 673             PMIX_RELEASE(dlcd);
 674         }
 675     }
 676 
 677     /* purge this client from any cached notifications */
 678     for (i=0; i < pmix_globals.max_events; i++) {
 679        pmix_hotel_knock(&pmix_globals.notifications, i, (void**)&ncd);
 680         if (NULL != ncd && NULL != ncd->targets && 0 < ncd->ntargets) {
 681             tgt = NULL;
 682             for (n=0; n < ncd->ntargets; n++) {
 683                 if ((NULL != peer && PMIX_CHECK_PROCID(&peer->info->pname, &ncd->targets[n])) ||
 684                     (NULL != proc && PMIX_CHECK_PROCID(proc, &ncd->targets[n]))) {
 685                     tgt = &ncd->targets[n];
 686                     break;
 687                 }
 688             }
 689             if (NULL != tgt) {
 690                 /* if this client was the only target, then just
 691                  * evict the notification */
 692                 if (1 == ncd->ntargets) {
 693                     pmix_hotel_checkout(&pmix_globals.notifications, i);
 694                     PMIX_RELEASE(ncd);
 695                 } else if (PMIX_RANK_WILDCARD == tgt->rank &&
 696                            NULL != proc && PMIX_RANK_WILDCARD == proc->rank) {
 697                     /* we have to remove this target, but leave the rest */
 698                     ntgs = ncd->ntargets - 1;
 699                     PMIX_PROC_CREATE(tgs, ntgs);
 700                     p=0;
 701                     for (m=0; m < ncd->ntargets; m++) {
 702                         if (tgt != &ncd->targets[m]) {
 703                             memcpy(&tgs[p], &ncd->targets[n], sizeof(pmix_proc_t));
 704                             ++p;
 705                         }
 706                     }
 707                     PMIX_PROC_FREE(ncd->targets, ncd->ntargets);
 708                     ncd->targets = tgs;
 709                     ncd->ntargets = ntgs;
 710                 }
 711             }
 712         }
 713     }
 714 
 715     if (NULL != peer) {
 716         /* ensure we honor any peer-level epilog requests */
 717         pmix_execute_epilog(&peer->epilog);
 718     }
 719 }
 720 
 721 static void _deregister_nspace(int sd, short args, void *cbdata)
 722 {
 723     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
 724     pmix_namespace_t *tmp;
 725     pmix_status_t rc;
 726 
 727     PMIX_ACQUIRE_OBJECT(cd);
 728 
 729     pmix_output_verbose(2, pmix_server_globals.base_output,
 730                         "pmix:server _deregister_nspace %s",
 731                         cd->proc.nspace);
 732 
 733     /* release any job-level network resources */
 734     pmix_pnet.deregister_nspace(cd->proc.nspace);
 735 
 736     /* let our local storage clean up */
 737     PMIX_GDS_DEL_NSPACE(rc, cd->proc.nspace);
 738 
 739     /* remove any event registrations, IOF registrations, and
 740      * cached notifications targeting procs from this nspace */
 741     pmix_server_purge_events(NULL, &cd->proc);
 742 
 743     /* release this nspace */
 744     PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
 745         if (PMIX_CHECK_NSPACE(tmp->nspace, cd->proc.nspace)) {
 746             /* perform any nspace-level epilog */
 747             pmix_execute_epilog(&tmp->epilog);
 748             /* remove and release it */
 749             pmix_list_remove_item(&pmix_server_globals.nspaces, &tmp->super);
 750             PMIX_RELEASE(tmp);
 751             break;
 752         }
 753     }
 754 
 755     /* release the caller */
 756     if (NULL != cd->opcbfunc) {
 757         cd->opcbfunc(rc, cd->cbdata);
 758     }
 759     PMIX_RELEASE(cd);
 760 }
 761 
 762 PMIX_EXPORT void PMIx_server_deregister_nspace(const pmix_nspace_t nspace,
 763                                                pmix_op_cbfunc_t cbfunc,
 764                                                void *cbdata)
 765 {
 766     pmix_setup_caddy_t *cd;
 767 
 768     pmix_output_verbose(2, pmix_server_globals.base_output,
 769                         "pmix:server deregister nspace %s",
 770                         nspace);
 771 
 772     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 773     if (pmix_globals.init_cntr <= 0) {
 774         PMIX_RELEASE_THREAD(&pmix_global_lock);
 775         if (NULL != cbfunc) {
 776             cbfunc(PMIX_ERR_INIT, cbdata);
 777         }
 778         return;
 779     }
 780     PMIX_RELEASE_THREAD(&pmix_global_lock);
 781 
 782     cd = PMIX_NEW(pmix_setup_caddy_t);
 783     PMIX_LOAD_PROCID(&cd->proc, nspace, PMIX_RANK_WILDCARD);
 784     cd->opcbfunc = cbfunc;
 785     cd->cbdata = cbdata;
 786 
 787     /* we have to push this into our event library to avoid
 788      * potential threading issues */
 789     PMIX_THREADSHIFT(cd, _deregister_nspace);
 790 }
 791 
 792 void pmix_server_execute_collective(int sd, short args, void *cbdata)
 793 {
 794     pmix_trkr_caddy_t *tcd = (pmix_trkr_caddy_t*)cbdata;
 795     pmix_server_trkr_t *trk = tcd->trk;
 796     pmix_server_caddy_t *cd;
 797     pmix_peer_t *peer;
 798     char *data = NULL;
 799     size_t sz = 0;
 800     pmix_byte_object_t bo;
 801     pmix_buffer_t bucket, pbkt;
 802     pmix_kval_t *kv;
 803     pmix_proc_t proc;
 804     bool first;
 805     pmix_status_t rc;
 806     pmix_list_t pnames;
 807     pmix_namelist_t *pn;
 808     bool found;
 809     pmix_cb_t cb;
 810 
 811     PMIX_ACQUIRE_OBJECT(tcd);
 812 
 813     /* we don't need to check for non-NULL APIs here as
 814      * that was already done when the tracker was created */
 815     if (PMIX_FENCENB_CMD == trk->type) {
 816         /* if the user asked us to collect data, then we have
 817          * to provide any locally collected data to the host
 818          * server so they can circulate it - only take data
 819          * from the specified procs as not everyone is necessarily
 820          * participating! And only take data intended for remote
 821          * distribution as local data will be added when we send
 822          * the result to our local clients */
 823         if (trk->hybrid) {
 824             /* if this is a hybrid, then we pack everything using
 825              * the daemon-level bfrops module as each daemon is
 826              * going to have to unpack it, and then repack it for
 827              * each participant. */
 828             peer = pmix_globals.mypeer;
 829         } else {
 830             /* in some error situations, the list of local callbacks can
 831              * be empty - if that happens, we just need to call the fence
 832              * function to prevent others from hanging */
 833             if (0 == pmix_list_get_size(&trk->local_cbs)) {
 834                 pmix_host_server.fence_nb(trk->pcs, trk->npcs,
 835                                           trk->info, trk->ninfo,
 836                                           data, sz, trk->modexcbfunc, trk);
 837                 PMIX_RELEASE(tcd);
 838                 return;
 839             }
 840             /* since all procs are the same, just use the first proc's module */
 841             cd = (pmix_server_caddy_t*)pmix_list_get_first(&trk->local_cbs);
 842             peer = cd->peer;
 843         }
 844         PMIX_CONSTRUCT(&bucket, pmix_buffer_t);
 845 
 846         unsigned char tmp = (unsigned char)trk->collect_type;
 847         PMIX_BFROPS_PACK(rc, peer, &bucket, &tmp, 1, PMIX_BYTE);
 848 
 849         if (PMIX_COLLECT_YES == trk->collect_type) {
 850             pmix_output_verbose(2, pmix_server_globals.base_output,
 851                                 "fence - assembling data");
 852             first = true;
 853             PMIX_CONSTRUCT(&pnames, pmix_list_t);
 854             PMIX_LIST_FOREACH(cd, &trk->local_cbs, pmix_server_caddy_t) {
 855                 /* see if we have already gotten the contribution from
 856                  * this proc */
 857                 found = false;
 858                 PMIX_LIST_FOREACH(pn, &pnames, pmix_namelist_t) {
 859                     if (pn->pname == &cd->peer->info->pname) {
 860                         /* got it */
 861                         found = true;
 862                         break;
 863                     }
 864                 }
 865                 if (found) {
 866                     continue;
 867                 } else {
 868                     pn = PMIX_NEW(pmix_namelist_t);
 869                     pn->pname = &cd->peer->info->pname;
 870                 }
 871                 if (trk->hybrid || first) {
 872                     /* setup the nspace */
 873                     pmix_strncpy(proc.nspace, cd->peer->info->pname.nspace, PMIX_MAX_NSLEN);
 874                     first = false;
 875                 }
 876                 proc.rank = cd->peer->info->pname.rank;
 877                 /* get any remote contribution - note that there
 878                  * may not be a contribution */
 879                 PMIX_CONSTRUCT(&cb, pmix_cb_t);
 880                 cb.proc = &proc;
 881                 cb.scope = PMIX_REMOTE;
 882                 cb.copy = true;
 883                 PMIX_GDS_FETCH_KV(rc, peer, &cb);
 884                 if (PMIX_SUCCESS == rc) {
 885                     /* pack the returned kvals */
 886                     PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
 887                     /* start with the proc id */
 888                     PMIX_BFROPS_PACK(rc, peer, &pbkt, &proc, 1, PMIX_PROC);
 889                     if (PMIX_SUCCESS != rc) {
 890                         PMIX_ERROR_LOG(rc);
 891                         PMIX_DESTRUCT(&cb);
 892                         PMIX_DESTRUCT(&pbkt);
 893                         PMIX_DESTRUCT(&bucket);
 894                         return;
 895                     }
 896                     PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
 897                         PMIX_BFROPS_PACK(rc, peer, &pbkt, kv, 1, PMIX_KVAL);
 898                         if (PMIX_SUCCESS != rc) {
 899                             PMIX_ERROR_LOG(rc);
 900                             PMIX_DESTRUCT(&cb);
 901                             PMIX_DESTRUCT(&pbkt);
 902                             PMIX_DESTRUCT(&bucket);
 903                             return;
 904                         }
 905                     }
 906                     /* extract the resulting byte object */
 907                     PMIX_UNLOAD_BUFFER(&pbkt, bo.bytes, bo.size);
 908                     PMIX_DESTRUCT(&pbkt);
 909                     /* now pack that into the bucket for return */
 910                     PMIX_BFROPS_PACK(rc, peer, &bucket, &bo, 1, PMIX_BYTE_OBJECT);
 911                     if (PMIX_SUCCESS != rc) {
 912                         PMIX_ERROR_LOG(rc);
 913                         PMIX_DESTRUCT(&cb);
 914                         PMIX_BYTE_OBJECT_DESTRUCT(&bo);
 915                         PMIX_DESTRUCT(&bucket);
 916                         PMIX_RELEASE(tcd);
 917                         return;
 918                     }
 919                 }
 920                 PMIX_DESTRUCT(&cb);
 921             }
 922             PMIX_LIST_DESTRUCT(&pnames);
 923         }
 924         PMIX_UNLOAD_BUFFER(&bucket, data, sz);
 925         PMIX_DESTRUCT(&bucket);
 926         pmix_host_server.fence_nb(trk->pcs, trk->npcs,
 927                                   trk->info, trk->ninfo,
 928                                   data, sz, trk->modexcbfunc, trk);
 929     } else if (PMIX_CONNECTNB_CMD == trk->type) {
 930         pmix_host_server.connect(trk->pcs, trk->npcs,
 931                                  trk->info, trk->ninfo,
 932                                  trk->op_cbfunc, trk);
 933     } else if (PMIX_DISCONNECTNB_CMD == trk->type) {
 934         pmix_host_server.disconnect(trk->pcs, trk->npcs,
 935                                     trk->info, trk->ninfo,
 936                                     trk->op_cbfunc, trk);
 937     } else {
 938         /* unknown type */
 939         PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
 940         pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
 941         PMIX_RELEASE(trk);
 942     }
 943     PMIX_RELEASE(tcd);
 944 }
 945 
 946 static void _register_client(int sd, short args, void *cbdata)
 947 {
 948     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
 949     pmix_rank_info_t *info, *iptr;
 950     pmix_namespace_t *nptr, *ns;
 951     pmix_server_trkr_t *trk;
 952     pmix_trkr_caddy_t *tcd;
 953     bool all_def;
 954     size_t i;
 955     pmix_status_t rc;
 956 
 957     PMIX_ACQUIRE_OBJECT(cd);
 958 
 959     pmix_output_verbose(2, pmix_server_globals.base_output,
 960                         "pmix:server _register_client for nspace %s rank %d %s object",
 961                         cd->proc.nspace, cd->proc.rank,
 962                         (NULL == cd->server_object) ? "NULL" : "NON-NULL");
 963 
 964     /* see if we already have this nspace */
 965     nptr = NULL;
 966     PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
 967         if (0 == strcmp(ns->nspace, cd->proc.nspace)) {
 968             nptr = ns;
 969             break;
 970         }
 971     }
 972     if (NULL == nptr) {
 973         nptr = PMIX_NEW(pmix_namespace_t);
 974         if (NULL == nptr) {
 975             rc = PMIX_ERR_NOMEM;
 976             goto cleanup;
 977         }
 978         nptr->nspace = strdup(cd->proc.nspace);
 979         pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
 980     }
 981     /* setup a peer object for this client - since the host server
 982      * only deals with the original processes and not any clones,
 983      * we know this function will be called only once per rank */
 984     info = PMIX_NEW(pmix_rank_info_t);
 985     if (NULL == info) {
 986         rc = PMIX_ERR_NOMEM;
 987         goto cleanup;
 988     }
 989     info->pname.nspace = strdup(nptr->nspace);
 990     info->pname.rank = cd->proc.rank;
 991     info->uid = cd->uid;
 992     info->gid = cd->gid;
 993     info->server_object = cd->server_object;
 994     pmix_list_append(&nptr->ranks, &info->super);
 995     /* see if we have everyone */
 996     if (nptr->nlocalprocs == pmix_list_get_size(&nptr->ranks)) {
 997         nptr->all_registered = true;
 998         /* check any pending trackers to see if they are
 999          * waiting for us. There is a slight race condition whereby
1000          * the host server could have spawned the local client and
1001          * it called back into the collective -before- our local event
1002          * would fire the register_client callback. Deal with that here. */
1003         all_def = true;
1004         PMIX_LIST_FOREACH(trk, &pmix_server_globals.collectives, pmix_server_trkr_t) {
1005             /* if this tracker is already complete, then we
1006              * don't need to update it */
1007             if (trk->def_complete) {
1008                 continue;
1009             }
1010             /* see if any of our procs from this nspace are involved - the tracker will
1011              * have been created because a callback was received, but
1012              * we may or may not have received _all_ callbacks by this
1013              * time. So check and see if any procs from this nspace are
1014              * involved, and add them to the count of local participants */
1015             for (i=0; i < trk->npcs; i++) {
1016                 /* since we have to do this search, let's see
1017                  * if the nspaces are all defined */
1018                 if (all_def) {
1019                     /* so far, they have all been defined - check this one */
1020                     PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
1021                         if (0 < ns->nlocalprocs &&
1022                             0 == strcmp(trk->pcs[i].nspace, ns->nspace)) {
1023                             all_def = ns->all_registered;
1024                             break;
1025                         }
1026                     }
1027                 }
1028                 /* now see if this proc is local to us */
1029                 if (0 != strncmp(trk->pcs[i].nspace, nptr->nspace, PMIX_MAX_NSLEN)) {
1030                     continue;
1031                 }
1032                 /* need to check if this rank is one of mine */
1033                 PMIX_LIST_FOREACH(iptr, &nptr->ranks, pmix_rank_info_t) {
1034                     if (PMIX_RANK_WILDCARD == trk->pcs[i].rank ||
1035                         iptr->pname.rank == trk->pcs[i].rank) {
1036                         /* this is one of mine - track the count */
1037                         ++trk->nlocal;
1038                         break;
1039                     }
1040                 }
1041             }
1042             /* update this tracker's status */
1043             trk->def_complete = all_def;
1044             /* is this now locally completed? */
1045             if (trk->def_complete && pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
1046                 /* it did, so now we need to process it
1047                  * we don't want to block someone
1048                  * here, so kick any completed trackers into a
1049                  * new event for processing */
1050                 PMIX_EXECUTE_COLLECTIVE(tcd, trk, pmix_server_execute_collective);
1051             }
1052         }
1053         /* also check any pending local modex requests to see if
1054          * someone has been waiting for a request on a remote proc
1055          * in one of our nspaces, but we didn't know all the local procs
1056          * and so couldn't determine the proc was remote */
1057         pmix_pending_nspace_requests(nptr);
1058     }
1059     rc = PMIX_SUCCESS;
1060 
1061   cleanup:
1062     /* let the caller know we are done */
1063     if (NULL != cd->opcbfunc) {
1064         cd->opcbfunc(rc, cd->cbdata);
1065     }
1066     PMIX_RELEASE(cd);
1067 }
1068 
1069 PMIX_EXPORT pmix_status_t PMIx_server_register_client(const pmix_proc_t *proc,
1070                                                       uid_t uid, gid_t gid, void *server_object,
1071                                                       pmix_op_cbfunc_t cbfunc, void *cbdata)
1072 {
1073     pmix_setup_caddy_t *cd;
1074 
1075     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1076     if (pmix_globals.init_cntr <= 0) {
1077         PMIX_RELEASE_THREAD(&pmix_global_lock);
1078         return PMIX_ERR_INIT;
1079     }
1080     PMIX_RELEASE_THREAD(&pmix_global_lock);
1081 
1082     pmix_output_verbose(2, pmix_server_globals.base_output,
1083                         "pmix:server register client %s:%d",
1084                         proc->nspace, proc->rank);
1085 
1086     cd = PMIX_NEW(pmix_setup_caddy_t);
1087     if (NULL == cd) {
1088         return PMIX_ERR_NOMEM;
1089     }
1090     pmix_strncpy(cd->proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
1091     cd->proc.rank = proc->rank;
1092     cd->uid = uid;
1093     cd->gid = gid;
1094     cd->server_object = server_object;
1095     cd->opcbfunc = cbfunc;
1096     cd->cbdata = cbdata;
1097 
1098     /* we have to push this into our event library to avoid
1099      * potential threading issues */
1100     PMIX_THREADSHIFT(cd, _register_client);
1101     return PMIX_SUCCESS;
1102 }
1103 
1104 static void _deregister_client(int sd, short args, void *cbdata)
1105 {
1106     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1107     pmix_rank_info_t *info;
1108     pmix_namespace_t *nptr, *tmp;
1109     pmix_peer_t *peer;
1110 
1111     PMIX_ACQUIRE_OBJECT(cd);
1112 
1113     pmix_output_verbose(2, pmix_server_globals.base_output,
1114                         "pmix:server _deregister_client for nspace %s rank %d",
1115                         cd->proc.nspace, cd->proc.rank);
1116 
1117     /* see if we already have this nspace */
1118     nptr = NULL;
1119     PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
1120         if (0 == strcmp(tmp->nspace, cd->proc.nspace)) {
1121             nptr = tmp;
1122             break;
1123         }
1124     }
1125     if (NULL == nptr) {
1126         /* nothing to do */
1127         goto cleanup;
1128     }
1129     /* find and remove this client */
1130     PMIX_LIST_FOREACH(info, &nptr->ranks, pmix_rank_info_t) {
1131         if (info->pname.rank == cd->proc.rank) {
1132             /* if this client failed to call finalize, we still need
1133              * to restore any allocations that were given to it */
1134             if (NULL == (peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, info->peerid))) {
1135                 /* this peer never connected, and hence it won't finalize,
1136                  * so account for it here */
1137                 nptr->nfinalized++;
1138                 /* even if they never connected, resources were allocated
1139                  * to them, so we need to ensure they are properly released */
1140                 pmix_pnet.child_finalized(&cd->proc);
1141             } else {
1142                 if (!peer->finalized) {
1143                     /* this peer connected to us, but is being deregistered
1144                      * without having finalized. This usually means an
1145                      * abnormal termination that was picked up by
1146                      * our host prior to our seeing the connection drop.
1147                      * It is also possible that we missed the dropped
1148                      * connection, so mark the peer as finalized so
1149                      * we don't duplicate account for it and take care
1150                      * of it here */
1151                     peer->finalized = true;
1152                     nptr->nfinalized++;
1153                 }
1154                 /* resources may have been allocated to them, so
1155                  * ensure they get cleaned up - this isn't true
1156                  * for tools, so don't clean them up */
1157                 if (!PMIX_PROC_IS_TOOL(peer)) {
1158                     pmix_pnet.child_finalized(&cd->proc);
1159                     pmix_psensor.stop(peer, NULL);
1160                 }
1161                 /* honor any registered epilogs */
1162                 pmix_execute_epilog(&peer->epilog);
1163                 /* ensure we close the socket to this peer so we don't
1164                  * generate "connection lost" events should it be
1165                  * subsequently "killed" by the host */
1166                 CLOSE_THE_SOCKET(peer->sd);
1167             }
1168             if (nptr->nlocalprocs == nptr->nfinalized) {
1169                 pmix_pnet.local_app_finalized(nptr);
1170             }
1171             pmix_list_remove_item(&nptr->ranks, &info->super);
1172             PMIX_RELEASE(info);
1173             break;
1174         }
1175     }
1176 
1177   cleanup:
1178     if (NULL != cd->opcbfunc) {
1179         cd->opcbfunc(PMIX_SUCCESS, cd->cbdata);
1180     }
1181     PMIX_RELEASE(cd);
1182 }
1183 
1184 PMIX_EXPORT void PMIx_server_deregister_client(const pmix_proc_t *proc,
1185                                                pmix_op_cbfunc_t cbfunc, void *cbdata)
1186 {
1187     pmix_setup_caddy_t *cd;
1188 
1189     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1190     if (pmix_globals.init_cntr <= 0) {
1191         PMIX_RELEASE_THREAD(&pmix_global_lock);
1192         if (NULL != cbfunc) {
1193             cbfunc(PMIX_ERR_INIT, cbdata);
1194         }
1195         return;
1196     }
1197     PMIX_RELEASE_THREAD(&pmix_global_lock);
1198 
1199     pmix_output_verbose(2, pmix_server_globals.base_output,
1200                         "pmix:server deregister client %s:%d",
1201                         proc->nspace, proc->rank);
1202 
1203     cd = PMIX_NEW(pmix_setup_caddy_t);
1204     if (NULL == cd) {
1205         if (NULL != cbfunc) {
1206             cbfunc(PMIX_ERR_NOMEM, cbdata);
1207         }
1208         return;
1209     }
1210     pmix_strncpy(cd->proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
1211     cd->proc.rank = proc->rank;
1212     cd->opcbfunc = cbfunc;
1213     cd->cbdata = cbdata;
1214 
1215     /* we have to push this into our event library to avoid
1216      * potential threading issues */
1217     PMIX_THREADSHIFT(cd, _deregister_client);
1218 }
1219 
1220 /* setup the envars for a child process */
1221 PMIX_EXPORT pmix_status_t PMIx_server_setup_fork(const pmix_proc_t *proc, char ***env)
1222 {
1223     char rankstr[128];
1224     pmix_listener_t *lt;
1225     pmix_status_t rc;
1226     char **varnames;
1227     int n;
1228 
1229     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1230     if (pmix_globals.init_cntr <= 0) {
1231         PMIX_RELEASE_THREAD(&pmix_global_lock);
1232         return PMIX_ERR_INIT;
1233     }
1234     PMIX_RELEASE_THREAD(&pmix_global_lock);
1235 
1236     pmix_output_verbose(2, pmix_server_globals.base_output,
1237                         "pmix:server setup_fork for nspace %s rank %d",
1238                         proc->nspace, proc->rank);
1239 
1240     /* pass the nspace */
1241     pmix_setenv("PMIX_NAMESPACE", proc->nspace, true, env);
1242     /* pass the rank */
1243     (void)snprintf(rankstr, 127, "%d", proc->rank);
1244     pmix_setenv("PMIX_RANK", rankstr, true, env);
1245     /* pass our rendezvous info */
1246     PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
1247         if (NULL != lt->uri && NULL != lt->varname) {
1248             varnames = pmix_argv_split(lt->varname, ':');
1249             for (n=0; NULL != varnames[n]; n++) {
1250                 pmix_setenv(varnames[n], lt->uri, true, env);
1251             }
1252             pmix_argv_free(varnames);
1253         }
1254     }
1255     /* pass our active security modules */
1256     pmix_setenv("PMIX_SECURITY_MODE", security_mode, true, env);
1257     /* pass our available ptl modules */
1258     pmix_setenv("PMIX_PTL_MODULE", ptl_mode, true, env);
1259     /* pass the type of buffer we are using */
1260     if (PMIX_BFROP_BUFFER_FULLY_DESC == pmix_globals.mypeer->nptr->compat.type) {
1261         pmix_setenv("PMIX_BFROP_BUFFER_TYPE", "PMIX_BFROP_BUFFER_FULLY_DESC", true, env);
1262     } else {
1263         pmix_setenv("PMIX_BFROP_BUFFER_TYPE", "PMIX_BFROP_BUFFER_NON_DESC", true, env);
1264     }
1265     /* pass our available gds modules */
1266     pmix_setenv("PMIX_GDS_MODULE", gds_mode, true, env);
1267 
1268     /* get any PTL contribution such as tmpdir settings for session files */
1269     if (PMIX_SUCCESS != (rc = pmix_ptl_base_setup_fork(proc, env))) {
1270         PMIX_ERROR_LOG(rc);
1271         return rc;
1272     }
1273 
1274     /* get any network contribution */
1275     if (PMIX_SUCCESS != (rc = pmix_pnet.setup_fork(proc, env))) {
1276         PMIX_ERROR_LOG(rc);
1277         return rc;
1278     }
1279 
1280     /* get any GDS contributions */
1281     if (PMIX_SUCCESS != (rc = pmix_gds_base_setup_fork(proc, env))) {
1282         PMIX_ERROR_LOG(rc);
1283         return rc;
1284     }
1285 
1286     return PMIX_SUCCESS;
1287 }
1288 
1289 /***************************************************************************************************
1290  *  Support calls from the host server down to us requesting direct modex data provided by one     *
1291  *  of our local clients                                                                           *
1292  ***************************************************************************************************/
1293 
1294 static void _dmodex_req(int sd, short args, void *cbdata)
1295 {
1296     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1297     pmix_rank_info_t *info, *iptr;
1298     pmix_namespace_t *nptr, *ns;
1299     char *data = NULL;
1300     size_t sz = 0;
1301     pmix_dmdx_remote_t *dcd;
1302     pmix_status_t rc;
1303     pmix_buffer_t pbkt;
1304     pmix_kval_t *kv;
1305     pmix_cb_t cb;
1306 
1307     PMIX_ACQUIRE_OBJECT(cd);
1308 
1309     pmix_output_verbose(2, pmix_server_globals.base_output,
1310                         "DMODX LOOKING FOR %s:%d",
1311                         cd->proc.nspace, cd->proc.rank);
1312 
1313     /* this should be one of my clients, but a race condition
1314      * could cause this request to arrive prior to us having
1315      * been informed of it - so first check to see if we know
1316      * about this nspace yet */
1317     nptr = NULL;
1318     PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
1319         if (0 == strcmp(ns->nspace, cd->proc.nspace)) {
1320             nptr = ns;
1321             break;
1322         }
1323     }
1324     if (NULL == nptr) {
1325         /* we don't know this namespace yet, and so we obviously
1326          * haven't received the data from this proc yet - defer
1327          * the request until we do */
1328         dcd = PMIX_NEW(pmix_dmdx_remote_t);
1329         if (NULL == dcd) {
1330             rc = PMIX_ERR_NOMEM;
1331             goto cleanup;
1332         }
1333         dcd->cd = cd;
1334         pmix_list_append(&pmix_server_globals.remote_pnd, &dcd->super);
1335         return;
1336     }
1337 
1338     /* They are asking for job level data for this process */
1339     if (cd->proc.rank == PMIX_RANK_WILDCARD) {
1340         /* fetch the job-level info for this nspace */
1341         /* this is going to a remote peer, so inform the gds
1342          * that we need an actual copy of the data */
1343         PMIX_CONSTRUCT(&cb, pmix_cb_t);
1344         cb.proc = &cd->proc;
1345         cb.scope = PMIX_REMOTE;
1346         cb.copy = true;
1347         PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
1348         PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
1349         if (PMIX_SUCCESS == rc) {
1350             /* assemble the provided data into a byte object */
1351             PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
1352                 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &pbkt, kv, 1, PMIX_KVAL);
1353                 if (PMIX_SUCCESS != rc) {
1354                     PMIX_DESTRUCT(&pbkt);
1355                     PMIX_DESTRUCT(&cb);
1356                     goto cleanup;
1357                 }
1358             }
1359         }
1360         PMIX_DESTRUCT(&cb);
1361         PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
1362         PMIX_DESTRUCT(&pbkt);
1363         goto cleanup;
1364     }
1365 
1366     /* see if we have this peer in our list */
1367     info = NULL;
1368     PMIX_LIST_FOREACH(iptr, &nptr->ranks, pmix_rank_info_t) {
1369         if (iptr->pname.rank == cd->proc.rank) {
1370             info = iptr;
1371             break;
1372         }
1373     }
1374     if (NULL == info) {
1375         /* rank isn't known yet - defer
1376          * the request until we do */
1377         dcd = PMIX_NEW(pmix_dmdx_remote_t);
1378         dcd->cd = cd;
1379         pmix_list_append(&pmix_server_globals.remote_pnd, &dcd->super);
1380         return;
1381     }
1382 
1383     /* have we received the modex from this proc yet - if
1384      * not, then defer */
1385     if (!info->modex_recvd) {
1386         /* track the request so we can fulfill it once
1387          * data is recvd */
1388         dcd = PMIX_NEW(pmix_dmdx_remote_t);
1389         dcd->cd = cd;
1390         pmix_list_append(&pmix_server_globals.remote_pnd, &dcd->super);
1391         return;
1392     }
1393 
1394     /* collect the remote/global data from this proc */
1395     PMIX_CONSTRUCT(&cb, pmix_cb_t);
1396     cb.proc = &cd->proc;
1397     cb.scope = PMIX_REMOTE;
1398     cb.copy = true;
1399     PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
1400     if (PMIX_SUCCESS == rc) {
1401         /* assemble the provided data into a byte object */
1402         PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
1403         PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
1404             PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &pbkt, kv, 1, PMIX_KVAL);
1405             if (PMIX_SUCCESS != rc) {
1406                 PMIX_DESTRUCT(&pbkt);
1407                 PMIX_DESTRUCT(&cb);
1408                 goto cleanup;
1409             }
1410         }
1411         PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
1412         PMIX_DESTRUCT(&pbkt);
1413     }
1414     PMIX_DESTRUCT(&cb);
1415 
1416   cleanup:
1417     /* execute the callback */
1418     cd->cbfunc(rc, data, sz, cd->cbdata);
1419     if (NULL != data) {
1420         free(data);
1421     }
1422     PMIX_RELEASE(cd);
1423 }
1424 
1425 PMIX_EXPORT pmix_status_t PMIx_server_dmodex_request(const pmix_proc_t *proc,
1426                                                      pmix_dmodex_response_fn_t cbfunc,
1427                                                      void *cbdata)
1428 {
1429     pmix_setup_caddy_t *cd;
1430 
1431     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1432     if (pmix_globals.init_cntr <= 0) {
1433         PMIX_RELEASE_THREAD(&pmix_global_lock);
1434         return PMIX_ERR_INIT;
1435     }
1436     PMIX_RELEASE_THREAD(&pmix_global_lock);
1437 
1438     /* protect against bozo */
1439     if (NULL == cbfunc || NULL == proc) {
1440         return PMIX_ERR_BAD_PARAM;
1441     }
1442 
1443     pmix_output_verbose(2, pmix_server_globals.base_output,
1444                         "pmix:server dmodex request%s:%d",
1445                         proc->nspace, proc->rank);
1446 
1447     cd = PMIX_NEW(pmix_setup_caddy_t);
1448     pmix_strncpy(cd->proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
1449     cd->proc.rank = proc->rank;
1450     cd->cbfunc = cbfunc;
1451     cd->cbdata = cbdata;
1452 
1453     /* we have to push this into our event library to avoid
1454      * potential threading issues */
1455     PMIX_THREADSHIFT(cd, _dmodex_req);
1456     return PMIX_SUCCESS;
1457 }
1458 
1459 static void _store_internal(int sd, short args, void *cbdata)
1460 {
1461     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
1462     pmix_proc_t proc;
1463 
1464     PMIX_ACQUIRE_OBJECT(cd);
1465 
1466     pmix_strncpy(proc.nspace, cd->pname.nspace, PMIX_MAX_NSLEN);
1467     proc.rank = cd->pname.rank;
1468     PMIX_GDS_STORE_KV(cd->status, pmix_globals.mypeer,
1469                       &proc, PMIX_INTERNAL, cd->kv);
1470     if (cd->lock.active) {
1471         PMIX_WAKEUP_THREAD(&cd->lock);
1472     }
1473  }
1474 
1475 PMIX_EXPORT pmix_status_t PMIx_Store_internal(const pmix_proc_t *proc,
1476                                               const pmix_key_t key, pmix_value_t *val)
1477 {
1478     pmix_shift_caddy_t *cd;
1479     pmix_status_t rc;
1480 
1481     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1482     if (pmix_globals.init_cntr <= 0) {
1483         PMIX_RELEASE_THREAD(&pmix_global_lock);
1484         return PMIX_ERR_INIT;
1485     }
1486     PMIX_RELEASE_THREAD(&pmix_global_lock);
1487 
1488     /* setup to thread shift this request */
1489     cd = PMIX_NEW(pmix_shift_caddy_t);
1490     if (NULL == cd) {
1491         return PMIX_ERR_NOMEM;
1492     }
1493     cd->pname.nspace = strdup(proc->nspace);
1494     cd->pname.rank = proc->rank;
1495 
1496     cd->kv = PMIX_NEW(pmix_kval_t);
1497     if (NULL == cd->kv) {
1498         PMIX_RELEASE(cd);
1499         return PMIX_ERR_NOMEM;
1500     }
1501     cd->kv->key = strdup((char*)key);
1502     cd->kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1503     PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, cd->kv->value, val);
1504     if (PMIX_SUCCESS != rc) {
1505         PMIX_ERROR_LOG(rc);
1506         PMIX_RELEASE(cd);
1507         return rc;
1508     }
1509 
1510     PMIX_THREADSHIFT(cd, _store_internal);
1511     PMIX_WAIT_THREAD(&cd->lock);
1512     rc = cd->status;
1513     PMIX_RELEASE(cd);
1514 
1515     return rc;
1516 }
1517 
1518 PMIX_EXPORT pmix_status_t PMIx_generate_regex(const char *input, char **regexp)
1519 {
1520     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1521     if (pmix_globals.init_cntr <= 0) {
1522         PMIX_RELEASE_THREAD(&pmix_global_lock);
1523         return PMIX_ERR_INIT;
1524     }
1525     PMIX_RELEASE_THREAD(&pmix_global_lock);
1526 
1527     return pmix_preg.generate_node_regex(input, regexp);
1528 }
1529 
1530 PMIX_EXPORT pmix_status_t PMIx_generate_ppn(const char *input, char **regexp)
1531 {
1532     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1533     if (pmix_globals.init_cntr <= 0) {
1534         PMIX_RELEASE_THREAD(&pmix_global_lock);
1535         return PMIX_ERR_INIT;
1536     }
1537     PMIX_RELEASE_THREAD(&pmix_global_lock);
1538 
1539     return pmix_preg.generate_ppn(input, regexp);
1540 }
1541 
1542 static void _setup_op(pmix_status_t rc, void *cbdata)
1543 {
1544     pmix_setup_caddy_t *fcd = (pmix_setup_caddy_t*)cbdata;
1545 
1546     if (NULL != fcd->info) {
1547         PMIX_INFO_FREE(fcd->info, fcd->ninfo);
1548     }
1549     PMIX_RELEASE(fcd);
1550 }
1551 
1552 static void _setup_app(int sd, short args, void *cbdata)
1553 {
1554     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1555     pmix_setup_caddy_t *fcd = NULL;
1556     pmix_status_t rc;
1557     pmix_list_t ilist;
1558     pmix_kval_t *kv;
1559     size_t n;
1560 
1561     PMIX_ACQUIRE_OBJECT(cd);
1562 
1563     PMIX_CONSTRUCT(&ilist, pmix_list_t);
1564 
1565     /* pass to the network libraries */
1566     if (PMIX_SUCCESS != (rc = pmix_pnet.allocate(cd->nspace,
1567                                                  cd->info, cd->ninfo,
1568                                                  &ilist))) {
1569         goto depart;
1570     }
1571 
1572     /* setup the return callback */
1573     fcd = PMIX_NEW(pmix_setup_caddy_t);
1574     if (NULL == fcd) {
1575         rc = PMIX_ERR_NOMEM;
1576         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1577         goto depart;
1578     }
1579 
1580     /* if anything came back, construct an info array */
1581     if (0 < (fcd->ninfo = pmix_list_get_size(&ilist))) {
1582         PMIX_INFO_CREATE(fcd->info, fcd->ninfo);
1583         if (NULL == fcd->info) {
1584             rc = PMIX_ERR_NOMEM;
1585             PMIX_RELEASE(fcd);
1586             goto depart;
1587         }
1588         n = 0;
1589         PMIX_LIST_FOREACH(kv, &ilist, pmix_kval_t) {
1590             pmix_strncpy(fcd->info[n].key, kv->key, PMIX_MAX_KEYLEN);
1591             pmix_value_xfer(&fcd->info[n].value, kv->value);
1592             ++n;
1593         }
1594     }
1595 
1596   depart:
1597     /* always execute the callback to avoid hanging */
1598     if (NULL != cd->setupcbfunc) {
1599         if (NULL == fcd) {
1600             cd->setupcbfunc(rc, NULL, 0, cd->cbdata, NULL, NULL);
1601         } else {
1602             cd->setupcbfunc(rc, fcd->info, fcd->ninfo, cd->cbdata, _setup_op, fcd);
1603         }
1604     }
1605 
1606     /* cleanup memory */
1607     PMIX_LIST_DESTRUCT(&ilist);
1608     if (NULL != cd->nspace) {
1609         free(cd->nspace);
1610     }
1611     PMIX_RELEASE(cd);
1612 }
1613 
1614 pmix_status_t PMIx_server_setup_application(const pmix_nspace_t nspace,
1615                                             pmix_info_t info[], size_t ninfo,
1616                                             pmix_setup_application_cbfunc_t cbfunc, void *cbdata)
1617 {
1618     pmix_setup_caddy_t *cd;
1619 
1620     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1621     if (pmix_globals.init_cntr <= 0) {
1622         PMIX_RELEASE_THREAD(&pmix_global_lock);
1623         return PMIX_ERR_INIT;
1624     }
1625     PMIX_RELEASE_THREAD(&pmix_global_lock);
1626 
1627     /* need to threadshift this request */
1628     cd = PMIX_NEW(pmix_setup_caddy_t);
1629     if (NULL == cd) {
1630         return PMIX_ERR_NOMEM;
1631     }
1632     if (NULL != nspace) {
1633         cd->nspace = strdup(nspace);
1634     }
1635     cd->info = info;
1636     cd->ninfo = ninfo;
1637     cd->setupcbfunc = cbfunc;
1638     cd->cbdata = cbdata;
1639     PMIX_THREADSHIFT(cd, _setup_app);
1640 
1641     return PMIX_SUCCESS;
1642 }
1643 
1644 static void _setup_local_support(int sd, short args, void *cbdata)
1645 {
1646     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1647     pmix_status_t rc;
1648 
1649     PMIX_ACQUIRE_OBJECT(cd);
1650 
1651     /* pass to the network libraries */
1652     rc = pmix_pnet.setup_local_network(cd->nspace, cd->info, cd->ninfo);
1653 
1654     /* pass the info back */
1655     if (NULL != cd->opcbfunc) {
1656         cd->opcbfunc(rc, cd->cbdata);
1657     }
1658     /* cleanup memory */
1659     if (NULL != cd->nspace) {
1660         free(cd->nspace);
1661     }
1662     PMIX_RELEASE(cd);
1663 }
1664 
1665 pmix_status_t PMIx_server_setup_local_support(const pmix_nspace_t nspace,
1666                                               pmix_info_t info[], size_t ninfo,
1667                                               pmix_op_cbfunc_t cbfunc, void *cbdata)
1668 {
1669     pmix_setup_caddy_t *cd;
1670 
1671     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1672     if (pmix_globals.init_cntr <= 0) {
1673         PMIX_RELEASE_THREAD(&pmix_global_lock);
1674         return PMIX_ERR_INIT;
1675     }
1676     PMIX_RELEASE_THREAD(&pmix_global_lock);
1677 
1678     /* need to threadshift this request */
1679     cd = PMIX_NEW(pmix_setup_caddy_t);
1680     if (NULL == cd) {
1681         return PMIX_ERR_NOMEM;
1682     }
1683     if (NULL != nspace) {
1684         cd->nspace = strdup(nspace);
1685     }
1686     cd->info = info;
1687     cd->ninfo = ninfo;
1688     cd->opcbfunc = cbfunc;
1689     cd->cbdata = cbdata;
1690     PMIX_THREADSHIFT(cd, _setup_local_support);
1691 
1692     return PMIX_SUCCESS;
1693 }
1694 
1695 static void _iofdeliver(int sd, short args, void *cbdata)
1696 {
1697     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1698     pmix_iof_req_t *req;
1699     pmix_status_t rc;
1700     pmix_buffer_t *msg;
1701     bool found = false;
1702     bool cached = false;
1703     pmix_iof_cache_t *iof;
1704 
1705     pmix_output_verbose(2, pmix_server_globals.iof_output,
1706                         "PMIX:SERVER delivering IOF from %s on channel %0x",
1707                         PMIX_NAME_PRINT(cd->procs), cd->channels);
1708 
1709     /* cycle across our list of IOF requestors and see who wants
1710      * this channel from this source */
1711     PMIX_LIST_FOREACH(req, &pmix_globals.iof_requests, pmix_iof_req_t) {
1712         /* if the channel wasn't included, then ignore it */
1713         if (!(cd->channels & req->channels)) {
1714             continue;
1715         }
1716         /* see if the source matches the request */
1717         if (!PMIX_CHECK_PROCID(cd->procs, &req->pname)) {
1718             continue;
1719         }
1720         /* never forward back to the source! This can happen if the source
1721          * is a launcher - also, never forward to a peer that is no
1722          * longer with us */
1723         if (NULL == req->peer->info || req->peer->finalized) {
1724             continue;
1725         }
1726         if (PMIX_CHECK_PROCID(cd->procs, &req->peer->info->pname)) {
1727             continue;
1728         }
1729         found = true;
1730         /* setup the msg */
1731         if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) {
1732             PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
1733             rc = PMIX_ERR_OUT_OF_RESOURCE;
1734             break;
1735         }
1736         /* provide the source */
1737         PMIX_BFROPS_PACK(rc, req->peer, msg, cd->procs, 1, PMIX_PROC);
1738         if (PMIX_SUCCESS != rc) {
1739             PMIX_ERROR_LOG(rc);
1740             PMIX_RELEASE(msg);
1741             break;
1742         }
1743         /* provide the channel */
1744         PMIX_BFROPS_PACK(rc, req->peer, msg, &cd->channels, 1, PMIX_IOF_CHANNEL);
1745         if (PMIX_SUCCESS != rc) {
1746             PMIX_ERROR_LOG(rc);
1747             PMIX_RELEASE(msg);
1748             break;
1749         }
1750         /* pack the data */
1751         PMIX_BFROPS_PACK(rc, req->peer, msg, cd->bo, 1, PMIX_BYTE_OBJECT);
1752         if (PMIX_SUCCESS != rc) {
1753             PMIX_ERROR_LOG(rc);
1754             PMIX_RELEASE(msg);
1755             break;
1756         }
1757         /* send it to the requestor */
1758         PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF);
1759         if (PMIX_SUCCESS != rc) {
1760             PMIX_ERROR_LOG(rc);
1761             PMIX_RELEASE(msg);
1762         }
1763     }
1764 
1765     /* if nobody has registered for this yet, then cache it */
1766     if (!found) {
1767         pmix_output_verbose(2, pmix_server_globals.iof_output,
1768                             "PMIx:SERVER caching IOF");
1769         if (pmix_server_globals.max_iof_cache == pmix_list_get_size(&pmix_server_globals.iof)) {
1770             /* remove the oldest cached message */
1771             iof = (pmix_iof_cache_t*)pmix_list_remove_first(&pmix_server_globals.iof);
1772             PMIX_RELEASE(iof);
1773         }
1774         /* add this output to our cache so it is cached until someone
1775          * registers to receive it */
1776         iof = PMIX_NEW(pmix_iof_cache_t);
1777         memcpy(&iof->source, cd->procs, sizeof(pmix_proc_t));
1778         iof->channel = cd->channels;
1779         iof->bo = cd->bo;
1780         cd->bo = NULL;  // protect the data
1781         pmix_list_append(&pmix_server_globals.iof, &iof->super);
1782     }
1783 
1784 
1785     if (NULL != cd->opcbfunc) {
1786         cd->opcbfunc(rc, cd->cbdata);
1787     }
1788     if (!cached) {
1789         PMIX_RELEASE(cd);
1790     }
1791 }
1792 
1793 pmix_status_t PMIx_server_IOF_deliver(const pmix_proc_t *source,
1794                                       pmix_iof_channel_t channel,
1795                                       const pmix_byte_object_t *bo,
1796                                       const pmix_info_t info[], size_t ninfo,
1797                                       pmix_op_cbfunc_t cbfunc, void *cbdata)
1798 {
1799     pmix_setup_caddy_t *cd;
1800     size_t n;
1801 
1802     /* need to threadshift this request */
1803     cd = PMIX_NEW(pmix_setup_caddy_t);
1804     if (NULL == cd) {
1805         return PMIX_ERR_NOMEM;
1806     }
1807     /* unfortunately, we need to copy the input because we
1808      * might have to cache it for later delivery */
1809     PMIX_PROC_CREATE(cd->procs, 1);
1810     if (NULL == cd->procs) {
1811         PMIX_RELEASE(cd);
1812         return PMIX_ERR_NOMEM;
1813     }
1814     cd->nprocs = 1;
1815     pmix_strncpy(cd->procs[0].nspace, source->nspace, PMIX_MAX_NSLEN);
1816     cd->procs[0].rank = source->rank;
1817     cd->channels = channel;
1818     PMIX_BYTE_OBJECT_CREATE(cd->bo, 1);
1819     if (NULL == cd->bo) {
1820         PMIX_RELEASE(cd);
1821         return PMIX_ERR_NOMEM;
1822     }
1823     cd->nbo = 1;
1824     cd->bo[0].bytes = (char*)malloc(bo->size);
1825     if (NULL == cd->bo[0].bytes) {
1826         PMIX_RELEASE(cd);
1827         return PMIX_ERR_NOMEM;
1828     }
1829     memcpy(cd->bo[0].bytes, bo->bytes, bo->size);
1830     cd->bo[0].size = bo->size;
1831     if (0 < ninfo) {
1832         PMIX_INFO_CREATE(cd->info, ninfo);
1833         if (NULL == cd->info) {
1834             PMIX_RELEASE(cd);
1835             return PMIX_ERR_NOMEM;
1836         }
1837         cd->ninfo = ninfo;
1838         for (n=0; n < ninfo; n++) {
1839             PMIX_INFO_XFER(&cd->info[n], (pmix_info_t*)&info[n]);
1840         }
1841     }
1842     cd->opcbfunc = cbfunc;
1843     cd->cbdata = cbdata;
1844     PMIX_THREADSHIFT(cd, _iofdeliver);
1845     return PMIX_SUCCESS;
1846 }
1847 
1848 static void cirelease(void *cbdata)
1849 {
1850     pmix_inventory_rollup_t *rollup = (pmix_inventory_rollup_t*)cbdata;
1851     if (NULL != rollup->info) {
1852         PMIX_INFO_FREE(rollup->info, rollup->ninfo);
1853     }
1854     PMIX_RELEASE(rollup);
1855 }
1856 
1857 static void clct_complete(pmix_status_t status,
1858                           pmix_list_t *inventory,
1859                           void *cbdata)
1860 {
1861     pmix_inventory_rollup_t *cd = (pmix_inventory_rollup_t*)cbdata;
1862     pmix_kval_t *kv;
1863     size_t n;
1864     pmix_status_t rc;
1865 
1866     PMIX_ACQUIRE_THREAD(&cd->lock);
1867 
1868     /* collect the results */
1869     if (NULL != inventory) {
1870         while (NULL != (kv = (pmix_kval_t*)pmix_list_remove_first(inventory))) {
1871             pmix_list_append(&cd->payload, &kv->super);
1872         }
1873     }
1874     if (PMIX_SUCCESS != status && PMIX_SUCCESS == cd->status) {
1875         cd->status = status;
1876     }
1877     /* see if we are done */
1878     cd->replies++;
1879     if (cd->replies == cd->requests) {
1880         /* no need to continue tracking the input directives */
1881         cd->info = NULL;
1882         cd->ninfo = 0;
1883         if (NULL != cd->infocbfunc) {
1884             /* convert the list to an array of pmix_info_t */
1885             cd->ninfo = pmix_list_get_size(&cd->payload);
1886             if (0 < cd->ninfo) {
1887                 PMIX_INFO_CREATE(cd->info, cd->ninfo);
1888                 if (NULL == cd->info) {
1889                     cd->status = PMIX_ERR_NOMEM;
1890                     cd->ninfo = 0;
1891                     PMIX_RELEASE_THREAD(&cd->lock);
1892                     goto error;
1893                 }
1894                 /* transfer the results */
1895                 n=0;
1896                 PMIX_LIST_FOREACH(kv, &cd->payload, pmix_kval_t) {
1897                     pmix_strncpy(cd->info[n].key, kv->key, PMIX_MAX_KEYLEN);
1898                     rc = pmix_value_xfer(&cd->info[n].value, kv->value);
1899                     if (PMIX_SUCCESS != rc) {
1900                         PMIX_INFO_FREE(cd->info, cd->ninfo);
1901                         cd->status = rc;
1902                         break;
1903                     }
1904                     ++n;
1905                 }
1906             }
1907             /* now call the requestor back */
1908             PMIX_RELEASE_THREAD(&cd->lock);
1909             cd->infocbfunc(cd->status, cd->info, cd->ninfo, cd->cbdata, cirelease, cd);
1910             return;
1911         }
1912     }
1913     /* continue to wait */
1914     PMIX_RELEASE_THREAD(&cd->lock);
1915     return;
1916 
1917   error:
1918     /* let them know */
1919     if (NULL != cd->infocbfunc) {
1920         cd->infocbfunc(cd->status, NULL, 0, cd->cbdata, NULL, NULL);
1921     }
1922     PMIX_RELEASE(cd);
1923 
1924 
1925 }
1926 static void clct(int sd, short args, void *cbdata)
1927 {
1928     pmix_inventory_rollup_t *cd = (pmix_inventory_rollup_t*)cbdata;
1929 
1930 #if PMIX_HAVE_HWLOC
1931     /* if we don't know our topology, we better get it now */
1932     pmix_status_t rc;
1933     if (NULL == pmix_hwloc_topology) {
1934         if (PMIX_SUCCESS != (rc = pmix_hwloc_get_topology(NULL, 0))) {
1935             PMIX_ERROR_LOG(rc);
1936             return;
1937         }
1938     }
1939 #endif
1940 
1941     /* we only have one source at this time */
1942     cd->requests = 1;
1943 
1944     /* collect the pnet inventory */
1945     pmix_pnet.collect_inventory(cd->info, cd->ninfo,
1946                                 clct_complete, cd);
1947 
1948     return;
1949 }
1950 
1951 pmix_status_t PMIx_server_collect_inventory(pmix_info_t directives[], size_t ndirs,
1952                                             pmix_info_cbfunc_t cbfunc, void *cbdata)
1953 {
1954     pmix_inventory_rollup_t *cd;
1955 
1956     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1957     if (pmix_globals.init_cntr <= 0) {
1958         PMIX_RELEASE_THREAD(&pmix_global_lock);
1959         return PMIX_ERR_INIT;
1960     }
1961     PMIX_RELEASE_THREAD(&pmix_global_lock);
1962 
1963     /* need to threadshift this request */
1964     cd = PMIX_NEW(pmix_inventory_rollup_t);
1965     if (NULL == cd) {
1966         return PMIX_ERR_NOMEM;
1967     }
1968     cd->info = directives;
1969     cd->ninfo = ndirs;
1970     cd->infocbfunc = cbfunc;
1971     cd->cbdata = cbdata;
1972     PMIX_THREADSHIFT(cd, clct);
1973 
1974     return PMIX_SUCCESS;
1975 }
1976 
1977 static void dlinv_complete(pmix_status_t status, void *cbdata)
1978 {
1979     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
1980 
1981     /* take the lock */
1982     PMIX_ACQUIRE_THREAD(&cd->lock);
1983 
1984     /* increment number of replies */
1985     cd->ndata++;  // reuse field in shift_caddy
1986     /* update status, if necessary */
1987     if (PMIX_SUCCESS != status && PMIX_SUCCESS == cd->status) {
1988         cd->status = status;
1989     }
1990     if (cd->ncodes == cd->ndata) {
1991         /* we are done - let the caller know */
1992         PMIX_RELEASE_THREAD(&cd->lock);
1993         if (NULL != cd->cbfunc.opcbfn) {
1994             cd->cbfunc.opcbfn(cd->status, cd->cbdata);
1995         }
1996         PMIX_RELEASE(cd);
1997         return;
1998     }
1999 
2000     PMIX_RELEASE_THREAD(&cd->lock);
2001     return;
2002 }
2003 
2004 
2005 static void dlinv(int sd, short args, void *cbdata)
2006 {
2007     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
2008 
2009     /* only have one place to deliver inventory
2010      * at this time */
2011     cd->ncodes = 1;  // reuse field in shift_caddy
2012 
2013     pmix_pnet.deliver_inventory(cd->info, cd->ninfo,
2014                                 cd->directives, cd->ndirs,
2015                                 dlinv_complete, cd);
2016 
2017     return;
2018 
2019 }
2020 pmix_status_t PMIx_server_deliver_inventory(pmix_info_t info[], size_t ninfo,
2021                                             pmix_info_t directives[], size_t ndirs,
2022                                             pmix_op_cbfunc_t cbfunc, void *cbdata)
2023 {
2024     pmix_shift_caddy_t *cd;
2025 
2026     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
2027     if (pmix_globals.init_cntr <= 0) {
2028         PMIX_RELEASE_THREAD(&pmix_global_lock);
2029         return PMIX_ERR_INIT;
2030     }
2031     PMIX_RELEASE_THREAD(&pmix_global_lock);
2032 
2033     /* need to threadshift this request */
2034     cd = PMIX_NEW(pmix_shift_caddy_t);
2035     if (NULL == cd) {
2036         return PMIX_ERR_NOMEM;
2037     }
2038     cd->lock.active = false;
2039     cd->info = info;
2040     cd->ninfo = ninfo;
2041     cd->directives = directives;
2042     cd->ndirs = ndirs;
2043     cd->cbfunc.opcbfn = cbfunc;
2044     cd->cbdata = cbdata;
2045     PMIX_THREADSHIFT(cd, dlinv);
2046 
2047     return PMIX_SUCCESS;
2048 
2049 }
2050 
2051 /****    THE FOLLOWING CALLBACK FUNCTIONS ARE USED BY THE HOST SERVER    ****
2052  ****    THEY THEREFORE CAN OCCUR IN EITHER THE HOST SERVER'S THREAD     ****
2053  ****    CONTEXT, OR IN OUR OWN THREAD CONTEXT IF THE CALLBACK OCCURS    ****
2054  ****    IMMEDIATELY. THUS ANYTHING THAT ACCESSES A GLOBAL ENTITY        ****
2055  ****    MUST BE PUSHED INTO AN EVENT FOR PROTECTION                     ****/
2056 
2057 static void op_cbfunc(pmix_status_t status, void *cbdata)
2058 {
2059     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2060     pmix_buffer_t *reply;
2061     pmix_status_t rc;
2062 
2063     /* no need to thread-shift here as no global data is
2064      * being accessed */
2065 
2066     /* setup the reply with the returned status */
2067     if (NULL == (reply = PMIX_NEW(pmix_buffer_t))) {
2068         PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
2069         PMIX_RELEASE(cd);
2070         return;
2071     }
2072     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2073     if (PMIX_SUCCESS != rc) {
2074         PMIX_ERROR_LOG(rc);
2075         PMIX_RELEASE(reply);
2076         PMIX_RELEASE(cd);
2077         return;
2078     }
2079 
2080     /* the function that created the server_caddy did a
2081      * retain on the peer, so we don't have to worry about
2082      * it still being present - send a copy to the originator */
2083     PMIX_PTL_SEND_ONEWAY(rc, cd->peer, reply, cd->hdr.tag);
2084     if (PMIX_SUCCESS != rc) {
2085         PMIX_ERROR_LOG(rc);
2086         PMIX_RELEASE(reply);
2087     }
2088 
2089     /* cleanup */
2090     PMIX_RELEASE(cd);
2091 }
2092 
2093 static void connection_cleanup(int sd, short args, void *cbdata)
2094 {
2095     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2096 
2097     /* ensure that we know the peer has finalized else we
2098      * will generate an event - yes, it should have been
2099      * done, but it is REALLY important that it be set */
2100     cd->peer->finalized = true;
2101     pmix_ptl_base_lost_connection(cd->peer, PMIX_SUCCESS);
2102     /* cleanup the caddy */
2103     PMIX_RELEASE(cd);
2104 }
2105 
2106 static void op_cbfunc2(pmix_status_t status, void *cbdata)
2107 {
2108     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2109     pmix_buffer_t *reply;
2110     pmix_status_t rc;
2111 
2112     /* no need to thread-shift here as no global data is
2113      * being accessed */
2114 
2115     /* setup the reply with the returned status */
2116     if (NULL == (reply = PMIX_NEW(pmix_buffer_t))) {
2117         PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
2118         PMIX_RELEASE(cd);
2119         return;
2120     }
2121     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2122     if (PMIX_SUCCESS != rc) {
2123         PMIX_ERROR_LOG(rc);
2124         PMIX_RELEASE(reply);
2125         PMIX_RELEASE(cd);
2126         return;
2127     }
2128 
2129     /* the function that created the server_caddy did a
2130      * retain on the peer, so we don't have to worry about
2131      * it still being present - send a copy to the originator */
2132     PMIX_PTL_SEND_ONEWAY(rc, cd->peer, reply, cd->hdr.tag);
2133     if (PMIX_SUCCESS != rc) {
2134         PMIX_ERROR_LOG(rc);
2135         PMIX_RELEASE(reply);
2136     }
2137 
2138     /* cleanup any lingering references to this peer - note
2139      * that we cannot call the lost_connection function
2140      * directly as we need the connection to still
2141      * exist for the message (queued above) to be
2142      * sent. So we push this into an event, thus
2143      * ensuring that it will "fire" after the message
2144      * event has completed */
2145     PMIX_THREADSHIFT(cd, connection_cleanup);
2146 }
2147 
2148 static void _spcb(int sd, short args, void *cbdata)
2149 {
2150     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
2151     pmix_buffer_t *reply;
2152     pmix_status_t rc;
2153     pmix_proc_t proc;
2154     pmix_cb_t cb;
2155     pmix_kval_t *kv;
2156 
2157     PMIX_ACQUIRE_OBJECT(cd);
2158 
2159     /* setup the reply with the returned status */
2160     if (NULL == (reply = PMIX_NEW(pmix_buffer_t))) {
2161         PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
2162         goto cleanup;
2163     }
2164     PMIX_BFROPS_PACK(rc, cd->cd->peer, reply, &cd->status, 1, PMIX_STATUS);
2165     if (PMIX_SUCCESS != rc) {
2166         PMIX_ERROR_LOG(rc);
2167         PMIX_RELEASE(reply);
2168         goto cleanup;
2169     }
2170     /* pass back the name of the nspace */
2171     PMIX_BFROPS_PACK(rc, cd->cd->peer, reply, &cd->pname.nspace, 1, PMIX_STRING);
2172     if (PMIX_SUCCESS != rc) {
2173         PMIX_ERROR_LOG(rc);
2174         PMIX_RELEASE(reply);
2175         goto cleanup;
2176     }
2177     /* add the job-level info, if we have it */
2178     pmix_strncpy(proc.nspace, cd->pname.nspace, PMIX_MAX_NSLEN);
2179     proc.rank = PMIX_RANK_WILDCARD;
2180     /* this is going to a local client, so let the gds
2181      * have the option of returning a copy of the data,
2182      * or a pointer to local storage */
2183     PMIX_CONSTRUCT(&cb, pmix_cb_t);
2184     cb.proc = &proc;
2185     cb.scope = PMIX_SCOPE_UNDEF;
2186     cb.copy = false;
2187     PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
2188     if (PMIX_SUCCESS == rc) {
2189         PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
2190             PMIX_BFROPS_PACK(rc, cd->cd->peer, reply, kv, 1, PMIX_KVAL);
2191             if (PMIX_SUCCESS != rc) {
2192                 PMIX_ERROR_LOG(rc);
2193                 PMIX_RELEASE(reply);
2194                 PMIX_DESTRUCT(&cb);
2195                 goto cleanup;
2196             }
2197         }
2198         PMIX_DESTRUCT(&cb);
2199     }
2200 
2201     /* the function that created the server_caddy did a
2202      * retain on the peer, so we don't have to worry about
2203      * it still being present - tell the originator the result */
2204     PMIX_SERVER_QUEUE_REPLY(rc, cd->cd->peer, cd->cd->hdr.tag, reply);
2205     if (PMIX_SUCCESS != rc) {
2206         PMIX_RELEASE(reply);
2207     }
2208 
2209   cleanup:
2210     /* cleanup */
2211     PMIX_RELEASE(cd->cd);
2212     PMIX_RELEASE(cd);
2213 }
2214 
2215 static void spawn_cbfunc(pmix_status_t status, char *nspace, void *cbdata)
2216 {
2217     pmix_shift_caddy_t *cd;
2218 
2219     /* need to thread-shift this request */
2220     cd = PMIX_NEW(pmix_shift_caddy_t);
2221     cd->status = status;
2222     cd->pname.nspace = strdup(nspace);
2223     cd->cd = (pmix_server_caddy_t*)cbdata;;
2224 
2225     PMIX_THREADSHIFT(cd, _spcb);
2226 }
2227 
2228 static void lookup_cbfunc(pmix_status_t status, pmix_pdata_t pdata[], size_t ndata,
2229                           void *cbdata)
2230 {
2231     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2232     pmix_buffer_t *reply;
2233     pmix_status_t rc;
2234 
2235     /* no need to thread-shift as no global data is accessed */
2236     /* setup the reply with the returned status */
2237     if (NULL == (reply = PMIX_NEW(pmix_buffer_t))) {
2238         PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
2239         PMIX_RELEASE(cd);
2240         return;
2241     }
2242     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2243     if (PMIX_SUCCESS != rc) {
2244         PMIX_ERROR_LOG(rc);
2245         PMIX_RELEASE(reply);
2246         return;
2247     }
2248     if (PMIX_SUCCESS == status) {
2249         /* pack the returned data objects */
2250         PMIX_BFROPS_PACK(rc, cd->peer, reply, &ndata, 1, PMIX_SIZE);
2251         if (PMIX_SUCCESS != rc) {
2252             PMIX_ERROR_LOG(rc);
2253             PMIX_RELEASE(reply);
2254             return;
2255         }
2256         PMIX_BFROPS_PACK(rc, cd->peer, reply, pdata, ndata, PMIX_PDATA);
2257         if (PMIX_SUCCESS != rc) {
2258             PMIX_ERROR_LOG(rc);
2259             PMIX_RELEASE(reply);
2260             return;
2261         }
2262     }
2263 
2264     /* the function that created the server_caddy did a
2265      * retain on the peer, so we don't have to worry about
2266      * it still being present - tell the originator the result */
2267     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2268     if (PMIX_SUCCESS != rc) {
2269         PMIX_RELEASE(reply);
2270     }
2271     /* cleanup */
2272     PMIX_RELEASE(cd);
2273 }
2274 
2275 /* fence modex calls return here when the host RM has completed
2276  * the operation - any enclosed data is provided to us as a blob
2277  * which contains byte objects, one for each set of data. Our
2278  * peer servers will have packed the blobs using our common
2279  * GDS module, so use the mypeer one to unpack them */
2280 static void _mdxcbfunc(int sd, short argc, void *cbdata)
2281 {
2282     pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
2283     pmix_server_trkr_t *tracker = scd->tracker;
2284     pmix_buffer_t xfer, *reply;
2285     pmix_server_caddy_t *cd, *nxt;
2286     pmix_status_t rc = PMIX_SUCCESS, ret;
2287     pmix_nspace_caddy_t *nptr;
2288     pmix_list_t nslist;
2289     bool found;
2290 
2291     PMIX_ACQUIRE_OBJECT(scd);
2292 
2293     if (NULL == tracker) {
2294         /* give them a release if they want it - this should
2295          * never happen, but protect against the possibility */
2296         if (NULL != scd->cbfunc.relfn) {
2297             scd->cbfunc.relfn(scd->cbdata);
2298         }
2299         PMIX_RELEASE(scd);
2300         return;
2301     }
2302 
2303     /* if we get here, then there are processes waiting
2304      * for a response */
2305 
2306     /* if the timer is active, clear it */
2307     if (tracker->event_active) {
2308         pmix_event_del(&tracker->ev);
2309     }
2310 
2311     /* pass the blobs being returned */
2312     PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
2313     PMIX_CONSTRUCT(&nslist, pmix_list_t);
2314 
2315     if (PMIX_SUCCESS != scd->status) {
2316         rc = scd->status;
2317         goto finish_collective;
2318     }
2319 
2320     if (PMIX_COLLECT_INVALID == tracker->collect_type) {
2321         rc = PMIX_ERR_INVALID_ARG;
2322         goto finish_collective;
2323     }
2324 
2325     // Skip the data if we didn't collect it
2326     if (PMIX_COLLECT_YES != tracker->collect_type) {
2327         rc = PMIX_SUCCESS;
2328         goto finish_collective;
2329     }
2330 
2331     /* Collect the nptr list with uniq GDS components of all local
2332      * participants. It does not allow multiple storing to the
2333      * same GDS if participants have mutual GDS. */
2334     PMIX_LIST_FOREACH(cd, &tracker->local_cbs, pmix_server_caddy_t) {
2335         // see if we already have this nspace
2336         found = false;
2337         PMIX_LIST_FOREACH(nptr, &nslist, pmix_nspace_caddy_t) {
2338             if (0 == strcmp(nptr->ns->compat.gds->name,
2339                             cd->peer->nptr->compat.gds->name)) {
2340                 found = true;
2341                 break;
2342             }
2343         }
2344         if (!found) {
2345             // add it
2346             nptr = PMIX_NEW(pmix_nspace_caddy_t);
2347             PMIX_RETAIN(cd->peer->nptr);
2348             nptr->ns = cd->peer->nptr;
2349             pmix_list_append(&nslist, &nptr->super);
2350         }
2351     }
2352     PMIX_LIST_FOREACH(nptr, &nslist, pmix_nspace_caddy_t) {
2353         /* pass the blobs being returned */
2354         PMIX_LOAD_BUFFER(pmix_globals.mypeer, &xfer, scd->data, scd->ndata);
2355         PMIX_GDS_STORE_MODEX(rc, nptr->ns, &xfer, tracker);
2356         if (PMIX_SUCCESS != rc) {
2357             PMIX_ERROR_LOG(rc);
2358             break;
2359         }
2360     }
2361 
2362   finish_collective:
2363     /* loop across all procs in the tracker, sending them the reply */
2364     PMIX_LIST_FOREACH_SAFE(cd, nxt, &tracker->local_cbs, pmix_server_caddy_t) {
2365         reply = PMIX_NEW(pmix_buffer_t);
2366         if (NULL == reply) {
2367             rc = PMIX_ERR_NOMEM;
2368             break;
2369         }
2370         /* setup the reply, starting with the returned status */
2371         PMIX_BFROPS_PACK(ret, cd->peer, reply, &rc, 1, PMIX_STATUS);
2372         if (PMIX_SUCCESS != ret) {
2373             PMIX_ERROR_LOG(ret);
2374             goto cleanup;
2375         }
2376         pmix_output_verbose(2, pmix_server_globals.base_output,
2377                             "server:modex_cbfunc reply being sent to %s:%u",
2378                             cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
2379         PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2380         if (PMIX_SUCCESS != rc) {
2381             PMIX_RELEASE(reply);
2382         }
2383         /* remove this entry */
2384         pmix_list_remove_item(&tracker->local_cbs, &cd->super);
2385         PMIX_RELEASE(cd);
2386     }
2387 
2388   cleanup:
2389     /* Protect data from being free'd because RM pass
2390      * the pointer that is set to the middle of some
2391      * buffer (the case with SLURM).
2392      * RM is responsible on the release of the buffer
2393      */
2394     xfer.base_ptr = NULL;
2395     xfer.bytes_used = 0;
2396     PMIX_DESTRUCT(&xfer);
2397 
2398     pmix_list_remove_item(&pmix_server_globals.collectives, &tracker->super);
2399     PMIX_RELEASE(tracker);
2400     PMIX_LIST_DESTRUCT(&nslist);
2401 
2402     /* we are done */
2403     if (NULL != scd->cbfunc.relfn) {
2404         scd->cbfunc.relfn(scd->cbdata);
2405     }
2406     PMIX_RELEASE(scd);
2407 }
2408 
2409 static void modex_cbfunc(pmix_status_t status, const char *data, size_t ndata, void *cbdata,
2410                          pmix_release_cbfunc_t relfn, void *relcbd)
2411 {
2412     pmix_server_trkr_t *tracker = (pmix_server_trkr_t*)cbdata;
2413     pmix_shift_caddy_t *scd;
2414 
2415     pmix_output_verbose(2, pmix_server_globals.base_output,
2416                         "server:modex_cbfunc called with %d bytes", (int)ndata);
2417 
2418     /* need to thread-shift this callback as it accesses global data */
2419     scd = PMIX_NEW(pmix_shift_caddy_t);
2420     if (NULL == scd) {
2421         /* nothing we can do */
2422         if (NULL != relfn) {
2423             relfn(cbdata);
2424         }
2425         return;
2426     }
2427     scd->status = status;
2428     scd->data = data;
2429     scd->ndata = ndata;
2430     scd->tracker = tracker;
2431     scd->cbfunc.relfn = relfn;
2432     scd->cbdata = relcbd;
2433     PMIX_THREADSHIFT(scd, _mdxcbfunc);
2434 }
2435 
2436 static void get_cbfunc(pmix_status_t status, const char *data, size_t ndata, void *cbdata,
2437                        pmix_release_cbfunc_t relfn, void *relcbd)
2438 {
2439     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2440     pmix_buffer_t *reply, buf;
2441     pmix_status_t rc;
2442 
2443     pmix_output_verbose(2, pmix_server_globals.base_output,
2444                         "server:get_cbfunc called with %d bytes", (int)ndata);
2445 
2446     /* no need to thread-shift here as no global data is accessed
2447      * and we are called from another internal function
2448      * (see pmix_server_get.c:pmix_pending_resolve) that
2449      * has already been thread-shifted */
2450 
2451     if (NULL == cd) {
2452         /* nothing to do - but be sure to give them
2453          * a release if they want it */
2454         if (NULL != relfn) {
2455             relfn(relcbd);
2456         }
2457         return;
2458     }
2459 
2460     /* setup the reply, starting with the returned status */
2461     reply = PMIX_NEW(pmix_buffer_t);
2462     if (NULL == reply) {
2463         rc = PMIX_ERR_NOMEM;
2464         goto cleanup;
2465     }
2466     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2467     if (PMIX_SUCCESS != rc) {
2468         PMIX_ERROR_LOG(rc);
2469         goto cleanup;
2470     }
2471     /* pack the blob being returned */
2472     PMIX_CONSTRUCT(&buf, pmix_buffer_t);
2473     PMIX_LOAD_BUFFER(cd->peer, &buf, data, ndata);
2474     PMIX_BFROPS_COPY_PAYLOAD(rc, cd->peer, reply, &buf);
2475     buf.base_ptr = NULL;
2476     buf.bytes_used = 0;
2477     PMIX_DESTRUCT(&buf);
2478     /* send the data to the requestor */
2479     pmix_output_verbose(2, pmix_server_globals.base_output,
2480                         "server:get_cbfunc reply being sent to %s:%u",
2481                         cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
2482     pmix_output_hexdump(10, pmix_server_globals.base_output,
2483                         reply->base_ptr, (reply->bytes_used < 256 ? reply->bytes_used : 256));
2484 
2485     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2486     if (PMIX_SUCCESS != rc) {
2487         PMIX_RELEASE(reply);
2488     }
2489 
2490  cleanup:
2491     /* if someone wants a release, give it to them */
2492     if (NULL != relfn) {
2493         relfn(relcbd);
2494     }
2495     PMIX_RELEASE(cd);
2496 }
2497 
2498 static void _cnct(int sd, short args, void *cbdata)
2499 {
2500     pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
2501     pmix_server_trkr_t *tracker = scd->tracker;
2502     pmix_buffer_t *reply, pbkt;
2503     pmix_byte_object_t bo;
2504     pmix_status_t rc;
2505     int i;
2506     pmix_server_caddy_t *cd;
2507     char **nspaces=NULL;
2508     bool found;
2509     pmix_proc_t proc;
2510     pmix_cb_t cb;
2511     pmix_kval_t *kptr;
2512 
2513     PMIX_ACQUIRE_OBJECT(scd);
2514 
2515     if (NULL == tracker) {
2516         /* nothing to do */
2517         return;
2518     }
2519 
2520     /* if we get here, then there are processes waiting
2521      * for a response */
2522 
2523     /* if the timer is active, clear it */
2524     if (tracker->event_active) {
2525         pmix_event_del(&tracker->ev);
2526     }
2527 
2528     /* find the unique nspaces that are participating */
2529     PMIX_LIST_FOREACH(cd, &tracker->local_cbs, pmix_server_caddy_t) {
2530         if (NULL == nspaces) {
2531             pmix_argv_append_nosize(&nspaces, cd->peer->info->pname.nspace);
2532         } else {
2533             found = false;
2534             for (i=0; NULL != nspaces[i]; i++) {
2535                 if (0 == strcmp(nspaces[i], cd->peer->info->pname.nspace)) {
2536                     found = true;
2537                     break;
2538                 }
2539             }
2540             if (!found) {
2541                 pmix_argv_append_nosize(&nspaces, cd->peer->info->pname.nspace);
2542             }
2543         }
2544     }
2545 
2546     /* loop across all local procs in the tracker, sending them the reply */
2547     PMIX_LIST_FOREACH(cd, &tracker->local_cbs, pmix_server_caddy_t) {
2548         /* setup the reply, starting with the returned status */
2549         reply = PMIX_NEW(pmix_buffer_t);
2550         if (NULL == reply) {
2551             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2552             rc = PMIX_ERR_NOMEM;
2553             goto cleanup;
2554         }
2555         /* start with the status */
2556         PMIX_BFROPS_PACK(rc, cd->peer, reply, &scd->status, 1, PMIX_STATUS);
2557         if (PMIX_SUCCESS != rc) {
2558             PMIX_ERROR_LOG(rc);
2559             PMIX_RELEASE(reply);
2560             goto cleanup;
2561         }
2562         if (PMIX_SUCCESS == scd->status) {
2563             /* loop across all participating nspaces and include their
2564              * job-related info */
2565             for (i=0; NULL != nspaces[i]; i++) {
2566                 /* if this is the local proc's own nspace, then
2567                  * ignore it - it already has this info */
2568                 if (0 == strncmp(nspaces[i], cd->peer->info->pname.nspace, PMIX_MAX_NSLEN)) {
2569                     continue;
2570                 }
2571 
2572                 /* this is a local request, so give the gds the option
2573                  * of returning a copy of the data, or a pointer to
2574                  * local storage */
2575                 /* add the job-level info, if necessary */
2576                 proc.rank = PMIX_RANK_WILDCARD;
2577                 pmix_strncpy(proc.nspace, nspaces[i], PMIX_MAX_NSLEN);
2578                 PMIX_CONSTRUCT(&cb, pmix_cb_t);
2579                 /* this is for a local client, so give the gds the
2580                  * option of returning a complete copy of the data,
2581                  * or returning a pointer to local storage */
2582                 cb.proc = &proc;
2583                 cb.scope = PMIX_SCOPE_UNDEF;
2584                 cb.copy = false;
2585                 PMIX_GDS_FETCH_KV(rc, cd->peer, &cb);
2586                 if (PMIX_SUCCESS != rc) {
2587                     PMIX_ERROR_LOG(rc);
2588                     PMIX_RELEASE(reply);
2589                     PMIX_DESTRUCT(&cb);
2590                     goto cleanup;
2591                 }
2592                 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
2593                 /* pack the nspace name */
2594                 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &nspaces[i], 1, PMIX_STRING);
2595                 if (PMIX_SUCCESS != rc) {
2596                     PMIX_ERROR_LOG(rc);
2597                     PMIX_RELEASE(reply);
2598                     PMIX_DESTRUCT(&pbkt);
2599                     PMIX_DESTRUCT(&cb);
2600                     goto cleanup;
2601                 }
2602                 PMIX_LIST_FOREACH(kptr, &cb.kvs, pmix_kval_t) {
2603                     PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, kptr, 1, PMIX_KVAL);
2604                     if (PMIX_SUCCESS != rc) {
2605                         PMIX_ERROR_LOG(rc);
2606                         PMIX_RELEASE(reply);
2607                         PMIX_DESTRUCT(&pbkt);
2608                         PMIX_DESTRUCT(&cb);
2609                         goto cleanup;
2610                     }
2611                 }
2612                 PMIX_DESTRUCT(&cb);
2613 
2614                 if (PMIX_PROC_IS_V1(cd->peer) || PMIX_PROC_IS_V20(cd->peer)) {
2615                     PMIX_BFROPS_PACK(rc, cd->peer, reply, &pbkt, 1, PMIX_BUFFER);
2616                     if (PMIX_SUCCESS != rc) {
2617                         PMIX_ERROR_LOG(rc);
2618                         PMIX_RELEASE(reply);
2619                         PMIX_DESTRUCT(&pbkt);
2620                         PMIX_DESTRUCT(&cb);
2621                         goto cleanup;
2622                     }
2623                 } else {
2624                     PMIX_UNLOAD_BUFFER(&pbkt, bo.bytes, bo.size);
2625                     PMIX_BFROPS_PACK(rc, cd->peer, reply, &bo, 1, PMIX_BYTE_OBJECT);
2626                     if (PMIX_SUCCESS != rc) {
2627                         PMIX_ERROR_LOG(rc);
2628                         PMIX_RELEASE(reply);
2629                         PMIX_DESTRUCT(&pbkt);
2630                         PMIX_DESTRUCT(&cb);
2631                         goto cleanup;
2632                     }
2633                 }
2634 
2635                 PMIX_DESTRUCT(&pbkt);
2636             }
2637         }
2638         pmix_output_verbose(2, pmix_server_globals.base_output,
2639                             "server:cnct_cbfunc reply being sent to %s:%u",
2640                             cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
2641         PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2642         if (PMIX_SUCCESS != rc) {
2643             PMIX_RELEASE(reply);
2644         }
2645     }
2646 
2647   cleanup:
2648     if (NULL != nspaces) {
2649       pmix_argv_free(nspaces);
2650     }
2651     pmix_list_remove_item(&pmix_server_globals.collectives, &tracker->super);
2652     PMIX_RELEASE(tracker);
2653 
2654     /* we are done */
2655     PMIX_RELEASE(scd);
2656 }
2657 
2658 static void cnct_cbfunc(pmix_status_t status, void *cbdata)
2659 {
2660     pmix_server_trkr_t *tracker = (pmix_server_trkr_t*)cbdata;
2661     pmix_shift_caddy_t *scd;
2662 
2663     pmix_output_verbose(2, pmix_server_globals.base_output,
2664                         "server:cnct_cbfunc called");
2665 
2666     /* need to thread-shift this callback as it accesses global data */
2667     scd = PMIX_NEW(pmix_shift_caddy_t);
2668     if (NULL == scd) {
2669         /* nothing we can do */
2670         return;
2671     }
2672     scd->status = status;
2673     scd->tracker = tracker;
2674     PMIX_THREADSHIFT(scd, _cnct);
2675 }
2676 
2677 static void _discnct(int sd, short args, void *cbdata)
2678 {
2679     pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
2680     pmix_server_trkr_t *tracker = scd->tracker;
2681     pmix_buffer_t *reply;
2682     pmix_status_t rc;
2683     pmix_server_caddy_t *cd;
2684 
2685     PMIX_ACQUIRE_OBJECT(scd);
2686 
2687     if (NULL == tracker) {
2688         /* nothing to do */
2689         return;
2690     }
2691 
2692     /* if we get here, then there are processes waiting
2693      * for a response */
2694 
2695     /* if the timer is active, clear it */
2696     if (tracker->event_active) {
2697         pmix_event_del(&tracker->ev);
2698     }
2699 
2700     /* loop across all local procs in the tracker, sending them the reply */
2701     PMIX_LIST_FOREACH(cd, &tracker->local_cbs, pmix_server_caddy_t) {
2702         /* setup the reply */
2703         reply = PMIX_NEW(pmix_buffer_t);
2704         if (NULL == reply) {
2705             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2706             rc = PMIX_ERR_NOMEM;
2707             goto cleanup;
2708         }
2709         /* return the status */
2710         PMIX_BFROPS_PACK(rc, cd->peer, reply, &scd->status, 1, PMIX_STATUS);
2711         if (PMIX_SUCCESS != rc) {
2712             PMIX_ERROR_LOG(rc);
2713             PMIX_RELEASE(reply);
2714             goto cleanup;
2715         }
2716         pmix_output_verbose(2, pmix_server_globals.base_output,
2717                             "server:cnct_cbfunc reply being sent to %s:%u",
2718                             cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
2719         PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2720         if (PMIX_SUCCESS != rc) {
2721             PMIX_RELEASE(reply);
2722         }
2723     }
2724 
2725   cleanup:
2726     /* cleanup the tracker -- the host RM is responsible for
2727      * telling us when to remove the nspace from our data */
2728     pmix_list_remove_item(&pmix_server_globals.collectives, &tracker->super);
2729     PMIX_RELEASE(tracker);
2730 
2731     /* we are done */
2732     PMIX_RELEASE(scd);
2733 }
2734 
2735 static void discnct_cbfunc(pmix_status_t status, void *cbdata)
2736 {
2737     pmix_server_trkr_t *tracker = (pmix_server_trkr_t*)cbdata;
2738     pmix_shift_caddy_t *scd;
2739 
2740     pmix_output_verbose(2, pmix_server_globals.base_output,
2741                         "server:discnct_cbfunc called on nspace %s",
2742                         (NULL == tracker) ? "NULL" : tracker->pname.nspace);
2743 
2744     /* need to thread-shift this callback as it accesses global data */
2745     scd = PMIX_NEW(pmix_shift_caddy_t);
2746     if (NULL == scd) {
2747         /* nothing we can do */
2748         return;
2749     }
2750     scd->status = status;
2751     scd->tracker = tracker;
2752     PMIX_THREADSHIFT(scd, _discnct);
2753 }
2754 
2755 
2756 static void regevents_cbfunc(pmix_status_t status, void *cbdata)
2757 {
2758     pmix_status_t rc;
2759     pmix_server_caddy_t *cd = (pmix_server_caddy_t*) cbdata;
2760     pmix_buffer_t *reply;
2761 
2762     pmix_output_verbose(2, pmix_server_globals.base_output,
2763                         "server:regevents_cbfunc called status = %d", status);
2764 
2765     reply = PMIX_NEW(pmix_buffer_t);
2766     if (NULL == reply) {
2767         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2768         PMIX_RELEASE(cd);
2769         return;
2770     }
2771     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2772     if (PMIX_SUCCESS != rc) {
2773         PMIX_ERROR_LOG(rc);
2774     }
2775     // send reply
2776     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2777     if (PMIX_SUCCESS != rc) {
2778         PMIX_RELEASE(reply);
2779     }
2780     PMIX_RELEASE(cd);
2781 }
2782 
2783 static void notifyerror_cbfunc (pmix_status_t status, void *cbdata)
2784 {
2785     pmix_status_t rc;
2786     pmix_server_caddy_t *cd = (pmix_server_caddy_t*) cbdata;
2787     pmix_buffer_t *reply;
2788 
2789     pmix_output_verbose(2, pmix_server_globals.base_output,
2790                         "server:notifyerror_cbfunc called status = %d", status);
2791 
2792     reply = PMIX_NEW(pmix_buffer_t);
2793     if (NULL == reply) {
2794         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2795         PMIX_RELEASE(cd);
2796         return;
2797     }
2798     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2799     if (PMIX_SUCCESS != rc) {
2800         PMIX_ERROR_LOG(rc);
2801     }
2802     // send reply
2803     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2804     if (PMIX_SUCCESS != rc) {
2805         PMIX_RELEASE(reply);
2806     }
2807     PMIX_RELEASE(cd);
2808 }
2809 
2810 static void alloc_cbfunc(pmix_status_t status,
2811                          pmix_info_t *info, size_t ninfo,
2812                          void *cbdata,
2813                          pmix_release_cbfunc_t release_fn,
2814                          void *release_cbdata)
2815 {
2816     pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
2817     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
2818     pmix_buffer_t *reply;
2819     pmix_status_t rc;
2820 
2821     pmix_output_verbose(2, pmix_server_globals.base_output,
2822                         "pmix:alloc callback with status %d", status);
2823 
2824     reply = PMIX_NEW(pmix_buffer_t);
2825     if (NULL == reply) {
2826         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2827         PMIX_RELEASE(cd);
2828         return;
2829     }
2830     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2831     if (PMIX_SUCCESS != rc) {
2832         PMIX_ERROR_LOG(rc);
2833         goto complete;
2834     }
2835     /* pack the returned data */
2836     PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
2837     if (PMIX_SUCCESS != rc) {
2838         PMIX_ERROR_LOG(rc);
2839         goto complete;
2840     }
2841     if (0 < ninfo) {
2842         PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
2843         if (PMIX_SUCCESS != rc) {
2844             PMIX_ERROR_LOG(rc);
2845         }
2846     }
2847 
2848   complete:
2849     // send reply
2850     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2851         if (PMIX_SUCCESS != rc) {
2852         PMIX_RELEASE(reply);
2853     }
2854 
2855     // cleanup
2856     if (NULL != qcd->queries) {
2857         PMIX_QUERY_FREE(qcd->queries, qcd->nqueries);
2858     }
2859     if (NULL != qcd->info) {
2860         PMIX_INFO_FREE(qcd->info, qcd->ninfo);
2861     }
2862     PMIX_RELEASE(qcd);
2863     PMIX_RELEASE(cd);
2864     if (NULL != release_fn) {
2865         release_fn(release_cbdata);
2866     }
2867 }
2868 
2869 static void query_cbfunc(pmix_status_t status,
2870                          pmix_info_t *info, size_t ninfo,
2871                          void *cbdata,
2872                          pmix_release_cbfunc_t release_fn,
2873                          void *release_cbdata)
2874 {
2875     pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
2876     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
2877     pmix_buffer_t *reply;
2878     pmix_status_t rc;
2879 
2880     pmix_output_verbose(2, pmix_server_globals.base_output,
2881                         "pmix:query callback with status %s", PMIx_Error_string(status));
2882 
2883     reply = PMIX_NEW(pmix_buffer_t);
2884     if (NULL == reply) {
2885         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2886         PMIX_RELEASE(cd);
2887         return;
2888     }
2889     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2890     if (PMIX_SUCCESS != rc) {
2891         PMIX_ERROR_LOG(rc);
2892         goto complete;
2893     }
2894     /* pack the returned data */
2895     PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
2896     if (PMIX_SUCCESS != rc) {
2897         PMIX_ERROR_LOG(rc);
2898         goto complete;
2899     }
2900     if (0 < ninfo) {
2901         PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
2902         if (PMIX_SUCCESS != rc) {
2903             PMIX_ERROR_LOG(rc);
2904         }
2905     }
2906 
2907     /* cache the data for any future requests */
2908 
2909   complete:
2910     // send reply
2911     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2912         if (PMIX_SUCCESS != rc) {
2913         PMIX_RELEASE(reply);
2914     }
2915 
2916     // cleanup
2917     if (NULL != qcd->queries) {
2918         PMIX_QUERY_FREE(qcd->queries, qcd->nqueries);
2919     }
2920     if (NULL != qcd->info) {
2921         PMIX_INFO_FREE(qcd->info, qcd->ninfo);
2922     }
2923     PMIX_RELEASE(qcd);
2924     PMIX_RELEASE(cd);
2925     if (NULL != release_fn) {
2926         release_fn(release_cbdata);
2927     }
2928 }
2929 
2930 static void jctrl_cbfunc(pmix_status_t status,
2931                          pmix_info_t *info, size_t ninfo,
2932                          void *cbdata,
2933                          pmix_release_cbfunc_t release_fn,
2934                          void *release_cbdata)
2935 {
2936     pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
2937     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
2938     pmix_buffer_t *reply;
2939     pmix_status_t rc;
2940 
2941     pmix_output_verbose(2, pmix_server_globals.base_output,
2942                         "pmix:jctrl callback with status %d", status);
2943 
2944     reply = PMIX_NEW(pmix_buffer_t);
2945     if (NULL == reply) {
2946         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2947         PMIX_RELEASE(cd);
2948         return;
2949     }
2950     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2951     if (PMIX_SUCCESS != rc) {
2952         PMIX_ERROR_LOG(rc);
2953         goto complete;
2954     }
2955     /* pack the returned data */
2956     PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
2957     if (PMIX_SUCCESS != rc) {
2958         PMIX_ERROR_LOG(rc);
2959         goto complete;
2960     }
2961     if (0 < ninfo) {
2962         PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
2963         if (PMIX_SUCCESS != rc) {
2964             PMIX_ERROR_LOG(rc);
2965         }
2966     }
2967 
2968   complete:
2969     // send reply
2970     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2971         if (PMIX_SUCCESS != rc) {
2972         PMIX_RELEASE(reply);
2973     }
2974 
2975     // cleanup
2976     if (NULL != qcd->queries) {
2977         PMIX_QUERY_FREE(qcd->queries, qcd->nqueries);
2978     }
2979     if (NULL != qcd->info) {
2980         PMIX_INFO_FREE(qcd->info, qcd->ninfo);
2981     }
2982     PMIX_RELEASE(qcd);
2983     PMIX_RELEASE(cd);
2984     if (NULL != release_fn) {
2985         release_fn(release_cbdata);
2986     }
2987 }
2988 
2989 static void monitor_cbfunc(pmix_status_t status,
2990                            pmix_info_t *info, size_t ninfo,
2991                            void *cbdata,
2992                            pmix_release_cbfunc_t release_fn,
2993                            void *release_cbdata)
2994 {
2995     pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
2996     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
2997     pmix_buffer_t *reply;
2998     pmix_status_t rc;
2999 
3000     pmix_output_verbose(2, pmix_server_globals.base_output,
3001                         "pmix:monitor callback with status %d", status);
3002 
3003     reply = PMIX_NEW(pmix_buffer_t);
3004     if (NULL == reply) {
3005         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3006         PMIX_RELEASE(cd);
3007         return;
3008     }
3009     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
3010     if (PMIX_SUCCESS != rc) {
3011         PMIX_ERROR_LOG(rc);
3012         goto complete;
3013     }
3014     /* pack the returned data */
3015     PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
3016     if (PMIX_SUCCESS != rc) {
3017         PMIX_ERROR_LOG(rc);
3018         goto complete;
3019     }
3020     if (0 < ninfo) {
3021         PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
3022         if (PMIX_SUCCESS != rc) {
3023             PMIX_ERROR_LOG(rc);
3024         }
3025     }
3026 
3027   complete:
3028     // send reply
3029     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
3030         if (PMIX_SUCCESS != rc) {
3031         PMIX_RELEASE(reply);
3032     }
3033 
3034     // cleanup
3035     if (NULL != qcd->queries) {
3036         PMIX_QUERY_FREE(qcd->queries, qcd->nqueries);
3037     }
3038     if (NULL != qcd->info) {
3039         PMIX_INFO_FREE(qcd->info, qcd->ninfo);
3040     }
3041     PMIX_RELEASE(qcd);
3042     PMIX_RELEASE(cd);
3043     if (NULL != release_fn) {
3044         release_fn(release_cbdata);
3045     }
3046 }
3047 
3048 static void cred_cbfunc(pmix_status_t status,
3049                         pmix_byte_object_t *credential,
3050                         pmix_info_t info[], size_t ninfo,
3051                         void *cbdata)
3052 {
3053     pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
3054     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
3055     pmix_buffer_t *reply;
3056     pmix_status_t rc;
3057 
3058     pmix_output_verbose(2, pmix_globals.debug_output,
3059                         "pmix:get credential callback with status %d", status);
3060 
3061     reply = PMIX_NEW(pmix_buffer_t);
3062     if (NULL == reply) {
3063         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3064         PMIX_RELEASE(cd);
3065         return;
3066     }
3067 
3068     /* pack the status */
3069     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
3070     if (PMIX_SUCCESS != rc) {
3071         PMIX_ERROR_LOG(rc);
3072         goto complete;
3073     }
3074 
3075     if (PMIX_SUCCESS == status) {
3076         /* pack the returned credential */
3077         PMIX_BFROPS_PACK(rc, cd->peer, reply, credential, 1, PMIX_BYTE_OBJECT);
3078         if (PMIX_SUCCESS != rc) {
3079             PMIX_ERROR_LOG(rc);
3080             goto complete;
3081         }
3082 
3083         /* pack any returned data */
3084         PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
3085         if (PMIX_SUCCESS != rc) {
3086             PMIX_ERROR_LOG(rc);
3087             goto complete;
3088         }
3089         if (0 < ninfo) {
3090             PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
3091             if (PMIX_SUCCESS != rc) {
3092                 PMIX_ERROR_LOG(rc);
3093             }
3094         }
3095     }
3096 
3097   complete:
3098     // send reply
3099     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
3100         if (PMIX_SUCCESS != rc) {
3101         PMIX_RELEASE(reply);
3102     }
3103 
3104     // cleanup
3105     if (NULL != qcd->info) {
3106         PMIX_INFO_FREE(qcd->info, qcd->ninfo);
3107     }
3108     PMIX_RELEASE(qcd);
3109     PMIX_RELEASE(cd);
3110 }
3111 
3112 static void validate_cbfunc(pmix_status_t status,
3113                             pmix_info_t info[], size_t ninfo,
3114                             void *cbdata)
3115 {
3116     pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
3117     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
3118     pmix_buffer_t *reply;
3119     pmix_status_t rc;
3120 
3121     pmix_output_verbose(2, pmix_globals.debug_output,
3122                         "pmix:validate credential callback with status %d", status);
3123 
3124     reply = PMIX_NEW(pmix_buffer_t);
3125     if (NULL == reply) {
3126         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3127         PMIX_RELEASE(cd);
3128         return;
3129     }
3130     PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
3131     if (PMIX_SUCCESS != rc) {
3132         PMIX_ERROR_LOG(rc);
3133         goto complete;
3134     }
3135     /* pack any returned data */
3136     PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
3137     if (PMIX_SUCCESS != rc) {
3138         PMIX_ERROR_LOG(rc);
3139         goto complete;
3140     }
3141     if (0 < ninfo) {
3142         PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
3143         if (PMIX_SUCCESS != rc) {
3144             PMIX_ERROR_LOG(rc);
3145         }
3146     }
3147 
3148   complete:
3149     // send reply
3150     PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
3151     if (PMIX_SUCCESS != rc) {
3152         PMIX_RELEASE(reply);
3153     }
3154     // cleanup
3155     if (NULL != qcd->info) {
3156         PMIX_INFO_FREE(qcd->info, qcd->ninfo);
3157     }
3158     PMIX_RELEASE(qcd);
3159     PMIX_RELEASE(cd);
3160 }
3161 
3162 
3163 static void _iofreg(int sd, short args, void *cbdata)
3164 {
3165     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
3166     pmix_server_caddy_t *scd = (pmix_server_caddy_t*)cd->cbdata;
3167     pmix_buffer_t *reply;
3168     pmix_status_t rc;
3169 
3170     PMIX_ACQUIRE_OBJECT(cd);
3171 
3172     /* setup the reply to the requestor */
3173     reply = PMIX_NEW(pmix_buffer_t);
3174     if (NULL == reply) {
3175         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3176         rc = PMIX_ERR_NOMEM;
3177         goto cleanup;
3178     }
3179     /* start with the status */
3180     PMIX_BFROPS_PACK(rc, scd->peer, reply, &cd->status, 1, PMIX_STATUS);
3181     if (PMIX_SUCCESS != rc) {
3182         PMIX_ERROR_LOG(rc);
3183         PMIX_RELEASE(reply);
3184         goto cleanup;
3185     }
3186 
3187     /* was the request a success? */
3188     if (PMIX_SUCCESS != cd->status) {
3189         /* find and remove the tracker(s) */
3190     }
3191 
3192     pmix_output_verbose(2, pmix_server_globals.iof_output,
3193                         "server:_iofreg reply being sent to %s:%u",
3194                         scd->peer->info->pname.nspace, scd->peer->info->pname.rank);
3195     PMIX_SERVER_QUEUE_REPLY(rc, scd->peer, scd->hdr.tag, reply);
3196     if (PMIX_SUCCESS != rc) {
3197         PMIX_RELEASE(reply);
3198     }
3199 
3200   cleanup:
3201     /* release the cached info */
3202     if (NULL != cd->procs) {
3203         PMIX_PROC_FREE(cd->procs, cd->nprocs);
3204     }
3205     PMIX_INFO_FREE(cd->info, cd->ninfo);
3206     /* we are done */
3207     PMIX_RELEASE(cd);
3208 }
3209 
3210 static void iof_cbfunc(pmix_status_t status,
3211                        void *cbdata)
3212 {
3213     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
3214 
3215     pmix_output_verbose(2, pmix_server_globals.iof_output,
3216                         "server:iof_cbfunc called with status %d",
3217                         status);
3218 
3219     if (NULL == cd) {
3220         /* nothing to do */
3221         return;
3222     }
3223     cd->status = status;
3224 
3225     /* need to thread-shift this callback as it accesses global data */
3226     PMIX_THREADSHIFT(cd, _iofreg);
3227 }
3228 
3229 /* the switchyard is the primary message handling function. It's purpose
3230  * is to take incoming commands (packed into a buffer), unpack them,
3231  * and then call the corresponding host server's function to execute
3232  * them. Some commands involve only a single proc (i.e., the one
3233  * sending the command) and can be executed while we wait. In these cases,
3234  * the switchyard will construct and pack a reply buffer to be returned
3235  * to the sender.
3236  *
3237  * Other cases (either multi-process collective or cmds that require
3238  * an async reply) cannot generate an immediate reply. In these cases,
3239  * the reply buffer will be NULL. An appropriate callback function will
3240  * be called that will be responsible for eventually replying to the
3241  * calling processes.
3242  *
3243  * Should an error be encountered at any time within the switchyard, an
3244  * error reply buffer will be returned so that the caller can be notified,
3245  * thereby preventing the process from hanging. */
3246 static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag,
3247                                        pmix_buffer_t *buf)
3248 {
3249     pmix_status_t rc=PMIX_ERR_NOT_SUPPORTED;
3250     int32_t cnt;
3251     pmix_cmd_t cmd;
3252     pmix_server_caddy_t *cd;
3253     pmix_proc_t proc;
3254     pmix_buffer_t *reply;
3255 
3256     /* retrieve the cmd */
3257     cnt = 1;
3258     PMIX_BFROPS_UNPACK(rc, peer, buf, &cmd, &cnt, PMIX_COMMAND);
3259     if (PMIX_SUCCESS != rc) {
3260         PMIX_ERROR_LOG(rc);
3261         return rc;
3262     }
3263     pmix_output_verbose(2, pmix_server_globals.base_output,
3264                         "recvd pmix cmd %s from %s:%u",
3265                         pmix_command_string(cmd), peer->info->pname.nspace, peer->info->pname.rank);
3266 
3267     if (PMIX_REQ_CMD == cmd) {
3268         reply = PMIX_NEW(pmix_buffer_t);
3269         if (NULL == reply) {
3270             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3271             return PMIX_ERR_NOMEM;
3272         }
3273         PMIX_GDS_REGISTER_JOB_INFO(rc, peer, reply);
3274         if (PMIX_SUCCESS != rc) {
3275             PMIX_ERROR_LOG(rc);
3276             return rc;
3277         }
3278         PMIX_SERVER_QUEUE_REPLY(rc, peer, tag, reply);
3279         if (PMIX_SUCCESS != rc) {
3280             PMIX_RELEASE(reply);
3281         }
3282         peer->nptr->ndelivered++;
3283         return PMIX_SUCCESS;
3284     }
3285 
3286     if (PMIX_ABORT_CMD == cmd) {
3287         PMIX_GDS_CADDY(cd, peer, tag);
3288         if (PMIX_SUCCESS != (rc = pmix_server_abort(peer, buf, op_cbfunc, cd))) {
3289             PMIX_RELEASE(cd);
3290         }
3291         return rc;
3292     }
3293 
3294     if (PMIX_COMMIT_CMD == cmd) {
3295         rc = pmix_server_commit(peer, buf);
3296         if (!PMIX_PROC_IS_V1(peer)) {
3297             reply = PMIX_NEW(pmix_buffer_t);
3298             if (NULL == reply) {
3299                 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3300                 return PMIX_ERR_NOMEM;
3301             }
3302             PMIX_BFROPS_PACK(rc, peer, reply, &rc, 1, PMIX_STATUS);
3303             if (PMIX_SUCCESS != rc) {
3304                 PMIX_ERROR_LOG(rc);
3305             }
3306             PMIX_SERVER_QUEUE_REPLY(rc, peer, tag, reply);
3307             if (PMIX_SUCCESS != rc) {
3308                 PMIX_RELEASE(reply);
3309             }
3310         }
3311         return PMIX_SUCCESS; // don't reply twice
3312     }
3313 
3314     if (PMIX_FENCENB_CMD == cmd) {
3315         PMIX_GDS_CADDY(cd, peer, tag);
3316         if (PMIX_SUCCESS != (rc = pmix_server_fence(cd, buf, modex_cbfunc, op_cbfunc))) {
3317             PMIX_RELEASE(cd);
3318         }
3319         return rc;
3320     }
3321 
3322     if (PMIX_GETNB_CMD == cmd) {
3323         PMIX_GDS_CADDY(cd, peer, tag);
3324         if (PMIX_SUCCESS != (rc = pmix_server_get(buf, get_cbfunc, cd))) {
3325             PMIX_RELEASE(cd);
3326         }
3327         return rc;
3328     }
3329 
3330     if (PMIX_FINALIZE_CMD == cmd) {
3331         pmix_output_verbose(2, pmix_server_globals.base_output,
3332                             "recvd FINALIZE");
3333         peer->nptr->nfinalized++;
3334         /* purge events */
3335         pmix_server_purge_events(peer, NULL);
3336         /* turn off the recv event - we shouldn't hear anything
3337          * more from this proc */
3338         if (peer->recv_ev_active) {
3339             pmix_event_del(&peer->recv_event);
3340             peer->recv_ev_active = false;
3341         }
3342         PMIX_GDS_CADDY(cd, peer, tag);
3343         /* call the local server, if supported */
3344         if (NULL != pmix_host_server.client_finalized) {
3345             pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3346             proc.rank = peer->info->pname.rank;
3347             /* now tell the host server */
3348             rc = pmix_host_server.client_finalized(&proc, peer->info->server_object,
3349                                                    op_cbfunc2, cd);
3350             if (PMIX_SUCCESS == rc) {
3351                 /* don't reply to them ourselves - we will do so when the host
3352                  * server calls us back */
3353                 return rc;
3354             } else if (PMIX_OPERATION_SUCCEEDED == rc) {
3355                 /* they did it atomically */
3356                 rc = PMIX_SUCCESS;
3357             }
3358             /* if the call doesn't succeed (e.g., they provided the stub
3359              * but return NOT_SUPPORTED), then the callback function
3360              * won't be called, but we still need to cleanup
3361              * any lingering references to this peer and answer
3362              * the client. Thus, we call the callback function ourselves
3363              * in this case */
3364             op_cbfunc2(rc, cd);
3365             /* return SUCCESS as the cbfunc generated the return msg
3366              * and released the cd object */
3367             return PMIX_SUCCESS;
3368         }
3369         /* if the host doesn't provide a client_finalized function,
3370          * we still need to ensure that we cleanup any lingering
3371          * references to this peer. We use the callback function
3372          * here as well to ensure the client gets its required
3373          * response and that we delay before cleaning up the
3374          * connection*/
3375         op_cbfunc2(PMIX_SUCCESS, cd);
3376         /* return SUCCESS as the cbfunc generated the return msg
3377          * and released the cd object */
3378         return PMIX_SUCCESS;
3379     }
3380 
3381 
3382     if (PMIX_PUBLISHNB_CMD == cmd) {
3383         PMIX_GDS_CADDY(cd, peer, tag);
3384         if (PMIX_SUCCESS != (rc = pmix_server_publish(peer, buf, op_cbfunc, cd))) {
3385             PMIX_RELEASE(cd);
3386         }
3387         return rc;
3388     }
3389 
3390 
3391     if (PMIX_LOOKUPNB_CMD == cmd) {
3392         PMIX_GDS_CADDY(cd, peer, tag);
3393         if (PMIX_SUCCESS != (rc = pmix_server_lookup(peer, buf, lookup_cbfunc, cd))) {
3394             PMIX_RELEASE(cd);
3395         }
3396         return rc;
3397     }
3398 
3399 
3400     if (PMIX_UNPUBLISHNB_CMD == cmd) {
3401         PMIX_GDS_CADDY(cd, peer, tag);
3402         if (PMIX_SUCCESS != (rc = pmix_server_unpublish(peer, buf, op_cbfunc, cd))) {
3403             PMIX_RELEASE(cd);
3404         }
3405         return rc;
3406     }
3407 
3408 
3409     if (PMIX_SPAWNNB_CMD == cmd) {
3410         PMIX_GDS_CADDY(cd, peer, tag);
3411         if (PMIX_SUCCESS != (rc = pmix_server_spawn(peer, buf, spawn_cbfunc, cd))) {
3412             PMIX_RELEASE(cd);
3413         }
3414         return rc;
3415     }
3416 
3417 
3418     if (PMIX_CONNECTNB_CMD == cmd) {
3419         PMIX_GDS_CADDY(cd, peer, tag);
3420         rc = pmix_server_connect(cd, buf, cnct_cbfunc);
3421         if (PMIX_SUCCESS != rc) {
3422             PMIX_RELEASE(cd);
3423         }
3424         return rc;
3425     }
3426 
3427     if (PMIX_DISCONNECTNB_CMD == cmd) {
3428         PMIX_GDS_CADDY(cd, peer, tag);
3429         rc = pmix_server_disconnect(cd, buf, discnct_cbfunc);
3430         if (PMIX_SUCCESS != rc) {
3431             PMIX_RELEASE(cd);
3432         }
3433         return rc;
3434     }
3435 
3436     if (PMIX_REGEVENTS_CMD == cmd) {
3437         PMIX_GDS_CADDY(cd, peer, tag);
3438         if (PMIX_SUCCESS != (rc = pmix_server_register_events(peer, buf, regevents_cbfunc, cd))) {
3439             PMIX_RELEASE(cd);
3440         }
3441         return rc;
3442     }
3443 
3444     if (PMIX_DEREGEVENTS_CMD == cmd) {
3445         pmix_server_deregister_events(peer, buf);
3446         return PMIX_SUCCESS;
3447     }
3448 
3449     if (PMIX_NOTIFY_CMD == cmd) {
3450         PMIX_GDS_CADDY(cd, peer, tag);
3451         if (PMIX_SUCCESS != (rc = pmix_server_event_recvd_from_client(peer, buf, notifyerror_cbfunc, cd))) {
3452             PMIX_RELEASE(cd);
3453         }
3454         return rc;
3455     }
3456 
3457     if (PMIX_QUERY_CMD == cmd) {
3458         PMIX_GDS_CADDY(cd, peer, tag);
3459         if (PMIX_SUCCESS != (rc = pmix_server_query(peer, buf, query_cbfunc, cd))) {
3460             PMIX_RELEASE(cd);
3461         }
3462         return rc;
3463     }
3464 
3465     if (PMIX_LOG_CMD == cmd) {
3466         PMIX_GDS_CADDY(cd, peer, tag);
3467         if (PMIX_SUCCESS != (rc = pmix_server_log(peer, buf, op_cbfunc, cd))) {
3468             PMIX_RELEASE(cd);
3469         }
3470         return rc;
3471     }
3472 
3473     if (PMIX_ALLOC_CMD == cmd) {
3474         PMIX_GDS_CADDY(cd, peer, tag);
3475         if (PMIX_SUCCESS != (rc = pmix_server_alloc(peer, buf, alloc_cbfunc, cd))) {
3476             PMIX_RELEASE(cd);
3477         }
3478         return rc;
3479     }
3480 
3481     if (PMIX_JOB_CONTROL_CMD == cmd) {
3482         PMIX_GDS_CADDY(cd, peer, tag);
3483         if (PMIX_SUCCESS != (rc = pmix_server_job_ctrl(peer, buf, jctrl_cbfunc, cd))) {
3484             PMIX_RELEASE(cd);
3485         }
3486         return rc;
3487     }
3488 
3489     if (PMIX_MONITOR_CMD == cmd) {
3490         PMIX_GDS_CADDY(cd, peer, tag);
3491         if (PMIX_SUCCESS != (rc = pmix_server_monitor(peer, buf, monitor_cbfunc, cd))) {
3492             PMIX_RELEASE(cd);
3493         }
3494         return rc;
3495     }
3496 
3497     if (PMIX_GET_CREDENTIAL_CMD == cmd) {
3498         PMIX_GDS_CADDY(cd, peer, tag);
3499         if (PMIX_SUCCESS != (rc = pmix_server_get_credential(peer, buf, cred_cbfunc, cd))) {
3500             PMIX_RELEASE(cd);
3501         }
3502         return rc;
3503     }
3504 
3505     if (PMIX_VALIDATE_CRED_CMD == cmd) {
3506         PMIX_GDS_CADDY(cd, peer, tag);
3507         if (PMIX_SUCCESS != (rc = pmix_server_validate_credential(peer, buf, validate_cbfunc, cd))) {
3508             PMIX_RELEASE(cd);
3509         }
3510         return rc;
3511     }
3512 
3513     if (PMIX_IOF_PULL_CMD == cmd) {
3514         PMIX_GDS_CADDY(cd, peer, tag);
3515         if (PMIX_SUCCESS != (rc = pmix_server_iofreg(peer, buf, iof_cbfunc, cd))) {
3516             PMIX_RELEASE(cd);
3517         }
3518         return rc;
3519     }
3520 
3521     if (PMIX_IOF_PUSH_CMD == cmd) {
3522         PMIX_GDS_CADDY(cd, peer, tag);
3523         if (PMIX_SUCCESS != (rc = pmix_server_iofstdin(peer, buf, op_cbfunc, cd))) {
3524             PMIX_RELEASE(cd);
3525         }
3526         return rc;
3527     }
3528 
3529     if (PMIX_GROUP_CONSTRUCT_CMD == cmd) {
3530         PMIX_GDS_CADDY(cd, peer, tag);
3531         if (PMIX_SUCCESS != (rc = pmix_server_grpconstruct(cd, buf))) {
3532             PMIX_RELEASE(cd);
3533         }
3534         return rc;
3535     }
3536 
3537     if (PMIX_GROUP_DESTRUCT_CMD == cmd) {
3538         PMIX_GDS_CADDY(cd, peer, tag);
3539         if (PMIX_SUCCESS != (rc = pmix_server_grpdestruct(cd, buf))) {
3540             PMIX_RELEASE(cd);
3541         }
3542         return rc;
3543     }
3544 
3545     return PMIX_ERR_NOT_SUPPORTED;
3546 }
3547 
3548 void pmix_server_message_handler(struct pmix_peer_t *pr,
3549                                  pmix_ptl_hdr_t *hdr,
3550                                  pmix_buffer_t *buf, void *cbdata)
3551 {
3552     pmix_peer_t *peer = (pmix_peer_t*)pr;
3553     pmix_buffer_t *reply;
3554     pmix_status_t rc, ret;
3555 
3556     pmix_output_verbose(2, pmix_server_globals.base_output,
3557                         "SWITCHYARD for %s:%u:%d",
3558                         peer->info->pname.nspace,
3559                         peer->info->pname.rank, peer->sd);
3560 
3561     ret = server_switchyard(peer, hdr->tag, buf);
3562     /* send the return, if there was an error returned */
3563     if (PMIX_SUCCESS != ret) {
3564         reply = PMIX_NEW(pmix_buffer_t);
3565         if (NULL == reply) {
3566             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3567             return;
3568         }
3569         if (PMIX_OPERATION_SUCCEEDED == ret) {
3570             ret = PMIX_SUCCESS;
3571         }
3572         PMIX_BFROPS_PACK(rc, pr, reply, &ret, 1, PMIX_STATUS);
3573         if (PMIX_SUCCESS != rc) {
3574             PMIX_ERROR_LOG(rc);
3575         }
3576         PMIX_SERVER_QUEUE_REPLY(rc, peer, hdr->tag, reply);
3577         if (PMIX_SUCCESS != rc) {
3578             PMIX_RELEASE(reply);
3579         }
3580     }
3581 }

/* [<][>][^][v][top][bottom][index][help] */