root/opal/mca/btl/usnic/btl_usnic_cagent.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. udp_port_listener_zero
  2. udp_port_listener_constructor
  3. udp_port_listener_destructor
  4. ipc_listener_zero
  5. ipc_listener_constructor
  6. ipc_listener_destructor
  7. agent_ping_result_zero
  8. agent_ping_result_constructor
  9. agent_ping_result_destructor
  10. agent_sendto
  11. agent_thread_handle_ping
  12. agent_thread_handle_ack
  13. agent_thread_receive_ping
  14. agent_thread_find_listener
  15. agent_thread_cmd_listen_reply
  16. agent_thread_cmd_listen
  17. agent_thread_send_ping
  18. agent_thread_cmd_ping
  19. agent_thread_cmd_unlisten
  20. agent_thread_ipc_receive
  21. agent_thread_accept
  22. agent_thread_finalize
  23. opal_btl_usnic_connectivity_agent_init
  24. opal_btl_usnic_connectivity_agent_finalize

   1 /*
   2  * Copyright (c) 2014-2016 Cisco Systems, Inc.  All rights reserved.
   3  * Copyright (c) 2015      Research Organization for Information Science
   4  *                         and Technology (RIST). All rights reserved.
   5  * Copyright (c) 2018      Amazon.com, Inc. or its affiliates.  All Rights reserved.
   6  * $COPYRIGHT$
   7  *
   8  * Additional copyrights may follow
   9  *
  10  * $HEADER$
  11  */
  12 
  13 #include "opal_config.h"
  14 
  15 #include <assert.h>
  16 #include <sys/types.h>
  17 #include <sys/socket.h>
  18 #include <sys/un.h>
  19 #include <unistd.h>
  20 #ifdef HAVE_ALLOCA_H
  21 #include <alloca.h>
  22 #endif
  23 
  24 #include "opal_stdint.h"
  25 #include "opal/threads/mutex.h"
  26 #include "opal/mca/event/event.h"
  27 #include "opal/util/show_help.h"
  28 #include "opal/types.h"
  29 #include "opal/util/output.h"
  30 #include "opal/util/fd.h"
  31 #include "opal/util/string_copy.h"
  32 #include "opal/util/printf.h"
  33 
  34 #include "btl_usnic.h"
  35 #include "btl_usnic_connectivity.h"
  36 
  37 /**************************************************************************
  38  * Agent data and methods
  39  **************************************************************************/
  40 
/*
 * Local variables (file scope; shared by all of the agent functions
 * in this file)
 */
/* fd of the IPC accept socket; -1 means "not open".
   NOTE(review): opened/closed by code outside this section -- confirm */
static int ipc_accept_fd = -1;
/* NOTE(review): presumably the filesystem name of the IPC socket;
   set by code outside this section -- confirm ownership */
static char *ipc_filename = NULL;
/* NOTE(review): presumably the event watching ipc_accept_fd -- confirm */
static opal_event_t ipc_event;
/* Timeout passed to opal_event_add() when a ping's re-send timer is
   armed in agent_thread_send_ping() */
static struct timeval ack_timeout;
/* List of agent_udp_port_listener_t items: the UDP ports that have an
   active listening event */
static opal_list_t udp_port_listeners;
/* List of agent_ipc_listener_t items: IPC clients that have an active
   event */
static opal_list_t ipc_listeners;
/* NOTE(review): presumably a count of accepted IPC connections;
   updated by code outside this section -- confirm */
static volatile int ipc_accepts = 0;
/* JMS The pings_pending and ping_results should probably both be hash
   tables for more efficient lookups */
/* agent_ping_t items that have not yet had all of their PINGs ACKed */
static opal_list_t pings_pending;
/* agent_ping_t items moved here once every PING size has been ACKed */
static opal_list_t ping_results;
/* NOTE(review): presumably set once agent init completes; written by
   code outside this section -- confirm */
static volatile bool agent_initialized = false;
  56 
  57 
/*
 * Holds all the information about a UDP port that the agent thread is
 * listening on (for incoming PINGs and ACKs).
 *
 * Instances are created in agent_thread_cmd_listen() and, once
 * "active", live on the udp_port_listeners list.
 */
typedef struct {
    opal_list_item_t super;

    /* Data from the LISTEN command message */
    /* Interface address; assigned directly to sin_addr.s_addr when
       binding, i.e. it is kept in network byte order */
    uint32_t ipv4_addr;
    uint32_t netmask;
    /* Printable form of ipv4_addr/netmask, used in log/help messages */
    char ipv4_addr_str[IPV4STRADDRLEN];
    /* Size of "buffer" and the maximum number of bytes read per datagram */
    uint32_t max_msg_size;
    /* Freed by the destructor when non-NULL */
    char *nodename;
    /* strdup()'ed copy of the name from the LISTEN command; freed by
       the destructor */
    char *usnic_name;

    /* File descriptor, UDP port, buffer to receive messages, and event */
    int fd;
    /* Port obtained from getsockname(), stored in host byte order */
    uint32_t udp_port;
    /* malloc()'ed receive buffer of max_msg_size bytes; also re-used
       to send the ACK back */
    uint8_t *buffer;
    opal_event_t event;
    /* True once "event" has been added and the item is on the
       udp_port_listeners list */
    bool active;
    /* "module" value from the LISTEN command (may be NULL) */
    opal_btl_usnic_module_t *module;
} agent_udp_port_listener_t;

OBJ_CLASS_DECLARATION(agent_udp_port_listener_t);
  83 
/*
 * Holds information for a local IPC socket fd (i.e., a connection
 * from a local process to this agent).
 */
typedef struct {
    opal_list_item_t super;

    /* Connected IPC socket; -1 when unset.  Closed by the destructor. */
    int client_fd;
    /* Event associated with this client (deleted by the destructor
       when "active" is set) */
    opal_event_t event;
    /* True when "event" is active and the item is on the
       ipc_listeners list */
    bool active;
} agent_ipc_listener_t;

OBJ_CLASS_DECLARATION(agent_ipc_listener_t);
  97 
/*
 * Values carried in the one-byte message_type field of every UDP
 * message exchanged between agents.  ACK implicitly gets the value
 * following PING (18).
 */
typedef enum {
    AGENT_MSG_TYPE_PING = 17,
    AGENT_MSG_TYPE_ACK
} agent_udp_message_type_t;

// Arbitrary 64 bit numbers
// MAGIC_ORIGINATOR is expected in incoming PINGs; MAGIC_TARGET is
// written into (and expected in) ACKs.
#define MAGIC_ORIGINATOR 0x9a9e2fbce63a11e5
#define MAGIC_TARGET 0x60735c68f368aace
 106 
/*
 * Ping and ACK messages
 *
 * This header sits at the start of every datagram.  An ACK is exactly
 * sizeof(agent_udp_message_t) bytes; a PING is "size" bytes long.
 */
