root/orte/mca/oob/tcp/oob_tcp_sendrecv.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. mca_oob_tcp_queue_msg
  2. send_msg
  3. mca_oob_tcp_send_handler
  4. read_bytes
  5. mca_oob_tcp_recv_handler
  6. snd_cons
  7. snd_des
  8. rcv_cons
  9. err_cons

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009      Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2017      Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  *
 * In windows, many of the socket functions return an EWOULDBLOCK
 * instead of things like EAGAIN, EINPROGRESS, etc. It has been
 * verified that this will not conflict with other error codes that
 * are returned by these functions under UNIX/Linux environments
  29  */
  30 
  31 #include "orte_config.h"
  32 
  33 #ifdef HAVE_UNISTD_H
  34 #include <unistd.h>
  35 #endif
  36 #include <fcntl.h>
  37 #ifdef HAVE_SYS_UIO_H
  38 #include <sys/uio.h>
  39 #endif
  40 #ifdef HAVE_NET_UIO_H
  41 #include <net/uio.h>
  42 #endif
  43 #ifdef HAVE_SYS_TYPES_H
  44 #include <sys/types.h>
  45 #endif
  46 #include "opal/opal_socket_errno.h"
  47 #ifdef HAVE_NETINET_IN_H
  48 #include <netinet/in.h>
  49 #endif
  50 #ifdef HAVE_ARPA_INET_H
  51 #include <arpa/inet.h>
  52 #endif
  53 #ifdef HAVE_NETINET_TCP_H
  54 #include <netinet/tcp.h>
  55 #endif
  56 
  57 #include "opal_stdint.h"
  58 #include "opal/types.h"
  59 #include "opal/mca/backtrace/backtrace.h"
  60 #include "opal/util/output.h"
  61 #include "opal/util/net.h"
  62 #include "opal/util/error.h"
  63 #include "opal/class/opal_hash_table.h"
  64 #include "opal/mca/event/event.h"
  65 
  66 #include "orte/util/name_fns.h"
  67 #include "orte/util/threads.h"
  68 #include "orte/runtime/orte_globals.h"
  69 #include "orte/mca/errmgr/errmgr.h"
  70 #include "orte/mca/ess/ess.h"
  71 #include "orte/mca/routed/routed.h"
  72 #include "orte/mca/state/state.h"
  73 #include "orte/runtime/orte_wait.h"
  74 
  75 #include "oob_tcp.h"
  76 #include "orte/mca/oob/tcp/oob_tcp_component.h"
  77 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
  78 #include "orte/mca/oob/tcp/oob_tcp_common.h"
  79 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
  80 
  81 #define OOB_SEND_MAX_RETRIES 3
  82 
  83 void mca_oob_tcp_queue_msg(int sd, short args, void *cbdata)
  84 {
  85     mca_oob_tcp_send_t *snd = (mca_oob_tcp_send_t*)cbdata;
  86     mca_oob_tcp_peer_t *peer;
  87 
  88     ORTE_ACQUIRE_OBJECT(snd);
  89     peer = (mca_oob_tcp_peer_t*)snd->peer;
  90 
  91     /* if there is no message on-deck, put this one there */
  92     if (NULL == peer->send_msg) {
  93         peer->send_msg = snd;
  94     } else {
  95         /* add it to the queue */
  96         opal_list_append(&peer->send_queue, &snd->super);
  97     }
  98     if (snd->activate) {
  99         /* if we aren't connected, then start connecting */
 100         if (MCA_OOB_TCP_CONNECTED != peer->state) {
 101             peer->state = MCA_OOB_TCP_CONNECTING;
 102             ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
 103         } else {
 104             /* ensure the send event is active */
 105             if (!peer->send_ev_active) {
 106                 peer->send_ev_active = true;
 107                 ORTE_POST_OBJECT(peer);
 108                 opal_event_add(&peer->send_event, 0);
 109             }
 110         }
 111     }
 112 }
 113 
