root/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_ops.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. bufdes
  2. pmix_server_abort
  3. pmix_server_commit
  4. get_tracker
  5. new_tracker
  6. fence_timeout
  7. _collect_data
  8. pmix_server_fence
  9. opcbfunc
  10. pmix_server_publish
  11. lkcbfunc
  12. pmix_server_lookup
  13. pmix_server_unpublish
  14. spcbfunc
  15. pmix_server_spawn
  16. pmix_server_disconnect
  17. connect_timeout
  18. pmix_server_connect
  19. _check_cached_events
  20. regevopcbfunc
  21. pmix_server_register_events
  22. pmix_server_deregister_events
  23. local_cbfunc
  24. intermed_step
  25. pmix_server_event_recvd_from_client
  26. pmix_server_query
  27. logcbfn
  28. pmix_server_log
  29. pmix_server_alloc
  30. pmix_server_job_ctrl
  31. pmix_server_monitor
  32. pmix_server_get_credential
  33. pmix_server_validate_credential
  34. pmix_server_iofreg
  35. stdcbfunc
  36. pmix_server_iofstdin
  37. grp_timeout
  38. _grpcbfunc
  39. grpcbfunc
  40. pmix_server_grpconstruct
  41. pmix_server_grpdestruct
  42. tcon
  43. tdes
  44. cdcon
  45. cddes
  46. scadcon
  47. scaddes
  48. ncon
  49. ndes
  50. dmcon
  51. dmdes
  52. dmrqcon
  53. dmrqdes
  54. lmcon
  55. lmdes
  56. prevcon
  57. prevdes
  58. regcon
  59. regdes
  60. ilcon
  61. ildes
  62. grcon
  63. grdes
  64. iocon
  65. iodes

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2014-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
   7  *                         All rights reserved.
   8  * Copyright (c) 2016-2019 Mellanox Technologies, Inc.
   9  *                         All rights reserved.
  10  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include <src/include/pmix_config.h>
  19 
  20 #include <src/include/pmix_stdint.h>
  21 #include <src/include/pmix_socket_errno.h>
  22 
  23 #include <pmix_server.h>
  24 #include "src/include/pmix_globals.h"
  25 
  26 #ifdef HAVE_STRING_H
  27 #include <string.h>
  28 #endif
  29 #ifdef HAVE_SYS_STAT_H
  30 #include <sys/stat.h>
  31 #endif
  32 #include <fcntl.h>
  33 #ifdef HAVE_UNISTD_H
  34 #include <unistd.h>
  35 #endif
  36 #ifdef HAVE_SYS_SOCKET_H
  37 #include <sys/socket.h>
  38 #endif
  39 #ifdef HAVE_SYS_UN_H
  40 #include <sys/un.h>
  41 #endif
  42 #ifdef HAVE_SYS_UIO_H
  43 #include <sys/uio.h>
  44 #endif
  45 #ifdef HAVE_SYS_TYPES_H
  46 #include <sys/types.h>
  47 #endif
  48 #ifdef HAVE_TIME_H
  49 #include <time.h>
  50 #endif
  51 #include PMIX_EVENT_HEADER
  52 
  53 #include "src/class/pmix_hotel.h"
  54 #include "src/class/pmix_list.h"
  55 #include "src/common/pmix_attributes.h"
  56 #include "src/mca/bfrops/bfrops.h"
  57 #include "src/mca/plog/plog.h"
  58 #include "src/mca/psensor/psensor.h"
  59 #include "src/util/argv.h"
  60 #include "src/util/error.h"
  61 #include "src/util/output.h"
  62 #include "src/util/pmix_environ.h"
  63 #include "src/mca/gds/base/base.h"
  64 
  65 #include "pmix_server_ops.h"
  66 
  67 /* The rank_blob_t type to collect processes blobs,
  68  * this list afterward will form a node modex blob. */
  69 typedef struct {
  70     pmix_list_item_t super;
  71     pmix_buffer_t *buf;
  72 } rank_blob_t;
  73 
  74 static void bufdes(rank_blob_t *p)
  75 {
  76     PMIX_RELEASE(p);
  77 }
  78 static PMIX_CLASS_INSTANCE(rank_blob_t,
  79                            pmix_list_item_t,
  80                            NULL, bufdes);
  81 
