This source file includes the following definitions.
- _notify_complete
- lcfn
- pmix_ptl_base_lost_connection
- send_msg
- read_bytes
- pmix_ptl_base_send_handler
- pmix_ptl_base_recv_handler
- pmix_ptl_base_send
- pmix_ptl_base_send_recv
- pmix_ptl_base_process_msg
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 #include <src/include/pmix_config.h>
  17 
  18 #include <src/include/pmix_stdint.h>
  19 #include <src/include/pmix_socket_errno.h>
  20 
  21 #ifdef HAVE_STRING_H
  22 #include <string.h>
  23 #endif
  24 #include <fcntl.h>
  25 #ifdef HAVE_UNISTD_H
  26 #include <unistd.h>
  27 #endif
  28 #ifdef HAVE_SYS_SOCKET_H
  29 #include <sys/socket.h>
  30 #endif
  31 #ifdef HAVE_SYS_UN_H
  32 #include <sys/un.h>
  33 #endif
  34 #ifdef HAVE_SYS_UIO_H
  35 #include <sys/uio.h>
  36 #endif
  37 #ifdef HAVE_SYS_TYPES_H
  38 #include <sys/types.h>
  39 #endif
  40 
  41 #include "src/class/pmix_pointer_array.h"
  42 #include "src/include/pmix_globals.h"
  43 #include "src/client/pmix_client_ops.h"
  44 #include "src/server/pmix_server_ops.h"
  45 #include "src/util/error.h"
  46 #include "src/util/show_help.h"
  47 #include "src/mca/psensor/psensor.h"
  48 
  49 #include "src/mca/ptl/base/base.h"
  50 
  51 static void _notify_complete(pmix_status_t status, void *cbdata)
  52 {
  53     pmix_event_chain_t *chain = (pmix_event_chain_t*)cbdata;
  54     PMIX_RELEASE(chain);
  55 }
  56 
  57 static void lcfn(pmix_status_t status, void *cbdata)
  58 {
  59     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
  60     PMIX_RELEASE(peer);
  61 }
  62 
  63 void pmix_ptl_base_lost_connection(pmix_peer_t *peer, pmix_status_t err)
  64 {
  65     pmix_server_trkr_t *trk, *tnxt;
  66     pmix_server_caddy_t *rinfo, *rnext;
  67     pmix_rank_info_t *info, *pinfo;
  68     pmix_ptl_posted_recv_t *rcv;
  69     pmix_buffer_t buf;
  70     pmix_ptl_hdr_t hdr;
  71     pmix_proc_t proc;
  72     pmix_status_t rc;
  73 
  74     
  75     if (peer->recv_ev_active) {
  76         pmix_event_del(&peer->recv_event);
  77         peer->recv_ev_active = false;
  78     }
  79     if (peer->send_ev_active) {
  80         pmix_event_del(&peer->send_event);
  81         peer->send_ev_active = false;
  82     }
  83     if (NULL != peer->recv_msg) {
  84         PMIX_RELEASE(peer->recv_msg);
  85         peer->recv_msg = NULL;
  86     }
  87     CLOSE_THE_SOCKET(peer->sd);
  88 
  89     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
  90         !PMIX_PROC_IS_TOOL(pmix_globals.mypeer)) {
  91         
  92 
  93 
  94 
  95 
  96 
  97         PMIX_LIST_FOREACH_SAFE(trk, tnxt, &pmix_server_globals.collectives, pmix_server_trkr_t) {
  98             
  99             PMIX_LIST_FOREACH_SAFE(rinfo, rnext, &trk->local_cbs, pmix_server_caddy_t) {
 100                 if (!PMIX_CHECK_PROCID(&rinfo->peer->info->pname, &peer->info->pname)) {
 101                     continue;
 102                 }
 103                 
 104                 --trk->nlocal;
 105                 
 106                 pmix_list_remove_item(&trk->local_cbs, &rinfo->super);
 107                 PMIX_RELEASE(rinfo);
 108                 
 109 
 110 
 111                 if (trk->host_called) {
 112                     continue;
 113                 }
 114                 if (trk->def_complete && trk->nlocal == pmix_list_get_size(&trk->local_cbs)) {
 115                     
 116                     if (trk->local) {
 117                         
 118 
 119 
 120                         if (PMIX_FENCENB_CMD == trk->type) {
 121                             if (NULL != trk->modexcbfunc) {
 122                                 trk->modexcbfunc(PMIX_ERR_LOST_CONNECTION_TO_CLIENT, NULL, 0, trk, NULL, NULL);
 123                             }
 124                         } else if (PMIX_CONNECTNB_CMD == trk->type) {
 125                             if (NULL != trk->op_cbfunc) {
 126                                 trk->op_cbfunc(PMIX_ERR_LOST_CONNECTION_TO_CLIENT, trk);
 127                             }
 128                         } else if (PMIX_DISCONNECTNB_CMD == trk->type) {
 129                             if (NULL != trk->op_cbfunc) {
 130                                 trk->op_cbfunc(PMIX_ERR_LOST_CONNECTION_TO_CLIENT, trk);
 131                             }
 132                         }
 133                     } else {
 134                         
 135 
 136 
 137 
 138                         if (PMIX_FENCENB_CMD == trk->type) {
 139                             trk->host_called = true;
 140                             rc = pmix_host_server.fence_nb(trk->pcs, trk->npcs,
 141                                                            trk->info, trk->ninfo,
 142                                                            NULL, 0, trk->modexcbfunc, trk);
 143                             if (PMIX_SUCCESS != rc) {
 144                                 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
 145                                 PMIX_RELEASE(trk);
 146                             }
 147                         } else if (PMIX_CONNECTNB_CMD == trk->type) {
 148                             trk->host_called = true;
 149                             rc = pmix_host_server.connect(trk->pcs, trk->npcs, trk->info, trk->ninfo, trk->op_cbfunc, trk);
 150                             if (PMIX_SUCCESS != rc) {
 151                                 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
 152                                 PMIX_RELEASE(trk);
 153                             }
 154                         } else if (PMIX_DISCONNECTNB_CMD == trk->type) {
 155                             trk->host_called = true;
 156                             rc = pmix_host_server.disconnect(trk->pcs, trk->npcs, trk->info, trk->ninfo, trk->op_cbfunc, trk);
 157                             if (PMIX_SUCCESS != rc) {
 158                                 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
 159                                 PMIX_RELEASE(trk);
 160                             }
 161                         }
 162                     }
 163                 }
 164             }
 165         }
 166 
 167         
 168 
 169 
 170         PMIX_LIST_FOREACH_SAFE(info, pinfo, &(peer->nptr->ranks), pmix_rank_info_t) {
 171             if (info == peer->info) {
 172                 pmix_list_remove_item(&(peer->nptr->ranks), &(peer->info->super));
 173             }
 174         }
 175         
 176         if (0 < peer->nptr->nlocalprocs) {
 177             --peer->nptr->nlocalprocs;
 178         }
 179 
 180         
 181         pmix_pointer_array_set_item(&pmix_server_globals.clients,
 182                                     peer->index, NULL);
 183 
 184         
 185         pmix_server_purge_events(peer, NULL);
 186 
 187         if (PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 188             
 189             pmix_globals.connected = false;
 190         } else {
 191             
 192             pmix_psensor.stop(peer, NULL);
 193         }
 194 
 195         if (!peer->finalized && !PMIX_PROC_IS_TOOL(peer) && !pmix_globals.mypeer->finalized) {
 196             
 197 
 198 
 199 
 200             PMIX_REPORT_EVENT(err, peer, PMIX_RANGE_PROC_LOCAL, _notify_complete);
 201         }
 202         
 203         PMIX_RELEASE(peer->info);
 204 
 205         
 206 
 207 
 208         if (NULL != pmix_host_server.client_finalized && !peer->finalized) {
 209             pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
 210             proc.rank = peer->info->pname.rank;
 211             
 212             rc = pmix_host_server.client_finalized(&proc, peer->info->server_object,
 213                                                    lcfn, peer);
 214             if (PMIX_SUCCESS == rc) {
 215                 
 216                 peer->finalized = true;
 217                 return;
 218             }
 219         }
 220         
 221 
 222         peer->finalized = true;
 223         
 224         PMIX_RELEASE(peer);
 225      } else {
 226         
 227 
 228         pmix_globals.connected = false;
 229          
 230         err = PMIX_ERR_LOST_CONNECTION_TO_SERVER;
 231         
 232 
 233 
 234 
 235 
 236 
 237 
 238         PMIX_CONSTRUCT(&buf, pmix_buffer_t);
 239         
 240         buf.type = pmix_client_globals.myserver->nptr->compat.type;
 241         hdr.nbytes = 0; 
 242         PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) {
 243             if (UINT_MAX != rcv->tag && NULL != rcv->cbfunc) {
 244                 
 245                 hdr.tag = rcv->tag;
 246                 rcv->cbfunc(pmix_globals.mypeer, &hdr, &buf, rcv->cbdata);
 247             }
 248         }
 249         PMIX_DESTRUCT(&buf);
 250         
 251         if (!pmix_globals.mypeer->finalized) {
 252             PMIX_REPORT_EVENT(err, pmix_client_globals.myserver, PMIX_RANGE_PROC_LOCAL, _notify_complete);
 253         }
 254     }
 255 }
 256 
 257 static pmix_status_t send_msg(int sd, pmix_ptl_send_t *msg)
 258 {
 259     struct iovec iov[2];
 260     int iov_count;
 261     ssize_t remain = msg->sdbytes, rc;
 262 
 263     iov[0].iov_base = msg->sdptr;
 264     iov[0].iov_len = msg->sdbytes;
 265     if (!msg->hdr_sent && NULL != msg->data) {
 266         iov[1].iov_base = msg->data->base_ptr;
 267         iov[1].iov_len = ntohl(msg->hdr.nbytes);
 268         remain += ntohl(msg->hdr.nbytes);
 269         iov_count = 2;
 270     } else {
 271         iov_count = 1;
 272     }
 273   retry:
 274     rc = writev(sd, iov, iov_count);
 275     if (PMIX_LIKELY(rc == remain)) {
 276         
 277         msg->hdr_sent = true;
 278         msg->sdbytes = 0;
 279         msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
 280         return PMIX_SUCCESS;
 281     } else if (rc < 0) {
 282         if (pmix_socket_errno == EINTR) {
 283             goto retry;
 284         } else if (pmix_socket_errno == EAGAIN) {
 285             
 286 
 287 
 288 
 289             return PMIX_ERR_RESOURCE_BUSY;
 290         } else if (pmix_socket_errno == EWOULDBLOCK) {
 291             
 292 
 293 
 294 
 295             return PMIX_ERR_WOULD_BLOCK;
 296         } else {
 297             
 298             pmix_output(0, "pmix_ptl_base: send_msg: write failed: %s (%d) [sd = %d]",
 299                         strerror(pmix_socket_errno),
 300                         pmix_socket_errno, sd);
 301             return PMIX_ERR_UNREACH;
 302         }
 303     } else {
 304         
 305 
 306 
 307         if ((size_t)rc < msg->sdbytes) {
 308             
 309             msg->sdptr = (char *)msg->sdptr + rc;
 310             msg->sdbytes -= rc;
 311         } else {
 312             
 313             msg->hdr_sent = true;
 314             rc -= msg->sdbytes;
 315             if (NULL != msg->data) {
 316                 
 317 
 318 
 319 
 320 
 321 
 322                 msg->sdptr = (char *)msg->data->base_ptr + rc;
 323             }
 324             msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
 325         }
 326         return PMIX_ERR_RESOURCE_BUSY;
 327     }
 328 }
 329 
 330 static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
 331 {
 332     pmix_status_t ret = PMIX_SUCCESS;
 333     int rc;
 334     char *ptr = *buf;
 335 
 336     
 337     while (0 < *remain) {
 338         rc = read(sd, ptr, *remain);
 339         if (rc < 0) {
 340             if(pmix_socket_errno == EINTR) {
 341                 continue;
 342             } else if (pmix_socket_errno == EAGAIN) {
 343                 
 344 
 345 
 346 
 347                 ret = PMIX_ERR_RESOURCE_BUSY;
 348                 goto exit;
 349             } else if (pmix_socket_errno == EWOULDBLOCK) {
 350                 
 351 
 352 
 353 
 354                 ret = PMIX_ERR_WOULD_BLOCK;
 355                 goto exit;
 356             }
 357             
 358 
 359 
 360 
 361             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 362                                 "pmix_ptl_base_msg_recv: readv failed: %s (%d)",
 363                                 strerror(pmix_socket_errno),
 364                                 pmix_socket_errno);
 365             ret = PMIX_ERR_UNREACH;
 366             goto exit;
 367         } else if (0 == rc) {
 368             
 369             ret = PMIX_ERR_UNREACH;
 370             goto exit;
 371         }
 372         
 373         *remain -= rc;
 374         ptr += rc;
 375     }
 376     
 377   exit:
 378     *buf = ptr;
 379     return ret;
 380 }
 381 
 382 
 383 
 384 
 385 
 386 void pmix_ptl_base_send_handler(int sd, short flags, void *cbdata)
 387 {
 388     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
 389     pmix_ptl_send_t *msg = peer->send_msg;
 390     pmix_status_t rc;
 391 
 392     
 393     PMIX_ACQUIRE_OBJECT(peer);
 394 
 395     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 396                         "%s:%d ptl:base:send_handler SENDING TO PEER %s:%d tag %u with %s msg",
 397                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 398                         peer->info->pname.nspace, peer->info->pname.rank,
 399                         (NULL == msg) ? UINT_MAX : ntohl(msg->hdr.tag),
 400                         (NULL == msg) ? "NULL" : "NON-NULL");
 401 
 402     if (NULL != msg) {
 403         pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 404                             "ptl:base:send_handler SENDING MSG TO %s:%d TAG %u",
 405                             peer->info->pname.nspace, peer->info->pname.rank,
 406                             ntohl(msg->hdr.tag));
 407         if (PMIX_SUCCESS == (rc = send_msg(peer->sd, msg))) {
 408             
 409             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 410                                 "ptl:base:send_handler MSG SENT");
 411             PMIX_RELEASE(msg);
 412             peer->send_msg = NULL;
 413         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 414                    PMIX_ERR_WOULD_BLOCK == rc) {
 415             
 416             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 417                                 "ptl:base:send_handler RES BUSY OR WOULD BLOCK");
 418             
 419 
 420             PMIX_POST_OBJECT(peer);
 421             return;
 422         } else {
 423             pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 424                                 "%s:%d SEND ERROR %s",
 425                                 pmix_globals.myid.nspace, pmix_globals.myid.rank,
 426                                 PMIx_Error_string(rc));
 427             
 428             pmix_event_del(&peer->send_event);
 429             peer->send_ev_active = false;
 430             PMIX_RELEASE(msg);
 431             peer->send_msg = NULL;
 432             pmix_ptl_base_lost_connection(peer, rc);
 433             
 434 
 435             PMIX_POST_OBJECT(peer);
 436             return;
 437         }
 438 
 439         
 440 
 441 
 442 
 443 
 444 
 445         peer->send_msg = (pmix_ptl_send_t*)
 446             pmix_list_remove_first(&peer->send_queue);
 447     }
 448 
 449     
 450     if (NULL == peer->send_msg && peer->send_ev_active) {
 451         pmix_event_del(&peer->send_event);
 452         peer->send_ev_active = false;
 453     }
 454     
 455 
 456     PMIX_POST_OBJECT(peer);
 457 }
 458 
 459 
 460 
 461 
 462 
 463 
 464 void pmix_ptl_base_recv_handler(int sd, short flags, void *cbdata)
 465 {
 466     pmix_status_t rc;
 467     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
 468     pmix_ptl_recv_t *msg = NULL;
 469     pmix_ptl_hdr_t hdr;
 470     size_t nbytes;
 471     char *ptr;
 472 
 473     
 474     PMIX_ACQUIRE_OBJECT(peer);
 475 
 476     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 477                         "%s:%d ptl:base:recv:handler called with peer %s:%d",
 478                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 479                         (NULL == peer) ? "NULL" : peer->info->pname.nspace,
 480                         (NULL == peer) ? PMIX_RANK_UNDEF : peer->info->pname.rank);
 481 
 482     if (NULL == peer) {
 483         return;
 484     }
 485     
 486     if (NULL == peer->recv_msg) {
 487         pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 488                             "ptl:base:recv:handler allocate new recv msg");
 489         peer->recv_msg = PMIX_NEW(pmix_ptl_recv_t);
 490         if (NULL == peer->recv_msg) {
 491             pmix_output(0, "sptl:base:recv_handler: unable to allocate recv message\n");
 492             goto err_close;
 493         }
 494         PMIX_RETAIN(peer);
 495         peer->recv_msg->peer = peer;  
 496         
 497         peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
 498         peer->recv_msg->rdbytes = sizeof(pmix_ptl_hdr_t);
 499     }
 500     msg = peer->recv_msg;
 501     msg->sd = sd;
 502     
 503     if (!msg->hdr_recvd) {
 504          pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 505                             "ptl:base:recv:handler read hdr on socket %d", peer->sd);
 506         nbytes = sizeof(pmix_ptl_hdr_t);
 507         ptr = (char*)&hdr;
 508         if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &ptr, &nbytes))) {
 509             
 510             peer->recv_msg->hdr_recvd = true;
 511             
 512             peer->recv_msg->hdr.pindex = ntohl(hdr.pindex);
 513             peer->recv_msg->hdr.tag = ntohl(hdr.tag);
 514             peer->recv_msg->hdr.nbytes = ntohl(hdr.nbytes);
 515             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 516                                 "RECVD MSG FOR TAG %d SIZE %d",
 517                                 (int)peer->recv_msg->hdr.tag,
 518                                 (int)peer->recv_msg->hdr.nbytes);
 519             
 520             if (0 == peer->recv_msg->hdr.nbytes) {
 521                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 522                                     "RECVD ZERO-BYTE MESSAGE FROM %s:%u for tag %d",
 523                                     peer->info->pname.nspace, peer->info->pname.rank,
 524                                     peer->recv_msg->hdr.tag);
 525                 peer->recv_msg->data = NULL;  
 526                 peer->recv_msg->rdptr = NULL;
 527                 peer->recv_msg->rdbytes = 0;
 528                 
 529                 PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
 530                 peer->recv_msg = NULL;
 531                 PMIX_POST_OBJECT(peer);
 532                 return;
 533             } else {
 534                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 535                                     "ptl:base:recv:handler allocate data region of size %lu",
 536                                     (unsigned long)peer->recv_msg->hdr.nbytes);
 537                 
 538                 if (pmix_ptl_globals.max_msg_size < peer->recv_msg->hdr.nbytes) {
 539                     pmix_show_help("help-pmix-runtime.txt", "ptl:msg_size", true,
 540                                    (unsigned long)peer->recv_msg->hdr.nbytes,
 541                                    (unsigned long)pmix_ptl_globals.max_msg_size);
 542                     goto err_close;
 543                 }
 544                 peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
 545                 memset(peer->recv_msg->data, 0, peer->recv_msg->hdr.nbytes);
 546                 
 547                 peer->recv_msg->rdptr = peer->recv_msg->data;
 548                 peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
 549             }
 550             
 551         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 552                    PMIX_ERR_WOULD_BLOCK == rc) {
 553             
 554             return;
 555         } else {
 556             
 557 
 558 
 559             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 560                                 "ptl:base:msg_recv: peer %s:%d closed connection",
 561                                 peer->nptr->nspace, peer->info->pname.rank);
 562             goto err_close;
 563         }
 564     }
 565 
 566     if (peer->recv_msg->hdr_recvd) {
 567         
 568 
 569 
 570 
 571         if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &msg->rdptr, &msg->rdbytes))) {
 572             
 573             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 574                                 "%s:%d RECVD COMPLETE MESSAGE FROM SERVER OF %d BYTES FOR TAG %d ON PEER SOCKET %d",
 575                                 pmix_globals.myid.nspace, pmix_globals.myid.rank,
 576                                 (int)peer->recv_msg->hdr.nbytes,
 577                                 peer->recv_msg->hdr.tag, peer->sd);
 578             
 579             PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
 580             peer->recv_msg = NULL;
 581             
 582 
 583             PMIX_POST_OBJECT(peer);
 584             return;
 585         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 586                    PMIX_ERR_WOULD_BLOCK == rc) {
 587             
 588             
 589 
 590             PMIX_POST_OBJECT(peer);
 591             return;
 592         } else {
 593             
 594 
 595 
 596             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 597                                 "%s:%d ptl:base:msg_recv: peer %s:%d closed connection",
 598                                 pmix_globals.myid.nspace, pmix_globals.myid.rank,
 599                                 peer->nptr->nspace, peer->info->pname.rank);
 600             goto err_close;
 601         }
 602     }
 603     
 604     return;
 605 
 606   err_close:
 607     
 608     if (peer->recv_ev_active) {
 609         pmix_event_del(&peer->recv_event);
 610         peer->recv_ev_active = false;
 611     }
 612     if (peer->send_ev_active) {
 613         pmix_event_del(&peer->send_event);
 614         peer->send_ev_active = false;
 615     }
 616     if (NULL != peer->recv_msg) {
 617         PMIX_RELEASE(peer->recv_msg);
 618         peer->recv_msg = NULL;
 619     }
 620     pmix_ptl_base_lost_connection(peer, PMIX_ERR_UNREACH);
 621     
 622 
 623     PMIX_POST_OBJECT(peer);
 624 }
 625 
 626 void pmix_ptl_base_send(int sd, short args, void *cbdata)
 627 {
 628     pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
 629     pmix_ptl_send_t *snd;
 630 
 631     
 632     PMIX_ACQUIRE_OBJECT(queue);
 633 
 634     if (NULL == queue->peer || queue->peer->sd < 0 ||
 635         NULL == queue->peer->info || NULL == queue->peer->nptr) {
 636         
 637         if (NULL != queue->buf) {
 638             PMIX_RELEASE(queue->buf);
 639         }
 640         PMIX_RELEASE(queue);
 641         return;
 642     }
 643 
 644     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 645                         "[%s:%d] send to %s:%u on tag %d",
 646                         __FILE__, __LINE__,
 647                         (queue->peer)->info->pname.nspace,
 648                         (queue->peer)->info->pname.rank, (queue->tag));
 649 
 650     if (NULL == queue->buf) {
 651         
 652         PMIX_RELEASE(queue);
 653         return;
 654     }
 655 
 656     snd = PMIX_NEW(pmix_ptl_send_t);
 657     snd->hdr.pindex = htonl(pmix_globals.pindex);
 658     snd->hdr.tag = htonl(queue->tag);
 659     snd->hdr.nbytes = htonl((queue->buf)->bytes_used);
 660     snd->data = (queue->buf);
 661     
 662     snd->sdptr = (char*)&snd->hdr;
 663     snd->sdbytes = sizeof(pmix_ptl_hdr_t);
 664 
 665     
 666     if (NULL == (queue->peer)->send_msg) {
 667         (queue->peer)->send_msg = snd;
 668     } else {
 669         
 670         pmix_list_append(&(queue->peer)->send_queue, &snd->super);
 671     }
 672     
 673     if (!(queue->peer)->send_ev_active) {
 674         (queue->peer)->send_ev_active = true;
 675         PMIX_POST_OBJECT(queue->peer);
 676         pmix_event_add(&(queue->peer)->send_event, 0);
 677     }
 678     PMIX_RELEASE(queue);
 679     PMIX_POST_OBJECT(snd);
 680 }
 681 
 682 void pmix_ptl_base_send_recv(int fd, short args, void *cbdata)
 683 {
 684     pmix_ptl_sr_t *ms = (pmix_ptl_sr_t*)cbdata;
 685     pmix_ptl_posted_recv_t *req;
 686     pmix_ptl_send_t *snd;
 687     uint32_t tag;
 688 
 689     
 690     PMIX_ACQUIRE_OBJECT(ms);
 691 
 692     if (NULL == ms->peer || ms->peer->sd < 0 ||
 693         NULL == ms->peer->info || NULL == ms->peer->nptr) {
 694         
 695         if (NULL != ms->bfr) {
 696             PMIX_RELEASE(ms->bfr);
 697         }
 698         PMIX_RELEASE(ms);
 699         return;
 700     }
 701 
 702     if (NULL == ms->bfr) {
 703         
 704         PMIX_RELEASE(ms);
 705         return;
 706     }
 707 
 708     
 709     pmix_ptl_globals.current_tag++;
 710     if (UINT32_MAX == pmix_ptl_globals.current_tag ) {
 711         pmix_ptl_globals.current_tag = PMIX_PTL_TAG_DYNAMIC;
 712     }
 713     tag = pmix_ptl_globals.current_tag;
 714 
 715     if (NULL != ms->cbfunc) {
 716         
 717         req = PMIX_NEW(pmix_ptl_posted_recv_t);
 718         req->tag = tag;
 719         req->cbfunc = ms->cbfunc;
 720         req->cbdata = ms->cbdata;
 721 
 722         pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 723                             "posting recv on tag %d", req->tag);
 724         
 725 
 726 
 727         pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &req->super);
 728     }
 729 
 730     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 731                         "QUEIENG MSG TO SERVER OF SIZE %d",
 732                         (int)ms->bfr->bytes_used);
 733     snd = PMIX_NEW(pmix_ptl_send_t);
 734     snd->hdr.pindex = htonl(pmix_globals.pindex);
 735     snd->hdr.tag = htonl(tag);
 736     snd->hdr.nbytes = htonl(ms->bfr->bytes_used);
 737     snd->data = ms->bfr;
 738     
 739     snd->sdptr = (char*)&snd->hdr;
 740     snd->sdbytes = sizeof(pmix_ptl_hdr_t);
 741 
 742     
 743     if (NULL == ms->peer->send_msg) {
 744         ms->peer->send_msg = snd;
 745     } else {
 746         
 747         pmix_list_append(&ms->peer->send_queue, &snd->super);
 748     }
 749     
 750     if (!ms->peer->send_ev_active) {
 751         ms->peer->send_ev_active = true;
 752         PMIX_POST_OBJECT(snd);
 753         pmix_event_add(&ms->peer->send_event, 0);
 754     }
 755     
 756     PMIX_RELEASE(ms);
 757     PMIX_POST_OBJECT(snd);
 758 }
 759 
 760 void pmix_ptl_base_process_msg(int fd, short flags, void *cbdata)
 761 {
 762     pmix_ptl_recv_t *msg = (pmix_ptl_recv_t*)cbdata;
 763     pmix_ptl_posted_recv_t *rcv;
 764     pmix_buffer_t buf;
 765 
 766     
 767     PMIX_ACQUIRE_OBJECT(msg);
 768 
 769     pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 770                         "%s:%d message received %d bytes for tag %u on socket %d",
 771                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 772                         (int)msg->hdr.nbytes, msg->hdr.tag, msg->sd);
 773 
 774     
 775     PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) {
 776         pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 777                             "checking msg on tag %u for tag %u",
 778                             msg->hdr.tag, rcv->tag);
 779 
 780         if (msg->hdr.tag == rcv->tag || UINT_MAX == rcv->tag) {
 781             if (NULL != rcv->cbfunc) {
 782                 
 783                 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
 784                 if (NULL != msg->data) {
 785                     PMIX_LOAD_BUFFER(msg->peer, &buf, msg->data, msg->hdr.nbytes);
 786                 } else {
 787                     
 788 
 789                     buf.type = msg->peer->nptr->compat.type;
 790                 }
 791                 msg->data = NULL;  
 792                 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 793                                      "%s:%d EXECUTE CALLBACK for tag %u",
 794                                      pmix_globals.myid.nspace, pmix_globals.myid.rank,
 795                                      msg->hdr.tag);
 796                 rcv->cbfunc(msg->peer, &msg->hdr, &buf, rcv->cbdata);
 797                 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 798                                     "%s:%d CALLBACK COMPLETE",
 799                                     pmix_globals.myid.nspace, pmix_globals.myid.rank);
 800                 PMIX_DESTRUCT(&buf);  
 801             }
 802             
 803             if (PMIX_PTL_TAG_DYNAMIC <= rcv->tag && UINT_MAX != rcv->tag) {
 804                 pmix_list_remove_item(&pmix_ptl_globals.posted_recvs, &rcv->super);
 805                 PMIX_RELEASE(rcv);
 806             }
 807             PMIX_RELEASE(msg);
 808             return;
 809         }
 810     }
 811 
 812     
 813 
 814     if (PMIX_PTL_TAG_DYNAMIC <= msg->hdr.tag) {
 815         pmix_output(0, "UNEXPECTED MESSAGE tag = %d from source %s:%d",
 816                     msg->hdr.tag, msg->peer->info->pname.nspace,
 817                     msg->peer->info->pname.rank);
 818         PMIX_REPORT_EVENT(PMIX_ERROR, msg->peer, PMIX_RANGE_NAMESPACE, _notify_complete);
 819         PMIX_RELEASE(msg);
 820         return;
 821     }
 822 
 823     
 824 
 825     pmix_list_append(&pmix_ptl_globals.unexpected_msgs, &msg->super);
 826     
 827 
 828     PMIX_POST_OBJECT(msg);
 829 }