root/orte/mca/oob/tcp/oob_tcp.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. accept_connection
  2. ping
  3. send_nb
  4. recv_handler
  5. ft_event
  6. ft_event

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2012 Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2016      Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  *
  25  */
  26 
  27 #include "orte_config.h"
  28 #include "orte/types.h"
  29 #include "opal/types.h"
  30 
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif
  34 #ifdef HAVE_SYS_TYPES_H
  35 #include <sys/types.h>
  36 #endif
  37 #include <fcntl.h>
  38 #ifdef HAVE_NETINET_IN_H
  39 #include <netinet/in.h>
  40 #endif
  41 #ifdef HAVE_ARPA_INET_H
  42 #include <arpa/inet.h>
  43 #endif
  44 #ifdef HAVE_NETDB_H
  45 #include <netdb.h>
  46 #endif
  47 #include <ctype.h>
  48 
  49 #include "opal/runtime/opal_progress_threads.h"
  50 #include "opal/util/show_help.h"
  51 #include "opal/util/error.h"
  52 #include "opal/util/output.h"
  53 #include "opal/opal_socket_errno.h"
  54 #include "opal/util/if.h"
  55 #include "opal/util/net.h"
  56 #include "opal/util/argv.h"
  57 #include "opal/class/opal_hash_table.h"
  58 
  59 #include "orte/mca/errmgr/errmgr.h"
  60 #include "orte/mca/ess/ess.h"
  61 #include "orte/mca/routed/routed.h"
  62 #include "orte/util/name_fns.h"
  63 #include "orte/util/parse_options.h"
  64 #include "orte/util/show_help.h"
  65 #include "orte/util/threads.h"
  66 #include "orte/runtime/orte_globals.h"
  67 
  68 #include "orte/mca/oob/tcp/oob_tcp.h"
  69 #include "orte/mca/oob/tcp/oob_tcp_component.h"
  70 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
  71 #include "orte/mca/oob/tcp/oob_tcp_common.h"
  72 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
  73 #include "orte/mca/oob/tcp/oob_tcp_sendrecv.h"
  74 
  75 static void accept_connection(const int accepted_fd,
  76                               const struct sockaddr *addr);
  77 static void ping(const orte_process_name_t *proc);
  78 static void send_nb(orte_rml_send_t *msg);
  79 static void ft_event(int state);
  80 
  81 mca_oob_tcp_module_t mca_oob_tcp_module = {
  82     .accept_connection = accept_connection,
  83     .ping = ping,
  84     .send_nb = send_nb,
  85     .ft_event = ft_event
  86 };
  87 
  88 /*
  89  * Local utility functions
  90  */
  91 static void recv_handler(int sd, short flags, void* user);
  92 
  93 /* Called by mca_oob_tcp_accept() and connection_handler() on
  94  * a socket that has been accepted.  This call finishes processing the
  95  * socket, including setting socket options and registering for the
  96  * OOB-level connection handshake.  Used in both the threaded and
  97  * event listen modes.
  98  */
  99 static void accept_connection(const int accepted_fd,
 100                               const struct sockaddr *addr)
 101 {
 102     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 103                         "%s accept_connection: %s:%d\n",
 104                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 105                         opal_net_get_hostname(addr),
 106                         opal_net_get_port(addr));
 107 
 108    /* setup socket options */
 109     orte_oob_tcp_set_socket_options(accepted_fd);
 110 
 111     /* use a one-time event to wait for receipt of peer's
 112      *  process ident message to complete this connection
 113      */
 114     ORTE_ACTIVATE_TCP_ACCEPT_STATE(accepted_fd, addr, recv_handler);
 115 }
 116 
 117 /* API functions */
 118 static void ping(const orte_process_name_t *proc)
 119 {
 120     mca_oob_tcp_peer_t *peer;
 121 
 122     opal_output_verbose(2, orte_oob_base_framework.framework_output,
 123                         "%s:[%s:%d] processing ping to peer %s",
 124                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 125                         __FILE__, __LINE__,
 126                         ORTE_NAME_PRINT(proc));
 127 
 128     /* do we know this peer? */
 129     if (NULL == (peer = mca_oob_tcp_peer_lookup(proc))) {
 130         /* push this back to the component so it can try
 131          * another module within this transport. If no
 132          * module can be found, the component can push back
 133          * to the framework so another component can try
 134          */
 135         opal_output_verbose(2, orte_oob_base_framework.framework_output,
 136                             "%s:[%s:%d] hop %s unknown",
 137                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 138                             __FILE__, __LINE__,
 139                             ORTE_NAME_PRINT(proc));
 140         ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, NULL, proc, mca_oob_tcp_component_hop_unknown);
 141         return;
 142     }
 143 
 144     /* has this peer had a progress thread assigned yet? */
 145     if (NULL == peer->ev_base) {
 146         /* nope - assign one */
 147         ORTE_OOB_TCP_NEXT_BASE(peer);
 148     }
 149 
 150     /* if we are already connected, there is nothing to do */
 151     if (MCA_OOB_TCP_CONNECTED == peer->state) {
 152         opal_output_verbose(2, orte_oob_base_framework.framework_output,
 153                             "%s:[%s:%d] already connected to peer %s",
 154                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 155                             __FILE__, __LINE__,
 156                             ORTE_NAME_PRINT(proc));
 157         return;
 158     }
 159 
 160     /* if we are already connecting, there is nothing to do */
 161     if (MCA_OOB_TCP_CONNECTING == peer->state ||
 162         MCA_OOB_TCP_CONNECT_ACK == peer->state) {
 163         opal_output_verbose(2, orte_oob_base_framework.framework_output,
 164                             "%s:[%s:%d] already connecting to peer %s",
 165                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 166                             __FILE__, __LINE__,
 167                             ORTE_NAME_PRINT(proc));
 168         return;
 169     }
 170 
 171     /* attempt the connection */
 172     peer->state = MCA_OOB_TCP_CONNECTING;
 173     ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
 174 }
 175 
 176 static void send_nb(orte_rml_send_t *msg)
 177 {
 178     mca_oob_tcp_peer_t *peer;
 179     orte_process_name_t hop;
 180 
 181 
 182     /* do we have a route to this peer (could be direct)? */
 183     hop = orte_routed.get_route(&msg->dst);
 184     /* do we know this hop? */
 185     if (NULL == (peer = mca_oob_tcp_peer_lookup(&hop))) {
 186         /* push this back to the component so it can try
 187          * another module within this transport. If no
 188          * module can be found, the component can push back
 189          * to the framework so another component can try
 190          */
 191         opal_output_verbose(2, orte_oob_base_framework.framework_output,
 192                             "%s:[%s:%d] processing send to peer %s:%d seq_num = %d hop %s unknown",
 193                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 194                             __FILE__, __LINE__,
 195                             ORTE_NAME_PRINT(&msg->dst), msg->tag, msg->seq_num,
 196                             ORTE_NAME_PRINT(&hop));
 197         ORTE_ACTIVATE_TCP_NO_ROUTE(msg, &hop, mca_oob_tcp_component_no_route);
 198         return;
 199     }
 200 
 201     opal_output_verbose(2, orte_oob_base_framework.framework_output,
 202                         "%s:[%s:%d] processing send to peer %s:%d seq_num = %d via %s",
 203                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 204                         __FILE__, __LINE__,
 205                         ORTE_NAME_PRINT(&msg->dst), msg->tag, msg->seq_num,
 206                         ORTE_NAME_PRINT(&peer->name));
 207     /* has this peer had a progress thread assigned yet? */
 208     if (NULL == peer->ev_base) {
 209         /* nope - assign one */
 210         ORTE_OOB_TCP_NEXT_BASE(peer);
 211     }
 212     /* add the msg to the hop's send queue */
 213     if (MCA_OOB_TCP_CONNECTED == peer->state) {
 214         opal_output_verbose(2, orte_oob_base_framework.framework_output,
 215                             "%s tcp:send_nb: already connected to %s - queueing for send",
 216                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 217                             ORTE_NAME_PRINT(&peer->name));
 218         MCA_OOB_TCP_QUEUE_SEND(msg, peer);
 219         return;
 220     }
 221 
 222     /* add the message to the queue for sending after the
 223      * connection is formed
 224      */
 225     MCA_OOB_TCP_QUEUE_PENDING(msg, peer);
 226 
 227     if (MCA_OOB_TCP_CONNECTING != peer->state &&
 228         MCA_OOB_TCP_CONNECT_ACK != peer->state) {
 229         /* we have to initiate the connection - again, we do not
 230          * want to block while the connection is created.
 231          * So throw us into an event that will create
 232          * the connection via a mini-state-machine :-)
 233          */
 234         opal_output_verbose(2, orte_oob_base_framework.framework_output,
 235                             "%s tcp:send_nb: initiating connection to %s",
 236                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 237                             ORTE_NAME_PRINT(&peer->name));
 238         peer->state = MCA_OOB_TCP_CONNECTING;
 239         ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
 240     }
 241 }
 242 
 243 /*
 244  * Event callback when there is data available on the registered
 245  * socket to recv.  This is called for the listen sockets to accept an
 246  * incoming connection, on new sockets trying to complete the software
 247  * connection process, and for probes.  Data on an established
 248  * connection is handled elsewhere.
 249  */
 250 static void recv_handler(int sd, short flg, void *cbdata)
 251 {
 252     mca_oob_tcp_conn_op_t *op = (mca_oob_tcp_conn_op_t*)cbdata;
 253     int flags;
 254     mca_oob_tcp_hdr_t hdr;
 255     mca_oob_tcp_peer_t *peer;
 256 
 257     ORTE_ACQUIRE_OBJECT(op);
 258 
 259     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 260                         "%s:tcp:recv:handler called",
 261                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 262 
 263     /* get the handshake */
 264     if (ORTE_SUCCESS != mca_oob_tcp_peer_recv_connect_ack(NULL, sd, &hdr)) {
 265         goto cleanup;
 266     }
 267 
 268     /* finish processing ident */
 269     if (MCA_OOB_TCP_IDENT == hdr.type) {
 270         if (NULL == (peer = mca_oob_tcp_peer_lookup(&hdr.origin))) {
 271             /* should never happen */
 272             mca_oob_tcp_peer_close(peer);
 273             goto cleanup;
 274         }
 275         /* set socket up to be non-blocking */
 276         if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
 277             opal_output(0, "%s mca_oob_tcp_recv_connect: fcntl(F_GETFL) failed: %s (%d)",
 278                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno);
 279         } else {
 280             flags |= O_NONBLOCK;
 281             if (fcntl(sd, F_SETFL, flags) < 0) {
 282                 opal_output(0, "%s mca_oob_tcp_recv_connect: fcntl(F_SETFL) failed: %s (%d)",
 283                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno);
 284             }
 285         }
 286         /* is the peer instance willing to accept this connection */
 287         peer->sd = sd;
 288         if (mca_oob_tcp_peer_accept(peer) == false) {
 289             if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
 290                 opal_output(0, "%s-%s mca_oob_tcp_recv_connect: "
 291                             "rejected connection from %s connection state %d",
 292                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 293                             ORTE_NAME_PRINT(&(peer->name)),
 294                             ORTE_NAME_PRINT(&(hdr.origin)),
 295                             peer->state);
 296             }
 297             CLOSE_THE_SOCKET(sd);
 298         }
 299     }
 300 
 301  cleanup:
 302     OBJ_RELEASE(op);
 303 }
 304 
 305 /* Dummy function for when we are not using FT. */
 306 #if OPAL_ENABLE_FT_CR == 0
 307 static void ft_event(int state)
 308 {
 309     return;
 310 }
 311 
 312 #else
 313 static void ft_event(int state) {
 314 #if 0
 315     opal_list_item_t *item;
 316 #endif
 317 
 318     if(OPAL_CRS_CHECKPOINT == state) {
 319 #if 0
 320         /*
 321          * Disable event processing while we are working
 322          */
 323         opal_event_disable();
 324 #endif
 325     }
 326     else if(OPAL_CRS_CONTINUE == state) {
 327 #if 0
 328         /*
 329          * Resume event processing
 330          */
 331         opal_event_enable();
 332     }
 333     else if(OPAL_CRS_RESTART == state) {
 334         /*
 335          * Clean out cached connection information
 336          * Select pieces of finalize/init
 337          */
 338         for (item = opal_list_remove_first(&mca_oob_tcp_module.peer_list);
 339             item != NULL;
 340             item = opal_list_remove_first(&mca_oob_tcp_module.peer_list)) {
 341             mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item;
 342             /* JJH: Use the below command for debugging restarts with invalid sockets
 343              * mca_oob_tcp_peer_dump(peer, "RESTART CLEAN")
 344              */
 345             MCA_OOB_TCP_PEER_RETURN(peer);
 346         }
 347 
 348         OBJ_DESTRUCT(&mca_oob_tcp_module.peer_free);
 349         OBJ_DESTRUCT(&mca_oob_tcp_module.peer_names);
 350         OBJ_DESTRUCT(&mca_oob_tcp_module.peers);
 351         OBJ_DESTRUCT(&mca_oob_tcp_module.peer_list);
 352 
 353         OBJ_CONSTRUCT(&mca_oob_tcp_module.peer_list,     opal_list_t);
 354         OBJ_CONSTRUCT(&mca_oob_tcp_module.peers,         opal_hash_table_t);
 355         OBJ_CONSTRUCT(&mca_oob_tcp_module.peer_names,    opal_hash_table_t);
 356         OBJ_CONSTRUCT(&mca_oob_tcp_module.peer_free,     opal_free_list_t);
 357 
 358         /*
 359          * Resume event processing
 360          */
 361         opal_event_enable();
 362 #endif
 363     }
 364     else if(OPAL_CRS_TERM == state ) {
 365         ;
 366     }
 367     else {
 368         ;
 369     }
 370 
 371     return;
 372 }
 373 #endif

/* [<][>][^][v][top][bottom][index][help] */