root/opal/mca/pmix/pmix4x/pmix/src/client/pmix_client_get.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. PMIx_Get
  2. PMIx_Get_nb
  3. _value_cbfunc
  4. _pack_get
  5. _getnb_cbfunc
  6. timeout
  7. process_values
  8. infocb
  9. _getfn_fastpath
  10. _getnbfn

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2014-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2014      Artem Y. Polyakov <artpol84@gmail.com>.
   7  *                         All rights reserved.
   8  * Copyright (c) 2016-2018 Mellanox Technologies, Inc.
   9  *                         All rights reserved.
  10  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include <src/include/pmix_config.h>
  19 
  20 #include <src/include/pmix_stdint.h>
  21 
  22 #include <pmix.h>
  23 #include <pmix_rename.h>
  24 
  25 #include "src/include/pmix_globals.h"
  26 
  27 #ifdef HAVE_STRING_H
  28 #include <string.h>
  29 #endif
  30 #include <fcntl.h>
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif
  34 #ifdef HAVE_SYS_SOCKET_H
  35 #include <sys/socket.h>
  36 #endif
  37 #ifdef HAVE_SYS_UN_H
  38 #include <sys/un.h>
  39 #endif
  40 #ifdef HAVE_SYS_UIO_H
  41 #include <sys/uio.h>
  42 #endif
  43 #ifdef HAVE_SYS_TYPES_H
  44 #include <sys/types.h>
  45 #endif
  46 
  47 #include PMIX_EVENT_HEADER
  48 
  49 #include "src/class/pmix_list.h"
  50 #include "src/mca/bfrops/bfrops.h"
  51 #include "src/mca/pcompress/base/base.h"
  52 #include "src/threads/threads.h"
  53 #include "src/util/argv.h"
  54 #include "src/util/error.h"
  55 #include "src/util/hash.h"
  56 #include "src/util/output.h"
  57 #include "src/mca/gds/gds.h"
  58 #include "src/mca/ptl/ptl.h"
  59 
  60 #include "pmix_client_ops.h"
  61 
  62 static pmix_buffer_t* _pack_get(char *nspace, pmix_rank_t rank,
  63                                const pmix_info_t info[], size_t ninfo,
  64                                pmix_cmd_t cmd);
  65 
  66 static void _getnbfn(int sd, short args, void *cbdata);
  67 
  68 static void _getnb_cbfunc(struct pmix_peer_t *pr,
  69                           pmix_ptl_hdr_t *hdr,
  70                           pmix_buffer_t *buf, void *cbdata);
  71 
  72 static void _value_cbfunc(pmix_status_t status, pmix_value_t *kv, void *cbdata);
  73 
  74 static pmix_status_t _getfn_fastpath(const pmix_proc_t *proc, const pmix_key_t key,
  75                                      const pmix_info_t info[], size_t ninfo,
  76                                      pmix_value_t **val);
  77 
  78 static pmix_status_t process_values(pmix_value_t **v, pmix_cb_t *cb);
  79 
  80 
  81 PMIX_EXPORT pmix_status_t PMIx_Get(const pmix_proc_t *proc,
  82                                    const pmix_key_t key,
  83                                    const pmix_info_t info[], size_t ninfo,
  84                                    pmix_value_t **val)
  85 {
  86     pmix_cb_t *cb;
  87     pmix_status_t rc;
  88 
  89     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
  90 
  91     if (pmix_globals.init_cntr <= 0) {
  92         PMIX_RELEASE_THREAD(&pmix_global_lock);
  93         return PMIX_ERR_INIT;
  94     }
  95     PMIX_RELEASE_THREAD(&pmix_global_lock);
  96 
  97     pmix_output_verbose(2, pmix_client_globals.get_output,
  98                         "pmix:client get for %s:%d key %s",
  99                         (NULL == proc) ? "NULL" : proc->nspace,
 100                         (NULL == proc) ? PMIX_RANK_UNDEF : proc->rank,
 101                         (NULL == key) ? "NULL" : key);
 102 
 103     /* try to get data directly, without threadshift */
 104     if (PMIX_SUCCESS == (rc = _getfn_fastpath(proc, key, info, ninfo, val))) {
 105         goto done;
 106     }
 107 
 108     /* create a callback object as we need to pass it to the
 109      * recv routine so we know which callback to use when
 110      * the return message is recvd */
 111     cb = PMIX_NEW(pmix_cb_t);
 112     if (PMIX_SUCCESS != (rc = PMIx_Get_nb(proc, key, info, ninfo, _value_cbfunc, cb))) {
 113         PMIX_RELEASE(cb);
 114         return rc;
 115     }
 116 
 117     /* wait for the data to return */
 118     PMIX_WAIT_THREAD(&cb->lock);
 119     rc = cb->status;
 120     if (NULL != val) {
 121         *val = cb->value;
 122         cb->value = NULL;
 123     }
 124     PMIX_RELEASE(cb);
 125 
 126   done:
 127     pmix_output_verbose(2, pmix_client_globals.get_output,
 128                         "pmix:client get completed");
 129 
 130     return rc;
 131 }
 132 
 133 PMIX_EXPORT pmix_status_t PMIx_Get_nb(const pmix_proc_t *proc, const pmix_key_t key,
 134                                       const pmix_info_t info[], size_t ninfo,
 135                                       pmix_value_cbfunc_t cbfunc, void *cbdata)
 136 {
 137     pmix_cb_t *cb;
 138     int rank;
 139     char *nm;
 140 
 141     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 142 
 143     if (pmix_globals.init_cntr <= 0) {
 144         PMIX_RELEASE_THREAD(&pmix_global_lock);
 145         return PMIX_ERR_INIT;
 146     }
 147     PMIX_RELEASE_THREAD(&pmix_global_lock);
 148 
 149     /* if the proc is NULL, then the caller is assuming
 150      * that the key is universally unique within the caller's
 151      * own nspace. This most likely indicates that the code
 152      * was originally written for a legacy version of PMI.
 153      *
 154      * If the key is NULL, then the caller wants all
 155      * data from the specified proc. Again, this likely
 156      * indicates use of a legacy version of PMI.
 157      *
 158      * Either case is supported. However, we don't currently
 159      * support the case where -both- values are NULL */
 160     if (NULL == proc && NULL == key) {
 161         pmix_output_verbose(2, pmix_client_globals.get_output,
 162                             "pmix: get_nb value error - both proc and key are NULL");
 163         return PMIX_ERR_BAD_PARAM;
 164     }
 165 
 166     /* if the key is NULL, the rank cannot be WILDCARD as
 167      * we cannot return all info from every rank */
 168     if (NULL != proc && PMIX_RANK_WILDCARD == proc->rank && NULL == key) {
 169         pmix_output_verbose(2, pmix_client_globals.get_output,
 170                             "pmix: get_nb value error - WILDCARD rank and key is NULL");
 171         return PMIX_ERR_BAD_PARAM;
 172     }
 173 
 174     /* if the given proc param is NULL, or the nspace is
 175      * empty, then the caller is referencing our own nspace */
 176     if (NULL == proc || 0 == strlen(proc->nspace)) {
 177         nm = pmix_globals.myid.nspace;
 178     } else {
 179         nm = (char*)proc->nspace;
 180     }
 181 
 182     /* if the proc param is NULL, then we are seeking a key that
 183      * must be globally unique, so communicate this to the hash
 184      * functions with the UNDEF rank */
 185     if (NULL == proc) {
 186         rank = PMIX_RANK_UNDEF;
 187     } else {
 188         rank = proc->rank;
 189     }
 190 
 191     pmix_output_verbose(2, pmix_client_globals.get_output,
 192                         "pmix: get_nb value for proc %s:%u key %s",
 193                         nm, rank, (NULL == key) ? "NULL" : key);
 194 
 195     /* threadshift this request so we can access global structures */
 196     cb = PMIX_NEW(pmix_cb_t);
 197     cb->pname.nspace = strdup(nm);
 198     cb->pname.rank = rank;
 199     cb->key = (char*)key;
 200     cb->info = (pmix_info_t*)info;
 201     cb->ninfo = ninfo;
 202     cb->cbfunc.valuefn = cbfunc;
 203     cb->cbdata = cbdata;
 204     PMIX_THREADSHIFT(cb, _getnbfn);
 205 
 206     return PMIX_SUCCESS;
 207 }
 208 
 209 static void _value_cbfunc(pmix_status_t status, pmix_value_t *kv, void *cbdata)
 210 {
 211     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 212     pmix_status_t rc;
 213 
 214     PMIX_ACQUIRE_OBJECT(cb);
 215     cb->status = status;
 216     if (PMIX_SUCCESS == status) {
 217         PMIX_BFROPS_COPY(rc, pmix_client_globals.myserver,
 218                          (void**)&cb->value, kv, PMIX_VALUE);
 219         if (PMIX_SUCCESS != rc) {
 220             PMIX_ERROR_LOG(rc);
 221         }
 222     }
 223     PMIX_POST_OBJECT(cb);
 224     PMIX_WAKEUP_THREAD(&cb->lock);
 225 }
 226 
 227 static pmix_buffer_t* _pack_get(char *nspace, pmix_rank_t rank,
 228                                const pmix_info_t info[], size_t ninfo,
 229                                pmix_cmd_t cmd)
 230 {
 231     pmix_buffer_t *msg;
 232     pmix_status_t rc;
 233 
 234     /* nope - see if we can get it */
 235     msg = PMIX_NEW(pmix_buffer_t);
 236     /* pack the get cmd */
 237     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 238                      msg, &cmd, 1, PMIX_COMMAND);
 239     if (PMIX_SUCCESS != rc) {
 240         PMIX_ERROR_LOG(rc);
 241         PMIX_RELEASE(msg);
 242         return NULL;
 243     }
 244     /* pack the request information - we'll get the entire blob
 245      * for this proc, so we don't need to pass the key */
 246     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 247                      msg, &nspace, 1, PMIX_STRING);
 248     if (PMIX_SUCCESS != rc) {
 249         PMIX_ERROR_LOG(rc);
 250         PMIX_RELEASE(msg);
 251         return NULL;
 252     }
 253     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 254                      msg, &rank, 1, PMIX_PROC_RANK);
 255     if (PMIX_SUCCESS != rc) {
 256         PMIX_ERROR_LOG(rc);
 257         PMIX_RELEASE(msg);
 258         return NULL;
 259     }
 260     /* pack the number of info structs */
 261     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 262                      msg, &ninfo, 1, PMIX_SIZE);
 263     if (PMIX_SUCCESS != rc) {
 264         PMIX_ERROR_LOG(rc);
 265         PMIX_RELEASE(msg);
 266         return NULL;
 267     }
 268     if (0 < ninfo) {
 269         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 270                          msg, info, ninfo, PMIX_INFO);
 271         if (PMIX_SUCCESS != rc) {
 272             PMIX_ERROR_LOG(rc);
 273             PMIX_RELEASE(msg);
 274             return NULL;
 275         }
 276     }
 277     return msg;
 278 }
 279 
 280 
 281 /* this callback is coming from the ptl recv, and thus
 282  * is occurring inside of our progress thread - hence, no
 283  * need to thread shift */
 284 static void _getnb_cbfunc(struct pmix_peer_t *pr,
 285                           pmix_ptl_hdr_t *hdr,
 286                           pmix_buffer_t *buf, void *cbdata)
 287 {
 288     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 289     pmix_cb_t *cb2;
 290     pmix_status_t rc, ret;
 291     pmix_value_t *val = NULL;
 292     int32_t cnt;
 293     pmix_proc_t proc;
 294     pmix_kval_t *kv;
 295 
 296     pmix_output_verbose(2, pmix_client_globals.get_output,
 297                         "pmix: get_nb callback recvd");
 298 
 299     if (NULL == cb) {
 300         /* nothing we can do */
 301         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
 302         return;
 303     }
 304 
 305     /* cache the proc id */
 306     pmix_strncpy(proc.nspace, cb->pname.nspace, PMIX_MAX_NSLEN);
 307     proc.rank = cb->pname.rank;
 308 
 309     /* a zero-byte buffer indicates that this recv is being
 310      * completed due to a lost connection */
 311     if (PMIX_BUFFER_IS_EMPTY(buf)) {
 312         ret = PMIX_ERR_UNREACH;
 313         goto done;
 314     }
 315 
 316     /* unpack the status */
 317     cnt = 1;
 318     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 319                        buf, &ret, &cnt, PMIX_STATUS);
 320     if (PMIX_SUCCESS != rc) {
 321         PMIX_ERROR_LOG(rc);
 322         pmix_list_remove_item(&pmix_client_globals.pending_requests, &cb->super);
 323         PMIX_RELEASE(cb);
 324         return;
 325     }
 326 
 327     if (PMIX_SUCCESS != ret) {
 328         goto done;
 329     }
 330     PMIX_GDS_ACCEPT_KVS_RESP(rc, pmix_client_globals.myserver, buf);
 331     if (PMIX_SUCCESS != rc) {
 332         goto done;
 333     }
 334 
 335   done:
 336     /* now search any pending requests (including the one this was in
 337      * response to) to see if they can be met. Note that this function
 338      * will only be called if the user requested a specific key - we
 339      * don't support calls to "get" for a NULL key */
 340     PMIX_LIST_FOREACH_SAFE(cb, cb2, &pmix_client_globals.pending_requests, pmix_cb_t) {
 341         if (0 == strncmp(proc.nspace, cb->pname.nspace, PMIX_MAX_NSLEN) &&
 342             cb->pname.rank == proc.rank) {
 343            /* we have the data for this proc - see if we can find the key */
 344             cb->proc = &proc;
 345             cb->scope = PMIX_SCOPE_UNDEF;
 346             /* fetch the data from server peer module - since it is passing
 347              * it back to the user, we need a copy of it */
 348             cb->copy = true;
 349             PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
 350             if (PMIX_SUCCESS == rc) {
 351                 if (1 != pmix_list_get_size(&cb->kvs)) {
 352                     rc = PMIX_ERR_INVALID_VAL;
 353                     val = NULL;
 354                 } else {
 355                     kv = (pmix_kval_t*)pmix_list_remove_first(&cb->kvs);
 356                     val = kv->value;
 357                     kv->value = NULL; // protect the value
 358                     PMIX_RELEASE(kv);
 359                 }
 360             }
 361             cb->cbfunc.valuefn(rc, val, cb->cbdata);
 362             pmix_list_remove_item(&pmix_client_globals.pending_requests, &cb->super);
 363             PMIX_RELEASE(cb);
 364         }
 365     }
 366 }
 367 
 368 static void timeout(int fd, short flags, void *cbdata)
 369 {
 370     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 371 
 372     /* let them know that we timed out */
 373     cb->cbfunc.valuefn(PMIX_ERR_TIMEOUT, NULL, cb->cbdata);
 374     cb->timer_running = false;
 375 
 376     /* remove this request */
 377     pmix_list_remove_item(&pmix_client_globals.pending_requests, &cb->super);
 378     PMIX_RELEASE(cb);
 379 }
 380 
 381 static pmix_status_t process_values(pmix_value_t **v, pmix_cb_t *cb)
 382 {
 383     pmix_list_t *kvs = &cb->kvs;
 384     pmix_kval_t *kv;
 385     pmix_value_t *val;
 386     pmix_info_t *info;
 387     size_t ninfo, n;
 388 
 389     if (NULL != cb->key && 1 == pmix_list_get_size(kvs)) {
 390         kv = (pmix_kval_t*)pmix_list_get_first(kvs);
 391         *v = kv->value;
 392         kv->value = NULL;  // protect the value
 393         return PMIX_SUCCESS;
 394     }
 395     /* we will return the data as an array of pmix_info_t
 396      * in the kvs pmix_value_t */
 397     val = (pmix_value_t*)malloc(sizeof(pmix_value_t));
 398     if (NULL == val) {
 399         return PMIX_ERR_NOMEM;
 400     }
 401     val->type = PMIX_DATA_ARRAY;
 402     val->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
 403     if (NULL == val->data.darray) {
 404         PMIX_VALUE_RELEASE(val);
 405         return PMIX_ERR_NOMEM;
 406     }
 407     val->data.darray->type = PMIX_INFO;
 408     val->data.darray->size = 0;
 409     val->data.darray->array = NULL;
 410     ninfo = pmix_list_get_size(kvs);
 411     PMIX_INFO_CREATE(info, ninfo);
 412     if (NULL == info) {
 413         PMIX_VALUE_RELEASE(val);
 414         return PMIX_ERR_NOMEM;
 415     }
 416     /* copy the list elements */
 417     n=0;
 418     PMIX_LIST_FOREACH(kv, kvs, pmix_kval_t) {
 419         pmix_strncpy(info[n].key, kv->key, PMIX_MAX_KEYLEN);
 420         pmix_value_xfer(&info[n].value, kv->value);
 421         ++n;
 422     }
 423     val->data.darray->size = ninfo;
 424     val->data.darray->array = info;
 425     *v = val;
 426     return PMIX_SUCCESS;
 427 }
 428 
 429 static void infocb(pmix_status_t status,
 430                    pmix_info_t *info, size_t ninfo,
 431                    void *cbdata,
 432                    pmix_release_cbfunc_t release_fn,
 433                    void *release_cbdata)
 434 {
 435     pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
 436     pmix_value_t *kv = NULL;
 437     pmix_status_t rc;
 438 
 439     if (PMIX_SUCCESS == status) {
 440         if (NULL != info) {
 441             /* there should be only one returned value */
 442             if (1 != ninfo) {
 443                 rc = PMIX_ERR_INVALID_VAL;
 444             } else {
 445                 PMIX_VALUE_CREATE(kv, 1);
 446                 if (NULL == kv) {
 447                     rc = PMIX_ERR_NOMEM;
 448                 } else {
 449                     /* if this is a compressed string, then uncompress it */
 450                     if (PMIX_COMPRESSED_STRING == info[0].value.type) {
 451                         kv->type = PMIX_STRING;
 452                         pmix_compress.decompress_string(&kv->data.string,
 453                                                         (uint8_t*)info[0].value.data.bo.bytes,
 454                                                         info[0].value.data.bo.size);
 455                         if (NULL == kv->data.string) {
 456                             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
 457                             rc = PMIX_ERR_NOMEM;
 458                             PMIX_VALUE_FREE(kv, 1);
 459                             kv = NULL;
 460                         } else {
 461                             rc = PMIX_SUCCESS;
 462                         }
 463                     } else {
 464                         rc = pmix_value_xfer(kv, &info[0].value);
 465                     }
 466                 }
 467             }
 468         } else {
 469             rc = PMIX_ERR_NOT_FOUND;
 470         }
 471     } else {
 472         rc = status;
 473     }
 474     if (NULL != cd->valcbfunc) {
 475         cd->valcbfunc(rc, kv, cd->cbdata);
 476     }
 477     PMIX_RELEASE(cd);
 478     if (NULL != kv) {
 479         PMIX_VALUE_FREE(kv, 1);
 480     }
 481     if (NULL != release_fn) {
 482         release_fn(release_cbdata);
 483     }
 484 }
 485 
 486 static pmix_status_t _getfn_fastpath(const pmix_proc_t *proc, const pmix_key_t key,
 487                                      const pmix_info_t info[], size_t ninfo,
 488                                      pmix_value_t **val)
 489 {
 490     pmix_cb_t *cb = PMIX_NEW(pmix_cb_t);
 491     pmix_status_t rc = PMIX_SUCCESS;
 492     size_t n;
 493 
 494     /* scan the incoming directives */
 495     if (NULL != info) {
 496         for (n=0; n < ninfo; n++) {
 497             if (0 == strncmp(info[n].key, PMIX_DATA_SCOPE, PMIX_MAX_KEYLEN)) {
 498                 cb->scope = info[n].value.data.scope;
 499                 break;
 500             }
 501         }
 502     }
 503     cb->proc = (pmix_proc_t*)proc;
 504     cb->copy = true;
 505     cb->key = (char*)key;
 506     cb->info = (pmix_info_t*)info;
 507     cb->ninfo = ninfo;
 508 
 509     PMIX_GDS_FETCH_IS_TSAFE(rc, pmix_globals.mypeer);
 510     if (PMIX_SUCCESS == rc) {
 511         PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, cb);
 512         if (PMIX_SUCCESS == rc) {
 513             goto done;
 514         }
 515     }
 516     PMIX_GDS_FETCH_IS_TSAFE(rc, pmix_client_globals.myserver);
 517     if (PMIX_SUCCESS == rc) {
 518         PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
 519         if (PMIX_SUCCESS == rc) {
 520             goto done;
 521         }
 522     }
 523     PMIX_RELEASE(cb);
 524     return rc;
 525 
 526   done:
 527     rc = process_values(val, cb);
 528     if (NULL != *val) {
 529         PMIX_VALUE_COMPRESSED_STRING_UNPACK(*val);
 530     }
 531     PMIX_RELEASE(cb);
 532     return rc;
 533 }
 534 
 535 static void _getnbfn(int fd, short flags, void *cbdata)
 536 {
 537     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 538     pmix_cb_t *cbret;
 539     pmix_buffer_t *msg;
 540     pmix_value_t *val = NULL;
 541     pmix_status_t rc;
 542     size_t n;
 543     pmix_proc_t proc;
 544     bool optional = false;
 545     bool immediate = false;
 546     struct timeval tv;
 547     pmix_query_caddy_t *cd;
 548 
 549     /* cb was passed to us from another thread - acquire it */
 550     PMIX_ACQUIRE_OBJECT(cb);
 551 
 552     pmix_output_verbose(2, pmix_client_globals.get_output,
 553                         "pmix: getnbfn value for proc %s:%u key %s",
 554                         cb->pname.nspace, cb->pname.rank,
 555                         (NULL == cb->key) ? "NULL" : cb->key);
 556 
 557     /* set the proc object identifier */
 558     pmix_strncpy(proc.nspace, cb->pname.nspace, PMIX_MAX_NSLEN);
 559     proc.rank = cb->pname.rank;
 560 
 561     /* scan the incoming directives */
 562     if (NULL != cb->info) {
 563         for (n=0; n < cb->ninfo; n++) {
 564             if (0 == strncmp(cb->info[n].key, PMIX_OPTIONAL, PMIX_MAX_KEYLEN)) {
 565                 optional = PMIX_INFO_TRUE(&cb->info[n]);
 566             } else if (0 == strncmp(cb->info[n].key, PMIX_IMMEDIATE, PMIX_MAX_KEYLEN)) {
 567                 immediate = PMIX_INFO_TRUE(&cb->info[n]);
 568             } else if (0 == strncmp(cb->info[n].key, PMIX_TIMEOUT, PMIX_MAX_KEYLEN)) {
 569                 /* set a timer to kick us out if we don't
 570                  * have an answer within their window */
 571                 if (0 < cb->info[n].value.data.integer) {
 572                     tv.tv_sec = cb->info[n].value.data.integer;
 573                     tv.tv_usec = 0;
 574                     pmix_event_evtimer_set(pmix_globals.evbase, &cb->ev,
 575                                            timeout, cb);
 576                     pmix_event_evtimer_add(&cb->ev, &tv);
 577                     cb->timer_running = true;
 578                 }
 579             } else if (0 == strncmp(cb->info[n].key, PMIX_DATA_SCOPE, PMIX_MAX_KEYLEN)) {
 580                 cb->scope = cb->info[n].value.data.scope;
 581             }
 582         }
 583     }
 584 
 585     /* check the internal storage first */
 586     cb->proc = &proc;
 587     cb->copy = true;
 588     PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, cb);
 589     if (PMIX_SUCCESS == rc) {
 590         pmix_output_verbose(5, pmix_client_globals.get_output,
 591                             "pmix:client data found in internal storage");
 592         rc = process_values(&val, cb);
 593         goto respond;
 594     }
 595     pmix_output_verbose(5, pmix_client_globals.get_output,
 596                         "pmix:client data NOT found in internal storage");
 597 
 598     /* if the key is NULL or starts with "pmix", then they are looking
 599      * for data that was provided by the server at startup */
 600     if (NULL == cb->key || 0 == strncmp(cb->key, "pmix", 4)) {
 601         cb->proc = &proc;
 602         /* fetch the data from my server's module - since we are passing
 603          * it back to the user, we need a copy of it */
 604         cb->copy = true;
 605         PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
 606         if (PMIX_SUCCESS != rc) {
 607             pmix_output_verbose(5, pmix_client_globals.get_output,
 608                                 "pmix:client job-level data NOT found");
 609             if (0 != strncmp(cb->pname.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN)) {
 610                 /* we are asking about the job-level info from another
 611                  * namespace. It seems that we don't have it - go and
 612                  * ask server
 613                  */
 614                 goto request;
 615             } else if (NULL != cb->key) {
 616                 /* if immediate was given, then we are being directed to
 617                  * check with the server even though the caller is looking for
 618                  * job-level info. In some cases, a server may elect not
 619                  * to provide info at init to save memory */
 620                 if (immediate) {
 621                     pmix_output_verbose(5, pmix_client_globals.get_output,
 622                                         "pmix:client IMMEDIATE given - querying data");
 623                     /* the direct modex request doesn't pass a key as it
 624                      * was intended to support non-job-level information.
 625                      * So instead, we will use the PMIx_Query function
 626                      * to request the information */
 627                     cd = PMIX_NEW(pmix_query_caddy_t);
 628                     cd->cbdata = cb->cbdata;
 629                     cd->valcbfunc = cb->cbfunc.valuefn;
 630                     PMIX_QUERY_CREATE(cd->queries, 1);
 631                     cd->nqueries = 1;
 632                     pmix_argv_append_nosize(&cd->queries[0].keys, cb->key);
 633                     if (PMIX_SUCCESS != (rc = PMIx_Query_info_nb(cd->queries, 1, infocb, cd))) {
 634                         PMIX_RELEASE(cd);
 635                         goto respond;
 636                     }
 637                     PMIX_RELEASE(cb);
 638                     return;
 639                 }
 640                 /* we should have had this info, so respond with the error */
 641                 pmix_output_verbose(5, pmix_client_globals.get_output,
 642                                     "pmix:client returning NOT FOUND error");
 643                 goto respond;
 644             } else {
 645                 pmix_output_verbose(5, pmix_client_globals.get_output,
 646                                     "pmix:client NULL KEY - returning error");
 647                 goto respond;
 648             }
 649         }
 650         pmix_output_verbose(5, pmix_client_globals.get_output,
 651                             "pmix:client job-level data NOT found");
 652         rc = process_values(&val, cb);
 653         goto respond;
 654     } else {
 655         cb->proc = &proc;
 656         cb->copy = true;
 657         PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
 658         if (PMIX_SUCCESS != rc) {
 659             val = NULL;
 660             goto request;
 661         }
 662         /* return whatever we found */
 663         rc = process_values(&val, cb);
 664     }
 665 
 666   respond:
 667     /* if a callback was provided, execute it */
 668     if (NULL != cb->cbfunc.valuefn) {
 669         if (NULL != val)  {
 670             PMIX_VALUE_COMPRESSED_STRING_UNPACK(val);
 671         }
 672         cb->cbfunc.valuefn(rc, val, cb->cbdata);
 673     }
 674     if (NULL != val) {
 675         PMIX_VALUE_RELEASE(val);
 676     }
 677     PMIX_RELEASE(cb);
 678     return;
 679 
 680   request:
 681     /* if we got here, then we don't have the data for this proc. If we
 682      * are a server, or we are a client and not connected, then there is
 683      * nothing more we can do */
 684     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) ||
 685         (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) && !pmix_globals.connected)) {
 686         rc = PMIX_ERR_NOT_FOUND;
 687         goto respond;
 688     }
 689 
 690     /* we also have to check the user's directives to see if they do not want
 691      * us to attempt to retrieve it from the server */
 692     if (optional) {
 693         /* they don't want us to try and retrieve it */
 694         pmix_output_verbose(2, pmix_client_globals.get_output,
 695                             "PMIx_Get key=%s for rank = %u, namespace = %s was not found - request was optional",
 696                             cb->key, cb->pname.rank, cb->pname.nspace);
 697         rc = PMIX_ERR_NOT_FOUND;
 698         goto respond;
 699     }
 700 
 701     /* see if we already have a request in place with the server for data from
 702      * this nspace:rank. If we do, then no need to ask again as the
 703      * request will return _all_ data from that proc */
 704     PMIX_LIST_FOREACH(cbret, &pmix_client_globals.pending_requests, pmix_cb_t) {
 705         if (0 == strncmp(cbret->pname.nspace, cb->pname.nspace, PMIX_MAX_NSLEN) &&
 706             cbret->pname.rank == cb->pname.rank) {
 707             /* we do have a pending request, but we still need to track this
 708              * outstanding request so we can satisfy it once the data is returned */
 709             pmix_list_append(&pmix_client_globals.pending_requests, &cb->super);
 710             return;
 711         }
 712     }
 713 
 714     /* we don't have a pending request, so let's create one - don't worry
 715      * about packing the key as we return everything from that proc */
 716     msg = _pack_get(cb->pname.nspace, cb->pname.rank, cb->info, cb->ninfo, PMIX_GETNB_CMD);
 717     if (NULL == msg) {
 718         rc = PMIX_ERROR;
 719         goto respond;
 720     }
 721 
 722     pmix_output_verbose(2, pmix_client_globals.get_output,
 723                         "%s:%d REQUESTING DATA FROM SERVER FOR %s:%d KEY %s",
 724                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 725                         cb->pname.nspace, cb->pname.rank, cb->key);
 726 
 727     /* track the callback object */
 728     pmix_list_append(&pmix_client_globals.pending_requests, &cb->super);
 729     /* send to the server */
 730     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver, msg, _getnb_cbfunc, (void*)cb);
 731     if (PMIX_SUCCESS != rc) {
 732         pmix_list_remove_item(&pmix_client_globals.pending_requests, &cb->super);
 733         rc = PMIX_ERROR;
 734         goto respond;
 735     }
 736     /* we made a lot of changes to cb, so ensure they get
 737      * written out before we return */
 738     PMIX_POST_OBJECT(cb);
 739     return;
 740 }

/* [<][>][^][v][top][bottom][index][help] */