This source file includes the following definitions.
- tcp_component_open
- tcp_component_close
- tcp_component_register
- component_available
- component_startup
- component_shutdown
- component_send
- component_get_addr
- parse_uri
- component_set_addr
- component_is_reachable
- component_ft_event
- mca_oob_tcp_component_set_module
- mca_oob_tcp_component_lost_connection
- mca_oob_tcp_component_no_route
- mca_oob_tcp_component_hop_unknown
- mca_oob_tcp_component_failed_to_connect
- split_and_resolve
- peer_cons
- peer_des
- padd_cons
- pop_cons
- pop_des
- nicaddr_cons
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 #include "orte_config.h"
35 #include "orte/types.h"
36 #include "opal/types.h"
37
38 #ifdef HAVE_UNISTD_H
39 #include <unistd.h>
40 #endif
41 #ifdef HAVE_SYS_TYPES_H
42 #include <sys/types.h>
43 #endif
44 #include <fcntl.h>
45 #ifdef HAVE_NETINET_IN_H
46 #include <netinet/in.h>
47 #endif
48 #ifdef HAVE_ARPA_INET_H
49 #include <arpa/inet.h>
50 #endif
51 #ifdef HAVE_NETDB_H
52 #include <netdb.h>
53 #endif
54 #include <ctype.h>
55 #include <sys/socket.h>
56 #include <arpa/inet.h>
57
58 #include "opal/util/show_help.h"
59 #include "opal/util/error.h"
60 #include "opal/util/output.h"
61 #include "opal/opal_socket_errno.h"
62 #include "opal/util/if.h"
63 #include "opal/util/net.h"
64 #include "opal/util/argv.h"
65 #include "opal/class/opal_hash_table.h"
66 #include "opal/class/opal_list.h"
67 #include "opal/mca/event/event.h"
68 #include "opal/runtime/opal_progress_threads.h"
69
70 #include "orte/mca/errmgr/errmgr.h"
71 #include "orte/mca/ess/ess.h"
72 #include "orte/mca/rml/rml_types.h"
73 #include "orte/mca/routed/routed.h"
74 #include "orte/mca/state/state.h"
75 #include "orte/util/attr.h"
76 #include "orte/util/name_fns.h"
77 #include "orte/util/parse_options.h"
78 #include "orte/util/show_help.h"
79 #include "orte/util/threads.h"
80 #include "orte/runtime/orte_globals.h"
81 #include "orte/runtime/orte_wait.h"
82
83 #include "orte/mca/oob/tcp/oob_tcp.h"
84 #include "orte/mca/oob/tcp/oob_tcp_common.h"
85 #include "orte/mca/oob/tcp/oob_tcp_component.h"
86 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
87 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
88 #include "orte/mca/oob/tcp/oob_tcp_listener.h"
89 #include "oob_tcp_peer.h"
90
91
92
93
94
95 static int tcp_component_register(void);
96 static int tcp_component_open(void);
97 static int tcp_component_close(void);
98
99 static int component_available(void);
100 static int component_startup(void);
101 static void component_shutdown(void);
102 static int component_send(orte_rml_send_t *msg);
103 static char* component_get_addr(void);
104 static int component_set_addr(orte_process_name_t *peer,
105 char **uris);
106 static bool component_is_reachable(orte_process_name_t *peer);
107 #if OPAL_ENABLE_FT_CR == 1
108 static int component_ft_event(int state);
109 #endif
110
111
112
113
114 mca_oob_tcp_component_t mca_oob_tcp_component = {
115 {
116 .oob_base = {
117 MCA_OOB_BASE_VERSION_2_0_0,
118 .mca_component_name = "tcp",
119 MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
120 ORTE_RELEASE_VERSION),
121 .mca_open_component = tcp_component_open,
122 .mca_close_component = tcp_component_close,
123 .mca_register_component_params = tcp_component_register,
124 },
125 .oob_data = {
126
127 MCA_BASE_METADATA_PARAM_CHECKPOINT
128 },
129 .priority = 30,
130 .available = component_available,
131 .startup = component_startup,
132 .shutdown = component_shutdown,
133 .send_nb = component_send,
134 .get_addr = component_get_addr,
135 .set_addr = component_set_addr,
136 .is_reachable = component_is_reachable,
137 #if OPAL_ENABLE_FT_CR == 1
138 .ft_event = component_ft_event,
139 #endif
140 },
141 };
142
143
144
145
146 static int tcp_component_open(void)
147 {
148 mca_oob_tcp_component.next_base = 0;
149 OBJ_CONSTRUCT(&mca_oob_tcp_component.peers, opal_hash_table_t);
150 opal_hash_table_init(&mca_oob_tcp_component.peers, 32);
151 OBJ_CONSTRUCT(&mca_oob_tcp_component.ev_bases, opal_pointer_array_t);
152 opal_pointer_array_init(&mca_oob_tcp_component.ev_bases,
153 orte_oob_base.num_threads, 256, 8);
154
155 OBJ_CONSTRUCT(&mca_oob_tcp_component.listeners, opal_list_t);
156 if (ORTE_PROC_IS_HNP) {
157 OBJ_CONSTRUCT(&mca_oob_tcp_component.listen_thread, opal_thread_t);
158 mca_oob_tcp_component.listen_thread_active = false;
159 mca_oob_tcp_component.listen_thread_tv.tv_sec = 3600;
160 mca_oob_tcp_component.listen_thread_tv.tv_usec = 0;
161 }
162 mca_oob_tcp_component.addr_count = 0;
163 mca_oob_tcp_component.ipv4conns = NULL;
164 mca_oob_tcp_component.ipv4ports = NULL;
165 mca_oob_tcp_component.ipv6conns = NULL;
166 mca_oob_tcp_component.ipv6ports = NULL;
167
168
169 if (OPAL_SUCCESS !=
170 mca_base_var_check_exclusive("orte",
171 mca_oob_tcp_component.super.oob_base.mca_type_name,
172 mca_oob_tcp_component.super.oob_base.mca_component_name,
173 "if_include",
174 mca_oob_tcp_component.super.oob_base.mca_type_name,
175 mca_oob_tcp_component.super.oob_base.mca_component_name,
176 "if_exclude")) {
177
178
179 return ORTE_ERR_NOT_AVAILABLE;
180 }
181 return ORTE_SUCCESS;
182 }
183
184
185
186
187 static int tcp_component_close(void)
188 {
189
190 OPAL_LIST_DESTRUCT(&mca_oob_tcp_component.listeners);
191
192 OBJ_DESTRUCT(&mca_oob_tcp_component.peers);
193
194 if (NULL != mca_oob_tcp_component.ipv4conns) {
195 opal_argv_free(mca_oob_tcp_component.ipv4conns);
196 }
197 if (NULL != mca_oob_tcp_component.ipv4ports) {
198 opal_argv_free(mca_oob_tcp_component.ipv4ports);
199 }
200
201 #if OPAL_ENABLE_IPV6
202 if (NULL != mca_oob_tcp_component.ipv6conns) {
203 opal_argv_free(mca_oob_tcp_component.ipv6conns);
204 }
205 if (NULL != mca_oob_tcp_component.ipv6ports) {
206 opal_argv_free(mca_oob_tcp_component.ipv6ports);
207 }
208 #endif
209
210 OBJ_DESTRUCT(&mca_oob_tcp_component.ev_bases);
211
212 return ORTE_SUCCESS;
213 }
/*
 * Storage for the raw string values of the static/dynamic port MCA
 * variables; tcp_component_register() registers them and parses the result.
 */
