This source file includes the following definitions.
- orte_register_listener
- orte_start_listening
- orte_stop_listening
- listen_thread_fn
- lcons
- ldes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 #include "orte_config.h"
32 #include "orte/types.h"
33 #include "opal/types.h"
34
35 #ifdef HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef HAVE_SYS_TYPES_H
39 #include <sys/types.h>
40 #endif
41 #include <fcntl.h>
42 #ifdef HAVE_NETINET_IN_H
43 #include <netinet/in.h>
44 #endif
45 #ifdef HAVE_ARPA_INET_H
46 #include <arpa/inet.h>
47 #endif
48 #ifdef HAVE_NETDB_H
49 #include <netdb.h>
50 #endif
51 #ifdef HAVE_SYS_SOCKET_H
52 #include <sys/socket.h>
53 #endif
54
55 #include <ctype.h>
56
57 #include "opal/util/error.h"
58 #include "opal/util/output.h"
59 #include "opal/opal_socket_errno.h"
60 #include "opal/util/if.h"
61 #include "opal/util/net.h"
62 #include "opal/util/fd.h"
63 #include "opal/class/opal_list.h"
64
65 #include "orte/mca/errmgr/errmgr.h"
66 #include "orte/util/name_fns.h"
67 #include "orte/runtime/orte_globals.h"
68 #include "orte/util/show_help.h"
69
70 #include "orte/util/listener.h"
71
72 static void* listen_thread_fn(opal_object_t *obj);
73 static opal_list_t mylisteners;
74 static bool initialized = false;
75 static opal_thread_t listen_thread;
76 static volatile bool listen_thread_active = false;
77 static struct timeval listen_thread_tv;
78 static int stop_thread[2];
79
/*
 * Shut down both directions of a socket, close it, and mark the
 * descriptor variable as invalid by storing -1 in it.
 *
 * The argument must be a modifiable lvalue holding the descriptor.
 * It is evaluated more than once, so it must not have side effects.
 */
#define CLOSE_THE_SOCKET(fd)            \
    do {                                \
        shutdown((fd), SHUT_RDWR);      \
        close((fd));                    \
        (fd) = -1;                      \
    } while (0)
