This source file includes following definitions.
- mca_btl_tcp_endpoint_construct
- mca_btl_tcp_endpoint_destruct
- mca_btl_tcp_endpoint_dump
- mca_btl_tcp_endpoint_event_init
- mca_btl_tcp_endpoint_send
- mca_btl_tcp_endpoint_send_blocking
- mca_btl_tcp_endpoint_send_connect_ack
- mca_btl_tcp_endpoint_complete_accept
- mca_btl_tcp_endpoint_accept
- mca_btl_tcp_endpoint_close
- mca_btl_tcp_endpoint_connected
- mca_btl_tcp_endpoint_recv_connect_ack
- mca_btl_tcp_set_socket_options
- mca_btl_tcp_endpoint_start_connect
- mca_btl_tcp_endpoint_complete_connect
- mca_btl_tcp_endpoint_recv_handler
- mca_btl_tcp_endpoint_send_handler
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 #include "opal_config.h"
  27 
  28 #include <stdlib.h>
  29 #include <string.h>
  30 #ifdef HAVE_UNISTD_H
  31 #include <unistd.h>
  32 #endif
  33 #include "opal/opal_socket_errno.h"
  34 #ifdef HAVE_SYS_TYPES_H
  35 #include <sys/types.h>
  36 #endif
  37 #ifdef HAVE_FCNTL_H
  38 #include <fcntl.h>
  39 #endif
  40 #ifdef HAVE_NETINET_IN_H
  41 #include <netinet/in.h>
  42 #endif
  43 #ifdef HAVE_NETINET_TCP_H
  44 #include <netinet/tcp.h>
  45 #endif
  46 #ifdef HAVE_ARPA_INET_H
  47 #include <arpa/inet.h>
  48 #endif
  49 #ifdef HAVE_SYS_TIME_H
  50 #include <sys/time.h>
  51 #endif  
  52 #include <time.h>
  53 
  54 #include "opal/mca/event/event.h"
  55 #include "opal/util/net.h"
  56 #include "opal/util/show_help.h"
  57 #include "opal/util/proc.h"
  58 #include "opal/util/printf.h"
  59 #include "opal/util/string_copy.h"
  60 #include "opal/mca/btl/base/btl_base_error.h"
  61 
  62 #include "btl_tcp.h"
  63 #include "btl_tcp_endpoint.h"
  64 #include "btl_tcp_proc.h"
  65 #include "btl_tcp_frag.h"
  66 #include "btl_tcp_addr.h"
  67 
  68 
  69 
  70 
  71 
  72 const char mca_btl_tcp_magic_id_string[MCA_BTL_TCP_MAGIC_STRING_LENGTH] = "OPAL-TCP-BTL";
  73 
  74 
  75 
  76 
  77 
  78 static void mca_btl_tcp_endpoint_construct(mca_btl_tcp_endpoint_t* endpoint)
  79 {
  80     endpoint->endpoint_btl = NULL;
  81     endpoint->endpoint_proc = NULL;
  82     endpoint->endpoint_addr = NULL;
  83     endpoint->endpoint_sd = -1;
  84     endpoint->endpoint_sd_next = -1;
  85     endpoint->endpoint_send_frag = 0;
  86     endpoint->endpoint_recv_frag = 0;
  87     endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
  88     endpoint->endpoint_retries = 0;
  89     endpoint->endpoint_nbo = false;
  90 #if MCA_BTL_TCP_ENDPOINT_CACHE
  91     endpoint->endpoint_cache        = NULL;
  92     endpoint->endpoint_cache_pos    = NULL;
  93     endpoint->endpoint_cache_length = 0;
  94 #endif  
  95     OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
  96     OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
  97     OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);
  98 }
  99 
 100 
 101 
 102 
 103 
 104 static void mca_btl_tcp_endpoint_destruct(mca_btl_tcp_endpoint_t* endpoint)
 105 {
 106     mca_btl_tcp_endpoint_close(endpoint);
 107     mca_btl_tcp_proc_remove(endpoint->endpoint_proc, endpoint);
 108     OBJ_DESTRUCT(&endpoint->endpoint_frags);
 109     OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
 110     OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
 111 }
 112 
 113 OBJ_CLASS_INSTANCE(
 114     mca_btl_tcp_endpoint_t,
 115     opal_list_item_t,
 116     mca_btl_tcp_endpoint_construct,
 117     mca_btl_tcp_endpoint_destruct);
 118 
 119 
 120 static void mca_btl_tcp_endpoint_construct(mca_btl_base_endpoint_t* btl_endpoint);
 121 static void mca_btl_tcp_endpoint_destruct(mca_btl_base_endpoint_t* btl_endpoint);
 122 static int  mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t*);
 123 static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t*);
 124 static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user);
 125 static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user);
 126 
 127 
 128 
 129 
 130 
 131 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
 132 
 133 #define DEBUG_LENGTH  1024
 134 
 135 
 136 
 137 
 138 
 139 
 140 
 141 
 142 void
 143 mca_btl_tcp_endpoint_dump(int level,
 144                           const char* fname,
 145                           int lineno,
 146                           const char* funcname,
 147                           mca_btl_base_endpoint_t* btl_endpoint,
 148                           bool full_info,
 149                           const char* msg)
 150 {
 151     char outmsg[DEBUG_LENGTH];
 152     int sndbuf, rcvbuf, nodelay, flags, used = 0;
 153 #if OPAL_ENABLE_IPV6
 154     struct sockaddr_storage inaddr;
 155 #else
 156     struct sockaddr_in inaddr;
 157 #endif
 158     opal_socklen_t obtlen;
 159     opal_socklen_t addrlen = sizeof(inaddr);
 160     mca_btl_tcp_frag_t* item;
 161 
 162     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s: ", msg);
 163     if (used >= DEBUG_LENGTH) goto out;
 164 
 165     getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
 166 #if OPAL_ENABLE_IPV6
 167     {
 168         char *address;
 169         address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
 170         if (NULL != address) {
 171             used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s -", address);
 172             if (used >= DEBUG_LENGTH) goto out;
 173         }
 174     }
 175 #else
 176     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s -", inet_ntoa(inaddr.sin_addr));
 177     if (used >= DEBUG_LENGTH) goto out;
 178 #endif
 179     getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
 180 #if OPAL_ENABLE_IPV6
 181     {
 182         char *address;
 183         address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
 184         if (NULL != address) {
 185             used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " %s", address);
 186             if (used >= DEBUG_LENGTH) goto out;
 187         }
 188     }
 189 #else
 190     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " %s", inet_ntoa(inaddr.sin_addr));
 191     if (used >= DEBUG_LENGTH) goto out;
 192 #endif
 193 
 194     used = snprintf(outmsg, DEBUG_LENGTH, "[%d", btl_endpoint->endpoint_sd);
 195     if (used >= DEBUG_LENGTH) goto out;
 196     switch(btl_endpoint->endpoint_state) {
 197     case MCA_BTL_TCP_CONNECTING:
 198         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "connecting");
 199         if (used >= DEBUG_LENGTH) goto out;
 200         break;
 201     case MCA_BTL_TCP_CONNECT_ACK:
 202         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "ack");
 203         if (used >= DEBUG_LENGTH) goto out;
 204         break;
 205     case MCA_BTL_TCP_CLOSED:
 206         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "close");
 207         if (used >= DEBUG_LENGTH) goto out;
 208         break;
 209     case MCA_BTL_TCP_FAILED:
 210         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "failed");
 211         if (used >= DEBUG_LENGTH) goto out;
 212         break;
 213     case MCA_BTL_TCP_CONNECTED:
 214         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "connected");
 215         if (used >= DEBUG_LENGTH) goto out;
 216         break;
 217     default:
 218         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "unknown");
 219         if (used >= DEBUG_LENGTH) goto out;
 220         break;
 221     }
 222 
 223     if( full_info ) {
 224         if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
 225             BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
 226                        strerror(opal_socket_errno), opal_socket_errno));
 227         }
 228 
 229 #if defined(SO_SNDBUF)
 230         obtlen = sizeof(sndbuf);
 231         if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
 232             BTL_ERROR(("SO_SNDBUF option: %s (%d)",
 233                        strerror(opal_socket_errno), opal_socket_errno));
 234         }
 235 #else
 236         sndbuf = -1;
 237 #endif
 238 #if defined(SO_RCVBUF)
 239         obtlen = sizeof(rcvbuf);
 240         if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
 241             BTL_ERROR(("SO_RCVBUF option: %s (%d)",
 242                        strerror(opal_socket_errno), opal_socket_errno));
 243         }
 244 #else
 245         rcvbuf = -1;
 246 #endif
 247 #if defined(TCP_NODELAY)
 248         obtlen = sizeof(nodelay);
 249         if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
 250             BTL_ERROR(("TCP_NODELAY option: %s (%d)",
 251                        strerror(opal_socket_errno), opal_socket_errno));
 252         }
 253 #else
 254         nodelay = 0;
 255 #endif
 256         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " nodelay %d sndbuf %d rcvbuf %d flags %08x",
 257                          nodelay, sndbuf, rcvbuf, flags);
 258         if (used >= DEBUG_LENGTH) goto out;
 259 #if MCA_BTL_TCP_ENDPOINT_CACHE
 260         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n\t[cache %p used %lu/%lu]",
 261                          (void*)btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
 262                          btl_endpoint->endpoint_cache_length);
 263         if (used >= DEBUG_LENGTH) goto out;
 264 #endif  
 265         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "{%s - retries %d}",
 266                          (btl_endpoint->endpoint_nbo ? "NBO" : ""), (int)btl_endpoint->endpoint_retries);
 267         if (used >= DEBUG_LENGTH) goto out;
 268     }
 269     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n");
 270     if (used >= DEBUG_LENGTH) goto out;
 271 
 272     if( NULL != btl_endpoint->endpoint_recv_frag )
 273         used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_recv_frag, "active recv",
 274                                       &outmsg[used], DEBUG_LENGTH - used);
 275     if (used >= DEBUG_LENGTH) goto out;
 276 
 277     if( NULL != btl_endpoint->endpoint_send_frag )
 278         used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_send_frag, "active send (inaccurate iov)",
 279                                       &outmsg[used], DEBUG_LENGTH - used);
 280     if (used >= DEBUG_LENGTH) goto out;
 281     OPAL_LIST_FOREACH(item, &btl_endpoint->endpoint_frags, mca_btl_tcp_frag_t) {
 282         used += mca_btl_tcp_frag_dump(item, "pending send", &outmsg[used], DEBUG_LENGTH - used);
 283         if (used >= DEBUG_LENGTH) goto out;
 284     }
 285 out:
 286     outmsg[ used >= DEBUG_LENGTH ? (DEBUG_LENGTH-1) : used ] = '\0';
 287     opal_output_verbose(level, opal_btl_base_framework.framework_output,
 288                         "[%s:%d:%s][%s -> %s] %s",
 289                         fname, lineno, funcname,
 290                         OPAL_NAME_PRINT(opal_proc_local_get()->proc_name),
 291                         (NULL != btl_endpoint->endpoint_proc ? OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name) : "unknown remote"),
 292                         outmsg);
 293 }
 294 #endif 
 295 
 296 
 297 
 298 
 299 
 300 static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint)
 301 {
 302 #if MCA_BTL_TCP_ENDPOINT_CACHE
 303     assert(NULL == btl_endpoint->endpoint_cache);
 304     btl_endpoint->endpoint_cache     = (char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
 305     btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
 306 #endif  
 307 
 308     opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
 309                     btl_endpoint->endpoint_sd,
 310                     OPAL_EV_READ | OPAL_EV_PERSIST,
 311                     mca_btl_tcp_endpoint_recv_handler,
 312                     btl_endpoint );
 313     
 314 
 315 
 316 
 317 
 318     opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
 319                     btl_endpoint->endpoint_sd,
 320                     OPAL_EV_WRITE | OPAL_EV_PERSIST,
 321                     mca_btl_tcp_endpoint_send_handler,
 322                     btl_endpoint);
 323 }
 324 
 325 
 326 
 327 
 328 
 329 
 330 
 331 int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp_frag_t* frag)
 332 {
 333     int rc = OPAL_SUCCESS;
 334 
 335     OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
 336     switch(btl_endpoint->endpoint_state) {
 337     case MCA_BTL_TCP_CONNECTING:
 338     case MCA_BTL_TCP_CONNECT_ACK:
 339     case MCA_BTL_TCP_CLOSED:
 340         opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
 341         frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
 342         if(btl_endpoint->endpoint_state == MCA_BTL_TCP_CLOSED)
 343             rc = mca_btl_tcp_endpoint_start_connect(btl_endpoint);
 344         break;
 345     case MCA_BTL_TCP_FAILED:
 346         rc = OPAL_ERR_UNREACH;
 347         break;
 348     case MCA_BTL_TCP_CONNECTED:
 349         if (NULL == btl_endpoint->endpoint_send_frag) {
 350             if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
 351                mca_btl_tcp_frag_send(frag, btl_endpoint->endpoint_sd)) {
 352                 int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
 353 
 354                 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 355                 if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) {
 356                     frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
 357                 }
 358                 if( btl_ownership ) {
 359                     MCA_BTL_TCP_FRAG_RETURN(frag);
 360                 }
 361                 MCA_BTL_TCP_ENDPOINT_DUMP(50, btl_endpoint, true, "complete send fragment [endpoint_send]");
 362                 return 1;
 363             } else {
 364                 btl_endpoint->endpoint_send_frag = frag;
 365                 MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_send]");
 366                 frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
 367                 MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
 368             }
 369         } else {
 370             MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "send fragment enqueued [endpoint_send]");
 371             frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
 372             opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
 373         }
 374         break;
 375     }
 376     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 377     return rc;
 378 }
 379 
 380 
 381 
 382 
 383 
 384 
 385 static int
 386 mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint,
 387                                    const void* data, size_t size)
 388 {
 389     int ret = mca_btl_tcp_send_blocking(btl_endpoint->endpoint_sd, data, size);
 390     if (ret < 0) {
 391         mca_btl_tcp_endpoint_close(btl_endpoint);
 392     }
 393     return ret;
 394 }
 395 
 396 
 397 
 398 
 399 
 400 static int 
 401 mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
 402 {
 403     opal_process_name_t guid = opal_proc_local_get()->proc_name;
 404     OPAL_PROCESS_NAME_HTON(guid);
 405     
 406     mca_btl_tcp_endpoint_hs_msg_t hs_msg;
 407     opal_string_copy(hs_msg.magic_id, mca_btl_tcp_magic_id_string,
 408                      sizeof(hs_msg.magic_id));
 409     hs_msg.guid = guid;
 410     
 411     if(sizeof(hs_msg) != 
 412        mca_btl_tcp_endpoint_send_blocking(btl_endpoint, 
 413                                           &hs_msg, sizeof(hs_msg))) {
 414          opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
 415                        true, opal_process_info.nodename,
 416                        sizeof(hs_msg),
 417                        "connect ACK failed to send magic-id and guid");
 418           return OPAL_ERR_UNREACH;
 419     }
 420     return OPAL_SUCCESS;
 421 }
 422 
 423 static void *mca_btl_tcp_endpoint_complete_accept(int fd, int flags, void *context)
 424 {
 425     mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t*)context;
 426     struct timeval now = {0, 0};
 427     int cmpval;
 428 
 429     if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_recv_lock) ) {
 430         opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
 431         return NULL;
 432     }
 433     if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) ) {
 434         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 435         opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
 436         return NULL;
 437     }
 438 
 439     if(NULL == btl_endpoint->endpoint_addr) {
 440         CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next); 
 441         btl_endpoint->endpoint_sd_next = -1;
 442         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 443         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 444         if( NULL != btl_endpoint->endpoint_btl->tcp_error_cb ) {
 445             btl_endpoint->endpoint_btl->tcp_error_cb(
 446                 &btl_endpoint->endpoint_btl->super, MCA_BTL_ERROR_FLAGS_NONFATAL,
 447                 btl_endpoint->endpoint_proc->proc_opal,
 448                 "The endpoint addr is set to NULL (unsettling)");
 449         }
 450         return NULL;
 451     }
 452 
 453     cmpval = opal_compare_proc(btl_endpoint->endpoint_proc->proc_opal->proc_name,
 454                                opal_proc_local_get()->proc_name);
 455     if((btl_endpoint->endpoint_sd < 0) ||
 456        (btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
 457         cmpval < 0)) {
 458         mca_btl_tcp_endpoint_close(btl_endpoint);
 459         btl_endpoint->endpoint_sd = btl_endpoint->endpoint_sd_next;
 460         btl_endpoint->endpoint_sd_next = -1;
 461         if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) != OPAL_SUCCESS) {
 462             MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, " [endpoint_accept]");
 463             btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
 464             mca_btl_tcp_endpoint_close(btl_endpoint);
 465             goto unlock_and_return;
 466         }
 467         mca_btl_tcp_endpoint_event_init(btl_endpoint);
 468         MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [endpoint_accept]");
 469         opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
 470         if( mca_btl_tcp_event_base == opal_sync_event_base ) {
 471             
 472             opal_progress_event_users_increment();
 473         }
 474         mca_btl_tcp_endpoint_connected(btl_endpoint);
 475 
 476         MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "accepted");
 477         goto unlock_and_return;
 478     }
 479     CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next); 
 480     btl_endpoint->endpoint_sd_next = -1;
 481   unlock_and_return:
 482     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 483     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 484     return NULL;
 485 }
 486 
 487 
 488 
 489 
 490 
 491 
 492 
 493 
 494 
 495 
 496 void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
 497                                  struct sockaddr* addr, int sd)
 498 {
 499     struct timeval now = {0, 0};
 500 
 501     assert(btl_endpoint->endpoint_sd_next == -1);
 502     btl_endpoint->endpoint_sd_next = sd;
 503 
 504     opal_event_evtimer_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_accept_event,
 505                            mca_btl_tcp_endpoint_complete_accept, btl_endpoint);
 506     opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
 507 }
 508 
 509 
 510 
 511 
 512 
 513 
 514 
 515 void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
 516 {
 517     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "[close]");
 518     if(btl_endpoint->endpoint_sd < 0)
 519         return;
 520     btl_endpoint->endpoint_retries++;
 521     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(recv) [close]");
 522     opal_event_del(&btl_endpoint->endpoint_recv_event);
 523     if( mca_btl_tcp_event_base == opal_sync_event_base ) {
 524         
 525         opal_progress_event_users_decrement();
 526     }
 527     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(send) [close]");
 528     opal_event_del(&btl_endpoint->endpoint_send_event);
 529 
 530 #if MCA_BTL_TCP_ENDPOINT_CACHE
 531     free( btl_endpoint->endpoint_cache );
 532     btl_endpoint->endpoint_cache        = NULL;
 533     btl_endpoint->endpoint_cache_pos    = NULL;
 534     btl_endpoint->endpoint_cache_length = 0;
 535 #endif  
 536 
 537     CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
 538     btl_endpoint->endpoint_sd = -1;
 539     
 540 
 541 
 542 
 543 
 544     if( MCA_BTL_TCP_FAILED == btl_endpoint->endpoint_state ) {
 545         mca_btl_tcp_frag_t* frag = btl_endpoint->endpoint_send_frag;
 546         if( NULL == frag )
 547             frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&btl_endpoint->endpoint_frags);
 548         while(NULL != frag) {
 549             frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, OPAL_ERR_UNREACH);
 550 
 551             frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&btl_endpoint->endpoint_frags);
 552         }
 553     }
 554     btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
 555 }
 556 
 557 
 558 
 559 
 560 
 561 
 562 
 563 static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
 564 {
 565     
 566     assert( MCA_BTL_TCP_CONNECTED != btl_endpoint->endpoint_state );
 567     btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
 568     btl_endpoint->endpoint_retries = 0;
 569     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "READY [endpoint_connected]");
 570 
 571     if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
 572         if(NULL == btl_endpoint->endpoint_send_frag)
 573             btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
 574                 opal_list_remove_first(&btl_endpoint->endpoint_frags);
 575         MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_connected]");
 576         opal_event_add(&btl_endpoint->endpoint_send_event, 0);
 577     }
 578 }
 579 
 580 
 581 
 582 
 583 
 584 
 585 
 586 
 587 
 588 
 589 
 590 static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
 591 {
 592     size_t retval, len = strlen(mca_btl_tcp_magic_id_string);;
 593     mca_btl_tcp_proc_t* btl_proc = btl_endpoint->endpoint_proc;
 594     opal_process_name_t guid;
 595 
 596     mca_btl_tcp_endpoint_hs_msg_t hs_msg;
 597     retval = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, &hs_msg, sizeof(hs_msg));
 598 
 599     if (sizeof(hs_msg) != retval) {
 600         mca_btl_tcp_endpoint_close(btl_endpoint);
 601         if (0 == retval) {
 602             
 603 
 604 
 605 
 606             return OPAL_ERROR;
 607         }
 608         opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
 609                        true, opal_process_info.nodename,
 610                        getpid(), "did not receive entire connect ACK from peer");
 611         
 612         return OPAL_ERR_BAD_PARAM;
 613     }
 614     if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
 615         opal_show_help("help-mpi-btl-tcp.txt", "server did not receive magic string",
 616                        true, opal_process_info.nodename,
 617                        getpid(), "client", hs_msg.magic_id,
 618                        "string value");
 619         return OPAL_ERR_BAD_PARAM;
 620     }
 621 
 622     guid = hs_msg.guid;
 623     OPAL_PROCESS_NAME_NTOH(guid);
 624     
 625     
 626 
 627 
 628     if (0 != opal_compare_proc(btl_proc->proc_opal->proc_name, guid)) {
 629         BTL_ERROR(("received unexpected process identifier %s",
 630                    OPAL_NAME_PRINT(guid)));
 631         mca_btl_tcp_endpoint_close(btl_endpoint);
 632         return OPAL_ERR_UNREACH;
 633     }
 634 
 635     return OPAL_SUCCESS;
 636 }
 637 
 638 
 639 void mca_btl_tcp_set_socket_options(int sd)
 640 {
 641 #if defined(TCP_NODELAY)
 642     int optval;
 643     optval = !mca_btl_tcp_component.tcp_not_use_nodelay;
 644     if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
 645         BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
 646                    strerror(opal_socket_errno), opal_socket_errno));
 647     }
 648 #endif
 649 #if defined(SO_SNDBUF)
 650     if(mca_btl_tcp_component.tcp_sndbuf > 0 &&
 651        setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp_component.tcp_sndbuf, sizeof(int)) < 0) {
 652         BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
 653                    strerror(opal_socket_errno), opal_socket_errno));
 654     }
 655 #endif
 656 #if defined(SO_RCVBUF)
 657     if(mca_btl_tcp_component.tcp_rcvbuf > 0 &&
 658        setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) {
 659         BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
 660                    strerror(opal_socket_errno), opal_socket_errno));
 661     }
 662 #endif
 663 }
 664 
 665 
 666 
 667 
 668 
 669 
 670 
 671 
 672 
 673 
 674 static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
 675 {
 676     int rc,flags;
 677     struct sockaddr_storage endpoint_addr;
 678     
 679     uint16_t af_family = AF_INET;
 680     opal_socklen_t addrlen = sizeof(struct sockaddr_in);
 681 
 682 #if OPAL_ENABLE_IPV6
 683     if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
 684         af_family = AF_INET6;
 685         addrlen = sizeof (struct sockaddr_in6);
 686     }
 687 #endif
 688     assert( btl_endpoint->endpoint_sd < 0 );
 689     btl_endpoint->endpoint_sd = socket(af_family, SOCK_STREAM, 0);
 690     if (btl_endpoint->endpoint_sd < 0) {
 691         btl_endpoint->endpoint_retries++;
 692         return OPAL_ERR_UNREACH;
 693     }
 694 
 695     
 696     mca_btl_tcp_set_socket_options(btl_endpoint->endpoint_sd);
 697 
 698     
 699     mca_btl_tcp_endpoint_event_init(btl_endpoint);
 700 
 701     
 702     if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
 703         opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
 704                        true, opal_process_info.nodename,
 705                        getpid(), "fcntl(sd, F_GETFL, 0)",
 706                        strerror(opal_socket_errno), opal_socket_errno);
 707         
 708         return OPAL_ERR_UNREACH;
 709     } else {
 710         flags |= O_NONBLOCK;
 711         if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0) {
 712             opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
 713                            true, opal_process_info.nodename,
 714                            getpid(),
 715                            "fcntl(sd, F_SETFL, flags & O_NONBLOCK)",
 716                            strerror(opal_socket_errno), opal_socket_errno);
 717             
 718             return OPAL_ERR_UNREACH;
 719         }
 720     }
 721 
 722     
 723     mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
 724 
 725     
 726 
 727 
 728 
 729 
 730     if (endpoint_addr.ss_family == AF_INET) {
 731         assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr);
 732         if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr,
 733                  sizeof(struct sockaddr_in)) < 0) {
 734             BTL_ERROR(("bind on local address (%s:%d) failed: %s (%d)",
 735                        opal_net_get_hostname((struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr),
 736                        htons(((struct sockaddr_in*)&btl_endpoint->endpoint_btl->tcp_ifaddr)->sin_port),
 737                        strerror(opal_socket_errno), opal_socket_errno));
 738 
 739             CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
 740             return OPAL_ERROR;
 741         }
 742     }
 743 #if OPAL_ENABLE_IPV6
 744     if (endpoint_addr.ss_family == AF_INET6) {
 745         assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr);
 746         if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr,
 747                  sizeof(struct sockaddr_in6)) < 0) {
 748             BTL_ERROR(("bind on local address (%s:%d) failed: %s (%d)",
 749                        opal_net_get_hostname((struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr),
 750                        htons(((struct sockaddr_in6*)&btl_endpoint->endpoint_btl->tcp_ifaddr)->sin6_port),
 751                        strerror(opal_socket_errno), opal_socket_errno));
 752 
 753             CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
 754             return OPAL_ERROR;
 755         }
 756     }
 757 #endif
 758     opal_output_verbose(10, opal_btl_base_framework.framework_output,
 759                         "btl: tcp: attempting to connect() to %s address %s on port %d",
 760                         OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name),
 761                         opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 762                         ntohs(btl_endpoint->endpoint_addr->addr_port));
 763 
 764     if(0 == connect(btl_endpoint->endpoint_sd, (struct sockaddr*)&endpoint_addr, addrlen)) {
 765         opal_output_verbose(10, opal_btl_base_framework.framework_output,
 766                             "btl:tcp: connect() to %s:%d completed",
 767                             opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 768                             ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
 769         
 770         if((rc = mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint)) == OPAL_SUCCESS) {
 771             btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
 772             MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [start_connect]");
 773             opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
 774             if( mca_btl_tcp_event_base == opal_sync_event_base ) {
 775                 
 776                 opal_progress_event_users_increment();
 777             }
 778             return OPAL_SUCCESS;
 779         }
 780         
 781         MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "dropped connection [start_connect]");
 782     } else {
 783         
 784         if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
 785             btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
 786             MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [start_connect]");
 787             MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
 788             opal_output_verbose(30, opal_btl_base_framework.framework_output,
 789                                 "btl:tcp: would block, so allowing background progress");
 790             return OPAL_SUCCESS;
 791         }
 792     }
 793 
 794     {
 795         char *address;
 796         address = opal_net_get_hostname((struct sockaddr*) &endpoint_addr);
 797         BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_opal,
 798                       ( "Unable to connect to the peer %s on port %d: %s\n",
 799                         address,
 800                         ntohs(btl_endpoint->endpoint_addr->addr_port), strerror(opal_socket_errno) ) );
 801     }
 802     btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
 803     mca_btl_tcp_endpoint_close(btl_endpoint);
 804     return OPAL_ERR_UNREACH;
 805 }
 806 
 807 
 808 
 809 
 810 
 811 
 812 
 813 static int mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
 814 {
 815     int so_error = 0;
 816     opal_socklen_t so_length = sizeof(so_error);
 817     struct sockaddr_storage endpoint_addr;
 818 
 819     
 820 
 821 
 822 
 823     opal_event_del(&btl_endpoint->endpoint_send_event);
 824 
 825     mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
 826 
 827     
 828     if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
 829         opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
 830                        true, opal_process_info.nodename,
 831                        getpid(), "fcntl(sd, F_GETFL, 0)",
 832                        strerror(opal_socket_errno), opal_socket_errno);
 833         BTL_ERROR(("getsockopt() to %s:%d failed: %s (%d)",
 834                    opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 835                    ((struct sockaddr_in*) &endpoint_addr)->sin_port,
 836                    strerror(opal_socket_errno), opal_socket_errno));
 837         mca_btl_tcp_endpoint_close(btl_endpoint);
 838         return OPAL_ERROR;
 839     }
 840     if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) {
 841         return OPAL_SUCCESS;
 842     }
 843     if(so_error != 0) {
 844         char *msg;
 845         opal_asprintf(&msg, "connect() to %s:%d failed",
 846                  opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 847                  ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
 848         opal_show_help("help-mpi-btl-tcp.txt", "client connect fail",
 849                        true, opal_process_info.nodename,
 850                        getpid(), msg,
 851                        strerror(opal_socket_errno), opal_socket_errno);
 852         free(msg);
 853         mca_btl_tcp_endpoint_close(btl_endpoint);
 854         return OPAL_ERROR;
 855     }
 856 
 857     opal_output_verbose(10, opal_btl_base_framework.framework_output,
 858                         "btl:tcp: connect() to %s:%d completed (complete_connect), sending connect ACK",
 859                         opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 860                         ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
 861 
 862     if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) == OPAL_SUCCESS) {
 863         btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
 864         opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
 865         if( mca_btl_tcp_event_base == opal_sync_event_base ) {
 866             
 867             opal_progress_event_users_increment();
 868         }
 869         MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_add(recv) [complete_connect]");
 870         return OPAL_SUCCESS;
 871     }
 872     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, " [complete_connect]");
 873     btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
 874     mca_btl_tcp_endpoint_close(btl_endpoint);
 875     return OPAL_ERROR;
 876 }
 877 
 878 
 879 
 880 
 881 
 882 
 883 
 884 static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
 885 {
 886     mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;
 887 
 888     
 889 
 890 
 891     if( sd != btl_endpoint->endpoint_sd )
 892         return;
 893 
 894     
 895 
 896 
 897 
 898 
 899 
 900 
 901 
 902 
 903 
 904 
 905 
 906 
 907 
 908 
 909 
 910     if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_recv_lock) )
 911         return;
 912 
 913     switch(btl_endpoint->endpoint_state) {
 914     case MCA_BTL_TCP_CONNECT_ACK:
 915         {
 916             int rc = mca_btl_tcp_endpoint_recv_connect_ack(btl_endpoint);
 917             if( OPAL_SUCCESS == rc ) {
 918                 
 919                 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
 920                 mca_btl_tcp_endpoint_connected(btl_endpoint);
 921                 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 922                 MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "connected");
 923             }
 924             else if (OPAL_ERR_BAD_PARAM == rc) {
 925                 
 926 
 927 
 928 
 929                 CLOSE_THE_SOCKET(sd);
 930             }
 931             else {
 932                 
 933 
 934 
 935                 mca_btl_tcp_module_t *m = btl_endpoint->endpoint_btl;
 936 
 937                 
 938                 if (NULL != m->tcp_error_cb) {
 939                     m->tcp_error_cb((mca_btl_base_module_t*) m, MCA_BTL_ERROR_FLAGS_FATAL,
 940                         btl_endpoint->endpoint_proc->proc_opal,
 941                         "TCP ACK is neither SUCCESS nor ERR (something bad has probably happened)");
 942                 }
 943             }
 944             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 945             return;
 946         }
 947     case MCA_BTL_TCP_CONNECTED:
 948         {
 949             mca_btl_tcp_frag_t* frag;
 950 
 951             frag = btl_endpoint->endpoint_recv_frag;
 952             if(NULL == frag) {
 953                 if(mca_btl_tcp_module.super.btl_max_send_size >
 954                    mca_btl_tcp_module.super.btl_eager_limit) {
 955                     MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
 956                 } else {
 957                     MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
 958                 }
 959 
 960                 if(NULL == frag) {
 961                     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 962                     return;
 963                 }
 964                 MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
 965             }
 966 
 967 #if MCA_BTL_TCP_ENDPOINT_CACHE
 968             assert( 0 == btl_endpoint->endpoint_cache_length );
 969         data_still_pending_on_endpoint:
 970 #endif  
 971             
 972             if(mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false) {
 973                 btl_endpoint->endpoint_recv_frag = frag;
 974             } else {
 975                 btl_endpoint->endpoint_recv_frag = NULL;
 976                 if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) {
 977                     mca_btl_active_message_callback_t* reg;
 978                     reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag;
 979                     reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata);
 980                 }
 981 #if MCA_BTL_TCP_ENDPOINT_CACHE
 982                 if( 0 != btl_endpoint->endpoint_cache_length ) {
 983                     
 984 
 985 
 986                     MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
 987                     goto data_still_pending_on_endpoint;
 988                 }
 989 #endif  
 990                 MCA_BTL_TCP_FRAG_RETURN(frag);
 991             }
 992 #if MCA_BTL_TCP_ENDPOINT_CACHE
 993             assert( 0 == btl_endpoint->endpoint_cache_length );
 994 #endif  
 995             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 996             break;
 997         }
 998     case MCA_BTL_TCP_CLOSED:
 999         
