This source file includes the following definitions.
- mca_btl_tcp2_endpoint_construct
- mca_btl_tcp2_endpoint_destruct
- mca_btl_tcp_endpoint_dump
- mca_btl_tcp2_endpoint_event_init
- mca_btl_tcp2_endpoint_send
- mca_btl_tcp2_endpoint_send_blocking
- mca_btl_tcp2_endpoint_send_connect_ack
- mca_btl_tcp2_endpoint_accept
- mca_btl_tcp2_endpoint_close
- mca_btl_tcp2_endpoint_connected
- mca_btl_tcp2_endpoint_recv_blocking
- mca_btl_tcp2_endpoint_recv_connect_ack
- mca_btl_tcp2_set_socket_options
- mca_btl_tcp2_endpoint_start_connect
- mca_btl_tcp2_endpoint_complete_connect
- mca_btl_tcp2_endpoint_recv_handler
- mca_btl_tcp2_endpoint_send_handler
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 #include "ompi_config.h"
  23 
  24 #include <stdlib.h>
  25 #include <string.h>
  26 #ifdef HAVE_UNISTD_H
  27 #include <unistd.h>
  28 #endif
  29 #include "opal/opal_socket_errno.h"
  30 #ifdef HAVE_SYS_TYPES_H
  31 #include <sys/types.h>
  32 #endif
  33 #ifdef HAVE_FCNTL_H
  34 #include <fcntl.h>
  35 #endif
  36 #ifdef HAVE_NETINET_IN_H
  37 #include <netinet/in.h>
  38 #endif
  39 #ifdef HAVE_NETINET_TCP_H
  40 #include <netinet/tcp.h>
  41 #endif
  42 #ifdef HAVE_ARPA_INET_H
  43 #include <arpa/inet.h>
  44 #endif
  45 #ifdef HAVE_SYS_TIME_H
  46 #include <sys/time.h>
  47 #endif  
  48 #ifdef HAVE_TIME_H
  49 #include <time.h>
  50 #endif  
  51 
  52 #include "opal/util/net.h"
  53 #include "opal/util/fd.h"
  54 #include "opal/util/show_help.h"
  55 #include "ompi/mca/btl/base/btl_base_error.h"
  56 #include "ompi/mca/rte/rte.h"
  57 
  58 #include "btl_tcp_endpoint.h"
  59 #include "btl_tcp_proc.h"
  60 #include "btl_tcp_frag.h"
  61 
  62 
  63 
  64 
  65 
  66 static void mca_btl_tcp2_endpoint_construct(mca_btl_tcp2_endpoint_t* endpoint)
  67 {
  68     endpoint->endpoint_btl = NULL;
  69     endpoint->endpoint_proc = NULL;
  70     endpoint->endpoint_addr = NULL;
  71     endpoint->endpoint_sd = -1;
  72     endpoint->endpoint_send_frag = 0;
  73     endpoint->endpoint_recv_frag = 0;
  74     endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
  75     endpoint->endpoint_retries = 0;
  76     endpoint->endpoint_nbo = false;
  77 #if MCA_BTL_TCP_ENDPOINT_CACHE
  78     endpoint->endpoint_cache        = NULL;
  79     endpoint->endpoint_cache_pos    = NULL;
  80     endpoint->endpoint_cache_length = 0;
  81 #endif  
  82     OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
  83     OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
  84     OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);
  85 }
  86 
  87 
  88 
  89 
  90 
/*
 * Object destructor for a TCP2 endpoint.
 *
 * Teardown order: detach the endpoint from its owning proc, close the socket
 * (mca_btl_tcp2_endpoint_close() returns immediately when endpoint_sd is
 * already negative), then destroy the members built by the constructor.
 * NOTE(review): endpoint_proc is still NULL if the endpoint was never attached
 * to a proc; confirm mca_btl_tcp2_proc_remove() tolerates a NULL proc.
 */
static void mca_btl_tcp2_endpoint_destruct(mca_btl_tcp2_endpoint_t* endpoint)
{
    mca_btl_tcp2_proc_remove(endpoint->endpoint_proc, endpoint);
    mca_btl_tcp2_endpoint_close(endpoint);
    OBJ_DESTRUCT(&endpoint->endpoint_frags);
    OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
    OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
}
  99 
