This source file includes the following definitions.
- setup_listeners
- pmix_ptl_base_start_listening
- pmix_ptl_base_stop_listening
- listen_thread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include <src/include/pmix_config.h>
19
20 #include <src/include/pmix_stdint.h>
21 #include <src/include/pmix_socket_errno.h>
22
23 #include <pmix_server.h>
24 #include "src/include/pmix_globals.h"
25
26 #ifdef HAVE_STRING_H
27 #include <string.h>
28 #endif
29 #include <fcntl.h>
30 #ifdef HAVE_UNISTD_H
31 #include <unistd.h>
32 #endif
33 #ifdef HAVE_SYS_SOCKET_H
34 #include <sys/socket.h>
35 #endif
36 #ifdef HAVE_SYS_UN_H
37 #include <sys/un.h>
38 #endif
39 #ifdef HAVE_SYS_UIO_H
40 #include <sys/uio.h>
41 #endif
42 #ifdef HAVE_SYS_TYPES_H
43 #include <sys/types.h>
44 #endif
45 #include <ctype.h>
46 #include <sys/stat.h>
47 #include PMIX_EVENT_HEADER
48 #include <pthread.h>
49
50 #include "src/class/pmix_list.h"
51 #include "src/util/error.h"
52 #include "src/util/fd.h"
53 #include "src/util/output.h"
54 #include "src/util/pmix_environ.h"
55
56 #include "src/mca/ptl/base/base.h"
57
58
/* Entry point of the listener thread; the definition appears later in this file. */
59 static void* listen_thread(void *obj);
/* Handle of the listener thread: filled in by pthread_create() in
 * pmix_ptl_base_start_listening() and joined in pmix_ptl_base_stop_listening(). */
60 static pthread_t engine;
/* Set to true once pmix_ptl_base_start_listening() has gotten past the
 * setup_listeners() step, so later calls skip that step. */
