root/opal/mca/btl/tcp/btl_tcp_endpoint.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_btl_tcp_endpoint_construct
  2. mca_btl_tcp_endpoint_destruct
  3. mca_btl_tcp_endpoint_dump
  4. mca_btl_tcp_endpoint_event_init
  5. mca_btl_tcp_endpoint_send
  6. mca_btl_tcp_endpoint_send_blocking
  7. mca_btl_tcp_endpoint_send_connect_ack
  8. mca_btl_tcp_endpoint_complete_accept
  9. mca_btl_tcp_endpoint_accept
  10. mca_btl_tcp_endpoint_close
  11. mca_btl_tcp_endpoint_connected
  12. mca_btl_tcp_endpoint_recv_connect_ack
  13. mca_btl_tcp_set_socket_options
  14. mca_btl_tcp_endpoint_start_connect
  15. mca_btl_tcp_endpoint_complete_connect
  16. mca_btl_tcp_endpoint_recv_handler
  17. mca_btl_tcp_endpoint_send_handler

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2016 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007-2008 Sun Microsystems, Inc.  All rights reserved.
  13  * Copyright (c) 2013-2018 Cisco Systems, Inc.  All rights reserved
  14  * Copyright (c) 2014      Intel, Inc.  All rights reserved.
  15  * Copyright (c) 2015      Research Organization for Information Science
  16  *                         and Technology (RIST). All rights reserved.
  17  * Copyright (c) 2018      Amazon.com, Inc. or its affiliates.  All Rights reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  *
  24  */
  25 
  26 #include "opal_config.h"
  27 
  28 #include <stdlib.h>
  29 #include <string.h>
  30 #ifdef HAVE_UNISTD_H
  31 #include <unistd.h>
  32 #endif
  33 #include "opal/opal_socket_errno.h"
  34 #ifdef HAVE_SYS_TYPES_H
  35 #include <sys/types.h>
  36 #endif
  37 #ifdef HAVE_FCNTL_H
  38 #include <fcntl.h>
  39 #endif
  40 #ifdef HAVE_NETINET_IN_H
  41 #include <netinet/in.h>
  42 #endif
  43 #ifdef HAVE_NETINET_TCP_H
  44 #include <netinet/tcp.h>
  45 #endif
  46 #ifdef HAVE_ARPA_INET_H
  47 #include <arpa/inet.h>
  48 #endif
  49 #ifdef HAVE_SYS_TIME_H
  50 #include <sys/time.h>
  51 #endif  /* HAVE_SYS_TIME_H */
  52 #include <time.h>
  53 
  54 #include "opal/mca/event/event.h"
  55 #include "opal/util/net.h"
  56 #include "opal/util/show_help.h"
  57 #include "opal/util/proc.h"
  58 #include "opal/util/printf.h"
  59 #include "opal/util/string_copy.h"
  60 #include "opal/mca/btl/base/btl_base_error.h"
  61 
  62 #include "btl_tcp.h"
  63 #include "btl_tcp_endpoint.h"
  64 #include "btl_tcp_proc.h"
  65 #include "btl_tcp_frag.h"
  66 #include "btl_tcp_addr.h"
  67 
  68 /*
  69  * Magic ID string send during connect/accept handshake
  70  */
  71 
  72 const char mca_btl_tcp_magic_id_string[MCA_BTL_TCP_MAGIC_STRING_LENGTH] = "OPAL-TCP-BTL";
  73 
  74 /*
  75  * Initialize state of the endpoint instance.
  76  *
  77  */
  78 static void mca_btl_tcp_endpoint_construct(mca_btl_tcp_endpoint_t* endpoint)
  79 {
  80     endpoint->endpoint_btl = NULL;
  81     endpoint->endpoint_proc = NULL;
  82     endpoint->endpoint_addr = NULL;
  83     endpoint->endpoint_sd = -1;
  84     endpoint->endpoint_sd_next = -1;
  85     endpoint->endpoint_send_frag = 0;
  86     endpoint->endpoint_recv_frag = 0;
  87     endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
  88     endpoint->endpoint_retries = 0;
  89     endpoint->endpoint_nbo = false;
  90 #if MCA_BTL_TCP_ENDPOINT_CACHE
  91     endpoint->endpoint_cache        = NULL;
  92     endpoint->endpoint_cache_pos    = NULL;
  93     endpoint->endpoint_cache_length = 0;
  94 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
  95     OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
  96     OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
  97     OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);
  98 }
  99 
 100 /*
 101  * Destroy a endpoint
 102  *
 103  */
static void mca_btl_tcp_endpoint_destruct(mca_btl_tcp_endpoint_t* endpoint)
{
    /* Tear the connection down first (closes the socket and removes the
     * event registrations), then detach the endpoint from its proc before
     * destroying the queue and locks.  Order matters: close() may still
     * touch the fragment list and both mutexes. */
    mca_btl_tcp_endpoint_close(endpoint);
    mca_btl_tcp_proc_remove(endpoint->endpoint_proc, endpoint);
    OBJ_DESTRUCT(&endpoint->endpoint_frags);
    OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
    OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
}
 112 
/* Register the endpoint class with the OPAL object system; it derives from
 * opal_list_item_t so endpoints can be placed on opal_list_t lists. */
OBJ_CLASS_INSTANCE(
    mca_btl_tcp_endpoint_t,
    opal_list_item_t,
    mca_btl_tcp_endpoint_construct,
    mca_btl_tcp_endpoint_destruct);


