This source file includes following definitions.
- init
- finalize
- connect_to_peer
- send_recv
- send_oneway
- send_connect_ack
- recv_connect_ack
- send_bytes
- read_bytes
- pmix_usock_send_handler
- pmix_usock_recv_handler
- pmix_usock_send_recv
- pmix_usock_send
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include <src/include/pmix_config.h>
28 #include "pmix_common.h"
29
30 #ifdef HAVE_FCNTL_H
31 #include <fcntl.h>
32 #endif
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #ifdef HAVE_SYS_SOCKET_H
37 #include <sys/socket.h>
38 #endif
39 #ifdef HAVE_SYS_UN_H
40 #include <sys/un.h>
41 #endif
42 #ifdef HAVE_SYS_UIO_H
43 #include <sys/uio.h>
44 #endif
45 #ifdef HAVE_SYS_TYPES_H
46 #include <sys/types.h>
47 #endif
48 #ifdef HAVE_SYS_STAT_H
49 #include <sys/stat.h>
50 #endif
51
52 #include "src/util/argv.h"
53 #include "src/util/error.h"
54 #include "src/client/pmix_client_ops.h"
55 #include "src/include/pmix_globals.h"
56 #include "src/include/pmix_socket_errno.h"
57 #include "src/mca/bfrops/base/base.h"
58 #include "src/mca/psec/base/base.h"
59
60 #include "src/mca/ptl/base/base.h"
61 #include "ptl_usock.h"
62
63 static pmix_status_t init(void);
64 static void finalize(void);
65 static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
66 pmix_info_t *info, size_t ninfo);
67 static pmix_status_t send_recv(struct pmix_peer_t *peer,
68 pmix_buffer_t *bfr,
69 pmix_ptl_cbfunc_t cbfunc,
70 void *cbdata);
71 static pmix_status_t send_oneway(struct pmix_peer_t *peer,
72 pmix_buffer_t *bfr,
73 pmix_ptl_tag_t tag);
74
75 pmix_ptl_module_t pmix_ptl_usock_module = {
76 .init = init,
77 .finalize = finalize,
78 .send_recv = send_recv,
79 .send = send_oneway,
80 .connect_to_peer = connect_to_peer
81 };
82
83 static pmix_status_t recv_connect_ack(int sd);
84 static pmix_status_t send_connect_ack(int sd);
85
86 static pmix_status_t init(void)
87 {
88 return PMIX_SUCCESS;
89 }
90
91 static void finalize(void)
92 {
93 }
94
95 static void pmix_usock_send_recv(int fd, short args, void *cbdata);
96 static void pmix_usock_send(int fd, short args, void *cbdata);
97
98 static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
99 pmix_info_t *info, size_t ninfo)
100 {
101 struct sockaddr_un *address;
102 char *evar, **uri;
103 pmix_status_t rc;
104 int sd;
105 pmix_socklen_t len;
106 bool retried = false;
107
108 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
109 "[%s:%d] connect to server",
110 __FILE__, __LINE__);
111
112
113 if (!PMIX_PROC_IS_CLIENT(pmix_globals.mypeer)) {
114 return PMIX_ERR_NOT_SUPPORTED;
115 }
116
117
118
119 if (NULL != (evar = getenv("PMIX_SERVER_URI2USOCK"))) {
120
121 pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module("v21");
122 if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
123 return PMIX_ERR_INIT;
124 }
125 } else if (NULL != (evar = getenv("PMIX_SERVER_URI"))) {
126
127 pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module("v12");
128 if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
129 return PMIX_ERR_INIT;
130 }
131 } else {
132
133 return PMIX_ERR_SERVER_NOT_AVAIL;
134 }
135
136 pmix_client_globals.myserver->nptr->compat.bfrops = pmix_globals.mypeer->nptr->compat.bfrops;
137
138 pmix_globals.mypeer->protocol = PMIX_PROTOCOL_V1;
139
140 uri = pmix_argv_split(evar, ':');
141 if (3 != pmix_argv_count(uri)) {
142 pmix_argv_free(uri);
143 PMIX_ERROR_LOG(PMIX_ERROR);
144 return PMIX_ERROR;
145 }
146
147 if (NULL == pmix_client_globals.myserver->info) {
148 pmix_client_globals.myserver->info = PMIX_NEW(pmix_rank_info_t);
149 }
150 if (NULL == pmix_client_globals.myserver->nptr) {
151 pmix_client_globals.myserver->nptr = PMIX_NEW(pmix_namespace_t);
152 }
153 if (NULL == pmix_client_globals.myserver->nptr->nspace) {
154 pmix_client_globals.myserver->nptr->nspace = strdup(uri[0]);
155 }
156 if (NULL == pmix_client_globals.myserver->info->pname.nspace) {
157 pmix_client_globals.myserver->info->pname.nspace = strdup(uri[0]);
158 }
159
160
161 pmix_client_globals.myserver->info->pname.rank = strtoull(uri[1], NULL, 10);
162
163
164 memset(&mca_ptl_usock_component.connection, 0, sizeof(struct sockaddr_storage));
165 address = (struct sockaddr_un*)&mca_ptl_usock_component.connection;
166 address->sun_family = AF_UNIX;
167 snprintf(address->sun_path, sizeof(address->sun_path)-1, "%s", uri[2]);
168
169 if (0 != access(uri[2], R_OK)) {
170 pmix_argv_free(uri);
171 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
172 return PMIX_ERR_NOT_FOUND;
173 }
174 pmix_argv_free(uri);
175
176 retry:
177
178 len = sizeof(struct sockaddr_un);
179 if (PMIX_SUCCESS != (rc = pmix_ptl_base_connect(&mca_ptl_usock_component.connection, len, &sd))) {
180 PMIX_ERROR_LOG(rc);
181 return rc;
182 }
183 pmix_client_globals.myserver->sd = sd;
184
185
186 if (PMIX_SUCCESS != (rc = send_connect_ack(sd))) {
187 CLOSE_THE_SOCKET(sd);
188 return rc;
189 }
190
191
192 if (PMIX_SUCCESS != (rc = recv_connect_ack(sd))) {
193 CLOSE_THE_SOCKET(sd);
194 if (PMIX_ERR_TEMP_UNAVAILABLE == rc) {
195
196 if (!retried) {
197 retried = true;
198 goto retry;
199 }
200 }
201 return rc;
202 }
203
204 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
205 "sock_peer_try_connect: Connection across to server succeeded");
206
207
208 pmix_globals.connected = true;
209
210 pmix_ptl_base_set_nonblocking(sd);
211
212
213 pmix_event_assign(&pmix_client_globals.myserver->recv_event,
214 pmix_globals.evbase,
215 pmix_client_globals.myserver->sd,
216 EV_READ | EV_PERSIST,
217 pmix_usock_recv_handler, pmix_client_globals.myserver);
218 pmix_event_add(&pmix_client_globals.myserver->recv_event, 0);
219 pmix_client_globals.myserver->recv_ev_active = true;
220 PMIX_POST_OBJECT(pmix_client_globals.myserver);
221 pmix_event_add(&pmix_client_globals.myserver->recv_event, 0);
222
223
224 pmix_event_assign(&pmix_client_globals.myserver->send_event,
225 pmix_globals.evbase,
226 pmix_client_globals.myserver->sd,
227 EV_WRITE|EV_PERSIST,
228 pmix_usock_send_handler, pmix_client_globals.myserver);
229 pmix_client_globals.myserver->send_ev_active = false;
230
231 return PMIX_SUCCESS;
232 }
233
234 static pmix_status_t send_recv(struct pmix_peer_t *peer,
235 pmix_buffer_t *bfr,
236 pmix_ptl_cbfunc_t cbfunc,
237 void *cbdata)
238 {
239 pmix_ptl_sr_t *ms;
240 pmix_peer_t *pr = (pmix_peer_t*)peer;
241
242 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
243 "[%s:%d] post send to server",
244 __FILE__, __LINE__);
245
246 ms = PMIX_NEW(pmix_ptl_sr_t);
247 PMIX_RETAIN(pr);
248 ms->peer = pr;
249 ms->bfr = bfr;
250 ms->cbfunc = cbfunc;
251 ms->cbdata = cbdata;
252 PMIX_THREADSHIFT(ms, pmix_usock_send_recv);
253 return PMIX_SUCCESS;
254 }
255
256 static pmix_status_t send_oneway(struct pmix_peer_t *peer,
257 pmix_buffer_t *bfr,
258 pmix_ptl_tag_t tag)
259 {
260 pmix_ptl_queue_t *q;
261 pmix_peer_t *pr = (pmix_peer_t*)peer;
262
263
264
265
266 q = PMIX_NEW(pmix_ptl_queue_t);
267 PMIX_RETAIN(pr);
268 q->peer = peer;
269 q->buf = bfr;
270 q->tag = tag;
271 PMIX_THREADSHIFT(q, pmix_usock_send);
272
273 return PMIX_SUCCESS;
274 }
275
276 static pmix_status_t send_connect_ack(int sd)
277 {
278 char *msg;
279 pmix_usock_hdr_t hdr;
280 size_t sdsize=0, csize=0;
281 pmix_byte_object_t cred;
282 pmix_status_t rc;
283 char *sec, *bfrops, *gds;
284 pmix_bfrop_buffer_type_t bftype;
285
286 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
287 "pmix: SEND CONNECT ACK");
288
289
290 memset(&hdr, 0, sizeof(pmix_usock_hdr_t));
291 hdr.pindex = -1;
292 hdr.tag = UINT32_MAX;
293
294
295 sdsize = strlen(pmix_globals.myid.nspace) + 1 + sizeof(int);
296
297
298
299 PMIX_BYTE_OBJECT_CONSTRUCT(&cred);
300 PMIX_PSEC_CREATE_CRED(rc, pmix_globals.mypeer,
301 NULL, 0, NULL, 0, &cred);
302 if (PMIX_SUCCESS != rc) {
303 return rc;
304 }
305
306
307
308 sec = pmix_globals.mypeer->nptr->compat.psec->name;
309
310
311 bfrops = pmix_globals.mypeer->nptr->compat.bfrops->version;
312
313 bftype = pmix_globals.mypeer->nptr->compat.type;
314
315
316 gds = (char*)pmix_client_globals.myserver->nptr->compat.gds->name;
317
318
319 hdr.nbytes = sdsize + (strlen(PMIX_VERSION) + 1) + \
320 (sizeof(size_t) + cred.size) + \
321 (strlen(sec) + 1) + \
322 (strlen(bfrops) + 1) + sizeof(bftype) + \
323 (strlen(gds) + 1);
324
325
326 sdsize = (sizeof(hdr) + hdr.nbytes);
327 if (NULL == (msg = (char*)malloc(sdsize))) {
328 PMIX_BYTE_OBJECT_DESTRUCT(&cred);
329 return PMIX_ERR_OUT_OF_RESOURCE;
330 }
331 memset(msg, 0, sdsize);
332
333
334 csize=0;
335 memcpy(msg, &hdr, sizeof(pmix_usock_hdr_t));
336 csize += sizeof(pmix_usock_hdr_t);
337
338 memcpy(msg+csize, pmix_globals.myid.nspace, strlen(pmix_globals.myid.nspace));
339 csize += strlen(pmix_globals.myid.nspace)+1;
340
341 memcpy(msg+csize, &pmix_globals.myid.rank, sizeof(int));
342 csize += sizeof(int);
343
344
345 memcpy(msg+csize, PMIX_VERSION, strlen(PMIX_VERSION));
346 csize += strlen(PMIX_VERSION)+1;
347
348
349 memcpy(msg+csize, &cred.size, sizeof(size_t));
350 csize += sizeof(size_t);
351 if (0 < cred.size) {
352 memcpy(msg+csize, cred.bytes, cred.size);
353 csize += cred.size;
354 }
355 PMIX_BYTE_OBJECT_DESTRUCT(&cred);
356
357
358 memcpy(msg+csize, sec, strlen(sec));
359 csize += strlen(sec)+1;
360
361
362 memcpy(msg+csize, bfrops, strlen(bfrops));
363 csize += strlen(bfrops)+1;
364
365
366 memcpy(msg+csize, &bftype, sizeof(bftype));
367 csize += sizeof(bftype);
368
369
370 memcpy(msg+csize, gds, strlen(gds));
371
372
373 if (PMIX_SUCCESS != pmix_ptl_base_send_blocking(sd, msg, sdsize)) {
374 free(msg);
375 return PMIX_ERR_UNREACH;
376 }
377 free(msg);
378 return PMIX_SUCCESS;
379 }
380
381
382
383
384 static pmix_status_t recv_connect_ack(int sd)
385 {
386 pmix_status_t reply;
387 pmix_status_t rc;
388 struct timeval tv, save;
389 pmix_socklen_t sz;
390 bool sockopt = true;
391
392 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
393 "pmix: RECV CONNECT ACK FROM SERVER");
394
395
396 sz = sizeof(save);
397 if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &sz)) {
398 if (ENOPROTOOPT == errno || EOPNOTSUPP == errno) {
399 sockopt = false;
400 } else {
401 return PMIX_ERR_UNREACH;
402 }
403 } else {
404
405 tv.tv_sec = 2;
406 tv.tv_usec = 0;
407 if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
408 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
409 "pmix: recv_connect_ack could not setsockopt SO_RCVTIMEO");
410 return PMIX_ERR_UNREACH;
411 }
412 }
413
414
415 rc = pmix_ptl_base_recv_blocking(sd, (char*)&reply, sizeof(int));
416 if (PMIX_SUCCESS != rc) {
417 if (sockopt) {
418
419 if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) {
420 return PMIX_ERR_UNREACH;
421 }
422 }
423 return rc;
424 }
425
426
427 if (PMIX_ERR_READY_FOR_HANDSHAKE == reply) {
428 PMIX_PSEC_CLIENT_HANDSHAKE(rc, pmix_client_globals.myserver, sd);
429 if (PMIX_SUCCESS != rc) {
430 return rc;
431 }
432 } else if (PMIX_SUCCESS != reply) {
433 return reply;
434 }
435
436 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
437 "pmix: RECV CONNECT CONFIRMATION");
438
439
440 rc = pmix_ptl_base_recv_blocking(sd, (char*)&pmix_globals.pindex, sizeof(int));
441 if (PMIX_SUCCESS != rc) {
442 return rc;
443 }
444 if (sockopt) {
445
446 if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &save, sz)) {
447 return PMIX_ERR_UNREACH;
448 }
449 }
450
451 return PMIX_SUCCESS;
452 }
453
454 static pmix_status_t send_bytes(int sd, char **buf, size_t *remain)
455 {
456 pmix_status_t ret = PMIX_SUCCESS;
457 int rc;
458 char *ptr = *buf;
459 while (0 < *remain) {
460 rc = write(sd, ptr, *remain);
461 if (rc < 0) {
462 if (pmix_socket_errno == EINTR) {
463 continue;
464 } else if (pmix_socket_errno == EAGAIN) {
465
466
467
468
469 ret = PMIX_ERR_RESOURCE_BUSY;
470 goto exit;
471 } else if (pmix_socket_errno == EWOULDBLOCK) {
472
473
474
475
476 ret = PMIX_ERR_WOULD_BLOCK;
477 goto exit;
478 }
479
480 pmix_output(0, "pmix_usock_msg_send_bytes: write failed: %s (%d) [sd = %d]",
481 strerror(pmix_socket_errno),
482 pmix_socket_errno, sd);
483 ret = PMIX_ERR_COMM_FAILURE;
484 goto exit;
485 }
486
487 (*remain) -= rc;
488 ptr += rc;
489 }
490
491 exit:
492 *buf = ptr;
493 return ret;
494 }
495
496 static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
497 {
498 pmix_status_t ret = PMIX_SUCCESS;
499 int rc;
500 char *ptr = *buf;
501
502
503 while (0 < *remain) {
504 rc = read(sd, ptr, *remain);
505 if (rc < 0) {
506 if(pmix_socket_errno == EINTR) {
507 continue;
508 } else if (pmix_socket_errno == EAGAIN) {
509
510
511
512
513 ret = PMIX_ERR_RESOURCE_BUSY;
514 goto exit;
515 } else if (pmix_socket_errno == EWOULDBLOCK) {
516
517
518
519
520 ret = PMIX_ERR_WOULD_BLOCK;
521 goto exit;
522 }
523
524
525
526
527 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
528 "pmix_usock_msg_recv: readv failed: %s (%d)",
529 strerror(pmix_socket_errno),
530 pmix_socket_errno);
531 ret = PMIX_ERR_UNREACH;
532 goto exit;
533 } else if (0 == rc) {
534
535 ret = PMIX_ERR_UNREACH;
536 goto exit;
537 }
538
539 *remain -= rc;
540 ptr += rc;
541 }
542
543 exit:
544 *buf = ptr;
545 return ret;
546 }
547
548
549
550
551
552 void pmix_usock_send_handler(int sd, short flags, void *cbdata)
553 {
554 pmix_peer_t *peer = (pmix_peer_t*)cbdata;
555 pmix_ptl_send_t *msg = peer->send_msg;
556 pmix_status_t rc;
557 uint32_t nbytes;
558
559
560 PMIX_ACQUIRE_OBJECT(peer);
561
562 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
563 "%s:%d usock:send_handler SENDING TO PEER %s:%d tag %u with %s msg",
564 pmix_globals.myid.nspace, pmix_globals.myid.rank,
565 peer->info->pname.nspace, peer->info->pname.rank,
566 (NULL == msg) ? UINT_MAX : msg->hdr.tag,
567 (NULL == msg) ? "NULL" : "NON-NULL");
568
569 if (NULL != msg) {
570 if (!msg->hdr_sent) {
571 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
572
573 msg->hdr.pindex = ntohl(msg->hdr.pindex);
574 msg->hdr.tag = ntohl(msg->hdr.tag);
575 nbytes = msg->hdr.nbytes;
576 msg->hdr.nbytes = ntohl(nbytes);
577 }
578 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
579 "usock:send_handler SENDING HEADER WITH MSG IDX %d TAG %d SIZE %lu",
580 msg->hdr.pindex, msg->hdr.tag, (unsigned long)msg->hdr.nbytes);
581 if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
582
583 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
584 "usock:send_handler HEADER SENT");
585 msg->hdr_sent = true;
586
587 if (NULL == msg->data) {
588
589 PMIX_RELEASE(msg);
590 peer->send_msg = NULL;
591 goto next;
592 } else {
593
594 msg->sdptr = msg->data->base_ptr;
595 msg->sdbytes = msg->hdr.nbytes;
596 }
597
598 } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
599 PMIX_ERR_WOULD_BLOCK == rc) {
600
601 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
602 "usock:send_handler RES BUSY OR WOULD BLOCK");
603 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
604
605 msg->hdr.pindex = htonl(msg->hdr.pindex);
606 msg->hdr.tag = htonl(msg->hdr.tag);
607 nbytes = msg->hdr.nbytes;
608 msg->hdr.nbytes = htonl(nbytes);
609 }
610
611
612 PMIX_POST_OBJECT(peer);
613 return;
614 } else {
615
616 pmix_event_del(&peer->send_event);
617 peer->send_ev_active = false;
618 PMIX_RELEASE(msg);
619 peer->send_msg = NULL;
620 pmix_ptl_base_lost_connection(peer, rc);
621
622
623 PMIX_POST_OBJECT(peer);
624 return;
625 }
626 }
627
628 if (msg->hdr_sent) {
629 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
630 "usock:send_handler SENDING BODY OF MSG");
631 if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
632
633 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
634 "usock:send_handler BODY SENT");
635 PMIX_RELEASE(msg);
636 peer->send_msg = NULL;
637 } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
638 PMIX_ERR_WOULD_BLOCK == rc) {
639
640 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
641 "usock:send_handler RES BUSY OR WOULD BLOCK");
642
643
644 PMIX_POST_OBJECT(peer);
645 return;
646 } else {
647
648 pmix_output(0, "pmix_usock_peer_send_handler: unable to send message ON SOCKET %d",
649 peer->sd);
650 pmix_event_del(&peer->send_event);
651 peer->send_ev_active = false;
652 PMIX_RELEASE(msg);
653 peer->send_msg = NULL;
654 pmix_ptl_base_lost_connection(peer, rc);
655
656
657 PMIX_POST_OBJECT(peer);
658 return;
659 }
660 }
661
662 next:
663
664
665
666
667
668
669 peer->send_msg = (pmix_ptl_send_t*)
670 pmix_list_remove_first(&peer->send_queue);
671 }
672
673
674 if (NULL == peer->send_msg && peer->send_ev_active) {
675 pmix_event_del(&peer->send_event);
676 peer->send_ev_active = false;
677 }
678
679
680
681 PMIX_POST_OBJECT(peer);
682 }
683
684
685
686
687
688
689 void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
690 {
691 pmix_status_t rc;
692 pmix_peer_t *peer = (pmix_peer_t*)cbdata;
693 pmix_ptl_recv_t *msg = NULL;
694
695
696 PMIX_ACQUIRE_OBJECT(peer);
697
698 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
699 "usock:recv:handler called with peer %s:%d",
700 (NULL == peer) ? "NULL" : peer->info->pname.nspace,
701 (NULL == peer) ? PMIX_RANK_UNDEF : peer->info->pname.rank);
702
703 if (NULL == peer) {
704 return;
705 }
706
707 if (NULL == peer->recv_msg) {
708 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
709 "usock:recv:handler allocate new recv msg");
710 peer->recv_msg = PMIX_NEW(pmix_ptl_recv_t);
711 if (NULL == peer->recv_msg) {
712 pmix_output(0, "usock_recv_handler: unable to allocate recv message\n");
713 goto err_close;
714 }
715 PMIX_RETAIN(peer);
716 peer->recv_msg->peer = peer;
717
718 peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
719 peer->recv_msg->rdbytes = sizeof(pmix_usock_hdr_t);
720 }
721 msg = peer->recv_msg;
722 msg->sd = sd;
723
724 if (!msg->hdr_recvd) {
725 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
726 "usock:recv:handler read hdr on socket %d", peer->sd);
727 if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &msg->rdptr, &msg->rdbytes))) {
728
729 peer->recv_msg->hdr_recvd = true;
730 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
731 "RECVD MSG FOR TAG %d SIZE %d",
732 (int)peer->recv_msg->hdr.tag,
733 (int)peer->recv_msg->hdr.nbytes);
734
735 if (0 == peer->recv_msg->hdr.nbytes) {
736 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
737 "RECVD ZERO-BYTE MESSAGE FROM %s:%d for tag %d",
738 peer->info->pname.nspace, peer->info->pname.rank,
739 peer->recv_msg->hdr.tag);
740 peer->recv_msg->data = NULL;
741 peer->recv_msg->rdptr = NULL;
742 peer->recv_msg->rdbytes = 0;
743
744 PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
745 peer->recv_msg = NULL;
746 PMIX_POST_OBJECT(peer);
747 return;
748 } else {
749 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
750 "usock:recv:handler allocate data region of size %lu",
751 (unsigned long)peer->recv_msg->hdr.nbytes);
752
753 peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
754 memset(peer->recv_msg->data, 0, peer->recv_msg->hdr.nbytes);
755
756 peer->recv_msg->rdptr = peer->recv_msg->data;
757 peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
758 }
759
760 } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
761 PMIX_ERR_WOULD_BLOCK == rc) {
762
763 return;
764 } else {
765
766
767
768 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
769 "pmix_usock_msg_recv: peer closed connection");
770 goto err_close;
771 }
772 }
773
774 if (peer->recv_msg->hdr_recvd) {
775
776
777
778
779 if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &msg->rdptr, &msg->rdbytes))) {
780
781 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
782 "RECVD COMPLETE MESSAGE FROM SERVER OF %d BYTES FOR TAG %d ON PEER SOCKET %d",
783 (int)peer->recv_msg->hdr.nbytes,
784 peer->recv_msg->hdr.tag, peer->sd);
785
786 PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
787 peer->recv_msg = NULL;
788
789
790 PMIX_POST_OBJECT(peer);
791 return;
792 } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
793 PMIX_ERR_WOULD_BLOCK == rc) {
794
795
796
797 PMIX_POST_OBJECT(peer);
798 return;
799 } else {
800
801
802
803 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
804 "pmix_usock_msg_recv: peer closed connection");
805 goto err_close;
806 }
807 }
808
809 return;
810
811 err_close:
812
813 if (peer->recv_ev_active) {
814 pmix_event_del(&peer->recv_event);
815 peer->recv_ev_active = false;
816 }
817 if (peer->send_ev_active) {
818 pmix_event_del(&peer->send_event);
819 peer->send_ev_active = false;
820 }
821 if (NULL != peer->recv_msg) {
822 PMIX_RELEASE(peer->recv_msg);
823 peer->recv_msg = NULL;
824 }
825 pmix_ptl_base_lost_connection(peer, PMIX_ERR_UNREACH);
826
827
828 PMIX_POST_OBJECT(peer);
829 }
830
831 void pmix_usock_send_recv(int fd, short args, void *cbdata)
832 {
833 pmix_ptl_sr_t *ms = (pmix_ptl_sr_t*)cbdata;
834 pmix_ptl_posted_recv_t *req;
835 pmix_ptl_send_t *snd;
836 uint32_t tag;
837
838
839 PMIX_ACQUIRE_OBJECT(ms);
840
841 if (ms->peer->sd < 0) {
842
843 PMIX_RELEASE(ms);
844
845
846 PMIX_POST_OBJECT(NULL);
847 return;
848 }
849
850
851 pmix_ptl_globals.current_tag++;
852 if (UINT32_MAX == pmix_ptl_globals.current_tag ) {
853 pmix_ptl_globals.current_tag = PMIX_PTL_TAG_DYNAMIC;
854 }
855 tag = pmix_ptl_globals.current_tag;
856
857 if (NULL != ms->cbfunc) {
858
859 req = PMIX_NEW(pmix_ptl_posted_recv_t);
860 req->tag = tag;
861 req->cbfunc = ms->cbfunc;
862 req->cbdata = ms->cbdata;
863 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
864 "posting recv on tag %d", req->tag);
865
866
867
868 pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &req->super);
869 }
870
871 snd = PMIX_NEW(pmix_ptl_send_t);
872 snd->hdr.pindex = pmix_globals.pindex;
873 snd->hdr.tag = tag;
874 snd->hdr.nbytes = ms->bfr->bytes_used;
875 snd->data = ms->bfr;
876
877 snd->sdptr = (char*)&snd->hdr;
878 snd->sdbytes = sizeof(pmix_usock_hdr_t);
879
880
881 if (NULL == ms->peer->send_msg) {
882 ms->peer->send_msg = snd;
883 } else {
884
885 pmix_list_append(&ms->peer->send_queue, &snd->super);
886 }
887
888 if (!ms->peer->send_ev_active) {
889 ms->peer->send_ev_active = true;
890 PMIX_POST_OBJECT(snd);
891 pmix_event_add(&ms->peer->send_event, 0);
892 }
893
894 PMIX_RELEASE(ms);
895 PMIX_POST_OBJECT(snd);
896 }
897
898 static void pmix_usock_send(int sd, short args, void *cbdata)
899 {
900 pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
901 pmix_ptl_send_t *snd;
902
903
904 PMIX_ACQUIRE_OBJECT(queue);
905
906 if (NULL == queue->peer || queue->peer->sd < 0 ||
907 NULL == queue->peer->info || NULL == queue->peer->nptr) {
908
909 PMIX_RELEASE(queue);
910
911
912 PMIX_POST_OBJECT(queue);
913 return;
914 }
915
916 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
917 "[%s:%d] send to %s:%u on tag %d",
918 __FILE__, __LINE__,
919 (queue->peer)->info->pname.nspace,
920 (queue->peer)->info->pname.rank, (queue->tag));
921
922 snd = PMIX_NEW(pmix_ptl_send_t);
923 snd->hdr.pindex = htonl(pmix_globals.pindex);
924 snd->hdr.tag = htonl(queue->tag);
925 snd->hdr.nbytes = htonl((queue->buf)->bytes_used);
926 snd->data = (queue->buf);
927
928 snd->sdptr = (char*)&snd->hdr;
929 snd->sdbytes = sizeof(pmix_ptl_hdr_t);
930
931
932 if (NULL == (queue->peer)->send_msg) {
933 (queue->peer)->send_msg = snd;
934 } else {
935
936 pmix_list_append(&(queue->peer)->send_queue, &snd->super);
937 }
938
939 if (!(queue->peer)->send_ev_active) {
940 (queue->peer)->send_ev_active = true;
941 PMIX_POST_OBJECT(queue->peer);
942 pmix_event_add(&(queue->peer)->send_event, 0);
943 }
944 PMIX_RELEASE(queue);
945 PMIX_POST_OBJECT(snd);
946 }