61 static bool setup_complete = false;
62
63
64
65
66
67 static pmix_status_t setup_listeners(pmix_info_t *info, size_t ninfo, bool *need_listener)
68 {
69 pmix_ptl_base_active_t *active;
70 pmix_status_t rc;
71 size_t n;
72 bool single = false;
73
74 if (!pmix_ptl_globals.initialized) {
75 return PMIX_ERR_INIT;
76 }
77
78
79 if (NULL != info) {
80 for (n=0; n < ninfo; n++) {
81 if (0 == strncmp(info[n].key, PMIX_SINGLE_LISTENER, PMIX_MAX_KEYLEN)) {
82 single = PMIX_INFO_TRUE(&info[n]);
83 break;
84 }
85 }
86 }
87
88 PMIX_LIST_FOREACH(active, &pmix_ptl_globals.actives, pmix_ptl_base_active_t) {
89 if (NULL != active->component->setup_listener) {
90 rc = active->component->setup_listener(info, ninfo, need_listener);
91 if (PMIX_SUCCESS != rc && PMIX_ERR_NOT_AVAILABLE != rc) {
92 return rc;
93 }
94 if (single) {
95 return PMIX_SUCCESS;
96 }
97 }
98 }
99
100 if (0 == pmix_list_get_size(&pmix_ptl_globals.listeners)) {
101 return PMIX_ERR_INIT;
102 }
103 return PMIX_SUCCESS;
104 }
105
106
107
108
109 pmix_status_t pmix_ptl_base_start_listening(pmix_info_t *info, size_t ninfo)
110 {
111 pmix_status_t rc;
112 bool need_listener = false;
113
114
115 if (!setup_complete) {
116 if (PMIX_SUCCESS != (rc = setup_listeners(info, ninfo, &need_listener))) {
117 return rc;
118 }
119 }
120 setup_complete = true;
121
122
123 if (!need_listener) {
124 return PMIX_SUCCESS;
125 }
126
127
128 if (0 > pipe(pmix_ptl_globals.stop_thread)) {
129 PMIX_ERROR_LOG(PMIX_ERR_IN_ERRNO);
130 return PMIX_ERR_OUT_OF_RESOURCE;
131 }
132
133
134 if (pmix_fd_set_cloexec(pmix_ptl_globals.stop_thread[0]) != PMIX_SUCCESS ||
135 pmix_fd_set_cloexec(pmix_ptl_globals.stop_thread[1]) != PMIX_SUCCESS) {
136 PMIX_ERROR_LOG(PMIX_ERR_IN_ERRNO);
137 close(pmix_ptl_globals.stop_thread[0]);
138 close(pmix_ptl_globals.stop_thread[1]);
139 return PMIX_ERR_OUT_OF_RESOURCE;
140 }
141
142 pmix_ptl_globals.listen_thread_active = true;
143 if (0 > pthread_create(&engine, NULL, listen_thread, NULL)) {
144 pmix_ptl_globals.listen_thread_active = false;
145 return PMIX_ERROR;
146 }
147
148 return PMIX_SUCCESS;
149 }
150
151 void pmix_ptl_base_stop_listening(void)
152 {
153 int i;
154 pmix_listener_t *lt;
155
156 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
157 "listen_thread: shutdown");
158
159 if (!pmix_ptl_globals.listen_thread_active) {
160
161 return;
162 }
163
164
165 pmix_ptl_globals.listen_thread_active = false;
166
167
168
169 i=1;
170 if (0 > write(pmix_ptl_globals.stop_thread[1], &i, sizeof(int))) {
171 return;
172 }
173
174 pthread_join(engine, NULL);
175
176 PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
177 CLOSE_THE_SOCKET(lt->socket);
178 lt->socket = -1;
179 }
180 }
181
182 static void* listen_thread(void *obj)
183 {
184 int rc, max, accepted_connections;
185 socklen_t addrlen = sizeof(struct sockaddr_storage);
186 pmix_pending_connection_t *pending_connection;
187 struct timeval timeout;
188 fd_set readfds;
189 pmix_listener_t *lt;
190
191 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
192 "listen_thread: active");
193
194
195 while (pmix_ptl_globals.listen_thread_active) {
196 FD_ZERO(&readfds);
197 max = -1;
198 PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
199 FD_SET(lt->socket, &readfds);
200 max = (lt->socket > max) ? lt->socket : max;
201 }
202
203 FD_SET(pmix_ptl_globals.stop_thread[0], &readfds);
204 max = (pmix_ptl_globals.stop_thread[0] > max) ? pmix_ptl_globals.stop_thread[0] : max;
205
206
207 timeout.tv_sec = 2;
208 timeout.tv_usec = 0;
209
210
211
212
213 rc = select(max + 1, &readfds, NULL, NULL, &timeout);
214 if (!pmix_ptl_globals.listen_thread_active) {
215
216 close(pmix_ptl_globals.stop_thread[0]);
217 close(pmix_ptl_globals.stop_thread[1]);
218 return NULL;
219 }
220 if (rc < 0) {
221 continue;
222 }
223
224
225
226
227
228 do {
229 accepted_connections = 0;
230 PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
231
232
233
234
235
236
237
238 if (0 == FD_ISSET(lt->socket, &readfds)) {
239
240 continue;
241 }
242
243
244
245
246
247
248
249
250 pending_connection = PMIX_NEW(pmix_pending_connection_t);
251 pending_connection->protocol = lt->protocol;
252 pending_connection->ptl = lt->ptl;
253 pmix_event_assign(&pending_connection->ev, pmix_globals.evbase, -1,
254 EV_WRITE, lt->cbfunc, pending_connection);
255 pending_connection->sd = accept(lt->socket,
256 (struct sockaddr*)&(pending_connection->addr),
257 &addrlen);
258 if (pending_connection->sd < 0) {
259 PMIX_RELEASE(pending_connection);
260 if (pmix_socket_errno != EAGAIN ||
261 pmix_socket_errno != EWOULDBLOCK) {
262 if (EMFILE == pmix_socket_errno ||
263 ENOBUFS == pmix_socket_errno ||
264 ENOMEM == pmix_socket_errno) {
265 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
266 } else if (EINVAL == pmix_socket_errno ||
267 EINTR == pmix_socket_errno) {
268
269 goto done;
270 } else if (ECONNABORTED == pmix_socket_errno) {
271
272 continue;
273 } else {
274 pmix_output(0, "listen_thread: accept() failed: %s (%d).",
275 strerror(pmix_socket_errno), pmix_socket_errno);
276 }
277 goto done;
278 }
279 continue;
280 }
281
282 pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
283 "listen_thread: new connection: (%d, %d)",
284 pending_connection->sd, pmix_socket_errno);
285
286 PMIX_POST_OBJECT(pending_connection);
287
288 pmix_event_active(&pending_connection->ev, EV_WRITE, 1);
289 accepted_connections++;
290 }
291 } while (accepted_connections > 0);
292 }
293
294 done:
295 pmix_ptl_globals.listen_thread_active = false;
296 return NULL;
297 }