/* Forward declarations for the static helpers defined later in this file. */
static void mca_btl_tcp_endpoint_construct(mca_btl_base_endpoint_t* btl_endpoint);
static void mca_btl_tcp_endpoint_destruct(mca_btl_base_endpoint_t* btl_endpoint);
static int  mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t*);
static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t*);
static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user);
static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user);
 126 
 127 /*
 128  * diagnostics
 129  */
 130 
 131 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
 132 
 133 #define DEBUG_LENGTH  1024
 134 /**
 135  * The lack of protection in the mca_btl_tcp_endpoint_dump function is voluntary
 136  * so that it can be called regardless of the state of the mutexes. As a result,
 137  * when multiple threads work on the same endpoint not only the information
 138  * displayed might be inacurate, but when we manipulate the pending fragments we
 139  * might access freed memory. Thus, the caller should lock the endpoint prior
 140  * to the call.
 141  */
 142 void
 143 mca_btl_tcp_endpoint_dump(int level,
 144                           const char* fname,
 145                           int lineno,
 146                           const char* funcname,
 147                           mca_btl_base_endpoint_t* btl_endpoint,
 148                           bool full_info,
 149                           const char* msg)
 150 {
 151     char outmsg[DEBUG_LENGTH];
 152     int sndbuf, rcvbuf, nodelay, flags, used = 0;
 153 #if OPAL_ENABLE_IPV6
 154     struct sockaddr_storage inaddr;
 155 #else
 156     struct sockaddr_in inaddr;
 157 #endif
 158     opal_socklen_t obtlen;
 159     opal_socklen_t addrlen = sizeof(inaddr);
 160     mca_btl_tcp_frag_t* item;
 161 
 162     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s: ", msg);
 163     if (used >= DEBUG_LENGTH) goto out;
 164 
 165     getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
 166 #if OPAL_ENABLE_IPV6
 167     {
 168         char *address;
 169         address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
 170         if (NULL != address) {
 171             used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s -", address);
 172             if (used >= DEBUG_LENGTH) goto out;
 173         }
 174     }
 175 #else
 176     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "%s -", inet_ntoa(inaddr.sin_addr));
 177     if (used >= DEBUG_LENGTH) goto out;
 178 #endif
 179     getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
 180 #if OPAL_ENABLE_IPV6
 181     {
 182         char *address;
 183         address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
 184         if (NULL != address) {
 185             used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " %s", address);
 186             if (used >= DEBUG_LENGTH) goto out;
 187         }
 188     }
 189 #else
 190     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " %s", inet_ntoa(inaddr.sin_addr));
 191     if (used >= DEBUG_LENGTH) goto out;
 192 #endif
 193 
 194     used = snprintf(outmsg, DEBUG_LENGTH, "[%d", btl_endpoint->endpoint_sd);
 195     if (used >= DEBUG_LENGTH) goto out;
 196     switch(btl_endpoint->endpoint_state) {
 197     case MCA_BTL_TCP_CONNECTING:
 198         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "connecting");
 199         if (used >= DEBUG_LENGTH) goto out;
 200         break;
 201     case MCA_BTL_TCP_CONNECT_ACK:
 202         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "ack");
 203         if (used >= DEBUG_LENGTH) goto out;
 204         break;
 205     case MCA_BTL_TCP_CLOSED:
 206         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "close");
 207         if (used >= DEBUG_LENGTH) goto out;
 208         break;
 209     case MCA_BTL_TCP_FAILED:
 210         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "failed");
 211         if (used >= DEBUG_LENGTH) goto out;
 212         break;
 213     case MCA_BTL_TCP_CONNECTED:
 214         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "connected");
 215         if (used >= DEBUG_LENGTH) goto out;
 216         break;
 217     default:
 218         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, ":%s]", "unknown");
 219         if (used >= DEBUG_LENGTH) goto out;
 220         break;
 221     }
 222 
 223     if( full_info ) {
 224         if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
 225             BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
 226                        strerror(opal_socket_errno), opal_socket_errno));
 227         }
 228 
 229 #if defined(SO_SNDBUF)
 230         obtlen = sizeof(sndbuf);
 231         if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
 232             BTL_ERROR(("SO_SNDBUF option: %s (%d)",
 233                        strerror(opal_socket_errno), opal_socket_errno));
 234         }
 235 #else
 236         sndbuf = -1;
 237 #endif
 238 #if defined(SO_RCVBUF)
 239         obtlen = sizeof(rcvbuf);
 240         if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
 241             BTL_ERROR(("SO_RCVBUF option: %s (%d)",
 242                        strerror(opal_socket_errno), opal_socket_errno));
 243         }
 244 #else
 245         rcvbuf = -1;
 246 #endif
 247 #if defined(TCP_NODELAY)
 248         obtlen = sizeof(nodelay);
 249         if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
 250             BTL_ERROR(("TCP_NODELAY option: %s (%d)",
 251                        strerror(opal_socket_errno), opal_socket_errno));
 252         }
 253 #else
 254         nodelay = 0;
 255 #endif
 256         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, " nodelay %d sndbuf %d rcvbuf %d flags %08x",
 257                          nodelay, sndbuf, rcvbuf, flags);
 258         if (used >= DEBUG_LENGTH) goto out;
 259 #if MCA_BTL_TCP_ENDPOINT_CACHE
 260         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n\t[cache %p used %lu/%lu]",
 261                          (void*)btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
 262                          btl_endpoint->endpoint_cache_length);
 263         if (used >= DEBUG_LENGTH) goto out;
 264 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
 265         used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "{%s - retries %d}",
 266                          (btl_endpoint->endpoint_nbo ? "NBO" : ""), (int)btl_endpoint->endpoint_retries);
 267         if (used >= DEBUG_LENGTH) goto out;
 268     }
 269     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n");
 270     if (used >= DEBUG_LENGTH) goto out;
 271 
 272     if( NULL != btl_endpoint->endpoint_recv_frag )
 273         used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_recv_frag, "active recv",
 274                                       &outmsg[used], DEBUG_LENGTH - used);
 275     if (used >= DEBUG_LENGTH) goto out;
 276 
 277     if( NULL != btl_endpoint->endpoint_send_frag )
 278         used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_send_frag, "active send (inaccurate iov)",
 279                                       &outmsg[used], DEBUG_LENGTH - used);
 280     if (used >= DEBUG_LENGTH) goto out;
 281     OPAL_LIST_FOREACH(item, &btl_endpoint->endpoint_frags, mca_btl_tcp_frag_t) {
 282         used += mca_btl_tcp_frag_dump(item, "pending send", &outmsg[used], DEBUG_LENGTH - used);
 283         if (used >= DEBUG_LENGTH) goto out;
 284     }
 285 out:
 286     outmsg[ used >= DEBUG_LENGTH ? (DEBUG_LENGTH-1) : used ] = '\0';
 287     opal_output_verbose(level, opal_btl_base_framework.framework_output,
 288                         "[%s:%d:%s][%s -> %s] %s",
 289                         fname, lineno, funcname,
 290                         OPAL_NAME_PRINT(opal_proc_local_get()->proc_name),
 291                         (NULL != btl_endpoint->endpoint_proc ? OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name) : "unknown remote"),
 292                         outmsg);
 293 }
 294 #endif /* OPAL_ENABLE_DEBUG && WANT_PEER_DUMP */
 295 
 296 /*
 297  * Initialize events to be used by the endpoint instance for TCP select/poll callbacks.
 298  */
 299 
