This source file includes the following definitions.
- mca_btl_tcp_param_register_string
- mca_btl_tcp_param_register_int
- mca_btl_tcp_param_register_uint
- mca_btl_tcp_event_construct
- mca_btl_tcp_event_destruct
- mca_btl_tcp_component_verify
- mca_btl_tcp_component_register
- mca_btl_tcp_component_open
- mca_btl_tcp_component_close
- mca_btl_tcp_create
- split_and_resolve
- mca_btl_tcp_component_create_instances
- mca_btl_tcp_progress_thread_engine
- mca_btl_tcp_component_event_async_handler
- mca_btl_tcp_component_create_listen
- mca_btl_tcp_component_exchange
- mca_btl_tcp_component_init
- mca_btl_tcp_component_accept_handler
- mca_btl_tcp_component_recv_handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 #include "opal_config.h"
32
33 #include "opal/opal_socket_errno.h"
34 #ifdef HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37 #include <string.h>
38 #include <fcntl.h>
39 #ifdef HAVE_SYS_TYPES_H
40 #include <sys/types.h>
41 #endif
42 #ifdef HAVE_SYS_SOCKET_H
43 #include <sys/socket.h>
44 #endif
45 #ifdef HAVE_NETINET_IN_H
46 #include <netinet/in.h>
47 #endif
48 #ifdef HAVE_ARPA_INET_H
49 #include <arpa/inet.h>
50 #endif
51 #if OPAL_ENABLE_IPV6
52 # ifdef HAVE_NETDB_H
53 # include <netdb.h>
54 # endif
55 #endif
56 #include <ctype.h>
57 #include <limits.h>
58 #ifdef HAVE_SYS_TIME_H
59 #include <sys/time.h>
60 #endif
61
62 #include "opal/mca/event/event.h"
63 #include "opal/util/ethtool.h"
64 #include "opal/util/if.h"
65 #include "opal/util/output.h"
66 #include "opal/util/argv.h"
67 #include "opal/util/net.h"
68 #include "opal/util/proc.h"
69 #include "opal/util/net.h"
70 #include "opal/util/fd.h"
71 #include "opal/util/show_help.h"
72 #include "opal/util/printf.h"
73 #include "opal/constants.h"
74 #include "opal/mca/btl/btl.h"
75 #include "opal/mca/btl/base/base.h"
76 #include "opal/mca/mpool/base/base.h"
77 #include "opal/mca/btl/base/btl_base_error.h"
78 #include "opal/mca/pmix/pmix.h"
79 #include "opal/threads/threads.h"
80
81 #include "opal/constants.h"
82 #include "opal/mca/btl/btl.h"
83 #include "opal/mca/btl/base/base.h"
84 #include "opal/mca/btl/base/btl_base_error.h"
85 #include "btl_tcp.h"
86 #include "btl_tcp_addr.h"
87 #include "btl_tcp_proc.h"
88 #include "btl_tcp_frag.h"
89 #include "btl_tcp_endpoint.h"
90 #if OPAL_CUDA_SUPPORT
91 #include "opal/mca/common/cuda/common_cuda.h"
92 #endif
93
/* Fallback performance figures for a module. mca_btl_tcp_create() uses
 * MCA_BTL_TCP_BTL_BANDWIDTH when a module's bandwidth is still 0 and ethtool
 * reports no link speed, and MCA_BTL_TCP_BTL_LATENCY when its latency is
 * still 0. Units follow the btl_bandwidth / btl_latency module fields
 * (presumably Mbps and microseconds -- TODO confirm). */
#define MCA_BTL_TCP_BTL_BANDWIDTH 100
#define MCA_BTL_TCP_BTL_LATENCY 100

/* Component entry points referenced by the descriptor below. */
static int mca_btl_tcp_component_register(void);
static int mca_btl_tcp_component_open(void);
static int mca_btl_tcp_component_close(void);

/* Event base used for the listening sockets: a private base driven by the
 * progress thread when btl_tcp_progress_thread is enabled and its setup
 * succeeded, otherwise opal_sync_event_base. NULL before the first listener
 * is created and again after the private base is freed in close. */
opal_event_base_t* mca_btl_tcp_event_base = NULL;
/* Run flag of the progress thread (passed as its t_arg): 1 while it should
 * keep looping, 0 once a stop has been requested, -1 when no thread runs or
 * the thread has exited. */
int mca_btl_tcp_progress_thread_trigger = -1;
/* Pipe to the progress thread. [0] is the read end, watched by
 * tcp_recv_thread_async_event; every message is an opal_event_t* that the
 * thread passes to opal_event_add(). Closing [1] asks the thread to stop.
 * Entries are -1 while closed. */
int mca_btl_tcp_pipe_to_progress[2] = { -1, -1 };
static opal_thread_t mca_btl_tcp_progress_thread = { { 0 } };
/* Pending-fragment queue and its lock; (re)constructed in open and
 * destructed in close. Producers/consumers are outside this section. */
opal_list_t mca_btl_tcp_ready_frag_pending_queue = { { 0 } };
opal_mutex_t mca_btl_tcp_ready_frag_mutex = OPAL_MUTEX_STATIC_INIT;

/* The TCP BTL component descriptor: MCA version/lifecycle hooks plus the
 * BTL init hook. No component-level progress function is installed. */