/*
 * Attempt to write the current message to the peer's socket with a
 * single writev() covering the (remaining) header and, on the first
 * attempt, the payload as well.
 *
 * Returns:
 *   ORTE_SUCCESS            - everything outstanding was written
 *   ORTE_ERR_RESOURCE_BUSY  - partial write or EAGAIN; msg state was
 *                             updated, caller should retry later
 *   ORTE_ERR_WOULD_BLOCK    - EWOULDBLOCK; caller should retry later
 *   ORTE_ERR_UNREACH        - hard write error; message cannot progress
 */
static int send_msg(mca_oob_tcp_peer_t* peer, mca_oob_tcp_send_t* msg)
{
    struct iovec iov[2];
    int iov_count, retries = 0;
    ssize_t remain = msg->sdbytes, rc;

    /* iov[0] always covers whatever remains of the current region
     * (header, or payload once the header has gone out) */
    iov[0].iov_base = msg->sdptr;
    iov[0].iov_len = msg->sdbytes;
    if (!msg->hdr_sent) {
        /* header not yet sent - attach the payload as a second iovec
         * so both can go out in one syscall */
        if (NULL != msg->data) {
            /* relay message - just send that data */
            iov[1].iov_base = msg->data;
        } else if (NULL != msg->msg->buffer) {
            /* buffer send */
            iov[1].iov_base = msg->msg->buffer->base_ptr;
        } else {
            iov[1].iov_base = msg->msg->data;
        }
        /* hdr.nbytes is stored in network byte order - convert */
        iov[1].iov_len = ntohl(msg->hdr.nbytes);
        remain += ntohl(msg->hdr.nbytes);
        iov_count = 2;
    } else {
        iov_count = 1;
    }

  retry:
    rc = writev(peer->sd, iov, iov_count);
    if (OPAL_LIKELY(rc == remain)) {
        /* we successfully sent the header and the msg data if any */
        msg->hdr_sent = true;
        msg->sdbytes = 0;
        /* leave sdptr pointing one past the end of what was sent */
        msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
        return ORTE_SUCCESS;
    } else if (rc < 0) {
        if (opal_socket_errno == EINTR) {
            /* interrupted by a signal - just retry immediately */
            goto retry;
        } else if (opal_socket_errno == EAGAIN) {
            /* tell the caller to keep this message on active,
             * but let the event lib cycle so other messages
             * can progress while this socket is busy
             */
            ++retries;
            if (retries < OOB_SEND_MAX_RETRIES) {
                goto retry;
            }
            return ORTE_ERR_RESOURCE_BUSY;
        } else if (opal_socket_errno == EWOULDBLOCK) {
            /* tell the caller to keep this message on active,
             * but let the event lib cycle so other messages
             * can progress while this socket is busy
             */
            ++retries;
            if (retries < OOB_SEND_MAX_RETRIES) {
                goto retry;
            }
            return ORTE_ERR_WOULD_BLOCK;
        } else {
            /* we hit an error and cannot progress this message */
            opal_output(0, "oob:tcp: send_msg: write failed: %s (%d) [sd = %d]",
                        strerror(opal_socket_errno),
                        opal_socket_errno, peer->sd);
            return ORTE_ERR_UNREACH;
        }
    } else {
        /* short writev. This usually means the kernel buffer is full,
         * so there is no point for retrying at that time.
         * simply update the msg and return with PMIX_ERR_RESOURCE_BUSY */
        if ((size_t)rc < msg->sdbytes) {
            /* partial write of the header or the msg data -
             * advance within the current region */
            msg->sdptr = (char *)msg->sdptr + rc;
            msg->sdbytes -= rc;
        } else {
            /* header was fully written, but only a part of the msg data was written */
            msg->hdr_sent = true;
            rc -= msg->sdbytes;
            /* rc >= sdbytes with rc != remain is only possible when
             * a payload iovec was attached */
            assert(2 == iov_count);
            msg->sdptr = (char *)iov[1].iov_base + rc;
            msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
        }
        return ORTE_ERR_RESOURCE_BUSY;
    }
}
 196 
 197 /*
 198  * A file descriptor is available/ready for send. Check the state
 199  * of the socket and take the appropriate action.
 200  */