/* Prepare the recv/send libevent structures for this endpoint's socket and
 * (optionally) allocate the per-endpoint receive cache.  The events are only
 * initialized here; callers add them when the connection progresses. */
static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint)
{
#if MCA_BTL_TCP_ENDPOINT_CACHE
    /* must not already own a cache (freed in mca_btl_tcp_endpoint_close) */
    assert(NULL == btl_endpoint->endpoint_cache);
    btl_endpoint->endpoint_cache     = (char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
    btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */

    opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
                    btl_endpoint->endpoint_sd,
                    OPAL_EV_READ | OPAL_EV_PERSIST,
                    mca_btl_tcp_endpoint_recv_handler,
                    btl_endpoint );
    /**
     * In the multi-threaded case, the send event must be persistent in order
     * to avoid missing the connection notification in send_handler due to
     * a local handling of the peer process (which holds the lock).
     */
    opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
                    btl_endpoint->endpoint_sd,
                    OPAL_EV_WRITE | OPAL_EV_PERSIST,
                    mca_btl_tcp_endpoint_send_handler,
                    btl_endpoint);
}
 324 
 325 
 326 /*
 327  * Attempt to send a fragment using a given endpoint. If the endpoint is not connected,
 328  * queue the fragment and start the connection as required.
 329  */
 330 
/* Send (or queue) one fragment on the endpoint.  Returns 1 when the fragment
 * was sent completely inline, OPAL_SUCCESS when it was queued/scheduled, or
 * an error code (e.g. OPAL_ERR_UNREACH) when the peer cannot be reached. */
int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp_frag_t* frag)
{
    int rc = OPAL_SUCCESS;

    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
    switch(btl_endpoint->endpoint_state) {
    case MCA_BTL_TCP_CONNECTING:
    case MCA_BTL_TCP_CONNECT_ACK:
    case MCA_BTL_TCP_CLOSED:
        /* Not connected yet: queue the fragment and, if no connection
         * attempt is currently in flight, start one. */
        opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
        frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
        if(btl_endpoint->endpoint_state == MCA_BTL_TCP_CLOSED)
            rc = mca_btl_tcp_endpoint_start_connect(btl_endpoint);
        break;
    case MCA_BTL_TCP_FAILED:
        rc = OPAL_ERR_UNREACH;
        break;
    case MCA_BTL_TCP_CONNECTED:
        if (NULL == btl_endpoint->endpoint_send_frag) {
            /* No send in progress: try an eager inline send of priority
             * fragments directly on the socket. */
            if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
               mca_btl_tcp_frag_send(frag, btl_endpoint->endpoint_sd)) {
                int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);

                /* Fully sent: release the lock BEFORE the completion
                 * callback, which may re-enter the BTL. */
                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) {
                    frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
                }
                if( btl_ownership ) {
                    MCA_BTL_TCP_FRAG_RETURN(frag);
                }
                MCA_BTL_TCP_ENDPOINT_DUMP(50, btl_endpoint, true, "complete send fragment [endpoint_send]");
                /* 1 signals inline completion to the caller */
                return 1;
            } else {
                /* Partial (or non-priority) send: make this the active
                 * fragment and let the send event finish it. */
                btl_endpoint->endpoint_send_frag = frag;
                MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_send]");
                frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
                MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
            }
        } else {
            /* A send is already active: queue behind it. */
            MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "send fragment enqueued [endpoint_send]");
            frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
            opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
        }
        break;
    }
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    return rc;
}
 379 
 380 
 381 /*
 382  * A blocking send on a non-blocking socket. Used to send the small
 383  * amount of connection information that identifies the endpoints endpoint.
 384  */
 385 static int
 386 mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint,
 387                                    const void* data, size_t size)
 388 {
 389     int ret = mca_btl_tcp_send_blocking(btl_endpoint->endpoint_sd, data, size);
 390     if (ret < 0) {
 391         mca_btl_tcp_endpoint_close(btl_endpoint);
 392     }
 393     return ret;
 394 }
 395 
 396 /*
 397  * Send the globally unique identifier for this process to a endpoint on
 398  * a newly connected socket.
 399  */
 400 static int 
 401 mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
 402 {
 403     opal_process_name_t guid = opal_proc_local_get()->proc_name;
 404     OPAL_PROCESS_NAME_HTON(guid);
 405     
 406     mca_btl_tcp_endpoint_hs_msg_t hs_msg;
 407     opal_string_copy(hs_msg.magic_id, mca_btl_tcp_magic_id_string,
 408                      sizeof(hs_msg.magic_id));
 409     hs_msg.guid = guid;
 410     
 411     if(sizeof(hs_msg) != 
 412        mca_btl_tcp_endpoint_send_blocking(btl_endpoint, 
 413                                           &hs_msg, sizeof(hs_msg))) {
 414          opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
 415                        true, opal_process_info.nodename,
 416                        sizeof(hs_msg),
 417                        "connect ACK failed to send magic-id and guid");
 418           return OPAL_ERR_UNREACH;
 419     }
 420     return OPAL_SUCCESS;
 421 }
 422 
