root/orte/util/listener.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. orte_register_listener
  2. orte_start_listening
  3. orte_stop_listening
  4. listen_thread_fn
  5. lcons
  6. ldes

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2015 Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2015 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2015      Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  *
  25  * In windows, many of the socket functions return an EWOULDBLOCK
  26  * instead of things like EAGAIN, EINPROGRESS, etc. It has been
  27  * verified that this will not conflict with other error codes that
  28  * are returned by these functions under UNIX/Linux environments
  29  */
  30 
  31 #include "orte_config.h"
  32 #include "orte/types.h"
  33 #include "opal/types.h"
  34 
  35 #ifdef HAVE_UNISTD_H
  36 #include <unistd.h>
  37 #endif
  38 #ifdef HAVE_SYS_TYPES_H
  39 #include <sys/types.h>
  40 #endif
  41 #include <fcntl.h>
  42 #ifdef HAVE_NETINET_IN_H
  43 #include <netinet/in.h>
  44 #endif
  45 #ifdef HAVE_ARPA_INET_H
  46 #include <arpa/inet.h>
  47 #endif
  48 #ifdef HAVE_NETDB_H
  49 #include <netdb.h>
  50 #endif
  51 #ifdef HAVE_SYS_SOCKET_H
  52 #include <sys/socket.h>
  53 #endif
  54 
  55 #include <ctype.h>
  56 
  57 #include "opal/util/error.h"
  58 #include "opal/util/output.h"
  59 #include "opal/opal_socket_errno.h"
  60 #include "opal/util/if.h"
  61 #include "opal/util/net.h"
  62 #include "opal/util/fd.h"
  63 #include "opal/class/opal_list.h"
  64 
  65 #include "orte/mca/errmgr/errmgr.h"
  66 #include "orte/util/name_fns.h"
  67 #include "orte/runtime/orte_globals.h"
  68 #include "orte/util/show_help.h"
  69 
  70 #include "orte/util/listener.h"
  71 
  72 static void* listen_thread_fn(opal_object_t *obj);
  73 static opal_list_t mylisteners;
  74 static bool initialized = false;
  75 static opal_thread_t listen_thread;
  76 static volatile bool listen_thread_active = false;
  77 static struct timeval listen_thread_tv;
  78 static int stop_thread[2];
  79 
  80 #define CLOSE_THE_SOCKET(socket)    \
  81     do {                            \
  82         shutdown(socket, 2);        \
  83         close(socket);              \
  84         socket = -1;                \
  85     } while(0)
  86 
  87 
  88 int orte_register_listener(struct sockaddr* address, opal_socklen_t addrlen,
  89                            opal_event_base_t *evbase,
  90                            orte_listener_callback_fn_t handler)
  91 {
  92     orte_listener_t *conn;
  93     int flags;
  94     int sd = -1;
  95 
  96     if (!initialized) {
  97         OBJ_CONSTRUCT(&mylisteners, opal_list_t);
  98         OBJ_CONSTRUCT(&listen_thread, opal_thread_t);
  99         if (0 > pipe(stop_thread)) {
 100             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 101             return ORTE_ERR_OUT_OF_RESOURCE;
 102         }
 103         /* Make sure the pipe FDs are set to close-on-exec so that
 104            they don't leak into children */
 105         if (opal_fd_set_cloexec(stop_thread[0]) != OPAL_SUCCESS ||
 106             opal_fd_set_cloexec(stop_thread[1]) != OPAL_SUCCESS) {
 107             close(stop_thread[0]);
 108             close(stop_thread[1]);
 109             ORTE_ERROR_LOG(ORTE_ERR_IN_ERRNO);
 110             return ORTE_ERR_IN_ERRNO;
 111         }
 112         listen_thread_tv.tv_sec = 3600;
 113         listen_thread_tv.tv_usec = 0;
 114         initialized = true;
 115     }
 116 
 117     /* create a listen socket for incoming connection attempts */
 118     sd = socket(PF_UNIX, SOCK_STREAM, 0);
 119     if (sd < 0) {
 120         if (EAFNOSUPPORT != opal_socket_errno) {
 121             opal_output(0,"pmix_server_start_listening: socket() failed: %s (%d)",
 122                         strerror(opal_socket_errno), opal_socket_errno);
 123         }
 124         return ORTE_ERR_IN_ERRNO;
 125     }
 126     /* Set the socket to close-on-exec so that no children inherit
 127        this FD */
 128     if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) {
 129         opal_output(0, "pmix_server: unable to set the "
 130                     "listening socket to CLOEXEC (%s:%d)\n",
 131                     strerror(opal_socket_errno), opal_socket_errno);
 132         CLOSE_THE_SOCKET(sd);
 133         return ORTE_ERROR;
 134     }
 135 
 136 
 137     if (bind(sd, (struct sockaddr*)address, addrlen) < 0) {
 138         opal_output(0, "%s bind() failed on error %s (%d)",
 139                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 140                     strerror(opal_socket_errno),
 141                     opal_socket_errno );
 142         CLOSE_THE_SOCKET(sd);
 143         return ORTE_ERROR;
 144     }
 145 
 146     /* setup listen backlog to maximum allowed by kernel */
 147     if (listen(sd, SOMAXCONN) < 0) {
 148         opal_output(0, "orte_listener: listen() failed: %s (%d)",
 149                     strerror(opal_socket_errno), opal_socket_errno);
 150         CLOSE_THE_SOCKET(sd);
 151         return ORTE_ERROR;
 152     }
 153 
 154     /* set socket up to be non-blocking, otherwise accept could block */
 155     if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
 156         opal_output(0, "orte_listener: fcntl(F_GETFL) failed: %s (%d)",
 157                     strerror(opal_socket_errno), opal_socket_errno);
 158         CLOSE_THE_SOCKET(sd);
 159         return ORTE_ERROR;
 160     }
 161     flags |= O_NONBLOCK;
 162     if (fcntl(sd, F_SETFL, flags) < 0) {
 163         opal_output(0, "orte_listener: fcntl(F_SETFL) failed: %s (%d)",
 164                     strerror(opal_socket_errno), opal_socket_errno);
 165         CLOSE_THE_SOCKET(sd);
 166         return ORTE_ERROR;
 167     }
 168 
 169     /* add this port to our connections */
 170     conn = OBJ_NEW(orte_listener_t);
 171     conn->sd = sd;
 172     conn->evbase = evbase;
 173     conn->handler = handler;
 174     opal_list_append(&mylisteners, &conn->item);
 175 
 176     return ORTE_SUCCESS;
 177 }
 178 
 179 /*
 180  * Component initialization - create a module for each available
 181  * TCP interface and initialize the static resources associated
 182  * with that module.
 183  *
 184  * Also initializes the list of devices that will be used/supported by
 185  * the module, using the if_include and if_exclude variables.  This is
 186  * the only place that this sorting should occur -- all other places
 187  * should use the tcp_avaiable_devices list.  This is a change from
 188  * previous versions of this component.
 189  */
 190 int orte_start_listening(void)
 191 {
 192     int rc;
 193 
 194     /* if we aren't initialized, or have nothing
 195      * registered, or are already listening, then return SUCCESS */
 196     if (!initialized || 0 == opal_list_get_size(&mylisteners) ||
 197         listen_thread_active) {
 198         return ORTE_SUCCESS;
 199     }
 200 
 201     /* start our listener thread */
 202     listen_thread_active = true;
 203     listen_thread.t_run = listen_thread_fn;
 204     listen_thread.t_arg = NULL;
 205     if (OPAL_SUCCESS != (rc = opal_thread_start(&listen_thread))) {
 206         ORTE_ERROR_LOG(rc);
 207         opal_output(0, "%s Unable to start listen thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 208     }
 209     return rc;
 210 }
 211 
 212 void orte_stop_listening(void)
 213 {
 214     int i=0;
 215 
 216     if (!listen_thread_active) {
 217         return;
 218     }
 219 
 220     listen_thread_active = false;
 221     /* tell the thread to exit */
 222     write(stop_thread[1], &i, sizeof(int));
 223     opal_thread_join(&listen_thread, NULL);
 224     OBJ_DESTRUCT(&listen_thread);
 225     OPAL_LIST_DESTRUCT(&mylisteners);
 226 }
 227 
 228 /*
 229  * The listen thread accepts incoming connections and places them
 230  * in a queue for further processing
 231  *
 232  * Runs until orte_listener_shutdown is set to true.
 233  */
 234 static void* listen_thread_fn(opal_object_t *obj)
 235 {
 236     int rc, max, accepted_connections, sd;
 237     opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
 238     orte_pending_connection_t *pending_connection;
 239     struct timeval timeout;
 240     fd_set readfds;
 241     orte_listener_t *listener;
 242 
 243     while (listen_thread_active) {
 244         FD_ZERO(&readfds);
 245         max = -1;
 246         OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) {
 247             FD_SET(listener->sd, &readfds);
 248             max = (listener->sd > max) ? listener->sd : max;
 249         }
 250         /* add the stop_thread fd */
 251         FD_SET(stop_thread[0], &readfds);
 252         max = (stop_thread[0] > max) ? stop_thread[0] : max;
 253 
 254         /* set timeout interval */
 255         timeout.tv_sec = listen_thread_tv.tv_sec;
 256         timeout.tv_usec = listen_thread_tv.tv_usec;
 257 
 258         /* Block in a select to avoid hammering the cpu.  If a connection
 259          * comes in, we'll get woken up right away.
 260          */
 261         rc = select(max + 1, &readfds, NULL, NULL, &timeout);
 262         if (!listen_thread_active) {
 263             /* we've been asked to terminate */
 264             goto done;
 265         }
 266         if (rc < 0) {
 267             if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {
 268                 perror("select");
 269             }
 270             continue;
 271         }
 272 
 273         /* Spin accepting connections until all active listen sockets
 274          * do not have any incoming connections, pushing each connection
 275          * onto its respective event queue for processing
 276          */
 277         do {
 278             accepted_connections = 0;
 279             OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) {
 280                 sd = listener->sd;
 281 
 282                 /* according to the man pages, select replaces the given descriptor
 283                  * set with a subset consisting of those descriptors that are ready
 284                  * for the specified operation - in this case, a read. So we need to
 285                  * first check to see if this file descriptor is included in the
 286                  * returned subset
 287                  */
 288                 if (0 == FD_ISSET(sd, &readfds)) {
 289                     /* this descriptor is not included */
 290                     continue;
 291                 }
 292 
 293                 /* this descriptor is ready to be read, which means a connection
 294                  * request has been received - so harvest it. All we want to do
 295                  * here is accept the connection and push the info onto the event
 296                  * library for subsequent processing - we don't want to actually
 297                  * process the connection here as it takes too long, and so the
 298                  * OS might start rejecting connections due to timeout.
 299                  */
 300                 pending_connection = OBJ_NEW(orte_pending_connection_t);
 301                 opal_event_set(listener->evbase, &pending_connection->ev, -1,
 302                                OPAL_EV_WRITE, listener->handler, pending_connection);
 303                 opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI);
 304                 pending_connection->fd = accept(sd,
 305                                                 (struct sockaddr*)&(pending_connection->addr),
 306                                                 &addrlen);
 307                 if (pending_connection->fd < 0) {
 308                     OBJ_RELEASE(pending_connection);
 309 
 310                     /* Non-fatal errors */
 311                     if (EAGAIN == opal_socket_errno ||
 312                         EWOULDBLOCK == opal_socket_errno) {
 313                         continue;
 314                     }
 315 
 316                     /* If we run out of file descriptors, log an extra
 317                        warning (so that the user can know to fix this
 318                        problem) and abandon all hope. */
 319                     else if (EMFILE == opal_socket_errno) {
 320                         CLOSE_THE_SOCKET(sd);
 321                         ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
 322                         orte_show_help("help-oob-tcp.txt",
 323                                        "accept failed",
 324                                        true,
 325                                        opal_process_info.nodename,
 326                                        opal_socket_errno,
 327                                        strerror(opal_socket_errno),
 328                                        "Out of file descriptors");
 329                         goto done;
 330                     }
 331 
 332                     /* For all other cases, close the socket, print a
 333                        warning but try to continue */
 334                     else {
 335                         CLOSE_THE_SOCKET(sd);
 336                         orte_show_help("help-oob-tcp.txt",
 337                                        "accept failed",
 338                                        true,
 339                                        opal_process_info.nodename,
 340                                        opal_socket_errno,
 341                                        strerror(opal_socket_errno),
 342                                        "Unknown cause; job will try to continue");
 343                         continue;
 344                     }
 345                 }
 346 
 347                 /* activate the event */
 348                 opal_event_active(&pending_connection->ev, OPAL_EV_WRITE, 1);
 349                 accepted_connections++;
 350             }
 351         } while (accepted_connections > 0);
 352     }
 353 
 354  done:
 355     close(stop_thread[0]);
 356     close(stop_thread[1]);
 357     return NULL;
 358 }
 359 
 360 
 361 /* INSTANTIATE CLASSES */
 362 static void lcons(orte_listener_t *p)
 363 {
 364     p->sd = -1;
 365     p->evbase = NULL;
 366     p->handler = NULL;
 367 }
 368 static void ldes(orte_listener_t *p)
 369 {
 370     if (0 <= p->sd) {
 371         CLOSE_THE_SOCKET(p->sd);
 372     }
 373 }
 374 OBJ_CLASS_INSTANCE(orte_listener_t,
 375                    opal_list_item_t,
 376                    lcons, ldes);
 377 
 378 OBJ_CLASS_INSTANCE(orte_pending_connection_t,
 379                    opal_object_t,
 380                    NULL,
 381                    NULL);

/* [<][>][^][v][top][bottom][index][help] */