/*
 * Event callback fired when the peer's socket is writable. Dispatch on
 * the connection state: complete an in-progress connect, or push the
 * on-deck message and rotate the send queue.
 *
 * @param sd      unused (event-library signature requirement)
 * @param flags   unused (event-library signature requirement)
 * @param cbdata  the mca_oob_tcp_peer_t whose socket is ready
 */
void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
{
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
    mca_oob_tcp_send_t* msg;
    int rc;

    ORTE_ACQUIRE_OBJECT(peer);
    msg = peer->send_msg;

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:send_handler called to send to peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&peer->name));

    switch (peer->state) {
    case MCA_OOB_TCP_CONNECTING:
    case MCA_OOB_TCP_CLOSED:
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s tcp:send_handler %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            mca_oob_tcp_state_print(peer->state));
        /* writability here signals the async connect() resolved -
         * finish the connection setup */
        mca_oob_tcp_peer_complete_connect(peer);
        /* de-activate the send event until the connection
         * handshake completes
         */
        if (peer->send_ev_active) {
            opal_event_del(&peer->send_event);
            peer->send_ev_active = false;
        }
        break;
    case MCA_OOB_TCP_CONNECTED:
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s tcp:send_handler SENDING TO %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            (NULL == peer->send_msg) ? "NULL" : ORTE_NAME_PRINT(&peer->name));
        if (NULL != msg) {
            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                "oob:tcp:send_handler SENDING MSG");
            if (ORTE_SUCCESS == (rc = send_msg(peer, msg))) {
                /* this msg is complete - figure out what kind of send it
                 * was so we know who (if anyone) to notify */
                if (NULL != msg->data || NULL == msg->msg) {
                    /* the relay is complete - release the data */
                    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                        "%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                        ORTE_NAME_PRINT(&(peer->name)),
                                        (int)ntohl(msg->hdr.nbytes), peer->sd);
                    OBJ_RELEASE(msg);
                    peer->send_msg = NULL;
                } else if (NULL != msg->msg->buffer) {
                    /* we are done - notify the RML */
                    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                        "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                        ORTE_NAME_PRINT(&(peer->name)),
                                        (int)ntohl(msg->hdr.nbytes), peer->sd);
                    msg->msg->status = ORTE_SUCCESS;
                    ORTE_RML_SEND_COMPLETE(msg->msg);
                    OBJ_RELEASE(msg);
                    peer->send_msg = NULL;
                } else if (NULL != msg->msg->data) {
                    /* this was a relay we have now completed - no need to
                     * notify the RML as the local proc didn't initiate
                     * the send
                     */
                    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                        "%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                        ORTE_NAME_PRINT(&(peer->name)),
                                        (int)ntohl(msg->hdr.nbytes), peer->sd);
                    msg->msg->status = ORTE_SUCCESS;
                    OBJ_RELEASE(msg);
                    peer->send_msg = NULL;
                } else {
                    /* iovec-based send: rotate to the next iovec */
                    msg->iovnum++;
                    if (msg->iovnum < msg->msg->count) {
                        msg->sdptr = msg->msg->iov[msg->iovnum].iov_base;
                        msg->sdbytes = msg->msg->iov[msg->iovnum].iov_len;
                        /* exit this event to give the event lib
                         * a chance to progress any other pending
                         * actions
                         */
                        return;
                    } else {
                        /* this message is complete - notify the RML */
                        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                            "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
                                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                            ORTE_NAME_PRINT(&(peer->name)),
                                            (int)ntohl(msg->hdr.nbytes), peer->sd);
                        msg->msg->status = ORTE_SUCCESS;
                        ORTE_RML_SEND_COMPLETE(msg->msg);
                        OBJ_RELEASE(msg);
                        peer->send_msg = NULL;
                    }
                }
                /* fall thru to queue the next message */
            } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
                       ORTE_ERR_WOULD_BLOCK == rc) {
                /* exit this event and let the event lib progress */
                return;
            } else {
                // report the error
                opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send message ON SOCKET %d",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&(peer->name)), peer->sd);
                opal_event_del(&peer->send_event);
                msg->msg->status = rc;
                ORTE_RML_SEND_COMPLETE(msg->msg);
                OBJ_RELEASE(msg);
                peer->send_msg = NULL;
                /* a hard send failure is treated as fatal */
                ORTE_FORCED_TERMINATE(1);
                return;
            }

            /* if current message completed - progress any pending sends by
             * moving the next in the queue into the "on-deck" position. Note
             * that this doesn't mean we send the message right now - we will
             * wait for another send_event to fire before doing so. This gives
             * us a chance to service any pending recvs.
             */
            peer->send_msg = (mca_oob_tcp_send_t*)
                opal_list_remove_first(&peer->send_queue);
        }

        /* if nothing else to do unregister for send event notifications */
        if (NULL == peer->send_msg && peer->send_ev_active) {
            opal_event_del(&peer->send_event);
            peer->send_ev_active = false;
        }
        break;
    default:
        opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: invalid connection state (%d) on socket %d",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    peer->state, peer->sd);
        if (peer->send_ev_active) {
            opal_event_del(&peer->send_event);
            peer->send_ev_active = false;
        }
        break;
    }
}
 345 