/* Deferred (event-driven) half of endpoint_accept: decide whether the newly
 * accepted socket (endpoint_sd_next) replaces the current connection or is
 * discarded.  Runs from the accept timer event; reschedules itself if either
 * endpoint lock is currently held. */
static void *mca_btl_tcp_endpoint_complete_accept(int fd, int flags, void *context)
{
    mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t*)context;
    struct timeval now = {0, 0};
    int cmpval;

    /* Take both locks non-blocking; if either is busy, re-arm the timer
     * with a zero timeout and try again on the next event-loop pass. */
    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_recv_lock) ) {
        opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
        return NULL;
    }
    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) ) {
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
        return NULL;
    }

    if(NULL == btl_endpoint->endpoint_addr) {
        CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next); /* No further use of this socket. Close it */
        btl_endpoint->endpoint_sd_next = -1;
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        /* report the inconsistent endpoint state to the upper layer */
        if( NULL != btl_endpoint->endpoint_btl->tcp_error_cb ) {
            btl_endpoint->endpoint_btl->tcp_error_cb(
                &btl_endpoint->endpoint_btl->super, MCA_BTL_ERROR_FLAGS_NONFATAL,
                btl_endpoint->endpoint_proc->proc_opal,
                "The endpoint addr is set to NULL (unsettling)");
        }
        return NULL;
    }

    /* Tie-break simultaneous connections by comparing process names:
     * accept the incoming socket if we have no established connection, or
     * if the peer "wins" the comparison (cmpval < 0). */
    cmpval = opal_compare_proc(btl_endpoint->endpoint_proc->proc_opal->proc_name,
                               opal_proc_local_get()->proc_name);
    if((btl_endpoint->endpoint_sd < 0) ||
       (btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
        cmpval < 0)) {
        /* drop any half-open connection and adopt the accepted socket */
        mca_btl_tcp_endpoint_close(btl_endpoint);
        btl_endpoint->endpoint_sd = btl_endpoint->endpoint_sd_next;
        btl_endpoint->endpoint_sd_next = -1;
        if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) != OPAL_SUCCESS) {
            MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, " [endpoint_accept]");
            btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
            mca_btl_tcp_endpoint_close(btl_endpoint);
            goto unlock_and_return;
        }
        mca_btl_tcp_endpoint_event_init(btl_endpoint);
        MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [endpoint_accept]");
        opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
        if( mca_btl_tcp_event_base == opal_sync_event_base ) {
            /* If no progress thread then raise the awarness of the default progress engine */
            opal_progress_event_users_increment();
        }
        mca_btl_tcp_endpoint_connected(btl_endpoint);

        MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "accepted");
        goto unlock_and_return;
    }
    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next); /* No further use of this socket. Close it */
    btl_endpoint->endpoint_sd_next = -1;
  unlock_and_return:
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
    return NULL;
}
 486 
 487 /*
 488  * Check the state of this endpoint. If the incoming connection request matches
 489  * our endpoints address, check the state of our connection:
 490  * (1) if a connection has not been attempted, accept the connection
 491  * (2) if a connection has not been established, and the endpoints process identifier
 492  *     is less than the local process, accept the connection
 493  * otherwise, reject the connection and continue with the current connection
 494  */
 495 
void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
                                 struct sockaddr* addr, int sd)
{
    struct timeval now = {0, 0};

    /* stash the accepted socket; the decision whether to adopt it is
     * deferred to mca_btl_tcp_endpoint_complete_accept() via a zero-timeout
     * timer event so it runs in the event-base context */
    assert(btl_endpoint->endpoint_sd_next == -1);
    btl_endpoint->endpoint_sd_next = sd;

    opal_event_evtimer_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_accept_event,
                           mca_btl_tcp_endpoint_complete_accept, btl_endpoint);
    opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
}
 508 
 509 
 510 /*
 511  * Remove any event registrations associated with the socket
 512  * and update the endpoint state to reflect the connection has
 513  * been closed.
 514  */
