This source file includes the following definitions:
- udp_port_listener_zero
- udp_port_listener_constructor
- udp_port_listener_destructor
- ipc_listener_zero
- ipc_listener_constructor
- ipc_listener_destructor
- agent_ping_result_zero
- agent_ping_result_constructor
- agent_ping_result_destructor
- agent_sendto
- agent_thread_handle_ping
- agent_thread_handle_ack
- agent_thread_receive_ping
- agent_thread_find_listener
- agent_thread_cmd_listen_reply
- agent_thread_cmd_listen
- agent_thread_send_ping
- agent_thread_cmd_ping
- agent_thread_cmd_unlisten
- agent_thread_ipc_receive
- agent_thread_accept
- agent_thread_finalize
- opal_btl_usnic_connectivity_agent_init
- opal_btl_usnic_connectivity_agent_finalize
1
2
3
4
5
6
7
8
9
10
11
12
13 #include "opal_config.h"
14
15 #include <assert.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18 #include <sys/un.h>
19 #include <unistd.h>
20 #ifdef HAVE_ALLOCA_H
21 #include <alloca.h>
22 #endif
23
24 #include "opal_stdint.h"
25 #include "opal/threads/mutex.h"
26 #include "opal/mca/event/event.h"
27 #include "opal/util/show_help.h"
28 #include "opal/types.h"
29 #include "opal/util/output.h"
30 #include "opal/util/fd.h"
31 #include "opal/util/string_copy.h"
32 #include "opal/util/printf.h"
33
34 #include "btl_usnic.h"
35 #include "btl_usnic_connectivity.h"
36
37
38
39
40
41
42
43
/* Listening UNIX-domain socket that local IPC clients connect to; -1 when
   not open. */
static int ipc_accept_fd = -1;
/* Filesystem path that ipc_accept_fd is bound to (allocated in
   opal_btl_usnic_connectivity_agent_init(), unlinked and freed in
   opal_btl_usnic_connectivity_agent_finalize()). */
static char *ipc_filename = NULL;
/* Persistent read event on ipc_accept_fd; its callback accepts new clients. */
static opal_event_t ipc_event;
/* How long to wait for ACKs before re-sending pings.  Derived from the
   connectivity_ack_timeout MCA value, which is in milliseconds. */
static struct timeval ack_timeout;
/* All active agent_udp_port_listener_t objects. */
static opal_list_t udp_port_listeners;
/* All connected agent_ipc_listener_t objects (one per IPC client). */
static opal_list_t ipc_listeners;
/* Number of IPC clients that completed the magic-token handshake.
   NOTE(review): volatile is not a thread-synchronization primitive; this is
   updated from the agent's event callbacks -- consider atomics. */
static volatile int ipc_accepts = 0;

/* Pings that have been sent but not yet ACKed at every size. */
static opal_list_t pings_pending;
/* Pings that were ACKed at every size; used to avoid pinging the same
   destination again. */
static opal_list_t ping_results;
/* True from a successful agent init until the agent-side finalize handler
   has torn everything down; the finalizing thread spins on it.
   NOTE(review): volatile is not a proper cross-thread signal; consider
   atomics. */
static volatile bool agent_initialized = false;
56
57
58
59
60
61
/*
 * A UDP listener for one local IPv4 address.  It owns a UDP socket bound to
 * that address on an ephemeral port and receives PINGs and ACKs on it.
 */
typedef struct {
    opal_list_item_t super;

    /* Local interface identity, filled in from the LISTEN command */
    uint32_t ipv4_addr;
    uint32_t netmask;
    /* Printable "address/netmask" form of ipv4_addr, used in log messages */
    char ipv4_addr_str[IPV4STRADDRLEN];
    /* Largest datagram accepted; also the size of buffer */
    uint32_t max_msg_size;
    /* Freed by the destructor; NOTE(review): never assigned in this file */
    char *nodename;
    char *usnic_name;

    /* Socket state */
    int fd;
    /* UDP port assigned by the kernel at bind() time */
    uint32_t udp_port;
    /* Receive buffer of max_msg_size bytes (also reused to send ACKs) */
    uint8_t *buffer;
    /* Persistent read event on fd */
    opal_event_t event;
    /* True while event is registered and the object is on
       udp_port_listeners */
    bool active;
    opal_btl_usnic_module_t *module;
} agent_udp_port_listener_t;

OBJ_CLASS_DECLARATION(agent_udp_port_listener_t);
83
84
85
86
87
/*
 * One connected IPC client of the agent (accepted on ipc_accept_fd).
 */
typedef struct {
    opal_list_item_t super;

    /* Connected UNIX-domain socket; -1 when closed */
    int client_fd;
    /* Persistent read event on client_fd (command dispatch) */
    opal_event_t event;
    /* True while event is registered and the object is on ipc_listeners */
    bool active;
} agent_ipc_listener_t;

OBJ_CLASS_DECLARATION(agent_ipc_listener_t);
97
/* Values stored in agent_udp_message_t.message_type */
typedef enum {
    AGENT_MSG_TYPE_PING = 17,
    AGENT_MSG_TYPE_ACK
} agent_udp_message_type_t;

