root/orte/mca/oob/tcp/oob_tcp_connection.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. tcp_peer_create_socket
  2. mca_oob_tcp_peer_try_connect
  3. tcp_peer_send_connect_ack
  4. tcp_peer_send_connect_nack
  5. tcp_peer_event_init
  6. mca_oob_tcp_peer_complete_connect
  7. tcp_peer_send_blocking
  8. retry
  9. mca_oob_tcp_peer_recv_connect_ack
  10. tcp_peer_connected
  11. mca_oob_tcp_peer_close
  12. tcp_peer_recv_blocking
  13. mca_oob_tcp_peer_dump
  14. mca_oob_tcp_peer_accept

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2018 Cisco Systems, Inc.  All rights reserved
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2014-2015 Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * Copyright (c) 2016      Mellanox Technologies Ltd. All rights reserved.
  20  * $COPYRIGHT$
  21  *
  22  * Additional copyrights may follow
  23  *
  24  * $HEADER$
  25  */
  26 
  27 #include "orte_config.h"
  28 
  29 #ifdef HAVE_UNISTD_H
  30 #include <unistd.h>
  31 #endif
  32 #include <fcntl.h>
  33 #include <sys/socket.h>
  34 
  35 #ifdef HAVE_SYS_UIO_H
  36 #include <sys/uio.h>
  37 #endif
  38 #ifdef HAVE_NET_UIO_H
  39 #include <net/uio.h>
  40 #endif
  41 #ifdef HAVE_SYS_TYPES_H
  42 #include <sys/types.h>
  43 #endif
  44 #include "opal/opal_socket_errno.h"
  45 #ifdef HAVE_NETINET_IN_H
  46 #include <netinet/in.h>
  47 #endif
  48 #ifdef HAVE_ARPA_INET_H
  49 #include <arpa/inet.h>
  50 #endif
  51 #ifdef HAVE_NETINET_TCP_H
  52 #include <netinet/tcp.h>
  53 #endif
  54 
  55 #include "opal/types.h"
  56 #include "opal_stdint.h"
  57 #include "opal/mca/backtrace/backtrace.h"
  58 #include "opal/mca/base/mca_base_var.h"
  59 #include "opal/util/output.h"
  60 #include "opal/util/net.h"
  61 #include "opal/util/fd.h"
  62 #include "opal/util/error.h"
  63 #include "opal/util/show_help.h"
  64 #include "opal/class/opal_hash_table.h"
  65 #include "opal/mca/event/event.h"
  66 
  67 #include "orte/util/name_fns.h"
  68 #include "orte/util/show_help.h"
  69 #include "orte/util/threads.h"
  70 #include "orte/mca/state/state.h"
  71 #include "orte/runtime/orte_globals.h"
  72 #include "orte/mca/errmgr/errmgr.h"
  73 #include "orte/mca/ess/ess.h"
  74 #include "orte/mca/routed/routed.h"
  75 #include "orte/runtime/orte_wait.h"
  76 
  77 #include "oob_tcp.h"
  78 #include "orte/mca/oob/tcp/oob_tcp_component.h"
  79 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
  80 #include "orte/mca/oob/tcp/oob_tcp_common.h"
  81 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
  82 #include "oob_tcp_peer.h"
  83 #include "oob_tcp_common.h"
  84 #include "oob_tcp_connection.h"
  85 
  86 static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
  87 static int  tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
  88 static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name);
  89 static int tcp_peer_send_blocking(int sd, void* data, size_t size);
  90 static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
  91                                    void* data, size_t size);
  92 static void tcp_peer_connected(mca_oob_tcp_peer_t* peer);
  93 
  94 static int tcp_peer_create_socket(mca_oob_tcp_peer_t* peer, sa_family_t family)
  95 {
  96     int flags;
  97 
  98     if (peer->sd >= 0) {
  99         return ORTE_SUCCESS;
 100     }
 101 
 102     OPAL_OUTPUT_VERBOSE((1, orte_oob_base_framework.framework_output,
 103                          "%s oob:tcp:peer creating socket to %s",
 104                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 105                          ORTE_NAME_PRINT(&(peer->name))));
 106     peer->sd = socket(family, SOCK_STREAM, 0);
 107     if (peer->sd < 0) {
 108         opal_output(0, "%s-%s tcp_peer_create_socket: socket() failed: %s (%d)\n",
 109                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 110                     ORTE_NAME_PRINT(&(peer->name)),
 111                     strerror(opal_socket_errno),
 112                     opal_socket_errno);
 113         return ORTE_ERR_UNREACH;
 114     }
 115 
 116     /* Set this fd to be close-on-exec so that any subsequent children don't see it */
 117     if (opal_fd_set_cloexec(peer->sd) != OPAL_SUCCESS) {
 118         opal_output(0, "%s unable to set socket to CLOEXEC",
 119                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 120         close(peer->sd);
 121         peer->sd = -1;
 122         return ORTE_ERROR;
 123     }
 124 
 125     /* setup socket options */
 126     orte_oob_tcp_set_socket_options(peer->sd);
 127 
 128     /* setup event callbacks */
 129     tcp_peer_event_init(peer);
 130 
 131     /* setup the socket as non-blocking */
 132     if (peer->sd >= 0) {
 133         if((flags = fcntl(peer->sd, F_GETFL, 0)) < 0) {
 134             opal_output(0, "%s-%s tcp_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n",
 135                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 136                         ORTE_NAME_PRINT(&(peer->name)),
 137                         strerror(opal_socket_errno),
 138                         opal_socket_errno);
 139         } else {
 140             flags |= O_NONBLOCK;
 141             if(fcntl(peer->sd, F_SETFL, flags) < 0)
 142                 opal_output(0, "%s-%s tcp_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n",
 143                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 144                             ORTE_NAME_PRINT(&(peer->name)),
 145                             strerror(opal_socket_errno),
 146                             opal_socket_errno);
 147         }
 148     }
 149 
 150     return ORTE_SUCCESS;
 151 }
 152 
 153 
 154 /*
 155  * Try connecting to a peer - cycle across all known addresses
 156  * until one succeeds.
 157  */
 158 void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
 159 {
 160     mca_oob_tcp_conn_op_t *op = (mca_oob_tcp_conn_op_t*)cbdata;
 161     mca_oob_tcp_peer_t *peer;
 162     int current_socket_family = 0;
 163     int rc;
 164     opal_socklen_t addrlen = 0;
 165     mca_oob_tcp_addr_t *addr;
 166     char *host;
 167     mca_oob_tcp_send_t *snd;
 168     bool connected = false;
 169 
 170     ORTE_ACQUIRE_OBJECT(op);
 171     peer = op->peer;
 172 
 173     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 174                         "%s orte_tcp_peer_try_connect: "
 175                         "attempting to connect to proc %s",
 176                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 177                         ORTE_NAME_PRINT(&(peer->name)));
 178 
 179     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 180                         "%s orte_tcp_peer_try_connect: "
 181                         "attempting to connect to proc %s on socket %d",
 182                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 183                         ORTE_NAME_PRINT(&(peer->name)), peer->sd);
 184 
 185     peer->active_addr = NULL;
 186     OPAL_LIST_FOREACH(addr, &peer->addrs, mca_oob_tcp_addr_t) {
 187         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 188                             "%s orte_tcp_peer_try_connect: "
 189                             "attempting to connect to proc %s on %s:%d - %d retries",
 190                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 191                             ORTE_NAME_PRINT(&(peer->name)),
 192                             opal_net_get_hostname((struct sockaddr*)&addr->addr),
 193                             opal_net_get_port((struct sockaddr*)&addr->addr),
 194                             addr->retries);
 195         if (MCA_OOB_TCP_FAILED == addr->state) {
 196             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 197                                 "%s orte_tcp_peer_try_connect: %s:%d is down",
 198                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 199                                 opal_net_get_hostname((struct sockaddr*)&addr->addr),
 200                                 opal_net_get_port((struct sockaddr*)&addr->addr));
 201             continue;
 202         }
 203         if (mca_oob_tcp_component.max_retries < addr->retries) {
 204             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 205                                 "%s orte_tcp_peer_try_connect: %s:%d retries exceeded",
 206                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 207                                 opal_net_get_hostname((struct sockaddr*)&addr->addr),
 208                                 opal_net_get_port((struct sockaddr*)&addr->addr));
 209             continue;
 210         }
 211         peer->active_addr = addr;  // record the one we are using
 212         addrlen = addr->addr.ss_family == AF_INET6 ? sizeof(struct sockaddr_in6)
 213                                                    : sizeof(struct sockaddr_in);
 214         if (addr->addr.ss_family != current_socket_family) {
 215             if (peer->sd >= 0) {
 216                 CLOSE_THE_SOCKET(peer->sd);
 217                 peer->sd = -1;
 218             }
 219             rc = tcp_peer_create_socket(peer, addr->addr.ss_family);
 220             current_socket_family = addr->addr.ss_family;
 221 
 222             if (ORTE_SUCCESS != rc) {
 223                 /* FIXME: we cannot create a TCP socket - this spans
 224                  * all interfaces, so all we can do is report
 225                  * back to the component that this peer is
 226                  * unreachable so it can remove the peer
 227                  * from its list and report back to the base
 228                  * NOTE: this could be a reconnect attempt,
 229                  * so we also need to mark any queued messages
 230                  * and return them as "unreachable"
 231                  */
 232                 opal_output(0, "%s CANNOT CREATE SOCKET", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 233                 ORTE_FORCED_TERMINATE(1);
 234                 goto cleanup;
 235             }
 236         }
 237     retry_connect:
 238         addr->retries++;
 239 
 240         rc = connect(peer->sd, (struct sockaddr*) &addr->addr, addrlen);
 241         if (rc < 0) {
 242             /* non-blocking so wait for completion */
 243             if (opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
 244                 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 245                                     "%s waiting for connect completion to %s - activating send event",
 246                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 247                                     ORTE_NAME_PRINT(&peer->name));
 248                 /* just ensure the send_event is active */
 249                 if (!peer->send_ev_active) {
 250                     opal_event_add(&peer->send_event, 0);
 251                     peer->send_ev_active = true;
 252                 }
 253                 OBJ_RELEASE(op);
 254                 return;
 255             }
 256 
 257             /* Some kernels (Linux 2.6) will automatically software
 258                abort a connection that was ECONNREFUSED on the last
 259                attempt, without even trying to establish the
 260                connection.  Handle that case in a semi-rational
 261                way by trying twice before giving up */
 262             if (ECONNABORTED == opal_socket_errno) {
 263                 if (addr->retries < mca_oob_tcp_component.max_retries) {
 264                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 265                                         "%s connection aborted by OS to %s - retrying",
 266                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 267                                         ORTE_NAME_PRINT(&peer->name));
 268                     goto retry_connect;
 269                 } else {
 270                     /* We were unsuccessful in establishing this connection, and are
 271                      * not likely to suddenly become successful, so rotate to next option
 272                      */
 273                     addr->state = MCA_OOB_TCP_FAILED;
 274                     continue;
 275                 }
 276             }
 277         } else {
 278             /* connection succeeded */
 279             addr->retries = 0;
 280             connected = true;
 281             peer->num_retries = 0;
 282             break;
 283         }
 284     }
 285 
 286     if (!connected) {
 287         /* it could be that the intended recipient just hasn't
 288          * started yet. if requested, wait awhile and try again
 289          * unless/until we hit the maximum number of retries */
 290         if (0 < mca_oob_tcp_component.retry_delay) {
 291             if (mca_oob_tcp_component.max_recon_attempts < 0 ||
 292                 peer->num_retries < mca_oob_tcp_component.max_recon_attempts) {
 293                 struct timeval tv;
 294                 /* close the current socket */
 295                 CLOSE_THE_SOCKET(peer->sd);
 296                 /* reset the addr states */
 297                 OPAL_LIST_FOREACH(addr, &peer->addrs, mca_oob_tcp_addr_t) {
 298                     addr->state = MCA_OOB_TCP_UNCONNECTED;
 299                     addr->retries = 0;
 300                 }
 301                 /* give it awhile and try again */
 302                 tv.tv_sec = mca_oob_tcp_component.retry_delay;
 303                 tv.tv_usec = 0;
 304                 ++peer->num_retries;
 305                 ORTE_RETRY_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect, &tv);
 306                 goto cleanup;
 307             }
 308         }
 309         /* no address succeeded, so we cannot reach this peer */
 310         peer->state = MCA_OOB_TCP_FAILED;
 311         host = orte_get_proc_hostname(&(peer->name));
 312         if (NULL == host && NULL != peer->active_addr) {
 313             host = opal_net_get_hostname((struct sockaddr*)&(peer->active_addr->addr));
 314         }
 315         /* use an opal_output here instead of show_help as we may well
 316          * not be connected to the HNP at this point */
 317         opal_output(orte_clean_output,
 318                     "------------------------------------------------------------\n"
 319                     "A process or daemon was unable to complete a TCP connection\n"
 320                     "to another process:\n"
 321                     "  Local host:    %s\n"
 322                     "  Remote host:   %s\n"
 323                     "This is usually caused by a firewall on the remote host. Please\n"
 324                     "check that any firewall (e.g., iptables) has been disabled and\n"
 325                     "try again.\n"
 326                     "------------------------------------------------------------",
 327                     orte_process_info.nodename,
 328                     (NULL == host) ? "<unknown>" : host);
 329         /* close the socket */
 330         CLOSE_THE_SOCKET(peer->sd);
 331         /* let the TCP component know that this module failed to make
 332          * the connection so it can do some bookkeeping and fail back
 333          * to the OOB level so another component can try. This will activate
 334          * an event in the component event base, and so it will fire async
 335          * from us if we are in our own progress thread
 336          */
 337         ORTE_ACTIVATE_TCP_CMP_OP(peer, mca_oob_tcp_component_failed_to_connect);
 338         /* FIXME: post any messages in the send queue back to the OOB
 339          * level for reassignment
 340          */
 341         if (NULL != peer->send_msg) {
 342         }
 343         while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) {
 344         }
 345         goto cleanup;
 346     }
 347 
 348     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 349                         "%s orte_tcp_peer_try_connect: "
 350                         "Connection to proc %s succeeded",
 351                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 352                         ORTE_NAME_PRINT(&peer->name));
 353 
 354     /* setup our recv to catch the return ack call */
 355     if (!peer->recv_ev_active) {
 356         opal_event_add(&peer->recv_event, 0);
 357         peer->recv_ev_active = true;
 358     }
 359 
 360     /* send our globally unique process identifier to the peer */
 361     if (ORTE_SUCCESS == (rc = tcp_peer_send_connect_ack(peer))) {
 362         peer->state = MCA_OOB_TCP_CONNECT_ACK;
 363     } else if (ORTE_ERR_UNREACH == rc) {
 364         /* this could happen if we are in a race condition where both
 365          * we and the peer are trying to connect at the same time. If I
 366          * am the higher vpid, then retry the connection - otherwise,
 367          * step aside for now */
 368         int cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_NAME, &peer->name);
 369         if (OPAL_VALUE1_GREATER == cmpval) {
 370             peer->state = MCA_OOB_TCP_CONNECTING;
 371             ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
 372         } else {
 373             peer->state = MCA_OOB_TCP_UNCONNECTED;
 374         }
 375         /* close the socket */
 376         CLOSE_THE_SOCKET(peer->sd);
 377         return;
 378     } else {
 379         opal_output(0,
 380                     "%s orte_tcp_peer_try_connect: "
 381                     "tcp_peer_send_connect_ack to proc %s on %s:%d failed: %s (%d)",
 382                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 383                     ORTE_NAME_PRINT(&(peer->name)),
 384                     opal_net_get_hostname((struct sockaddr*)&addr->addr),
 385                     opal_net_get_port((struct sockaddr*)&addr->addr),
 386                     opal_strerror(rc),
 387                     rc);
 388         /* close the socket */
 389         CLOSE_THE_SOCKET(peer->sd);
 390         ORTE_FORCED_TERMINATE(1);
 391     }
 392 
 393  cleanup:
 394     OBJ_RELEASE(op);
 395 }
 396 
 397 /* send a handshake that includes our process identifier, our
 398  * version string, and a security token to ensure we are talking
 399  * to another OMPI process
 400  */
 401 static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
 402 {
 403     char *msg;
 404     mca_oob_tcp_hdr_t hdr;
 405     uint16_t ack_flag = htons(1);
 406     size_t sdsize, offset = 0;
 407 
 408     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 409                         "%s SEND CONNECT ACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 410 
 411     /* load the header */
 412     hdr.origin = *ORTE_PROC_MY_NAME;
 413     hdr.dst = peer->name;
 414     hdr.type = MCA_OOB_TCP_IDENT;
 415     hdr.tag = 0;
 416     hdr.seq_num = 0;
 417     memset(hdr.routed, 0, ORTE_MAX_RTD_SIZE+1);
 418 
 419     /* payload size */
 420     sdsize = sizeof(ack_flag) + strlen(orte_version_string) + 1;
 421     hdr.nbytes = sdsize;
 422     MCA_OOB_TCP_HDR_HTON(&hdr);
 423 
 424     /* create a space for our message */
 425     sdsize += sizeof(hdr);
 426     if (NULL == (msg = (char*)malloc(sdsize))) {
 427         return ORTE_ERR_OUT_OF_RESOURCE;
 428     }
 429     memset(msg, 0, sdsize);
 430 
 431     /* load the message */
 432     memcpy(msg + offset, &hdr, sizeof(hdr));
 433     offset += sizeof(hdr);
 434     memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
 435     offset += sizeof(ack_flag);
 436     memcpy(msg + offset, orte_version_string, strlen(orte_version_string));
 437     offset += strlen(orte_version_string)+1;
 438 
 439     /* send it */
 440     if (ORTE_SUCCESS != tcp_peer_send_blocking(peer->sd, msg, sdsize)) {
 441         free(msg);
 442         peer->state = MCA_OOB_TCP_FAILED;
 443         mca_oob_tcp_peer_close(peer);
 444         return ORTE_ERR_UNREACH;
 445     }
 446     free(msg);
 447 
 448     return ORTE_SUCCESS;
 449 }
 450 
 451 /* send a handshake that includes our process identifier, our
 452  * version string, and a security token to ensure we are talking
 453  * to another OMPI process
 454  */
 455 static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name)
 456 {
 457     char *msg;
 458     mca_oob_tcp_hdr_t hdr;
 459     uint16_t ack_flag = htons(0);
 460     int rc = ORTE_SUCCESS;
 461     size_t sdsize, offset = 0;
 462 
 463     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 464                         "%s SEND CONNECT NACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 465 
 466     /* load the header */
 467     hdr.origin = *ORTE_PROC_MY_NAME;
 468     hdr.dst = name;
 469     hdr.type = MCA_OOB_TCP_IDENT;
 470     hdr.tag = 0;
 471     hdr.seq_num = 0;
 472     memset(hdr.routed, 0, ORTE_MAX_RTD_SIZE+1);
 473 
 474     /* payload size */
 475     sdsize = sizeof(ack_flag);
 476     hdr.nbytes = sdsize;
 477     MCA_OOB_TCP_HDR_HTON(&hdr);
 478 
 479     /* create a space for our message */
 480     sdsize += sizeof(hdr);
 481     if (NULL == (msg = (char*)malloc(sdsize))) {
 482         return ORTE_ERR_OUT_OF_RESOURCE;
 483     }
 484     memset(msg, 0, sdsize);
 485 
 486     /* load the message */
 487     memcpy(msg + offset, &hdr, sizeof(hdr));
 488     offset += sizeof(hdr);
 489     memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
 490     offset += sizeof(ack_flag);
 491 
 492     /* send it */
 493     if (ORTE_SUCCESS != tcp_peer_send_blocking(sd, msg, sdsize)) {
 494         /* it's ok if it fails - remote side may already
 495          * identifiet the collision and closed the connection
 496          */
 497         rc = ORTE_SUCCESS;
 498     }
 499     free(msg);
 500     return rc;
 501 }
 502 
 503 /*
 504  * Initialize events to be used by the peer instance for TCP select/poll callbacks.
 505  */
 506 static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
 507 {
 508     if (peer->sd >= 0) {
 509         assert(!peer->send_ev_active && !peer->recv_ev_active);
 510         if (NULL == peer->ev_base) {
 511             ORTE_OOB_TCP_NEXT_BASE(peer);
 512         }
 513         opal_event_set(peer->ev_base,
 514                        &peer->recv_event,
 515                        peer->sd,
 516                        OPAL_EV_READ|OPAL_EV_PERSIST,
 517                        mca_oob_tcp_recv_handler,
 518                        peer);
 519         opal_event_set_priority(&peer->recv_event, ORTE_MSG_PRI);
 520         if (peer->recv_ev_active) {
 521             opal_event_del(&peer->recv_event);
 522             peer->recv_ev_active = false;
 523         }
 524 
 525         opal_event_set(peer->ev_base,
 526                        &peer->send_event,
 527                        peer->sd,
 528                        OPAL_EV_WRITE|OPAL_EV_PERSIST,
 529                        mca_oob_tcp_send_handler,
 530                        peer);
 531         opal_event_set_priority(&peer->send_event, ORTE_MSG_PRI);
 532         if (peer->send_ev_active) {
 533             opal_event_del(&peer->send_event);
 534             peer->send_ev_active = false;
 535         }
 536     }
 537 }
 538 
 539 /*
 540  * Check the status of the connection. If the connection failed, will retry
 541  * later. Otherwise, send this processes identifier to the peer on the
 542  * newly connected socket.
 543  */
 544 void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t *peer)
 545 {
 546     int so_error = 0;
 547     opal_socklen_t so_length = sizeof(so_error);
 548 
 549     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 550                         "%s:tcp:complete_connect called for peer %s on socket %d",
 551                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 552                         ORTE_NAME_PRINT(&peer->name), peer->sd);
 553 
 554     /* check connect completion status */
 555     if (getsockopt(peer->sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
 556         opal_output(0, "%s tcp_peer_complete_connect: getsockopt() to %s failed: %s (%d)\n",
 557                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 558                     ORTE_NAME_PRINT(&(peer->name)),
 559                     strerror(opal_socket_errno),
 560                     opal_socket_errno);
 561         peer->state = MCA_OOB_TCP_FAILED;
 562         mca_oob_tcp_peer_close(peer);
 563         return;
 564     }
 565 
 566     if (so_error == EINPROGRESS) {
 567         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 568                             "%s:tcp:send:handler still in progress",
 569                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 570         return;
 571     } else if (so_error == ECONNREFUSED || so_error == ETIMEDOUT) {
 572         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 573                             "%s-%s tcp_peer_complete_connect: connection failed: %s (%d)",
 574                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 575                             ORTE_NAME_PRINT(&(peer->name)),
 576                             strerror(so_error),
 577                             so_error);
 578         mca_oob_tcp_peer_close(peer);
 579         return;
 580     } else if (so_error != 0) {
 581         /* No need to worry about the return code here - we return regardless
 582            at this point, and if an error did occur a message has already been
 583            printed for the user */
 584         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 585                             "%s-%s tcp_peer_complete_connect: "
 586                             "connection failed with error %d",
 587                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 588                             ORTE_NAME_PRINT(&(peer->name)), so_error);
 589         mca_oob_tcp_peer_close(peer);
 590         return;
 591     }
 592 
 593     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 594                         "%s tcp_peer_complete_connect: "
 595                         "sending ack to %s",
 596                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 597                         ORTE_NAME_PRINT(&(peer->name)));
 598 
 599     if (tcp_peer_send_connect_ack(peer) == ORTE_SUCCESS) {
 600         peer->state = MCA_OOB_TCP_CONNECT_ACK;
 601         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 602                             "%s tcp_peer_complete_connect: "
 603                             "setting read event on connection to %s",
 604                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 605                             ORTE_NAME_PRINT(&(peer->name)));
 606 
 607         if (!peer->recv_ev_active) {
 608             peer->recv_ev_active = true;
 609             ORTE_POST_OBJECT(peer);
 610             opal_event_add(&peer->recv_event, 0);
 611         }
 612     } else {
 613         opal_output(0, "%s tcp_peer_complete_connect: unable to send connect ack to %s",
 614                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 615                     ORTE_NAME_PRINT(&(peer->name)));
 616         peer->state = MCA_OOB_TCP_FAILED;
 617         mca_oob_tcp_peer_close(peer);
 618     }
 619 }
 620 
 621 /*
 622  * A blocking send on a non-blocking socket. Used to send the small amount of connection
 623  * information that identifies the peers endpoint.
 624  */
 625 static int tcp_peer_send_blocking(int sd, void* data, size_t size)
 626 {
 627     unsigned char* ptr = (unsigned char*)data;
 628     size_t cnt = 0;
 629     int retval;
 630 
 631     ORTE_ACQUIRE_OBJECT(ptr);
 632 
 633     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 634                         "%s send blocking of %"PRIsize_t" bytes to socket %d",
 635                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 636                         size, sd);
 637 
 638     while (cnt < size) {
 639         retval = send(sd, (char*)ptr+cnt, size-cnt, 0);
 640         if (retval < 0) {
 641             if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
 642                 opal_output(0, "%s tcp_peer_send_blocking: send() to socket %d failed: %s (%d)\n",
 643                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd,
 644                     strerror(opal_socket_errno),
 645                     opal_socket_errno);
 646                 return ORTE_ERR_UNREACH;
 647             }
 648             continue;
 649         }
 650         cnt += retval;
 651     }
 652 
 653     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 654                         "%s blocking send complete to socket %d",
 655                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd);
 656 
 657     return ORTE_SUCCESS;
 658 }
 659 
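/*
 *  (This comment describes mca_oob_tcp_peer_recv_connect_ack() further
 *  below, not the retry() helper that immediately follows.)
 *  Receive the peer's globally unique process identification from a newly
 *  connected socket and verify the expected response. If it matches, move
 *  the socket to a connected state.
 */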
 665 static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal)
 666 {
 667     int cmpval;
 668 
 669     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 670                         "%s SIMUL CONNECTION WITH %s",
 671                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 672                         ORTE_NAME_PRINT(&peer->name));
 673     cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, ORTE_PROC_MY_NAME);
 674     if (fatal) {
 675         if (peer->send_ev_active) {
 676             opal_event_del(&peer->send_event);
 677             peer->send_ev_active = false;
 678         }
 679         if (peer->recv_ev_active) {
 680             opal_event_del(&peer->recv_event);
 681             peer->recv_ev_active = false;
 682         }
 683         if (0 <= peer->sd) {
 684             CLOSE_THE_SOCKET(peer->sd);
 685             peer->sd = -1;
 686         }
 687         if (OPAL_VALUE1_GREATER == cmpval) {
 688             /* force the other end to retry the connection */
 689             peer->state = MCA_OOB_TCP_UNCONNECTED;
 690         } else {
 691             /* retry the connection */
 692             peer->state = MCA_OOB_TCP_CONNECTING;
 693             ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
 694         }
 695         return true;
 696     } else {
 697         if (OPAL_VALUE1_GREATER == cmpval) {
 698             /* The other end will retry the connection */
 699             if (peer->send_ev_active) {
 700                 opal_event_del(&peer->send_event);
 701                 peer->send_ev_active = false;
 702             }
 703             if (peer->recv_ev_active) {
 704                 opal_event_del(&peer->recv_event);
 705                 peer->recv_ev_active = false;
 706             }
 707             CLOSE_THE_SOCKET(peer->sd);
 708             peer->state = MCA_OOB_TCP_UNCONNECTED;
 709             return false;
 710         } else {
 711             /* The connection will be retried */
 712             tcp_peer_send_connect_nack(sd, peer->name);
 713             CLOSE_THE_SOCKET(sd);
 714             return true;
 715         }
 716     }
 717 }
 718 
 719 
 720 int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
 721                                       int sd, mca_oob_tcp_hdr_t *dhdr)
 722 {
 723     char *msg;
 724     char *version;
 725     size_t offset = 0;
 726     mca_oob_tcp_hdr_t hdr;
 727     mca_oob_tcp_peer_t *peer;
 728     uint64_t *ui64;
 729     uint16_t ack_flag;
 730     bool is_new = (NULL == pr);
 731 
 732     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 733                         "%s RECV CONNECT ACK FROM %s ON SOCKET %d",
 734                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 735                         (NULL == pr) ? "UNKNOWN" : ORTE_NAME_PRINT(&pr->name), sd);
 736 
 737     peer = pr;
 738     /* get the header */
 739     if (tcp_peer_recv_blocking(peer, sd, &hdr, sizeof(mca_oob_tcp_hdr_t))) {
 740         if (NULL != peer) {
 741             /* If the peer state is CONNECT_ACK, then we were waiting for
 742              * the connection to be ack'd
 743              */
 744             if (peer->state != MCA_OOB_TCP_CONNECT_ACK) {
 745                 /* handshake broke down - abort this connection */
 746                 opal_output(0, "%s RECV CONNECT BAD HANDSHAKE (%d) FROM %s ON SOCKET %d",
 747                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), peer->state,
 748                             ORTE_NAME_PRINT(&(peer->name)), sd);
 749                 mca_oob_tcp_peer_close(peer);
 750                 return ORTE_ERR_UNREACH;
 751             }
 752         }
 753     } else {
 754         /* unable to complete the recv */
 755         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 756                             "%s unable to complete recv of connect-ack from %s ON SOCKET %d",
 757                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 758                             (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
 759         return ORTE_ERR_UNREACH;
 760     }
 761 
 762     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 763                         "%s connect-ack recvd from %s",
 764                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 765                         (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name));
 766 
 767     /* convert the header */
 768     MCA_OOB_TCP_HDR_NTOH(&hdr);
 769     /* if the requestor wanted the header returned, then do so now */
 770     if (NULL != dhdr) {
 771         *dhdr = hdr;
 772     }
 773 
 774     if (MCA_OOB_TCP_PROBE == hdr.type) {
 775         /* send a header back */
 776         hdr.type = MCA_OOB_TCP_PROBE;
 777         hdr.dst = hdr.origin;
 778         hdr.origin = *ORTE_PROC_MY_NAME;
 779         MCA_OOB_TCP_HDR_HTON(&hdr);
 780         tcp_peer_send_blocking(sd, &hdr, sizeof(mca_oob_tcp_hdr_t));
 781         CLOSE_THE_SOCKET(sd);
 782         return ORTE_SUCCESS;
 783     }
 784 
 785     if (hdr.type != MCA_OOB_TCP_IDENT) {
 786         opal_output(0, "tcp_peer_recv_connect_ack: invalid header type: %d\n",
 787                     hdr.type);
 788         if (NULL != peer) {
 789             peer->state = MCA_OOB_TCP_FAILED;
 790             mca_oob_tcp_peer_close(peer);
 791         } else {
 792             CLOSE_THE_SOCKET(sd);
 793         }
 794         return ORTE_ERR_COMM_FAILURE;
 795     }
 796 
 797     /* if we don't already have it, get the peer */
 798     if (NULL == peer) {
 799         peer = mca_oob_tcp_peer_lookup(&hdr.origin);
 800         if (NULL == peer) {
 801             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 802                                 "%s mca_oob_tcp_recv_connect: connection from new peer",
 803                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 804             peer = OBJ_NEW(mca_oob_tcp_peer_t);
 805             peer->name = hdr.origin;
 806             ORTE_OOB_TCP_NEXT_BASE(peer);  // assign it an event base
 807             peer->state = MCA_OOB_TCP_ACCEPTING;
 808             ui64 = (uint64_t*)(&peer->name);
 809             if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, (*ui64), peer)) {
 810                 OBJ_RELEASE(peer);
 811                 CLOSE_THE_SOCKET(sd);
 812                 return ORTE_ERR_OUT_OF_RESOURCE;
 813             }
 814         }
 815     } else {
 816         /* compare the peers name to the expected value */
 817         if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) {
 818             opal_output(0, "%s tcp_peer_recv_connect_ack: "
 819                         "received unexpected process identifier %s from %s\n",
 820                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 821                         ORTE_NAME_PRINT(&(hdr.origin)),
 822                         ORTE_NAME_PRINT(&(peer->name)));
 823             peer->state = MCA_OOB_TCP_FAILED;
 824             mca_oob_tcp_peer_close(peer);
 825             return ORTE_ERR_CONNECTION_REFUSED;
 826         }
 827     }
 828 
 829     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 830                         "%s connect-ack header from %s is okay",
 831                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 832                         ORTE_NAME_PRINT(&peer->name));
 833 
 834     /* get the authentication and version payload */
 835     if (NULL == (msg = (char*)malloc(hdr.nbytes))) {
 836         peer->state = MCA_OOB_TCP_FAILED;
 837         mca_oob_tcp_peer_close(peer);
 838         return ORTE_ERR_OUT_OF_RESOURCE;
 839     }
 840     if (!tcp_peer_recv_blocking(peer, sd, msg, hdr.nbytes)) {
 841         /* unable to complete the recv but should never happen */
 842         opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 843                             "%s unable to complete recv of connect-ack from %s ON SOCKET %d",
 844                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 845                             ORTE_NAME_PRINT(&peer->name), peer->sd);
 846         free(msg);
 847         return ORTE_ERR_UNREACH;
 848     }
 849 
 850     /* Check the type of acknowledgement */
 851     memcpy(&ack_flag, msg + offset, sizeof(ack_flag));
 852     offset += sizeof(ack_flag);
 853 
 854     ack_flag = ntohs(ack_flag);
 855     if( !ack_flag ){
 856         if (MCA_OOB_TCP_CONNECT_ACK == peer->state) {
 857             /* We got nack from the remote side which means that
 858              * it will be the initiator of the connection.
 859              */
 860 
 861             /* release the socket */
 862             CLOSE_THE_SOCKET(peer->sd);
 863             peer->sd = -1;
 864 
 865             /* unregister active events */
 866             if (peer->recv_ev_active) {
 867                 opal_event_del(&peer->recv_event);
 868                 peer->recv_ev_active = false;
 869             }
 870             if (peer->send_ev_active) {
 871                 opal_event_del(&peer->send_event);
 872                 peer->send_ev_active = false;
 873             }
 874 
 875             /* change the state so we'll accept the remote
 876              * connection when it'll apeear
 877              */
 878             peer->state = MCA_OOB_TCP_UNCONNECTED;
 879         } else {
 880             /* FIXME: this shouldn't happen. We need to force next address
 881              * to be tried.
 882              */
 883             mca_oob_tcp_peer_close(peer);
 884         }
 885         free(msg);
 886         return ORTE_ERR_UNREACH;
 887     }
 888 
 889     /* check for a race condition - if I was in the process of
 890      * creating a connection to the peer, or have already established
 891      * such a connection, then we need to reject this connection. We will
 892      * let the higher ranked process retry - if I'm the lower ranked
 893      * process, I'll simply defer until I receive the request
 894      */
 895     if (is_new &&
 896         ( MCA_OOB_TCP_CONNECTED == peer->state ||
 897           MCA_OOB_TCP_CONNECTING == peer->state ||
 898          MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) {
 899         if (retry(peer, sd, false)) {
 900             free(msg);
 901             return ORTE_ERR_UNREACH;
 902         }
 903     }
 904 
 905     /* check that this is from a matching version */
 906     version = (char*)((char*)msg + offset);
 907     offset += strlen(version) + 1;
 908     if (0 != strcmp(version, orte_version_string)) {
 909         opal_show_help("help-oob-tcp.txt", "version mismatch",
 910                        true,
 911                        opal_process_info.nodename,
 912                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 913                        orte_version_string,
 914                        opal_fd_get_peer_name(peer->sd),
 915                        ORTE_NAME_PRINT(&(peer->name)),
 916                        version);
 917 
 918         peer->state = MCA_OOB_TCP_FAILED;
 919         mca_oob_tcp_peer_close(peer);
 920         free(msg);
 921         return ORTE_ERR_CONNECTION_REFUSED;
 922     }
 923     free(msg);
 924 
 925     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 926                         "%s connect-ack version from %s matches ours",
 927                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 928                         ORTE_NAME_PRINT(&peer->name));
 929 
 930     /* if the requestor wanted the header returned, then they
 931      * will complete their processing
 932      */
 933     if (NULL != dhdr) {
 934         return ORTE_SUCCESS;
 935     }
 936 
 937     /* set the peer into the component and OOB-level peer tables to indicate
 938      * that we know this peer and we will be handling him
 939      */
 940     ORTE_ACTIVATE_TCP_CMP_OP(peer, mca_oob_tcp_component_set_module);
 941 
 942     /* connected */
 943     tcp_peer_connected(peer);
 944     if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
 945         mca_oob_tcp_peer_dump(peer, "connected");
 946     }
 947     return ORTE_SUCCESS;
 948 }
 949 
 950 /*
 951  *  Setup peer state to reflect that connection has been established,
 952  *  and start any pending sends.
 953  */
 954 static void tcp_peer_connected(mca_oob_tcp_peer_t* peer)
 955 {
 956     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 957                         "%s-%s tcp_peer_connected on socket %d",
 958                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 959                         ORTE_NAME_PRINT(&(peer->name)), peer->sd);
 960 
 961     if (peer->timer_ev_active) {
 962         opal_event_del(&peer->timer_event);
 963         peer->timer_ev_active = false;
 964     }
 965     peer->state = MCA_OOB_TCP_CONNECTED;
 966     if (NULL != peer->active_addr) {
 967         peer->active_addr->retries = 0;
 968     }
 969 
 970     /* update the route */
 971     orte_routed.update_route(&peer->name, &peer->name);
 972 
 973     /* initiate send of first message on queue */
 974     if (NULL == peer->send_msg) {
 975         peer->send_msg = (mca_oob_tcp_send_t*)
 976             opal_list_remove_first(&peer->send_queue);
 977     }
 978     if (NULL != peer->send_msg && !peer->send_ev_active) {
 979         peer->send_ev_active = true;
 980         ORTE_POST_OBJECT(peer);
 981         opal_event_add(&peer->send_event, 0);
 982     }
 983 }
 984 
 985 /*
 986  * Remove any event registrations associated with the socket
 987  * and update the peer state to reflect the connection has
 988  * been closed.
 989  */
 990 void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
 991 {
 992     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 993                         "%s tcp_peer_close for %s sd %d state %s",
 994                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 995                         ORTE_NAME_PRINT(&(peer->name)),
 996                         peer->sd, mca_oob_tcp_state_print(peer->state));
 997 
 998     /* release the socket */
 999     close(peer->sd);
1000     peer->sd = -1;
1001 
1002     /* if we were CONNECTING, then we need to mark the address as
1003      * failed and cycle back to try the next address */
1004     if (MCA_OOB_TCP_CONNECTING == peer->state) {
1005         if (NULL != peer->active_addr) {
1006             peer->active_addr->state = MCA_OOB_TCP_FAILED;
1007         }
1008         ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
1009         return;
1010     }
1011 
1012     peer->state = MCA_OOB_TCP_CLOSED;
1013     if (NULL != peer->active_addr) {
1014         peer->active_addr->state = MCA_OOB_TCP_CLOSED;
1015     }
1016 
1017     /* unregister active events */
1018     if (peer->recv_ev_active) {
1019         opal_event_del(&peer->recv_event);
1020         peer->recv_ev_active = false;
1021     }
1022     if (peer->send_ev_active) {
1023         opal_event_del(&peer->send_event);
1024         peer->send_ev_active = false;
1025     }
1026 
1027     /* inform the component-level that we have lost a connection so
1028      * it can decide what to do about it.
1029      */
1030     ORTE_ACTIVATE_TCP_CMP_OP(peer, mca_oob_tcp_component_lost_connection);
1031 
1032     if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
1033         /* nothing more to do */
1034         return;
1035     }
1036 
1037     /* FIXME: push any queued messages back onto the OOB for retry - note that
1038      * this must be done after the prior call to ensure that the component
1039      * processes the "lost connection" notice before the OOB begins to
1040      * handle these recycled messages. This prevents us from unintentionally
1041      * attempting to send the message again across the now-failed interface
1042      */
1043     /*
1044     if (NULL != peer->send_msg) {
1045     }
1046     while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) {
1047     }
1048     */
1049 }
1050 
1051 /*
1052  * A blocking recv on a non-blocking socket. Used to receive the small amount of connection
1053  * information that identifies the peers endpoint.
1054  */
1055 static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
1056                                    void* data, size_t size)
1057 {
1058     unsigned char* ptr = (unsigned char*)data;
1059     size_t cnt = 0;
1060 
1061     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1062                         "%s waiting for connect ack from %s",
1063                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1064                         (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
1065 
1066     while (cnt < size) {
1067         int retval = recv(sd, (char *)ptr+cnt, size-cnt, 0);
1068 
1069         /* remote closed connection */
1070         if (retval == 0) {
1071             opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1072                                 "%s-%s tcp_peer_recv_blocking: "
1073                                 "peer closed connection: peer state %d",
1074                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1075                                 (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)),
1076                                 (NULL == peer) ? 0 : peer->state);
1077             if (NULL != peer) {
1078                 mca_oob_tcp_peer_close(peer);
1079             } else {
1080                 CLOSE_THE_SOCKET(sd);
1081             }
1082             return false;
1083         }
1084 
1085         /* socket is non-blocking so handle errors */
1086         if (retval < 0) {
1087             if (opal_socket_errno != EINTR &&
1088                 opal_socket_errno != EAGAIN &&
1089                 opal_socket_errno != EWOULDBLOCK) {
1090                 if (NULL == peer) {
1091                     /* protect against things like port scanners */
1092                     CLOSE_THE_SOCKET(sd);
1093                     return false;
1094                 } else if (peer->state == MCA_OOB_TCP_CONNECT_ACK) {
1095                     /* If we overflow the listen backlog, it's
1096                        possible that even though we finished the three
1097                        way handshake, the remote host was unable to
1098                        transition the connection from half connected
1099                        (received the initial SYN) to fully connected
1100                        (in the listen backlog).  We likely won't see
1101                        the failure until we try to receive, due to
1102                        timing and the like.  The first thing we'll get
1103                        in that case is a RST packet, which receive
1104                        will turn into a connection reset by peer
1105                        errno.  In that case, leave the socket in
1106                        CONNECT_ACK and propogate the error up to
1107                        recv_connect_ack, who will try to establish the
1108                        connection again */
1109                     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1110                                         "%s connect ack received error %s from %s",
1111                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1112                                         strerror(opal_socket_errno),
1113                                         ORTE_NAME_PRINT(&(peer->name)));
1114                     return false;
1115                 } else {
1116                     opal_output(0,
1117                                 "%s tcp_peer_recv_blocking: "
1118                                 "recv() failed for %s: %s (%d)\n",
1119                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1120                                 ORTE_NAME_PRINT(&(peer->name)),
1121                                 strerror(opal_socket_errno),
1122                                 opal_socket_errno);
1123                     peer->state = MCA_OOB_TCP_FAILED;
1124                     mca_oob_tcp_peer_close(peer);
1125                     return false;
1126                 }
1127             }
1128             continue;
1129         }
1130         cnt += retval;
1131     }
1132 
1133     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1134                         "%s connect ack received from %s",
1135                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1136                         (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
1137     return true;
1138 }
1139 
1140 /*
1141  * Routine for debugging to print the connection state and socket options
1142  */
1143 void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
1144 {
1145     char src[64];
1146     char dst[64];
1147     char buff[255];
1148     int sndbuf,rcvbuf,nodelay,flags;
1149     struct sockaddr_storage inaddr;
1150     opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
1151     opal_socklen_t optlen;
1152 
1153     if (getsockname(peer->sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
1154         opal_output(0, "tcp_peer_dump: getsockname: %s (%d)\n",
1155                     strerror(opal_socket_errno),
1156                     opal_socket_errno);
1157     } else {
1158         snprintf(src, sizeof(src), "%s", opal_net_get_hostname((struct sockaddr*) &inaddr));
1159     }
1160     if (getpeername(peer->sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
1161         opal_output(0, "tcp_peer_dump: getpeername: %s (%d)\n",
1162                     strerror(opal_socket_errno),
1163                     opal_socket_errno);
1164     } else {
1165         snprintf(dst, sizeof(dst), "%s", opal_net_get_hostname((struct sockaddr*) &inaddr));
1166     }
1167 
1168     if ((flags = fcntl(peer->sd, F_GETFL, 0)) < 0) {
1169         opal_output(0, "tcp_peer_dump: fcntl(F_GETFL) failed: %s (%d)\n",
1170                     strerror(opal_socket_errno),
1171                     opal_socket_errno);
1172     }
1173 
1174 #if defined(SO_SNDBUF)
1175     optlen = sizeof(sndbuf);
1176     if(getsockopt(peer->sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &optlen) < 0) {
1177         opal_output(0, "tcp_peer_dump: SO_SNDBUF option: %s (%d)\n",
1178                     strerror(opal_socket_errno),
1179                     opal_socket_errno);
1180     }
1181 #else
1182     sndbuf = -1;
1183 #endif
1184 #if defined(SO_RCVBUF)
1185     optlen = sizeof(rcvbuf);
1186     if (getsockopt(peer->sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &optlen) < 0) {
1187         opal_output(0, "tcp_peer_dump: SO_RCVBUF option: %s (%d)\n",
1188                     strerror(opal_socket_errno),
1189                     opal_socket_errno);
1190     }
1191 #else
1192     rcvbuf = -1;
1193 #endif
1194 #if defined(TCP_NODELAY)
1195     optlen = sizeof(nodelay);
1196     if (getsockopt(peer->sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &optlen) < 0) {
1197         opal_output(0, "tcp_peer_dump: TCP_NODELAY option: %s (%d)\n",
1198                     strerror(opal_socket_errno),
1199                     opal_socket_errno);
1200     }
1201 #else
1202     nodelay = 0;
1203 #endif
1204 
1205     snprintf(buff, sizeof(buff), "%s-%s %s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
1206         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1207         ORTE_NAME_PRINT(&(peer->name)),
1208         msg, src, dst, nodelay, sndbuf, rcvbuf, flags);
1209     opal_output(0, "%s", buff);
1210 }
1211 
1212 /*
1213  * Accept incoming connection - if not already connected
1214  */
1215 
1216 bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer)
1217 {
1218     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1219                         "%s tcp:peer_accept called for peer %s in state %s on socket %d",
1220                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1221                         ORTE_NAME_PRINT(&peer->name),
1222                         mca_oob_tcp_state_print(peer->state), peer->sd);
1223 
1224     if (peer->state != MCA_OOB_TCP_CONNECTED) {
1225 
1226         tcp_peer_event_init(peer);
1227 
1228         if (tcp_peer_send_connect_ack(peer) != ORTE_SUCCESS) {
1229             opal_output(0, "%s-%s tcp_peer_accept: "
1230                         "tcp_peer_send_connect_ack failed\n",
1231                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1232                         ORTE_NAME_PRINT(&(peer->name)));
1233             peer->state = MCA_OOB_TCP_FAILED;
1234             mca_oob_tcp_peer_close(peer);
1235             return false;
1236         }
1237 
1238         /* set the peer into the component and OOB-level peer tables to indicate
1239          * that we know this peer and we will be handling him
1240          */
1241         ORTE_ACTIVATE_TCP_CMP_OP(peer, mca_oob_tcp_component_set_module);
1242 
1243         tcp_peer_connected(peer);
1244         if (!peer->recv_ev_active) {
1245             peer->recv_ev_active = true;
1246             ORTE_POST_OBJECT(peer);
1247             opal_event_add(&peer->recv_event, 0);
1248         }
1249         if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
1250             mca_oob_tcp_peer_dump(peer, "accepted");
1251         }
1252         return true;
1253     }
1254 
1255     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1256                         "%s tcp:peer_accept ignored for peer %s in state %s on socket %d",
1257                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1258                         ORTE_NAME_PRINT(&peer->name),
1259                         mca_oob_tcp_state_print(peer->state), peer->sd);
1260     return false;
1261 }

/* [<][>][^][v][top][bottom][index][help] */