This source file includes the following definitions:
- _notify_complete
- lcfn
- pmix_ptl_base_lost_connection
- send_msg
- read_bytes
- pmix_ptl_base_send_handler
- pmix_ptl_base_recv_handler
- pmix_ptl_base_send
- pmix_ptl_base_send_recv
- pmix_ptl_base_process_msg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include <src/include/pmix_config.h>
17
18 #include <src/include/pmix_stdint.h>
19 #include <src/include/pmix_socket_errno.h>
20
21 #ifdef HAVE_STRING_H
22 #include <string.h>
23 #endif
24 #include <fcntl.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif
28 #ifdef HAVE_SYS_SOCKET_H
29 #include <sys/socket.h>
30 #endif
31 #ifdef HAVE_SYS_UN_H
32 #include <sys/un.h>
33 #endif
34 #ifdef HAVE_SYS_UIO_H
35 #include <sys/uio.h>
36 #endif
37 #ifdef HAVE_SYS_TYPES_H
38 #include <sys/types.h>
39 #endif
40
41 #include "src/class/pmix_pointer_array.h"
42 #include "src/include/pmix_globals.h"
43 #include "src/client/pmix_client_ops.h"
44 #include "src/server/pmix_server_ops.h"
45 #include "src/util/error.h"
46 #include "src/util/show_help.h"
47 #include "src/mca/psensor/psensor.h"
48
49 #include "src/mca/ptl/base/base.h"
50
51 static void _notify_complete(pmix_status_t status, void *cbdata)
52 {
53 pmix_event_chain_t *chain = (pmix_event_chain_t*)cbdata;
54 PMIX_RELEASE(chain);
55 }
56
57 static void lcfn(pmix_status_t status, void *cbdata)
58 {
59 pmix_peer_t *peer = (pmix_peer_t*)cbdata;
60 PMIX_RELEASE(peer);
61 }
62
63 void pmix_ptl_base_lost_connection(pmix_peer_t *peer, pmix_status_t err)
64 {
65 pmix_server_trkr_t *trk, *tnxt;
66 pmix_server_caddy_t *rinfo, *rnext;
67 pmix_rank_info_t *info, *pinfo;
68 pmix_ptl_posted_recv_t *rcv;
69 pmix_buffer_t buf;
70 pmix_ptl_hdr_t hdr;
71 pmix_proc_t proc;
72 pmix_status_t rc;
73
74
75 if (peer->recv_ev_active) {
76 pmix_event_del(&peer->recv_event);
77 peer->recv_ev_active = false;
78 }
79 if (peer->send_ev_active) {
80 pmix_event_del(&peer->send_event);
81 peer->send_ev_active = false;
82 }
83 if (NULL != peer->recv_msg) {
84 PMIX_RELEASE(peer->recv_msg);
85 peer->recv_msg = NULL;
86 }
87 CLOSE_THE_SOCKET(peer->sd);
88
89 if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
90 !PMIX_PROC_IS_TOOL(pmix_globals.mypeer)) {
91
92
93
94
95
96
97 PMIX_LIST_FOREACH_SAFE(trk, tnxt, &pmix_server_globals.collectives, pmix_server_trkr_t) {
98
99 PMIX_LIST_FOREACH_SAFE(rinfo, rnext, &trk->local_cbs, pmix_server_caddy_t) {
100 if (!PMIX_CHECK_PROCID(&rinfo->peer->info->pname, &peer->info->pname)) {
101 continue;
102 }
103
104 --trk->nlocal;
105
106 pmix_list_remove_item(&trk->local_cbs, &rinfo->super);
107 PMIX_RELEASE(rinfo);
108
109
110
111 if (trk->host_called) {
112 continue;
113 }
114 if (trk->def_complete && trk->nlocal == pmix_list_get_size(&trk->local_cbs)) {
115
116 if (trk->local) {
117
118
119
120 if (PMIX_FENCENB_CMD == trk->type) {
121 if (NULL != trk->modexcbfunc) {
122 trk->modexcbfunc(PMIX_ERR_LOST_CONNECTION_TO_CLIENT, NULL, 0, trk, NULL, NULL);
123 }
124 } else if (PMIX_CONNECTNB_CMD == trk->type) {
125 if (NULL != trk->op_cbfunc) {
126 trk->op_cbfunc(PMIX_ERR_LOST_CONNECTION_TO_CLIENT, trk);
127 }
128 } else if (PMIX_DISCONNECTNB_CMD == trk->type) {
129 if (NULL != trk->op_cbfunc) {
130 trk->op_cbfunc(PMIX_ERR_LOST_CONNECTION_TO_CLIENT, trk);
131 }
132 }
133 } else {
134
135
136
137
138 if (PMIX_FENCENB_CMD == trk->type) {
139 trk->host_called = true;
140 rc = pmix_host_server.fence_nb(trk->pcs, trk->npcs,
141 trk->info, trk->ninfo,
142 NULL, 0, trk->modexcbfunc, trk);
143 if (PMIX_SUCCESS != rc) {
144 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
145 PMIX_RELEASE(trk);
146 }
147 } else if (PMIX_CONNECTNB_CMD == trk->type) {
148 trk->host_called = true;
149 rc = pmix_host_server.connect(trk->pcs, trk->npcs, trk->info, trk->ninfo, trk->op_cbfunc, trk);
150 if (PMIX_SUCCESS != rc) {
151 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
152 PMIX_RELEASE(trk);
153 }
154 } else if (PMIX_DISCONNECTNB_CMD == trk->type) {
155 trk->host_called = true;
156 rc = pmix_host_server.disconnect(trk->pcs, trk->npcs, trk->info, trk->ninfo, trk->op_cbfunc, trk);
157 if (PMIX_SUCCESS != rc) {
158 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
159 PMIX_RELEASE(trk);
160 }
161 }
162 }
163 }
164 }
165 }
166
167
168
169
170 PMIX_LIST_FOREACH_SAFE(info, pinfo, &(peer->nptr->ranks), pmix_rank_info_t) {
171 if (info == peer->info) {
172 pmix_list_remove_item(&(peer->nptr->ranks), &(peer->info->super));
173 }
174 }
175
176 if (0 < peer->nptr->nlocalprocs) {
177 --peer->nptr->nlocalprocs;
178 }
179
180
181 pmix_pointer_array_set_item(&pmix_server_globals.clients,
182 peer->index, NULL);
183
184
185 pmix_server_purge_events(peer, NULL);
186
187 if (PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
188
189 pmix_globals.connected = false;
190 } else {
191
192 pmix_psensor.stop(peer, NULL);
193 }
194
195 if (!peer->finalized && !PMIX_PROC_IS_TOOL(peer) && !pmix_globals.mypeer->finalized) {
196
197
198
199
200 PMIX_REPORT_EVENT(err, peer, PMIX_RANGE_PROC_LOCAL, _notify_complete);
201 }
202
203 PMIX_RELEASE(peer->info);
204
205
206
207
208 if (NULL != pmix_host_server.client_finalized && !peer->finalized) {
209 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
210 proc.rank = peer->info->pname.rank;
211
212 rc = pmix_host_server.client_finalized(&proc, peer->info->server_object,
213 lcfn, peer);
214 if (PMIX_SUCCESS == rc) {
215
216 peer->finalized = true;
217 return;
218 }
219 }
220
221
222 peer->finalized = true;
223
224 PMIX_RELEASE(peer);
225 } else {
226
227
228 pmix_globals.connected = false;
229
230 err = PMIX_ERR_LOST_CONNECTION_TO_SERVER;
231
232
233
234
235
236
237
238 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
239
240 buf.type = pmix_client_globals.myserver->nptr->compat.type;
241 hdr.nbytes = 0;
242 PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) {
243 if (UINT_MAX != rcv->tag && NULL != rcv->cbfunc) {
244
245 hdr.tag = rcv->tag;
246 rcv->cbfunc(pmix_globals.mypeer, &hdr, &buf, rcv->cbdata);
247 }
248 }
249 PMIX_DESTRUCT(&buf);
250
251 if (!pmix_globals.mypeer->finalized) {
252 PMIX_REPORT_EVENT(err, pmix_client_globals.myserver, PMIX_RANGE_PROC_LOCAL, _notify_complete);
253 }
254 }
255 }
256
257 static pmix_status_t send_msg(int sd, pmix_ptl_send_t *msg)
258 {
259 struct iovec iov[2];
260 int iov_count;
261 ssize_t remain = msg->sdbytes, rc;
262
263 iov[0].iov_base = msg->sdptr;
264 iov[0].iov_len = msg->sdbytes;
265 if (!msg->hdr_sent && NULL != msg->data) {
266 iov[1].iov_base = msg->data->base_ptr;
267 iov[1].iov_len = ntohl(msg->hdr.nbytes);
268 remain += ntohl(msg->hdr.nbytes);
269 iov_count = 2;
270 } else {
271 iov_count = 1;
272 }
273 retry:
274 rc = writev(sd, iov, iov_count);
275 if (PMIX_LIKELY(rc == remain)) {
276
277 msg->hdr_sent = true;
278 msg->sdbytes = 0;
279 msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
280 return PMIX_SUCCESS;
281 } else if (rc < 0) {
282 if (pmix_socket_errno == EINTR) {
283 goto retry;
284 } else if (pmix_socket_errno == EAGAIN) {
285
286
287
288
289 return PMIX_ERR_RESOURCE_BUSY;
290 } else if (pmix_socket_errno == EWOULDBLOCK) {
291
292
293
294
295 return PMIX_ERR_WOULD_BLOCK;
296 } else {
297
298 pmix_output(0, "pmix_ptl_base: send_msg: write failed: %s (%d) [sd = %d]",
299 strerror(pmix_socket_errno),
300 pmix_socket_errno, sd);
301 return PMIX_ERR_UNREACH;
302 }
303 } else {
304
305
306
307 if ((size_t)rc < msg->sdbytes) {
308
309 msg->sdptr = (char *)msg->sdptr + rc;
310 msg->sdbytes -= rc;
311 } else {
312
313 msg->hdr_sent = true;
314 rc -= msg->sdbytes;
315 if (NULL != msg->data) {
316
317
318
319
320
321
322 msg->sdptr = (char *)msg->data->base_ptr + rc;
323 }
324 msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
325 }
326 return PMIX_ERR_RESOURCE_BUSY;
327 }
328 }
329
330 static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
331 {
332 pmix_status_t ret = PMIX_SUCCESS;
333 int rc;
334 char *ptr = *buf;
335
336
337 while (0 < *remain) {
338 rc = read(sd, ptr, *remain);
339 if (rc < 0) {
340 if(pmix_socket_errno == EINTR) {
341 continue;
342 } else if (pmix_socket_errno == EAGAIN) {
343
344
345
346
347 ret = PMIX_ERR_RESOURCE_BUSY;
348 goto exit;
349 } else if (pmix_socket_errno == EWOULDBLOCK) {
350
351
352
353
354 ret = PMIX_ERR_WOULD_BLOCK;
355 goto exit;
356 }
357
358
359
360
361 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
362 "pmix_ptl_base_msg_recv: readv failed: %s (%d)",
363 strerror(pmix_socket_errno),
364 pmix_socket_errno);
365 ret = PMIX_ERR_UNREACH;
366 goto exit;
367 } else if (0 == rc) {
368
369 ret = PMIX_ERR_UNREACH;
370 goto exit;
371 }
372
373 *remain -= rc;
374 ptr += rc;
375 }
376
377 exit:
378 *buf = ptr;
379 return ret;
380 }
381
382
383
384
385
386 void pmix_ptl_base_send_handler(int sd, short flags, void *cbdata)
387 {
388 pmix_peer_t *peer = (pmix_peer_t*)cbdata;
389 pmix_ptl_send_t *msg = peer->send_msg;
390 pmix_status_t rc;
391
392
393 PMIX_ACQUIRE_OBJECT(peer);
394
395 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
396 "%s:%d ptl:base:send_handler SENDING TO PEER %s:%d tag %u with %s msg",
397 pmix_globals.myid.nspace, pmix_globals.myid.rank,
398 peer->info->pname.nspace, peer->info->pname.rank,
399 (NULL == msg) ? UINT_MAX : ntohl(msg->hdr.tag),
400 (NULL == msg) ? "NULL" : "NON-NULL");
401
402 if (NULL != msg) {
403 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
404 "ptl:base:send_handler SENDING MSG TO %s:%d TAG %u",
405 peer->info->pname.nspace, peer->info->pname.rank,
406 ntohl(msg->hdr.tag));
407 if (PMIX_SUCCESS == (rc = send_msg(peer->sd, msg))) {
408
409 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
410 "ptl:base:send_handler MSG SENT");
411 PMIX_RELEASE(msg);
412 peer->send_msg = NULL;
413 } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
414 PMIX_ERR_WOULD_BLOCK == rc) {
415
416 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
417 "ptl:base:send_handler RES BUSY OR WOULD BLOCK");
418
419
420 PMIX_POST_OBJECT(peer);
421 return;
422 } else {
423 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
424 "%s:%d SEND ERROR %s",
425 pmix_globals.myid.nspace, pmix_globals.myid.rank,
426 PMIx_Error_string(rc));
427
428 pmix_event_del(&peer->send_event);
429 peer->send_ev_active = false;
430 PMIX_RELEASE(msg);
431 peer->send_msg = NULL;
432 pmix_ptl_base_lost_connection(peer, rc);
433
434
435 PMIX_POST_OBJECT(peer);
436 return;
437 }
438
439
440
441
442
443
444
445 peer->send_msg = (pmix_ptl_send_t*)
446 pmix_list_remove_first(&peer->send_queue);
447 }
448
449
450 if (NULL == peer->send_msg && peer->send_ev_active) {
451 pmix_event_del(&peer->send_event);
452 peer->send_ev_active = false;
453 }
454
455
456 PMIX_POST_OBJECT(peer);
457 }
458
459
460
461
462
463
464 void pmix_ptl_base_recv_handler(int sd, short flags, void *cbdata)
465 {
466 pmix_status_t rc;
467 pmix_peer_t *peer = (pmix_peer_t*)cbdata;
468 pmix_ptl_recv_t *msg = NULL;
469 pmix_ptl_hdr_t hdr;
470 size_t nbytes;
471 char *ptr;
472
473
474 PMIX_ACQUIRE_OBJECT(peer);
475
476 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
477 "%s:%d ptl:base:recv:handler called with peer %s:%d",
478 pmix_globals.myid.nspace, pmix_globals.myid.rank,
479 (NULL == peer) ? "NULL" : peer->info->pname.nspace,
480 (NULL == peer) ? PMIX_RANK_UNDEF : peer->info->pname.rank);
481
482 if (NULL == peer) {
483 return;
484 }
485
486 if (NULL == peer->recv_msg) {
487 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
488 "ptl:base:recv:handler allocate new recv msg");
489 peer->recv_msg = PMIX_NEW(pmix_ptl_recv_t);
490 if (NULL == peer->recv_msg) {
491 pmix_output(0, "sptl:base:recv_handler: unable to allocate recv message\n");
492 goto err_close;
493 }
494 PMIX_RETAIN(peer);
495 peer->recv_msg->peer = peer;
496
497 peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
498 peer->recv_msg->rdbytes = sizeof(pmix_ptl_hdr_t);
499 }
500 msg = peer->recv_msg;
501 msg->sd = sd;
502
503 if (!msg->hdr_recvd) {
504 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
505 "ptl:base:recv:handler read hdr on socket %d", peer->sd);
506 nbytes = sizeof(pmix_ptl_hdr_t);
507 ptr = (char*)&hdr;
508 if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &ptr, &nbytes))) {
509
510 peer->recv_msg->hdr_recvd = true;
511
512 peer->recv_msg->hdr.pindex = ntohl(hdr.pindex);
513 peer->recv_msg->hdr.tag = ntohl(hdr.tag);
514 peer->recv_msg->hdr.nbytes = ntohl(hdr.nbytes);
515 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
516 "RECVD MSG FOR TAG %d SIZE %d",
517 (int)peer->recv_msg->hdr.tag,
518 (int)peer->recv_msg->hdr.nbytes);
519
520 if (0 == peer->recv_msg->hdr.nbytes) {
521 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
522 "RECVD ZERO-BYTE MESSAGE FROM %s:%u for tag %d",
523 peer->info->pname.nspace, peer->info->pname.rank,
524 peer->recv_msg->hdr.tag);
525 peer->recv_msg->data = NULL;
526 peer->recv_msg->rdptr = NULL;
527 peer->recv_msg->rdbytes = 0;
528
529 PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
530 peer->recv_msg = NULL;
531 PMIX_POST_OBJECT(peer);
532 return;
533 } else {
534 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
535 "ptl:base:recv:handler allocate data region of size %lu",
536 (unsigned long)peer->recv_msg->hdr.nbytes);
537
538 if (pmix_ptl_globals.max_msg_size < peer->recv_msg->hdr.nbytes) {
539 pmix_show_help("help-pmix-runtime.txt", "ptl:msg_size", true,
540 (unsigned long)peer->recv_msg->hdr.nbytes,
541 (unsigned long)pmix_ptl_globals.max_msg_size);
542 goto err_close;
543 }
544 peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
545 memset(peer->recv_msg->data, 0, peer->recv_msg->hdr.nbytes);
546
547 peer->recv_msg->rdptr = peer->recv_msg->data;
548 peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
549 }
550
551 } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
552 PMIX_ERR_WOULD_BLOCK == rc) {
553
554 return;
555 } else {
556
557
558
559 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
560 "ptl:base:msg_recv: peer %s:%d closed connection",
561 peer->nptr->nspace, peer->info->pname.rank);
562 goto err_close;
563 }
564 }
565
566 if (peer->recv_msg->hdr_recvd) {
567
568
569
570
571 if (PMIX_SUCCESS == (rc = read_bytes(peer->sd, &msg->rdptr, &msg->rdbytes))) {
572
573 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
574 "%s:%d RECVD COMPLETE MESSAGE FROM SERVER OF %d BYTES FOR TAG %d ON PEER SOCKET %d",
575 pmix_globals.myid.nspace, pmix_globals.myid.rank,
576 (int)peer->recv_msg->hdr.nbytes,
577 peer->recv_msg->hdr.tag, peer->sd);
578
579 PMIX_ACTIVATE_POST_MSG(peer->recv_msg);
580 peer->recv_msg = NULL;
581
582
583 PMIX_POST_OBJECT(peer);
584 return;
585 } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
586 PMIX_ERR_WOULD_BLOCK == rc) {
587
588
589
590 PMIX_POST_OBJECT(peer);
591 return;
592 } else {
593
594
595
596 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
597 "%s:%d ptl:base:msg_recv: peer %s:%d closed connection",
598 pmix_globals.myid.nspace, pmix_globals.myid.rank,
599 peer->nptr->nspace, peer->info->pname.rank);
600 goto err_close;
601 }
602 }
603
604 return;
605
606 err_close:
607
608 if (peer->recv_ev_active) {
609 pmix_event_del(&peer->recv_event);
610 peer->recv_ev_active = false;
611 }
612 if (peer->send_ev_active) {
613 pmix_event_del(&peer->send_event);
614 peer->send_ev_active = false;
615 }
616 if (NULL != peer->recv_msg) {
617 PMIX_RELEASE(peer->recv_msg);
618 peer->recv_msg = NULL;
619 }
620 pmix_ptl_base_lost_connection(peer, PMIX_ERR_UNREACH);
621
622
623 PMIX_POST_OBJECT(peer);
624 }
625
626 void pmix_ptl_base_send(int sd, short args, void *cbdata)
627 {
628 pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
629 pmix_ptl_send_t *snd;
630
631
632 PMIX_ACQUIRE_OBJECT(queue);
633
634 if (NULL == queue->peer || queue->peer->sd < 0 ||
635 NULL == queue->peer->info || NULL == queue->peer->nptr) {
636
637 if (NULL != queue->buf) {
638 PMIX_RELEASE(queue->buf);
639 }
640 PMIX_RELEASE(queue);
641 return;
642 }
643
644 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
645 "[%s:%d] send to %s:%u on tag %d",
646 __FILE__, __LINE__,
647 (queue->peer)->info->pname.nspace,
648 (queue->peer)->info->pname.rank, (queue->tag));
649
650 if (NULL == queue->buf) {
651
652 PMIX_RELEASE(queue);
653 return;
654 }
655
656 snd = PMIX_NEW(pmix_ptl_send_t);
657 snd->hdr.pindex = htonl(pmix_globals.pindex);
658 snd->hdr.tag = htonl(queue->tag);
659 snd->hdr.nbytes = htonl((queue->buf)->bytes_used);
660 snd->data = (queue->buf);
661
662 snd->sdptr = (char*)&snd->hdr;
663 snd->sdbytes = sizeof(pmix_ptl_hdr_t);
664
665
666 if (NULL == (queue->peer)->send_msg) {
667 (queue->peer)->send_msg = snd;
668 } else {
669
670 pmix_list_append(&(queue->peer)->send_queue, &snd->super);
671 }
672
673 if (!(queue->peer)->send_ev_active) {
674 (queue->peer)->send_ev_active = true;
675 PMIX_POST_OBJECT(queue->peer);
676 pmix_event_add(&(queue->peer)->send_event, 0);
677 }
678 PMIX_RELEASE(queue);
679 PMIX_POST_OBJECT(snd);
680 }
681
682 void pmix_ptl_base_send_recv(int fd, short args, void *cbdata)
683 {
684 pmix_ptl_sr_t *ms = (pmix_ptl_sr_t*)cbdata;
685 pmix_ptl_posted_recv_t *req;
686 pmix_ptl_send_t *snd;
687 uint32_t tag;
688
689
690 PMIX_ACQUIRE_OBJECT(ms);
691
692 if (NULL == ms->peer || ms->peer->sd < 0 ||
693 NULL == ms->peer->info || NULL == ms->peer->nptr) {
694
695 if (NULL != ms->bfr) {
696 PMIX_RELEASE(ms->bfr);
697 }
698 PMIX_RELEASE(ms);
699 return;
700 }
701
702 if (NULL == ms->bfr) {
703
704 PMIX_RELEASE(ms);
705 return;
706 }
707
708
709 pmix_ptl_globals.current_tag++;
710 if (UINT32_MAX == pmix_ptl_globals.current_tag ) {
711 pmix_ptl_globals.current_tag = PMIX_PTL_TAG_DYNAMIC;
712 }
713 tag = pmix_ptl_globals.current_tag;
714
715 if (NULL != ms->cbfunc) {
716
717 req = PMIX_NEW(pmix_ptl_posted_recv_t);
718 req->tag = tag;
719 req->cbfunc = ms->cbfunc;
720 req->cbdata = ms->cbdata;
721
722 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
723 "posting recv on tag %d", req->tag);
724
725
726
727 pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &req->super);
728 }
729
730 pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
731 "QUEIENG MSG TO SERVER OF SIZE %d",
732 (int)ms->bfr->bytes_used);
733 snd = PMIX_NEW(pmix_ptl_send_t);
734 snd->hdr.pindex = htonl(pmix_globals.pindex);
735 snd->hdr.tag = htonl(tag);
736 snd->hdr.nbytes = htonl(ms->bfr->bytes_used);
737 snd->data = ms->bfr;
738
739 snd->sdptr = (char*)&snd->hdr;
740 snd->sdbytes = sizeof(pmix_ptl_hdr_t);
741
742
743 if (NULL == ms->peer->send_msg) {
744 ms->peer->send_msg = snd;
745 } else {
746
747 pmix_list_append(&ms->peer->send_queue, &snd->super);
748 }
749
750 if (!ms->peer->send_ev_active) {
751 ms->peer->send_ev_active = true;
752 PMIX_POST_OBJECT(snd);
753 pmix_event_add(&ms->peer->send_event, 0);
754 }
755
756 PMIX_RELEASE(ms);
757 PMIX_POST_OBJECT(snd);
758 }
759
760 void pmix_ptl_base_process_msg(int fd, short flags, void *cbdata)
761 {
762 pmix_ptl_recv_t *msg = (pmix_ptl_recv_t*)cbdata;
763 pmix_ptl_posted_recv_t *rcv;
764 pmix_buffer_t buf;
765
766
767 PMIX_ACQUIRE_OBJECT(msg);
768
769 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
770 "%s:%d message received %d bytes for tag %u on socket %d",
771 pmix_globals.myid.nspace, pmix_globals.myid.rank,
772 (int)msg->hdr.nbytes, msg->hdr.tag, msg->sd);
773
774
775 PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) {
776 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
777 "checking msg on tag %u for tag %u",
778 msg->hdr.tag, rcv->tag);
779
780 if (msg->hdr.tag == rcv->tag || UINT_MAX == rcv->tag) {
781 if (NULL != rcv->cbfunc) {
782
783 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
784 if (NULL != msg->data) {
785 PMIX_LOAD_BUFFER(msg->peer, &buf, msg->data, msg->hdr.nbytes);
786 } else {
787
788
789 buf.type = msg->peer->nptr->compat.type;
790 }
791 msg->data = NULL;
792 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
793 "%s:%d EXECUTE CALLBACK for tag %u",
794 pmix_globals.myid.nspace, pmix_globals.myid.rank,
795 msg->hdr.tag);
796 rcv->cbfunc(msg->peer, &msg->hdr, &buf, rcv->cbdata);
797 pmix_output_verbose(5, pmix_ptl_base_framework.framework_output,
798 "%s:%d CALLBACK COMPLETE",
799 pmix_globals.myid.nspace, pmix_globals.myid.rank);
800 PMIX_DESTRUCT(&buf);
801 }
802
803 if (PMIX_PTL_TAG_DYNAMIC <= rcv->tag && UINT_MAX != rcv->tag) {
804 pmix_list_remove_item(&pmix_ptl_globals.posted_recvs, &rcv->super);
805 PMIX_RELEASE(rcv);
806 }
807 PMIX_RELEASE(msg);
808 return;
809 }
810 }
811
812
813
814 if (PMIX_PTL_TAG_DYNAMIC <= msg->hdr.tag) {
815 pmix_output(0, "UNEXPECTED MESSAGE tag = %d from source %s:%d",
816 msg->hdr.tag, msg->peer->info->pname.nspace,
817 msg->peer->info->pname.rank);
818 PMIX_REPORT_EVENT(PMIX_ERROR, msg->peer, PMIX_RANGE_NAMESPACE, _notify_complete);
819 PMIX_RELEASE(msg);
820 return;
821 }
822
823
824
825 pmix_list_append(&pmix_ptl_globals.unexpected_msgs, &msg->super);
826
827
828 PMIX_POST_OBJECT(msg);
829 }