This source file includes the following definitions.
- udp_port_listener_zero
- udp_port_listener_constructor
- udp_port_listener_destructor
- ipc_listener_zero
- ipc_listener_constructor
- ipc_listener_destructor
- agent_ping_result_zero
- agent_ping_result_constructor
- agent_ping_result_destructor
- agent_sendto
- agent_thread_handle_ping
- agent_thread_handle_ack
- agent_thread_receive_ping
- agent_thread_find_listener
- agent_thread_cmd_listen_reply
- agent_thread_cmd_listen
- agent_thread_send_ping
- agent_thread_cmd_ping
- agent_thread_cmd_unlisten
- agent_thread_ipc_receive
- agent_thread_accept
- agent_thread_finalize
- opal_btl_usnic_connectivity_agent_init
- opal_btl_usnic_connectivity_agent_finalize
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 #include "opal_config.h"
  14 
  15 #include <assert.h>
  16 #include <sys/types.h>
  17 #include <sys/socket.h>
  18 #include <sys/un.h>
  19 #include <unistd.h>
  20 #ifdef HAVE_ALLOCA_H
  21 #include <alloca.h>
  22 #endif
  23 
  24 #include "opal_stdint.h"
  25 #include "opal/threads/mutex.h"
  26 #include "opal/mca/event/event.h"
  27 #include "opal/util/show_help.h"
  28 #include "opal/types.h"
  29 #include "opal/util/output.h"
  30 #include "opal/util/fd.h"
  31 #include "opal/util/string_copy.h"
  32 #include "opal/util/printf.h"
  33 
  34 #include "btl_usnic.h"
  35 #include "btl_usnic_connectivity.h"
  36 
  37 
  38 
  39 
  40 
  41 
  42 
  43 
  44 static int ipc_accept_fd = -1;
  45 static char *ipc_filename = NULL;
  46 static opal_event_t ipc_event;
  47 static struct timeval ack_timeout;
  48 static opal_list_t udp_port_listeners;
  49 static opal_list_t ipc_listeners;
  50 static volatile int ipc_accepts = 0;
  51 
  52 
  53 static opal_list_t pings_pending;
  54 static opal_list_t ping_results;
  55 static volatile bool agent_initialized = false;
  56 
  57 
  58 
  59 
  60 
  61 
  62 typedef struct {
  63     opal_list_item_t super;
  64 
  65     
  66     uint32_t ipv4_addr;
  67     uint32_t netmask;
  68     char ipv4_addr_str[IPV4STRADDRLEN];
  69     uint32_t max_msg_size;
  70     char *nodename;
  71     char *usnic_name;
  72 
  73     
  74     int fd;
  75     uint32_t udp_port;
  76     uint8_t *buffer;
  77     opal_event_t event;
  78     bool active;
  79     opal_btl_usnic_module_t *module;
  80 } agent_udp_port_listener_t;
  81 
  82 OBJ_CLASS_DECLARATION(agent_udp_port_listener_t);
  83 
  84 
  85 
  86 
  87 
  88 typedef struct {
  89     opal_list_item_t super;
  90 
  91     int client_fd;
  92     opal_event_t event;
  93     bool active;
  94 } agent_ipc_listener_t;
  95 
  96 OBJ_CLASS_DECLARATION(agent_ipc_listener_t);
  97 
  98 typedef enum {
  99     AGENT_MSG_TYPE_PING = 17,
 100     AGENT_MSG_TYPE_ACK
 101 } agent_udp_message_type_t;
 102 
 103 
 104 #define MAGIC_ORIGINATOR 0x9a9e2fbce63a11e5
 105 #define MAGIC_TARGET 0x60735c68f368aace
 106 
 107 
 108 
 109 
 110 typedef struct {
 111     uint8_t message_type;
 112 
 113     
 114 
 115 
 116 
 117     uint32_t src_ipv4_addr;
 118     uint32_t src_udp_port;
 119 
 120     
 121 
 122     uint64_t magic_number;
 123     uint32_t major_version, minor_version;
 124 
 125     
 126 
 127     uint32_t size;
 128 } agent_udp_message_t;
 129 
 130 typedef struct {
 131     opal_list_item_t super;
 132 
 133     
 134     uint32_t src_ipv4_addr; 
 135     uint32_t src_udp_port;
 136     agent_udp_port_listener_t *listener;
 137     uint32_t dest_ipv4_addr; 
 138     uint32_t dest_netmask;
 139     uint32_t dest_udp_port;
 140     struct sockaddr_in dest_sockaddr;
 141     char *dest_nodename;
 142 
 143     
 144 
 145 
 146 #define NUM_PING_SIZES 2
 147     size_t sizes[NUM_PING_SIZES];
 148     uint8_t *buffers[NUM_PING_SIZES];
 149     bool acked[NUM_PING_SIZES];
 150 
 151     
 152     int num_sends;
 153 
 154     
 155 
 156     opal_event_t timer;
 157     bool timer_active;
 158 } agent_ping_t;
 159 
 160 OBJ_CLASS_DECLARATION(agent_ping_t);
 161 
 162 
 163 
 164 
 165 
 166 
 167 static void udp_port_listener_zero(agent_udp_port_listener_t *obj)
 168 {
 169     obj->ipv4_addr =
 170         obj->netmask =
 171         obj->max_msg_size = 0;
 172     obj->nodename =
 173         obj->usnic_name = NULL;
 174     memset(obj->ipv4_addr_str, 0, sizeof(obj->ipv4_addr_str));
 175 
 176     obj->fd = -1;
 177     obj->udp_port = -1;
 178     obj->buffer = NULL;
 179 
 180     obj->active = false;
 181 }
 182 
 183 static void udp_port_listener_constructor(agent_udp_port_listener_t *obj)
 184 {
 185     udp_port_listener_zero(obj);
 186 }
 187 
 188 static void udp_port_listener_destructor(agent_udp_port_listener_t *obj)
 189 {
 190     
 191 
 192     agent_ping_t *ap, *apnext;
 193     OPAL_LIST_FOREACH_SAFE(ap, apnext, &pings_pending, agent_ping_t) {
 194         if (ap->src_ipv4_addr == obj->ipv4_addr) {
 195             opal_list_remove_item(&pings_pending, &ap->super);
 196             OBJ_RELEASE(ap);
 197         }
 198     }
 199 
 200     if (-1 != obj->fd) {
 201         close(obj->fd);
 202     }
 203     if (NULL != obj->nodename) {
 204         free(obj->nodename);
 205     }
 206     if (NULL != obj->usnic_name) {
 207         free(obj->usnic_name);
 208     }
 209     if (NULL != obj->buffer) {
 210         free(obj->buffer);
 211     }
 212 
 213     
 214 
 215     if (obj->active) {
 216         opal_event_del(&obj->event);
 217         opal_list_remove_item(&udp_port_listeners, &obj->super);
 218     }
 219 
 220     udp_port_listener_zero(obj);
 221 }
 222 
 223 OBJ_CLASS_INSTANCE(agent_udp_port_listener_t,
 224                    opal_list_item_t,
 225                    udp_port_listener_constructor,
 226                    udp_port_listener_destructor);
 227 
 228 static void ipc_listener_zero(agent_ipc_listener_t *obj)
 229 {
 230     obj->client_fd = -1;
 231     obj->active = false;
 232 }
 233 
 234 static void ipc_listener_constructor(agent_ipc_listener_t *obj)
 235 {
 236     ipc_listener_zero(obj);
 237 }
 238 
 239 static void ipc_listener_destructor(agent_ipc_listener_t *obj)
 240 {
 241     if (-1 != obj->client_fd) {
 242         close(obj->client_fd);
 243     }
 244 
 245     
 246 
 247     if (obj->active) {
 248         opal_event_del(&obj->event);
 249         opal_list_remove_item(&ipc_listeners, &obj->super);
 250     }
 251 
 252     ipc_listener_zero(obj);
 253 }
 254 
 255 OBJ_CLASS_INSTANCE(agent_ipc_listener_t,
 256                    opal_list_item_t,
 257                    ipc_listener_constructor,
 258                    ipc_listener_destructor);
 259 
 260 static void agent_ping_result_zero(agent_ping_t *obj)
 261 {
 262     obj->src_ipv4_addr = 0;
 263     obj->src_udp_port = 0;
 264     obj->listener = NULL;
 265     obj->dest_ipv4_addr = 0;
 266     obj->dest_udp_port = 0;
 267     obj->num_sends = 0;
 268     obj->timer_active = false;
 269 
 270     for (int i = 0; i < NUM_PING_SIZES; ++i) {
 271         obj->sizes[i] = 0;
 272         obj->buffers[i] = NULL;
 273         obj->acked[i] = false;
 274     }
 275 }
 276 
 277 static void agent_ping_result_constructor(agent_ping_t *obj)
 278 {
 279     agent_ping_result_zero(obj);
 280 }
 281 
 282 static void agent_ping_result_destructor(agent_ping_t *obj)
 283 {
 284     for (int i = 0; i < NUM_PING_SIZES; ++i) {
 285         if (NULL != obj->buffers[i]) {
 286             free(obj->buffers[i]);
 287         }
 288     }
 289     if (obj->timer_active) {
 290         opal_event_del(&obj->timer);
 291     }
 292 
 293     agent_ping_result_zero(obj);
 294 }
 295 
 296 OBJ_CLASS_INSTANCE(agent_ping_t,
 297                    opal_list_item_t,
 298                    agent_ping_result_constructor,
 299                    agent_ping_result_destructor);
 300 
 301 
 302 
 303 
 304 static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
 305                          struct sockaddr *addr)
 306 {
 307     ssize_t rc;
 308     while (1) {
 309         rc = sendto(fd, buffer, numbytes, 0, addr, sizeof(*addr));
 310         
 311 
 312         if (rc == numbytes) {
 313             return;
 314         } else if (rc < 0) {
 315             if (errno == EAGAIN || errno == EINTR) {
 316                 continue;
 317             } else if (errno == EPERM) {
 318                 
 319                 usleep(5);
 320                 continue;
 321             }
 322 
 323             char *msg;
 324             opal_asprintf(&msg, "Unexpected sendto() error: errno=%d (%s)",
 325                      errno, strerror(errno));
 326             ABORT(msg);
 327             
 328         }
 329 
 330         
 331 
 332         usleep(1);
 333     }
 334 
 335     
 336 }
 337 
 338 
 339 
 340 
 341 
 342 
 343 
 344 
 345 static void agent_thread_handle_ping(agent_udp_port_listener_t *listener,
 346                                      ssize_t numbytes, struct sockaddr *from)
 347 {
 348     
 349 
 350     agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
 351     struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
 352     if (msg->size != numbytes) {
 353         char str[INET_ADDRSTRLEN];
 354         inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
 355 
 356         opal_output_verbose(20, USNIC_OUT,
 357                             "usNIC connectivity got bad ping: %d bytes from %s, expected %d (discarded)",
 358                             (int) numbytes, str, (int) msg->size);
 359         return;
 360     }
 361 
 362     
 363 
 364 
 365 
 366 
 367     char msg_ipv4_addr_str[IPV4STRADDRLEN];
 368     char real_ipv4_addr_str[IPV4STRADDRLEN];
 369 
 370     opal_btl_usnic_snprintf_ipv4_addr(msg_ipv4_addr_str,
 371                                       sizeof(msg_ipv4_addr_str),
 372                                       msg->src_ipv4_addr, 0);
 373     opal_btl_usnic_snprintf_ipv4_addr(real_ipv4_addr_str,
 374                                       sizeof(real_ipv4_addr_str),
 375                                       src_addr_in->sin_addr.s_addr, 0);
 376 
 377     if (msg->src_ipv4_addr != src_addr_in->sin_addr.s_addr) {
 378         opal_output_verbose(20, USNIC_OUT,
 379                             "usNIC connectivity got bad ping (from unexpected address: %s != %s, discarded)",
 380                             msg_ipv4_addr_str, real_ipv4_addr_str);
 381         return;
 382     }
 383 
 384     if (msg->magic_number != MAGIC_ORIGINATOR) {
 385         opal_output_verbose(20, USNIC_OUT,
 386                             "usNIC connectivity got bad ping (magic number: %" PRIu64 ", discarded)",
 387                             msg->magic_number);
 388         return;
 389     }
 390     if (msg->major_version != OPAL_MAJOR_VERSION ||
 391         msg->minor_version != OPAL_MINOR_VERSION) {
 392         opal_output_verbose(20, USNIC_OUT,
 393                             "usNIC connectivity got bad ping (originator version: %d.%d, expected %d.%d, discarded)",
 394                             msg->major_version, msg->minor_version,
 395                             OPAL_MAJOR_VERSION, OPAL_MINOR_VERSION);
 396         return;
 397     }
 398 
 399     
 400 
 401 
 402 
 403     opal_output_verbose(20, USNIC_OUT,
 404                         "usNIC connectivity got PING (size=%ld) from %s; sending ACK",
 405                         numbytes, msg_ipv4_addr_str);
 406 
 407     
 408 
 409 
 410 
 411 
 412     msg->message_type = AGENT_MSG_TYPE_ACK;
 413     msg->magic_number = MAGIC_TARGET;
 414 
 415     agent_sendto(listener->fd, (char*) listener->buffer, sizeof(*msg), from);
 416 }
 417 
 418 
 419 
 420 
 421 static void agent_thread_handle_ack(agent_udp_port_listener_t *listener,
 422                                     ssize_t numbytes, struct sockaddr *from)
 423 {
 424     char str[INET_ADDRSTRLEN];
 425     struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
 426     inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
 427 
 428     
 429 
 430     agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
 431     if (numbytes != sizeof(*msg)) {
 432         opal_output_verbose(20, USNIC_OUT,
 433                             "usNIC connectivity got bad ACK: %d bytes from %s, expected %d (discarded)",
 434                             (int) numbytes, str, (int) sizeof(*msg));
 435         return;
 436     }
 437     if (msg->magic_number != MAGIC_TARGET) {
 438         opal_output_verbose(20, USNIC_OUT,
 439                             "usNIC connectivity got bad ACK (magic number: %" PRIu64 ", discarded)",
 440                             msg->magic_number);
 441         return;
 442     }
 443 
 444     
 445 
 446     agent_ping_t *ap;
 447     uint32_t src_in_port = ntohs(src_addr_in->sin_port);
 448     OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
 449         if (ap->dest_ipv4_addr == src_addr_in->sin_addr.s_addr &&
 450             ap->dest_udp_port == src_in_port &&
 451             ap->src_ipv4_addr == msg->src_ipv4_addr &&
 452             ap->src_udp_port == msg->src_udp_port) {
 453             
 454             for (int i = 0; i < NUM_PING_SIZES; ++i) {
 455                 if (ap->sizes[i] == msg->size) {
 456                     ap->acked[i] = true;
 457                     return;
 458                 }
 459             }
 460         }
 461     }
 462 
 463     
 464 
 465     opal_output_verbose(20, USNIC_OUT,
 466                         "usNIC connectivity got unexpected ACK: %d bytes from %s (discarded)",
 467                         (int) numbytes, str);
 468 }
 469 
 470 
 471 
 472 
 473 static void agent_thread_receive_ping(int fd, short flags, void *context)
 474 {
 475     agent_udp_port_listener_t *listener =
 476         (agent_udp_port_listener_t *) context;
 477     assert(NULL != listener);
 478 
 479     
 480     ssize_t numbytes;
 481     struct sockaddr src_addr;
 482     struct sockaddr_in *src_addr_in = (struct sockaddr_in*) &src_addr;
 483     socklen_t addrlen = sizeof(src_addr);
 484 
 485     while (1) {
 486         numbytes = recvfrom(listener->fd, listener->buffer, listener->max_msg_size, 0,
 487                             &src_addr, &addrlen);
 488         if (numbytes > 0) {
 489             break;
 490         } else if (numbytes < 0) {
 491             if (errno == EAGAIN || errno == EINTR) {
 492                 continue;
 493             }
 494 
 495             ABORT("Unexpected error from recvfrom");
 496             
 497         }
 498     }
 499 
 500     char str[INET_ADDRSTRLEN];
 501     agent_udp_message_t *msg;
 502     msg = (agent_udp_message_t *) listener->buffer;
 503     switch (msg->message_type) {
 504     case AGENT_MSG_TYPE_PING:
 505         agent_thread_handle_ping(listener, numbytes, &src_addr);
 506         break;
 507     case AGENT_MSG_TYPE_ACK:
 508         agent_thread_handle_ack(listener, numbytes, &src_addr);
 509         break;
 510     default:
 511         
 512         inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
 513         opal_output_verbose(20, USNIC_OUT,
 514                             "usNIC connectivity agent received unknown message: %d bytes from %s",
 515                             (int) numbytes, str);
 516         break;
 517     }
 518 }
 519 
 520 static agent_udp_port_listener_t *
 521 agent_thread_find_listener(uint32_t ipv4_addr, uint32_t *udp_port)
 522 {
 523     agent_udp_port_listener_t *listener;
 524     OPAL_LIST_FOREACH(listener, &udp_port_listeners, agent_udp_port_listener_t) {
 525         if (listener->ipv4_addr == ipv4_addr) {
 526             *udp_port = listener->udp_port;
 527             return listener;
 528         }
 529     }
 530 
 531     return NULL;
 532 }
 533 
 534 
 535 
 536 
 537 
 538 static int agent_thread_cmd_listen_reply(int fd,
 539                                          uint32_t addr, int32_t udp_port)
 540 {
 541     int ret;
 542 
 543     opal_btl_usnic_connectivity_cmd_listen_reply_t cmd = {
 544         .cmd = CONNECTIVITY_AGENT_CMD_LISTEN,
 545         .ipv4_addr = addr,
 546         .udp_port = udp_port
 547     };
 548 
 549     ret = opal_fd_write(fd, sizeof(cmd), &cmd);
 550     if (OPAL_SUCCESS != ret) {
 551         OPAL_ERROR_LOG(ret);
 552         ABORT("usnic connectivity agent IPC write failed");
 553         
 554     }
 555 
 556     return OPAL_SUCCESS;
 557 }
 558 
 559 
 560 
 561 
 562 
 563 static void agent_thread_cmd_listen(agent_ipc_listener_t *ipc_listener)
 564 {
 565     
 566     int ret;
 567     opal_btl_usnic_connectivity_cmd_listen_t cmd;
 568     ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
 569     if (OPAL_SUCCESS != ret) {
 570         OPAL_ERROR_LOG(ret);
 571         ABORT("usnic connectivity agent IPC LISTEN read failed");
 572         
 573     }
 574 
 575     
 576 
 577     uint32_t udp_port;
 578     agent_udp_port_listener_t *udp_listener;
 579     udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
 580     if (NULL != udp_listener) {
 581         
 582 
 583 
 584 
 585 
 586 
 587         if (NULL == udp_listener->module) {
 588             udp_listener->module = cmd.module;
 589         }
 590         agent_thread_cmd_listen_reply(ipc_listener->client_fd,
 591                                       cmd.ipv4_addr, udp_port);
 592         return;
 593     }
 594 
 595     
 596 
 597     udp_listener = OBJ_NEW(agent_udp_port_listener_t);
 598     if (NULL == udp_listener) {
 599         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 600         ABORT("Out of memory");
 601         
 602     }
 603 
 604     udp_listener->module = cmd.module;
 605     udp_listener->max_msg_size = cmd.max_msg_size;
 606     udp_listener->ipv4_addr = cmd.ipv4_addr;
 607     udp_listener->netmask = cmd.netmask;
 608     udp_listener->usnic_name = strdup(cmd.usnic_name);
 609 
 610     
 611 
 612 
 613     opal_btl_usnic_snprintf_ipv4_addr(udp_listener->ipv4_addr_str,
 614                                       sizeof(udp_listener->ipv4_addr_str),
 615                                       cmd.ipv4_addr, cmd.netmask);
 616 
 617     udp_listener->buffer = malloc(udp_listener->max_msg_size);
 618     if (NULL == udp_listener->buffer) {
 619         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 620         ABORT("Out of memory");
 621         
 622     }
 623 
 624     
 625     udp_listener->fd = socket(AF_INET, SOCK_DGRAM, 0);
 626     if (udp_listener->fd < 0) {
 627         OPAL_ERROR_LOG(udp_listener->fd);
 628         ABORT("Could not open listening socket");
 629         
 630     }
 631 
 632     
 633     struct sockaddr_in inaddr;
 634     memset(&inaddr, 0, sizeof(inaddr));
 635     inaddr.sin_family = AF_INET;
 636     inaddr.sin_addr.s_addr = cmd.ipv4_addr;
 637     inaddr.sin_port = htons(0);
 638 
 639     ret = bind(udp_listener->fd, (struct sockaddr*) &inaddr, sizeof(inaddr));
 640     if (ret < 0) {
 641         OPAL_ERROR_LOG(ret);
 642         ABORT("Could not bind listening socket");
 643         
 644     }
 645 
 646     
 647     opal_socklen_t addrlen = sizeof(struct sockaddr_in);
 648     ret = getsockname(udp_listener->fd, (struct sockaddr*) &inaddr, &addrlen);
 649     if (ret < 0) {
 650         OPAL_ERROR_LOG(ret);
 651         ABORT("Could not get UDP port number from listening socket");
 652         
 653     }
 654     udp_listener->udp_port = ntohs(inaddr.sin_port);
 655 
 656     opal_output_verbose(20, USNIC_OUT,
 657                         "usNIC connectivity agent listening on %s:%d, (%s)",
 658                         udp_listener->ipv4_addr_str,
 659                         udp_listener->udp_port,
 660                         udp_listener->usnic_name);
 661 
 662     
 663 
 664 
 665 
 666     int val = IP_PMTUDISC_DO;
 667     ret = setsockopt(udp_listener->fd, IPPROTO_IP, IP_MTU_DISCOVER,
 668                      &val, sizeof(val));
 669     if (0 != ret) {
 670         OPAL_ERROR_LOG(ret);
 671         ABORT("Unable to set \"do not fragment\" on UDP socket");
 672         
 673     }
 674 
 675     
 676     int temp;
 677     temp = (int) udp_listener->max_msg_size;
 678     if ((ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_RCVBUF,
 679                           &temp, sizeof(temp))) < 0 ||
 680         (ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_SNDBUF,
 681                           &temp, sizeof(temp))) < 0) {
 682         OPAL_ERROR_LOG(ret);
 683         ABORT("Could not set socket buffer sizes");
 684         
 685     }
 686 
 687     
 688     opal_event_set(mca_btl_usnic_component.opal_evbase,
 689                    &udp_listener->event, udp_listener->fd,
 690                    OPAL_EV_READ | OPAL_EV_PERSIST,
 691                    agent_thread_receive_ping, udp_listener);
 692     opal_event_add(&udp_listener->event, 0);
 693 
 694     
 695     opal_list_append(&udp_port_listeners, &udp_listener->super);
 696 
 697     udp_listener->active = true;
 698 
 699     
 700     ret = agent_thread_cmd_listen_reply(ipc_listener->client_fd,
 701                                         cmd.ipv4_addr, udp_listener->udp_port);
 702 
 703     
 704     return;
 705 }
 706 
 707 
 708 
 709 
 710 static void agent_thread_send_ping(int fd, short flags, void *context)
 711 {
 712     agent_ping_t *ap = (agent_ping_t*) context;
 713     ap->timer_active = false;
 714 
 715     char dest_ipv4_addr_str[IPV4STRADDRLEN];
 716     opal_btl_usnic_snprintf_ipv4_addr(dest_ipv4_addr_str,
 717                                       sizeof(dest_ipv4_addr_str),
 718                                       ap->dest_ipv4_addr, ap->dest_netmask);
 719 
 720     
 721 
 722 
 723 
 724     if (ap->acked[0] && ap->acked[1]) {
 725         opal_list_remove_item(&pings_pending, &ap->super);
 726         opal_list_append(&ping_results, &ap->super);
 727 
 728         opal_output_verbose(20, USNIC_OUT,
 729                             "usNIC connectivity GOOD between %s <--> %s",
 730                             ap->listener->ipv4_addr_str,
 731                             dest_ipv4_addr_str);
 732 
 733         for (int i = 0; i < 2; ++i) {
 734             if (NULL != ap->buffers[i]) {
 735                 free(ap->buffers[i]);
 736                 ap->buffers[i] = NULL;
 737             }
 738         }
 739 
 740         return;
 741     }
 742 
 743     
 744     if (ap->num_sends > mca_btl_usnic_component.connectivity_num_retries) {
 745         char *topic;
 746         if (ap->acked[0] && !ap->acked[1]) {
 747             
 748             
 749             topic = "connectivity error: small ok, large bad";
 750         } else if (!ap->acked[0] && ap->acked[1]) {
 751             
 752             
 753             topic = "connectivity error: small bad, large ok";
 754         } else {
 755             
 756             
 757             topic = "connectivity error: small bad, large bad";
 758         }
 759 
 760         char ipv4_addr_str[IPV4STRADDRLEN];
 761         opal_btl_usnic_snprintf_ipv4_addr(ipv4_addr_str, sizeof(ipv4_addr_str),
 762                                           ap->dest_ipv4_addr,
 763                                           ap->dest_netmask);
 764         opal_show_help("help-mpi-btl-usnic.txt", topic, true,
 765                        opal_process_info.nodename,
 766                        ap->listener->ipv4_addr_str,
 767                        ap->listener->usnic_name,
 768                        ap->dest_nodename,
 769                        ipv4_addr_str,
 770                        ap->sizes[0],
 771                        ap->sizes[1]);
 772         opal_btl_usnic_exit(NULL);
 773         
 774     }
 775 
 776     time_t t = time(NULL);
 777     opal_output_verbose(20, USNIC_OUT,
 778                         "usNIC connectivity pinging %s:%d (%s) from %s (%s) at %s",
 779                         dest_ipv4_addr_str,
 780                         ntohs(ap->dest_sockaddr.sin_port),
 781                         ap->dest_nodename,
 782                         ap->listener->ipv4_addr_str,
 783                         ap->listener->usnic_name,
 784                         ctime(&t));
 785 
 786     
 787     for (int i = 0; i < NUM_PING_SIZES; ++i) {
 788         agent_sendto(ap->listener->fd, (char*) ap->buffers[i], ap->sizes[i],
 789                      (struct sockaddr*) &ap->dest_sockaddr);
 790     }
 791 
 792     
 793     opal_event_set(mca_btl_usnic_component.opal_evbase, &ap->timer,
 794                    -1, 0, agent_thread_send_ping, ap);
 795     opal_event_add(&ap->timer, &ack_timeout);
 796     ap->timer_active = true;
 797 
 798     
 799     ++ap->num_sends;
 800 }
 801 
 802 
 803 
 804 
 805 
 806 static void agent_thread_cmd_ping(agent_ipc_listener_t *ipc_listener)
 807 {
 808     
 809     int ret;
 810     opal_btl_usnic_connectivity_cmd_ping_t cmd;
 811     ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
 812     if (OPAL_SUCCESS != ret) {
 813         OPAL_ERROR_LOG(ret);
 814         ABORT("usnic connectivity agent IPC PING read failed");
 815         
 816     }
 817 
 818     
 819     agent_ping_t *ap;
 820     OPAL_LIST_FOREACH(ap, &ping_results, agent_ping_t) {
 821         if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
 822             ap->dest_udp_port == cmd.dest_udp_port) {
 823             
 824 
 825             return;
 826         }
 827     }
 828 
 829     
 830     OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
 831         if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
 832             ap->dest_udp_port == cmd.dest_udp_port) {
 833             
 834 
 835             return;
 836         }
 837     }
 838 
 839     
 840 
 841     bool found = false;
 842     agent_udp_port_listener_t *udp_listener;
 843     OPAL_LIST_FOREACH(udp_listener, &udp_port_listeners,
 844                       agent_udp_port_listener_t) {
 845         if (udp_listener->ipv4_addr == cmd.src_ipv4_addr) {
 846             found = true;
 847             break;
 848         }
 849     }
 850     if (!found) {
 851         ABORT("Could not ping listener for ping request");
 852         
 853     }
 854 
 855     
 856     ap = OBJ_NEW(agent_ping_t);
 857     if (NULL == ap) {
 858         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 859         ABORT("Out of memory");
 860         
 861     }
 862     ap->src_ipv4_addr = cmd.src_ipv4_addr;
 863     ap->src_udp_port = cmd.src_udp_port;
 864     ap->listener = udp_listener;
 865     ap->dest_ipv4_addr = cmd.dest_ipv4_addr;
 866     ap->dest_netmask = cmd.dest_netmask;
 867     ap->dest_udp_port = cmd.dest_udp_port;
 868     ap->dest_sockaddr.sin_family = AF_INET;
 869     ap->dest_sockaddr.sin_addr.s_addr = cmd.dest_ipv4_addr;
 870     ap->dest_sockaddr.sin_port = htons(cmd.dest_udp_port);
 871     ap->dest_nodename = strdup(cmd.dest_nodename);
 872 
 873     
 874 
 875 
 876     ap->sizes[0] = sizeof(agent_udp_message_t);
 877 
 878     
 879 
 880 
 881 
 882 
 883 
 884     ap->sizes[1] = cmd.max_msg_size - 68;
 885 
 886     
 887 
 888     agent_udp_message_t *msg;
 889     for (size_t i = 0; i < NUM_PING_SIZES; ++i) {
 890         ap->buffers[i] = calloc(1, ap->sizes[i]);
 891         if (NULL == ap->buffers[i]) {
 892             OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
 893             ABORT("Out of memory");
 894             
 895         }
 896 
 897         
 898         msg = (agent_udp_message_t*) ap->buffers[i];
 899         msg->message_type = AGENT_MSG_TYPE_PING;
 900         msg->src_ipv4_addr = ap->src_ipv4_addr;
 901         msg->src_udp_port = ap->src_udp_port;
 902         msg->magic_number = MAGIC_ORIGINATOR;
 903         msg->major_version = OPAL_MAJOR_VERSION;
 904         msg->minor_version = OPAL_MINOR_VERSION;
 905         msg->size = ap->sizes[i];
 906     }
 907 
 908     
 909     opal_list_append(&pings_pending, &ap->super);
 910 
 911     
 912     agent_thread_send_ping(0, 0, ap);
 913 }
 914 
 915 
 916 
 917 
 918 
 919 static void agent_thread_cmd_unlisten(agent_ipc_listener_t *ipc_listener)
 920 {
 921     
 922     int ret;
 923     opal_btl_usnic_connectivity_cmd_unlisten_t cmd;
 924     ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
 925     if (OPAL_SUCCESS != ret) {
 926         OPAL_ERROR_LOG(ret);
 927         ABORT("usnic connectivity agent IPC UNLISTEN read failed");
 928         
 929     }
 930 
 931     
 932 
 933     uint32_t udp_port;
 934     agent_udp_port_listener_t *udp_listener;
 935     udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
 936     if (NULL != udp_listener) {
 937         OBJ_RELEASE(udp_listener);
 938     }
 939 
 940     
 941     return;
 942 }
 943 
 944 
 945 
 946 
 947 static void agent_thread_ipc_receive(int fd, short flags, void *context)
 948 {
 949     int32_t command;
 950     agent_ipc_listener_t *ipc_listener = (agent_ipc_listener_t*) context;
 951 
 952     
 953     command = -1;
 954     int ret = opal_fd_read(fd, sizeof(command), &command);
 955     if (OPAL_ERR_TIMEOUT == ret) {
 956         
 957         OBJ_RELEASE(ipc_listener);
 958         return;
 959     } else if (OPAL_SUCCESS != ret) {
 960         OPAL_ERROR_LOG(ret);
 961         ABORT("usnic connectivity agent IPC command read failed");
 962         
 963     }
 964 
 965     assert(CONNECTIVITY_AGENT_CMD_LISTEN == command ||
 966            CONNECTIVITY_AGENT_CMD_PING == command ||
 967            CONNECTIVITY_AGENT_CMD_UNLISTEN == command);
 968 
 969     switch (command) {
 970     case CONNECTIVITY_AGENT_CMD_LISTEN:
 971         agent_thread_cmd_listen(ipc_listener);
 972         break;
 973     case CONNECTIVITY_AGENT_CMD_PING:
 974         agent_thread_cmd_ping(ipc_listener);
 975         break;
 976     case CONNECTIVITY_AGENT_CMD_UNLISTEN:
 977         agent_thread_cmd_unlisten(ipc_listener);
 978         break;
 979     default:
 980         ABORT("Unexpected connectivity agent command");
 981         break;
 982     }
 983 }
 984 
 985 
 986 
 987 
 988 
 989 static void agent_thread_accept(int fd, short flags, void *context)
 990 {
 991     struct sockaddr addr;
 992     socklen_t len;
 993     agent_ipc_listener_t *listener = NULL;
 994 
 995     len = sizeof(addr);
 996     int client_fd = accept(fd, &addr, &len);
 997     if (client_fd < 0) {
 998         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
 999         ABORT("accept() failed");
1000         
1001     }
1002 
1003     
1004     int tlen = strlen(CONNECTIVITY_MAGIC_TOKEN);
1005     char *msg = alloca(tlen + 1);
1006     if (NULL == msg) {
1007         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1008         ABORT("Out of memory");
1009         
1010     }
1011     if (OPAL_SUCCESS != opal_fd_read(client_fd, tlen, msg)) {
1012         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1013         ABORT("usnic connectivity agent IPC read failed");
1014         
1015     }
1016     if (0 != memcmp(msg, CONNECTIVITY_MAGIC_TOKEN, tlen)) {
1017         opal_output_verbose(20, USNIC_OUT,
1018                             "usNIC connectivity got bad IPC client (wrong magic token); disconnected");
1019         close(client_fd);
1020         return;
1021     }
1022 
1023     
1024     ++ipc_accepts;
1025 
1026     
1027     listener = OBJ_NEW(agent_ipc_listener_t);
1028     listener->client_fd = client_fd;
1029 
1030     
1031 
1032     if (OPAL_SUCCESS != opal_fd_write(client_fd, tlen,
1033                                       CONNECTIVITY_MAGIC_TOKEN)) {
1034         OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1035         ABORT("usnic connectivity agent IPC read failed");
1036         
1037     }
1038 
1039     
1040     opal_event_set(mca_btl_usnic_component.opal_evbase,
1041                    &listener->event, client_fd,
1042                    OPAL_EV_READ | OPAL_EV_PERSIST,
1043                    agent_thread_ipc_receive, listener);
1044     opal_event_add(&listener->event, 0);
1045 
1046     
1047     opal_list_append(&ipc_listeners, &listener->super);
1048 
1049     listener->active = true;
1050 
1051     return;
1052 }
1053 
1054 
1055 
1056 
1057 
1058 
1059 
1060 
1061 
1062 
1063 
1064 static void agent_thread_finalize(int fd, short flags, void *context)
1065 {
1066     
1067     free(context);
1068 
1069     
1070 
1071 
1072 
1073 
1074     static bool first = true;
1075     static time_t timestamp = 0;
1076     if (first) {
1077         timestamp = time(NULL);
1078         first = false;
1079     }
1080 
1081     if (ipc_accepts < opal_process_info.num_local_peers &&
1082         time(NULL) < timestamp + 10) {
1083         opal_output_verbose(20, USNIC_OUT,
1084                             "usNIC connectivity agent delaying shutdown until all clients connect...");
1085 
1086         opal_event_t *ev = calloc(sizeof(*ev), 1);
1087         struct timeval finalize_retry = {
1088             .tv_sec = 0,
1089             .tv_usec = 10000
1090         };
1091 
1092         opal_event_set(mca_btl_usnic_component.opal_evbase,
1093                        ev, -1, 0, agent_thread_finalize, ev);
1094         opal_event_add(ev, &finalize_retry);
1095         return;
1096     }
1097     if (ipc_accepts < opal_process_info.num_local_peers) {
1098         opal_output_verbose(20, USNIC_OUT,
1099                             "usNIC connectivity agent: only %d of %d clients connected, but timeout has expired -- exiting anyway", ipc_accepts, opal_process_info.num_local_peers);
1100     }
1101 
1102     
1103 
1104     opal_event_del(&ipc_event);
1105 
1106     
1107     agent_udp_port_listener_t *udp_listener, *ulnext;
1108     OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
1109                            agent_udp_port_listener_t) {
1110         OBJ_RELEASE(udp_listener);
1111     }
1112 
1113     
1114     agent_ping_t *request, *pnext;
1115     OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
1116         opal_list_remove_item(&pings_pending, &request->super);
1117         OBJ_RELEASE(request);
1118     }
1119 
1120     OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
1121         opal_list_remove_item(&ping_results, &request->super);
1122         OBJ_RELEASE(request);
1123     }
1124 
1125     
1126     agent_ipc_listener_t *ipc_listener, *inext;
1127     OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
1128                            agent_ipc_listener_t) {
1129         OBJ_RELEASE(ipc_listener);
1130     }
1131 
1132     agent_initialized = false;
1133 }
1134 
1135 
1136 
1137 
1138 
1139 
1140 
1141 
1142 
1143 int opal_btl_usnic_connectivity_agent_init(void)
1144 {
1145     
1146 
1147     if (opal_process_info.my_local_rank != 0) {
1148         return OPAL_SUCCESS;
1149     }
1150     if (agent_initialized) {
1151         return OPAL_SUCCESS;
1152     }
1153 
1154     
1155 
1156 
1157     ack_timeout.tv_sec =
1158         mca_btl_usnic_component.connectivity_ack_timeout / 1000;
1159     ack_timeout.tv_usec =
1160         1000 * (mca_btl_usnic_component.connectivity_ack_timeout % 1000);
1161 
1162     
1163     OBJ_CONSTRUCT(&udp_port_listeners, opal_list_t);
1164     OBJ_CONSTRUCT(&ipc_listeners, opal_list_t);
1165     OBJ_CONSTRUCT(&pings_pending, opal_list_t);
1166     OBJ_CONSTRUCT(&ping_results, opal_list_t);
1167 
1168     
1169 
1170 
1171 
1172 
1173     
1174     ipc_accept_fd = socket(PF_UNIX, SOCK_STREAM, 0);
1175     if (ipc_accept_fd < 0) {
1176         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1177         ABORT("socket() failed");
1178         
1179     }
1180 
1181     opal_asprintf(&ipc_filename, "%s/%s",
1182              opal_process_info.job_session_dir, CONNECTIVITY_SOCK_NAME);
1183     if (NULL == ipc_filename) {
1184         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1185         ABORT("Out of memory");
1186         
1187     }
1188     unlink(ipc_filename);
1189 
1190     struct sockaddr_un address;
1191     assert(strlen(ipc_filename) < sizeof(address.sun_path));
1192 
1193     memset(&address, 0, sizeof(struct sockaddr_un));
1194     address.sun_family = AF_UNIX;
1195     opal_string_copy(address.sun_path, ipc_filename, sizeof(address.sun_path));
1196 
1197     if (bind(ipc_accept_fd, (struct sockaddr *) &address,
1198              sizeof(struct sockaddr_un)) != 0) {
1199         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1200         ABORT("bind() failed");
1201         
1202     }
1203 
1204     
1205 
1206 
1207 
1208     if (listen(ipc_accept_fd, 256) != 0) {
1209         OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1210         ABORT("listen() failed");
1211         
1212     }
1213 
1214     
1215     opal_event_set(mca_btl_usnic_component.opal_evbase,
1216                    &ipc_event, ipc_accept_fd,
1217                    OPAL_EV_READ | OPAL_EV_PERSIST,
1218                    agent_thread_accept, NULL);
1219     opal_event_add(&ipc_event, 0);
1220 
1221     opal_output_verbose(20, USNIC_OUT,
1222                         "usNIC connectivity agent initialized");
1223     agent_initialized = true;
1224     return OPAL_SUCCESS;
1225 }
1226 
1227 
1228 
1229 
1230 int opal_btl_usnic_connectivity_agent_finalize(void)
1231 {
1232     
1233     if (!agent_initialized) {
1234         return OPAL_SUCCESS;
1235     }
1236 
1237     
1238 
1239 
1240     opal_event_t *ev = calloc(sizeof(*ev), 1);
1241     opal_event_set(mca_btl_usnic_component.opal_evbase,
1242                    ev, -1, OPAL_EV_WRITE, agent_thread_finalize, ev);
1243     opal_event_active(ev, OPAL_EV_WRITE, 1);
1244 
1245     
1246     while (agent_initialized) {
1247         struct timespec tp = {
1248             .tv_sec  = 0,
1249             .tv_nsec = 1000
1250         };
1251         nanosleep(&tp, NULL);
1252     }
1253 
1254     
1255     if (ipc_accept_fd != -1) {
1256         close(ipc_accept_fd);
1257         ipc_accept_fd = -1;
1258     }
1259     if (NULL != ipc_filename) {
1260         unlink(ipc_filename);
1261         free(ipc_filename);
1262         ipc_filename = NULL;
1263     }
1264 
1265     opal_output_verbose(20, USNIC_OUT,
1266                         "usNIC connectivity client finalized");
1267     return OPAL_SUCCESS;
1268 }