mca_btl_tcp_component_t mca_btl_tcp_component = {
    .super = {
        .btl_version = {
            MCA_BTL_DEFAULT_VERSION("tcp"),
            .mca_open_component = mca_btl_tcp_component_open,
            .mca_close_component = mca_btl_tcp_component_close,
            .mca_register_component_params = mca_btl_tcp_component_register,
        },
        .btl_data = {
            /* Metadata flag advertised to the MCA framework. */
            .param_field = MCA_BASE_METADATA_PARAM_CHECKPOINT
        },

        .btl_init = mca_btl_tcp_component_init,
        .btl_progress = NULL,
    }
};
131
132
133
134
135
136 static inline char* mca_btl_tcp_param_register_string(
137 const char* param_name,
138 const char* help_string,
139 const char* default_value,
140 int level,
141 char **storage)
142 {
143 *storage = (char *) default_value;
144 (void) mca_base_component_var_register(&mca_btl_tcp_component.super.btl_version,
145 param_name, help_string, MCA_BASE_VAR_TYPE_STRING,
146 NULL, 0, 0, level,
147 MCA_BASE_VAR_SCOPE_READONLY, storage);
148 return *storage;
149 }
150
151 static inline int mca_btl_tcp_param_register_int(
152 const char* param_name,
153 const char* help_string,
154 int default_value,
155 int level,
156 int *storage)
157 {
158 *storage = default_value;
159 (void) mca_base_component_var_register(&mca_btl_tcp_component.super.btl_version,
160 param_name, help_string, MCA_BASE_VAR_TYPE_INT,
161 NULL, 0, 0, level,
162 MCA_BASE_VAR_SCOPE_READONLY, storage);
163 return *storage;
164 }
165
166 static inline unsigned int mca_btl_tcp_param_register_uint(
167 const char* param_name,
168 const char* help_string,
169 unsigned int default_value,
170 int level,
171 unsigned int *storage)
172 {
173 *storage = default_value;
174 (void) mca_base_component_var_register(&mca_btl_tcp_component.super.btl_version,
175 param_name, help_string, MCA_BASE_VAR_TYPE_UNSIGNED_INT,
176 NULL, 0, 0, level,
177 MCA_BASE_VAR_SCOPE_READONLY, storage);
178 return *storage;
179 }
180
181
182
183
184
185
186 struct mca_btl_tcp_event_t {
187 opal_list_item_t item;
188 opal_event_t event;
189 };
190 typedef struct mca_btl_tcp_event_t mca_btl_tcp_event_t;
191
192 static void mca_btl_tcp_event_construct(mca_btl_tcp_event_t* event)
193 {
194 MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
195 opal_list_append(&mca_btl_tcp_component.tcp_events, &event->item);
196 MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
197 }
198
199 static void mca_btl_tcp_event_destruct(mca_btl_tcp_event_t* event)
200 {
201 MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
202 opal_list_remove_item(&mca_btl_tcp_component.tcp_events, &event->item);
203 MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);
204 }
205
206 OBJ_CLASS_INSTANCE(
207 mca_btl_tcp_event_t,
208 opal_list_item_t,
209 mca_btl_tcp_event_construct,
210 mca_btl_tcp_event_destruct);
211
212
213
214
215
216 static void mca_btl_tcp_component_recv_handler(int, short, void*);
217 static void mca_btl_tcp_component_accept_handler(int, short, void*);
218
219 static int mca_btl_tcp_component_verify(void)
220 {
221 if( mca_btl_tcp_component.tcp_port_min > USHRT_MAX ) {
222 opal_show_help("help-mpi-btl-tcp.txt", "invalid minimum port",
223 true, "v4", opal_process_info.nodename,
224 mca_btl_tcp_component.tcp_port_min );
225 mca_btl_tcp_component.tcp_port_min = 1024;
226 }
227 #if OPAL_ENABLE_IPV6
228 if( mca_btl_tcp_component.tcp6_port_min > USHRT_MAX ) {
229 opal_show_help("help-mpi-btl-tcp.txt", "invalid minimum port",
230 true, "v6", opal_process_info.nodename,
231 mca_btl_tcp_component.tcp6_port_min );
232 mca_btl_tcp_component.tcp6_port_min = 1024;
233 }
234 #endif
235
236 return OPAL_SUCCESS;
237 }
238
239
240
241
242
243
244 static int mca_btl_tcp_component_register(void)
245 {
246 char* message;
247
248
249 mca_btl_tcp_param_register_uint("links", NULL, 1, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_num_links);
250 mca_btl_tcp_param_register_string("if_include", "Comma-delimited list of devices and/or CIDR notation of networks to use for MPI communication (e.g., \"eth0,192.168.0.0/16\"). Mutually exclusive with btl_tcp_if_exclude.", "", OPAL_INFO_LVL_1, &mca_btl_tcp_component.tcp_if_include);
251 mca_btl_tcp_param_register_string("if_exclude", "Comma-delimited list of devices and/or CIDR notation of networks to NOT use for MPI communication -- all devices not matching these specifications will be used (e.g., \"eth0,192.168.0.0/16\"). If set to a non-default value, it is mutually exclusive with btl_tcp_if_include.",
252 "127.0.0.1/8,sppp",
253 OPAL_INFO_LVL_1, &mca_btl_tcp_component.tcp_if_exclude);
254
255 mca_btl_tcp_param_register_int ("free_list_num", NULL, 8, OPAL_INFO_LVL_5, &mca_btl_tcp_component.tcp_free_list_num);
256 mca_btl_tcp_param_register_int ("free_list_max", NULL, -1, OPAL_INFO_LVL_5, &mca_btl_tcp_component.tcp_free_list_max);
257 mca_btl_tcp_param_register_int ("free_list_inc", NULL, 32, OPAL_INFO_LVL_5, &mca_btl_tcp_component.tcp_free_list_inc);
258 mca_btl_tcp_param_register_int ("sndbuf",
259 "The size of the send buffer socket option for each connection. "
260 "Modern TCP stacks generally are smarter than a fixed size and in some "
261 "situations setting a buffer size explicitly can actually lower "
262 "performance. 0 means the tcp btl will not try to set a send buffer "
263 "size.",
264 0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_sndbuf);
265 mca_btl_tcp_param_register_int ("rcvbuf",
266 "The size of the receive buffer socket option for each connection. "
267 "Modern TCP stacks generally are smarter than a fixed size and in some "
268 "situations setting a buffer size explicitly can actually lower "
269 "performance. 0 means the tcp btl will not try to set a receive buffer "
270 "size.",
271 0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_rcvbuf);
272 mca_btl_tcp_param_register_int ("endpoint_cache",
273 "The size of the internal cache for each TCP connection. This cache is"
274 " used to reduce the number of syscalls, by replacing them with memcpy."
275 " Every read will read the expected data plus the amount of the"
276 " endpoint_cache", 30*1024, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_endpoint_cache);
277 mca_btl_tcp_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)",
278 0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_not_use_nodelay);
279 mca_btl_tcp_param_register_int( "port_min_v4",
280 "The minimum port where the TCP BTL will try to bind (default 1024)",
281 1024, OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_port_min);
282
283 opal_asprintf( &message,
284 "The number of ports where the TCP BTL will try to bind (default %d)."
285 " This parameter together with the port min, define a range of ports"
286 " where Open MPI will open sockets.",
287 (0x1 << 16) - mca_btl_tcp_component.tcp_port_min - 1 );
288 mca_btl_tcp_param_register_int( "port_range_v4", message,
289 (0x1 << 16) - mca_btl_tcp_component.tcp_port_min - 1,
290 OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_port_range);
291 free(message);
292 #if OPAL_ENABLE_IPV6
293 mca_btl_tcp_param_register_int( "port_min_v6",
294 "The minimum port where the TCP BTL will try to bind (default 1024)", 1024,
295 OPAL_INFO_LVL_2, & mca_btl_tcp_component.tcp6_port_min );
296 opal_asprintf( &message,
297 "The number of ports where the TCP BTL will try to bind (default %d)."
298 " This parameter together with the port min, define a range of ports"
299 " where Open MPI will open sockets.",
300 (0x1 << 16) - mca_btl_tcp_component.tcp6_port_min - 1 );
301 mca_btl_tcp_param_register_int( "port_range_v6", message,
302 (0x1 << 16) - mca_btl_tcp_component.tcp6_port_min - 1,
303 OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp6_port_range );
304 free(message);
305 #endif
306
307
308 mca_btl_tcp_param_register_int ("progress_thread", NULL, 0, OPAL_INFO_LVL_1,
309 &mca_btl_tcp_component.tcp_enable_progress_thread);
310 mca_btl_tcp_component.report_all_unfound_interfaces = false;
311 (void) mca_base_component_var_register(&mca_btl_tcp_component.super.btl_version,
312 "warn_all_unfound_interfaces",
313 "Issue a warning for all unfound interfaces included in if_exclude",
314 MCA_BASE_VAR_TYPE_BOOL,
315 NULL, 0, 0, OPAL_INFO_LVL_2,
316 MCA_BASE_VAR_SCOPE_READONLY, &mca_btl_tcp_component.report_all_unfound_interfaces);
317
318 mca_btl_tcp_module.super.btl_exclusivity = MCA_BTL_EXCLUSIVITY_LOW + 100;
319 mca_btl_tcp_module.super.btl_eager_limit = 64*1024;
320 mca_btl_tcp_module.super.btl_rndv_eager_limit = 64*1024;
321 mca_btl_tcp_module.super.btl_max_send_size = 128*1024;
322 mca_btl_tcp_module.super.btl_rdma_pipeline_send_length = 128*1024;
323
324
325
326
327
328 mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size = ((1UL<<31) - 1024);
329 mca_btl_tcp_module.super.btl_min_rdma_pipeline_size = 0;
330 mca_btl_tcp_module.super.btl_flags = MCA_BTL_FLAGS_PUT |
331 MCA_BTL_FLAGS_SEND_INPLACE |
332 MCA_BTL_FLAGS_NEED_CSUM |
333 MCA_BTL_FLAGS_NEED_ACK |
334 MCA_BTL_FLAGS_HETEROGENEOUS_RDMA |
335 MCA_BTL_FLAGS_SEND;
336
337
338
339
340 mca_btl_tcp_module.super.btl_bandwidth = 0;
341 mca_btl_tcp_module.super.btl_latency = 0;
342
343 mca_btl_base_param_register(&mca_btl_tcp_component.super.btl_version,
344 &mca_btl_tcp_module.super);
345 if (mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size > ((1UL<<31) - 1024) ) {
346
347
348 mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size = ((1UL<<31) - 1024);
349 }
350 mca_btl_tcp_param_register_int ("disable_family", NULL, 0, OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_disable_family);
351
352 return mca_btl_tcp_component_verify();
353 }
354
355 static int mca_btl_tcp_component_open(void)
356 {
357 if (OPAL_SUCCESS != mca_btl_tcp_component_verify()) {
358 return OPAL_ERROR;
359 }
360
361
362 mca_btl_tcp_component.tcp_listen_sd = -1;
363 #if OPAL_ENABLE_IPV6
364 mca_btl_tcp_component.tcp6_listen_sd = -1;
365 #endif
366 mca_btl_tcp_component.tcp_num_btls = 0;
367 mca_btl_tcp_component.tcp_addr_count = 0;
368 mca_btl_tcp_component.tcp_btls = NULL;
369
370
371 OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_lock, opal_mutex_t);
372 OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_procs, opal_proc_table_t);
373 OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_events, opal_list_t);
374 OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_eager, opal_free_list_t);
375 OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_max, opal_free_list_t);
376 OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_user, opal_free_list_t);
377 opal_proc_table_init(&mca_btl_tcp_component.tcp_procs, 16, 256);
378
379 OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex, opal_mutex_t);
380 OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex, opal_mutex_t);
381 OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_frag_user_mutex, opal_mutex_t);
382 OBJ_CONSTRUCT(&mca_btl_tcp_ready_frag_mutex, opal_mutex_t);
383 OBJ_CONSTRUCT(&mca_btl_tcp_ready_frag_pending_queue, opal_list_t);
384
385
386 if (OPAL_SUCCESS !=
387 mca_base_var_check_exclusive("opal",
388 mca_btl_tcp_component.super.btl_version.mca_type_name,
389 mca_btl_tcp_component.super.btl_version.mca_component_name,
390 "if_include",
391 mca_btl_tcp_component.super.btl_version.mca_type_name,
392 mca_btl_tcp_component.super.btl_version.mca_component_name,
393 "if_exclude")) {
394
395
396 return OPAL_ERR_NOT_AVAILABLE;
397 }
398
399 return OPAL_SUCCESS;
400 }
401
402
403
404
405
406
407 static int mca_btl_tcp_component_close(void)
408 {
409 mca_btl_tcp_event_t *event, *next;
410
411
412
413
414
415 if( (NULL != mca_btl_tcp_event_base) &&
416 (mca_btl_tcp_event_base != opal_sync_event_base) ) {
417
418 if( -1 != mca_btl_tcp_progress_thread_trigger ) {
419 void* ret = NULL;
420
421 mca_btl_tcp_progress_thread_trigger = 0;
422
423 if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
424 close(mca_btl_tcp_pipe_to_progress[1]);
425 mca_btl_tcp_pipe_to_progress[1] = -1;
426 }
427
428 opal_thread_join(&mca_btl_tcp_progress_thread, &ret);
429 assert( -1 == mca_btl_tcp_progress_thread_trigger );
430 }
431 opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
432 opal_event_base_free(mca_btl_tcp_event_base);
433 mca_btl_tcp_event_base = NULL;
434
435
436 if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
437 close(mca_btl_tcp_pipe_to_progress[0]);
438 mca_btl_tcp_pipe_to_progress[0] = -1;
439 }
440 }
441
442 OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
443 OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);
444
445 OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
446 OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
447
448 if (NULL != mca_btl_tcp_component.tcp_btls) {
449 free(mca_btl_tcp_component.tcp_btls);
450 }
451
452 if (mca_btl_tcp_component.tcp_listen_sd >= 0) {
453 opal_event_del(&mca_btl_tcp_component.tcp_recv_event);
454 CLOSE_THE_SOCKET(mca_btl_tcp_component.tcp_listen_sd);
455 mca_btl_tcp_component.tcp_listen_sd = -1;
456 }
457 #if OPAL_ENABLE_IPV6
458 if (mca_btl_tcp_component.tcp6_listen_sd >= 0) {
459 opal_event_del(&mca_btl_tcp_component.tcp6_recv_event);
460 CLOSE_THE_SOCKET(mca_btl_tcp_component.tcp6_listen_sd);
461 mca_btl_tcp_component.tcp6_listen_sd = -1;
462 }
463 #endif
464
465
466
467 OPAL_LIST_FOREACH_SAFE(event, next, &mca_btl_tcp_component.tcp_events, mca_btl_tcp_event_t) {
468 opal_event_del(&event->event);
469 OBJ_RELEASE(event);
470 }
471
472 opal_proc_table_remove_value(&mca_btl_tcp_component.tcp_procs, opal_proc_local_get()->proc_name);
473
474
475 OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_procs);
476 OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager);
477 OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max);
478 OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_user);
479 OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_lock);
480
481 #if OPAL_CUDA_SUPPORT
482 mca_common_cuda_fini();
483 #endif
484
485 return OPAL_SUCCESS;
486 }
487
488
489
490
491
492
493 static int mca_btl_tcp_create(const int if_kindex, const char* if_name)
494 {
495 struct mca_btl_tcp_module_t* btl;
496 char param[256];
497 int i;
498 struct sockaddr_storage addr;
499 bool found = false;
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518 for (i = opal_ifbegin() ; i >= 0 ; i = opal_ifnext(i)) {
519 int ret;
520
521 if (if_kindex != opal_ifindextokindex(i)) {
522 continue;
523 }
524
525 ret = opal_ifindextoaddr(i, (struct sockaddr*)&addr,
526 sizeof(struct sockaddr_storage));
527 if (OPAL_SUCCESS != ret) {
528 return ret;
529 }
530
531 if (addr.ss_family == AF_INET &&
532 4 != mca_btl_tcp_component.tcp_disable_family) {
533 found = true;
534 break;
535 } else if (addr.ss_family == AF_INET6 &&
536 6 != mca_btl_tcp_component.tcp_disable_family) {
537 found = true;
538 break;
539 }
540 }
541
542
543 if (!found) {
544 return OPAL_SUCCESS;
545 }
546
547 for( i = 0; i < (int)mca_btl_tcp_component.tcp_num_links; i++ ) {
548 btl = (struct mca_btl_tcp_module_t *)malloc(sizeof(mca_btl_tcp_module_t));
549 if(NULL == btl)
550 return OPAL_ERR_OUT_OF_RESOURCE;
551 memcpy(btl, &mca_btl_tcp_module, sizeof(mca_btl_tcp_module));
552 OBJ_CONSTRUCT(&btl->tcp_endpoints, opal_list_t);
553 OBJ_CONSTRUCT(&btl->tcp_endpoints_mutex, opal_mutex_t);
554 mca_btl_tcp_component.tcp_btls[mca_btl_tcp_component.tcp_num_btls++] = btl;
555
556
557 btl->tcp_ifkindex = (uint16_t) if_kindex;
558 #if MCA_BTL_TCP_STATISTICS
559 btl->tcp_bytes_recv = 0;
560 btl->tcp_bytes_sent = 0;
561 btl->tcp_send_handler = 0;
562 #endif
563
564 memcpy(&btl->tcp_ifaddr, &addr, sizeof(struct sockaddr_storage));
565
566
567 sprintf(param, "bandwidth_%s", if_name);
568 mca_btl_tcp_param_register_uint(param, NULL, btl->super.btl_bandwidth, OPAL_INFO_LVL_5, &btl->super.btl_bandwidth);
569
570
571 sprintf(param, "latency_%s", if_name);
572 mca_btl_tcp_param_register_uint(param, NULL, btl->super.btl_latency, OPAL_INFO_LVL_5, &btl->super.btl_latency);
573 if( i > 0 ) {
574 btl->super.btl_bandwidth >>= 1;
575 btl->super.btl_latency <<= 1;
576 }
577
578
579 sprintf(param, "bandwidth_%s:%d", if_name, i);
580 mca_btl_tcp_param_register_uint(param, NULL, btl->super.btl_bandwidth, OPAL_INFO_LVL_5, &btl->super.btl_bandwidth);
581
582
583 sprintf(param, "latency_%s:%d", if_name, i);
584 mca_btl_tcp_param_register_uint(param, NULL, btl->super.btl_latency, OPAL_INFO_LVL_5, &btl->super.btl_latency);
585
586
587
588
589
590
591 if (0 == btl->super.btl_bandwidth) {
592 unsigned int speed = opal_ethtool_get_speed(if_name);
593 btl->super.btl_bandwidth = (speed == 0) ? MCA_BTL_TCP_BTL_BANDWIDTH : speed;
594 if (i > 0) {
595 btl->super.btl_bandwidth >>= 1;
596 }
597 }
598
599 if (0 == btl->super.btl_latency) {
600 btl->super.btl_latency = MCA_BTL_TCP_BTL_LATENCY;
601 if (i > 0) {
602 btl->super.btl_latency <<= 1;
603 }
604 }
605
606 opal_output_verbose(5, opal_btl_base_framework.framework_output,
607 "btl:tcp: %p: if %s kidx %d cnt %i addr %s %s bw %d lt %d\n",
608 (void*)btl, if_name, (int) btl->tcp_ifkindex, i,
609 opal_net_get_hostname((struct sockaddr*)&btl->tcp_ifaddr),
610 (btl->tcp_ifaddr.ss_family == AF_INET) ? "IPv4" : "IPv6",
611 btl->super.btl_bandwidth, btl->super.btl_latency);
612 }
613 return OPAL_SUCCESS;
614 }
615
616
617
618
619
620
621 static char **split_and_resolve(char **orig_str, char *name, bool reqd)
622 {
623 int i, ret, save, if_index;
624 char **argv, *str, *tmp;
625 char if_name[IF_NAMESIZE];
626 struct sockaddr_storage argv_inaddr, if_inaddr;
627 uint32_t argv_prefix;
628
629
630 if (NULL == orig_str || NULL == *orig_str) {
631 return NULL;
632 }
633
634 argv = opal_argv_split(*orig_str, ',');
635 if (NULL == argv) {
636 return NULL;
637 }
638 for (save = i = 0; NULL != argv[i]; ++i) {
639 if (isalpha(argv[i][0])) {
640 argv[save++] = argv[i];
641 continue;
642 }
643
644
645
646 argv_prefix = 0;
647 tmp = strdup(argv[i]);
648 str = strchr(argv[i], '/');
649 if (NULL == str) {
650 opal_show_help("help-mpi-btl-tcp.txt", "invalid if_inexclude",
651 true, name, opal_process_info.nodename,
652 tmp, "Invalid specification (missing \"/\")");
653 free(argv[i]);
654 free(tmp);
655 continue;
656 }
657 *str = '\0';
658 argv_prefix = atoi(str + 1);
659
660
661 ((struct sockaddr*) &argv_inaddr)->sa_family = AF_INET;
662 ret = inet_pton(AF_INET, argv[i],
663 &((struct sockaddr_in*) &argv_inaddr)->sin_addr);
664 free(argv[i]);
665
666 if (1 != ret) {
667 opal_show_help("help-mpi-btl-tcp.txt", "invalid if_inexclude",
668 true, name, opal_process_info.nodename, tmp,
669 "Invalid specification (inet_pton() failed)");
670 free(tmp);
671 continue;
672 }
673 opal_output_verbose(20, opal_btl_base_framework.framework_output,
674 "btl: tcp: Searching for %s address+prefix: %s / %u",
675 name,
676 opal_net_get_hostname((struct sockaddr*) &argv_inaddr),
677 argv_prefix);
678
679
680 for (if_index = opal_ifbegin(); if_index >= 0;
681 if_index = opal_ifnext(if_index)) {
682 opal_ifindextoaddr(if_index,
683 (struct sockaddr*) &if_inaddr,
684 sizeof(if_inaddr));
685 if (opal_net_samenetwork((struct sockaddr*) &argv_inaddr,
686 (struct sockaddr*) &if_inaddr,
687 argv_prefix)) {
688 break;
689 }
690 }
691
692
693 if (if_index < 0) {
694 if (reqd || mca_btl_tcp_component.report_all_unfound_interfaces) {
695 opal_show_help("help-mpi-btl-tcp.txt", "invalid if_inexclude",
696 true, name, opal_process_info.nodename, tmp,
697 "Did not find interface matching this subnet");
698 }
699 free(tmp);
700 continue;
701 }
702
703
704
705 opal_ifindextoname(if_index, if_name, sizeof(if_name));
706 opal_output_verbose(20, opal_btl_base_framework.framework_output,
707 "btl: tcp: Found match: %s (%s)",
708 opal_net_get_hostname((struct sockaddr*) &if_inaddr),
709 if_name);
710 argv[save++] = strdup(if_name);
711 free(tmp);
712 }
713
714
715
716 argv[save] = NULL;
717 free(*orig_str);
718 *orig_str = opal_argv_join(argv, ',');
719 return argv;
720 }
721
722
723
724
725
726
727
728
729
730 static int mca_btl_tcp_component_create_instances(void)
731 {
732 const int if_count = opal_ifcount();
733 int if_index;
734 int kif_count = 0;
735 int *kindexes;
736 char **include = NULL;
737 char **exclude = NULL;
738 char **argv;
739 int ret = OPAL_SUCCESS;
740
741 if(if_count <= 0) {
742 return OPAL_ERROR;
743 }
744
745 kindexes = (int *) malloc(sizeof(int) * if_count);
746 if (NULL == kindexes) {
747 return OPAL_ERR_OUT_OF_RESOURCE;
748 }
749
750
751 {
752 int j;
753
754
755
756 memset (kindexes, 0, sizeof(int) * if_count);
757
758
759
760
761 for(if_index = opal_ifbegin(); if_index >= 0; if_index = opal_ifnext(if_index)){
762 int index = opal_ifindextokindex (if_index);
763
764 if (index > 0) {
765 bool want_this_if = true;
766
767
768 for (j = 0; want_this_if && (j < kif_count); j++) {
769 if (kindexes[j] == index) {
770 want_this_if = false;
771 }
772 }
773
774 if (want_this_if) {
775 kindexes[kif_count] = index;
776 kif_count++;
777 }
778 }
779 }
780 }
781
782
783 mca_btl_tcp_component.tcp_btls = (mca_btl_tcp_module_t**)malloc(mca_btl_tcp_component.tcp_num_links *
784 kif_count * sizeof(mca_btl_tcp_module_t*));
785 if(NULL == mca_btl_tcp_component.tcp_btls) {
786 ret = OPAL_ERR_OUT_OF_RESOURCE;
787 goto cleanup;
788 }
789
790 mca_btl_tcp_component.tcp_addr_count = if_count;
791
792
793 argv = include = split_and_resolve(&mca_btl_tcp_component.tcp_if_include,
794 "include", true);
795 while(argv && *argv) {
796 char* if_name = *argv;
797 int if_index = opal_ifnametokindex(if_name);
798 if(if_index < 0) {
799 opal_show_help("help-mpi-btl-tcp.txt", "invalid if_inexclude",
800 true, "include", opal_process_info.nodename,
801 if_name, "Unknown interface name");
802 ret = OPAL_ERR_NOT_FOUND;
803 goto cleanup;
804 }
805 mca_btl_tcp_create(if_index, if_name);
806 argv++;
807 }
808
809
810
811 if (mca_btl_tcp_component.tcp_num_btls > 0) {
812 ret = OPAL_SUCCESS;
813 goto cleanup;
814 }
815
816
817
818
819 exclude = split_and_resolve(&mca_btl_tcp_component.tcp_if_exclude,
820 "exclude", false);
821 {
822 int i;
823 for(i = 0; i < kif_count; i++) {
824
825 char if_name[IF_NAMESIZE];
826 if_index = kindexes[i];
827
828 opal_ifkindextoname(if_index, if_name, sizeof(if_name));
829
830
831 argv = exclude;
832 while(argv && *argv) {
833 if(strncmp(*argv,if_name,strlen(*argv)) == 0)
834 break;
835 argv++;
836 }
837
838 if(argv == 0 || *argv == 0) {
839 mca_btl_tcp_create(if_index, if_name);
840 }
841 }
842 }
843
844 cleanup:
845 if (NULL != include) {
846 opal_argv_free(include);
847 }
848 if (NULL != exclude) {
849 opal_argv_free(exclude);
850 }
851 if (NULL != kindexes) {
852 free(kindexes);
853 }
854 return ret;
855 }
856
857 static void* mca_btl_tcp_progress_thread_engine(opal_object_t *obj)
858 {
859 opal_thread_t* current_thread = (opal_thread_t*)obj;
860
861 while( 1 == (*((int*)current_thread->t_arg)) ) {
862 opal_event_loop(mca_btl_tcp_event_base, OPAL_EVLOOP_ONCE);
863 }
864 (*((int*)current_thread->t_arg)) = -1;
865 return NULL;
866 }
867
868 static void mca_btl_tcp_component_event_async_handler(int fd, short unused, void *context)
869 {
870 opal_event_t* event;
871 int rc;
872
873 rc = read(fd, (void*)&event, sizeof(opal_event_t*));
874 assert( fd == mca_btl_tcp_pipe_to_progress[0] );
875 if( 0 == rc ) {
876
877 opal_thread_t* current_thread = (opal_thread_t*)context;
878 (*((int*)current_thread->t_arg)) = 0;
879 } else {
880 opal_event_add(event, 0);
881 }
882 }
883
884
885
886
887
888 static int mca_btl_tcp_component_create_listen(uint16_t af_family)
889 {
890 int flags, sd, rc;
891 struct sockaddr_storage inaddr;
892 opal_socklen_t addrlen;
893
894
895 sd = socket(af_family, SOCK_STREAM, 0);
896 if(sd < 0) {
897 if (EAFNOSUPPORT != opal_socket_errno) {
898 BTL_ERROR(("socket() failed: %s (%d)",
899 strerror(opal_socket_errno), opal_socket_errno));
900 }
901 return OPAL_ERR_IN_ERRNO;
902 }
903
904 mca_btl_tcp_set_socket_options(sd);
905
906 #if OPAL_ENABLE_IPV6
907 {
908 struct addrinfo hints, *res = NULL;
909
910 memset (&hints, 0, sizeof(hints));
911 hints.ai_family = af_family;
912 hints.ai_socktype = SOCK_STREAM;
913 hints.ai_flags = AI_PASSIVE;
914
915 if ((rc = getaddrinfo(NULL, "0", &hints, &res))) {
916 opal_output (0,
917 "mca_btl_tcp_create_listen: unable to resolve. %s\n",
918 gai_strerror (rc));
919 CLOSE_THE_SOCKET(sd);
920 return OPAL_ERROR;
921 }
922
923 memcpy (&inaddr, res->ai_addr, res->ai_addrlen);
924 addrlen = res->ai_addrlen;
925 freeaddrinfo (res);
926
927 #ifdef IPV6_V6ONLY
928
929
930
931
932
933
934
935 if (AF_INET6 == af_family) {
936 int flg = 1;
937 if (setsockopt (sd, IPPROTO_IPV6, IPV6_V6ONLY,
938 (char *) &flg, sizeof (flg)) < 0) {
939 BTL_ERROR(("mca_btl_tcp_create_listen: unable to set IPV6_V6ONLY\n"));
940 }
941 }
942 #endif
943 }
944 #else
945 ((struct sockaddr_in*) &inaddr)->sin_family = AF_INET;
946 ((struct sockaddr_in*) &inaddr)->sin_addr.s_addr = INADDR_ANY;
947 addrlen = sizeof(struct sockaddr_in);
948 #endif
949
950 {
951 int flg = 0;
952 if (setsockopt (sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&flg, sizeof (flg)) < 0) {
953 BTL_ERROR(("mca_btl_tcp_create_listen: unable to unset the "
954 "SO_REUSEADDR option (%s:%d)\n",
955 strerror(opal_socket_errno), opal_socket_errno));
956 CLOSE_THE_SOCKET(sd);
957 return OPAL_ERROR;
958 }
959 }
960
961 {
962 int index, range, port;
963
964 #if OPAL_ENABLE_IPV6
965 if (AF_INET6 == af_family) {
966 range = mca_btl_tcp_component.tcp6_port_range;
967 port = mca_btl_tcp_component.tcp6_port_min;
968 } else
969 #endif
970 {
971 range = mca_btl_tcp_component.tcp_port_range;
972 port = mca_btl_tcp_component.tcp_port_min;
973 }
974
975 for( index = 0; index < range; index++ ) {
976 #if OPAL_ENABLE_IPV6
977 ((struct sockaddr_in6*) &inaddr)->sin6_port = htons(port + index);
978 #else
979 ((struct sockaddr_in*) &inaddr)->sin_port = htons(port + index);
980 #endif
981 opal_output_verbose(30, opal_btl_base_framework.framework_output,
982 "btl:tcp: Attempting to bind to %s port %d",
983 (AF_INET == af_family) ? "AF_INET" : "AF_INET6",
984 port + index);
985 if(bind(sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
986 if( (EADDRINUSE == opal_socket_errno) || (EADDRNOTAVAIL == opal_socket_errno) ) {
987 continue;
988 }
989 BTL_ERROR(("bind() failed: %s (%d)",
990 strerror(opal_socket_errno), opal_socket_errno));
991 CLOSE_THE_SOCKET(sd);
992 return OPAL_ERROR;
993 }
994 opal_output_verbose(30, opal_btl_base_framework.framework_output,
995 "btl:tcp: Successfully bound to %s port %d",
996 (AF_INET == af_family) ? "AF_INET" : "AF_INET6",
997 port + index);
998 goto socket_binded;
999 }
1000 #if OPAL_ENABLE_IPV6
1001 if (AF_INET6 == af_family) {
1002 BTL_ERROR(("bind6() failed: no port available in the range [%d..%d]",
1003 mca_btl_tcp_component.tcp6_port_min,
1004 mca_btl_tcp_component.tcp6_port_min + range));
1005 } else
1006 #endif
1007 {
1008 BTL_ERROR(("bind() failed: no port available in the range [%d..%d]",
1009 mca_btl_tcp_component.tcp_port_min,
1010 mca_btl_tcp_component.tcp_port_min + range));
1011 }
1012 CLOSE_THE_SOCKET(sd);
1013 return OPAL_ERROR;
1014 }
1015 socket_binded:
1016
1017 if(getsockname(sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
1018 BTL_ERROR(("getsockname() failed: %s (%d)",
1019 strerror(opal_socket_errno), opal_socket_errno));
1020 CLOSE_THE_SOCKET(sd);
1021 return OPAL_ERROR;
1022 }
1023
1024 #if OPAL_ENABLE_IPV6
1025 if (AF_INET6 == af_family) {
1026 mca_btl_tcp_component.tcp6_listen_port = ((struct sockaddr_in6*) &inaddr)->sin6_port;
1027 mca_btl_tcp_component.tcp6_listen_sd = sd;
1028 opal_output_verbose(30, opal_btl_base_framework.framework_output,
1029 "btl:tcp: my listening v6 socket port is %d",
1030 ntohs(mca_btl_tcp_component.tcp6_listen_port));
1031 } else
1032 #endif
1033 {
1034 char str[16];
1035 mca_btl_tcp_component.tcp_listen_port = ((struct sockaddr_in*) &inaddr)->sin_port;
1036 mca_btl_tcp_component.tcp_listen_sd = sd;
1037 inet_ntop(AF_INET, &(((struct sockaddr_in*)&inaddr)->sin_addr), str, sizeof(str));
1038 opal_output_verbose(30, opal_btl_base_framework.framework_output,
1039 "btl:tcp: my listening v4 socket is %s:%u",
1040 str, ntohs(mca_btl_tcp_component.tcp_listen_port));
1041 }
1042
1043
1044 if(listen(sd, SOMAXCONN) < 0) {
1045 BTL_ERROR(("listen() failed: %s (%d)",
1046 strerror(opal_socket_errno), opal_socket_errno));
1047 CLOSE_THE_SOCKET(sd);
1048 return OPAL_ERROR;
1049 }
1050
1051
1052 if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
1053 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
1054 true, opal_process_info.nodename,
1055 getpid(), "fcntl(sd, F_GETFL, 0)",
1056 strerror(opal_socket_errno), opal_socket_errno);
1057 CLOSE_THE_SOCKET(sd);
1058 return OPAL_ERROR;
1059 } else {
1060 flags |= O_NONBLOCK;
1061 if(fcntl(sd, F_SETFL, flags) < 0) {
1062 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
1063 true, opal_process_info.nodename,
1064 getpid(),
1065 "fcntl(sd, F_SETFL, flags & O_NONBLOCK)",
1066 strerror(opal_socket_errno), opal_socket_errno);
1067 CLOSE_THE_SOCKET(sd);
1068 return OPAL_ERROR;
1069 }
1070 }
1071
1072 if(mca_btl_tcp_component.tcp_enable_progress_thread){
1073
1074 opal_event_use_threads();
1075 if( NULL == mca_btl_tcp_event_base ) {
1076
1077
1078 if( NULL == (mca_btl_tcp_event_base = opal_event_base_create()) ) {
1079 BTL_ERROR(("BTL TCP failed to create progress event base"));
1080 goto move_forward_with_no_thread;
1081 }
1082 opal_event_base_priority_init(mca_btl_tcp_event_base, OPAL_EVENT_NUM_PRI);
1083
1084
1085 OBJ_CONSTRUCT(&mca_btl_tcp_progress_thread, opal_thread_t);
1086
1087
1088
1089
1090 if (0 != pipe(mca_btl_tcp_pipe_to_progress)) {
1091 opal_event_base_free(mca_btl_tcp_event_base);
1092
1093 mca_btl_tcp_event_base = opal_sync_event_base;
1094 mca_btl_tcp_progress_thread_trigger = -1;
1095 goto move_forward_with_no_thread;
1096 }
1097
1098 if((flags = fcntl(mca_btl_tcp_pipe_to_progress[0], F_GETFL, 0)) < 0) {
1099 BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
1100 strerror(opal_socket_errno), opal_socket_errno));
1101 } else {
1102 flags |= O_NONBLOCK;
1103 if(fcntl(mca_btl_tcp_pipe_to_progress[0], F_SETFL, flags) < 0)
1104 BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
1105 strerror(opal_socket_errno), opal_socket_errno));
1106 }
1107
1108 opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp_recv_thread_async_event,
1109 mca_btl_tcp_pipe_to_progress[0],
1110 OPAL_EV_READ|OPAL_EV_PERSIST,
1111 mca_btl_tcp_component_event_async_handler,
1112 &mca_btl_tcp_progress_thread );
1113 opal_event_add(&mca_btl_tcp_component.tcp_recv_thread_async_event, 0);
1114
1115
1116 mca_btl_tcp_progress_thread.t_run = mca_btl_tcp_progress_thread_engine;
1117 mca_btl_tcp_progress_thread.t_arg = &mca_btl_tcp_progress_thread_trigger;
1118 mca_btl_tcp_progress_thread_trigger = 1;
1119 if( OPAL_SUCCESS != (rc = opal_thread_start(&mca_btl_tcp_progress_thread)) ) {
1120 BTL_ERROR(("BTL TCP progress thread initialization failed (%d)", rc));
1121 opal_event_base_free(mca_btl_tcp_event_base);
1122
1123 mca_btl_tcp_event_base = opal_sync_event_base;
1124 mca_btl_tcp_progress_thread_trigger = -1;
1125 goto move_forward_with_no_thread;
1126 }
1127
1128 opal_set_using_threads(true);
1129 }
1130 }
1131 else {
1132 move_forward_with_no_thread:
1133 mca_btl_tcp_event_base = opal_sync_event_base;
1134 }
1135
1136 if (AF_INET == af_family) {
1137 opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp_recv_event,
1138 mca_btl_tcp_component.tcp_listen_sd,
1139 OPAL_EV_READ|OPAL_EV_PERSIST,
1140 mca_btl_tcp_component_accept_handler,
1141 0 );
1142 MCA_BTL_TCP_ACTIVATE_EVENT(&mca_btl_tcp_component.tcp_recv_event, 0);
1143 }
1144 #if OPAL_ENABLE_IPV6
1145 if (AF_INET6 == af_family) {
1146 opal_event_set(mca_btl_tcp_event_base, &mca_btl_tcp_component.tcp6_recv_event,
1147 mca_btl_tcp_component.tcp6_listen_sd,
1148 OPAL_EV_READ|OPAL_EV_PERSIST,
1149 mca_btl_tcp_component_accept_handler,
1150 0 );
1151 MCA_BTL_TCP_ACTIVATE_EVENT(&mca_btl_tcp_component.tcp6_recv_event, 0);
1152 }
1153 #endif
1154 return OPAL_SUCCESS;
1155 }
1156
1157
1158
1159
1160
1161
1162 static int mca_btl_tcp_component_exchange(void)
1163 {
1164 int rc;
1165 size_t i;
1166 size_t num_btls = mca_btl_tcp_component.tcp_num_btls;
1167 size_t size = num_btls * sizeof(mca_btl_tcp_modex_addr_t);
1168 mca_btl_tcp_modex_addr_t *addrs;
1169
1170 if (num_btls <= 0) {
1171 return 0;
1172 }
1173
1174 addrs = (mca_btl_tcp_modex_addr_t*)malloc(size);
1175 if (NULL == addrs) {
1176 return OPAL_ERR_OUT_OF_RESOURCE;
1177 }
1178 memset(addrs, 0, size);
1179
1180 for (i = 0 ; i < num_btls ; i++) {
1181 struct mca_btl_tcp_module_t *btl = mca_btl_tcp_component.tcp_btls[i];
1182 struct sockaddr *addr = (struct sockaddr*)&(btl->tcp_ifaddr);
1183
1184 #if OPAL_ENABLE_IPV6
1185 if (AF_INET6 == addr->sa_family) {
1186 struct sockaddr_in6 *inaddr6 = (struct sockaddr_in6*)addr;
1187
1188 memcpy(&addrs[i].addr, &(inaddr6->sin6_addr),
1189 sizeof(struct in6_addr));
1190 addrs[i].addr_port = mca_btl_tcp_component.tcp6_listen_port;
1191 addrs[i].addr_ifkindex = btl->tcp_ifkindex;
1192 addrs[i].addr_family = MCA_BTL_TCP_AF_INET6;
1193 opal_output_verbose(5, opal_btl_base_framework.framework_output,
1194 "btl: tcp: exchange: %d %d IPv6 %s",
1195 (int)i, btl->tcp_ifkindex,
1196 opal_net_get_hostname(addr));
1197 } else
1198 #endif
1199 if (AF_INET == addr->sa_family) {
1200 struct sockaddr_in *inaddr = (struct sockaddr_in*)addr;
1201
1202 memcpy(&addrs[i].addr, &(inaddr->sin_addr),
1203 sizeof(struct in_addr));
1204 addrs[i].addr_port = mca_btl_tcp_component.tcp_listen_port;
1205 addrs[i].addr_ifkindex = btl->tcp_ifkindex;
1206 addrs[i].addr_family = MCA_BTL_TCP_AF_INET;
1207 opal_output_verbose(5, opal_btl_base_framework.framework_output,
1208 "btl: tcp: exchange: %d %d IPv4 %s",
1209 (int)i, btl->tcp_ifkindex,
1210 opal_net_get_hostname(addr));
1211 } else {
1212 BTL_ERROR(("Unexpected address family: %d", addr->sa_family));
1213 return OPAL_ERR_BAD_PARAM;
1214 }
1215 }
1216
1217 OPAL_MODEX_SEND(rc, OPAL_PMIX_GLOBAL,
1218 &mca_btl_tcp_component.super.btl_version,
1219 addrs, size);
1220 free(addrs);
1221
1222 return rc;
1223 }
1224
1225
1226
1227
1228
1229
1230
1231
1232 mca_btl_base_module_t** mca_btl_tcp_component_init(int *num_btl_modules,
1233 bool enable_progress_threads,
1234 bool enable_mpi_threads)
1235 {
1236 int ret = OPAL_SUCCESS;
1237 unsigned int i;
1238 mca_btl_base_module_t **btls;
1239 *num_btl_modules = 0;
1240
1241
1242 opal_free_list_init( &mca_btl_tcp_component.tcp_frag_eager,
1243 sizeof (mca_btl_tcp_frag_eager_t) +
1244 mca_btl_tcp_module.super.btl_eager_limit,
1245 opal_cache_line_size,
1246 OBJ_CLASS (mca_btl_tcp_frag_eager_t),
1247 0,opal_cache_line_size,
1248 mca_btl_tcp_component.tcp_free_list_num,
1249 mca_btl_tcp_component.tcp_free_list_max,
1250 mca_btl_tcp_component.tcp_free_list_inc,
1251 NULL, 0, NULL, NULL, NULL );
1252
1253 opal_free_list_init( &mca_btl_tcp_component.tcp_frag_max,
1254 sizeof (mca_btl_tcp_frag_max_t) +
1255 mca_btl_tcp_module.super.btl_max_send_size,
1256 opal_cache_line_size,
1257 OBJ_CLASS (mca_btl_tcp_frag_max_t),
1258 0,opal_cache_line_size,
1259 mca_btl_tcp_component.tcp_free_list_num,
1260 mca_btl_tcp_component.tcp_free_list_max,
1261 mca_btl_tcp_component.tcp_free_list_inc,
1262 NULL, 0, NULL, NULL, NULL );
1263
1264 opal_free_list_init( &mca_btl_tcp_component.tcp_frag_user,
1265 sizeof (mca_btl_tcp_frag_user_t),
1266 opal_cache_line_size,
1267 OBJ_CLASS (mca_btl_tcp_frag_user_t),
1268 0,opal_cache_line_size,
1269 mca_btl_tcp_component.tcp_free_list_num,
1270 mca_btl_tcp_component.tcp_free_list_max,
1271 mca_btl_tcp_component.tcp_free_list_inc,
1272 NULL, 0, NULL, NULL, NULL );
1273
1274
1275 if(OPAL_SUCCESS != (ret = mca_btl_tcp_component_create_instances() )) {
1276 return 0;
1277 }
1278
1279
1280 if(OPAL_SUCCESS != (ret = mca_btl_tcp_component_create_listen(AF_INET) )) {
1281 return 0;
1282 }
1283 #if OPAL_ENABLE_IPV6
1284 if((ret = mca_btl_tcp_component_create_listen(AF_INET6)) != OPAL_SUCCESS) {
1285 if (!(OPAL_ERR_IN_ERRNO == ret &&
1286 EAFNOSUPPORT == opal_socket_errno)) {
1287 opal_output (0, "mca_btl_tcp_component: IPv6 listening socket failed\n");
1288 return 0;
1289 }
1290 }
1291 #endif
1292
1293
1294 if(OPAL_SUCCESS != (ret = mca_btl_tcp_component_exchange() )) {
1295 return 0;
1296 }
1297 btls = (mca_btl_base_module_t **)malloc(mca_btl_tcp_component.tcp_num_btls *
1298 sizeof(mca_btl_base_module_t*));
1299 if(NULL == btls) {
1300 return NULL;
1301 }
1302
1303
1304 if (0 < mca_btl_tcp_progress_thread_trigger) {
1305 for( i = 0; i < mca_btl_tcp_component.tcp_num_btls; i++) {
1306 mca_btl_tcp_component.tcp_btls[i]->super.btl_flags |= MCA_BTL_FLAGS_BTL_PROGRESS_THREAD_ENABLED;
1307 }
1308 }
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321 if (mca_btl_tcp_component.tcp_num_btls > 1 &&
1322 (enable_mpi_threads || 0 < mca_btl_tcp_progress_thread_trigger)) {
1323 for( i = 0; i < mca_btl_tcp_component.tcp_num_btls; i++) {
1324 mca_btl_tcp_component.tcp_btls[i]->super.btl_flags |= MCA_BTL_FLAGS_SINGLE_ADD_PROCS;
1325 }
1326 }
1327
1328 #if OPAL_CUDA_SUPPORT
1329 mca_common_cuda_stage_one_init();
1330 #endif
1331
1332 memcpy(btls, mca_btl_tcp_component.tcp_btls, mca_btl_tcp_component.tcp_num_btls*sizeof(mca_btl_tcp_module_t*));
1333 *num_btl_modules = mca_btl_tcp_component.tcp_num_btls;
1334 return btls;
1335 }
1336
1337
1338
1339
1340
1341
1342
1343 static void mca_btl_tcp_component_accept_handler( int incoming_sd,
1344 short ignored,
1345 void* unused )
1346 {
1347 while(true) {
1348 #if OPAL_ENABLE_IPV6
1349 struct sockaddr_in6 addr;
1350 #else
1351 struct sockaddr_in addr;
1352 #endif
1353 opal_socklen_t addrlen = sizeof(addr);
1354
1355 mca_btl_tcp_event_t *event;
1356 int sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen);
1357 if(sd < 0) {
1358 if(opal_socket_errno == EINTR)
1359 continue;
1360 if (opal_socket_errno != EAGAIN &&
1361 opal_socket_errno != EWOULDBLOCK) {
1362 opal_show_help("help-mpi-btl-tcp.txt", "accept failed",
1363 true, opal_process_info.nodename,
1364 getpid(),
1365 opal_socket_errno,
1366 strerror(opal_socket_errno));
1367 }
1368 return;
1369 }
1370 mca_btl_tcp_set_socket_options(sd);
1371
1372 assert( NULL != mca_btl_tcp_event_base );
1373
1374 event = OBJ_NEW(mca_btl_tcp_event_t);
1375 opal_event_set(mca_btl_tcp_event_base, &(event->event), sd,
1376 OPAL_EV_READ, mca_btl_tcp_component_recv_handler, event);
1377 opal_event_add(&event->event, 0);
1378 }
1379 }
1380
1381
1382
1383
1384
1385
1386
1387
1388 static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
1389 {
1390 mca_btl_tcp_event_t *event = (mca_btl_tcp_event_t *)user;
1391 opal_process_name_t guid;
1392 struct sockaddr_storage addr;
1393 opal_socklen_t addr_len = sizeof(addr);
1394 mca_btl_tcp_proc_t* btl_proc;
1395 bool sockopt = true;
1396 size_t retval, len = strlen(mca_btl_tcp_magic_id_string);
1397 mca_btl_tcp_endpoint_hs_msg_t hs_msg;
1398 struct timeval save, tv;
1399 socklen_t rcvtimeo_save_len = sizeof(save);
1400
1401
1402
1403
1404
1405
1406
1407
1408 if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &rcvtimeo_save_len)) {
1409 if (ENOPROTOOPT == errno || EOPNOTSUPP == errno) {
1410 sockopt = false;
1411 } else {
1412 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
1413 true, opal_process_info.nodename,
1414 getpid(),
1415 "getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, ...)",
1416 strerror(opal_socket_errno), opal_socket_errno);
1417 return;
1418 }
1419 } else {
1420 tv.tv_sec = 2;
1421 tv.tv_usec = 0;
1422 if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
1423 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
1424 true, opal_process_info.nodename,
1425 getpid(),
1426 "setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, ...)",
1427 strerror(opal_socket_errno), opal_socket_errno);
1428 return;
1429 }
1430 }
1431
1432 OBJ_RELEASE(event);
1433 retval = mca_btl_tcp_recv_blocking(sd, (void *)&hs_msg, sizeof(hs_msg));
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444 if (retval < sizeof(hs_msg)) {
1445 const char *peer = opal_fd_get_peer_name(sd);
1446 opal_output_verbose(20, opal_btl_base_framework.framework_output,
1447 "Peer %s closed socket without sending BTL TCP magic ID handshake (we received %d bytes out of the expected %d) -- closing/ignoring this connection",
1448 peer, (int) retval, (int) sizeof(hs_msg));
1449 free((char*) peer);
1450 CLOSE_THE_SOCKET(sd);
1451 return;
1452 }
1453
1454
1455
1456
1457 guid = hs_msg.guid;
1458 if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
1459 const char *peer = opal_fd_get_peer_name(sd);
1460 opal_output_verbose(20, opal_btl_base_framework.framework_output,
1461 "Peer %s send us an incorrect Open MPI magic ID string (i.e., this was not a connection from the same version of Open MPI; expected \"%s\", received \"%s\")",
1462 peer,
1463 mca_btl_tcp_magic_id_string,
1464 hs_msg.magic_id);
1465 free((char*) peer);
1466
1467
1468 CLOSE_THE_SOCKET(sd);
1469 return;
1470 }
1471
1472 if (sockopt) {
1473
1474 if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sizeof(save))) {
1475 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
1476 true, opal_process_info.nodename,
1477 getpid(),
1478 "setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, ...)",
1479 strerror(opal_socket_errno), opal_socket_errno);
1480 return;
1481 }
1482 }
1483
1484 OPAL_PROCESS_NAME_NTOH(guid);
1485
1486
1487 if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
1488 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
1489 true, opal_process_info.nodename,
1490 getpid(), "fcntl(sd, F_GETFL, 0)",
1491 strerror(opal_socket_errno), opal_socket_errno);
1492 CLOSE_THE_SOCKET(sd);
1493 } else {
1494 flags |= O_NONBLOCK;
1495 if(fcntl(sd, F_SETFL, flags) < 0) {
1496 opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
1497 true, opal_process_info.nodename,
1498 getpid(),
1499 "fcntl(sd, F_SETFL, flags & O_NONBLOCK)",
1500 strerror(opal_socket_errno), opal_socket_errno);
1501 CLOSE_THE_SOCKET(sd);
1502 }
1503 }
1504
1505
1506 btl_proc = mca_btl_tcp_proc_lookup(&guid);
1507 if(NULL == btl_proc) {
1508 opal_show_help("help-mpi-btl-tcp.txt",
1509 "server accept cannot find guid",
1510 true, opal_process_info.nodename,
1511 getpid());
1512 CLOSE_THE_SOCKET(sd);
1513 return;
1514 }
1515
1516
1517 if(getpeername(sd, (struct sockaddr*)&addr, &addr_len) != 0) {
1518 if (ENOTCONN != opal_socket_errno) {
1519 opal_show_help("help-mpi-btl-tcp.txt",
1520 "server getpeername failed",
1521 true, opal_process_info.nodename,
1522 getpid(),
1523 strerror(opal_socket_errno), opal_socket_errno);
1524 }
1525 CLOSE_THE_SOCKET(sd);
1526 return;
1527 }
1528
1529
1530 (void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd);
1531
1532 const char *str = opal_fd_get_peer_name(sd);
1533 opal_output_verbose(10, opal_btl_base_framework.framework_output,
1534 "btl:tcp: now connected to %s, process %s", str,
1535 OPAL_NAME_PRINT(btl_proc->proc_opal->proc_name));
1536 free((char*) str);
1537 }