This source file includes the following definitions.
- tcp_peer_create_socket
- mca_oob_tcp_peer_try_connect
- tcp_peer_send_connect_ack
- tcp_peer_send_connect_nack
- tcp_peer_event_init
- mca_oob_tcp_peer_complete_connect
- tcp_peer_send_blocking
- retry
- mca_oob_tcp_peer_recv_connect_ack
- tcp_peer_connected
- mca_oob_tcp_peer_close
- tcp_peer_recv_blocking
- mca_oob_tcp_peer_dump
- mca_oob_tcp_peer_accept
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include "orte_config.h"
28
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32 #include <fcntl.h>
33 #include <sys/socket.h>
34
35 #ifdef HAVE_SYS_UIO_H
36 #include <sys/uio.h>
37 #endif
38 #ifdef HAVE_NET_UIO_H
39 #include <net/uio.h>
40 #endif
41 #ifdef HAVE_SYS_TYPES_H
42 #include <sys/types.h>
43 #endif
44 #include "opal/opal_socket_errno.h"
45 #ifdef HAVE_NETINET_IN_H
46 #include <netinet/in.h>
47 #endif
48 #ifdef HAVE_ARPA_INET_H
49 #include <arpa/inet.h>
50 #endif
51 #ifdef HAVE_NETINET_TCP_H
52 #include <netinet/tcp.h>
53 #endif
54
55 #include "opal/types.h"
56 #include "opal_stdint.h"
57 #include "opal/mca/backtrace/backtrace.h"
58 #include "opal/mca/base/mca_base_var.h"
59 #include "opal/util/output.h"
60 #include "opal/util/net.h"
61 #include "opal/util/fd.h"
62 #include "opal/util/error.h"
63 #include "opal/util/show_help.h"
64 #include "opal/class/opal_hash_table.h"
65 #include "opal/mca/event/event.h"
66
67 #include "orte/util/name_fns.h"
68 #include "orte/util/show_help.h"
69 #include "orte/util/threads.h"
70 #include "orte/mca/state/state.h"
71 #include "orte/runtime/orte_globals.h"
72 #include "orte/mca/errmgr/errmgr.h"
73 #include "orte/mca/ess/ess.h"
74 #include "orte/mca/routed/routed.h"
75 #include "orte/runtime/orte_wait.h"
76
77 #include "oob_tcp.h"
78 #include "orte/mca/oob/tcp/oob_tcp_component.h"
79 #include "orte/mca/oob/tcp/oob_tcp_peer.h"
80 #include "orte/mca/oob/tcp/oob_tcp_common.h"
81 #include "orte/mca/oob/tcp/oob_tcp_connection.h"
82 #include "oob_tcp_peer.h"
83 #include "oob_tcp_common.h"
84 #include "oob_tcp_connection.h"
85
86 static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
87 static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
88 static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name);
89 static int tcp_peer_send_blocking(int sd, void* data, size_t size);
90 static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
91 void* data, size_t size);
92 static void tcp_peer_connected(mca_oob_tcp_peer_t* peer);
93
94 static int tcp_peer_create_socket(mca_oob_tcp_peer_t* peer, sa_family_t family)
95 {
96 int flags;
97
98 if (peer->sd >= 0) {
99 return ORTE_SUCCESS;
100 }
101
102 OPAL_OUTPUT_VERBOSE((1, orte_oob_base_framework.framework_output,
103 "%s oob:tcp:peer creating socket to %s",
104 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
105 ORTE_NAME_PRINT(&(peer->name))));
106 peer->sd = socket(family, SOCK_STREAM, 0);
107 if (peer->sd < 0) {
108 opal_output(0, "%s-%s tcp_peer_create_socket: socket() failed: %s (%d)\n",
109 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
110 ORTE_NAME_PRINT(&(peer->name)),
111 strerror(opal_socket_errno),
112 opal_socket_errno);
113 return ORTE_ERR_UNREACH;
114 }
115
116
117 if (opal_fd_set_cloexec(peer->sd) != OPAL_SUCCESS) {
118 opal_output(0, "%s unable to set socket to CLOEXEC",
119 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
120 close(peer->sd);
121 peer->sd = -1;
122 return ORTE_ERROR;
123 }
124
125
126 orte_oob_tcp_set_socket_options(peer->sd);
127
128
129 tcp_peer_event_init(peer);
130
131
132 if (peer->sd >= 0) {
133 if((flags = fcntl(peer->sd, F_GETFL, 0)) < 0) {
134 opal_output(0, "%s-%s tcp_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n",
135 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
136 ORTE_NAME_PRINT(&(peer->name)),
137 strerror(opal_socket_errno),
138 opal_socket_errno);
139 } else {
140 flags |= O_NONBLOCK;
141 if(fcntl(peer->sd, F_SETFL, flags) < 0)
142 opal_output(0, "%s-%s tcp_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n",
143 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
144 ORTE_NAME_PRINT(&(peer->name)),
145 strerror(opal_socket_errno),
146 opal_socket_errno);
147 }
148 }
149
150 return ORTE_SUCCESS;
151 }
152
153
154
155
156
157
158 void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
159 {
160 mca_oob_tcp_conn_op_t *op = (mca_oob_tcp_conn_op_t*)cbdata;
161 mca_oob_tcp_peer_t *peer;
162 int current_socket_family = 0;
163 int rc;
164 opal_socklen_t addrlen = 0;
165 mca_oob_tcp_addr_t *addr;
166 char *host;
167 mca_oob_tcp_send_t *snd;
168 bool connected = false;
169
170 ORTE_ACQUIRE_OBJECT(op);
171 peer = op->peer;
172
173 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
174 "%s orte_tcp_peer_try_connect: "
175 "attempting to connect to proc %s",
176 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
177 ORTE_NAME_PRINT(&(peer->name)));
178
179 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
180 "%s orte_tcp_peer_try_connect: "
181 "attempting to connect to proc %s on socket %d",
182 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
183 ORTE_NAME_PRINT(&(peer->name)), peer->sd);
184
185 peer->active_addr = NULL;
186 OPAL_LIST_FOREACH(addr, &peer->addrs, mca_oob_tcp_addr_t) {
187 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
188 "%s orte_tcp_peer_try_connect: "
189 "attempting to connect to proc %s on %s:%d - %d retries",
190 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
191 ORTE_NAME_PRINT(&(peer->name)),
192 opal_net_get_hostname((struct sockaddr*)&addr->addr),
193 opal_net_get_port((struct sockaddr*)&addr->addr),
194 addr->retries);
195 if (MCA_OOB_TCP_FAILED == addr->state) {
196 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
197 "%s orte_tcp_peer_try_connect: %s:%d is down",
198 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
199 opal_net_get_hostname((struct sockaddr*)&addr->addr),
200 opal_net_get_port((struct sockaddr*)&addr->addr));
201 continue;
202 }
203 if (mca_oob_tcp_component.max_retries < addr->retries) {
204 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
205 "%s orte_tcp_peer_try_connect: %s:%d retries exceeded",
206 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
207 opal_net_get_hostname((struct sockaddr*)&addr->addr),
208 opal_net_get_port((struct sockaddr*)&addr->addr));
209 continue;
210 }
211 peer->active_addr = addr;
212 addrlen = addr->addr.ss_family == AF_INET6 ? sizeof(struct sockaddr_in6)
213 : sizeof(struct sockaddr_in);
214 if (addr->addr.ss_family != current_socket_family) {
215 if (peer->sd >= 0) {
216 CLOSE_THE_SOCKET(peer->sd);
217 peer->sd = -1;
218 }
219 rc = tcp_peer_create_socket(peer, addr->addr.ss_family);
220 current_socket_family = addr->addr.ss_family;
221
222 if (ORTE_SUCCESS != rc) {
223
224
225
226
227
228
229
230
231
232 opal_output(0, "%s CANNOT CREATE SOCKET", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
233 ORTE_FORCED_TERMINATE(1);
234 goto cleanup;
235 }
236 }
237 retry_connect:
238 addr->retries++;
239
240 rc = connect(peer->sd, (struct sockaddr*) &addr->addr, addrlen);
241 if (rc < 0) {
242
243 if (opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
244 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
245 "%s waiting for connect completion to %s - activating send event",
246 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
247 ORTE_NAME_PRINT(&peer->name));
248
249 if (!peer->send_ev_active) {
250 opal_event_add(&peer->send_event, 0);
251 peer->send_ev_active = true;
252 }
253 OBJ_RELEASE(op);
254 return;
255 }
256
257
258
259
260
261
262 if (ECONNABORTED == opal_socket_errno) {
263 if (addr->retries < mca_oob_tcp_component.max_retries) {
264 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
265 "%s connection aborted by OS to %s - retrying",
266 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
267 ORTE_NAME_PRINT(&peer->name));
268 goto retry_connect;
269 } else {
270
271
272
273 addr->state = MCA_OOB_TCP_FAILED;
274 continue;
275 }
276 }
277 } else {
278
279 addr->retries = 0;
280 connected = true;
281 peer->num_retries = 0;
282 break;
283 }
284 }
285
286 if (!connected) {
287
288
289
290 if (0 < mca_oob_tcp_component.retry_delay) {
291 if (mca_oob_tcp_component.max_recon_attempts < 0 ||
292 peer->num_retries < mca_oob_tcp_component.max_recon_attempts) {
293 struct timeval tv;
294
295 CLOSE_THE_SOCKET(peer->sd);
296
297 OPAL_LIST_FOREACH(addr, &peer->addrs, mca_oob_tcp_addr_t) {
298 addr->state = MCA_OOB_TCP_UNCONNECTED;
299 addr->retries = 0;
300 }
301
302 tv.tv_sec = mca_oob_tcp_component.retry_delay;
303 tv.tv_usec = 0;
304 ++peer->num_retries;
305 ORTE_RETRY_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect, &tv);
306 goto cleanup;
307 }
308 }
309
310 peer->state = MCA_OOB_TCP_FAILED;
311 host = orte_get_proc_hostname(&(peer->name));
312 if (NULL == host && NULL != peer->active_addr) {
313 host = opal_net_get_hostname((struct sockaddr*)&(peer->active_addr->addr));
314 }
315
316
317 opal_output(orte_clean_output,
318 "------------------------------------------------------------\n"
319 "A process or daemon was unable to complete a TCP connection\n"
320 "to another process:\n"
321 " Local host: %s\n"
322 " Remote host: %s\n"
323 "This is usually caused by a firewall on the remote host. Please\n"
324 "check that any firewall (e.g., iptables) has been disabled and\n"
325 "try again.\n"
326 "------------------------------------------------------------",
327 orte_process_info.nodename,
328 (NULL == host) ? "<unknown>" : host);
329
330 CLOSE_THE_SOCKET(peer->sd);
331
332
333
334
335
336
337 ORTE_ACTIVATE_TCP_CMP_OP(peer, mca_oob_tcp_component_failed_to_connect);
338
339
340
341 if (NULL != peer->send_msg) {
342 }
343 while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) {
344 }
345 goto cleanup;
346 }
347
348 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
349 "%s orte_tcp_peer_try_connect: "
350 "Connection to proc %s succeeded",
351 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
352 ORTE_NAME_PRINT(&peer->name));
353
354
355 if (!peer->recv_ev_active) {
356 opal_event_add(&peer->recv_event, 0);
357 peer->recv_ev_active = true;
358 }
359
360
361 if (ORTE_SUCCESS == (rc = tcp_peer_send_connect_ack(peer))) {
362 peer->state = MCA_OOB_TCP_CONNECT_ACK;
363 } else if (ORTE_ERR_UNREACH == rc) {
364
365
366
367
368 int cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_NAME, &peer->name);
369 if (OPAL_VALUE1_GREATER == cmpval) {
370 peer->state = MCA_OOB_TCP_CONNECTING;
371 ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
372 } else {
373 peer->state = MCA_OOB_TCP_UNCONNECTED;
374 }
375
376 CLOSE_THE_SOCKET(peer->sd);
377 return;
378 } else {
379 opal_output(0,
380 "%s orte_tcp_peer_try_connect: "
381 "tcp_peer_send_connect_ack to proc %s on %s:%d failed: %s (%d)",
382 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
383 ORTE_NAME_PRINT(&(peer->name)),
384 opal_net_get_hostname((struct sockaddr*)&addr->addr),
385 opal_net_get_port((struct sockaddr*)&addr->addr),
386 opal_strerror(rc),
387 rc);
388
389 CLOSE_THE_SOCKET(peer->sd);
390 ORTE_FORCED_TERMINATE(1);
391 }
392
393 cleanup:
394 OBJ_RELEASE(op);
395 }
396
397
398
399
400
401 static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
402 {
403 char *msg;
404 mca_oob_tcp_hdr_t hdr;
405 uint16_t ack_flag = htons(1);
406 size_t sdsize, offset = 0;
407
408 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
409 "%s SEND CONNECT ACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
410
411
412 hdr.origin = *ORTE_PROC_MY_NAME;
413 hdr.dst = peer->name;
414 hdr.type = MCA_OOB_TCP_IDENT;
415 hdr.tag = 0;
416 hdr.seq_num = 0;
417 memset(hdr.routed, 0, ORTE_MAX_RTD_SIZE+1);
418
419
420 sdsize = sizeof(ack_flag) + strlen(orte_version_string) + 1;
421 hdr.nbytes = sdsize;
422 MCA_OOB_TCP_HDR_HTON(&hdr);
423
424
425 sdsize += sizeof(hdr);
426 if (NULL == (msg = (char*)malloc(sdsize))) {
427 return ORTE_ERR_OUT_OF_RESOURCE;
428 }
429 memset(msg, 0, sdsize);
430
431
432 memcpy(msg + offset, &hdr, sizeof(hdr));
433 offset += sizeof(hdr);
434 memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
435 offset += sizeof(ack_flag);
436 memcpy(msg + offset, orte_version_string, strlen(orte_version_string));
437 offset += strlen(orte_version_string)+1;
438
439
440 if (ORTE_SUCCESS != tcp_peer_send_blocking(peer->sd, msg, sdsize)) {
441 free(msg);
442 peer->state = MCA_OOB_TCP_FAILED;
443 mca_oob_tcp_peer_close(peer);
444 return ORTE_ERR_UNREACH;
445 }
446 free(msg);
447
448 return ORTE_SUCCESS;
449 }
450
451
452
453
454
455 static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name)
456 {
457 char *msg;
458 mca_oob_tcp_hdr_t hdr;
459 uint16_t ack_flag = htons(0);
460 int rc = ORTE_SUCCESS;
461 size_t sdsize, offset = 0;
462
463 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
464 "%s SEND CONNECT NACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
465
466
467 hdr.origin = *ORTE_PROC_MY_NAME;
468 hdr.dst = name;
469 hdr.type = MCA_OOB_TCP_IDENT;
470 hdr.tag = 0;
471 hdr.seq_num = 0;
472 memset(hdr.routed, 0, ORTE_MAX_RTD_SIZE+1);
473
474
475 sdsize = sizeof(ack_flag);
476 hdr.nbytes = sdsize;
477 MCA_OOB_TCP_HDR_HTON(&hdr);
478
479
480 sdsize += sizeof(hdr);
481 if (NULL == (msg = (char*)malloc(sdsize))) {
482 return ORTE_ERR_OUT_OF_RESOURCE;
483 }
484 memset(msg, 0, sdsize);
485
486
487 memcpy(msg + offset, &hdr, sizeof(hdr));
488 offset += sizeof(hdr);
489 memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
490 offset += sizeof(ack_flag);
491
492
493 if (ORTE_SUCCESS != tcp_peer_send_blocking(sd, msg, sdsize)) {
494
495
496
497 rc = ORTE_SUCCESS;
498 }
499 free(msg);
500 return rc;
501 }
502
503
504
505
506 static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
507 {
508 if (peer->sd >= 0) {
509 assert(!peer->send_ev_active && !peer->recv_ev_active);
510 if (NULL == peer->ev_base) {
511 ORTE_OOB_TCP_NEXT_BASE(peer);
512 }
513 opal_event_set(peer->ev_base,
514 &peer->recv_event,
515 peer->sd,
516 OPAL_EV_READ|OPAL_EV_PERSIST,
517 mca_oob_tcp_recv_handler,
518 peer);
519 opal_event_set_priority(&peer->recv_event, ORTE_MSG_PRI);
520 if (peer->recv_ev_active) {
521 opal_event_del(&peer->recv_event);
522 peer->recv_ev_active = false;
523 }
524
525 opal_event_set(peer->ev_base,
526 &peer->send_event,
527 peer->sd,
528 OPAL_EV_WRITE|OPAL_EV_PERSIST,
529 mca_oob_tcp_send_handler,
530 peer);
531 opal_event_set_priority(&peer->send_event, ORTE_MSG_PRI);
532 if (peer->send_ev_active) {
533 opal_event_del(&peer->send_event);
534 peer->send_ev_active = false;
535 }
536 }
537 }
538
539
540
541
542
543
544 void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t *peer)
545 {
546 int so_error = 0;
547 opal_socklen_t so_length = sizeof(so_error);
548
549 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
550 "%s:tcp:complete_connect called for peer %s on socket %d",
551 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
552 ORTE_NAME_PRINT(&peer->name), peer->sd);
553
554
555 if (getsockopt(peer->sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
556 opal_output(0, "%s tcp_peer_complete_connect: getsockopt() to %s failed: %s (%d)\n",
557 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
558 ORTE_NAME_PRINT(&(peer->name)),
559 strerror(opal_socket_errno),
560 opal_socket_errno);
561 peer->state = MCA_OOB_TCP_FAILED;
562 mca_oob_tcp_peer_close(peer);
563 return;
564 }
565
566 if (so_error == EINPROGRESS) {
567 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
568 "%s:tcp:send:handler still in progress",
569 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
570 return;
571 } else if (so_error == ECONNREFUSED || so_error == ETIMEDOUT) {
572 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
573 "%s-%s tcp_peer_complete_connect: connection failed: %s (%d)",
574 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
575 ORTE_NAME_PRINT(&(peer->name)),
576 strerror(so_error),
577 so_error);
578 mca_oob_tcp_peer_close(peer);
579 return;
580 } else if (so_error != 0) {
581
582
583
584 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
585 "%s-%s tcp_peer_complete_connect: "
586 "connection failed with error %d",
587 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
588 ORTE_NAME_PRINT(&(peer->name)), so_error);
589 mca_oob_tcp_peer_close(peer);
590 return;
591 }
592
593 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
594 "%s tcp_peer_complete_connect: "
595 "sending ack to %s",
596 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
597 ORTE_NAME_PRINT(&(peer->name)));
598
599 if (tcp_peer_send_connect_ack(peer) == ORTE_SUCCESS) {
600 peer->state = MCA_OOB_TCP_CONNECT_ACK;
601 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
602 "%s tcp_peer_complete_connect: "
603 "setting read event on connection to %s",
604 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
605 ORTE_NAME_PRINT(&(peer->name)));
606
607 if (!peer->recv_ev_active) {
608 peer->recv_ev_active = true;
609 ORTE_POST_OBJECT(peer);
610 opal_event_add(&peer->recv_event, 0);
611 }
612 } else {
613 opal_output(0, "%s tcp_peer_complete_connect: unable to send connect ack to %s",
614 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
615 ORTE_NAME_PRINT(&(peer->name)));
616 peer->state = MCA_OOB_TCP_FAILED;
617 mca_oob_tcp_peer_close(peer);
618 }
619 }
620
621
622
623
624
625 static int tcp_peer_send_blocking(int sd, void* data, size_t size)
626 {
627 unsigned char* ptr = (unsigned char*)data;
628 size_t cnt = 0;
629 int retval;
630
631 ORTE_ACQUIRE_OBJECT(ptr);
632
633 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
634 "%s send blocking of %"PRIsize_t" bytes to socket %d",
635 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
636 size, sd);
637
638 while (cnt < size) {
639 retval = send(sd, (char*)ptr+cnt, size-cnt, 0);
640 if (retval < 0) {
641 if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
642 opal_output(0, "%s tcp_peer_send_blocking: send() to socket %d failed: %s (%d)\n",
643 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd,
644 strerror(opal_socket_errno),
645 opal_socket_errno);
646 return ORTE_ERR_UNREACH;
647 }
648 continue;
649 }
650 cnt += retval;
651 }
652
653 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
654 "%s blocking send complete to socket %d",
655 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd);
656
657 return ORTE_SUCCESS;
658 }
659
660
661
662
663
664
665 static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal)
666 {
667 int cmpval;
668
669 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
670 "%s SIMUL CONNECTION WITH %s",
671 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
672 ORTE_NAME_PRINT(&peer->name));
673 cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, ORTE_PROC_MY_NAME);
674 if (fatal) {
675 if (peer->send_ev_active) {
676 opal_event_del(&peer->send_event);
677 peer->send_ev_active = false;
678 }
679 if (peer->recv_ev_active) {
680 opal_event_del(&peer->recv_event);
681 peer->recv_ev_active = false;
682 }
683 if (0 <= peer->sd) {
684 CLOSE_THE_SOCKET(peer->sd);
685 peer->sd = -1;
686 }
687 if (OPAL_VALUE1_GREATER == cmpval) {
688
689 peer->state = MCA_OOB_TCP_UNCONNECTED;
690 } else {
691
692 peer->state = MCA_OOB_TCP_CONNECTING;
693 ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
694 }
695 return true;
696 } else {
697 if (OPAL_VALUE1_GREATER == cmpval) {
698
699 if (peer->send_ev_active) {
700 opal_event_del(&peer->send_event);
701 peer->send_ev_active = false;
702 }
703 if (peer->recv_ev_active) {
704 opal_event_del(&peer->recv_event);
705 peer->recv_ev_active = false;
706 }
707 CLOSE_THE_SOCKET(peer->sd);
708 peer->state = MCA_OOB_TCP_UNCONNECTED;
709 return false;
710 } else {
711
712 tcp_peer_send_connect_nack(sd, peer->name);
713 CLOSE_THE_SOCKET(sd);
714 return true;
715 }
716 }
717 }
718
719
720 int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
721 int sd, mca_oob_tcp_hdr_t *dhdr)
722 {
723 char *msg;
724 char *version;
725 size_t offset = 0;
726 mca_oob_tcp_hdr_t hdr;
727 mca_oob_tcp_peer_t *peer;
728 uint64_t *ui64;
729 uint16_t ack_flag;
730 bool is_new = (NULL == pr);
731
732 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
733 "%s RECV CONNECT ACK FROM %s ON SOCKET %d",
734 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
735 (NULL == pr) ? "UNKNOWN" : ORTE_NAME_PRINT(&pr->name), sd);
736
737 peer = pr;
738
739 if (tcp_peer_recv_blocking(peer, sd, &hdr, sizeof(mca_oob_tcp_hdr_t))) {
740 if (NULL != peer) {
741
742
743
744 if (peer->state != MCA_OOB_TCP_CONNECT_ACK) {
745
746 opal_output(0, "%s RECV CONNECT BAD HANDSHAKE (%d) FROM %s ON SOCKET %d",
747 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), peer->state,
748 ORTE_NAME_PRINT(&(peer->name)), sd);
749 mca_oob_tcp_peer_close(peer);
750 return ORTE_ERR_UNREACH;
751 }
752 }
753 } else {
754
755 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
756 "%s unable to complete recv of connect-ack from %s ON SOCKET %d",
757 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
758 (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
759 return ORTE_ERR_UNREACH;
760 }
761
762 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
763 "%s connect-ack recvd from %s",
764 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
765 (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name));
766
767
768 MCA_OOB_TCP_HDR_NTOH(&hdr);
769
770 if (NULL != dhdr) {
771 *dhdr = hdr;
772 }
773
774 if (MCA_OOB_TCP_PROBE == hdr.type) {
775
776 hdr.type = MCA_OOB_TCP_PROBE;
777 hdr.dst = hdr.origin;
778 hdr.origin = *ORTE_PROC_MY_NAME;
779 MCA_OOB_TCP_HDR_HTON(&hdr);
780 tcp_peer_send_blocking(sd, &hdr, sizeof(mca_oob_tcp_hdr_t));
781 CLOSE_THE_SOCKET(sd);
782 return ORTE_SUCCESS;
783 }
784
785 if (hdr.type != MCA_OOB_TCP_IDENT) {
786 opal_output(0, "tcp_peer_recv_connect_ack: invalid header type: %d\n",
787 hdr.type);
788 if (NULL != peer) {
789 peer->state = MCA_OOB_TCP_FAILED;
790 mca_oob_tcp_peer_close(peer);
791 } else {
792 CLOSE_THE_SOCKET(sd);
793 }
794 return ORTE_ERR_COMM_FAILURE;
795 }
796
797
798 if (NULL == peer) {
799 peer = mca_oob_tcp_peer_lookup(&hdr.origin);
800 if (NULL == peer) {
801 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
802 "%s mca_oob_tcp_recv_connect: connection from new peer",
803 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
804 peer = OBJ_NEW(mca_oob_tcp_peer_t);
805 peer->name = hdr.origin;
806 ORTE_OOB_TCP_NEXT_BASE(peer);
807 peer->state = MCA_OOB_TCP_ACCEPTING;
808 ui64 = (uint64_t*)(&peer->name);
809 if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, (*ui64), peer)) {
810 OBJ_RELEASE(peer);
811 CLOSE_THE_SOCKET(sd);
812 return ORTE_ERR_OUT_OF_RESOURCE;
813 }
814 }
815 } else {
816
817 if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) {
818 opal_output(0, "%s tcp_peer_recv_connect_ack: "
819 "received unexpected process identifier %s from %s\n",
820 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
821 ORTE_NAME_PRINT(&(hdr.origin)),
822 ORTE_NAME_PRINT(&(peer->name)));
823 peer->state = MCA_OOB_TCP_FAILED;
824 mca_oob_tcp_peer_close(peer);
825 return ORTE_ERR_CONNECTION_REFUSED;
826 }
827 }
828
829 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
830 "%s connect-ack header from %s is okay",
831 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
832 ORTE_NAME_PRINT(&peer->name));
833
834
835 if (NULL == (msg = (char*)malloc(hdr.nbytes))) {
836 peer->state = MCA_OOB_TCP_FAILED;
837 mca_oob_tcp_peer_close(peer);
838 return ORTE_ERR_OUT_OF_RESOURCE;
839 }
840 if (!tcp_peer_recv_blocking(peer, sd, msg, hdr.nbytes)) {
841
842 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
843 "%s unable to complete recv of connect-ack from %s ON SOCKET %d",
844 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
845 ORTE_NAME_PRINT(&peer->name), peer->sd);
846 free(msg);
847 return ORTE_ERR_UNREACH;
848 }
849
850
851 memcpy(&ack_flag, msg + offset, sizeof(ack_flag));
852 offset += sizeof(ack_flag);
853
854 ack_flag = ntohs(ack_flag);
855 if( !ack_flag ){
856 if (MCA_OOB_TCP_CONNECT_ACK == peer->state) {
857
858
859
860
861
862 CLOSE_THE_SOCKET(peer->sd);
863 peer->sd = -1;
864
865
866 if (peer->recv_ev_active) {
867 opal_event_del(&peer->recv_event);
868 peer->recv_ev_active = false;
869 }
870 if (peer->send_ev_active) {
871 opal_event_del(&peer->send_event);
872 peer->send_ev_active = false;
873 }
874
875
876
877
878 peer->state = MCA_OOB_TCP_UNCONNECTED;
879 } else {
880
881
882
883 mca_oob_tcp_peer_close(peer);
884 }
885 free(msg);
886 return ORTE_ERR_UNREACH;
887 }
888
889
890
891
892
893
894
895 if (is_new &&
896 ( MCA_OOB_TCP_CONNECTED == peer->state ||
897 MCA_OOB_TCP_CONNECTING == peer->state ||
898 MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) {
899 if (retry(peer, sd, false)) {
900 free(msg);
901 return ORTE_ERR_UNREACH;
902 }
903 }
904
905
906 version = (char*)((char*)msg + offset);
907 offset += strlen(version) + 1;
908 if (0 != strcmp(version, orte_version_string)) {
909 opal_show_help("help-oob-tcp.txt", "version mismatch",
910 true,
911 opal_process_info.nodename,
912 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
913 orte_version_string,
914 opal_fd_get_peer_name(peer->sd),
915 ORTE_NAME_PRINT(&(peer->name)),
916 version);
917
918 peer->state = MCA_OOB_TCP_FAILED;
919 mca_oob_tcp_peer_close(peer);
920 free(msg);
921 return ORTE_ERR_CONNECTION_REFUSED;
922 }
923 free(msg);
924
925 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
926 "%s connect-ack version from %s matches ours",
927 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
928 ORTE_NAME_PRINT(&peer->name));
929
930
931
932
933 if (NULL != dhdr) {
934 return ORTE_SUCCESS;
935 }
936
937
938
939
940 ORTE_ACTIVATE_TCP_CMP_OP(peer, mca_oob_tcp_component_set_module);
941
942
943 tcp_peer_connected(peer);
944 if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
945 mca_oob_tcp_peer_dump(peer, "connected");
946 }
947 return ORTE_SUCCESS;
948 }
949
950
951
952
953
954 static void tcp_peer_connected(mca_oob_tcp_peer_t* peer)
955 {
956 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
957 "%s-%s tcp_peer_connected on socket %d",
958 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
959 ORTE_NAME_PRINT(&(peer->name)), peer->sd);
960
961 if (peer->timer_ev_active) {
962 opal_event_del(&peer->timer_event);
963 peer->timer_ev_active = false;
964 }
965 peer->state = MCA_OOB_TCP_CONNECTED;
966 if (NULL != peer->active_addr) {
967 peer->active_addr->retries = 0;
968 }
969
970
971 orte_routed.update_route(&peer->name, &peer->name);
972
973
974 if (NULL == peer->send_msg) {
975 peer->send_msg = (mca_oob_tcp_send_t*)
976 opal_list_remove_first(&peer->send_queue);
977 }
978 if (NULL != peer->send_msg && !peer->send_ev_active) {
979 peer->send_ev_active = true;
980 ORTE_POST_OBJECT(peer);
981 opal_event_add(&peer->send_event, 0);
982 }
983 }
984
985
986
987
988
989
990 void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
991 {
992 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
993 "%s tcp_peer_close for %s sd %d state %s",
994 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
995 ORTE_NAME_PRINT(&(peer->name)),
996 peer->sd, mca_oob_tcp_state_print(peer->state));
997
998
999 close(peer->sd);
1000 peer->sd = -1;
1001
1002
1003
1004 if (MCA_OOB_TCP_CONNECTING == peer->state) {
1005 if (NULL != peer->active_addr) {
1006 peer->active_addr->state = MCA_OOB_TCP_FAILED;
1007 }
1008 ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
1009 return;
1010 }
1011
1012 peer->state = MCA_OOB_TCP_CLOSED;
1013 if (NULL != peer->active_addr) {
1014 peer->active_addr->state = MCA_OOB_TCP_CLOSED;
1015 }
1016
1017
1018 if (peer->recv_ev_active) {
1019 opal_event_del(&peer->recv_event);
1020 peer->recv_ev_active = false;
1021 }
1022 if (peer->send_ev_active) {
1023 opal_event_del(&peer->send_event);
1024 peer->send_ev_active = false;
1025 }
1026
1027
1028
1029
1030 ORTE_ACTIVATE_TCP_CMP_OP(peer, mca_oob_tcp_component_lost_connection);
1031
1032 if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
1033
1034 return;
1035 }
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049 }
1050
1051
1052
1053
1054
1055 static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
1056 void* data, size_t size)
1057 {
1058 unsigned char* ptr = (unsigned char*)data;
1059 size_t cnt = 0;
1060
1061 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1062 "%s waiting for connect ack from %s",
1063 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1064 (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
1065
1066 while (cnt < size) {
1067 int retval = recv(sd, (char *)ptr+cnt, size-cnt, 0);
1068
1069
1070 if (retval == 0) {
1071 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1072 "%s-%s tcp_peer_recv_blocking: "
1073 "peer closed connection: peer state %d",
1074 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1075 (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)),
1076 (NULL == peer) ? 0 : peer->state);
1077 if (NULL != peer) {
1078 mca_oob_tcp_peer_close(peer);
1079 } else {
1080 CLOSE_THE_SOCKET(sd);
1081 }
1082 return false;
1083 }
1084
1085
1086 if (retval < 0) {
1087 if (opal_socket_errno != EINTR &&
1088 opal_socket_errno != EAGAIN &&
1089 opal_socket_errno != EWOULDBLOCK) {
1090 if (NULL == peer) {
1091
1092 CLOSE_THE_SOCKET(sd);
1093 return false;
1094 } else if (peer->state == MCA_OOB_TCP_CONNECT_ACK) {
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1110 "%s connect ack received error %s from %s",
1111 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1112 strerror(opal_socket_errno),
1113 ORTE_NAME_PRINT(&(peer->name)));
1114 return false;
1115 } else {
1116 opal_output(0,
1117 "%s tcp_peer_recv_blocking: "
1118 "recv() failed for %s: %s (%d)\n",
1119 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1120 ORTE_NAME_PRINT(&(peer->name)),
1121 strerror(opal_socket_errno),
1122 opal_socket_errno);
1123 peer->state = MCA_OOB_TCP_FAILED;
1124 mca_oob_tcp_peer_close(peer);
1125 return false;
1126 }
1127 }
1128 continue;
1129 }
1130 cnt += retval;
1131 }
1132
1133 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1134 "%s connect ack received from %s",
1135 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1136 (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
1137 return true;
1138 }
1139
1140
1141
1142
1143 void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
1144 {
1145 char src[64];
1146 char dst[64];
1147 char buff[255];
1148 int sndbuf,rcvbuf,nodelay,flags;
1149 struct sockaddr_storage inaddr;
1150 opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
1151 opal_socklen_t optlen;
1152
1153 if (getsockname(peer->sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
1154 opal_output(0, "tcp_peer_dump: getsockname: %s (%d)\n",
1155 strerror(opal_socket_errno),
1156 opal_socket_errno);
1157 } else {
1158 snprintf(src, sizeof(src), "%s", opal_net_get_hostname((struct sockaddr*) &inaddr));
1159 }
1160 if (getpeername(peer->sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
1161 opal_output(0, "tcp_peer_dump: getpeername: %s (%d)\n",
1162 strerror(opal_socket_errno),
1163 opal_socket_errno);
1164 } else {
1165 snprintf(dst, sizeof(dst), "%s", opal_net_get_hostname((struct sockaddr*) &inaddr));
1166 }
1167
1168 if ((flags = fcntl(peer->sd, F_GETFL, 0)) < 0) {
1169 opal_output(0, "tcp_peer_dump: fcntl(F_GETFL) failed: %s (%d)\n",
1170 strerror(opal_socket_errno),
1171 opal_socket_errno);
1172 }
1173
1174 #if defined(SO_SNDBUF)
1175 optlen = sizeof(sndbuf);
1176 if(getsockopt(peer->sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &optlen) < 0) {
1177 opal_output(0, "tcp_peer_dump: SO_SNDBUF option: %s (%d)\n",
1178 strerror(opal_socket_errno),
1179 opal_socket_errno);
1180 }
1181 #else
1182 sndbuf = -1;
1183 #endif
1184 #if defined(SO_RCVBUF)
1185 optlen = sizeof(rcvbuf);
1186 if (getsockopt(peer->sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &optlen) < 0) {
1187 opal_output(0, "tcp_peer_dump: SO_RCVBUF option: %s (%d)\n",
1188 strerror(opal_socket_errno),
1189 opal_socket_errno);
1190 }
1191 #else
1192 rcvbuf = -1;
1193 #endif
1194 #if defined(TCP_NODELAY)
1195 optlen = sizeof(nodelay);
1196 if (getsockopt(peer->sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &optlen) < 0) {
1197 opal_output(0, "tcp_peer_dump: TCP_NODELAY option: %s (%d)\n",
1198 strerror(opal_socket_errno),
1199 opal_socket_errno);
1200 }
1201 #else
1202 nodelay = 0;
1203 #endif
1204
1205 snprintf(buff, sizeof(buff), "%s-%s %s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
1206 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1207 ORTE_NAME_PRINT(&(peer->name)),
1208 msg, src, dst, nodelay, sndbuf, rcvbuf, flags);
1209 opal_output(0, "%s", buff);
1210 }
1211
1212
1213
1214
1215
1216 bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer)
1217 {
1218 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1219 "%s tcp:peer_accept called for peer %s in state %s on socket %d",
1220 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1221 ORTE_NAME_PRINT(&peer->name),
1222 mca_oob_tcp_state_print(peer->state), peer->sd);
1223
1224 if (peer->state != MCA_OOB_TCP_CONNECTED) {
1225
1226 tcp_peer_event_init(peer);
1227
1228 if (tcp_peer_send_connect_ack(peer) != ORTE_SUCCESS) {
1229 opal_output(0, "%s-%s tcp_peer_accept: "
1230 "tcp_peer_send_connect_ack failed\n",
1231 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1232 ORTE_NAME_PRINT(&(peer->name)));
1233 peer->state = MCA_OOB_TCP_FAILED;
1234 mca_oob_tcp_peer_close(peer);
1235 return false;
1236 }
1237
1238
1239
1240
1241 ORTE_ACTIVATE_TCP_CMP_OP(peer, mca_oob_tcp_component_set_module);
1242
1243 tcp_peer_connected(peer);
1244 if (!peer->recv_ev_active) {
1245 peer->recv_ev_active = true;
1246 ORTE_POST_OBJECT(peer);
1247 opal_event_add(&peer->recv_event, 0);
1248 }
1249 if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
1250 mca_oob_tcp_peer_dump(peer, "accepted");
1251 }
1252 return true;
1253 }
1254
1255 opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
1256 "%s tcp:peer_accept ignored for peer %s in state %s on socket %d",
1257 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1258 ORTE_NAME_PRINT(&peer->name),
1259 mca_oob_tcp_state_print(peer->state), peer->sd);
1260 return false;
1261 }