typedef struct {
    /* One of agent_udp_message_type_t */
    uint8_t message_type;

    /* The sender's IP address and port (i.e., where the ACK can be
       sent).  This is actually redundant with the sockaddr that we
       get from recvfrom(), but that's ok -- it provides a sanity
       check.  */
    uint32_t src_ipv4_addr;
    uint32_t src_udp_port;

    /* A magic number that helps determine that the sender was Open
       MPI */
    uint64_t magic_number;
    /* Compared against OPAL_MAJOR_VERSION / OPAL_MINOR_VERSION when a
       PING is received */
    uint32_t major_version, minor_version;

    /* If this is a PING, the message should be this size.
       If this is an ACK, we are ACKing a ping of this size. */
    uint32_t size;
} agent_udp_message_t;
 129 
/*
 * Tracks one ping request: who is being pinged, from which listener,
 * the PING buffers to send, and which of them have been ACKed.  Items
 * live on pings_pending until every size is ACKed, then move to
 * ping_results.
 */
typedef struct {
    opal_list_item_t super;

    /* Data from the PING command message */
    uint32_t src_ipv4_addr; /* in network byte order */
    uint32_t src_udp_port;
    /* Listener whose socket is used to send the PINGs */
    agent_udp_port_listener_t *listener;
    uint32_t dest_ipv4_addr; /* in network byte order */
    uint32_t dest_netmask;
    /* Compared against ntohs(sin_port) of incoming ACKs, i.e. host
       byte order */
    uint32_t dest_udp_port;
    /* Address handed to sendto() for each PING */
    struct sockaddr_in dest_sockaddr;
    /* Peer's node name; only used in log/help messages here */
    char *dest_nodename;

    /* The sizes and corresponding buffers of the PING messages that
       we'll send, and whether each of those PING messages have been
       ACKed yet */
#define NUM_PING_SIZES 2
    size_t sizes[NUM_PING_SIZES];
    uint8_t *buffers[NUM_PING_SIZES];
    bool acked[NUM_PING_SIZES];

    /* Number of times we've sent this ping */
    int num_sends;

    /* Timer used to re-send the PING, and whether the timer is active
       or not */
    opal_event_t timer;
    bool timer_active;
} agent_ping_t;

OBJ_CLASS_DECLARATION(agent_ping_t);
 161 
 162 
 163 /**************************************************************************
 164  * Utility functions, constructors, destructors
 165  **************************************************************************/
 166 
 167 static void udp_port_listener_zero(agent_udp_port_listener_t *obj)
 168 {
 169     obj->ipv4_addr =
 170         obj->netmask =
 171         obj->max_msg_size = 0;
 172     obj->nodename =
 173         obj->usnic_name = NULL;
 174     memset(obj->ipv4_addr_str, 0, sizeof(obj->ipv4_addr_str));
 175 
 176     obj->fd = -1;
 177     obj->udp_port = -1;
 178     obj->buffer = NULL;
 179 
 180     obj->active = false;
 181 }
 182 
 183 static void udp_port_listener_constructor(agent_udp_port_listener_t *obj)
 184 {
 185     udp_port_listener_zero(obj);
 186 }
 187 
 188 static void udp_port_listener_destructor(agent_udp_port_listener_t *obj)
 189 {
 190     /* Find any pings that are pending on this listener and delete
 191        them */
 192     agent_ping_t *ap, *apnext;
 193     OPAL_LIST_FOREACH_SAFE(ap, apnext, &pings_pending, agent_ping_t) {
 194         if (ap->src_ipv4_addr == obj->ipv4_addr) {
 195             opal_list_remove_item(&pings_pending, &ap->super);
 196             OBJ_RELEASE(ap);
 197         }
 198     }
 199 
 200     if (-1 != obj->fd) {
 201         close(obj->fd);
 202     }
 203     if (NULL != obj->nodename) {
 204         free(obj->nodename);
 205     }
 206     if (NULL != obj->usnic_name) {
 207         free(obj->usnic_name);
 208     }
 209     if (NULL != obj->buffer) {
 210         free(obj->buffer);
 211     }
 212 
 213     /* If the "active" flag is set, then the event is active and the
 214        item is on the udp_port_listeners list */
 215     if (obj->active) {
 216         opal_event_del(&obj->event);
 217         opal_list_remove_item(&udp_port_listeners, &obj->super);
 218     }
 219 
 220     udp_port_listener_zero(obj);
 221 }
 222 
/* Define the agent_udp_port_listener_t class: parent class is
   opal_list_item_t, with the constructor and destructor defined
   above */
OBJ_CLASS_INSTANCE(agent_udp_port_listener_t,
                   opal_list_item_t,
                   udp_port_listener_constructor,
                   udp_port_listener_destructor);
 227 
 228 static void ipc_listener_zero(agent_ipc_listener_t *obj)
 229 {
 230     obj->client_fd = -1;
 231     obj->active = false;
 232 }
 233 
 234 static void ipc_listener_constructor(agent_ipc_listener_t *obj)
 235 {
 236     ipc_listener_zero(obj);
 237 }
 238 
 239 static void ipc_listener_destructor(agent_ipc_listener_t *obj)
 240 {
 241     if (-1 != obj->client_fd) {
 242         close(obj->client_fd);
 243     }
 244 
 245     /* If the "active" flag is set, then the event is active and the
 246        item is on the ipc_listeners list */
 247     if (obj->active) {
 248         opal_event_del(&obj->event);
 249         opal_list_remove_item(&ipc_listeners, &obj->super);
 250     }
 251 
 252     ipc_listener_zero(obj);
 253 }
 254 
/* Define the agent_ipc_listener_t class: parent class is
   opal_list_item_t, with the constructor and destructor defined
   above */
OBJ_CLASS_INSTANCE(agent_ipc_listener_t,
                   opal_list_item_t,
                   ipc_listener_constructor,
                   ipc_listener_destructor);
 259 
 260 static void agent_ping_result_zero(agent_ping_t *obj)
 261 {
 262     obj->src_ipv4_addr = 0;
 263     obj->src_udp_port = 0;
 264     obj->listener = NULL;
 265     obj->dest_ipv4_addr = 0;
 266     obj->dest_udp_port = 0;
 267     obj->num_sends = 0;
 268     obj->timer_active = false;
 269 
 270     for (int i = 0; i < NUM_PING_SIZES; ++i) {
 271         obj->sizes[i] = 0;
 272         obj->buffers[i] = NULL;
 273         obj->acked[i] = false;
 274     }
 275 }
 276 
 277 static void agent_ping_result_constructor(agent_ping_t *obj)
 278 {
 279     agent_ping_result_zero(obj);
 280 }
 281 
 282 static void agent_ping_result_destructor(agent_ping_t *obj)
 283 {
 284     for (int i = 0; i < NUM_PING_SIZES; ++i) {
 285         if (NULL != obj->buffers[i]) {
 286             free(obj->buffers[i]);
 287         }
 288     }
 289     if (obj->timer_active) {
 290         opal_event_del(&obj->timer);
 291     }
 292 
 293     agent_ping_result_zero(obj);
 294 }
 295 
