root/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/usock/ptl_usock.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. init
  2. finalize
  3. connect_to_peer
  4. send_recv
  5. send_oneway
  6. send_connect_ack
  7. recv_connect_ack
  8. send_bytes
  9. read_bytes
  10. pmix_usock_send_handler
  11. pmix_usock_recv_handler
  12. pmix_usock_send_recv
  13. pmix_usock_send

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2010-2011 Oak Ridge National Labs.  All rights reserved.
  13  * Copyright (c) 2011-2014 Cisco Systems, Inc.  All rights reserved.
  14  * Copyright (c) 2011-2013 Los Alamos National Security, LLC.  All rights
  15  *                         reserved.
  16  * Copyright (c) 2013-2018 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2019      Research Organization for Information Science
  18  *                         and Technology (RIST).  All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  *
  25  */
  26 
  27 #include <src/include/pmix_config.h>
  28 #include "pmix_common.h"
  29 
  30 #ifdef HAVE_FCNTL_H
  31 #include <fcntl.h>
  32 #endif
  33 #ifdef HAVE_UNISTD_H
  34 #include <unistd.h>
  35 #endif
  36 #ifdef HAVE_SYS_SOCKET_H
  37 #include <sys/socket.h>
  38 #endif
  39 #ifdef HAVE_SYS_UN_H
  40 #include <sys/un.h>
  41 #endif
  42 #ifdef HAVE_SYS_UIO_H
  43 #include <sys/uio.h>
  44 #endif
  45 #ifdef HAVE_SYS_TYPES_H
  46 #include <sys/types.h>
  47 #endif
  48 #ifdef HAVE_SYS_STAT_H
  49 #include <sys/stat.h>
  50 #endif
  51 
  52 #include "src/util/argv.h"
  53 #include "src/util/error.h"
  54 #include "src/client/pmix_client_ops.h"
  55 #include "src/include/pmix_globals.h"
  56 #include "src/include/pmix_socket_errno.h"
  57 #include "src/mca/bfrops/base/base.h"
  58 #include "src/mca/psec/base/base.h"
  59 
  60 #include "src/mca/ptl/base/base.h"
  61 #include "ptl_usock.h"
  62 
  63 static pmix_status_t init(void);
  64 static void finalize(void);
  65 static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
  66                                      pmix_info_t *info, size_t ninfo);
  67 static pmix_status_t send_recv(struct pmix_peer_t *peer,
  68                                pmix_buffer_t *bfr,
  69                                pmix_ptl_cbfunc_t cbfunc,
  70                                void *cbdata);
  71 static pmix_status_t send_oneway(struct pmix_peer_t *peer,
  72                                  pmix_buffer_t *bfr,
  73                                  pmix_ptl_tag_t tag);
  74 
  75 pmix_ptl_module_t pmix_ptl_usock_module = {
  76     .init = init,
  77     .finalize = finalize,
  78     .send_recv = send_recv,
  79     .send = send_oneway,
  80     .connect_to_peer = connect_to_peer
  81 };
  82 
  83 static pmix_status_t recv_connect_ack(int sd);
  84 static pmix_status_t send_connect_ack(int sd);
  85 
  86 static pmix_status_t init(void)
  87 {
  88     return PMIX_SUCCESS;
  89 }
  90 
  91 static void finalize(void)
  92 {
  93 }
  94 
  95 static void pmix_usock_send_recv(int fd, short args, void *cbdata);
  96 static void pmix_usock_send(int fd, short args, void *cbdata);
  97 
  98 static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
  99                                      pmix_info_t *info, size_t ninfo)
 100 {
 101     struct sockaddr_un *address;
 102     char *evar, **uri;
 103     pmix_status_t rc;
 104     int sd;
 105     pmix_socklen_t len;
 106     bool retried = false;
 107 
 108     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 109                         "[%s:%d] connect to server",
 110                         __FILE__, __LINE__);
 111 
 112     /* if we are not a client, there is nothing we can do */
 113     if (!PMIX_PROC_IS_CLIENT(pmix_globals.mypeer)) {
 114         return PMIX_ERR_NOT_SUPPORTED;
 115     }
 116 
 117     /* if we don't have a path to the daemon rendezvous point,
 118      * then we need to return an error */
 119     if (NULL != (evar = getenv("PMIX_SERVER_URI2USOCK"))) {
 120         /* this is a v2.1+ server */
 121         pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module("v21");
 122         if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
 123             return PMIX_ERR_INIT;
 124         }
 125     } else if (NULL != (evar = getenv("PMIX_SERVER_URI"))) {
 126         /* this is a pre-v2.1 server - must use the v12 bfrops module */
 127         pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module("v12");
 128         if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
 129             return PMIX_ERR_INIT;
 130         }
 131     } else {
 132         /* let the caller know that the server isn't available */
 133         return PMIX_ERR_SERVER_NOT_AVAIL;
 134     }
 135     /* the server will be using the same bfrops as us */
 136     pmix_client_globals.myserver->nptr->compat.bfrops = pmix_globals.mypeer->nptr->compat.bfrops;
 137     /* mark that we are using the V1 protocol */
 138     pmix_globals.mypeer->protocol = PMIX_PROTOCOL_V1;
 139 
 140     uri = pmix_argv_split(evar, ':');
 141     if (3 != pmix_argv_count(uri)) {
 142         pmix_argv_free(uri);
 143         PMIX_ERROR_LOG(PMIX_ERROR);
 144         return PMIX_ERROR;
 145     }
 146     /* set the server nspace */
 147     if (NULL == pmix_client_globals.myserver->info) {
 148         pmix_client_globals.myserver->info = PMIX_NEW(pmix_rank_info_t);
 149     }
 150     if (NULL == pmix_client_globals.myserver->nptr) {
 151         pmix_client_globals.myserver->nptr = PMIX_NEW(pmix_namespace_t);
 152     }
 153     if (NULL == pmix_client_globals.myserver->nptr->nspace) {
 154         pmix_client_globals.myserver->nptr->nspace = strdup(uri[0]);
 155     }
 156     if (NULL == pmix_client_globals.myserver->info->pname.nspace) {
 157         pmix_client_globals.myserver->info->pname.nspace = strdup(uri[0]);
 158     }
 159 
 160     /* set the server rank */
 161     pmix_client_globals.myserver->info->pname.rank = strtoull(uri[1], NULL, 10);
 162 
 163     /* setup the path to the daemon rendezvous point */
 164     memset(&mca_ptl_usock_component.connection, 0, sizeof(struct sockaddr_storage));
 165     address = (struct sockaddr_un*)&mca_ptl_usock_component.connection;
 166     address->sun_family = AF_UNIX;
 167     snprintf(address->sun_path, sizeof(address->sun_path)-1, "%s", uri[2]);
 168     /* if the rendezvous file doesn't exist, that's an error */
 169     if (0 != access(uri[2], R_OK)) {
 170         pmix_argv_free(uri);
 171         PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
 172         return PMIX_ERR_NOT_FOUND;
 173     }
 174     pmix_argv_free(uri);
 175 
 176   retry:
 177     /* establish the connection */
 178     len = sizeof(struct sockaddr_un);
 179     if (PMIX_SUCCESS != (rc = pmix_ptl_base_connect(&mca_ptl_usock_component.connection, len, &sd))) {
 180         PMIX_ERROR_LOG(rc);
 181         return rc;
 182     }
 183     pmix_client_globals.myserver->sd = sd;
 184 
 185     /* send our identity and any authentication credentials to the server */
 186     if (PMIX_SUCCESS != (rc = send_connect_ack(sd))) {
 187         CLOSE_THE_SOCKET(sd);
 188         return rc;
 189     }
 190 
 191     /* do whatever handshake is required */
 192     if (PMIX_SUCCESS != (rc = recv_connect_ack(sd))) {
 193         CLOSE_THE_SOCKET(sd);
 194         if (PMIX_ERR_TEMP_UNAVAILABLE == rc) {
 195             /* give it two tries */
 196             if (!retried) {
 197                 retried = true;
 198                 goto retry;
 199             }
 200         }
 201         return rc;
 202     }
 203 
 204     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 205                         "sock_peer_try_connect: Connection across to server succeeded");
 206 
 207     /* mark the connection as made */
 208     pmix_globals.connected = true;
 209 
 210     pmix_ptl_base_set_nonblocking(sd);
 211 
 212     /* setup recv event */
 213     pmix_event_assign(&pmix_client_globals.myserver->recv_event,
 214                       pmix_globals.evbase,
 215                       pmix_client_globals.myserver->sd,
 216                       EV_READ | EV_PERSIST,
 217                       pmix_usock_recv_handler, pmix_client_globals.myserver);
 218     pmix_event_add(&pmix_client_globals.myserver->recv_event, 0);
 219     pmix_client_globals.myserver->recv_ev_active = true;
 220     PMIX_POST_OBJECT(pmix_client_globals.myserver);
 221     pmix_event_add(&pmix_client_globals.myserver->recv_event, 0);
 222 
 223     /* setup send event */
 224     pmix_event_assign(&pmix_client_globals.myserver->send_event,
 225                       pmix_globals.evbase,
 226                       pmix_client_globals.myserver->sd,
 227                       EV_WRITE|EV_PERSIST,
 228                       pmix_usock_send_handler, pmix_client_globals.myserver);
 229     pmix_client_globals.myserver->send_ev_active = false;
 230 
 231     return PMIX_SUCCESS;
 232 }
 233 
 234 static pmix_status_t send_recv(struct pmix_peer_t *peer,
 235                                pmix_buffer_t *bfr,
 236                                pmix_ptl_cbfunc_t cbfunc,
 237                                void *cbdata)
 238 {
 239     pmix_ptl_sr_t *ms;
 240     pmix_peer_t *pr = (pmix_peer_t*)peer;
 241 
 242     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 243                         "[%s:%d] post send to server",
 244                         __FILE__, __LINE__);
 245 
 246     ms = PMIX_NEW(pmix_ptl_sr_t);
 247     PMIX_RETAIN(pr);
 248     ms->peer = pr;
 249     ms->bfr = bfr;
 250     ms->cbfunc = cbfunc;
 251     ms->cbdata = cbdata;
 252     PMIX_THREADSHIFT(ms, pmix_usock_send_recv);
 253     return PMIX_SUCCESS;
 254 }
 255 
 256 static pmix_status_t send_oneway(struct pmix_peer_t *peer,
 257                                  pmix_buffer_t *bfr,
 258                                  pmix_ptl_tag_t tag)
 259 {
 260     pmix_ptl_queue_t *q;
 261     pmix_peer_t *pr = (pmix_peer_t*)peer;
 262 
 263     /* we have to transfer this to an event for thread
 264      * safety as we need to post this message on the
 265      * peer's send queue */
 266     q = PMIX_NEW(pmix_ptl_queue_t);
 267     PMIX_RETAIN(pr);
 268     q->peer = peer;
 269     q->buf = bfr;
 270     q->tag = tag;
 271     PMIX_THREADSHIFT(q, pmix_usock_send);
 272 
 273     return PMIX_SUCCESS;
 274 }
 275 
 276 static pmix_status_t send_connect_ack(int sd)
 277 {
 278     char *msg;
 279     pmix_usock_hdr_t hdr;
 280     size_t sdsize=0, csize=0;
 281     pmix_byte_object_t cred;
 282     pmix_status_t rc;
 283     char *sec, *bfrops, *gds;
 284     pmix_bfrop_buffer_type_t bftype;
 285 
 286     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 287                         "pmix: SEND CONNECT ACK");
 288 
 289     /* setup the header */
 290     memset(&hdr, 0, sizeof(pmix_usock_hdr_t));
 291     hdr.pindex = -1;
 292     hdr.tag = UINT32_MAX;
 293 
 294     /* reserve space for the nspace and rank info */
 295     sdsize = strlen(pmix_globals.myid.nspace) + 1 + sizeof(int);
 296 
 297     /* get a credential, if the security system provides one. Not
 298      * every SPC will do so, thus we must first check */
 299     PMIX_BYTE_OBJECT_CONSTRUCT(&cred);
 300     PMIX_PSEC_CREATE_CRED(rc, pmix_globals.mypeer,
 301                           NULL, 0, NULL, 0, &cred);
 302     if (PMIX_SUCCESS != rc) {
 303         return rc;
 304     }
 305 
 306     /* add the name of our active sec module - we selected it
 307      * in pmix_client.c prior to entering here */
 308     sec = pmix_globals.mypeer->nptr->compat.psec->name;
 309 
 310     /* add our active bfrops module name */
 311     bfrops = pmix_globals.mypeer->nptr->compat.bfrops->version;
 312     /* and the type of buffer we are using */
 313     bftype = pmix_globals.mypeer->nptr->compat.type;
 314 
 315     /* add our active gds module for working with the server */
 316     gds = (char*)pmix_client_globals.myserver->nptr->compat.gds->name;
 317 
 318     /* set the number of bytes to be read beyond the header */
 319     hdr.nbytes = sdsize + (strlen(PMIX_VERSION) + 1) + \
 320                 (sizeof(size_t) + cred.size) + \
 321                 (strlen(sec) + 1) + \
 322                 (strlen(bfrops) + 1) + sizeof(bftype) + \
 323                 (strlen(gds) + 1);  // must NULL terminate the strings!
 324 
 325     /* create a space for our message */
 326     sdsize = (sizeof(hdr) + hdr.nbytes);
 327     if (NULL == (msg = (char*)malloc(sdsize))) {
 328         PMIX_BYTE_OBJECT_DESTRUCT(&cred);
 329         return PMIX_ERR_OUT_OF_RESOURCE;
 330     }
 331     memset(msg, 0, sdsize);
 332 
 333     /* load the message */
 334     csize=0;
 335     memcpy(msg, &hdr, sizeof(pmix_usock_hdr_t));
 336     csize += sizeof(pmix_usock_hdr_t);
 337     /* pass our nspace */
 338     memcpy(msg+csize, pmix_globals.myid.nspace, strlen(pmix_globals.myid.nspace));
 339     csize += strlen(pmix_globals.myid.nspace)+1;
 340     /* pass our rank */
 341     memcpy(msg+csize, &pmix_globals.myid.rank, sizeof(int));
 342     csize += sizeof(int);
 343 
 344     /* pass our version string */
 345     memcpy(msg+csize, PMIX_VERSION, strlen(PMIX_VERSION));
 346     csize += strlen(PMIX_VERSION)+1;
 347 
 348     /* pass the size of the credential */
 349     memcpy(msg+csize, &cred.size, sizeof(size_t));
 350     csize += sizeof(size_t);
 351     if (0 < cred.size) {
 352         memcpy(msg+csize, cred.bytes, cred.size);
 353         csize += cred.size;
 354     }
 355     PMIX_BYTE_OBJECT_DESTRUCT(&cred);
 356 
 357     /* pass our active sec module */
 358     memcpy(msg+csize, sec, strlen(sec));
 359     csize += strlen(sec)+1;
 360 
 361     /* provide our active bfrops module */
 362     memcpy(msg+csize, bfrops, strlen(bfrops));
 363     csize += strlen(bfrops)+1;
 364 
 365     /* provide the bfrops type */
 366     memcpy(msg+csize, &bftype, sizeof(bftype));
 367     csize += sizeof(bftype);
 368 
 369     /* provide the gds module */
 370     memcpy(msg+csize, gds, strlen(gds));
 371 
 372     /* send the entire msg across */
 373     if (PMIX_SUCCESS != pmix_ptl_base_send_blocking(sd, msg, sdsize)) {
 374         free(msg);
 375         return PMIX_ERR_UNREACH;
 376     }
 377     free(msg);
 378     return PMIX_SUCCESS;
 379 }
 380 
 381 /* we receive a connection acknowledgement from the server,
 382  * consisting of nothing more than a status report. If success,
 383  * then we initiate authentication method */
 384 static pmix_status_t recv_connect_ack(int sd)
 385 {
 386     pmix_status_t reply;
 387     pmix_status_t rc;
 388     struct timeval tv, save;
 389     pmix_socklen_t sz;
 390     bool sockopt = true;
 391 
 392     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 393                         "pmix: RECV CONNECT ACK FROM SERVER");
 394 
 395     /* get the current timeout value so we can reset to it */
 396     sz = sizeof(save);
 397     if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &sz)) {
 398         if (ENOPROTOOPT == errno || EOPNOTSUPP == errno) {
 399             sockopt = false;
 400         } else {
 401              return PMIX_ERR_UNREACH;
 402         }
 403     } else {
 404         /* set a timeout on the blocking recv so we don't hang */
 405         tv.tv_sec  = 2;
 406         tv.tv_usec = 0;
 407         if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
 408             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 409                                 "pmix: recv_connect_ack could not setsockopt SO_RCVTIMEO");
 410             return PMIX_ERR_UNREACH;
 411         }
 412     }
 413 
 414     /* receive the status reply */
 415     rc = pmix_ptl_base_recv_blocking(sd, (char*)&reply, sizeof(int));
 416     if (PMIX_SUCCESS != rc) {
 417         if (sockopt) {
 418             /* return the socket to normal */
 419             if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) {
 420                 return PMIX_ERR_UNREACH;
 421             }
 422         }
 423         return rc;
 424     }
 425 
 426     /* see if they want us to do the handshake */
 427     if (PMIX_ERR_READY_FOR_HANDSHAKE == reply) {
 428         PMIX_PSEC_CLIENT_HANDSHAKE(rc, pmix_client_globals.myserver, sd);
 429         if (PMIX_SUCCESS != rc) {
 430             return rc;
 431         }
 432     } else if (PMIX_SUCCESS != reply) {
 433         return reply;
 434     }
 435 
 436     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 437                         "pmix: RECV CONNECT CONFIRMATION");
 438 
 439     /* receive our index into the server's client array */
 440     rc = pmix_ptl_base_recv_blocking(sd, (char*)&pmix_globals.pindex, sizeof(int));
 441     if (PMIX_SUCCESS != rc) {
 442         return rc;
 443     }
 444     if (sockopt) {
 445         /* return the socket to normal */
 446         if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) {
 447             return PMIX_ERR_UNREACH;
 448         }
 449     }
 450 
 451     return PMIX_SUCCESS;
 452 }
 453 
 454 static pmix_status_t send_bytes(int sd, char **buf, size_t *remain)
 455 {
 456     pmix_status_t ret = PMIX_SUCCESS;
 457     int rc;
 458     char *ptr = *buf;
 459     while (0 < *remain) {
 460         rc = write(sd, ptr, *remain);
 461         if (rc < 0) {
 462             if (pmix_socket_errno == EINTR) {
 463                 continue;
 464             } else if (pmix_socket_errno == EAGAIN) {
 465                 /* tell the caller to keep this message on active,
 466                  * but let the event lib cycle so other messages
 467                  * can progress while this socket is busy
 468                  */
 469                 ret = PMIX_ERR_RESOURCE_BUSY;
 470                 goto exit;
 471             } else if (pmix_socket_errno == EWOULDBLOCK) {
 472                 /* tell the caller to keep this message on active,
 473                  * but let the event lib cycle so other messages
 474                  * can progress while this socket is busy
 475                  */
 476                 ret = PMIX_ERR_WOULD_BLOCK;
 477                 goto exit;
 478             }
 479             /* we hit an error and cannot progress this message */
 480             pmix_output(0, "pmix_usock_msg_send_bytes: write failed: %s (%d) [sd = %d]",
 481                         strerror(pmix_socket_errno),
 482                         pmix_socket_errno, sd);
 483             ret = PMIX_ERR_COMM_FAILURE;
 484             goto exit;
 485         }
 486         /* update location */
 487         (*remain) -= rc;
 488         ptr += rc;
 489     }
 490     /* we sent the full data block */
 491 exit:
 492     *buf = ptr;
 493     return ret;
 494 }
 495 
 496 static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
 497 {
 498     pmix_status_t ret = PMIX_SUCCESS;
 499     int rc;
 500     char *ptr = *buf;
 501 
 502     /* read until all bytes recvd or error */
 503     while (0 < *remain) {
 504         rc = read(sd, ptr, *remain);
 505         if (rc < 0) {
 506             if(pmix_socket_errno == EINTR) {
 507                 continue;
 508             } else if (pmix_socket_errno == EAGAIN) {
 509                 /* tell the caller to keep this message on active,
 510                  * but let the event lib cycle so other messages
 511                  * can progress while this socket is busy
 512                  */
 513                 ret = PMIX_ERR_RESOURCE_BUSY;
 514                 goto exit;
 515             } else if (pmix_socket_errno == EWOULDBLOCK) {
 516                 /* tell the caller to keep this message on active,
 517                  * but let the event lib cycle so other messages
 518                  * can progress while this socket is busy
 519                  */
 520                 ret = PMIX_ERR_WOULD_BLOCK;
 521                 goto exit;
 522             }
 523             /* we hit an error and cannot progress this message - report
 524              * the error back to the RML and let the caller know
 525              * to abort this message
 526              */
 527             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 528                                 "pmix_usock_msg_recv: readv failed: %s (%d)",
 529                                 strerror(pmix_socket_errno),
 530                                 pmix_socket_errno);
 531             ret = PMIX_ERR_UNREACH;
 532             goto exit;
 533         } else if (0 == rc) {
 534             /* the remote peer closed the connection */
 535             ret = PMIX_ERR_UNREACH;
 536             goto exit;
 537         }
 538         /* we were able to read something, so adjust counters and location */
 539         *remain -= rc;
 540         ptr += rc;
 541     }
 542     /* we read the full data block */
 543 exit:
 544     *buf = ptr;
 545     return ret;
 546 }
 547 
 548 /*
 549  * A file descriptor is available/ready for send. Check the state
 550  * of the socket and take the appropriate action.
 551  */
 552 void pmix_usock_send_handler(int sd, short flags, void *cbdata)
 553 {
 554     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
 555     pmix_ptl_send_t *msg = peer->send_msg;
 556     pmix_status_t rc;
 557     uint32_t nbytes;
 558 
 559     /* acquire the object */
 560     PMIX_ACQUIRE_OBJECT(peer);
 561 
 562     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 563                         "%s:%d usock:send_handler SENDING TO PEER %s:%d tag %u with %s msg",
 564                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 565                         peer->info->pname.nspace, peer->info->pname.rank,
 566                         (NULL == msg) ? UINT_MAX : msg->hdr.tag,
 567                         (NULL == msg) ? "NULL" : "NON-NULL");
 568 
 569     if (NULL != msg) {
 570         if (!msg->hdr_sent) {
 571             if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
 572             /* we have to convert the header back to host-byte order */
 573                 msg->hdr.pindex = ntohl(msg->hdr.pindex);
 574                 msg->hdr.tag = ntohl(msg->hdr.tag);
 575                 nbytes = msg->hdr.nbytes;
 576                 msg->hdr.nbytes = ntohl(nbytes);
 577             }
 578             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 579                                 "usock:send_handler SENDING HEADER WITH MSG IDX %d TAG %d SIZE %lu",
 580                                 msg->hdr.pindex, msg->hdr.tag, (unsigned long)msg->hdr.nbytes);
 581             if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
 582                 /* header is completely sent */
 583                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 584                                     "usock:send_handler HEADER SENT");
 585                 msg->hdr_sent = true;
 586                 /* setup to send the data */
 587                 if (NULL == msg->data) {
 588                     /* this was a zero-byte msg - nothing more to do */
 589                     PMIX_RELEASE(msg);
 590                     peer->send_msg = NULL;
 591                     goto next;
 592                 } else {
 593                     /* send the data as a single block */
 594                     msg->sdptr = msg->data->base_ptr;
 595                     msg->sdbytes = msg->hdr.nbytes;
 596                 }
 597                 /* fall thru and let the send progress */
 598             } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 599                        PMIX_ERR_WOULD_BLOCK == rc) {
 600                 /* exit this event and let the event lib progress */
 601                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 602                                     "usock:send_handler RES BUSY OR WOULD BLOCK");
 603                 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
 604                     /* have to convert back again so we are correct when we re-enter */
 605                     msg->hdr.pindex = htonl(msg->hdr.pindex);
 606                     msg->hdr.tag = htonl(msg->hdr.tag);
 607                     nbytes = msg->hdr.nbytes;
 608                     msg->hdr.nbytes = htonl(nbytes);
 609                 }
 610                 /* ensure we post the modified peer object before another thread
 611                  * picks it back up */
 612                 PMIX_POST_OBJECT(peer);
 613                 return;
 614             } else {
 615                 // report the error
 616                 pmix_event_del(&peer->send_event);
 617                 peer->send_ev_active = false;
 618                 PMIX_RELEASE(msg);
 619                 peer->send_msg = NULL;
 620                 pmix_ptl_base_lost_connection(peer, rc);
 621                 /* ensure we post the modified peer object before another thread
 622                  * picks it back up */
 623                 PMIX_POST_OBJECT(peer);
 624                 return;
 625             }
 626         }
 627 
 628         if (msg->hdr_sent) {
 629             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 630                                 "usock:send_handler SENDING BODY OF MSG");
 631             if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
 632                 // message is complete
 633                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 634                                     "usock:send_handler BODY SENT");
 635                 PMIX_RELEASE(msg);
 636                 peer->send_msg = NULL;
 637             } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 638                        PMIX_ERR_WOULD_BLOCK == rc) {
 639                 /* exit this event and let the event lib progress */
 640                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 641                                     "usock:send_handler RES BUSY OR WOULD BLOCK");
 642                 /* ensure we post the modified peer object before another thread
 643                  * picks it back up */
 644                 PMIX_POST_OBJECT(peer);
 645                 return;
 646             } else {
 647                 // report the error
 648                 pmix_output(0, "pmix_usock_peer_send_handler: unable to send message ON SOCKET %d",
 649                             peer->sd);
 650                 pmix_event_del(&peer->send_event);
 651                 peer->send_ev_active = false;
 652                 PMIX_RELEASE(msg);
 653                 peer->send_msg = NULL;
 654                 pmix_ptl_base_lost_connection(peer, rc);
 655                 /* ensure we post the modified peer object before another thread
 656                  * picks it back up */
 657                 PMIX_POST_OBJECT(peer);
 658                 return;
 659             }
 660         }
 661 
 662     next:
 663         /* if current message completed - progress any pending sends by
 664          * moving the next in the queue into the "on-deck" position. Note
 665          * that this doesn't mean we send the message right now - we will
 666          * wait for another send_event to fire before doing so. This gives
 667          * us a chance to service any pending recvs.
 668          */
 669         peer->send_msg = (pmix_ptl_send_t*)
 670             pmix_list_remove_first(&peer->send_queue);
 671     }
 672 
 673     /* if nothing else to do unregister for send event notifications */
 674     if (NULL == peer->send_msg && peer->send_ev_active) {
 675         pmix_event_del(&peer->send_event);
 676         peer->send_ev_active = false;
 677     }
 678 
 679     /* ensure we post the modified peer object before another thread
 680      * picks it back up */
 681     PMIX_POST_OBJECT(peer);
 682 }
 683 
 684 /*
 685  * Dispatch to the appropriate action routine based on the state
 686  * of the connection with the peer.
 687  */
 688 
 689 void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
 690 {
 691     pmix_status_t rc;
 692     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
 693     pmix_ptl_recv_t *msg = NULL;
 694 
 695     /* acquire the object */
 696     PMIX_ACQUIRE_OBJECT(peer);
 697 
 698     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 699                         "usock:recv:handler called with peer %s:%d",
 700                         (NULL == peer) ? "NULL" : peer->info->pname.nspace,
 701                         (NULL == peer) ? PMIX_RANK_UNDEF : peer->info->pname.rank);
 702 
 703     if (NULL == peer) {
 704         return;
 705     }
 706     /* allocate a new message and setup for recv */
 707     if (NULL == peer->recv_msg) {
 708         pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 709                             "usock:recv:handler allocate new recv msg");
 710         peer->recv_msg = PMIX_NEW(pmix_ptl_recv_t);
 711         if (NULL == peer->recv_msg) {
 712             pmix_output(0, "usock_recv_handler: unable to allocate recv message\n");
 713             goto err_close;
 714         }
 715         PMIX_RETAIN(peer);
 716         peer->recv_msg->peer = peer;  // provide a handle back to the peer object
 717         /* start by reading the header */
 718         peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
 719         peer->recv_msg->rdbytes = sizeof(pmix_usock_hdr_t);
 720     }
 721     msg = peer->recv_msg;
 722     msg->sd = sd;
 723     /* if the header hasn't been completely read, read it */
 724     if (!msg->hdr_recvd) {
 725         pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 726                             "usock:recv:handler read hdr on socket %d", peer->sd);
 727         if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &msg->rdptr, &msg->rdbytes))) {
 728             /* completed reading the header */
 729             peer->recv_msg->hdr_recvd = true;
 730             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 731                                 "RECVD MSG FOR TAG %d SIZE %d",
 732                                 (int)peer->recv_msg->hdr.tag,
 733                                 (int)peer->recv_msg->hdr.nbytes);
 734             /* if this is a zero-byte message, then we are done */
 735             if (0 == peer->recv_msg->hdr.nbytes) {
 736                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 737                                     "RECVD ZERO-BYTE MESSAGE FROM %s:%d for tag %d",
 738                                     peer->info->pname.nspace, peer->info->pname.rank,
 739                                     peer->recv_msg->hdr.tag);
 740                 peer->recv_msg->data = NULL;  // make sure
 741                 peer->recv_msg->rdptr = NULL;
 742                 peer->recv_msg->rdbytes = 0;
 743                 /* post it for delivery */
 744                 PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
 745                 peer->recv_msg = NULL;
 746                 PMIX_POST_OBJECT(peer);
 747                 return;
 748             } else {
 749                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 750                                     "usock:recv:handler allocate data region of size %lu",
 751                                     (unsigned long)peer->recv_msg->hdr.nbytes);
 752                 /* allocate the data region */
 753                 peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
 754                 memset(peer->recv_msg->data, 0, peer->recv_msg->hdr.nbytes);
 755                 /* point to it */
 756                 peer->recv_msg->rdptr = peer->recv_msg->data;
 757                 peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
 758             }
 759             /* fall thru and attempt to read the data */
 760         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 761                    PMIX_ERR_WOULD_BLOCK == rc) {
 762             /* exit this event and let the event lib progress */
 763             return;
 764         } else {
 765             /* the remote peer closed the connection - report that condition
 766              * and let the caller know
 767              */
 768             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 769                                 "pmix_usock_msg_recv: peer closed connection");
 770             goto err_close;
 771         }
 772     }
 773 
 774     if (peer->recv_msg->hdr_recvd) {
 775         /* continue to read the data block - we start from
 776          * wherever we left off, which could be at the
 777          * beginning or somewhere in the message
 778          */
 779         if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &msg->rdptr, &msg->rdbytes))) {
 780             /* we recvd all of the message */
 781             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 782                                 "RECVD COMPLETE MESSAGE FROM SERVER OF %d BYTES FOR TAG %d ON PEER SOCKET %d",
 783                                 (int)peer->recv_msg->hdr.nbytes,
 784                                 peer->recv_msg->hdr.tag, peer->sd);
 785             /* post it for delivery */
 786             PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
 787             peer->recv_msg = NULL;
 788             /* ensure we post the modified peer object before another thread
 789              * picks it back up */
 790             PMIX_POST_OBJECT(peer);
 791             return;
 792         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 793                    PMIX_ERR_WOULD_BLOCK == rc) {
 794             /* exit this event and let the event lib progress */
 795             /* ensure we post the modified peer object before another thread
 796              * picks it back up */
 797             PMIX_POST_OBJECT(peer);
 798             return;
 799         } else {
 800             /* the remote peer closed the connection - report that condition
 801              * and let the caller know
 802              */
 803             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 804                                 "pmix_usock_msg_recv: peer closed connection");
 805             goto err_close;
 806         }
 807     }
 808     /* success */
 809     return;
 810 
 811   err_close:
 812     /* stop all events */
 813     if (peer->recv_ev_active) {
 814         pmix_event_del(&peer->recv_event);
 815         peer->recv_ev_active = false;
 816     }
 817     if (peer->send_ev_active) {
 818         pmix_event_del(&peer->send_event);
 819         peer->send_ev_active = false;
 820     }
 821     if (NULL != peer->recv_msg) {
 822         PMIX_RELEASE(peer->recv_msg);
 823         peer->recv_msg = NULL;
 824     }
 825     pmix_ptl_base_lost_connection(peer, PMIX_ERR_UNREACH);
 826     /* ensure we post the modified peer object before another thread
 827      * picks it back up */
 828     PMIX_POST_OBJECT(peer);
 829 }
 830 
 831 void pmix_usock_send_recv(int fd, short args, void *cbdata)
 832 {
 833     pmix_ptl_sr_t *ms = (pmix_ptl_sr_t*)cbdata;
 834     pmix_ptl_posted_recv_t *req;
 835     pmix_ptl_send_t *snd;
 836     uint32_t tag;
 837 
 838     /* acquire the object */
 839     PMIX_ACQUIRE_OBJECT(ms);
 840 
 841     if (ms->peer->sd < 0) {
 842         /* this peer's socket has been closed */
 843         PMIX_RELEASE(ms);
 844         /* ensure we post the object before another thread
 845          * picks it back up */
 846         PMIX_POST_OBJECT(NULL);
 847         return;
 848     }
 849 
 850     /* take the next tag in the sequence */
 851     pmix_ptl_globals.current_tag++;
 852     if (UINT32_MAX == pmix_ptl_globals.current_tag ) {
 853         pmix_ptl_globals.current_tag = PMIX_PTL_TAG_DYNAMIC;
 854     }
 855     tag = pmix_ptl_globals.current_tag;
 856 
 857     if (NULL != ms->cbfunc) {
 858         /* if a callback msg is expected, setup a recv for it */
 859         req = PMIX_NEW(pmix_ptl_posted_recv_t);
 860         req->tag = tag;
 861         req->cbfunc = ms->cbfunc;
 862         req->cbdata = ms->cbdata;
 863         pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 864                             "posting recv on tag %d", req->tag);
 865         /* add it to the list of recvs - we cannot have unexpected messages
 866          * in this subsystem as the server never sends us something that
 867          * we didn't previously request */
 868         pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &req->super);
 869     }
 870 
 871     snd = PMIX_NEW(pmix_ptl_send_t);
 872     snd->hdr.pindex = pmix_globals.pindex;
 873     snd->hdr.tag = tag;
 874     snd->hdr.nbytes = ms->bfr->bytes_used;
 875     snd->data = ms->bfr;
 876     /* always start with the header */
 877     snd->sdptr = (char*)&snd->hdr;
 878     snd->sdbytes = sizeof(pmix_usock_hdr_t);
 879 
 880     /* if there is no message on-deck, put this one there */
 881     if (NULL == ms->peer->send_msg) {
 882         ms->peer->send_msg = snd;
 883     } else {
 884         /* add it to the queue */
 885         pmix_list_append(&ms->peer->send_queue, &snd->super);
 886     }
 887     /* ensure the send event is active */
 888     if (!ms->peer->send_ev_active) {
 889         ms->peer->send_ev_active = true;
 890         PMIX_POST_OBJECT(snd);
 891         pmix_event_add(&ms->peer->send_event, 0);
 892     }
 893     /* cleanup */
 894     PMIX_RELEASE(ms);
 895     PMIX_POST_OBJECT(snd);
 896 }
 897 
 898 static void pmix_usock_send(int sd, short args, void *cbdata)
 899 {
 900     pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
 901     pmix_ptl_send_t *snd;
 902 
 903     /* acquire the object */
 904     PMIX_ACQUIRE_OBJECT(queue);
 905 
 906     if (NULL == queue->peer || queue->peer->sd < 0 ||
 907         NULL == queue->peer->info || NULL == queue->peer->nptr) {
 908         /* this peer has lost connection */
 909         PMIX_RELEASE(queue);
 910         /* ensure we post the object before another thread
 911          * picks it back up */
 912         PMIX_POST_OBJECT(queue);
 913         return;
 914     }
 915 
 916     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 917                         "[%s:%d] send to %s:%u on tag %d",
 918                         __FILE__, __LINE__,
 919                         (queue->peer)->info->pname.nspace,
 920                         (queue->peer)->info->pname.rank, (queue->tag));
 921 
 922     snd = PMIX_NEW(pmix_ptl_send_t);
 923     snd->hdr.pindex = htonl(pmix_globals.pindex);
 924     snd->hdr.tag = htonl(queue->tag);
 925     snd->hdr.nbytes = htonl((queue->buf)->bytes_used);
 926     snd->data = (queue->buf);
 927     /* always start with the header */
 928     snd->sdptr = (char*)&snd->hdr;
 929     snd->sdbytes = sizeof(pmix_ptl_hdr_t);
 930 
 931     /* if there is no message on-deck, put this one there */
 932     if (NULL == (queue->peer)->send_msg) {
 933         (queue->peer)->send_msg = snd;
 934     } else {
 935         /* add it to the queue */
 936         pmix_list_append(&(queue->peer)->send_queue, &snd->super);
 937     }
 938     /* ensure the send event is active */
 939     if (!(queue->peer)->send_ev_active) {
 940         (queue->peer)->send_ev_active = true;
 941         PMIX_POST_OBJECT(queue->peer);
 942         pmix_event_add(&(queue->peer)->send_event, 0);
 943     }
 944     PMIX_RELEASE(queue);
 945     PMIX_POST_OBJECT(snd);
 946 }

/* [<][>][^][v][top][bottom][index][help] */