This source file includes the following definitions.
- orte_oob_tcp_start_listening
- create_listen
- create_listen6
- listen_thread
- connection_handler
- connection_event_handler
- tcp_ev_cons
- tcp_ev_des
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 #include "orte_config.h"
  32 #include "orte/types.h"
  33 #include "opal/types.h"
  34 
  35 #ifdef HAVE_UNISTD_H
  36 #include <unistd.h>
  37 #endif
  38 #ifdef HAVE_SYS_TYPES_H
  39 #include <sys/types.h>
  40 #endif
  41 #include <fcntl.h>
  42 #ifdef HAVE_NETINET_IN_H
  43 #include <netinet/in.h>
  44 #endif
  45 #ifdef HAVE_ARPA_INET_H
  46 #include <arpa/inet.h>
  47 #endif
  48 #ifdef HAVE_NETDB_H
  49 #include <netdb.h>
  50 #endif
  51 #include <ctype.h>
  52 
  53 #include "opal/util/show_help.h"
  54 #include "opal/util/error.h"
  55 #include "opal/util/output.h"
  56 #include "opal/opal_socket_errno.h"
  57 #include "opal/util/if.h"
  58 #include "opal/util/net.h"
  59 #include "opal/util/argv.h"
  60 #include "opal/util/fd.h"
  61 #include "opal/class/opal_hash_table.h"
  62 #include "opal/class/opal_list.h"
  63 
  64 #include "orte/mca/errmgr/errmgr.h"
  65 #include "orte/mca/ess/ess.h"
  66 #include "orte/util/name_fns.h"
  67 #include "orte/util/parse_options.h"
  68 #include "orte/util/show_help.h"
  69 #include "orte/util/threads.h"
  70 #include "orte/runtime/orte_globals.h"
  71 
  72 #include "orte/mca/oob/tcp/oob_tcp.h"
  73 #include "orte/mca/oob/tcp/oob_tcp_component.h"
  74 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
  75 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
  76 #include "orte/mca/oob/tcp/oob_tcp_listener.h"
  77 #include "orte/mca/oob/tcp/oob_tcp_common.h"
  78 
  79 static void connection_event_handler(int incoming_sd, short flags, void* cbdata);
  80 static void* listen_thread(opal_object_t *obj);
  81 static int create_listen(void);
  82 #if OPAL_ENABLE_IPV6
  83 static int create_listen6(void);
  84 #endif
  85 static void connection_handler(int sd, short flags, void* cbdata);
  86 static void connection_event_handler(int sd, short flags, void* cbdata);
  87 
  88 
  89 
  90 
  91 
  92 
  93 
  94 
  95 
  96 
  97 
  98 
  99 int orte_oob_tcp_start_listening(void)
 100 {
 101     int rc;
 102     mca_oob_tcp_listener_t *listener;
 103 
 104     
 105     if (NULL == mca_oob_tcp_component.ipv4conns
 106 #if OPAL_ENABLE_IPV6
 107         && NULL == mca_oob_tcp_component.ipv6conns
 108 #endif
 109         ) {
 110         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 111         return ORTE_ERR_NOT_FOUND;
 112     }
 113 
 114     
 115     if (ORTE_SUCCESS != (rc = create_listen())) {
 116         ORTE_ERROR_LOG(rc);
 117         return rc;
 118     }
 119 
 120 #if OPAL_ENABLE_IPV6
 121     
 122     if (ORTE_SUCCESS != (rc = create_listen6())) {
 123         ORTE_ERROR_LOG(rc);
 124         return rc;
 125     }
 126 #endif
 127 
 128     
 129 
 130 
 131     if (ORTE_PROC_IS_HNP) {
 132         if (0 > pipe(mca_oob_tcp_component.stop_thread)) {
 133             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 134             return ORTE_ERR_OUT_OF_RESOURCE;
 135         }
 136 
 137         
 138 
 139         if (opal_fd_set_cloexec(mca_oob_tcp_component.stop_thread[0]) != OPAL_SUCCESS ||
 140             opal_fd_set_cloexec(mca_oob_tcp_component.stop_thread[1]) != OPAL_SUCCESS) {
 141             close(mca_oob_tcp_component.stop_thread[0]);
 142             close(mca_oob_tcp_component.stop_thread[1]);
 143             ORTE_ERROR_LOG(ORTE_ERR_IN_ERRNO);
 144             return ORTE_ERR_IN_ERRNO;
 145         }
 146 
 147         mca_oob_tcp_component.listen_thread_active = true;
 148         mca_oob_tcp_component.listen_thread.t_run = listen_thread;
 149         mca_oob_tcp_component.listen_thread.t_arg = NULL;
 150         if (OPAL_SUCCESS != (rc = opal_thread_start(&mca_oob_tcp_component.listen_thread))) {
 151             ORTE_ERROR_LOG(rc);
 152             opal_output(0, "%s Unable to start listen thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 153         }
 154         return rc;
 155     }
 156 
 157     
 158     OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
 159         listener->ev_active = true;
 160         opal_event_set(orte_oob_base.ev_base, &listener->event,
 161                        listener->sd,
 162                        OPAL_EV_READ|OPAL_EV_PERSIST,
 163                        connection_event_handler,
 164                        0);
 165         opal_event_set_priority(&listener->event, ORTE_MSG_PRI);
 166         ORTE_POST_OBJECT(listener);
 167         opal_event_add(&listener->event, 0);
 168     }
 169 
 170     return ORTE_SUCCESS;
 171 }
 172 
 173 
 174 
 175 
 176 
 177 
 178 
 179 
 180 
 181 
 182 static int create_listen(void)
 183 {
 184     int flags, i;
 185     uint16_t port=0;
 186     struct sockaddr_storage inaddr;
 187     opal_socklen_t addrlen;
 188     char **ports=NULL;
 189     int sd = -1;
 190     char *tconn;
 191     mca_oob_tcp_listener_t *conn;
 192 
 193     
 194 
 195 
 196 
 197     if (ORTE_PROC_IS_DAEMON) {
 198         if (NULL != mca_oob_tcp_component.tcp_static_ports) {
 199             
 200 
 201 
 202             opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp_static_ports[0]);
 203             
 204             orte_static_ports = true;
 205         } else if (NULL != mca_oob_tcp_component.tcp_dyn_ports) {
 206             
 207             ports = opal_argv_copy(mca_oob_tcp_component.tcp_dyn_ports);
 208             orte_static_ports = false;
 209         } else {
 210             
 211             opal_argv_append_nosize(&ports, "0");
 212             orte_static_ports = false;
 213         }
 214     } else if (ORTE_PROC_IS_HNP) {
 215         if (NULL != mca_oob_tcp_component.tcp_static_ports) {
 216             
 217 
 218 
 219             opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp_static_ports[0]);
 220             
 221             orte_static_ports = true;
 222         } else if (NULL != mca_oob_tcp_component.tcp_dyn_ports) {
 223             
 224             ports = opal_argv_copy(mca_oob_tcp_component.tcp_dyn_ports);
 225             orte_static_ports = false;
 226         } else {
 227             
 228             opal_argv_append_nosize(&ports, "0");
 229             orte_static_ports = false;
 230         }
 231     } else if (ORTE_PROC_IS_MPI) {
 232         if (NULL != mca_oob_tcp_component.tcp_static_ports) {
 233             
 234 
 235 
 236 
 237             orte_node_rank_t nrank;
 238             
 239             if (ORTE_NODE_RANK_INVALID != (nrank = orte_process_info.my_node_rank) &&
 240                 (nrank+1) < opal_argv_count(mca_oob_tcp_component.tcp_static_ports)) {
 241                 
 242                 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp_static_ports[nrank+1]);
 243                 
 244                 orte_static_ports = true;
 245             } else {
 246                 
 247                 opal_argv_append_nosize(&ports, "0");
 248                 orte_static_ports = false;
 249             }
 250         } else if (NULL != mca_oob_tcp_component.tcp_dyn_ports) {
 251             
 252             ports = opal_argv_copy(mca_oob_tcp_component.tcp_dyn_ports);
 253             orte_static_ports = false;
 254         } else {
 255             
 256             opal_argv_append_nosize(&ports, "0");
 257             orte_static_ports = false;
 258         }
 259     } else {
 260         
 261 
 262 
 263         opal_argv_append_nosize(&ports, "0");
 264         
 265 
 266 
 267         if (NULL != mca_oob_tcp_component.tcp_static_ports) {
 268             orte_static_ports = true;
 269         } else {
 270             orte_static_ports = false;
 271         }
 272     }
 273 
 274     
 275     if (NULL == ports) {
 276         return ORTE_ERROR;
 277     }
 278 
 279     
 280     memset(&inaddr, 0, sizeof(inaddr));
 281     ((struct sockaddr_in*) &inaddr)->sin_family = AF_INET;
 282     ((struct sockaddr_in*) &inaddr)->sin_addr.s_addr = INADDR_ANY;
 283     addrlen = sizeof(struct sockaddr_in);
 284 
 285     
 286 
 287 
 288 
 289 
 290     for (i=0; i < opal_argv_count(ports); i++) {
 291         opal_output_verbose(5, orte_oob_base_framework.framework_output,
 292                             "%s attempting to bind to IPv4 port %s",
 293                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 294                             ports[i]);
 295         
 296         port = strtol(ports[i], NULL, 10);
 297         
 298         port = htons(port);
 299 
 300         ((struct sockaddr_in*) &inaddr)->sin_port = port;
 301 
 302         
 303         sd = socket(AF_INET, SOCK_STREAM, 0);
 304         if (sd < 0) {
 305             if (EAFNOSUPPORT != opal_socket_errno) {
 306                 opal_output(0,"mca_oob_tcp_component_init: socket() failed: %s (%d)",
 307                             strerror(opal_socket_errno), opal_socket_errno);
 308             }
 309             opal_argv_free(ports);
 310             return ORTE_ERR_IN_ERRNO;
 311         }
 312 
 313         
 314         if (orte_static_ports) {
 315             flags = 1;
 316         } else {
 317             flags = 0;
 318         }
 319         if (setsockopt (sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
 320             opal_output(0, "mca_oob_tcp_create_listen: unable to set the "
 321                         "SO_REUSEADDR option (%s:%d)\n",
 322                         strerror(opal_socket_errno), opal_socket_errno);
 323             CLOSE_THE_SOCKET(sd);
 324             opal_argv_free(ports);
 325             return ORTE_ERROR;
 326         }
 327 
 328         
 329 
 330         if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) {
 331             opal_output(0, "mca_oob_tcp_create_listen: unable to set the "
 332                         "listening socket to CLOEXEC (%s:%d)\n",
 333                         strerror(opal_socket_errno), opal_socket_errno);
 334             CLOSE_THE_SOCKET(sd);
 335             opal_argv_free(ports);
 336             return ORTE_ERROR;
 337         }
 338 
 339         if (bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
 340             if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
 341                 continue;
 342             }
 343             opal_output(0, "%s bind() failed for port %d: %s (%d)",
 344                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 345                         (int)ntohs(port),
 346                         strerror(opal_socket_errno),
 347                         opal_socket_errno );
 348             CLOSE_THE_SOCKET(sd);
 349             opal_argv_free(ports);
 350             return ORTE_ERROR;
 351         }
 352         
 353         if (getsockname(sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
 354             opal_output(0, "mca_oob_tcp_create_listen: getsockname(): %s (%d)",
 355                         strerror(opal_socket_errno), opal_socket_errno);
 356             CLOSE_THE_SOCKET(sd);
 357             opal_argv_free(ports);
 358             return ORTE_ERROR;
 359         }
 360 
 361         
 362         if (listen(sd, SOMAXCONN) < 0) {
 363             opal_output(0, "mca_oob_tcp_component_init: listen(): %s (%d)",
 364                         strerror(opal_socket_errno), opal_socket_errno);
 365             CLOSE_THE_SOCKET(sd);
 366             opal_argv_free(ports);
 367             return ORTE_ERROR;
 368         }
 369 
 370         
 371         if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
 372             opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed: %s (%d)",
 373                         strerror(opal_socket_errno), opal_socket_errno);
 374             CLOSE_THE_SOCKET(sd);
 375             opal_argv_free(ports);
 376             return ORTE_ERROR;
 377         }
 378         flags |= O_NONBLOCK;
 379         if (fcntl(sd, F_SETFL, flags) < 0) {
 380             opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed: %s (%d)",
 381                         strerror(opal_socket_errno), opal_socket_errno);
 382             CLOSE_THE_SOCKET(sd);
 383             opal_argv_free(ports);
 384             return ORTE_ERROR;
 385         }
 386 
 387         
 388         conn = OBJ_NEW(mca_oob_tcp_listener_t);
 389         conn->sd = sd;
 390         conn->port = ntohs(((struct sockaddr_in*) &inaddr)->sin_port);
 391         if (0 == orte_process_info.my_port) {
 392             
 393             orte_process_info.my_port = conn->port;
 394         }
 395         opal_list_append(&mca_oob_tcp_component.listeners, &conn->item);
 396         
 397         opal_asprintf(&tconn, "%d", ntohs(((struct sockaddr_in*) &inaddr)->sin_port));
 398         opal_argv_append_nosize(&mca_oob_tcp_component.ipv4ports, tconn);
 399         free(tconn);
 400         if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
 401             port = ntohs(((struct sockaddr_in*) &inaddr)->sin_port);
 402             opal_output(0, "%s assigned IPv4 port %d",
 403                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), port);
 404         }
 405 
 406         if (!ORTE_PROC_IS_HNP) {
 407             
 408             break;
 409         }
 410     }
 411     
 412     opal_argv_free(ports);
 413 
 414     if (0 == opal_list_get_size(&mca_oob_tcp_component.listeners)) {
 415         
 416         if (0 <= sd) {
 417             CLOSE_THE_SOCKET(sd);
 418         }
 419         return ORTE_ERR_SOCKET_NOT_AVAILABLE;
 420     }
 421 
 422     return ORTE_SUCCESS;
 423 }
 424 
 425 #if OPAL_ENABLE_IPV6
 426 
 427 
 428 
 429 
 430 
 431 
 432 
 433 
 434 
 435 static int create_listen6(void)
 436 {
 437     int flags, i;
 438     uint16_t port=0;
 439     struct sockaddr_storage inaddr;
 440     opal_socklen_t addrlen;
 441     char **ports=NULL;
 442     int sd;
 443     char *tconn;
 444     mca_oob_tcp_listener_t *conn;
 445 
 446     
 447 
 448 
 449 
 450     if (ORTE_PROC_IS_DAEMON) {
 451         if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
 452             
 453 
 454 
 455             opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp6_static_ports[0]);
 456             
 457             orte_static_ports = true;
 458         } else if (NULL != mca_oob_tcp_component.tcp6_dyn_ports) {
 459             
 460             ports = opal_argv_copy(mca_oob_tcp_component.tcp6_dyn_ports);
 461             orte_static_ports = false;
 462         } else {
 463             
 464             opal_argv_append_nosize(&ports, "0");
 465             orte_static_ports = false;
 466         }
 467     } else if (ORTE_PROC_IS_HNP) {
 468         if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
 469             
 470 
 471 
 472             opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp6_static_ports[0]);
 473             
 474             orte_static_ports = true;
 475         } else if (NULL != mca_oob_tcp_component.tcp6_dyn_ports) {
 476             
 477             ports = opal_argv_copy(mca_oob_tcp_component.tcp6_dyn_ports);
 478             orte_static_ports = false;
 479         } else {
 480             
 481             opal_argv_append_nosize(&ports, "0");
 482             orte_static_ports = false;
 483         }
 484     } else if (ORTE_PROC_IS_MPI) {
 485         if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
 486             
 487 
 488 
 489 
 490             orte_node_rank_t nrank;
 491             
 492             if (ORTE_NODE_RANK_INVALID != (nrank = orte_process_info.my_node_rank) &&
 493                 (nrank+1) < opal_argv_count(mca_oob_tcp_component.tcp6_static_ports)) {
 494                 
 495                 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp6_static_ports[nrank+1]);
 496                 
 497                 orte_static_ports = true;
 498             } else {
 499                 
 500                 opal_argv_append_nosize(&ports, "0");
 501                 orte_static_ports = false;
 502             }
 503         } else if (NULL != mca_oob_tcp_component.tcp6_dyn_ports) {
 504             
 505             ports = opal_argv_copy(mca_oob_tcp_component.tcp6_dyn_ports);
 506             orte_static_ports = false;
 507         } else {
 508             
 509             opal_argv_append_nosize(&ports, "0");
 510             orte_static_ports = false;
 511         }
 512     } else {
 513         
 514 
 515 
 516         opal_argv_append_nosize(&ports, "0");
 517         
 518 
 519 
 520         if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
 521             orte_static_ports = true;
 522         } else {
 523             orte_static_ports = false;
 524         }
 525     }
 526 
 527     
 528     if (NULL == ports) {
 529         return ORTE_ERROR;
 530     }
 531 
 532     
 533     memset(&inaddr, 0, sizeof(inaddr));
 534     ((struct sockaddr_in6*) &inaddr)->sin6_family = AF_INET6;
 535     ((struct sockaddr_in6*) &inaddr)->sin6_addr = in6addr_any;
 536     addrlen = sizeof(struct sockaddr_in6);
 537 
 538     
 539 
 540 
 541 
 542 
 543     for (i=0; i < opal_argv_count(ports); i++) {
 544         opal_output_verbose(5, orte_oob_base_framework.framework_output,
 545                             "%s attempting to bind to IPv6 port %s",
 546                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 547                             ports[i]);
 548         
 549         port = strtol(ports[i], NULL, 10);
 550         
 551         port = htons(port);
 552 
 553         ((struct sockaddr_in6*) &inaddr)->sin6_port = port;
 554 
 555         
 556         sd = socket(AF_INET6, SOCK_STREAM, 0);
 557         if (sd < 0) {
 558             if (EAFNOSUPPORT != opal_socket_errno) {
 559                 opal_output(0,"mca_oob_tcp_component_init: socket() failed: %s (%d)",
 560                             strerror(opal_socket_errno), opal_socket_errno);
 561             }
 562             return ORTE_ERR_IN_ERRNO;
 563         }
 564         
 565 
 566         if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) {
 567             opal_output(0, "mca_oob_tcp_create_listen6: unable to set the "
 568                         "listening socket to CLOEXEC (%s:%d)\n",
 569                         strerror(opal_socket_errno), opal_socket_errno);
 570             CLOSE_THE_SOCKET(sd);
 571             opal_argv_free(ports);
 572             return ORTE_ERROR;
 573         }
 574 
 575         
 576         if (orte_static_ports) {
 577             flags = 1;
 578         } else {
 579             flags = 0;
 580         }
 581         if (setsockopt (sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
 582             opal_output(0, "mca_oob_tcp_create_listen: unable to set the "
 583                         "SO_REUSEADDR option (%s:%d)\n",
 584                         strerror(opal_socket_errno), opal_socket_errno);
 585             CLOSE_THE_SOCKET(sd);
 586             opal_argv_free(ports);
 587             return ORTE_ERROR;
 588         }
 589 
 590         if (bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
 591             if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
 592                 continue;
 593             }
 594             opal_output(0, "%s bind() failed for port %d: %s (%d)",
 595                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 596                         (int)ntohs(port),
 597                         strerror(opal_socket_errno),
 598                         opal_socket_errno );
 599             CLOSE_THE_SOCKET(sd);
 600             opal_argv_free(ports);
 601             return ORTE_ERROR;
 602         }
 603         
 604         if (getsockname(sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
 605             opal_output(0, "mca_oob_tcp_create_listen: getsockname(): %s (%d)",
 606                         strerror(opal_socket_errno), opal_socket_errno);
 607             CLOSE_THE_SOCKET(sd);
 608             return ORTE_ERROR;
 609         }
 610 
 611         
 612         if (listen(sd, SOMAXCONN) < 0) {
 613             opal_output(0, "mca_oob_tcp_component_init: listen(): %s (%d)",
 614                         strerror(opal_socket_errno), opal_socket_errno);
 615             return ORTE_ERROR;
 616         }
 617 
 618         
 619         if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
 620             opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed: %s (%d)",
 621                         strerror(opal_socket_errno), opal_socket_errno);
 622             return ORTE_ERROR;
 623         }
 624         flags |= O_NONBLOCK;
 625         if (fcntl(sd, F_SETFL, flags) < 0) {
 626             opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed: %s (%d)",
 627                         strerror(opal_socket_errno), opal_socket_errno);
 628             return ORTE_ERROR;
 629         }
 630 
 631         
 632         conn = OBJ_NEW(mca_oob_tcp_listener_t);
 633         conn->tcp6 = true;
 634         conn->sd = sd;
 635         conn->port = ntohs(((struct sockaddr_in6*) &inaddr)->sin6_port);
 636         opal_list_append(&mca_oob_tcp_component.listeners, &conn->item);
 637         
 638         opal_asprintf(&tconn, "%d", ntohs(((struct sockaddr_in6*) &inaddr)->sin6_port));
 639         opal_argv_append_nosize(&mca_oob_tcp_component.ipv6ports, tconn);
 640         free(tconn);
 641         if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
 642             opal_output(0, "%s assigned IPv6 port %d",
 643                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 644                         (int)ntohs(((struct sockaddr_in6*) &inaddr)->sin6_port));
 645         }
 646 
 647         if (!ORTE_PROC_IS_HNP) {
 648             
 649             break;
 650         }
 651     }
 652     if (0 == opal_list_get_size(&mca_oob_tcp_component.listeners)) {
 653         
 654         CLOSE_THE_SOCKET(sd);
 655         opal_argv_free(ports);
 656         return ORTE_ERR_SOCKET_NOT_AVAILABLE;
 657     }
 658 
 659     
 660     opal_argv_free(ports);
 661 
 662     return ORTE_SUCCESS;
 663 }
 664 #endif
 665 
 666 
 667 
 668 
 669 
 670 
 671 
 672 
 673 static void* listen_thread(opal_object_t *obj)
 674 {
 675     int rc, max, accepted_connections, sd;
 676     opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
 677     mca_oob_tcp_pending_connection_t *pending_connection;
 678     struct timeval timeout;
 679     fd_set readfds;
 680     mca_oob_tcp_listener_t *listener;
 681 
 682     
 683 
 684 
 685 
 686 
 687     while (mca_oob_tcp_component.listen_thread_active) {
 688         FD_ZERO(&readfds);
 689         max = -1;
 690         OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
 691             FD_SET(listener->sd, &readfds);
 692             max = (listener->sd > max) ? listener->sd : max;
 693         }
 694         
 695         FD_SET(mca_oob_tcp_component.stop_thread[0], &readfds);
 696         max = (mca_oob_tcp_component.stop_thread[0] > max) ? mca_oob_tcp_component.stop_thread[0] : max;
 697 
 698         
 699         timeout.tv_sec = mca_oob_tcp_component.listen_thread_tv.tv_sec;
 700         timeout.tv_usec = mca_oob_tcp_component.listen_thread_tv.tv_usec;
 701 
 702         
 703 
 704 
 705         rc = select(max + 1, &readfds, NULL, NULL, &timeout);
 706         if (!mca_oob_tcp_component.listen_thread_active) {
 707             
 708             close(mca_oob_tcp_component.stop_thread[0]);
 709             close(mca_oob_tcp_component.stop_thread[1]);
 710             return NULL;
 711         }
 712         if (rc < 0) {
 713             if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {
 714                 perror("select");
 715             }
 716             continue;
 717         }
 718 
 719         
 720 
 721 
 722 
 723         do {
 724             accepted_connections = 0;
 725             OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
 726                 sd = listener->sd;
 727 
 728                 
 729 
 730 
 731 
 732 
 733 
 734                 if (0 == FD_ISSET(sd, &readfds)) {
 735                     
 736                     continue;
 737                 }
 738 
 739                 
 740 
 741 
 742 
 743 
 744 
 745 
 746                 pending_connection = OBJ_NEW(mca_oob_tcp_pending_connection_t);
 747                 opal_event_set(orte_oob_base.ev_base, &pending_connection->ev, -1,
 748                                OPAL_EV_WRITE, connection_handler, pending_connection);
 749                 opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI);
 750                 pending_connection->fd = accept(sd,
 751                                                 (struct sockaddr*)&(pending_connection->addr),
 752                                                 &addrlen);
 753 
 754                 
 755                 if (pending_connection->fd < 0) {
 756                     OBJ_RELEASE(pending_connection);
 757 
 758                     
 759                     if (EAGAIN == opal_socket_errno ||
 760                         EWOULDBLOCK == opal_socket_errno) {
 761                         continue;
 762                     }
 763 
 764                     
 765 
 766 
 767                     else if (EMFILE == opal_socket_errno) {
 768                         CLOSE_THE_SOCKET(sd);
 769                         ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
 770                         orte_show_help("help-oob-tcp.txt",
 771                                        "accept failed",
 772                                        true,
 773                                        opal_process_info.nodename,
 774                                        opal_socket_errno,
 775                                        strerror(opal_socket_errno),
 776                                        "Out of file descriptors");
 777                         goto done;
 778                     }
 779 
 780                     
 781 
 782                     else {
 783                         orte_show_help("help-oob-tcp.txt",
 784                                        "accept failed",
 785                                        true,
 786                                        opal_process_info.nodename,
 787                                        opal_socket_errno,
 788                                        strerror(opal_socket_errno),
 789                                        "Unknown cause; job will try to continue");
 790                         continue;
 791                     }
 792                 }
 793 
 794                 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 795                                     "%s mca_oob_tcp_listen_thread: incoming connection: "
 796                                     "(%d, %d) %s:%d\n",
 797                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 798                                     pending_connection->fd, opal_socket_errno,
 799                                     opal_net_get_hostname((struct sockaddr*) &pending_connection->addr),
 800                                     opal_net_get_port((struct sockaddr*) &pending_connection->addr));
 801 
 802                 
 803 
 804 
 805                 if (1024 >= listener->port) {
 806                     uint16_t inport;
 807                     inport = opal_net_get_port((struct sockaddr*) &pending_connection->addr);
 808                     if (1024 < inport) {
 809                         
 810 
 811                         orte_show_help("help-oob-tcp.txt",
 812                                        "privilege failure", true,
 813                                        opal_process_info.nodename, listener->port,
 814                                        opal_net_get_hostname((struct sockaddr*) &pending_connection->addr),
 815                                        inport);
 816                         CLOSE_THE_SOCKET(pending_connection->fd);
 817                         OBJ_RELEASE(pending_connection);
 818                         continue;
 819                     }
 820                 }
 821 
 822                 
 823                 ORTE_POST_OBJECT(pending_connection);
 824                 opal_event_active(&pending_connection->ev, OPAL_EV_WRITE, 1);
 825                 accepted_connections++;
 826             }
 827         } while (accepted_connections > 0);
 828     }
 829 
 830  done:
 831 #if 0
 832     
 833 
 834 
 835 
 836 
 837 
 838 
 839 
 840     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 841                         "%s mca_oob_tcp_listen_thread: switching to event lib",
 842                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 843     
 844     OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
 845         opal_event_set(orte_event_base, listener->event,
 846                    listener->sd,
 847                    OPAL_EV_READ|OPAL_EV_PERSIST,
 848                    connection_event_handler,
 849                    0);
 850         opal_event_set_priority(listener->event, ORTE_MSG_PRI);
 851         opal_event_add(listener->event, 0);
 852     }
 853 #endif
 854     return NULL;
 855 }
 856 
 857 
 858 
 859 
 860 static void connection_handler(int sd, short flags, void* cbdata)
 861 {
 862     mca_oob_tcp_pending_connection_t *new_connection;
 863 
 864     new_connection = (mca_oob_tcp_pending_connection_t*)cbdata;
 865 
 866     ORTE_ACQUIRE_OBJECT(new_connection);
 867 
 868     opal_output_verbose(4, orte_oob_base_framework.framework_output,
 869                         "%s connection_handler: working connection "
 870                         "(%d, %d) %s:%d\n",
 871                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 872                         new_connection->fd, opal_socket_errno,
 873                         opal_net_get_hostname((struct sockaddr*) &new_connection->addr),
 874                         opal_net_get_port((struct sockaddr*) &new_connection->addr));
 875 
 876     
 877     mca_oob_tcp_module.accept_connection(new_connection->fd,
 878                                          (struct sockaddr*) &(new_connection->addr));
 879     
 880     OBJ_RELEASE(new_connection);
 881 }
 882 
 883 
 884 
 885 
 886 static void connection_event_handler(int incoming_sd, short flags, void* cbdata)
 887 {
 888     struct sockaddr addr;
 889     opal_socklen_t addrlen = sizeof(struct sockaddr);
 890     int sd;
 891 
 892     sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen);
 893     opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
 894                         "%s connection_event_handler: working connection "
 895                         "(%d, %d) %s:%d\n",
 896                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 897                         sd, opal_socket_errno,
 898                         opal_net_get_hostname((struct sockaddr*) &addr),
 899                         opal_net_get_port((struct sockaddr*) &addr));
 900     if (sd < 0) {
 901         
 902         if (EINTR == opal_socket_errno ||
 903             EAGAIN == opal_socket_errno ||
 904             EWOULDBLOCK == opal_socket_errno) {
 905             return;
 906         }
 907 
 908         
 909 
 910 
 911         else if (EMFILE == opal_socket_errno) {
 912             CLOSE_THE_SOCKET(incoming_sd);
 913             ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
 914             orte_show_help("help-oob-tcp.txt",
 915                            "accept failed",
 916                            true,
 917                            opal_process_info.nodename,
 918                            opal_socket_errno,
 919                            strerror(opal_socket_errno),
 920                            "Out of file descriptors");
 921             orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
 922             return;
 923         }
 924 
 925         
 926 
 927         else {
 928             CLOSE_THE_SOCKET(incoming_sd);
 929             orte_show_help("help-oob-tcp.txt",
 930                            "accept failed",
 931                            true,
 932                            opal_process_info.nodename,
 933                            opal_socket_errno,
 934                            strerror(opal_socket_errno),
 935                            "Unknown cause; job will try to continue");
 936             return;
 937         }
 938     }
 939 
 940     
 941     mca_oob_tcp_module.accept_connection(sd, &addr);
 942 }
 943 
 944 
 945 static void tcp_ev_cons(mca_oob_tcp_listener_t* event)
 946 {
 947     event->ev_active = false;
 948     event->tcp6 = false;
 949     event->sd = -1;
 950     event->port = 0;
 951 }
 952 static void tcp_ev_des(mca_oob_tcp_listener_t* event)
 953 {
 954     if (event->ev_active) {
 955         opal_event_del(&event->event);
 956     }
 957     event->ev_active = false;
 958     if (0 <= event->sd) {
 959         CLOSE_THE_SOCKET(event->sd);
 960         event->sd = -1;
 961     }
 962 }
 963 
/* Listener class: a list item, constructed by tcp_ev_cons and
 * destroyed by tcp_ev_des. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_listener_t,
                   opal_list_item_t,
                   tcp_ev_cons, tcp_ev_des);

/* Pending-connection class: a plain object with no constructor or
 * destructor of its own. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_pending_connection_t,
                   opal_object_t,
                   NULL,
                   NULL);