This source file includes the following definitions.
- dcd_con
- relfn
- pmix_server_get
- create_local_tracker
- pmix_pending_nspace_requests
- _satisfy_request
- pmix_pending_resolve
- _process_dmdx_reply
- dmdx_cbfunc
- get_timeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include <src/include/pmix_config.h>
19
20 #include <src/include/pmix_stdint.h>
21 #include <src/include/pmix_socket_errno.h>
22
23 #include <pmix_server.h>
24 #include <pmix_rename.h>
25 #include "src/include/pmix_globals.h"
26
27 #ifdef HAVE_STRING_H
28 #include <string.h>
29 #endif
30 #include <fcntl.h>
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_SYS_UN_H
38 #include <sys/un.h>
39 #endif
40 #ifdef HAVE_SYS_UIO_H
41 #include <sys/uio.h>
42 #endif
43 #ifdef HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46 #include PMIX_EVENT_HEADER
47
48 #include "src/class/pmix_list.h"
49 #include "src/mca/bfrops/bfrops.h"
50 #include "src/mca/gds/gds.h"
51 #include "src/util/argv.h"
52 #include "src/util/error.h"
53 #include "src/util/output.h"
54 #include "src/util/pmix_environ.h"
55
56 #include "pmix_server_ops.h"
57
58 extern pmix_server_module_t pmix_host_server;
59
60 typedef struct {
61 pmix_object_t super;
62 pmix_event_t ev;
63 volatile bool active;
64 pmix_status_t status;
65 const char *data;
66 size_t ndata;
67 pmix_dmdx_local_t *lcd;
68 pmix_release_cbfunc_t relcbfunc;
69 void *cbdata;
70 } pmix_dmdx_reply_caddy_t;
71 static void dcd_con(pmix_dmdx_reply_caddy_t *p)
72 {
73 p->status = PMIX_ERROR;
74 p->ndata = 0;
75 p->lcd = NULL;
76 p->relcbfunc = NULL;
77 p->cbdata = NULL;
78 }
79 PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
80 pmix_object_t, dcd_con, NULL);
81
82
83 static void dmdx_cbfunc(pmix_status_t status, const char *data,
84 size_t ndata, void *cbdata,
85 pmix_release_cbfunc_t relfn, void *relcbdata);
86 static pmix_status_t _satisfy_request(pmix_namespace_t *ns, pmix_rank_t rank,
87 pmix_server_caddy_t *cd,
88 pmix_modex_cbfunc_t cbfunc, void *cbdata, bool *scope);
89 static pmix_status_t create_local_tracker(char nspace[], pmix_rank_t rank,
90 pmix_info_t info[], size_t ninfo,
91 pmix_modex_cbfunc_t cbfunc,
92 void *cbdata,
93 pmix_dmdx_local_t **lcd,
94 pmix_dmdx_request_t **rq);
95
96 static void get_timeout(int sd, short args, void *cbdata);
97
98
99
100
101
/* Release callback passed along with a payload to the modex callback:
 * frees the buffer given as cbdata (a NULL pointer is accepted). */
