root/contrib/build-mca-comps-outside-of-tree/btl_tcp2_endpoint.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. mca_btl_tcp2_endpoint_construct
  2. mca_btl_tcp2_endpoint_destruct
  3. mca_btl_tcp_endpoint_dump
  4. mca_btl_tcp2_endpoint_event_init
  5. mca_btl_tcp2_endpoint_send
  6. mca_btl_tcp2_endpoint_send_blocking
  7. mca_btl_tcp2_endpoint_send_connect_ack
  8. mca_btl_tcp2_endpoint_accept
  9. mca_btl_tcp2_endpoint_close
  10. mca_btl_tcp2_endpoint_connected
  11. mca_btl_tcp2_endpoint_recv_blocking
  12. mca_btl_tcp2_endpoint_recv_connect_ack
  13. mca_btl_tcp2_set_socket_options
  14. mca_btl_tcp2_endpoint_start_connect
  15. mca_btl_tcp2_endpoint_complete_connect
  16. mca_btl_tcp2_endpoint_recv_handler
  17. mca_btl_tcp2_endpoint_send_handler

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2013 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2007-2008 Sun Microsystems, Inc.  All rights reserved.
  13  * Copyright (c) 2011      Cisco Systems, Inc.  All rights reserved.
  14  * $COPYRIGHT$
  15  *
  16  * Additional copyrights may follow
  17  *
  18  * $HEADER$
  19  *
  20  */
  21 
  22 #include "ompi_config.h"
  23 
  24 #include <stdlib.h>
  25 #include <string.h>
  26 #ifdef HAVE_UNISTD_H
  27 #include <unistd.h>
  28 #endif
  29 #include "opal/opal_socket_errno.h"
  30 #ifdef HAVE_SYS_TYPES_H
  31 #include <sys/types.h>
  32 #endif
  33 #ifdef HAVE_FCNTL_H
  34 #include <fcntl.h>
  35 #endif
  36 #ifdef HAVE_NETINET_IN_H
  37 #include <netinet/in.h>
  38 #endif
  39 #ifdef HAVE_NETINET_TCP_H
  40 #include <netinet/tcp.h>
  41 #endif
  42 #ifdef HAVE_ARPA_INET_H
  43 #include <arpa/inet.h>
  44 #endif
  45 #ifdef HAVE_SYS_TIME_H
  46 #include <sys/time.h>
  47 #endif  /* HAVE_SYS_TIME_H */
  48 #ifdef HAVE_TIME_H
  49 #include <time.h>
  50 #endif  /* HAVE_TIME_H */
  51 
  52 #include "opal/util/net.h"
  53 #include "opal/util/fd.h"
  54 #include "opal/util/show_help.h"
  55 #include "ompi/mca/btl/base/btl_base_error.h"
  56 #include "ompi/mca/rte/rte.h"
  57 
  58 #include "btl_tcp_endpoint.h"
  59 #include "btl_tcp_proc.h"
  60 #include "btl_tcp_frag.h"
  61 
  62 /*
  63  * Initialize state of the endpoint instance.
  64  *
  65  */
  66 static void mca_btl_tcp2_endpoint_construct(mca_btl_tcp2_endpoint_t* endpoint)
  67 {
  68     endpoint->endpoint_btl = NULL;
  69     endpoint->endpoint_proc = NULL;
  70     endpoint->endpoint_addr = NULL;
  71     endpoint->endpoint_sd = -1;
  72     endpoint->endpoint_send_frag = 0;
  73     endpoint->endpoint_recv_frag = 0;
  74     endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
  75     endpoint->endpoint_retries = 0;
  76     endpoint->endpoint_nbo = false;
  77 #if MCA_BTL_TCP_ENDPOINT_CACHE
  78     endpoint->endpoint_cache        = NULL;
  79     endpoint->endpoint_cache_pos    = NULL;
  80     endpoint->endpoint_cache_length = 0;
  81 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
  82     OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
  83     OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
  84     OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);
  85 }
  86 
  87 /*
  88  * Destroy a endpoint
  89  *
  90  */
  91 static void mca_btl_tcp2_endpoint_destruct(mca_btl_tcp2_endpoint_t* endpoint)
  92 {
  93     mca_btl_tcp2_proc_remove(endpoint->endpoint_proc, endpoint);
  94     mca_btl_tcp2_endpoint_close(endpoint);
  95     OBJ_DESTRUCT(&endpoint->endpoint_frags);
  96     OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
  97     OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
  98 }
  99 
