This source file includes the following definitions:
- init
- finalize
- connect_to_peer
- send_recv
- send_oneway
- send_connect_ack
- recv_connect_ack
- send_bytes
- read_bytes
- pmix_usock_send_handler
- pmix_usock_recv_handler
- pmix_usock_send_recv
- pmix_usock_send
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 #include <src/include/pmix_config.h>
  28 #include "pmix_common.h"
  29 
  30 #ifdef HAVE_FCNTL_H
  31 #include <fcntl.h>
  32 #endif
  33 #ifdef HAVE_UNISTD_H
  34 #include <unistd.h>
  35 #endif
  36 #ifdef HAVE_SYS_SOCKET_H
  37 #include <sys/socket.h>
  38 #endif
  39 #ifdef HAVE_SYS_UN_H
  40 #include <sys/un.h>
  41 #endif
  42 #ifdef HAVE_SYS_UIO_H
  43 #include <sys/uio.h>
  44 #endif
  45 #ifdef HAVE_SYS_TYPES_H
  46 #include <sys/types.h>
  47 #endif
  48 #ifdef HAVE_SYS_STAT_H
  49 #include <sys/stat.h>
  50 #endif
  51 
  52 #include "src/util/argv.h"
  53 #include "src/util/error.h"
  54 #include "src/client/pmix_client_ops.h"
  55 #include "src/include/pmix_globals.h"
  56 #include "src/include/pmix_socket_errno.h"
  57 #include "src/mca/bfrops/base/base.h"
  58 #include "src/mca/psec/base/base.h"
  59 
  60 #include "src/mca/ptl/base/base.h"
  61 #include "ptl_usock.h"
  62 
  63 static pmix_status_t init(void);
  64 static void finalize(void);
  65 static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
  66                                      pmix_info_t *info, size_t ninfo);
  67 static pmix_status_t send_recv(struct pmix_peer_t *peer,
  68                                pmix_buffer_t *bfr,
  69                                pmix_ptl_cbfunc_t cbfunc,
  70                                void *cbdata);
  71 static pmix_status_t send_oneway(struct pmix_peer_t *peer,
  72                                  pmix_buffer_t *bfr,
  73                                  pmix_ptl_tag_t tag);
  74 
  75 pmix_ptl_module_t pmix_ptl_usock_module = {
  76     .init = init,
  77     .finalize = finalize,
  78     .send_recv = send_recv,
  79     .send = send_oneway,
  80     .connect_to_peer = connect_to_peer
  81 };
  82 
  83 static pmix_status_t recv_connect_ack(int sd);
  84 static pmix_status_t send_connect_ack(int sd);
  85 
  86 static pmix_status_t init(void)
  87 {
  88     return PMIX_SUCCESS;
  89 }
  90 
  91 static void finalize(void)
  92 {
  93 }
  94 
  95 static void pmix_usock_send_recv(int fd, short args, void *cbdata);
  96 static void pmix_usock_send(int fd, short args, void *cbdata);
  97 
  98 static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
  99                                      pmix_info_t *info, size_t ninfo)
 100 {
 101     struct sockaddr_un *address;
 102     char *evar, **uri;
 103     pmix_status_t rc;
 104     int sd;
 105     pmix_socklen_t len;
 106     bool retried = false;
 107 
 108     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 109                         "[%s:%d] connect to server",
 110                         __FILE__, __LINE__);
 111 
 112     
 113     if (!PMIX_PROC_IS_CLIENT(pmix_globals.mypeer)) {
 114         return PMIX_ERR_NOT_SUPPORTED;
 115     }
 116 
 117     
 118 
 119     if (NULL != (evar = getenv("PMIX_SERVER_URI2USOCK"))) {
 120         
 121         pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module("v21");
 122         if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
 123             return PMIX_ERR_INIT;
 124         }
 125     } else if (NULL != (evar = getenv("PMIX_SERVER_URI"))) {
 126         
 127         pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module("v12");
 128         if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
 129             return PMIX_ERR_INIT;
 130         }
 131     } else {
 132         
 133         return PMIX_ERR_SERVER_NOT_AVAIL;
 134     }
 135     
 136     pmix_client_globals.myserver->nptr->compat.bfrops = pmix_globals.mypeer->nptr->compat.bfrops;
 137     
 138     pmix_globals.mypeer->protocol = PMIX_PROTOCOL_V1;
 139 
 140     uri = pmix_argv_split(evar, ':');
 141     if (3 != pmix_argv_count(uri)) {
 142         pmix_argv_free(uri);
 143         PMIX_ERROR_LOG(PMIX_ERROR);
 144         return PMIX_ERROR;
 145     }
 146     
 147     if (NULL == pmix_client_globals.myserver->info) {
 148         pmix_client_globals.myserver->info = PMIX_NEW(pmix_rank_info_t);
 149     }
 150     if (NULL == pmix_client_globals.myserver->nptr) {
 151         pmix_client_globals.myserver->nptr = PMIX_NEW(pmix_namespace_t);
 152     }
 153     if (NULL == pmix_client_globals.myserver->nptr->nspace) {
 154         pmix_client_globals.myserver->nptr->nspace = strdup(uri[0]);
 155     }
 156     if (NULL == pmix_client_globals.myserver->info->pname.nspace) {
 157         pmix_client_globals.myserver->info->pname.nspace = strdup(uri[0]);
 158     }
 159 
 160     
 161     pmix_client_globals.myserver->info->pname.rank = strtoull(uri[1], NULL, 10);
 162 
 163     
 164     memset(&mca_ptl_usock_component.connection, 0, sizeof(struct sockaddr_storage));
 165     address = (struct sockaddr_un*)&mca_ptl_usock_component.connection;
 166     address->sun_family = AF_UNIX;
 167     snprintf(address->sun_path, sizeof(address->sun_path)-1, "%s", uri[2]);
 168     
 169     if (0 != access(uri[2], R_OK)) {
 170         pmix_argv_free(uri);
 171         PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
 172         return PMIX_ERR_NOT_FOUND;
 173     }
 174     pmix_argv_free(uri);
 175 
 176   retry:
 177     
 178     len = sizeof(struct sockaddr_un);
 179     if (PMIX_SUCCESS != (rc = pmix_ptl_base_connect(&mca_ptl_usock_component.connection, len, &sd))) {
 180         PMIX_ERROR_LOG(rc);
 181         return rc;
 182     }
 183     pmix_client_globals.myserver->sd = sd;
 184 
 185     
 186     if (PMIX_SUCCESS != (rc = send_connect_ack(sd))) {
 187         CLOSE_THE_SOCKET(sd);
 188         return rc;
 189     }
 190 
 191     
 192     if (PMIX_SUCCESS != (rc = recv_connect_ack(sd))) {
 193         CLOSE_THE_SOCKET(sd);
 194         if (PMIX_ERR_TEMP_UNAVAILABLE == rc) {
 195             
 196             if (!retried) {
 197                 retried = true;
 198                 goto retry;
 199             }
 200         }
 201         return rc;
 202     }
 203 
 204     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 205                         "sock_peer_try_connect: Connection across to server succeeded");
 206 
 207     
 208     pmix_globals.connected = true;
 209 
 210     pmix_ptl_base_set_nonblocking(sd);
 211 
 212     
 213     pmix_event_assign(&pmix_client_globals.myserver->recv_event,
 214                       pmix_globals.evbase,
 215                       pmix_client_globals.myserver->sd,
 216                       EV_READ | EV_PERSIST,
 217                       pmix_usock_recv_handler, pmix_client_globals.myserver);
 218     pmix_event_add(&pmix_client_globals.myserver->recv_event, 0);
 219     pmix_client_globals.myserver->recv_ev_active = true;
 220     PMIX_POST_OBJECT(pmix_client_globals.myserver);
 221     pmix_event_add(&pmix_client_globals.myserver->recv_event, 0);
 222 
 223     
 224     pmix_event_assign(&pmix_client_globals.myserver->send_event,
 225                       pmix_globals.evbase,
 226                       pmix_client_globals.myserver->sd,
 227                       EV_WRITE|EV_PERSIST,
 228                       pmix_usock_send_handler, pmix_client_globals.myserver);
 229     pmix_client_globals.myserver->send_ev_active = false;
 230 
 231     return PMIX_SUCCESS;
 232 }
 233 
 234 static pmix_status_t send_recv(struct pmix_peer_t *peer,
 235                                pmix_buffer_t *bfr,
 236                                pmix_ptl_cbfunc_t cbfunc,
 237                                void *cbdata)
 238 {
 239     pmix_ptl_sr_t *ms;
 240     pmix_peer_t *pr = (pmix_peer_t*)peer;
 241 
 242     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 243                         "[%s:%d] post send to server",
 244                         __FILE__, __LINE__);
 245 
 246     ms = PMIX_NEW(pmix_ptl_sr_t);
 247     PMIX_RETAIN(pr);
 248     ms->peer = pr;
 249     ms->bfr = bfr;
 250     ms->cbfunc = cbfunc;
 251     ms->cbdata = cbdata;
 252     PMIX_THREADSHIFT(ms, pmix_usock_send_recv);
 253     return PMIX_SUCCESS;
 254 }
 255 
 256 static pmix_status_t send_oneway(struct pmix_peer_t *peer,
 257                                  pmix_buffer_t *bfr,
 258                                  pmix_ptl_tag_t tag)
 259 {
 260     pmix_ptl_queue_t *q;
 261     pmix_peer_t *pr = (pmix_peer_t*)peer;
 262 
 263     
 264 
 265 
 266     q = PMIX_NEW(pmix_ptl_queue_t);
 267     PMIX_RETAIN(pr);
 268     q->peer = peer;
 269     q->buf = bfr;
 270     q->tag = tag;
 271     PMIX_THREADSHIFT(q, pmix_usock_send);
 272 
 273     return PMIX_SUCCESS;
 274 }
 275 
 276 static pmix_status_t send_connect_ack(int sd)
 277 {
 278     char *msg;
 279     pmix_usock_hdr_t hdr;
 280     size_t sdsize=0, csize=0;
 281     pmix_byte_object_t cred;
 282     pmix_status_t rc;
 283     char *sec, *bfrops, *gds;
 284     pmix_bfrop_buffer_type_t bftype;
 285 
 286     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 287                         "pmix: SEND CONNECT ACK");
 288 
 289     
 290     memset(&hdr, 0, sizeof(pmix_usock_hdr_t));
 291     hdr.pindex = -1;
 292     hdr.tag = UINT32_MAX;
 293 
 294     
 295     sdsize = strlen(pmix_globals.myid.nspace) + 1 + sizeof(int);
 296 
 297     
 298 
 299     PMIX_BYTE_OBJECT_CONSTRUCT(&cred);
 300     PMIX_PSEC_CREATE_CRED(rc, pmix_globals.mypeer,
 301                           NULL, 0, NULL, 0, &cred);
 302     if (PMIX_SUCCESS != rc) {
 303         return rc;
 304     }
 305 
 306     
 307 
 308     sec = pmix_globals.mypeer->nptr->compat.psec->name;
 309 
 310     
 311     bfrops = pmix_globals.mypeer->nptr->compat.bfrops->version;
 312     
 313     bftype = pmix_globals.mypeer->nptr->compat.type;
 314 
 315     
 316     gds = (char*)pmix_client_globals.myserver->nptr->compat.gds->name;
 317 
 318     
 319     hdr.nbytes = sdsize + (strlen(PMIX_VERSION) + 1) + \
 320                 (sizeof(size_t) + cred.size) + \
 321                 (strlen(sec) + 1) + \
 322                 (strlen(bfrops) + 1) + sizeof(bftype) + \
 323                 (strlen(gds) + 1);  
 324 
 325     
 326     sdsize = (sizeof(hdr) + hdr.nbytes);
 327     if (NULL == (msg = (char*)malloc(sdsize))) {
 328         PMIX_BYTE_OBJECT_DESTRUCT(&cred);
 329         return PMIX_ERR_OUT_OF_RESOURCE;
 330     }
 331     memset(msg, 0, sdsize);
 332 
 333     
 334     csize=0;
 335     memcpy(msg, &hdr, sizeof(pmix_usock_hdr_t));
 336     csize += sizeof(pmix_usock_hdr_t);
 337     
 338     memcpy(msg+csize, pmix_globals.myid.nspace, strlen(pmix_globals.myid.nspace));
 339     csize += strlen(pmix_globals.myid.nspace)+1;
 340     
 341     memcpy(msg+csize, &pmix_globals.myid.rank, sizeof(int));
 342     csize += sizeof(int);
 343 
 344     
 345     memcpy(msg+csize, PMIX_VERSION, strlen(PMIX_VERSION));
 346     csize += strlen(PMIX_VERSION)+1;
 347 
 348     
 349     memcpy(msg+csize, &cred.size, sizeof(size_t));
 350     csize += sizeof(size_t);
 351     if (0 < cred.size) {
 352         memcpy(msg+csize, cred.bytes, cred.size);
 353         csize += cred.size;
 354     }
 355     PMIX_BYTE_OBJECT_DESTRUCT(&cred);
 356 
 357     
 358     memcpy(msg+csize, sec, strlen(sec));
 359     csize += strlen(sec)+1;
 360 
 361     
 362     memcpy(msg+csize, bfrops, strlen(bfrops));
 363     csize += strlen(bfrops)+1;
 364 
 365     
 366     memcpy(msg+csize, &bftype, sizeof(bftype));
 367     csize += sizeof(bftype);
 368 
 369     
 370     memcpy(msg+csize, gds, strlen(gds));
 371 
 372     
 373     if (PMIX_SUCCESS != pmix_ptl_base_send_blocking(sd, msg, sdsize)) {
 374         free(msg);
 375         return PMIX_ERR_UNREACH;
 376     }
 377     free(msg);
 378     return PMIX_SUCCESS;
 379 }
 380 
 381 
 382 
 383 
 384 static pmix_status_t recv_connect_ack(int sd)
 385 {
 386     pmix_status_t reply;
 387     pmix_status_t rc;
 388     struct timeval tv, save;
 389     pmix_socklen_t sz;
 390     bool sockopt = true;
 391 
 392     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 393                         "pmix: RECV CONNECT ACK FROM SERVER");
 394 
 395     
 396     sz = sizeof(save);
 397     if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &sz)) {
 398         if (ENOPROTOOPT == errno || EOPNOTSUPP == errno) {
 399             sockopt = false;
 400         } else {
 401              return PMIX_ERR_UNREACH;
 402         }
 403     } else {
 404         
 405         tv.tv_sec  = 2;
 406         tv.tv_usec = 0;
 407         if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
 408             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 409                                 "pmix: recv_connect_ack could not setsockopt SO_RCVTIMEO");
 410             return PMIX_ERR_UNREACH;
 411         }
 412     }
 413 
 414     
 415     rc = pmix_ptl_base_recv_blocking(sd, (char*)&reply, sizeof(int));
 416     if (PMIX_SUCCESS != rc) {
 417         if (sockopt) {
 418             
 419             if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) {
 420                 return PMIX_ERR_UNREACH;
 421             }
 422         }
 423         return rc;
 424     }
 425 
 426     
 427     if (PMIX_ERR_READY_FOR_HANDSHAKE == reply) {
 428         PMIX_PSEC_CLIENT_HANDSHAKE(rc, pmix_client_globals.myserver, sd);
 429         if (PMIX_SUCCESS != rc) {
 430             return rc;
 431         }
 432     } else if (PMIX_SUCCESS != reply) {
 433         return reply;
 434     }
 435 
 436     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 437                         "pmix: RECV CONNECT CONFIRMATION");
 438 
 439     
 440     rc = pmix_ptl_base_recv_blocking(sd, (char*)&pmix_globals.pindex, sizeof(int));
 441     if (PMIX_SUCCESS != rc) {
 442         return rc;
 443     }
 444     if (sockopt) {
 445         
 446         if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) {
 447             return PMIX_ERR_UNREACH;
 448         }
 449     }
 450 
 451     return PMIX_SUCCESS;
 452 }
 453 
 454 static pmix_status_t send_bytes(int sd, char **buf, size_t *remain)
 455 {
 456     pmix_status_t ret = PMIX_SUCCESS;
 457     int rc;
 458     char *ptr = *buf;
 459     while (0 < *remain) {
 460         rc = write(sd, ptr, *remain);
 461         if (rc < 0) {
 462             if (pmix_socket_errno == EINTR) {
 463                 continue;
 464             } else if (pmix_socket_errno == EAGAIN) {
 465                 
 466 
 467 
 468 
 469                 ret = PMIX_ERR_RESOURCE_BUSY;
 470                 goto exit;
 471             } else if (pmix_socket_errno == EWOULDBLOCK) {
 472                 
 473 
 474 
 475 
 476                 ret = PMIX_ERR_WOULD_BLOCK;
 477                 goto exit;
 478             }
 479             
 480             pmix_output(0, "pmix_usock_msg_send_bytes: write failed: %s (%d) [sd = %d]",
 481                         strerror(pmix_socket_errno),
 482                         pmix_socket_errno, sd);
 483             ret = PMIX_ERR_COMM_FAILURE;
 484             goto exit;
 485         }
 486         
 487         (*remain) -= rc;
 488         ptr += rc;
 489     }
 490     
 491 exit:
 492     *buf = ptr;
 493     return ret;
 494 }
 495 
 496 static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
 497 {
 498     pmix_status_t ret = PMIX_SUCCESS;
 499     int rc;
 500     char *ptr = *buf;
 501 
 502     
 503     while (0 < *remain) {
 504         rc = read(sd, ptr, *remain);
 505         if (rc < 0) {
 506             if(pmix_socket_errno == EINTR) {
 507                 continue;
 508             } else if (pmix_socket_errno == EAGAIN) {
 509                 
 510 
 511 
 512 
 513                 ret = PMIX_ERR_RESOURCE_BUSY;
 514                 goto exit;
 515             } else if (pmix_socket_errno == EWOULDBLOCK) {
 516                 
 517 
 518 
 519 
 520                 ret = PMIX_ERR_WOULD_BLOCK;
 521                 goto exit;
 522             }
 523             
 524 
 525 
 526 
 527             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 528                                 "pmix_usock_msg_recv: readv failed: %s (%d)",
 529                                 strerror(pmix_socket_errno),
 530                                 pmix_socket_errno);
 531             ret = PMIX_ERR_UNREACH;
 532             goto exit;
 533         } else if (0 == rc) {
 534             
 535             ret = PMIX_ERR_UNREACH;
 536             goto exit;
 537         }
 538         
 539         *remain -= rc;
 540         ptr += rc;
 541     }
 542     
 543 exit:
 544     *buf = ptr;
 545     return ret;
 546 }
 547 
 548 
 549 
 550 
 551 
 552 void pmix_usock_send_handler(int sd, short flags, void *cbdata)
 553 {
 554     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
 555     pmix_ptl_send_t *msg = peer->send_msg;
 556     pmix_status_t rc;
 557     uint32_t nbytes;
 558 
 559     
 560     PMIX_ACQUIRE_OBJECT(peer);
 561 
 562     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 563                         "%s:%d usock:send_handler SENDING TO PEER %s:%d tag %u with %s msg",
 564                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 565                         peer->info->pname.nspace, peer->info->pname.rank,
 566                         (NULL == msg) ? UINT_MAX : msg->hdr.tag,
 567                         (NULL == msg) ? "NULL" : "NON-NULL");
 568 
 569     if (NULL != msg) {
 570         if (!msg->hdr_sent) {
 571             if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
 572             
 573                 msg->hdr.pindex = ntohl(msg->hdr.pindex);
 574                 msg->hdr.tag = ntohl(msg->hdr.tag);
 575                 nbytes = msg->hdr.nbytes;
 576                 msg->hdr.nbytes = ntohl(nbytes);
 577             }
 578             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 579                                 "usock:send_handler SENDING HEADER WITH MSG IDX %d TAG %d SIZE %lu",
 580                                 msg->hdr.pindex, msg->hdr.tag, (unsigned long)msg->hdr.nbytes);
 581             if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
 582                 
 583                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 584                                     "usock:send_handler HEADER SENT");
 585                 msg->hdr_sent = true;
 586                 
 587                 if (NULL == msg->data) {
 588                     
 589                     PMIX_RELEASE(msg);
 590                     peer->send_msg = NULL;
 591                     goto next;
 592                 } else {
 593                     
 594                     msg->sdptr = msg->data->base_ptr;
 595                     msg->sdbytes = msg->hdr.nbytes;
 596                 }
 597                 
 598             } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 599                        PMIX_ERR_WOULD_BLOCK == rc) {
 600                 
 601                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 602                                     "usock:send_handler RES BUSY OR WOULD BLOCK");
 603                 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
 604                     
 605                     msg->hdr.pindex = htonl(msg->hdr.pindex);
 606                     msg->hdr.tag = htonl(msg->hdr.tag);
 607                     nbytes = msg->hdr.nbytes;
 608                     msg->hdr.nbytes = htonl(nbytes);
 609                 }
 610                 
 611 
 612                 PMIX_POST_OBJECT(peer);
 613                 return;
 614             } else {
 615                 
 616                 pmix_event_del(&peer->send_event);
 617                 peer->send_ev_active = false;
 618                 PMIX_RELEASE(msg);
 619                 peer->send_msg = NULL;
 620                 pmix_ptl_base_lost_connection(peer, rc);
 621                 
 622 
 623                 PMIX_POST_OBJECT(peer);
 624                 return;
 625             }
 626         }
 627 
 628         if (msg->hdr_sent) {
 629             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 630                                 "usock:send_handler SENDING BODY OF MSG");
 631             if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
 632                 
 633                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 634                                     "usock:send_handler BODY SENT");
 635                 PMIX_RELEASE(msg);
 636                 peer->send_msg = NULL;
 637             } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 638                        PMIX_ERR_WOULD_BLOCK == rc) {
 639                 
 640                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 641                                     "usock:send_handler RES BUSY OR WOULD BLOCK");
 642                 
 643 
 644                 PMIX_POST_OBJECT(peer);
 645                 return;
 646             } else {
 647                 
 648                 pmix_output(0, "pmix_usock_peer_send_handler: unable to send message ON SOCKET %d",
 649                             peer->sd);
 650                 pmix_event_del(&peer->send_event);
 651                 peer->send_ev_active = false;
 652                 PMIX_RELEASE(msg);
 653                 peer->send_msg = NULL;
 654                 pmix_ptl_base_lost_connection(peer, rc);
 655                 
 656 
 657                 PMIX_POST_OBJECT(peer);
 658                 return;
 659             }
 660         }
 661 
 662     next:
 663         
 664 
 665 
 666 
 667 
 668 
 669         peer->send_msg = (pmix_ptl_send_t*)
 670             pmix_list_remove_first(&peer->send_queue);
 671     }
 672 
 673     
 674     if (NULL == peer->send_msg && peer->send_ev_active) {
 675         pmix_event_del(&peer->send_event);
 676         peer->send_ev_active = false;
 677     }
 678 
 679     
 680 
 681     PMIX_POST_OBJECT(peer);
 682 }
 683 
 684 
 685 
 686 
 687 
 688 
 689 void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
 690 {
 691     pmix_status_t rc;
 692     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
 693     pmix_ptl_recv_t *msg = NULL;
 694 
 695     
 696     PMIX_ACQUIRE_OBJECT(peer);
 697 
 698     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 699                         "usock:recv:handler called with peer %s:%d",
 700                         (NULL == peer) ? "NULL" : peer->info->pname.nspace,
 701                         (NULL == peer) ? PMIX_RANK_UNDEF : peer->info->pname.rank);
 702 
 703     if (NULL == peer) {
 704         return;
 705     }
 706     
 707     if (NULL == peer->recv_msg) {
 708         pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 709                             "usock:recv:handler allocate new recv msg");
 710         peer->recv_msg = PMIX_NEW(pmix_ptl_recv_t);
 711         if (NULL == peer->recv_msg) {
 712             pmix_output(0, "usock_recv_handler: unable to allocate recv message\n");
 713             goto err_close;
 714         }
 715         PMIX_RETAIN(peer);
 716         peer->recv_msg->peer = peer;  
 717         
 718         peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
 719         peer->recv_msg->rdbytes = sizeof(pmix_usock_hdr_t);
 720     }
 721     msg = peer->recv_msg;
 722     msg->sd = sd;
 723     
 724     if (!msg->hdr_recvd) {
 725         pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 726                             "usock:recv:handler read hdr on socket %d", peer->sd);
 727         if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &msg->rdptr, &msg->rdbytes))) {
 728             
 729             peer->recv_msg->hdr_recvd = true;
 730             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 731                                 "RECVD MSG FOR TAG %d SIZE %d",
 732                                 (int)peer->recv_msg->hdr.tag,
 733                                 (int)peer->recv_msg->hdr.nbytes);
 734             
 735             if (0 == peer->recv_msg->hdr.nbytes) {
 736                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 737                                     "RECVD ZERO-BYTE MESSAGE FROM %s:%d for tag %d",
 738                                     peer->info->pname.nspace, peer->info->pname.rank,
 739                                     peer->recv_msg->hdr.tag);
 740                 peer->recv_msg->data = NULL;  
 741                 peer->recv_msg->rdptr = NULL;
 742                 peer->recv_msg->rdbytes = 0;
 743                 
 744                 PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
 745                 peer->recv_msg = NULL;
 746                 PMIX_POST_OBJECT(peer);
 747                 return;
 748             } else {
 749                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 750                                     "usock:recv:handler allocate data region of size %lu",
 751                                     (unsigned long)peer->recv_msg->hdr.nbytes);
 752                 
 753                 peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
 754                 memset(peer->recv_msg->data, 0, peer->recv_msg->hdr.nbytes);
 755                 
 756                 peer->recv_msg->rdptr = peer->recv_msg->data;
 757                 peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
 758             }
 759             
 760         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 761                    PMIX_ERR_WOULD_BLOCK == rc) {
 762             
 763             return;
 764         } else {
 765             
 766 
 767 
 768             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 769                                 "pmix_usock_msg_recv: peer closed connection");
 770             goto err_close;
 771         }
 772     }
 773 
 774     if (peer->recv_msg->hdr_recvd) {
 775         
 776 
 777 
 778 
 779         if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &msg->rdptr, &msg->rdbytes))) {
 780             
 781             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 782                                 "RECVD COMPLETE MESSAGE FROM SERVER OF %d BYTES FOR TAG %d ON PEER SOCKET %d",
 783                                 (int)peer->recv_msg->hdr.nbytes,
 784                                 peer->recv_msg->hdr.tag, peer->sd);
 785             
 786             PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
 787             peer->recv_msg = NULL;
 788             
 789 
 790             PMIX_POST_OBJECT(peer);
 791             return;
 792         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 793                    PMIX_ERR_WOULD_BLOCK == rc) {
 794             
 795             
 796 
 797             PMIX_POST_OBJECT(peer);
 798             return;
 799         } else {
 800             
 801 
 802 
 803             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 804                                 "pmix_usock_msg_recv: peer closed connection");
 805             goto err_close;
 806         }
 807     }
 808     
 809     return;
 810 
 811   err_close:
 812     
 813     if (peer->recv_ev_active) {
 814         pmix_event_del(&peer->recv_event);
 815         peer->recv_ev_active = false;
 816     }
 817     if (peer->send_ev_active) {
 818         pmix_event_del(&peer->send_event);
 819         peer->send_ev_active = false;
 820     }
 821     if (NULL != peer->recv_msg) {
 822         PMIX_RELEASE(peer->recv_msg);
 823         peer->recv_msg = NULL;
 824     }
 825     pmix_ptl_base_lost_connection(peer, PMIX_ERR_UNREACH);
 826     
 827 
 828     PMIX_POST_OBJECT(peer);
 829 }
 830 
 831 void pmix_usock_send_recv(int fd, short args, void *cbdata)
 832 {
 833     pmix_ptl_sr_t *ms = (pmix_ptl_sr_t*)cbdata;
 834     pmix_ptl_posted_recv_t *req;
 835     pmix_ptl_send_t *snd;
 836     uint32_t tag;
 837 
 838     
 839     PMIX_ACQUIRE_OBJECT(ms);
 840 
 841     if (ms->peer->sd < 0) {
 842         
 843         PMIX_RELEASE(ms);
 844         
 845 
 846         PMIX_POST_OBJECT(NULL);
 847         return;
 848     }
 849 
 850     
 851     pmix_ptl_globals.current_tag++;
 852     if (UINT32_MAX == pmix_ptl_globals.current_tag ) {
 853         pmix_ptl_globals.current_tag = PMIX_PTL_TAG_DYNAMIC;
 854     }
 855     tag = pmix_ptl_globals.current_tag;
 856 
 857     if (NULL != ms->cbfunc) {
 858         
 859         req = PMIX_NEW(pmix_ptl_posted_recv_t);
 860         req->tag = tag;
 861         req->cbfunc = ms->cbfunc;
 862         req->cbdata = ms->cbdata;
 863         pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 864                             "posting recv on tag %d", req->tag);
 865         
 866 
 867 
 868         pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &req->super);
 869     }
 870 
 871     snd = PMIX_NEW(pmix_ptl_send_t);
 872     snd->hdr.pindex = pmix_globals.pindex;
 873     snd->hdr.tag = tag;
 874     snd->hdr.nbytes = ms->bfr->bytes_used;
 875     snd->data = ms->bfr;
 876     
 877     snd->sdptr = (char*)&snd->hdr;
 878     snd->sdbytes = sizeof(pmix_usock_hdr_t);
 879 
 880     
 881     if (NULL == ms->peer->send_msg) {
 882         ms->peer->send_msg = snd;
 883     } else {
 884         
 885         pmix_list_append(&ms->peer->send_queue, &snd->super);
 886     }
 887     
 888     if (!ms->peer->send_ev_active) {
 889         ms->peer->send_ev_active = true;
 890         PMIX_POST_OBJECT(snd);
 891         pmix_event_add(&ms->peer->send_event, 0);
 892     }
 893     
 894     PMIX_RELEASE(ms);
 895     PMIX_POST_OBJECT(snd);
 896 }
 897 
 898 static void pmix_usock_send(int sd, short args, void *cbdata)
 899 {
 900     pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
 901     pmix_ptl_send_t *snd;
 902 
 903     
 904     PMIX_ACQUIRE_OBJECT(queue);
 905 
 906     if (NULL == queue->peer || queue->peer->sd < 0 ||
 907         NULL == queue->peer->info || NULL == queue->peer->nptr) {
 908         
 909         PMIX_RELEASE(queue);
 910         
 911 
 912         PMIX_POST_OBJECT(queue);
 913         return;
 914     }
 915 
 916     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 917                         "[%s:%d] send to %s:%u on tag %d",
 918                         __FILE__, __LINE__,
 919                         (queue->peer)->info->pname.nspace,
 920                         (queue->peer)->info->pname.rank, (queue->tag));
 921 
 922     snd = PMIX_NEW(pmix_ptl_send_t);
 923     snd->hdr.pindex = htonl(pmix_globals.pindex);
 924     snd->hdr.tag = htonl(queue->tag);
 925     snd->hdr.nbytes = htonl((queue->buf)->bytes_used);
 926     snd->data = (queue->buf);
 927     
 928     snd->sdptr = (char*)&snd->hdr;
 929     snd->sdbytes = sizeof(pmix_ptl_hdr_t);
 930 
 931     
 932     if (NULL == (queue->peer)->send_msg) {
 933         (queue->peer)->send_msg = snd;
 934     } else {
 935         
 936         pmix_list_append(&(queue->peer)->send_queue, &snd->super);
 937     }
 938     
 939     if (!(queue->peer)->send_ev_active) {
 940         (queue->peer)->send_ev_active = true;
 941         PMIX_POST_OBJECT(queue->peer);
 942         pmix_event_add(&(queue->peer)->send_event, 0);
 943     }
 944     PMIX_RELEASE(queue);
 945     PMIX_POST_OBJECT(snd);
 946 }