This source file includes following definitions.
- mca_oob_tcp_queue_msg
- send_msg
- mca_oob_tcp_send_handler
- read_bytes
- mca_oob_tcp_recv_handler
- snd_cons
- snd_des
- rcv_cons
- err_cons
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 #include "orte_config.h"
  32 
  33 #ifdef HAVE_UNISTD_H
  34 #include <unistd.h>
  35 #endif
  36 #include <fcntl.h>
  37 #ifdef HAVE_SYS_UIO_H
  38 #include <sys/uio.h>
  39 #endif
  40 #ifdef HAVE_NET_UIO_H
  41 #include <net/uio.h>
  42 #endif
  43 #ifdef HAVE_SYS_TYPES_H
  44 #include <sys/types.h>
  45 #endif
  46 #include "opal/opal_socket_errno.h"
  47 #ifdef HAVE_NETINET_IN_H
  48 #include <netinet/in.h>
  49 #endif
  50 #ifdef HAVE_ARPA_INET_H
  51 #include <arpa/inet.h>
  52 #endif
  53 #ifdef HAVE_NETINET_TCP_H
  54 #include <netinet/tcp.h>
  55 #endif
  56 
  57 #include "opal_stdint.h"
  58 #include "opal/types.h"
  59 #include "opal/mca/backtrace/backtrace.h"
  60 #include "opal/util/output.h"
  61 #include "opal/util/net.h"
  62 #include "opal/util/error.h"
  63 #include "opal/class/opal_hash_table.h"
  64 #include "opal/mca/event/event.h"
  65 
  66 #include "orte/util/name_fns.h"
  67 #include "orte/util/threads.h"
  68 #include "orte/runtime/orte_globals.h"
  69 #include "orte/mca/errmgr/errmgr.h"
  70 #include "orte/mca/ess/ess.h"
  71 #include "orte/mca/routed/routed.h"
  72 #include "orte/mca/state/state.h"
  73 #include "orte/runtime/orte_wait.h"
  74 
  75 #include "oob_tcp.h"
  76 #include "orte/mca/oob/tcp/oob_tcp_component.h"
  77 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
  78 #include "orte/mca/oob/tcp/oob_tcp_common.h"
  79 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
  80 
  81 #define OOB_SEND_MAX_RETRIES 3
  82 
  83 void mca_oob_tcp_queue_msg(int sd, short args, void *cbdata)
  84 {
  85     mca_oob_tcp_send_t *snd = (mca_oob_tcp_send_t*)cbdata;
  86     mca_oob_tcp_peer_t *peer;
  87 
  88     ORTE_ACQUIRE_OBJECT(snd);
  89     peer = (mca_oob_tcp_peer_t*)snd->peer;
  90 
  91     
  92     if (NULL == peer->send_msg) {
  93         peer->send_msg = snd;
  94     } else {
  95         
  96         opal_list_append(&peer->send_queue, &snd->super);
  97     }
  98     if (snd->activate) {
  99         
 100         if (MCA_OOB_TCP_CONNECTED != peer->state) {
 101             peer->state = MCA_OOB_TCP_CONNECTING;
 102             ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
 103         } else {
 104             
 105             if (!peer->send_ev_active) {
 106                 peer->send_ev_active = true;
 107                 ORTE_POST_OBJECT(peer);
 108                 opal_event_add(&peer->send_event, 0);
 109             }
 110         }
 111     }
 112 }
 113 
 114 static int send_msg(mca_oob_tcp_peer_t* peer, mca_oob_tcp_send_t* msg)
 115 {
 116     struct iovec iov[2];
 117     int iov_count, retries = 0;
 118     ssize_t remain = msg->sdbytes, rc;
 119 
 120     iov[0].iov_base = msg->sdptr;
 121     iov[0].iov_len = msg->sdbytes;
 122     if (!msg->hdr_sent) {
 123         if (NULL != msg->data) {
 124             
 125             iov[1].iov_base = msg->data;
 126         } else if (NULL != msg->msg->buffer) {
 127             
 128             iov[1].iov_base = msg->msg->buffer->base_ptr;
 129         } else {
 130             iov[1].iov_base = msg->msg->data;
 131         }
 132         iov[1].iov_len = ntohl(msg->hdr.nbytes);
 133         remain += ntohl(msg->hdr.nbytes);
 134         iov_count = 2;
 135     } else {
 136         iov_count = 1;
 137     }
 138 
 139   retry:
 140     rc = writev(peer->sd, iov, iov_count);
 141     if (OPAL_LIKELY(rc == remain)) {
 142         
 143         msg->hdr_sent = true;
 144         msg->sdbytes = 0;
 145         msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
 146         return ORTE_SUCCESS;
 147     } else if (rc < 0) {
 148         if (opal_socket_errno == EINTR) {
 149             goto retry;
 150         } else if (opal_socket_errno == EAGAIN) {
 151             
 152 
 153 
 154 
 155             ++retries;
 156             if (retries < OOB_SEND_MAX_RETRIES) {
 157                 goto retry;
 158             }
 159             return ORTE_ERR_RESOURCE_BUSY;
 160         } else if (opal_socket_errno == EWOULDBLOCK) {
 161             
 162 
 163 
 164 
 165             ++retries;
 166             if (retries < OOB_SEND_MAX_RETRIES) {
 167                 goto retry;
 168             }
 169             return ORTE_ERR_WOULD_BLOCK;
 170         } else {
 171             
 172             opal_output(0, "oob:tcp: send_msg: write failed: %s (%d) [sd = %d]",
 173                         strerror(opal_socket_errno),
 174                         opal_socket_errno, peer->sd);
 175             return ORTE_ERR_UNREACH;
 176         }
 177     } else {
 178         
 179 
 180 
 181         if ((size_t)rc < msg->sdbytes) {
 182             
 183             msg->sdptr = (char *)msg->sdptr + rc;
 184             msg->sdbytes -= rc;
 185         } else {
 186             
 187             msg->hdr_sent = true;
 188             rc -= msg->sdbytes;
 189             assert(2 == iov_count);
 190             msg->sdptr = (char *)iov[1].iov_base + rc;
 191             msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
 192         }
 193         return ORTE_ERR_RESOURCE_BUSY;
 194     }
 195 }
 196 
 197 
 198 
 199 
 200 
 201 void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
 202 {
 203     mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
 204     mca_oob_tcp_send_t* msg;
 205     int rc;
 206 
 207     ORTE_ACQUIRE_OBJECT(peer);
 208     msg = peer->send_msg;
 209 
 210     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 211                         "%s tcp:send_handler called to send to peer %s",
 212                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 213                         ORTE_NAME_PRINT(&peer->name));
 214 
 215     switch (peer->state) {
 216     case MCA_OOB_TCP_CONNECTING:
 217     case MCA_OOB_TCP_CLOSED:
 218         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 219                             "%s tcp:send_handler %s",
 220                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 221                             mca_oob_tcp_state_print(peer->state));
 222         mca_oob_tcp_peer_complete_connect(peer);
 223         
 224 
 225 
 226         if (peer->send_ev_active) {
 227             opal_event_del(&peer->send_event);
 228             peer->send_ev_active = false;
 229         }
 230         break;
 231     case MCA_OOB_TCP_CONNECTED:
 232         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 233                             "%s tcp:send_handler SENDING TO %s",
 234                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 235                             (NULL == peer->send_msg) ? "NULL" : ORTE_NAME_PRINT(&peer->name));
 236         if (NULL != msg) {
 237             opal_output_verbose(2, orte_oob_base_framework.framework_output,
 238                                 "oob:tcp:send_handler SENDING MSG");
 239             if (ORTE_SUCCESS == (rc = send_msg(peer, msg))) {
 240                 
 241                 if (NULL != msg->data || NULL == msg->msg) {
 242                     
 243                     opal_output_verbose(2, orte_oob_base_framework.framework_output,
 244                                         "%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
 245                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 246                                         ORTE_NAME_PRINT(&(peer->name)),
 247                                         (int)ntohl(msg->hdr.nbytes), peer->sd);
 248                     OBJ_RELEASE(msg);
 249                     peer->send_msg = NULL;
 250                 } else if (NULL != msg->msg->buffer) {
 251                     
 252                     opal_output_verbose(2, orte_oob_base_framework.framework_output,
 253                                         "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
 254                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 255                                         ORTE_NAME_PRINT(&(peer->name)),
 256                                         (int)ntohl(msg->hdr.nbytes), peer->sd);
 257                     msg->msg->status = ORTE_SUCCESS;
 258                     ORTE_RML_SEND_COMPLETE(msg->msg);
 259                     OBJ_RELEASE(msg);
 260                     peer->send_msg = NULL;
 261                 } else if (NULL != msg->msg->data) {
 262                     
 263 
 264 
 265 
 266                     opal_output_verbose(2, orte_oob_base_framework.framework_output,
 267                                         "%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
 268                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 269                                         ORTE_NAME_PRINT(&(peer->name)),
 270                                         (int)ntohl(msg->hdr.nbytes), peer->sd);
 271                     msg->msg->status = ORTE_SUCCESS;
 272                     OBJ_RELEASE(msg);
 273                     peer->send_msg = NULL;
 274                 } else {
 275                     
 276                     msg->iovnum++;
 277                     if (msg->iovnum < msg->msg->count) {
 278                         msg->sdptr = msg->msg->iov[msg->iovnum].iov_base;
 279                         msg->sdbytes = msg->msg->iov[msg->iovnum].iov_len;
 280                         
 281 
 282 
 283 
 284                         return;
 285                     } else {
 286                         
 287                         opal_output_verbose(2, orte_oob_base_framework.framework_output,
 288                                             "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
 289                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 290                                             ORTE_NAME_PRINT(&(peer->name)),
 291                                             (int)ntohl(msg->hdr.nbytes), peer->sd);
 292                         msg->msg->status = ORTE_SUCCESS;
 293                         ORTE_RML_SEND_COMPLETE(msg->msg);
 294                         OBJ_RELEASE(msg);
 295                         peer->send_msg = NULL;
 296                     }
 297                 }
 298                 
 299             } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
 300                        ORTE_ERR_WOULD_BLOCK == rc) {
 301                 
 302                 return;
 303             } else {
 304                 
 305                 opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send message ON SOCKET %d",
 306                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 307                             ORTE_NAME_PRINT(&(peer->name)), peer->sd);
 308                 opal_event_del(&peer->send_event);
 309                 msg->msg->status = rc;
 310                 ORTE_RML_SEND_COMPLETE(msg->msg);
 311                 OBJ_RELEASE(msg);
 312                 peer->send_msg = NULL;
 313                 ORTE_FORCED_TERMINATE(1);
 314                 return;
 315             }
 316 
 317             
 318 
 319 
 320 
 321 
 322 
 323             peer->send_msg = (mca_oob_tcp_send_t*)
 324                 opal_list_remove_first(&peer->send_queue);
 325         }
 326 
 327         
 328         if (NULL == peer->send_msg && peer->send_ev_active) {
 329             opal_event_del(&peer->send_event);
 330             peer->send_ev_active = false;
 331         }
 332         break;
 333     default:
 334         opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: invalid connection state (%d) on socket %d",
 335                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 336                     ORTE_NAME_PRINT(&(peer->name)),
 337                     peer->state, peer->sd);
 338         if (peer->send_ev_active) {
 339             opal_event_del(&peer->send_event);
 340             peer->send_ev_active = false;
 341         }
 342         break;
 343     }
 344 }
 345 
 346 static int read_bytes(mca_oob_tcp_peer_t* peer)
 347 {
 348     int rc;
 349 
 350     
 351     while (0 < peer->recv_msg->rdbytes) {
 352         rc = read(peer->sd, peer->recv_msg->rdptr, peer->recv_msg->rdbytes);
 353         if (rc < 0) {
 354             if(opal_socket_errno == EINTR) {
 355                 continue;
 356             } else if (opal_socket_errno == EAGAIN) {
 357                 
 358 
 359 
 360 
 361                 return ORTE_ERR_RESOURCE_BUSY;
 362             } else if (opal_socket_errno == EWOULDBLOCK) {
 363                 
 364 
 365 
 366 
 367                 return ORTE_ERR_WOULD_BLOCK;
 368             }
 369             
 370 
 371 
 372 
 373             opal_output_verbose(OOB_TCP_DEBUG_FAIL, orte_oob_base_framework.framework_output,
 374                                 "%s-%s mca_oob_tcp_msg_recv: readv failed: %s (%d)",
 375                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 376                                 ORTE_NAME_PRINT(&(peer->name)),
 377                                 strerror(opal_socket_errno),
 378                                 opal_socket_errno);
 379             
 380             
 381             
 382             
 383             return ORTE_ERR_COMM_FAILURE;
 384         } else if (rc == 0)  {
 385             
 386 
 387 
 388             opal_output_verbose(OOB_TCP_DEBUG_FAIL, orte_oob_base_framework.framework_output,
 389                                 "%s-%s mca_oob_tcp_msg_recv: peer closed connection",
 390                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 391                                 ORTE_NAME_PRINT(&(peer->name)));
 392             
 393             if (peer->recv_ev_active) {
 394                 opal_event_del(&peer->recv_event);
 395                 peer->recv_ev_active = false;
 396             }
 397             if (peer->timer_ev_active) {
 398                 opal_event_del(&peer->timer_event);
 399                 peer->timer_ev_active = false;
 400             }
 401             if (peer->send_ev_active) {
 402                 opal_event_del(&peer->send_event);
 403                 peer->send_ev_active = false;
 404             }
 405             if (NULL != peer->recv_msg) {
 406                 OBJ_RELEASE(peer->recv_msg);
 407                 peer->recv_msg = NULL;
 408             }
 409             mca_oob_tcp_peer_close(peer);
 410             
 411             
 412             
 413             return ORTE_ERR_WOULD_BLOCK;
 414         }
 415         
 416         peer->recv_msg->rdbytes -= rc;
 417         peer->recv_msg->rdptr += rc;
 418     }
 419 
 420     
 421     return ORTE_SUCCESS;
 422 }
 423 
 424 
 425 
 426 
 427 
 428 
 429 void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
 430 {
 431     mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
 432     int rc;
 433     orte_rml_send_t *snd;
 434 
 435     ORTE_ACQUIRE_OBJECT(peer);
 436 
 437     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 438                         "%s:tcp:recv:handler called for peer %s",
 439                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 440                         ORTE_NAME_PRINT(&peer->name));
 441 
 442     switch (peer->state) {
 443     case MCA_OOB_TCP_CONNECT_ACK:
 444         if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(peer, peer->sd, NULL))) {
 445             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 446                                 "%s:tcp:recv:handler starting send/recv events",
 447                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 448             
 449             if (!peer->recv_ev_active) {
 450                 peer->recv_ev_active = true;
 451                 ORTE_POST_OBJECT(peer);
 452                 opal_event_add(&peer->recv_event, 0);
 453             }
 454             if (peer->timer_ev_active) {
 455                 opal_event_del(&peer->timer_event);
 456                 peer->timer_ev_active = false;
 457             }
 458             
 459             if (NULL == peer->send_msg) {
 460                 peer->send_msg = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue);
 461             }
 462             if (NULL != peer->send_msg && !peer->send_ev_active) {
 463                 peer->send_ev_active = true;
 464                 ORTE_POST_OBJECT(peer);
 465                 opal_event_add(&peer->send_event, 0);
 466             }
 467             
 468             peer->state = MCA_OOB_TCP_CONNECTED;
 469         } else if (ORTE_ERR_UNREACH != rc) {
 470             
 471 
 472 
 473             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 474                                 "%s UNABLE TO COMPLETE CONNECT ACK WITH %s",
 475                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 476                                 ORTE_NAME_PRINT(&peer->name));
 477             opal_event_del(&peer->recv_event);
 478             ORTE_FORCED_TERMINATE(1);
 479             return;
 480         }
 481         break;
 482     case MCA_OOB_TCP_CONNECTED:
 483         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 484                             "%s:tcp:recv:handler CONNECTED",
 485                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 486         
 487         if (NULL == peer->recv_msg) {
 488             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 489                                 "%s:tcp:recv:handler allocate new recv msg",
 490                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 491             peer->recv_msg = OBJ_NEW(mca_oob_tcp_recv_t);
 492             if (NULL == peer->recv_msg) {
 493                 opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n",
 494                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 495                             ORTE_NAME_PRINT(&(peer->name)));
 496                 return;
 497             }
 498             
 499             peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
 500             peer->recv_msg->rdbytes = sizeof(mca_oob_tcp_hdr_t);
 501         }
 502         
 503         if (!peer->recv_msg->hdr_recvd) {
 504             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 505                                 "%s:tcp:recv:handler read hdr",
 506                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 507             if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
 508                 
 509                 peer->recv_msg->hdr_recvd = true;
 510                 
 511                 MCA_OOB_TCP_HDR_NTOH(&peer->recv_msg->hdr);
 512                 
 513                 if (0 == peer->recv_msg->hdr.nbytes) {
 514                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 515                                         "%s RECVD ZERO-BYTE MESSAGE FROM %s for tag %d",
 516                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 517                                         ORTE_NAME_PRINT(&peer->name), peer->recv_msg->hdr.tag);
 518                     peer->recv_msg->data = NULL;  
 519                 } else {
 520                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 521                                         "%s:tcp:recv:handler allocate data region of size %lu",
 522                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)peer->recv_msg->hdr.nbytes);
 523                     
 524                     peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
 525                     
 526                     peer->recv_msg->rdptr = peer->recv_msg->data;
 527                     peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
 528                 }
 529                 
 530             } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
 531                        ORTE_ERR_WOULD_BLOCK == rc) {
 532                 
 533                 return;
 534             } else {
 535                 
 536                 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 537                                     "%s:tcp:recv:handler error reading bytes - closing connection",
 538                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 539                 mca_oob_tcp_peer_close(peer);
 540                 return;
 541             }
 542         }
 543 
 544         if (peer->recv_msg->hdr_recvd) {
 545             
 546 
 547 
 548 
 549             if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
 550                 
 551                 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 552                                     "%s RECVD COMPLETE MESSAGE FROM %s (ORIGIN %s) OF %d BYTES FOR DEST %s TAG %d",
 553                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 554                                     ORTE_NAME_PRINT(&peer->name),
 555                                     ORTE_NAME_PRINT(&peer->recv_msg->hdr.origin),
 556                                     (int)peer->recv_msg->hdr.nbytes,
 557                                     ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst),
 558                                     peer->recv_msg->hdr.tag);
 559 
 560                 
 561                 if (peer->recv_msg->hdr.dst.jobid == ORTE_PROC_MY_NAME->jobid &&
 562                     peer->recv_msg->hdr.dst.vpid == ORTE_PROC_MY_NAME->vpid) {
 563                     
 564                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 565                                         "%s DELIVERING TO RML tag = %d seq_num = %d",
 566                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 567                                         peer->recv_msg->hdr.tag,
 568                                         peer->recv_msg->hdr.seq_num);
 569                     ORTE_RML_POST_MESSAGE(&peer->recv_msg->hdr.origin,
 570                                           peer->recv_msg->hdr.tag,
 571                                           peer->recv_msg->hdr.seq_num,
 572                                           peer->recv_msg->data,
 573                                           peer->recv_msg->hdr.nbytes);
 574                     OBJ_RELEASE(peer->recv_msg);
 575                 } else {
 576                     
 577 
 578                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 579                                         "%s TCP PROMOTING ROUTED MESSAGE FOR %s TO OOB",
 580                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 581                                         ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst));
 582                     snd = OBJ_NEW(orte_rml_send_t);
 583                     snd->dst = peer->recv_msg->hdr.dst;
 584                     snd->origin = peer->recv_msg->hdr.origin;
 585                     snd->tag = peer->recv_msg->hdr.tag;
 586                     snd->data = peer->recv_msg->data;
 587                     snd->seq_num = peer->recv_msg->hdr.seq_num;
 588                     snd->count = peer->recv_msg->hdr.nbytes;
 589                     snd->cbfunc.iov = NULL;
 590                     snd->cbdata = NULL;
 591                     
 592                     ORTE_OOB_SEND(snd);
 593                     
 594                     peer->recv_msg->data = NULL;
 595                     
 596                     OBJ_RELEASE(peer->recv_msg);
 597                 }
 598                 peer->recv_msg = NULL;
 599                 return;
 600             } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
 601                        ORTE_ERR_WOULD_BLOCK == rc) {
 602                 
 603                 return;
 604             } else {
 605                 
 606                 opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to recv message",
 607                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 608                             ORTE_NAME_PRINT(&(peer->name)));
 609                 
 610                 opal_event_del(&peer->recv_event);
 611                 ORTE_FORCED_TERMINATE(1);
 612                 return;
 613             }
 614         }
 615         break;
 616     default:
 617         opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: invalid socket state(%d)",
 618                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 619                     ORTE_NAME_PRINT(&(peer->name)),
 620                     peer->state);
 621         
 622         break;
 623     }
 624 }
 625 
 626 static void snd_cons(mca_oob_tcp_send_t *ptr)
 627 {
 628     memset(&ptr->hdr, 0, sizeof(mca_oob_tcp_hdr_t));
 629     ptr->msg = NULL;
 630     ptr->data = NULL;
 631     ptr->hdr_sent = false;
 632     ptr->iovnum = 0;
 633     ptr->sdptr = NULL;
 634     ptr->sdbytes = 0;
 635 }
 636 
 637 
 638 
 639 
 640 
 641 
 642 static void snd_des(mca_oob_tcp_send_t *ptr)
 643 {
 644     if (NULL != ptr->data) {
 645         free(ptr->data);
 646     }
 647 }
 648 OBJ_CLASS_INSTANCE(mca_oob_tcp_send_t,
 649                    opal_list_item_t,
 650                    snd_cons, snd_des);
 651 
 652 static void rcv_cons(mca_oob_tcp_recv_t *ptr)
 653 {
 654     memset(&ptr->hdr, 0, sizeof(mca_oob_tcp_hdr_t));
 655     ptr->hdr_recvd = false;
 656     ptr->rdptr = NULL;
 657     ptr->rdbytes = 0;
 658 }
 659 OBJ_CLASS_INSTANCE(mca_oob_tcp_recv_t,
 660                    opal_list_item_t,
 661                    rcv_cons, NULL);
 662 
 663 static void err_cons(mca_oob_tcp_msg_error_t *ptr)
 664 {
 665     ptr->rmsg = NULL;
 666     ptr->snd = NULL;
 667 }
 668 OBJ_CLASS_INSTANCE(mca_oob_tcp_msg_error_t,
 669                    opal_object_t,
 670                    err_cons, NULL);