root/opal/mca/pmix/pmix4x/pmix/src/common/pmix_query.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. relcbfunc
  2. query_cbfunc
  3. _local_relcb
  4. _local_cbfunc
  5. PMIx_Query_info_nb
  6. acb
  7. PMIx_Allocation_request
  8. PMIx_Allocation_request_nb

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2016      Mellanox Technologies, Inc.
   5  *                         All rights reserved.
   6  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
   7  * Copyright (c) 2019      Research Organization for Information Science
   8  *                         and Technology (RIST).  All rights reserved.
   9  * $COPYRIGHT$
  10  *
  11  * Additional copyrights may follow
  12  *
  13  * $HEADER$
  14  */
  15 #include <src/include/pmix_config.h>
  16 
  17 #include <src/include/pmix_stdint.h>
  18 #include <src/include/pmix_socket_errno.h>
  19 
  20 #include <pmix.h>
  21 #include <pmix_common.h>
  22 #include <pmix_server.h>
  23 #include <pmix_rename.h>
  24 
  25 #include "src/threads/threads.h"
  26 #include "src/util/argv.h"
  27 #include "src/util/error.h"
  28 #include "src/util/name_fns.h"
  29 #include "src/util/output.h"
  30 #include "src/mca/bfrops/bfrops.h"
  31 #include "src/mca/ptl/ptl.h"
  32 #include "src/common/pmix_attributes.h"
  33 
  34 #include "src/client/pmix_client_ops.h"
  35 #include "src/server/pmix_server_ops.h"
  36 #include "src/include/pmix_globals.h"
  37 
  38 static void relcbfunc(void *cbdata)
  39 {
  40     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
  41 
  42     pmix_output_verbose(2, pmix_globals.debug_output,
  43                         "pmix:query release callback");
  44 
  45     if (NULL != cd->info) {
  46         PMIX_INFO_FREE(cd->info, cd->ninfo);
  47     }
  48     PMIX_RELEASE(cd);
  49 }
  50 static void query_cbfunc(struct pmix_peer_t *peer,
  51                          pmix_ptl_hdr_t *hdr,
  52                          pmix_buffer_t *buf, void *cbdata)
  53 {
  54     pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
  55     pmix_status_t rc;
  56     pmix_shift_caddy_t *results;
  57     int cnt;
  58     size_t n;
  59     pmix_kval_t *kv;
  60 
  61     pmix_output_verbose(2, pmix_globals.debug_output,
  62                         "pmix:query cback from server");
  63 
  64     results = PMIX_NEW(pmix_shift_caddy_t);
  65 
  66     /* unpack the status */
  67     cnt = 1;
  68     PMIX_BFROPS_UNPACK(rc, peer, buf, &results->status, &cnt, PMIX_STATUS);
  69     if (PMIX_SUCCESS != rc) {
  70         PMIX_ERROR_LOG(rc);
  71         results->status = rc;
  72         goto complete;
  73     }
  74     if (PMIX_SUCCESS != results->status) {
  75         goto complete;
  76     }
  77 
  78     /* unpack any returned data */
  79     cnt = 1;
  80     PMIX_BFROPS_UNPACK(rc, peer, buf, &results->ninfo, &cnt, PMIX_SIZE);
  81     if (PMIX_SUCCESS != rc) {
  82         PMIX_ERROR_LOG(rc);
  83         results->status = rc;
  84         goto complete;
  85     }
  86     if (0 < results->ninfo) {
  87         PMIX_INFO_CREATE(results->info, results->ninfo);
  88         cnt = results->ninfo;
  89         PMIX_BFROPS_UNPACK(rc, peer, buf, results->info, &cnt, PMIX_INFO);
  90         if (PMIX_SUCCESS != rc) {
  91             PMIX_ERROR_LOG(rc);
  92             results->status = rc;
  93             goto complete;
  94         }
  95         /* locally cache the results */
  96         for (n=0; n < results->ninfo; n++) {
  97             kv = PMIX_NEW(pmix_kval_t);
  98             kv->key = strdup(results->info[n].key);
  99             PMIX_VALUE_CREATE(kv->value, 1);
 100             PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
 101                                    kv->value, &results->info[n].value);
 102 
 103             PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer,
 104                               &pmix_globals.myid, PMIX_INTERNAL,
 105                               kv);
 106             PMIX_RELEASE(kv);  // maintain accounting
 107         }
 108     }
 109 
 110   complete:
 111     pmix_output_verbose(2, pmix_globals.debug_output,
 112                         "pmix:query cback from server releasing with status %s", PMIx_Error_string(results->status));
 113     /* release the caller */
 114     if (NULL != cd->cbfunc) {
 115         cd->cbfunc(results->status, results->info, results->ninfo, cd->cbdata, relcbfunc, results);
 116     }
 117     PMIX_RELEASE(cd);
 118 }
 119 
 120 static void _local_relcb(void *cbdata)
 121 {
 122     pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
 123     PMIX_RELEASE(cd);
 124 }
 125 
 126 static void _local_cbfunc(int sd, short args, void *cbdata)
 127 {
 128     pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
 129     if (NULL != cd->cbfunc) {
 130         cd->cbfunc(cd->status, cd->info, cd->ninfo, cd->cbdata, _local_relcb, cd);
 131         return;
 132     }
 133     PMIX_RELEASE(cd);
 134 }
 135 
 136 PMIX_EXPORT pmix_status_t PMIx_Query_info_nb(pmix_query_t queries[], size_t nqueries,
 137                                              pmix_info_cbfunc_t cbfunc, void *cbdata)
 138 
 139 {
 140     pmix_query_caddy_t *cd;
 141     pmix_cmd_t cmd = PMIX_QUERY_CMD;
 142     pmix_buffer_t *msg;
 143     pmix_status_t rc;
 144     pmix_cb_t cb;
 145     size_t n, p;
 146     pmix_list_t results;
 147     pmix_kval_t *kv, *kvnxt;
 148     pmix_proc_t proc;
 149 
 150     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 151 
 152     pmix_output_verbose(2, pmix_globals.debug_output,
 153                         "pmix:query non-blocking");
 154 
 155     if (pmix_globals.init_cntr <= 0) {
 156         PMIX_RELEASE_THREAD(&pmix_global_lock);
 157         return PMIX_ERR_INIT;
 158     }
 159 
 160     if (0 == nqueries || NULL == queries) {
 161         PMIX_RELEASE_THREAD(&pmix_global_lock);
 162         return PMIX_ERR_BAD_PARAM;
 163     }
 164 
 165     /* do a quick check of the qualifiers array to ensure
 166      * the nqual field has been set */
 167     for (n=0; n < nqueries; n++) {
 168         if (NULL != queries[n].qualifiers && 0 == queries[n].nqual) {
 169             /* look for the info marked as "end" */
 170             p = 0;
 171             while (!(PMIX_INFO_IS_END(&queries[n].qualifiers[p])) && p < SIZE_MAX) {
 172                 ++p;
 173             }
 174             if (SIZE_MAX == p) {
 175                 /* nothing we can do */
 176                 PMIX_RELEASE_THREAD(&pmix_global_lock);
 177                 return PMIX_ERR_BAD_PARAM;
 178             }
 179             queries[n].nqual = p;
 180         }
 181     }
 182 
 183     /* setup the list of local results */
 184     PMIX_CONSTRUCT(&results, pmix_list_t);
 185 
 186     /* check the directives to see if they want us to refresh
 187      * the local cached results - if we wanted to optimize this
 188      * more, we would check each query and allow those that don't
 189      * want to be refreshed to be executed locally, and those that
 190      * did would be sent to the host. However, for now we simply
 191      * assume that any requirement to refresh will force all to
 192      * do so */
 193     memset(proc.nspace, 0, PMIX_MAX_NSLEN+1);
 194     proc.rank = PMIX_RANK_INVALID;
 195     for (n=0; n < nqueries; n++) {
 196         /* check for requests to report supported attributes */
 197         if (0 == strcmp(queries[n].keys[0], PMIX_QUERY_ATTRIBUTE_SUPPORT)) {
 198             cd = PMIX_NEW(pmix_query_caddy_t);
 199             cd->queries = queries;
 200             cd->nqueries = nqueries;
 201             cd->cbfunc = cbfunc;
 202             cd->cbdata = cbdata;
 203             PMIX_THREADSHIFT(cd, pmix_attrs_query_support);
 204             /* regardless of the result of the query, we return
 205              * PMIX_SUCCESS here to indicate that the operation
 206              * was accepted for processing */
 207             PMIX_RELEASE_THREAD(&pmix_global_lock);
 208             return PMIX_SUCCESS;
 209         }
 210         for (p=0; p < queries[n].nqual; p++) {
 211             if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_QUERY_REFRESH_CACHE)) {
 212                 if (PMIX_INFO_TRUE(&queries[n].qualifiers[p])) {
 213                     PMIX_LIST_DESTRUCT(&results);
 214                     goto query;
 215                 }
 216             } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_PROCID)) {
 217                 PMIX_LOAD_NSPACE(proc.nspace, queries[n].qualifiers[p].value.data.proc->nspace);
 218                 proc.rank = queries[n].qualifiers[p].value.data.proc->rank;
 219             } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_NSPACE)) {
 220                 PMIX_LOAD_NSPACE(proc.nspace, queries[n].qualifiers[p].value.data.string);
 221             } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_RANK)) {
 222                 proc.rank = queries[n].qualifiers[p].value.data.rank;
 223             } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_HOSTNAME)) {
 224                 if (0 != strcmp(queries[n].qualifiers[p].value.data.string, pmix_globals.hostname)) {
 225                     /* asking about a different host, so ask for the info */
 226                     PMIX_LIST_DESTRUCT(&results);
 227                     goto query;
 228                 }
 229             }
 230         }
 231         /* we get here if a refresh isn't required - first try a local
 232          * "get" on the data to see if we already have it */
 233         PMIX_CONSTRUCT(&cb, pmix_cb_t);
 234         cb.copy = false;
 235         /* set the proc */
 236         if (PMIX_RANK_INVALID == proc.rank &&
 237             0 == strlen(proc.nspace)) {
 238             /* use our id */
 239             cb.proc = &pmix_globals.myid;
 240         } else {
 241             if (0 == strlen(proc.nspace)) {
 242                 /* use our nspace */
 243                 PMIX_LOAD_NSPACE(cb.proc->nspace, pmix_globals.myid.nspace);
 244             }
 245             if (PMIX_RANK_INVALID == proc.rank) {
 246                 /* user the wildcard rank */
 247                 proc.rank = PMIX_RANK_WILDCARD;
 248             }
 249             cb.proc = &proc;
 250         }
 251         for (p=0; NULL != queries[n].keys[p]; p++) {
 252             cb.key = queries[n].keys[p];
 253             PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
 254             if (PMIX_SUCCESS != rc) {
 255                 /* needs to be passed to the host */
 256                 PMIX_LIST_DESTRUCT(&results);
 257                 PMIX_DESTRUCT(&cb);
 258                 goto query;
 259             }
 260             /* need to retain this result */
 261             PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &cb.kvs, pmix_kval_t) {
 262                 pmix_list_remove_item(&cb.kvs, &kv->super);
 263                 pmix_list_append(&results, &kv->super);
 264             }
 265             PMIX_DESTRUCT(&cb);
 266         }
 267     }
 268 
 269     /* if we get here, then all queries were completely locally
 270      * resolved, so construct the results for return */
 271     cd = PMIX_NEW(pmix_query_caddy_t);
 272     cd->cbfunc = cbfunc;
 273     cd->cbdata = cbdata;
 274     cd->status = PMIX_SUCCESS;
 275     cd->ninfo = pmix_list_get_size(&results);
 276     PMIX_INFO_CREATE(cd->info, cd->ninfo);
 277     n = 0;
 278     PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &results, pmix_kval_t) {
 279         PMIX_LOAD_KEY(cd->info[n].key, kv->key);
 280         rc = pmix_value_xfer(&cd->info[n].value, kv->value);
 281         if (PMIX_SUCCESS != rc) {
 282             cd->status = rc;
 283             PMIX_INFO_FREE(cd->info, cd->ninfo);
 284             break;
 285         }
 286         ++n;
 287     }
 288     /* done with the list of results */
 289     PMIX_LIST_DESTRUCT(&results);
 290     /* we need to thread-shift as we are not allowed to
 291      * execute the callback function prior to returning
 292      * from the API */
 293     PMIX_THREADSHIFT(cd, _local_cbfunc);
 294     /* regardless of the result of the query, we return
 295      * PMIX_SUCCESS here to indicate that the operation
 296      * was accepted for processing */
 297     PMIX_RELEASE_THREAD(&pmix_global_lock);
 298     return PMIX_SUCCESS;
 299 
 300 
 301   query:
 302     /* if we are the server, then we just issue the query and
 303      * return the response */
 304     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
 305         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 306         PMIX_RELEASE_THREAD(&pmix_global_lock);
 307         if (NULL == pmix_host_server.query) {
 308             /* nothing we can do */
 309             return PMIX_ERR_NOT_SUPPORTED;
 310         }
 311         pmix_output_verbose(2, pmix_globals.debug_output,
 312                             "pmix:query handed to RM");
 313         rc = pmix_host_server.query(&pmix_globals.myid,
 314                                     queries, nqueries,
 315                                     cbfunc, cbdata);
 316         return rc;
 317     }
 318 
 319     /* if we aren't connected, don't attempt to send */
 320     if (!pmix_globals.connected) {
 321         PMIX_RELEASE_THREAD(&pmix_global_lock);
 322         return PMIX_ERR_UNREACH;
 323     }
 324     PMIX_RELEASE_THREAD(&pmix_global_lock);
 325 
 326     /* if we are a client, then relay this request to the server */
 327     cd = PMIX_NEW(pmix_query_caddy_t);
 328     cd->cbfunc = cbfunc;
 329     cd->cbdata = cbdata;
 330     msg = PMIX_NEW(pmix_buffer_t);
 331     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 332                      msg, &cmd, 1, PMIX_COMMAND);
 333     if (PMIX_SUCCESS != rc) {
 334         PMIX_ERROR_LOG(rc);
 335         PMIX_RELEASE(msg);
 336         PMIX_RELEASE(cd);
 337         return rc;
 338     }
 339     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 340                      msg, &nqueries, 1, PMIX_SIZE);
 341     if (PMIX_SUCCESS != rc) {
 342         PMIX_ERROR_LOG(rc);
 343         PMIX_RELEASE(msg);
 344         PMIX_RELEASE(cd);
 345         return rc;
 346     }
 347     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 348                      msg, queries, nqueries, PMIX_QUERY);
 349     if (PMIX_SUCCESS != rc) {
 350         PMIX_ERROR_LOG(rc);
 351         PMIX_RELEASE(msg);
 352         PMIX_RELEASE(cd);
 353         return rc;
 354     }
 355 
 356     pmix_output_verbose(2, pmix_globals.debug_output,
 357                         "pmix:query sending to server");
 358     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 359                        msg, query_cbfunc, (void*)cd);
 360     if (PMIX_SUCCESS != rc) {
 361         PMIX_RELEASE(cd);
 362     }
 363     return rc;
 364 }
 365 
 366 static void acb(pmix_status_t status,
 367                 pmix_info_t *info, size_t ninfo,
 368                 void *cbdata,
 369                 pmix_release_cbfunc_t release_fn,
 370                 void *release_cbdata)
 371 {
 372     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 373     cb->status = status;
 374     if (NULL != release_fn) {
 375         release_fn(release_cbdata);
 376     }
 377     PMIX_WAKEUP_THREAD(&cb->lock);
 378 }
 379 
 380 PMIX_EXPORT pmix_status_t PMIx_Allocation_request(pmix_alloc_directive_t directive,
 381                                                   pmix_info_t *info, size_t ninfo)
 382 {
 383     pmix_cb_t cb;
 384     pmix_status_t rc;
 385 
 386     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 387 
 388     if (pmix_globals.init_cntr <= 0) {
 389         PMIX_RELEASE_THREAD(&pmix_global_lock);
 390         return PMIX_ERR_INIT;
 391     }
 392     PMIX_RELEASE_THREAD(&pmix_global_lock);
 393 
 394     pmix_output_verbose(2, pmix_globals.debug_output,
 395                         "%s pmix:allocate", PMIX_NAME_PRINT(&pmix_globals.myid));
 396 
 397     /* create a callback object as we need to pass it to the
 398      * recv routine so we know which callback to use when
 399      * the return message is recvd */
 400     PMIX_CONSTRUCT(&cb, pmix_cb_t);
 401     if (PMIX_SUCCESS != (rc = PMIx_Allocation_request_nb(directive, info, ninfo,
 402                                                          acb, &cb))) {
 403         PMIX_DESTRUCT(&cb);
 404         return rc;
 405     }
 406 
 407     /* wait for the operation to complete */
 408     PMIX_WAIT_THREAD(&cb.lock);
 409     rc = cb.status;
 410     PMIX_DESTRUCT(&cb);
 411 
 412     pmix_output_verbose(2, pmix_globals.debug_output,
 413                         "pmix:allocate completed");
 414 
 415     return rc;
 416 }
 417 
 418 PMIX_EXPORT pmix_status_t PMIx_Allocation_request_nb(pmix_alloc_directive_t directive,
 419                                                      pmix_info_t *info, size_t ninfo,
 420                                                      pmix_info_cbfunc_t cbfunc, void *cbdata)
 421 {
 422     pmix_buffer_t *msg;
 423     pmix_cmd_t cmd = PMIX_ALLOC_CMD;
 424     pmix_status_t rc;
 425     pmix_query_caddy_t *cb;
 426 
 427     pmix_output_verbose(2, pmix_globals.debug_output,
 428                         "pmix: allocate called");
 429 
 430     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 431 
 432     if (pmix_globals.init_cntr <= 0) {
 433         PMIX_RELEASE_THREAD(&pmix_global_lock);
 434         return PMIX_ERR_INIT;
 435     }
 436 
 437     /* if we are the server, then we just issue the request and
 438      * return the response */
 439     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
 440         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 441         PMIX_RELEASE_THREAD(&pmix_global_lock);
 442         if (NULL == pmix_host_server.allocate) {
 443             /* nothing we can do */
 444             return PMIX_ERR_NOT_SUPPORTED;
 445         }
 446         pmix_output_verbose(2, pmix_globals.debug_output,
 447                             "pmix:allocate handed to RM");
 448         rc = pmix_host_server.allocate(&pmix_globals.myid,
 449                                        directive,
 450                                        info, ninfo,
 451                                        cbfunc, cbdata);
 452         return rc;
 453     }
 454 
 455     /* if we are a client, then relay this request to the server */
 456 
 457     /* if we aren't connected, don't attempt to send */
 458     if (!pmix_globals.connected) {
 459         PMIX_RELEASE_THREAD(&pmix_global_lock);
 460         return PMIX_ERR_UNREACH;
 461     }
 462     PMIX_RELEASE_THREAD(&pmix_global_lock);
 463 
 464     msg = PMIX_NEW(pmix_buffer_t);
 465     /* pack the cmd */
 466     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 467                      msg, &cmd, 1, PMIX_COMMAND);
 468     if (PMIX_SUCCESS != rc) {
 469         PMIX_ERROR_LOG(rc);
 470         PMIX_RELEASE(msg);
 471         return rc;
 472     }
 473 
 474     /* pack the directive */
 475     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 476                      msg, &directive, 1, PMIX_ALLOC_DIRECTIVE);
 477     if (PMIX_SUCCESS != rc) {
 478         PMIX_ERROR_LOG(rc);
 479         PMIX_RELEASE(msg);
 480         return rc;
 481     }
 482 
 483     /* pack the info */
 484     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 485                      msg, &ninfo, 1, PMIX_SIZE);
 486     if (PMIX_SUCCESS != rc) {
 487         PMIX_ERROR_LOG(rc);
 488         PMIX_RELEASE(msg);
 489         return rc;
 490     }
 491     if (0 < ninfo) {
 492         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 493                          msg, info, ninfo, PMIX_INFO);
 494         if (PMIX_SUCCESS != rc) {
 495             PMIX_ERROR_LOG(rc);
 496             PMIX_RELEASE(msg);
 497             return rc;
 498         }
 499     }
 500 
 501     /* create a callback object as we need to pass it to the
 502      * recv routine so we know which callback to use when
 503      * the return message is recvd */
 504     cb = PMIX_NEW(pmix_query_caddy_t);
 505     cb->cbfunc = cbfunc;
 506     cb->cbdata = cbdata;
 507 
 508     /* push the message into our event base to send to the server */
 509     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 510                        msg, query_cbfunc, (void*)cb);
 511     if (PMIX_SUCCESS != rc) {
 512         PMIX_RELEASE(msg);
 513         PMIX_RELEASE(cb);
 514     }
 515 
 516     return rc;
 517 }

/* [<][>][^][v][top][bottom][index][help] */