root/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_get.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. dcd_con
  2. relfn
  3. pmix_server_get
  4. create_local_tracker
  5. pmix_pending_nspace_requests
  6. _satisfy_request
  7. pmix_pending_resolve
  8. _process_dmdx_reply
  9. dmdx_cbfunc
  10. get_timeout

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
   4  * Copyright (c) 2014-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
   7  *                         All rights reserved.
   8  * Copyright (c) 2016      Mellanox Technologies, Inc.
   9  *                         All rights reserved.
  10  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include <src/include/pmix_config.h>
  19 
  20 #include <src/include/pmix_stdint.h>
  21 #include <src/include/pmix_socket_errno.h>
  22 
  23 #include <pmix_server.h>
  24 #include <pmix_rename.h>
  25 #include "src/include/pmix_globals.h"
  26 
  27 #ifdef HAVE_STRING_H
  28 #include <string.h>
  29 #endif
  30 #include <fcntl.h>
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif
  34 #ifdef HAVE_SYS_SOCKET_H
  35 #include <sys/socket.h>
  36 #endif
  37 #ifdef HAVE_SYS_UN_H
  38 #include <sys/un.h>
  39 #endif
  40 #ifdef HAVE_SYS_UIO_H
  41 #include <sys/uio.h>
  42 #endif
  43 #ifdef HAVE_SYS_TYPES_H
  44 #include <sys/types.h>
  45 #endif
  46 #include PMIX_EVENT_HEADER
  47 
  48 #include "src/class/pmix_list.h"
  49 #include "src/mca/bfrops/bfrops.h"
  50 #include "src/mca/gds/gds.h"
  51 #include "src/util/argv.h"
  52 #include "src/util/error.h"
  53 #include "src/util/output.h"
  54 #include "src/util/pmix_environ.h"
  55 
  56 #include "pmix_server_ops.h"
  57 
  58 extern pmix_server_module_t pmix_host_server;
  59 
  60 typedef struct {
  61     pmix_object_t super;
  62     pmix_event_t ev;
  63     volatile bool active;
  64     pmix_status_t status;
  65     const char *data;
  66     size_t ndata;
  67     pmix_dmdx_local_t *lcd;
  68     pmix_release_cbfunc_t relcbfunc;
  69     void *cbdata;
  70 } pmix_dmdx_reply_caddy_t;
  71 static void dcd_con(pmix_dmdx_reply_caddy_t *p)
  72 {
  73     p->status = PMIX_ERROR;
  74     p->ndata = 0;
  75     p->lcd = NULL;
  76     p->relcbfunc = NULL;
  77     p->cbdata = NULL;
  78 }
  79 PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
  80                    pmix_object_t, dcd_con, NULL);
  81 
  82 
  83 static void dmdx_cbfunc(pmix_status_t status, const char *data,
  84                         size_t ndata, void *cbdata,
  85                         pmix_release_cbfunc_t relfn, void *relcbdata);
  86 static pmix_status_t _satisfy_request(pmix_namespace_t *ns, pmix_rank_t rank,
  87                                       pmix_server_caddy_t *cd,
  88                                       pmix_modex_cbfunc_t cbfunc, void *cbdata, bool *scope);
  89 static pmix_status_t create_local_tracker(char nspace[], pmix_rank_t rank,
  90                                           pmix_info_t info[], size_t ninfo,
  91                                           pmix_modex_cbfunc_t cbfunc,
  92                                           void *cbdata,
  93                                           pmix_dmdx_local_t **lcd,
  94                                           pmix_dmdx_request_t **rq);
  95 
  96 static void get_timeout(int sd, short args, void *cbdata);
  97 
  98 
  99 /* declare a function whose sole purpose is to
 100  * free data that we provided to our host server
 101  * when servicing dmodex requests */
 102 static void relfn(void *cbdata)
 103 {
 104     char *data = (char*)cbdata;
 105     if (NULL != data) {
 106         free(data);
 107     }
 108 }
 109 
 110 
 111 pmix_status_t pmix_server_get(pmix_buffer_t *buf,
 112                               pmix_modex_cbfunc_t cbfunc,
 113                               void *cbdata)
 114 {
 115     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
 116     int32_t cnt;
 117     pmix_status_t rc;
 118     pmix_rank_t rank;
 119     char *cptr;
 120     char nspace[PMIX_MAX_NSLEN+1];
 121     pmix_namespace_t *ns, *nptr;
 122     pmix_info_t *info=NULL;
 123     size_t ninfo=0;
 124     pmix_dmdx_local_t *lcd;
 125     pmix_dmdx_request_t *req;
 126     bool local;
 127     bool localonly = false;
 128     struct timeval tv = {0, 0};
 129     pmix_buffer_t pbkt, pkt;
 130     pmix_byte_object_t bo;
 131     pmix_cb_t cb;
 132     pmix_proc_t proc;
 133     char *data;
 134     size_t sz, n;
 135     pmix_peer_t *peer;
 136 
 137     pmix_output_verbose(2, pmix_server_globals.get_output,
 138                         "recvd GET");
 139 
 140     /* setup */
 141     memset(nspace, 0, sizeof(nspace));
 142 
 143     /* retrieve the nspace and rank of the requested proc */
 144     cnt = 1;
 145     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &cptr, &cnt, PMIX_STRING);
 146     if (PMIX_SUCCESS != rc) {
 147         PMIX_ERROR_LOG(rc);
 148         return rc;
 149     }
 150     pmix_strncpy(nspace, cptr, PMIX_MAX_NSLEN);
 151     free(cptr);
 152     cnt = 1;
 153     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &rank, &cnt, PMIX_PROC_RANK);
 154     if (PMIX_SUCCESS != rc) {
 155         PMIX_ERROR_LOG(rc);
 156         return rc;
 157     }
 158     /* retrieve any provided info structs */
 159     cnt = 1;
 160     PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &ninfo, &cnt, PMIX_SIZE);
 161     if (PMIX_SUCCESS != rc) {
 162         PMIX_ERROR_LOG(rc);
 163         return rc;
 164     }
 165     if (0 < ninfo) {
 166         PMIX_INFO_CREATE(info, ninfo);
 167         if (NULL == info) {
 168             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 169             return PMIX_ERR_NOMEM;
 170         }
 171         cnt = ninfo;
 172         PMIX_BFROPS_UNPACK(rc, cd->peer, buf, info, &cnt, PMIX_INFO);
 173         if (PMIX_SUCCESS != rc) {
 174             PMIX_ERROR_LOG(rc);
 175             PMIX_INFO_FREE(info, ninfo);
 176             return rc;
 177         }
 178     }
 179 
 180     /* search for directives we can deal with here */
 181     for (n=0; n < ninfo; n++) {
 182         if (0 == strncmp(info[n].key, PMIX_IMMEDIATE, PMIX_MAX_KEYLEN)) {
 183             /* just check our own data - don't wait
 184              * or request it from someone else */
 185             localonly = PMIX_INFO_TRUE(&info[n]);
 186         } else if (0 == strncmp(info[n].key, PMIX_TIMEOUT, PMIX_MAX_KEYLEN)) {
 187             tv.tv_sec = info[n].value.data.uint32;
 188         }
 189     }
 190 
 191     /* find the nspace object for this client */
 192     nptr = NULL;
 193     PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
 194         if (0 == strcmp(nspace, ns->nspace)) {
 195             nptr = ns;
 196             break;
 197         }
 198     }
 199 
 200     pmix_output_verbose(2, pmix_server_globals.get_output,
 201                         "%s:%d EXECUTE GET FOR %s:%d ON BEHALF OF %s:%d",
 202                         pmix_globals.myid.nspace,
 203                         pmix_globals.myid.rank, nspace, rank,
 204                         cd->peer->info->pname.nspace,
 205                         cd->peer->info->pname.rank);
 206 
 207     /* This call flows upward from a local client If we don't
 208      * know about this nspace, then it cannot refer to the
 209      * nspace of the requestor - i.e., they aren't asking
 210      * about one of their peers. There are two reasons why we
 211      * might not know about this nspace at this time:
 212      *
 213      * (a) we don't host any local procs from this nspace, and
 214      *     so the local RM didn't tell us about it. We will have
 215      *     to request the information from it.
 216      *
 217      * (b) a race condition where the other job hasn't registered
 218      *     its nspace yet. This begs the question as to how the
 219      *     requestor got the nspace name in the first place!
 220      *     However, there _may_ be some path whereby that could
 221      *     happen, so we try to deal with it here.
 222      *
 223      * Either way, we are going to have to request the info from
 224      * the host RM. Since we are hopeful of getting an answer,
 225      * we add the nspace to our list of known nspaces so the
 226      * info has a "landing zone" upon return */
 227 
 228     if (NULL == nptr) {
 229         if (localonly) {
 230             /* the user doesn't want us to look for the info,
 231              * so we simply return at this point */
 232             return PMIX_ERR_NOT_FOUND;
 233         }
 234         /* this is for an nspace we don't know about yet, so
 235          * record the request for data from this process and
 236          * give the host server a chance to tell us about it.
 237          * The cbdata passed here is the pmix_server_caddy_t
 238          * we were passed - it contains the pmix_peer_t of
 239          * the original requestor so they will get the data
 240          * back when we receive it */
 241         rc = create_local_tracker(nspace, rank,
 242                                   info, ninfo,
 243                                   cbfunc, cbdata, &lcd, &req);
 244         if (PMIX_ERR_NOMEM == rc) {
 245             PMIX_INFO_FREE(info, ninfo);
 246             return rc;
 247         }
 248         if (PMIX_SUCCESS == rc) {
 249             /* if they specified a timeout for this specific
 250              * request, set it up now */
 251             if (0 < tv.tv_sec) {
 252                 pmix_event_evtimer_set(pmix_globals.evbase, &req->ev,
 253                                        get_timeout, req);
 254                 pmix_event_evtimer_add(&req->ev, &tv);
 255                 req->event_active = true;
 256             }
 257             /* we already asked for this info - no need to
 258              * do it again */
 259             return PMIX_SUCCESS;
 260         }
 261         /* only other return code is NOT_FOUND, indicating that
 262          * we created a new tracker */
 263 
 264         /* Its possible there will be no local processes on this
 265          * host, so lets ask for this explicitly.  There can
 266          * be a race condition here if this information shows
 267          * up on its own, but at worst the direct modex
 268          * will simply overwrite the info later */
 269         if (NULL != pmix_host_server.direct_modex) {
 270             rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
 271             if (PMIX_SUCCESS != rc) {
 272                 PMIX_INFO_FREE(info, ninfo);
 273                 pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
 274                 PMIX_RELEASE(lcd);
 275                 return rc;
 276             }
 277             /* if they specified a timeout for this specific
 278              * request, set it up now */
 279             if (0 < tv.tv_sec) {
 280                 pmix_event_evtimer_set(pmix_globals.evbase, &req->ev,
 281                                        get_timeout, req);
 282                 pmix_event_evtimer_add(&req->ev, &tv);
 283                 req->event_active = true;
 284             }
 285         } else {
 286         /* if we don't have direct modex feature, just respond with "not found" */
 287             PMIX_INFO_FREE(info, ninfo);
 288             pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
 289             PMIX_RELEASE(lcd);
 290             return PMIX_ERR_NOT_FOUND;
 291         }
 292 
 293         return PMIX_SUCCESS;
 294     }
 295 
 296     /* this nspace is known, so we can process the request.
 297      * if the rank is wildcard, then they are asking for the
 298      * job-level info for this nspace - provide it */
 299     if (PMIX_RANK_WILDCARD == rank) {
 300         /* see if we have the job-level info - we won't have it
 301          * if we have no local procs and haven't already asked
 302          * for it, so there is no guarantee we have it */
 303         data = NULL;
 304         sz = 0;
 305         pmix_strncpy(proc.nspace, nspace, PMIX_MAX_NSLEN);
 306         proc.rank = PMIX_RANK_WILDCARD;
 307         /* if we have local procs for this nspace, then we
 308          * can retrieve the info from that GDS. Otherwise,
 309          * we need to retrieve it from our own */
 310         PMIX_CONSTRUCT(&cb, pmix_cb_t);
 311         peer = pmix_globals.mypeer;
 312         /* this data is for a local client, so give the gds the
 313          * option of returning a complete copy of the data,
 314          * or returning a pointer to local storage */
 315         cb.proc = &proc;
 316         cb.scope = PMIX_SCOPE_UNDEF;
 317         cb.copy = false;
 318         PMIX_GDS_FETCH_KV(rc, peer, &cb);
 319         if (PMIX_SUCCESS != rc) {
 320             PMIX_DESTRUCT(&cb);
 321             return rc;
 322         }
 323         PMIX_CONSTRUCT(&pkt, pmix_buffer_t);
 324         /* assemble the provided data into a byte object */
 325         PMIX_GDS_ASSEMB_KVS_REQ(rc, peer, &proc, &cb.kvs, &pkt, cd);
 326         if (PMIX_SUCCESS != rc) {
 327             PMIX_ERROR_LOG(rc);
 328             PMIX_DESTRUCT(&cb);
 329             return rc;
 330         }
 331         PMIX_UNLOAD_BUFFER(&pkt, bo.bytes, bo.size);
 332         PMIX_DESTRUCT(&pkt);
 333         /* pack it into the payload */
 334         PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
 335         PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &bo, 1, PMIX_BYTE_OBJECT);
 336         free(bo.bytes);
 337         if (PMIX_SUCCESS != rc) {
 338             PMIX_ERROR_LOG(rc);
 339             PMIX_DESTRUCT(&pbkt);
 340             PMIX_DESTRUCT(&cb);
 341             return rc;
 342         }
 343         /* unload the resulting payload */
 344         PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
 345         PMIX_DESTRUCT(&pbkt);
 346         /* call the internal callback function - it will
 347          * release the cbdata */
 348         cbfunc(PMIX_SUCCESS, data, sz, cbdata, relfn, data);
 349         /* return success so the server doesn't duplicate
 350          * the release of cbdata */
 351         return PMIX_SUCCESS;
 352     }
 353 
 354     /* We have to wait for all local clients to be registered before
 355      * we can know whether this request is for data from a local or a
 356      * remote client because one client might ask for data about another
 357      * client that the host RM hasn't told us about yet. Fortunately,
 358      * we do know how many clients to expect, so first check to see if
 359      * all clients have been registered with us */
 360      if (!nptr->all_registered) {
 361         pmix_output_verbose(2, pmix_server_globals.get_output,
 362                             "%s:%d NSPACE %s not all registered",
 363                             pmix_globals.myid.nspace,
 364                             pmix_globals.myid.rank, nspace);
 365 
 366         if (localonly) {
 367             /* the client asked that we not wait, so return now */
 368             pmix_output_verbose(2, pmix_server_globals.get_output,
 369                                 "%s:%d CLIENT REQUESTED IMMEDIATE",
 370                                 pmix_globals.myid.nspace,
 371                                 pmix_globals.myid.rank);
 372             return PMIX_ERR_NOT_FOUND;
 373         }
 374         /* we cannot do anything further, so just track this request
 375          * for now */
 376         rc = create_local_tracker(nspace, rank, info, ninfo,
 377                                   cbfunc, cbdata, &lcd, &req);
 378         if (PMIX_ERR_NOMEM == rc) {
 379             PMIX_INFO_FREE(info, ninfo);
 380             return rc;
 381         }
 382         pmix_output_verbose(2, pmix_server_globals.get_output,
 383                             "%s:%d TRACKER CREATED - WAITING",
 384                             pmix_globals.myid.nspace,
 385                             pmix_globals.myid.rank);
 386         /* if they specified a timeout, set it up now */
 387         if (0 < tv.tv_sec) {
 388             pmix_event_evtimer_set(pmix_globals.evbase, &req->ev,
 389                                    get_timeout, req);
 390             pmix_event_evtimer_add(&req->ev, &tv);
 391             req->event_active = true;
 392         }
 393         /* the peer object has been added to the new lcd tracker,
 394          * so return success here */
 395         return PMIX_SUCCESS;
 396     }
 397 
 398     /* if everyone has registered, see if we already have this data */
 399     rc = _satisfy_request(nptr, rank, cd, cbfunc, cbdata, &local);
 400     if( PMIX_SUCCESS == rc ){
 401         /* request was successfully satisfied */
 402         PMIX_INFO_FREE(info, ninfo);
 403         /* return success as the satisfy_request function
 404          * calls the cbfunc for us, and it will have
 405          * released the cbdata object */
 406         return PMIX_SUCCESS;
 407     }
 408 
 409     pmix_output_verbose(2, pmix_server_globals.get_output,
 410                         "%s:%d DATA NOT FOUND",
 411                         pmix_globals.myid.nspace,
 412                         pmix_globals.myid.rank);
 413 
 414     /* If we get here, then we don't have the data at this time. If
 415      * the user doesn't want to look for it, then we are done */
 416     if (localonly) {
 417         pmix_output_verbose(2, pmix_server_globals.get_output,
 418                             "%s:%d CLIENT REQUESTED IMMEDIATE",
 419                             pmix_globals.myid.nspace,
 420                             pmix_globals.myid.rank);
 421         return PMIX_ERR_NOT_FOUND;
 422     }
 423 
 424     /* Check to see if we already have a pending request for the data - if
 425      * we do, then we can just wait for it to arrive */
 426     rc = create_local_tracker(nspace, rank, info, ninfo,
 427                               cbfunc, cbdata, &lcd, &req);
 428     if (PMIX_ERR_NOMEM == rc || NULL == lcd) {
 429         /* we have a problem */
 430         PMIX_INFO_FREE(info, ninfo);
 431         return PMIX_ERR_NOMEM;
 432     }
 433     /* if they specified a timeout, set it up now */
 434     if (0 < tv.tv_sec) {
 435         pmix_event_evtimer_set(pmix_globals.evbase, &req->ev,
 436                                get_timeout, req);
 437         pmix_event_evtimer_add(&req->ev, &tv);
 438         req->event_active = true;
 439     }
 440     if (PMIX_SUCCESS == rc) {
 441        /* we are already waiting for the data - nothing more
 442         * for us to do as the function added the new request
 443         * to the tracker for us */
 444        return PMIX_SUCCESS;
 445     }
 446 
 447     /* Getting here means that we didn't already have a request for
 448      * for data pending, and so we created a new tracker for this
 449      * request. We know the identity of all our local clients, so
 450      * if this is one, then we have nothing further to do - we will
 451      * fulfill the request once the process commits its data */
 452     if (local) {
 453         return PMIX_SUCCESS;
 454     }
 455 
 456     /* this isn't a local client of ours, so we need to ask the host
 457      * resource manager server to please get the info for us from
 458      * whomever is hosting the target process */
 459     if (NULL != pmix_host_server.direct_modex) {
 460         rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
 461         if (PMIX_SUCCESS != rc) {
 462             /* may have a function entry but not support the request */
 463             PMIX_INFO_FREE(info, ninfo);
 464             pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
 465             PMIX_RELEASE(lcd);
 466         }
 467     } else {
 468         pmix_output_verbose(2, pmix_server_globals.get_output,
 469                             "%s:%d NO SERVER SUPPORT",
 470                             pmix_globals.myid.nspace,
 471                             pmix_globals.myid.rank);
 472         /* if we don't have direct modex feature, just respond with "not found" */
 473         PMIX_INFO_FREE(info, ninfo);
 474         pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
 475         PMIX_RELEASE(lcd);
 476         rc = PMIX_ERR_NOT_FOUND;
 477     }
 478 
 479     return rc;
 480 }
 481 
 482 static pmix_status_t create_local_tracker(char nspace[], pmix_rank_t rank,
 483                                           pmix_info_t info[], size_t ninfo,
 484                                           pmix_modex_cbfunc_t cbfunc,
 485                                           void *cbdata,
 486                                           pmix_dmdx_local_t **ld,
 487                                           pmix_dmdx_request_t **rq)
 488 {
 489     pmix_dmdx_local_t *lcd, *cd;
 490     pmix_dmdx_request_t *req;
 491     pmix_status_t rc;
 492 
 493     /* define default */
 494     *ld = NULL;
 495     *rq = NULL;
 496 
 497     /* see if we already have an existing request for data
 498      * from this namespace/rank */
 499     lcd = NULL;
 500     PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
 501         if (0 != strncmp(nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
 502                 rank != cd->proc.rank ) {
 503             continue;
 504         }
 505         lcd = cd;
 506         break;
 507     }
 508     if (NULL != lcd) {
 509         /* we already have a request, so just track that someone
 510          * else wants data from the same target */
 511         rc = PMIX_SUCCESS; // indicates we found an existing request
 512         goto complete;
 513     }
 514     /* we do not have an existing request, so let's create
 515      * one and add it to our list */
 516     lcd = PMIX_NEW(pmix_dmdx_local_t);
 517     if (NULL == lcd){
 518         return PMIX_ERR_NOMEM;
 519     }
 520     pmix_strncpy(lcd->proc.nspace, nspace, PMIX_MAX_NSLEN);
 521     lcd->proc.rank = rank;
 522     lcd->info = info;
 523     lcd->ninfo = ninfo;
 524     pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super);
 525     rc = PMIX_ERR_NOT_FOUND;  // indicates that we created a new request tracker
 526 
 527   complete:
 528     /* track this specific requestor so we return the
 529      * data to them */
 530     req = PMIX_NEW(pmix_dmdx_request_t);
 531     if (NULL == req) {
 532         *ld = lcd;
 533         return PMIX_ERR_NOMEM;
 534     }
 535     PMIX_RETAIN(lcd);
 536     req->lcd = lcd;
 537     req->cbfunc = cbfunc;
 538     req->cbdata = cbdata;
 539     pmix_list_append(&lcd->loc_reqs, &req->super);
 540     *ld = lcd;
 541     *rq = req;
 542     return rc;
 543 }
 544 
 545 void pmix_pending_nspace_requests(pmix_namespace_t *nptr)
 546 {
 547     pmix_dmdx_local_t *cd, *cd_next;
 548     pmix_status_t rc;
 549 
 550     /* Now that we know all local ranks, go along request list and ask for remote data
 551      * for the non-local ranks, and resolve all pending requests for local procs
 552      * that were waiting for registration to complete
 553      */
 554     PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
 555         pmix_rank_info_t *info;
 556         bool found = false;
 557 
 558         if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) {
 559             continue;
 560         }
 561 
 562         PMIX_LIST_FOREACH(info, &nptr->ranks, pmix_rank_info_t) {
 563             if (info->pname.rank == cd->proc.rank) {
 564                 found = true;  // we will satisy this request upon commit from new proc
 565                 break;
 566             }
 567         }
 568 
 569         /* if not found - this is remote process and we need to send
 570          * corresponding direct modex request */
 571         if (!found){
 572             rc = PMIX_ERR_NOT_SUPPORTED;
 573             if (NULL != pmix_host_server.direct_modex){
 574                 rc = pmix_host_server.direct_modex(&cd->proc, cd->info, cd->ninfo, dmdx_cbfunc, cd);
 575             }
 576             if (PMIX_SUCCESS != rc) {
 577                 pmix_dmdx_request_t *req, *req_next;
 578                 PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, pmix_dmdx_request_t) {
 579                     req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL);
 580                     pmix_list_remove_item(&cd->loc_reqs, &req->super);
 581                     PMIX_RELEASE(req);
 582                 }
 583                 pmix_list_remove_item(&pmix_server_globals.local_reqs, &cd->super);
 584                 PMIX_RELEASE(cd);
 585             }
 586         }
 587     }
 588 }
 589 
 590 static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
 591                                       pmix_server_caddy_t *cd,
 592                                       pmix_modex_cbfunc_t cbfunc,
 593                                       void *cbdata, bool *local)
 594 {
 595     pmix_status_t rc;
 596     bool found = false;
 597     pmix_buffer_t pbkt, pkt;
 598     pmix_rank_info_t *iptr;
 599     pmix_proc_t proc;
 600     pmix_cb_t cb;
 601     pmix_peer_t *peer = NULL;
 602     pmix_byte_object_t bo;
 603     char *data = NULL;
 604     size_t sz = 0;
 605     pmix_scope_t scope = PMIX_SCOPE_UNDEF;
 606 
 607     pmix_output_verbose(2, pmix_server_globals.get_output,
 608                         "%s:%d SATISFY REQUEST CALLED",
 609                         pmix_globals.myid.nspace,
 610                         pmix_globals.myid.rank);
 611 
 612     /* check to see if this data already has been
 613      * obtained as a result of a prior direct modex request from
 614      * a remote peer, or due to data from a local client
 615      * having been committed */
 616     PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
 617     pmix_strncpy(proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
 618 
 619     /* if we have local clients of this nspace, then we use
 620      * the corresponding GDS to retrieve the data. Otherwise,
 621      * the data will have been stored under our GDS */
 622     if (0 < nptr->nlocalprocs) {
 623         if (local) {
 624             *local = true;
 625         }
 626         if (PMIX_RANK_WILDCARD != rank) {
 627             peer = NULL;
 628             /* see if the requested rank is local */
 629             PMIX_LIST_FOREACH(iptr, &nptr->ranks, pmix_rank_info_t) {
 630                 if (rank == iptr->pname.rank) {
 631                     scope = PMIX_LOCAL;
 632                     if (0 <= iptr->peerid) {
 633                         peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, iptr->peerid);
 634                     }
 635                     if (NULL == peer) {
 636                         /* this rank has not connected yet, so this request needs to be held */
 637                         return PMIX_ERR_NOT_FOUND;
 638                     }
 639                     break;
 640                 }
 641             }
 642             if (PMIX_LOCAL != scope)  {
 643                 /* this must be a remote rank */
 644                 if (local) {
 645                     *local = false;
 646                 }
 647                 scope = PMIX_REMOTE;
 648                 peer = pmix_globals.mypeer;
 649             }
 650         }
 651     } else {
 652         if (local) {
 653             *local = false;
 654         }
 655         peer = pmix_globals.mypeer;
 656         scope = PMIX_REMOTE;
 657     }
 658 
 659     /* if they are asking about a rank from an nspace different
 660      * from their own, or they gave a rank of "wildcard", then
 661      * include a copy of the job-level info */
 662     if (PMIX_RANK_WILDCARD == rank ||
 663         0 != strncmp(nptr->nspace, cd->peer->info->pname.nspace, PMIX_MAX_NSLEN)) {
 664         proc.rank = PMIX_RANK_WILDCARD;
 665         PMIX_CONSTRUCT(&cb, pmix_cb_t);
 666         /* this data is requested by a local client, so give the gds the option
 667          * of returning a copy of the data, or a pointer to
 668          * local storage */
 669         cb.proc = &proc;
 670         cb.scope = PMIX_INTERNAL;
 671         cb.copy = false;
 672         PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
 673         if (PMIX_SUCCESS == rc) {
 674             PMIX_CONSTRUCT(&pkt, pmix_buffer_t);
 675             /* assemble the provided data into a byte object */
 676             PMIX_GDS_ASSEMB_KVS_REQ(rc, cd->peer, &proc, &cb.kvs, &pkt, cd);
 677             if (rc != PMIX_SUCCESS) {
 678                 PMIX_ERROR_LOG(rc);
 679                 PMIX_DESTRUCT(&pkt);
 680                 PMIX_DESTRUCT(&pbkt);
 681                 PMIX_DESTRUCT(&cb);
 682                 return rc;
 683             }
 684             if (PMIX_PROC_IS_V1(cd->peer)) {
 685                 /* if the client is using v1, then it expects the
 686                  * data returned to it as the rank followed by abyte object containing
 687                  * a buffer - so we have to do a little gyration */
 688                 pmix_buffer_t xfer;
 689                 PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
 690                 PMIX_BFROPS_PACK(rc, cd->peer, &xfer, &pkt, 1, PMIX_BUFFER);
 691                 if (PMIX_SUCCESS != rc) {
 692                     PMIX_ERROR_LOG(rc);
 693                     PMIX_DESTRUCT(&pkt);
 694                     PMIX_DESTRUCT(&pbkt);
 695                     PMIX_DESTRUCT(&xfer);
 696                     PMIX_DESTRUCT(&cb);
 697                     return rc;
 698                 }
 699                 PMIX_UNLOAD_BUFFER(&xfer, bo.bytes, bo.size);
 700                 PMIX_DESTRUCT(&xfer);
 701             } else {
 702                 PMIX_UNLOAD_BUFFER(&pkt, bo.bytes, bo.size);
 703             }
 704             PMIX_DESTRUCT(&pkt);
 705             /* pack it for transmission */
 706             PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &bo, 1, PMIX_BYTE_OBJECT);
 707             if (PMIX_SUCCESS != rc) {
 708                 PMIX_ERROR_LOG(rc);
 709                 PMIX_DESTRUCT(&pbkt);
 710                 PMIX_DESTRUCT(&cb);
 711                 return rc;
 712             }
 713         }
 714         PMIX_DESTRUCT(&cb);
 715         if (rank == PMIX_RANK_WILDCARD) {
 716             found = true;
 717         }
 718     }
 719 
 720     /* retrieve the data for the specific rank they are asking about */
 721     if (PMIX_RANK_WILDCARD != rank) {
 722         if (!PMIX_PROC_IS_SERVER(peer) && !peer->commit_cnt) {
 723             /* this condition works only for local requests, server does
 724              * count commits for local ranks, and check this count when
 725              * local request.
 726              * if that request performs for remote rank on the remote
 727              * node (by direct modex) so `peer->commit_cnt` should be ignored,
 728              * it is can not be counted for the remote side and this condition
 729              * does not matter for remote case */
 730             return PMIX_ERR_NOT_FOUND;
 731         }
 732         proc.rank = rank;
 733         PMIX_CONSTRUCT(&cb, pmix_cb_t);
 734         /* this is a local request, so give the gds the option
 735          * of returning a copy of the data, or a pointer to
 736          * local storage */
 737         cb.proc = &proc;
 738         cb.scope = scope;
 739         cb.copy = false;
 740         PMIX_GDS_FETCH_KV(rc, peer, &cb);
 741         if (PMIX_SUCCESS == rc) {
 742             found = true;
 743             PMIX_CONSTRUCT(&pkt, pmix_buffer_t);
 744             /* assemble the provided data into a byte object */
 745             PMIX_GDS_ASSEMB_KVS_REQ(rc, cd->peer, &proc, &cb.kvs, &pkt, cd);
 746             if (rc != PMIX_SUCCESS) {
 747                 PMIX_ERROR_LOG(rc);
 748                 PMIX_DESTRUCT(&pkt);
 749                 PMIX_DESTRUCT(&pbkt);
 750                 PMIX_DESTRUCT(&cb);
 751                 return rc;
 752             }
 753             if (PMIX_PROC_IS_V1(cd->peer)) {
 754                 /* if the client is using v1, then it expects the
 755                  * data returned to it in a different order than v2
 756                  * - so we have to do a little gyration */
 757                 /* pack the rank */
 758                 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &rank, 1, PMIX_PROC_RANK);
 759                 if (PMIX_SUCCESS != rc) {
 760                     PMIX_ERROR_LOG(rc);
 761                     PMIX_DESTRUCT(&pkt);
 762                     PMIX_DESTRUCT(&pbkt);
 763                     PMIX_DESTRUCT(&cb);
 764                     return rc;
 765                 }
 766                 /* now pack the data itself as a buffer */
 767                 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &pkt, 1, PMIX_BUFFER);
 768                 if (PMIX_SUCCESS != rc) {
 769                     PMIX_ERROR_LOG(rc);
 770                     PMIX_DESTRUCT(&pkt);
 771                     PMIX_DESTRUCT(&pbkt);
 772                     PMIX_DESTRUCT(&cb);
 773                     return rc;
 774                 }
 775                 PMIX_DESTRUCT(&pkt);
 776             } else {
 777                 PMIX_UNLOAD_BUFFER(&pkt, bo.bytes, bo.size);
 778                 PMIX_DESTRUCT(&pkt);
 779                 /* pack it for transmission */
 780                 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &bo, 1, PMIX_BYTE_OBJECT);
 781                 if (PMIX_SUCCESS != rc) {
 782                     PMIX_ERROR_LOG(rc);
 783                     PMIX_DESTRUCT(&pbkt);
 784                     PMIX_DESTRUCT(&cb);
 785                     return rc;
 786                 }
 787             }
 788         }
 789         PMIX_DESTRUCT(&cb);
 790     }
 791     PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
 792     PMIX_DESTRUCT(&pbkt);
 793 
 794     if (found) {
 795         /* pass it back */
 796         cbfunc(rc, data, sz, cbdata, relfn, data);
 797         return rc;
 798     }
 799 
 800     return PMIX_ERR_NOT_FOUND;
 801 }
 802 
 803 /* Resolve pending requests to this namespace/rank */
 804 pmix_status_t pmix_pending_resolve(pmix_namespace_t *nptr, pmix_rank_t rank,
 805                                    pmix_status_t status, pmix_dmdx_local_t *lcd)
 806 {
 807     pmix_dmdx_local_t *cd, *ptr;
 808     pmix_dmdx_request_t *req;
 809     pmix_server_caddy_t *scd;
 810 
 811     /* find corresponding request (if exists) */
 812     if (NULL == lcd) {
 813         ptr = NULL;
 814         if (NULL != nptr) {
 815             PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
 816                 if (!PMIX_CHECK_NSPACE(nptr->nspace, cd->proc.nspace) ||
 817                         rank != cd->proc.rank) {
 818                     continue;
 819                 }
 820                 ptr = cd;
 821                 break;
 822             }
 823         }
 824         if (NULL == ptr) {
 825             return PMIX_SUCCESS;
 826         }
 827     } else {
 828         ptr = lcd;
 829     }
 830 
 831     /* if there are no local reqs on this request (e.g., only
 832      * one proc requested it and that proc has died), then
 833      * just remove the request */
 834     if (0 == pmix_list_get_size(&ptr->loc_reqs)) {
 835         goto cleanup;
 836     }
 837 
 838     /* somebody was interested in this rank */
 839     if (PMIX_SUCCESS != status){
 840         /* if we've got an error for this request - just forward it*/
 841         PMIX_LIST_FOREACH(req, &ptr->loc_reqs, pmix_dmdx_request_t) {
 842             req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
 843         }
 844     } else if (NULL != nptr) {
 845         /* if we've got the blob - try to satisfy requests */
 846         /* run through all the requests to this rank */
 847         /* this info is going back to one of our peers, so provide a server
 848          * caddy with our peer in it so the data gets packed correctly */
 849         scd = PMIX_NEW(pmix_server_caddy_t);
 850         PMIX_RETAIN(pmix_globals.mypeer);
 851         scd->peer = pmix_globals.mypeer;
 852         PMIX_LIST_FOREACH(req, &ptr->loc_reqs, pmix_dmdx_request_t) {
 853             pmix_status_t rc;
 854             rc = _satisfy_request(nptr, rank, scd, req->cbfunc, req->cbdata, NULL);
 855             if( PMIX_SUCCESS != rc ){
 856                 /* if we can't satisfy this particular request (missing key?) */
 857                 req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
 858             }
 859         }
 860         PMIX_RELEASE(scd);
 861     }
 862 
 863   cleanup:
 864     /* remove all requests to this rank and cleanup the corresponding structure */
 865     pmix_list_remove_item(&pmix_server_globals.local_reqs, &ptr->super);
 866     PMIX_RELEASE(ptr);
 867 
 868     return PMIX_SUCCESS;
 869 }
 870 
 871 /* process the returned data from the host RM server */
 872 static void _process_dmdx_reply(int fd, short args, void *cbdata)
 873 {
 874     pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata;
 875     pmix_server_caddy_t *cd;
 876     pmix_peer_t *peer;
 877     pmix_rank_info_t *rinfo;
 878     int32_t cnt;
 879     pmix_kval_t *kv;
 880     pmix_namespace_t *ns, *nptr;
 881     pmix_status_t rc;
 882     pmix_list_t nspaces;
 883     pmix_nspace_caddy_t *nm;
 884     pmix_dmdx_request_t *dm;
 885     bool found;
 886     pmix_buffer_t pbkt;
 887     pmix_cb_t cb;
 888 
 889     PMIX_ACQUIRE_OBJECT(caddy);
 890 
 891     pmix_output_verbose(2, pmix_server_globals.get_output,
 892                     "[%s:%d] process dmdx reply from %s:%u",
 893                     __FILE__, __LINE__,
 894                     caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
 895 
 896     /* find the nspace object for the proc whose data is being received */
 897     nptr = NULL;
 898     PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
 899         if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
 900             nptr = ns;
 901             break;
 902         }
 903     }
 904 
 905     if (NULL == nptr) {
 906         /* We may not have this namespace because there are no local
 907          * processes from it running on this host - so just record it
 908          * so we know we have the data for any future requests */
 909         nptr = PMIX_NEW(pmix_namespace_t);
 910         nptr->nspace = strdup(caddy->lcd->proc.nspace);
 911         /* add to the list */
 912         pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
 913     }
 914 
 915     /* if the request was successfully satisfied, then store the data.
 916      * Although we could immediately
 917      * resolve any outstanding requests on our tracking list, we instead
 918      * store the data first so we can immediately satisfy any future
 919      * requests. Then, rather than duplicate the resolve code here, we
 920      * will let the pmix_pending_resolve function go ahead and retrieve
 921      * it from the GDS
 922      *
 923      * NOTE: if the data returned is NULL, then it has already been
 924      * stored (e.g., via a register_nspace call in response to a request
 925      * for job-level data). For now, we will retrieve it so it can
 926      * be stored for each peer */
 927     if (PMIX_SUCCESS == caddy->status) {
 928         /* cycle across all outstanding local requests and collect their
 929          * unique nspaces so we can store this for each one */
 930         PMIX_CONSTRUCT(&nspaces, pmix_list_t);
 931         PMIX_LIST_FOREACH(dm, &caddy->lcd->loc_reqs, pmix_dmdx_request_t) {
 932             /* this is a local proc that has requested this data - search
 933              * the list of nspace's and see if we already have it */
 934             cd = (pmix_server_caddy_t*)dm->cbdata;
 935             found = false;
 936             PMIX_LIST_FOREACH(nm, &nspaces, pmix_nspace_caddy_t) {
 937                 if (0 == strcmp(nm->ns->nspace, cd->peer->nptr->nspace)) {
 938                     found = true;
 939                     break;
 940                 }
 941             }
 942             if (!found) {
 943                 /* add it */
 944                 nm = PMIX_NEW(pmix_nspace_caddy_t);
 945                 PMIX_RETAIN(cd->peer->nptr);
 946                 nm->ns = cd->peer->nptr;
 947                 pmix_list_append(&nspaces, &nm->super);
 948             }
 949         }
 950         /* now go thru each unique nspace and store the data using its
 951          * assigned GDS component */
 952         PMIX_LIST_FOREACH(nm, &nspaces, pmix_nspace_caddy_t) {
 953             if (NULL == nm->ns->compat.gds || 0 == nm->ns->nlocalprocs) {
 954                 peer = pmix_globals.mypeer;
 955             } else {
 956                 /* there must be at least one local proc */
 957                 rinfo = (pmix_rank_info_t*)pmix_list_get_first(&nm->ns->ranks);
 958                 peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, rinfo->peerid);
 959             }
 960             PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
 961             if (NULL == caddy->data) {
 962                 /* we assume that the data was provided via a call to
 963                  * register_nspace, so what we need to do now is simply
 964                  * transfer it across to the individual nspace storage
 965                  * components */
 966                 PMIX_CONSTRUCT(&cb, pmix_cb_t);
 967                 PMIX_PROC_CREATE(cb.proc, 1);
 968                 if (NULL == cb.proc) {
 969                     PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 970                     PMIX_DESTRUCT(&cb);
 971                     goto complete;
 972                 }
 973                 pmix_strncpy(cb.proc->nspace, nm->ns->nspace, PMIX_MAX_NSLEN);
 974                 cb.proc->rank = PMIX_RANK_WILDCARD;
 975                 cb.scope = PMIX_INTERNAL;
 976                 cb.copy = false;
 977                 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
 978                 if (PMIX_SUCCESS != rc) {
 979                     PMIX_ERROR_LOG(rc);
 980                     PMIX_DESTRUCT(&cb);
 981                     goto complete;
 982                 }
 983                 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
 984                     PMIX_GDS_STORE_KV(rc, peer, &caddy->lcd->proc, PMIX_INTERNAL, kv);
 985                     if (PMIX_SUCCESS != rc) {
 986                         PMIX_ERROR_LOG(rc);
 987                         break;
 988                     }
 989                 }
 990                 PMIX_DESTRUCT(&cb);
 991             } else {
 992                 PMIX_LOAD_BUFFER(pmix_globals.mypeer, &pbkt, caddy->data, caddy->ndata);
 993                 /* unpack and store it*/
 994                 kv = PMIX_NEW(pmix_kval_t);
 995                 cnt = 1;
 996                 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, &pbkt, kv, &cnt, PMIX_KVAL);
 997                 while (PMIX_SUCCESS == rc) {
 998                     if (caddy->lcd->proc.rank == PMIX_RANK_WILDCARD) {
 999                         PMIX_GDS_STORE_KV(rc, peer, &caddy->lcd->proc, PMIX_INTERNAL, kv);
1000                     } else {
1001                         PMIX_GDS_STORE_KV(rc, peer, &caddy->lcd->proc, PMIX_REMOTE, kv);
1002                     }
1003                     if (PMIX_SUCCESS != rc) {
1004                         PMIX_ERROR_LOG(rc);
1005                         caddy->status = rc;
1006                         goto complete;
1007                     }
1008                     PMIX_RELEASE(kv);
1009                     kv = PMIX_NEW(pmix_kval_t);
1010                     cnt = 1;
1011                     PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, &pbkt, kv, &cnt, PMIX_KVAL);
1012                 }
1013                 PMIX_RELEASE(kv);
1014                 pbkt.base_ptr = NULL;  // protect the data
1015                 PMIX_DESTRUCT(&pbkt);
1016                 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1017                     PMIX_ERROR_LOG(rc);
1018                     caddy->status = rc;
1019                     goto complete;
1020                 }
1021             }
1022         }
1023         PMIX_LIST_DESTRUCT(&nspaces);
1024     }
1025 
1026   complete:
1027     /* always execute the callback to avoid having the client hang */
1028     pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd);
1029 
1030     /* now call the release function so the host server
1031      * knows it can release the data */
1032     if (NULL != caddy->relcbfunc) {
1033         caddy->relcbfunc(caddy->cbdata);
1034     }
1035     PMIX_RELEASE(caddy);
1036 }
1037 
1038 /* this is the callback function that the host RM server will call
1039  * when it gets requested info back from a remote server */
1040 static void dmdx_cbfunc(pmix_status_t status,
1041                         const char *data, size_t ndata, void *cbdata,
1042                         pmix_release_cbfunc_t release_fn, void *release_cbdata)
1043 {
1044     pmix_dmdx_reply_caddy_t *caddy;
1045 
1046     /* because the host RM is calling us from their own thread, we
1047      * need to thread-shift into our local progress thread before
1048      * accessing any global info */
1049     caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
1050     caddy->status = status;
1051     /* point to the callers cbfunc */
1052     caddy->relcbfunc = release_fn;
1053     caddy->cbdata = release_cbdata;
1054 
1055     /* point to the returned data and our own internal
1056      * tracker */
1057     caddy->data   = data;
1058     caddy->ndata  = ndata;
1059     caddy->lcd    = (pmix_dmdx_local_t *)cbdata;
1060     pmix_output_verbose(2, pmix_server_globals.get_output,
1061                         "[%s:%d] queue dmdx reply for %s:%u",
1062                         __FILE__, __LINE__,
1063                         caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
1064     PMIX_THREADSHIFT(caddy, _process_dmdx_reply);
1065 }
1066 
1067 static void get_timeout(int sd, short args, void *cbdata)
1068 {
1069     pmix_dmdx_request_t *req = (pmix_dmdx_request_t*)cbdata;
1070 
1071     pmix_output_verbose(2, pmix_server_globals.get_output,
1072                         "ALERT: get timeout fired");
1073     /* execute the provided callback function with the error */
1074     if (NULL != req->cbfunc) {
1075         req->cbfunc(PMIX_ERR_TIMEOUT, NULL, 0, req->cbdata, NULL, NULL);
1076     }
1077     req->event_active = false;
1078     pmix_list_remove_item(&req->lcd->loc_reqs, &req->super);
1079     PMIX_RELEASE(req);
1080 }

/* [<][>][^][v][top][bottom][index][help] */