root/opal/mca/pmix/pmix4x/pmix/src/client/pmix_client_connect.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. PMIx_Connect
  2. PMIx_Connect_nb
  3. PMIx_Disconnect
  4. PMIx_Disconnect_nb
  5. wait_cbfunc
  6. op_cbfunc

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
   4  * Copyright (c) 2014-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2014      Artem Y. Polyakov <artpol84@gmail.com>.
   7  *                         All rights reserved.
   8  * Copyright (c) 2016      Mellanox Technologies, Inc.
   9  *                         All rights reserved.
  10  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include <src/include/pmix_config.h>
  19 
  20 #include <src/include/pmix_stdint.h>
  21 
  22 #include <pmix.h>
  23 #include <pmix_rename.h>
  24 
  25 #include "src/include/pmix_globals.h"
  26 #include "src/mca/gds/base/base.h"
  27 
  28 #ifdef HAVE_STRING_H
  29 #include <string.h>
  30 #endif
  31 #include <fcntl.h>
  32 #ifdef HAVE_UNISTD_H
  33 #include <unistd.h>
  34 #endif
  35 #ifdef HAVE_SYS_SOCKET_H
  36 #include <sys/socket.h>
  37 #endif
  38 #ifdef HAVE_SYS_UN_H
  39 #include <sys/un.h>
  40 #endif
  41 #ifdef HAVE_SYS_UIO_H
  42 #include <sys/uio.h>
  43 #endif
  44 #ifdef HAVE_SYS_TYPES_H
  45 #include <sys/types.h>
  46 #endif
  47 #include PMIX_EVENT_HEADER
  48 
  49 #include "src/class/pmix_list.h"
  50 #include "src/mca/bfrops/bfrops.h"
  51 #include "src/util/argv.h"
  52 #include "src/util/error.h"
  53 #include "src/util/output.h"
  54 #include "src/threads/threads.h"
  55 #include "src/mca/gds/gds.h"
  56 #include "src/mca/ptl/ptl.h"
  57 
  58 #include "pmix_client_ops.h"
  59 
  60 /* callback for wait completion */
  61 static void wait_cbfunc(struct pmix_peer_t *pr,
  62                         pmix_ptl_hdr_t *hdr,
  63                         pmix_buffer_t *buf, void *cbdata);
  64 static void op_cbfunc(pmix_status_t status, void *cbdata);
  65 
  66 PMIX_EXPORT pmix_status_t PMIx_Connect(const pmix_proc_t procs[], size_t nprocs,
  67                                        const pmix_info_t info[], size_t ninfo)
  68 {
  69     pmix_status_t rc;
  70     pmix_cb_t *cb;
  71 
  72     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
  73 
  74     pmix_output_verbose(2, pmix_client_globals.connect_output,
  75                         "pmix: connect called");
  76 
  77     if (pmix_globals.init_cntr <= 0) {
  78         PMIX_RELEASE_THREAD(&pmix_global_lock);
  79         return PMIX_ERR_INIT;
  80     }
  81 
  82     /* if we aren't connected, don't attempt to send */
  83     if (!pmix_globals.connected) {
  84         PMIX_RELEASE_THREAD(&pmix_global_lock);
  85         return PMIX_ERR_UNREACH;
  86     }
  87     PMIX_RELEASE_THREAD(&pmix_global_lock);
  88 
  89     /* create a callback object as we need to pass it to the
  90      * recv routine so we know which callback to use when
  91      * the return message is recvd */
  92     cb = PMIX_NEW(pmix_cb_t);
  93 
  94     /* push the message into our event base to send to the server */
  95     if (PMIX_SUCCESS != (rc = PMIx_Connect_nb(procs, nprocs, info, ninfo, op_cbfunc, cb))) {
  96         PMIX_RELEASE(cb);
  97         return rc;
  98     }
  99 
 100     /* wait for the connect to complete */
 101     PMIX_WAIT_THREAD(&cb->lock);
 102     rc = cb->status;
 103     PMIX_RELEASE(cb);
 104 
 105     pmix_output_verbose(2, pmix_globals.debug_output,
 106                         "pmix: connect completed");
 107 
 108     return rc;
 109 }
 110 
 111 PMIX_EXPORT pmix_status_t PMIx_Connect_nb(const pmix_proc_t procs[], size_t nprocs,
 112                                           const pmix_info_t info[], size_t ninfo,
 113                                           pmix_op_cbfunc_t cbfunc, void *cbdata)
 114 {
 115     pmix_buffer_t *msg;
 116     pmix_cmd_t cmd = PMIX_CONNECTNB_CMD;
 117     pmix_status_t rc;
 118     pmix_cb_t *cb;
 119 
 120     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 121 
 122     pmix_output_verbose(2, pmix_client_globals.connect_output,
 123                         "pmix:connect_nb called");
 124 
 125     if (pmix_globals.init_cntr <= 0) {
 126         PMIX_RELEASE_THREAD(&pmix_global_lock);
 127         return PMIX_ERR_INIT;
 128     }
 129 
 130     /* if we aren't connected, don't attempt to send */
 131     if (!pmix_globals.connected) {
 132         PMIX_RELEASE_THREAD(&pmix_global_lock);
 133         return PMIX_ERR_UNREACH;
 134     }
 135     PMIX_RELEASE_THREAD(&pmix_global_lock);
 136 
 137     /* check for bozo input */
 138     if (NULL == procs || 0 >= nprocs) {
 139         return PMIX_ERR_BAD_PARAM;
 140     }
 141 
 142     msg = PMIX_NEW(pmix_buffer_t);
 143     /* pack the cmd */
 144     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 145                      msg, &cmd, 1, PMIX_COMMAND);
 146     if (PMIX_SUCCESS != rc) {
 147         PMIX_ERROR_LOG(rc);
 148         return rc;
 149     }
 150 
 151     /* pack the number of procs */
 152     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 153                      msg, &nprocs, 1, PMIX_SIZE);
 154     if (PMIX_SUCCESS != rc) {
 155         PMIX_ERROR_LOG(rc);
 156         return rc;
 157     }
 158     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 159                      msg, procs, nprocs, PMIX_PROC);
 160     if (PMIX_SUCCESS != rc) {
 161         PMIX_ERROR_LOG(rc);
 162         return rc;
 163     }
 164 
 165     /* pack the info structs */
 166     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 167                      msg, &ninfo, 1, PMIX_SIZE);
 168     if (PMIX_SUCCESS != rc) {
 169         PMIX_ERROR_LOG(rc);
 170         PMIX_RELEASE(msg);
 171         return rc;
 172     }
 173     if (0 < ninfo) {
 174         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 175                          msg, info, ninfo, PMIX_INFO);
 176         if (PMIX_SUCCESS != rc) {
 177             PMIX_ERROR_LOG(rc);
 178             PMIX_RELEASE(msg);
 179             return rc;
 180         }
 181     }
 182 
 183     /* create a callback object as we need to pass it to the
 184      * recv routine so we know which callback to use when
 185      * the return message is recvd */
 186     cb = PMIX_NEW(pmix_cb_t);
 187     cb->cbfunc.opfn = cbfunc;
 188     cb->cbdata = cbdata;
 189 
 190     /* push the message into our event base to send to the server */
 191     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 192                        msg, wait_cbfunc, (void*)cb);
 193     if (PMIX_SUCCESS != rc) {
 194         PMIX_RELEASE(msg);
 195         PMIX_RELEASE(cb);
 196     }
 197 
 198     return rc;
 199 }
 200 
 201 PMIX_EXPORT pmix_status_t PMIx_Disconnect(const pmix_proc_t procs[], size_t nprocs,
 202                                           const pmix_info_t info[], size_t ninfo)
 203 {
 204     pmix_status_t rc;
 205     pmix_cb_t *cb;
 206 
 207     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 208     if (pmix_globals.init_cntr <= 0) {
 209         PMIX_RELEASE_THREAD(&pmix_global_lock);
 210         return PMIX_ERR_INIT;
 211     }
 212 
 213     /* if we aren't connected, don't attempt to send */
 214     if (!pmix_globals.connected) {
 215         PMIX_RELEASE_THREAD(&pmix_global_lock);
 216         return PMIX_ERR_UNREACH;
 217     }
 218     PMIX_RELEASE_THREAD(&pmix_global_lock);
 219 
 220     /* create a callback object as we need to pass it to the
 221      * recv routine so we know which callback to use when
 222      * the return message is recvd */
 223     cb = PMIX_NEW(pmix_cb_t);
 224 
 225     if (PMIX_SUCCESS != (rc = PMIx_Disconnect_nb(procs, nprocs, info, ninfo, op_cbfunc, cb))) {
 226         PMIX_RELEASE(cb);
 227         return rc;
 228     }
 229 
 230     /* wait for the connect to complete */
 231     PMIX_WAIT_THREAD(&cb->lock);
 232     rc = cb->status;
 233     PMIX_RELEASE(cb);
 234 
 235     pmix_output_verbose(2, pmix_globals.debug_output,
 236                         "pmix: disconnect completed");
 237 
 238     return rc;
 239 }
 240 
 241 PMIX_EXPORT pmix_status_t PMIx_Disconnect_nb(const pmix_proc_t procs[], size_t nprocs,
 242                                              const pmix_info_t info[], size_t ninfo,
 243                                              pmix_op_cbfunc_t cbfunc, void *cbdata)
 244 {
 245     pmix_buffer_t *msg;
 246     pmix_cmd_t cmd = PMIX_DISCONNECTNB_CMD;
 247     pmix_status_t rc;
 248     pmix_cb_t *cb;
 249 
 250     PMIX_ACQUIRE_THREAD(&pmix_global_lock);
 251 
 252     pmix_output_verbose(2, pmix_globals.debug_output,
 253                         "pmix: disconnect called");
 254 
 255     size_t cnt;
 256     for (cnt = 0; cnt < nprocs; cnt++) {
 257         if (0 != strcmp(pmix_globals.myid.nspace, procs[cnt].nspace)) {
 258             PMIX_GDS_DEL_NSPACE(rc, procs[cnt].nspace);
 259         }
 260     }
 261 
 262     if (pmix_globals.init_cntr <= 0) {
 263         PMIX_RELEASE_THREAD(&pmix_global_lock);
 264         return PMIX_ERR_INIT;
 265     }
 266 
 267     /* if we aren't connected, don't attempt to send */
 268     if (!pmix_globals.connected) {
 269         PMIX_RELEASE_THREAD(&pmix_global_lock);
 270         return PMIX_ERR_UNREACH;
 271     }
 272     PMIX_RELEASE_THREAD(&pmix_global_lock);
 273 
 274     /* check for bozo input */
 275     if (NULL == procs || 0 >= nprocs) {
 276         return PMIX_ERR_BAD_PARAM;
 277     }
 278 
 279     msg = PMIX_NEW(pmix_buffer_t);
 280     /* pack the cmd */
 281     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 282                      msg, &cmd, 1, PMIX_COMMAND);
 283     if (PMIX_SUCCESS != rc) {
 284         PMIX_ERROR_LOG(rc);
 285         return rc;
 286     }
 287 
 288     /* pack the number of procs */
 289     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 290                      msg, &nprocs, 1, PMIX_SIZE);
 291     if (PMIX_SUCCESS != rc) {
 292         PMIX_ERROR_LOG(rc);
 293         return rc;
 294     }
 295     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 296                      msg, procs, nprocs, PMIX_PROC);
 297     if (PMIX_SUCCESS != rc) {
 298         PMIX_ERROR_LOG(rc);
 299         return rc;
 300     }
 301 
 302     /* pack the info structs */
 303     PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 304                      msg, &ninfo, 1, PMIX_SIZE);
 305     if (PMIX_SUCCESS != rc) {
 306         PMIX_ERROR_LOG(rc);
 307         PMIX_RELEASE(msg);
 308         return rc;
 309     }
 310     if (0 < ninfo) {
 311         PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
 312                          msg, info, ninfo, PMIX_INFO);
 313         if (PMIX_SUCCESS != rc) {
 314             PMIX_ERROR_LOG(rc);
 315             PMIX_RELEASE(msg);
 316             return rc;
 317         }
 318     }
 319 
 320     /* create a callback object as we need to pass it to the
 321      * recv routine so we know which callback to use when
 322      * the return message is recvd */
 323     cb = PMIX_NEW(pmix_cb_t);
 324     cb->cbfunc.opfn = cbfunc;
 325     cb->cbdata = cbdata;
 326 
 327     /* push the message into our event base to send to the server */
 328     PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
 329                        msg, wait_cbfunc, (void*)cb);
 330     if (PMIX_SUCCESS != rc) {
 331         PMIX_RELEASE(msg);
 332         PMIX_RELEASE(cb);
 333     }
 334 
 335     pmix_output_verbose(2, pmix_globals.debug_output,
 336                         "pmix: disconnect completed");
 337 
 338     return rc;
 339 }
 340 
 341 static void wait_cbfunc(struct pmix_peer_t *pr,
 342                         pmix_ptl_hdr_t *hdr,
 343                         pmix_buffer_t *buf, void *cbdata)
 344 {
 345     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 346     pmix_status_t rc;
 347     pmix_status_t ret;
 348     int32_t cnt;
 349     char *nspace;
 350     pmix_buffer_t bkt;
 351     pmix_byte_object_t bo;
 352 
 353     pmix_output_verbose(2, pmix_globals.debug_output,
 354                         "pmix:client recv callback activated with %d bytes",
 355                         (NULL == buf) ? -1 : (int)buf->bytes_used);
 356 
 357     if (NULL == buf) {
 358         ret = PMIX_ERR_BAD_PARAM;
 359         goto report;
 360     }
 361 
 362     /* a zero-byte buffer indicates that this recv is being
 363      * completed due to a lost connection */
 364     if (PMIX_BUFFER_IS_EMPTY(buf)) {
 365         ret = PMIX_ERR_UNREACH;
 366         goto report;
 367     }
 368 
 369     /* unpack the returned status */
 370     cnt = 1;
 371     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 372                        buf, &ret, &cnt, PMIX_STATUS);
 373     if (PMIX_SUCCESS != rc) {
 374         PMIX_ERROR_LOG(rc);
 375         ret = rc;
 376     }
 377     /* connect has to also pass back data from all nspace's involved in
 378      * the operation, including our own. Each will come as a byte object */
 379     cnt = 1;
 380     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 381                        buf, &bo, &cnt, PMIX_BYTE_OBJECT);
 382     while (PMIX_SUCCESS == rc) {
 383         /* load it for unpacking */
 384         PMIX_CONSTRUCT(&bkt, pmix_buffer_t);
 385         PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &bkt, bo.bytes, bo.size);
 386 
 387         /* unpack the nspace for this blob */
 388         cnt = 1;
 389         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 390                            &bkt, &nspace, &cnt, PMIX_STRING);
 391         if (PMIX_SUCCESS != rc) {
 392             PMIX_ERROR_LOG(rc);
 393             PMIX_DESTRUCT(&bkt);
 394             continue;
 395         }
 396         /* extract and process any proc-related info for this nspace */
 397         PMIX_GDS_STORE_JOB_INFO(rc, pmix_globals.mypeer, nspace, &bkt);
 398         if (PMIX_SUCCESS != rc) {
 399             PMIX_ERROR_LOG(rc);
 400         }
 401         free(nspace);
 402         PMIX_DESTRUCT(&bkt);
 403         /* get the next one */
 404         cnt = 1;
 405         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
 406                            buf, &bo, &cnt, PMIX_BYTE_OBJECT);
 407         }
 408     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
 409         PMIX_ERROR_LOG(rc);
 410         ret = rc;
 411     }
 412 
 413   report:
 414     if (NULL != cb->cbfunc.opfn) {
 415         cb->cbfunc.opfn(ret, cb->cbdata);
 416     }
 417     PMIX_RELEASE(cb);
 418 }
 419 
 420 static void op_cbfunc(pmix_status_t status, void *cbdata)
 421 {
 422     pmix_cb_t *cb = (pmix_cb_t*)cbdata;
 423 
 424     cb->status = status;
 425     PMIX_POST_OBJECT(cb);
 426     PMIX_WAKEUP_THREAD(&cb->lock);
 427 }

/* [<][>][^][v][top][bottom][index][help] */