static char *static_port_string;
static char *dyn_port_string;
#if OPAL_ENABLE_IPV6
static char *static_port_string6;
static char *dyn_port_string6;
#endif
223
224 static int tcp_component_register(void)
225 {
226 mca_base_component_t *component = &mca_oob_tcp_component.super.oob_base;
227 int var_id;
228
229
230 mca_oob_tcp_component.peer_limit = -1;
231 (void)mca_base_component_var_register(component, "peer_limit",
232 "Maximum number of peer connections to simultaneously maintain (-1 = infinite)",
233 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
234 OPAL_INFO_LVL_5,
235 MCA_BASE_VAR_SCOPE_LOCAL,
236 &mca_oob_tcp_component.peer_limit);
237
238 mca_oob_tcp_component.max_retries = 2;
239 (void)mca_base_component_var_register(component, "peer_retries",
240 "Number of times to try shutting down a connection before giving up",
241 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
242 OPAL_INFO_LVL_5,
243 MCA_BASE_VAR_SCOPE_LOCAL,
244 &mca_oob_tcp_component.max_retries);
245
246 mca_oob_tcp_component.tcp_sndbuf = 0;
247 (void)mca_base_component_var_register(component, "sndbuf",
248 "TCP socket send buffering size (in bytes, 0 => leave system default)",
249 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
250 OPAL_INFO_LVL_4,
251 MCA_BASE_VAR_SCOPE_LOCAL,
252 &mca_oob_tcp_component.tcp_sndbuf);
253
254 mca_oob_tcp_component.tcp_rcvbuf = 0;
255 (void)mca_base_component_var_register(component, "rcvbuf",
256 "TCP socket receive buffering size (in bytes, 0 => leave system default)",
257 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
258 OPAL_INFO_LVL_4,
259 MCA_BASE_VAR_SCOPE_LOCAL,
260 &mca_oob_tcp_component.tcp_rcvbuf);
261
262 mca_oob_tcp_component.if_include = NULL;
263 var_id = mca_base_component_var_register(component, "if_include",
264 "Comma-delimited list of devices and/or CIDR notation of TCP networks to use for Open MPI bootstrap communication (e.g., \"eth0,192.168.0.0/16\"). Mutually exclusive with oob_tcp_if_exclude.",
265 MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
266 OPAL_INFO_LVL_2,
267 MCA_BASE_VAR_SCOPE_LOCAL,
268 &mca_oob_tcp_component.if_include);
269 (void)mca_base_var_register_synonym(var_id, "orte", "oob", "tcp", "include",
270 MCA_BASE_VAR_SYN_FLAG_DEPRECATED | MCA_BASE_VAR_SYN_FLAG_INTERNAL);
271
272 mca_oob_tcp_component.if_exclude = NULL;
273 var_id = mca_base_component_var_register(component, "if_exclude",
274 "Comma-delimited list of devices and/or CIDR notation of TCP networks to NOT use for Open MPI bootstrap communication -- all devices not matching these specifications will be used (e.g., \"eth0,192.168.0.0/16\"). If set to a non-default value, it is mutually exclusive with oob_tcp_if_include.",
275 MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
276 OPAL_INFO_LVL_2,
277 MCA_BASE_VAR_SCOPE_LOCAL,
278 &mca_oob_tcp_component.if_exclude);
279 (void)mca_base_var_register_synonym(var_id, "orte", "oob", "tcp", "exclude",
280 MCA_BASE_VAR_SYN_FLAG_DEPRECATED | MCA_BASE_VAR_SYN_FLAG_INTERNAL);
281
282
283 if (NULL != mca_oob_tcp_component.if_include &&
284 NULL != mca_oob_tcp_component.if_exclude) {
285
286
287 orte_show_help("help-oob-tcp.txt", "include-exclude", true,
288 mca_oob_tcp_component.if_include,
289 mca_oob_tcp_component.if_exclude);
290 return ORTE_ERR_NOT_AVAILABLE;
291 }
292
293 static_port_string = NULL;
294 (void)mca_base_component_var_register(component, "static_ipv4_ports",
295 "Static ports for daemons and procs (IPv4)",
296 MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
297 OPAL_INFO_LVL_2,
298 MCA_BASE_VAR_SCOPE_READONLY,
299 &static_port_string);
300
301
302 if (NULL != static_port_string) {
303 orte_util_parse_range_options(static_port_string, &mca_oob_tcp_component.tcp_static_ports);
304 if (0 == strcmp(mca_oob_tcp_component.tcp_static_ports[0], "-1")) {
305 opal_argv_free(mca_oob_tcp_component.tcp_static_ports);
306 mca_oob_tcp_component.tcp_static_ports = NULL;
307 }
308 } else {
309 mca_oob_tcp_component.tcp_static_ports = NULL;
310 }
311
312 #if OPAL_ENABLE_IPV6
313 static_port_string6 = NULL;
314 (void)mca_base_component_var_register(component, "static_ipv6_ports",
315 "Static ports for daemons and procs (IPv6)",
316 MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
317 OPAL_INFO_LVL_2,
318 MCA_BASE_VAR_SCOPE_READONLY,
319 &static_port_string6);
320
321
322 if (NULL != static_port_string6) {
323 orte_util_parse_range_options(static_port_string6, &mca_oob_tcp_component.tcp6_static_ports);
324 if (0 == strcmp(mca_oob_tcp_component.tcp6_static_ports[0], "-1")) {
325 opal_argv_free(mca_oob_tcp_component.tcp6_static_ports);
326 mca_oob_tcp_component.tcp6_static_ports = NULL;
327 }
328 } else {
329 mca_oob_tcp_component.tcp6_static_ports = NULL;
330 }
331 #endif
332
333 if (NULL != mca_oob_tcp_component.tcp_static_ports ||
334 NULL != mca_oob_tcp_component.tcp6_static_ports) {
335 orte_static_ports = true;
336 }
337
338 dyn_port_string = NULL;
339 (void)mca_base_component_var_register(component, "dynamic_ipv4_ports",
340 "Range of ports to be dynamically used by daemons and procs (IPv4)",
341 MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
342 OPAL_INFO_LVL_4,
343 MCA_BASE_VAR_SCOPE_READONLY,
344 &dyn_port_string);
345
346 if (NULL != dyn_port_string) {
347
348 if (orte_static_ports) {
349 char *err = opal_argv_join(mca_oob_tcp_component.tcp_static_ports, ',');
350 opal_show_help("help-oob-tcp.txt", "static-and-dynamic", true,
351 err, dyn_port_string);
352 free(err);
353 return ORTE_ERROR;
354 }
355 orte_util_parse_range_options(dyn_port_string, &mca_oob_tcp_component.tcp_dyn_ports);
356 if (0 == strcmp(mca_oob_tcp_component.tcp_dyn_ports[0], "-1")) {
357 opal_argv_free(mca_oob_tcp_component.tcp_dyn_ports);
358 mca_oob_tcp_component.tcp_dyn_ports = NULL;
359 }
360 } else {
361 mca_oob_tcp_component.tcp_dyn_ports = NULL;
362 }
363
364 #if OPAL_ENABLE_IPV6
365 dyn_port_string6 = NULL;
366 (void)mca_base_component_var_register(component, "dynamic_ipv6_ports",
367 "Range of ports to be dynamically used by daemons and procs (IPv6)",
368 MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
369 OPAL_INFO_LVL_4,
370 MCA_BASE_VAR_SCOPE_READONLY,
371 &dyn_port_string6);
372
373 if (NULL != dyn_port_string6) {
374
375 if (orte_static_ports) {
376 char *err4=NULL, *err6=NULL;
377 if (NULL != mca_oob_tcp_component.tcp_static_ports) {
378 err4 = opal_argv_join(mca_oob_tcp_component.tcp_static_ports, ',');
379 }
380 if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
381 err6 = opal_argv_join(mca_oob_tcp_component.tcp6_static_ports, ',');
382 }
383 opal_show_help("help-oob-tcp.txt", "static-and-dynamic-ipv6", true,
384 (NULL == err4) ? "N/A" : err4,
385 (NULL == err6) ? "N/A" : err6,
386 dyn_port_string6);
387 if (NULL != err4) {
388 free(err4);
389 }
390 if (NULL != err6) {
391 free(err6);
392 }
393 return ORTE_ERROR;
394 }
395 orte_util_parse_range_options(dyn_port_string6, &mca_oob_tcp_component.tcp6_dyn_ports);
396 if (0 == strcmp(mca_oob_tcp_component.tcp6_dyn_ports[0], "-1")) {
397 opal_argv_free(mca_oob_tcp_component.tcp6_dyn_ports);
398 mca_oob_tcp_component.tcp6_dyn_ports = NULL;
399 }
400 } else {
401 mca_oob_tcp_component.tcp6_dyn_ports = NULL;
402 }
403 #endif
404
405 mca_oob_tcp_component.disable_ipv4_family = false;
406 (void)mca_base_component_var_register(component, "disable_ipv4_family",
407 "Disable the IPv4 interfaces",
408 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
409 OPAL_INFO_LVL_4,
410 MCA_BASE_VAR_SCOPE_READONLY,
411 &mca_oob_tcp_component.disable_ipv4_family);
412
413 #if OPAL_ENABLE_IPV6
414 mca_oob_tcp_component.disable_ipv6_family = false;
415 (void)mca_base_component_var_register(component, "disable_ipv6_family",
416 "Disable the IPv6 interfaces",
417 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
418 OPAL_INFO_LVL_4,
419 MCA_BASE_VAR_SCOPE_READONLY,
420 &mca_oob_tcp_component.disable_ipv6_family);
421 #endif
422
423
424 mca_oob_tcp_component.keepalive_time = 300;
425 (void)mca_base_component_var_register(component, "keepalive_time",
426 "Idle time in seconds before starting to send keepalives (keepalive_time <= 0 disables keepalive functionality)",
427 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
428 OPAL_INFO_LVL_5,
429 MCA_BASE_VAR_SCOPE_READONLY,
430 &mca_oob_tcp_component.keepalive_time);
431
432
433 mca_oob_tcp_component.keepalive_intvl = 20;
434 (void)mca_base_component_var_register(component, "keepalive_intvl",
435 "Time between successive keepalive pings when peer has not responded, in seconds (ignored if keepalive_time <= 0)",
436 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
437 OPAL_INFO_LVL_5,
438 MCA_BASE_VAR_SCOPE_READONLY,
439 &mca_oob_tcp_component.keepalive_intvl);
440
441
442 mca_oob_tcp_component.keepalive_probes = 9;
443 (void)mca_base_component_var_register(component, "keepalive_probes",
444 "Number of keepalives that can be missed before declaring error (ignored if keepalive_time <= 0)",
445 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
446 OPAL_INFO_LVL_5,
447 MCA_BASE_VAR_SCOPE_READONLY,
448 &mca_oob_tcp_component.keepalive_probes);
449
450 mca_oob_tcp_component.retry_delay = 0;
451 (void)mca_base_component_var_register(component, "retry_delay",
452 "Time (in sec) to wait before trying to connect to peer again",
453 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
454 OPAL_INFO_LVL_4,
455 MCA_BASE_VAR_SCOPE_READONLY,
456 &mca_oob_tcp_component.retry_delay);
457
458 mca_oob_tcp_component.max_recon_attempts = 10;
459 (void)mca_base_component_var_register(component, "max_recon_attempts",
460 "Max number of times to attempt connection before giving up (-1 -> never give up)",
461 MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
462 OPAL_INFO_LVL_4,
463 MCA_BASE_VAR_SCOPE_READONLY,
464 &mca_oob_tcp_component.max_recon_attempts);
465
466 return ORTE_SUCCESS;
467 }
468
469
470 static char **split_and_resolve(char **orig_str, char *name);
471
472 static int component_available(void)
473 {
474 int i, rc;
475 char **interfaces = NULL;
476 bool including = false, excluding = false;
477 char name[32];
478 struct sockaddr_storage my_ss;
479 int kindex;
480
481 opal_output_verbose(5, orte_oob_base_framework.framework_output,
482 "oob:tcp: component_available called");
483
484
485
486
487
488
489 if (NULL != mca_oob_tcp_component.if_include) {
490 interfaces = split_and_resolve(&mca_oob_tcp_component.if_include,
491 "include");
492 including = true;
493 excluding = false;
494 } else if (NULL != mca_oob_tcp_component.if_exclude) {
495 interfaces = split_and_resolve(&mca_oob_tcp_component.if_exclude,
496 "exclude");
497 including = false;
498 excluding = true;
499 }
500
501
502 for (i = opal_ifbegin(); i >= 0; i = opal_ifnext(i)) {
503 if (OPAL_SUCCESS != opal_ifindextoaddr(i, (struct sockaddr*) &my_ss,
504 sizeof (my_ss))) {
505 opal_output (0, "oob_tcp: problems getting address for index %i (kernel index %i)\n",
506 i, opal_ifindextokindex(i));
507 continue;
508 }
509
510 if (AF_INET != my_ss.ss_family
511 #if OPAL_ENABLE_IPV6
512 && AF_INET6 != my_ss.ss_family
513 #endif
514 ) {
515 continue;
516 }
517 kindex = opal_ifindextokindex(i);
518 if (kindex <= 0) {
519 continue;
520 }
521 opal_output_verbose(10, orte_oob_base_framework.framework_output,
522 "WORKING INTERFACE %d KERNEL INDEX %d FAMILY: %s", i, kindex,
523 (AF_INET == my_ss.ss_family) ? "V4" : "V6");
524
525
526 opal_ifindextoname(i, name, sizeof(name));
527
528
529 if (0 == strncmp(name, "vir", 3)) {
530 continue;
531 }
532
533
534 if (NULL != interfaces) {
535
536 rc = opal_ifmatches(kindex, interfaces);
537
538
539
540 if (OPAL_ERR_NETWORK_NOT_PARSEABLE == rc) {
541 orte_show_help("help-oob-tcp.txt", "not-parseable", true);
542 opal_argv_free(interfaces);
543 return ORTE_ERR_BAD_PARAM;
544 }
545
546 if (including) {
547 if (OPAL_SUCCESS != rc) {
548 opal_output_verbose(20, orte_oob_base_framework.framework_output,
549 "%s oob:tcp:init rejecting interface %s (not in include list)",
550 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name);
551 continue;
552 }
553 } else {
554
555 if (OPAL_SUCCESS == rc) {
556 opal_output_verbose(20, orte_oob_base_framework.framework_output,
557 "%s oob:tcp:init rejecting interface %s (in exclude list)",
558 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name);
559 continue;
560 }
561 }
562 } else {
563
564
565
566 if (1 < opal_ifcount() && opal_ifisloopback(i)) {
567 opal_output_verbose(20, orte_oob_base_framework.framework_output,
568 "%s oob:tcp:init rejecting loopback interface %s",
569 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name);
570 continue;
571 }
572 }
573
574
575
576
577
578
579
580
581
582
583 if (AF_INET == my_ss.ss_family) {
584 opal_output_verbose(10, orte_oob_base_framework.framework_output,
585 "%s oob:tcp:init adding %s to our list of %s connections",
586 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
587 opal_net_get_hostname((struct sockaddr*) &my_ss),
588 (AF_INET == my_ss.ss_family) ? "V4" : "V6");
589 opal_argv_append_nosize(&mca_oob_tcp_component.ipv4conns, opal_net_get_hostname((struct sockaddr*) &my_ss));
590 } else if (AF_INET6 == my_ss.ss_family) {
591 #if OPAL_ENABLE_IPV6
592 opal_output_verbose(10, orte_oob_base_framework.framework_output,
593 "%s oob:tcp:init adding %s to our list of %s connections",
594 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
595 opal_net_get_hostname((struct sockaddr*) &my_ss),
596 (AF_INET == my_ss.ss_family) ? "V4" : "V6");
597 opal_argv_append_nosize(&mca_oob_tcp_component.ipv6conns, opal_net_get_hostname((struct sockaddr*) &my_ss));
598 #endif
599 } else {
600 opal_output_verbose(10, orte_oob_base_framework.framework_output,
601 "%s oob:tcp:init ignoring %s from out list of connections",
602 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
603 opal_net_get_hostname((struct sockaddr*) &my_ss));
604 }
605 }
606
607
608 if (NULL != interfaces) {
609 opal_argv_free(interfaces);
610 }
611
612 if (0 == opal_argv_count(mca_oob_tcp_component.ipv4conns)
613 #if OPAL_ENABLE_IPV6
614 && 0 == opal_argv_count(mca_oob_tcp_component.ipv6conns)
615 #endif
616 ) {
617 if (including) {
618 orte_show_help("help-oob-tcp.txt", "no-included-found", true, mca_oob_tcp_component.if_include);
619 } else if (excluding) {
620 orte_show_help("help-oob-tcp.txt", "excluded-all", true, mca_oob_tcp_component.if_exclude);
621 }
622 return ORTE_ERR_NOT_AVAILABLE;
623 }
624
625 return ORTE_SUCCESS;
626 }
627
628
629 static int component_startup(void)
630 {
631 int rc = ORTE_SUCCESS;
632 int i;
633 char *tmp;
634 opal_event_base_t *evb;
635
636 opal_output_verbose(2, orte_oob_base_framework.framework_output,
637 "%s TCP STARTUP",
638 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
639
640
641 if (0 == orte_oob_base.num_threads) {
642 opal_pointer_array_add(&mca_oob_tcp_component.ev_bases, orte_oob_base.ev_base);
643 } else {
644 for (i=0; i < orte_oob_base.num_threads; i++) {
645 opal_asprintf(&tmp, "OOB-TCP-%d", i);
646 evb = opal_progress_thread_init(tmp);
647 opal_pointer_array_add(&mca_oob_tcp_component.ev_bases, evb);
648 opal_argv_append_nosize(&mca_oob_tcp_component.ev_threads, tmp);
649 free(tmp);
650 }
651 }
652
653
654
655
656
657
658
659 if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON ||
660 orte_standalone_operation) {
661 if (ORTE_SUCCESS != (rc = orte_oob_tcp_start_listening())) {
662 ORTE_ERROR_LOG(rc);
663 }
664 }
665
666 return rc;
667 }
668
669 static void component_shutdown(void)
670 {
671 mca_oob_tcp_peer_t *peer;
672 int i = 0, rc;
673 uint64_t key;
674 void *node;
675
676 opal_output_verbose(2, orte_oob_base_framework.framework_output,
677 "%s TCP SHUTDOWN",
678 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
679
680 if (0 < orte_oob_base.num_threads) {
681 for (i=0; i < orte_oob_base.num_threads; i++) {
682 opal_progress_thread_finalize(mca_oob_tcp_component.ev_threads[i]);
683 opal_pointer_array_set_item(&mca_oob_tcp_component.ev_bases, i, NULL);
684 }
685 opal_argv_free(mca_oob_tcp_component.ev_threads);
686 }
687
688 if (ORTE_PROC_IS_HNP && mca_oob_tcp_component.listen_thread_active) {
689 mca_oob_tcp_component.listen_thread_active = false;
690
691 write(mca_oob_tcp_component.stop_thread[1], &i, sizeof(int));
692 opal_thread_join(&mca_oob_tcp_component.listen_thread, NULL);
693 } else {
694 opal_output_verbose(2, orte_oob_base_framework.framework_output,
695 "no hnp or not active");
696 }
697
698
699 rc = opal_hash_table_get_first_key_uint64(&mca_oob_tcp_component.peers, &key,
700 (void **)&peer, &node);
701 while (OPAL_SUCCESS == rc) {
702 if (NULL != peer) {
703 OBJ_RELEASE(peer);
704 rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, key, NULL);
705 if (OPAL_SUCCESS != rc) {
706 ORTE_ERROR_LOG(rc);
707 }
708 }
709 rc = opal_hash_table_get_next_key_uint64(&mca_oob_tcp_component.peers, &key,
710 (void **) &peer, node, &node);
711 }
712
713 opal_output_verbose(2, orte_oob_base_framework.framework_output,
714 "%s TCP SHUTDOWN done",
715 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
716 }
717
718 static int component_send(orte_rml_send_t *msg)
719 {
720 opal_output_verbose(5, orte_oob_base_framework.framework_output,
721 "%s oob:tcp:send_nb to peer %s:%d seq = %d",
722 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
723 ORTE_NAME_PRINT(&msg->dst), msg->tag, msg->seq_num );
724
725
726
727
728
729
730
731
732 mca_oob_tcp_module.send_nb(msg);
733 return ORTE_SUCCESS;
734 }
735
736 static char* component_get_addr(void)
737 {
738 char *cptr=NULL, *tmp, *tp;
739
740 if (!mca_oob_tcp_component.disable_ipv4_family &&
741 NULL != mca_oob_tcp_component.ipv4conns) {
742 tmp = opal_argv_join(mca_oob_tcp_component.ipv4conns, ',');
743 tp = opal_argv_join(mca_oob_tcp_component.ipv4ports, ',');
744 opal_asprintf(&cptr, "tcp://%s:%s", tmp, tp);
745 free(tmp);
746 free(tp);
747 }
748 #if OPAL_ENABLE_IPV6
749 if (!mca_oob_tcp_component.disable_ipv6_family &&
750 NULL != mca_oob_tcp_component.ipv6conns) {
751 char *tmp2;
752
753
754
755
756
757
758
759
760
761
762
763
764 tmp = opal_argv_join(mca_oob_tcp_component.ipv6conns, ',');
765 tp = opal_argv_join(mca_oob_tcp_component.ipv6ports, ',');
766 if (NULL == cptr) {
767
768 opal_asprintf(&cptr, "tcp6://[%s]:%s", tmp, tp);
769 } else {
770 opal_asprintf(&tmp2, "%s;tcp6://[%s]:%s", cptr, tmp, tp);
771 free(cptr);
772 cptr = tmp2;
773 }
774 free(tmp);
775 free(tp);
776 }
777 #endif
778
779
780 return cptr;
781 }
782
783
784
785 static int parse_uri(const uint16_t af_family,
786 const char* host,
787 const char *port,
788 struct sockaddr_storage* inaddr)
789 {
790 struct sockaddr_in *in;
791
792 if (AF_INET == af_family) {
793 memset(inaddr, 0, sizeof(struct sockaddr_in));
794 in = (struct sockaddr_in*) inaddr;
795 in->sin_family = AF_INET;
796 in->sin_addr.s_addr = inet_addr(host);
797 if (in->sin_addr.s_addr == INADDR_NONE) {
798 return ORTE_ERR_BAD_PARAM;
799 }
800 ((struct sockaddr_in*) inaddr)->sin_port = htons(atoi(port));
801 }
802 #if OPAL_ENABLE_IPV6
803 else if (AF_INET6 == af_family) {
804 struct sockaddr_in6 *in6;
805 memset(inaddr, 0, sizeof(struct sockaddr_in6));
806 in6 = (struct sockaddr_in6*) inaddr;
807
808 if (0 == inet_pton(AF_INET6, host, (void*)&in6->sin6_addr)) {
809 opal_output (0, "oob_tcp_parse_uri: Could not convert %s\n", host);
810 return ORTE_ERR_BAD_PARAM;
811 }
812 in6->sin6_family = AF_INET6;
813 in6->sin6_port = htons(atoi(port));
814 }
815 #endif
816 else {
817 return ORTE_ERR_NOT_SUPPORTED;
818 }
819 return ORTE_SUCCESS;
820 }
821
822 static int component_set_addr(orte_process_name_t *peer,
823 char **uris)
824 {
825 char **addrs, *hptr;
826 char *tcpuri=NULL, *host, *ports;
827 int i, j, rc;
828 uint16_t af_family = AF_UNSPEC;
829 uint64_t ui64;
830 bool found;
831 mca_oob_tcp_peer_t *pr;
832 mca_oob_tcp_addr_t *maddr;
833
834 memcpy(&ui64, (char*)peer, sizeof(uint64_t));
835
836 found = false;
837
838 for (i=0; NULL != uris[i]; i++) {
839 tcpuri = strdup(uris[i]);
840 if (NULL == tcpuri) {
841 opal_output_verbose(2, orte_oob_base_framework.framework_output,
842 "%s oob:tcp: out of memory",
843 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
844 continue;
845 }
846 if (0 == strncmp(uris[i], "tcp:", 4)) {
847 af_family = AF_INET;
848 host = tcpuri + strlen("tcp://");
849 } else if (0 == strncmp(uris[i], "tcp6:", 5)) {
850 #if OPAL_ENABLE_IPV6
851 af_family = AF_INET6;
852 host = tcpuri + strlen("tcp6://");
853 #else
854
855 opal_output_verbose(2, orte_oob_base_framework.framework_output,
856 "%s oob:tcp: address %s not supported",
857 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uris[i]);
858 free(tcpuri);
859 continue;
860 #endif
861 } else {
862
863 opal_output_verbose(2, orte_oob_base_framework.framework_output,
864 "%s oob:tcp: ignoring address %s",
865 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uris[i]);
866 free(tcpuri);
867 continue;
868 }
869
870
871 opal_output_verbose(2, orte_oob_base_framework.framework_output,
872 "%s oob:tcp: working peer %s address %s",
873 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
874 ORTE_NAME_PRINT(peer), uris[i]);
875
876 ports = strrchr(tcpuri, ':');
877 if (NULL == ports) {
878 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
879 free(tcpuri);
880 continue;
881 }
882 *ports = '\0';
883 ports++;
884
885
886
887
888
889
890 hptr = host;
891 #if OPAL_ENABLE_IPV6
892 if (AF_INET6 == af_family) {
893 if ('[' == host[0]) {
894 hptr = &host[1];
895 }
896 if (']' == host[strlen(host)-1]) {
897 host[strlen(host)-1] = '\0';
898 }
899 }
900 #endif
901 addrs = opal_argv_split(hptr, ',');
902
903
904
905 for (j=0; NULL != addrs[j]; j++) {
906
907 if (0 == strcasecmp(addrs[j], "localhost")) {
908 #if OPAL_ENABLE_IPV6
909 if (AF_INET6 == af_family) {
910 if (NULL == mca_oob_tcp_component.ipv6conns ||
911 NULL == mca_oob_tcp_component.ipv6conns[0]) {
912 continue;
913 }
914 host = mca_oob_tcp_component.ipv6conns[0];
915 } else {
916 #endif
917 if (NULL == mca_oob_tcp_component.ipv4conns ||
918 NULL == mca_oob_tcp_component.ipv4conns[0]) {
919 continue;
920 }
921 host = mca_oob_tcp_component.ipv4conns[0];
922 #if OPAL_ENABLE_IPV6
923 }
924 #endif
925 } else {
926 host = addrs[j];
927 }
928
929 if (NULL == (pr = mca_oob_tcp_peer_lookup(peer))) {
930 pr = OBJ_NEW(mca_oob_tcp_peer_t);
931 pr->name.jobid = peer->jobid;
932 pr->name.vpid = peer->vpid;
933 opal_output_verbose(20, orte_oob_base_framework.framework_output,
934 "%s SET_PEER ADDING PEER %s",
935 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
936 ORTE_NAME_PRINT(peer));
937 if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, pr)) {
938 OBJ_RELEASE(pr);
939 return ORTE_ERR_TAKE_NEXT_OPTION;
940 }
941 }
942
943 maddr = OBJ_NEW(mca_oob_tcp_addr_t);
944 ((struct sockaddr_storage*) &(maddr->addr))->ss_family = af_family;
945 if (ORTE_SUCCESS != (rc = parse_uri(af_family, host, ports, (struct sockaddr_storage*) &(maddr->addr)))) {
946 ORTE_ERROR_LOG(rc);
947 OBJ_RELEASE(maddr);
948 rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, NULL);
949 if (ORTE_SUCCESS != rc) {
950 ORTE_ERROR_LOG(rc);
951 }
952 OBJ_RELEASE(pr);
953 return ORTE_ERR_TAKE_NEXT_OPTION;
954 }
955
956 opal_output_verbose(20, orte_oob_base_framework.framework_output,
957 "%s set_peer: peer %s is listening on net %s port %s",
958 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
959 ORTE_NAME_PRINT(peer),
960 (NULL == host) ? "NULL" : host,
961 (NULL == ports) ? "NULL" : ports);
962 opal_list_append(&pr->addrs, &maddr->super);
963
964 found = true;
965 }
966 opal_argv_free(addrs);
967 free(tcpuri);
968 }
969 if (found) {
970
971 return ORTE_SUCCESS;
972 }
973
974
975 return ORTE_ERR_TAKE_NEXT_OPTION;
976 }
977
978 static bool component_is_reachable(orte_process_name_t *peer)
979 {
980 orte_process_name_t hop;
981
982
983 hop = orte_routed.get_route(peer);
984 if (ORTE_JOBID_INVALID == hop.jobid ||
985 ORTE_VPID_INVALID == hop.vpid) {
986 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
987 "%s is NOT reachable by TCP",
988 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
989 return false;
990 }
991
992
993 return true;
994 }
995
#if OPAL_ENABLE_FT_CR == 1
/*
 * Fault-tolerance event hook: trace the event and forward the state to the
 * TCP module's ft_event handler when the module provides one.  Always
 * returns ORTE_SUCCESS.
 */
