This source file includes the following definitions.
- setup_listeners
- pmix_ptl_base_start_listening
- pmix_ptl_base_stop_listening
- listen_thread
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 #include <src/include/pmix_config.h>
  19 
  20 #include <src/include/pmix_stdint.h>
  21 #include <src/include/pmix_socket_errno.h>
  22 
  23 #include <pmix_server.h>
  24 #include "src/include/pmix_globals.h"
  25 
  26 #ifdef HAVE_STRING_H
  27 #include <string.h>
  28 #endif
  29 #include <fcntl.h>
  30 #ifdef HAVE_UNISTD_H
  31 #include <unistd.h>
  32 #endif
  33 #ifdef HAVE_SYS_SOCKET_H
  34 #include <sys/socket.h>
  35 #endif
  36 #ifdef HAVE_SYS_UN_H
  37 #include <sys/un.h>
  38 #endif
  39 #ifdef HAVE_SYS_UIO_H
  40 #include <sys/uio.h>
  41 #endif
  42 #ifdef HAVE_SYS_TYPES_H
  43 #include <sys/types.h>
  44 #endif
  45 #include <ctype.h>
  46 #include <sys/stat.h>
  47 #include PMIX_EVENT_HEADER
  48 #include <pthread.h>
  49 
  50 #include "src/class/pmix_list.h"
  51 #include "src/util/error.h"
  52 #include "src/util/fd.h"
  53 #include "src/util/output.h"
  54 #include "src/util/pmix_environ.h"
  55 
  56  #include "src/mca/ptl/base/base.h"
  57 
  58 
  59 static void* listen_thread(void *obj);
  60 static pthread_t engine;
  61 static bool setup_complete = false;
  62 
  63 
  64 
  65 
  66 
  67 static pmix_status_t setup_listeners(pmix_info_t *info, size_t ninfo, bool *need_listener)
  68 {
  69     pmix_ptl_base_active_t *active;
  70     pmix_status_t rc;
  71     size_t n;
  72     bool single = false;
  73 
  74     if (!pmix_ptl_globals.initialized) {
  75         return PMIX_ERR_INIT;
  76     }
  77 
  78     
  79     if (NULL != info) {
  80         for (n=0; n < ninfo; n++) {
  81             if (0 == strncmp(info[n].key, PMIX_SINGLE_LISTENER, PMIX_MAX_KEYLEN)) {
  82                 single = PMIX_INFO_TRUE(&info[n]);
  83                 break;
  84             }
  85         }
  86     }
  87 
  88     PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
  89         if (NULL != active->component->setup_listener) {
  90             rc = active->component->setup_listener(info, ninfo, need_listener);
  91             if (PMIX_SUCCESS != rc && PMIX_ERR_NOT_AVAILABLE != rc) {
  92                 return rc;
  93             }
  94             if (single) {
  95                 return PMIX_SUCCESS;
  96             }
  97         }
  98     }
  99     
 100     if (0 == pmix_list_get_size(&pmix_ptl_globals.listeners)) {
 101         return PMIX_ERR_INIT;
 102     }
 103     return PMIX_SUCCESS;
 104 }
 105 
 106 
 107 
 108 
 109 pmix_status_t pmix_ptl_base_start_listening(pmix_info_t *info, size_t ninfo)
 110 {
 111     pmix_status_t rc;
 112     bool need_listener = false;
 113 
 114     
 115     if (!setup_complete) {
 116         if (PMIX_SUCCESS != (rc = setup_listeners(info, ninfo, &need_listener))) {
 117             return rc;
 118         }
 119     }
 120     setup_complete = true;
 121 
 122     
 123     if (!need_listener) {
 124         return PMIX_SUCCESS;
 125     }
 126 
 127     
 128     if (0 > pipe(pmix_ptl_globals.stop_thread)) {
 129         PMIX_ERROR_LOG(PMIX_ERR_IN_ERRNO);
 130         return PMIX_ERR_OUT_OF_RESOURCE;
 131     }
 132     
 133 
 134     if (pmix_fd_set_cloexec(pmix_ptl_globals.stop_thread[0]) != PMIX_SUCCESS ||
 135         pmix_fd_set_cloexec(pmix_ptl_globals.stop_thread[1]) != PMIX_SUCCESS) {
 136         PMIX_ERROR_LOG(PMIX_ERR_IN_ERRNO);
 137         close(pmix_ptl_globals.stop_thread[0]);
 138         close(pmix_ptl_globals.stop_thread[1]);
 139         return PMIX_ERR_OUT_OF_RESOURCE;
 140     }
 141     
 142     pmix_ptl_globals.listen_thread_active = true;
 143     if (0 > pthread_create(&engine, NULL, listen_thread, NULL)) {
 144         pmix_ptl_globals.listen_thread_active = false;
 145         return PMIX_ERROR;
 146     }
 147 
 148     return PMIX_SUCCESS;
 149 }
 150 
 151 void pmix_ptl_base_stop_listening(void)
 152 {
 153     int i;
 154     pmix_listener_t *lt;
 155 
 156     pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 157                         "listen_thread: shutdown");
 158 
 159     if (!pmix_ptl_globals.listen_thread_active) {
 160         
 161         return;
 162     }
 163 
 164     
 165     pmix_ptl_globals.listen_thread_active = false;
 166     
 167 
 168 
 169     i=1;
 170     if (0 > write(pmix_ptl_globals.stop_thread[1], &i, sizeof(int))) {
 171         return;
 172     }
 173     
 174     pthread_join(engine, NULL);
 175     
 176     PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
 177         CLOSE_THE_SOCKET(lt->socket);
 178         lt->socket = -1;
 179     }
 180 }
 181 
 182 static void* listen_thread(void *obj)
 183 {
 184     int rc, max, accepted_connections;
 185     socklen_t addrlen = sizeof(struct sockaddr_storage);
 186     pmix_pending_connection_t *pending_connection;
 187     struct timeval timeout;
 188     fd_set readfds;
 189     pmix_listener_t *lt;
 190 
 191     pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 192                         "listen_thread: active");
 193 
 194 
 195     while (pmix_ptl_globals.listen_thread_active) {
 196         FD_ZERO(&readfds);
 197         max = -1;
 198         PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
 199             FD_SET(lt->socket, &readfds);
 200             max = (lt->socket > max) ? lt->socket : max;
 201         }
 202         
 203         FD_SET(pmix_ptl_globals.stop_thread[0], &readfds);
 204         max = (pmix_ptl_globals.stop_thread[0] > max) ? pmix_ptl_globals.stop_thread[0] : max;
 205 
 206         
 207         timeout.tv_sec = 2;
 208         timeout.tv_usec = 0;
 209 
 210         
 211 
 212 
 213         rc = select(max + 1, &readfds, NULL, NULL, &timeout);
 214         if (!pmix_ptl_globals.listen_thread_active) {
 215             
 216             close(pmix_ptl_globals.stop_thread[0]);
 217             close(pmix_ptl_globals.stop_thread[1]);
 218             return NULL;
 219         }
 220         if (rc < 0) {
 221             continue;
 222         }
 223 
 224         
 225 
 226 
 227 
 228         do {
 229             accepted_connections = 0;
 230             PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
 231 
 232                 
 233 
 234 
 235 
 236 
 237 
 238                 if (0 == FD_ISSET(lt->socket, &readfds)) {
 239                     
 240                     continue;
 241                 }
 242 
 243                 
 244 
 245 
 246 
 247 
 248 
 249 
 250                 pending_connection = PMIX_NEW(pmix_pending_connection_t);
 251                 pending_connection->protocol = lt->protocol;
 252                 pending_connection->ptl = lt->ptl;
 253                 pmix_event_assign(&pending_connection->ev, pmix_globals.evbase, -1,
 254                                   EV_WRITE, lt->cbfunc, pending_connection);
 255                 pending_connection->sd = accept(lt->socket,
 256                                                 (struct sockaddr*)&(pending_connection->addr),
 257                                                 &addrlen);
 258                 if (pending_connection->sd < 0) {
 259                     PMIX_RELEASE(pending_connection);
 260                     if (pmix_socket_errno != EAGAIN ||
 261                         pmix_socket_errno != EWOULDBLOCK) {
 262                         if (EMFILE == pmix_socket_errno ||
 263                             ENOBUFS == pmix_socket_errno ||
 264                             ENOMEM == pmix_socket_errno) {
 265                             PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
 266                         } else if (EINVAL == pmix_socket_errno ||
 267                                    EINTR == pmix_socket_errno) {
 268                             
 269                             goto done;
 270                         } else if (ECONNABORTED == pmix_socket_errno) {
 271                             
 272                             continue;
 273                         } else {
 274                             pmix_output(0, "listen_thread: accept() failed: %s (%d).",
 275                                         strerror(pmix_socket_errno), pmix_socket_errno);
 276                         }
 277                         goto done;
 278                     }
 279                     continue;
 280                 }
 281 
 282                 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
 283                                     "listen_thread: new connection: (%d, %d)",
 284                                     pending_connection->sd, pmix_socket_errno);
 285                 
 286                 PMIX_POST_OBJECT(pending_connection);
 287                 
 288                 pmix_event_active(&pending_connection->ev, EV_WRITE, 1);
 289                 accepted_connections++;
 290             }
 291         } while (accepted_connections > 0);
 292     }
 293 
 294  done:
 295     pmix_ptl_globals.listen_thread_active = false;
 296     return NULL;
 297 }