/*
 * Read from the peer's socket into the current recv message until the
 * outstanding byte count (recv_msg->rdbytes) reaches zero.
 *
 * Returns:
 *   ORTE_SUCCESS            - the full data block was read
 *   ORTE_ERR_RESOURCE_BUSY  - EAGAIN; caller should let the event lib cycle
 *   ORTE_ERR_WOULD_BLOCK    - EWOULDBLOCK, or the peer closed the
 *                             connection (events were torn down here)
 *   ORTE_ERR_COMM_FAILURE   - hard read error; caller must abort the msg
 */
static int read_bytes(mca_oob_tcp_peer_t* peer)
{
    int rc;

    /* read until all bytes recvd or error */
    while (0 < peer->recv_msg->rdbytes) {
        rc = read(peer->sd, peer->recv_msg->rdptr, peer->recv_msg->rdbytes);
        if (rc < 0) {
            if(opal_socket_errno == EINTR) {
                /* interrupted by a signal - just retry */
                continue;
            } else if (opal_socket_errno == EAGAIN) {
                /* tell the caller to keep this message on active,
                 * but let the event lib cycle so other messages
                 * can progress while this socket is busy
                 */
                return ORTE_ERR_RESOURCE_BUSY;
            } else if (opal_socket_errno == EWOULDBLOCK) {
                /* tell the caller to keep this message on active,
                 * but let the event lib cycle so other messages
                 * can progress while this socket is busy
                 */
                return ORTE_ERR_WOULD_BLOCK;
            }
            /* we hit an error and cannot progress this message - report
             * the error back to the RML and let the caller know
             * to abort this message
             */
            opal_output_verbose(OOB_TCP_DEBUG_FAIL, orte_oob_base_framework.framework_output,
                                "%s-%s mca_oob_tcp_msg_recv: readv failed: %s (%d)",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&(peer->name)),
                                strerror(opal_socket_errno),
                                opal_socket_errno);
            // mca_oob_tcp_peer_close(peer);
            // if (NULL != mca_oob_tcp.oob_exception_callback) {
            // mca_oob_tcp.oob_exception_callback(&peer->name, ORTE_RML_PEER_DISCONNECTED);
            //}
            return ORTE_ERR_COMM_FAILURE;
        } else if (rc == 0)  {
            /* the remote peer closed the connection - report that condition
             * and let the caller know
             */
            opal_output_verbose(OOB_TCP_DEBUG_FAIL, orte_oob_base_framework.framework_output,
                                "%s-%s mca_oob_tcp_msg_recv: peer closed connection",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&(peer->name)));
            /* stop all events */
            if (peer->recv_ev_active) {
                opal_event_del(&peer->recv_event);
                peer->recv_ev_active = false;
            }
            if (peer->timer_ev_active) {
                opal_event_del(&peer->timer_event);
                peer->timer_ev_active = false;
            }
            if (peer->send_ev_active) {
                opal_event_del(&peer->send_event);
                peer->send_ev_active = false;
            }
            /* release the partially-received message, if any */
            if (NULL != peer->recv_msg) {
                OBJ_RELEASE(peer->recv_msg);
                peer->recv_msg = NULL;
            }
            mca_oob_tcp_peer_close(peer);
            //if (NULL != mca_oob_tcp.oob_exception_callback) {
            //   mca_oob_tcp.oob_exception_callback(&peer->peer_name, ORTE_RML_PEER_DISCONNECTED);
            //}
            return ORTE_ERR_WOULD_BLOCK;
        }
        /* we were able to read something, so adjust counters and location */
        peer->recv_msg->rdbytes -= rc;
        peer->recv_msg->rdptr += rc;
    }

    /* we read the full data block */
    return ORTE_SUCCESS;
}
 423 
 424 /*
 425  * Dispatch to the appropriate action routine based on the state
 426  * of the connection with the peer.
 427  */
 428 
 429 void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
 430 {
 431     mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
 432     int rc;
 433     orte_rml_send_t *snd;
 434 
 435     ORTE_ACQUIRE_OBJECT(peer);
 436 
 437     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 438                         "%s:tcp:recv:handler called for peer %s",
 439                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 440                         ORTE_NAME_PRINT(&peer->name));
 441 
 442     switch (peer->state) {
 443     case MCA_OOB_TCP_CONNECT_ACK:
 444         if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(peer, peer->sd, NULL))) {
 445             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 446                                 "%s:tcp:recv:handler starting send/recv events",
 447                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 448             /* we connected! Start the send/recv events */
 449             if (!peer->recv_ev_active) {
 450                 peer->recv_ev_active = true;
 451                 ORTE_POST_OBJECT(peer);
 452                 opal_event_add(&peer->recv_event, 0);
 453             }
 454             if (peer->timer_ev_active) {
 455                 opal_event_del(&peer->timer_event);
 456                 peer->timer_ev_active = false;
 457             }
 458             /* if there is a message waiting to be sent, queue it */
 459             if (NULL == peer->send_msg) {
 460                 peer->send_msg = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue);
 461             }
 462             if (NULL != peer->send_msg && !peer->send_ev_active) {
 463                 peer->send_ev_active = true;
 464                 ORTE_POST_OBJECT(peer);
 465                 opal_event_add(&peer->send_event, 0);
 466             }
 467             /* update our state */
 468             peer->state = MCA_OOB_TCP_CONNECTED;
 469         } else if (ORTE_ERR_UNREACH != rc) {
 470             /* we get an unreachable error returned if a connection
 471              * completes but is rejected - otherwise, we don't want
 472              * to terminate as we might be retrying the connection */
 473             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 474                                 "%s UNABLE TO COMPLETE CONNECT ACK WITH %s",
 475                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 476                                 ORTE_NAME_PRINT(&peer->name));
 477             opal_event_del(&peer->recv_event);
 478             ORTE_FORCED_TERMINATE(1);
 479             return;
 480         }
 481         break;
 482     case MCA_OOB_TCP_CONNECTED:
 483         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 484                             "%s:tcp:recv:handler CONNECTED",
 485                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 486         /* allocate a new message and setup for recv */
 487         if (NULL == peer->recv_msg) {
 488             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 489                                 "%s:tcp:recv:handler allocate new recv msg",
 490                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 491             peer->recv_msg = OBJ_NEW(mca_oob_tcp_recv_t);
 492             if (NULL == peer->recv_msg) {
 493                 opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n",
 494                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 495                             ORTE_NAME_PRINT(&(peer->name)));
 496                 return;
 497             }
 498             /* start by reading the header */
 499             peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
 500             peer->recv_msg->rdbytes = sizeof(mca_oob_tcp_hdr_t);
 501         }
 502         /* if the header hasn't been completely read, read it */
 503         if (!peer->recv_msg->hdr_recvd) {
 504             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 505                                 "%s:tcp:recv:handler read hdr",
 506                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 507             if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
 508                 /* completed reading the header */
 509                 peer->recv_msg->hdr_recvd = true;
 510                 /* convert the header */
 511                 MCA_OOB_TCP_HDR_NTOH(&peer->recv_msg->hdr);
 512                 /* if this is a zero-byte message, then we are done */
 513                 if (0 == peer->recv_msg->hdr.nbytes) {
 514                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 515                                         "%s RECVD ZERO-BYTE MESSAGE FROM %s for tag %d",
 516                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 517                                         ORTE_NAME_PRINT(&peer->name), peer->recv_msg->hdr.tag);
 518                     peer->recv_msg->data = NULL;  // make sure
 519                 } else {
 520                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 521                                         "%s:tcp:recv:handler allocate data region of size %lu",
 522                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)peer->recv_msg->hdr.nbytes);
 523                     /* allocate the data region */
 524                     peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
 525                     /* point to it */
 526                     peer->recv_msg->rdptr = peer->recv_msg->data;
 527                     peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
 528                 }
 529                 /* fall thru and attempt to read the data */
 530             } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
 531                        ORTE_ERR_WOULD_BLOCK == rc) {
 532                 /* exit this event and let the event lib progress */
 533                 return;
 534             } else {
 535                 /* close the connection */
 536                 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 537                                     "%s:tcp:recv:handler error reading bytes - closing connection",
 538                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 539                 mca_oob_tcp_peer_close(peer);
 540                 return;
 541             }
 542         }
 543 
 544         if (peer->recv_msg->hdr_recvd) {
 545             /* continue to read the data block - we start from
 546              * wherever we left off, which could be at the
 547              * beginning or somewhere in the message
 548              */
 549             if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
 550                 /* we recvd all of the message */
 551                 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 552                                     "%s RECVD COMPLETE MESSAGE FROM %s (ORIGIN %s) OF %d BYTES FOR DEST %s TAG %d",
 553                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 554                                     ORTE_NAME_PRINT(&peer->name),
 555                                     ORTE_NAME_PRINT(&peer->recv_msg->hdr.origin),
 556                                     (int)peer->recv_msg->hdr.nbytes,
 557                                     ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst),
 558                                     peer->recv_msg->hdr.tag);
 559 
 560                 /* am I the intended recipient (header was already converted back to host order)? */
 561                 if (peer->recv_msg->hdr.dst.jobid == ORTE_PROC_MY_NAME->jobid &&
 562                     peer->recv_msg->hdr.dst.vpid == ORTE_PROC_MY_NAME->vpid) {
 563                     /* yes - post it to the RML for delivery */
 564                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 565                                         "%s DELIVERING TO RML tag = %d seq_num = %d",
 566                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 567                                         peer->recv_msg->hdr.tag,
 568                                         peer->recv_msg->hdr.seq_num);
 569                     ORTE_RML_POST_MESSAGE(&peer->recv_msg->hdr.origin,
 570                                           peer->recv_msg->hdr.tag,
 571                                           peer->recv_msg->hdr.seq_num,
 572                                           peer->recv_msg->data,
 573                                           peer->recv_msg->hdr.nbytes);
 574                     OBJ_RELEASE(peer->recv_msg);
 575                 } else {
 576                     /* promote this to the OOB as some other transport might
 577                      * be the next best hop */
 578                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 579                                         "%s TCP PROMOTING ROUTED MESSAGE FOR %s TO OOB",
 580                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 581                                         ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst));
 582                     snd = OBJ_NEW(orte_rml_send_t);
 583                     snd->dst = peer->recv_msg->hdr.dst;
 584                     snd->origin = peer->recv_msg->hdr.origin;
 585                     snd->tag = peer->recv_msg->hdr.tag;
 586                     snd->data = peer->recv_msg->data;
 587                     snd->seq_num = peer->recv_msg->hdr.seq_num;
 588                     snd->count = peer->recv_msg->hdr.nbytes;
 589                     snd->cbfunc.iov = NULL;
 590                     snd->cbdata = NULL;
 591                     /* activate the OOB send state */
 592                     ORTE_OOB_SEND(snd);
 593                     /* protect the data */
 594                     peer->recv_msg->data = NULL;
 595                     /* cleanup */
 596                     OBJ_RELEASE(peer->recv_msg);
 597                 }
 598                 peer->recv_msg = NULL;
 599                 return;
 600             } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
 601                        ORTE_ERR_WOULD_BLOCK == rc) {
 602                 /* exit this event and let the event lib progress */
 603                 return;
 604             } else {
 605                 // report the error
 606                 opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to recv message",
 607                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 608                             ORTE_NAME_PRINT(&(peer->name)));
 609                 /* turn off the recv event */
 610                 opal_event_del(&peer->recv_event);
 611                 ORTE_FORCED_TERMINATE(1);
 612                 return;
 613             }
 614         }
 615         break;
 616     default:
 617         opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: invalid socket state(%d)",
 618                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 619                     ORTE_NAME_PRINT(&(peer->name)),
 620                     peer->state);
 621         // mca_oob_tcp_peer_close(peer);
 622         break;
 623     }
 624 }
 625 
 626 static void snd_cons(mca_oob_tcp_send_t *ptr)
 627 {
 628     memset(&ptr->hdr, 0, sizeof(mca_oob_tcp_hdr_t));
 629     ptr->msg = NULL;
 630     ptr->data = NULL;
 631     ptr->hdr_sent = false;
 632     ptr->iovnum = 0;
 633     ptr->sdptr = NULL;
 634     ptr->sdbytes = 0;
 635 }
 636 /* we don't destruct any RML msg that is
 637  * attached to our send as the RML owns
 638  * that memory. However, if we relay a
 639  * msg, the data in the relay belongs to
 640  * us and must be free'd
 641  */
 642 static void snd_des(mca_oob_tcp_send_t *ptr)
 643 {
 644     if (NULL != ptr->data) {
 645         free(ptr->data);
 646     }
 647 }
 648 OBJ_CLASS_INSTANCE(mca_oob_tcp_send_t,
 649                    opal_list_item_t,
 650                    snd_cons, snd_des);
 651 
 652 static void rcv_cons(mca_oob_tcp_recv_t *ptr)
 653 {
 654     memset(&ptr->hdr, 0, sizeof(mca_oob_tcp_hdr_t));
 655     ptr->hdr_recvd = false;
 656     ptr->rdptr = NULL;
 657     ptr->rdbytes = 0;
 658 }
 659 OBJ_CLASS_INSTANCE(mca_oob_tcp_recv_t,
 660                    opal_list_item_t,
 661                    rcv_cons, NULL);
 662 
 663 static void err_cons(mca_oob_tcp_msg_error_t *ptr)
 664 {
 665     ptr->rmsg = NULL;
 666     ptr->snd = NULL;
 667 }
 668 OBJ_CLASS_INSTANCE(mca_oob_tcp_msg_error_t,
 669                    opal_object_t,
 670                    err_cons, NULL);

/* [<][>][^][v][top][bottom][index][help] */