This source file includes following definitions.
- PMIx_Publish
- PMIx_Publish_nb
- PMIx_Lookup
- PMIx_Lookup_nb
- PMIx_Unpublish
- PMIx_Unpublish_nb
- wait_cbfunc
- op_cbfunc
- wait_lookup_cbfunc
- lookup_cbfunc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include <src/include/pmix_config.h>
19
20 #include <src/include/pmix_stdint.h>
21
22 #include <pmix.h>
23 #include <pmix_rename.h>
24
25 #include "src/include/pmix_globals.h"
26
27 #ifdef HAVE_STRING_H
28 #include <string.h>
29 #endif
30 #include <fcntl.h>
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_SYS_UN_H
38 #include <sys/un.h>
39 #endif
40 #ifdef HAVE_SYS_UIO_H
41 #include <sys/uio.h>
42 #endif
43 #ifdef HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46 #include PMIX_EVENT_HEADER
47
48 #include "src/class/pmix_list.h"
49 #include "src/threads/threads.h"
50 #include "src/mca/bfrops/bfrops.h"
51 #include "src/util/argv.h"
52 #include "src/util/error.h"
53 #include "src/util/output.h"
54 #include "src/mca/ptl/ptl.h"
55
56 #include "pmix_client_ops.h"
57
58 static void wait_cbfunc(struct pmix_peer_t *pr,
59 pmix_ptl_hdr_t *hdr,
60 pmix_buffer_t *buf, void *cbdata);
61 static void op_cbfunc(pmix_status_t status, void *cbdata);
62 static void wait_lookup_cbfunc(struct pmix_peer_t *pr,
63 pmix_ptl_hdr_t *hdr,
64 pmix_buffer_t *buf, void *cbdata);
65 static void lookup_cbfunc(pmix_status_t status, pmix_pdata_t pdata[], size_t ndata,
66 void *cbdata);
67
68 PMIX_EXPORT pmix_status_t PMIx_Publish(const pmix_info_t info[],
69 size_t ninfo)
70 {
71 pmix_status_t rc;
72 pmix_cb_t *cb;
73
74 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
75
76 pmix_output_verbose(2, pmix_globals.debug_output,
77 "pmix: publish called");
78
79 if (pmix_globals.init_cntr <= 0) {
80 PMIX_RELEASE_THREAD(&pmix_global_lock);
81 return PMIX_ERR_INIT;
82 }
83
84
85 if (!pmix_globals.connected) {
86 PMIX_RELEASE_THREAD(&pmix_global_lock);
87 return PMIX_ERR_UNREACH;
88 }
89 PMIX_RELEASE_THREAD(&pmix_global_lock);
90
91
92 cb = PMIX_NEW(pmix_cb_t);
93
94 if (PMIX_SUCCESS != (rc = PMIx_Publish_nb(info, ninfo, op_cbfunc, cb))) {
95 PMIX_ERROR_LOG(rc);
96 PMIX_RELEASE(cb);
97 return rc;
98 }
99
100
101 PMIX_WAIT_THREAD(&cb->lock);
102 rc = (pmix_status_t)cb->status;
103 PMIX_RELEASE(cb);
104
105 return rc;
106 }
107
108 PMIX_EXPORT pmix_status_t PMIx_Publish_nb(const pmix_info_t info[], size_t ninfo,
109 pmix_op_cbfunc_t cbfunc, void *cbdata)
110 {
111 pmix_buffer_t *msg;
112 pmix_cmd_t cmd = PMIX_PUBLISHNB_CMD;
113 pmix_status_t rc;
114 pmix_cb_t *cb;
115
116 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
117
118 pmix_output_verbose(2, pmix_globals.debug_output,
119 "pmix: publish called");
120
121 if (pmix_globals.init_cntr <= 0) {
122 PMIX_RELEASE_THREAD(&pmix_global_lock);
123 return PMIX_ERR_INIT;
124 }
125
126
127 if (!pmix_globals.connected) {
128 PMIX_RELEASE_THREAD(&pmix_global_lock);
129 return PMIX_ERR_UNREACH;
130 }
131 PMIX_RELEASE_THREAD(&pmix_global_lock);
132
133
134 if (NULL == info) {
135
136 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
137 return PMIX_ERR_BAD_PARAM;
138 }
139
140
141 msg = PMIX_NEW(pmix_buffer_t);
142
143 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
144 msg, &cmd, 1, PMIX_COMMAND);
145 if (PMIX_SUCCESS != rc) {
146 PMIX_ERROR_LOG(rc);
147 PMIX_RELEASE(msg);
148 return rc;
149 }
150
151 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
152 msg, &pmix_globals.uid, 1, PMIX_UINT32);
153 if (PMIX_SUCCESS != rc) {
154 PMIX_ERROR_LOG(rc);
155 PMIX_RELEASE(msg);
156 return rc;
157 }
158
159
160 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
161 msg, &ninfo, 1, PMIX_SIZE);
162 if (PMIX_SUCCESS != rc) {
163 PMIX_ERROR_LOG(rc);
164 PMIX_RELEASE(msg);
165 return rc;
166 }
167 if (0 < ninfo) {
168
169 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
170 msg, info, ninfo, PMIX_INFO);
171 if (PMIX_SUCCESS != rc) {
172 PMIX_ERROR_LOG(rc);
173 PMIX_RELEASE(msg);
174 return rc;
175 }
176 }
177
178
179
180
181 cb = PMIX_NEW(pmix_cb_t);
182 cb->cbfunc.opfn = cbfunc;
183 cb->cbdata = cbdata;
184
185
186 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
187 msg, wait_cbfunc, (void*)cb);
188 if (PMIX_SUCCESS != rc) {
189 PMIX_RELEASE(msg);
190 PMIX_RELEASE(cb);
191 }
192
193 return rc;
194 }
195
196 PMIX_EXPORT pmix_status_t PMIx_Lookup(pmix_pdata_t pdata[], size_t ndata,
197 const pmix_info_t info[], size_t ninfo)
198 {
199 pmix_status_t rc;
200 pmix_cb_t *cb;
201 char **keys = NULL;
202 size_t i;
203
204 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
205
206 pmix_output_verbose(2, pmix_globals.debug_output,
207 "pmix: lookup called");
208
209 if (pmix_globals.init_cntr <= 0) {
210 PMIX_RELEASE_THREAD(&pmix_global_lock);
211 return PMIX_ERR_INIT;
212 }
213
214
215 if (!pmix_globals.connected) {
216 PMIX_RELEASE_THREAD(&pmix_global_lock);
217 return PMIX_ERR_UNREACH;
218 }
219 PMIX_RELEASE_THREAD(&pmix_global_lock);
220
221
222 if (NULL == pdata) {
223 return PMIX_ERR_BAD_PARAM;
224 }
225
226
227 for (i=0; i < ndata; i++) {
228 if ('\0' != pdata[i].key[0]) {
229 pmix_argv_append_nosize(&keys, pdata[i].key);
230 }
231 }
232
233
234
235
236 cb = PMIX_NEW(pmix_cb_t);
237 cb->cbdata = (void*)pdata;
238 cb->nvals = ndata;
239
240 if (PMIX_SUCCESS != (rc = PMIx_Lookup_nb(keys, info, ninfo,
241 lookup_cbfunc, cb))) {
242 PMIX_RELEASE(cb);
243 pmix_argv_free(keys);
244 return rc;
245 }
246
247
248 PMIX_WAIT_THREAD(&cb->lock);
249
250
251
252 rc = cb->status;
253 PMIX_RELEASE(cb);
254 return rc;
255 }
256
257 PMIX_EXPORT pmix_status_t PMIx_Lookup_nb(char **keys,
258 const pmix_info_t info[], size_t ninfo,
259 pmix_lookup_cbfunc_t cbfunc, void *cbdata)
260 {
261 pmix_buffer_t *msg;
262 pmix_cmd_t cmd = PMIX_LOOKUPNB_CMD;
263 pmix_status_t rc;
264 pmix_cb_t *cb;
265 size_t nkeys, n;
266
267 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
268
269 pmix_output_verbose(2, pmix_globals.debug_output,
270 "pmix: lookup_nb called");
271
272 if (pmix_globals.init_cntr <= 0) {
273 PMIX_RELEASE_THREAD(&pmix_global_lock);
274 return PMIX_ERR_INIT;
275 }
276
277
278 if (!pmix_globals.connected) {
279 PMIX_RELEASE_THREAD(&pmix_global_lock);
280 return PMIX_ERR_UNREACH;
281 }
282 PMIX_RELEASE_THREAD(&pmix_global_lock);
283
284
285 if (NULL == keys) {
286 return PMIX_ERR_BAD_PARAM;
287 }
288
289
290 msg = PMIX_NEW(pmix_buffer_t);
291
292 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
293 msg, &cmd, 1, PMIX_COMMAND);
294 if (PMIX_SUCCESS != rc) {
295 PMIX_ERROR_LOG(rc);
296 PMIX_RELEASE(msg);
297 return rc;
298 }
299
300 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
301 msg, &pmix_globals.uid, 1, PMIX_UINT32);
302 if (PMIX_SUCCESS != rc) {
303 PMIX_ERROR_LOG(rc);
304 PMIX_RELEASE(msg);
305 return rc;
306 }
307
308 nkeys = pmix_argv_count(keys);
309 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
310 msg, &nkeys, 1, PMIX_SIZE);
311 if (PMIX_SUCCESS != rc) {
312 PMIX_ERROR_LOG(rc);
313 PMIX_RELEASE(msg);
314 return rc;
315 }
316 if (0 < nkeys) {
317 for (n=0; n < nkeys; n++) {
318 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
319 msg, &keys[n], 1, PMIX_STRING);
320 if (PMIX_SUCCESS != rc) {
321 PMIX_ERROR_LOG(rc);
322 PMIX_RELEASE(msg);
323 return rc;
324 }
325 }
326 }
327
328
329 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
330 msg, &ninfo, 1, PMIX_SIZE);
331 if (PMIX_SUCCESS != rc) {
332 PMIX_ERROR_LOG(rc);
333 PMIX_RELEASE(msg);
334 return rc;
335 }
336 if (0 < ninfo) {
337
338 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
339 msg, info, ninfo, PMIX_INFO);
340 if (PMIX_SUCCESS != rc) {
341 PMIX_ERROR_LOG(rc);
342 PMIX_RELEASE(msg);
343 return rc;
344 }
345 }
346
347
348
349
350 cb = PMIX_NEW(pmix_cb_t);
351 cb->cbfunc.lookupfn = cbfunc;
352 cb->cbdata = cbdata;
353
354
355 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
356 msg, wait_lookup_cbfunc, (void*)cb);
357 if (PMIX_SUCCESS != rc) {
358 PMIX_RELEASE(msg);
359 PMIX_RELEASE(cb);
360 }
361
362 return rc;
363 }
364
365 PMIX_EXPORT pmix_status_t PMIx_Unpublish(char **keys,
366 const pmix_info_t info[],
367 size_t ninfo)
368 {
369 pmix_status_t rc;
370 pmix_cb_t *cb;
371
372 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
373
374 pmix_output_verbose(2, pmix_globals.debug_output,
375 "pmix: unpublish called");
376
377 if (pmix_globals.init_cntr <= 0) {
378 PMIX_RELEASE_THREAD(&pmix_global_lock);
379 return PMIX_ERR_INIT;
380 }
381
382
383 if (!pmix_globals.connected) {
384 PMIX_RELEASE_THREAD(&pmix_global_lock);
385 return PMIX_ERR_UNREACH;
386 }
387 PMIX_RELEASE_THREAD(&pmix_global_lock);
388
389
390
391
392 cb = PMIX_NEW(pmix_cb_t);
393
394
395 if (PMIX_SUCCESS != (rc = PMIx_Unpublish_nb(keys, info, ninfo, op_cbfunc, cb))) {
396 PMIX_RELEASE(cb);
397 return rc;
398 }
399
400
401 PMIX_WAIT_THREAD(&cb->lock);
402 rc = cb->status;
403 PMIX_RELEASE(cb);
404
405 return rc;
406 }
407
408 PMIX_EXPORT pmix_status_t PMIx_Unpublish_nb(char **keys,
409 const pmix_info_t info[], size_t ninfo,
410 pmix_op_cbfunc_t cbfunc, void *cbdata)
411 {
412 pmix_buffer_t *msg;
413 pmix_cmd_t cmd = PMIX_UNPUBLISHNB_CMD;
414 pmix_status_t rc;
415 pmix_cb_t *cb;
416 size_t i, j;
417
418 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
419
420 pmix_output_verbose(2, pmix_globals.debug_output,
421 "pmix: unpublish called");
422
423 if (pmix_globals.init_cntr <= 0) {
424 PMIX_RELEASE_THREAD(&pmix_global_lock);
425 return PMIX_ERR_INIT;
426 }
427
428
429 if (!pmix_globals.connected) {
430 PMIX_RELEASE_THREAD(&pmix_global_lock);
431 return PMIX_ERR_UNREACH;
432 }
433 PMIX_RELEASE_THREAD(&pmix_global_lock);
434
435
436 msg = PMIX_NEW(pmix_buffer_t);
437
438 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
439 msg, &cmd, 1, PMIX_COMMAND);
440 if (PMIX_SUCCESS != rc) {
441 PMIX_ERROR_LOG(rc);
442 PMIX_RELEASE(msg);
443 return rc;
444 }
445
446 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
447 msg, &pmix_globals.uid, 1, PMIX_UINT32);
448 if (PMIX_SUCCESS != rc) {
449 PMIX_ERROR_LOG(rc);
450 PMIX_RELEASE(msg);
451 return rc;
452 }
453
454 i = pmix_argv_count(keys);
455 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
456 msg, &i, 1, PMIX_SIZE);
457 if (PMIX_SUCCESS != rc) {
458 PMIX_ERROR_LOG(rc);
459 PMIX_RELEASE(msg);
460 return rc;
461 }
462 if (0 < i) {
463 for (j=0; j < i; j++) {
464 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
465 msg, &keys[j], 1, PMIX_STRING);
466 if (PMIX_SUCCESS != rc) {
467 PMIX_ERROR_LOG(rc);
468 PMIX_RELEASE(msg);
469 return rc;
470 }
471 }
472 }
473
474
475 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
476 msg, &ninfo, 1, PMIX_SIZE);
477 if (PMIX_SUCCESS != rc) {
478 PMIX_ERROR_LOG(rc);
479 PMIX_RELEASE(msg);
480 return rc;
481 }
482 if (0 < ninfo) {
483
484 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
485 msg, info, ninfo, PMIX_INFO);
486 if (PMIX_SUCCESS != rc) {
487 PMIX_ERROR_LOG(rc);
488 PMIX_RELEASE(msg);
489 return rc;
490 }
491 }
492
493
494 cb = PMIX_NEW(pmix_cb_t);
495 cb->cbfunc.opfn = cbfunc;
496 cb->cbdata = cbdata;
497
498
499 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
500 msg, wait_cbfunc, (void*)cb);
501 if (PMIX_SUCCESS != rc) {
502 PMIX_RELEASE(msg);
503 PMIX_RELEASE(cb);
504 }
505
506 return rc;
507 }
508
509 static void wait_cbfunc(struct pmix_peer_t *pr,
510 pmix_ptl_hdr_t *hdr,
511 pmix_buffer_t *buf, void *cbdata)
512 {
513 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
514 pmix_status_t rc;
515 int ret;
516 int32_t cnt;
517
518 PMIX_ACQUIRE_OBJECT(cb);
519
520 pmix_output_verbose(2, pmix_globals.debug_output,
521 "pmix:client recv callback activated with %d bytes",
522 (NULL == buf) ? -1 : (int)buf->bytes_used);
523
524 if (NULL == buf) {
525 rc = PMIX_ERR_BAD_PARAM;
526 goto report;
527 }
528
529
530 if (PMIX_BUFFER_IS_EMPTY(buf)) {
531 rc = PMIX_ERR_UNREACH;
532 goto report;
533 }
534
535
536 cnt = 1;
537 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
538 buf, &ret, &cnt, PMIX_STATUS);
539 if (PMIX_SUCCESS != rc) {
540 PMIX_ERROR_LOG(rc);
541 }
542
543 report:
544 if (NULL != cb->cbfunc.opfn) {
545 cb->cbfunc.opfn(rc, cb->cbdata);
546 }
547 PMIX_RELEASE(cb);
548 }
549
550 static void op_cbfunc(pmix_status_t status, void *cbdata)
551 {
552 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
553
554 cb->status = status;
555 PMIX_POST_OBJECT(cb);
556 PMIX_WAKEUP_THREAD(&cb->lock);
557 }
558
559 static void wait_lookup_cbfunc(struct pmix_peer_t *pr,
560 pmix_ptl_hdr_t *hdr,
561 pmix_buffer_t *buf, void *cbdata)
562 {
563 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
564 pmix_status_t rc, ret;
565 int32_t cnt;
566 pmix_pdata_t *pdata;
567 size_t ndata;
568
569 PMIX_ACQUIRE_OBJECT(cb);
570
571 pmix_output_verbose(2, pmix_globals.debug_output,
572 "pmix:client recv callback activated with %d bytes",
573 (NULL == buf) ? -1 : (int)buf->bytes_used);
574
575
576 pdata = NULL;
577 ndata = 0;
578
579 if (NULL == cb->cbfunc.lookupfn) {
580
581 PMIX_RELEASE(cb);
582 return;
583 }
584 if (NULL == buf) {
585 rc = PMIX_ERR_BAD_PARAM;
586 goto report;
587 }
588
589
590 if (PMIX_BUFFER_IS_EMPTY(buf)) {
591 rc = PMIX_ERR_UNREACH;
592 goto report;
593 }
594
595
596 cnt = 1;
597 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
598 buf, &ret, &cnt, PMIX_STATUS);
599 if (PMIX_SUCCESS != rc) {
600 PMIX_ERROR_LOG(rc);
601 ret = rc;
602 }
603 if (PMIX_SUCCESS != ret) {
604 if (NULL != cb->cbfunc.lookupfn) {
605 cb->cbfunc.lookupfn(ret, NULL, 0, cb->cbdata);
606 }
607 PMIX_RELEASE(cb);
608 return;
609 }
610
611
612 cnt = 1;
613 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
614 buf, &ndata, &cnt, PMIX_SIZE);
615 if (PMIX_SUCCESS != rc) {
616 PMIX_ERROR_LOG(rc);
617 PMIX_RELEASE(cb);
618 return;
619 }
620 if (0 < ndata) {
621
622 PMIX_PDATA_CREATE(pdata, ndata);
623 cnt = ndata;
624
625 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
626 buf, pdata, &cnt, PMIX_PDATA);
627 if (PMIX_SUCCESS != rc) {
628 PMIX_ERROR_LOG(rc);
629 goto cleanup;
630 }
631 }
632
633 report:
634 if (NULL != cb->cbfunc.lookupfn) {
635 cb->cbfunc.lookupfn(rc, pdata, ndata, cb->cbdata);
636 }
637
638 cleanup:
639
640 if (NULL != pdata) {
641 PMIX_PDATA_FREE(pdata, ndata);
642 }
643
644 PMIX_RELEASE(cb);
645 }
646
647 static void lookup_cbfunc(pmix_status_t status, pmix_pdata_t pdata[], size_t ndata,
648 void *cbdata)
649 {
650 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
651 pmix_pdata_t *tgt = (pmix_pdata_t*)cb->cbdata;
652 size_t i, j;
653
654 PMIX_ACQUIRE_OBJECT(cb);
655 cb->status = status;
656 if (PMIX_SUCCESS == status) {
657
658 for (i=0; i < ndata; i++) {
659 for (j=0; j < cb->nvals; j++) {
660 if (0 == strcmp(pdata[i].key, tgt[j].key)) {
661
662 pmix_strncpy(tgt[j].proc.nspace, pdata[i].proc.nspace, PMIX_MAX_NSLEN);
663 tgt[j].proc.rank = pdata[i].proc.rank;
664
665 PMIX_BFROPS_VALUE_XFER(cb->status, pmix_client_globals.myserver, &tgt[j].value, &pdata[i].value);
666 break;
667 }
668 }
669 }
670 }
671 PMIX_POST_OBJECT(cb);
672 PMIX_WAKEUP_THREAD(&cb->lock);
673 }