/* Table of entry points into the local host's server. It is
 * zero-initialized here, so every entry starts out NULL;
 * pmix_server_abort, for example, tests the abort entry for NULL
 * before invoking it and reports PMIX_ERR_NOT_SUPPORTED otherwise. */
   82 pmix_server_module_t pmix_host_server = {0};
  83 
  84 pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,
  85                                 pmix_op_cbfunc_t cbfunc, void *cbdata)
  86 {
  87     int32_t cnt;
  88     pmix_status_t rc;
  89     int status;
  90     char *msg;
  91     size_t nprocs;
  92     pmix_proc_t *procs = NULL;
  93     pmix_proc_t proc;
  94 
  95     pmix_output_verbose(2, pmix_server_globals.base_output, "recvd ABORT");
  96 
  97     /* unpack the status */
  98     cnt = 1;
  99     PMIX_BFROPS_UNPACK(rc, peer, buf, &status, &cnt, PMIX_STATUS);
 100     if (PMIX_SUCCESS != rc) {
 101         return rc;
 102     }
 103     /* unpack the message */
 104     cnt = 1;
 105     PMIX_BFROPS_UNPACK(rc, peer, buf, &msg, &cnt, PMIX_STRING);
 106     if (PMIX_SUCCESS != rc) {
 107         return rc;
 108     }
 109     /* unpack the number of procs */
 110     cnt = 1;
 111     PMIX_BFROPS_UNPACK(rc, peer, buf, &nprocs, &cnt, PMIX_SIZE);
 112     if (PMIX_SUCCESS != rc) {
 113         return rc;
 114     }
 115 
 116     /* unpack any provided procs - these are the procs the caller
 117      * wants aborted */
 118     if (0 < nprocs) {
 119         PMIX_PROC_CREATE(procs, nprocs);
 120         if (NULL == procs) {
 121             if (NULL != msg) {
 122                 free(msg);
 123             }
 124             return PMIX_ERR_NOMEM;
 125         }
 126         cnt = nprocs;
 127         PMIX_BFROPS_UNPACK(rc, peer, buf, procs, &cnt, PMIX_PROC);
 128         if (PMIX_SUCCESS != rc) {
 129             if (NULL != msg) {
 130                 free(msg);
 131             }
 132             return rc;
 133         }
 134     }
 135 
 136     /* let the local host's server execute it */
 137     if (NULL != pmix_host_server.abort) {
 138         pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
 139         proc.rank = peer->info->pname.rank;
 140         rc = pmix_host_server.abort(&proc, peer->info->server_object, status, msg,
 141                                     procs, nprocs, cbfunc, cbdata);
 142     } else {
 143         rc = PMIX_ERR_NOT_SUPPORTED;
 144     }
 145     PMIX_PROC_FREE(procs, nprocs);
 146 
 147     /* the client passed this msg to us so we could give
 148      * it to the host server - we are done with it now */
 149     if (NULL != msg) {
 150         free(msg);
 151     }
 152 
 153     return rc;
 154 }
 155 
 156 pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
 157 {
 158     int32_t cnt;
 159     pmix_status_t rc;
 160     pmix_buffer_t b2, pbkt;
 161     pmix_kval_t *kp;
 162     pmix_scope_t scope;
 163     pmix_namespace_t *nptr;
 164     pmix_rank_info_t *info;
 165     pmix_proc_t proc;
 166     pmix_dmdx_remote_t *dcd, *dcdnext;
 167     char *data;
 168     size_t sz;
 169     pmix_cb_t cb;
 170 
 171     /* shorthand */
 172     info = peer->info;
 173     nptr = peer->nptr;
 174     pmix_strncpy(proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
 175     proc.rank = info->pname.rank;
 176 
 177     pmix_output_verbose(2, pmix_server_globals.base_output,
 178                         "%s:%d EXECUTE COMMIT FOR %s:%d",
 179                         pmix_globals.myid.nspace,
 180                         pmix_globals.myid.rank,
 181                         nptr->nspace, info->pname.rank);
 182 
 183     /* this buffer will contain one or more buffers, each
 184      * representing a different scope. These need to be locally
 185      * stored separately so we can provide required data based
 186      * on the requestor's location */
 187     cnt = 1;
 188     PMIX_BFROPS_UNPACK(rc, peer, buf, &scope, &cnt, PMIX_SCOPE);
 189     while (PMIX_SUCCESS == rc) {
 190         /* unpack and store the blob */
 191         cnt = 1;
 192         PMIX_CONSTRUCT(&b2, pmix_buffer_t);
 193         PMIX_BFROPS_ASSIGN_TYPE(peer, &b2);
 194         PMIX_BFROPS_UNPACK(rc, peer, buf, &b2, &cnt, PMIX_BUFFER);
 195         if (PMIX_SUCCESS != rc) {
 196             PMIX_ERROR_LOG(rc);
 197             return rc;
 198         }
 199         /* unpack the buffer and store the values - we store them
 200          * in this peer's native GDS component so that other local
 201          * procs from that nspace can access it */
 202         kp = PMIX_NEW(pmix_kval_t);
 203         cnt = 1;
 204         PMIX_BFROPS_UNPACK(rc, peer, &b2, kp, &cnt, PMIX_KVAL);
 205         while (PMIX_SUCCESS == rc) {
 206             if( PMIX_LOCAL == scope || PMIX_GLOBAL == scope){
 207                 PMIX_GDS_STORE_KV(rc, peer, &proc, scope, kp);
 208                 if (PMIX_SUCCESS != rc) {
 209                     PMIX_ERROR_LOG(rc);
 210                     PMIX_RELEASE(kp);
 211                     PMIX_DESTRUCT(&b2);
 212                     return rc;
 213                 }
 214             }
 215             if (PMIX_REMOTE == scope || PMIX_GLOBAL == scope) {
 216                 PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &proc, scope, kp);
 217                 if (PMIX_SUCCESS != rc) {
 218                     PMIX_ERROR_LOG(rc);
 219                     PMIX_RELEASE(kp);
 220                     PMIX_DESTRUCT(&b2);
 221                     return rc;
 222                 }
 223             }
 224             PMIX_RELEASE(kp);  // maintain accounting
 225             kp = PMIX_NEW(pmix_kval_t);
 226             cnt = 1;
 227             PMIX_BFROPS_UNPACK(rc, peer, &b2, kp, &cnt, PMIX_KVAL);
 228 
 229         }
 230         PMIX_RELEASE(kp);   // maintain accounting
 231         PMIX_DESTRUCT(&b2);
 232         if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
 233             PMIX_ERROR_LOG(rc);
 234             return rc;
 235         }
 236         cnt = 1;
 237         PMIX_BFROPS_UNPACK(rc, peer, buf, &scope, &cnt, PMIX_SCOPE);
 238     }
 239     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
 240         PMIX_ERROR_LOG(rc);
 241         return rc;
 242     }
 243     rc = PMIX_SUCCESS;
 244     /* mark us as having successfully received a blob from this proc */
 245     info->modex_recvd = true;
 246 
 247     /* update the commit counter */
 248     peer->commit_cnt++;
 249 
 250     /* see if anyone remote is waiting on this data - could be more than one */
 251     PMIX_LIST_FOREACH_SAFE(dcd, dcdnext, &pmix_server_globals.remote_pnd, pmix_dmdx_remote_t) {
 252         if (0 != strncmp(dcd->cd->proc.nspace, nptr->nspace, PMIX_MAX_NSLEN)) {
 253             continue;
 254         }
 255         if (dcd->cd->proc.rank == info->pname.rank) {
 256            /* we can now fulfill this request - collect the
 257              * remote/global data from this proc - note that there
 258              * may not be a contribution */
 259             data = NULL;
 260             sz = 0;
 261             PMIX_CONSTRUCT(&cb, pmix_cb_t);
 262             cb.proc = &proc;
 263             cb.scope = PMIX_REMOTE;
 264             cb.copy = true;
 265             PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
 266             if (PMIX_SUCCESS == rc) {
 267                 /* package it up */
 268                 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
 269                 PMIX_LIST_FOREACH(kp, &cb.kvs, pmix_kval_t) {
 270                     /* we pack this in our native BFROPS form as it
 271                      * will be sent to another daemon */
 272                     PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &pbkt, kp, 1, PMIX_KVAL);
 273                 }
 274                 PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
 275             }
 276             PMIX_DESTRUCT(&cb);
 277             /* execute the callback */
 278             dcd->cd->cbfunc(rc, data, sz, dcd->cd->cbdata);
 279             if (NULL != data) {
 280                 free(data);
 281             }
 282             /* we have finished this request */
 283             pmix_list_remove_item(&pmix_server_globals.remote_pnd, &dcd->super);
 284             PMIX_RELEASE(dcd);
 285         }
 286     }
 287     /* see if anyone local is waiting on this data- could be more than one */
 288     rc = pmix_pending_resolve(nptr, info->pname.rank, PMIX_SUCCESS, NULL);
 289     if (PMIX_SUCCESS != rc) {
 290         PMIX_ERROR_LOG(rc);
 291     }
 292     return rc;
 293 }
 294 
 295 /* get an existing object for tracking LOCAL participation in a collective
 296  * operation such as "fence". The only way this function can be
 297  * called is if at least one local client process is participating
 298  * in the operation. Thus, we know that at least one process is
 299  * involved AND has called the collective operation.
 300  *
 301  * NOTE: the host server *cannot* call us with a collective operation
 302  * as there is no mechanism by which it can do so. We call the host
 303  * server only after all participating local procs have called us.
 304  * So it is impossible for us to be called with a collective without
 305  * us already knowing about all local participants.
 306  *
 307  * procs - the array of procs participating in the collective,
 308  *         regardless of location
 309  * nprocs - the number of procs in the array
 310  */
 311 static pmix_server_trkr_t* get_tracker(char *id, pmix_proc_t *procs,
 312                                        size_t nprocs, pmix_cmd_t type)
 313 {
 314     pmix_server_trkr_t *trk;
 315     size_t i, j;
 316     size_t matches;
 317 
 318     pmix_output_verbose(5, pmix_server_globals.base_output,
 319                         "get_tracker called with %d procs", (int)nprocs);
 320 
 321     /* bozo check - should never happen outside of programmer error */
 322     if (NULL == procs && NULL == id) {
 323         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 324         return NULL;
 325     }
 326 
 327     /* there is no shortcut way to search the trackers - all
 328      * we can do is perform a brute-force search. Fortunately,
 329      * it is highly unlikely that there will be more than one
 330      * or two active at a time, and they are most likely to
 331      * involve only a single proc with WILDCARD rank - so this
 332      * shouldn't take long */
 333     PMIX_LIST_FOREACH(trk, &pmix_server_globals.collectives, pmix_server_trkr_t) {
 334         /* Collective operation if unique identified by
 335          * the set of participating processes and the type of collective,
 336          * or by the operation ID
 337          */
 338         if (NULL != id) {
 339             if (NULL != trk->id && 0 == strcmp(id, trk->id)) {
 340                 return trk;
 341             }
 342         } else {
 343             if (nprocs != trk->npcs) {
 344                 continue;
 345             }
 346             if (type != trk->type) {
 347                 continue;
 348             }
 349             matches = 0;
 350             for (i=0; i < nprocs; i++) {
 351                 /* the procs may be in different order, so we have
 352                  * to do an exhaustive search */
 353                 for (j=0; j < trk->npcs; j++) {
 354                     if (0 == strcmp(procs[i].nspace, trk->pcs[j].nspace) &&
 355                         procs[i].rank == trk->pcs[j].rank) {
 356                         ++matches;
 357                         break;
 358                     }
 359                 }
 360             }
 361             if (trk->npcs == matches) {
 362                 return trk;
 363             }
 364         }
 365     }
 366     /* No tracker was found */
 367     return NULL;
 368 }
 369 
 370 /* create a new object for tracking LOCAL participation in a collective
 371  * operation such as "fence". The only way this function can be
 372  * called is if at least one local client process is participating
 373  * in the operation. Thus, we know that at least one process is
 374  * involved AND has called the collective operation.
 375  *
 376  * NOTE: the host server *cannot* call us with a collective operation
 377  * as there is no mechanism by which it can do so. We call the host
 378  * server only after all participating local procs have called us.
 379  * So it is impossible for us to be called with a collective without
 380  * us already knowing about all local participants.
 381  *
 382  * procs - the array of procs participating in the collective,
 383  *         regardless of location
 384  * nprocs - the number of procs in the array
 385  */
 386 static pmix_server_trkr_t* new_tracker(char *id, pmix_proc_t *procs,
 387                                        size_t nprocs, pmix_cmd_t type)
 388 {
 389     pmix_server_trkr_t *trk;
 390     size_t i;
 391     bool all_def;
 392     pmix_namespace_t *nptr, *ns;
 393     pmix_rank_info_t *info;
 394     pmix_nspace_caddy_t *nm;
 395 
 396     pmix_output_verbose(5, pmix_server_globals.base_output,
 397                         "new_tracker called with %d procs", (int)nprocs);
 398 
 399     /* bozo check - should never happen outside of programmer error */
 400     if (NULL == procs) {
 401         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 402         return NULL;
 403     }
 404 
 405     pmix_output_verbose(5, pmix_server_globals.base_output,
 406                         "adding new tracker %s with %d procs",
 407                         (NULL == id) ? "NO-ID" : id, (int)nprocs);
 408 
 409     /* this tracker is new - create it */
 410     trk = PMIX_NEW(pmix_server_trkr_t);
 411     if (NULL == trk) {
 412         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 413         return NULL;
 414     }
 415 
 416     if (NULL != id) {
 417         trk->id = strdup(id);
 418     }
 419 
 420     if (NULL != procs) {
 421         /* copy the procs */
 422         PMIX_PROC_CREATE(trk->pcs, nprocs);
 423         if (NULL == trk->pcs) {
 424             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 425             PMIX_RELEASE(trk);
 426             return NULL;
 427         }
 428         memcpy(trk->pcs, procs, nprocs * sizeof(pmix_proc_t));
 429         trk->npcs = nprocs;
 430     }
 431     trk->type = type;
 432 
 433     all_def = true;
 434     for (i=0; i < nprocs; i++) {
 435         if (NULL == id) {
 436             pmix_strncpy(trk->pcs[i].nspace, procs[i].nspace, PMIX_MAX_NSLEN);
 437             trk->pcs[i].rank = procs[i].rank;
 438         }
 439         if (!all_def) {
 440             continue;
 441         }
 442         /* is this nspace known to us? */
 443         nptr = NULL;
 444         PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
 445             if (0 == strcmp(procs[i].nspace, ns->nspace)) {
 446                 nptr = ns;
 447                 break;
 448             }
 449         }
 450         if (NULL == nptr) {
 451             /* cannot be a local proc */
 452             pmix_output_verbose(5, pmix_server_globals.base_output,
 453                                 "new_tracker: unknown nspace %s",
 454                                 procs[i].nspace);
 455             continue;
 456         }
 457         /* check and add uniq ns into trk nslist */
 458         PMIX_LIST_FOREACH(nm, &trk->nslist, pmix_nspace_caddy_t) {
 459             if (0 == strcmp(nptr->nspace, nm->ns->nspace)) {
 460                 break;
 461             }
 462         }
 463         if ((pmix_nspace_caddy_t*)pmix_list_get_end(&trk->nslist) == nm) {
 464             nm = PMIX_NEW(pmix_nspace_caddy_t);
 465             PMIX_RETAIN(nptr);
 466             nm->ns = nptr;
 467             pmix_list_append(&trk->nslist, &nm->super);
 468         }
 469 
 470         /* have all the clients for this nspace been defined? */
 471         if (!nptr->all_registered) {
 472             /* nope, so no point in going further on this one - we'll
 473              * process it once all the procs are known */
 474             all_def = false;
 475             pmix_output_verbose(5, pmix_server_globals.base_output,
 476                                 "new_tracker: all clients not registered nspace %s",
 477                                 procs[i].nspace);
 478             /* we have to continue processing the list of procs
 479              * to setup the trk->pcs array, so don't break out
 480              * of the loop */
 481         }
 482         /* is this one of my local ranks? */
 483         PMIX_LIST_FOREACH(info, &nptr->ranks, pmix_rank_info_t) {
 484             if (procs[i].rank == info->pname.rank ||
 485                 PMIX_RANK_WILDCARD == procs[i].rank) {
 486                     pmix_output_verbose(5, pmix_server_globals.base_output,
 487                                         "adding local proc %s.%d to tracker",
 488                                         info->pname.nspace, info->pname.rank);
 489                 /* track the count */
 490                 ++trk->nlocal;
 491                 if (PMIX_RANK_WILDCARD != procs[i].rank) {
 492                     break;
 493                 }
 494             }
 495         }
 496     }
 497     if (all_def) {
 498         trk->def_complete = true;
 499     }
 500     pmix_list_append(&pmix_server_globals.collectives, &trk->super);
 501     return trk;
 502 }
 503 
 504 static void fence_timeout(int sd, short args, void *cbdata)
 505 {
 506     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
 507 
 508     pmix_output_verbose(2, pmix_server_globals.fence_output,
 509                         "ALERT: fence timeout fired");
 510 
 511     /* execute the provided callback function with the error */
 512     if (NULL != cd->trk->modexcbfunc) {
 513         cd->trk->modexcbfunc(PMIX_ERR_TIMEOUT, NULL, 0, cd->trk, NULL, NULL);
 514         return;  // the cbfunc will have cleaned up the tracker
 515     }
 516     cd->event_active = false;
 517     /* remove it from the list */
 518     pmix_list_remove_item(&cd->trk->local_cbs, &cd->super);
 519     PMIX_RELEASE(cd);
 520 }
 521 
 522 static pmix_status_t _collect_data(pmix_server_trkr_t *trk,
 523                                    pmix_buffer_t *buf)
 524 {
 525     pmix_buffer_t bucket, *pbkt = NULL;
 526     pmix_cb_t cb;
 527     pmix_kval_t *kv;
 528     pmix_byte_object_t bo;
 529     pmix_server_caddy_t *scd;
 530     pmix_proc_t pcs;
 531     pmix_status_t rc = PMIX_SUCCESS;
 532     pmix_rank_t rel_rank;
 533     pmix_nspace_caddy_t *nm;
 534     bool found;
 535     pmix_list_t rank_blobs;
 536     rank_blob_t *blob;
 537     uint32_t kmap_size;
 538     /* key names map, the position of the key name
 539      * in the array determines the unique key index */
 540     char **kmap = NULL;
 541     int i;
 542     pmix_gds_modex_blob_info_t blob_info_byte = 0;
 543     pmix_gds_modex_key_fmt_t kmap_type = PMIX_MODEX_KEY_INVALID;
 544 
 545     PMIX_CONSTRUCT(&bucket, pmix_buffer_t);
 546 
 547     if (PMIX_COLLECT_YES == trk->collect_type) {
 548         pmix_output_verbose(2, pmix_server_globals.fence_output,
 549                             "fence - assembling data");
 550 
 551         /* Evaluate key names sizes and their count to select
 552          * a format to store key names:
 553          * - keymap: use key-map in blob header for key-name resolve
 554          *   from idx: key names stored as indexes (avoid key duplication)
 555          * - regular: key-names stored as is */
 556         if (PMIX_MODEX_KEY_INVALID == kmap_type) {
 557             size_t key_fmt_size[PMIX_MODEX_KEY_MAX] = {0};
 558             pmix_value_array_t *key_count_array = PMIX_NEW(pmix_value_array_t);
 559             uint32_t *key_count = NULL;
 560 
 561             pmix_value_array_init(key_count_array, sizeof(uint32_t));
 562 
 563             PMIX_LIST_FOREACH(scd, &trk->local_cbs, pmix_server_caddy_t) {
 564                 pmix_strncpy(pcs.nspace, scd->peer->info->pname.nspace,
 565                              PMIX_MAX_NSLEN);
 566                 pcs.rank = scd->peer->info->pname.rank;
 567                 PMIX_CONSTRUCT(&cb, pmix_cb_t);
 568                 cb.proc = &pcs;
 569                 cb.scope = PMIX_REMOTE;
 570                 cb.copy = true;
 571                 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
 572                 if (PMIX_SUCCESS == rc) {
 573                     int key_idx;
 574                     PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
 575                         rc = pmix_argv_append_unique_idx(&key_idx, &kmap,
 576                                                          kv->key);
 577                         if (pmix_value_array_get_size(key_count_array) <
 578                                 (size_t)(key_idx+1)) {
 579                             size_t new_size;
 580                             size_t old_size =
 581                                     pmix_value_array_get_size(key_count_array);
 582 
 583                             pmix_value_array_set_size(key_count_array,
 584                                                       key_idx+1);
 585                             new_size =
 586                                     pmix_value_array_get_size(key_count_array);
 587                             key_count =
 588                                     PMIX_VALUE_ARRAY_GET_BASE(key_count_array,
 589                                                               uint32_t);
 590                             memset(key_count + old_size, 0, sizeof(uint32_t) *
 591                                    (new_size - old_size));
 592                         }
 593                         key_count = PMIX_VALUE_ARRAY_GET_BASE(key_count_array,
 594                                                               uint32_t);
 595                         key_count[key_idx]++;
 596                     }
 597                 }
 598             }
 599 
 600             key_count = PMIX_VALUE_ARRAY_GET_BASE(key_count_array, uint32_t);
 601 
 602             for (i = 0; i < pmix_argv_count(kmap); i++) {
 603                 pmix_buffer_t tmp;
 604                 size_t kname_size;
 605                 size_t kidx_size;
 606 
 607                 PMIX_CONSTRUCT(&tmp, pmix_buffer_t);
 608                 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &tmp, &kmap[i], 1,
 609                                  PMIX_STRING);
 610                 kname_size = tmp.bytes_used;
 611                 PMIX_DESTRUCT(&tmp);
 612                 PMIX_CONSTRUCT(&tmp, pmix_buffer_t);
 613                 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &tmp, &i, 1,
 614                                  PMIX_UINT32);
 615                 kidx_size = tmp.bytes_used;
 616                 PMIX_DESTRUCT(&tmp);
 617 
 618                 /* calculate the key names sizes */
 619                 key_fmt_size[PMIX_MODEX_KEY_NATIVE_FMT] =
 620                         kname_size * key_count[i];
 621                 key_fmt_size[PMIX_MODEX_KEY_KEYMAP_FMT] =
 622                         kname_size + key_count[i]*kidx_size;
 623             }
 624             PMIX_RELEASE(key_count_array);
 625 
 626             /* select the most efficient key-name pack format */
 627             kmap_type = key_fmt_size[PMIX_MODEX_KEY_NATIVE_FMT] >
 628                         key_fmt_size[PMIX_MODEX_KEY_KEYMAP_FMT] ?
 629                         PMIX_MODEX_KEY_KEYMAP_FMT : PMIX_MODEX_KEY_NATIVE_FMT;
 630             pmix_output_verbose(5, pmix_server_globals.base_output,
 631                                 "key packing type %s",
 632                                 kmap_type == PMIX_MODEX_KEY_KEYMAP_FMT ?
 633                                     "kmap" : "native");
 634         }
 635         PMIX_CONSTRUCT(&rank_blobs, pmix_list_t);
 636         PMIX_LIST_FOREACH(scd, &trk->local_cbs, pmix_server_caddy_t) {
 637             /* get any remote contribution - note that there
 638              * may not be a contribution */
 639             pmix_strncpy(pcs.nspace, scd->peer->info->pname.nspace,
 640                          PMIX_MAX_NSLEN);
 641             pcs.rank = scd->peer->info->pname.rank;
 642             PMIX_CONSTRUCT(&cb, pmix_cb_t);
 643             cb.proc = &pcs;
 644             cb.scope = PMIX_REMOTE;
 645             cb.copy = true;
 646             PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
 647             if (PMIX_SUCCESS == rc) {
 648                 /* calculate the throughout rank */
 649                 rel_rank = 0;
 650                 found = false;
 651                 if (pmix_list_get_size(&trk->nslist) == 1) {
 652                     found = true;
 653                 } else {
 654                     PMIX_LIST_FOREACH(nm, &trk->nslist, pmix_nspace_caddy_t) {
 655                         if (0 == strcmp(nm->ns->nspace, pcs.nspace)) {
 656                             found = true;
 657                             break;
 658                         }
 659                         rel_rank += nm->ns->nprocs;
 660                     }
 661                 }
 662                 if (false == found) {
 663                     rc = PMIX_ERR_NOT_FOUND;
 664                     PMIX_ERROR_LOG(rc);
 665                     PMIX_DESTRUCT(&cb);
 666                     PMIX_DESTRUCT(&rank_blobs);
 667                     goto cleanup;
 668                 }
 669                 rel_rank += pcs.rank;
 670 
 671                 /* pack the relative rank */
 672                 pbkt = PMIX_NEW(pmix_buffer_t);
 673                 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, pbkt,
 674                                  &rel_rank, 1, PMIX_PROC_RANK);
 675                 if (PMIX_SUCCESS != rc) {
 676                     PMIX_ERROR_LOG(rc);
 677                     PMIX_DESTRUCT(&cb);
 678                     PMIX_DESTRUCT(&rank_blobs);
 679                     PMIX_RELEASE(pbkt);
 680                     goto cleanup;
 681                 }
 682                 /* pack the returned kval's */
 683                 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
 684                     rc = pmix_gds_base_modex_pack_kval(kmap_type, pbkt, &kmap,
 685                                                        kv);
 686                     if (rc != PMIX_SUCCESS) {
 687                         PMIX_ERROR_LOG(rc);
 688                         PMIX_DESTRUCT(&cb);
 689                         PMIX_DESTRUCT(&rank_blobs);
 690                         PMIX_RELEASE(pbkt);
 691                         goto cleanup;
 692                     }
 693                 }
 694 
 695                 /* add part of the process modex to the list */
 696                 blob = PMIX_NEW(rank_blob_t);
 697                 blob->buf = pbkt;
 698                 pmix_list_append(&rank_blobs, &blob->super);
 699                 pbkt = NULL;
 700             }
 701             PMIX_DESTRUCT(&cb);
 702         }
 703         /* mark the collection type so we can check on the
 704          * receiving end that all participants did the same. Note
 705          * that if the receiving end thinks that the collect flag
 706          * is false, then store_modex will not be called on that
 707          * node and this information (and the flag) will be ignored,
 708          * meaning that no error is generated! */
 709         blob_info_byte |= PMIX_GDS_COLLECT_BIT;
 710         if (PMIX_MODEX_KEY_KEYMAP_FMT == kmap_type) {
 711             blob_info_byte |= PMIX_GDS_KEYMAP_BIT;
 712         }
 713         /* pack the modex blob info byte */
 714         PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
 715                          &blob_info_byte, 1, PMIX_BYTE);
 716 
 717         if (PMIX_MODEX_KEY_KEYMAP_FMT == kmap_type) {
 718             /* pack node part of modex to `bucket` */
 719             /* pack the key names map for the remote server can
 720              * use it to match key names by index */
 721             kmap_size = pmix_argv_count(kmap);
 722             if (0 < kmap_size) {
 723                 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
 724                                  &kmap_size, 1, PMIX_UINT32);
 725                 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
 726                                  kmap, kmap_size, PMIX_STRING);
 727             }
 728         }
 729         /* pack the collected blobs of processes */
 730         PMIX_LIST_FOREACH(blob, &rank_blobs, rank_blob_t) {
 731             /* extract the blob */
 732             PMIX_UNLOAD_BUFFER(blob->buf, bo.bytes, bo.size);
 733             /* pack the returned blob */
 734             PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
 735                              &bo, 1, PMIX_BYTE_OBJECT);
 736             PMIX_BYTE_OBJECT_DESTRUCT(&bo); // releases the data
 737             if (PMIX_SUCCESS != rc) {
 738                 PMIX_ERROR_LOG(rc);
 739                 goto cleanup;
 740             }
 741         }
 742         PMIX_DESTRUCT(&rank_blobs);
 743     } else {
 744         /* mark the collection type so we can check on the
 745          * receiving end that all participants did the same.
 746          * Don't do it for non-debug mode so we don't unnecessarily
 747          * send the collection bucket. The mdxcbfunc in the
 748          * server only calls store_modex if the local collect
 749          * flag is set to true. In debug mode, this check will
 750          * cause the store_modex function to see that this node
 751          * thought the collect flag was not set, and therefore
 752          * generate an error */
 753 #if PMIX_ENABLE_DEBUG
 754         /* pack the modex blob info byte */
 755         PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
 756                          &blob_info_byte, 1, PMIX_BYTE);
 757 #endif
 758     }
 759     if (!PMIX_BUFFER_IS_EMPTY(&bucket)) {
 760         /* because the remote servers have to unpack things
 761          * in chunks, we have to pack the bucket as a single
 762          * byte object to allow remote unpack */
 763         PMIX_UNLOAD_BUFFER(&bucket, bo.bytes, bo.size);
 764         PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, buf,
 765                          &bo, 1, PMIX_BYTE_OBJECT);
 766         PMIX_BYTE_OBJECT_DESTRUCT(&bo);  // releases the data
 767         if (PMIX_SUCCESS != rc) {
 768             PMIX_ERROR_LOG(rc);
 769         }
 770     }
 771 
 772   cleanup:
 773     PMIX_DESTRUCT(&bucket);
 774     pmix_argv_free(kmap);
 775     return rc;
 776 }
 777 
 778 pmix_status_t pmix_server_fence(pmix_server_caddy_t *cd,
 779                                 pmix_buffer_t *buf,
 780                                 pmix_modex_cbfunc_t modexcbfunc,
 781                                 pmix_op_cbfunc_t opcbfunc)
 782 {
 783     int32_t cnt;
 784     pmix_status_t rc;
 785     size_t nprocs;
 786     pmix_proc_t *procs=NULL, *newprocs;
 787     bool collect_data = false;
 788     pmix_server_trkr_t *trk;
 789     char *data = NULL;
 790     size_t sz = 0;
 791     pmix_buffer_t bucket;
 792     pmix_info_t *info = NULL;
 793     size_t ninfo=0, n, nmbrs, idx;
 794     struct timeval tv = {0, 0};
 795     pmix_list_t expand;
 796     pmix_group_caddy_t *gcd;
 797     pmix_group_t *grp;
 798 
 799     pmix_output_verbose(2, pmix_server_globals.fence_output,
 800                         "recvd FENCE");
 801 
 802     if (NULL == pmix_host_server.fence_nb) {
 803         PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
 804         return PMIX_ERR_NOT_SUPPORTED;
 805     }
 806 
 807     /* unpack the number of procs */
 808     cnt = 1;
 809     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &nprocs, &cnt, PMIX_SIZE);
 810     if (PMIX_SUCCESS != rc) {
 811         return rc;
 812     }
 813     pmix_output_verbose(2, pmix_server_globals.fence_output,
 814                         "recvd fence from %s:%u with %d procs",
 815                         cd->peer->info->pname.nspace, cd->peer->info->pname.rank, (int)nprocs);
 816     /* there must be at least one as the client has to at least provide
 817      * their own namespace */
 818     if (nprocs < 1) {
 819         return PMIX_ERR_BAD_PARAM;
 820     }
 821 
 822     /* create space for the procs */
 823     PMIX_PROC_CREATE(procs, nprocs);
 824     if (NULL == procs) {
 825         return PMIX_ERR_NOMEM;
 826     }
 827     /* unpack the procs */
 828     cnt = nprocs;
 829     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, procs, &cnt, PMIX_PROC);
 830     if (PMIX_SUCCESS != rc) {
 831         goto cleanup;
 832     }
 833 
 834     /* cycle thru the procs and check to see if any reference
 835      * a PMIx group */
 836     nmbrs = nprocs;
 837     PMIX_CONSTRUCT(&expand, pmix_list_t);
 838     /* use groups as the outer-most loop as there will
 839      * usually not be any */
 840     PMIX_LIST_FOREACH(grp, &pmix_server_globals.groups, pmix_group_t) {
 841         for (n=0; n < nprocs; n++) {
 842             if (PMIX_CHECK_NSPACE(procs[n].nspace, grp->grpid)) {
 843                 /* we need to replace this proc with grp members */
 844                 gcd = PMIX_NEW(pmix_group_caddy_t);
 845                 gcd->grp = grp;
 846                 gcd->idx = n;
 847                 gcd->rank = procs[n].rank;
 848                 pmix_list_append(&expand, &gcd->super);
 849                 /* see how many need to come across */
 850                 if (PMIX_RANK_WILDCARD == procs[n].rank) {
 851                     nmbrs += grp->nmbrs - 1; // account for replacing current proc
 852                 }
 853                 break;
 854             }
 855         }
 856     }
 857 
 858     if (0 < pmix_list_get_size(&expand)) {
 859         PMIX_PROC_CREATE(newprocs, nmbrs);
 860         gcd = (pmix_group_caddy_t*)pmix_list_remove_first(&expand);
 861         n=0;
 862         idx = 0;
 863         while (n < nmbrs) {
 864             if (idx != gcd->idx) {
 865                 memcpy(&newprocs[n], &procs[idx], sizeof(pmix_proc_t));
 866                 ++n;
 867             } else {
 868                 /* if we are bringing over just one, then simply replace */
 869                 if (PMIX_RANK_WILDCARD != gcd->rank) {
 870                     memcpy(&newprocs[n], &gcd->grp->members[gcd->rank], sizeof(pmix_proc_t));
 871                     ++n;
 872                 } else {
 873                     /* take them all */
 874                     memcpy(&newprocs[n], gcd->grp->members, gcd->grp->nmbrs * sizeof(pmix_proc_t));
 875                     n += gcd->grp->nmbrs;
 876                 }
 877                 PMIX_RELEASE(gcd);
 878                 gcd = (pmix_group_caddy_t*)pmix_list_remove_first(&expand);
 879             }
 880             ++idx;
 881         }
 882         PMIX_PROC_FREE(procs, nprocs);
 883         procs = newprocs;
 884         nprocs = nmbrs;
 885     }
 886     PMIX_LIST_DESTRUCT(&expand);
 887 
 888     /* unpack the number of provided info structs */
 889     cnt = 1;
 890     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &ninfo, &cnt, PMIX_SIZE);
 891     if (PMIX_SUCCESS != rc) {
 892         return rc;
 893     }
 894     if (0 < ninfo) {
 895         PMIX_INFO_CREATE(info, ninfo);
 896         if (NULL == info) {
 897             PMIX_PROC_FREE(procs, nprocs);
 898             return PMIX_ERR_NOMEM;
 899         }
 900         /* unpack the info */
 901         cnt = ninfo;
 902         PMIX_BFROPS_UNPACK(rc, cd->peer, buf, info, &cnt, PMIX_INFO);
 903         if (PMIX_SUCCESS != rc) {
 904             goto cleanup;
 905         }
 906         /* see if we are to collect data or enforce a timeout - we don't internally care
 907          * about any other directives */
 908         for (n=0; n < ninfo; n++) {
 909             if (0 == strcmp(info[n].key, PMIX_COLLECT_DATA)) {
 910                 collect_data = true;
 911             } else if (0 == strncmp(info[n].key, PMIX_TIMEOUT, PMIX_MAX_KEYLEN)) {
 912                 tv.tv_sec = info[n].value.data.uint32;
 913             }
 914         }
 915     }
 916 
 917     /* find/create the local tracker for this operation */
 918     if (NULL == (trk = get_tracker(NULL, procs, nprocs, PMIX_FENCENB_CMD))) {
 919         /* If no tracker was found - create and initialize it once */
 920         if (NULL == (trk = new_tracker(NULL, procs, nprocs, PMIX_FENCENB_CMD))) {
 921             /* only if a bozo error occurs */
 922             PMIX_ERROR_LOG(PMIX_ERROR);
 923             /* DO NOT HANG */
 924             if (NULL != opcbfunc) {
 925                 opcbfunc(PMIX_ERROR, cd);
 926             }
 927             rc = PMIX_ERROR;
 928             goto cleanup;
 929         }
 930         trk->type = PMIX_FENCENB_CMD;
 931         trk->modexcbfunc = modexcbfunc;
 932        /* mark if they want the data back */
 933         if (collect_data) {
 934             trk->collect_type = PMIX_COLLECT_YES;
 935         } else {
 936             trk->collect_type = PMIX_COLLECT_NO;
 937         }
 938     } else {
 939         switch (trk->collect_type) {
 940         case PMIX_COLLECT_NO:
 941             if (collect_data) {
 942                 trk->collect_type = PMIX_COLLECT_INVALID;
 943             }
 944             break;
 945         case PMIX_COLLECT_YES:
 946             if (!collect_data) {
 947                 trk->collect_type = PMIX_COLLECT_INVALID;
 948             }
 949             break;
 950         default:
 951             break;
 952         }
 953     }
 954 
 955     /* we only save the info structs from the first caller
 956      * who provides them - it is a user error to provide
 957      * different values from different participants */
 958     if (NULL == trk->info) {
 959         trk->info = info;
 960         trk->ninfo = ninfo;
 961     } else {
 962         /* cleanup */
 963         PMIX_INFO_FREE(info, ninfo);
 964         info = NULL;
 965     }
 966 
 967     /* add this contributor to the tracker so they get
 968      * notified when we are done */
 969     PMIX_RETAIN(cd);
 970     pmix_list_append(&trk->local_cbs, &cd->super);
 971     /* if a timeout was specified, set it */
 972     if (0 < tv.tv_sec) {
 973         PMIX_RETAIN(trk);
 974         cd->trk = trk;
 975         pmix_event_evtimer_set(pmix_globals.evbase, &cd->ev,
 976                                fence_timeout, cd);
 977         pmix_event_evtimer_add(&cd->ev, &tv);
 978         cd->event_active = true;
 979     }
 980 
 981     /* if all local contributions have been received,
 982      * let the local host's server know that we are at the
 983      * "fence" point - they will callback once the barrier
 984      * across all participants has been completed */
 985     if (trk->def_complete &&
 986         pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
 987         pmix_output_verbose(2, pmix_server_globals.fence_output,
 988                             "fence complete");
 989         /* if the user asked us to collect data, then we have
 990          * to provide any locally collected data to the host
 991          * server so they can circulate it - only take data
 992          * from the specified procs as not everyone is necessarily
 993          * participating! And only take data intended for remote
 994          * or global distribution */
 995 
 996         PMIX_CONSTRUCT(&bucket, pmix_buffer_t);
 997         if (PMIX_SUCCESS != (rc = _collect_data(trk, &bucket))) {
 998             PMIX_ERROR_LOG(rc);
 999             PMIX_DESTRUCT(&bucket);
1000             goto cleanup;
1001         }
1002         /* now unload the blob and pass it upstairs */
1003         PMIX_UNLOAD_BUFFER(&bucket, data, sz);
1004         PMIX_DESTRUCT(&bucket);
1005         trk->host_called = true;
1006         rc = pmix_host_server.fence_nb(trk->pcs, trk->npcs,
1007                                        trk->info, trk->ninfo,
1008                                        data, sz, trk->modexcbfunc, trk);
1009         if (PMIX_SUCCESS != rc) {
1010             pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
1011             PMIX_RELEASE(trk);
1012             cd->trk = NULL;
1013         }
1014     }
1015 
1016   cleanup:
1017     PMIX_PROC_FREE(procs, nprocs);
1018     return rc;
1019 }
1020 
1021 static void opcbfunc(pmix_status_t status, void *cbdata)
1022 {
1023     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1024 
1025     if (NULL != cd->keys) {
1026         pmix_argv_free(cd->keys);
1027     }
1028     if (NULL != cd->codes) {
1029         free(cd->codes);
1030     }
1031     if (NULL != cd->info) {
1032         PMIX_INFO_FREE(cd->info, cd->ninfo);
1033     }
1034     if (NULL != cd->opcbfunc) {
1035         cd->opcbfunc(status, cd->cbdata);
1036     }
1037     PMIX_RELEASE(cd);
1038 }
1039 
1040 pmix_status_t pmix_server_publish(pmix_peer_t *peer,
1041                                   pmix_buffer_t *buf,
1042                                   pmix_op_cbfunc_t cbfunc, void *cbdata)
1043 {
1044     pmix_setup_caddy_t *cd;
1045     pmix_status_t rc;
1046     int32_t cnt;
1047     size_t ninfo;
1048     pmix_proc_t proc;
1049     uint32_t uid;
1050 
1051     pmix_output_verbose(2, pmix_server_globals.pub_output,
1052                         "recvd PUBLISH");
1053 
1054     if (NULL == pmix_host_server.publish) {
1055         return PMIX_ERR_NOT_SUPPORTED;
1056     }
1057 
1058     /* unpack the effective user id */
1059     cnt=1;
1060     PMIX_BFROPS_UNPACK(rc, peer, buf, &uid, &cnt, PMIX_UINT32);
1061     if (PMIX_SUCCESS != rc) {
1062         PMIX_ERROR_LOG(rc);
1063         return rc;
1064     }
1065     /* unpack the number of info objects */
1066     cnt=1;
1067     PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
1068     if (PMIX_SUCCESS != rc) {
1069         PMIX_ERROR_LOG(rc);
1070         return rc;
1071     }
1072     /* we will be adding one for the user id */
1073     cd = PMIX_NEW(pmix_setup_caddy_t);
1074     if (NULL == cd) {
1075         return PMIX_ERR_NOMEM;
1076     }
1077     cd->opcbfunc = cbfunc;
1078     cd->cbdata = cbdata;
1079     cd->ninfo = ninfo + 1;
1080     PMIX_INFO_CREATE(cd->info, cd->ninfo);
1081     if (NULL == cd->info) {
1082         rc = PMIX_ERR_NOMEM;
1083         goto cleanup;
1084     }
1085     /* unpack the array of info objects */
1086     if (0 < cd->ninfo) {
1087         cnt=cd->ninfo;
1088         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
1089         if (PMIX_SUCCESS != rc) {
1090             PMIX_ERROR_LOG(rc);
1091             goto cleanup;
1092         }
1093     }
1094     pmix_strncpy(cd->info[cd->ninfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN);
1095     cd->info[cd->ninfo-1].value.type = PMIX_UINT32;
1096     cd->info[cd->ninfo-1].value.data.uint32 = uid;
1097 
1098     /* call the local server */
1099     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
1100     proc.rank = peer->info->pname.rank;
1101     rc = pmix_host_server.publish(&proc, cd->info, cd->ninfo, opcbfunc, cd);
1102 
1103   cleanup:
1104     if (PMIX_SUCCESS != rc) {
1105         if (NULL != cd->info) {
1106             PMIX_INFO_FREE(cd->info, cd->ninfo);
1107         }
1108         PMIX_RELEASE(cd);
1109     }
1110     return rc;
1111 }
1112 
1113 static void lkcbfunc(pmix_status_t status,
1114                      pmix_pdata_t data[], size_t ndata,
1115                      void *cbdata)
1116 {
1117     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1118 
1119     /* cleanup the caddy */
1120     if (NULL != cd->keys) {
1121         pmix_argv_free(cd->keys);
1122     }
1123     if (NULL != cd->info) {
1124         PMIX_INFO_FREE(cd->info, cd->ninfo);
1125     }
1126 
1127     /* return the results */
1128     if (NULL != cd->lkcbfunc) {
1129         cd->lkcbfunc(status, data, ndata, cd->cbdata);
1130     }
1131     PMIX_RELEASE(cd);
1132 }
1133 pmix_status_t pmix_server_lookup(pmix_peer_t *peer,
1134                                  pmix_buffer_t *buf,
1135                                  pmix_lookup_cbfunc_t cbfunc, void *cbdata)
1136 {
1137     pmix_setup_caddy_t *cd;
1138     int32_t cnt;
1139     pmix_status_t rc;
1140     size_t nkeys, i;
1141     char *sptr;
1142     size_t ninfo;
1143     pmix_proc_t proc;
1144     uint32_t uid;
1145 
1146     pmix_output_verbose(2, pmix_server_globals.pub_output,
1147                         "recvd LOOKUP");
1148 
1149     if (NULL == pmix_host_server.lookup) {
1150         return PMIX_ERR_NOT_SUPPORTED;
1151     }
1152 
1153     /* unpack the effective user id */
1154     cnt=1;
1155     PMIX_BFROPS_UNPACK(rc, peer, buf, &uid, &cnt, PMIX_UINT32);
1156     if (PMIX_SUCCESS != rc) {
1157         PMIX_ERROR_LOG(rc);
1158         return rc;
1159     }
1160     /* unpack the number of keys */
1161     cnt=1;
1162     PMIX_BFROPS_UNPACK(rc, peer, buf, &nkeys, &cnt, PMIX_SIZE);
1163     if (PMIX_SUCCESS != rc) {
1164         PMIX_ERROR_LOG(rc);
1165         return rc;
1166     }
1167     /* setup the caddy */
1168     cd = PMIX_NEW(pmix_setup_caddy_t);
1169     if (NULL == cd) {
1170         return PMIX_ERR_NOMEM;
1171     }
1172     cd->lkcbfunc = cbfunc;
1173     cd->cbdata = cbdata;
1174     /* unpack the array of keys */
1175     for (i=0; i < nkeys; i++) {
1176         cnt=1;
1177         PMIX_BFROPS_UNPACK(rc, peer, buf, &sptr, &cnt, PMIX_STRING);
1178         if (PMIX_SUCCESS != rc) {
1179             PMIX_ERROR_LOG(rc);
1180             goto cleanup;
1181         }
1182         pmix_argv_append_nosize(&cd->keys, sptr);
1183         free(sptr);
1184     }
1185     /* unpack the number of info objects */
1186     cnt=1;
1187     PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
1188     if (PMIX_SUCCESS != rc) {
1189         PMIX_ERROR_LOG(rc);
1190         goto cleanup;
1191     }
1192     /* we will be adding one for the user id */
1193     cd->ninfo = ninfo + 1;
1194     PMIX_INFO_CREATE(cd->info, cd->ninfo);
1195     if (NULL == cd->info) {
1196         rc = PMIX_ERR_NOMEM;
1197         goto cleanup;
1198     }
1199     /* unpack the array of info objects */
1200     if (0 < ninfo) {
1201         cnt=ninfo;
1202         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
1203         if (PMIX_SUCCESS != rc) {
1204             PMIX_ERROR_LOG(rc);
1205             goto cleanup;
1206         }
1207     }
1208     pmix_strncpy(cd->info[cd->ninfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN);
1209     cd->info[cd->ninfo-1].value.type = PMIX_UINT32;
1210     cd->info[cd->ninfo-1].value.data.uint32 = uid;
1211 
1212     /* call the local server */
1213     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
1214     proc.rank = peer->info->pname.rank;
1215     rc = pmix_host_server.lookup(&proc, cd->keys, cd->info, cd->ninfo, lkcbfunc, cd);
1216 
1217   cleanup:
1218     if (PMIX_SUCCESS != rc) {
1219         if (NULL != cd->keys) {
1220             pmix_argv_free(cd->keys);
1221         }
1222         if (NULL != cd->info) {
1223             PMIX_INFO_FREE(cd->info, cd->ninfo);
1224         }
1225         PMIX_RELEASE(cd);
1226     }
1227     return rc;
1228 }
1229 
1230 pmix_status_t pmix_server_unpublish(pmix_peer_t *peer,
1231                                     pmix_buffer_t *buf,
1232                                     pmix_op_cbfunc_t cbfunc, void *cbdata)
1233 {
1234     pmix_setup_caddy_t *cd;
1235     int32_t cnt;
1236     pmix_status_t rc;
1237     size_t i, nkeys, ninfo;
1238     char *sptr;
1239     pmix_proc_t proc;
1240     uint32_t uid;
1241 
1242     pmix_output_verbose(2, pmix_server_globals.pub_output,
1243                         "recvd UNPUBLISH");
1244 
1245     if (NULL == pmix_host_server.unpublish) {
1246         return PMIX_ERR_NOT_SUPPORTED;
1247     }
1248 
1249     /* unpack the effective user id */
1250     cnt=1;
1251     PMIX_BFROPS_UNPACK(rc, peer, buf, &uid, &cnt, PMIX_UINT32);
1252     if (PMIX_SUCCESS != rc) {
1253         PMIX_ERROR_LOG(rc);
1254         return rc;
1255     }
1256     /* unpack the number of keys */
1257     cnt=1;
1258     PMIX_BFROPS_UNPACK(rc, peer, buf, &nkeys, &cnt, PMIX_SIZE);
1259     if (PMIX_SUCCESS != rc) {
1260         PMIX_ERROR_LOG(rc);
1261         return rc;
1262     }
1263     /* setup the caddy */
1264     cd = PMIX_NEW(pmix_setup_caddy_t);
1265     if (NULL == cd) {
1266         return PMIX_ERR_NOMEM;
1267     }
1268     cd->opcbfunc = cbfunc;
1269     cd->cbdata = cbdata;
1270     /* unpack the array of keys */
1271     for (i=0; i < nkeys; i++) {
1272         cnt=1;
1273         PMIX_BFROPS_UNPACK(rc, peer, buf, &sptr, &cnt, PMIX_STRING);
1274         if (PMIX_SUCCESS != rc) {
1275             PMIX_ERROR_LOG(rc);
1276             goto cleanup;
1277         }
1278         pmix_argv_append_nosize(&cd->keys, sptr);
1279         free(sptr);
1280     }
1281     /* unpack the number of info objects */
1282     cnt=1;
1283     PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
1284     if (PMIX_SUCCESS != rc) {
1285         PMIX_ERROR_LOG(rc);
1286         goto cleanup;
1287     }
1288     /* we will be adding one for the user id */
1289     cd->ninfo = ninfo + 1;
1290     PMIX_INFO_CREATE(cd->info, cd->ninfo);
1291     if (NULL == cd->info) {
1292         rc = PMIX_ERR_NOMEM;
1293         goto cleanup;
1294     }
1295     /* unpack the array of info objects */
1296     if (0 < ninfo) {
1297         cnt=ninfo;
1298         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
1299         if (PMIX_SUCCESS != rc) {
1300             PMIX_ERROR_LOG(rc);
1301             goto cleanup;
1302         }
1303     }
1304     pmix_strncpy(cd->info[cd->ninfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN);
1305     cd->info[cd->ninfo-1].value.type = PMIX_UINT32;
1306     cd->info[cd->ninfo-1].value.data.uint32 = uid;
1307 
1308     /* call the local server */
1309     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
1310     proc.rank = peer->info->pname.rank;
1311     rc = pmix_host_server.unpublish(&proc, cd->keys, cd->info, cd->ninfo, opcbfunc, cd);
1312 
1313   cleanup:
1314     if (PMIX_SUCCESS != rc) {
1315         if (NULL != cd->keys) {
1316             pmix_argv_free(cd->keys);
1317         }
1318         if (NULL != cd->info) {
1319             PMIX_INFO_FREE(cd->info, cd->ninfo);
1320         }
1321         PMIX_RELEASE(cd);
1322     }
1323     return rc;
1324 }
1325 
1326 static void spcbfunc(pmix_status_t status,
1327                      char nspace[], void *cbdata)
1328 {
1329     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1330     pmix_iof_req_t *req;
1331     pmix_buffer_t *msg;
1332     pmix_status_t rc;
1333     pmix_iof_cache_t *iof, *ionext;
1334 
1335     /* if it was successful, and there are IOF requests, then
1336      * register them now */
1337     if (PMIX_SUCCESS == status && PMIX_FWD_NO_CHANNELS != cd->channels) {
1338          /* record the request */
1339         req = PMIX_NEW(pmix_iof_req_t);
1340         if (NULL == req) {
1341             status = PMIX_ERR_NOMEM;
1342             goto cleanup;
1343         }
1344         PMIX_RETAIN(cd->peer);
1345         req->peer = cd->peer;
1346         req->pname.nspace = strdup(nspace);
1347         req->pname.rank = PMIX_RANK_WILDCARD;
1348         req->channels = cd->channels;
1349         pmix_list_append(&pmix_globals.iof_requests, &req->super);
1350         /* process any cached IO */
1351         PMIX_LIST_FOREACH_SAFE(iof, ionext, &pmix_server_globals.iof, pmix_iof_cache_t) {
1352             /* if the channels don't match, then ignore it */
1353             if (!(iof->channel & req->channels)) {
1354                 continue;
1355             }
1356             /* if the source does not match the request, then ignore it */
1357             if (!PMIX_CHECK_PROCID(&iof->source, &req->pname)) {
1358                 continue;
1359             }
1360             /* never forward back to the source! This can happen if the source
1361              * is a launcher */
1362             if (PMIX_CHECK_PROCID(&iof->source, &req->peer->info->pname)) {
1363                 continue;
1364             }
1365             pmix_output_verbose(2, pmix_server_globals.iof_output,
1366                                 "PMIX:SERVER:SPAWN delivering cached IOF from %s:%d to %s:%d",
1367                                 iof->source.nspace, iof->source.rank,
1368                                 req->pname.nspace, req->pname.rank);
1369             /* setup the msg */
1370             if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) {
1371                 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
1372                 rc = PMIX_ERR_OUT_OF_RESOURCE;
1373                 break;
1374             }
1375             /* provide the source */
1376             PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->source, 1, PMIX_PROC);
1377             if (PMIX_SUCCESS != rc) {
1378                 PMIX_ERROR_LOG(rc);
1379                 PMIX_RELEASE(msg);
1380                 break;
1381             }
1382             /* provide the channel */
1383             PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->channel, 1, PMIX_IOF_CHANNEL);
1384             if (PMIX_SUCCESS != rc) {
1385                 PMIX_ERROR_LOG(rc);
1386                 PMIX_RELEASE(msg);
1387                 break;
1388             }
1389             /* pack the data */
1390             PMIX_BFROPS_PACK(rc, req->peer, msg, iof->bo, 1, PMIX_BYTE_OBJECT);
1391             if (PMIX_SUCCESS != rc) {
1392                 PMIX_ERROR_LOG(rc);
1393                 PMIX_RELEASE(msg);
1394                 break;
1395             }
1396             /* send it to the requestor */
1397             PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF);
1398             if (PMIX_SUCCESS != rc) {
1399                 PMIX_ERROR_LOG(rc);
1400                 PMIX_RELEASE(msg);
1401             }
1402             /* remove it from the list since it has now been forwarded */
1403             pmix_list_remove_item(&pmix_server_globals.iof, &iof->super);
1404             PMIX_RELEASE(iof);
1405         }
1406     }
1407 
1408   cleanup:
1409     /* cleanup the caddy */
1410     if (NULL != cd->info) {
1411         PMIX_INFO_FREE(cd->info, cd->ninfo);
1412     }
1413     if (NULL != cd->apps) {
1414         PMIX_APP_FREE(cd->apps, cd->napps);
1415     }
1416     if (NULL != cd->spcbfunc) {
1417         cd->spcbfunc(status, nspace, cd->cbdata);
1418     }
1419     PMIX_RELEASE(cd);
1420 }
1421 
1422 pmix_status_t pmix_server_spawn(pmix_peer_t *peer,
1423                                 pmix_buffer_t *buf,
1424                                 pmix_spawn_cbfunc_t cbfunc,
1425                                 void *cbdata)
1426 {
1427     pmix_setup_caddy_t *cd;
1428     int32_t cnt;
1429     pmix_status_t rc;
1430     pmix_proc_t proc;
1431     size_t ninfo, n;
1432     bool stdout_found = false, stderr_found = false, stddiag_found = false;
1433 
1434     pmix_output_verbose(2, pmix_server_globals.spawn_output,
1435                         "recvd SPAWN from %s:%d", peer->info->pname.nspace, peer->info->pname.rank);
1436 
1437     if (NULL == pmix_host_server.spawn) {
1438         return PMIX_ERR_NOT_SUPPORTED;
1439     }
1440 
1441     /* setup */
1442     cd = PMIX_NEW(pmix_setup_caddy_t);
1443     if (NULL == cd) {
1444         return PMIX_ERR_NOMEM;
1445     }
1446     PMIX_RETAIN(peer);
1447     cd->peer = peer;
1448     cd->spcbfunc = cbfunc;
1449     cd->cbdata = cbdata;
1450 
1451     /* unpack the number of job-level directives */
1452     cnt=1;
1453     PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
1454     if (PMIX_SUCCESS != rc) {
1455         PMIX_ERROR_LOG(rc);
1456         PMIX_RELEASE(cd);
1457         return rc;
1458     }
1459     /* always add one directive that indicates whether the requestor
1460      * is a tool or client */
1461     cd->ninfo = ninfo + 1;
1462     PMIX_INFO_CREATE(cd->info, cd->ninfo);
1463     if (NULL == cd->info) {
1464         rc = PMIX_ERR_NOMEM;
1465         goto cleanup;
1466     }
1467 
1468     /* unpack the array of directives */
1469     if (0 < ninfo) {
1470         cnt = ninfo;
1471         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
1472         if (PMIX_SUCCESS != rc) {
1473             PMIX_ERROR_LOG(rc);
1474             goto cleanup;
1475         }
1476         /* run a quick check of the directives to see if any IOF
1477          * requests were included so we can set that up now - helps
1478          * to catch any early output - and a request for notification
1479          * of job termination so we can setup the event registration */
1480         cd->channels = PMIX_FWD_NO_CHANNELS;
1481         for (n=0; n < cd->ninfo; n++) {
1482             if (0 == strncmp(cd->info[n].key, PMIX_FWD_STDIN, PMIX_MAX_KEYLEN)) {
1483                 if (PMIX_INFO_TRUE(&cd->info[n])) {
1484                     cd->channels |= PMIX_FWD_STDIN_CHANNEL;
1485                 }
1486             } else if (0 == strncmp(cd->info[n].key, PMIX_FWD_STDOUT, PMIX_MAX_KEYLEN)) {
1487                 stdout_found = true;
1488                 if (PMIX_INFO_TRUE(&cd->info[n])) {
1489                     cd->channels |= PMIX_FWD_STDOUT_CHANNEL;
1490                 }
1491             } else if (0 == strncmp(cd->info[n].key, PMIX_FWD_STDERR, PMIX_MAX_KEYLEN)) {
1492                 stderr_found = true;
1493                 if (PMIX_INFO_TRUE(&cd->info[n])) {
1494                     cd->channels |= PMIX_FWD_STDERR_CHANNEL;
1495                 }
1496             } else if (0 == strncmp(cd->info[n].key, PMIX_FWD_STDDIAG, PMIX_MAX_KEYLEN)) {
1497                 stddiag_found = true;
1498                 if (PMIX_INFO_TRUE(&cd->info[n])) {
1499                     cd->channels |= PMIX_FWD_STDDIAG_CHANNEL;
1500                 }
1501             }
1502         }
1503         /* we will construct any required iof request tracker upon completion of the spawn
1504          * as we need the nspace of the spawned application! */
1505     }
1506     /* add the directive to the end */
1507     if (PMIX_PROC_IS_TOOL(peer)) {
1508         PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_REQUESTOR_IS_TOOL, NULL, PMIX_BOOL);
1509         /* if the requestor is a tool, we default to forwarding all
1510          * output IO channels */
1511         if (!stdout_found) {
1512             cd->channels |= PMIX_FWD_STDOUT_CHANNEL;
1513         }
1514         if (!stderr_found) {
1515             cd->channels |= PMIX_FWD_STDERR_CHANNEL;
1516         }
1517         if (!stddiag_found) {
1518             cd->channels |= PMIX_FWD_STDDIAG_CHANNEL;
1519         }
1520     } else {
1521         PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_REQUESTOR_IS_CLIENT, NULL, PMIX_BOOL);
1522     }
1523 
1524     /* unpack the number of apps */
1525     cnt=1;
1526     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->napps, &cnt, PMIX_SIZE);
1527     if (PMIX_SUCCESS != rc) {
1528         PMIX_ERROR_LOG(rc);
1529         goto cleanup;
1530     }
1531     /* unpack the array of apps */
1532     if (0 < cd->napps) {
1533         PMIX_APP_CREATE(cd->apps, cd->napps);
1534         if (NULL == cd->apps) {
1535             rc = PMIX_ERR_NOMEM;
1536             goto cleanup;
1537         }
1538         cnt = cd->napps;
1539         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->apps, &cnt, PMIX_APP);
1540         if (PMIX_SUCCESS != rc) {
1541             PMIX_ERROR_LOG(rc);
1542             goto cleanup;
1543         }
1544     }
1545     /* call the local server */
1546     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
1547     proc.rank = peer->info->pname.rank;
1548     rc = pmix_host_server.spawn(&proc, cd->info, cd->ninfo, cd->apps, cd->napps, spcbfunc, cd);
1549 
1550   cleanup:
1551     if (PMIX_SUCCESS != rc) {
1552         if (NULL != cd->info) {
1553             PMIX_INFO_FREE(cd->info, cd->ninfo);
1554         }
1555         if (NULL != cd->apps) {
1556             PMIX_APP_FREE(cd->apps, cd->napps);
1557         }
1558         PMIX_RELEASE(cd);
1559     }
1560     return rc;
1561 }
1562 
1563 pmix_status_t pmix_server_disconnect(pmix_server_caddy_t *cd,
1564                                      pmix_buffer_t *buf,
1565                                      pmix_op_cbfunc_t cbfunc)
1566 {
1567     int32_t cnt;
1568     pmix_status_t rc;
1569     pmix_info_t *info = NULL;
1570     size_t nprocs, ninfo;
1571     pmix_server_trkr_t *trk;
1572     pmix_proc_t *procs = NULL;
1573 
1574     if (NULL == pmix_host_server.disconnect) {
1575         return PMIX_ERR_NOT_SUPPORTED;
1576     }
1577 
1578     /* unpack the number of procs */
1579     cnt = 1;
1580     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &nprocs, &cnt, PMIX_SIZE);
1581     if (PMIX_SUCCESS != rc) {
1582         PMIX_ERROR_LOG(rc);
1583         goto cleanup;
1584     }
1585     /* there must be at least one proc - we do not allow the client
1586      * to send us NULL proc as the server has no idea what to do
1587      * with that situation. Instead, the client should at least send
1588      * us their own namespace for the use-case where the connection
1589      * spans all procs in that namespace */
1590     if (nprocs < 1) {
1591         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1592         rc = PMIX_ERR_BAD_PARAM;
1593         goto cleanup;
1594     }
1595 
1596     /* unpack the procs */
1597     PMIX_PROC_CREATE(procs, nprocs);
1598     if (NULL == procs) {
1599         rc = PMIX_ERR_NOMEM;
1600         goto cleanup;
1601     }
1602     cnt = nprocs;
1603     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, procs, &cnt, PMIX_PROC);
1604     if (PMIX_SUCCESS != rc) {
1605         PMIX_ERROR_LOG(rc);
1606         goto cleanup;
1607     }
1608 
1609     /* unpack the number of provided info structs */
1610     cnt = 1;
1611     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &ninfo, &cnt, PMIX_SIZE);
1612     if (PMIX_SUCCESS != rc) {
1613         return rc;
1614     }
1615     if (0 < ninfo) {
1616         PMIX_INFO_CREATE(info, ninfo);
1617         if (NULL == info) {
1618             rc = PMIX_ERR_NOMEM;
1619             goto cleanup;
1620         }
1621         /* unpack the info */
1622         cnt = ninfo;
1623         PMIX_BFROPS_UNPACK(rc, cd->peer, buf, info, &cnt, PMIX_INFO);
1624         if (PMIX_SUCCESS != rc) {
1625             goto cleanup;
1626         }
1627     }
1628 
1629     /* find/create the local tracker for this operation */
1630     if (NULL == (trk = get_tracker(NULL, procs, nprocs, PMIX_DISCONNECTNB_CMD))) {
1631         /* we don't have this tracker yet, so get a new one */
1632         if (NULL == (trk = new_tracker(NULL, procs, nprocs, PMIX_DISCONNECTNB_CMD))) {
1633             /* only if a bozo error occurs */
1634             PMIX_ERROR_LOG(PMIX_ERROR);
1635             rc = PMIX_ERROR;
1636             goto cleanup;
1637         }
1638         trk->op_cbfunc = cbfunc;
1639     }
1640 
1641     /* if the info keys have not been provided yet, pass
1642      * them along here */
1643     if (NULL == trk->info && NULL != info) {
1644         trk->info = info;
1645         trk->ninfo = ninfo;
1646         info = NULL;
1647         ninfo = 0;
1648     }
1649 
1650     /* add this contributor to the tracker so they get
1651      * notified when we are done */
1652     pmix_list_append(&trk->local_cbs, &cd->super);
1653     /* if all local contributions have been received,
1654      * let the local host's server know that we are at the
1655      * "fence" point - they will callback once the [dis]connect
1656      * across all participants has been completed */
1657     if (trk->def_complete &&
1658         pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
1659         trk->host_called = true;
1660         rc = pmix_host_server.disconnect(trk->pcs, trk->npcs, trk->info, trk->ninfo, cbfunc, trk);
1661         if (PMIX_SUCCESS != rc) {
1662             /* remove this contributor from the list - they will be notified
1663              * by the switchyard */
1664             pmix_list_remove_item(&trk->local_cbs, &cd->super);
1665         }
1666     } else {
1667         rc = PMIX_SUCCESS;
1668     }
1669 
1670   cleanup:
1671     if (NULL != info) {
1672         PMIX_INFO_FREE(info, ninfo);
1673     }
1674     return rc;
1675 }
1676 
1677 static void connect_timeout(int sd, short args, void *cbdata)
1678 {
1679     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
1680 
1681     pmix_output_verbose(2, pmix_server_globals.connect_output,
1682                         "ALERT: connect timeout fired");
1683 
1684     /* execute the provided callback function with the error */
1685     if (NULL != cd->trk->op_cbfunc) {
1686         cd->trk->op_cbfunc(PMIX_ERR_TIMEOUT, cd->trk);
1687         return;  // the cbfunc will have cleaned up the tracker
1688     }
1689     cd->event_active = false;
1690     /* remove it from the list */
1691     pmix_list_remove_item(&cd->trk->local_cbs, &cd->super);
1692     PMIX_RELEASE(cd);
1693 }
1694 
1695 pmix_status_t pmix_server_connect(pmix_server_caddy_t *cd,
1696                                   pmix_buffer_t *buf,
1697                                   pmix_op_cbfunc_t cbfunc)
1698 {
1699     int32_t cnt;
1700     pmix_status_t rc;
1701     pmix_proc_t *procs = NULL;
1702     pmix_info_t *info = NULL;
1703     size_t nprocs, ninfo, n;
1704     pmix_server_trkr_t *trk;
1705     struct timeval tv = {0, 0};
1706 
1707     pmix_output_verbose(2, pmix_server_globals.connect_output,
1708                         "recvd CONNECT from peer %s:%d",
1709                         cd->peer->info->pname.nspace,
1710                         cd->peer->info->pname.rank);
1711 
1712     if (NULL == pmix_host_server.connect) {
1713         return PMIX_ERR_NOT_SUPPORTED;
1714     }
1715 
1716     /* unpack the number of procs */
1717     cnt = 1;
1718     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &nprocs, &cnt, PMIX_SIZE);
1719     if (PMIX_SUCCESS != rc) {
1720         PMIX_ERROR_LOG(rc);
1721         goto cleanup;
1722     }
1723     /* there must be at least one proc - we do not allow the client
1724      * to send us NULL proc as the server has no idea what to do
1725      * with that situation. Instead, the client should at least send
1726      * us their own namespace for the use-case where the connection
1727      * spans all procs in that namespace */
1728     if (nprocs < 1) {
1729         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1730         rc = PMIX_ERR_BAD_PARAM;
1731         goto cleanup;
1732     }
1733 
1734     /* unpack the procs */
1735     PMIX_PROC_CREATE(procs, nprocs);
1736     if (NULL == procs) {
1737         rc = PMIX_ERR_NOMEM;
1738         goto cleanup;
1739     }
1740     cnt = nprocs;
1741     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, procs, &cnt, PMIX_PROC);
1742     if (PMIX_SUCCESS != rc) {
1743         PMIX_ERROR_LOG(rc);
1744         goto cleanup;
1745     }
1746 
1747     /* unpack the number of provided info structs */
1748     cnt = 1;
1749     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &ninfo, &cnt, PMIX_SIZE);
1750     if (PMIX_SUCCESS != rc) {
1751         return rc;
1752     }
1753     if (0 < ninfo) {
1754         PMIX_INFO_CREATE(info, ninfo);
1755         if (NULL == info) {
1756             rc = PMIX_ERR_NOMEM;
1757             goto cleanup;
1758         }
1759         /* unpack the info */
1760         cnt = ninfo;
1761         PMIX_BFROPS_UNPACK(rc, cd->peer, buf, info, &cnt, PMIX_INFO);
1762         if (PMIX_SUCCESS != rc) {
1763             goto cleanup;
1764         }
1765         /* check for a timeout */
1766         for (n=0; n < ninfo; n++) {
1767             if (0 == strncmp(info[n].key, PMIX_TIMEOUT, PMIX_MAX_KEYLEN)) {
1768                 tv.tv_sec = info[n].value.data.uint32;
1769                 break;
1770             }
1771         }
1772     }
1773 
1774     /* find/create the local tracker for this operation */
1775     if (NULL == (trk = get_tracker(NULL, procs, nprocs, PMIX_CONNECTNB_CMD))) {
1776         /* we don't have this tracker yet, so get a new one */
1777         if (NULL == (trk = new_tracker(NULL, procs, nprocs, PMIX_CONNECTNB_CMD))) {
1778             /* only if a bozo error occurs */
1779             PMIX_ERROR_LOG(PMIX_ERROR);
1780             /* DO NOT HANG */
1781             if (NULL != cbfunc) {
1782                 cbfunc(PMIX_ERROR, cd);
1783             }
1784             rc = PMIX_ERROR;
1785             goto cleanup;
1786         }
1787         trk->op_cbfunc = cbfunc;
1788     }
1789 
1790     /* if the info keys have not been provided yet, pass
1791      * them along here */
1792     if (NULL == trk->info && NULL != info) {
1793         trk->info = info;
1794         trk->ninfo = ninfo;
1795         info = NULL;
1796         ninfo = 0;
1797     }
1798 
1799     /* add this contributor to the tracker so they get
1800      * notified when we are done */
1801     pmix_list_append(&trk->local_cbs, &cd->super);
1802 
1803     /* if all local contributions have been received,
1804      * let the local host's server know that we are at the
1805      * "fence" point - they will callback once the [dis]connect
1806      * across all participants has been completed */
1807     if (trk->def_complete &&
1808         pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
1809         trk->host_called = true;
1810         rc = pmix_host_server.connect(trk->pcs, trk->npcs, trk->info, trk->ninfo, cbfunc, trk);
1811         if (PMIX_SUCCESS != rc) {
1812             /* remove this contributor from the list - they will be notified
1813              * by the switchyard */
1814             pmix_list_remove_item(&trk->local_cbs, &cd->super);
1815         }
1816     } else {
1817         rc = PMIX_SUCCESS;
1818     }
1819     /* if a timeout was specified, set it */
1820     if (PMIX_SUCCESS == rc && 0 < tv.tv_sec) {
1821         PMIX_RETAIN(trk);
1822         cd->trk = trk;
1823         pmix_event_evtimer_set(pmix_globals.evbase, &cd->ev,
1824                                connect_timeout, cd);
1825         pmix_event_evtimer_add(&cd->ev, &tv);
1826         cd->event_active = true;
1827     }
1828 
1829   cleanup:
1830     if (NULL != procs) {
1831         PMIX_PROC_FREE(procs, nprocs);
1832     }
1833     if (NULL != info) {
1834         PMIX_INFO_FREE(info, ninfo);
1835     }
1836     return rc;
1837 }
1838 
1839 static void _check_cached_events(int sd, short args, void *cbdata)
1840 {
1841     pmix_setup_caddy_t *scd = (pmix_setup_caddy_t*)cbdata;
1842     pmix_notify_caddy_t *cd;
1843     pmix_range_trkr_t rngtrk;
1844     pmix_proc_t proc;
1845     int i;
1846     size_t k, n;
1847     bool found, matched;
1848     pmix_buffer_t *relay;
1849     pmix_status_t ret = PMIX_SUCCESS;
1850     pmix_cmd_t cmd = PMIX_NOTIFY_CMD;
1851 
1852     /* check if any matching notifications have been cached */
1853     rngtrk.procs = NULL;
1854     rngtrk.nprocs = 0;
1855     for (i=0; i < pmix_globals.max_events; i++) {
1856         pmix_hotel_knock(&pmix_globals.notifications, i, (void**)&cd);
1857         if (NULL == cd) {
1858             continue;
1859         }
1860         found = false;
1861         if (NULL == scd->codes) {
1862             if (!cd->nondefault) {
1863                 /* they registered a default event handler - always matches */
1864                 found = true;
1865             }
1866         } else {
1867             for (k=0; k < scd->ncodes; k++) {
1868                 if (scd->codes[k] == cd->status) {
1869                     found = true;
1870                     break;
1871                 }
1872             }
1873         }
1874         if (!found) {
1875             continue;
1876         }
1877         /* check if the affected procs (if given) match those they
1878          * wanted to know about */
1879         if (!pmix_notify_check_affected(cd->affected, cd->naffected,
1880                                         scd->procs, scd->nprocs)) {
1881             continue;
1882         }
1883         /* check the range */
1884         if (NULL == cd->targets) {
1885             rngtrk.procs = &cd->source;
1886             rngtrk.nprocs = 1;
1887         } else {
1888             rngtrk.procs = cd->targets;
1889             rngtrk.nprocs = cd->ntargets;
1890         }
1891         rngtrk.range = cd->range;
1892         PMIX_LOAD_PROCID(&proc, scd->peer->info->pname.nspace, scd->peer->info->pname.rank);
1893         if (!pmix_notify_check_range(&rngtrk, &proc)) {
1894             continue;
1895         }
1896         /* if we were given specific targets, check if this is one */
1897         found = false;
1898         if (NULL != cd->targets) {
1899             matched = false;
1900             for (n=0; n < cd->ntargets; n++) {
1901                 /* if the source of the event is the same peer just registered, then ignore it
1902                  * as the event notification system will have already locally
1903                  * processed it */
1904                 if (PMIX_CHECK_PROCID(&cd->source, &scd->peer->info->pname)) {
1905                     continue;
1906                 }
1907                 if (PMIX_CHECK_PROCID(&scd->peer->info->pname, &cd->targets[n])) {
1908                     matched = true;
1909                     /* track the number of targets we have left to notify */
1910                     --cd->nleft;
1911                     /* if this is the last one, then evict this event
1912                      * from the cache */
1913                     if (0 == cd->nleft) {
1914                         pmix_hotel_checkout(&pmix_globals.notifications, cd->room);
1915                         found = true;  // mark that we should release cd
1916                     }
1917                     break;
1918                 }
1919             }
1920             if (!matched) {
1921                 /* do not notify this one */
1922                 continue;
1923             }
1924         }
1925 
1926         /* all matches - notify */
1927         relay = PMIX_NEW(pmix_buffer_t);
1928         if (NULL == relay) {
1929             /* nothing we can do */
1930             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1931             ret = PMIX_ERR_NOMEM;
1932             break;
1933         }
1934         /* pack the info data stored in the event */
1935         PMIX_BFROPS_PACK(ret, scd->peer, relay, &cmd, 1, PMIX_COMMAND);
1936         if (PMIX_SUCCESS != ret) {
1937             PMIX_ERROR_LOG(ret);
1938             break;
1939         }
1940         PMIX_BFROPS_PACK(ret, scd->peer, relay, &cd->status, 1, PMIX_STATUS);
1941         if (PMIX_SUCCESS != ret) {
1942             PMIX_ERROR_LOG(ret);
1943             break;
1944         }
1945         PMIX_BFROPS_PACK(ret, scd->peer, relay, &cd->source, 1, PMIX_PROC);
1946         if (PMIX_SUCCESS != ret) {
1947             PMIX_ERROR_LOG(ret);
1948             break;
1949         }
1950         PMIX_BFROPS_PACK(ret, scd->peer, relay, &cd->ninfo, 1, PMIX_SIZE);
1951         if (PMIX_SUCCESS != ret) {
1952             PMIX_ERROR_LOG(ret);
1953             break;
1954         }
1955         if (0 < cd->ninfo) {
1956             PMIX_BFROPS_PACK(ret, scd->peer, relay, cd->info, cd->ninfo, PMIX_INFO);
1957             if (PMIX_SUCCESS != ret) {
1958                 PMIX_ERROR_LOG(ret);
1959                 break;
1960             }
1961         }
1962         PMIX_SERVER_QUEUE_REPLY(ret, scd->peer, 0, relay);
1963         if (PMIX_SUCCESS != ret) {
1964             PMIX_RELEASE(relay);
1965         }
1966         if (found) {
1967             PMIX_RELEASE(cd);
1968         }
1969     }
1970     /* release the caddy */
1971     if (NULL != scd->codes) {
1972         free(scd->codes);
1973     }
1974     if (NULL != scd->info) {
1975         PMIX_INFO_FREE(scd->info, scd->ninfo);
1976     }
1977     if (NULL != scd->opcbfunc) {
1978         scd->opcbfunc(ret, scd->cbdata);
1979     }
1980     PMIX_RELEASE(scd);
1981 }
1982 
1983 /* provide a callback function for the host when it finishes
1984  * processing the registration */
1985 static void regevopcbfunc(pmix_status_t status, void *cbdata)
1986 {
1987     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1988 
1989     /* if the registration succeeded, then check local cache */
1990     if (PMIX_SUCCESS == status) {
1991         _check_cached_events(0, 0, cd);
1992         return;
1993     }
1994 
1995     /* it didn't succeed, so cleanup and execute the callback
1996      * so we don't hang */
1997     if (NULL != cd->codes) {
1998         free(cd->codes);
1999     }
2000     if (NULL != cd->info) {
2001         PMIX_INFO_FREE(cd->info, cd->ninfo);
2002     }
2003     if (NULL != cd->opcbfunc) {
2004         cd->opcbfunc(status, cd->cbdata);
2005     }
2006     PMIX_RELEASE(cd);
2007 }
2008 
2009 
2010 pmix_status_t pmix_server_register_events(pmix_peer_t *peer,
2011                                           pmix_buffer_t *buf,
2012                                           pmix_op_cbfunc_t cbfunc,
2013                                           void *cbdata)
2014 {
2015     int32_t cnt;
2016     pmix_status_t rc;
2017     pmix_status_t *codes = NULL;
2018     pmix_info_t *info = NULL;
2019     size_t ninfo=0, ncodes, n;
2020     pmix_regevents_info_t *reginfo;
2021     pmix_peer_events_info_t *prev = NULL;
2022     pmix_setup_caddy_t *scd;
2023     bool enviro_events = false;
2024     bool found;
2025     pmix_proc_t *affected = NULL;
2026     size_t naffected = 0;
2027 
2028     pmix_output_verbose(2, pmix_server_globals.event_output,
2029                         "recvd register events for peer %s:%d",
2030                         peer->info->pname.nspace, peer->info->pname.rank);
2031 
2032     /* unpack the number of codes */
2033     cnt=1;
2034     PMIX_BFROPS_UNPACK(rc, peer, buf, &ncodes, &cnt, PMIX_SIZE);
2035     if (PMIX_SUCCESS != rc) {
2036         PMIX_ERROR_LOG(rc);
2037         return rc;
2038     }
2039     /* unpack the array of codes */
2040     if (0 < ncodes) {
2041         codes = (pmix_status_t*)malloc(ncodes * sizeof(pmix_status_t));
2042         if (NULL == codes) {
2043             rc = PMIX_ERR_NOMEM;
2044             goto cleanup;
2045         }
2046         cnt=ncodes;
2047         PMIX_BFROPS_UNPACK(rc, peer, buf, codes, &cnt, PMIX_STATUS);
2048         if (PMIX_SUCCESS != rc) {
2049             PMIX_ERROR_LOG(rc);
2050             goto cleanup;
2051         }
2052     }
2053 
2054     /* unpack the number of info objects */
2055     cnt=1;
2056     PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
2057     if (PMIX_SUCCESS != rc) {
2058         PMIX_ERROR_LOG(rc);
2059         return rc;
2060     }
2061     /* unpack the array of info objects */
2062     if (0 < ninfo) {
2063         PMIX_INFO_CREATE(info, ninfo);
2064         if (NULL == info) {
2065             rc = PMIX_ERR_NOMEM;
2066             goto cleanup;
2067         }
2068         cnt=ninfo;
2069         PMIX_BFROPS_UNPACK(rc, peer, buf, info, &cnt, PMIX_INFO);
2070         if (PMIX_SUCCESS != rc) {
2071             PMIX_ERROR_LOG(rc);
2072             goto cleanup;
2073         }
2074     }
2075 
2076     /* check the directives */
2077     for (n=0; n < ninfo; n++) {
2078         if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROC)) {
2079             if (NULL != affected) {
2080                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
2081                 rc = PMIX_ERR_BAD_PARAM;
2082                 goto cleanup;
2083             }
2084             naffected = 1;
2085             PMIX_PROC_CREATE(affected, naffected);
2086             memcpy(affected, info[n].value.data.proc, sizeof(pmix_proc_t));
2087         } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROCS)) {
2088             if (NULL != affected) {
2089                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
2090                 rc = PMIX_ERR_BAD_PARAM;
2091                 goto cleanup;
2092             }
2093             naffected = info[n].value.data.darray->size;
2094             PMIX_PROC_CREATE(affected, naffected);
2095             memcpy(affected, info[n].value.data.darray->array, naffected * sizeof(pmix_proc_t));
2096         }
2097     }
2098 
2099     /* check the codes for system events */
2100     for (n=0; n < ncodes; n++) {
2101         if (PMIX_SYSTEM_EVENT(codes[n])) {
2102             enviro_events = true;
2103             break;
2104         }
2105     }
2106 
2107     /* if they asked for enviro events, and our host doesn't support
2108      * register_events, then we cannot meet the request */
2109     if (enviro_events && NULL == pmix_host_server.register_events) {
2110         enviro_events = false;
2111         rc = PMIX_ERR_NOT_SUPPORTED;
2112         goto cleanup;
2113     }
2114 
2115     /* if they didn't send us any codes, then they are registering a
2116      * default event handler. In that case, check only for default
2117      * handlers and add this request to it, if not already present */
2118     if (0 == ncodes)  {
2119         PMIX_LIST_FOREACH(reginfo, &pmix_server_globals.events, pmix_regevents_info_t) {
2120             if (PMIX_MAX_ERR_CONSTANT == reginfo->code) {
2121                 /* both are default handlers */
2122                 prev = PMIX_NEW(pmix_peer_events_info_t);
2123                 if (NULL == prev) {
2124                     rc = PMIX_ERR_NOMEM;
2125                     goto cleanup;
2126                 }
2127                 PMIX_RETAIN(peer);
2128                 prev->peer = peer;
2129                 if (NULL != affected) {
2130                     PMIX_PROC_CREATE(prev->affected, naffected);
2131                     prev->naffected = naffected;
2132                     memcpy(prev->affected, affected, naffected * sizeof(pmix_proc_t));
2133                 }
2134                 pmix_list_append(&reginfo->peers, &prev->super);
2135                 break;
2136             }
2137         }
2138         rc = PMIX_OPERATION_SUCCEEDED;
2139         goto cleanup;
2140     }
2141 
2142     /* store the event registration info so we can call the registered
2143      * client when the server notifies the event */
2144     for (n=0; n < ncodes; n++) {
2145         found = false;
2146         PMIX_LIST_FOREACH(reginfo, &pmix_server_globals.events, pmix_regevents_info_t) {
2147             if (NULL == codes) {
2148                 if (PMIX_MAX_ERR_CONSTANT == reginfo->code) {
2149                     /* both are default handlers */
2150                     found = true;
2151                     break;
2152                 } else {
2153                     continue;
2154                 }
2155             } else {
2156                 if (PMIX_MAX_ERR_CONSTANT == reginfo->code) {
2157                     continue;
2158                 } else if (codes[n] == reginfo->code) {
2159                     found = true;
2160                     break;
2161                 }
2162             }
2163         }
2164         if (found) {
2165             /* found it - add this request */
2166             prev = PMIX_NEW(pmix_peer_events_info_t);
2167             if (NULL == prev) {
2168                 rc = PMIX_ERR_NOMEM;
2169                 goto cleanup;
2170             }
2171             PMIX_RETAIN(peer);
2172             prev->peer = peer;
2173             if (NULL != affected) {
2174                 PMIX_PROC_CREATE(prev->affected, naffected);
2175                 prev->naffected = naffected;
2176                 memcpy(prev->affected, affected, naffected * sizeof(pmix_proc_t));
2177             }
2178             prev->enviro_events = enviro_events;
2179             pmix_list_append(&reginfo->peers, &prev->super);
2180         } else {
2181             /* if we get here, then we didn't find an existing registration for this code */
2182             reginfo = PMIX_NEW(pmix_regevents_info_t);
2183             if (NULL == reginfo) {
2184                 rc = PMIX_ERR_NOMEM;
2185                 goto cleanup;
2186             }
2187             if (NULL == codes) {
2188                 reginfo->code = PMIX_MAX_ERR_CONSTANT;
2189             } else {
2190                 reginfo->code = codes[n];
2191             }
2192             pmix_list_append(&pmix_server_globals.events, &reginfo->super);
2193             prev = PMIX_NEW(pmix_peer_events_info_t);
2194             if (NULL == prev) {
2195                 rc = PMIX_ERR_NOMEM;
2196                 goto cleanup;
2197             }
2198             PMIX_RETAIN(peer);
2199             prev->peer = peer;
2200             if (NULL != affected) {
2201                 PMIX_PROC_CREATE(prev->affected, naffected);
2202                 prev->naffected = naffected;
2203                 memcpy(prev->affected, affected, naffected * sizeof(pmix_proc_t));
2204             }
2205             prev->enviro_events = enviro_events;
2206             pmix_list_append(&reginfo->peers, &prev->super);
2207         }
2208     }
2209 
2210     /* if they asked for enviro events, call the local server */
2211     if (enviro_events) {
2212         /* if they don't support this, then we cannot do it */
2213         if (NULL == pmix_host_server.register_events) {
2214             rc = PMIX_ERR_NOT_SUPPORTED;
2215             goto cleanup;
2216         }
2217         /* need to ensure the arrays don't go away until after the
2218          * host RM is done with them */
2219         scd = PMIX_NEW(pmix_setup_caddy_t);
2220         if (NULL == scd) {
2221             rc = PMIX_ERR_NOMEM;
2222             goto cleanup;
2223         }
2224         PMIX_RETAIN(peer);
2225         scd->peer = peer;
2226         scd->codes = codes;
2227         scd->ncodes = ncodes;
2228         scd->info = info;
2229         scd->ninfo = ninfo;
2230         scd->opcbfunc = cbfunc;
2231         scd->cbdata = cbdata;
2232         if (PMIX_SUCCESS == (rc = pmix_host_server.register_events(scd->codes, scd->ncodes, scd->info, scd->ninfo, regevopcbfunc, scd))) {
2233             /* the host will call us back when completed */
2234             pmix_output_verbose(2, pmix_server_globals.event_output,
2235                                  "server register events: host server processing event registration");
2236             if (NULL != affected) {
2237                 free(affected);
2238             }
2239             return rc;
2240         } else if (PMIX_OPERATION_SUCCEEDED == rc) {
2241             /* we need to check cached notifications, but we want to ensure
2242              * that occurs _after_ the client returns from registering the
2243              * event handler in case the event is flagged for do_not_cache.
2244              * Setup an event to fire after we return as that means it will
2245              * occur after we send the registration response back to the client,
2246              * thus guaranteeing that the client will get their registration
2247              * callback prior to delivery of an event notification */
2248             PMIX_RETAIN(peer);
2249             scd->peer = peer;
2250             scd->procs = affected;
2251             scd->nprocs = naffected;
2252             scd->opcbfunc = NULL;
2253             scd->cbdata = NULL;
2254             PMIX_THREADSHIFT(scd, _check_cached_events);
2255             return rc;
2256         } else {
2257             /* host returned a genuine error and won't be calling the callback function */
2258             pmix_output_verbose(2, pmix_server_globals.event_output,
2259                                  "server register events: host server reg events returned rc =%d", rc);
2260             PMIX_RELEASE(scd);
2261             goto cleanup;
2262         }
2263     } else {
2264         rc = PMIX_OPERATION_SUCCEEDED;
2265         /* we need to check cached notifications, but we want to ensure
2266          * that occurs _after_ the client returns from registering the
2267          * event handler in case the event is flagged for do_not_cache.
2268          * Setup an event to fire after we return as that means it will
2269          * occur after we send the registration response back to the client,
2270          * thus guaranteeing that the client will get their registration
2271          * callback prior to delivery of an event notification */
2272         scd = PMIX_NEW(pmix_setup_caddy_t);
2273         PMIX_RETAIN(peer);
2274         scd->peer = peer;
2275         scd->codes = codes;
2276         scd->ncodes = ncodes;
2277         scd->procs = affected;
2278         scd->nprocs = naffected;
2279         scd->opcbfunc = NULL;
2280         scd->cbdata = NULL;
2281         PMIX_THREADSHIFT(scd, _check_cached_events);
2282         if (NULL != info) {
2283             PMIX_INFO_FREE(info, ninfo);
2284         }
2285         return rc;
2286     }
2287 
2288   cleanup:
2289     pmix_output_verbose(2, pmix_server_globals.event_output,
2290                         "server register events: ninfo =%lu rc =%d", ninfo, rc);
2291     if (NULL != info) {
2292         PMIX_INFO_FREE(info, ninfo);
2293     }
2294     if (NULL != codes) {
2295         free(codes);
2296     }
2297     if (NULL != affected) {
2298         PMIX_PROC_FREE(affected, naffected);
2299     }
2300     return rc;
2301 }
2302 
2303 void pmix_server_deregister_events(pmix_peer_t *peer,
2304                                    pmix_buffer_t *buf)
2305 {
2306     int32_t cnt;
2307     pmix_status_t rc, code;
2308     pmix_regevents_info_t *reginfo = NULL;
2309     pmix_regevents_info_t *reginfo_next;
2310     pmix_peer_events_info_t *prev;
2311 
2312     pmix_output_verbose(2, pmix_server_globals.event_output,
2313                         "recvd deregister events");
2314 
2315     /* unpack codes and process until done */
2316     cnt=1;
2317     PMIX_BFROPS_UNPACK(rc, peer, buf, &code, &cnt, PMIX_STATUS);
2318     while (PMIX_SUCCESS == rc) {
2319         PMIX_LIST_FOREACH_SAFE(reginfo, reginfo_next, &pmix_server_globals.events, pmix_regevents_info_t) {
2320             if (code == reginfo->code) {
2321                 /* found it - remove this peer from the list */
2322                 PMIX_LIST_FOREACH(prev, &reginfo->peers, pmix_peer_events_info_t) {
2323                     if (prev->peer == peer) {
2324                         /* found it */
2325                         pmix_list_remove_item(&reginfo->peers, &prev->super);
2326                         PMIX_RELEASE(prev);
2327                         break;
2328                     }
2329                 }
2330                 /* if all of the peers for this code are now gone, then remove it */
2331                 if (0 == pmix_list_get_size(&reginfo->peers)) {
2332                     pmix_list_remove_item(&pmix_server_globals.events, &reginfo->super);
2333                     /* if this was registered with the host, then deregister it */
2334                     PMIX_RELEASE(reginfo);
2335                 }
2336             }
2337         }
2338         cnt=1;
2339         PMIX_BFROPS_UNPACK(rc, peer, buf, &code, &cnt, PMIX_STATUS);
2340     }
2341     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
2342         PMIX_ERROR_LOG(rc);
2343     }
2344 }
2345 
2346 
2347 static void local_cbfunc(pmix_status_t status, void *cbdata)
2348 {
2349     pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
2350 
2351     if (NULL != cd->cbfunc) {
2352         cd->cbfunc(status, cd->cbdata);
2353     }
2354     PMIX_RELEASE(cd);
2355 }
2356 
2357 static void intermed_step(pmix_status_t status, void *cbdata)
2358 {
2359     pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
2360     pmix_status_t rc;
2361 
2362     if (PMIX_SUCCESS != status) {
2363         rc = status;
2364         goto complete;
2365     }
2366 
2367     /* check the range directive - if it is LOCAL, then we are
2368      * done. Otherwise, it needs to go up to our
2369      * host for dissemination */
2370     if (PMIX_RANGE_LOCAL == cd->range) {
2371         rc = PMIX_SUCCESS;
2372         goto complete;
2373     }
2374 
2375     if (NULL == pmix_host_server.notify_event) {
2376         rc = PMIX_ERR_NOT_SUPPORTED;
2377         goto complete;
2378     }
2379 
2380     /* since our host is going to send this everywhere, it may well
2381      * come back to us. We already processed it, so mark it here
2382      * to ensure we don't do it again. We previously inserted the
2383      * PMIX_SERVER_INTERNAL_NOTIFY key at the very end of the
2384      * info array - just overwrite that position */
2385     PMIX_INFO_LOAD(&cd->info[cd->ninfo-1], PMIX_EVENT_PROXY, &pmix_globals.myid, PMIX_PROC);
2386 
2387     /* pass it to our host RM for distribution */
2388     rc = pmix_host_server.notify_event(cd->status, &cd->source, cd->range,
2389                                        cd->info, cd->ninfo, local_cbfunc, cd);
2390     if (PMIX_SUCCESS == rc) {
2391         /* let the callback function respond for us */
2392         return;
2393     }
2394     if (PMIX_OPERATION_SUCCEEDED == rc) {
2395         rc = PMIX_SUCCESS;  // local_cbfunc will not be called
2396     }
2397 
2398   complete:
2399     if (NULL != cd->cbfunc) {
2400         cd->cbfunc(rc, cd->cbdata);
2401     }
2402     PMIX_RELEASE(cd);
2403 }
2404 
2405 /* Receive an event sent by the client library. Since it was sent
2406  * to us by one client, we have to both process it locally to ensure
2407  * we notify all relevant local clients AND (assuming a range other
2408  * than LOCAL) deliver to our host, requesting that they send it
2409  * to all peer servers in the current session */
2410 pmix_status_t pmix_server_event_recvd_from_client(pmix_peer_t *peer,
2411                                                   pmix_buffer_t *buf,
2412                                                   pmix_op_cbfunc_t cbfunc,
2413                                                   void *cbdata)
2414 {
2415     int32_t cnt;
2416     pmix_status_t rc;
2417     pmix_notify_caddy_t *cd;
2418     size_t ninfo, n;
2419 
2420     pmix_output_verbose(2, pmix_server_globals.event_output,
2421                         "%s:%d recvd event notification from client %s:%d",
2422                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
2423                         peer->info->pname.nspace, peer->info->pname.rank);
2424 
2425     cd = PMIX_NEW(pmix_notify_caddy_t);
2426     if (NULL == cd) {
2427         return PMIX_ERR_NOMEM;
2428     }
2429     cd->cbfunc = cbfunc;
2430     cd->cbdata = cbdata;
2431     /* set the source */
2432     PMIX_LOAD_PROCID(&cd->source, peer->info->pname.nspace, peer->info->pname.rank);
2433 
2434     /* unpack status */
2435     cnt = 1;
2436     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->status, &cnt, PMIX_STATUS);
2437     if (PMIX_SUCCESS != rc) {
2438         PMIX_ERROR_LOG(rc);
2439         goto exit;
2440     }
2441 
2442     /* unpack the range */
2443     cnt = 1;
2444     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->range, &cnt, PMIX_DATA_RANGE);
2445     if (PMIX_SUCCESS != rc) {
2446         PMIX_ERROR_LOG(rc);
2447         goto exit;
2448     }
2449 
2450     /* unpack the info keys */
2451     cnt = 1;
2452     PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
2453     if (PMIX_SUCCESS != rc) {
2454         PMIX_ERROR_LOG(rc);
2455         goto exit;
2456     }
2457     cd->ninfo = ninfo + 1;
2458     PMIX_INFO_CREATE(cd->info, cd->ninfo);
2459     if (NULL == cd->info) {
2460         rc = PMIX_ERR_NOMEM;
2461         goto exit;
2462     }
2463     if (0 < ninfo) {
2464         cnt = ninfo;
2465         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
2466         if (PMIX_SUCCESS != rc) {
2467             PMIX_ERROR_LOG(rc);
2468             goto exit;
2469         }
2470     }
2471 
2472     /* check to see if we already processed this event - it is possible
2473      * that a local client "echoed" it back to us and we want to avoid
2474      * a potential infinite loop */
2475     for (n=0; n < ninfo; n++) {
2476         if (PMIX_CHECK_KEY(&cd->info[n], PMIX_SERVER_INTERNAL_NOTIFY)) {
2477             /* yep, we did - so don't do it again! */
2478             rc = PMIX_OPERATION_SUCCEEDED;
2479             goto exit;
2480         }
2481     }
2482 
2483     /* add an info object to mark that we recvd this internally */
2484     PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_SERVER_INTERNAL_NOTIFY, NULL, PMIX_BOOL);
2485     /* process it */
2486     if (PMIX_SUCCESS != (rc = pmix_server_notify_client_of_event(cd->status,
2487                                                                  &cd->source,
2488                                                                  cd->range,
2489                                                                  cd->info, cd->ninfo,
2490                                                                  intermed_step, cd))) {
2491         goto exit;
2492     }
2493     if (PMIX_SUCCESS != rc) {
2494         PMIX_RELEASE(cd);
2495     }
2496     return rc;
2497 
2498   exit:
2499     PMIX_RELEASE(cd);
2500     return rc;
2501 }
2502 
2503 
2504 pmix_status_t pmix_server_query(pmix_peer_t *peer,
2505                                 pmix_buffer_t *buf,
2506                                 pmix_info_cbfunc_t cbfunc,
2507                                 void *cbdata)
2508 {
2509     int32_t cnt;
2510     pmix_status_t rc;
2511     pmix_query_caddy_t *cd;
2512     pmix_proc_t proc;
2513     pmix_cb_t cb;
2514     size_t n, p;
2515     pmix_list_t results;
2516     pmix_kval_t *kv, *kvnxt;
2517 
2518     pmix_output_verbose(2, pmix_server_globals.base_output,
2519                         "recvd query from client");
2520 
2521     cd = PMIX_NEW(pmix_query_caddy_t);
2522     if (NULL == cd) {
2523         return PMIX_ERR_NOMEM;
2524     }
2525     cd->cbdata = cbdata;
2526     /* unpack the number of queries */
2527     cnt = 1;
2528     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->nqueries, &cnt, PMIX_SIZE);
2529     if (PMIX_SUCCESS != rc) {
2530         PMIX_ERROR_LOG(rc);
2531         PMIX_RELEASE(cd);
2532         return rc;
2533     }
2534     /* unpack the queries */
2535     if (0 < cd->nqueries) {
2536         PMIX_QUERY_CREATE(cd->queries, cd->nqueries);
2537         if (NULL == cd->queries) {
2538             rc = PMIX_ERR_NOMEM;
2539             PMIX_RELEASE(cd);
2540             return rc;
2541         }
2542         cnt = cd->nqueries;
2543         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->queries, &cnt, PMIX_QUERY);
2544         if (PMIX_SUCCESS != rc) {
2545             PMIX_ERROR_LOG(rc);
2546             PMIX_RELEASE(cd);
2547             return rc;
2548         }
2549     }
2550 
2551     /* check the directives to see if they want us to refresh
2552      * the local cached results - if we wanted to optimize this
2553      * more, we would check each query and allow those that don't
2554      * want to be refreshed to be executed locally, and those that
2555      * did would be sent to the host. However, for now we simply
2556      * determine that if we don't have it, then ask for everything */
2557     memset(proc.nspace, 0, PMIX_MAX_NSLEN+1);
2558     proc.rank = PMIX_RANK_INVALID;
2559     PMIX_CONSTRUCT(&results, pmix_list_t);
2560 
2561     for (n=0; n < cd->nqueries; n++) {
2562         /* if they are asking for information on support, then go get it */
2563         if (0 == strcmp(cd->queries[n].keys[0], PMIX_QUERY_ATTRIBUTE_SUPPORT)) {
2564             /* we are already in an event, but shift it as the handler expects to */
2565             cd->cbfunc = cbfunc;
2566             PMIX_RETAIN(cd); // protect against early release
2567             PMIX_THREADSHIFT(cd, pmix_attrs_query_support);
2568             PMIX_LIST_DESTRUCT(&results);
2569             return PMIX_SUCCESS;
2570         }
2571         for (p=0; p < cd->queries[n].nqual; p++) {
2572             if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_QUERY_REFRESH_CACHE)) {
2573                 if (PMIX_INFO_TRUE(&cd->queries[n].qualifiers[p])) {
2574                     PMIX_LIST_DESTRUCT(&results);
2575                     goto query;
2576                 }
2577             } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_PROCID)) {
2578                 PMIX_LOAD_NSPACE(proc.nspace, cd->queries[n].qualifiers[p].value.data.proc->nspace);
2579                 proc.rank = cd->queries[n].qualifiers[p].value.data.proc->rank;
2580             } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_NSPACE)) {
2581                 PMIX_LOAD_NSPACE(proc.nspace, cd->queries[n].qualifiers[p].value.data.string);
2582             } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_RANK)) {
2583                 proc.rank = cd->queries[n].qualifiers[p].value.data.rank;
2584             } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_HOSTNAME)) {
2585                 if (0 != strcmp(cd->queries[n].qualifiers[p].value.data.string, pmix_globals.hostname)) {
2586                     /* asking about a different host, so ask for the info */
2587                     PMIX_LIST_DESTRUCT(&results);
2588                     goto query;
2589                 }
2590             }
2591         }
2592         /* we get here if a refresh isn't required - first try a local
2593          * "get" on the data to see if we already have it */
2594         PMIX_CONSTRUCT(&cb, pmix_cb_t);
2595         cb.copy = false;
2596         /* set the proc */
2597         if (PMIX_RANK_INVALID == proc.rank &&
2598             0 == strlen(proc.nspace)) {
2599             /* use our id */
2600             cb.proc = &pmix_globals.myid;
2601         } else {
2602             if (0 == strlen(proc.nspace)) {
2603                 /* use our nspace */
2604                 PMIX_LOAD_NSPACE(cb.proc->nspace, pmix_globals.myid.nspace);
2605             }
2606             if (PMIX_RANK_INVALID == proc.rank) {
2607                 /* user the wildcard rank */
2608                 proc.rank = PMIX_RANK_WILDCARD;
2609             }
2610             cb.proc = &proc;
2611         }
2612         for (p=0; NULL != cd->queries[n].keys[p]; p++) {
2613             cb.key = cd->queries[n].keys[p];
2614             PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
2615             if (PMIX_SUCCESS != rc) {
2616                 /* needs to be passed to the host */
2617                 PMIX_LIST_DESTRUCT(&results);
2618                 PMIX_DESTRUCT(&cb);
2619                 goto query;
2620             }
2621             /* need to retain this result */
2622             PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &cb.kvs, pmix_kval_t) {
2623                 pmix_list_remove_item(&cb.kvs, &kv->super);
2624                 pmix_list_append(&results, &kv->super);
2625             }
2626             PMIX_DESTRUCT(&cb);
2627         }
2628     }
2629 
2630     /* if we get here, then all queries were completely locally
2631      * resolved, so construct the results for return */
2632     rc = PMIX_ERR_NOT_FOUND;
2633     if (0 < (cd->ninfo = pmix_list_get_size(&results))) {
2634         PMIX_INFO_CREATE(cd->info, cd->ninfo);
2635         n = 0;
2636         PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &results, pmix_kval_t) {
2637             PMIX_LOAD_KEY(cd->info[n].key, kv->key);
2638             rc = pmix_value_xfer(&cd->info[n].value, kv->value);
2639             if (PMIX_SUCCESS != rc) {
2640                 PMIX_INFO_FREE(cd->info, cd->ninfo);
2641                 cd->info = NULL;
2642                 cd->ninfo = 0;
2643                 break;
2644             }
2645             ++n;
2646         }
2647     }
2648     /* done with the list of results */
2649     PMIX_LIST_DESTRUCT(&results);
2650     /* we can just call the cbfunc here as we are already
2651      * in an event - let our internal cbfunc do a threadshift
2652      * if necessary */
2653     cbfunc(PMIX_SUCCESS, cd->info, cd->ninfo, cd, NULL, NULL);
2654     return PMIX_SUCCESS;
2655 
2656   query:
2657     if (NULL == pmix_host_server.query) {
2658         PMIX_RELEASE(cd);
2659         return PMIX_ERR_NOT_SUPPORTED;
2660     }
2661 
2662     /* setup the requesting peer name */
2663     PMIX_LOAD_PROCID(&proc, peer->info->pname.nspace, peer->info->pname.rank);
2664 
2665     /* ask the host for the info */
2666     if (PMIX_SUCCESS != (rc = pmix_host_server.query(&proc, cd->queries, cd->nqueries,
2667                                                      cbfunc, cd))) {
2668         PMIX_RELEASE(cd);
2669     }
2670     return rc;
2671 }
2672 
2673 static void logcbfn(pmix_status_t status, void *cbdata)
2674 {
2675     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
2676 
2677     if (NULL != cd->cbfunc.opcbfn) {
2678         cd->cbfunc.opcbfn(status, cd->cbdata);
2679     }
2680     PMIX_RELEASE(cd);
2681 }
2682 pmix_status_t pmix_server_log(pmix_peer_t *peer,
2683                               pmix_buffer_t *buf,
2684                               pmix_op_cbfunc_t cbfunc,
2685                               void *cbdata)
2686 {
2687     int32_t cnt;
2688     pmix_status_t rc;
2689     pmix_shift_caddy_t *cd;
2690     pmix_proc_t proc;
2691     time_t timestamp;
2692 
2693     pmix_output_verbose(2, pmix_server_globals.base_output,
2694                         "recvd log from client");
2695 
2696     /* we need to deliver this to our internal log capability,
2697      * which may upcall it to our host if it cannot process
2698      * the request itself */
2699 
2700     /* setup the requesting peer name */
2701     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
2702     proc.rank = peer->info->pname.rank;
2703 
2704     cd = PMIX_NEW(pmix_shift_caddy_t);
2705     if (NULL == cd) {
2706         return PMIX_ERR_NOMEM;
2707     }
2708     cd->cbfunc.opcbfn = cbfunc;
2709     cd->cbdata = cbdata;
2710     /* unpack the timestamp */
2711     cnt = 1;
2712     PMIX_BFROPS_UNPACK(rc, peer, buf, &timestamp, &cnt, PMIX_TIME);
2713     if (PMIX_SUCCESS != rc) {
2714         PMIX_ERROR_LOG(rc);
2715         goto exit;
2716     }
2717 
2718     /* unpack the number of data */
2719     cnt = 1;
2720     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
2721     if (PMIX_SUCCESS != rc) {
2722         PMIX_ERROR_LOG(rc);
2723         goto exit;
2724     }
2725     cnt = cd->ninfo;
2726     PMIX_INFO_CREATE(cd->info, cd->ninfo);
2727     /* unpack the data */
2728     if (0 < cnt) {
2729         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
2730         if (PMIX_SUCCESS != rc) {
2731             PMIX_ERROR_LOG(rc);
2732             goto exit;
2733         }
2734     }
2735     /* unpack the number of directives */
2736     cnt = 1;
2737     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ndirs, &cnt, PMIX_SIZE);
2738     if (PMIX_SUCCESS != rc) {
2739         PMIX_ERROR_LOG(rc);
2740         goto exit;
2741     }
2742     cnt = cd->ndirs;
2743     /* always add the source to the directives so we
2744      * can tell downstream if this gets "upcalled" to
2745      * our host for relay */
2746     cd->ndirs = cnt + 1;
2747     /* if a timestamp was sent, then we add it to the directives */
2748     if (0 < timestamp) {
2749         cd->ndirs++;
2750         PMIX_INFO_CREATE(cd->directives, cd->ndirs);
2751         PMIX_INFO_LOAD(&cd->directives[cnt], PMIX_LOG_SOURCE, &proc, PMIX_PROC);
2752         PMIX_INFO_LOAD(&cd->directives[cnt+1], PMIX_LOG_TIMESTAMP, &timestamp, PMIX_TIME);
2753     } else {
2754         PMIX_INFO_CREATE(cd->directives, cd->ndirs);
2755         PMIX_INFO_LOAD(&cd->directives[cnt], PMIX_LOG_SOURCE, &proc, PMIX_PROC);
2756     }
2757 
2758     /* unpack the directives */
2759     if (0 < cnt) {
2760         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->directives, &cnt, PMIX_INFO);
2761         if (PMIX_SUCCESS != rc) {
2762             PMIX_ERROR_LOG(rc);
2763             goto exit;
2764         }
2765     }
2766 
2767     /* pass it down */
2768     rc = pmix_plog.log(&proc, cd->info, cd->ninfo,
2769                        cd->directives, cd->ndirs,
2770                        logcbfn, cd);
2771     return rc;
2772 
2773   exit:
2774     PMIX_RELEASE(cd);
2775     return rc;
2776 }
2777 
2778 pmix_status_t pmix_server_alloc(pmix_peer_t *peer,
2779                                 pmix_buffer_t *buf,
2780                                 pmix_info_cbfunc_t cbfunc,
2781                                 void *cbdata)
2782 {
2783     int32_t cnt;
2784     pmix_status_t rc;
2785     pmix_query_caddy_t *cd;
2786     pmix_proc_t proc;
2787     pmix_alloc_directive_t directive;
2788 
2789     pmix_output_verbose(2, pmix_server_globals.base_output,
2790                         "recvd query from client");
2791 
2792     if (NULL == pmix_host_server.allocate) {
2793         return PMIX_ERR_NOT_SUPPORTED;
2794     }
2795 
2796     cd = PMIX_NEW(pmix_query_caddy_t);
2797     if (NULL == cd) {
2798         return PMIX_ERR_NOMEM;
2799     }
2800     cd->cbdata = cbdata;
2801 
2802     /* unpack the directive */
2803     cnt = 1;
2804     PMIX_BFROPS_UNPACK(rc, peer, buf, &directive, &cnt, PMIX_ALLOC_DIRECTIVE);
2805     if (PMIX_SUCCESS != rc) {
2806         PMIX_ERROR_LOG(rc);
2807         goto exit;
2808     }
2809 
2810     /* unpack the number of info objects */
2811     cnt = 1;
2812     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
2813     if (PMIX_SUCCESS != rc) {
2814         PMIX_ERROR_LOG(rc);
2815         goto exit;
2816     }
2817     /* unpack the info */
2818     if (0 < cd->ninfo) {
2819         PMIX_INFO_CREATE(cd->info, cd->ninfo);
2820         cnt = cd->ninfo;
2821         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
2822         if (PMIX_SUCCESS != rc) {
2823             PMIX_ERROR_LOG(rc);
2824             goto exit;
2825         }
2826     }
2827 
2828     /* setup the requesting peer name */
2829     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
2830     proc.rank = peer->info->pname.rank;
2831 
2832     /* ask the host to execute the request */
2833     if (PMIX_SUCCESS != (rc = pmix_host_server.allocate(&proc, directive,
2834                                                         cd->info, cd->ninfo,
2835                                                         cbfunc, cd))) {
2836         goto exit;
2837     }
2838     return PMIX_SUCCESS;
2839 
2840   exit:
2841     PMIX_RELEASE(cd);
2842     return rc;
2843 }
2844 
2845 typedef struct {
2846     pmix_list_item_t super;
2847     pmix_epilog_t *epi;
2848 } pmix_srvr_epi_caddy_t;
2849 static PMIX_CLASS_INSTANCE(pmix_srvr_epi_caddy_t,
2850                            pmix_list_item_t,
2851                            NULL, NULL);
2852 
2853 pmix_status_t pmix_server_job_ctrl(pmix_peer_t *peer,
2854                                    pmix_buffer_t *buf,
2855                                    pmix_info_cbfunc_t cbfunc,
2856                                    void *cbdata)
2857 {
2858     int32_t cnt, m;
2859     pmix_status_t rc;
2860     pmix_query_caddy_t *cd;
2861     pmix_namespace_t *nptr, *tmp;
2862     pmix_peer_t *pr;
2863     pmix_proc_t proc;
2864     size_t n;
2865     bool recurse = false, leave_topdir = false, duplicate;
2866     pmix_list_t cachedirs, cachefiles, ignorefiles, epicache;
2867     pmix_srvr_epi_caddy_t *epicd = NULL;
2868     pmix_cleanup_file_t *cf, *cf2, *cfptr;
2869     pmix_cleanup_dir_t *cdir, *cdir2, *cdirptr;
2870 
2871     pmix_output_verbose(2, pmix_server_globals.base_output,
2872                         "recvd job control request from client");
2873 
2874     if (NULL == pmix_host_server.job_control) {
2875         return PMIX_ERR_NOT_SUPPORTED;
2876     }
2877 
2878     cd = PMIX_NEW(pmix_query_caddy_t);
2879     if (NULL == cd) {
2880         return PMIX_ERR_NOMEM;
2881     }
2882     cd->cbdata = cbdata;
2883 
2884     PMIX_CONSTRUCT(&epicache, pmix_list_t);
2885 
2886     /* unpack the number of targets */
2887     cnt = 1;
2888     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ntargets, &cnt, PMIX_SIZE);
2889     if (PMIX_SUCCESS != rc) {
2890         PMIX_ERROR_LOG(rc);
2891         goto exit;
2892     }
2893     if (0 < cd->ntargets) {
2894         PMIX_PROC_CREATE(cd->targets, cd->ntargets);
2895         cnt = cd->ntargets;
2896         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->targets, &cnt, PMIX_PROC);
2897         if (PMIX_SUCCESS != rc) {
2898             PMIX_ERROR_LOG(rc);
2899             goto exit;
2900         }
2901     }
2902 
2903     /* check targets to find proper place to put any epilog requests */
2904     if (NULL == cd->targets) {
2905         epicd = PMIX_NEW(pmix_srvr_epi_caddy_t);
2906         epicd->epi = &peer->nptr->epilog;
2907         pmix_list_append(&epicache, &epicd->super);
2908     } else {
2909         for (n=0; n < cd->ntargets; n++) {
2910             /* find the nspace of this proc */
2911             nptr = NULL;
2912             PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
2913                 if (0 == strcmp(tmp->nspace, cd->targets[n].nspace)) {
2914                     nptr = tmp;
2915                     break;
2916                 }
2917             }
2918             if (NULL == nptr) {
2919                 nptr = PMIX_NEW(pmix_namespace_t);
2920                 if (NULL == nptr) {
2921                     rc = PMIX_ERR_NOMEM;
2922                     goto exit;
2923                 }
2924                 nptr->nspace = strdup(cd->targets[n].nspace);
2925                 pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
2926             }
2927             /* if the rank is wildcard, then we use the epilog for the nspace */
2928             if (PMIX_RANK_WILDCARD == cd->targets[n].rank) {
2929                 epicd = PMIX_NEW(pmix_srvr_epi_caddy_t);
2930                 epicd->epi = &nptr->epilog;
2931                 pmix_list_append(&epicache, &epicd->super);
2932             } else {
2933                 /* we need to find the precise peer - we can only
2934                  * do cleanup for a local client */
2935                 for (m=0; m < pmix_server_globals.clients.size; m++) {
2936                     if (NULL == (pr = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, m))) {
2937                         continue;
2938                     }
2939                     if (0 != strncmp(pr->info->pname.nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
2940                         continue;
2941                     }
2942                     if (pr->info->pname.rank == cd->targets[n].rank) {
2943                         epicd = PMIX_NEW(pmix_srvr_epi_caddy_t);
2944                         epicd->epi = &pr->epilog;
2945                         pmix_list_append(&epicache, &epicd->super);
2946                         break;
2947                     }
2948                 }
2949             }
2950         }
2951     }
2952 
2953     /* unpack the number of info objects */
2954     cnt = 1;
2955     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
2956     if (PMIX_SUCCESS != rc) {
2957         PMIX_ERROR_LOG(rc);
2958         goto exit;
2959     }
2960     /* unpack the info */
2961     if (0 < cd->ninfo) {
2962         PMIX_INFO_CREATE(cd->info, cd->ninfo);
2963         cnt = cd->ninfo;
2964         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
2965         if (PMIX_SUCCESS != rc) {
2966             PMIX_ERROR_LOG(rc);
2967             goto exit;
2968         }
2969     }
2970 
2971     /* if this includes a request for post-termination cleanup, we handle
2972      * that request ourselves */
2973     PMIX_CONSTRUCT(&cachedirs, pmix_list_t);
2974     PMIX_CONSTRUCT(&cachefiles, pmix_list_t);
2975     PMIX_CONSTRUCT(&ignorefiles, pmix_list_t);
2976 
2977     cnt = 0;  // track how many infos are cleanup related
2978     for (n=0; n < cd->ninfo; n++) {
2979         if (0 == strncmp(cd->info[n].key, PMIX_REGISTER_CLEANUP, PMIX_MAX_KEYLEN)) {
2980             ++cnt;
2981             if (PMIX_STRING != cd->info[n].value.type ||
2982                 NULL == cd->info[n].value.data.string) {
2983                 /* return an error */
2984                 rc = PMIX_ERR_BAD_PARAM;
2985                 goto exit;
2986             }
2987             cf = PMIX_NEW(pmix_cleanup_file_t);
2988             if (NULL == cf) {
2989                 /* return an error */
2990                 rc = PMIX_ERR_NOMEM;
2991                 goto exit;
2992             }
2993             cf->path = strdup(cd->info[n].value.data.string);
2994             pmix_list_append(&cachefiles, &cf->super);
2995         } else if (0 == strncmp(cd->info[n].key, PMIX_REGISTER_CLEANUP_DIR, PMIX_MAX_KEYLEN)) {
2996             ++cnt;
2997             if (PMIX_STRING != cd->info[n].value.type ||
2998                 NULL == cd->info[n].value.data.string) {
2999                 /* return an error */
3000                 rc = PMIX_ERR_BAD_PARAM;
3001                 goto exit;
3002             }
3003             cdir = PMIX_NEW(pmix_cleanup_dir_t);
3004             if (NULL == cdir) {
3005                 /* return an error */
3006                 rc = PMIX_ERR_NOMEM;
3007                 goto exit;
3008             }
3009             cdir->path = strdup(cd->info[n].value.data.string);
3010             pmix_list_append(&cachedirs, &cdir->super);
3011         } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_RECURSIVE, PMIX_MAX_KEYLEN)) {
3012             recurse = PMIX_INFO_TRUE(&cd->info[n]);
3013             ++cnt;
3014         } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_IGNORE, PMIX_MAX_KEYLEN)) {
3015             if (PMIX_STRING != cd->info[n].value.type ||
3016                 NULL == cd->info[n].value.data.string) {
3017                 /* return an error */
3018                 rc = PMIX_ERR_BAD_PARAM;
3019                 goto exit;
3020             }
3021             cf = PMIX_NEW(pmix_cleanup_file_t);
3022             if (NULL == cf) {
3023                 /* return an error */
3024                 rc = PMIX_ERR_NOMEM;
3025                 goto exit;
3026             }
3027             cf->path = strdup(cd->info[n].value.data.string);
3028             pmix_list_append(&ignorefiles, &cf->super);
3029             ++cnt;
3030         } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_LEAVE_TOPDIR, PMIX_MAX_KEYLEN)) {
3031             leave_topdir = PMIX_INFO_TRUE(&cd->info[n]);
3032             ++cnt;
3033         }
3034     }
3035     if (0 < cnt) {
3036         /* handle any ignore directives first */
3037         PMIX_LIST_FOREACH(cf, &ignorefiles, pmix_cleanup_file_t) {
3038             PMIX_LIST_FOREACH(epicd, &epicache, pmix_srvr_epi_caddy_t) {
3039                 /* scan the existing list of files for any duplicate */
3040                 duplicate = false;
3041                 PMIX_LIST_FOREACH(cf2, &epicd->epi->cleanup_files, pmix_cleanup_file_t) {
3042                     if (0 == strcmp(cf2->path, cf->path)) {
3043                         duplicate = true;
3044                         break;
3045                     }
3046                 }
3047                 if (!duplicate) {
3048                     /* append it to the end of the list */
3049                     cfptr = PMIX_NEW(pmix_cleanup_file_t);
3050                     cfptr->path = strdup(cf->path);
3051                     pmix_list_append(&epicd->epi->ignores, &cf->super);
3052                 }
3053             }
3054         }
3055         PMIX_LIST_DESTRUCT(&ignorefiles);
3056         /* now look at the directories */
3057         PMIX_LIST_FOREACH(cdir, &cachedirs, pmix_cleanup_dir_t) {
3058             PMIX_LIST_FOREACH(epicd, &epicache, pmix_srvr_epi_caddy_t) {
3059                 /* scan the existing list of directories for any duplicate */
3060                 duplicate = false;
3061                 PMIX_LIST_FOREACH(cdir2, &epicd->epi->cleanup_dirs, pmix_cleanup_dir_t) {
3062                     if (0 == strcmp(cdir2->path, cdir->path)) {
3063                         /* duplicate - check for difference in flags per RFC
3064                          * precedence rules */
3065                         if (!cdir->recurse && recurse) {
3066                             cdir->recurse = recurse;
3067                         }
3068                         if (!cdir->leave_topdir && leave_topdir) {
3069                             cdir->leave_topdir = leave_topdir;
3070                         }
3071                         duplicate = true;
3072                         break;
3073                     }
3074                 }
3075                 if (!duplicate) {
3076                     /* check for conflict with ignore */
3077                     PMIX_LIST_FOREACH(cf, &epicd->epi->ignores, pmix_cleanup_file_t) {
3078                         if (0 == strcmp(cf->path, cdir->path)) {
3079                             /* return an error */
3080                             rc = PMIX_ERR_CONFLICTING_CLEANUP_DIRECTIVES;
3081                             PMIX_LIST_DESTRUCT(&cachedirs);
3082                             PMIX_LIST_DESTRUCT(&cachefiles);
3083                             goto exit;
3084                         }
3085                     }
3086                     /* append it to the end of the list */
3087                     cdirptr = PMIX_NEW(pmix_cleanup_dir_t);
3088                     cdirptr->path = strdup(cdir->path);
3089                     cdirptr->recurse = recurse;
3090                     cdirptr->leave_topdir = leave_topdir;
3091                     pmix_list_append(&epicd->epi->cleanup_dirs, &cdirptr->super);
3092                 }
3093             }
3094         }
3095         PMIX_LIST_DESTRUCT(&cachedirs);
3096         PMIX_LIST_FOREACH(cf, &cachefiles, pmix_cleanup_file_t) {
3097             PMIX_LIST_FOREACH(epicd, &epicache, pmix_srvr_epi_caddy_t) {
3098                 /* scan the existing list of files for any duplicate */
3099                 duplicate = false;
3100                 PMIX_LIST_FOREACH(cf2, &epicd->epi->cleanup_files, pmix_cleanup_file_t) {
3101                     if (0 == strcmp(cf2->path, cf->path)) {
3102                         duplicate = true;
3103                         break;
3104                     }
3105                 }
3106                 if (!duplicate) {
3107                     /* check for conflict with ignore */
3108                     PMIX_LIST_FOREACH(cf2, &epicd->epi->ignores, pmix_cleanup_file_t) {
3109                         if (0 == strcmp(cf->path, cf2->path)) {
3110                             /* return an error */
3111                             rc = PMIX_ERR_CONFLICTING_CLEANUP_DIRECTIVES;
3112                             PMIX_LIST_DESTRUCT(&cachedirs);
3113                             PMIX_LIST_DESTRUCT(&cachefiles);
3114                             goto exit;
3115                         }
3116                     }
3117                     /* append it to the end of the list */
3118                     cfptr = PMIX_NEW(pmix_cleanup_file_t);
3119                     cfptr->path = strdup(cf->path);
3120                     pmix_list_append(&epicd->epi->cleanup_files, &cfptr->super);
3121                 }
3122             }
3123         }
3124         PMIX_LIST_DESTRUCT(&cachefiles);
3125         if (cnt == (int)cd->ninfo) {
3126             /* nothing more to do */
3127             rc = PMIX_OPERATION_SUCCEEDED;
3128             goto exit;
3129         }
3130     }
3131 
3132     /* setup the requesting peer name */
3133     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3134     proc.rank = peer->info->pname.rank;
3135 
3136     /* ask the host to execute the request */
3137     if (PMIX_SUCCESS != (rc = pmix_host_server.job_control(&proc,
3138                                                            cd->targets, cd->ntargets,
3139                                                            cd->info, cd->ninfo,
3140                                                            cbfunc, cd))) {
3141         goto exit;
3142     }
3143     PMIX_LIST_DESTRUCT(&epicache);
3144     return PMIX_SUCCESS;
3145 
3146   exit:
3147     PMIX_RELEASE(cd);
3148     PMIX_LIST_DESTRUCT(&epicache);
3149     return rc;
3150 }
3151 
3152 pmix_status_t pmix_server_monitor(pmix_peer_t *peer,
3153                                   pmix_buffer_t *buf,
3154                                   pmix_info_cbfunc_t cbfunc,
3155                                   void *cbdata)
3156 {
3157     int32_t cnt;
3158     pmix_info_t monitor;
3159     pmix_status_t rc, error;
3160     pmix_query_caddy_t *cd;
3161     pmix_proc_t proc;
3162 
3163     pmix_output_verbose(2, pmix_server_globals.base_output,
3164                         "recvd monitor request from client");
3165 
3166 
3167     cd = PMIX_NEW(pmix_query_caddy_t);
3168     if (NULL == cd) {
3169         return PMIX_ERR_NOMEM;
3170     }
3171     cd->cbdata = cbdata;
3172 
3173     /* unpack what is to be monitored */
3174     PMIX_INFO_CONSTRUCT(&monitor);
3175     cnt = 1;
3176     PMIX_BFROPS_UNPACK(rc, peer, buf, &monitor, &cnt, PMIX_INFO);
3177     if (PMIX_SUCCESS != rc) {
3178         PMIX_ERROR_LOG(rc);
3179         goto exit;
3180     }
3181 
3182     /* unpack the error code */
3183     cnt = 1;
3184     PMIX_BFROPS_UNPACK(rc, peer, buf, &error, &cnt, PMIX_STATUS);
3185     if (PMIX_SUCCESS != rc) {
3186         PMIX_ERROR_LOG(rc);
3187         goto exit;
3188     }
3189 
3190     /* unpack the number of directives */
3191     cnt = 1;
3192     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3193     if (PMIX_SUCCESS != rc) {
3194         PMIX_ERROR_LOG(rc);
3195         goto exit;
3196     }
3197     /* unpack the directives */
3198     if (0 < cd->ninfo) {
3199         PMIX_INFO_CREATE(cd->info, cd->ninfo);
3200         cnt = cd->ninfo;
3201         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3202         if (PMIX_SUCCESS != rc) {
3203             PMIX_ERROR_LOG(rc);
3204             goto exit;
3205         }
3206     }
3207 
3208     /* see if they are requesting one of the monitoring
3209      * methods we internally support */
3210     rc = pmix_psensor.start(peer, error, &monitor, cd->info, cd->ninfo);
3211     if (PMIX_SUCCESS == rc) {
3212         rc = PMIX_OPERATION_SUCCEEDED;
3213         goto exit;
3214     }
3215     if (PMIX_ERR_NOT_SUPPORTED != rc) {
3216         goto exit;
3217     }
3218 
3219     /* if we don't internally support it, see if
3220      * our host does */
3221     if (NULL == pmix_host_server.monitor) {
3222         rc = PMIX_ERR_NOT_SUPPORTED;
3223         goto exit;
3224     }
3225 
3226     /* setup the requesting peer name */
3227     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3228     proc.rank = peer->info->pname.rank;
3229 
3230     /* ask the host to execute the request */
3231     if (PMIX_SUCCESS != (rc = pmix_host_server.monitor(&proc, &monitor, error,
3232                                                        cd->info, cd->ninfo,
3233                                                        cbfunc, cd))) {
3234         goto exit;
3235     }
3236     return PMIX_SUCCESS;
3237 
3238   exit:
3239     PMIX_INFO_DESTRUCT(&monitor);
3240     PMIX_RELEASE(cd);
3241     return rc;
3242 }
3243 
3244 pmix_status_t pmix_server_get_credential(pmix_peer_t *peer,
3245                                          pmix_buffer_t *buf,
3246                                          pmix_credential_cbfunc_t cbfunc,
3247                                          void *cbdata)
3248 {
3249     int32_t cnt;
3250     pmix_status_t rc;
3251     pmix_query_caddy_t *cd;
3252     pmix_proc_t proc;
3253 
3254     pmix_output_verbose(2, pmix_globals.debug_output,
3255                         "recvd get credential request from client");
3256 
3257     if (NULL == pmix_host_server.get_credential) {
3258         return PMIX_ERR_NOT_SUPPORTED;
3259     }
3260 
3261     cd = PMIX_NEW(pmix_query_caddy_t);
3262     if (NULL == cd) {
3263         return PMIX_ERR_NOMEM;
3264     }
3265     cd->cbdata = cbdata;
3266 
3267     /* unpack the number of directives */
3268     cnt = 1;
3269     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3270     if (PMIX_SUCCESS != rc) {
3271         PMIX_ERROR_LOG(rc);
3272         goto exit;
3273     }
3274     /* unpack the directives */
3275     if (0 < cd->ninfo) {
3276         PMIX_INFO_CREATE(cd->info, cd->ninfo);
3277         cnt = cd->ninfo;
3278         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3279         if (PMIX_SUCCESS != rc) {
3280             PMIX_ERROR_LOG(rc);
3281             goto exit;
3282         }
3283     }
3284 
3285     /* setup the requesting peer name */
3286     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3287     proc.rank = peer->info->pname.rank;
3288 
3289     /* ask the host to execute the request */
3290     if (PMIX_SUCCESS != (rc = pmix_host_server.get_credential(&proc, cd->info, cd->ninfo,
3291                                                               cbfunc, cd))) {
3292         goto exit;
3293     }
3294     return PMIX_SUCCESS;
3295 
3296   exit:
3297     PMIX_RELEASE(cd);
3298     return rc;
3299 }
3300 
3301 pmix_status_t pmix_server_validate_credential(pmix_peer_t *peer,
3302                                               pmix_buffer_t *buf,
3303                                               pmix_validation_cbfunc_t cbfunc,
3304                                               void *cbdata)
3305 {
3306     int32_t cnt;
3307     pmix_status_t rc;
3308     pmix_query_caddy_t *cd;
3309     pmix_proc_t proc;
3310 
3311     pmix_output_verbose(2, pmix_globals.debug_output,
3312                         "recvd validate credential request from client");
3313 
3314     if (NULL == pmix_host_server.validate_credential) {
3315         return PMIX_ERR_NOT_SUPPORTED;
3316     }
3317 
3318     cd = PMIX_NEW(pmix_query_caddy_t);
3319     if (NULL == cd) {
3320         return PMIX_ERR_NOMEM;
3321     }
3322     cd->cbdata = cbdata;
3323 
3324     /* unpack the credential */
3325     cnt = 1;
3326     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->bo, &cnt, PMIX_BYTE_OBJECT);
3327     if (PMIX_SUCCESS != rc) {
3328         PMIX_ERROR_LOG(rc);
3329         goto exit;
3330     }
3331 
3332     /* unpack the number of directives */
3333     cnt = 1;
3334     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3335     if (PMIX_SUCCESS != rc) {
3336         PMIX_ERROR_LOG(rc);
3337         goto exit;
3338     }
3339     /* unpack the directives */
3340     if (0 < cd->ninfo) {
3341         PMIX_INFO_CREATE(cd->info, cd->ninfo);
3342         cnt = cd->ninfo;
3343         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3344         if (PMIX_SUCCESS != rc) {
3345             PMIX_ERROR_LOG(rc);
3346             goto exit;
3347         }
3348     }
3349 
3350     /* setup the requesting peer name */
3351     pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3352     proc.rank = peer->info->pname.rank;
3353 
3354     /* ask the host to execute the request */
3355     if (PMIX_SUCCESS != (rc = pmix_host_server.validate_credential(&proc, &cd->bo,
3356                                                                    cd->info, cd->ninfo,
3357                                                                    cbfunc, cd))) {
3358         goto exit;
3359     }
3360     return PMIX_SUCCESS;
3361 
3362   exit:
3363     PMIX_RELEASE(cd);
3364     return rc;
3365 }
3366 
3367 pmix_status_t pmix_server_iofreg(pmix_peer_t *peer,
3368                                  pmix_buffer_t *buf,
3369                                  pmix_op_cbfunc_t cbfunc,
3370                                  void *cbdata)
3371 {
3372     int32_t cnt;
3373     pmix_status_t rc;
3374     pmix_setup_caddy_t *cd;
3375     pmix_iof_req_t *req;
3376     bool notify, match;
3377     size_t n;
3378     pmix_buffer_t *msg;
3379     pmix_iof_cache_t *iof, *ionext;
3380 
3381     pmix_output_verbose(2, pmix_server_globals.iof_output,
3382                         "recvd IOF PULL request from client");
3383 
3384     if (NULL == pmix_host_server.iof_pull) {
3385         return PMIX_ERR_NOT_SUPPORTED;
3386     }
3387 
3388     cd = PMIX_NEW(pmix_setup_caddy_t);
3389     if (NULL == cd) {
3390         return PMIX_ERR_NOMEM;
3391     }
3392     cd->cbdata = cbdata;  // this is the pmix_server_caddy_t
3393 
3394     /* unpack the number of procs */
3395     cnt = 1;
3396     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->nprocs, &cnt, PMIX_SIZE);
3397     if (PMIX_SUCCESS != rc) {
3398         PMIX_ERROR_LOG(rc);
3399         goto exit;
3400     }
3401     /* unpack the procs */
3402     if (0 < cd->nprocs) {
3403         PMIX_PROC_CREATE(cd->procs, cd->nprocs);
3404         cnt = cd->nprocs;
3405         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->procs, &cnt, PMIX_PROC);
3406         if (PMIX_SUCCESS != rc) {
3407             PMIX_ERROR_LOG(rc);
3408             goto exit;
3409         }
3410     }
3411 
3412     /* unpack the number of directives */
3413     cnt = 1;
3414     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3415     if (PMIX_SUCCESS != rc) {
3416         PMIX_ERROR_LOG(rc);
3417         goto exit;
3418     }
3419     /* unpack the directives */
3420     if (0 < cd->ninfo) {
3421         PMIX_INFO_CREATE(cd->info, cd->ninfo);
3422         cnt = cd->ninfo;
3423         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3424         if (PMIX_SUCCESS != rc) {
3425             PMIX_ERROR_LOG(rc);
3426             goto exit;
3427         }
3428     }
3429 
3430     /* unpack the channels */
3431     cnt = 1;
3432     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->channels, &cnt, PMIX_IOF_CHANNEL);
3433     if (PMIX_SUCCESS != rc) {
3434         PMIX_ERROR_LOG(rc);
3435         goto exit;
3436     }
3437 
3438     /* check to see if we have already registered this source/channel combination */
3439     notify = false;
3440     for (n=0; n < cd->nprocs; n++) {
3441         match = false;
3442         PMIX_LIST_FOREACH(req, &pmix_globals.iof_requests, pmix_iof_req_t) {
3443             /* is this request from the same peer? */
3444             if (peer != req->peer) {
3445                 continue;
3446             }
3447             /* do we already have this source for this peer? */
3448             if (0 == strncmp(cd->procs[n].nspace, req->pname.nspace, PMIX_MAX_NSLEN) &&
3449                 (PMIX_RANK_WILDCARD == req->pname.rank || cd->procs[n].rank == req->pname.rank)) {
3450                 match = true;
3451                 if ((req->channels & cd->channels) != cd->channels) {
3452                     /* this is a channel update */
3453                     req->channels |= cd->channels;
3454                     /* we need to notify the host */
3455                     notify = true;
3456                 }
3457                 break;
3458             }
3459         }
3460         /* if we didn't find the matching entry, then add it */
3461         if (!match) {
3462             /* record the request */
3463             req = PMIX_NEW(pmix_iof_req_t);
3464             if (NULL == req) {
3465                 rc = PMIX_ERR_NOMEM;
3466                 goto exit;
3467             }
3468             PMIX_RETAIN(peer);
3469             req->peer = peer;
3470             req->pname.nspace = strdup(cd->procs[n].nspace);
3471             req->pname.rank = cd->procs[n].rank;
3472             req->channels = cd->channels;
3473             pmix_list_append(&pmix_globals.iof_requests, &req->super);
3474         }
3475         /* process any cached IO */
3476         PMIX_LIST_FOREACH_SAFE(iof, ionext, &pmix_server_globals.iof, pmix_iof_cache_t) {
3477             /* if the channels don't match, then ignore it */
3478             if (!(iof->channel & req->channels)) {
3479                 continue;
3480             }
3481             /* if the source does not match the request, then ignore it */
3482             if (!PMIX_CHECK_PROCID(&iof->source, &req->pname)) {
3483                 continue;
3484             }
3485             /* never forward back to the source! This can happen if the source
3486              * is a launcher */
3487             if (PMIX_CHECK_PROCID(&iof->source, &req->peer->info->pname)) {
3488                 continue;
3489             }
3490             pmix_output_verbose(2, pmix_server_globals.iof_output,
3491                                 "PMIX:SERVER:IOFREQ delivering cached IOF from %s:%d to %s:%d",
3492                                 iof->source.nspace, iof->source.rank,
3493                                 req->peer->info->pname.nspace, req->peer->info->pname.rank);
3494             /* setup the msg */
3495             if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) {
3496                 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
3497                 rc = PMIX_ERR_OUT_OF_RESOURCE;
3498                 break;
3499             }
3500             /* provide the source */
3501             PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->source, 1, PMIX_PROC);
3502             if (PMIX_SUCCESS != rc) {
3503                 PMIX_ERROR_LOG(rc);
3504                 PMIX_RELEASE(msg);
3505                 break;
3506             }
3507             /* provide the channel */
3508             PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->channel, 1, PMIX_IOF_CHANNEL);
3509             if (PMIX_SUCCESS != rc) {
3510                 PMIX_ERROR_LOG(rc);
3511                 PMIX_RELEASE(msg);
3512                 break;
3513             }
3514             /* pack the data */
3515             PMIX_BFROPS_PACK(rc, req->peer, msg, iof->bo, 1, PMIX_BYTE_OBJECT);
3516             if (PMIX_SUCCESS != rc) {
3517                 PMIX_ERROR_LOG(rc);
3518                 PMIX_RELEASE(msg);
3519                 break;
3520             }
3521             /* send it to the requestor */
3522             PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF);
3523             if (PMIX_SUCCESS != rc) {
3524                 PMIX_ERROR_LOG(rc);
3525                 PMIX_RELEASE(msg);
3526             }
3527             /* remove it from the list since it has now been forwarded */
3528             pmix_list_remove_item(&pmix_server_globals.iof, &iof->super);
3529             PMIX_RELEASE(iof);
3530         }
3531     }
3532     if (notify) {
3533         /* ask the host to execute the request */
3534         if (PMIX_SUCCESS != (rc = pmix_host_server.iof_pull(cd->procs, cd->nprocs,
3535                                                             cd->info, cd->ninfo,
3536                                                             cd->channels,
3537                                                             cbfunc, cd))) {
3538             goto exit;
3539         }
3540     }
3541     return PMIX_SUCCESS;
3542 
3543   exit:
3544     PMIX_RELEASE(cd);
3545     return rc;
3546 }
3547 
3548 static void stdcbfunc(pmix_status_t status, void *cbdata)
3549 {
3550     pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
3551 
3552     if (NULL != cd->opcbfunc) {
3553         cd->opcbfunc(status, cd->cbdata);
3554     }
3555     if (NULL != cd->procs) {
3556         PMIX_PROC_FREE(cd->procs, cd->nprocs);
3557     }
3558     if (NULL != cd->info) {
3559         PMIX_INFO_FREE(cd->info, cd->ninfo);
3560     }
3561     if (NULL != cd->bo) {
3562         PMIX_BYTE_OBJECT_FREE(cd->bo, 1);
3563     }
3564     PMIX_RELEASE(cd);
3565 }
3566 
3567 pmix_status_t pmix_server_iofstdin(pmix_peer_t *peer,
3568                                    pmix_buffer_t *buf,
3569                                    pmix_op_cbfunc_t cbfunc,
3570                                    void *cbdata)
3571 {
3572     int32_t cnt;
3573     pmix_status_t rc;
3574     pmix_proc_t source;
3575     pmix_setup_caddy_t *cd;
3576 
3577     pmix_output_verbose(2, pmix_server_globals.iof_output,
3578                         "recvd stdin IOF data from tool");
3579 
3580     if (NULL == pmix_host_server.push_stdin) {
3581         return PMIX_ERR_NOT_SUPPORTED;
3582     }
3583 
3584     cd = PMIX_NEW(pmix_setup_caddy_t);
3585     if (NULL == cd) {
3586         return PMIX_ERR_NOMEM;
3587     }
3588     cd->opcbfunc = cbfunc;
3589     cd->cbdata = cbdata;
3590 
3591     /* unpack the number of targets */
3592     cnt = 1;
3593     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->nprocs, &cnt, PMIX_SIZE);
3594     if (PMIX_SUCCESS != rc) {
3595         PMIX_ERROR_LOG(rc);
3596         goto error;
3597     }
3598     if (0 < cd->nprocs) {
3599         PMIX_PROC_CREATE(cd->procs, cd->nprocs);
3600         if (NULL == cd->procs) {
3601             rc = PMIX_ERR_NOMEM;
3602             goto error;
3603         }
3604         cnt = cd->nprocs;
3605         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->procs, &cnt, PMIX_PROC);
3606         if (PMIX_SUCCESS != rc) {
3607             PMIX_ERROR_LOG(rc);
3608             goto error;
3609         }
3610     }
3611 
3612     /* unpack the number of directives */
3613     cnt = 1;
3614     PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3615     if (PMIX_SUCCESS != rc) {
3616         PMIX_ERROR_LOG(rc);
3617         goto error;
3618     }
3619     if (0 < cd->ninfo) {
3620         PMIX_INFO_CREATE(cd->info, cd->ninfo);
3621         if (NULL == cd->info) {
3622             rc = PMIX_ERR_NOMEM;
3623             goto error;
3624         }
3625         cnt = cd->ninfo;
3626         PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3627         if (PMIX_SUCCESS != rc) {
3628             PMIX_ERROR_LOG(rc);
3629             goto error;
3630         }
3631     }
3632 
3633     /* unpack the data */
3634     PMIX_BYTE_OBJECT_CREATE(cd->bo, 1);
3635     if (NULL == cd->bo) {
3636         rc = PMIX_ERR_NOMEM;
3637         goto error;
3638     }
3639 
3640     cnt = 1;
3641     PMIX_BFROPS_UNPACK(rc, peer, buf, cd->bo, &cnt, PMIX_BYTE_OBJECT);
3642     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
3643         /* it is okay for them to not send data */
3644         PMIX_BYTE_OBJECT_FREE(cd->bo, 1);
3645     } else if (PMIX_SUCCESS != rc) {
3646         PMIX_ERROR_LOG(rc);
3647         goto error;
3648     }
3649 
3650     /* pass the data to the host */
3651     pmix_strncpy(source.nspace, peer->nptr->nspace, PMIX_MAX_NSLEN);
3652     source.rank = peer->info->pname.rank;
3653     if (PMIX_SUCCESS != (rc = pmix_host_server.push_stdin(&source, cd->procs, cd->nprocs,
3654                                                           cd->info, cd->ninfo, cd->bo,
3655                                                           stdcbfunc, cd))) {
3656         if (PMIX_OPERATION_SUCCEEDED != rc) {
3657             goto error;
3658         }
3659     }
3660     return rc;
3661 
3662   error:
3663     PMIX_RELEASE(cd);
3664     return rc;
3665 }
3666 
3667 static void grp_timeout(int sd, short args, void *cbdata)
3668 {
3669     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
3670     pmix_buffer_t *reply;
3671     pmix_status_t ret, rc = PMIX_ERR_TIMEOUT;
3672 
3673     pmix_output_verbose(2, pmix_server_globals.fence_output,
3674                         "ALERT: grp construct timeout fired");
3675 
3676     /* send this requestor the reply */
3677     reply = PMIX_NEW(pmix_buffer_t);
3678     if (NULL == reply) {
3679         goto error;
3680     }
3681     /* setup the reply, starting with the returned status */
3682     PMIX_BFROPS_PACK(ret, cd->peer, reply, &rc, 1, PMIX_STATUS);
3683     if (PMIX_SUCCESS != ret) {
3684         PMIX_ERROR_LOG(ret);
3685         PMIX_RELEASE(reply);
3686         goto error;
3687     }
3688     pmix_output_verbose(2, pmix_server_globals.base_output,
3689                         "server:grp_timeout reply being sent to %s:%u",
3690                         cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
3691     PMIX_SERVER_QUEUE_REPLY(ret, cd->peer, cd->hdr.tag, reply);
3692     if (PMIX_SUCCESS != ret) {
3693         PMIX_RELEASE(reply);
3694     }
3695 
3696   error:
3697     cd->event_active = false;
3698     /* remove it from the list */
3699     pmix_list_remove_item(&cd->trk->local_cbs, &cd->super);
3700     PMIX_RELEASE(cd);
3701 }
3702 
3703 static void _grpcbfunc(int sd, short argc, void *cbdata)
3704 {
3705     pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
3706     pmix_server_trkr_t *trk = scd->tracker;
3707     pmix_server_caddy_t *cd;
3708     pmix_buffer_t *reply, xfer;
3709     pmix_status_t ret;
3710     size_t n, ctxid = SIZE_MAX;
3711     pmix_group_t *grp = (pmix_group_t*)trk->cbdata;
3712     pmix_byte_object_t *bo = NULL;
3713     pmix_nspace_caddy_t *nptr;
3714     pmix_list_t nslist;
3715     bool found;
3716 
3717     PMIX_ACQUIRE_OBJECT(scd);
3718 
3719     pmix_output_verbose(2, pmix_server_globals.connect_output,
3720                         "server:grpcbfunc processing WITH %d MEMBERS",
3721                         (NULL == trk) ? 0 : (int)pmix_list_get_size(&trk->local_cbs));
3722 
3723     if (NULL == trk) {
3724         /* give them a release if they want it - this should
3725          * never happen, but protect against the possibility */
3726         if (NULL != scd->cbfunc.relfn) {
3727             scd->cbfunc.relfn(scd->cbdata);
3728         }
3729         PMIX_RELEASE(scd);
3730         return;
3731     }
3732 
3733     /* if the timer is active, clear it */
3734     if (trk->event_active) {
3735         pmix_event_del(&trk->ev);
3736     }
3737 
3738     /* the tracker's "hybrid" field is used to indicate construct
3739      * vs destruct */
3740     if (trk->hybrid) {
3741         /* we destructed the group */
3742         if (NULL != grp) {
3743             pmix_list_remove_item(&pmix_server_globals.groups, &grp->super);
3744             PMIX_RELEASE(grp);
3745         }
3746     } else {
3747         /* see if this group was assigned a context ID or collected data */
3748         for (n=0; n < scd->ninfo; n++) {
3749             if (PMIX_CHECK_KEY(&scd->info[n], PMIX_GROUP_CONTEXT_ID)) {
3750                 PMIX_VALUE_GET_NUMBER(ret, &scd->info[n].value, ctxid, size_t);
3751             } else if (PMIX_CHECK_KEY(&scd->info[n], PMIX_GROUP_ENDPT_DATA)) {
3752                 bo = &scd->info[n].value.data.bo;
3753             }
3754         }
3755     }
3756 
3757     /* if data was returned, then we need to have the modex cbfunc
3758      * store it for us before releasing the group members */
3759     if (NULL != bo) {
3760         PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
3761         PMIX_CONSTRUCT(&nslist, pmix_list_t);
3762         /* Collect the nptr list with uniq GDS components of all local
3763          * participants. It does not allow multiple storing to the
3764          * same GDS if participants have mutual GDS. */
3765         PMIX_LIST_FOREACH(cd, &trk->local_cbs, pmix_server_caddy_t) {
3766             // see if we already have this nspace
3767             found = false;
3768             PMIX_LIST_FOREACH(nptr, &nslist, pmix_nspace_caddy_t) {
3769                 if (0 == strcmp(nptr->ns->compat.gds->name,
3770                             cd->peer->nptr->compat.gds->name)) {
3771                     found = true;
3772                     break;
3773                 }
3774             }
3775             if (!found) {
3776                 // add it
3777                 nptr = PMIX_NEW(pmix_nspace_caddy_t);
3778                 PMIX_RETAIN(cd->peer->nptr);
3779                 nptr->ns = cd->peer->nptr;
3780                 pmix_list_append(&nslist, &nptr->super);
3781             }
3782         }
3783 
3784         PMIX_LIST_FOREACH(nptr, &nslist, pmix_nspace_caddy_t) {
3785             PMIX_LOAD_BUFFER(pmix_globals.mypeer, &xfer, bo->bytes, bo->size);
3786             PMIX_GDS_STORE_MODEX(ret, nptr->ns, &xfer, trk);
3787             if (PMIX_SUCCESS != ret) {
3788                 PMIX_ERROR_LOG(ret);
3789                 break;
3790             }
3791         }
3792     }
3793 
3794     /* loop across all procs in the tracker, sending them the reply */
3795     PMIX_LIST_FOREACH(cd, &trk->local_cbs, pmix_server_caddy_t) {
3796         reply = PMIX_NEW(pmix_buffer_t);
3797         if (NULL == reply) {
3798             break;
3799         }
3800         /* setup the reply, starting with the returned status */
3801         PMIX_BFROPS_PACK(ret, cd->peer, reply, &scd->status, 1, PMIX_STATUS);
3802         if (PMIX_SUCCESS != ret) {
3803             PMIX_ERROR_LOG(ret);
3804             PMIX_RELEASE(reply);
3805             break;
3806         }
3807         if (!trk->hybrid) {
3808             /* if a ctxid was provided, pass it along */
3809             PMIX_BFROPS_PACK(ret, cd->peer, reply, &ctxid, 1, PMIX_SIZE);
3810             if (PMIX_SUCCESS != ret) {
3811                 PMIX_ERROR_LOG(ret);
3812                 PMIX_RELEASE(reply);
3813                 break;
3814             }
3815         }
3816         pmix_output_verbose(2, pmix_server_globals.connect_output,
3817                             "server:grp_cbfunc reply being sent to %s:%u",
3818                             cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
3819         PMIX_SERVER_QUEUE_REPLY(ret, cd->peer, cd->hdr.tag, reply);
3820         if (PMIX_SUCCESS != ret) {
3821             PMIX_RELEASE(reply);
3822         }
3823     }
3824 
3825     /* remove the tracker from the list */
3826     pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
3827     PMIX_RELEASE(trk);
3828 
3829     /* we are done */
3830     if (NULL != scd->cbfunc.relfn) {
3831         scd->cbfunc.relfn(scd->cbdata);
3832     }
3833     PMIX_RELEASE(scd);
3834 }
3835 
3836 
3837 static void grpcbfunc(pmix_status_t status,
3838                       pmix_info_t *info, size_t ninfo,
3839                       void *cbdata,
3840                       pmix_release_cbfunc_t relfn,
3841                       void *relcbd)
3842 {
3843     pmix_server_trkr_t *tracker = (pmix_server_trkr_t*)cbdata;
3844     pmix_shift_caddy_t *scd;
3845 
3846     pmix_output_verbose(2, pmix_server_globals.connect_output,
3847                         "server:grpcbfunc called with %d info", (int)ninfo);
3848 
3849     if (NULL == tracker) {
3850         /* nothing to do - but be sure to give them
3851          * a release if they want it */
3852         if (NULL != relfn) {
3853             relfn(relcbd);
3854         }
3855         return;
3856     }
3857 
3858     /* need to thread-shift this callback as it accesses global data */
3859     scd = PMIX_NEW(pmix_shift_caddy_t);
3860     if (NULL == scd) {
3861         /* nothing we can do */
3862         if (NULL != relfn) {
3863             relfn(cbdata);
3864         }
3865         return;
3866     }
3867     scd->status = status;
3868     scd->info = info;
3869     scd->ninfo = ninfo;
3870     scd->tracker = tracker;
3871     scd->cbfunc.relfn = relfn;
3872     scd->cbdata = relcbd;
3873     PMIX_THREADSHIFT(scd, _grpcbfunc);
3874 }
3875 
3876 /* we are being called from the PMIx server's switchyard function,
3877  * which means we are in an event and can access global data */
3878 pmix_status_t pmix_server_grpconstruct(pmix_server_caddy_t *cd,
3879                                        pmix_buffer_t *buf)
3880 {
3881     pmix_peer_t *peer = (pmix_peer_t*)cd->peer;
3882     pmix_peer_t *pr;
3883     int32_t cnt, m;
3884     pmix_status_t rc;
3885     char *grpid;
3886     pmix_proc_t *procs;
3887     pmix_group_t *grp, *pgrp;
3888     pmix_info_t *info = NULL, *iptr;
3889     size_t n, ninfo, nprocs, n2;
3890     pmix_server_trkr_t *trk;
3891     struct timeval tv = {0, 0};
3892     bool need_cxtid = false;
3893     bool match, force_local = false;
3894     bool embed_barrier = false;
3895     bool barrier_directive_included = false;
3896     pmix_buffer_t bucket;
3897     pmix_byte_object_t bo;
3898     pmix_list_t mbrs;
3899     pmix_namelist_t *nm;
3900     bool expanded = false;
3901 
3902     pmix_output_verbose(2, pmix_server_globals.connect_output,
3903                         "recvd grpconstruct cmd");
3904 
3905     /* unpack the group ID */
3906     cnt = 1;
3907     PMIX_BFROPS_UNPACK(rc, peer, buf, &grpid, &cnt, PMIX_STRING);
3908     if (PMIX_SUCCESS != rc) {
3909         PMIX_ERROR_LOG(rc);
3910         goto error;
3911     }
3912 
3913     /* see if we already have this group */
3914     grp = NULL;
3915     PMIX_LIST_FOREACH(pgrp, &pmix_server_globals.groups, pmix_group_t) {
3916         if (0 == strcmp(grpid, pgrp->grpid)) {
3917             grp = pgrp;
3918             break;
3919         }
3920     }
3921     if (NULL == grp) {
3922         /* create a new entry */
3923         grp = PMIX_NEW(pmix_group_t);
3924         if (NULL == grp) {
3925             rc = PMIX_ERR_NOMEM;
3926             goto error;
3927         }
3928         grp->grpid = grpid;
3929         pmix_list_append(&pmix_server_globals.groups, &grp->super);
3930     } else {
3931         free(grpid);
3932     }
3933 
3934     /* unpack the number of procs */
3935     cnt = 1;
3936     PMIX_BFROPS_UNPACK(rc, peer, buf, &nprocs, &cnt, PMIX_SIZE);
3937     if (PMIX_SUCCESS != rc) {
3938         PMIX_ERROR_LOG(rc);
3939         goto error;
3940     }
3941     if (0 == nprocs) {
3942         return PMIX_ERR_BAD_PARAM;
3943     }
3944     PMIX_PROC_CREATE(procs, nprocs);
3945     if (NULL == procs) {
3946         rc = PMIX_ERR_NOMEM;
3947         goto error;
3948     }
3949     cnt = nprocs;
3950     PMIX_BFROPS_UNPACK(rc, peer, buf, procs, &cnt, PMIX_PROC);
3951     if (PMIX_SUCCESS != rc) {
3952         PMIX_ERROR_LOG(rc);
3953         PMIX_PROC_FREE(procs, nprocs);
3954         goto error;
3955     }
3956     if (NULL == grp->members) {
3957         /* see if they used a local proc or local peer
3958          * wildcard - if they did, then we need to expand
3959          * it here */
3960         PMIX_CONSTRUCT(&mbrs, pmix_list_t);
3961         for (n=0; n < nprocs; n++) {
3962             if (PMIX_RANK_LOCAL_PEERS == procs[n].rank) {
3963                 expanded = true;
3964                 /* expand to all local procs in this nspace */
3965                 for (m=0; m < pmix_server_globals.clients.size; m++) {
3966                     if (NULL == (pr = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, m))) {
3967                         continue;
3968                     }
3969                     if (PMIX_CHECK_NSPACE(procs[n].nspace, pr->info->pname.nspace)) {
3970                         nm = PMIX_NEW(pmix_namelist_t);
3971                         nm->pname = &pr->info->pname;
3972                         pmix_list_append(&mbrs, &nm->super);
3973                     }
3974                 }
3975             } else if (PMIX_RANK_LOCAL_NODE == procs[n].rank) {
3976                 expanded = true;
3977                 /* add in all procs on the node */
3978                 for (m=0; m < pmix_server_globals.clients.size; m++) {
3979                     if (NULL == (pr = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, m))) {
3980                         continue;
3981                     }
3982                     nm = PMIX_NEW(pmix_namelist_t);
3983                     nm->pname = &pr->info->pname;
3984                     pmix_list_append(&mbrs, &nm->super);
3985                 }
3986             } else {
3987                 nm = PMIX_NEW(pmix_namelist_t);
3988                 /* have to duplicate the name here */
3989                 nm->pname = (pmix_name_t*)malloc(sizeof(pmix_name_t));
3990                 nm->pname->nspace = strdup(procs[n].nspace);
3991                 nm->pname->rank = procs[n].rank;
3992                 pmix_list_append(&mbrs, &nm->super);
3993             }
3994         }
3995         if (expanded) {
3996             PMIX_PROC_FREE(procs, nprocs);
3997             nprocs = pmix_list_get_size(&mbrs);
3998             PMIX_PROC_CREATE(procs, nprocs);
3999             n=0;
4000             while (NULL != (nm = (pmix_namelist_t*)pmix_list_remove_first(&mbrs))) {
4001                 PMIX_LOAD_PROCID(&procs[n], nm->pname->nspace, nm->pname->rank);
4002                 PMIX_RELEASE(nm);
4003             }
4004             PMIX_DESTRUCT(&mbrs);
4005         }
4006         grp->members = procs;
4007         grp->nmbrs = nprocs;
4008     } else {
4009         PMIX_PROC_FREE(procs, nprocs);
4010     }
4011 
4012     /* unpack the number of directives */
4013     cnt = 1;
4014     PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
4015     if (PMIX_SUCCESS != rc) {
4016         PMIX_ERROR_LOG(rc);
4017         goto error;
4018     }
4019     if (0 < ninfo) {
4020         PMIX_INFO_CREATE(info, ninfo);
4021         cnt = ninfo;
4022         PMIX_BFROPS_UNPACK(rc, peer, buf, info, &cnt, PMIX_INFO);
4023         if (PMIX_SUCCESS != rc) {
4024             PMIX_ERROR_LOG(rc);
4025             goto error;
4026         }
4027     }
4028 
4029     /* find/create the local tracker for this operation */
4030     if (NULL == (trk = get_tracker(grp->grpid, grp->members, grp->nmbrs, PMIX_GROUP_CONSTRUCT_CMD))) {
4031         /* If no tracker was found - create and initialize it once */
4032         if (NULL == (trk = new_tracker(grp->grpid, grp->members, grp->nmbrs, PMIX_GROUP_CONSTRUCT_CMD))) {
4033             /* only if a bozo error occurs */
4034             PMIX_ERROR_LOG(PMIX_ERROR);
4035             rc = PMIX_ERROR;
4036             goto error;
4037         }
4038         /* group members must have access to all endpoint info
4039          * upon completion of the construct operation */
4040         trk->collect_type = PMIX_COLLECT_YES;
4041         /* mark as being a construct operation */
4042         trk->hybrid = false;
4043         /* pass along the grp object */
4044         trk->cbdata = grp;
4045         /* we only save the info structs from the first caller
4046          * who provides them - it is a user error to provide
4047          * different values from different participants */
4048         trk->info = info;
4049         trk->ninfo = ninfo;
4050         /* see if we are to enforce a timeout or if they want
4051          * a context ID created - we don't internally care
4052          * about any other directives */
4053         for (n=0; n < ninfo; n++) {
4054             if (PMIX_CHECK_KEY(&info[n], PMIX_TIMEOUT)) {
4055                 tv.tv_sec = info[n].value.data.uint32;
4056             } else if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_ASSIGN_CONTEXT_ID)) {
4057                 need_cxtid = PMIX_INFO_TRUE(&info[n]);
4058             } else if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_LOCAL_ONLY)) {
4059                 force_local = PMIX_INFO_TRUE(&info[n]);
4060             } else if (PMIX_CHECK_KEY(&info[n], PMIX_EMBED_BARRIER)) {
4061                 embed_barrier = PMIX_INFO_TRUE(&info[n]);
4062                 barrier_directive_included = true;
4063             }
4064         }
4065         /* see if this constructor only references local processes and isn't
4066          * requesting a context ID - if both conditions are met, then we
4067          * can just locally process the request without bothering the host.
4068          * This is meant to provide an optimized path for a fairly common
4069          * operation */
4070         if (force_local) {
4071             trk->local = true;
4072         } else if (need_cxtid) {
4073             trk->local = false;
4074         } else {
4075             trk->local = true;
4076             for (n=0; n < grp->nmbrs; n++) {
4077                 /* if this entry references the local procs, then
4078                  * we can skip it */
4079                 if (PMIX_RANK_LOCAL_PEERS == grp->members[n].rank ||
4080                     PMIX_RANK_LOCAL_NODE == grp->members[n].rank) {
4081                     continue;
4082                 }
4083                 /* see if it references a specific local proc */
4084                 match = false;
4085                 for (m=0; m < pmix_server_globals.clients.size; m++) {
4086                     if (NULL == (pr = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, m))) {
4087                         continue;
4088                     }
4089                     if (PMIX_CHECK_PROCID(&grp->members[n], &pr->info->pname)) {
4090                         match = true;
4091                         break;
4092                     }
4093                 }
4094                 if (!match) {
4095                     /* this requires a non_local operation */
4096                     trk->local = false;
4097                     break;
4098                 }
4099             }
4100         }
4101     } else {
4102         /* cleanup */
4103         PMIX_INFO_FREE(info, ninfo);
4104         info = NULL;
4105     }
4106 
4107     /* add this contributor to the tracker so they get
4108      * notified when we are done */
4109     pmix_list_append(&trk->local_cbs, &cd->super);
4110 
4111     /* if a timeout was specified, set it */
4112     if (0 < tv.tv_sec) {
4113         pmix_event_evtimer_set(pmix_globals.evbase, &trk->ev,
4114                                grp_timeout, trk);
4115         pmix_event_evtimer_add(&trk->ev, &tv);
4116         trk->event_active = true;
4117     }
4118 
4119     /* if all local contributions have been received,
4120      * let the local host's server know that we are at the
4121      * "fence" point - they will callback once the barrier
4122      * across all participants has been completed */
4123     if (trk->def_complete &&
4124         pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
4125         pmix_output_verbose(2, pmix_server_globals.base_output,
4126                             "local group op complete with %d procs", (int)trk->npcs);
4127 
4128         if (trk->local) {
4129             /* nothing further needs to be done - we have
4130              * created the local group. let the grpcbfunc
4131              * threadshift the result */
4132             grpcbfunc(PMIX_SUCCESS, NULL, 0, trk, NULL, NULL);
4133             return PMIX_SUCCESS;
4134         }
4135 
4136         /* check if our host supports group operations */
4137         if (NULL == pmix_host_server.group) {
4138             /* remove the tracker from the list */
4139             pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
4140             PMIX_RELEASE(trk);
4141             return PMIX_ERR_NOT_SUPPORTED;
4142         }
4143 
4144         /* if they direct us to not embed a barrier, then we won't gather
4145          * the data for distribution */
4146         if (!barrier_directive_included ||
4147             (barrier_directive_included && embed_barrier)) {
4148             /* collect any remote contributions provided by group members */
4149             PMIX_CONSTRUCT(&bucket, pmix_buffer_t);
4150             rc = _collect_data(trk, &bucket);
4151             if (PMIX_SUCCESS != rc) {
4152                 if (trk->event_active) {
4153                     pmix_event_del(&trk->ev);
4154                 }
4155                 /* remove the tracker from the list */
4156                 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
4157                 PMIX_RELEASE(trk);
4158                 PMIX_DESTRUCT(&bucket);
4159                 return rc;
4160             }
4161             /* xfer the results to a byte object */
4162             PMIX_UNLOAD_BUFFER(&bucket, bo.bytes, bo.size);
4163             PMIX_DESTRUCT(&bucket);
4164             /* load any results into a data object for inclusion in the
4165              * fence operation */
4166             n2 = trk->ninfo + 1;
4167             PMIX_INFO_CREATE(iptr, n2);
4168             for (n=0; n < trk->ninfo; n++) {
4169                 PMIX_INFO_XFER(&iptr[n], &trk->info[n]);
4170             }
4171             PMIX_INFO_LOAD(&iptr[ninfo], PMIX_GROUP_ENDPT_DATA, &bo, PMIX_BYTE_OBJECT);
4172             PMIX_BYTE_OBJECT_DESTRUCT(&bo);
4173             PMIX_INFO_FREE(trk->info, trk->ninfo);
4174             trk->info = iptr;
4175             trk->ninfo = n2;
4176         }
4177         rc = pmix_host_server.group(PMIX_GROUP_CONSTRUCT, grp->grpid,
4178                                     trk->pcs, trk->npcs,
4179                                     trk->info, trk->ninfo,
4180                                     grpcbfunc, trk);
4181         if (PMIX_SUCCESS != rc) {
4182             if (trk->event_active) {
4183                 pmix_event_del(&trk->ev);
4184             }
4185             if (PMIX_OPERATION_SUCCEEDED == rc) {
4186                 /* let the grpcbfunc threadshift the result */
4187                 grpcbfunc(PMIX_SUCCESS, NULL, 0, trk, NULL, NULL);
4188                 return PMIX_SUCCESS;
4189             }
4190             /* remove the tracker from the list */
4191             pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
4192             PMIX_RELEASE(trk);
4193             return rc;
4194         }
4195     }
4196 
4197     return PMIX_SUCCESS;
4198 
4199   error:
4200     if (NULL != info) {
4201         PMIX_INFO_FREE(info, ninfo);
4202     }
4203     return rc;
4204 }
4205 
4206 /* we are being called from the PMIx server's switchyard function,
4207  * which means we are in an event and can access global data */
4208 pmix_status_t pmix_server_grpdestruct(pmix_server_caddy_t *cd,
4209                                       pmix_buffer_t *buf)
4210 {
4211     pmix_peer_t *peer = (pmix_peer_t*)cd->peer;
4212     int32_t cnt;
4213     pmix_status_t rc;
4214     char *grpid;
4215     pmix_info_t *info = NULL;
4216     size_t n, ninfo;
4217     pmix_server_trkr_t *trk;
4218     pmix_group_t *grp, *pgrp;
4219     struct timeval tv = {0, 0};
4220 
4221     pmix_output_verbose(2, pmix_server_globals.iof_output,
4222                         "recvd grpdestruct cmd");
4223 
4224     if (NULL == pmix_host_server.group) {
4225         PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
4226         return PMIX_ERR_NOT_SUPPORTED;
4227     }
4228 
4229     /* unpack the group ID */
4230     cnt = 1;
4231     PMIX_BFROPS_UNPACK(rc, peer, buf, &grpid, &cnt, PMIX_STRING);
4232     if (PMIX_SUCCESS != rc) {
4233         PMIX_ERROR_LOG(rc);
4234         goto error;
4235     }
4236 
4237     /* find this group in our list */
4238     grp = NULL;
4239     PMIX_LIST_FOREACH(pgrp, &pmix_server_globals.groups, pmix_group_t) {
4240         if (0 == strcmp(grpid, pgrp->grpid)) {
4241             grp = pgrp;
4242             break;
4243         }
4244     }
4245     free(grpid);
4246 
4247     /* if not found, then this is an error - we cannot
4248      * destruct a group we don't know about */
4249     if (NULL == grp) {
4250         rc = PMIX_ERR_NOT_FOUND;
4251         goto error;
4252     }
4253 
4254     /* unpack the number of directives */
4255     cnt = 1;
4256     PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
4257     if (PMIX_SUCCESS != rc) {
4258         PMIX_ERROR_LOG(rc);
4259         goto error;
4260     }
4261     if (0 < ninfo) {
4262         PMIX_INFO_CREATE(info, ninfo);
4263         cnt = ninfo;
4264         PMIX_BFROPS_UNPACK(rc, peer, buf, info, &cnt, PMIX_INFO);
4265         if (PMIX_SUCCESS != rc) {
4266             PMIX_ERROR_LOG(rc);
4267             goto error;
4268         }
4269         /* see if we are to enforce a timeout - we don't internally care
4270          * about any other directives */
4271         for (n=0; n < ninfo; n++) {
4272             if (PMIX_CHECK_KEY(&info[n], PMIX_TIMEOUT)) {
4273                 tv.tv_sec = info[n].value.data.uint32;
4274                 break;
4275             }
4276         }
4277     }
4278 
4279     /* find/create the local tracker for this operation */
4280     if (NULL == (trk = get_tracker(grp->grpid, grp->members, grp->nmbrs, PMIX_GROUP_DESTRUCT_CMD))) {
4281         /* If no tracker was found - create and initialize it once */
4282         if (NULL == (trk = new_tracker(grp->grpid, grp->members, grp->nmbrs, PMIX_GROUP_DESTRUCT_CMD))) {
4283             /* only if a bozo error occurs */
4284             PMIX_ERROR_LOG(PMIX_ERROR);
4285             rc = PMIX_ERROR;
4286             goto error;
4287         }
4288         trk->collect_type = PMIX_COLLECT_NO;
4289         /* mark as being a destruct operation */
4290         trk->hybrid = true;
4291         /* pass along the group object */
4292         trk->cbdata = grp;
4293     }
4294 
4295     /* we only save the info structs from the first caller
4296      * who provides them - it is a user error to provide
4297      * different values from different participants */
4298     if (NULL == trk->info) {
4299         trk->info = info;
4300         trk->ninfo = ninfo;
4301     } else {
4302         /* cleanup */
4303         PMIX_INFO_FREE(info, ninfo);
4304         info = NULL;
4305     }
4306 
4307     /* add this contributor to the tracker so they get
4308      * notified when we are done */
4309     pmix_list_append(&trk->local_cbs, &cd->super);
4310 
4311     /* if a timeout was specified, set it */
4312     if (0 < tv.tv_sec) {
4313         pmix_event_evtimer_set(pmix_globals.evbase, &trk->ev,
4314                                grp_timeout, trk);
4315         pmix_event_evtimer_add(&trk->ev, &tv);
4316         trk->event_active = true;
4317     }
4318 
4319     /* if all local contributions have been received,
4320      * let the local host's server know that we are at the
4321      * "fence" point - they will callback once the barrier
4322      * across all participants has been completed */
4323     if (trk->def_complete &&
4324         pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
4325         pmix_output_verbose(2, pmix_server_globals.base_output,
4326                             "local group op complete %d", (int)trk->nlocal);
4327 
4328         rc = pmix_host_server.group(PMIX_GROUP_DESTRUCT, grp->grpid,
4329                                     grp->members, grp->nmbrs,
4330                                     trk->info, trk->ninfo,
4331                                     grpcbfunc, trk);
4332         if (PMIX_SUCCESS != rc) {
4333             if (trk->event_active) {
4334                 pmix_event_del(&trk->ev);
4335             }
4336             if (PMIX_OPERATION_SUCCEEDED == rc) {
4337                 /* let the grpcbfunc threadshift the result */
4338                 grpcbfunc(PMIX_SUCCESS, NULL, 0, trk, NULL, NULL);
4339                 return PMIX_SUCCESS;
4340             }
4341             /* remove the tracker from the list */
4342             pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
4343             PMIX_RELEASE(trk);
4344             return rc;
4345         }
4346     }
4347 
4348     return PMIX_SUCCESS;
4349 
4350   error:
4351     if (NULL != info) {
4352         PMIX_INFO_FREE(info, ninfo);
4353     }
4354     return rc;
4355 }
4356 
4357 /*****    INSTANCE SERVER LIBRARY CLASSES    *****/
4358 static void tcon(pmix_server_trkr_t *t)
4359 {
4360     t->event_active = false;
4361     t->host_called = false;
4362     t->local = false;
4363     t->id = NULL;
4364     memset(t->pname.nspace, 0, PMIX_MAX_NSLEN+1);
4365     t->pname.rank = PMIX_RANK_UNDEF;
4366     t->pcs = NULL;
4367     t->npcs = 0;
4368     PMIX_CONSTRUCT(&t->nslist, pmix_list_t);
4369     PMIX_CONSTRUCT_LOCK(&t->lock);
4370     t->def_complete = false;
4371     PMIX_CONSTRUCT(&t->local_cbs, pmix_list_t);
4372     t->nlocal = 0;
4373     t->local_cnt = 0;
4374     t->info = NULL;
4375     t->ninfo = 0;
4376     /* this needs to be set explicitly */
4377     t->collect_type = PMIX_COLLECT_INVALID;
4378     t->modexcbfunc = NULL;
4379     t->op_cbfunc = NULL;
4380     t->hybrid = false;
4381     t->cbdata = NULL;
4382 }
4383 static void tdes(pmix_server_trkr_t *t)
4384 {
4385     if (NULL != t->id) {
4386         free(t->id);
4387     }
4388     PMIX_DESTRUCT_LOCK(&t->lock);
4389     if (NULL != t->pcs) {
4390         free(t->pcs);
4391     }
4392     PMIX_LIST_DESTRUCT(&t->local_cbs);
4393     if (NULL != t->info) {
4394         PMIX_INFO_FREE(t->info, t->ninfo);
4395     }
4396     PMIX_DESTRUCT(&t->nslist);
4397 }
4398 PMIX_CLASS_INSTANCE(pmix_server_trkr_t,
4399                    pmix_list_item_t,
4400                    tcon, tdes);
4401 
4402 static void cdcon(pmix_server_caddy_t *cd)
4403 {
4404     memset(&cd->ev, 0, sizeof(pmix_event_t));
4405     cd->event_active = false;
4406     cd->trk = NULL;
4407     cd->peer = NULL;
4408 }
4409 static void cddes(pmix_server_caddy_t *cd)
4410 {
4411     if (cd->event_active) {
4412         pmix_event_del(&cd->ev);
4413     }
4414     if (NULL != cd->trk) {
4415         PMIX_RELEASE(cd->trk);
4416     }
4417     if (NULL != cd->peer) {
4418         PMIX_RELEASE(cd->peer);
4419     }
4420 }
4421 PMIX_CLASS_INSTANCE(pmix_server_caddy_t,
4422                    pmix_list_item_t,
4423                    cdcon, cddes);
4424 
4425 
4426 static void scadcon(pmix_setup_caddy_t *p)
4427 {
4428     p->peer = NULL;
4429     memset(&p->proc, 0, sizeof(pmix_proc_t));
4430     PMIX_CONSTRUCT_LOCK(&p->lock);
4431     p->nspace = NULL;
4432     p->codes = NULL;
4433     p->ncodes = 0;
4434     p->procs = NULL;
4435     p->nprocs = 0;
4436     p->apps = NULL;
4437     p->napps = 0;
4438     p->server_object = NULL;
4439     p->nlocalprocs = 0;
4440     p->info = NULL;
4441     p->ninfo = 0;
4442     p->keys = NULL;
4443     p->channels = PMIX_FWD_NO_CHANNELS;
4444     p->bo = NULL;
4445     p->nbo = 0;
4446     p->cbfunc = NULL;
4447     p->opcbfunc = NULL;
4448     p->setupcbfunc = NULL;
4449     p->lkcbfunc = NULL;
4450     p->spcbfunc = NULL;
4451     p->cbdata = NULL;
4452 }
4453 static void scaddes(pmix_setup_caddy_t *p)
4454 {
4455     if (NULL != p->peer) {
4456         PMIX_RELEASE(p->peer);
4457     }
4458     PMIX_PROC_FREE(p->procs, p->nprocs);
4459     if (NULL != p->apps) {
4460         PMIX_APP_FREE(p->apps, p->napps);
4461     }
4462     if (NULL != p->bo) {
4463         PMIX_BYTE_OBJECT_FREE(p->bo, p->nbo);
4464     }
4465     PMIX_DESTRUCT_LOCK(&p->lock);
4466 }
4467 PMIX_EXPORT PMIX_CLASS_INSTANCE(pmix_setup_caddy_t,
4468                                 pmix_object_t,
4469                                 scadcon, scaddes);
4470 
4471 static void ncon(pmix_notify_caddy_t *p)
4472 {
4473     PMIX_CONSTRUCT_LOCK(&p->lock);
4474 #if defined(__linux__) && PMIX_HAVE_CLOCK_GETTIME
4475     struct timespec tp;
4476     (void) clock_gettime(CLOCK_MONOTONIC, &tp);
4477     p->ts = tp.tv_sec;
4478 #else
4479     /* Fall back to gettimeofday() if we have nothing else */
4480     struct timeval tv;
4481     gettimeofday(&tv, NULL);
4482     p->ts = tv.tv_sec;
4483 #endif
4484     p->room = -1;
4485     memset(p->source.nspace, 0, PMIX_MAX_NSLEN+1);
4486     p->source.rank = PMIX_RANK_UNDEF;
4487     p->range = PMIX_RANGE_UNDEF;
4488     p->targets = NULL;
4489     p->ntargets = 0;
4490     p->nleft = SIZE_MAX;
4491     p->affected = NULL;
4492     p->naffected = 0;
4493     p->nondefault = false;
4494     p->info = NULL;
4495     p->ninfo = 0;
4496 }
4497 static void ndes(pmix_notify_caddy_t *p)
4498 {
4499     PMIX_DESTRUCT_LOCK(&p->lock);
4500     if (NULL != p->info) {
4501         PMIX_INFO_FREE(p->info, p->ninfo);
4502     }
4503     PMIX_PROC_FREE(p->affected, p->naffected);
4504     if (NULL != p->targets) {
4505         free(p->targets);
4506     }
4507 }
4508 PMIX_CLASS_INSTANCE(pmix_notify_caddy_t,
4509                     pmix_object_t,
4510                     ncon, ndes);
4511 
4512 
4513 PMIX_CLASS_INSTANCE(pmix_trkr_caddy_t,
4514                     pmix_object_t,
4515                     NULL, NULL);
4516 
4517 static void dmcon(pmix_dmdx_remote_t *p)
4518 {
4519     p->cd = NULL;
4520 }
4521 static void dmdes(pmix_dmdx_remote_t *p)
4522 {
4523     if (NULL != p->cd) {
4524         PMIX_RELEASE(p->cd);
4525     }
4526 }
4527 PMIX_CLASS_INSTANCE(pmix_dmdx_remote_t,
4528                     pmix_list_item_t,
4529                     dmcon, dmdes);
4530 
4531 static void dmrqcon(pmix_dmdx_request_t *p)
4532 {
4533     memset(&p->ev, 0, sizeof(pmix_event_t));
4534     p->event_active = false;
4535     p->lcd = NULL;
4536 }
4537 static void dmrqdes(pmix_dmdx_request_t *p)
4538 {
4539     if (p->event_active) {
4540         pmix_event_del(&p->ev);
4541     }
4542     if (NULL != p->lcd) {
4543         PMIX_RELEASE(p->lcd);
4544     }
4545 }
4546 PMIX_CLASS_INSTANCE(pmix_dmdx_request_t,
4547                     pmix_list_item_t,
4548                     dmrqcon, dmrqdes);
4549 
4550 static void lmcon(pmix_dmdx_local_t *p)
4551 {
4552     memset(&p->proc, 0, sizeof(pmix_proc_t));
4553     PMIX_CONSTRUCT(&p->loc_reqs, pmix_list_t);
4554     p->info = NULL;
4555     p->ninfo = 0;
4556 }
4557 static void lmdes(pmix_dmdx_local_t *p)
4558 {
4559     if (NULL != p->info) {
4560         PMIX_INFO_FREE(p->info, p->ninfo);
4561     }
4562     PMIX_LIST_DESTRUCT(&p->loc_reqs);
4563 }
4564 PMIX_CLASS_INSTANCE(pmix_dmdx_local_t,
4565                     pmix_list_item_t,
4566                     lmcon, lmdes);
4567 
4568 static void prevcon(pmix_peer_events_info_t *p)
4569 {
4570     p->peer = NULL;
4571     p->affected = NULL;
4572     p->naffected = 0;
4573 }
4574 static void prevdes(pmix_peer_events_info_t *p)
4575 {
4576     if (NULL != p->peer) {
4577         PMIX_RELEASE(p->peer);
4578     }
4579     if (NULL != p->affected) {
4580         PMIX_PROC_FREE(p->affected, p->naffected);
4581     }
4582 }
4583 PMIX_CLASS_INSTANCE(pmix_peer_events_info_t,
4584                     pmix_list_item_t,
4585                     prevcon, prevdes);
4586 
4587 static void regcon(pmix_regevents_info_t *p)
4588 {
4589     PMIX_CONSTRUCT(&p->peers, pmix_list_t);
4590 }
4591 static void regdes(pmix_regevents_info_t *p)
4592 {
4593     PMIX_LIST_DESTRUCT(&p->peers);
4594 }
4595 PMIX_CLASS_INSTANCE(pmix_regevents_info_t,
4596                     pmix_list_item_t,
4597                     regcon, regdes);
4598 
4599 static void ilcon(pmix_inventory_rollup_t *p)
4600 {
4601     PMIX_CONSTRUCT_LOCK(&p->lock);
4602     p->lock.active = false;
4603     p->status = PMIX_SUCCESS;
4604     p->requests = 0;
4605     p->replies = 0;
4606     PMIX_CONSTRUCT(&p->payload, pmix_list_t);
4607     p->info = NULL;
4608     p->ninfo = 0;
4609     p->cbfunc = NULL;
4610     p->infocbfunc = NULL;
4611     p->opcbfunc = NULL;
4612     p->cbdata = NULL;
4613 }
4614 static void ildes(pmix_inventory_rollup_t *p)
4615 {
4616     PMIX_DESTRUCT_LOCK(&p->lock);
4617     PMIX_LIST_DESTRUCT(&p->payload);
4618 }
4619 PMIX_CLASS_INSTANCE(pmix_inventory_rollup_t,
4620                     pmix_object_t,
4621                     ilcon, ildes);
4622 
4623 static void grcon(pmix_group_t *p)
4624 {
4625     p->grpid = NULL;
4626     p->members = NULL;
4627     p->nmbrs = 0;
4628 }
4629 static void grdes(pmix_group_t *p)
4630 {
4631     if (NULL != p->grpid) {
4632         free(p->grpid);
4633     }
4634     if (NULL != p->members) {
4635         PMIX_PROC_FREE(p->members, p->nmbrs);
4636     }
4637 }
4638 PMIX_CLASS_INSTANCE(pmix_group_t,
4639                     pmix_list_item_t,
4640                     grcon, grdes);
4641 
4642 PMIX_CLASS_INSTANCE(pmix_group_caddy_t,
4643                     pmix_list_item_t,
4644                     NULL, NULL);
4645 
4646 static void iocon(pmix_iof_cache_t *p)
4647 {
4648     p->bo = NULL;
4649 }
4650 static void iodes(pmix_iof_cache_t *p)
4651 {
4652     PMIX_BYTE_OBJECT_FREE(p->bo, 1);  // macro protects against NULL
4653 }
4654 PMIX_CLASS_INSTANCE(pmix_iof_cache_t,
4655                     pmix_list_item_t,
4656                     iocon, iodes);

/* [<][>][^][v][top][bottom][index][help] */