static void relfn(void *cbdata)
{
    free(cbdata);
}
109
110
111 pmix_status_t pmix_server_get(pmix_buffer_t *buf,
112 pmix_modex_cbfunc_t cbfunc,
113 void *cbdata)
114 {
115 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
116 int32_t cnt;
117 pmix_status_t rc;
118 pmix_rank_t rank;
119 char *cptr;
120 char nspace[PMIX_MAX_NSLEN+1];
121 pmix_namespace_t *ns, *nptr;
122 pmix_info_t *info=NULL;
123 size_t ninfo=0;
124 pmix_dmdx_local_t *lcd;
125 pmix_dmdx_request_t *req;
126 bool local;
127 bool localonly = false;
128 struct timeval tv = {0, 0};
129 pmix_buffer_t pbkt, pkt;
130 pmix_byte_object_t bo;
131 pmix_cb_t cb;
132 pmix_proc_t proc;
133 char *data;
134 size_t sz, n;
135 pmix_peer_t *peer;
136
137 pmix_output_verbose(2, pmix_server_globals.get_output,
138 "recvd GET");
139
140
141 memset(nspace, 0, sizeof(nspace));
142
143
144 cnt = 1;
145 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &cptr, &cnt, PMIX_STRING);
146 if (PMIX_SUCCESS != rc) {
147 PMIX_ERROR_LOG(rc);
148 return rc;
149 }
150 pmix_strncpy(nspace, cptr, PMIX_MAX_NSLEN);
151 free(cptr);
152 cnt = 1;
153 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &rank, &cnt, PMIX_PROC_RANK);
154 if (PMIX_SUCCESS != rc) {
155 PMIX_ERROR_LOG(rc);
156 return rc;
157 }
158
159 cnt = 1;
160 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &ninfo, &cnt, PMIX_SIZE);
161 if (PMIX_SUCCESS != rc) {
162 PMIX_ERROR_LOG(rc);
163 return rc;
164 }
165 if (0 < ninfo) {
166 PMIX_INFO_CREATE(info, ninfo);
167 if (NULL == info) {
168 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
169 return PMIX_ERR_NOMEM;
170 }
171 cnt = ninfo;
172 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, info, &cnt, PMIX_INFO);
173 if (PMIX_SUCCESS != rc) {
174 PMIX_ERROR_LOG(rc);
175 PMIX_INFO_FREE(info, ninfo);
176 return rc;
177 }
178 }
179
180
181 for (n=0; n < ninfo; n++) {
182 if (0 == strncmp(info[n].key, PMIX_IMMEDIATE, PMIX_MAX_KEYLEN)) {
183
184
185 localonly = PMIX_INFO_TRUE(&info[n]);
186 } else if (0 == strncmp(info[n].key, PMIX_TIMEOUT, PMIX_MAX_KEYLEN)) {
187 tv.tv_sec = info[n].value.data.uint32;
188 }
189 }
190
191
192 nptr = NULL;
193 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
194 if (0 == strcmp(nspace, ns->nspace)) {
195 nptr = ns;
196 break;
197 }
198 }
199
200 pmix_output_verbose(2, pmix_server_globals.get_output,
201 "%s:%d EXECUTE GET FOR %s:%d ON BEHALF OF %s:%d",
202 pmix_globals.myid.nspace,
203 pmix_globals.myid.rank, nspace, rank,
204 cd->peer->info->pname.nspace,
205 cd->peer->info->pname.rank);
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228 if (NULL == nptr) {
229 if (localonly) {
230
231
232 return PMIX_ERR_NOT_FOUND;
233 }
234
235
236
237
238
239
240
241 rc = create_local_tracker(nspace, rank,
242 info, ninfo,
243 cbfunc, cbdata, &lcd, &req);
244 if (PMIX_ERR_NOMEM == rc) {
245 PMIX_INFO_FREE(info, ninfo);
246 return rc;
247 }
248 if (PMIX_SUCCESS == rc) {
249
250
251 if (0 < tv.tv_sec) {
252 pmix_event_evtimer_set(pmix_globals.evbase, &req->ev,
253 get_timeout, req);
254 pmix_event_evtimer_add(&req->ev, &tv);
255 req->event_active = true;
256 }
257
258
259 return PMIX_SUCCESS;
260 }
261
262
263
264
265
266
267
268
269 if (NULL != pmix_host_server.direct_modex) {
270 rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
271 if (PMIX_SUCCESS != rc) {
272 PMIX_INFO_FREE(info, ninfo);
273 pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
274 PMIX_RELEASE(lcd);
275 return rc;
276 }
277
278
279 if (0 < tv.tv_sec) {
280 pmix_event_evtimer_set(pmix_globals.evbase, &req->ev,
281 get_timeout, req);
282 pmix_event_evtimer_add(&req->ev, &tv);
283 req->event_active = true;
284 }
285 } else {
286
287 PMIX_INFO_FREE(info, ninfo);
288 pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
289 PMIX_RELEASE(lcd);
290 return PMIX_ERR_NOT_FOUND;
291 }
292
293 return PMIX_SUCCESS;
294 }
295
296
297
298
299 if (PMIX_RANK_WILDCARD == rank) {
300
301
302
303 data = NULL;
304 sz = 0;
305 pmix_strncpy(proc.nspace, nspace, PMIX_MAX_NSLEN);
306 proc.rank = PMIX_RANK_WILDCARD;
307
308
309
310 PMIX_CONSTRUCT(&cb, pmix_cb_t);
311 peer = pmix_globals.mypeer;
312
313
314
315 cb.proc = &proc;
316 cb.scope = PMIX_SCOPE_UNDEF;
317 cb.copy = false;
318 PMIX_GDS_FETCH_KV(rc, peer, &cb);
319 if (PMIX_SUCCESS != rc) {
320 PMIX_DESTRUCT(&cb);
321 return rc;
322 }
323 PMIX_CONSTRUCT(&pkt, pmix_buffer_t);
324
325 PMIX_GDS_ASSEMB_KVS_REQ(rc, peer, &proc, &cb.kvs, &pkt, cd);
326 if (PMIX_SUCCESS != rc) {
327 PMIX_ERROR_LOG(rc);
328 PMIX_DESTRUCT(&cb);
329 return rc;
330 }
331 PMIX_UNLOAD_BUFFER(&pkt, bo.bytes, bo.size);
332 PMIX_DESTRUCT(&pkt);
333
334 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
335 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &bo, 1, PMIX_BYTE_OBJECT);
336 free(bo.bytes);
337 if (PMIX_SUCCESS != rc) {
338 PMIX_ERROR_LOG(rc);
339 PMIX_DESTRUCT(&pbkt);
340 PMIX_DESTRUCT(&cb);
341 return rc;
342 }
343
344 PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
345 PMIX_DESTRUCT(&pbkt);
346
347
348 cbfunc(PMIX_SUCCESS, data, sz, cbdata, relfn, data);
349
350
351 return PMIX_SUCCESS;
352 }
353
354
355
356
357
358
359
360 if (!nptr->all_registered) {
361 pmix_output_verbose(2, pmix_server_globals.get_output,
362 "%s:%d NSPACE %s not all registered",
363 pmix_globals.myid.nspace,
364 pmix_globals.myid.rank, nspace);
365
366 if (localonly) {
367
368 pmix_output_verbose(2, pmix_server_globals.get_output,
369 "%s:%d CLIENT REQUESTED IMMEDIATE",
370 pmix_globals.myid.nspace,
371 pmix_globals.myid.rank);
372 return PMIX_ERR_NOT_FOUND;
373 }
374
375
376 rc = create_local_tracker(nspace, rank, info, ninfo,
377 cbfunc, cbdata, &lcd, &req);
378 if (PMIX_ERR_NOMEM == rc) {
379 PMIX_INFO_FREE(info, ninfo);
380 return rc;
381 }
382 pmix_output_verbose(2, pmix_server_globals.get_output,
383 "%s:%d TRACKER CREATED - WAITING",
384 pmix_globals.myid.nspace,
385 pmix_globals.myid.rank);
386
387 if (0 < tv.tv_sec) {
388 pmix_event_evtimer_set(pmix_globals.evbase, &req->ev,
389 get_timeout, req);
390 pmix_event_evtimer_add(&req->ev, &tv);
391 req->event_active = true;
392 }
393
394
395 return PMIX_SUCCESS;
396 }
397
398
399 rc = _satisfy_request(nptr, rank, cd, cbfunc, cbdata, &local);
400 if( PMIX_SUCCESS == rc ){
401
402 PMIX_INFO_FREE(info, ninfo);
403
404
405
406 return PMIX_SUCCESS;
407 }
408
409 pmix_output_verbose(2, pmix_server_globals.get_output,
410 "%s:%d DATA NOT FOUND",
411 pmix_globals.myid.nspace,
412 pmix_globals.myid.rank);
413
414
415
416 if (localonly) {
417 pmix_output_verbose(2, pmix_server_globals.get_output,
418 "%s:%d CLIENT REQUESTED IMMEDIATE",
419 pmix_globals.myid.nspace,
420 pmix_globals.myid.rank);
421 return PMIX_ERR_NOT_FOUND;
422 }
423
424
425
426 rc = create_local_tracker(nspace, rank, info, ninfo,
427 cbfunc, cbdata, &lcd, &req);
428 if (PMIX_ERR_NOMEM == rc || NULL == lcd) {
429
430 PMIX_INFO_FREE(info, ninfo);
431 return PMIX_ERR_NOMEM;
432 }
433
434 if (0 < tv.tv_sec) {
435 pmix_event_evtimer_set(pmix_globals.evbase, &req->ev,
436 get_timeout, req);
437 pmix_event_evtimer_add(&req->ev, &tv);
438 req->event_active = true;
439 }
440 if (PMIX_SUCCESS == rc) {
441
442
443
444 return PMIX_SUCCESS;
445 }
446
447
448
449
450
451
452 if (local) {
453 return PMIX_SUCCESS;
454 }
455
456
457
458
459 if (NULL != pmix_host_server.direct_modex) {
460 rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
461 if (PMIX_SUCCESS != rc) {
462
463 PMIX_INFO_FREE(info, ninfo);
464 pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
465 PMIX_RELEASE(lcd);
466 }
467 } else {
468 pmix_output_verbose(2, pmix_server_globals.get_output,
469 "%s:%d NO SERVER SUPPORT",
470 pmix_globals.myid.nspace,
471 pmix_globals.myid.rank);
472
473 PMIX_INFO_FREE(info, ninfo);
474 pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
475 PMIX_RELEASE(lcd);
476 rc = PMIX_ERR_NOT_FOUND;
477 }
478
479 return rc;
480 }
481
482 static pmix_status_t create_local_tracker(char nspace[], pmix_rank_t rank,
483 pmix_info_t info[], size_t ninfo,
484 pmix_modex_cbfunc_t cbfunc,
485 void *cbdata,
486 pmix_dmdx_local_t **ld,
487 pmix_dmdx_request_t **rq)
488 {
489 pmix_dmdx_local_t *lcd, *cd;
490 pmix_dmdx_request_t *req;
491 pmix_status_t rc;
492
493
494 *ld = NULL;
495 *rq = NULL;
496
497
498
499 lcd = NULL;
500 PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
501 if (0 != strncmp(nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
502 rank != cd->proc.rank ) {
503 continue;
504 }
505 lcd = cd;
506 break;
507 }
508 if (NULL != lcd) {
509
510
511 rc = PMIX_SUCCESS;
512 goto complete;
513 }
514
515
516 lcd = PMIX_NEW(pmix_dmdx_local_t);
517 if (NULL == lcd){
518 return PMIX_ERR_NOMEM;
519 }
520 pmix_strncpy(lcd->proc.nspace, nspace, PMIX_MAX_NSLEN);
521 lcd->proc.rank = rank;
522 lcd->info = info;
523 lcd->ninfo = ninfo;
524 pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super);
525 rc = PMIX_ERR_NOT_FOUND;
526
527 complete:
528
529
530 req = PMIX_NEW(pmix_dmdx_request_t);
531 if (NULL == req) {
532 *ld = lcd;
533 return PMIX_ERR_NOMEM;
534 }
535 PMIX_RETAIN(lcd);
536 req->lcd = lcd;
537 req->cbfunc = cbfunc;
538 req->cbdata = cbdata;
539 pmix_list_append(&lcd->loc_reqs, &req->super);
540 *ld = lcd;
541 *rq = req;
542 return rc;
543 }
544
545 void pmix_pending_nspace_requests(pmix_namespace_t *nptr)
546 {
547 pmix_dmdx_local_t *cd, *cd_next;
548 pmix_status_t rc;
549
550
551
552
553
554 PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
555 pmix_rank_info_t *info;
556 bool found = false;
557
558 if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) {
559 continue;
560 }
561
562 PMIX_LIST_FOREACH(info, &nptr->ranks, pmix_rank_info_t) {
563 if (info->pname.rank == cd->proc.rank) {
564 found = true;
565 break;
566 }
567 }
568
569
570
571 if (!found){
572 rc = PMIX_ERR_NOT_SUPPORTED;
573 if (NULL != pmix_host_server.direct_modex){
574 rc = pmix_host_server.direct_modex(&cd->proc, cd->info, cd->ninfo, dmdx_cbfunc, cd);
575 }
576 if (PMIX_SUCCESS != rc) {
577 pmix_dmdx_request_t *req, *req_next;
578 PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, pmix_dmdx_request_t) {
579 req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL);
580 pmix_list_remove_item(&cd->loc_reqs, &req->super);
581 PMIX_RELEASE(req);
582 }
583 pmix_list_remove_item(&pmix_server_globals.local_reqs, &cd->super);
584 PMIX_RELEASE(cd);
585 }
586 }
587 }
588 }
589
590 static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
591 pmix_server_caddy_t *cd,
592 pmix_modex_cbfunc_t cbfunc,
593 void *cbdata, bool *local)
594 {
595 pmix_status_t rc;
596 bool found = false;
597 pmix_buffer_t pbkt, pkt;
598 pmix_rank_info_t *iptr;
599 pmix_proc_t proc;
600 pmix_cb_t cb;
601 pmix_peer_t *peer = NULL;
602 pmix_byte_object_t bo;
603 char *data = NULL;
604 size_t sz = 0;
605 pmix_scope_t scope = PMIX_SCOPE_UNDEF;
606
607 pmix_output_verbose(2, pmix_server_globals.get_output,
608 "%s:%d SATISFY REQUEST CALLED",
609 pmix_globals.myid.nspace,
610 pmix_globals.myid.rank);
611
612
613
614
615
616 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
617 pmix_strncpy(proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
618
619
620
621
622 if (0 < nptr->nlocalprocs) {
623 if (local) {
624 *local = true;
625 }
626 if (PMIX_RANK_WILDCARD != rank) {
627 peer = NULL;
628
629 PMIX_LIST_FOREACH(iptr, &nptr->ranks, pmix_rank_info_t) {
630 if (rank == iptr->pname.rank) {
631 scope = PMIX_LOCAL;
632 if (0 <= iptr->peerid) {
633 peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, iptr->peerid);
634 }
635 if (NULL == peer) {
636
637 return PMIX_ERR_NOT_FOUND;
638 }
639 break;
640 }
641 }
642 if (PMIX_LOCAL != scope) {
643
644 if (local) {
645 *local = false;
646 }
647 scope = PMIX_REMOTE;
648 peer = pmix_globals.mypeer;
649 }
650 }
651 } else {
652 if (local) {
653 *local = false;
654 }
655 peer = pmix_globals.mypeer;
656 scope = PMIX_REMOTE;
657 }
658
659
660
661
662 if (PMIX_RANK_WILDCARD == rank ||
663 0 != strncmp(nptr->nspace, cd->peer->info->pname.nspace, PMIX_MAX_NSLEN)) {
664 proc.rank = PMIX_RANK_WILDCARD;
665 PMIX_CONSTRUCT(&cb, pmix_cb_t);
666
667
668
669 cb.proc = &proc;
670 cb.scope = PMIX_INTERNAL;
671 cb.copy = false;
672 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
673 if (PMIX_SUCCESS == rc) {
674 PMIX_CONSTRUCT(&pkt, pmix_buffer_t);
675
676 PMIX_GDS_ASSEMB_KVS_REQ(rc, cd->peer, &proc, &cb.kvs, &pkt, cd);
677 if (rc != PMIX_SUCCESS) {
678 PMIX_ERROR_LOG(rc);
679 PMIX_DESTRUCT(&pkt);
680 PMIX_DESTRUCT(&pbkt);
681 PMIX_DESTRUCT(&cb);
682 return rc;
683 }
684 if (PMIX_PROC_IS_V1(cd->peer)) {
685
686
687
688 pmix_buffer_t xfer;
689 PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
690 PMIX_BFROPS_PACK(rc, cd->peer, &xfer, &pkt, 1, PMIX_BUFFER);
691 if (PMIX_SUCCESS != rc) {
692 PMIX_ERROR_LOG(rc);
693 PMIX_DESTRUCT(&pkt);
694 PMIX_DESTRUCT(&pbkt);
695 PMIX_DESTRUCT(&xfer);
696 PMIX_DESTRUCT(&cb);
697 return rc;
698 }
699 PMIX_UNLOAD_BUFFER(&xfer, bo.bytes, bo.size);
700 PMIX_DESTRUCT(&xfer);
701 } else {
702 PMIX_UNLOAD_BUFFER(&pkt, bo.bytes, bo.size);
703 }
704 PMIX_DESTRUCT(&pkt);
705
706 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &bo, 1, PMIX_BYTE_OBJECT);
707 if (PMIX_SUCCESS != rc) {
708 PMIX_ERROR_LOG(rc);
709 PMIX_DESTRUCT(&pbkt);
710 PMIX_DESTRUCT(&cb);
711 return rc;
712 }
713 }
714 PMIX_DESTRUCT(&cb);
715 if (rank == PMIX_RANK_WILDCARD) {
716 found = true;
717 }
718 }
719
720
721 if (PMIX_RANK_WILDCARD != rank) {
722 if (!PMIX_PROC_IS_SERVER(peer) && !peer->commit_cnt) {
723
724
725
726
727
728
729
730 return PMIX_ERR_NOT_FOUND;
731 }
732 proc.rank = rank;
733 PMIX_CONSTRUCT(&cb, pmix_cb_t);
734
735
736
737 cb.proc = &proc;
738 cb.scope = scope;
739 cb.copy = false;
740 PMIX_GDS_FETCH_KV(rc, peer, &cb);
741 if (PMIX_SUCCESS == rc) {
742 found = true;
743 PMIX_CONSTRUCT(&pkt, pmix_buffer_t);
744
745 PMIX_GDS_ASSEMB_KVS_REQ(rc, cd->peer, &proc, &cb.kvs, &pkt, cd);
746 if (rc != PMIX_SUCCESS) {
747 PMIX_ERROR_LOG(rc);
748 PMIX_DESTRUCT(&pkt);
749 PMIX_DESTRUCT(&pbkt);
750 PMIX_DESTRUCT(&cb);
751 return rc;
752 }
753 if (PMIX_PROC_IS_V1(cd->peer)) {
754
755
756
757
758 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &rank, 1, PMIX_PROC_RANK);
759 if (PMIX_SUCCESS != rc) {
760 PMIX_ERROR_LOG(rc);
761 PMIX_DESTRUCT(&pkt);
762 PMIX_DESTRUCT(&pbkt);
763 PMIX_DESTRUCT(&cb);
764 return rc;
765 }
766
767 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &pkt, 1, PMIX_BUFFER);
768 if (PMIX_SUCCESS != rc) {
769 PMIX_ERROR_LOG(rc);
770 PMIX_DESTRUCT(&pkt);
771 PMIX_DESTRUCT(&pbkt);
772 PMIX_DESTRUCT(&cb);
773 return rc;
774 }
775 PMIX_DESTRUCT(&pkt);
776 } else {
777 PMIX_UNLOAD_BUFFER(&pkt, bo.bytes, bo.size);
778 PMIX_DESTRUCT(&pkt);
779
780 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &bo, 1, PMIX_BYTE_OBJECT);
781 if (PMIX_SUCCESS != rc) {
782 PMIX_ERROR_LOG(rc);
783 PMIX_DESTRUCT(&pbkt);
784 PMIX_DESTRUCT(&cb);
785 return rc;
786 }
787 }
788 }
789 PMIX_DESTRUCT(&cb);
790 }
791 PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
792 PMIX_DESTRUCT(&pbkt);
793
794 if (found) {
795
796 cbfunc(rc, data, sz, cbdata, relfn, data);
797 return rc;
798 }
799
800 return PMIX_ERR_NOT_FOUND;
801 }
802
803
804 pmix_status_t pmix_pending_resolve(pmix_namespace_t *nptr, pmix_rank_t rank,
805 pmix_status_t status, pmix_dmdx_local_t *lcd)
806 {
807 pmix_dmdx_local_t *cd, *ptr;
808 pmix_dmdx_request_t *req;
809 pmix_server_caddy_t *scd;
810
811
812 if (NULL == lcd) {
813 ptr = NULL;
814 if (NULL != nptr) {
815 PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
816 if (!PMIX_CHECK_NSPACE(nptr->nspace, cd->proc.nspace) ||
817 rank != cd->proc.rank) {
818 continue;
819 }
820 ptr = cd;
821 break;
822 }
823 }
824 if (NULL == ptr) {
825 return PMIX_SUCCESS;
826 }
827 } else {
828 ptr = lcd;
829 }
830
831
832
833
834 if (0 == pmix_list_get_size(&ptr->loc_reqs)) {
835 goto cleanup;
836 }
837
838
839 if (PMIX_SUCCESS != status){
840
841 PMIX_LIST_FOREACH(req, &ptr->loc_reqs, pmix_dmdx_request_t) {
842 req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
843 }
844 } else if (NULL != nptr) {
845
846
847
848
849 scd = PMIX_NEW(pmix_server_caddy_t);
850 PMIX_RETAIN(pmix_globals.mypeer);
851 scd->peer = pmix_globals.mypeer;
852 PMIX_LIST_FOREACH(req, &ptr->loc_reqs, pmix_dmdx_request_t) {
853 pmix_status_t rc;
854 rc = _satisfy_request(nptr, rank, scd, req->cbfunc, req->cbdata, NULL);
855 if( PMIX_SUCCESS != rc ){
856
857 req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
858 }
859 }
860 PMIX_RELEASE(scd);
861 }
862
863 cleanup:
864
865 pmix_list_remove_item(&pmix_server_globals.local_reqs, &ptr->super);
866 PMIX_RELEASE(ptr);
867
868 return PMIX_SUCCESS;
869 }
870
871
872 static void _process_dmdx_reply(int fd, short args, void *cbdata)
873 {
874 pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata;
875 pmix_server_caddy_t *cd;
876 pmix_peer_t *peer;
877 pmix_rank_info_t *rinfo;
878 int32_t cnt;
879 pmix_kval_t *kv;
880 pmix_namespace_t *ns, *nptr;
881 pmix_status_t rc;
882 pmix_list_t nspaces;
883 pmix_nspace_caddy_t *nm;
884 pmix_dmdx_request_t *dm;
885 bool found;
886 pmix_buffer_t pbkt;
887 pmix_cb_t cb;
888
889 PMIX_ACQUIRE_OBJECT(caddy);
890
891 pmix_output_verbose(2, pmix_server_globals.get_output,
892 "[%s:%d] process dmdx reply from %s:%u",
893 __FILE__, __LINE__,
894 caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
895
896
897 nptr = NULL;
898 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
899 if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
900 nptr = ns;
901 break;
902 }
903 }
904
905 if (NULL == nptr) {
906
907
908
909 nptr = PMIX_NEW(pmix_namespace_t);
910 nptr->nspace = strdup(caddy->lcd->proc.nspace);
911
912 pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
913 }
914
915
916
917
918
919
920
921
922
923
924
925
926
927 if (PMIX_SUCCESS == caddy->status) {
928
929
930 PMIX_CONSTRUCT(&nspaces, pmix_list_t);
931 PMIX_LIST_FOREACH(dm, &caddy->lcd->loc_reqs, pmix_dmdx_request_t) {
932
933
934 cd = (pmix_server_caddy_t*)dm->cbdata;
935 found = false;
936 PMIX_LIST_FOREACH(nm, &nspaces, pmix_nspace_caddy_t) {
937 if (0 == strcmp(nm->ns->nspace, cd->peer->nptr->nspace)) {
938 found = true;
939 break;
940 }
941 }
942 if (!found) {
943
944 nm = PMIX_NEW(pmix_nspace_caddy_t);
945 PMIX_RETAIN(cd->peer->nptr);
946 nm->ns = cd->peer->nptr;
947 pmix_list_append(&nspaces, &nm->super);
948 }
949 }
950
951
952 PMIX_LIST_FOREACH(nm, &nspaces, pmix_nspace_caddy_t) {
953 if (NULL == nm->ns->compat.gds || 0 == nm->ns->nlocalprocs) {
954 peer = pmix_globals.mypeer;
955 } else {
956
957 rinfo = (pmix_rank_info_t*)pmix_list_get_first(&nm->ns->ranks);
958 peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, rinfo->peerid);
959 }
960 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
961 if (NULL == caddy->data) {
962
963
964
965
966 PMIX_CONSTRUCT(&cb, pmix_cb_t);
967 PMIX_PROC_CREATE(cb.proc, 1);
968 if (NULL == cb.proc) {
969 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
970 PMIX_DESTRUCT(&cb);
971 goto complete;
972 }
973 pmix_strncpy(cb.proc->nspace, nm->ns->nspace, PMIX_MAX_NSLEN);
974 cb.proc->rank = PMIX_RANK_WILDCARD;
975 cb.scope = PMIX_INTERNAL;
976 cb.copy = false;
977 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
978 if (PMIX_SUCCESS != rc) {
979 PMIX_ERROR_LOG(rc);
980 PMIX_DESTRUCT(&cb);
981 goto complete;
982 }
983 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
984 PMIX_GDS_STORE_KV(rc, peer, &caddy->lcd->proc, PMIX_INTERNAL, kv);
985 if (PMIX_SUCCESS != rc) {
986 PMIX_ERROR_LOG(rc);
987 break;
988 }
989 }
990 PMIX_DESTRUCT(&cb);
991 } else {
992 PMIX_LOAD_BUFFER(pmix_globals.mypeer, &pbkt, caddy->data, caddy->ndata);
993
994 kv = PMIX_NEW(pmix_kval_t);
995 cnt = 1;
996 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, &pbkt, kv, &cnt, PMIX_KVAL);
997 while (PMIX_SUCCESS == rc) {
998 if (caddy->lcd->proc.rank == PMIX_RANK_WILDCARD) {
999 PMIX_GDS_STORE_KV(rc, peer, &caddy->lcd->proc, PMIX_INTERNAL, kv);
1000 } else {
1001 PMIX_GDS_STORE_KV(rc, peer, &caddy->lcd->proc, PMIX_REMOTE, kv);
1002 }
1003 if (PMIX_SUCCESS != rc) {
1004 PMIX_ERROR_LOG(rc);
1005 caddy->status = rc;
1006 goto complete;
1007 }
1008 PMIX_RELEASE(kv);
1009 kv = PMIX_NEW(pmix_kval_t);
1010 cnt = 1;
1011 PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, &pbkt, kv, &cnt, PMIX_KVAL);
1012 }
1013 PMIX_RELEASE(kv);
1014 pbkt.base_ptr = NULL;
1015 PMIX_DESTRUCT(&pbkt);
1016 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
1017 PMIX_ERROR_LOG(rc);
1018 caddy->status = rc;
1019 goto complete;
1020 }
1021 }
1022 }
1023 PMIX_LIST_DESTRUCT(&nspaces);
1024 }
1025
1026 complete:
1027
1028 pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd);
1029
1030
1031
1032 if (NULL != caddy->relcbfunc) {
1033 caddy->relcbfunc(caddy->cbdata);
1034 }
1035 PMIX_RELEASE(caddy);
1036 }
1037
1038
1039
1040 static void dmdx_cbfunc(pmix_status_t status,
1041 const char *data, size_t ndata, void *cbdata,
1042 pmix_release_cbfunc_t release_fn, void *release_cbdata)
1043 {
1044 pmix_dmdx_reply_caddy_t *caddy;
1045
1046
1047
1048
1049 caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
1050 caddy->status = status;
1051
1052 caddy->relcbfunc = release_fn;
1053 caddy->cbdata = release_cbdata;
1054
1055
1056
1057 caddy->data = data;
1058 caddy->ndata = ndata;
1059 caddy->lcd = (pmix_dmdx_local_t *)cbdata;
1060 pmix_output_verbose(2, pmix_server_globals.get_output,
1061 "[%s:%d] queue dmdx reply for %s:%u",
1062 __FILE__, __LINE__,
1063 caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
1064 PMIX_THREADSHIFT(caddy, _process_dmdx_reply);
1065 }
1066
1067 static void get_timeout(int sd, short args, void *cbdata)
1068 {
1069 pmix_dmdx_request_t *req = (pmix_dmdx_request_t*)cbdata;
1070
1071 pmix_output_verbose(2, pmix_server_globals.get_output,
1072 "ALERT: get timeout fired");
1073
1074 if (NULL != req->cbfunc) {
1075 req->cbfunc(PMIX_ERR_TIMEOUT, NULL, 0, req->cbdata, NULL, NULL);
1076 }
1077 req->event_active = false;
1078 pmix_list_remove_item(&req->lcd->loc_reqs, &req->super);
1079 PMIX_RELEASE(req);
1080 }