static int component_ft_event(int state)
{
    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s TCP FT EVENT", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* nothing to forward to if the module has no handler */
    if (NULL == mca_oob_tcp_module.api.ft_event) {
        return ORTE_SUCCESS;
    }

    mca_oob_tcp_module.api.ft_event(state);
    return ORTE_SUCCESS;
}
#endif
1010
1011 void mca_oob_tcp_component_set_module(int fd, short args, void *cbdata)
1012 {
1013 mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata;
1014 uint64_t ui64;
1015 int rc;
1016 orte_oob_base_peer_t *bpr;
1017
1018 ORTE_ACQUIRE_OBJECT(pop);
1019
1020 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1021 "%s tcp:set_module called for peer %s",
1022 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1023 ORTE_NAME_PRINT(&pop->peer));
1024
1025
1026
1027
1028
1029 memcpy(&ui64, (char*)&pop->peer, sizeof(uint64_t));
1030 if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
1031 ui64, (void**)&bpr) || NULL == bpr) {
1032 bpr = OBJ_NEW(orte_oob_base_peer_t);
1033 }
1034 opal_bitmap_set_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
1035 bpr->component = &mca_oob_tcp_component.super;
1036 if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
1037 ui64, bpr))) {
1038 ORTE_ERROR_LOG(rc);
1039 }
1040
1041 OBJ_RELEASE(pop);
1042 }
1043
1044 void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata)
1045 {
1046 mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata;
1047 uint64_t ui64;
1048 orte_oob_base_peer_t *bpr;
1049 int rc;
1050
1051 ORTE_ACQUIRE_OBJECT(pop);
1052
1053 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1054 "%s tcp:lost connection called for peer %s",
1055 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1056 ORTE_NAME_PRINT(&pop->peer));
1057
1058
1059 memcpy(&ui64, (char*)&pop->peer, sizeof(uint64_t));
1060 if (OPAL_SUCCESS == opal_hash_table_get_value_uint64(&orte_oob_base.peers,
1061 ui64, (void**)&bpr) && NULL != bpr) {
1062 opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
1063 OBJ_RELEASE(bpr);
1064 }
1065 if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
1066 ui64, NULL))) {
1067 ORTE_ERROR_LOG(rc);
1068 }
1069
1070 if (!orte_finalizing) {
1071
1072 if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer)) {
1073 ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_LIFELINE_LOST);
1074 } else {
1075 ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_COMM_FAILED);
1076 }
1077 }
1078 OBJ_RELEASE(pop);
1079 }
1080
1081 void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata)
1082 {
1083 mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata;
1084 uint64_t ui64;
1085 int rc;
1086 orte_oob_base_peer_t *bpr;
1087
1088 ORTE_ACQUIRE_OBJECT(mop);
1089
1090 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1091 "%s tcp:no route called for peer %s",
1092 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1093 ORTE_NAME_PRINT(&mop->hop));
1094
1095
1096 memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
1097 if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
1098 ui64, (void**)&bpr) || NULL == bpr) {
1099 bpr = OBJ_NEW(orte_oob_base_peer_t);
1100 }
1101 opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
1102 if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
1103 ui64, NULL))) {
1104 ORTE_ERROR_LOG(rc);
1105 }
1106
1107
1108
1109
1110 mop->rmsg->retries++;
1111
1112 ORTE_OOB_SEND(mop->rmsg);
1113
1114 OBJ_RELEASE(mop);
1115 }
1116
1117 void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
1118 {
1119 mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata;
1120 uint64_t ui64;
1121 orte_rml_send_t *snd;
1122 orte_oob_base_peer_t *bpr;
1123
1124 ORTE_ACQUIRE_OBJECT(mop);
1125
1126 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1127 "%s tcp:unknown hop called for peer %s",
1128 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1129 ORTE_NAME_PRINT(&mop->hop));
1130
1131 if (orte_finalizing || orte_abnormal_term_ordered) {
1132
1133 OBJ_RELEASE(mop);
1134 return;
1135 }
1136
1137
1138 memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
1139 if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
1140 ui64, (void**)&bpr) ||
1141 NULL == bpr) {
1142
1143
1144
1145
1146
1147
1148 opal_output(0, "%s ERROR: message to %s requires routing and the OOB has no knowledge of the reqd hop %s",
1149 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1150 ORTE_NAME_PRINT(&mop->snd->hdr.dst),
1151 ORTE_NAME_PRINT(&mop->hop));
1152 ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_UNABLE_TO_SEND_MSG);
1153 OBJ_RELEASE(mop);
1154 return;
1155 }
1156 opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
1157
1158
1159 memcpy(&ui64, (char*)&(mop->snd->hdr.dst), sizeof(uint64_t));
1160 if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
1161 ui64, (void**)&bpr) ||
1162 NULL == bpr) {
1163 opal_output(0, "%s ERROR: message to %s requires routing and the OOB has no knowledge of this process",
1164 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1165 ORTE_NAME_PRINT(&mop->snd->hdr.dst));
1166 ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_UNABLE_TO_SEND_MSG);
1167 OBJ_RELEASE(mop);
1168 return;
1169 }
1170 opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
1171
1172
1173
1174
1175 MCA_OOB_TCP_HDR_NTOH(&mop->snd->hdr);
1176 snd = OBJ_NEW(orte_rml_send_t);
1177 snd->retries = mop->rmsg->retries + 1;
1178 snd->dst = mop->snd->hdr.dst;
1179 snd->origin = mop->snd->hdr.origin;
1180 snd->tag = mop->snd->hdr.tag;
1181 snd->seq_num = mop->snd->hdr.seq_num;
1182 snd->data = mop->snd->data;
1183 snd->count = mop->snd->hdr.nbytes;
1184 snd->cbfunc.iov = NULL;
1185 snd->cbdata = NULL;
1186
1187 ORTE_OOB_SEND(snd);
1188
1189 mop->snd->data = NULL;
1190
1191 OBJ_RELEASE(mop);
1192 }
1193
1194 void mca_oob_tcp_component_failed_to_connect(int fd, short args, void *cbdata)
1195 {
1196 mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata;
1197
1198 ORTE_ACQUIRE_OBJECT(pop);
1199
1200 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1201 "%s tcp:failed_to_connect called for peer %s",
1202 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1203 ORTE_NAME_PRINT(&pop->peer));
1204
1205
1206 if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
1207 OBJ_RELEASE(pop);
1208 return;
1209 }
1210
1211
1212 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1213 "%s tcp:failed_to_connect unable to reach peer %s",
1214 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1215 ORTE_NAME_PRINT(&pop->peer));
1216
1217 ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_FAILED_TO_CONNECT);
1218 OBJ_RELEASE(pop);
1219 }
1220
1221
1222
1223
1224
1225
1226 static char **split_and_resolve(char **orig_str, char *name)
1227 {
1228 int i, ret, save, if_index;
1229 char **argv, *str, *tmp;
1230 char if_name[IF_NAMESIZE];
1231 struct sockaddr_storage argv_inaddr, if_inaddr;
1232 uint32_t argv_prefix;
1233
1234
1235 if (NULL == orig_str || NULL == *orig_str) {
1236 return NULL;
1237 }
1238
1239 argv = opal_argv_split(*orig_str, ',');
1240 if (NULL == argv) {
1241 return NULL;
1242 }
1243 for (save = i = 0; NULL != argv[i]; ++i) {
1244 if (isalpha(argv[i][0])) {
1245 argv[save++] = argv[i];
1246 continue;
1247 }
1248
1249
1250
1251 argv_prefix = 0;
1252 tmp = strdup(argv[i]);
1253 str = strchr(argv[i], '/');
1254 if (NULL == str) {
1255 orte_show_help("help-oob-tcp.txt", "invalid if_inexclude",
1256 true, name, orte_process_info.nodename,
1257 tmp, "Invalid specification (missing \"/\")");
1258 free(argv[i]);
1259 free(tmp);
1260 continue;
1261 }
1262 *str = '\0';
1263 argv_prefix = atoi(str + 1);
1264
1265
1266 ((struct sockaddr*) &argv_inaddr)->sa_family = AF_INET;
1267 ret = inet_pton(AF_INET, argv[i],
1268 &((struct sockaddr_in*) &argv_inaddr)->sin_addr);
1269 free(argv[i]);
1270
1271 if (1 != ret) {
1272 orte_show_help("help-oob-tcp.txt", "invalid if_inexclude",
1273 true, name, orte_process_info.nodename, tmp,
1274 "Invalid specification (inet_pton() failed)");
1275 free(tmp);
1276 continue;
1277 }
1278 opal_output_verbose(20, orte_oob_base_framework.framework_output,
1279 "%s oob:tcp: Searching for %s address+prefix: %s / %u",
1280 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1281 name,
1282 opal_net_get_hostname((struct sockaddr*) &argv_inaddr),
1283 argv_prefix);
1284
1285
1286 for (if_index = opal_ifbegin(); if_index >= 0;
1287 if_index = opal_ifnext(if_index)) {
1288 opal_ifindextoaddr(if_index,
1289 (struct sockaddr*) &if_inaddr,
1290 sizeof(if_inaddr));
1291 if (opal_net_samenetwork((struct sockaddr*) &argv_inaddr,
1292 (struct sockaddr*) &if_inaddr,
1293 argv_prefix)) {
1294 break;
1295 }
1296 }
1297
1298 if (if_index < 0) {
1299 orte_show_help("help-oob-tcp.txt", "invalid if_inexclude",
1300 true, name, orte_process_info.nodename, tmp,
1301 "Did not find interface matching this subnet");
1302 free(tmp);
1303 continue;
1304 }
1305
1306
1307
1308 opal_ifindextoname(if_index, if_name, sizeof(if_name));
1309 opal_output_verbose(20, orte_oob_base_framework.framework_output,
1310 "%s oob:tcp: Found match: %s (%s)",
1311 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1312 opal_net_get_hostname((struct sockaddr*) &if_inaddr),
1313 if_name);
1314 argv[save++] = strdup(if_name);
1315 free(tmp);
1316 }
1317
1318
1319
1320 argv[save] = NULL;
1321 free(*orig_str);
1322 *orig_str = opal_argv_join(argv, ',');
1323 return argv;
1324 }
1325
1326
1327
1328 static void peer_cons(mca_oob_tcp_peer_t *peer)
1329 {
1330 peer->ev_base = NULL;
1331 peer->auth_method = NULL;
1332 peer->sd = -1;
1333 OBJ_CONSTRUCT(&peer->addrs, opal_list_t);
1334 peer->active_addr = NULL;
1335 peer->state = MCA_OOB_TCP_UNCONNECTED;
1336 peer->num_retries = 0;
1337 OBJ_CONSTRUCT(&peer->send_queue, opal_list_t);
1338 peer->send_msg = NULL;
1339 peer->recv_msg = NULL;
1340 peer->send_ev_active = false;
1341 peer->recv_ev_active = false;
1342 peer->timer_ev_active = false;
1343 }
1344 static void peer_des(mca_oob_tcp_peer_t *peer)
1345 {
1346 if (NULL != peer->auth_method) {
1347 free(peer->auth_method);
1348 }
1349 if (peer->send_ev_active) {
1350 opal_event_del(&peer->send_event);
1351 }
1352 if (peer->recv_ev_active) {
1353 opal_event_del(&peer->recv_event);
1354 }
1355 if (peer->timer_ev_active) {
1356 opal_event_del(&peer->timer_event);
1357 }
1358 if (0 <= peer->sd) {
1359 opal_output_verbose(2, orte_oob_base_framework.framework_output,
1360 "%s CLOSING SOCKET %d",
1361 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1362 peer->sd);
1363 CLOSE_THE_SOCKET(peer->sd);
1364 }
1365 OPAL_LIST_DESTRUCT(&peer->addrs);
1366 OPAL_LIST_DESTRUCT(&peer->send_queue);
1367 }
/* Class instance for mca_oob_tcp_peer_t: derives from opal_list_item_t,
 * with peer_cons and peer_des as its constructor and destructor. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_peer_t,
                   opal_list_item_t,
                   peer_cons, peer_des);
1371
1372 static void padd_cons(mca_oob_tcp_addr_t *ptr)
1373 {
1374 memset(&ptr->addr, 0, sizeof(ptr->addr));
1375 ptr->retries = 0;
1376 ptr->state = MCA_OOB_TCP_UNCONNECTED;
1377 }
/* Class instance for mca_oob_tcp_addr_t: derives from opal_list_item_t,
 * constructed by padd_cons; no destructor. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_addr_t,
                   opal_list_item_t,
                   padd_cons, NULL);
1381
1382
1383 static void pop_cons(mca_oob_tcp_peer_op_t *pop)
1384 {
1385 pop->net = NULL;
1386 pop->port = NULL;
1387 }
1388 static void pop_des(mca_oob_tcp_peer_op_t *pop)
1389 {
1390 if (NULL != pop->net) {
1391 free(pop->net);
1392 }
1393 if (NULL != pop->port) {
1394 free(pop->port);
1395 }
1396 }
/* Class instance for mca_oob_tcp_peer_op_t: derives from opal_object_t,
 * with pop_cons and pop_des as its constructor and destructor. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_peer_op_t,
                   opal_object_t,
                   pop_cons, pop_des);
1400
/* Class instance for mca_oob_tcp_msg_op_t: derives from opal_object_t;
 * no constructor or destructor is supplied. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_msg_op_t,
                   opal_object_t,
                   NULL, NULL);
1404
/* Class instance for mca_oob_tcp_conn_op_t: derives from opal_object_t;
 * no constructor or destructor is supplied. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_conn_op_t,
                   opal_object_t,
                   NULL, NULL);
1408
1409 static void nicaddr_cons(mca_oob_tcp_nicaddr_t *ptr)
1410 {
1411 ptr->af_family = PF_UNSPEC;
1412 memset(&ptr->addr, 0, sizeof(ptr->addr));
1413 }
/* Class instance for mca_oob_tcp_nicaddr_t: derives from opal_list_item_t,
 * constructed by nicaddr_cons; no destructor. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_nicaddr_t,
                   opal_list_item_t,
                   nicaddr_cons, NULL);