/* Define the agent_ping_t class: parent class is opal_list_item_t,
   with the constructor and destructor defined above */
OBJ_CLASS_INSTANCE(agent_ping_t,
                   opal_list_item_t,
                   agent_ping_result_constructor,
                   agent_ping_result_destructor);
 300 
 301 /*
 302  * Wrapper around sendto() loop
 303  */
 304 static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
 305                          struct sockaddr *addr)
 306 {
 307     ssize_t rc;
 308     while (1) {
 309         rc = sendto(fd, buffer, numbytes, 0, addr, sizeof(*addr));
 310         /* Note that since this is UDP, so we don't need to check
 311            for 0 < rc < numbytes */
 312         if (rc == numbytes) {
 313             return;
 314         } else if (rc < 0) {
 315             if (errno == EAGAIN || errno == EINTR) {
 316                 continue;
 317             } else if (errno == EPERM) {
 318                 // We're sending too fast
 319                 usleep(5);
 320                 continue;
 321             }
 322 
 323             char *msg;
 324             opal_asprintf(&msg, "Unexpected sendto() error: errno=%d (%s)",
 325                      errno, strerror(errno));
 326             ABORT(msg);
 327             /* Will not return */
 328         }
 329 
 330         /* We should never get here, but just in case we do, sleep a
 331            little, just so we don't hammer the CPU */
 332         usleep(1);
 333     }
 334 
 335     /* Will not get here */
 336 }
 337 
 338 /**************************************************************************
 339  * All of the following functions run in agent thread
 340  **************************************************************************/
 341 
 342 /*
 343  * Handle an incoming PING message (send an ACK)
 344  */
 345 static void agent_thread_handle_ping(agent_udp_port_listener_t *listener,
 346                                      ssize_t numbytes, struct sockaddr *from)
 347 {
 348     /* If the size we received isn't equal to what the sender says it
 349        sent, do the simple thing: just don't send an ACK */
 350     agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
 351     struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
 352     if (msg->size != numbytes) {
 353         char str[INET_ADDRSTRLEN];
 354         inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
 355 
 356         opal_output_verbose(20, USNIC_OUT,
 357                             "usNIC connectivity got bad ping: %d bytes from %s, expected %d (discarded)",
 358                             (int) numbytes, str, (int) msg->size);
 359         return;
 360     }
 361 
 362     /* Ensure that the sender sent the ping from the IP address that
 363        they think they sent it from.  If they didn't, then drop it
 364        (i.e., it's a bad ping because the sender sent it from an
 365        unexpected interface).  This should probably never happen, but
 366        it's a good failsafe for unexpected scenarios. */
 367     char msg_ipv4_addr_str[IPV4STRADDRLEN];
 368     char real_ipv4_addr_str[IPV4STRADDRLEN];
 369 
 370     opal_btl_usnic_snprintf_ipv4_addr(msg_ipv4_addr_str,
 371                                       sizeof(msg_ipv4_addr_str),
 372                                       msg->src_ipv4_addr, 0);
 373     opal_btl_usnic_snprintf_ipv4_addr(real_ipv4_addr_str,
 374                                       sizeof(real_ipv4_addr_str),
 375                                       src_addr_in->sin_addr.s_addr, 0);
 376 
 377     if (msg->src_ipv4_addr != src_addr_in->sin_addr.s_addr) {
 378         opal_output_verbose(20, USNIC_OUT,
 379                             "usNIC connectivity got bad ping (from unexpected address: %s != %s, discarded)",
 380                             msg_ipv4_addr_str, real_ipv4_addr_str);
 381         return;
 382     }
 383 
 384     if (msg->magic_number != MAGIC_ORIGINATOR) {
 385         opal_output_verbose(20, USNIC_OUT,
 386                             "usNIC connectivity got bad ping (magic number: %" PRIu64 ", discarded)",
 387                             msg->magic_number);
 388         return;
 389     }
 390     if (msg->major_version != OPAL_MAJOR_VERSION ||
 391         msg->minor_version != OPAL_MINOR_VERSION) {
 392         opal_output_verbose(20, USNIC_OUT,
 393                             "usNIC connectivity got bad ping (originator version: %d.%d, expected %d.%d, discarded)",
 394                             msg->major_version, msg->minor_version,
 395                             OPAL_MAJOR_VERSION, OPAL_MINOR_VERSION);
 396         return;
 397     }
 398 
 399     /* Ok, this is a good ping.  Send the ACK back.  The PING sender
 400        will verify that the ACK came back from the IP address that it
 401        expected. */
 402 
 403     opal_output_verbose(20, USNIC_OUT,
 404                         "usNIC connectivity got PING (size=%ld) from %s; sending ACK",
 405                         numbytes, msg_ipv4_addr_str);
 406 
 407     /* Send back an ACK.  No need to allocate a new buffer; just
 408        re-use the same buffer we just got.  Note that msg->size is
 409        already set.  We simply echo back the sender's IP address/port
 410        in the msg (the sender will use the msg fields and the
 411        recvfrom() src_addr to check for a match). */
 412     msg->message_type = AGENT_MSG_TYPE_ACK;
 413     msg->magic_number = MAGIC_TARGET;
 414 
 415     agent_sendto(listener->fd, (char*) listener->buffer, sizeof(*msg), from);
 416 }
 417 
 418 /*
 419  * Handle an incoming ACK message
 420  */
 421 static void agent_thread_handle_ack(agent_udp_port_listener_t *listener,
 422                                     ssize_t numbytes, struct sockaddr *from)
 423 {
 424     char str[INET_ADDRSTRLEN];
 425     struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
 426     inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
 427 
 428     /* If we got a wonky ACK message that is the wrong length, just
 429        return */
 430     agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
 431     if (numbytes != sizeof(*msg)) {
 432         opal_output_verbose(20, USNIC_OUT,
 433                             "usNIC connectivity got bad ACK: %d bytes from %s, expected %d (discarded)",
 434                             (int) numbytes, str, (int) sizeof(*msg));
 435         return;
 436     }
 437     if (msg->magic_number != MAGIC_TARGET) {
 438         opal_output_verbose(20, USNIC_OUT,
 439                             "usNIC connectivity got bad ACK (magic number: %" PRIu64 ", discarded)",
 440                             msg->magic_number);
 441         return;
 442     }
 443 
 444     /* Find the pending ping request (on this interface) for this ACK.
 445        If we don't find a match, we'll drop it. */
 446     agent_ping_t *ap;
 447     uint32_t src_in_port = ntohs(src_addr_in->sin_port);
 448     OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
 449         if (ap->dest_ipv4_addr == src_addr_in->sin_addr.s_addr &&
 450             ap->dest_udp_port == src_in_port &&
 451             ap->src_ipv4_addr == msg->src_ipv4_addr &&
 452             ap->src_udp_port == msg->src_udp_port) {
 453             /* Found it -- indicate that it has been acked */
 454             for (int i = 0; i < NUM_PING_SIZES; ++i) {
 455                 if (ap->sizes[i] == msg->size) {
 456                     ap->acked[i] = true;
 457                     return;
 458                 }
 459             }
 460         }
 461     }
 462 
 463     /* If we didn't find the matching ping for this ACK, then just
 464        discard it */
 465     opal_output_verbose(20, USNIC_OUT,
 466                         "usNIC connectivity got unexpected ACK: %d bytes from %s (discarded)",
 467                         (int) numbytes, str);
 468 }
 469 
 470 /*
 471  * Receive a message from the listening UDP socket
 472  */
 473 static void agent_thread_receive_ping(int fd, short flags, void *context)
 474 {
 475     agent_udp_port_listener_t *listener =
 476         (agent_udp_port_listener_t *) context;
 477     assert(NULL != listener);
 478 
 479     /* Receive the message */
 480     ssize_t numbytes;
 481     struct sockaddr src_addr;
 482     struct sockaddr_in *src_addr_in = (struct sockaddr_in*) &src_addr;
 483     socklen_t addrlen = sizeof(src_addr);
 484 
 485     while (1) {
 486         numbytes = recvfrom(listener->fd, listener->buffer, listener->max_msg_size, 0,
 487                             &src_addr, &addrlen);
 488         if (numbytes > 0) {
 489             break;
 490         } else if (numbytes < 0) {
 491             if (errno == EAGAIN || errno == EINTR) {
 492                 continue;
 493             }
 494 
 495             ABORT("Unexpected error from recvfrom");
 496             /* Will not return */
 497         }
 498     }
 499 
 500     char str[INET_ADDRSTRLEN];
 501     agent_udp_message_t *msg;
 502     msg = (agent_udp_message_t *) listener->buffer;
 503     switch (msg->message_type) {
 504     case AGENT_MSG_TYPE_PING:
 505         agent_thread_handle_ping(listener, numbytes, &src_addr);
 506         break;
 507     case AGENT_MSG_TYPE_ACK:
 508         agent_thread_handle_ack(listener, numbytes, &src_addr);
 509         break;
 510     default:
 511         /* Ignore unknown pings */
 512         inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
 513         opal_output_verbose(20, USNIC_OUT,
 514                             "usNIC connectivity agent received unknown message: %d bytes from %s",
 515                             (int) numbytes, str);
 516         break;
 517     }
 518 }
 519 
 520 static agent_udp_port_listener_t *
 521 agent_thread_find_listener(uint32_t ipv4_addr, uint32_t *udp_port)
 522 {
 523     agent_udp_port_listener_t *listener;
 524     OPAL_LIST_FOREACH(listener, &udp_port_listeners, agent_udp_port_listener_t) {
 525         if (listener->ipv4_addr == ipv4_addr) {
 526             *udp_port = listener->udp_port;
 527             return listener;
 528         }
 529     }
 530 
 531     return NULL;
 532 }
 533 
 534 /*
 535  * Send reply back from the LISTEN command: send back the IP address
 536  * and UDP port that we're listening on.
 537  */
 538 static int agent_thread_cmd_listen_reply(int fd,
 539                                          uint32_t addr, int32_t udp_port)
 540 {
 541     int ret;
 542 
 543     opal_btl_usnic_connectivity_cmd_listen_reply_t cmd = {
 544         .cmd = CONNECTIVITY_AGENT_CMD_LISTEN,
 545         .ipv4_addr = addr,
 546         .udp_port = udp_port
 547     };
 548 
 549     ret = opal_fd_write(fd, sizeof(cmd), &cmd);
 550     if (OPAL_SUCCESS != ret) {
 551         OPAL_ERROR_LOG(ret);
 552         ABORT("usnic connectivity agent IPC write failed");
 553         /* Will not return */
 554     }
 555 
 556     return OPAL_SUCCESS;
 557 }
 558 
 559 /*
 560  * Receive and process the rest of a LISTEN command from a local IPC
 561  * client.
 562  */
 563 static void agent_thread_cmd_listen(agent_ipc_listener_t *ipc_listener)
 564 {
 565     /* Read the rest of the LISTEN command from the IPC socket */
 566     int ret;
 567     opal_btl_usnic_connectivity_cmd_listen_t cmd;
 568     ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
 569     if (OPAL_SUCCESS != ret) {
 570         OPAL_ERROR_LOG(ret);
 571         ABORT("usnic connectivity agent IPC LISTEN read failed");
 572         /* Will not return */
 573     }
 574 
 575     /* If we're already listening on this address, send the UDP port
 576        back to the client. */
 577     uint32_t udp_port;
 578     agent_udp_port_listener_t *udp_listener;
 579     udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
 580     if (NULL != udp_listener) {
 581         /* If we get a non-NULL "module" pointer value from the
 582            client, it means that this client is the same process as
 583            this agent, and we should save this pointer value (all
 584            non-agent MPI procs will send NULL as their "module"
 585            pointer value -- i.e., some non-agent MPI proc was the
 586            first one to send the LISTEN command). */
 587         if (NULL == udp_listener->module) {
 588             udp_listener->module = cmd.module;
 589         }
 590         agent_thread_cmd_listen_reply(ipc_listener->client_fd,
 591                                       cmd.ipv4_addr, udp_port);
 592         return;
 593     }
 594 
 595     /* We're not listening on this interface already, so create a
 596        UDP port listener entry */
 597     udp_listener = OBJ_NEW(agent_udp_port_listener_t);
 598     if (NULL == udp_listener) {
 599         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 600         ABORT("Out of memory");
 601         /* Will not return */
 602     }
 603 
 604     udp_listener->module = cmd.module;
 605     udp_listener->max_msg_size = cmd.max_msg_size;
 606     udp_listener->ipv4_addr = cmd.ipv4_addr;
 607     udp_listener->netmask = cmd.netmask;
 608     udp_listener->usnic_name = strdup(cmd.usnic_name);
 609 
 610     /* Fill in the ipv4_addr_str.  Since we don't have the IPv4
 611        address in sockaddr_in form, it's not worth using
 612        inet_ntop() */
 613     opal_btl_usnic_snprintf_ipv4_addr(udp_listener->ipv4_addr_str,
 614                                       sizeof(udp_listener->ipv4_addr_str),
 615                                       cmd.ipv4_addr, cmd.netmask);
 616 
 617     udp_listener->buffer = malloc(udp_listener->max_msg_size);
 618     if (NULL == udp_listener->buffer) {
 619         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 620         ABORT("Out of memory");
 621         /* Will not return */
 622     }
 623 
 624     /* Create the listening socket */
 625     udp_listener->fd = socket(AF_INET, SOCK_DGRAM, 0);
 626     if (udp_listener->fd < 0) {
 627         OPAL_ERROR_LOG(udp_listener->fd);
 628         ABORT("Could not open listening socket");
 629         /* Will not return */
 630     }
 631 
 632     /* Bind it to the designated interface */
 633     struct sockaddr_in inaddr;
 634     memset(&inaddr, 0, sizeof(inaddr));
 635     inaddr.sin_family = AF_INET;
 636     inaddr.sin_addr.s_addr = cmd.ipv4_addr;
 637     inaddr.sin_port = htons(0);
 638 
 639     ret = bind(udp_listener->fd, (struct sockaddr*) &inaddr, sizeof(inaddr));
 640     if (ret < 0) {
 641         OPAL_ERROR_LOG(ret);
 642         ABORT("Could not bind listening socket");
 643         /* Will not return */
 644     }
 645 
 646     /* Find out the port we got */
 647     opal_socklen_t addrlen = sizeof(struct sockaddr_in);
 648     ret = getsockname(udp_listener->fd, (struct sockaddr*) &inaddr, &addrlen);
 649     if (ret < 0) {
 650         OPAL_ERROR_LOG(ret);
 651         ABORT("Could not get UDP port number from listening socket");
 652         /* Will not return */
 653     }
 654     udp_listener->udp_port = ntohs(inaddr.sin_port);
 655 
 656     opal_output_verbose(20, USNIC_OUT,
 657                         "usNIC connectivity agent listening on %s:%d, (%s)",
 658                         udp_listener->ipv4_addr_str,
 659                         udp_listener->udp_port,
 660                         udp_listener->usnic_name);
 661 
 662     /* Set the "don't fragment" bit on outgoing frames because we
 663        want MTU-sized messages to get through successfully to the
 664        peer, or fail if they have to fragment because of an MTU
 665        mismatch somewhere enroute */
 666     int val = IP_PMTUDISC_DO;
 667     ret = setsockopt(udp_listener->fd, IPPROTO_IP, IP_MTU_DISCOVER,
 668                      &val, sizeof(val));
 669     if (0 != ret) {
 670         OPAL_ERROR_LOG(ret);
 671         ABORT("Unable to set \"do not fragment\" on UDP socket");
 672         /* Will not return */
 673     }
 674 
 675     /* Set the send and receive buffer sizes to our MTU size */
 676     int temp;
 677     temp = (int) udp_listener->max_msg_size;
 678     if ((ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_RCVBUF,
 679                           &temp, sizeof(temp))) < 0 ||
 680         (ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_SNDBUF,
 681                           &temp, sizeof(temp))) < 0) {
 682         OPAL_ERROR_LOG(ret);
 683         ABORT("Could not set socket buffer sizes");
 684         /* Will not return */
 685     }
 686 
 687     /* Create a listening event */
 688     opal_event_set(mca_btl_usnic_component.opal_evbase,
 689                    &udp_listener->event, udp_listener->fd,
 690                    OPAL_EV_READ | OPAL_EV_PERSIST,
 691                    agent_thread_receive_ping, udp_listener);
 692     opal_event_add(&udp_listener->event, 0);
 693 
 694     /* Save this listener on the list of udp_port_listeners */
 695     opal_list_append(&udp_port_listeners, &udp_listener->super);
 696 
 697     udp_listener->active = true;
 698 
 699     /* Return the port number to the sender */
 700     ret = agent_thread_cmd_listen_reply(ipc_listener->client_fd,
 701                                         cmd.ipv4_addr, udp_listener->udp_port);
 702 
 703     /* All done! */
 704     return;
 705 }
 706 
 707 /*
 708  * Send a ping
 709  */
 710 static void agent_thread_send_ping(int fd, short flags, void *context)
 711 {
 712     agent_ping_t *ap = (agent_ping_t*) context;
 713     ap->timer_active = false;
 714 
 715     char dest_ipv4_addr_str[IPV4STRADDRLEN];
 716     opal_btl_usnic_snprintf_ipv4_addr(dest_ipv4_addr_str,
 717                                       sizeof(dest_ipv4_addr_str),
 718                                       ap->dest_ipv4_addr, ap->dest_netmask);
 719 
 720     /* If we got all the ACKs for this ping, then move this ping from
 721        the "pending" list to the "results" list.  We can also free the
 722        buffers associated with this ping result, just to save some
 723        space in the long run.  */
 724     if (ap->acked[0] && ap->acked[1]) {
 725         opal_list_remove_item(&pings_pending, &ap->super);
 726         opal_list_append(&ping_results, &ap->super);
 727 
 728         opal_output_verbose(20, USNIC_OUT,
 729                             "usNIC connectivity GOOD between %s <--> %s",
 730                             ap->listener->ipv4_addr_str,
 731                             dest_ipv4_addr_str);
 732 
 733         for (int i = 0; i < 2; ++i) {
 734             if (NULL != ap->buffers[i]) {
 735                 free(ap->buffers[i]);
 736                 ap->buffers[i] = NULL;
 737             }
 738         }
 739 
 740         return;
 741     }
 742 
 743     /* If we've resent too many times, then just abort */
 744     if (ap->num_sends > mca_btl_usnic_component.connectivity_num_retries) {
 745         char *topic;
 746         if (ap->acked[0] && !ap->acked[1]) {
 747             // For the show_help topic checker script
 748             // SHOW_HELP:"help-mpi-btl-usnic.txt","connectivity error: small ok, large bad"
 749             topic = "connectivity error: small ok, large bad";
 750         } else if (!ap->acked[0] && ap->acked[1]) {
 751             // For the show_help topic checker script
 752             // SHOW_HELP:"help-mpi-btl-usnic.txt","connectivity error: small bad, large ok"
 753             topic = "connectivity error: small bad, large ok";
 754         } else {
 755             // For the show_help topic checker script
 756             // SHOW_HELP:"help-mpi-btl-usnic.txt","connectivity error: small bad, large bad"
 757             topic = "connectivity error: small bad, large bad";
 758         }
 759 
 760         char ipv4_addr_str[IPV4STRADDRLEN];
 761         opal_btl_usnic_snprintf_ipv4_addr(ipv4_addr_str, sizeof(ipv4_addr_str),
 762                                           ap->dest_ipv4_addr,
 763                                           ap->dest_netmask);
 764         opal_show_help("help-mpi-btl-usnic.txt", topic, true,
 765                        opal_process_info.nodename,
 766                        ap->listener->ipv4_addr_str,
 767                        ap->listener->usnic_name,
 768                        ap->dest_nodename,
 769                        ipv4_addr_str,
 770                        ap->sizes[0],
 771                        ap->sizes[1]);
 772         opal_btl_usnic_exit(NULL);
 773         /* Will not return */
 774     }
 775 
 776     time_t t = time(NULL);
 777     opal_output_verbose(20, USNIC_OUT,
 778                         "usNIC connectivity pinging %s:%d (%s) from %s (%s) at %s",
 779                         dest_ipv4_addr_str,
 780                         ntohs(ap->dest_sockaddr.sin_port),
 781                         ap->dest_nodename,
 782                         ap->listener->ipv4_addr_str,
 783                         ap->listener->usnic_name,
 784                         ctime(&t));
 785 
 786     /* Send the ping messages to the peer */
 787     for (int i = 0; i < NUM_PING_SIZES; ++i) {
 788         agent_sendto(ap->listener->fd, (char*) ap->buffers[i], ap->sizes[i],
 789                      (struct sockaddr*) &ap->dest_sockaddr);
 790     }
 791 
 792     /* Set a timer to check if these pings are ACKed */
 793     opal_event_set(mca_btl_usnic_component.opal_evbase, &ap->timer,
 794                    -1, 0, agent_thread_send_ping, ap);
 795     opal_event_add(&ap->timer, &ack_timeout);
 796     ap->timer_active = true;
 797 
 798     /* Count how many times we've done this */
 799     ++ap->num_sends;
 800 }
 801 
 802 /*
 803  * Receive and process the rest of a PING command from a local IPC
 804  * client.
 805  */
 806 static void agent_thread_cmd_ping(agent_ipc_listener_t *ipc_listener)
 807 {
 808     /* Read the rest of the PING command from the IPC socket */
 809     int ret;
 810     opal_btl_usnic_connectivity_cmd_ping_t cmd;
 811     ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
 812     if (OPAL_SUCCESS != ret) {
 813         OPAL_ERROR_LOG(ret);
 814         ABORT("usnic connectivity agent IPC PING read failed");
 815         /* Will not return */
 816     }
 817 
 818     /* Have we already pinged this IP address / port? */
 819     agent_ping_t *ap;
 820     OPAL_LIST_FOREACH(ap, &ping_results, agent_ping_t) {
 821         if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
 822             ap->dest_udp_port == cmd.dest_udp_port) {
 823             /* We already have results from pinging this IP address /
 824                port, so there's no need for further action */
 825             return;
 826         }
 827     }
 828 
 829     /* Are we in the middle of pinging this IP address / port? */
 830     OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
 831         if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
 832             ap->dest_udp_port == cmd.dest_udp_port) {
 833             /* We're already in the middle of pinging this IP address
 834                / port, so there's no need for further action */
 835             return;
 836         }
 837     }
 838 
 839     /* This is a new ping request.  Find the listener with this source
 840        ipv4 address */
 841     bool found = false;
 842     agent_udp_port_listener_t *udp_listener;
 843     OPAL_LIST_FOREACH(udp_listener, &udp_port_listeners,
 844                       agent_udp_port_listener_t) {
 845         if (udp_listener->ipv4_addr == cmd.src_ipv4_addr) {
 846             found = true;
 847             break;
 848         }
 849     }
 850     if (!found) {
 851         ABORT("Could not ping listener for ping request");
 852         /* Will not return */
 853     }
 854 
 855     /* This is a new ping request; track it */
 856     ap = OBJ_NEW(agent_ping_t);
 857     if (NULL == ap) {
 858         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 859         ABORT("Out of memory");
 860         /* Will not return */
 861     }
 862     ap->src_ipv4_addr = cmd.src_ipv4_addr;
 863     ap->src_udp_port = cmd.src_udp_port;
 864     ap->listener = udp_listener;
 865     ap->dest_ipv4_addr = cmd.dest_ipv4_addr;
 866     ap->dest_netmask = cmd.dest_netmask;
 867     ap->dest_udp_port = cmd.dest_udp_port;
 868     ap->dest_sockaddr.sin_family = AF_INET;
 869     ap->dest_sockaddr.sin_addr.s_addr = cmd.dest_ipv4_addr;
 870     ap->dest_sockaddr.sin_port = htons(cmd.dest_udp_port);
 871     ap->dest_nodename = strdup(cmd.dest_nodename);
 872 
 873     /* The first message we send will be "short" (a simple control
 874        message); the second will be "long" (i.e., caller-specified
 875        length) */
 876     ap->sizes[0] = sizeof(agent_udp_message_t);
 877 
 878     /* Note that the MTU is the max Ethernet frame payload.  So from
 879        that MTU, we have to subtract off the max IP header (e.g., if
 880        all IP options are enabled, which is 60 bytes), and then also
 881        subtract off the UDP header (which is 8 bytes).  So we need to
 882        subtract off 68 bytes from the MTU, and that's the largest ping
 883        payload we can send. */
 884     ap->sizes[1] = cmd.max_msg_size - 68;
 885 
 886     /* Allocate a buffer for each size.  Make sure the smallest size
 887        is at least sizeof(agent_udp_message_t). */
 888     agent_udp_message_t *msg;
 889     for (size_t i = 0; i < NUM_PING_SIZES; ++i) {
 890         ap->buffers[i] = calloc(1, ap->sizes[i]);
 891         if (NULL == ap->buffers[i]) {
 892             OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 893             ABORT("Out of memory");
 894             /* Will not return */
 895         }
 896 
 897         /* Fill in the message with return addressing information */
 898         msg = (agent_udp_message_t*) ap->buffers[i];
 899         msg->message_type = AGENT_MSG_TYPE_PING;
 900         msg->src_ipv4_addr = ap->src_ipv4_addr;
 901         msg->src_udp_port = ap->src_udp_port;
 902         msg->magic_number = MAGIC_ORIGINATOR;
 903         msg->major_version = OPAL_MAJOR_VERSION;
 904         msg->minor_version = OPAL_MINOR_VERSION;
 905         msg->size = ap->sizes[i];
 906     }
 907 
 908     /* Save this ping request on the "pending" list */
 909     opal_list_append(&pings_pending, &ap->super);
 910 
 911     /* Send the ping */
 912     agent_thread_send_ping(0, 0, ap);
 913 }
 914 
 915 /*
 916  * Receive and process the rest of an UNLISTEN command from a local IPC
 917  * client.
 918  */
 919 static void agent_thread_cmd_unlisten(agent_ipc_listener_t *ipc_listener)
 920 {
 921     /* Read the rest of the UNLISTEN command from the IPC socket */
 922     int ret;
 923     opal_btl_usnic_connectivity_cmd_unlisten_t cmd;
 924     ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
 925     if (OPAL_SUCCESS != ret) {
 926         OPAL_ERROR_LOG(ret);
 927         ABORT("usnic connectivity agent IPC UNLISTEN read failed");
 928         /* Will not return */
 929     }
 930 
 931     /* If we are listening on this address (and we should be), then
 932        stop listening on it. */
 933     uint32_t udp_port;
 934     agent_udp_port_listener_t *udp_listener;
 935     udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
 936     if (NULL != udp_listener) {
 937         OBJ_RELEASE(udp_listener);
 938     }
 939 
 940     /* All done! */
 941     return;
 942 }
 943 
 944 /*
 945  * Called when we get an incoming IPC message
 946  */
 947 static void agent_thread_ipc_receive(int fd, short flags, void *context)
 948 {
 949     int32_t command;
 950     agent_ipc_listener_t *ipc_listener = (agent_ipc_listener_t*) context;
 951 
 952     /* Read the command */
 953     command = -1;
 954     int ret = opal_fd_read(fd, sizeof(command), &command);
 955     if (OPAL_ERR_TIMEOUT == ret) {
 956         /* We get OPAL_ERR_TIMEOUT if the remote side hung up */
 957         OBJ_RELEASE(ipc_listener);
 958         return;
 959     } else if (OPAL_SUCCESS != ret) {
 960         OPAL_ERROR_LOG(ret);
 961         ABORT("usnic connectivity agent IPC command read failed");
 962         /* Will not return */
 963     }
 964 
 965     assert(CONNECTIVITY_AGENT_CMD_LISTEN == command ||
 966            CONNECTIVITY_AGENT_CMD_PING == command ||
 967            CONNECTIVITY_AGENT_CMD_UNLISTEN == command);
 968 
 969     switch (command) {
 970     case CONNECTIVITY_AGENT_CMD_LISTEN:
 971         agent_thread_cmd_listen(ipc_listener);
 972         break;
 973     case CONNECTIVITY_AGENT_CMD_PING:
 974         agent_thread_cmd_ping(ipc_listener);
 975         break;
 976     case CONNECTIVITY_AGENT_CMD_UNLISTEN:
 977         agent_thread_cmd_unlisten(ipc_listener);
 978         break;
 979     default:
 980         ABORT("Unexpected connectivity agent command");
 981         break;
 982     }
 983 }
 984 
 985 /*
 986  * We got a new connection on the IPC named socket.  Add it to the
 987  * event base.
 988  */
 989 static void agent_thread_accept(int fd, short flags, void *context)
 990 {
 991     struct sockaddr addr;
 992     socklen_t len;
 993     agent_ipc_listener_t *listener = NULL;
 994 
 995     len = sizeof(addr);
 996     int client_fd = accept(fd, &addr, &len);
 997     if (client_fd < 0) {
 998         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
 999         ABORT("accept() failed");
1000         /* Will not return */
1001     }
1002 
1003     /* If we got a good client, verify that it sent the magic token */
1004     int tlen = strlen(CONNECTIVITY_MAGIC_TOKEN);
1005     char *msg = alloca(tlen + 1);
1006     if (NULL == msg) {
1007         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1008         ABORT("Out of memory");
1009         /* Will not return */
1010     }
1011     if (OPAL_SUCCESS != opal_fd_read(client_fd, tlen, msg)) {
1012         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1013         ABORT("usnic connectivity agent IPC read failed");
1014         /* Will not return */
1015     }
1016     if (0 != memcmp(msg, CONNECTIVITY_MAGIC_TOKEN, tlen)) {
1017         opal_output_verbose(20, USNIC_OUT,
1018                             "usNIC connectivity got bad IPC client (wrong magic token); disconnected");
1019         close(client_fd);
1020         return;
1021     }
1022 
1023     /* Remember how many accepts we have successfully completed */
1024     ++ipc_accepts;
1025 
1026     /* Make a listener object for this peer */
1027     listener = OBJ_NEW(agent_ipc_listener_t);
1028     listener->client_fd = client_fd;
1029 
1030     /* Write back the magic token to ACK that we got the peer's
1031        magic token and all is kosher */
1032     if (OPAL_SUCCESS != opal_fd_write(client_fd, tlen,
1033                                       CONNECTIVITY_MAGIC_TOKEN)) {
1034         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1035         ABORT("usnic connectivity agent IPC read failed");
1036         /* Will not return */
1037     }
1038 
1039     /* Add this IPC listener to the event base */
1040     opal_event_set(mca_btl_usnic_component.opal_evbase,
1041                    &listener->event, client_fd,
1042                    OPAL_EV_READ | OPAL_EV_PERSIST,
1043                    agent_thread_ipc_receive, listener);
1044     opal_event_add(&listener->event, 0);
1045 
1046     /* Save this listener on the list of ipc_listeners */
1047     opal_list_append(&ipc_listeners, &listener->super);
1048 
1049     listener->active = true;
1050 
1051     return;
1052 }
1053 
1054 /*
1055  * Tear down all active events.
1056  *
1057  * This is done as an event callback in the agent threaf so that there
1058  * is no race condition in the teardown.  Specifically: the progress
1059  * thread will only fire one event at a time.  Therefore, this one
1060  * event can "atomically" delete all the events and data structures
1061  * and not have to worry about concurrent access from some event
1062  * firing in the middle of the teardown process.
1063  */
1064 static void agent_thread_finalize(int fd, short flags, void *context)
1065 {
1066     /* Free the event that triggered this call */
1067     free(context);
1068 
1069     /* Ensure that all the local IPC clients have connected to me (so
1070        that we don't shut down before someone tries to connect to me),
1071        or 10 seconds have passed (i.e., if 10 seconds pass and they
1072        don't all connect to me, then something else is wrong, and we
1073        should just give up). */
1074     static bool first = true;
1075     static time_t timestamp = 0;
1076     if (first) {
1077         timestamp = time(NULL);
1078         first = false;
1079     }
1080 
1081     if (ipc_accepts < opal_process_info.num_local_peers &&
1082         time(NULL) < timestamp + 10) {
1083         opal_output_verbose(20, USNIC_OUT,
1084                             "usNIC connectivity agent delaying shutdown until all clients connect...");
1085 
1086         opal_event_t *ev = calloc(sizeof(*ev), 1);
1087         struct timeval finalize_retry = {
1088             .tv_sec = 0,
1089             .tv_usec = 10000
1090         };
1091 
1092         opal_event_set(mca_btl_usnic_component.opal_evbase,
1093                        ev, -1, 0, agent_thread_finalize, ev);
1094         opal_event_add(ev, &finalize_retry);
1095         return;
1096     }
1097     if (ipc_accepts < opal_process_info.num_local_peers) {
1098         opal_output_verbose(20, USNIC_OUT,
1099                             "usNIC connectivity agent: only %d of %d clients connected, but timeout has expired -- exiting anyway", ipc_accepts, opal_process_info.num_local_peers);
1100     }
1101 
1102     /* Remove the agent listening event from the opal async event
1103        base */
1104     opal_event_del(&ipc_event);
1105 
1106     /* Shut down all active udp_port_listeners */
1107     agent_udp_port_listener_t *udp_listener, *ulnext;
1108     OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
1109                            agent_udp_port_listener_t) {
1110         OBJ_RELEASE(udp_listener);
1111     }
1112 
1113     /* Destroy the pending pings and ping results */
1114     agent_ping_t *request, *pnext;
1115     OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
1116         opal_list_remove_item(&pings_pending, &request->super);
1117         OBJ_RELEASE(request);
1118     }
1119 
1120     OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
1121         opal_list_remove_item(&ping_results, &request->super);
1122         OBJ_RELEASE(request);
1123     }
1124 
1125     /* Shut down all active ipc_listeners */
1126     agent_ipc_listener_t *ipc_listener, *inext;
1127     OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
1128                            agent_ipc_listener_t) {
1129         OBJ_RELEASE(ipc_listener);
1130     }
1131 
1132     agent_initialized = false;
1133 }
1134 
1135 /**************************************************************************
1136  * All of the following functions run in the main application thread
1137  **************************************************************************/
1138 
1139 /*
1140  * Setup the agent and start its event loop running in a dedicated
1141  * thread
1142  */
1143 int opal_btl_usnic_connectivity_agent_init(void)
1144 {
1145     /* Only do this initialization if I am the agent (the agent is
1146        local rank 0) */
1147     if (opal_process_info.my_local_rank != 0) {
1148         return OPAL_SUCCESS;
1149     }
1150     if (agent_initialized) {
1151         return OPAL_SUCCESS;
1152     }
1153 
1154     /* Make a struct timeval for use with timer events.  Note that the
1155        MCA param is expressed in terms of *milli*seconds, but the
1156        timeval timeout is expressed in terms of *micro*seconds. */
1157     ack_timeout.tv_sec =
1158         mca_btl_usnic_component.connectivity_ack_timeout / 1000;
1159     ack_timeout.tv_usec =
1160         1000 * (mca_btl_usnic_component.connectivity_ack_timeout % 1000);
1161 
1162     /* Create lists */
1163     OBJ_CONSTRUCT(&udp_port_listeners, opal_list_t);
1164     OBJ_CONSTRUCT(&ipc_listeners, opal_list_t);
1165     OBJ_CONSTRUCT(&pings_pending, opal_list_t);
1166     OBJ_CONSTRUCT(&ping_results, opal_list_t);
1167 
1168     /********************************************************************
1169      * Once all of the above is setup, create the unix domain socket
1170      * and start the event loop.
1171      ********************************************************************/
1172 
1173     /* Create the unix domain socket in the job session directory */
1174     ipc_accept_fd = socket(PF_UNIX, SOCK_STREAM, 0);
1175     if (ipc_accept_fd < 0) {
1176         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1177         ABORT("socket() failed");
1178         /* Will not return */
1179     }
1180 
1181     opal_asprintf(&ipc_filename, "%s/%s",
1182              opal_process_info.job_session_dir, CONNECTIVITY_SOCK_NAME);
1183     if (NULL == ipc_filename) {
1184         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1185         ABORT("Out of memory");
1186         /* Will not return */
1187     }
1188     unlink(ipc_filename);
1189 
1190     struct sockaddr_un address;
1191     assert(strlen(ipc_filename) < sizeof(address.sun_path));
1192 
1193     memset(&address, 0, sizeof(struct sockaddr_un));
1194     address.sun_family = AF_UNIX;
1195     opal_string_copy(address.sun_path, ipc_filename, sizeof(address.sun_path));
1196 
1197     if (bind(ipc_accept_fd, (struct sockaddr *) &address,
1198              sizeof(struct sockaddr_un)) != 0) {
1199         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1200         ABORT("bind() failed");
1201         /* Will not return */
1202     }
1203 
1204     /* Give an arbitrarily large backlog number so that connecting
1205        clients will never be backlogged (note for Future Jeff: please
1206        don't laugh at Past Jeff if 256 has become a trivially small
1207        number of on-server procs in a single job). */
1208     if (listen(ipc_accept_fd, 256) != 0) {
1209         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1210         ABORT("listen() failed");
1211         /* Will not return */
1212     }
1213 
1214     /* Add the socket to the event base */
1215     opal_event_set(mca_btl_usnic_component.opal_evbase,
1216                    &ipc_event, ipc_accept_fd,
1217                    OPAL_EV_READ | OPAL_EV_PERSIST,
1218                    agent_thread_accept, NULL);
1219     opal_event_add(&ipc_event, 0);
1220 
1221     opal_output_verbose(20, USNIC_OUT,
1222                         "usNIC connectivity agent initialized");
1223     agent_initialized = true;
1224     return OPAL_SUCCESS;
1225 }
1226 
1227 /*
1228  * Shut down the agent
1229  */
1230 int opal_btl_usnic_connectivity_agent_finalize(void)
1231 {
1232     /* Only do this if I have the agent running */
1233     if (!agent_initialized) {
1234         return OPAL_SUCCESS;
1235     }
1236 
1237     /* Submit an event to the async thread and tell it to delete all
1238        the usNIC events.  See the rationale for doing this in the
1239        comment in the agent_thread_finalize() function. */
1240     opal_event_t *ev = calloc(sizeof(*ev), 1);
1241     opal_event_set(mca_btl_usnic_component.opal_evbase,
1242                    ev, -1, OPAL_EV_WRITE, agent_thread_finalize, ev);
1243     opal_event_active(ev, OPAL_EV_WRITE, 1);
1244 
1245     /* Wait for the event to fire and complete */
1246     while (agent_initialized) {
1247         struct timespec tp = {
1248             .tv_sec  = 0,
1249             .tv_nsec = 1000
1250         };
1251         nanosleep(&tp, NULL);
1252     }
1253 
1254     /* Close the local IPC socket and remove the file */
1255     if (ipc_accept_fd != -1) {
1256         close(ipc_accept_fd);
1257         ipc_accept_fd = -1;
1258     }
1259     if (NULL != ipc_filename) {
1260         unlink(ipc_filename);
1261         free(ipc_filename);
1262         ipc_filename = NULL;
1263     }
1264 
1265     opal_output_verbose(20, USNIC_OUT,
1266                         "usNIC connectivity client finalized");
1267     return OPAL_SUCCESS;
1268 }

/* [<][>][^][v][top][bottom][index][help] */