/* magic_number carried by PINGs (set by the pinging side) ... */
#define MAGIC_ORIGINATOR 0x9a9e2fbce63a11e5
/* ... and by ACKs (set by the side answering the ping) */
#define MAGIC_TARGET 0x60735c68f368aace

/*
 * Header at the start of every PING and ACK datagram.  It is sent as raw
 * struct bytes; no byte-order conversion is applied to its fields.  PINGs
 * may be longer than this header (the rest is zero padding); ACKs are
 * exactly one header long.
 */
typedef struct {
    /* An agent_udp_message_type_t value */
    uint8_t message_type;

    /* Source address/port of the ping as given in the PING command.  An ACK
       echoes them back unchanged so the pinger can match the ACK to its
       pending ping. */
    uint32_t src_ipv4_addr;
    uint32_t src_udp_port;

    /* MAGIC_ORIGINATOR or MAGIC_TARGET; OPAL version of the sender (pings
       from a different major.minor are discarded) */
    uint64_t magic_number;
    uint32_t major_version, minor_version;

    /* Total size of the PING datagram.  A PING whose received length
       differs is discarded; an ACK keeps it to say which size was ACKed. */
    uint32_t size;
} agent_udp_message_t;
129
/*
 * State for pinging one remote destination from one local UDP listener.
 * Lives on pings_pending while in flight and is moved to ping_results once
 * every ping size has been ACKed.
 */
typedef struct {
    opal_list_item_t super;

    /* Local (source) side of the ping; echoed back in ACKs */
    uint32_t src_ipv4_addr;
    uint32_t src_udp_port;
    /* Local UDP listener whose socket the pings are sent from */
    agent_udp_port_listener_t *listener;
    /* Remote (destination) side of the ping */
    uint32_t dest_ipv4_addr;
    uint32_t dest_netmask;
    uint32_t dest_udp_port;
    struct sockaddr_in dest_sockaddr;
    char *dest_nodename;

    /* The ping is sent at NUM_PING_SIZES sizes: [0] is a header-only
       "small" message and [1] a "large" one.  Each size is ACKed
       separately. */
#define NUM_PING_SIZES 2
    size_t sizes[NUM_PING_SIZES];
    uint8_t *buffers[NUM_PING_SIZES];
    bool acked[NUM_PING_SIZES];

    /* How many times the pings have been (re)sent */
    int num_sends;

    /* Re-send timer (armed with ack_timeout) and whether it is pending */
    opal_event_t timer;
    bool timer_active;
} agent_ping_t;

OBJ_CLASS_DECLARATION(agent_ping_t);
161
162
163
164
165
166
167 static void udp_port_listener_zero(agent_udp_port_listener_t *obj)
168 {
169 obj->ipv4_addr =
170 obj->netmask =
171 obj->max_msg_size = 0;
172 obj->nodename =
173 obj->usnic_name = NULL;
174 memset(obj->ipv4_addr_str, 0, sizeof(obj->ipv4_addr_str));
175
176 obj->fd = -1;
177 obj->udp_port = -1;
178 obj->buffer = NULL;
179
180 obj->active = false;
181 }
182
183 static void udp_port_listener_constructor(agent_udp_port_listener_t *obj)
184 {
185 udp_port_listener_zero(obj);
186 }
187
188 static void udp_port_listener_destructor(agent_udp_port_listener_t *obj)
189 {
190
191
192 agent_ping_t *ap, *apnext;
193 OPAL_LIST_FOREACH_SAFE(ap, apnext, &pings_pending, agent_ping_t) {
194 if (ap->src_ipv4_addr == obj->ipv4_addr) {
195 opal_list_remove_item(&pings_pending, &ap->super);
196 OBJ_RELEASE(ap);
197 }
198 }
199
200 if (-1 != obj->fd) {
201 close(obj->fd);
202 }
203 if (NULL != obj->nodename) {
204 free(obj->nodename);
205 }
206 if (NULL != obj->usnic_name) {
207 free(obj->usnic_name);
208 }
209 if (NULL != obj->buffer) {
210 free(obj->buffer);
211 }
212
213
214
215 if (obj->active) {
216 opal_event_del(&obj->event);
217 opal_list_remove_item(&udp_port_listeners, &obj->super);
218 }
219
220 udp_port_listener_zero(obj);
221 }
222
223 OBJ_CLASS_INSTANCE(agent_udp_port_listener_t,
224 opal_list_item_t,
225 udp_port_listener_constructor,
226 udp_port_listener_destructor);
227
228 static void ipc_listener_zero(agent_ipc_listener_t *obj)
229 {
230 obj->client_fd = -1;
231 obj->active = false;
232 }
233
234 static void ipc_listener_constructor(agent_ipc_listener_t *obj)
235 {
236 ipc_listener_zero(obj);
237 }
238
239 static void ipc_listener_destructor(agent_ipc_listener_t *obj)
240 {
241 if (-1 != obj->client_fd) {
242 close(obj->client_fd);
243 }
244
245
246
247 if (obj->active) {
248 opal_event_del(&obj->event);
249 opal_list_remove_item(&ipc_listeners, &obj->super);
250 }
251
252 ipc_listener_zero(obj);
253 }
254
255 OBJ_CLASS_INSTANCE(agent_ipc_listener_t,
256 opal_list_item_t,
257 ipc_listener_constructor,
258 ipc_listener_destructor);
259
260 static void agent_ping_result_zero(agent_ping_t *obj)
261 {
262 obj->src_ipv4_addr = 0;
263 obj->src_udp_port = 0;
264 obj->listener = NULL;
265 obj->dest_ipv4_addr = 0;
266 obj->dest_udp_port = 0;
267 obj->num_sends = 0;
268 obj->timer_active = false;
269
270 for (int i = 0; i < NUM_PING_SIZES; ++i) {
271 obj->sizes[i] = 0;
272 obj->buffers[i] = NULL;
273 obj->acked[i] = false;
274 }
275 }
276
277 static void agent_ping_result_constructor(agent_ping_t *obj)
278 {
279 agent_ping_result_zero(obj);
280 }
281
282 static void agent_ping_result_destructor(agent_ping_t *obj)
283 {
284 for (int i = 0; i < NUM_PING_SIZES; ++i) {
285 if (NULL != obj->buffers[i]) {
286 free(obj->buffers[i]);
287 }
288 }
289 if (obj->timer_active) {
290 opal_event_del(&obj->timer);
291 }
292
293 agent_ping_result_zero(obj);
294 }
295
296 OBJ_CLASS_INSTANCE(agent_ping_t,
297 opal_list_item_t,
298 agent_ping_result_constructor,
299 agent_ping_result_destructor);
300
301
302
303
/*
 * Send one datagram of numbytes bytes to addr on socket fd, retrying until
 * the kernel reports that the whole datagram was sent.
 *
 * EAGAIN and EINTR are retried immediately, EPERM after a 5us pause, and a
 * short send after a 1us pause.  Any other error aborts.
 */
