This source file includes the following definitions.
- orte_register_listener
- orte_start_listening
- orte_stop_listening
- listen_thread_fn
- lcons
- ldes
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 #include "orte_config.h"
  32 #include "orte/types.h"
  33 #include "opal/types.h"
  34 
  35 #ifdef HAVE_UNISTD_H
  36 #include <unistd.h>
  37 #endif
  38 #ifdef HAVE_SYS_TYPES_H
  39 #include <sys/types.h>
  40 #endif
  41 #include <fcntl.h>
  42 #ifdef HAVE_NETINET_IN_H
  43 #include <netinet/in.h>
  44 #endif
  45 #ifdef HAVE_ARPA_INET_H
  46 #include <arpa/inet.h>
  47 #endif
  48 #ifdef HAVE_NETDB_H
  49 #include <netdb.h>
  50 #endif
  51 #ifdef HAVE_SYS_SOCKET_H
  52 #include <sys/socket.h>
  53 #endif
  54 
  55 #include <ctype.h>
  56 
  57 #include "opal/util/error.h"
  58 #include "opal/util/output.h"
  59 #include "opal/opal_socket_errno.h"
  60 #include "opal/util/if.h"
  61 #include "opal/util/net.h"
  62 #include "opal/util/fd.h"
  63 #include "opal/class/opal_list.h"
  64 
  65 #include "orte/mca/errmgr/errmgr.h"
  66 #include "orte/util/name_fns.h"
  67 #include "orte/runtime/orte_globals.h"
  68 #include "orte/util/show_help.h"
  69 
  70 #include "orte/util/listener.h"
  71 
  72 static void* listen_thread_fn(opal_object_t *obj);
  73 static opal_list_t mylisteners;
  74 static bool initialized = false;
  75 static opal_thread_t listen_thread;
  76 static volatile bool listen_thread_active = false;
  77 static struct timeval listen_thread_tv;
  78 static int stop_thread[2];
  79 
  80 #define CLOSE_THE_SOCKET(socket)    \
  81     do {                            \
  82         shutdown(socket, 2);        \
  83         close(socket);              \
  84         socket = -1;                \
  85     } while(0)
  86 
  87 
  88 int orte_register_listener(struct sockaddr* address, opal_socklen_t addrlen,
  89                            opal_event_base_t *evbase,
  90                            orte_listener_callback_fn_t handler)
  91 {
  92     orte_listener_t *conn;
  93     int flags;
  94     int sd = -1;
  95 
  96     if (!initialized) {
  97         OBJ_CONSTRUCT(&mylisteners, opal_list_t);
  98         OBJ_CONSTRUCT(&listen_thread, opal_thread_t);
  99         if (0 > pipe(stop_thread)) {
 100             ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
 101             return ORTE_ERR_OUT_OF_RESOURCE;
 102         }
 103         
 104 
 105         if (opal_fd_set_cloexec(stop_thread[0]) != OPAL_SUCCESS ||
 106             opal_fd_set_cloexec(stop_thread[1]) != OPAL_SUCCESS) {
 107             close(stop_thread[0]);
 108             close(stop_thread[1]);
 109             ORTE_ERROR_LOG(ORTE_ERR_IN_ERRNO);
 110             return ORTE_ERR_IN_ERRNO;
 111         }
 112         listen_thread_tv.tv_sec = 3600;
 113         listen_thread_tv.tv_usec = 0;
 114         initialized = true;
 115     }
 116 
 117     
 118     sd = socket(PF_UNIX, SOCK_STREAM, 0);
 119     if (sd < 0) {
 120         if (EAFNOSUPPORT != opal_socket_errno) {
 121             opal_output(0,"pmix_server_start_listening: socket() failed: %s (%d)",
 122                         strerror(opal_socket_errno), opal_socket_errno);
 123         }
 124         return ORTE_ERR_IN_ERRNO;
 125     }
 126     
 127 
 128     if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) {
 129         opal_output(0, "pmix_server: unable to set the "
 130                     "listening socket to CLOEXEC (%s:%d)\n",
 131                     strerror(opal_socket_errno), opal_socket_errno);
 132         CLOSE_THE_SOCKET(sd);
 133         return ORTE_ERROR;
 134     }
 135 
 136 
 137     if (bind(sd, (struct sockaddr*)address, addrlen) < 0) {
 138         opal_output(0, "%s bind() failed on error %s (%d)",
 139                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 140                     strerror(opal_socket_errno),
 141                     opal_socket_errno );
 142         CLOSE_THE_SOCKET(sd);
 143         return ORTE_ERROR;
 144     }
 145 
 146     
 147     if (listen(sd, SOMAXCONN) < 0) {
 148         opal_output(0, "orte_listener: listen() failed: %s (%d)",
 149                     strerror(opal_socket_errno), opal_socket_errno);
 150         CLOSE_THE_SOCKET(sd);
 151         return ORTE_ERROR;
 152     }
 153 
 154     
 155     if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
 156         opal_output(0, "orte_listener: fcntl(F_GETFL) failed: %s (%d)",
 157                     strerror(opal_socket_errno), opal_socket_errno);
 158         CLOSE_THE_SOCKET(sd);
 159         return ORTE_ERROR;
 160     }
 161     flags |= O_NONBLOCK;
 162     if (fcntl(sd, F_SETFL, flags) < 0) {
 163         opal_output(0, "orte_listener: fcntl(F_SETFL) failed: %s (%d)",
 164                     strerror(opal_socket_errno), opal_socket_errno);
 165         CLOSE_THE_SOCKET(sd);
 166         return ORTE_ERROR;
 167     }
 168 
 169     
 170     conn = OBJ_NEW(orte_listener_t);
 171     conn->sd = sd;
 172     conn->evbase = evbase;
 173     conn->handler = handler;
 174     opal_list_append(&mylisteners, &conn->item);
 175 
 176     return ORTE_SUCCESS;
 177 }
 178 
 179 
 180 
 181 
 182 
 183 
 184 
 185 
 186 
 187 
 188 
 189 
 190 int orte_start_listening(void)
 191 {
 192     int rc;
 193 
 194     
 195 
 196     if (!initialized || 0 == opal_list_get_size(&mylisteners) ||
 197         listen_thread_active) {
 198         return ORTE_SUCCESS;
 199     }
 200 
 201     
 202     listen_thread_active = true;
 203     listen_thread.t_run = listen_thread_fn;
 204     listen_thread.t_arg = NULL;
 205     if (OPAL_SUCCESS != (rc = opal_thread_start(&listen_thread))) {
 206         ORTE_ERROR_LOG(rc);
 207         opal_output(0, "%s Unable to start listen thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 208     }
 209     return rc;
 210 }
 211 
 212 void orte_stop_listening(void)
 213 {
 214     int i=0;
 215 
 216     if (!listen_thread_active) {
 217         return;
 218     }
 219 
 220     listen_thread_active = false;
 221     
 222     write(stop_thread[1], &i, sizeof(int));
 223     opal_thread_join(&listen_thread, NULL);
 224     OBJ_DESTRUCT(&listen_thread);
 225     OPAL_LIST_DESTRUCT(&mylisteners);
 226 }
 227 
 228 
 229 
 230 
 231 
 232 
 233 
 234 static void* listen_thread_fn(opal_object_t *obj)
 235 {
 236     int rc, max, accepted_connections, sd;
 237     opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
 238     orte_pending_connection_t *pending_connection;
 239     struct timeval timeout;
 240     fd_set readfds;
 241     orte_listener_t *listener;
 242 
 243     while (listen_thread_active) {
 244         FD_ZERO(&readfds);
 245         max = -1;
 246         OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) {
 247             FD_SET(listener->sd, &readfds);
 248             max = (listener->sd > max) ? listener->sd : max;
 249         }
 250         
 251         FD_SET(stop_thread[0], &readfds);
 252         max = (stop_thread[0] > max) ? stop_thread[0] : max;
 253 
 254         
 255         timeout.tv_sec = listen_thread_tv.tv_sec;
 256         timeout.tv_usec = listen_thread_tv.tv_usec;
 257 
 258         
 259 
 260 
 261         rc = select(max + 1, &readfds, NULL, NULL, &timeout);
 262         if (!listen_thread_active) {
 263             
 264             goto done;
 265         }
 266         if (rc < 0) {
 267             if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {
 268                 perror("select");
 269             }
 270             continue;
 271         }
 272 
 273         
 274 
 275 
 276 
 277         do {
 278             accepted_connections = 0;
 279             OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) {
 280                 sd = listener->sd;
 281 
 282                 
 283 
 284 
 285 
 286 
 287 
 288                 if (0 == FD_ISSET(sd, &readfds)) {
 289                     
 290                     continue;
 291                 }
 292 
 293                 
 294 
 295 
 296 
 297 
 298 
 299 
 300                 pending_connection = OBJ_NEW(orte_pending_connection_t);
 301                 opal_event_set(listener->evbase, &pending_connection->ev, -1,
 302                                OPAL_EV_WRITE, listener->handler, pending_connection);
 303                 opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI);
 304                 pending_connection->fd = accept(sd,
 305                                                 (struct sockaddr*)&(pending_connection->addr),
 306                                                 &addrlen);
 307                 if (pending_connection->fd < 0) {
 308                     OBJ_RELEASE(pending_connection);
 309 
 310                     
 311                     if (EAGAIN == opal_socket_errno ||
 312                         EWOULDBLOCK == opal_socket_errno) {
 313                         continue;
 314                     }
 315 
 316                     
 317 
 318 
 319                     else if (EMFILE == opal_socket_errno) {
 320                         CLOSE_THE_SOCKET(sd);
 321                         ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
 322                         orte_show_help("help-oob-tcp.txt",
 323                                        "accept failed",
 324                                        true,
 325                                        opal_process_info.nodename,
 326                                        opal_socket_errno,
 327                                        strerror(opal_socket_errno),
 328                                        "Out of file descriptors");
 329                         goto done;
 330                     }
 331 
 332                     
 333 
 334                     else {
 335                         CLOSE_THE_SOCKET(sd);
 336                         orte_show_help("help-oob-tcp.txt",
 337                                        "accept failed",
 338                                        true,
 339                                        opal_process_info.nodename,
 340                                        opal_socket_errno,
 341                                        strerror(opal_socket_errno),
 342                                        "Unknown cause; job will try to continue");
 343                         continue;
 344                     }
 345                 }
 346 
 347                 
 348                 opal_event_active(&pending_connection->ev, OPAL_EV_WRITE, 1);
 349                 accepted_connections++;
 350             }
 351         } while (accepted_connections > 0);
 352     }
 353 
 354  done:
 355     close(stop_thread[0]);
 356     close(stop_thread[1]);
 357     return NULL;
 358 }
 359 
 360 
 361 
 362 static void lcons(orte_listener_t *p)
 363 {
 364     p->sd = -1;
 365     p->evbase = NULL;
 366     p->handler = NULL;
 367 }
 368 static void ldes(orte_listener_t *p)
 369 {
 370     if (0 <= p->sd) {
 371         CLOSE_THE_SOCKET(p->sd);
 372     }
 373 }
 374 OBJ_CLASS_INSTANCE(orte_listener_t,
 375                    opal_list_item_t,
 376                    lcons, ldes);
 377 
 378 OBJ_CLASS_INSTANCE(orte_pending_connection_t,
 379                    opal_object_t,
 380                    NULL,
 381                    NULL);