This source file includes the following definitions.
- mca_btl_tcp2_param_register_string
- mca_btl_tcp2_param_register_int
- mca_btl_tcp2_event_construct
- mca_btl_tcp2_event_destruct
- mca_btl_tcp2_component_open
- mca_btl_tcp2_component_close
- mca_btl_tcp2_create
- split_and_resolve
- mca_btl_tcp2_component_create_instances
- mca_btl_tcp2_component_create_listen
- mca_btl_tcp2_component_exchange
- mca_btl_tcp2_component_init
- mca_btl_tcp2_component_control
- mca_btl_tcp2_component_accept_handler
- mca_btl_tcp2_component_recv_handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 #include "ompi_config.h"
25
26 #include "opal/opal_socket_errno.h"
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif
30 #include <string.h>
31 #include <fcntl.h>
32 #ifdef HAVE_SYS_TYPES_H
33 #include <sys/types.h>
34 #endif
35 #ifdef HAVE_SYS_SOCKET_H
36 #include <sys/socket.h>
37 #endif
38 #ifdef HAVE_NETINET_IN_H
39 #include <netinet/in.h>
40 #endif
41 #ifdef HAVE_ARPA_INET_H
42 #include <arpa/inet.h>
43 #endif
44 #if OPAL_ENABLE_IPV6
45 # ifdef HAVE_NETDB_H
46 # include <netdb.h>
47 # endif
48 #endif
49 #include <ctype.h>
50 #include <limits.h>
51
52 #include "ompi/constants.h"
53 #include "opal/mca/event/event.h"
54 #include "opal/util/if.h"
55 #include "opal/util/output.h"
56 #include "opal/util/argv.h"
57 #include "opal/util/net.h"
58 #include "opal/util/opal_sos.h"
59 #include "opal/util/printf.h"
60
61 #include "orte/types.h"
62 #include "orte/util/show_help.h"
63
64 #include "ompi/mca/btl/btl.h"
65 #include "opal/mca/base/mca_base_param.h"
66 #include "ompi/runtime/ompi_module_exchange.h"
67 #include "ompi/mca/mpool/base/base.h"
68 #include "ompi/mca/btl/base/btl_base_error.h"
69 #include "btl_tcp2.h"
70 #include "btl_tcp2_addr.h"
71 #include "btl_tcp2_proc.h"
72 #include "btl_tcp2_frag.h"
73 #include "btl_tcp2_endpoint.h"
74 #include "ompi/mca/btl/base/base.h"
75
76
77 mca_btl_tcp2_component_t mca_btl_tcp2_component = {
78 {
79
80
81
82 {
83 MCA_BTL_BASE_VERSION_2_0_0,
84
85 "tcp2",
86 OMPI_MAJOR_VERSION,
87 OMPI_MINOR_VERSION,
88 OMPI_RELEASE_VERSION,
89 mca_btl_tcp2_component_open,
90 mca_btl_tcp2_component_close
91 },
92 {
93
94 MCA_BASE_METADATA_PARAM_CHECKPOINT
95 },
96
97 mca_btl_tcp2_component_init,
98 NULL,
99 }
100 };
101
102
103
104
105
106 static inline char* mca_btl_tcp2_param_register_string(
107 const char* param_name,
108 const char* help_string,
109 const char* default_value)
110 {
111 char *value;
112 mca_base_param_reg_string(&mca_btl_tcp2_component.super.btl_version,
113 param_name, help_string, false, false,
114 default_value, &value);
115 return value;
116 }
117
118 static inline int mca_btl_tcp2_param_register_int(
119 const char* param_name,
120 const char* help_string,
121 int default_value)
122 {
123 int value;
124 mca_base_param_reg_int(&mca_btl_tcp2_component.super.btl_version,
125 param_name, help_string, false, false,
126 default_value, &value);
127 return value;
128 }
129
130
131
132
133
134
135 struct mca_btl_tcp2_event_t {
136 opal_list_item_t item;
137 opal_event_t event;
138 };
139 typedef struct mca_btl_tcp2_event_t mca_btl_tcp2_event_t;
140
141 static void mca_btl_tcp2_event_construct(mca_btl_tcp2_event_t* event)
142 {
143 OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
144 opal_list_append(&mca_btl_tcp2_component.tcp_events, &event->item);
145 OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
146 }
147
148 static void mca_btl_tcp2_event_destruct(mca_btl_tcp2_event_t* event)
149 {
150 OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
151 opal_list_remove_item(&mca_btl_tcp2_component.tcp_events, &event->item);
152 OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
153 }
154
/* Class descriptor for mca_btl_tcp2_event_t: parent class is
 * opal_list_item_t, with the constructor/destructor defined above. */
OBJ_CLASS_INSTANCE(mca_btl_tcp2_event_t, opal_list_item_t,
                   mca_btl_tcp2_event_construct,
                   mca_btl_tcp2_event_destruct);