void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
{
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "[close]");
    /* nothing to do if there is no open socket */
    if(btl_endpoint->endpoint_sd < 0)
        return;
    btl_endpoint->endpoint_retries++;
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(recv) [close]");
    opal_event_del(&btl_endpoint->endpoint_recv_event);
    if( mca_btl_tcp_event_base == opal_sync_event_base ) {
        /* If no progress thread then lower the awarness of the default progress engine */
        opal_progress_event_users_decrement();
    }
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(send) [close]");
    opal_event_del(&btl_endpoint->endpoint_send_event);

#if MCA_BTL_TCP_ENDPOINT_CACHE
    /* release the receive cache (re-allocated on the next event_init) */
    free( btl_endpoint->endpoint_cache );
    btl_endpoint->endpoint_cache        = NULL;
    btl_endpoint->endpoint_cache_pos    = NULL;
    btl_endpoint->endpoint_cache_length = 0;
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */

    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
    btl_endpoint->endpoint_sd = -1;
    /**
     * If we keep failing to connect to the peer let the caller know about
     * this situation by triggering all the pending fragments callback and
     * reporting the error.
     */
    if( MCA_BTL_TCP_FAILED == btl_endpoint->endpoint_state ) {
        /* drain the active fragment first, then the pending queue */
        mca_btl_tcp_frag_t* frag = btl_endpoint->endpoint_send_frag;
        if( NULL == frag )
            frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&btl_endpoint->endpoint_frags);
        while(NULL != frag) {
            frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, OPAL_ERR_UNREACH);

            frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&btl_endpoint->endpoint_frags);
        }
    }
    /* endpoint may be reconnected later, so return to CLOSED (not FAILED) */
    btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
}
 556 
 557 /*
 558  *  Setup endpoint state to reflect that connection has been established,
 559  *  and start any pending sends. This function should be called with the
 560  *  send lock locked.
 561  */
 562 