/*
 * Register mca_btl_tcp2_endpoint_t with the OPAL object system, with
 * opal_list_item_t as its parent class and the constructor/destructor
 * defined above.
 */
OBJ_CLASS_INSTANCE(
    mca_btl_tcp2_endpoint_t,
    opal_list_item_t,
    mca_btl_tcp2_endpoint_construct,
    mca_btl_tcp2_endpoint_destruct);
 105 
 106 
 107 static void mca_btl_tcp2_endpoint_construct(mca_btl_base_endpoint_t* btl_endpoint);
 108 static void mca_btl_tcp2_endpoint_destruct(mca_btl_base_endpoint_t* btl_endpoint);
 109 static int  mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t*);
 110 static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t*);
 111 static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user);
 112 static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user);
 113 
 114 
 115 
 116 
 117 
/*
 * Compile-time switch: when non-zero (and OPAL_ENABLE_DEBUG is set), the
 * accept and handshake-completion paths dump the endpoint state.
 */
#define WANT_PEER_DUMP 0
 119 
 120 
 121 
 122 
 123 void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
 124 {
 125     char src[64], dst[64], *status;
 126     int sndbuf, rcvbuf, nodelay, flags = -1;
 127 #if OPAL_ENABLE_IPV6
 128     struct sockaddr_storage inaddr;
 129 #else
 130     struct sockaddr_in inaddr;
 131 #endif
 132     opal_socklen_t obtlen;
 133     opal_socklen_t addrlen = sizeof(inaddr);
 134     opal_list_item_t *item;
 135 
 136     if( -1 != btl_endpoint->endpoint_sd ) {
 137         getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
 138 #if OPAL_ENABLE_IPV6
 139         {
 140             char *address;
 141             address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
 142             if (NULL != address) {
 143                 sprintf(src, "%s", address);
 144             }
 145         }
 146 #else
 147         sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
 148 #endif
 149         getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
 150 #if OPAL_ENABLE_IPV6
 151         {
 152             char *address;
 153             address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
 154             if (NULL != address) {
 155                 sprintf(dst, "%s", address);
 156             }
 157         }
 158 #else
 159         sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
 160 #endif
 161 
 162         if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
 163             BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
 164                        strerror(opal_socket_errno), opal_socket_errno));
 165         }
 166 
 167 #if defined(SO_SNDBUF)
 168         obtlen = sizeof(sndbuf);
 169         if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
 170             BTL_ERROR(("SO_SNDBUF option: %s (%d)",
 171                        strerror(opal_socket_errno), opal_socket_errno));
 172         }
 173 #else
 174         sndbuf = -1;
 175 #endif
 176 #if defined(SO_RCVBUF)
 177         obtlen = sizeof(rcvbuf);
 178         if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
 179             BTL_ERROR(("SO_RCVBUF option: %s (%d)",
 180                        strerror(opal_socket_errno), opal_socket_errno));
 181         }
 182 #else
 183         rcvbuf = -1;
 184 #endif
 185 #if defined(TCP_NODELAY)
 186         obtlen = sizeof(nodelay);
 187         if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
 188             BTL_ERROR(("TCP_NODELAY option: %s (%d)",
 189                        strerror(opal_socket_errno), opal_socket_errno));
 190         }
 191 #else
 192         nodelay = 0;
 193 #endif
 194     }
 195 
 196     mca_btl_base_err("%s %s: endpoint %p src %s - dst %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
 197                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, (void*)btl_endpoint, src, dst, nodelay, sndbuf, rcvbuf, flags);
 198 
 199     switch(btl_endpoint->endpoint_state) {
 200     case MCA_BTL_TCP_CONNECTING:
 201         status = "connecting"; break;
 202     case MCA_BTL_TCP_CONNECT_ACK:
 203         status = "connect ack"; break;
 204     case MCA_BTL_TCP_CLOSED:
 205         status = "closed"; break;
 206     case MCA_BTL_TCP_FAILED:
 207         status = "failed"; break;
 208     case MCA_BTL_TCP_CONNECTED:
 209         status = "connected"; break;
 210     default:
 211         status = "undefined"; break;
 212     }
 213     mca_btl_base_err("%s |  [socket %d] [state %s] (nbo %s) (retries %u)\n"
 214 #if MCA_BTL_TCP_ENDPOINT_CACHE
 215                      "\tcache %p length %lu pos %ld\n"
 216 #endif  
 217                      "\tpending: send %p recv %p\n",
 218                      msg, btl_endpoint->endpoint_sd, status,
 219                      (btl_endpoint->endpoint_nbo ? "true" : "false"), btl_endpoint->endpoint_retries,
 220 #if MCA_BTL_TCP_ENDPOINT_CACHE
 221                      btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_length, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
 222 #endif  
 223                      (void*)btl_endpoint->endpoint_send_frag, (void*)btl_endpoint->endpoint_recv_frag );
 224     for(item =  opal_list_get_first(&btl_endpoint->endpoint_frags);
 225         item != opal_list_get_end(&btl_endpoint->endpoint_frags);
 226         item = opal_list_get_next(item)) {
 227         mca_btl_tcp_dump_frag( (mca_btl_tcp_frag_t*)item, " | send" );
 228     }
 229 }
 230 
 231 
 232 
 233 
 234 
 235 static inline void mca_btl_tcp2_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint)
 236 {
 237 #if MCA_BTL_TCP_ENDPOINT_CACHE
 238     btl_endpoint->endpoint_cache     = (char*)malloc(mca_btl_tcp2_component.tcp_endpoint_cache);
 239     btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
 240 #endif  
 241 
 242     opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
 243                    btl_endpoint->endpoint_sd,
 244                    OPAL_EV_READ|OPAL_EV_PERSIST,
 245                    mca_btl_tcp_endpoint_recv_handler,
 246                    btl_endpoint );
 247     
 248 
 249 
 250 
 251 
 252 
 253     opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
 254                    btl_endpoint->endpoint_sd,
 255                    OPAL_EV_WRITE,
 256                    mca_btl_tcp_endpoint_send_handler,
 257                    btl_endpoint);
 258 }
 259 
 260 
 261 
 262 
 263 
 264 
 265 
 266 int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp2_frag_t* frag)
 267 {
 268     int rc = OMPI_SUCCESS;
 269 
 270     MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_endpoint->endpoint_send_lock);
 271     switch(btl_endpoint->endpoint_state) {
 272     case MCA_BTL_TCP_CONNECTING:
 273     case MCA_BTL_TCP_CONNECT_ACK:
 274     case MCA_BTL_TCP_CLOSED:
 275         opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
 276         frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
 277         if(btl_endpoint->endpoint_state == MCA_BTL_TCP_CLOSED)
 278             rc = mca_btl_tcp2_endpoint_start_connect(btl_endpoint);
 279         break;
 280     case MCA_BTL_TCP_FAILED:
 281         rc = OMPI_ERR_UNREACH;
 282         break;
 283     case MCA_BTL_TCP_CONNECTED:
 284         if (btl_endpoint->endpoint_send_frag == NULL) {
 285             if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
 286                mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd)) {
 287                 int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
 288                 opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
 289                 MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
 290                 return 1;
 291             } else {
 292                 btl_endpoint->endpoint_send_frag = frag;
 293                 frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
 294                 MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
 295             }
 296         } else {
 297             frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
 298             opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
 299         }
 300         break;
 301     }
 302     MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_endpoint->endpoint_send_lock);
 303     return rc;
 304 }
 305 
 306 
 307 
 308 
 309 
 310 
 311 static int mca_btl_tcp2_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
 312 {
 313     unsigned char* ptr = (unsigned char*)data;
 314     size_t cnt = 0;
 315     while(cnt < size) {
 316         int retval = send(btl_endpoint->endpoint_sd, (const char *)ptr+cnt, size-cnt, 0);
 317         if(retval < 0) {
 318             if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
 319                 BTL_ERROR(("send() failed: %s (%d)",
 320                            strerror(opal_socket_errno), opal_socket_errno));
 321                 mca_btl_tcp2_endpoint_close(btl_endpoint);
 322                 return -1;
 323             }
 324             continue;
 325         }
 326         cnt += retval;
 327     }
 328     return cnt;
 329 }
 330 
 331 
 332 
 333 
 334 
 335 
 336 
 337 static int mca_btl_tcp2_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
 338 {
 339     
 340     mca_btl_tcp2_proc_t* btl_proc = mca_btl_tcp2_proc_local();
 341     orte_process_name_t guid = btl_proc->proc_ompi->proc_name;
 342 
 343     ORTE_PROCESS_NAME_HTON(guid);
 344     if(mca_btl_tcp2_endpoint_send_blocking(btl_endpoint, &guid, sizeof(guid)) !=
 345           sizeof(guid)) {
 346         return OMPI_ERR_UNREACH;
 347     }
 348     return OMPI_SUCCESS;
 349 }
 350 
 351 
 352 
 353 
 354 
 355 
 356 
 357 
 358 
 359 
 360 bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
 361                                  struct sockaddr* addr, int sd)
 362 {
 363     mca_btl_tcp_proc_t *endpoint_proc = btl_endpoint->endpoint_proc;
 364     const orte_process_name_t *this_proc = &(ompi_proc_local()->proc_name);
 365     int cmpval;
 366 
 367     if(NULL == btl_endpoint->endpoint_addr) {
 368         return false;
 369     }
 370 
 371     OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
 372     OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
 373 
 374     cmpval = ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
 375                                     &endpoint_proc->proc_ompi->proc_name,
 376                                     this_proc);
 377     if((btl_endpoint->endpoint_sd < 0) ||
 378        (btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
 379         cmpval < 0)) {
 380         mca_btl_tcp2_endpoint_close(btl_endpoint);
 381         btl_endpoint->endpoint_sd = sd;
 382         if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) != OMPI_SUCCESS) {
 383             mca_btl_tcp2_endpoint_close(btl_endpoint);
 384             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 385             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 386             return false;
 387         }
 388         mca_btl_tcp_endpoint_event_init(btl_endpoint);
 389         
 390 
 391 
 392         opal_event_add(&btl_endpoint->endpoint_recv_event, 0);  
 393         mca_btl_tcp_endpoint_connected(btl_endpoint);
 394 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
 395         mca_btl_tcp2_endpoint_dump(btl_endpoint, "accepted");
 396 #endif
 397         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 398         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 399         return true;
 400     }
 401     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 402     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 403     return false;
 404 }
 405 
 406 
 407 
 408 
 409 
 410 
 411 
 412 void mca_btl_tcp2_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
 413 {
 414     int sd = btl_endpoint->endpoint_sd;
 415 
 416     do {
 417         if( sd < 0 ) return;
 418     } while ( opal_atomic_cmpset( &(btl_endpoint->endpoint_sd), sd, -1 ) );
 419 
 420     CLOSE_THE_SOCKET(sd);
 421     btl_endpoint->endpoint_retries++;
 422     opal_event_del(&btl_endpoint->endpoint_recv_event);
 423     opal_event_del(&btl_endpoint->endpoint_send_event);
 424 #if MCA_BTL_TCP_ENDPOINT_CACHE
 425     if( NULL != btl_endpoint->endpoint_cache )
 426         free( btl_endpoint->endpoint_cache );
 427     btl_endpoint->endpoint_cache        = NULL;
 428     btl_endpoint->endpoint_cache_pos    = NULL;
 429     btl_endpoint->endpoint_cache_length = 0;
 430 #endif  
 431 }
 432 
 433 
 434 
 435 
 436 
 437 
 438 
 439 static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
 440 {
 441     
 442     btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
 443     btl_endpoint->endpoint_retries = 0;
 444 
 445     
 446     opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
 447                    btl_endpoint->endpoint_sd,
 448                    OPAL_EV_WRITE | OPAL_EV_PERSIST,
 449                    mca_btl_tcp_endpoint_send_handler,
 450                    btl_endpoint );
 451 
 452     if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
 453         if(NULL == btl_endpoint->endpoint_send_frag) {
 454             btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
 455                 opal_list_remove_first(&btl_endpoint->endpoint_frags);
 456         }
 457         opal_event_add(&btl_endpoint->endpoint_send_event, 0);
 458     }
 459 }
 460 
 461 
 462 
 463 
 464 
 465 
 466 static int mca_btl_tcp2_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
 467 {
 468     unsigned char* ptr = (unsigned char*)data;
 469     size_t cnt = 0;
 470     while(cnt < size) {
 471         int retval = recv(btl_endpoint->endpoint_sd, (char *)ptr+cnt, size-cnt, 0);
 472 
 473         
 474         if(retval == 0) {
 475             mca_btl_tcp2_endpoint_close(btl_endpoint);
 476             return -1;
 477         }
 478 
 479         
 480         if(retval < 0) {
 481             if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
 482                 BTL_ERROR(("recv(%d) failed: %s (%d)",
 483                            btl_endpoint->endpoint_sd, strerror(opal_socket_errno), opal_socket_errno));
 484                 mca_btl_tcp2_endpoint_close(btl_endpoint);
 485                 return -1;
 486             }
 487             continue;
 488         }
 489         cnt += retval;
 490     }
 491     return cnt;
 492 }
 493 
 494 
 495 
 496 
 497 
 498 
 499 
 500 static int mca_btl_tcp2_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
 501 {
 502     orte_process_name_t guid;
 503     mca_btl_tcp2_proc_t* btl_proc = btl_endpoint->endpoint_proc;
 504 
 505     if((mca_btl_tcp2_endpoint_recv_blocking(btl_endpoint, &guid, sizeof(orte_process_name_t))) != sizeof(orte_process_name_t)) {
 506         return OMPI_ERR_UNREACH;
 507     }
 508     ORTE_PROCESS_NAME_NTOH(guid);
 509     
 510     if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
 511                                                     &btl_proc->proc_ompi->proc_name,
 512                                                     &guid)) {
 513         BTL_ERROR(("received unexpected process identifier %s",
 514                    ORTE_NAME_PRINT(&guid)));
 515         mca_btl_tcp2_endpoint_close(btl_endpoint);
 516         return OMPI_ERR_UNREACH;
 517     }
 518 
 519     return OMPI_SUCCESS;
 520 }
 521 
 522 
 523 void mca_btl_tcp2_set_socket_options(int sd)
 524 {
 525     int optval;
 526 #if defined(TCP_NODELAY)
 527     optval = mca_btl_tcp2_component.tcp_use_nodelay;
 528     if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
 529         BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
 530                    strerror(opal_socket_errno), opal_socket_errno));
 531     }
 532 #endif
 533 #if defined(SO_SNDBUF)
 534     if(mca_btl_tcp2_component.tcp_sndbuf > 0 &&
 535        setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp2_component.tcp_sndbuf, sizeof(int)) < 0) {
 536         BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
 537                    strerror(opal_socket_errno), opal_socket_errno));
 538     }
 539 #endif
 540 #if defined(SO_RCVBUF)
 541     if(mca_btl_tcp2_component.tcp_rcvbuf > 0 &&
 542        setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp2_component.tcp_rcvbuf, sizeof(int)) < 0) {
 543         BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
 544                    strerror(opal_socket_errno), opal_socket_errno));
 545     }
 546 #endif
 547 }
 548 
 549 
 550 
 551 
 552 
 553 
 554 
 555 
 556 
 557 
 558 static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
 559 {
 560     int rc,flags;
 561     struct sockaddr_storage endpoint_addr;
 562     
 563     uint16_t af_family = AF_INET;
 564     opal_socklen_t addrlen = sizeof(struct sockaddr_in);
 565 
 566 #if OPAL_ENABLE_IPV6
 567     if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
 568         af_family = AF_INET6;
 569         addrlen = sizeof (struct sockaddr_in6);
 570     }
 571 #endif
 572 
 573     btl_endpoint->endpoint_sd = socket(af_family, SOCK_STREAM, 0);
 574     if (btl_endpoint->endpoint_sd < 0) {
 575         btl_endpoint->endpoint_retries++;
 576         return OMPI_ERR_UNREACH;
 577     }
 578 
 579     
 580     mca_btl_tcp2_set_socket_options(btl_endpoint->endpoint_sd);
 581 
 582     
 583     mca_btl_tcp2_endpoint_event_init(btl_endpoint);
 584 
 585     
 586     if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
 587         BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
 588                    strerror(opal_socket_errno), opal_socket_errno));
 589     } else {
 590         flags |= O_NONBLOCK;
 591         if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0)
 592             BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
 593                        strerror(opal_socket_errno), opal_socket_errno));
 594     }
 595 
 596     
 597     mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
 598 
 599     opal_output_verbose(20, mca_btl_base_output,
 600                         "btl: tcp: attempting to connect() to address %s on port %d",
 601                         opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 602                         btl_endpoint->endpoint_addr->addr_port);
 603 
 604     if(connect(btl_endpoint->endpoint_sd, (struct sockaddr*)&endpoint_addr, addrlen) < 0) {
 605         
 606         if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
 607             btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
 608             MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
 609             return OMPI_SUCCESS;
 610         }
 611         {
 612             char *address;
 613             address = opal_net_get_hostname((struct sockaddr*) &endpoint_addr);
 614             BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_ompi,
 615                           ( "Unable to connect to the peer %s on port %d: %s\n",
 616                             address,
 617                            btl_endpoint->endpoint_addr->addr_port, strerror(opal_socket_errno) ) );
 618         }
 619         mca_btl_tcp2_endpoint_close(btl_endpoint);
 620         btl_endpoint->endpoint_retries++;
 621         return OMPI_ERR_UNREACH;
 622     }
 623 
 624     
 625     if((rc = mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) {
 626         btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
 627         MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_recv_event, 0);
 628     } else {
 629         mca_btl_tcp2_endpoint_close(btl_endpoint);
 630     }
 631     return rc;
 632 }
 633 
 634 
 635 
 636 
 637 
 638 
 639 
 640 static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
 641 {
 642     int so_error = 0;
 643     opal_socklen_t so_length = sizeof(so_error);
 644     struct sockaddr_storage endpoint_addr;
 645 
 646     mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
 647 
 648     
 649     opal_event_del(&btl_endpoint->endpoint_send_event);
 650 
 651     
 652     if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
 653         BTL_ERROR(("getsockopt() to %s failed: %s (%d)",
 654                    opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 655                    strerror(opal_socket_errno), opal_socket_errno));
 656         mca_btl_tcp2_endpoint_close(btl_endpoint);
 657         return;
 658     }
 659     if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) {
 660         opal_event_add(&btl_endpoint->endpoint_send_event, 0);
 661         return;
 662     }
 663     if(so_error != 0) {
 664         BTL_ERROR(("connect() to %s failed: %s (%d)",
 665                    opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 666                    strerror(so_error), so_error));
 667         mca_btl_tcp2_endpoint_close(btl_endpoint);
 668         return;
 669     }
 670 
 671     if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) == OMPI_SUCCESS) {
 672         btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
 673         opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
 674     } else {
 675         mca_btl_tcp2_endpoint_close(btl_endpoint);
 676     }
 677 }
 678 
 679 
 680 
 681 
 682 
 683 
 684 
 685 static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
 686 {
 687     mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;
 688 
 689     
 690 
 691 
 692     if( sd != btl_endpoint->endpoint_sd )
 693         return;
 694 
 695     OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
 696     switch(btl_endpoint->endpoint_state) {
 697     case MCA_BTL_TCP_CONNECT_ACK:
 698         {
 699             int rc = OMPI_ERROR;
 700             rc = mca_btl_tcp2_endpoint_recv_connect_ack(btl_endpoint);
 701             if( OMPI_SUCCESS == rc ) {
 702                 
 703                 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
 704                 mca_btl_tcp2_endpoint_connected(btl_endpoint);
 705                 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 706 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
 707                 mca_btl_tcp2_endpoint_dump(btl_endpoint, "connected");
 708 #endif
 709             }
 710             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 711             return;
 712         }
 713     case MCA_BTL_TCP_CONNECTED:
 714         {
 715             mca_btl_tcp2_frag_t* frag;
 716 
 717             frag = btl_endpoint->endpoint_recv_frag;
 718 
 719         data_still_pending_on_endpoint:
 720             if(NULL == frag) {
 721 
 722                 if(mca_btl_tcp_module.super.btl_max_send_size >
 723                    mca_btl_tcp_module.super.btl_eager_limit) {
 724                     MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
 725                 } else {
 726                     MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
 727                 }
 728 
 729                 if(NULL == frag) {
 730                     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 731                     return;
 732                 }
 733                 MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
 734             }
 735 
 736             
 737             if( mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false ) {
 738                 btl_endpoint->endpoint_recv_frag = frag;
 739             } else {
 740                 btl_endpoint->endpoint_recv_frag = NULL;
 741 
 742                 TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag);
 743 
 744 #if MCA_BTL_TCP_ENDPOINT_CACHE
 745                 if( 0 != btl_endpoint->endpoint_cache_length ) {
 746 #if MCA_BTL_TCP_USES_PROGRESS_THREAD
 747                     
 748                     frag = NULL;
 749 #else
 750                     
 751 
 752 
 753                     MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
 754 #endif  
 755                     goto data_still_pending_on_endpoint;
 756                 }
 757 #endif  
 758 
 759 #if !MCA_BTL_TCP_USES_PROGRESS_THREAD
 760                 MCA_BTL_TCP_FRAG_RETURN(frag);
 761 #endif  
 762             }
 763 #if MCA_BTL_TCP_ENDPOINT_CACHE
 764             assert( 0 == btl_endpoint->endpoint_cache_length );
 765 #endif  
 766             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 767             break;
 768         }
 769     case MCA_BTL_TCP_CLOSED:
 770         
 771 
 772 
 773 
 774 
 775 
 776         break;
 777     default:
 778         OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
 779         BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
 780         btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
 781         mca_btl_tcp_endpoint_close(btl_endpoint);
 782         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 783         break;
 784     }
 785 }
 786 
 787 
 788 
 789 
 790 
 791 
 792 