1000 
1001 
1002 
1003 
1004 
1005         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
1006         break;
1007     default:
1008         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
1009         BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
1010         btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
1011         mca_btl_tcp_endpoint_close(btl_endpoint);
1012         break;
1013     }
1014 }
1015 
1016 
1017 
1018 
1019 
1020 
1021 
1022 static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user)
1023 {
1024     mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user;
1025 
1026     
1027     if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) )
1028         return;
1029 
1030     switch(btl_endpoint->endpoint_state) {
1031     case MCA_BTL_TCP_CONNECTING:
1032         mca_btl_tcp_endpoint_complete_connect(btl_endpoint);
1033         break;
1034     case MCA_BTL_TCP_CONNECTED:
1035         
1036         while (NULL != btl_endpoint->endpoint_send_frag) {
1037             mca_btl_tcp_frag_t* frag = btl_endpoint->endpoint_send_frag;
1038             int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
1039 
1040             if(mca_btl_tcp_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
1041                 break;
1042             }
1043             
1044             btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
1045                 opal_list_remove_first(&btl_endpoint->endpoint_frags);
1046 
1047             
1048             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
1049             assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
1050             frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
1051             if( btl_ownership ) {
1052                 MCA_BTL_TCP_FRAG_RETURN(frag);
1053             }
1054             
1055 
1056 
1057 
1058             if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) )
1059                 return;
1060         }
1061 
1062         
1063         if(NULL == btl_endpoint->endpoint_send_frag) {
1064             MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_del(send) [endpoint_send_handler]");
1065             opal_event_del(&btl_endpoint->endpoint_send_event);
1066         }
1067         break;
1068     default:
1069         BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
1070         MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "event_del(send) [endpoint_send_handler:error]");
1071         opal_event_del(&btl_endpoint->endpoint_send_event);
1072         break;
1073     }
1074     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
1075 }