This source file includes the following definitions.
- orte_oob_tcp_start_listening
- create_listen
- create_listen6
- listen_thread
- connection_handler
- connection_event_handler
- tcp_ev_cons
- tcp_ev_des
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 #include "orte_config.h"
32 #include "orte/types.h"
33 #include "opal/types.h"
34
35 #ifdef HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef HAVE_SYS_TYPES_H
39 #include <sys/types.h>
40 #endif
41 #include <fcntl.h>
42 #ifdef HAVE_NETINET_IN_H
43 #include <netinet/in.h>
44 #endif
45 #ifdef HAVE_ARPA_INET_H
46 #include <arpa/inet.h>
47 #endif
48 #ifdef HAVE_NETDB_H
49 #include <netdb.h>
50 #endif
51 #include <ctype.h>
52
53 #include "opal/util/show_help.h"
54 #include "opal/util/error.h"
55 #include "opal/util/output.h"
56 #include "opal/opal_socket_errno.h"
57 #include "opal/util/if.h"
58 #include "opal/util/net.h"
59 #include "opal/util/argv.h"
60 #include "opal/util/fd.h"
61 #include "opal/class/opal_hash_table.h"
62 #include "opal/class/opal_list.h"
63
64 #include "orte/mca/errmgr/errmgr.h"
65 #include "orte/mca/ess/ess.h"
66 #include "orte/util/name_fns.h"
67 #include "orte/util/parse_options.h"
68 #include "orte/util/show_help.h"
69 #include "orte/util/threads.h"
70 #include "orte/runtime/orte_globals.h"
71
72 #include "orte/mca/oob/tcp/oob_tcp.h"
73 #include "orte/mca/oob/tcp/oob_tcp_component.h"
74 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
75 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
76 #include "orte/mca/oob/tcp/oob_tcp_listener.h"
77 #include "orte/mca/oob/tcp/oob_tcp_common.h"
78
79 static void connection_event_handler(int incoming_sd, short flags, void* cbdata);
80 static void* listen_thread(opal_object_t *obj);
81 static int create_listen(void);
82 #if OPAL_ENABLE_IPV6
83 static int create_listen6(void);
84 #endif
85 static void connection_handler(int sd, short flags, void* cbdata);
86 static void connection_event_handler(int sd, short flags, void* cbdata);
87
88
89
90
91
92
93
94
95
96
97
98
99 int orte_oob_tcp_start_listening(void)
100 {
101 int rc;
102 mca_oob_tcp_listener_t *listener;
103
104
105 if (NULL == mca_oob_tcp_component.ipv4conns
106 #if OPAL_ENABLE_IPV6
107 && NULL == mca_oob_tcp_component.ipv6conns
108 #endif
109 ) {
110 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
111 return ORTE_ERR_NOT_FOUND;
112 }
113
114
115 if (ORTE_SUCCESS != (rc = create_listen())) {
116 ORTE_ERROR_LOG(rc);
117 return rc;
118 }
119
120 #if OPAL_ENABLE_IPV6
121
122 if (ORTE_SUCCESS != (rc = create_listen6())) {
123 ORTE_ERROR_LOG(rc);
124 return rc;
125 }
126 #endif
127
128
129
130
131 if (ORTE_PROC_IS_HNP) {
132 if (0 > pipe(mca_oob_tcp_component.stop_thread)) {
133 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
134 return ORTE_ERR_OUT_OF_RESOURCE;
135 }
136
137
138
139 if (opal_fd_set_cloexec(mca_oob_tcp_component.stop_thread[0]) != OPAL_SUCCESS ||
140 opal_fd_set_cloexec(mca_oob_tcp_component.stop_thread[1]) != OPAL_SUCCESS) {
141 close(mca_oob_tcp_component.stop_thread[0]);
142 close(mca_oob_tcp_component.stop_thread[1]);
143 ORTE_ERROR_LOG(ORTE_ERR_IN_ERRNO);
144 return ORTE_ERR_IN_ERRNO;
145 }
146
147 mca_oob_tcp_component.listen_thread_active = true;
148 mca_oob_tcp_component.listen_thread.t_run = listen_thread;
149 mca_oob_tcp_component.listen_thread.t_arg = NULL;
150 if (OPAL_SUCCESS != (rc = opal_thread_start(&mca_oob_tcp_component.listen_thread))) {
151 ORTE_ERROR_LOG(rc);
152 opal_output(0, "%s Unable to start listen thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
153 }
154 return rc;
155 }
156
157
158 OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
159 listener->ev_active = true;
160 opal_event_set(orte_oob_base.ev_base, &listener->event,
161 listener->sd,
162 OPAL_EV_READ|OPAL_EV_PERSIST,
163 connection_event_handler,
164 0);
165 opal_event_set_priority(&listener->event, ORTE_MSG_PRI);
166 ORTE_POST_OBJECT(listener);
167 opal_event_add(&listener->event, 0);
168 }
169
170 return ORTE_SUCCESS;
171 }
172
173
174
175
176
177
178
179
180
181
182 static int create_listen(void)
183 {
184 int flags, i;
185 uint16_t port=0;
186 struct sockaddr_storage inaddr;
187 opal_socklen_t addrlen;
188 char **ports=NULL;
189 int sd = -1;
190 char *tconn;
191 mca_oob_tcp_listener_t *conn;
192
193
194
195
196
197 if (ORTE_PROC_IS_DAEMON) {
198 if (NULL != mca_oob_tcp_component.tcp_static_ports) {
199
200
201
202 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp_static_ports[0]);
203
204 orte_static_ports = true;
205 } else if (NULL != mca_oob_tcp_component.tcp_dyn_ports) {
206
207 ports = opal_argv_copy(mca_oob_tcp_component.tcp_dyn_ports);
208 orte_static_ports = false;
209 } else {
210
211 opal_argv_append_nosize(&ports, "0");
212 orte_static_ports = false;
213 }
214 } else if (ORTE_PROC_IS_HNP) {
215 if (NULL != mca_oob_tcp_component.tcp_static_ports) {
216
217
218
219 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp_static_ports[0]);
220
221 orte_static_ports = true;
222 } else if (NULL != mca_oob_tcp_component.tcp_dyn_ports) {
223
224 ports = opal_argv_copy(mca_oob_tcp_component.tcp_dyn_ports);
225 orte_static_ports = false;
226 } else {
227
228 opal_argv_append_nosize(&ports, "0");
229 orte_static_ports = false;
230 }
231 } else if (ORTE_PROC_IS_MPI) {
232 if (NULL != mca_oob_tcp_component.tcp_static_ports) {
233
234
235
236
237 orte_node_rank_t nrank;
238
239 if (ORTE_NODE_RANK_INVALID != (nrank = orte_process_info.my_node_rank) &&
240 (nrank+1) < opal_argv_count(mca_oob_tcp_component.tcp_static_ports)) {
241
242 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp_static_ports[nrank+1]);
243
244 orte_static_ports = true;
245 } else {
246
247 opal_argv_append_nosize(&ports, "0");
248 orte_static_ports = false;
249 }
250 } else if (NULL != mca_oob_tcp_component.tcp_dyn_ports) {
251
252 ports = opal_argv_copy(mca_oob_tcp_component.tcp_dyn_ports);
253 orte_static_ports = false;
254 } else {
255
256 opal_argv_append_nosize(&ports, "0");
257 orte_static_ports = false;
258 }
259 } else {
260
261
262
263 opal_argv_append_nosize(&ports, "0");
264
265
266
267 if (NULL != mca_oob_tcp_component.tcp_static_ports) {
268 orte_static_ports = true;
269 } else {
270 orte_static_ports = false;
271 }
272 }
273
274
275 if (NULL == ports) {
276 return ORTE_ERROR;
277 }
278
279
280 memset(&inaddr, 0, sizeof(inaddr));
281 ((struct sockaddr_in*) &inaddr)->sin_family = AF_INET;
282 ((struct sockaddr_in*) &inaddr)->sin_addr.s_addr = INADDR_ANY;
283 addrlen = sizeof(struct sockaddr_in);
284
285
286
287
288
289
290 for (i=0; i < opal_argv_count(ports); i++) {
291 opal_output_verbose(5, orte_oob_base_framework.framework_output,
292 "%s attempting to bind to IPv4 port %s",
293 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
294 ports[i]);
295
296 port = strtol(ports[i], NULL, 10);
297
298 port = htons(port);
299
300 ((struct sockaddr_in*) &inaddr)->sin_port = port;
301
302
303 sd = socket(AF_INET, SOCK_STREAM, 0);
304 if (sd < 0) {
305 if (EAFNOSUPPORT != opal_socket_errno) {
306 opal_output(0,"mca_oob_tcp_component_init: socket() failed: %s (%d)",
307 strerror(opal_socket_errno), opal_socket_errno);
308 }
309 opal_argv_free(ports);
310 return ORTE_ERR_IN_ERRNO;
311 }
312
313
314 if (orte_static_ports) {
315 flags = 1;
316 } else {
317 flags = 0;
318 }
319 if (setsockopt (sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
320 opal_output(0, "mca_oob_tcp_create_listen: unable to set the "
321 "SO_REUSEADDR option (%s:%d)\n",
322 strerror(opal_socket_errno), opal_socket_errno);
323 CLOSE_THE_SOCKET(sd);
324 opal_argv_free(ports);
325 return ORTE_ERROR;
326 }
327
328
329
330 if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) {
331 opal_output(0, "mca_oob_tcp_create_listen: unable to set the "
332 "listening socket to CLOEXEC (%s:%d)\n",
333 strerror(opal_socket_errno), opal_socket_errno);
334 CLOSE_THE_SOCKET(sd);
335 opal_argv_free(ports);
336 return ORTE_ERROR;
337 }
338
339 if (bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
340 if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
341 continue;
342 }
343 opal_output(0, "%s bind() failed for port %d: %s (%d)",
344 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
345 (int)ntohs(port),
346 strerror(opal_socket_errno),
347 opal_socket_errno );
348 CLOSE_THE_SOCKET(sd);
349 opal_argv_free(ports);
350 return ORTE_ERROR;
351 }
352
353 if (getsockname(sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
354 opal_output(0, "mca_oob_tcp_create_listen: getsockname(): %s (%d)",
355 strerror(opal_socket_errno), opal_socket_errno);
356 CLOSE_THE_SOCKET(sd);
357 opal_argv_free(ports);
358 return ORTE_ERROR;
359 }
360
361
362 if (listen(sd, SOMAXCONN) < 0) {
363 opal_output(0, "mca_oob_tcp_component_init: listen(): %s (%d)",
364 strerror(opal_socket_errno), opal_socket_errno);
365 CLOSE_THE_SOCKET(sd);
366 opal_argv_free(ports);
367 return ORTE_ERROR;
368 }
369
370
371 if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
372 opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed: %s (%d)",
373 strerror(opal_socket_errno), opal_socket_errno);
374 CLOSE_THE_SOCKET(sd);
375 opal_argv_free(ports);
376 return ORTE_ERROR;
377 }
378 flags |= O_NONBLOCK;
379 if (fcntl(sd, F_SETFL, flags) < 0) {
380 opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed: %s (%d)",
381 strerror(opal_socket_errno), opal_socket_errno);
382 CLOSE_THE_SOCKET(sd);
383 opal_argv_free(ports);
384 return ORTE_ERROR;
385 }
386
387
388 conn = OBJ_NEW(mca_oob_tcp_listener_t);
389 conn->sd = sd;
390 conn->port = ntohs(((struct sockaddr_in*) &inaddr)->sin_port);
391 if (0 == orte_process_info.my_port) {
392
393 orte_process_info.my_port = conn->port;
394 }
395 opal_list_append(&mca_oob_tcp_component.listeners, &conn->item);
396
397 opal_asprintf(&tconn, "%d", ntohs(((struct sockaddr_in*) &inaddr)->sin_port));
398 opal_argv_append_nosize(&mca_oob_tcp_component.ipv4ports, tconn);
399 free(tconn);
400 if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
401 port = ntohs(((struct sockaddr_in*) &inaddr)->sin_port);
402 opal_output(0, "%s assigned IPv4 port %d",
403 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), port);
404 }
405
406 if (!ORTE_PROC_IS_HNP) {
407
408 break;
409 }
410 }
411
412 opal_argv_free(ports);
413
414 if (0 == opal_list_get_size(&mca_oob_tcp_component.listeners)) {
415
416 if (0 <= sd) {
417 CLOSE_THE_SOCKET(sd);
418 }
419 return ORTE_ERR_SOCKET_NOT_AVAILABLE;
420 }
421
422 return ORTE_SUCCESS;
423 }
424
425 #if OPAL_ENABLE_IPV6
426
427
428
429
430
431
432
433
434
435 static int create_listen6(void)
436 {
437 int flags, i;
438 uint16_t port=0;
439 struct sockaddr_storage inaddr;
440 opal_socklen_t addrlen;
441 char **ports=NULL;
442 int sd;
443 char *tconn;
444 mca_oob_tcp_listener_t *conn;
445
446
447
448
449
450 if (ORTE_PROC_IS_DAEMON) {
451 if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
452
453
454
455 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp6_static_ports[0]);
456
457 orte_static_ports = true;
458 } else if (NULL != mca_oob_tcp_component.tcp6_dyn_ports) {
459
460 ports = opal_argv_copy(mca_oob_tcp_component.tcp6_dyn_ports);
461 orte_static_ports = false;
462 } else {
463
464 opal_argv_append_nosize(&ports, "0");
465 orte_static_ports = false;
466 }
467 } else if (ORTE_PROC_IS_HNP) {
468 if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
469
470
471
472 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp6_static_ports[0]);
473
474 orte_static_ports = true;
475 } else if (NULL != mca_oob_tcp_component.tcp6_dyn_ports) {
476
477 ports = opal_argv_copy(mca_oob_tcp_component.tcp6_dyn_ports);
478 orte_static_ports = false;
479 } else {
480
481 opal_argv_append_nosize(&ports, "0");
482 orte_static_ports = false;
483 }
484 } else if (ORTE_PROC_IS_MPI) {
485 if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
486
487
488
489
490 orte_node_rank_t nrank;
491
492 if (ORTE_NODE_RANK_INVALID != (nrank = orte_process_info.my_node_rank) &&
493 (nrank+1) < opal_argv_count(mca_oob_tcp_component.tcp6_static_ports)) {
494
495 opal_argv_append_nosize(&ports, mca_oob_tcp_component.tcp6_static_ports[nrank+1]);
496
497 orte_static_ports = true;
498 } else {
499
500 opal_argv_append_nosize(&ports, "0");
501 orte_static_ports = false;
502 }
503 } else if (NULL != mca_oob_tcp_component.tcp6_dyn_ports) {
504
505 ports = opal_argv_copy(mca_oob_tcp_component.tcp6_dyn_ports);
506 orte_static_ports = false;
507 } else {
508
509 opal_argv_append_nosize(&ports, "0");
510 orte_static_ports = false;
511 }
512 } else {
513
514
515
516 opal_argv_append_nosize(&ports, "0");
517
518
519
520 if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
521 orte_static_ports = true;
522 } else {
523 orte_static_ports = false;
524 }
525 }
526
527
528 if (NULL == ports) {
529 return ORTE_ERROR;
530 }
531
532
533 memset(&inaddr, 0, sizeof(inaddr));
534 ((struct sockaddr_in6*) &inaddr)->sin6_family = AF_INET6;
535 ((struct sockaddr_in6*) &inaddr)->sin6_addr = in6addr_any;
536 addrlen = sizeof(struct sockaddr_in6);
537
538
539
540
541
542
543 for (i=0; i < opal_argv_count(ports); i++) {
544 opal_output_verbose(5, orte_oob_base_framework.framework_output,
545 "%s attempting to bind to IPv6 port %s",
546 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
547 ports[i]);
548
549 port = strtol(ports[i], NULL, 10);
550
551 port = htons(port);
552
553 ((struct sockaddr_in6*) &inaddr)->sin6_port = port;
554
555
556 sd = socket(AF_INET6, SOCK_STREAM, 0);
557 if (sd < 0) {
558 if (EAFNOSUPPORT != opal_socket_errno) {
559 opal_output(0,"mca_oob_tcp_component_init: socket() failed: %s (%d)",
560 strerror(opal_socket_errno), opal_socket_errno);
561 }
562 return ORTE_ERR_IN_ERRNO;
563 }
564
565
566 if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) {
567 opal_output(0, "mca_oob_tcp_create_listen6: unable to set the "
568 "listening socket to CLOEXEC (%s:%d)\n",
569 strerror(opal_socket_errno), opal_socket_errno);
570 CLOSE_THE_SOCKET(sd);
571 opal_argv_free(ports);
572 return ORTE_ERROR;
573 }
574
575
576 if (orte_static_ports) {
577 flags = 1;
578 } else {
579 flags = 0;
580 }
581 if (setsockopt (sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
582 opal_output(0, "mca_oob_tcp_create_listen: unable to set the "
583 "SO_REUSEADDR option (%s:%d)\n",
584 strerror(opal_socket_errno), opal_socket_errno);
585 CLOSE_THE_SOCKET(sd);
586 opal_argv_free(ports);
587 return ORTE_ERROR;
588 }
589
590 if (bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
591 if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
592 continue;
593 }
594 opal_output(0, "%s bind() failed for port %d: %s (%d)",
595 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
596 (int)ntohs(port),
597 strerror(opal_socket_errno),
598 opal_socket_errno );
599 CLOSE_THE_SOCKET(sd);
600 opal_argv_free(ports);
601 return ORTE_ERROR;
602 }
603
604 if (getsockname(sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
605 opal_output(0, "mca_oob_tcp_create_listen: getsockname(): %s (%d)",
606 strerror(opal_socket_errno), opal_socket_errno);
607 CLOSE_THE_SOCKET(sd);
608 return ORTE_ERROR;
609 }
610
611
612 if (listen(sd, SOMAXCONN) < 0) {
613 opal_output(0, "mca_oob_tcp_component_init: listen(): %s (%d)",
614 strerror(opal_socket_errno), opal_socket_errno);
615 return ORTE_ERROR;
616 }
617
618
619 if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
620 opal_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed: %s (%d)",
621 strerror(opal_socket_errno), opal_socket_errno);
622 return ORTE_ERROR;
623 }
624 flags |= O_NONBLOCK;
625 if (fcntl(sd, F_SETFL, flags) < 0) {
626 opal_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed: %s (%d)",
627 strerror(opal_socket_errno), opal_socket_errno);
628 return ORTE_ERROR;
629 }
630
631
632 conn = OBJ_NEW(mca_oob_tcp_listener_t);
633 conn->tcp6 = true;
634 conn->sd = sd;
635 conn->port = ntohs(((struct sockaddr_in6*) &inaddr)->sin6_port);
636 opal_list_append(&mca_oob_tcp_component.listeners, &conn->item);
637
638 opal_asprintf(&tconn, "%d", ntohs(((struct sockaddr_in6*) &inaddr)->sin6_port));
639 opal_argv_append_nosize(&mca_oob_tcp_component.ipv6ports, tconn);
640 free(tconn);
641 if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
642 opal_output(0, "%s assigned IPv6 port %d",
643 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
644 (int)ntohs(((struct sockaddr_in6*) &inaddr)->sin6_port));
645 }
646
647 if (!ORTE_PROC_IS_HNP) {
648
649 break;
650 }
651 }
652 if (0 == opal_list_get_size(&mca_oob_tcp_component.listeners)) {
653
654 CLOSE_THE_SOCKET(sd);
655 opal_argv_free(ports);
656 return ORTE_ERR_SOCKET_NOT_AVAILABLE;
657 }
658
659
660 opal_argv_free(ports);
661
662 return ORTE_SUCCESS;
663 }
664 #endif
665
666
667
668
669
670
671
672
673 static void* listen_thread(opal_object_t *obj)
674 {
675 int rc, max, accepted_connections, sd;
676 opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
677 mca_oob_tcp_pending_connection_t *pending_connection;
678 struct timeval timeout;
679 fd_set readfds;
680 mca_oob_tcp_listener_t *listener;
681
682
683
684
685
686
687 while (mca_oob_tcp_component.listen_thread_active) {
688 FD_ZERO(&readfds);
689 max = -1;
690 OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
691 FD_SET(listener->sd, &readfds);
692 max = (listener->sd > max) ? listener->sd : max;
693 }
694
695 FD_SET(mca_oob_tcp_component.stop_thread[0], &readfds);
696 max = (mca_oob_tcp_component.stop_thread[0] > max) ? mca_oob_tcp_component.stop_thread[0] : max;
697
698
699 timeout.tv_sec = mca_oob_tcp_component.listen_thread_tv.tv_sec;
700 timeout.tv_usec = mca_oob_tcp_component.listen_thread_tv.tv_usec;
701
702
703
704
705 rc = select(max + 1, &readfds, NULL, NULL, &timeout);
706 if (!mca_oob_tcp_component.listen_thread_active) {
707
708 close(mca_oob_tcp_component.stop_thread[0]);
709 close(mca_oob_tcp_component.stop_thread[1]);
710 return NULL;
711 }
712 if (rc < 0) {
713 if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {
714 perror("select");
715 }
716 continue;
717 }
718
719
720
721
722
723 do {
724 accepted_connections = 0;
725 OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
726 sd = listener->sd;
727
728
729
730
731
732
733
734 if (0 == FD_ISSET(sd, &readfds)) {
735
736 continue;
737 }
738
739
740
741
742
743
744
745
746 pending_connection = OBJ_NEW(mca_oob_tcp_pending_connection_t);
747 opal_event_set(orte_oob_base.ev_base, &pending_connection->ev, -1,
748 OPAL_EV_WRITE, connection_handler, pending_connection);
749 opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI);
750 pending_connection->fd = accept(sd,
751 (struct sockaddr*)&(pending_connection->addr),
752 &addrlen);
753
754
755 if (pending_connection->fd < 0) {
756 OBJ_RELEASE(pending_connection);
757
758
759 if (EAGAIN == opal_socket_errno ||
760 EWOULDBLOCK == opal_socket_errno) {
761 continue;
762 }
763
764
765
766
767 else if (EMFILE == opal_socket_errno) {
768 CLOSE_THE_SOCKET(sd);
769 ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
770 orte_show_help("help-oob-tcp.txt",
771 "accept failed",
772 true,
773 opal_process_info.nodename,
774 opal_socket_errno,
775 strerror(opal_socket_errno),
776 "Out of file descriptors");
777 goto done;
778 }
779
780
781
782 else {
783 orte_show_help("help-oob-tcp.txt",
784 "accept failed",
785 true,
786 opal_process_info.nodename,
787 opal_socket_errno,
788 strerror(opal_socket_errno),
789 "Unknown cause; job will try to continue");
790 continue;
791 }
792 }
793
794 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
795 "%s mca_oob_tcp_listen_thread: incoming connection: "
796 "(%d, %d) %s:%d\n",
797 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
798 pending_connection->fd, opal_socket_errno,
799 opal_net_get_hostname((struct sockaddr*) &pending_connection->addr),
800 opal_net_get_port((struct sockaddr*) &pending_connection->addr));
801
802
803
804
805 if (1024 >= listener->port) {
806 uint16_t inport;
807 inport = opal_net_get_port((struct sockaddr*) &pending_connection->addr);
808 if (1024 < inport) {
809
810
811 orte_show_help("help-oob-tcp.txt",
812 "privilege failure", true,
813 opal_process_info.nodename, listener->port,
814 opal_net_get_hostname((struct sockaddr*) &pending_connection->addr),
815 inport);
816 CLOSE_THE_SOCKET(pending_connection->fd);
817 OBJ_RELEASE(pending_connection);
818 continue;
819 }
820 }
821
822
823 ORTE_POST_OBJECT(pending_connection);
824 opal_event_active(&pending_connection->ev, OPAL_EV_WRITE, 1);
825 accepted_connections++;
826 }
827 } while (accepted_connections > 0);
828 }
829
830 done:
831 #if 0
832
833
834
835
836
837
838
839
840 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
841 "%s mca_oob_tcp_listen_thread: switching to event lib",
842 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
843
844 OPAL_LIST_FOREACH(listener, &mca_oob_tcp_component.listeners, mca_oob_tcp_listener_t) {
845 opal_event_set(orte_event_base, listener->event,
846 listener->sd,
847 OPAL_EV_READ|OPAL_EV_PERSIST,
848 connection_event_handler,
849 0);
850 opal_event_set_priority(listener->event, ORTE_MSG_PRI);
851 opal_event_add(listener->event, 0);
852 }
853 #endif
854 return NULL;
855 }
856
857
858
859
860 static void connection_handler(int sd, short flags, void* cbdata)
861 {
862 mca_oob_tcp_pending_connection_t *new_connection;
863
864 new_connection = (mca_oob_tcp_pending_connection_t*)cbdata;
865
866 ORTE_ACQUIRE_OBJECT(new_connection);
867
868 opal_output_verbose(4, orte_oob_base_framework.framework_output,
869 "%s connection_handler: working connection "
870 "(%d, %d) %s:%d\n",
871 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
872 new_connection->fd, opal_socket_errno,
873 opal_net_get_hostname((struct sockaddr*) &new_connection->addr),
874 opal_net_get_port((struct sockaddr*) &new_connection->addr));
875
876
877 mca_oob_tcp_module.accept_connection(new_connection->fd,
878 (struct sockaddr*) &(new_connection->addr));
879
880 OBJ_RELEASE(new_connection);
881 }
882
883
884
885
886 static void connection_event_handler(int incoming_sd, short flags, void* cbdata)
887 {
888 struct sockaddr addr;
889 opal_socklen_t addrlen = sizeof(struct sockaddr);
890 int sd;
891
892 sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen);
893 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
894 "%s connection_event_handler: working connection "
895 "(%d, %d) %s:%d\n",
896 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
897 sd, opal_socket_errno,
898 opal_net_get_hostname((struct sockaddr*) &addr),
899 opal_net_get_port((struct sockaddr*) &addr));
900 if (sd < 0) {
901
902 if (EINTR == opal_socket_errno ||
903 EAGAIN == opal_socket_errno ||
904 EWOULDBLOCK == opal_socket_errno) {
905 return;
906 }
907
908
909
910
911 else if (EMFILE == opal_socket_errno) {
912 CLOSE_THE_SOCKET(incoming_sd);
913 ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
914 orte_show_help("help-oob-tcp.txt",
915 "accept failed",
916 true,
917 opal_process_info.nodename,
918 opal_socket_errno,
919 strerror(opal_socket_errno),
920 "Out of file descriptors");
921 orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
922 return;
923 }
924
925
926
927 else {
928 CLOSE_THE_SOCKET(incoming_sd);
929 orte_show_help("help-oob-tcp.txt",
930 "accept failed",
931 true,
932 opal_process_info.nodename,
933 opal_socket_errno,
934 strerror(opal_socket_errno),
935 "Unknown cause; job will try to continue");
936 return;
937 }
938 }
939
940
941 mca_oob_tcp_module.accept_connection(sd, &addr);
942 }
943
944
945 static void tcp_ev_cons(mca_oob_tcp_listener_t* event)
946 {
947 event->ev_active = false;
948 event->tcp6 = false;
949 event->sd = -1;
950 event->port = 0;
951 }
952 static void tcp_ev_des(mca_oob_tcp_listener_t* event)
953 {
954 if (event->ev_active) {
955 opal_event_del(&event->event);
956 }
957 event->ev_active = false;
958 if (0 <= event->sd) {
959 CLOSE_THE_SOCKET(event->sd);
960 event->sd = -1;
961 }
962 }
963
964 OBJ_CLASS_INSTANCE(mca_oob_tcp_listener_t,
965 opal_list_item_t,
966 tcp_ev_cons, tcp_ev_des);
967
968 OBJ_CLASS_INSTANCE(mca_oob_tcp_pending_connection_t,
969 opal_object_t,
970 NULL,
971 NULL);