/*
 * libevent callback for writability on an endpoint's socket.
 *
 * Runs with endpoint_send_lock held (taken with opal_mutex_atomic_lock) and
 * dispatches on the connection state:
 *  - CONNECTING: finish the non-blocking connect via
 *    mca_btl_tcp2_endpoint_complete_connect().
 *  - CONNECTED: send the in-flight fragment, then the queued ones in order,
 *    until mca_btl_tcp2_frag_send() returns false or nothing is left.
 *    Each fully sent fragment is completed with
 *    TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND while the send lock is temporarily
 *    released. The send event is removed once nothing remains in flight.
 *  - any other state: report it and remove the send event.
 *
 * `sd` and `flags` are not used.
 * NOTE(review): `user` is cast to mca_btl_tcp_endpoint_t (not a tcp2 type),
 * and the lock calls differ from the MCA_BTL_TCP_CRITICAL_SECTION_* macros
 * used in mca_btl_tcp2_endpoint_send(). Confirm both are intended.
 */
static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user)
{
    mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user;
    opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
    switch(btl_endpoint->endpoint_state) {
    case MCA_BTL_TCP_CONNECTING:
        mca_btl_tcp2_endpoint_complete_connect(btl_endpoint);
        break;
    case MCA_BTL_TCP_CONNECTED:
        /* Push fragments out for as long as each one is sent completely. */
        while (NULL != btl_endpoint->endpoint_send_frag) {
            mca_btl_tcp2_frag_t* frag = btl_endpoint->endpoint_send_frag;
            /* NOTE(review): not read in this function; presumably consumed by
             * TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND. Confirm. */
            int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);

            /* Presumably false means the socket would block before the
             * fragment was fully written; retry on the next event. */
            if(mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
                break;
            }
            /* Promote the next queued fragment (NULL if the queue is empty). */
            btl_endpoint->endpoint_send_frag = (mca_btl_tcp2_frag_t*)
                opal_list_remove_first(&btl_endpoint->endpoint_frags);

            /* Complete the finished fragment without holding the send lock. */
            opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
            assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
            TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
            opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
        }

        /* Nothing left in flight: stop watching for writability. */
        if(NULL == btl_endpoint->endpoint_send_frag) {
            opal_event_del(&btl_endpoint->endpoint_send_event);
        }
        break;
    default:
        BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
        opal_event_del(&btl_endpoint->endpoint_send_event);
        break;
    }
    opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
}
 833 
 834 
 835