static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
{
    /* setup socket options */
    assert( MCA_BTL_TCP_CONNECTED != btl_endpoint->endpoint_state );
    btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
    /* connection succeeded: reset the retry counter */
    btl_endpoint->endpoint_retries = 0;
    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "READY [endpoint_connected]");

    /* if fragments were queued while connecting, promote the first one to
     * the active send and arm the send event to drain the rest */
    if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
        if(NULL == btl_endpoint->endpoint_send_frag)
            btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
                opal_list_remove_first(&btl_endpoint->endpoint_frags);
        MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_connected]");
        opal_event_add(&btl_endpoint->endpoint_send_event, 0);
    }
}
 579 
 580 
 581 /*
 582  *  Receive the endpoints globally unique process identification from a newly
 583  *  connected socket and verify the expected response. If so, move the
 584  *  socket to a connected state.
 585  *
 586  *  NOTE: The return codes from this function are checked in
 587  *  mca_btl_tcp_endpoint_recv_handler().  Don't change them here
 588  *  without also changing the handling in _recv_handler()!
 589  */
 590 static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
 591 {
 592     size_t retval, len = strlen(mca_btl_tcp_magic_id_string);;
 593     mca_btl_tcp_proc_t* btl_proc = btl_endpoint->endpoint_proc;
 594     opal_process_name_t guid;
 595 
 596     mca_btl_tcp_endpoint_hs_msg_t hs_msg;
 597     retval = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, &hs_msg, sizeof(hs_msg));
 598 
 599     if (sizeof(hs_msg) != retval) {
 600         mca_btl_tcp_endpoint_close(btl_endpoint);
 601         if (0 == retval) {
 602             /* If we get zero bytes, the peer closed the socket. This
 603                can happen when the two peers started the connection
 604                protocol simultaneously. Just report the problem
 605                upstream. */
 606             return OPAL_ERROR;
 607         }
 608         opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
 609                        true, opal_process_info.nodename,
 610                        getpid(), "did not receive entire connect ACK from peer");
 611         
 612         return OPAL_ERR_BAD_PARAM;
 613     }
 614     if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
 615         opal_show_help("help-mpi-btl-tcp.txt", "server did not receive magic string",
 616                        true, opal_process_info.nodename,
 617                        getpid(), "client", hs_msg.magic_id,
 618                        "string value");
 619         return OPAL_ERR_BAD_PARAM;
 620     }
 621 
 622     guid = hs_msg.guid;
 623     OPAL_PROCESS_NAME_NTOH(guid);
 624     /* compare this to the expected values */
 625     /* TODO: this deserve a little bit more thinking as we are not supposed
 626      * to be able to exchange the opal_process_name_t over the network.
 627      */
 628     if (0 != opal_compare_proc(btl_proc->proc_opal->proc_name, guid)) {
 629         BTL_ERROR(("received unexpected process identifier %s",
 630                    OPAL_NAME_PRINT(guid)));
 631         mca_btl_tcp_endpoint_close(btl_endpoint);
 632         return OPAL_ERR_UNREACH;
 633     }
 634 
 635     return OPAL_SUCCESS;
 636 }
 637 
 638 
 639 void mca_btl_tcp_set_socket_options(int sd)
 640 {
 641 #if defined(TCP_NODELAY)
 642     int optval;
 643     optval = !mca_btl_tcp_component.tcp_not_use_nodelay;
 644     if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
 645         BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
 646                    strerror(opal_socket_errno), opal_socket_errno));
 647     }
 648 #endif
 649 #if defined(SO_SNDBUF)
 650     if(mca_btl_tcp_component.tcp_sndbuf > 0 &&
 651        setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp_component.tcp_sndbuf, sizeof(int)) < 0) {
 652         BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
 653                    strerror(opal_socket_errno), opal_socket_errno));
 654     }
 655 #endif
 656 #if defined(SO_RCVBUF)
 657     if(mca_btl_tcp_component.tcp_rcvbuf > 0 &&
 658        setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) {
 659         BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
 660                    strerror(opal_socket_errno), opal_socket_errno));
 661     }
 662 #endif
 663 }
 664 
 665 
 666 
 667 /*
 668  *  Start a connection to the endpoint. This will likely not complete,
 669  *  as the socket is set to non-blocking, so register for event
 670  *  notification of connect completion. On connection we send our
 671  *  globally unique process identifier to the endpoint and wait for
 672  *  the endpoint response.
 673  */
 674 static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
 675 {
 676     int rc,flags;
 677     struct sockaddr_storage endpoint_addr;
 678     /* By default consider a IPv4 connection */
 679     uint16_t af_family = AF_INET;
 680     opal_socklen_t addrlen = sizeof(struct sockaddr_in);
 681 
 682 #if OPAL_ENABLE_IPV6
 683     if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
 684         af_family = AF_INET6;
 685         addrlen = sizeof (struct sockaddr_in6);
 686     }
 687 #endif
 688     assert( btl_endpoint->endpoint_sd < 0 );
 689     btl_endpoint->endpoint_sd = socket(af_family, SOCK_STREAM, 0);
 690     if (btl_endpoint->endpoint_sd < 0) {
 691         btl_endpoint->endpoint_retries++;
 692         return OPAL_ERR_UNREACH;
 693     }
 694 
 695     /* setup socket buffer sizes */
 696     mca_btl_tcp_set_socket_options(btl_endpoint->endpoint_sd);
 697 
 698     /* setup event callbacks */
 699     mca_btl_tcp_endpoint_event_init(btl_endpoint);
 700 
 701     /* setup the socket as non-blocking */
 702     if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
 703         opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
 704                        true, opal_process_info.nodename,
 705                        getpid(), "fcntl(sd, F_GETFL, 0)",
 706                        strerror(opal_socket_errno), opal_socket_errno);
 707         /* Upper layer will handler the error */
 708         return OPAL_ERR_UNREACH;
 709     } else {
 710         flags |= O_NONBLOCK;
 711         if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0) {
 712             opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
 713                            true, opal_process_info.nodename,
 714                            getpid(),
 715                            "fcntl(sd, F_SETFL, flags & O_NONBLOCK)",
 716                            strerror(opal_socket_errno), opal_socket_errno);
 717             /* Upper layer will handler the error */
 718             return OPAL_ERR_UNREACH;
 719         }
 720     }
 721 
 722     /* start the connect - will likely fail with EINPROGRESS */
 723     mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
 724 
 725     /* Bind the socket to one of the addresses associated with
 726      * this btl module.  This sets the source IP to one of the 
 727      * addresses shared in modex, so that the destination rank 
 728      * can properly pair btl modules, even in cases where Linux 
 729      * might do something unexpected with routing */
 730     if (endpoint_addr.ss_family == AF_INET) {
 731         assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr);
 732         if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr,
 733                  sizeof(struct sockaddr_in)) < 0) {
 734             BTL_ERROR(("bind on local address (%s:%d) failed: %s (%d)",
 735                        opal_net_get_hostname((struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr),
 736                        htons(((struct sockaddr_in*)&btl_endpoint->endpoint_btl->tcp_ifaddr)->sin_port),
 737                        strerror(opal_socket_errno), opal_socket_errno));
 738 
 739             CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
 740             return OPAL_ERROR;
 741         }
 742     }
 743 #if OPAL_ENABLE_IPV6
 744     if (endpoint_addr.ss_family == AF_INET6) {
 745         assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr);
 746         if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr,
 747                  sizeof(struct sockaddr_in6)) < 0) {
 748             BTL_ERROR(("bind on local address (%s:%d) failed: %s (%d)",
 749                        opal_net_get_hostname((struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr),
 750                        htons(((struct sockaddr_in6*)&btl_endpoint->endpoint_btl->tcp_ifaddr)->sin6_port),
 751                        strerror(opal_socket_errno), opal_socket_errno));
 752 
 753             CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
 754             return OPAL_ERROR;
 755         }
 756     }
 757 #endif
 758     opal_output_verbose(10, opal_btl_base_framework.framework_output,
 759                         "btl: tcp: attempting to connect() to %s address %s on port %d",
 760                         OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name),
 761                         opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 762                         ntohs(btl_endpoint->endpoint_addr->addr_port));
 763 
 764     if(0 == connect(btl_endpoint->endpoint_sd, (struct sockaddr*)&endpoint_addr, addrlen)) {
 765         opal_output_verbose(10, opal_btl_base_framework.framework_output,
 766                             "btl:tcp: connect() to %s:%d completed",
 767                             opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 768                             ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
 769         /* send our globally unique process identifier to the endpoint */
 770         if((rc = mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint)) == OPAL_SUCCESS) {
 771             btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
 772             MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [start_connect]");
 773             opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
 774             if( mca_btl_tcp_event_base == opal_sync_event_base ) {
 775                 /* If no progress thread then raise the awarness of the default progress engine */
 776                 opal_progress_event_users_increment();
 777             }
 778             return OPAL_SUCCESS;
 779         }
 780         /* We connected to the peer, but he close the socket before we got a chance to send our guid */
 781         MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "dropped connection [start_connect]");
 782     } else {
 783         /* non-blocking so wait for completion */
 784         if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
 785             btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
 786             MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [start_connect]");
 787             MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
 788             opal_output_verbose(30, opal_btl_base_framework.framework_output,
 789                                 "btl:tcp: would block, so allowing background progress");
 790             return OPAL_SUCCESS;
 791         }
 792     }
 793 
 794     {
 795         char *address;
 796         address = opal_net_get_hostname((struct sockaddr*) &endpoint_addr);
 797         BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_opal,
 798                       ( "Unable to connect to the peer %s on port %d: %s\n",
 799                         address,
 800                         ntohs(btl_endpoint->endpoint_addr->addr_port), strerror(opal_socket_errno) ) );
 801     }
 802     btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
 803     mca_btl_tcp_endpoint_close(btl_endpoint);
 804     return OPAL_ERR_UNREACH;
 805 }
 806 
 807 
 808 /*
 809  * Check the status of the connection. If the connection failed, will retry
 810  * later. Otherwise, send this processes identifier to the endpoint on the
 811  * newly connected socket.
 812  */
 813 static int mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
 814 {
 815     int so_error = 0;
 816     opal_socklen_t so_length = sizeof(so_error);
 817     struct sockaddr_storage endpoint_addr;
 818 
 819     /* Delete the send event notification, as the next step is waiting for the ack
 820      * from the peer. Once this ack is received we will deal with the send notification
 821      * accordingly.
 822      */
 823     opal_event_del(&btl_endpoint->endpoint_send_event);
 824 
 825     mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
 826 
 827     /* check connect completion status */
 828     if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
 829         opal_show_help("help-mpi-btl-tcp.txt", "socket flag fail",
 830                        true, opal_process_info.nodename,
 831                        getpid(), "fcntl(sd, F_GETFL, 0)",
 832                        strerror(opal_socket_errno), opal_socket_errno);
 833         BTL_ERROR(("getsockopt() to %s:%d failed: %s (%d)",
 834                    opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 835                    ((struct sockaddr_in*) &endpoint_addr)->sin_port,
 836                    strerror(opal_socket_errno), opal_socket_errno));
 837         mca_btl_tcp_endpoint_close(btl_endpoint);
 838         return OPAL_ERROR;
 839     }
 840     if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) {
 841         return OPAL_SUCCESS;
 842     }
 843     if(so_error != 0) {
 844         char *msg;
 845         opal_asprintf(&msg, "connect() to %s:%d failed",
 846                  opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 847                  ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
 848         opal_show_help("help-mpi-btl-tcp.txt", "client connect fail",
 849                        true, opal_process_info.nodename,
 850                        getpid(), msg,
 851                        strerror(opal_socket_errno), opal_socket_errno);
 852         free(msg);
 853         mca_btl_tcp_endpoint_close(btl_endpoint);
 854         return OPAL_ERROR;
 855     }
 856 
 857     opal_output_verbose(10, opal_btl_base_framework.framework_output,
 858                         "btl:tcp: connect() to %s:%d completed (complete_connect), sending connect ACK",
 859                         opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 860                         ntohs(((struct sockaddr_in*) &endpoint_addr)->sin_port));
 861 
 862     if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) == OPAL_SUCCESS) {
 863         btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
 864         opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
 865         if( mca_btl_tcp_event_base == opal_sync_event_base ) {
 866             /* If no progress thread then raise the awarness of the default progress engine */
 867             opal_progress_event_users_increment();
 868         }
 869         MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_add(recv) [complete_connect]");
 870         return OPAL_SUCCESS;
 871     }
 872     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, " [complete_connect]");
 873     btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
 874     mca_btl_tcp_endpoint_close(btl_endpoint);
 875     return OPAL_ERROR;
 876 }
 877 
 878 
 879 /*
 880  * A file descriptor is available/ready for recv. Check the state
 881  * of the socket and take the appropriate action.
 882  */
 883 