86
87
88 int orte_register_listener(struct sockaddr* address, opal_socklen_t addrlen,
89 opal_event_base_t *evbase,
90 orte_listener_callback_fn_t handler)
91 {
92 orte_listener_t *conn;
93 int flags;
94 int sd = -1;
95
96 if (!initialized) {
97 OBJ_CONSTRUCT(&mylisteners, opal_list_t);
98 OBJ_CONSTRUCT(&listen_thread, opal_thread_t);
99 if (0 > pipe(stop_thread)) {
100 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
101 return ORTE_ERR_OUT_OF_RESOURCE;
102 }
103
104
105 if (opal_fd_set_cloexec(stop_thread[0]) != OPAL_SUCCESS ||
106 opal_fd_set_cloexec(stop_thread[1]) != OPAL_SUCCESS) {
107 close(stop_thread[0]);
108 close(stop_thread[1]);
109 ORTE_ERROR_LOG(ORTE_ERR_IN_ERRNO);
110 return ORTE_ERR_IN_ERRNO;
111 }
112 listen_thread_tv.tv_sec = 3600;
113 listen_thread_tv.tv_usec = 0;
114 initialized = true;
115 }
116
117
118 sd = socket(PF_UNIX, SOCK_STREAM, 0);
119 if (sd < 0) {
120 if (EAFNOSUPPORT != opal_socket_errno) {
121 opal_output(0,"pmix_server_start_listening: socket() failed: %s (%d)",
122 strerror(opal_socket_errno), opal_socket_errno);
123 }
124 return ORTE_ERR_IN_ERRNO;
125 }
126
127
128 if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) {
129 opal_output(0, "pmix_server: unable to set the "
130 "listening socket to CLOEXEC (%s:%d)\n",
131 strerror(opal_socket_errno), opal_socket_errno);
132 CLOSE_THE_SOCKET(sd);
133 return ORTE_ERROR;
134 }
135
136
137 if (bind(sd, (struct sockaddr*)address, addrlen) < 0) {
138 opal_output(0, "%s bind() failed on error %s (%d)",
139 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
140 strerror(opal_socket_errno),
141 opal_socket_errno );
142 CLOSE_THE_SOCKET(sd);
143 return ORTE_ERROR;
144 }
145
146
147 if (listen(sd, SOMAXCONN) < 0) {
148 opal_output(0, "orte_listener: listen() failed: %s (%d)",
149 strerror(opal_socket_errno), opal_socket_errno);
150 CLOSE_THE_SOCKET(sd);
151 return ORTE_ERROR;
152 }
153
154
155 if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
156 opal_output(0, "orte_listener: fcntl(F_GETFL) failed: %s (%d)",
157 strerror(opal_socket_errno), opal_socket_errno);
158 CLOSE_THE_SOCKET(sd);
159 return ORTE_ERROR;
160 }
161 flags |= O_NONBLOCK;
162 if (fcntl(sd, F_SETFL, flags) < 0) {
163 opal_output(0, "orte_listener: fcntl(F_SETFL) failed: %s (%d)",
164 strerror(opal_socket_errno), opal_socket_errno);
165 CLOSE_THE_SOCKET(sd);
166 return ORTE_ERROR;
167 }
168
169
170 conn = OBJ_NEW(orte_listener_t);
171 conn->sd = sd;
172 conn->evbase = evbase;
173 conn->handler = handler;
174 opal_list_append(&mylisteners, &conn->item);
175
176 return ORTE_SUCCESS;
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190 int orte_start_listening(void)
191 {
192 int rc;
193
194
195
196 if (!initialized || 0 == opal_list_get_size(&mylisteners) ||
197 listen_thread_active) {
198 return ORTE_SUCCESS;
199 }
200
201
202 listen_thread_active = true;
203 listen_thread.t_run = listen_thread_fn;
204 listen_thread.t_arg = NULL;
205 if (OPAL_SUCCESS != (rc = opal_thread_start(&listen_thread))) {
206 ORTE_ERROR_LOG(rc);
207 opal_output(0, "%s Unable to start listen thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
208 }
209 return rc;
210 }
211
212 void orte_stop_listening(void)
213 {
214 int i=0;
215
216 if (!listen_thread_active) {
217 return;
218 }
219
220 listen_thread_active = false;
221
222 write(stop_thread[1], &i, sizeof(int));
223 opal_thread_join(&listen_thread, NULL);
224 OBJ_DESTRUCT(&listen_thread);
225 OPAL_LIST_DESTRUCT(&mylisteners);
226 }
227
228
229
230
231
232
233
234 static void* listen_thread_fn(opal_object_t *obj)
235 {
236 int rc, max, accepted_connections, sd;
237 opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
238 orte_pending_connection_t *pending_connection;
239 struct timeval timeout;
240 fd_set readfds;
241 orte_listener_t *listener;
242
243 while (listen_thread_active) {
244 FD_ZERO(&readfds);
245 max = -1;
246 OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) {
247 FD_SET(listener->sd, &readfds);
248 max = (listener->sd > max) ? listener->sd : max;
249 }
250
251 FD_SET(stop_thread[0], &readfds);
252 max = (stop_thread[0] > max) ? stop_thread[0] : max;
253
254
255 timeout.tv_sec = listen_thread_tv.tv_sec;
256 timeout.tv_usec = listen_thread_tv.tv_usec;
257
258
259
260
261 rc = select(max + 1, &readfds, NULL, NULL, &timeout);
262 if (!listen_thread_active) {
263
264 goto done;
265 }
266 if (rc < 0) {
267 if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {
268 perror("select");
269 }
270 continue;
271 }
272
273
274
275
276
277 do {
278 accepted_connections = 0;
279 OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) {
280 sd = listener->sd;
281
282
283
284
285
286
287
288 if (0 == FD_ISSET(sd, &readfds)) {
289
290 continue;
291 }
292
293
294
295
296
297
298
299
300 pending_connection = OBJ_NEW(orte_pending_connection_t);
301 opal_event_set(listener->evbase, &pending_connection->ev, -1,
302 OPAL_EV_WRITE, listener->handler, pending_connection);
303 opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI);
304 pending_connection->fd = accept(sd,
305 (struct sockaddr*)&(pending_connection->addr),
306 &addrlen);
307 if (pending_connection->fd < 0) {
308 OBJ_RELEASE(pending_connection);
309
310
311 if (EAGAIN == opal_socket_errno ||
312 EWOULDBLOCK == opal_socket_errno) {
313 continue;
314 }
315
316
317
318
319 else if (EMFILE == opal_socket_errno) {
320 CLOSE_THE_SOCKET(sd);
321 ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
322 orte_show_help("help-oob-tcp.txt",
323 "accept failed",
324 true,
325 opal_process_info.nodename,
326 opal_socket_errno,
327 strerror(opal_socket_errno),
328 "Out of file descriptors");
329 goto done;
330 }
331
332
333
334 else {
335 CLOSE_THE_SOCKET(sd);
336 orte_show_help("help-oob-tcp.txt",
337 "accept failed",
338 true,
339 opal_process_info.nodename,
340 opal_socket_errno,
341 strerror(opal_socket_errno),
342 "Unknown cause; job will try to continue");
343 continue;
344 }
345 }
346
347
348 opal_event_active(&pending_connection->ev, OPAL_EV_WRITE, 1);
349 accepted_connections++;
350 }
351 } while (accepted_connections > 0);
352 }
353
354 done:
355 close(stop_thread[0]);
356 close(stop_thread[1]);
357 return NULL;
358 }
359
360
361
362 static void lcons(orte_listener_t *p)
363 {
364 p->sd = -1;
365 p->evbase = NULL;
366 p->handler = NULL;
367 }
368 static void ldes(orte_listener_t *p)
369 {
370 if (0 <= p->sd) {
371 CLOSE_THE_SOCKET(p->sd);
372 }
373 }
374 OBJ_CLASS_INSTANCE(orte_listener_t,
375 opal_list_item_t,
376 lcons, ldes);
377
/* Class instance for accepted connections; no constructor or destructor. */
OBJ_CLASS_INSTANCE(orte_pending_connection_t, opal_object_t, NULL, NULL);