root/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. _notify_complete
  2. lcfn
  3. pmix_ptl_base_lost_connection
  4. send_msg
  5. read_bytes
  6. pmix_ptl_base_send_handler
  7. pmix_ptl_base_recv_handler
  8. pmix_ptl_base_send
  9. pmix_ptl_base_send_recv
  10. pmix_ptl_base_process_msg

   1 /*
   2  * Copyright (c) 2014-2019 Intel, Inc.  All rights reserved.
   3  * Copyright (c) 2014      Artem Y. Polyakov <artpol84@gmail.com>.
   4  *                         All rights reserved.
   5  * Copyright (c) 2015-2019 Research Organization for Information Science
   6  *                         and Technology (RIST).  All rights reserved.
   7  * Copyright (c) 2016      Mellanox Technologies, Inc.
   8  *                         All rights reserved.
   9  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  10  * $COPYRIGHT$
  11  *
  12  * Additional copyrights may follow
  13  *
  14  * $HEADER$
  15  */
  16 #include <src/include/pmix_config.h>
  17 
  18 #include <src/include/pmix_stdint.h>
  19 #include <src/include/pmix_socket_errno.h>
  20 
  21 #ifdef HAVE_STRING_H
  22 #include <string.h>
  23 #endif
  24 #include <fcntl.h>
  25 #ifdef HAVE_UNISTD_H
  26 #include <unistd.h>
  27 #endif
  28 #ifdef HAVE_SYS_SOCKET_H
  29 #include <sys/socket.h>
  30 #endif
  31 #ifdef HAVE_SYS_UN_H
  32 #include <sys/un.h>
  33 #endif
  34 #ifdef HAVE_SYS_UIO_H
  35 #include <sys/uio.h>
  36 #endif
  37 #ifdef HAVE_SYS_TYPES_H
  38 #include <sys/types.h>
  39 #endif
  40 
  41 #include "src/class/pmix_pointer_array.h"
  42 #include "src/include/pmix_globals.h"
  43 #include "src/client/pmix_client_ops.h"
  44 #include "src/server/pmix_server_ops.h"
  45 #include "src/util/error.h"
  46 #include "src/util/show_help.h"
  47 #include "src/mca/psensor/psensor.h"
  48 
  49 #include "src/mca/ptl/base/base.h"
  50 
  51 static void _notify_complete(pmix_status_t status, void *cbdata)
  52 {
  53     pmix_event_chain_t *chain = (pmix_event_chain_t*)cbdata;
  54     PMIX_RELEASE(chain);
  55 }
  56 
  57 static void lcfn(pmix_status_t status, void *cbdata)
  58 {
  59     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
  60     PMIX_RELEASE(peer);
  61 }
  62 
  63 void pmix_ptl_base_lost_connection(pmix_peer_t *peer, pmix_status_t err)
  64 {
  65     pmix_server_trkr_t *trk, *tnxt;
  66     pmix_server_caddy_t *rinfo, *rnext;
  67     pmix_rank_info_t *info, *pinfo;
  68     pmix_ptl_posted_recv_t *rcv;
  69     pmix_buffer_t buf;
  70     pmix_ptl_hdr_t hdr;
  71     pmix_proc_t proc;
  72     pmix_status_t rc;
  73 
  74     /* stop all events */
  75     if (peer->recv_ev_active) {
  76         pmix_event_del(&peer->recv_event);
  77         peer->recv_ev_active = false;
  78     }
  79     if (peer->send_ev_active) {
  80         pmix_event_del(&peer->send_event);
  81         peer->send_ev_active = false;
  82     }
  83     if (NULL != peer->recv_msg) {
  84         PMIX_RELEASE(peer->recv_msg);
  85         peer->recv_msg = NULL;
  86     }
  87     CLOSE_THE_SOCKET(peer->sd);
  88 
  89     if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
  90         !PMIX_PROC_IS_TOOL(pmix_globals.mypeer)) {
  91         /* if I am a server, then we need to ensure that
  92          * we properly account for the loss of this client
  93          * from any local collectives in which it was
  94          * participating - note that the proc would not
  95          * have been added to any collective tracker until
  96          * after it successfully connected */
  97         PMIX_LIST_FOREACH_SAFE(trk, tnxt, &pmix_server_globals.collectives, pmix_server_trkr_t) {
  98             /* see if this proc is participating in this tracker */
  99             PMIX_LIST_FOREACH_SAFE(rinfo, rnext, &trk->local_cbs, pmix_server_caddy_t) {
 100                 if (!PMIX_CHECK_PROCID(&rinfo->peer->info->pname, &peer->info->pname)) {
 101                     continue;
 102                 }
 103                 /* it is - adjust the count */
 104                 --trk->nlocal;
 105                 /* remove it from the list */
 106                 pmix_list_remove_item(&trk->local_cbs, &rinfo->super);
 107                 PMIX_RELEASE(rinfo);
 108                 /* if the host has already been called for this tracker,
 109                  * then do nothing here - just wait for the host to return
 110                  * from the operation */
 111                 if (trk->host_called) {
 112                     continue;
 113                 }
 114                 if (trk->def_complete && trk->nlocal == pmix_list_get_size(&trk->local_cbs)) {
 115                     /* if this is a local-only collective, then resolve it now */
 116                     if (trk->local) {
 117                         /* everyone else has called in - we need to let them know
 118                          * that this proc has disappeared
 119                          * as otherwise the collective will never complete */
 120                         if (PMIX_FENCENB_CMD == trk->type) {
 121                             if (NULL != trk->modexcbfunc) {
 122                                 trk->modexcbfunc(PMIX_ERR_LOST_CONNECTION_TO_CLIENT, NULL, 0, trk, NULL, NULL);
 123                             }
 124                         } else if (PMIX_CONNECTNB_CMD == trk->type) {
 125                             if (NULL != trk->op_cbfunc) {
 126                                 trk->op_cbfunc(PMIX_ERR_LOST_CONNECTION_TO_CLIENT, trk);
 127                             }
 128                         } else if (PMIX_DISCONNECTNB_CMD == trk->type) {
 129                             if (NULL != trk->op_cbfunc) {
 130                                 trk->op_cbfunc(PMIX_ERR_LOST_CONNECTION_TO_CLIENT, trk);
 131                             }
 132                         }
 133                     } else {
 134                         /* if the host has not been called, then we need to see if
 135                          * the collective is locally complete without this lost
 136                          * participant. If so, then we need to pass the call
 137                          * up to the host as otherwise the global collective will hang */
 138                         if (PMIX_FENCENB_CMD == trk->type) {
 139                             trk->host_called = true;
 140                             rc = pmix_host_server.fence_nb(trk->pcs, trk->npcs,
 141                                                            trk->info, trk->ninfo,
 142                                                            NULL, 0, trk->modexcbfunc, trk);
 143                             if (PMIX_SUCCESS != rc) {
 144                                 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
 145                                 PMIX_RELEASE(trk);
 146                             }
 147                         } else if (PMIX_CONNECTNB_CMD == trk->type) {
 148                             trk->host_called = true;
 149                             rc = pmix_host_server.connect(trk->pcs, trk->npcs, trk->info, trk->ninfo, trk->op_cbfunc, trk);
 150                             if (PMIX_SUCCESS != rc) {
 151                                 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
 152                                 PMIX_RELEASE(trk);
 153                             }
 154                         } else if (PMIX_DISCONNECTNB_CMD == trk->type) {
 155                             trk->host_called = true;
 156                             rc = pmix_host_server.disconnect(trk->pcs, trk->npcs, trk->info, trk->ninfo, trk->op_cbfunc, trk);
 157                             if (PMIX_SUCCESS != rc) {
 158                                 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
 159                                 PMIX_RELEASE(trk);
 160                             }
 161                         }
 162                     }
 163                 }
 164             }
 165         }
 166 
 167         /* remove this proc from the list of ranks for this nspace if it is
 168          * still there - we must check for multiple copies as there will be
 169          * one for each "clone" of this peer */
 170         PMIX_LIST_FOREACH_SAFE(info, pinfo, &(peer->nptr->ranks), pmix_rank_info_t) {
 171             if (info == peer->info) {
 172                 pmix_list_remove_item(&(peer->nptr->ranks), &(peer->info->super));
 173             }
 174         }
 175         /* reduce the number of local procs */
 176         if (0 < peer->nptr->nlocalprocs) {
 177             --peer->nptr->nlocalprocs;
 178         }
 179 
 180         /* remove this client from our array */
 181         pmix_pointer_array_set_item(&pmix_server_globals.clients,
 182                                     peer->index, NULL);
 183 
 184         /* purge any notifications cached for this client */
 185         pmix_server_purge_events(peer, NULL);
 186 
 187         if (PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
 188             /* only connection I can lose is to my server, so mark it */
 189             pmix_globals.connected = false;
 190         } else {
 191             /* cleanup any sensors that are monitoring them */
 192             pmix_psensor.stop(peer, NULL);
 193         }
 194 
 195         if (!peer->finalized && !PMIX_PROC_IS_TOOL(peer) && !pmix_globals.mypeer->finalized) {
 196             /* if this peer already called finalize, then
 197              * we are just seeing their connection go away
 198              * when they terminate - so do not generate
 199              * an event. If not, then we do */
 200             PMIX_REPORT_EVENT(err, peer, PMIX_RANGE_PROC_LOCAL, _notify_complete);
 201         }
 202         /* now decrease the refcount - might actually free the object */
 203         PMIX_RELEASE(peer->info);
 204 
 205         /* be sure to let the host know that the tool or client
 206          * is gone - otherwise, it won't know to cleanup the
 207          * resources it allocated to it */
 208         if (NULL != pmix_host_server.client_finalized && !peer->finalized) {
 209             pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
 210             proc.rank = peer->info->pname.rank;
 211             /* now tell the host server */
 212             rc = pmix_host_server.client_finalized(&proc, peer->info->server_object,
 213                                                    lcfn, peer);
 214             if (PMIX_SUCCESS == rc) {
 215                 /* we will release the peer when the server calls us back */
 216                 peer->finalized = true;
 217                 return;
 218             }
 219         }
 220         /* mark the peer as "gone" since a release doesn't guarantee
 221          * that the peer object doesn't persist */
 222         peer->finalized = true;
 223         /* Release peer info */
 224         PMIX_RELEASE(peer);
 225      } else {
 226         /* if I am a client, there is only
 227          * one connection we can have */
 228         pmix_globals.connected = false;
 229          /* set the public error status */
 230         err = PMIX_ERR_LOST_CONNECTION_TO_SERVER;
 231         /* it is possible that we have sendrecv's in progress where
 232          * we are waiting for a response to arrive. Since we have
 233          * lost connection to the server, that will never happen.
 234          * Thus, to preclude any chance of hanging, cycle thru
 235          * the list of posted recvs and complete any that are
 236          * the return call from a sendrecv - i.e., any that are
 237          * waiting on dynamic tags */
 238         PMIX_CONSTRUCT(&buf, pmix_buffer_t);
 239         /* must set the buffer type so it doesn't fail in unpack */
 240         buf.type = pmix_client_globals.myserver->nptr->compat.type;
 241         hdr.nbytes = 0; // initialize the hdr to something safe
 242         PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) {
 243             if (UINT_MAX != rcv->tag && NULL != rcv->cbfunc) {
 244                 /* construct and load the buffer */
 245                 hdr.tag = rcv->tag;
 246                 rcv->cbfunc(pmix_globals.mypeer, &hdr, &buf, rcv->cbdata);
 247             }
 248         }
 249         PMIX_DESTRUCT(&buf);
 250         /* if I called finalize, then don't generate an event */
 251         if (!pmix_globals.mypeer->finalized) {
 252             PMIX_REPORT_EVENT(err, pmix_client_globals.myserver, PMIX_RANGE_PROC_LOCAL, _notify_complete);
 253         }
 254     }
 255 }
 256 
 257 static pmix_status_t send_msg(int sd, pmix_ptl_send_t *msg)
 258 {
 259     struct iovec iov[2];
 260     int iov_count;
 261     ssize_t remain = msg->sdbytes, rc;
 262 
 263     iov[0].iov_base = msg->sdptr;
 264     iov[0].iov_len = msg->sdbytes;
 265     if (!msg->hdr_sent && NULL != msg->data) {
 266         iov[1].iov_base = msg->data->base_ptr;
 267         iov[1].iov_len = ntohl(msg->hdr.nbytes);
 268         remain += ntohl(msg->hdr.nbytes);
 269         iov_count = 2;
 270     } else {
 271         iov_count = 1;
 272     }
 273   retry:
 274     rc = writev(sd, iov, iov_count);
 275     if (PMIX_LIKELY(rc == remain)) {
 276         /* we successfully sent the header and the msg data if any */
 277         msg->hdr_sent = true;
 278         msg->sdbytes = 0;
 279         msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
 280         return PMIX_SUCCESS;
 281     } else if (rc < 0) {
 282         if (pmix_socket_errno == EINTR) {
 283             goto retry;
 284         } else if (pmix_socket_errno == EAGAIN) {
 285             /* tell the caller to keep this message on active,
 286              * but let the event lib cycle so other messages
 287              * can progress while this socket is busy
 288              */
 289             return PMIX_ERR_RESOURCE_BUSY;
 290         } else if (pmix_socket_errno == EWOULDBLOCK) {
 291             /* tell the caller to keep this message on active,
 292              * but let the event lib cycle so other messages
 293              * can progress while this socket is busy
 294              */
 295             return PMIX_ERR_WOULD_BLOCK;
 296         } else {
 297             /* we hit an error and cannot progress this message */
 298             pmix_output(0, "pmix_ptl_base: send_msg: write failed: %s (%d) [sd = %d]",
 299                         strerror(pmix_socket_errno),
 300                         pmix_socket_errno, sd);
 301             return PMIX_ERR_UNREACH;
 302         }
 303     } else {
 304         /* short writev. This usually means the kernel buffer is full,
 305          * so there is no point for retrying at that time.
 306          * simply update the msg and return with PMIX_ERR_RESOURCE_BUSY */
 307         if ((size_t)rc < msg->sdbytes) {
 308             /* partial write of the header or the msg data */
 309             msg->sdptr = (char *)msg->sdptr + rc;
 310             msg->sdbytes -= rc;
 311         } else {
 312             /* header was fully written, but only a part of the msg data was written */
 313             msg->hdr_sent = true;
 314             rc -= msg->sdbytes;
 315             if (NULL != msg->data) {
 316                 /* technically, this should never happen as iov_count
 317                  * would be 1 for a zero-byte message, and so we cannot
 318                  * have a case where we write the header and part of the
 319                  * msg. However, code checkers don't know that and are
 320                  * fooled by our earlier check for NULL, and so
 321                  * we silence their warnings by using this check */
 322                 msg->sdptr = (char *)msg->data->base_ptr + rc;
 323             }
 324             msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
 325         }
 326         return PMIX_ERR_RESOURCE_BUSY;
 327     }
 328 }
 329 
 330 static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
 331 {
 332     pmix_status_t ret = PMIX_SUCCESS;
 333     int rc;
 334     char *ptr = *buf;
 335 
 336     /* read until all bytes recvd or error */
 337     while (0 < *remain) {
 338         rc = read(sd, ptr, *remain);
 339         if (rc < 0) {
 340             if(pmix_socket_errno == EINTR) {
 341                 continue;
 342             } else if (pmix_socket_errno == EAGAIN) {
 343                 /* tell the caller to keep this message on active,
 344                  * but let the event lib cycle so other messages
 345                  * can progress while this socket is busy
 346                  */
 347                 ret = PMIX_ERR_RESOURCE_BUSY;
 348                 goto exit;
 349             } else if (pmix_socket_errno == EWOULDBLOCK) {
 350                 /* tell the caller to keep this message on active,
 351                  * but let the event lib cycle so other messages
 352                  * can progress while this socket is busy
 353                  */
 354                 ret = PMIX_ERR_WOULD_BLOCK;
 355                 goto exit;
 356             }
 357             /* we hit an error and cannot progress this message - report
 358              * the error back to the RML and let the caller know
 359              * to abort this message
 360              */
 361             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 362                                 "pmix_ptl_base_msg_recv: readv failed: %s (%d)",
 363                                 strerror(pmix_socket_errno),
 364                                 pmix_socket_errno);
 365             ret = PMIX_ERR_UNREACH;
 366             goto exit;
 367         } else if (0 == rc) {
 368             /* the remote peer closed the connection */
 369             ret = PMIX_ERR_UNREACH;
 370             goto exit;
 371         }
 372         /* we were able to read something, so adjust counters and location */
 373         *remain -= rc;
 374         ptr += rc;
 375     }
 376     /* we read the full data block */
 377   exit:
 378     *buf = ptr;
 379     return ret;
 380 }
 381 
 382 /*
 383  * A file descriptor is available/ready for send. Check the state
 384  * of the socket and take the appropriate action.
 385  */
 386 void pmix_ptl_base_send_handler(int sd, short flags, void *cbdata)
 387 {
 388     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
 389     pmix_ptl_send_t *msg = peer->send_msg;
 390     pmix_status_t rc;
 391 
 392     /* acquire the object */
 393     PMIX_ACQUIRE_OBJECT(peer);
 394 
 395     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 396                         "%s:%d ptl:base:send_handler SENDING TO PEER %s:%d tag %u with %s msg",
 397                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 398                         peer->info->pname.nspace, peer->info->pname.rank,
 399                         (NULL == msg) ? UINT_MAX : ntohl(msg->hdr.tag),
 400                         (NULL == msg) ? "NULL" : "NON-NULL");
 401 
 402     if (NULL != msg) {
 403         pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 404                             "ptl:base:send_handler SENDING MSG TO %s:%d TAG %u",
 405                             peer->info->pname.nspace, peer->info->pname.rank,
 406                             ntohl(msg->hdr.tag));
 407         if (PMIX_SUCCESS == (rc = send_msg(peer->sd, msg))) {
 408             // message is complete
 409             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 410                                 "ptl:base:send_handler MSG SENT");
 411             PMIX_RELEASE(msg);
 412             peer->send_msg = NULL;
 413         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 414                    PMIX_ERR_WOULD_BLOCK == rc) {
 415             /* exit this event and let the event lib progress */
 416             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 417                                 "ptl:base:send_handler RES BUSY OR WOULD BLOCK");
 418             /* ensure we post the modified peer object before another thread
 419              * picks it back up */
 420             PMIX_POST_OBJECT(peer);
 421             return;
 422         } else {
 423             pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 424                                 "%s:%d SEND ERROR %s",
 425                                 pmix_globals.myid.nspace, pmix_globals.myid.rank,
 426                                 PMIx_Error_string(rc));
 427             // report the error
 428             pmix_event_del(&peer->send_event);
 429             peer->send_ev_active = false;
 430             PMIX_RELEASE(msg);
 431             peer->send_msg = NULL;
 432             pmix_ptl_base_lost_connection(peer, rc);
 433             /* ensure we post the modified peer object before another thread
 434              * picks it back up */
 435             PMIX_POST_OBJECT(peer);
 436             return;
 437         }
 438 
 439         /* if current message completed - progress any pending sends by
 440          * moving the next in the queue into the "on-deck" position. Note
 441          * that this doesn't mean we send the message right now - we will
 442          * wait for another send_event to fire before doing so. This gives
 443          * us a chance to service any pending recvs.
 444          */
 445         peer->send_msg = (pmix_ptl_send_t*)
 446             pmix_list_remove_first(&peer->send_queue);
 447     }
 448 
 449     /* if nothing else to do unregister for send event notifications */
 450     if (NULL == peer->send_msg && peer->send_ev_active) {
 451         pmix_event_del(&peer->send_event);
 452         peer->send_ev_active = false;
 453     }
 454     /* ensure we post the modified peer object before another thread
 455      * picks it back up */
 456     PMIX_POST_OBJECT(peer);
 457 }
 458 
 459 /*
 460  * Dispatch to the appropriate action routine based on the state
 461  * of the connection with the peer.
 462  */
 463 
 464 void pmix_ptl_base_recv_handler(int sd, short flags, void *cbdata)
 465 {
 466     pmix_status_t rc;
 467     pmix_peer_t *peer = (pmix_peer_t*)cbdata;
 468     pmix_ptl_recv_t *msg = NULL;
 469     pmix_ptl_hdr_t hdr;
 470     size_t nbytes;
 471     char *ptr;
 472 
 473     /* acquire the object */
 474     PMIX_ACQUIRE_OBJECT(peer);
 475 
 476     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 477                         "%s:%d ptl:base:recv:handler called with peer %s:%d",
 478                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 479                         (NULL == peer) ? "NULL" : peer->info->pname.nspace,
 480                         (NULL == peer) ? PMIX_RANK_UNDEF : peer->info->pname.rank);
 481 
 482     if (NULL == peer) {
 483         return;
 484     }
 485     /* allocate a new message and setup for recv */
 486     if (NULL == peer->recv_msg) {
 487         pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 488                             "ptl:base:recv:handler allocate new recv msg");
 489         peer->recv_msg = PMIX_NEW(pmix_ptl_recv_t);
 490         if (NULL == peer->recv_msg) {
 491             pmix_output(0, "sptl:base:recv_handler: unable to allocate recv message\n");
 492             goto err_close;
 493         }
 494         PMIX_RETAIN(peer);
 495         peer->recv_msg->peer = peer;  // provide a handle back to the peer object
 496         /* start by reading the header */
 497         peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
 498         peer->recv_msg->rdbytes = sizeof(pmix_ptl_hdr_t);
 499     }
 500     msg = peer->recv_msg;
 501     msg->sd = sd;
 502     /* if the header hasn't been completely read, read it */
 503     if (!msg->hdr_recvd) {
 504          pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 505                             "ptl:base:recv:handler read hdr on socket %d", peer->sd);
 506         nbytes = sizeof(pmix_ptl_hdr_t);
 507         ptr = (char*)&hdr;
 508         if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &ptr, &nbytes))) {
 509             /* completed reading the header */
 510             peer->recv_msg->hdr_recvd = true;
 511             /* convert the hdr to host format */
 512             peer->recv_msg->hdr.pindex = ntohl(hdr.pindex);
 513             peer->recv_msg->hdr.tag = ntohl(hdr.tag);
 514             peer->recv_msg->hdr.nbytes = ntohl(hdr.nbytes);
 515             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 516                                 "RECVD MSG FOR TAG %d SIZE %d",
 517                                 (int)peer->recv_msg->hdr.tag,
 518                                 (int)peer->recv_msg->hdr.nbytes);
 519             /* if this is a zero-byte message, then we are done */
 520             if (0 == peer->recv_msg->hdr.nbytes) {
 521                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 522                                     "RECVD ZERO-BYTE MESSAGE FROM %s:%u for tag %d",
 523                                     peer->info->pname.nspace, peer->info->pname.rank,
 524                                     peer->recv_msg->hdr.tag);
 525                 peer->recv_msg->data = NULL;  // make sure
 526                 peer->recv_msg->rdptr = NULL;
 527                 peer->recv_msg->rdbytes = 0;
 528                 /* post it for delivery */
 529                 PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
 530                 peer->recv_msg = NULL;
 531                 PMIX_POST_OBJECT(peer);
 532                 return;
 533             } else {
 534                 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 535                                     "ptl:base:recv:handler allocate data region of size %lu",
 536                                     (unsigned long)peer->recv_msg->hdr.nbytes);
 537                 /* allocate the data region */
 538                 if (pmix_ptl_globals.max_msg_size < peer->recv_msg->hdr.nbytes) {
 539                     pmix_show_help("help-pmix-runtime.txt", "ptl:msg_size", true,
 540                                    (unsigned long)peer->recv_msg->hdr.nbytes,
 541                                    (unsigned long)pmix_ptl_globals.max_msg_size);
 542                     goto err_close;
 543                 }
 544                 peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
 545                 memset(peer->recv_msg->data, 0, peer->recv_msg->hdr.nbytes);
 546                 /* point to it */
 547                 peer->recv_msg->rdptr = peer->recv_msg->data;
 548                 peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
 549             }
 550             /* fall thru and attempt to read the data */
 551         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 552                    PMIX_ERR_WOULD_BLOCK == rc) {
 553             /* exit this event and let the event lib progress */
 554             return;
 555         } else {
 556             /* the remote peer closed the connection - report that condition
 557              * and let the caller know
 558              */
 559             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 560                                 "ptl:base:msg_recv: peer %s:%d closed connection",
 561                                 peer->nptr->nspace, peer->info->pname.rank);
 562             goto err_close;
 563         }
 564     }
 565 
 566     if (peer->recv_msg->hdr_recvd) {
 567         /* continue to read the data block - we start from
 568          * wherever we left off, which could be at the
 569          * beginning or somewhere in the message
 570          */
 571         if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &msg->rdptr, &msg->rdbytes))) {
 572             /* we recvd all of the message */
 573             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 574                                 "%s:%d RECVD COMPLETE MESSAGE FROM SERVER OF %d BYTES FOR TAG %d ON PEER SOCKET %d",
 575                                 pmix_globals.myid.nspace, pmix_globals.myid.rank,
 576                                 (int)peer->recv_msg->hdr.nbytes,
 577                                 peer->recv_msg->hdr.tag, peer->sd);
 578             /* post it for delivery */
 579             PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
 580             peer->recv_msg = NULL;
 581             /* ensure we post the modified peer object before another thread
 582              * picks it back up */
 583             PMIX_POST_OBJECT(peer);
 584             return;
 585         } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
 586                    PMIX_ERR_WOULD_BLOCK == rc) {
 587             /* exit this event and let the event lib progress */
 588             /* ensure we post the modified peer object before another thread
 589              * picks it back up */
 590             PMIX_POST_OBJECT(peer);
 591             return;
 592         } else {
 593             /* the remote peer closed the connection - report that condition
 594              * and let the caller know
 595              */
 596             pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 597                                 "%s:%d ptl:base:msg_recv: peer %s:%d closed connection",
 598                                 pmix_globals.myid.nspace, pmix_globals.myid.rank,
 599                                 peer->nptr->nspace, peer->info->pname.rank);
 600             goto err_close;
 601         }
 602     }
 603     /* success */
 604     return;
 605 
 606   err_close:
 607     /* stop all events */
 608     if (peer->recv_ev_active) {
 609         pmix_event_del(&peer->recv_event);
 610         peer->recv_ev_active = false;
 611     }
 612     if (peer->send_ev_active) {
 613         pmix_event_del(&peer->send_event);
 614         peer->send_ev_active = false;
 615     }
 616     if (NULL != peer->recv_msg) {
 617         PMIX_RELEASE(peer->recv_msg);
 618         peer->recv_msg = NULL;
 619     }
 620     pmix_ptl_base_lost_connection(peer, PMIX_ERR_UNREACH);
 621     /* ensure we post the modified peer object before another thread
 622      * picks it back up */
 623     PMIX_POST_OBJECT(peer);
 624 }
 625 
 626 void pmix_ptl_base_send(int sd, short args, void *cbdata)
 627 {
 628     pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
 629     pmix_ptl_send_t *snd;
 630 
 631     /* acquire the object */
 632     PMIX_ACQUIRE_OBJECT(queue);
 633 
 634     if (NULL == queue->peer || queue->peer->sd < 0 ||
 635         NULL == queue->peer->info || NULL == queue->peer->nptr) {
 636         /* this peer has lost connection */
 637         if (NULL != queue->buf) {
 638             PMIX_RELEASE(queue->buf);
 639         }
 640         PMIX_RELEASE(queue);
 641         return;
 642     }
 643 
 644     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 645                         "[%s:%d] send to %s:%u on tag %d",
 646                         __FILE__, __LINE__,
 647                         (queue->peer)->info->pname.nspace,
 648                         (queue->peer)->info->pname.rank, (queue->tag));
 649 
 650     if (NULL == queue->buf) {
 651         /* nothing to send? */
 652         PMIX_RELEASE(queue);
 653         return;
 654     }
 655 
 656     snd = PMIX_NEW(pmix_ptl_send_t);
 657     snd->hdr.pindex = htonl(pmix_globals.pindex);
 658     snd->hdr.tag = htonl(queue->tag);
 659     snd->hdr.nbytes = htonl((queue->buf)->bytes_used);
 660     snd->data = (queue->buf);
 661     /* always start with the header */
 662     snd->sdptr = (char*)&snd->hdr;
 663     snd->sdbytes = sizeof(pmix_ptl_hdr_t);
 664 
 665     /* if there is no message on-deck, put this one there */
 666     if (NULL == (queue->peer)->send_msg) {
 667         (queue->peer)->send_msg = snd;
 668     } else {
 669         /* add it to the queue */
 670         pmix_list_append(&(queue->peer)->send_queue, &snd->super);
 671     }
 672     /* ensure the send event is active */
 673     if (!(queue->peer)->send_ev_active) {
 674         (queue->peer)->send_ev_active = true;
 675         PMIX_POST_OBJECT(queue->peer);
 676         pmix_event_add(&(queue->peer)->send_event, 0);
 677     }
 678     PMIX_RELEASE(queue);
 679     PMIX_POST_OBJECT(snd);
 680 }
 681 
 682 void pmix_ptl_base_send_recv(int fd, short args, void *cbdata)
 683 {
 684     pmix_ptl_sr_t *ms = (pmix_ptl_sr_t*)cbdata;
 685     pmix_ptl_posted_recv_t *req;
 686     pmix_ptl_send_t *snd;
 687     uint32_t tag;
 688 
 689     /* acquire the object */
 690     PMIX_ACQUIRE_OBJECT(ms);
 691 
 692     if (NULL == ms->peer || ms->peer->sd < 0 ||
 693         NULL == ms->peer->info || NULL == ms->peer->nptr) {
 694         /* this peer has lost connection */
 695         if (NULL != ms->bfr) {
 696             PMIX_RELEASE(ms->bfr);
 697         }
 698         PMIX_RELEASE(ms);
 699         return;
 700     }
 701 
 702     if (NULL == ms->bfr) {
 703         /* nothing to send? */
 704         PMIX_RELEASE(ms);
 705         return;
 706     }
 707 
 708     /* take the next tag in the sequence */
 709     pmix_ptl_globals.current_tag++;
 710     if (UINT32_MAX == pmix_ptl_globals.current_tag ) {
 711         pmix_ptl_globals.current_tag = PMIX_PTL_TAG_DYNAMIC;
 712     }
 713     tag = pmix_ptl_globals.current_tag;
 714 
 715     if (NULL != ms->cbfunc) {
 716         /* if a callback msg is expected, setup a recv for it */
 717         req = PMIX_NEW(pmix_ptl_posted_recv_t);
 718         req->tag = tag;
 719         req->cbfunc = ms->cbfunc;
 720         req->cbdata = ms->cbdata;
 721 
 722         pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 723                             "posting recv on tag %d", req->tag);
 724         /* add it to the list of recvs - we cannot have unexpected messages
 725          * in this subsystem as the server never sends us something that
 726          * we didn't previously request */
 727         pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &req->super);
 728     }
 729 
 730     pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
 731                         "QUEIENG MSG TO SERVER OF SIZE %d",
 732                         (int)ms->bfr->bytes_used);
 733     snd = PMIX_NEW(pmix_ptl_send_t);
 734     snd->hdr.pindex = htonl(pmix_globals.pindex);
 735     snd->hdr.tag = htonl(tag);
 736     snd->hdr.nbytes = htonl(ms->bfr->bytes_used);
 737     snd->data = ms->bfr;
 738     /* always start with the header */
 739     snd->sdptr = (char*)&snd->hdr;
 740     snd->sdbytes = sizeof(pmix_ptl_hdr_t);
 741 
 742     /* if there is no message on-deck, put this one there */
 743     if (NULL == ms->peer->send_msg) {
 744         ms->peer->send_msg = snd;
 745     } else {
 746         /* add it to the queue */
 747         pmix_list_append(&ms->peer->send_queue, &snd->super);
 748     }
 749     /* ensure the send event is active */
 750     if (!ms->peer->send_ev_active) {
 751         ms->peer->send_ev_active = true;
 752         PMIX_POST_OBJECT(snd);
 753         pmix_event_add(&ms->peer->send_event, 0);
 754     }
 755     /* cleanup */
 756     PMIX_RELEASE(ms);
 757     PMIX_POST_OBJECT(snd);
 758 }
 759 
 760 void pmix_ptl_base_process_msg(int fd, short flags, void *cbdata)
 761 {
 762     pmix_ptl_recv_t *msg = (pmix_ptl_recv_t*)cbdata;
 763     pmix_ptl_posted_recv_t *rcv;
 764     pmix_buffer_t buf;
 765 
 766     /* acquire the object */
 767     PMIX_ACQUIRE_OBJECT(msg);
 768 
 769     pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 770                         "%s:%d message received %d bytes for tag %u on socket %d",
 771                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
 772                         (int)msg->hdr.nbytes, msg->hdr.tag, msg->sd);
 773 
 774     /* see if we have a waiting recv for this message */
 775     PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) {
 776         pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 777                             "checking msg on tag %u for tag %u",
 778                             msg->hdr.tag, rcv->tag);
 779 
 780         if (msg->hdr.tag == rcv->tag || UINT_MAX == rcv->tag) {
 781             if (NULL != rcv->cbfunc) {
 782                 /* construct and load the buffer */
 783                 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
 784                 if (NULL != msg->data) {
 785                     PMIX_LOAD_BUFFER(msg->peer, &buf, msg->data, msg->hdr.nbytes);
 786                 } else {
 787                     /* we need to at least set the buffer type so
 788                      * unpack of a zero-byte message doesn't error */
 789                     buf.type = msg->peer->nptr->compat.type;
 790                 }
 791                 msg->data = NULL;  // protect the data region
 792                 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 793                                      "%s:%d EXECUTE CALLBACK for tag %u",
 794                                      pmix_globals.myid.nspace, pmix_globals.myid.rank,
 795                                      msg->hdr.tag);
 796                 rcv->cbfunc(msg->peer, &msg->hdr, &buf, rcv->cbdata);
 797                 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
 798                                     "%s:%d CALLBACK COMPLETE",
 799                                     pmix_globals.myid.nspace, pmix_globals.myid.rank);
 800                 PMIX_DESTRUCT(&buf);  // free's the msg data
 801             }
 802             /* done with the recv if it is a dynamic tag */
 803             if (PMIX_PTL_TAG_DYNAMIC <= rcv->tag && UINT_MAX != rcv->tag) {
 804                 pmix_list_remove_item(&pmix_ptl_globals.posted_recvs, &rcv->super);
 805                 PMIX_RELEASE(rcv);
 806             }
 807             PMIX_RELEASE(msg);
 808             return;
 809         }
 810     }
 811 
 812     /* if the tag in this message is above the dynamic marker, then
 813      * that is an error */
 814     if (PMIX_PTL_TAG_DYNAMIC <= msg->hdr.tag) {
 815         pmix_output(0, "UNEXPECTED MESSAGE tag = %d from source %s:%d",
 816                     msg->hdr.tag, msg->peer->info->pname.nspace,
 817                     msg->peer->info->pname.rank);
 818         PMIX_REPORT_EVENT(PMIX_ERROR, msg->peer, PMIX_RANGE_NAMESPACE, _notify_complete);
 819         PMIX_RELEASE(msg);
 820         return;
 821     }
 822 
 823     /* it is possible that someone may post a recv for this message
 824      * at some point, so we have to hold onto it */
 825     pmix_list_append(&pmix_ptl_globals.unexpected_msgs, &msg->super);
 826     /* ensure we post the modified object before another thread
 827      * picks it back up */
 828     PMIX_POST_OBJECT(msg);
 829 }

/* [<][>][^][v][top][bottom][index][help] */