/*
 * Class instance for mca_btl_tcp2_endpoint_t, wired to the constructor
 * and destructor defined above. The second argument is presumably the
 * parent class (opal_list_item_t) -- OBJ_CLASS_INSTANCE is defined
 * outside this file, confirm there.
 */
OBJ_CLASS_INSTANCE(
    mca_btl_tcp2_endpoint_t,
    opal_list_item_t,
    mca_btl_tcp2_endpoint_construct,
    mca_btl_tcp2_endpoint_destruct);
 105 
 106 
/*
 * Forward declarations of the file-local helpers and event callbacks.
 * NOTE(review): construct/destruct are declared here with a
 * mca_btl_base_endpoint_t* parameter, while their definitions earlier in
 * this file take mca_btl_tcp2_endpoint_t*; presumably both names denote
 * the same type -- confirm in the endpoint header.
 */
static void mca_btl_tcp2_endpoint_construct(mca_btl_base_endpoint_t* btl_endpoint);
static void mca_btl_tcp2_endpoint_destruct(mca_btl_base_endpoint_t* btl_endpoint);
static int  mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t*);
static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t*);
/* event callbacks: (socket descriptor, event flags, user pointer) */
static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user);
static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user);
 113 
/*
 * Diagnostics: change this to "1" to enable the calls to the endpoint
 * dump routine made from the accept and receive-handler paths of this
 * file (those calls additionally require OPAL_ENABLE_DEBUG).
 */
 118 #define WANT_PEER_DUMP 0
 119 /*
 120  * diagnostics
 121  */
 122 
 123 void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
 124 {
 125     char src[64], dst[64], *status;
 126     int sndbuf, rcvbuf, nodelay, flags = -1;
 127 #if OPAL_ENABLE_IPV6
 128     struct sockaddr_storage inaddr;
 129 #else
 130     struct sockaddr_in inaddr;
 131 #endif
 132     opal_socklen_t obtlen;
 133     opal_socklen_t addrlen = sizeof(inaddr);
 134     opal_list_item_t *item;
 135 
 136     if( -1 != btl_endpoint->endpoint_sd ) {
 137         getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
 138 #if OPAL_ENABLE_IPV6
 139         {
 140             char *address;
 141             address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
 142             if (NULL != address) {
 143                 sprintf(src, "%s", address);
 144             }
 145         }
 146 #else
 147         sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
 148 #endif
 149         getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
 150 #if OPAL_ENABLE_IPV6
 151         {
 152             char *address;
 153             address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
 154             if (NULL != address) {
 155                 sprintf(dst, "%s", address);
 156             }
 157         }
 158 #else
 159         sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
 160 #endif
 161 
 162         if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
 163             BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
 164                        strerror(opal_socket_errno), opal_socket_errno));
 165         }
 166 
 167 #if defined(SO_SNDBUF)
 168         obtlen = sizeof(sndbuf);
 169         if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
 170             BTL_ERROR(("SO_SNDBUF option: %s (%d)",
 171                        strerror(opal_socket_errno), opal_socket_errno));
 172         }
 173 #else
 174         sndbuf = -1;
 175 #endif
 176 #if defined(SO_RCVBUF)
 177         obtlen = sizeof(rcvbuf);
 178         if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
 179             BTL_ERROR(("SO_RCVBUF option: %s (%d)",
 180                        strerror(opal_socket_errno), opal_socket_errno));
 181         }
 182 #else
 183         rcvbuf = -1;
 184 #endif
 185 #if defined(TCP_NODELAY)
 186         obtlen = sizeof(nodelay);
 187         if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
 188             BTL_ERROR(("TCP_NODELAY option: %s (%d)",
 189                        strerror(opal_socket_errno), opal_socket_errno));
 190         }
 191 #else
 192         nodelay = 0;
 193 #endif
 194     }
 195 
 196     mca_btl_base_err("%s %s: endpoint %p src %s - dst %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
 197                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg, (void*)btl_endpoint, src, dst, nodelay, sndbuf, rcvbuf, flags);
 198 
 199     switch(btl_endpoint->endpoint_state) {
 200     case MCA_BTL_TCP_CONNECTING:
 201         status = "connecting"; break;
 202     case MCA_BTL_TCP_CONNECT_ACK:
 203         status = "connect ack"; break;
 204     case MCA_BTL_TCP_CLOSED:
 205         status = "closed"; break;
 206     case MCA_BTL_TCP_FAILED:
 207         status = "failed"; break;
 208     case MCA_BTL_TCP_CONNECTED:
 209         status = "connected"; break;
 210     default:
 211         status = "undefined"; break;
 212     }
 213     mca_btl_base_err("%s |  [socket %d] [state %s] (nbo %s) (retries %u)\n"
 214 #if MCA_BTL_TCP_ENDPOINT_CACHE
 215                      "\tcache %p length %lu pos %ld\n"
 216 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
 217                      "\tpending: send %p recv %p\n",
 218                      msg, btl_endpoint->endpoint_sd, status,
 219                      (btl_endpoint->endpoint_nbo ? "true" : "false"), btl_endpoint->endpoint_retries,
 220 #if MCA_BTL_TCP_ENDPOINT_CACHE
 221                      btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_length, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
 222 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
 223                      (void*)btl_endpoint->endpoint_send_frag, (void*)btl_endpoint->endpoint_recv_frag );
 224     for(item =  opal_list_get_first(&btl_endpoint->endpoint_frags);
 225         item != opal_list_get_end(&btl_endpoint->endpoint_frags);
 226         item = opal_list_get_next(item)) {
 227         mca_btl_tcp_dump_frag( (mca_btl_tcp_frag_t*)item, " | send" );
 228     }
 229 }
 230 
 231 /*
 232  * Initialize events to be used by the endpoint instance for TCP select/poll callbacks.
 233  */
 234 
 235 static inline void mca_btl_tcp2_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint)
 236 {
 237 #if MCA_BTL_TCP_ENDPOINT_CACHE
 238     btl_endpoint->endpoint_cache     = (char*)malloc(mca_btl_tcp2_component.tcp_endpoint_cache);
 239     btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
 240 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
 241 
 242     opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_recv_event,
 243                    btl_endpoint->endpoint_sd,
 244                    OPAL_EV_READ|OPAL_EV_PERSIST,
 245                    mca_btl_tcp_endpoint_recv_handler,
 246                    btl_endpoint );
 247     /**
 248      * The send event should be non persistent until the endpoint is
 249      * completely connected. This means, when the event is created it
 250      * will be fired only once, and when the endpoint is marked as
 251      * CONNECTED the event should be recreated with the correct flags.
 252      */
 253     opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
 254                    btl_endpoint->endpoint_sd,
 255                    OPAL_EV_WRITE,
 256                    mca_btl_tcp_endpoint_send_handler,
 257                    btl_endpoint);
 258 }
 259 
 260 
 261 /*
 262  * Attempt to send a fragment using a given endpoint. If the endpoint is not connected,
 263  * queue the fragment and start the connection as required.
 264  */
 265 
 266 int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp2_frag_t* frag)
 267 {
 268     int rc = OMPI_SUCCESS;
 269 
 270     MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&btl_endpoint->endpoint_send_lock);
 271     switch(btl_endpoint->endpoint_state) {
 272     case MCA_BTL_TCP_CONNECTING:
 273     case MCA_BTL_TCP_CONNECT_ACK:
 274     case MCA_BTL_TCP_CLOSED:
 275         opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
 276         frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
 277         if(btl_endpoint->endpoint_state == MCA_BTL_TCP_CLOSED)
 278             rc = mca_btl_tcp2_endpoint_start_connect(btl_endpoint);
 279         break;
 280     case MCA_BTL_TCP_FAILED:
 281         rc = OMPI_ERR_UNREACH;
 282         break;
 283     case MCA_BTL_TCP_CONNECTED:
 284         if (btl_endpoint->endpoint_send_frag == NULL) {
 285             if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
 286                mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd)) {
 287                 int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
 288                 opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
 289                 MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
 290                 return 1;
 291             } else {
 292                 btl_endpoint->endpoint_send_frag = frag;
 293                 frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
 294                 MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
 295             }
 296         } else {
 297             frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
 298             opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
 299         }
 300         break;
 301     }
 302     MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&btl_endpoint->endpoint_send_lock);
 303     return rc;
 304 }
 305 
 306 
 307 /*
 308  * A blocking send on a non-blocking socket. Used to send the small amount of connection
 309  * information that identifies the endpoints endpoint.
 310  */
 311 static int mca_btl_tcp2_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
 312 {
 313     unsigned char* ptr = (unsigned char*)data;
 314     size_t cnt = 0;
 315     while(cnt < size) {
 316         int retval = send(btl_endpoint->endpoint_sd, (const char *)ptr+cnt, size-cnt, 0);
 317         if(retval < 0) {
 318             if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
 319                 BTL_ERROR(("send() failed: %s (%d)",
 320                            strerror(opal_socket_errno), opal_socket_errno));
 321                 mca_btl_tcp2_endpoint_close(btl_endpoint);
 322                 return -1;
 323             }
 324             continue;
 325         }
 326         cnt += retval;
 327     }
 328     return cnt;
 329 }
 330 
 331 
 332 /*
 333  * Send the globally unique identifier for this process to a endpoint on
 334  * a newly connected socket.
 335  */
 336 
 337 static int mca_btl_tcp2_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
 338 {
 339     /* send process identifier to remote endpoint */
 340     mca_btl_tcp2_proc_t* btl_proc = mca_btl_tcp2_proc_local();
 341     orte_process_name_t guid = btl_proc->proc_ompi->proc_name;
 342 
 343     ORTE_PROCESS_NAME_HTON(guid);
 344     if(mca_btl_tcp2_endpoint_send_blocking(btl_endpoint, &guid, sizeof(guid)) !=
 345           sizeof(guid)) {
 346         return OMPI_ERR_UNREACH;
 347     }
 348     return OMPI_SUCCESS;
 349 }
 350 
 351 /*
 352  * Check the state of this endpoint. If the incoming connection request matches
 353  * our endpoints address, check the state of our connection:
 354  * (1) if a connection has not been attempted, accept the connection
 355  * (2) if a connection has not been established, and the endpoints process identifier
 356  *     is less than the local process, accept the connection
 357  * otherwise, reject the connection and continue with the current connection
 358  */
 359 
 360 bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
 361                                  struct sockaddr* addr, int sd)
 362 {
 363     mca_btl_tcp_proc_t *endpoint_proc = btl_endpoint->endpoint_proc;
 364     const orte_process_name_t *this_proc = &(ompi_proc_local()->proc_name);
 365     int cmpval;
 366 
 367     if(NULL == btl_endpoint->endpoint_addr) {
 368         return false;
 369     }
 370 
 371     OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
 372     OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
 373 
 374     cmpval = ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
 375                                     &endpoint_proc->proc_ompi->proc_name,
 376                                     this_proc);
 377     if((btl_endpoint->endpoint_sd < 0) ||
 378        (btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
 379         cmpval < 0)) {
 380         mca_btl_tcp2_endpoint_close(btl_endpoint);
 381         btl_endpoint->endpoint_sd = sd;
 382         if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) != OMPI_SUCCESS) {
 383             mca_btl_tcp2_endpoint_close(btl_endpoint);
 384             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 385             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 386             return false;
 387         }
 388         mca_btl_tcp_endpoint_event_init(btl_endpoint);
 389         /* NOT NEEDED if we remove the PERSISTENT flag when we create the
 390          * first recv_event.
 391          */
 392         opal_event_add(&btl_endpoint->endpoint_recv_event, 0);  /* TODO */
 393         mca_btl_tcp_endpoint_connected(btl_endpoint);
 394 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
 395         mca_btl_tcp2_endpoint_dump(btl_endpoint, "accepted");
 396 #endif
 397         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 398         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 399         return true;
 400     }
 401     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 402     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 403     return false;
 404 }
 405 
 406 
 407 /*
 408  * Remove any event registrations associated with the socket
 409  * and update the endpoint state to reflect the connection has
 410  * been closed.
 411  */
 412 void mca_btl_tcp2_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
 413 {
 414     int sd = btl_endpoint->endpoint_sd;
 415 
 416     do {
 417         if( sd < 0 ) return;
 418     } while ( opal_atomic_cmpset( &(btl_endpoint->endpoint_sd), sd, -1 ) );
 419 
 420     CLOSE_THE_SOCKET(sd);
 421     btl_endpoint->endpoint_retries++;
 422     opal_event_del(&btl_endpoint->endpoint_recv_event);
 423     opal_event_del(&btl_endpoint->endpoint_send_event);
 424 #if MCA_BTL_TCP_ENDPOINT_CACHE
 425     if( NULL != btl_endpoint->endpoint_cache )
 426         free( btl_endpoint->endpoint_cache );
 427     btl_endpoint->endpoint_cache        = NULL;
 428     btl_endpoint->endpoint_cache_pos    = NULL;
 429     btl_endpoint->endpoint_cache_length = 0;
 430 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
 431 }
 432 
 433 /*
 434  *  Setup endpoint state to reflect that connection has been established,
 435  *  and start any pending sends. This function should be called with the
 436  *  send lock locked.
 437  */
 438 
 439 static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
 440 {
 441     /* setup socket options */
 442     btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
 443     btl_endpoint->endpoint_retries = 0;
 444 
 445     /* Create the send event in a persistent manner. */
 446     opal_event_set(mca_btl_tcp_event_base, &btl_endpoint->endpoint_send_event,
 447                    btl_endpoint->endpoint_sd,
 448                    OPAL_EV_WRITE | OPAL_EV_PERSIST,
 449                    mca_btl_tcp_endpoint_send_handler,
 450                    btl_endpoint );
 451 
 452     if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
 453         if(NULL == btl_endpoint->endpoint_send_frag) {
 454             btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
 455                 opal_list_remove_first(&btl_endpoint->endpoint_frags);
 456         }
 457         opal_event_add(&btl_endpoint->endpoint_send_event, 0);
 458     }
 459 }
 460 
 461 
 462 /*
 463  * A blocking recv on a non-blocking socket. Used to receive the small amount of connection
 464  * information that identifies the endpoints endpoint.
 465  */
 466 static int mca_btl_tcp2_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
 467 {
 468     unsigned char* ptr = (unsigned char*)data;
 469     size_t cnt = 0;
 470     while(cnt < size) {
 471         int retval = recv(btl_endpoint->endpoint_sd, (char *)ptr+cnt, size-cnt, 0);
 472 
 473         /* remote closed connection */
 474         if(retval == 0) {
 475             mca_btl_tcp2_endpoint_close(btl_endpoint);
 476             return -1;
 477         }
 478 
 479         /* socket is non-blocking so handle errors */
 480         if(retval < 0) {
 481             if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
 482                 BTL_ERROR(("recv(%d) failed: %s (%d)",
 483                            btl_endpoint->endpoint_sd, strerror(opal_socket_errno), opal_socket_errno));
 484                 mca_btl_tcp2_endpoint_close(btl_endpoint);
 485                 return -1;
 486             }
 487             continue;
 488         }
 489         cnt += retval;
 490     }
 491     return cnt;
 492 }
 493 
 494 
 495 /*
 496  *  Receive the endpoints globally unique process identification from a newly
 497  *  connected socket and verify the expected response. If so, move the
 498  *  socket to a connected state.
 499  */
 500 static int mca_btl_tcp2_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
 501 {
 502     orte_process_name_t guid;
 503     mca_btl_tcp2_proc_t* btl_proc = btl_endpoint->endpoint_proc;
 504 
 505     if((mca_btl_tcp2_endpoint_recv_blocking(btl_endpoint, &guid, sizeof(orte_process_name_t))) != sizeof(orte_process_name_t)) {
 506         return OMPI_ERR_UNREACH;
 507     }
 508     ORTE_PROCESS_NAME_NTOH(guid);
 509     /* compare this to the expected values */
 510     if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
 511                                                     &btl_proc->proc_ompi->proc_name,
 512                                                     &guid)) {
 513         BTL_ERROR(("received unexpected process identifier %s",
 514                    ORTE_NAME_PRINT(&guid)));
 515         mca_btl_tcp2_endpoint_close(btl_endpoint);
 516         return OMPI_ERR_UNREACH;
 517     }
 518 
 519     return OMPI_SUCCESS;
 520 }
 521 
 522 
 523 void mca_btl_tcp2_set_socket_options(int sd)
 524 {
 525     int optval;
 526 #if defined(TCP_NODELAY)
 527     optval = mca_btl_tcp2_component.tcp_use_nodelay;
 528     if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
 529         BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
 530                    strerror(opal_socket_errno), opal_socket_errno));
 531     }
 532 #endif
 533 #if defined(SO_SNDBUF)
 534     if(mca_btl_tcp2_component.tcp_sndbuf > 0 &&
 535        setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp2_component.tcp_sndbuf, sizeof(int)) < 0) {
 536         BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
 537                    strerror(opal_socket_errno), opal_socket_errno));
 538     }
 539 #endif
 540 #if defined(SO_RCVBUF)
 541     if(mca_btl_tcp2_component.tcp_rcvbuf > 0 &&
 542        setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp2_component.tcp_rcvbuf, sizeof(int)) < 0) {
 543         BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
 544                    strerror(opal_socket_errno), opal_socket_errno));
 545     }
 546 #endif
 547 }
 548 
 549 
 550 
 551 /*
 552  *  Start a connection to the endpoint. This will likely not complete,
 553  *  as the socket is set to non-blocking, so register for event
 554  *  notification of connect completion. On connection we send
 555  *  our globally unique process identifier to the endpoint and wait for
 556  *  the endpoints response.
 557  */
 558 static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
 559 {
 560     int rc,flags;
 561     struct sockaddr_storage endpoint_addr;
 562     /* By default consider a IPv4 connection */
 563     uint16_t af_family = AF_INET;
 564     opal_socklen_t addrlen = sizeof(struct sockaddr_in);
 565 
 566 #if OPAL_ENABLE_IPV6
 567     if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
 568         af_family = AF_INET6;
 569         addrlen = sizeof (struct sockaddr_in6);
 570     }
 571 #endif
 572 
 573     btl_endpoint->endpoint_sd = socket(af_family, SOCK_STREAM, 0);
 574     if (btl_endpoint->endpoint_sd < 0) {
 575         btl_endpoint->endpoint_retries++;
 576         return OMPI_ERR_UNREACH;
 577     }
 578 
 579     /* setup socket buffer sizes */
 580     mca_btl_tcp2_set_socket_options(btl_endpoint->endpoint_sd);
 581 
 582     /* setup event callbacks */
 583     mca_btl_tcp2_endpoint_event_init(btl_endpoint);
 584 
 585     /* setup the socket as non-blocking */
 586     if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
 587         BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
 588                    strerror(opal_socket_errno), opal_socket_errno));
 589     } else {
 590         flags |= O_NONBLOCK;
 591         if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0)
 592             BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
 593                        strerror(opal_socket_errno), opal_socket_errno));
 594     }
 595 
 596     /* start the connect - will likely fail with EINPROGRESS */
 597     mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
 598 
 599     opal_output_verbose(20, mca_btl_base_output,
 600                         "btl: tcp: attempting to connect() to address %s on port %d",
 601                         opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 602                         btl_endpoint->endpoint_addr->addr_port);
 603 
 604     if(connect(btl_endpoint->endpoint_sd, (struct sockaddr*)&endpoint_addr, addrlen) < 0) {
 605         /* non-blocking so wait for completion */
 606         if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
 607             btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
 608             MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_send_event, 0);
 609             return OMPI_SUCCESS;
 610         }
 611         {
 612             char *address;
 613             address = opal_net_get_hostname((struct sockaddr*) &endpoint_addr);
 614             BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_ompi,
 615                           ( "Unable to connect to the peer %s on port %d: %s\n",
 616                             address,
 617                            btl_endpoint->endpoint_addr->addr_port, strerror(opal_socket_errno) ) );
 618         }
 619         mca_btl_tcp2_endpoint_close(btl_endpoint);
 620         btl_endpoint->endpoint_retries++;
 621         return OMPI_ERR_UNREACH;
 622     }
 623 
 624     /* send our globally unique process identifier to the endpoint */
 625     if((rc = mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) {
 626         btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
 627         MCA_BTL_TCP_ACTIVATE_EVENT(&btl_endpoint->endpoint_recv_event, 0);
 628     } else {
 629         mca_btl_tcp2_endpoint_close(btl_endpoint);
 630     }
 631     return rc;
 632 }
 633 
 634 
 635 /*
 636  * Check the status of the connection. If the connection failed, will retry
 637  * later. Otherwise, send this processes identifier to the endpoint on the
 638  * newly connected socket.
 639  */
 640 static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
 641 {
 642     int so_error = 0;
 643     opal_socklen_t so_length = sizeof(so_error);
 644     struct sockaddr_storage endpoint_addr;
 645 
 646     mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
 647 
 648     /* unregister from receiving event notifications */
 649     opal_event_del(&btl_endpoint->endpoint_send_event);
 650 
 651     /* check connect completion status */
 652     if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
 653         BTL_ERROR(("getsockopt() to %s failed: %s (%d)",
 654                    opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 655                    strerror(opal_socket_errno), opal_socket_errno));
 656         mca_btl_tcp2_endpoint_close(btl_endpoint);
 657         return;
 658     }
 659     if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) {
 660         opal_event_add(&btl_endpoint->endpoint_send_event, 0);
 661         return;
 662     }
 663     if(so_error != 0) {
 664         BTL_ERROR(("connect() to %s failed: %s (%d)",
 665                    opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
 666                    strerror(so_error), so_error));
 667         mca_btl_tcp2_endpoint_close(btl_endpoint);
 668         return;
 669     }
 670 
 671     if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) == OMPI_SUCCESS) {
 672         btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
 673         opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
 674     } else {
 675         mca_btl_tcp2_endpoint_close(btl_endpoint);
 676     }
 677 }
 678 
 679 
 680 /*
 681  * A file descriptor is available/ready for recv. Check the state
 682  * of the socket and take the appropriate action.
 683  */
 684 
 685 static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
 686 {
 687     mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;
 688 
 689     /* Make sure we don't have a race between a thread that remove the
 690      * recv event, and one event already scheduled.
 691      */
 692     if( sd != btl_endpoint->endpoint_sd )
 693         return;
 694 
 695     OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
 696     switch(btl_endpoint->endpoint_state) {
 697     case MCA_BTL_TCP_CONNECT_ACK:
 698         {
 699             int rc = OMPI_ERROR;
 700             rc = mca_btl_tcp2_endpoint_recv_connect_ack(btl_endpoint);
 701             if( OMPI_SUCCESS == rc ) {
 702                 /* we are now connected. Start sending the data */
 703                 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
 704                 mca_btl_tcp2_endpoint_connected(btl_endpoint);
 705                 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
 706 #if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
 707                 mca_btl_tcp2_endpoint_dump(btl_endpoint, "connected");
 708 #endif
 709             }
 710             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 711             return;
 712         }
 713     case MCA_BTL_TCP_CONNECTED:
 714         {
 715             mca_btl_tcp2_frag_t* frag;
 716 
 717             frag = btl_endpoint->endpoint_recv_frag;
 718 
 719         data_still_pending_on_endpoint:
 720             if(NULL == frag) {
 721 
 722                 if(mca_btl_tcp_module.super.btl_max_send_size >
 723                    mca_btl_tcp_module.super.btl_eager_limit) {
 724                     MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
 725                 } else {
 726                     MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
 727                 }
 728 
 729                 if(NULL == frag) {
 730                     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 731                     return;
 732                 }
 733                 MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
 734             }
 735 
 736             /* check for completion of non-blocking recv on the current fragment */
 737             if( mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false ) {
 738                 btl_endpoint->endpoint_recv_frag = frag;
 739             } else {
 740                 btl_endpoint->endpoint_recv_frag = NULL;
 741 
 742                 TODO_MCA_BTL_TCP_RECV_TRIGGER_CB(frag);
 743 
 744 #if MCA_BTL_TCP_ENDPOINT_CACHE
 745                 if( 0 != btl_endpoint->endpoint_cache_length ) {
 746 #if MCA_BTL_TCP_USES_PROGRESS_THREAD
 747                     /* Get a new fragment and try again */
 748                     frag = NULL;
 749 #else
 750                     /* If the cache still contain some data we can reuse the same fragment
 751                      * until we flush it completly.
 752                      */
 753                     MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
 754 #endif  /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
 755                     goto data_still_pending_on_endpoint;
 756                 }
 757 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
 758 
 759 #if !MCA_BTL_TCP_USES_PROGRESS_THREAD
 760                 MCA_BTL_TCP_FRAG_RETURN(frag);
 761 #endif  /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
 762             }
 763 #if MCA_BTL_TCP_ENDPOINT_CACHE
 764             assert( 0 == btl_endpoint->endpoint_cache_length );
 765 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
 766             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 767             break;
 768         }
 769     case MCA_BTL_TCP_CLOSED:
 770         /* This is a thread-safety issue. As multiple threads are allowed
 771          * to generate events (in the lib event) we endup with several
 772          * threads executing the receive callback, when we reach the end
 773          * of the MPI_Finalize. The first one will close the connections,
 774          * and all others will complain.
 775          */
 776         break;
 777     default:
 778         OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
 779         BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
 780         btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
 781         mca_btl_tcp_endpoint_close(btl_endpoint);
 782         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
 783         break;
 784     }
 785 }
 786 
 787 
 788 /*
 789  * A file descriptor is available/ready for send. Check the state
 790  * of the socket and take the appropriate action.
 791  */
 792 
 793 static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user)
 794 {
 795     mca_btl_tcp_endpoint_t* btl_endpoint = (mca_btl_tcp_endpoint_t *)user;
 796     opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
 797     switch(btl_endpoint->endpoint_state) {
 798     case MCA_BTL_TCP_CONNECTING:
 799         mca_btl_tcp2_endpoint_complete_connect(btl_endpoint);
 800         break;
 801     case MCA_BTL_TCP_CONNECTED:
 802         /* complete the current send */
 803         while (NULL != btl_endpoint->endpoint_send_frag) {
 804             mca_btl_tcp2_frag_t* frag = btl_endpoint->endpoint_send_frag;
 805             int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
 806 
 807             if(mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
 808                 break;
 809             }
 810             /* progress any pending sends */
 811             btl_endpoint->endpoint_send_frag = (mca_btl_tcp2_frag_t*)
 812                 opal_list_remove_first(&btl_endpoint->endpoint_frags);
 813 
 814             /* if required - update request status and release fragment */
 815             opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
 816             assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
 817             TODO_MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag);
 818             opal_mutex_atomic_lock(&btl_endpoint->endpoint_send_lock);
 819         }
 820 
 821         /* if no more data to send unregister the send notifications */
 822         if(NULL == btl_endpoint->endpoint_send_frag) {
 823             opal_event_del(&btl_endpoint->endpoint_send_event);
 824         }
 825         break;
 826     default:
 827         BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
 828         opal_event_del(&btl_endpoint->endpoint_send_event);
 829         break;
 830     }
 831     opal_mutex_atomic_unlock(&btl_endpoint->endpoint_send_lock);
 832 }
 833 
 834 
 835 

/* [<][>][^][v][top][bottom][index][help] */