root/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/base/ptl_base_listener.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. setup_listeners
  2. pmix_ptl_base_start_listening
  3. pmix_ptl_base_stop_listening
  4. listen_thread

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2014-2017 Intel, Inc.  All rights reserved.
   4  * Copyright (c) 2014-2019 Research Organization for Information Science
   5  *                         and Technology (RIST).  All rights reserved.
   6  * Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
   7  *                         All rights reserved.
   8  * Copyright (c) 2016      Mellanox Technologies, Inc.
   9  *                         All rights reserved.
  10  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  11  * $COPYRIGHT$
  12  *
  13  * Additional copyrights may follow
  14  *
  15  * $HEADER$
  16  */
  17 
  18 #include <src/include/pmix_config.h>
  19 
  20 #include <src/include/pmix_stdint.h>
  21 #include <src/include/pmix_socket_errno.h>
  22 
  23 #include <pmix_server.h>
  24 #include "src/include/pmix_globals.h"
  25 
  26 #ifdef HAVE_STRING_H
  27 #include <string.h>
  28 #endif
  29 #include <fcntl.h>
  30 #ifdef HAVE_UNISTD_H
  31 #include <unistd.h>
  32 #endif
  33 #ifdef HAVE_SYS_SOCKET_H
  34 #include <sys/socket.h>
  35 #endif
  36 #ifdef HAVE_SYS_UN_H
  37 #include <sys/un.h>
  38 #endif
  39 #ifdef HAVE_SYS_UIO_H
  40 #include <sys/uio.h>
  41 #endif
  42 #ifdef HAVE_SYS_TYPES_H
  43 #include <sys/types.h>
  44 #endif
  45 #include <ctype.h>
  46 #include <sys/stat.h>
  47 #include PMIX_EVENT_HEADER
  48 #include <pthread.h>
  49 
  50 #include "src/class/pmix_list.h"
  51 #include "src/util/error.h"
  52 #include "src/util/fd.h"
  53 #include "src/util/output.h"
  54 #include "src/util/pmix_environ.h"
  55 
  56  #include "src/mca/ptl/base/base.h"
  57 
  58 // local functions for connection support
  59 static void* listen_thread(void *obj);
  60 static pthread_t engine;
  61 static bool setup_complete = false;
  62 
  63 /* Cycle across all available plugins and provide them with
  64  * an opportunity to register rendezvous points (server-side
  65  * function). Function is to return PMIX_SUCCESS if at least
  66  * one rendezvous can be defined. */
  67 static pmix_status_t setup_listeners(pmix_info_t *info, size_t ninfo, bool *need_listener)
  68 {
  69     pmix_ptl_base_active_t *active;
  70     pmix_status_t rc;
  71     size_t n;
  72     bool single = false;
  73 
  74     if (!pmix_ptl_globals.initialized) {
  75         return PMIX_ERR_INIT;
  76     }
  77 
  78     /* scan the directives to see if they want only one listener setup */
  79     if (NULL != info) {
  80         for (n=0; n < ninfo; n++) {
  81             if (0 == strncmp(info[n].key, PMIX_SINGLE_LISTENER, PMIX_MAX_KEYLEN)) {
  82                 single = PMIX_INFO_TRUE(&info[n]);
  83                 break;
  84             }
  85         }
  86     }
  87 
  88     PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
  89         if (NULL != active->component->setup_listener) {
  90             rc = active->component->setup_listener(info, ninfo, need_listener);
  91             if (PMIX_SUCCESS != rc && PMIX_ERR_NOT_AVAILABLE != rc) {
  92                 return rc;
  93             }
  94             if (single) {
  95                 return PMIX_SUCCESS;
  96             }
  97         }
  98     }
  99     /* we must have at least one listener */
 100     if (0 == pmix_list_get_size(&pmix_ptl_globals.listeners)) {
 101         return PMIX_ERR_INIT;
 102     }
 103     return PMIX_SUCCESS;
 104 }
 105 
 106 /*
 107  * start listening thread
 108  */
 109 pmix_status_t pmix_ptl_base_start_listening(pmix_info_t *info, size_t ninfo)
 110 {
 111     pmix_status_t rc;
 112     bool need_listener = false;
 113 
 114     /* setup the listeners */
 115     if (!setup_complete) {
 116         if (PMIX_SUCCESS != (rc = setup_listeners(info, ninfo, &need_listener))) {
 117             return rc;
 118         }
 119     }
 120     setup_complete = true;
 121 
 122     /* if we don't need a listener thread, then we are done */
 123     if (!need_listener) {
 124         return PMIX_SUCCESS;
 125     }
 126 
 127     /*** spawn internal listener thread */
 128     if (0 > pipe(pmix_ptl_globals.stop_thread)) {
 129         PMIX_ERROR_LOG(PMIX_ERR_IN_ERRNO);
 130         return PMIX_ERR_OUT_OF_RESOURCE;
 131     }
 132     /* Make sure the pipe FDs are set to close-on-exec so that
 133        they don't leak into children */
 134     if (pmix_fd_set_cloexec(pmix_ptl_globals.stop_thread[0]) != PMIX_SUCCESS ||
 135         pmix_fd_set_cloexec(pmix_ptl_globals.stop_thread[1]) != PMIX_SUCCESS) {
 136         PMIX_ERROR_LOG(PMIX_ERR_IN_ERRNO);
 137         close(pmix_ptl_globals.stop_thread[0]);
 138         close(pmix_ptl_globals.stop_thread[1]);
 139         return PMIX_ERR_OUT_OF_RESOURCE;
 140     }
 141     /* fork off the listener thread */
 142     pmix_ptl_globals.listen_thread_active = true;
 143     if (0 > pthread_create(&engine, NULL, listen_thread, NULL)) {
 144         pmix_ptl_globals.listen_thread_active = false;
 145         return PMIX_ERROR;
 146     }
 147 
 148     return PMIX_SUCCESS;
 149 }
 150 
 151 void pmix_ptl_base_stop_listening(void)
 152 {
 153     int i;
 154     pmix_listener_t *lt;
 155 
 156     pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 157                         "listen_thread: shutdown");
 158 
 159     if (!pmix_ptl_globals.listen_thread_active) {
 160         /* nothing we can do */
 161         return;
 162     }
 163 
 164     /* mark it as inactive */
 165     pmix_ptl_globals.listen_thread_active = false;
 166     /* use the block to break it loose just in
 167      * case the thread is blocked in a call to select for
 168      * a long time */
 169     i=1;
 170     if (0 > write(pmix_ptl_globals.stop_thread[1], &i, sizeof(int))) {
 171         return;
 172     }
 173     /* wait for thread to exit */
 174     pthread_join(engine, NULL);
 175     /* close the sockets to remove the connection points */
 176     PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
 177         CLOSE_THE_SOCKET(lt->socket);
 178         lt->socket = -1;
 179     }
 180 }
 181 
 182 static void* listen_thread(void *obj)
 183 {
 184     int rc, max, accepted_connections;
 185     socklen_t addrlen = sizeof(struct sockaddr_storage);
 186     pmix_pending_connection_t *pending_connection;
 187     struct timeval timeout;
 188     fd_set readfds;
 189     pmix_listener_t *lt;
 190 
 191     pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 192                         "listen_thread: active");
 193 
 194 
 195     while (pmix_ptl_globals.listen_thread_active) {
 196         FD_ZERO(&readfds);
 197         max = -1;
 198         PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
 199             FD_SET(lt->socket, &readfds);
 200             max = (lt->socket > max) ? lt->socket : max;
 201         }
 202         /* add the stop_thread fd */
 203         FD_SET(pmix_ptl_globals.stop_thread[0], &readfds);
 204         max = (pmix_ptl_globals.stop_thread[0] > max) ? pmix_ptl_globals.stop_thread[0] : max;
 205 
 206         /* set timeout interval */
 207         timeout.tv_sec = 2;
 208         timeout.tv_usec = 0;
 209 
 210         /* Block in a select to avoid hammering the cpu.  If a connection
 211          * comes in, we'll get woken up right away.
 212          */
 213         rc = select(max + 1, &readfds, NULL, NULL, &timeout);
 214         if (!pmix_ptl_globals.listen_thread_active) {
 215             /* we've been asked to terminate */
 216             close(pmix_ptl_globals.stop_thread[0]);
 217             close(pmix_ptl_globals.stop_thread[1]);
 218             return NULL;
 219         }
 220         if (rc < 0) {
 221             continue;
 222         }
 223 
 224         /* Spin accepting connections until all active listen sockets
 225          * do not have any incoming connections, pushing each connection
 226          * onto the event queue for processing
 227          */
 228         do {
 229             accepted_connections = 0;
 230             PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
 231 
 232                 /* according to the man pages, select replaces the given descriptor
 233                  * set with a subset consisting of those descriptors that are ready
 234                  * for the specified operation - in this case, a read. So we need to
 235                  * first check to see if this file descriptor is included in the
 236                  * returned subset
 237                  */
 238                 if (0 == FD_ISSET(lt->socket, &readfds)) {
 239                     /* this descriptor is not included */
 240                     continue;
 241                 }
 242 
 243                 /* this descriptor is ready to be read, which means a connection
 244                  * request has been received - so harvest it. All we want to do
 245                  * here is accept the connection and push the info onto the event
 246                  * library for subsequent processing - we don't want to actually
 247                  * process the connection here as it takes too long, and so the
 248                  * OS might start rejecting connections due to timeout.
 249                  */
 250                 pending_connection = PMIX_NEW(pmix_pending_connection_t);
 251                 pending_connection->protocol = lt->protocol;
 252                 pending_connection->ptl = lt->ptl;
 253                 pmix_event_assign(&pending_connection->ev, pmix_globals.evbase, -1,
 254                                   EV_WRITE, lt->cbfunc, pending_connection);
 255                 pending_connection->sd = accept(lt->socket,
 256                                                 (struct sockaddr*)&(pending_connection->addr),
 257                                                 &addrlen);
 258                 if (pending_connection->sd < 0) {
 259                     PMIX_RELEASE(pending_connection);
 260                     if (pmix_socket_errno != EAGAIN ||
 261                         pmix_socket_errno != EWOULDBLOCK) {
 262                         if (EMFILE == pmix_socket_errno ||
 263                             ENOBUFS == pmix_socket_errno ||
 264                             ENOMEM == pmix_socket_errno) {
 265                             PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 266                         } else if (EINVAL == pmix_socket_errno ||
 267                                    EINTR == pmix_socket_errno) {
 268                             /* race condition at finalize */
 269                             goto done;
 270                         } else if (ECONNABORTED == pmix_socket_errno) {
 271                             /* they aborted the attempt */
 272                             continue;
 273                         } else {
 274                             pmix_output(0, "listen_thread: accept() failed: %s (%d).",
 275                                         strerror(pmix_socket_errno), pmix_socket_errno);
 276                         }
 277                         goto done;
 278                     }
 279                     continue;
 280                 }
 281 
 282                 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 283                                     "listen_thread: new connection: (%d, %d)",
 284                                     pending_connection->sd, pmix_socket_errno);
 285                 /* post the object */
 286                 PMIX_POST_OBJECT(pending_connection);
 287                 /* activate the event */
 288                 pmix_event_active(&pending_connection->ev, EV_WRITE, 1);
 289                 accepted_connections++;
 290             }
 291         } while (accepted_connections > 0);
 292     }
 293 
 294  done:
 295     pmix_ptl_globals.listen_thread_active = false;
 296     return NULL;
 297 }

/* [<][>][^][v][top][bottom][index][help] */