static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
{
    mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;

    /* Make sure we don't have a race between a thread that removed the
     * recv event, and one event already scheduled. If the descriptor no
     * longer matches the endpoint's socket, this event is stale.
     */
    if( sd != btl_endpoint->endpoint_sd )
        return;

    /**
     * There is an extremely rare race condition here, that can only be
     * triggered during the initialization. If the two processes start their
     * connection at the same time, one of the processes will have to close its
     * previous endpoint (the one opened from the local send). As a result it
     * might go in btl_endpoint_close and try to delete the recv_event. This
     * call will go back in the libevent, and in a multithreaded case will try
     * to lock the event. If another thread noticed the active event (and this
     * is possible as during the initialization there will be 2 sockets), one
     * thread might get stuck trying to lock the endpoint_recv_lock (while
     * holding the event_base lock) while the other thread will try to lock the
     * event_base lock (while holding the endpoint_recv lock).
     *
     * If we can't lock this mutex, it is OK to cancel the receive operation, it
     * will be eventually triggered again shortly.
     */
    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_recv_lock) )
        return;

    switch(btl_endpoint->endpoint_state) {
    case MCA_BTL_TCP_CONNECT_ACK:
        {
            /* Handshake phase: the peer's connect ack (guid/magic) is expected. */
            int rc = mca_btl_tcp_endpoint_recv_connect_ack(btl_endpoint);
            if( OPAL_SUCCESS == rc ) {
                /* we are now connected. Start sending the data */
                OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
                mca_btl_tcp_endpoint_connected(btl_endpoint);
                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
                MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "connected");
            }
            else if (OPAL_ERR_BAD_PARAM == rc) {
                /* If we get a BAD_PARAM, it means that it probably wasn't
                   an OMPI process on the other end of the socket (e.g.,
                   the magic string ID failed).  So we can probably just
                   close the socket and ignore this connection. */
                CLOSE_THE_SOCKET(sd);
            }
            else {
                /* Otherwise, it probably *was* an OMPI peer process on
                   the other end, and something bad has probably
                   happened.  */
                mca_btl_tcp_module_t *m = btl_endpoint->endpoint_btl;

                /* Fail up to the PML */
                if (NULL != m->tcp_error_cb) {
                    m->tcp_error_cb((mca_btl_base_module_t*) m, MCA_BTL_ERROR_FLAGS_FATAL,
                        btl_endpoint->endpoint_proc->proc_opal,
                        "TCP ACK is neither SUCCESS nor ERR (something bad has probably happened)");
                }
            }
            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
            return;
        }
    case MCA_BTL_TCP_CONNECTED:
        {
            /* Normal data path: progress the in-flight receive fragment,
             * allocating a fresh one if none is pending. */
            mca_btl_tcp_frag_t* frag;

            frag = btl_endpoint->endpoint_recv_frag;
            if(NULL == frag) {
                /* Size the fragment by the larger of the two btl limits. */
                if(mca_btl_tcp_module.super.btl_max_send_size >
                   mca_btl_tcp_module.super.btl_eager_limit) {
                    MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
                } else {
                    MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
                }

                if(NULL == frag) {
                    /* No fragment available; the event will fire again. */
                    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
                    return;
                }
                MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
            }

#if MCA_BTL_TCP_ENDPOINT_CACHE
            assert( 0 == btl_endpoint->endpoint_cache_length );
        data_still_pending_on_endpoint:
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
            /* check for completion of non-blocking recv on the current fragment */
            if(mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false) {
                /* Partial receive: stash the fragment for the next event. */
                btl_endpoint->endpoint_recv_frag = frag;
            } else {
                btl_endpoint->endpoint_recv_frag = NULL;
                if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) {
                    /* Deliver the completed fragment to the registered
                     * active-message callback for its tag. */
                    mca_btl_active_message_callback_t* reg;
                    reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag;
                    reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata);
                }
