root/opal/mca/pmix/pmix4x/pmix/src/client/pmix_client_spawn.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. PMIx_Spawn
  2. PMIx_Spawn_nb
  3. wait_cbfunc
  4. spawn_cbfunc

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2014-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2014      Artem Y. Polyakov <artpol84@gmail.com>.
   7  *                         All rights reserved.
   8  * Copyright (c) 2016      Mellanox Technologies, Inc.
   9  *                         All rights reserved.
  10  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include <src/include/pmix_config.h>
  19 
  20 #include <src/include/pmix_stdint.h>
  21 
  22 #include <pmix.h>
  23 #include <pmix_rename.h>
  24 
  25 #include "src/include/pmix_globals.h"
  26 
  27 #ifdef HAVE_STRING_H
  28 #include <string.h>
  29 #endif
  30 #include <fcntl.h>
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif
  34 #ifdef HAVE_SYS_SOCKET_H
  35 #include <sys/socket.h>
  36 #endif
  37 #ifdef HAVE_SYS_UN_H
  38 #include <sys/un.h>
  39 #endif
  40 #ifdef HAVE_SYS_UIO_H
  41 #include <sys/uio.h>
  42 #endif
  43 #ifdef HAVE_SYS_TYPES_H
  44 #include <sys/types.h>
  45 #endif
  46 #include PMIX_EVENT_HEADER
  47 
  48 #include "src/class/pmix_list.h"
  49 #include "src/threads/threads.h"
  50 #include "src/mca/bfrops/bfrops.h"
  51 #include "src/mca/pnet/base/base.h"
  52 #include "src/util/argv.h"
  53 #include "src/util/error.h"
  54 #include "src/util/output.h"
  55 #include "src/mca/gds/gds.h"
  56 #include "src/mca/ptl/ptl.h"
  57 
  58 #include "pmix_client_ops.h"
  59 
  60 static void wait_cbfunc(struct pmix_peer_t *pr,
  61                         pmix_ptl_hdr_t *hdr,
  62                         pmix_buffer_t *buf, void *cbdata);
  63 static void spawn_cbfunc(pmix_status_t status, char nspace[], void *cbdata);
  64 
  65 PMIX_EXPORT pmix_status_t PMIx_Spawn(const pmix_info_t job_info[], size_t ninfo,
  66                            const pmix_app_t apps[], size_t napps,
  67                            pmix_nspace_t nspace)
  68 {
  69     pmix_status_t rc;
  70     pmix_cb_t *cb;
  71 
  72     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
  73 
  74     pmix_output_verbose(2, pmix_globals.debug_output,
  75                         "pmix: spawn called");
  76 
  77     if (pmix_globals.init_cntr <= 0) {
  78         PMIX_RELEASE_THREAD(&pmix_global_lock);
  79         return PMIX_ERR_INIT;
  80     }
  81 
  82     /* if we aren't connected, don't attempt to send */
  83     if (!pmix_globals.connected) {
  84         PMIX_RELEASE_THREAD(&pmix_global_lock);
  85         return PMIX_ERR_UNREACH;
  86     }
  87     PMIX_RELEASE_THREAD(&pmix_global_lock);
  88 
  89 
  90     /* ensure the nspace (if provided) is initialized */
  91     if (NULL != nspace) {
  92         memset(nspace, 0, PMIX_MAX_NSLEN+1);
  93     }
  94 
  95     /* create a callback object */
  96     cb = PMIX_NEW(pmix_cb_t);
  97 
  98     if (PMIX_SUCCESS != (rc = PMIx_Spawn_nb(job_info, ninfo, apps, napps, spawn_cbfunc, cb))) {
  99         PMIX_RELEASE(cb);
 100         return rc;
 101     }
 102 
 103     /* wait for the result */
 104     PMIX_WAIT_THREAD(&cb->lock);
 105     rc = cb->status;
 106     if (NULL != nspace) {
 107         pmix_strncpy(nspace, cb->pname.nspace, PMIX_MAX_NSLEN);
 108     }
 109     PMIX_RELEASE(cb);
 110 
 111     return rc;
 112 }
 113 
 114 PMIX_EXPORT pmix_status_t PMIx_Spawn_nb(const pmix_info_t job_info[], size_t ninfo,
 115                                         const pmix_app_t apps[], size_t napps,
 116                                         pmix_spawn_cbfunc_t cbfunc, void *cbdata)
 117 {
 118     pmix_buffer_t *msg;
 119     pmix_cmd_t cmd = PMIX_SPAWNNB_CMD;
 120     pmix_status_t rc;
 121     pmix_cb_t *cb;
 122     size_t n, m;
 123     pmix_app_t *aptr;
 124     bool jobenvars = false;
 125     char *harvest[2] = {"PMIX_MCA_", NULL};
 126     pmix_kval_t *kv;
 127     pmix_list_t ilist;
 128 
 129     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 130 
 131     pmix_output_verbose(2, pmix_globals.debug_output,
 132                         "pmix: spawn called");
 133 
 134     if (pmix_globals.init_cntr <= 0) {
 135         PMIX_RELEASE_THREAD(&pmix_global_lock);
 136         return PMIX_ERR_INIT;
 137     }
 138 
 139     /* if we aren't connected, don't attempt to send */
 140     if (!pmix_globals.connected) {
 141         PMIX_RELEASE_THREAD(&pmix_global_lock);
 142         return PMIX_ERR_UNREACH;
 143     }
 144     PMIX_RELEASE_THREAD(&pmix_global_lock);
 145 
 146     /* check job info for directives */
 147     if (NULL != job_info) {
 148         for (n=0; n < ninfo; n++) {
 149             if (PMIX_CHECK_KEY(&job_info[n], PMIX_SETUP_APP_ENVARS)) {
 150                 PMIX_CONSTRUCT(&ilist, pmix_list_t);
 151                 rc = pmix_pnet_base_harvest_envars(harvest, NULL, &ilist);
 152                 if (PMIX_SUCCESS != rc) {
 153                     PMIX_LIST_DESTRUCT(&ilist);
 154                     return rc;
 155                 }
 156                 PMIX_LIST_FOREACH(kv, &ilist, pmix_kval_t) {
 157                     /* cycle across all the apps and set this envar */
 158                     for (m=0; m < napps; m++) {
 159                         aptr = (pmix_app_t*)&apps[m];
 160                         pmix_setenv(kv->value->data.envar.envar,
 161                                     kv->value->data.envar.value,
 162                                     true, &aptr->env);
 163                     }
 164                 }
 165                 jobenvars = true;
 166                 PMIX_LIST_DESTRUCT(&ilist);
 167                 break;
 168             }
 169         }
 170     }
 171 
 172     for (n=0; n < napps; n++) {
 173         /* do a quick check of the apps directive array to ensure
 174          * the ninfo field has been set */
 175         aptr = (pmix_app_t*)&apps[n];
 176         if (NULL != aptr->info && 0 == aptr->ninfo) {
 177             /* look for the info marked as "end" */
 178             m = 0;
 179             while (!(PMIX_INFO_IS_END(&aptr->info[m])) && m < SIZE_MAX) {
 180                 ++m;
 181             }
 182             if (SIZE_MAX == m) {
 183                 /* nothing we can do */
 184                 return PMIX_ERR_BAD_PARAM;
 185             }
 186             aptr->ninfo = m;
 187         }
 188         if (!jobenvars) {
 189             for (m=0; m < aptr->ninfo; m++) {
 190                 if (PMIX_CHECK_KEY(&aptr->info[m], PMIX_SETUP_APP_ENVARS)) {
 191                     PMIX_CONSTRUCT(&ilist, pmix_list_t);
 192                     rc = pmix_pnet_base_harvest_envars(harvest, NULL, &ilist);
 193                     if (PMIX_SUCCESS != rc) {
 194                         PMIX_LIST_DESTRUCT(&ilist);
 195                         return rc;
 196                     }
 197                     PMIX_LIST_FOREACH(kv, &ilist, pmix_kval_t) {
 198                         pmix_setenv(kv->value->data.envar.envar,
 199                                     kv->value->data.envar.value,
 200                                     true, &aptr->env);
 201                     }
 202                     jobenvars = true;
 203                     PMIX_LIST_DESTRUCT(&ilist);
 204                     break;
 205                 }
 206             }
 207         }
 208     }
 209 
 210     msg = PMIX_NEW(pmix_buffer_t);
 211     /* pack the cmd */
 212     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 213                      msg, &cmd, 1, PMIX_COMMAND);
 214     if (PMIX_SUCCESS != rc) {
 215         PMIX_ERROR_LOG(rc);
 216         PMIX_RELEASE(msg);
 217         return rc;
 218     }
 219 
 220     /* pack the job-level directives */
 221     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 222                      msg, &ninfo, 1, PMIX_SIZE);
 223     if (PMIX_SUCCESS != rc) {
 224         PMIX_ERROR_LOG(rc);
 225         PMIX_RELEASE(msg);
 226         return rc;
 227     }
 228     if (0 < ninfo) {
 229         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 230                          msg, job_info, ninfo, PMIX_INFO);
 231         if (PMIX_SUCCESS != rc) {
 232             PMIX_ERROR_LOG(rc);
 233             PMIX_RELEASE(msg);
 234             return rc;
 235         }
 236     }
 237 
 238     /* pack the apps */
 239     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 240                      msg, &napps, 1, PMIX_SIZE);
 241     if (PMIX_SUCCESS != rc) {
 242         PMIX_ERROR_LOG(rc);
 243         PMIX_RELEASE(msg);
 244         return rc;
 245     }
 246     if (0 < napps) {
 247         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 248                          msg, apps, napps, PMIX_APP);
 249         if (PMIX_SUCCESS != rc) {
 250             PMIX_ERROR_LOG(rc);
 251             PMIX_RELEASE(msg);
 252             return rc;
 253         }
 254     }
 255 
 256     /* create a callback object as we need to pass it to the
 257      * recv routine so we know which callback to use when
 258      * the return message is recvd */
 259     cb = PMIX_NEW(pmix_cb_t);
 260     cb->cbfunc.spawnfn = cbfunc;
 261     cb->cbdata = cbdata;
 262 
 263     /* push the message into our event base to send to the server */
 264     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 265                        msg, wait_cbfunc, (void*)cb);
 266     if (PMIX_SUCCESS != rc) {
 267         PMIX_RELEASE(msg);
 268         PMIX_RELEASE(cb);
 269     }
 270 
 271     return rc;
 272 }
 273 
 274 /* callback for wait completion */
 275 static void wait_cbfunc(struct pmix_peer_t *pr,
 276                         pmix_ptl_hdr_t *hdr,
 277                         pmix_buffer_t *buf, void *cbdata)
 278 {
 279     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 280     char nspace[PMIX_MAX_NSLEN+1];
 281     char *n2 = NULL;
 282     pmix_status_t rc, ret;
 283     int32_t cnt;
 284 
 285     PMIX_ACQUIRE_OBJECT(cb);
 286 
 287     pmix_output_verbose(2, pmix_globals.debug_output,
 288                         "pmix:client recv callback activated with %d bytes",
 289                         (NULL == buf) ? -1 : (int)buf->bytes_used);
 290 
 291     /* init */
 292     memset(nspace, 0, PMIX_MAX_NSLEN+1);
 293 
 294     if (NULL == buf) {
 295         ret = PMIX_ERR_BAD_PARAM;
 296         goto report;
 297     }
 298     /* a zero-byte buffer indicates that this recv is being
 299      * completed due to a lost connection */
 300     if (PMIX_BUFFER_IS_EMPTY(buf)) {
 301         ret = PMIX_ERR_UNREACH;
 302         goto report;
 303     }
 304 
 305     /* unpack the returned status */
 306     cnt = 1;
 307     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 308                        buf, &ret, &cnt, PMIX_STATUS);
 309     if (PMIX_SUCCESS != rc) {
 310         PMIX_ERROR_LOG(rc);
 311         ret = rc;
 312     }
 313     /* unpack the namespace */
 314     cnt = 1;
 315     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 316                        buf, &n2, &cnt, PMIX_STRING);
 317     if (PMIX_SUCCESS != rc && PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
 318         PMIX_ERROR_LOG(rc);
 319         ret = rc;
 320     }
 321     pmix_output_verbose(1, pmix_globals.debug_output,
 322                     "pmix:client recv '%s'", n2);
 323 
 324     if (NULL != n2) {
 325         /* protect length */
 326         pmix_strncpy(nspace, n2, PMIX_MAX_NSLEN);
 327         free(n2);
 328         PMIX_GDS_STORE_JOB_INFO(rc, pmix_globals.mypeer, nspace, buf);
 329         /* extract and process any job-related info for this nspace */
 330         if (PMIX_SUCCESS != rc) {
 331             PMIX_ERROR_LOG(rc);
 332         }
 333     }
 334 
 335   report:
 336     if (NULL != cb->cbfunc.spawnfn) {
 337         cb->cbfunc.spawnfn(ret, nspace, cb->cbdata);
 338     }
 339     PMIX_RELEASE(cb);
 340 }
 341 
 342 static void spawn_cbfunc(pmix_status_t status, char nspace[], void *cbdata)
 343 {
 344     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 345 
 346     PMIX_ACQUIRE_OBJECT(cb);
 347     cb->status = status;
 348     if (NULL != nspace) {
 349         cb->pname.nspace = strdup(nspace);
 350     }
 351     PMIX_POST_OBJECT(cb);
 352     PMIX_WAKEUP_THREAD(&cb->lock);
 353 }

/* [<][>][^][v][top][bottom][index][help] */