static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
                         struct sockaddr *addr)
{
    for (;;) {
        ssize_t sent = sendto(fd, buffer, numbytes, 0, addr, sizeof(*addr));
        if (numbytes == sent) {
            return;
        }

        if (sent < 0) {
            int err = errno;
            switch (err) {
            case EAGAIN:
            case EINTR:
                continue;
            case EPERM:
                /* NOTE(review): treated as transient; confirm why EPERM is
                   expected here. */
                usleep(5);
                continue;
            default: {
                char *msg;
                opal_asprintf(&msg, "Unexpected sendto() error: errno=%d (%s)",
                              err, strerror(err));
                ABORT(msg);
                break;
            }
            }
        }

        /* Short send: back off briefly, then send the datagram again */
        usleep(1);
    }
}
337
338
339
340
341
342
343
344
345 static void agent_thread_handle_ping(agent_udp_port_listener_t *listener,
346 ssize_t numbytes, struct sockaddr *from)
347 {
348
349
350 agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
351 struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
352 if (msg->size != numbytes) {
353 char str[INET_ADDRSTRLEN];
354 inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
355
356 opal_output_verbose(20, USNIC_OUT,
357 "usNIC connectivity got bad ping: %d bytes from %s, expected %d (discarded)",
358 (int) numbytes, str, (int) msg->size);
359 return;
360 }
361
362
363
364
365
366
367 char msg_ipv4_addr_str[IPV4STRADDRLEN];
368 char real_ipv4_addr_str[IPV4STRADDRLEN];
369
370 opal_btl_usnic_snprintf_ipv4_addr(msg_ipv4_addr_str,
371 sizeof(msg_ipv4_addr_str),
372 msg->src_ipv4_addr, 0);
373 opal_btl_usnic_snprintf_ipv4_addr(real_ipv4_addr_str,
374 sizeof(real_ipv4_addr_str),
375 src_addr_in->sin_addr.s_addr, 0);
376
377 if (msg->src_ipv4_addr != src_addr_in->sin_addr.s_addr) {
378 opal_output_verbose(20, USNIC_OUT,
379 "usNIC connectivity got bad ping (from unexpected address: %s != %s, discarded)",
380 msg_ipv4_addr_str, real_ipv4_addr_str);
381 return;
382 }
383
384 if (msg->magic_number != MAGIC_ORIGINATOR) {
385 opal_output_verbose(20, USNIC_OUT,
386 "usNIC connectivity got bad ping (magic number: %" PRIu64 ", discarded)",
387 msg->magic_number);
388 return;
389 }
390 if (msg->major_version != OPAL_MAJOR_VERSION ||
391 msg->minor_version != OPAL_MINOR_VERSION) {
392 opal_output_verbose(20, USNIC_OUT,
393 "usNIC connectivity got bad ping (originator version: %d.%d, expected %d.%d, discarded)",
394 msg->major_version, msg->minor_version,
395 OPAL_MAJOR_VERSION, OPAL_MINOR_VERSION);
396 return;
397 }
398
399
400
401
402
403 opal_output_verbose(20, USNIC_OUT,
404 "usNIC connectivity got PING (size=%ld) from %s; sending ACK",
405 numbytes, msg_ipv4_addr_str);
406
407
408
409
410
411
412 msg->message_type = AGENT_MSG_TYPE_ACK;
413 msg->magic_number = MAGIC_TARGET;
414
415 agent_sendto(listener->fd, (char*) listener->buffer, sizeof(*msg), from);
416 }
417
418
419
420
421 static void agent_thread_handle_ack(agent_udp_port_listener_t *listener,
422 ssize_t numbytes, struct sockaddr *from)
423 {
424 char str[INET_ADDRSTRLEN];
425 struct sockaddr_in *src_addr_in = (struct sockaddr_in*) from;
426 inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
427
428
429
430 agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;
431 if (numbytes != sizeof(*msg)) {
432 opal_output_verbose(20, USNIC_OUT,
433 "usNIC connectivity got bad ACK: %d bytes from %s, expected %d (discarded)",
434 (int) numbytes, str, (int) sizeof(*msg));
435 return;
436 }
437 if (msg->magic_number != MAGIC_TARGET) {
438 opal_output_verbose(20, USNIC_OUT,
439 "usNIC connectivity got bad ACK (magic number: %" PRIu64 ", discarded)",
440 msg->magic_number);
441 return;
442 }
443
444
445
446 agent_ping_t *ap;
447 uint32_t src_in_port = ntohs(src_addr_in->sin_port);
448 OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
449 if (ap->dest_ipv4_addr == src_addr_in->sin_addr.s_addr &&
450 ap->dest_udp_port == src_in_port &&
451 ap->src_ipv4_addr == msg->src_ipv4_addr &&
452 ap->src_udp_port == msg->src_udp_port) {
453
454 for (int i = 0; i < NUM_PING_SIZES; ++i) {
455 if (ap->sizes[i] == msg->size) {
456 ap->acked[i] = true;
457 return;
458 }
459 }
460 }
461 }
462
463
464
465 opal_output_verbose(20, USNIC_OUT,
466 "usNIC connectivity got unexpected ACK: %d bytes from %s (discarded)",
467 (int) numbytes, str);
468 }
469
470
471
472
473 static void agent_thread_receive_ping(int fd, short flags, void *context)
474 {
475 agent_udp_port_listener_t *listener =
476 (agent_udp_port_listener_t *) context;
477 assert(NULL != listener);
478
479
480 ssize_t numbytes;
481 struct sockaddr src_addr;
482 struct sockaddr_in *src_addr_in = (struct sockaddr_in*) &src_addr;
483 socklen_t addrlen = sizeof(src_addr);
484
485 while (1) {
486 numbytes = recvfrom(listener->fd, listener->buffer, listener->max_msg_size, 0,
487 &src_addr, &addrlen);
488 if (numbytes > 0) {
489 break;
490 } else if (numbytes < 0) {
491 if (errno == EAGAIN || errno == EINTR) {
492 continue;
493 }
494
495 ABORT("Unexpected error from recvfrom");
496
497 }
498 }
499
500 char str[INET_ADDRSTRLEN];
501 agent_udp_message_t *msg;
502 msg = (agent_udp_message_t *) listener->buffer;
503 switch (msg->message_type) {
504 case AGENT_MSG_TYPE_PING:
505 agent_thread_handle_ping(listener, numbytes, &src_addr);
506 break;
507 case AGENT_MSG_TYPE_ACK:
508 agent_thread_handle_ack(listener, numbytes, &src_addr);
509 break;
510 default:
511
512 inet_ntop(AF_INET, &src_addr_in->sin_addr, str, sizeof(str));
513 opal_output_verbose(20, USNIC_OUT,
514 "usNIC connectivity agent received unknown message: %d bytes from %s",
515 (int) numbytes, str);
516 break;
517 }
518 }
519
520 static agent_udp_port_listener_t *
521 agent_thread_find_listener(uint32_t ipv4_addr, uint32_t *udp_port)
522 {
523 agent_udp_port_listener_t *listener;
524 OPAL_LIST_FOREACH(listener, &udp_port_listeners, agent_udp_port_listener_t) {
525 if (listener->ipv4_addr == ipv4_addr) {
526 *udp_port = listener->udp_port;
527 return listener;
528 }
529 }
530
531 return NULL;
532 }
533
534
535
536
537
538 static int agent_thread_cmd_listen_reply(int fd,
539 uint32_t addr, int32_t udp_port)
540 {
541 int ret;
542
543 opal_btl_usnic_connectivity_cmd_listen_reply_t cmd = {
544 .cmd = CONNECTIVITY_AGENT_CMD_LISTEN,
545 .ipv4_addr = addr,
546 .udp_port = udp_port
547 };
548
549 ret = opal_fd_write(fd, sizeof(cmd), &cmd);
550 if (OPAL_SUCCESS != ret) {
551 OPAL_ERROR_LOG(ret);
552 ABORT("usnic connectivity agent IPC write failed");
553
554 }
555
556 return OPAL_SUCCESS;
557 }
558
559
560
561
562
563 static void agent_thread_cmd_listen(agent_ipc_listener_t *ipc_listener)
564 {
565
566 int ret;
567 opal_btl_usnic_connectivity_cmd_listen_t cmd;
568 ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
569 if (OPAL_SUCCESS != ret) {
570 OPAL_ERROR_LOG(ret);
571 ABORT("usnic connectivity agent IPC LISTEN read failed");
572
573 }
574
575
576
577 uint32_t udp_port;
578 agent_udp_port_listener_t *udp_listener;
579 udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
580 if (NULL != udp_listener) {
581
582
583
584
585
586
587 if (NULL == udp_listener->module) {
588 udp_listener->module = cmd.module;
589 }
590 agent_thread_cmd_listen_reply(ipc_listener->client_fd,
591 cmd.ipv4_addr, udp_port);
592 return;
593 }
594
595
596
597 udp_listener = OBJ_NEW(agent_udp_port_listener_t);
598 if (NULL == udp_listener) {
599 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
600 ABORT("Out of memory");
601
602 }
603
604 udp_listener->module = cmd.module;
605 udp_listener->max_msg_size = cmd.max_msg_size;
606 udp_listener->ipv4_addr = cmd.ipv4_addr;
607 udp_listener->netmask = cmd.netmask;
608 udp_listener->usnic_name = strdup(cmd.usnic_name);
609
610
611
612
613 opal_btl_usnic_snprintf_ipv4_addr(udp_listener->ipv4_addr_str,
614 sizeof(udp_listener->ipv4_addr_str),
615 cmd.ipv4_addr, cmd.netmask);
616
617 udp_listener->buffer = malloc(udp_listener->max_msg_size);
618 if (NULL == udp_listener->buffer) {
619 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
620 ABORT("Out of memory");
621
622 }
623
624
625 udp_listener->fd = socket(AF_INET, SOCK_DGRAM, 0);
626 if (udp_listener->fd < 0) {
627 OPAL_ERROR_LOG(udp_listener->fd);
628 ABORT("Could not open listening socket");
629
630 }
631
632
633 struct sockaddr_in inaddr;
634 memset(&inaddr, 0, sizeof(inaddr));
635 inaddr.sin_family = AF_INET;
636 inaddr.sin_addr.s_addr = cmd.ipv4_addr;
637 inaddr.sin_port = htons(0);
638
639 ret = bind(udp_listener->fd, (struct sockaddr*) &inaddr, sizeof(inaddr));
640 if (ret < 0) {
641 OPAL_ERROR_LOG(ret);
642 ABORT("Could not bind listening socket");
643
644 }
645
646
647 opal_socklen_t addrlen = sizeof(struct sockaddr_in);
648 ret = getsockname(udp_listener->fd, (struct sockaddr*) &inaddr, &addrlen);
649 if (ret < 0) {
650 OPAL_ERROR_LOG(ret);
651 ABORT("Could not get UDP port number from listening socket");
652
653 }
654 udp_listener->udp_port = ntohs(inaddr.sin_port);
655
656 opal_output_verbose(20, USNIC_OUT,
657 "usNIC connectivity agent listening on %s:%d, (%s)",
658 udp_listener->ipv4_addr_str,
659 udp_listener->udp_port,
660 udp_listener->usnic_name);
661
662
663
664
665
666 int val = IP_PMTUDISC_DO;
667 ret = setsockopt(udp_listener->fd, IPPROTO_IP, IP_MTU_DISCOVER,
668 &val, sizeof(val));
669 if (0 != ret) {
670 OPAL_ERROR_LOG(ret);
671 ABORT("Unable to set \"do not fragment\" on UDP socket");
672
673 }
674
675
676 int temp;
677 temp = (int) udp_listener->max_msg_size;
678 if ((ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_RCVBUF,
679 &temp, sizeof(temp))) < 0 ||
680 (ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_SNDBUF,
681 &temp, sizeof(temp))) < 0) {
682 OPAL_ERROR_LOG(ret);
683 ABORT("Could not set socket buffer sizes");
684
685 }
686
687
688 opal_event_set(mca_btl_usnic_component.opal_evbase,
689 &udp_listener->event, udp_listener->fd,
690 OPAL_EV_READ | OPAL_EV_PERSIST,
691 agent_thread_receive_ping, udp_listener);
692 opal_event_add(&udp_listener->event, 0);
693
694
695 opal_list_append(&udp_port_listeners, &udp_listener->super);
696
697 udp_listener->active = true;
698
699
700 ret = agent_thread_cmd_listen_reply(ipc_listener->client_fd,
701 cmd.ipv4_addr, udp_listener->udp_port);
702
703
704 return;
705 }
706
707
708
709
710 static void agent_thread_send_ping(int fd, short flags, void *context)
711 {
712 agent_ping_t *ap = (agent_ping_t*) context;
713 ap->timer_active = false;
714
715 char dest_ipv4_addr_str[IPV4STRADDRLEN];
716 opal_btl_usnic_snprintf_ipv4_addr(dest_ipv4_addr_str,
717 sizeof(dest_ipv4_addr_str),
718 ap->dest_ipv4_addr, ap->dest_netmask);
719
720
721
722
723
724 if (ap->acked[0] && ap->acked[1]) {
725 opal_list_remove_item(&pings_pending, &ap->super);
726 opal_list_append(&ping_results, &ap->super);
727
728 opal_output_verbose(20, USNIC_OUT,
729 "usNIC connectivity GOOD between %s <--> %s",
730 ap->listener->ipv4_addr_str,
731 dest_ipv4_addr_str);
732
733 for (int i = 0; i < 2; ++i) {
734 if (NULL != ap->buffers[i]) {
735 free(ap->buffers[i]);
736 ap->buffers[i] = NULL;
737 }
738 }
739
740 return;
741 }
742
743
744 if (ap->num_sends > mca_btl_usnic_component.connectivity_num_retries) {
745 char *topic;
746 if (ap->acked[0] && !ap->acked[1]) {
747
748
749 topic = "connectivity error: small ok, large bad";
750 } else if (!ap->acked[0] && ap->acked[1]) {
751
752
753 topic = "connectivity error: small bad, large ok";
754 } else {
755
756
757 topic = "connectivity error: small bad, large bad";
758 }
759
760 char ipv4_addr_str[IPV4STRADDRLEN];
761 opal_btl_usnic_snprintf_ipv4_addr(ipv4_addr_str, sizeof(ipv4_addr_str),
762 ap->dest_ipv4_addr,
763 ap->dest_netmask);
764 opal_show_help("help-mpi-btl-usnic.txt", topic, true,
765 opal_process_info.nodename,
766 ap->listener->ipv4_addr_str,
767 ap->listener->usnic_name,
768 ap->dest_nodename,
769 ipv4_addr_str,
770 ap->sizes[0],
771 ap->sizes[1]);
772 opal_btl_usnic_exit(NULL);
773
774 }
775
776 time_t t = time(NULL);
777 opal_output_verbose(20, USNIC_OUT,
778 "usNIC connectivity pinging %s:%d (%s) from %s (%s) at %s",
779 dest_ipv4_addr_str,
780 ntohs(ap->dest_sockaddr.sin_port),
781 ap->dest_nodename,
782 ap->listener->ipv4_addr_str,
783 ap->listener->usnic_name,
784 ctime(&t));
785
786
787 for (int i = 0; i < NUM_PING_SIZES; ++i) {
788 agent_sendto(ap->listener->fd, (char*) ap->buffers[i], ap->sizes[i],
789 (struct sockaddr*) &ap->dest_sockaddr);
790 }
791
792
793 opal_event_set(mca_btl_usnic_component.opal_evbase, &ap->timer,
794 -1, 0, agent_thread_send_ping, ap);
795 opal_event_add(&ap->timer, &ack_timeout);
796 ap->timer_active = true;
797
798
799 ++ap->num_sends;
800 }
801
802
803
804
805
806 static void agent_thread_cmd_ping(agent_ipc_listener_t *ipc_listener)
807 {
808
809 int ret;
810 opal_btl_usnic_connectivity_cmd_ping_t cmd;
811 ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
812 if (OPAL_SUCCESS != ret) {
813 OPAL_ERROR_LOG(ret);
814 ABORT("usnic connectivity agent IPC PING read failed");
815
816 }
817
818
819 agent_ping_t *ap;
820 OPAL_LIST_FOREACH(ap, &ping_results, agent_ping_t) {
821 if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
822 ap->dest_udp_port == cmd.dest_udp_port) {
823
824
825 return;
826 }
827 }
828
829
830 OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
831 if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
832 ap->dest_udp_port == cmd.dest_udp_port) {
833
834
835 return;
836 }
837 }
838
839
840
841 bool found = false;
842 agent_udp_port_listener_t *udp_listener;
843 OPAL_LIST_FOREACH(udp_listener, &udp_port_listeners,
844 agent_udp_port_listener_t) {
845 if (udp_listener->ipv4_addr == cmd.src_ipv4_addr) {
846 found = true;
847 break;
848 }
849 }
850 if (!found) {
851 ABORT("Could not ping listener for ping request");
852
853 }
854
855
856 ap = OBJ_NEW(agent_ping_t);
857 if (NULL == ap) {
858 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
859 ABORT("Out of memory");
860
861 }
862 ap->src_ipv4_addr = cmd.src_ipv4_addr;
863 ap->src_udp_port = cmd.src_udp_port;
864 ap->listener = udp_listener;
865 ap->dest_ipv4_addr = cmd.dest_ipv4_addr;
866 ap->dest_netmask = cmd.dest_netmask;
867 ap->dest_udp_port = cmd.dest_udp_port;
868 ap->dest_sockaddr.sin_family = AF_INET;
869 ap->dest_sockaddr.sin_addr.s_addr = cmd.dest_ipv4_addr;
870 ap->dest_sockaddr.sin_port = htons(cmd.dest_udp_port);
871 ap->dest_nodename = strdup(cmd.dest_nodename);
872
873
874
875
876 ap->sizes[0] = sizeof(agent_udp_message_t);
877
878
879
880
881
882
883
884 ap->sizes[1] = cmd.max_msg_size - 68;
885
886
887
888 agent_udp_message_t *msg;
889 for (size_t i = 0; i < NUM_PING_SIZES; ++i) {
890 ap->buffers[i] = calloc(1, ap->sizes[i]);
891 if (NULL == ap->buffers[i]) {
892 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
893 ABORT("Out of memory");
894
895 }
896
897
898 msg = (agent_udp_message_t*) ap->buffers[i];
899 msg->message_type = AGENT_MSG_TYPE_PING;
900 msg->src_ipv4_addr = ap->src_ipv4_addr;
901 msg->src_udp_port = ap->src_udp_port;
902 msg->magic_number = MAGIC_ORIGINATOR;
903 msg->major_version = OPAL_MAJOR_VERSION;
904 msg->minor_version = OPAL_MINOR_VERSION;
905 msg->size = ap->sizes[i];
906 }
907
908
909 opal_list_append(&pings_pending, &ap->super);
910
911
912 agent_thread_send_ping(0, 0, ap);
913 }
914
915
916
917
918
919 static void agent_thread_cmd_unlisten(agent_ipc_listener_t *ipc_listener)
920 {
921
922 int ret;
923 opal_btl_usnic_connectivity_cmd_unlisten_t cmd;
924 ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
925 if (OPAL_SUCCESS != ret) {
926 OPAL_ERROR_LOG(ret);
927 ABORT("usnic connectivity agent IPC UNLISTEN read failed");
928
929 }
930
931
932
933 uint32_t udp_port;
934 agent_udp_port_listener_t *udp_listener;
935 udp_listener = agent_thread_find_listener(cmd.ipv4_addr, &udp_port);
936 if (NULL != udp_listener) {
937 OBJ_RELEASE(udp_listener);
938 }
939
940
941 return;
942 }
943
944
945
946
947 static void agent_thread_ipc_receive(int fd, short flags, void *context)
948 {
949 int32_t command;
950 agent_ipc_listener_t *ipc_listener = (agent_ipc_listener_t*) context;
951
952
953 command = -1;
954 int ret = opal_fd_read(fd, sizeof(command), &command);
955 if (OPAL_ERR_TIMEOUT == ret) {
956
957 OBJ_RELEASE(ipc_listener);
958 return;
959 } else if (OPAL_SUCCESS != ret) {
960 OPAL_ERROR_LOG(ret);
961 ABORT("usnic connectivity agent IPC command read failed");
962
963 }
964
965 assert(CONNECTIVITY_AGENT_CMD_LISTEN == command ||
966 CONNECTIVITY_AGENT_CMD_PING == command ||
967 CONNECTIVITY_AGENT_CMD_UNLISTEN == command);
968
969 switch (command) {
970 case CONNECTIVITY_AGENT_CMD_LISTEN:
971 agent_thread_cmd_listen(ipc_listener);
972 break;
973 case CONNECTIVITY_AGENT_CMD_PING:
974 agent_thread_cmd_ping(ipc_listener);
975 break;
976 case CONNECTIVITY_AGENT_CMD_UNLISTEN:
977 agent_thread_cmd_unlisten(ipc_listener);
978 break;
979 default:
980 ABORT("Unexpected connectivity agent command");
981 break;
982 }
983 }
984
985
986
987
988
989 static void agent_thread_accept(int fd, short flags, void *context)
990 {
991 struct sockaddr addr;
992 socklen_t len;
993 agent_ipc_listener_t *listener = NULL;
994
995 len = sizeof(addr);
996 int client_fd = accept(fd, &addr, &len);
997 if (client_fd < 0) {
998 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
999 ABORT("accept() failed");
1000
1001 }
1002
1003
1004 int tlen = strlen(CONNECTIVITY_MAGIC_TOKEN);
1005 char *msg = alloca(tlen + 1);
1006 if (NULL == msg) {
1007 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1008 ABORT("Out of memory");
1009
1010 }
1011 if (OPAL_SUCCESS != opal_fd_read(client_fd, tlen, msg)) {
1012 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1013 ABORT("usnic connectivity agent IPC read failed");
1014
1015 }
1016 if (0 != memcmp(msg, CONNECTIVITY_MAGIC_TOKEN, tlen)) {
1017 opal_output_verbose(20, USNIC_OUT,
1018 "usNIC connectivity got bad IPC client (wrong magic token); disconnected");
1019 close(client_fd);
1020 return;
1021 }
1022
1023
1024 ++ipc_accepts;
1025
1026
1027 listener = OBJ_NEW(agent_ipc_listener_t);
1028 listener->client_fd = client_fd;
1029
1030
1031
1032 if (OPAL_SUCCESS != opal_fd_write(client_fd, tlen,
1033 CONNECTIVITY_MAGIC_TOKEN)) {
1034 OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
1035 ABORT("usnic connectivity agent IPC read failed");
1036
1037 }
1038
1039
1040 opal_event_set(mca_btl_usnic_component.opal_evbase,
1041 &listener->event, client_fd,
1042 OPAL_EV_READ | OPAL_EV_PERSIST,
1043 agent_thread_ipc_receive, listener);
1044 opal_event_add(&listener->event, 0);
1045
1046
1047 opal_list_append(&ipc_listeners, &listener->super);
1048
1049 listener->active = true;
1050
1051 return;
1052 }
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064 static void agent_thread_finalize(int fd, short flags, void *context)
1065 {
1066
1067 free(context);
1068
1069
1070
1071
1072
1073
1074 static bool first = true;
1075 static time_t timestamp = 0;
1076 if (first) {
1077 timestamp = time(NULL);
1078 first = false;
1079 }
1080
1081 if (ipc_accepts < opal_process_info.num_local_peers &&
1082 time(NULL) < timestamp + 10) {
1083 opal_output_verbose(20, USNIC_OUT,
1084 "usNIC connectivity agent delaying shutdown until all clients connect...");
1085
1086 opal_event_t *ev = calloc(sizeof(*ev), 1);
1087 struct timeval finalize_retry = {
1088 .tv_sec = 0,
1089 .tv_usec = 10000
1090 };
1091
1092 opal_event_set(mca_btl_usnic_component.opal_evbase,
1093 ev, -1, 0, agent_thread_finalize, ev);
1094 opal_event_add(ev, &finalize_retry);
1095 return;
1096 }
1097 if (ipc_accepts < opal_process_info.num_local_peers) {
1098 opal_output_verbose(20, USNIC_OUT,
1099 "usNIC connectivity agent: only %d of %d clients connected, but timeout has expired -- exiting anyway", ipc_accepts, opal_process_info.num_local_peers);
1100 }
1101
1102
1103
1104 opal_event_del(&ipc_event);
1105
1106
1107 agent_udp_port_listener_t *udp_listener, *ulnext;
1108 OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
1109 agent_udp_port_listener_t) {
1110 OBJ_RELEASE(udp_listener);
1111 }
1112
1113
1114 agent_ping_t *request, *pnext;
1115 OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
1116 opal_list_remove_item(&pings_pending, &request->super);
1117 OBJ_RELEASE(request);
1118 }
1119
1120 OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
1121 opal_list_remove_item(&ping_results, &request->super);
1122 OBJ_RELEASE(request);
1123 }
1124
1125
1126 agent_ipc_listener_t *ipc_listener, *inext;
1127 OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
1128 agent_ipc_listener_t) {
1129 OBJ_RELEASE(ipc_listener);
1130 }
1131
1132 agent_initialized = false;
1133 }
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143 int opal_btl_usnic_connectivity_agent_init(void)
1144 {
1145
1146
1147 if (opal_process_info.my_local_rank != 0) {
1148 return OPAL_SUCCESS;
1149 }
1150 if (agent_initialized) {
1151 return OPAL_SUCCESS;
1152 }
1153
1154
1155
1156
1157 ack_timeout.tv_sec =
1158 mca_btl_usnic_component.connectivity_ack_timeout / 1000;
1159 ack_timeout.tv_usec =
1160 1000 * (mca_btl_usnic_component.connectivity_ack_timeout % 1000);
1161
1162
1163 OBJ_CONSTRUCT(&udp_port_listeners, opal_list_t);
1164 OBJ_CONSTRUCT(&ipc_listeners, opal_list_t);
1165 OBJ_CONSTRUCT(&pings_pending, opal_list_t);
1166 OBJ_CONSTRUCT(&ping_results, opal_list_t);
1167
1168
1169
1170
1171
1172
1173
1174 ipc_accept_fd = socket(PF_UNIX, SOCK_STREAM, 0);
1175 if (ipc_accept_fd < 0) {
1176 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1177 ABORT("socket() failed");
1178
1179 }
1180
1181 opal_asprintf(&ipc_filename, "%s/%s",
1182 opal_process_info.job_session_dir, CONNECTIVITY_SOCK_NAME);
1183 if (NULL == ipc_filename) {
1184 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1185 ABORT("Out of memory");
1186
1187 }
1188 unlink(ipc_filename);
1189
1190 struct sockaddr_un address;
1191 assert(strlen(ipc_filename) < sizeof(address.sun_path));
1192
1193 memset(&address, 0, sizeof(struct sockaddr_un));
1194 address.sun_family = AF_UNIX;
1195 opal_string_copy(address.sun_path, ipc_filename, sizeof(address.sun_path));
1196
1197 if (bind(ipc_accept_fd, (struct sockaddr *) &address,
1198 sizeof(struct sockaddr_un)) != 0) {
1199 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1200 ABORT("bind() failed");
1201
1202 }
1203
1204
1205
1206
1207
1208 if (listen(ipc_accept_fd, 256) != 0) {
1209 OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO);
1210 ABORT("listen() failed");
1211
1212 }
1213
1214
1215 opal_event_set(mca_btl_usnic_component.opal_evbase,
1216 &ipc_event, ipc_accept_fd,
1217 OPAL_EV_READ | OPAL_EV_PERSIST,
1218 agent_thread_accept, NULL);
1219 opal_event_add(&ipc_event, 0);
1220
1221 opal_output_verbose(20, USNIC_OUT,
1222 "usNIC connectivity agent initialized");
1223 agent_initialized = true;
1224 return OPAL_SUCCESS;
1225 }
1226
1227
1228
1229
1230 int opal_btl_usnic_connectivity_agent_finalize(void)
1231 {
1232
1233 if (!agent_initialized) {
1234 return OPAL_SUCCESS;
1235 }
1236
1237
1238
1239
1240 opal_event_t *ev = calloc(sizeof(*ev), 1);
1241 opal_event_set(mca_btl_usnic_component.opal_evbase,
1242 ev, -1, OPAL_EV_WRITE, agent_thread_finalize, ev);
1243 opal_event_active(ev, OPAL_EV_WRITE, 1);
1244
1245
1246 while (agent_initialized) {
1247 struct timespec tp = {
1248 .tv_sec = 0,
1249 .tv_nsec = 1000
1250 };
1251 nanosleep(&tp, NULL);
1252 }
1253
1254
1255 if (ipc_accept_fd != -1) {
1256 close(ipc_accept_fd);
1257 ipc_accept_fd = -1;
1258 }
1259 if (NULL != ipc_filename) {
1260 unlink(ipc_filename);
1261 free(ipc_filename);
1262 ipc_filename = NULL;
1263 }
1264
1265 opal_output_verbose(20, USNIC_OUT,
1266 "usNIC connectivity client finalized");
1267 return OPAL_SUCCESS;
1268 }