#if MCA_BTL_TCP_ENDPOINT_CACHE
                if( 0 != btl_endpoint->endpoint_cache_length ) {
                    /* If the cache still contains some data we can reuse the same fragment
                     * until we flush it completely.
                     */
                    MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
                    goto data_still_pending_on_endpoint;
                }
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
                MCA_BTL_TCP_FRAG_RETURN(frag);
            }
#if MCA_BTL_TCP_ENDPOINT_CACHE
            assert( 0 == btl_endpoint->endpoint_cache_length );
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
            break;
        }
    case MCA_BTL_TCP_CLOSED:
        /* This is a thread-safety issue. As multiple threads are allowed
         * to generate events (in the lib event) we end up with several
         * threads executing the receive callback, when we reach the end
         * of the MPI_Finalize. The first one will close the connections,
         * and all others will complain.
         */
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        break;
    default:
        /* Unexpected state: report, mark failed and tear down the endpoint.
         * NOTE(review): endpoint_state is modified here after the recv lock
         * has been released -- presumably safe because the endpoint is being
         * abandoned; confirm against the locking conventions of this file. */
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
        BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
        btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
        mca_btl_tcp_endpoint_close(btl_endpoint);
        break;
    }
}
1015 
1016 
1017 /*
1018  * A file descriptor is available/ready for send. Check the state
1019  * of the socket and take the appropriate action.
1020  */
1021 
static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user)
{
    mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user;

    /* if another thread is already here, give up */
    if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) )
        return;

    switch(btl_endpoint->endpoint_state) {
    case MCA_BTL_TCP_CONNECTING:
        /* Writability on a connecting socket means the non-blocking
         * connect finished; check its outcome. */
        mca_btl_tcp_endpoint_complete_connect(btl_endpoint);
        break;
    case MCA_BTL_TCP_CONNECTED:
        /* complete the current send */
        while (NULL != btl_endpoint->endpoint_send_frag) {
            mca_btl_tcp_frag_t* frag = btl_endpoint->endpoint_send_frag;
            /* Capture ownership before the callback, which may recycle frag. */
            int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);

            if(mca_btl_tcp_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
                /* Socket would block again; keep the frag queued and wait
                 * for the next send event. */
                break;
            }
            /* progress any pending sends */
            btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
                opal_list_remove_first(&btl_endpoint->endpoint_frags);

            /* if required - update request status and release fragment.
             * The send lock is dropped around the completion callback so
             * the callback may queue new sends on this endpoint. */
            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
            assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
            frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
            if( btl_ownership ) {
                MCA_BTL_TCP_FRAG_RETURN(frag);
            }
            /* if we fail to take the lock simply return. In the worst case the
             * send_handler will be triggered once more, and as there will be
             * nothing to send the handler will be deleted.
             * (Returning here is correct: we do not hold the lock, so the
             * final unlock below must be skipped.)
             */
            if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) )
                return;
        }

        /* if nothing else to do unregister for send event notifications */
        if(NULL == btl_endpoint->endpoint_send_frag) {
            MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_del(send) [endpoint_send_handler]");
            opal_event_del(&btl_endpoint->endpoint_send_event);
        }
        break;
    default:
        /* Unexpected state: stop listening for send events on this endpoint. */
        BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
        MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "event_del(send) [endpoint_send_handler:error]");
        opal_event_del(&btl_endpoint->endpoint_send_event);
        break;
    }
    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
}

/* [<][>][^][v][top][bottom][index][help] */