root/opal/mca/pmix/pmix4x/pmix/src/common/pmix_control.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. relcbfunc
  2. query_cbfunc
  3. acb
  4. PMIx_Job_control
  5. PMIx_Job_control_nb
  6. PMIx_Process_monitor
  7. PMIx_Process_monitor_nb

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
   4  * Copyright (c) 2016      Mellanox Technologies, Inc.
   5  *                         All rights reserved.
   6  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
   7  * Copyright (c) 2019      Research Organization for Information Science
   8  *                         and Technology (RIST).  All rights reserved.
   9  * $COPYRIGHT$
  10  *
  11  * Additional copyrights may follow
  12  *
  13  * $HEADER$
  14  */
  15 #include <src/include/pmix_config.h>
  16 
  17 #include <src/include/pmix_stdint.h>
  18 #include <src/include/pmix_socket_errno.h>
  19 
  20 #include <pmix.h>
  21 #include <pmix_common.h>
  22 #include <pmix_server.h>
  23 #include <pmix_rename.h>
  24 
  25 #include "src/threads/threads.h"
  26 #include "src/util/argv.h"
  27 #include "src/util/error.h"
  28 #include "src/util/name_fns.h"
  29 #include "src/util/output.h"
  30 #include "src/mca/bfrops/bfrops.h"
  31 #include "src/mca/ptl/ptl.h"
  32 
  33 #include "src/client/pmix_client_ops.h"
  34 #include "src/server/pmix_server_ops.h"
  35 #include "src/include/pmix_globals.h"
  36 
  37 static void relcbfunc(void *cbdata)
  38 {
  39     pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
  40 
  41     pmix_output_verbose(2, pmix_globals.debug_output,
  42                         "pmix:job_ctrl release callback");
  43 
  44     if (NULL != cd->info) {
  45         PMIX_INFO_FREE(cd->info, cd->ninfo);
  46     }
  47     PMIX_RELEASE(cd);
  48 }
  49 static void query_cbfunc(struct pmix_peer_t *peer,
  50                          pmix_ptl_hdr_t *hdr,
  51                          pmix_buffer_t *buf, void *cbdata)
  52 {
  53     pmix_query_caddy_t *cd = (pmix_query_caddy_t*)cbdata;
  54     pmix_status_t rc;
  55     pmix_shift_caddy_t *results;
  56     int cnt;
  57 
  58     pmix_output_verbose(2, pmix_globals.debug_output,
  59                         "pmix:job_ctrl cback from server with %d bytes",
  60                         (int)buf->bytes_used);
  61 
  62     /* a zero-byte buffer indicates that this recv is being
  63      * completed due to a lost connection */
  64     if (PMIX_BUFFER_IS_EMPTY(buf)) {
  65         /* release the caller */
  66         if (NULL != cd->cbfunc) {
  67             cd->cbfunc(PMIX_ERR_COMM_FAILURE, NULL, 0, cd->cbdata, NULL, NULL);
  68         }
  69         PMIX_RELEASE(cd);
  70         return;
  71     }
  72 
  73     results = PMIX_NEW(pmix_shift_caddy_t);
  74 
  75     /* unpack the status */
  76     cnt = 1;
  77     PMIX_BFROPS_UNPACK(rc, peer, buf, &results->status, &cnt, PMIX_STATUS);
  78     if (PMIX_SUCCESS != rc) {
  79         PMIX_ERROR_LOG(rc);
  80         goto complete;
  81     }
  82     if (PMIX_SUCCESS != results->status) {
  83         goto complete;
  84     }
  85 
  86     /* unpack any returned data */
  87     cnt = 1;
  88     PMIX_BFROPS_UNPACK(rc, peer, buf, &results->ninfo, &cnt, PMIX_SIZE);
  89     if (PMIX_SUCCESS != rc && PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
  90         PMIX_ERROR_LOG(rc);
  91         goto complete;
  92     }
  93     if (0 < results->ninfo) {
  94         PMIX_INFO_CREATE(results->info, results->ninfo);
  95         cnt = results->ninfo;
  96         PMIX_BFROPS_UNPACK(rc, peer, buf, results->info, &cnt, PMIX_INFO);
  97         if (PMIX_SUCCESS != rc) {
  98             PMIX_ERROR_LOG(rc);
  99             goto complete;
 100         }
 101     }
 102 
 103   complete:
 104     pmix_output_verbose(2, pmix_globals.debug_output,
 105                         "pmix:job_ctrl cback from server releasing");
 106     /* release the caller */
 107     if (NULL != cd->cbfunc) {
 108         cd->cbfunc(results->status, results->info, results->ninfo, cd->cbdata, relcbfunc, results);
 109     } else {
 110         PMIX_RELEASE(results);
 111     }
 112     PMIX_RELEASE(cd);
 113 }
 114 
 115 static void acb(pmix_status_t status,
 116                 pmix_info_t *info, size_t ninfo,
 117                 void *cbdata,
 118                 pmix_release_cbfunc_t release_fn,
 119                 void *release_cbdata)
 120 {
 121     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 122     cb->status = status;
 123     if (NULL != release_fn) {
 124         release_fn(release_cbdata);
 125     }
 126     PMIX_WAKEUP_THREAD(&cb->lock);
 127 }
 128 
 129 PMIX_EXPORT pmix_status_t PMIx_Job_control(const pmix_proc_t targets[], size_t ntargets,
 130                                            const pmix_info_t directives[], size_t ndirs)
 131 {
 132     pmix_cb_t cb;
 133     pmix_status_t rc;
 134 
 135     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 136 
 137     if (pmix_globals.init_cntr <= 0) {
 138         PMIX_RELEASE_THREAD(&pmix_global_lock);
 139         return PMIX_ERR_INIT;
 140     }
 141     PMIX_RELEASE_THREAD(&pmix_global_lock);
 142 
 143     pmix_output_verbose(2, pmix_globals.debug_output,
 144                         "%s pmix:job_ctrl", PMIX_NAME_PRINT(&pmix_globals.myid));
 145 
 146     /* create a callback object as we need to pass it to the
 147      * recv routine so we know which callback to use when
 148      * the return message is recvd */
 149     PMIX_CONSTRUCT(&cb, pmix_cb_t);
 150     if (PMIX_SUCCESS != (rc = PMIx_Job_control_nb(targets, ntargets,
 151                                                   directives, ndirs,
 152                                                   acb, &cb))) {
 153         PMIX_DESTRUCT(&cb);
 154         return rc;
 155     }
 156 
 157     /* wait for the operation to complete */
 158     PMIX_WAIT_THREAD(&cb.lock);
 159     rc = cb.status;
 160     PMIX_DESTRUCT(&cb);
 161 
 162     pmix_output_verbose(2, pmix_globals.debug_output,
 163                         "pmix:job_ctrl completed");
 164 
 165     return rc;
 166 }
 167 
 168 PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_t ntargets,
 169                                               const pmix_info_t directives[], size_t ndirs,
 170                                               pmix_info_cbfunc_t cbfunc, void *cbdata)
 171 {
 172     pmix_buffer_t *msg;
 173     pmix_cmd_t cmd = PMIX_JOB_CONTROL_CMD;
 174     pmix_status_t rc;
 175     pmix_query_caddy_t *cb;
 176 
 177     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 178 
 179     pmix_output_verbose(2, pmix_globals.debug_output,
 180                         "pmix: job control called with %d directives", (int)ndirs);
 181 
 182     if (pmix_globals.init_cntr <= 0) {
 183         PMIX_RELEASE_THREAD(&pmix_global_lock);
 184         return PMIX_ERR_INIT;
 185     }
 186 
 187     /* if we are the server, then we just issue the request and
 188      * return the response */
 189     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
 190         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 191         PMIX_RELEASE_THREAD(&pmix_global_lock);
 192         if (NULL == pmix_host_server.job_control) {
 193             /* nothing we can do */
 194             return PMIX_ERR_NOT_SUPPORTED;
 195         }
 196         pmix_output_verbose(2, pmix_globals.debug_output,
 197                             "pmix:job_control handed to RM");
 198         rc = pmix_host_server.job_control(&pmix_globals.myid,
 199                                           targets, ntargets,
 200                                           directives, ndirs,
 201                                           cbfunc, cbdata);
 202         return rc;
 203     }
 204 
 205     /* we need to send, so check for connection */
 206     if (!pmix_globals.connected) {
 207         PMIX_RELEASE_THREAD(&pmix_global_lock);
 208         return PMIX_ERR_UNREACH;
 209     }
 210     PMIX_RELEASE_THREAD(&pmix_global_lock);
 211 
 212     /* if we are a client, then relay this request to the server */
 213     msg = PMIX_NEW(pmix_buffer_t);
 214     /* pack the cmd */
 215     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 216                      msg, &cmd, 1, PMIX_COMMAND);
 217     if (PMIX_SUCCESS != rc) {
 218         PMIX_ERROR_LOG(rc);
 219         PMIX_RELEASE(msg);
 220         return rc;
 221     }
 222 
 223     /* pack the number of targets */
 224     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 225                      msg, &ntargets, 1, PMIX_SIZE);
 226     if (PMIX_SUCCESS != rc) {
 227         PMIX_ERROR_LOG(rc);
 228         PMIX_RELEASE(msg);
 229         return rc;
 230     }
 231     /* remember, the targets can be NULL to indicate that the operation
 232      * is to be done against all members of our nspace */
 233     if (NULL != targets && 0 < ntargets) {
 234         /* pack the targets */
 235         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 236                          msg, targets, ntargets, PMIX_PROC);
 237         if (PMIX_SUCCESS != rc) {
 238             PMIX_ERROR_LOG(rc);
 239             PMIX_RELEASE(msg);
 240             return rc;
 241         }
 242     }
 243 
 244     /* pack the directives */
 245     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 246                      msg, &ndirs, 1, PMIX_SIZE);
 247     if (PMIX_SUCCESS != rc) {
 248         PMIX_ERROR_LOG(rc);
 249         PMIX_RELEASE(msg);
 250         return rc;
 251     }
 252     if (NULL != directives && 0 < ndirs) {
 253         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 254                          msg, directives, ndirs, PMIX_INFO);
 255         if (PMIX_SUCCESS != rc) {
 256             PMIX_ERROR_LOG(rc);
 257             PMIX_RELEASE(msg);
 258             return rc;
 259         }
 260     }
 261 
 262     /* create a callback object as we need to pass it to the
 263      * recv routine so we know which callback to use when
 264      * the return message is recvd */
 265     cb = PMIX_NEW(pmix_query_caddy_t);
 266     cb->cbfunc = cbfunc;
 267     cb->cbdata = cbdata;
 268 
 269     /* push the message into our event base to send to the server */
 270     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 271                        msg, query_cbfunc, (void*)cb);
 272     if (PMIX_SUCCESS != rc) {
 273         PMIX_RELEASE(msg);
 274         PMIX_RELEASE(cb);
 275     }
 276 
 277     return rc;
 278 }
 279 
 280 PMIX_EXPORT pmix_status_t PMIx_Process_monitor(const pmix_info_t *monitor, pmix_status_t error,
 281                                                const pmix_info_t directives[], size_t ndirs)
 282 {
 283     pmix_cb_t cb;
 284     pmix_status_t rc;
 285 
 286     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 287 
 288     if (pmix_globals.init_cntr <= 0) {
 289         PMIX_RELEASE_THREAD(&pmix_global_lock);
 290         return PMIX_ERR_INIT;
 291     }
 292     PMIX_RELEASE_THREAD(&pmix_global_lock);
 293 
 294     pmix_output_verbose(2, pmix_globals.debug_output,
 295                         "%s pmix:monitor", PMIX_NAME_PRINT(&pmix_globals.myid));
 296 
 297     /* create a callback object as we need to pass it to the
 298      * recv routine so we know which callback to use when
 299      * the return message is recvd */
 300     PMIX_CONSTRUCT(&cb, pmix_cb_t);
 301     if (PMIX_SUCCESS != (rc = PMIx_Process_monitor_nb(monitor, error,
 302                                                       directives, ndirs,
 303                                                       acb, &cb))) {
 304         PMIX_DESTRUCT(&cb);
 305         return rc;
 306     }
 307 
 308     /* wait for the operation to complete */
 309     PMIX_WAIT_THREAD(&cb.lock);
 310     rc = cb.status;
 311     PMIX_DESTRUCT(&cb);
 312 
 313     pmix_output_verbose(2, pmix_globals.debug_output,
 314                         "pmix:monitor completed");
 315 
 316     return rc;
 317 }
 318 
 319 PMIX_EXPORT pmix_status_t PMIx_Process_monitor_nb(const pmix_info_t *monitor, pmix_status_t error,
 320                                                   const pmix_info_t directives[], size_t ndirs,
 321                                                   pmix_info_cbfunc_t cbfunc, void *cbdata)
 322 {
 323     pmix_buffer_t *msg;
 324     pmix_cmd_t cmd = PMIX_MONITOR_CMD;
 325     pmix_status_t rc;
 326     pmix_query_caddy_t *cb;
 327 
 328     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 329 
 330     pmix_output_verbose(2, pmix_globals.debug_output,
 331                         "pmix: monitor called");
 332 
 333     if (pmix_globals.init_cntr <= 0) {
 334         PMIX_RELEASE_THREAD(&pmix_global_lock);
 335         return PMIX_ERR_INIT;
 336     }
 337 
 338     /* sanity check */
 339     if (NULL == monitor) {
 340         PMIX_RELEASE_THREAD(&pmix_global_lock);
 341         return PMIX_ERR_BAD_PARAM;
 342     }
 343 
 344     /* if we are the server, then we just issue the request and
 345      * return the response */
 346     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
 347         !PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 348         PMIX_RELEASE_THREAD(&pmix_global_lock);
 349         if (NULL == pmix_host_server.monitor) {
 350             /* nothing we can do */
 351             return PMIX_ERR_NOT_SUPPORTED;
 352         }
 353         pmix_output_verbose(2, pmix_globals.debug_output,
 354                             "pmix:monitor handed to RM");
 355         rc = pmix_host_server.monitor(&pmix_globals.myid, monitor, error,
 356                                       directives, ndirs, cbfunc, cbdata);
 357         return rc;
 358     }
 359 
 360     /* we need to send, so check for connection */
 361     if (!pmix_globals.connected) {
 362         PMIX_RELEASE_THREAD(&pmix_global_lock);
 363         return PMIX_ERR_UNREACH;
 364     }
 365     PMIX_RELEASE_THREAD(&pmix_global_lock);
 366 
 367     /* if the monitor is PMIX_SEND_HEARTBEAT, then send it */
 368     if (0 == strncmp(monitor->key, PMIX_SEND_HEARTBEAT, PMIX_MAX_KEYLEN)) {
 369         msg = PMIX_NEW(pmix_buffer_t);
 370         if (NULL == msg) {
 371             return PMIX_ERR_NOMEM;
 372         }
 373         PMIX_PTL_SEND_ONEWAY(rc, pmix_client_globals.myserver, msg, PMIX_PTL_TAG_HEARTBEAT);
 374         if (PMIX_SUCCESS != rc) {
 375             PMIX_RELEASE(msg);
 376         }
 377         return rc;
 378     }
 379 
 380     /* if we are a client, then relay this request to the server */
 381     msg = PMIX_NEW(pmix_buffer_t);
 382     /* pack the cmd */
 383     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 384                      msg, &cmd, 1, PMIX_COMMAND);
 385     if (PMIX_SUCCESS != rc) {
 386         PMIX_ERROR_LOG(rc);
 387         PMIX_RELEASE(msg);
 388         return rc;
 389     }
 390 
 391     /* pack the monitor */
 392     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 393                      msg, monitor, 1, PMIX_INFO);
 394     if (PMIX_SUCCESS != rc) {
 395         PMIX_ERROR_LOG(rc);
 396         PMIX_RELEASE(msg);
 397         return rc;
 398     }
 399 
 400     /* pack the error */
 401     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 402                      msg, &error, 1, PMIX_STATUS);
 403     if (PMIX_SUCCESS != rc) {
 404         PMIX_ERROR_LOG(rc);
 405         PMIX_RELEASE(msg);
 406         return rc;
 407     }
 408 
 409     /* pack the directives */
 410     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 411                      msg, &ndirs, 1, PMIX_SIZE);
 412     if (PMIX_SUCCESS != rc) {
 413         PMIX_ERROR_LOG(rc);
 414         PMIX_RELEASE(msg);
 415         return rc;
 416     }
 417     if (0 < ndirs) {
 418         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 419                          msg, directives, ndirs, PMIX_INFO);
 420         if (PMIX_SUCCESS != rc) {
 421             PMIX_ERROR_LOG(rc);
 422             PMIX_RELEASE(msg);
 423             return rc;
 424         }
 425     }
 426 
 427     /* create a callback object as we need to pass it to the
 428      * recv routine so we know which callback to use when
 429      * the return message is recvd */
 430     cb = PMIX_NEW(pmix_query_caddy_t);
 431     cb->cbfunc = cbfunc;
 432     cb->cbdata = cbdata;
 433 
 434     /* push the message into our event base to send to the server */
 435     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 436                        msg, query_cbfunc, (void*)cb);
 437     if (PMIX_SUCCESS != rc) {
 438         PMIX_RELEASE(msg);
 439         PMIX_RELEASE(cb);
 440     }
 441 
 442     return rc;
 443 }

/* [<][>][^][v][top][bottom][index][help] */