This source file includes the following definitions.
- mca_btl_tcp2_endpoint_construct
- mca_btl_tcp2_endpoint_destruct
- mca_btl_tcp_endpoint_dump
- mca_btl_tcp2_endpoint_event_init
- mca_btl_tcp2_endpoint_send
- mca_btl_tcp2_endpoint_send_blocking
- mca_btl_tcp2_endpoint_send_connect_ack
- mca_btl_tcp2_endpoint_accept
- mca_btl_tcp2_endpoint_close
- mca_btl_tcp2_endpoint_connected
- mca_btl_tcp2_endpoint_recv_blocking
- mca_btl_tcp2_endpoint_recv_connect_ack
- mca_btl_tcp2_set_socket_options
- mca_btl_tcp2_endpoint_start_connect
- mca_btl_tcp2_endpoint_complete_connect
- mca_btl_tcp2_endpoint_recv_handler
- mca_btl_tcp2_endpoint_send_handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #include "ompi_config.h"
23
24 #include <stdlib.h>
25 #include <string.h>
26 #ifdef HAVE_UNISTD_H
27 #include <unistd.h>
28 #endif
29 #include "opal/opal_socket_errno.h"
30 #ifdef HAVE_SYS_TYPES_H
31 #include <sys/types.h>
32 #endif
33 #ifdef HAVE_FCNTL_H
34 #include <fcntl.h>
35 #endif
36 #ifdef HAVE_NETINET_IN_H
37 #include <netinet/in.h>
38 #endif
39 #ifdef HAVE_NETINET_TCP_H
40 #include <netinet/tcp.h>
41 #endif
42 #ifdef HAVE_ARPA_INET_H
43 #include <arpa/inet.h>
44 #endif
45 #ifdef HAVE_SYS_TIME_H
46 #include <sys/time.h>
47 #endif
48 #ifdef HAVE_TIME_H
49 #include <time.h>
50 #endif
51
52 #include "opal/util/net.h"
53 #include "opal/util/fd.h"
54 #include "opal/util/show_help.h"
55 #include "ompi/mca/btl/base/btl_base_error.h"
56 #include "ompi/mca/rte/rte.h"
57
58 #include "btl_tcp_endpoint.h"
59 #include "btl_tcp_proc.h"
60 #include "btl_tcp_frag.h"
61
62
63
64
65
66 static void mca_btl_tcp2_endpoint_construct(mca_btl_tcp2_endpoint_t* endpoint)
67 {
68 endpoint->endpoint_btl = NULL;
69 endpoint->endpoint_proc = NULL;
70 endpoint->endpoint_addr = NULL;
71 endpoint->endpoint_sd = -1;
72 endpoint->endpoint_send_frag = 0;
73 endpoint->endpoint_recv_frag = 0;
74 endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
75 endpoint->endpoint_retries = 0;
76 endpoint->endpoint_nbo = false;
77 #if MCA_BTL_TCP_ENDPOINT_CACHE
78 endpoint->endpoint_cache = NULL;
79 endpoint->endpoint_cache_pos = NULL;
80 endpoint->endpoint_cache_length = 0;
81 #endif
82 OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
83 OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
84 OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);
85 }
86
87
88
89
90
91 static void mca_btl_tcp2_endpoint_destruct(mca_btl_tcp2_endpoint_t* endpoint)
92 {
93 mca_btl_tcp2_proc_remove(endpoint->endpoint_proc, endpoint);
94 mca_btl_tcp2_endpoint_close(endpoint);
95 OBJ_DESTRUCT(&endpoint->endpoint_frags);
96 OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
97 OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
98 }
99
100 OBJ_CLASS_INSTANCE(
101 mca_btl_tcp2_endpoint_t,
102 opal_list_item_t,
103 mca_btl_tcp2_endpoint_construct,
104 mca_btl_tcp2_endpoint_destruct);
105
106
107 static void mca_btl_tcp2_endpoint_construct(mca_btl_base_endpoint_t* btl_endpoint);
108 static void mca_btl_tcp2_endpoint_destruct(mca_btl_base_endpoint_t* btl_endpoint);
109 static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t*);
110 static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t*);
111 static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user);
112 static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user);
113
114
115
116
117
118 #define WANT_PEER_DUMP 0
119
120
121
122
123 void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
124 {
125 char src[64], dst[64], *status;
126 int sndbuf, rcvbuf, nodelay, flags = -1;
127 #if OPAL_ENABLE_IPV6
128 struct sockaddr_storage inaddr;
129 #else
130 struct sockaddr_in inaddr;
131 #endif
132 opal_socklen_t obtlen;
133 opal_socklen_t addrlen = sizeof(inaddr);
134 opal_list_item_t *item;
135
136 if( -1 != btl_endpoint->endpoint_sd ) {
137 getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
138 #if OPAL_ENABLE_IPV6
139 {
140 char *address;
141 address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
142 if (NULL != address) {
143 sprintf(src, "%s", address);
144 }
145 }
146 #else
147 sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
148 #endif
149 getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
150 #if OPAL_ENABLE_IPV6
151 {
152 char *address;
153 address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
154 if (NULL != address) {
155 sprintf(dst, "%s", address);
156 }
157 }
158 #else
159 sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
160 #endif
161
162 if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
163 BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
164 strerror(opal_socket_errno), opal_socket_errno));
165 }
166
167 #if defined(SO_SNDBUF)
168 obtlen = sizeof(sndbuf);
169 if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
170 BTL_ERROR(("SO_SNDBUF option: %s (%d)",
171 strerror(opal_socket_errno), opal_socket_errno));
172 }
173 #else
174 sndbuf = -1;
175 #endif
176 #if defined(SO_RCVBUF)
177 obtlen = sizeof(rcvbuf);
178 if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
179 BTL_ERROR(("SO_RCVBUF option: %s (%d)",
180 strerror(opal_socket_errno), opal_socket_errno));
181 }
182 #else
183 rcvbuf = -1;
184 #endif
185 #if defined(TCP_NODELAY)
186 obtlen = sizeof(nodelay);
187 if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
188 BTL_ERROR(("TCP_NODELAY option: %s (%d)",
189 strerror(opal_socket_errno), opal_socket_errno));
190 }
191 #else
192 nodelay = 0;
193 #endif
194 }
195
196 mca_btl_base_err("%s %s: endpoint %p src %s - dst %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
197 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, (void*)btl_endpoint, src, dst, nodelay, sndbuf, rcvbuf, flags);
198
199 switch(btl_endpoint->endpoint_state) {
200 case MCA_BTL_TCP_CONNECTING:
201 status = "connecting"; break;
202 case MCA_BTL_TCP_CONNECT_ACK:
203 status = "connect ack"; break;
204 case MCA_BTL_TCP_CLOSED:
205 status = "closed"; break;
206 case MCA_BTL_TCP_FAILED:
207 status = "failed"; break;
208 case MCA_BTL_TCP_CONNECTED:
209 status = "connected"; break;
210 default:
211 status = "undefined"; break;
212 }
213 mca_btl_base_err("%s | [socket %d] [state %s] (nbo %s) (retries %u)\n"
214 #if MCA_BTL_TCP_ENDPOINT_CACHE
215 "\tcache %p length %lu pos %ld\n"
216 #endif
217 "\tpending: send %p recv %p\n",
218 msg, btl_endpoint->endpoint_sd, status,
219 (btl_endpoint->endpoint_nbo ? "true" : "false"), btl_endpoint->endpoint_retries,
220 #if MCA_BTL_TCP_ENDPOINT_CACHE
221 btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_length, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
222 #endif
223 (void*)btl_endpoint->endpoint_send_frag, (void*)btl_endpoint->endpoint_recv_frag );
224 for(item = opal_list_get_first(&btl_endpoint->endpoint_frags);
225 item != opal_list_get_end(&btl_endpoint->endpoint_frags);
226 item = opal_list_get_next(item)) {
227 mca_btl_tcp_dump_frag( (mca_btl_tcp_frag_t*)item, " | send" );
228 }
229 }
230
231
232
233
234
235 static inline void mca_btl_tcp2_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint)
236 {
237 #if MCA_BTL_TCP_ENDPOINT_CACHE
238 btl_endpoint->endpoint_cache = (char*)malloc(mca_btl_tcp2_component.tcp_endpoint_cache);
239 btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
240 #endif
241
242 opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
243 btl_endpoint->endpoint_sd,
244 OPAL_EV_READ|OPAL_EV_PERSIST,
245 mca_btl_tcp_endpoint_recv_handler,
246 btl_endpoint );
247
248
249
250
251
252
253 opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
254 btl_endpoint->endpoint_sd,
255 OPAL_EV_WRITE,
256 mca_btl_tcp_endpoint_send_handler,
257 btl_endpoint);
258 }
259
260
261
262
263
264
265
266 int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp2_frag_t* frag)
267 {
268 int rc = OMPI_SUCCESS;
269
270 MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_endpoint->endpoint_send_lock);
271 switch(btl_endpoint->endpoint_state) {
272 case MCA_BTL_TCP_CONNECTING:
273 case MCA_BTL_TCP_CONNECT_ACK:
274 case MCA_BTL_TCP_CLOSED:
275 opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
276 frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
277 if(btl_endpoint->endpoint_state == MCA_BTL_TCP_CLOSED)
278 rc = mca_btl_tcp2_endpoint_start_connect(btl_endpoint);
279 break;
280 case MCA_BTL_TCP_FAILED:
281 rc = OMPI_ERR_UNREACH;
282 break;
283 case MCA_BTL_TCP_CONNECTED:
284 if (btl_endpoint->endpoint_send_frag == NULL) {
285 if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
286 mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd)) {
287 int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
288 opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
289 MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
290 return 1;
291 } else {
292 btl_endpoint->endpoint_send_frag = frag;
293 frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
294 MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
295 }
296 } else {
297 frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
298 opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
299 }
300 break;
301 }
302 MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_endpoint->endpoint_send_lock);
303 return rc;
304 }
305
306
307
308
309
310
311 static int mca_btl_tcp2_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
312 {
313 unsigned char* ptr = (unsigned char*)data;
314 size_t cnt = 0;
315 while(cnt < size) {
316 int retval = send(btl_endpoint->endpoint_sd, (const char *)ptr+cnt, size-cnt, 0);
317 if(retval < 0) {
318 if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
319 BTL_ERROR(("send() failed: %s (%d)",
320 strerror(opal_socket_errno), opal_socket_errno));
321 mca_btl_tcp2_endpoint_close(btl_endpoint);
322 return -1;
323 }
324 continue;
325 }
326 cnt += retval;
327 }
328 return cnt;
329 }
330
331
332
333
334
335
336
337 static int mca_btl_tcp2_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
338 {
339
340 mca_btl_tcp2_proc_t* btl_proc = mca_btl_tcp2_proc_local();
341 orte_process_name_t guid = btl_proc->proc_ompi->proc_name;
342
343 ORTE_PROCESS_NAME_HTON(guid);
344 if(mca_btl_tcp2_endpoint_send_blocking(btl_endpoint, &guid, sizeof(guid)) !=
345 sizeof(guid)) {
346 return OMPI_ERR_UNREACH;
347 }
348 return OMPI_SUCCESS;
349 }
350
351
352
353
354
355
356
357
358
359
360 bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
361 struct sockaddr* addr, int sd)
362 {
363 mca_btl_tcp_proc_t *endpoint_proc = btl_endpoint->endpoint_proc;
364 const orte_process_name_t *this_proc = &(ompi_proc_local()->proc_name);
365 int cmpval;
366
367 if(NULL == btl_endpoint->endpoint_addr) {
368 return false;
369 }
370
371 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
372 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
373
374 cmpval = ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
375 &endpoint_proc->proc_ompi->proc_name,
376 this_proc);
377 if((btl_endpoint->endpoint_sd < 0) ||
378 (btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
379 cmpval < 0)) {
380 mca_btl_tcp2_endpoint_close(btl_endpoint);
381 btl_endpoint->endpoint_sd = sd;
382 if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) != OMPI_SUCCESS) {
383 mca_btl_tcp2_endpoint_close(btl_endpoint);
384 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
385 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
386 return false;
387 }
388 mca_btl_tcp_endpoint_event_init(btl_endpoint);
389
390
391
392 opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
393 mca_btl_tcp_endpoint_connected(btl_endpoint);
394 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
395 mca_btl_tcp2_endpoint_dump(btl_endpoint, "accepted");
396 #endif
397 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
398 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
399 return true;
400 }
401 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
402 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
403 return false;
404 }
405
406
407
408
409
410
411
412 void mca_btl_tcp2_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
413 {
414 int sd = btl_endpoint->endpoint_sd;
415
416 do {
417 if( sd < 0 ) return;
418 } while ( opal_atomic_cmpset( &(btl_endpoint->endpoint_sd), sd, -1 ) );
419
420 CLOSE_THE_SOCKET(sd);
421 btl_endpoint->endpoint_retries++;
422 opal_event_del(&btl_endpoint->endpoint_recv_event);
423 opal_event_del(&btl_endpoint->endpoint_send_event);
424 #if MCA_BTL_TCP_ENDPOINT_CACHE
425 if( NULL != btl_endpoint->endpoint_cache )
426 free( btl_endpoint->endpoint_cache );
427 btl_endpoint->endpoint_cache = NULL;
428 btl_endpoint->endpoint_cache_pos = NULL;
429 btl_endpoint->endpoint_cache_length = 0;
430 #endif
431 }
432
433
434
435
436
437
438
439 static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
440 {
441
442 btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
443 btl_endpoint->endpoint_retries = 0;
444
445
446 opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
447 btl_endpoint->endpoint_sd,
448 OPAL_EV_WRITE | OPAL_EV_PERSIST,
449 mca_btl_tcp_endpoint_send_handler,
450 btl_endpoint );
451
452 if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
453 if(NULL == btl_endpoint->endpoint_send_frag) {
454 btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
455 opal_list_remove_first(&btl_endpoint->endpoint_frags);
456 }
457 opal_event_add(&btl_endpoint->endpoint_send_event, 0);
458 }
459 }
460
461
462
463
464
465
466 static int mca_btl_tcp2_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
467 {
468 unsigned char* ptr = (unsigned char*)data;
469 size_t cnt = 0;
470 while(cnt < size) {
471 int retval = recv(btl_endpoint->endpoint_sd, (char *)ptr+cnt, size-cnt, 0);
472
473
474 if(retval == 0) {
475 mca_btl_tcp2_endpoint_close(btl_endpoint);
476 return -1;
477 }
478
479
480 if(retval < 0) {
481 if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
482 BTL_ERROR(("recv(%d) failed: %s (%d)",
483 btl_endpoint->endpoint_sd, strerror(opal_socket_errno), opal_socket_errno));
484 mca_btl_tcp2_endpoint_close(btl_endpoint);
485 return -1;
486 }
487 continue;
488 }
489 cnt += retval;
490 }
491 return cnt;
492 }
493
494
495
496
497
498
499
500 static int mca_btl_tcp2_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
501 {
502 orte_process_name_t guid;
503 mca_btl_tcp2_proc_t* btl_proc = btl_endpoint->endpoint_proc;
504
505 if((mca_btl_tcp2_endpoint_recv_blocking(btl_endpoint, &guid, sizeof(orte_process_name_t))) != sizeof(orte_process_name_t)) {
506 return OMPI_ERR_UNREACH;
507 }
508 ORTE_PROCESS_NAME_NTOH(guid);
509
510 if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
511 &btl_proc->proc_ompi->proc_name,
512 &guid)) {
513 BTL_ERROR(("received unexpected process identifier %s",
514 ORTE_NAME_PRINT(&guid)));
515 mca_btl_tcp2_endpoint_close(btl_endpoint);
516 return OMPI_ERR_UNREACH;
517 }
518
519 return OMPI_SUCCESS;
520 }
521
522
523 void mca_btl_tcp2_set_socket_options(int sd)
524 {
525 int optval;
526 #if defined(TCP_NODELAY)
527 optval = mca_btl_tcp2_component.tcp_use_nodelay;
528 if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
529 BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
530 strerror(opal_socket_errno), opal_socket_errno));
531 }
532 #endif
533 #if defined(SO_SNDBUF)
534 if(mca_btl_tcp2_component.tcp_sndbuf > 0 &&
535 setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp2_component.tcp_sndbuf, sizeof(int)) < 0) {
536 BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
537 strerror(opal_socket_errno), opal_socket_errno));
538 }
539 #endif
540 #if defined(SO_RCVBUF)
541 if(mca_btl_tcp2_component.tcp_rcvbuf > 0 &&
542 setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp2_component.tcp_rcvbuf, sizeof(int)) < 0) {
543 BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
544 strerror(opal_socket_errno), opal_socket_errno));
545 }
546 #endif
547 }
548
549
550
551
552
553
554
555
556
557
558 static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
559 {
560 int rc,flags;
561 struct sockaddr_storage endpoint_addr;
562
563 uint16_t af_family = AF_INET;
564 opal_socklen_t addrlen = sizeof(struct sockaddr_in);
565
566 #if OPAL_ENABLE_IPV6
567 if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
568 af_family = AF_INET6;
569 addrlen = sizeof (struct sockaddr_in6);
570 }
571 #endif
572
573 btl_endpoint->endpoint_sd = socket(af_family, SOCK_STREAM, 0);
574 if (btl_endpoint->endpoint_sd < 0) {
575 btl_endpoint->endpoint_retries++;
576 return OMPI_ERR_UNREACH;
577 }
578
579
580 mca_btl_tcp2_set_socket_options(btl_endpoint->endpoint_sd);
581
582
583 mca_btl_tcp2_endpoint_event_init(btl_endpoint);
584
585
586 if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
587 BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
588 strerror(opal_socket_errno), opal_socket_errno));
589 } else {
590 flags |= O_NONBLOCK;
591 if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0)
592 BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
593 strerror(opal_socket_errno), opal_socket_errno));
594 }
595
596
597 mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
598
599 opal_output_verbose(20, mca_btl_base_output,
600 "btl: tcp: attempting to connect() to address %s on port %d",
601 opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
602 btl_endpoint->endpoint_addr->addr_port);
603
604 if(connect(btl_endpoint->endpoint_sd, (struct sockaddr*)&endpoint_addr, addrlen) < 0) {
605
606 if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
607 btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
608 MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
609 return OMPI_SUCCESS;
610 }
611 {
612 char *address;
613 address = opal_net_get_hostname((struct sockaddr*) &endpoint_addr);
614 BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_ompi,
615 ( "Unable to connect to the peer %s on port %d: %s\n",
616 address,
617 btl_endpoint->endpoint_addr->addr_port, strerror(opal_socket_errno) ) );
618 }
619 mca_btl_tcp2_endpoint_close(btl_endpoint);
620 btl_endpoint->endpoint_retries++;
621 return OMPI_ERR_UNREACH;
622 }
623
624
625 if((rc = mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) {
626 btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
627 MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_recv_event, 0);
628 } else {
629 mca_btl_tcp2_endpoint_close(btl_endpoint);
630 }
631 return rc;
632 }
633
634
635
636
637
638
639
640 static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
641 {
642 int so_error = 0;
643 opal_socklen_t so_length = sizeof(so_error);
644 struct sockaddr_storage endpoint_addr;
645
646 mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
647
648
649 opal_event_del(&btl_endpoint->endpoint_send_event);
650
651
652 if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
653 BTL_ERROR(("getsockopt() to %s failed: %s (%d)",
654 opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
655 strerror(opal_socket_errno), opal_socket_errno));
656 mca_btl_tcp2_endpoint_close(btl_endpoint);
657 return;
658 }
659 if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) {
660 opal_event_add(&btl_endpoint->endpoint_send_event, 0);
661 return;
662 }
663 if(so_error != 0) {
664 BTL_ERROR(("connect() to %s failed: %s (%d)",
665 opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
666 strerror(so_error), so_error));
667 mca_btl_tcp2_endpoint_close(btl_endpoint);
668 return;
669 }
670
671 if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) == OMPI_SUCCESS) {
672 btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
673 opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
674 } else {
675 mca_btl_tcp2_endpoint_close(btl_endpoint);
676 }
677 }
678
679
680
681
682
683
684
685 static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
686 {
687 mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;
688
689
690
691
692 if( sd != btl_endpoint->endpoint_sd )
693 return;
694
695 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
696 switch(btl_endpoint->endpoint_state) {
697 case MCA_BTL_TCP_CONNECT_ACK:
698 {
699 int rc = OMPI_ERROR;
700 rc = mca_btl_tcp2_endpoint_recv_connect_ack(btl_endpoint);
701 if( OMPI_SUCCESS == rc ) {
702
703 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
704 mca_btl_tcp2_endpoint_connected(btl_endpoint);
705 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
706 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
707 mca_btl_tcp2_endpoint_dump(btl_endpoint, "connected");
708 #endif
709 }
710 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
711 return;
712 }
713 case MCA_BTL_TCP_CONNECTED:
714 {
715 mca_btl_tcp2_frag_t* frag;
716
717 frag = btl_endpoint->endpoint_recv_frag;
718
719 data_still_pending_on_endpoint:
720 if(NULL == frag) {
721
722 if(mca_btl_tcp_module.super.btl_max_send_size >
723 mca_btl_tcp_module.super.btl_eager_limit) {
724 MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
725 } else {
726 MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
727 }
728
729 if(NULL == frag) {
730 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
731 return;
732 }
733 MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
734 }
735
736
737 if( mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false ) {
738 btl_endpoint->endpoint_recv_frag = frag;
739 } else {
740 btl_endpoint->endpoint_recv_frag = NULL;
741
742 TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag);
743
744 #if MCA_BTL_TCP_ENDPOINT_CACHE
745 if( 0 != btl_endpoint->endpoint_cache_length ) {
746 #if MCA_BTL_TCP_USES_PROGRESS_THREAD
747
748 frag = NULL;
749 #else
750
751
752
753 MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
754 #endif
755 goto data_still_pending_on_endpoint;
756 }
757 #endif
758
759 #if !MCA_BTL_TCP_USES_PROGRESS_THREAD
760 MCA_BTL_TCP_FRAG_RETURN(frag);
761 #endif
762 }
763 #if MCA_BTL_TCP_ENDPOINT_CACHE
764 assert( 0 == btl_endpoint->endpoint_cache_length );
765 #endif
766 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
767 break;
768 }
769 case MCA_BTL_TCP_CLOSED:
770
771
772
773
774
775
776 break;
777 default:
778 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
779 BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
780 btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
781 mca_btl_tcp_endpoint_close(btl_endpoint);
782 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
783 break;
784 }
785 }
786
787
788
789
790
791
792
793 static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user)
794 {
795 mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user;
796 opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
797 switch(btl_endpoint->endpoint_state) {
798 case MCA_BTL_TCP_CONNECTING:
799 mca_btl_tcp2_endpoint_complete_connect(btl_endpoint);
800 break;
801 case MCA_BTL_TCP_CONNECTED:
802
803 while (NULL != btl_endpoint->endpoint_send_frag) {
804 mca_btl_tcp2_frag_t* frag = btl_endpoint->endpoint_send_frag;
805 int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
806
807 if(mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
808 break;
809 }
810
811 btl_endpoint->endpoint_send_frag = (mca_btl_tcp2_frag_t*)
812 opal_list_remove_first(&btl_endpoint->endpoint_frags);
813
814
815 opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
816 assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
817 TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
818 opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
819 }
820
821
822 if(NULL == btl_endpoint->endpoint_send_frag) {
823 opal_event_del(&btl_endpoint->endpoint_send_event);
824 }
825 break;
826 default:
827 BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
828 opal_event_del(&btl_endpoint->endpoint_send_event);
829 break;
830 }
831 opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
832 }
833
834
835