root/orte/mca/oob/tcp/oob_tcp_listener.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. orte_oob_tcp_start_listening
  2. create_listen
  3. create_listen6
  4. listen_thread
  5. connection_handler
  6. connection_event_handler
  7. tcp_ev_cons
  8. tcp_ev_des

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2015 Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2018 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2015      Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  *
  25  * In windows, many of the socket functions return an EWOULDBLOCK
  26  * instead of things like EAGAIN, EINPROGRESS, etc. It has been
  27  * verified that this will not conflict with other error codes that
  28  * are returned by these functions under UNIX/Linux environments
  29  */
  30 
  31 #include "orte_config.h"
  32 #include "orte/types.h"
  33 #include "opal/types.h"
  34 
  35 #ifdef HAVE_UNISTD_H
  36 #include <unistd.h>
  37 #endif
  38 #ifdef HAVE_SYS_TYPES_H
  39 #include <sys/types.h>
  40 #endif
  41 #include <fcntl.h>
  42 #ifdef HAVE_NETINET_IN_H
  43 #include <netinet/in.h>
  44 #endif
  45 #ifdef HAVE_ARPA_INET_H
  46 #include <arpa/inet.h>
  47 #endif
  48 #ifdef HAVE_NETDB_H
  49 #include <netdb.h>
  50 #endif
  51 #include <ctype.h>
  52 
  53 #include "opal/util/show_help.h"
  54 #include "opal/util/error.h"
  55 #include "opal/util/output.h"
  56 #include "opal/opal_socket_errno.h"
  57 #include "opal/util/if.h"
  58 #include "opal/util/net.h"
  59 #include "opal/util/argv.h"
  60 #include "opal/util/fd.h"
  61 #include "opal/class/opal_hash_table.h"
  62 #include "opal/class/opal_list.h"
  63 
  64 #include "orte/mca/errmgr/errmgr.h"
  65 #include "orte/mca/ess/ess.h"
  66 #include "orte/util/name_fns.h"
  67 #include "orte/util/parse_options.h"
  68 #include "orte/util/show_help.h"
  69 #include "orte/util/threads.h"
  70 #include "orte/runtime/orte_globals.h"
  71 
  72 #include "orte/mca/oob/tcp/oob_tcp.h"
  73 #include "orte/mca/oob/tcp/oob_tcp_component.h"
  74 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
  75 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
  76 #include "orte/mca/oob/tcp/oob_tcp_listener.h"
  77 #include "orte/mca/oob/tcp/oob_tcp_common.h"
  78 
  79 static void connection_event_handler(int incoming_sd, short flags, void* cbdata);
  80 static void* listen_thread(opal_object_t *obj);
  81 static int create_listen(void);
  82 #if OPAL_ENABLE_IPV6
  83 static int create_listen6(void);
  84 #endif
  85 static void connection_handler(int sd, short flags, void* cbdata);
  86 static void connection_event_handler(int sd, short flags, void* cbdata);
  87 
  88 /*
  89  * Component initialization - create a module for each available
  90  * TCP interface and initialize the static resources associated
  91  * with that module.
  92  *
  93  * Also initializes the list of devices that will be used/supported by
  94  * the module, using the if_include and if_exclude variables.  This is
  95  * the only place that this sorting should occur -- all other places
  96  * should use the tcp_avaiable_devices list.  This is a change from
  97  * previous versions of this component.
  98  */
  99 int orte_oob_tcp_start_listening(void)
 100 {
 101     int rc;
 102     mca_oob_tcp_listener_t *listener;
 103 
 104     /* if we don't have any TCP interfaces, we shouldn't be here */
 105     if (NULL == mca_oob_tcp_component.ipv4conns
 106 #if OPAL_ENABLE_IPV6
 107         && NULL == mca_oob_tcp_component.ipv6conns
 108 #endif
 109         ) {
 110         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 111         return ORTE_ERR_NOT_FOUND;
 112     }
 113 
 114     /* create listen socket(s) for incoming connection attempts */
 115     if (ORTE_SUCCESS != (rc = create_listen())) {
 116         ORTE_ERROR_LOG(rc);
 117         return rc;
 118     }
 119 
 120 #if OPAL_ENABLE_IPV6
 121     /* create listen socket(s) for incoming connection attempts */
 122     if (ORTE_SUCCESS != (rc = create_listen6())) {
 123         ORTE_ERROR_LOG(rc);
 124         return rc;
 125     }
 126 #endif
 127 
 128     /* if I am the HNP, start a listening thread so we can
 129      * harvest connection requests as rapidly as possible
 130      */
 131     if (ORTE_PROC_IS_HNP) {
 132         if (0 > pipe(mca_oob_tcp_component.stop_thread)) {
 133             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 134             return ORTE_ERR_OUT_OF_RESOURCE;
 135         }
 136 
 137         /* Make sure the pipe FDs are set to close-on-exec so that
 138            they don't leak into children */
 139         if (opal_fd_set_cloexec(mca_oob_tcp_component.stop_thread[0]) != OPAL_SUCCESS ||
 140             opal_fd_set_cloexec(mca_oob_tcp_component.stop_thread[1]) != OPAL_SUCCESS) {
 141             close(mca_oob_tcp_component.stop_thread[0]);
 142             close(mca_oob_tcp_component.stop_thread[1]);
 143             ORTE_ERROR_LOG(ORTE_ERR_IN_ERRNO);
 144             return ORTE_ERR_IN_ERRNO;
 145         }
 146 
 147         mca_oob_tcp_component.listen_thread_active = true;
 148         mca_oob_tcp_component.listen_thread.t_run = listen_thread;
 149         mca_oob_tcp_component.listen_thread.t_arg = NULL;
 150         if (OPAL_SUCCESS != (rc = opal_thread_start(&mca_oob_tcp_component.listen_thread))) {
 151             ORTE_ERROR_LOG(rc);
 152             opal_output(0, "%s Unable to start listen thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 153         }
 154         return rc;
 155     }
 156 
 157     /* otherwise, setup to listen via the event lib */
 158     OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
 159         listener->ev_active = true;
 160         opal_event_set(orte_oob_base.ev_base, &listener->event,
 161                        listener->sd,
 162                        OPAL_EV_READ|OPAL_EV_PERSIST,
 163                        connection_event_handler,
 164                        0);
 165         opal_event_set_priority(&listener->event, ORTE_MSG_PRI);
 166         ORTE_POST_OBJECT(listener);
 167         opal_event_add(&listener->event, 0);
 168     }
 169 
 170     return ORTE_SUCCESS;
 171 }
 172 
 173 /*
 174  * Create an IPv4 listen socket and bind to all interfaces.
 175  *
 176  * At one time, this also registered a callback with the event library
 177  * for when connections were received on the listen socket.  This is
 178  * no longer the case -- the caller must register any events required.
 179  *
 180  * Called by both the threaded and event based listen modes.
 181  */
 182 static int create_listen(void)
 183 {
 184     int flags, i;
 185     uint16_t port=0;
 186     struct sockaddr_storage inaddr;
 187     opal_socklen_t addrlen;
 188     char **ports=NULL;
 189     int sd = -1;
 190     char *tconn;
 191     mca_oob_tcp_listener_t *conn;
 192 
 193     /* If an explicit range of ports was given, find the first open
 194      * port in the range.  Otherwise, tcp_port_min will be 0, which
 195      * means "pick any port"
 196      */
 197     if (ORTE_PROC_IS_DAEMON) {
 198         if (NULL != mca_oob_tcp_component.tcp_static_ports) {
 199             /* if static ports were provided, take the
 200              * first entry in the list
 201              */
 202             opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp_static_ports[0]);
 203             /* flag that we are using static ports */
 204             orte_static_ports = true;
 205         } else if (NULL != mca_oob_tcp_component.tcp_dyn_ports) {
 206             /* take the entire range */
 207             ports = opal_argv_copy(mca_oob_tcp_component.tcp_dyn_ports);
 208             orte_static_ports = false;
 209         } else {
 210             /* flag the system to dynamically take any available port */
 211             opal_argv_append_nosize(&ports, "0");
 212             orte_static_ports = false;
 213         }
 214     } else if (ORTE_PROC_IS_HNP) {
 215         if (NULL != mca_oob_tcp_component.tcp_static_ports) {
 216             /* if static ports were provided, take the
 217              * first entry in the list
 218              */
 219             opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp_static_ports[0]);
 220             /* flag that we are using static ports */
 221             orte_static_ports = true;
 222         } else if (NULL != mca_oob_tcp_component.tcp_dyn_ports) {
 223             /* take the entire range */
 224             ports = opal_argv_copy(mca_oob_tcp_component.tcp_dyn_ports);
 225             orte_static_ports = false;
 226         } else {
 227             /* flag the system to dynamically take any available port */
 228             opal_argv_append_nosize(&ports, "0");
 229             orte_static_ports = false;
 230         }
 231     } else if (ORTE_PROC_IS_MPI) {
 232         if (NULL != mca_oob_tcp_component.tcp_static_ports) {
 233             /* if static ports were provided, an mpi proc takes its
 234              * node_local_rank entry in the list IF it has that info
 235              * AND enough ports were provided - otherwise, we "pick any port"
 236              */
 237             orte_node_rank_t nrank;
 238             /* do I know my node_local_rank yet? */
 239             if (ORTE_NODE_RANK_INVALID != (nrank = orte_process_info.my_node_rank) &&
 240                 (nrank+1) < opal_argv_count(mca_oob_tcp_component.tcp_static_ports)) {
 241                 /* any daemon takes the first entry, so we start with the second */
 242                 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp_static_ports[nrank+1]);
 243                 /* flag that we are using static ports */
 244                 orte_static_ports = true;
 245             } else {
 246                 /* flag the system to dynamically take any available port */
 247                 opal_argv_append_nosize(&ports, "0");
 248                 orte_static_ports = false;
 249             }
 250         } else if (NULL != mca_oob_tcp_component.tcp_dyn_ports) {
 251             /* take the entire range */
 252             ports = opal_argv_copy(mca_oob_tcp_component.tcp_dyn_ports);
 253             orte_static_ports = false;
 254         } else {
 255             /* flag the system to dynamically take any available port */
 256             opal_argv_append_nosize(&ports, "0");
 257             orte_static_ports = false;
 258         }
 259     } else {
 260         /* if we are a tool, then we must let the
 261          * system pick any port
 262          */
 263         opal_argv_append_nosize(&ports, "0");
 264         /* if static ports were specified, flag it
 265          * so the HNP does the right thing
 266          */
 267         if (NULL != mca_oob_tcp_component.tcp_static_ports) {
 268             orte_static_ports = true;
 269         } else {
 270             orte_static_ports = false;
 271         }
 272     }
 273 
 274     /* bozo check - this should be impossible, but... */
 275     if (NULL == ports) {
 276         return ORTE_ERROR;
 277     }
 278 
 279     /* get the address info for this interface */
 280     memset(&inaddr, 0, sizeof(inaddr));
 281     ((struct sockaddr_in*) &inaddr)->sin_family = AF_INET;
 282     ((struct sockaddr_in*) &inaddr)->sin_addr.s_addr = INADDR_ANY;
 283     addrlen = sizeof(struct sockaddr_in);
 284 
 285     /* loop across all the specified ports, establishing a socket
 286      * for each one - note that application procs will ONLY have
 287      * one socket, but that orterun and daemons will have multiple
 288      * sockets to support more flexible wireup protocols
 289      */
 290     for (i=0; i < opal_argv_count(ports); i++) {
 291         opal_output_verbose(5, orte_oob_base_framework.framework_output,
 292                             "%s attempting to bind to IPv4 port %s",
 293                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 294                             ports[i]);
 295         /* get the port number */
 296         port = strtol(ports[i], NULL, 10);
 297         /* convert it to network-byte-order */
 298         port = htons(port);
 299 
 300         ((struct sockaddr_in*) &inaddr)->sin_port = port;
 301 
 302         /* create a listen socket for incoming connections on this port */
 303         sd = socket(AF_INET, SOCK_STREAM, 0);
 304         if (sd < 0) {
 305             if (EAFNOSUPPORT != opal_socket_errno) {
 306                 opal_output(0,"mca_oob_tcp_component_init: socket() failed: %s (%d)",
 307                             strerror(opal_socket_errno), opal_socket_errno);
 308             }
 309             opal_argv_free(ports);
 310             return ORTE_ERR_IN_ERRNO;
 311         }
 312 
 313         /* Enable/disable reusing ports */
 314         if (orte_static_ports) {
 315             flags = 1;
 316         } else {
 317             flags = 0;
 318         }
 319         if (setsockopt (sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
 320             opal_output(0, "mca_oob_tcp_create_listen: unable to set the "
 321                         "SO_REUSEADDR option (%s:%d)\n",
 322                         strerror(opal_socket_errno), opal_socket_errno);
 323             CLOSE_THE_SOCKET(sd);
 324             opal_argv_free(ports);
 325             return ORTE_ERROR;
 326         }
 327 
 328         /* Set the socket to close-on-exec so that no children inherit
 329            this FD */
 330         if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) {
 331             opal_output(0, "mca_oob_tcp_create_listen: unable to set the "
 332                         "listening socket to CLOEXEC (%s:%d)\n",
 333                         strerror(opal_socket_errno), opal_socket_errno);
 334             CLOSE_THE_SOCKET(sd);
 335             opal_argv_free(ports);
 336             return ORTE_ERROR;
 337         }
 338 
 339         if (bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
 340             if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
 341                 continue;
 342             }
 343             opal_output(0, "%s bind() failed for port %d: %s (%d)",
 344                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 345                         (int)ntohs(port),
 346                         strerror(opal_socket_errno),
 347                         opal_socket_errno );
 348             CLOSE_THE_SOCKET(sd);
 349             opal_argv_free(ports);
 350             return ORTE_ERROR;
 351         }
 352         /* resolve assigned port */
 353         if (getsockname(sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
 354             opal_output(0, "mca_oob_tcp_create_listen: getsockname(): %s (%d)",
 355                         strerror(opal_socket_errno), opal_socket_errno);
 356             CLOSE_THE_SOCKET(sd);
 357             opal_argv_free(ports);
 358             return ORTE_ERROR;
 359         }
 360 
 361         /* setup listen backlog to maximum allowed by kernel */
 362         if (listen(sd, SOMAXCONN) < 0) {
 363             opal_output(0, "mca_oob_tcp_component_init: listen(): %s (%d)",
 364                         strerror(opal_socket_errno), opal_socket_errno);
 365             CLOSE_THE_SOCKET(sd);
 366             opal_argv_free(ports);
 367             return ORTE_ERROR;
 368         }
 369 
 370         /* set socket up to be non-blocking, otherwise accept could block */
 371         if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
 372             opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed: %s (%d)",
 373                         strerror(opal_socket_errno), opal_socket_errno);
 374             CLOSE_THE_SOCKET(sd);
 375             opal_argv_free(ports);
 376             return ORTE_ERROR;
 377         }
 378         flags |= O_NONBLOCK;
 379         if (fcntl(sd, F_SETFL, flags) < 0) {
 380             opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed: %s (%d)",
 381                         strerror(opal_socket_errno), opal_socket_errno);
 382             CLOSE_THE_SOCKET(sd);
 383             opal_argv_free(ports);
 384             return ORTE_ERROR;
 385         }
 386 
 387         /* add this port to our connections */
 388         conn = OBJ_NEW(mca_oob_tcp_listener_t);
 389         conn->sd = sd;
 390         conn->port = ntohs(((struct sockaddr_in*) &inaddr)->sin_port);
 391         if (0 == orte_process_info.my_port) {
 392             /* save the first one */
 393             orte_process_info.my_port = conn->port;
 394         }
 395         opal_list_append(&mca_oob_tcp_component.listeners, &conn->item);
 396         /* and to our ports */
 397         opal_asprintf(&tconn, "%d", ntohs(((struct sockaddr_in*) &inaddr)->sin_port));
 398         opal_argv_append_nosize(&mca_oob_tcp_component.ipv4ports, tconn);
 399         free(tconn);
 400         if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
 401             port = ntohs(((struct sockaddr_in*) &inaddr)->sin_port);
 402             opal_output(0, "%s assigned IPv4 port %d",
 403                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), port);
 404         }
 405 
 406         if (!ORTE_PROC_IS_HNP) {
 407             /* only the HNP binds to multiple ports */
 408             break;
 409         }
 410     }
 411     /* done with this, so release it */
 412     opal_argv_free(ports);
 413 
 414     if (0 == opal_list_get_size(&mca_oob_tcp_component.listeners)) {
 415         /* cleanup */
 416         if (0 <= sd) {
 417             CLOSE_THE_SOCKET(sd);
 418         }
 419         return ORTE_ERR_SOCKET_NOT_AVAILABLE;
 420     }
 421 
 422     return ORTE_SUCCESS;
 423 }
 424 
 425 #if OPAL_ENABLE_IPV6
 426 /*
 427  * Create an IPv6 listen socket and bind to all interfaces.
 428  *
 429  * At one time, this also registered a callback with the event library
 430  * for when connections were received on the listen socket.  This is
 431  * no longer the case -- the caller must register any events required.
 432  *
 433  * Called by both the threaded and event based listen modes.
 434  */
 435 static int create_listen6(void)
 436 {
 437     int flags, i;
 438     uint16_t port=0;
 439     struct sockaddr_storage inaddr;
 440     opal_socklen_t addrlen;
 441     char **ports=NULL;
 442     int sd;
 443     char *tconn;
 444     mca_oob_tcp_listener_t *conn;
 445 
 446     /* If an explicit range of ports was given, find the first open
 447      * port in the range.  Otherwise, tcp_port_min will be 0, which
 448      * means "pick any port"
 449      */
 450     if (ORTE_PROC_IS_DAEMON) {
 451         if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
 452             /* if static ports were provided, take the
 453              * first entry in the list
 454              */
 455             opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp6_static_ports[0]);
 456             /* flag that we are using static ports */
 457             orte_static_ports = true;
 458         } else if (NULL != mca_oob_tcp_component.tcp6_dyn_ports) {
 459             /* take the entire range */
 460             ports = opal_argv_copy(mca_oob_tcp_component.tcp6_dyn_ports);
 461             orte_static_ports = false;
 462         } else {
 463             /* flag the system to dynamically take any available port */
 464             opal_argv_append_nosize(&ports, "0");
 465             orte_static_ports = false;
 466         }
 467     } else if (ORTE_PROC_IS_HNP) {
 468         if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
 469             /* if static ports were provided, take the
 470              * first entry in the list
 471              */
 472             opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp6_static_ports[0]);
 473             /* flag that we are using static ports */
 474             orte_static_ports = true;
 475         } else if (NULL != mca_oob_tcp_component.tcp6_dyn_ports) {
 476             /* take the entire range */
 477             ports = opal_argv_copy(mca_oob_tcp_component.tcp6_dyn_ports);
 478             orte_static_ports = false;
 479         } else {
 480             /* flag the system to dynamically take any available port */
 481             opal_argv_append_nosize(&ports, "0");
 482             orte_static_ports = false;
 483         }
 484     } else if (ORTE_PROC_IS_MPI) {
 485         if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
 486             /* if static ports were provided, an mpi proc takes its
 487              * node_local_rank entry in the list IF it has that info
 488              * AND enough ports were provided - otherwise, we "pick any port"
 489              */
 490             orte_node_rank_t nrank;
 491             /* do I know my node_local_rank yet? */
 492             if (ORTE_NODE_RANK_INVALID != (nrank = orte_process_info.my_node_rank) &&
 493                 (nrank+1) < opal_argv_count(mca_oob_tcp_component.tcp6_static_ports)) {
 494                 /* any daemon takes the first entry, so we start with the second */
 495                 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp6_static_ports[nrank+1]);
 496                 /* flag that we are using static ports */
 497                 orte_static_ports = true;
 498             } else {
 499                 /* flag the system to dynamically take any available port */
 500                 opal_argv_append_nosize(&ports, "0");
 501                 orte_static_ports = false;
 502             }
 503         } else if (NULL != mca_oob_tcp_component.tcp6_dyn_ports) {
 504             /* take the entire range */
 505             ports = opal_argv_copy(mca_oob_tcp_component.tcp6_dyn_ports);
 506             orte_static_ports = false;
 507         } else {
 508             /* flag the system to dynamically take any available port */
 509             opal_argv_append_nosize(&ports, "0");
 510             orte_static_ports = false;
 511         }
 512     } else {
 513         /* if we are a tool, then we must let the
 514          * system pick any port
 515          */
 516         opal_argv_append_nosize(&ports, "0");
 517         /* if static ports were specified, flag it
 518          * so the HNP does the right thing
 519          */
 520         if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
 521             orte_static_ports = true;
 522         } else {
 523             orte_static_ports = false;
 524         }
 525     }
 526 
 527     /* bozo check - this should be impossible, but... */
 528     if (NULL == ports) {
 529         return ORTE_ERROR;
 530     }
 531 
 532     /* get the address info for this interface */
 533     memset(&inaddr, 0, sizeof(inaddr));
 534     ((struct sockaddr_in6*) &inaddr)->sin6_family = AF_INET6;
 535     ((struct sockaddr_in6*) &inaddr)->sin6_addr = in6addr_any;
 536     addrlen = sizeof(struct sockaddr_in6);
 537 
 538     /* loop across all the specified ports, establishing a socket
 539      * for each one - note that application procs will ONLY have
 540      * one socket, but that orterun and daemons will have multiple
 541      * sockets to support more flexible wireup protocols
 542      */
 543     for (i=0; i < opal_argv_count(ports); i++) {
 544         opal_output_verbose(5, orte_oob_base_framework.framework_output,
 545                             "%s attempting to bind to IPv6 port %s",
 546                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 547                             ports[i]);
 548         /* get the port number */
 549         port = strtol(ports[i], NULL, 10);
 550         /* convert it to network-byte-order */
 551         port = htons(port);
 552 
 553         ((struct sockaddr_in6*) &inaddr)->sin6_port = port;
 554 
 555         /* create a listen socket for incoming connections on this port */
 556         sd = socket(AF_INET6, SOCK_STREAM, 0);
 557         if (sd < 0) {
 558             if (EAFNOSUPPORT != opal_socket_errno) {
 559                 opal_output(0,"mca_oob_tcp_component_init: socket() failed: %s (%d)",
 560                             strerror(opal_socket_errno), opal_socket_errno);
 561             }
 562             return ORTE_ERR_IN_ERRNO;
 563         }
 564         /* Set the socket to close-on-exec so that no children inherit
 565            this FD */
 566         if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) {
 567             opal_output(0, "mca_oob_tcp_create_listen6: unable to set the "
 568                         "listening socket to CLOEXEC (%s:%d)\n",
 569                         strerror(opal_socket_errno), opal_socket_errno);
 570             CLOSE_THE_SOCKET(sd);
 571             opal_argv_free(ports);
 572             return ORTE_ERROR;
 573         }
 574 
 575         /* Enable/disable reusing ports */
 576         if (orte_static_ports) {
 577             flags = 1;
 578         } else {
 579             flags = 0;
 580         }
 581         if (setsockopt (sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
 582             opal_output(0, "mca_oob_tcp_create_listen: unable to set the "
 583                         "SO_REUSEADDR option (%s:%d)\n",
 584                         strerror(opal_socket_errno), opal_socket_errno);
 585             CLOSE_THE_SOCKET(sd);
 586             opal_argv_free(ports);
 587             return ORTE_ERROR;
 588         }
 589 
 590         if (bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
 591             if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
 592                 continue;
 593             }
 594             opal_output(0, "%s bind() failed for port %d: %s (%d)",
 595                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 596                         (int)ntohs(port),
 597                         strerror(opal_socket_errno),
 598                         opal_socket_errno );
 599             CLOSE_THE_SOCKET(sd);
 600             opal_argv_free(ports);
 601             return ORTE_ERROR;
 602         }
 603         /* resolve assigned port */
 604         if (getsockname(sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
 605             opal_output(0, "mca_oob_tcp_create_listen: getsockname(): %s (%d)",
 606                         strerror(opal_socket_errno), opal_socket_errno);
 607             CLOSE_THE_SOCKET(sd);
 608             return ORTE_ERROR;
 609         }
 610 
 611         /* setup listen backlog to maximum allowed by kernel */
 612         if (listen(sd, SOMAXCONN) < 0) {
 613             opal_output(0, "mca_oob_tcp_component_init: listen(): %s (%d)",
 614                         strerror(opal_socket_errno), opal_socket_errno);
 615             return ORTE_ERROR;
 616         }
 617 
 618         /* set socket up to be non-blocking, otherwise accept could block */
 619         if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
 620             opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed: %s (%d)",
 621                         strerror(opal_socket_errno), opal_socket_errno);
 622             return ORTE_ERROR;
 623         }
 624         flags |= O_NONBLOCK;
 625         if (fcntl(sd, F_SETFL, flags) < 0) {
 626             opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed: %s (%d)",
 627                         strerror(opal_socket_errno), opal_socket_errno);
 628             return ORTE_ERROR;
 629         }
 630 
 631         /* add this port to our connections */
 632         conn = OBJ_NEW(mca_oob_tcp_listener_t);
 633         conn->tcp6 = true;
 634         conn->sd = sd;
 635         conn->port = ntohs(((struct sockaddr_in6*) &inaddr)->sin6_port);
 636         opal_list_append(&mca_oob_tcp_component.listeners, &conn->item);
 637         /* and to our ports */
 638         opal_asprintf(&tconn, "%d", ntohs(((struct sockaddr_in6*) &inaddr)->sin6_port));
 639         opal_argv_append_nosize(&mca_oob_tcp_component.ipv6ports, tconn);
 640         free(tconn);
 641         if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
 642             opal_output(0, "%s assigned IPv6 port %d",
 643                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 644                         (int)ntohs(((struct sockaddr_in6*) &inaddr)->sin6_port));
 645         }
 646 
 647         if (!ORTE_PROC_IS_HNP) {
 648             /* only the HNP binds to multiple ports */
 649             break;
 650         }
 651     }
 652     if (0 == opal_list_get_size(&mca_oob_tcp_component.listeners)) {
 653         /* cleanup */
 654         CLOSE_THE_SOCKET(sd);
 655         opal_argv_free(ports);
 656         return ORTE_ERR_SOCKET_NOT_AVAILABLE;
 657     }
 658 
 659     /* done with this, so release it */
 660     opal_argv_free(ports);
 661 
 662     return ORTE_SUCCESS;
 663 }
 664 #endif
 665 
 666 /*
 667  * The listen thread created when listen_mode is threaded.  Accepts
 668  * incoming connections and places them in a queue for further
 669  * processing
 670  *
  671  * Runs until mca_oob_tcp_component.listen_thread_active is set to false.
 672  */
 673 static void* listen_thread(opal_object_t *obj)
 674 {
 675     int rc, max, accepted_connections, sd;
 676     opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
 677     mca_oob_tcp_pending_connection_t *pending_connection;
 678     struct timeval timeout;
 679     fd_set readfds;
 680     mca_oob_tcp_listener_t *listener;
 681 
 682     /* only execute during the initial VM startup stage - once
 683      * all the initial daemons have reported in, we will revert
 684      * to the event method for handling any further connections
 685      * so as to minimize overhead
 686      */
 687     while (mca_oob_tcp_component.listen_thread_active) {
 688         FD_ZERO(&readfds);
 689         max = -1;
 690         OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
 691             FD_SET(listener->sd, &readfds);
 692             max = (listener->sd > max) ? listener->sd : max;
 693         }
 694         /* add the stop_thread fd */
 695         FD_SET(mca_oob_tcp_component.stop_thread[0], &readfds);
 696         max = (mca_oob_tcp_component.stop_thread[0] > max) ? mca_oob_tcp_component.stop_thread[0] : max;
 697 
 698         /* set timeout interval */
 699         timeout.tv_sec = mca_oob_tcp_component.listen_thread_tv.tv_sec;
 700         timeout.tv_usec = mca_oob_tcp_component.listen_thread_tv.tv_usec;
 701 
 702         /* Block in a select to avoid hammering the cpu.  If a connection
 703          * comes in, we'll get woken up right away.
 704          */
 705         rc = select(max + 1, &readfds, NULL, NULL, &timeout);
 706         if (!mca_oob_tcp_component.listen_thread_active) {
 707             /* we've been asked to terminate */
 708             close(mca_oob_tcp_component.stop_thread[0]);
 709             close(mca_oob_tcp_component.stop_thread[1]);
 710             return NULL;
 711         }
 712         if (rc < 0) {
 713             if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {
 714                 perror("select");
 715             }
 716             continue;
 717         }
 718 
 719         /* Spin accepting connections until all active listen sockets
 720          * do not have any incoming connections, pushing each connection
 721          * onto the event queue for processing
 722          */
 723         do {
 724             accepted_connections = 0;
 725             OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
 726                 sd = listener->sd;
 727 
 728                 /* according to the man pages, select replaces the given descriptor
 729                  * set with a subset consisting of those descriptors that are ready
 730                  * for the specified operation - in this case, a read. So we need to
 731                  * first check to see if this file descriptor is included in the
 732                  * returned subset
 733                  */
 734                 if (0 == FD_ISSET(sd, &readfds)) {
 735                     /* this descriptor is not included */
 736                     continue;
 737                 }
 738 
 739                 /* this descriptor is ready to be read, which means a connection
 740                  * request has been received - so harvest it. All we want to do
 741                  * here is accept the connection and push the info onto the event
 742                  * library for subsequent processing - we don't want to actually
 743                  * process the connection here as it takes too long, and so the
 744                  * OS might start rejecting connections due to timeout.
 745                  */
 746                 pending_connection = OBJ_NEW(mca_oob_tcp_pending_connection_t);
 747                 opal_event_set(orte_oob_base.ev_base, &pending_connection->ev, -1,
 748                                OPAL_EV_WRITE, connection_handler, pending_connection);
 749                 opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI);
 750                 pending_connection->fd = accept(sd,
 751                                                 (struct sockaddr*)&(pending_connection->addr),
 752                                                 &addrlen);
 753 
 754                 /* check for < 0 as indicating an error upon accept */
 755                 if (pending_connection->fd < 0) {
 756                     OBJ_RELEASE(pending_connection);
 757 
 758                     /* Non-fatal errors */
 759                     if (EAGAIN == opal_socket_errno ||
 760                         EWOULDBLOCK == opal_socket_errno) {
 761                         continue;
 762                     }
 763 
 764                     /* If we run out of file descriptors, log an extra
 765                        warning (so that the user can know to fix this
 766                        problem) and abandon all hope. */
 767                     else if (EMFILE == opal_socket_errno) {
 768                         CLOSE_THE_SOCKET(sd);
 769                         ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
 770                         orte_show_help("help-oob-tcp.txt",
 771                                        "accept failed",
 772                                        true,
 773                                        opal_process_info.nodename,
 774                                        opal_socket_errno,
 775                                        strerror(opal_socket_errno),
 776                                        "Out of file descriptors");
 777                         goto done;
 778                     }
 779 
 780                     /* For all other cases, print a
 781                        warning but try to continue */
 782                     else {
 783                         orte_show_help("help-oob-tcp.txt",
 784                                        "accept failed",
 785                                        true,
 786                                        opal_process_info.nodename,
 787                                        opal_socket_errno,
 788                                        strerror(opal_socket_errno),
 789                                        "Unknown cause; job will try to continue");
 790                         continue;
 791                     }
 792                 }
 793 
 794                 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 795                                     "%s mca_oob_tcp_listen_thread: incoming connection: "
 796                                     "(%d, %d) %s:%d\n",
 797                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 798                                     pending_connection->fd, opal_socket_errno,
 799                                     opal_net_get_hostname((struct sockaddr*) &pending_connection->addr),
 800                                     opal_net_get_port((struct sockaddr*) &pending_connection->addr));
 801 
 802                 /* if we are on a privileged port, we only accept connections
 803                  * from other privileged sockets. A privileged port is one
 804                  * whose port is less than 1024 on Linux, so we'll check for that. */
 805                 if (1024 >= listener->port) {
 806                     uint16_t inport;
 807                     inport = opal_net_get_port((struct sockaddr*) &pending_connection->addr);
 808                     if (1024 < inport) {
 809                         /* someone tried to cross-connect privileges,
 810                          * say something */
 811                         orte_show_help("help-oob-tcp.txt",
 812                                        "privilege failure", true,
 813                                        opal_process_info.nodename, listener->port,
 814                                        opal_net_get_hostname((struct sockaddr*) &pending_connection->addr),
 815                                        inport);
 816                         CLOSE_THE_SOCKET(pending_connection->fd);
 817                         OBJ_RELEASE(pending_connection);
 818                         continue;
 819                     }
 820                 }
 821 
 822                 /* activate the event */
 823                 ORTE_POST_OBJECT(pending_connection);
 824                 opal_event_active(&pending_connection->ev, OPAL_EV_WRITE, 1);
 825                 accepted_connections++;
 826             }
 827         } while (accepted_connections > 0);
 828     }
 829 
 830  done:
 831 #if 0
 832     /* once we complete the initial launch, the "flood" of connections
 833      * will end - only connection requests from local procs, connect/accept
 834      * operations across mpirun instances, or the occasional tool will need
 835      * to be serviced. As these are relatively small events, we can easily
 836      * handle them in the context of the event library and no longer require
 837      * a separate connection harvesting thread. So switch over to the event
 838      * lib handler now
 839      */
 840     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 841                         "%s mca_oob_tcp_listen_thread: switching to event lib",
 842                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 843     /* setup to listen via event library */
 844     OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
 845         opal_event_set(orte_event_base, listener->event,
 846                    listener->sd,
 847                    OPAL_EV_READ|OPAL_EV_PERSIST,
 848                    connection_event_handler,
 849                    0);
 850         opal_event_set_priority(listener->event, ORTE_MSG_PRI);
 851         opal_event_add(listener->event, 0);
 852     }
 853 #endif
 854     return NULL;
 855 }
 856 
 857 /*
 858  * Handler for accepting connections from the listen thread
 859  */
 860 static void connection_handler(int sd, short flags, void* cbdata)
 861 {
 862     mca_oob_tcp_pending_connection_t *new_connection;
 863 
 864     new_connection = (mca_oob_tcp_pending_connection_t*)cbdata;
 865 
 866     ORTE_ACQUIRE_OBJECT(new_connection);
 867 
 868     opal_output_verbose(4, orte_oob_base_framework.framework_output,
 869                         "%s connection_handler: working connection "
 870                         "(%d, %d) %s:%d\n",
 871                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 872                         new_connection->fd, opal_socket_errno,
 873                         opal_net_get_hostname((struct sockaddr*) &new_connection->addr),
 874                         opal_net_get_port((struct sockaddr*) &new_connection->addr));
 875 
 876     /* process the connection */
 877     mca_oob_tcp_module.accept_connection(new_connection->fd,
 878                                          (struct sockaddr*) &(new_connection->addr));
 879     /* cleanup */
 880     OBJ_RELEASE(new_connection);
 881 }
 882 
 883 /*
 884  * Handler for accepting connections from the event library
 885  */
 886 static void connection_event_handler(int incoming_sd, short flags, void* cbdata)
 887 {
 888     struct sockaddr addr;
 889     opal_socklen_t addrlen = sizeof(struct sockaddr);
 890     int sd;
 891 
 892     sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen);
 893     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 894                         "%s connection_event_handler: working connection "
 895                         "(%d, %d) %s:%d\n",
 896                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 897                         sd, opal_socket_errno,
 898                         opal_net_get_hostname((struct sockaddr*) &addr),
 899                         opal_net_get_port((struct sockaddr*) &addr));
 900     if (sd < 0) {
 901         /* Non-fatal errors */
 902         if (EINTR == opal_socket_errno ||
 903             EAGAIN == opal_socket_errno ||
 904             EWOULDBLOCK == opal_socket_errno) {
 905             return;
 906         }
 907 
 908         /* If we run out of file descriptors, log an extra warning (so
 909            that the user can know to fix this problem) and abandon all
 910            hope. */
 911         else if (EMFILE == opal_socket_errno) {
 912             CLOSE_THE_SOCKET(incoming_sd);
 913             ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
 914             orte_show_help("help-oob-tcp.txt",
 915                            "accept failed",
 916                            true,
 917                            opal_process_info.nodename,
 918                            opal_socket_errno,
 919                            strerror(opal_socket_errno),
 920                            "Out of file descriptors");
 921             orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
 922             return;
 923         }
 924 
 925         /* For all other cases, close the socket, print a warning but
 926            try to continue */
 927         else {
 928             CLOSE_THE_SOCKET(incoming_sd);
 929             orte_show_help("help-oob-tcp.txt",
 930                            "accept failed",
 931                            true,
 932                            opal_process_info.nodename,
 933                            opal_socket_errno,
 934                            strerror(opal_socket_errno),
 935                            "Unknown cause; job will try to continue");
 936             return;
 937         }
 938     }
 939 
 940     /* process the connection */
 941     mca_oob_tcp_module.accept_connection(sd, &addr);
 942 }
 943 
 944 
 945 static void tcp_ev_cons(mca_oob_tcp_listener_t* event)
 946 {
 947     event->ev_active = false;
 948     event->tcp6 = false;
 949     event->sd = -1;
 950     event->port = 0;
 951 }
 952 static void tcp_ev_des(mca_oob_tcp_listener_t* event)
 953 {
 954     if (event->ev_active) {
 955         opal_event_del(&event->event);
 956     }
 957     event->ev_active = false;
 958     if (0 <= event->sd) {
 959         CLOSE_THE_SOCKET(event->sd);
 960         event->sd = -1;
 961     }
 962 }
 963 
 964 OBJ_CLASS_INSTANCE(mca_oob_tcp_listener_t,
 965                    opal_list_item_t,
 966                    tcp_ev_cons, tcp_ev_des);
 967 
 968 OBJ_CLASS_INSTANCE(mca_oob_tcp_pending_connection_t,
 969                    opal_object_t,
 970                    NULL,
 971                    NULL);

/* [<][>][^][v][top][bottom][index][help] */