160
161
162
163
164
/* Event callbacks defined at the end of this file. */
static void mca_btl_tcp2_component_recv_handler(int sd, short flags, void* user);
static void mca_btl_tcp2_component_accept_handler(int incoming_sd, short ignored, void* unused);
167
168
169
170
171
172
173
174 int mca_btl_tcp2_component_open(void)
175 {
176 char* message;
177
178
179 mca_btl_tcp2_component.tcp_listen_sd = -1;
180 #if OPAL_ENABLE_IPV6
181 mca_btl_tcp2_component.tcp6_listen_sd = -1;
182 #endif
183 mca_btl_tcp2_component.tcp_num_btls=0;
184 mca_btl_tcp2_component.tcp_addr_count = 0;
185 mca_btl_tcp2_component.tcp_btls=NULL;
186
187
188 OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_lock, opal_mutex_t);
189 OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_procs, opal_hash_table_t);
190 OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_events, opal_list_t);
191 OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_frag_eager, ompi_free_list_t);
192 OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_frag_max, ompi_free_list_t);
193 OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_frag_user, ompi_free_list_t);
194 opal_hash_table_init(&mca_btl_tcp2_component.tcp_procs, 256);
195
196
197 mca_btl_tcp2_component.tcp_num_links =
198 mca_btl_tcp2_param_register_int("links", NULL, 1);
199 mca_btl_tcp2_component.tcp_if_include =
200 mca_btl_tcp2_param_register_string("if_include", "Comma-delimited list of devices or CIDR notation of networks to use for MPI communication (e.g., \"eth0,eth1\" or \"192.168.0.0/16,10.1.4.0/24\"). Mutually exclusive with btl_tcp2_if_exclude.", "");
201 mca_btl_tcp2_component.tcp_if_exclude =
202 mca_btl_tcp2_param_register_string("if_exclude", "Comma-delimited list of devices or CIDR notation of networks to NOT use for MPI communication -- all devices not matching these specifications will be used (e.g., \"eth0,eth1\" or \"192.168.0.0/16,10.1.4.0/24\"). Mutually exclusive with btl_tcp2_if_include.", "lo,sppp");
203 mca_btl_tcp2_component.tcp_free_list_num =
204 mca_btl_tcp2_param_register_int ("free_list_num", NULL, 8);
205 mca_btl_tcp2_component.tcp_free_list_max =
206 mca_btl_tcp2_param_register_int ("free_list_max", NULL, -1);
207 mca_btl_tcp2_component.tcp_free_list_inc =
208 mca_btl_tcp2_param_register_int ("free_list_inc", NULL, 32);
209 mca_btl_tcp2_component.tcp_sndbuf =
210 mca_btl_tcp2_param_register_int ("sndbuf", NULL, 128*1024);
211 mca_btl_tcp2_component.tcp_rcvbuf =
212 mca_btl_tcp2_param_register_int ("rcvbuf", NULL, 128*1024);
213 mca_btl_tcp2_component.tcp_endpoint_cache =
214 mca_btl_tcp2_param_register_int ("endpoint_cache",
215 "The size of the internal cache for each TCP connection. This cache is"
216 " used to reduce the number of syscalls, by replacing them with memcpy."
217 " Every read will read the expected data plus the amount of the"
218 " endpoint_cache", 30*1024);
219 mca_btl_tcp2_component.tcp_use_nodelay =
220 !mca_btl_tcp2_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)", 0);
221 mca_btl_tcp2_component.tcp_port_min =
222 mca_btl_tcp2_param_register_int( "port_min_v4",
223 "The minimum port where the TCP BTL will try to bind (default 1024)", 1024 );
224 if( mca_btl_tcp2_component.tcp_port_min > USHRT_MAX ) {
225 orte_show_help("help-mpi-btl-tcp2.txt", "invalid minimum port",
226 true, "v4", orte_process_info.nodename,
227 mca_btl_tcp2_component.tcp_port_min );
228 mca_btl_tcp2_component.tcp_port_min = 1024;
229 }
230 opal_asprintf( &message,
231 "The number of ports where the TCP BTL will try to bind (default %d)."
232 " This parameter together with the port min, define a range of ports"
233 " where Open MPI will open sockets.",
234 (0x1 << 16) - mca_btl_tcp2_component.tcp_port_min - 1 );
235 mca_btl_tcp2_component.tcp_port_range =
236 mca_btl_tcp2_param_register_int( "port_range_v4", message,
237 (0x1 << 16) - mca_btl_tcp2_component.tcp_port_min - 1);
238 free(message);
239 #if OPAL_ENABLE_IPV6
240 mca_btl_tcp2_component.tcp6_port_min =
241 mca_btl_tcp2_param_register_int( "port_min_v6",
242 "The minimum port where the TCP BTL will try to bind (default 1024)", 1024 );
243 if( mca_btl_tcp2_component.tcp6_port_min > USHRT_MAX ) {
244 orte_show_help("help-mpi-btl-tcp2.txt", "invalid minimum port",
245 true, "v6", orte_process_info.nodename,
246 mca_btl_tcp2_component.tcp6_port_min );
247 mca_btl_tcp2_component.tcp6_port_min = 1024;
248 }
249 opal_asprintf( &message,
250 "The number of ports where the TCP BTL will try to bind (default %d)."
251 " This parameter together with the port min, define a range of ports"
252 " where Open MPI will open sockets.",
253 (0x1 << 16) - mca_btl_tcp2_component.tcp6_port_min - 1 );
254 mca_btl_tcp2_component.tcp6_port_range =
255 mca_btl_tcp2_param_register_int( "port_range_v6", message,
256 (0x1 << 16) - mca_btl_tcp2_component.tcp6_port_min - 1);
257 free(message);
258 #endif
259 mca_btl_tcp2_module.super.btl_exclusivity = MCA_BTL_EXCLUSIVITY_LOW + 100;
260 mca_btl_tcp2_module.super.btl_eager_limit = 64*1024;
261 mca_btl_tcp2_module.super.btl_rndv_eager_limit = 64*1024;
262 mca_btl_tcp2_module.super.btl_max_send_size = 128*1024;
263 mca_btl_tcp2_module.super.btl_rdma_pipeline_send_length = 128*1024;
264 mca_btl_tcp2_module.super.btl_rdma_pipeline_frag_size = INT_MAX;
265 mca_btl_tcp2_module.super.btl_min_rdma_pipeline_size = 0;
266 mca_btl_tcp2_module.super.btl_flags = MCA_BTL_FLAGS_PUT |
267 MCA_BTL_FLAGS_SEND_INPLACE |
268 MCA_BTL_FLAGS_NEED_CSUM |
269 MCA_BTL_FLAGS_NEED_ACK |
270 MCA_BTL_FLAGS_HETEROGENEOUS_RDMA;
271 mca_btl_tcp2_module.super.btl_bandwidth = 100;
272 mca_btl_tcp2_module.super.btl_latency = 100;
273 mca_btl_base_param_register(&mca_btl_tcp2_component.super.btl_version,
274 &mca_btl_tcp2_module.super);
275
276 mca_btl_tcp2_component.tcp_disable_family =
277 mca_btl_tcp2_param_register_int ("disable_family", NULL, 0);
278
279 return OMPI_SUCCESS;
280 }
281
282
283
284
285
286
287 int mca_btl_tcp2_component_close(void)
288 {
289 opal_list_item_t* item;
290 opal_list_item_t* next;
291
292 if(NULL != mca_btl_tcp2_component.tcp_if_include) {
293 free(mca_btl_tcp2_component.tcp_if_include);
294 mca_btl_tcp2_component.tcp_if_include = NULL;
295 }
296 if(NULL != mca_btl_tcp2_component.tcp_if_exclude) {
297 free(mca_btl_tcp2_component.tcp_if_exclude);
298 mca_btl_tcp2_component.tcp_if_exclude = NULL;
299 }
300
301 if (NULL != mca_btl_tcp2_component.tcp_btls)
302 free(mca_btl_tcp2_component.tcp_btls);
303
304 if (mca_btl_tcp2_component.tcp_listen_sd >= 0) {
305 opal_event_del(&mca_btl_tcp2_component.tcp_recv_event);
306 CLOSE_THE_SOCKET(mca_btl_tcp2_component.tcp_listen_sd);
307 mca_btl_tcp2_component.tcp_listen_sd = -1;
308 }
309 #if OPAL_ENABLE_IPV6
310 if (mca_btl_tcp2_component.tcp6_listen_sd >= 0) {
311 opal_event_del(&mca_btl_tcp2_component.tcp6_recv_event);
312 CLOSE_THE_SOCKET(mca_btl_tcp2_component.tcp6_listen_sd);
313 mca_btl_tcp2_component.tcp6_listen_sd = -1;
314 }
315 #endif
316
317
318
319 OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
320 for(item = opal_list_get_first(&mca_btl_tcp2_component.tcp_events);
321 item != opal_list_get_end(&mca_btl_tcp2_component.tcp_events);
322 item = next) {
323 mca_btl_tcp2_event_t* event = (mca_btl_tcp2_event_t*)item;
324 next = opal_list_get_next(item);
325 opal_event_del(&event->event);
326 OBJ_RELEASE(event);
327 }
328 OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
329
330
331 OBJ_DESTRUCT(&mca_btl_tcp2_component.tcp_procs);
332 OBJ_DESTRUCT(&mca_btl_tcp2_component.tcp_events);
333 OBJ_DESTRUCT(&mca_btl_tcp2_component.tcp_frag_eager);
334 OBJ_DESTRUCT(&mca_btl_tcp2_component.tcp_frag_max);
335 OBJ_DESTRUCT(&mca_btl_tcp2_component.tcp_frag_user);
336 OBJ_DESTRUCT(&mca_btl_tcp2_component.tcp_lock);
337
338 return OMPI_SUCCESS;
339 }
340
341
342
343
344
345
346 static int mca_btl_tcp2_create(int if_kindex, const char* if_name)
347 {
348 struct mca_btl_tcp2_module_t* btl;
349 char param[256];
350 int i;
351
352 for( i = 0; i < (int)mca_btl_tcp2_component.tcp_num_links; i++ ) {
353 btl = (struct mca_btl_tcp2_module_t *)malloc(sizeof(mca_btl_tcp2_module_t));
354 if(NULL == btl)
355 return OMPI_ERR_OUT_OF_RESOURCE;
356 memcpy(btl, &mca_btl_tcp2_module, sizeof(mca_btl_tcp2_module));
357 OBJ_CONSTRUCT(&btl->tcp_endpoints, opal_list_t);
358 mca_btl_tcp2_component.tcp_btls[mca_btl_tcp2_component.tcp_num_btls++] = btl;
359
360
361 btl->tcp_ifkindex = (uint16_t) if_kindex;
362 #if MCA_BTL_TCP_STATISTICS
363 btl->tcp_bytes_recv = 0;
364 btl->tcp_bytes_sent = 0;
365 btl->tcp_send_handler = 0;
366 #endif
367
368
369 sprintf(param, "bandwidth_%s", if_name);
370 btl->super.btl_bandwidth = mca_btl_tcp2_param_register_int(param, NULL, btl->super.btl_bandwidth);
371
372
373 sprintf(param, "latency_%s", if_name);
374 btl->super.btl_latency = mca_btl_tcp2_param_register_int(param, NULL, btl->super.btl_latency);
375 if( i > 0 ) {
376 btl->super.btl_bandwidth >>= 1;
377 btl->super.btl_latency <<= 1;
378 }
379
380
381 sprintf(param, "bandwidth_%s:%d", if_name, i);
382 btl->super.btl_bandwidth = mca_btl_tcp2_param_register_int(param, NULL, btl->super.btl_bandwidth);
383
384
385 sprintf(param, "latency_%s:%d", if_name, i);
386 btl->super.btl_latency = mca_btl_tcp2_param_register_int(param, NULL, btl->super.btl_latency);
387 #if 0 && OPAL_ENABLE_DEBUG
388 BTL_OUTPUT(("interface %s instance %i: bandwidth %d latency %d\n", if_name, i,
389 btl->super.btl_bandwidth, btl->super.btl_latency));
390 #endif
391 }
392 return OMPI_SUCCESS;
393 }
394
395
396
397
398
399
400 static char **split_and_resolve(char **orig_str, char *name)
401 {
402 int i, ret, save, if_index;
403 char **argv, *str, *tmp;
404 char if_name[IF_NAMESIZE];
405 struct sockaddr_storage argv_inaddr, if_inaddr;
406 uint32_t argv_prefix;
407
408
409 if (NULL == orig_str || NULL == *orig_str) {
410 return NULL;
411 }
412
413 argv = opal_argv_split(*orig_str, ',');
414 if (NULL == argv) {
415 return NULL;
416 }
417 for (save = i = 0; NULL != argv[i]; ++i) {
418 if (isalpha(argv[i][0])) {
419 argv[save++] = argv[i];
420 continue;
421 }
422
423
424
425 argv_prefix = 0;
426 tmp = strdup(argv[i]);
427 str = strchr(argv[i], '/');
428 if (NULL == str) {
429 orte_show_help("help-mpi-btl-tcp2.txt", "invalid if_inexclude",
430 true, name, orte_process_info.nodename,
431 tmp, "Invalid specification (missing \"/\")");
432 free(argv[i]);
433 free(tmp);
434 continue;
435 }
436 *str = '\0';
437 argv_prefix = atoi(str + 1);
438
439
440 ((struct sockaddr*) &argv_inaddr)->sa_family = AF_INET;
441 ret = inet_pton(AF_INET, argv[i],
442 &((struct sockaddr_in*) &argv_inaddr)->sin_addr);
443 free(argv[i]);
444
445 if (1 != ret) {
446 orte_show_help("help-mpi-btl-tcp2.txt", "invalid if_inexclude",
447 true, name, orte_process_info.nodename, tmp,
448 "Invalid specification (inet_pton() failed)");
449 free(tmp);
450 continue;
451 }
452 opal_output_verbose(20, mca_btl_base_output,
453 "btl: tcp: Searching for %s address+prefix: %s / %u",
454 name,
455 opal_net_get_hostname((struct sockaddr*) &argv_inaddr),
456 argv_prefix);
457
458
459 for (if_index = opal_ifbegin(); if_index >= 0;
460 if_index = opal_ifnext(if_index)) {
461 opal_ifindextoaddr(if_index,
462 (struct sockaddr*) &if_inaddr,
463 sizeof(if_inaddr));
464 if (opal_net_samenetwork((struct sockaddr*) &argv_inaddr,
465 (struct sockaddr*) &if_inaddr,
466 argv_prefix)) {
467 break;
468 }
469 }
470
471
472 if (if_index < 0) {
473 orte_show_help("help-mpi-btl-tcp2.txt", "invalid if_inexclude",
474 true, name, orte_process_info.nodename, tmp,
475 "Did not find interface matching this subnet");
476 free(tmp);
477 continue;
478 }
479
480
481
482 opal_ifindextoname(if_index, if_name, sizeof(if_name));
483 opal_output_verbose(20, mca_btl_base_output,
484 "btl: tcp: Found match: %s (%s)",
485 opal_net_get_hostname((struct sockaddr*) &if_inaddr),
486 if_name);
487 argv[save++] = strdup(if_name);
488 free(tmp);
489 }
490
491
492
493 argv[save] = NULL;
494 free(*orig_str);
495 *orig_str = opal_argv_join(argv, ',');
496 return argv;
497 }
498
499
500
501
502
503
504
505
506
507 static int mca_btl_tcp2_component_create_instances(void)
508 {
509 const int if_count = opal_ifcount();
510 int if_index;
511 int kif_count = 0;
512 int *kindexes = NULL;
513 char **include;
514 char **exclude;
515 char **argv;
516 int ret = OMPI_SUCCESS;
517
518 if(if_count <= 0) {
519 return OMPI_ERROR;
520 }
521
522 kindexes = (int *) malloc(sizeof(int) * if_count);
523 if (NULL == kindexes) {
524 return OMPI_ERR_OUT_OF_RESOURCE;
525 }
526
527
528 {
529 int j;
530
531
532 memset (kindexes, 0, sizeof(int) * if_count);
533
534
535
536
537 for(if_index = opal_ifbegin(); if_index >= 0; if_index = opal_ifnext(if_index)){
538 int index = opal_ifindextokindex (if_index);
539 if (index > 0) {
540 bool already_seen = false;
541 for (j=0; (false == already_seen) && (j < kif_count); j++) {
542 if (kindexes[j] == index) {
543 already_seen = true;
544 }
545 }
546
547 if (false == already_seen) {
548 kindexes[kif_count] = index;
549 kif_count++;
550 }
551 }
552 }
553 }
554
555
556 mca_btl_tcp2_component.tcp_btls = (mca_btl_tcp2_module_t**)malloc(mca_btl_tcp2_component.tcp_num_links *
557 kif_count * sizeof(mca_btl_tcp2_module_t*));
558 if(NULL == mca_btl_tcp2_component.tcp_btls) {
559 ret = OMPI_ERR_OUT_OF_RESOURCE;
560 goto cleanup;
561 }
562
563 mca_btl_tcp2_component.tcp_addr_count = if_count;
564
565
566 argv = include = split_and_resolve(&mca_btl_tcp2_component.tcp_if_include,
567 "include");
568 while(argv && *argv) {
569 char* if_name = *argv;
570 int if_index = opal_ifnametokindex(if_name);
571 if(if_index < 0) {
572 BTL_ERROR(("invalid interface \"%s\"", if_name));
573 ret = OMPI_ERR_NOT_FOUND;
574 goto cleanup;
575 }
576 mca_btl_tcp2_create(if_index, if_name);
577 argv++;
578 }
579 opal_argv_free(include);
580
581
582
583 if (mca_btl_tcp2_component.tcp_num_btls > 0) {
584 ret = OMPI_SUCCESS;
585 goto cleanup;
586 }
587
588
589
590
591 exclude = split_and_resolve(&mca_btl_tcp2_component.tcp_if_exclude,
592 "exclude");
593 {
594 int i;
595 for(i = 0; i < kif_count; i++) {
596
597 char if_name[IF_NAMESIZE];
598 if_index = kindexes[i];
599
600 opal_ifkindextoname(if_index, if_name, sizeof(if_name));
601
602
603 argv = exclude;
604 while(argv && *argv) {
605 if(strncmp(*argv,if_name,strlen(*argv)) == 0)
606 break;
607 argv++;
608 }
609
610 if(argv == 0 || *argv == 0) {
611 mca_btl_tcp2_create(if_index, if_name);
612 }
613 }
614 }
615 opal_argv_free(exclude);
616
617 cleanup:
618 if (NULL != kindexes) {
619 free(kindexes);
620 }
621 return ret;
622 }
623
624
625
626
627
628 static int mca_btl_tcp2_component_create_listen(uint16_t af_family)
629 {
630 int flags;
631 int sd;
632 struct sockaddr_storage inaddr;
633 opal_socklen_t addrlen;
634
635
636 sd = socket(af_family, SOCK_STREAM, 0);
637 if(sd < 0) {
638 if (EAFNOSUPPORT != opal_socket_errno) {
639 BTL_ERROR(("socket() failed: %s (%d)",
640 strerror(opal_socket_errno), opal_socket_errno));
641 }
642 return OMPI_ERR_IN_ERRNO;
643 }
644
645 mca_btl_tcp2_set_socket_options(sd);
646
647 #if OPAL_ENABLE_IPV6
648 {
649 struct addrinfo hints, *res = NULL;
650 int error;
651
652 memset (&hints, 0, sizeof(hints));
653 hints.ai_family = af_family;
654 hints.ai_socktype = SOCK_STREAM;
655 hints.ai_flags = AI_PASSIVE;
656
657 if ((error = getaddrinfo(NULL, "0", &hints, &res))) {
658 opal_output (0,
659 "mca_btl_tcp2_create_listen: unable to resolve. %s\n",
660 gai_strerror (error));
661 CLOSE_THE_SOCKET(sd);
662 return OMPI_ERROR;
663 }
664
665 memcpy (&inaddr, res->ai_addr, res->ai_addrlen);
666 addrlen = res->ai_addrlen;
667 freeaddrinfo (res);
668
669 #ifdef IPV6_V6ONLY
670
671 if (AF_INET6 == af_family) {
672 int flg = 1;
673 if (setsockopt (sd, IPPROTO_IPV6, IPV6_V6ONLY,
674 (char *) &flg, sizeof (flg)) < 0) {
675 opal_output(0,
676 "mca_btl_tcp2_create_listen: unable to disable v4-mapped addresses\n");
677 }
678 }
679 #endif
680 }
681 #else
682 ((struct sockaddr_in*) &inaddr)->sin_family = AF_INET;
683 ((struct sockaddr_in*) &inaddr)->sin_addr.s_addr = INADDR_ANY;
684 addrlen = sizeof(struct sockaddr_in);
685 #endif
686
687 {
688 int flg = 0;
689 if (setsockopt (sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&flg, sizeof (flg)) < 0) {
690 BTL_ERROR(("mca_btl_tcp2_create_listen: unable to unset the "
691 "SO_REUSEADDR option (%s:%d)\n",
692 strerror(opal_socket_errno), opal_socket_errno));
693 CLOSE_THE_SOCKET(sd);
694 return OMPI_ERROR;
695 }
696 }
697
698 {
699 int index, range, port;
700
701 range = mca_btl_tcp2_component.tcp_port_range;
702 port = mca_btl_tcp2_component.tcp_port_min;
703 #if OPAL_ENABLE_IPV6
704 if (AF_INET6 == af_family) {
705 range = mca_btl_tcp2_component.tcp6_port_range;
706 port = mca_btl_tcp2_component.tcp6_port_min;
707 }
708 #endif
709
710 for( index = 0; index < range; index++ ) {
711 #if OPAL_ENABLE_IPV6
712 ((struct sockaddr_in6*) &inaddr)->sin6_port = htons(port + index);
713 #else
714 ((struct sockaddr_in*) &inaddr)->sin_port = htons(port + index);
715 #endif
716 if(bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
717 if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
718 continue;
719 }
720 BTL_ERROR(("bind() failed: %s (%d)",
721 strerror(opal_socket_errno), opal_socket_errno));
722 CLOSE_THE_SOCKET(sd);
723 return OMPI_ERROR;
724 }
725 goto socket_binded;
726 }
727 if( AF_INET == af_family ) {
728 BTL_ERROR(("bind() failed: no port available in the range [%d..%d]",
729 mca_btl_tcp2_component.tcp_port_min,
730 mca_btl_tcp2_component.tcp_port_min + range));
731 }
732 #if OPAL_ENABLE_IPV6
733 if (AF_INET6 == af_family) {
734 BTL_ERROR(("bind6() failed: no port available in the range [%d..%d]",
735 mca_btl_tcp2_component.tcp6_port_min,
736 mca_btl_tcp2_component.tcp6_port_min + range));
737 }
738 #endif
739 CLOSE_THE_SOCKET(sd);
740 return OMPI_ERROR;
741 }
742 socket_binded:
743
744 if(getsockname(sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
745 BTL_ERROR(("getsockname() failed: %s (%d)",
746 strerror(opal_socket_errno), opal_socket_errno));
747 CLOSE_THE_SOCKET(sd);
748 return OMPI_ERROR;
749 }
750
751 if (AF_INET == af_family) {
752 mca_btl_tcp2_component.tcp_listen_port = ((struct sockaddr_in*) &inaddr)->sin_port;
753 mca_btl_tcp2_component.tcp_listen_sd = sd;
754 }
755 #if OPAL_ENABLE_IPV6
756 if (AF_INET6 == af_family) {
757 mca_btl_tcp2_component.tcp6_listen_port = ((struct sockaddr_in6*) &inaddr)->sin6_port;
758 mca_btl_tcp2_component.tcp6_listen_sd = sd;
759 }
760 #endif
761
762
763 if(listen(sd, SOMAXCONN) < 0) {
764 BTL_ERROR(("listen() failed: %s (%d)",
765 strerror(opal_socket_errno), opal_socket_errno));
766 CLOSE_THE_SOCKET(sd);
767 return OMPI_ERROR;
768 }
769
770
771 if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
772 BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
773 strerror(opal_socket_errno), opal_socket_errno));
774 CLOSE_THE_SOCKET(sd);
775 return OMPI_ERROR;
776 } else {
777 flags |= O_NONBLOCK;
778 if(fcntl(sd, F_SETFL, flags) < 0) {
779 BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
780 strerror(opal_socket_errno), opal_socket_errno));
781 CLOSE_THE_SOCKET(sd);
782 return OMPI_ERROR;
783 }
784 }
785
786
787 if (AF_INET == af_family) {
788 opal_event_set(opal_event_base, &mca_btl_tcp2_component.tcp_recv_event,
789 mca_btl_tcp2_component.tcp_listen_sd,
790 OPAL_EV_READ|OPAL_EV_PERSIST,
791 mca_btl_tcp2_component_accept_handler,
792 0 );
793 opal_event_add(&mca_btl_tcp2_component.tcp_recv_event, 0);
794 }
795 #if OPAL_ENABLE_IPV6
796 if (AF_INET6 == af_family) {
797 opal_event_set(opal_event_base, &mca_btl_tcp2_component.tcp6_recv_event,
798 mca_btl_tcp2_component.tcp6_listen_sd,
799 OPAL_EV_READ|OPAL_EV_PERSIST,
800 mca_btl_tcp2_component_accept_handler,
801 0 );
802 opal_event_add(&mca_btl_tcp2_component.tcp6_recv_event, 0);
803 }
804 #endif
805 return OMPI_SUCCESS;
806 }
807
808
809
810
811
812
813 static int mca_btl_tcp2_component_exchange(void)
814 {
815 int rc = 0, index;
816 size_t i = 0;
817 size_t size = mca_btl_tcp2_component.tcp_addr_count *
818 mca_btl_tcp2_component.tcp_num_links * sizeof(mca_btl_tcp2_addr_t);
819
820
821
822
823
824
825
826 size_t xfer_size = 0;
827 size_t current_addr = 0;
828
829 if(mca_btl_tcp2_component.tcp_num_btls != 0) {
830 mca_btl_tcp2_addr_t *addrs = (mca_btl_tcp2_addr_t *)malloc(size);
831 memset(addrs, 0, size);
832
833
834 for( i = 0; i < mca_btl_tcp2_component.tcp_num_btls; i++ ) {
835 for (index = opal_ifbegin(); index >= 0;
836 index = opal_ifnext(index)) {
837 struct sockaddr_storage my_ss;
838
839
840
841
842 if (opal_ifindextokindex (index) !=
843 mca_btl_tcp2_component.tcp_btls[i]->tcp_ifkindex) {
844 continue;
845 }
846
847 if (OPAL_SUCCESS !=
848 opal_ifindextoaddr(index, (struct sockaddr*) &my_ss,
849 sizeof (my_ss))) {
850 opal_output (0,
851 "btl_tcp2_component: problems getting address for index %i (kernel index %i)\n",
852 index, opal_ifindextokindex (index));
853 continue;
854 }
855
856 if ((AF_INET == my_ss.ss_family) &&
857 (4 != mca_btl_tcp2_component.tcp_disable_family)) {
858 memcpy(&addrs[current_addr].addr_inet,
859 &((struct sockaddr_in*)&my_ss)->sin_addr,
860 sizeof(addrs[0].addr_inet));
861 addrs[current_addr].addr_port =
862 mca_btl_tcp2_component.tcp_listen_port;
863 addrs[current_addr].addr_family = MCA_BTL_TCP_AF_INET;
864 xfer_size += sizeof (mca_btl_tcp2_addr_t);
865 addrs[current_addr].addr_inuse = 0;
866 addrs[current_addr].addr_ifkindex =
867 opal_ifindextokindex (index);
868 current_addr++;
869 }
870 #if OPAL_ENABLE_IPV6
871 if ((AF_INET6 == my_ss.ss_family) &&
872 (6 != mca_btl_tcp2_component.tcp_disable_family)) {
873 memcpy(&addrs[current_addr].addr_inet,
874 &((struct sockaddr_in6*)&my_ss)->sin6_addr,
875 sizeof(addrs[0].addr_inet));
876 addrs[current_addr].addr_port =
877 mca_btl_tcp2_component.tcp6_listen_port;
878 addrs[current_addr].addr_family = MCA_BTL_TCP_AF_INET6;
879 xfer_size += sizeof (mca_btl_tcp2_addr_t);
880 addrs[current_addr].addr_inuse = 0;
881 addrs[current_addr].addr_ifkindex =
882 opal_ifindextokindex (index);
883 current_addr++;
884 }
885 #endif
886 }
887 }
888 rc = ompi_modex_send(&mca_btl_tcp2_component.super.btl_version,
889 addrs, xfer_size);
890 free(addrs);
891 }
892 return rc;
893 }
894
895
896
897
898
899
900
901
902 mca_btl_base_module_t** mca_btl_tcp2_component_init(int *num_btl_modules,
903 bool enable_progress_threads,
904 bool enable_mpi_threads)
905 {
906 int ret = OMPI_SUCCESS;
907 mca_btl_base_module_t **btls;
908 *num_btl_modules = 0;
909
910
911 ompi_free_list_init_new( &mca_btl_tcp2_component.tcp_frag_eager,
912 sizeof (mca_btl_tcp2_frag_eager_t) +
913 mca_btl_tcp2_module.super.btl_eager_limit,
914 opal_cache_line_size,
915 OBJ_CLASS (mca_btl_tcp2_frag_eager_t),
916 0,opal_cache_line_size,
917 mca_btl_tcp2_component.tcp_free_list_num,
918 mca_btl_tcp2_component.tcp_free_list_max,
919 mca_btl_tcp2_component.tcp_free_list_inc,
920 NULL );
921
922 ompi_free_list_init_new( &mca_btl_tcp2_component.tcp_frag_max,
923 sizeof (mca_btl_tcp2_frag_max_t) +
924 mca_btl_tcp2_module.super.btl_max_send_size,
925 opal_cache_line_size,
926 OBJ_CLASS (mca_btl_tcp2_frag_max_t),
927 0,opal_cache_line_size,
928 mca_btl_tcp2_component.tcp_free_list_num,
929 mca_btl_tcp2_component.tcp_free_list_max,
930 mca_btl_tcp2_component.tcp_free_list_inc,
931 NULL );
932
933 ompi_free_list_init_new( &mca_btl_tcp2_component.tcp_frag_user,
934 sizeof (mca_btl_tcp2_frag_user_t),
935 opal_cache_line_size,
936 OBJ_CLASS (mca_btl_tcp2_frag_user_t),
937 0,opal_cache_line_size,
938 mca_btl_tcp2_component.tcp_free_list_num,
939 mca_btl_tcp2_component.tcp_free_list_max,
940 mca_btl_tcp2_component.tcp_free_list_inc,
941 NULL );
942
943
944 if(OMPI_SUCCESS != (ret = mca_btl_tcp2_component_create_instances() )) {
945 return 0;
946 }
947
948
949 if(OMPI_SUCCESS != (ret = mca_btl_tcp2_component_create_listen(AF_INET) )) {
950 return 0;
951 }
952 #if OPAL_ENABLE_IPV6
953 if((ret = mca_btl_tcp2_component_create_listen(AF_INET6)) != OMPI_SUCCESS) {
954 if (!(OMPI_ERR_IN_ERRNO == OPAL_SOS_GET_ERROR_CODE(ret) &&
955 EAFNOSUPPORT == opal_socket_errno)) {
956 opal_output (0, "mca_btl_tcp2_component: IPv6 listening socket failed\n");
957 return 0;
958 }
959 }
960 #endif
961
962
963 if(OMPI_SUCCESS != (ret = mca_btl_tcp2_component_exchange() )) {
964 return 0;
965 }
966
967 btls = (mca_btl_base_module_t **)malloc(mca_btl_tcp2_component.tcp_num_btls *
968 sizeof(mca_btl_base_module_t*));
969 if(NULL == btls) {
970 return NULL;
971 }
972
973 memcpy(btls, mca_btl_tcp2_component.tcp_btls, mca_btl_tcp2_component.tcp_num_btls*sizeof(mca_btl_tcp2_module_t*));
974 *num_btl_modules = mca_btl_tcp2_component.tcp_num_btls;
975 return btls;
976 }
977
978
979
980
981
982 int mca_btl_tcp2_component_control(int param, void* value, size_t size)
983 {
984 return OMPI_SUCCESS;
985 }
986
987
988
989
990
991
992
993 static void mca_btl_tcp2_component_accept_handler( int incoming_sd,
994 short ignored,
995 void* unused )
996 {
997 while(true) {
998 #if OPAL_ENABLE_IPV6
999 struct sockaddr_in6 addr;
1000 #else
1001 struct sockaddr_in addr;
1002 #endif
1003 opal_socklen_t addrlen = sizeof(addr);
1004
1005 mca_btl_tcp2_event_t *event;
1006 int sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen);
1007 if(sd < 0) {
1008 if(opal_socket_errno == EINTR)
1009 continue;
1010 if(opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK)
1011 BTL_ERROR(("accept() failed: %s (%d).",
1012 strerror(opal_socket_errno), opal_socket_errno));
1013 return;
1014 }
1015 mca_btl_tcp2_set_socket_options(sd);
1016
1017
1018
1019 event = OBJ_NEW(mca_btl_tcp2_event_t);
1020 opal_event_set(opal_event_base, &event->event, sd, OPAL_EV_READ, mca_btl_tcp2_component_recv_handler, event);
1021 opal_event_add(&event->event, 0);
1022 }
1023 }
1024
1025
1026
1027
1028
1029
1030
1031
1032 static void mca_btl_tcp2_component_recv_handler(int sd, short flags, void* user)
1033 {
1034 orte_process_name_t guid;
1035 struct sockaddr_storage addr;
1036 int retval;
1037 mca_btl_tcp2_proc_t* btl_proc;
1038 opal_socklen_t addr_len = sizeof(addr);
1039 mca_btl_tcp2_event_t *event = (mca_btl_tcp2_event_t *)user;
1040
1041 OBJ_RELEASE(event);
1042
1043
1044 retval = recv(sd, (char *)&guid, sizeof(guid), 0);
1045 if(retval != sizeof(guid)) {
1046 CLOSE_THE_SOCKET(sd);
1047 return;
1048 }
1049 ORTE_PROCESS_NAME_NTOH(guid);
1050
1051
1052 if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
1053 BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
1054 strerror(opal_socket_errno), opal_socket_errno));
1055 } else {
1056 flags |= O_NONBLOCK;
1057 if(fcntl(sd, F_SETFL, flags) < 0) {
1058 BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
1059 strerror(opal_socket_errno), opal_socket_errno));
1060 }
1061 }
1062
1063
1064 btl_proc = mca_btl_tcp2_proc_lookup(&guid);
1065 if(NULL == btl_proc) {
1066 CLOSE_THE_SOCKET(sd);
1067 return;
1068 }
1069
1070
1071 if(getpeername(sd, (struct sockaddr*)&addr, &addr_len) != 0) {
1072 BTL_ERROR(("getpeername() failed: %s (%d)",
1073 strerror(opal_socket_errno), opal_socket_errno));
1074 CLOSE_THE_SOCKET(sd);
1075 return;
1076 }
1077
1078
1079 if(mca_btl_tcp2_proc_accept(btl_proc, (struct sockaddr*)&addr, sd) == false) {
1080 CLOSE_THE_SOCKET(sd);
1081 return;
1082 }
1083 }
1084