This source file includes the following definitions.
- gtcon
- gtdes
- PMIx_Group_construct
- PMIx_Group_construct_nb
- PMIx_Group_destruct
- PMIx_Group_destruct_nb
- chaincbfunc
- relcbfunc
- invite_handler
- regcbfunc
- PMIx_Group_invite
- PMIx_Group_invite_nb
- PMIx_Group_join
- PMIx_Group_join_nb
- op_cbfunc
- relfn
- grp_cbfunc
- info_cbfunc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include <src/include/pmix_config.h>
19
20 #include <src/include/pmix_stdint.h>
21
22 #include <pmix.h>
23 #include <pmix_rename.h>
24
25 #include "src/include/pmix_globals.h"
26 #include "src/mca/gds/base/base.h"
27
28 #ifdef HAVE_STRING_H
29 #include <string.h>
30 #endif
31 #include <fcntl.h>
32 #ifdef HAVE_UNISTD_H
33 #include <unistd.h>
34 #endif
35 #ifdef HAVE_SYS_SOCKET_H
36 #include <sys/socket.h>
37 #endif
38 #ifdef HAVE_SYS_UN_H
39 #include <sys/un.h>
40 #endif
41 #ifdef HAVE_SYS_UIO_H
42 #include <sys/uio.h>
43 #endif
44 #ifdef HAVE_SYS_TYPES_H
45 #include <sys/types.h>
46 #endif
47 #include PMIX_EVENT_HEADER
48
49 #include "src/class/pmix_list.h"
50 #include "src/mca/bfrops/bfrops.h"
51 #include "src/util/argv.h"
52 #include "src/util/error.h"
53 #include "src/util/output.h"
54 #include "src/threads/threads.h"
55 #include "src/mca/gds/gds.h"
56 #include "src/mca/ptl/ptl.h"
57
58 #include "pmix_client_ops.h"
59
60
61 typedef struct {
62 pmix_object_t super;
63 pmix_lock_t lock;
64 pmix_status_t status;
65 size_t ref;
66 size_t accepted;
67 pmix_proc_t *members;
68 size_t nmembers;
69 pmix_info_t *info;
70 size_t ninfo;
71 pmix_info_t *results;
72 size_t nresults;
73 pmix_op_cbfunc_t opcbfunc;
74 pmix_info_cbfunc_t cbfunc;
75 void *cbdata;
76 } pmix_group_tracker_t;
77
78 static void gtcon(pmix_group_tracker_t *p)
79 {
80 PMIX_CONSTRUCT_LOCK(&p->lock);
81 p->status = PMIX_SUCCESS;
82 p->ref = 0;
83 p->accepted = 0;
84 p->members = NULL;
85 p->nmembers = 0;
86 p->info = NULL;
87 p->ninfo = 0;
88 p->results = NULL;
89 p->nresults = 0;
90 p->cbfunc = NULL;
91 p->opcbfunc = NULL;
92 p->cbdata = NULL;
93 }
94 static void gtdes(pmix_group_tracker_t *p)
95 {
96 PMIX_DESTRUCT_LOCK(&p->lock);
97 if (NULL != p->members) {
98 PMIX_PROC_FREE(p->members, p->nmembers);
99 }
100 if (NULL != p->info) {
101 PMIX_INFO_FREE(p->info, p->ninfo);
102 }
103 }
104 PMIX_CLASS_INSTANCE(pmix_group_tracker_t,
105 pmix_object_t,
106 gtcon, gtdes);
107
108
109 static void grp_cbfunc(struct pmix_peer_t *pr,
110 pmix_ptl_hdr_t *hdr,
111 pmix_buffer_t *buf, void *cbdata);
112 static void op_cbfunc(pmix_status_t status, void *cbdata);
113
114 static void info_cbfunc(pmix_status_t status,
115 pmix_info_t *info, size_t ninfo,
116 void *cbdata,
117 pmix_release_cbfunc_t release_fn,
118 void *release_cbdata);
119
120
121 PMIX_EXPORT pmix_status_t PMIx_Group_construct(const char grp[],
122 const pmix_proc_t procs[], size_t nprocs,
123 const pmix_info_t info[], size_t ninfo,
124 pmix_info_t **results, size_t *nresults)
125 {
126 pmix_status_t rc;
127 pmix_group_tracker_t *cb;
128
129 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
130
131 pmix_output_verbose(2, pmix_client_globals.connect_output,
132 "pmix: group_construct called");
133
134 if (pmix_globals.init_cntr <= 0) {
135 PMIX_RELEASE_THREAD(&pmix_global_lock);
136 return PMIX_ERR_INIT;
137 }
138
139
140 if (!pmix_globals.connected) {
141 PMIX_RELEASE_THREAD(&pmix_global_lock);
142 return PMIX_ERR_UNREACH;
143 }
144 PMIX_RELEASE_THREAD(&pmix_global_lock);
145
146
147
148
149 cb = PMIX_NEW(pmix_group_tracker_t);
150
151
152 if (PMIX_SUCCESS != (rc = PMIx_Group_construct_nb(grp, procs, nprocs, info, ninfo, info_cbfunc, cb))) {
153 PMIX_RELEASE(cb);
154 return rc;
155 }
156
157
158 PMIX_WAIT_THREAD(&cb->lock);
159 rc = cb->status;
160
161 *results = cb->results;
162 *nresults = cb->nresults;
163 PMIX_RELEASE(cb);
164
165 pmix_output_verbose(2, pmix_globals.debug_output,
166 "pmix: group construct completed");
167
168 return rc;
169 }
170
171 PMIX_EXPORT pmix_status_t PMIx_Group_construct_nb(const char grp[],
172 const pmix_proc_t procs[], size_t nprocs,
173 const pmix_info_t info[], size_t ninfo,
174 pmix_info_cbfunc_t cbfunc, void *cbdata)
175 {
176 pmix_buffer_t *msg = NULL;
177 pmix_cmd_t cmd = PMIX_GROUP_CONSTRUCT_CMD;
178 pmix_status_t rc;
179 pmix_group_tracker_t *cb = NULL;
180 size_t n, num;
181 bool embed = true;
182 pmix_info_t *iptr = NULL;
183
184 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
185
186 pmix_output_verbose(2, pmix_client_globals.connect_output,
187 "pmix:group_construct_nb called");
188
189 if (pmix_globals.init_cntr <= 0) {
190 PMIX_RELEASE_THREAD(&pmix_global_lock);
191 return PMIX_ERR_INIT;
192 }
193
194
195 if (!pmix_globals.connected) {
196 PMIX_RELEASE_THREAD(&pmix_global_lock);
197 return PMIX_ERR_UNREACH;
198 }
199 PMIX_RELEASE_THREAD(&pmix_global_lock);
200
201
202 if (NULL == procs || 0 >= nprocs) {
203 return PMIX_ERR_BAD_PARAM;
204 }
205
206
207
208
209 for (n=0; n < ninfo; n++) {
210 if (PMIX_CHECK_KEY(&info[n], PMIX_EMBED_BARRIER)) {
211 embed = PMIX_INFO_TRUE(&info[n]);
212 break;
213 }
214 }
215 if (embed) {
216 PMIX_INFO_CREATE(iptr, ninfo + 1);
217 num = ninfo + 1;
218 for (n=0; n < ninfo; n++) {
219 PMIX_INFO_XFER(&iptr[n], &info[n]);
220 }
221 PMIX_INFO_LOAD(&iptr[ninfo], PMIX_EMBED_BARRIER, &embed, PMIX_BOOL);
222 } else {
223 iptr = (pmix_info_t*)info;
224 num = ninfo;
225 }
226
227 msg = PMIX_NEW(pmix_buffer_t);
228
229 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
230 msg, &cmd, 1, PMIX_COMMAND);
231 if (PMIX_SUCCESS != rc) {
232 PMIX_ERROR_LOG(rc);
233 goto done;
234 }
235
236
237 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
238 msg, &grp, 1, PMIX_STRING);
239 if (PMIX_SUCCESS != rc) {
240 PMIX_ERROR_LOG(rc);
241 goto done;
242 }
243
244
245 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
246 msg, &nprocs, 1, PMIX_SIZE);
247 if (PMIX_SUCCESS != rc) {
248 PMIX_ERROR_LOG(rc);
249 goto done;
250 }
251 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
252 msg, procs, nprocs, PMIX_PROC);
253 if (PMIX_SUCCESS != rc) {
254 PMIX_ERROR_LOG(rc);
255 goto done;
256 }
257
258
259 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
260 msg, &num, 1, PMIX_SIZE);
261 if (PMIX_SUCCESS != rc) {
262 PMIX_ERROR_LOG(rc);
263 PMIX_RELEASE(msg);
264 goto done;
265 }
266 if (0 < num) {
267 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
268 msg, iptr, num, PMIX_INFO);
269 if (PMIX_SUCCESS != rc) {
270 PMIX_ERROR_LOG(rc);
271 PMIX_RELEASE(msg);
272 goto done;
273 }
274 }
275
276
277
278
279 cb = PMIX_NEW(pmix_group_tracker_t);
280 cb->cbfunc = cbfunc;
281 cb->cbdata = cbdata;
282
283
284 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
285 msg, grp_cbfunc, (void*)cb);
286 if (PMIX_SUCCESS != rc) {
287 PMIX_RELEASE(cb);
288 }
289
290 done:
291 if (embed && NULL != iptr) {
292 PMIX_INFO_FREE(iptr, num);
293 }
294 if (PMIX_SUCCESS != rc && NULL != msg) {
295 PMIX_RELEASE(msg);
296 }
297 return rc;
298 }
299
300 PMIX_EXPORT pmix_status_t PMIx_Group_destruct(const char grp[],
301 const pmix_info_t info[], size_t ninfo)
302 {
303 pmix_status_t rc;
304 pmix_group_tracker_t cb;
305
306 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
307
308 pmix_output_verbose(2, pmix_client_globals.connect_output,
309 "pmix: group_destruct called");
310
311 if (pmix_globals.init_cntr <= 0) {
312 PMIX_RELEASE_THREAD(&pmix_global_lock);
313 return PMIX_ERR_INIT;
314 }
315
316
317 if (!pmix_globals.connected) {
318 PMIX_RELEASE_THREAD(&pmix_global_lock);
319 return PMIX_ERR_UNREACH;
320 }
321 PMIX_RELEASE_THREAD(&pmix_global_lock);
322
323
324
325
326 PMIX_CONSTRUCT(&cb, pmix_group_tracker_t);
327
328
329 if (PMIX_SUCCESS != (rc = PMIx_Group_destruct_nb(grp, info, ninfo, op_cbfunc, (void*)&cb))) {
330 PMIX_ERROR_LOG(rc);
331 PMIX_DESTRUCT(&cb);
332 return rc;
333 }
334
335
336 PMIX_WAIT_THREAD(&cb.lock);
337 rc = cb.status;
338 PMIX_DESTRUCT(&cb);
339
340 pmix_output_verbose(2, pmix_client_globals.connect_output,
341 "pmix: group destruct completed");
342
343 return rc;
344 }
345
346 PMIX_EXPORT pmix_status_t PMIx_Group_destruct_nb(const char grp[],
347 const pmix_info_t info[], size_t ninfo,
348 pmix_op_cbfunc_t cbfunc, void *cbdata)
349 {
350 pmix_buffer_t *msg = NULL;
351 pmix_cmd_t cmd = PMIX_GROUP_DESTRUCT_CMD;
352 pmix_status_t rc;
353 pmix_group_tracker_t *cb = NULL;
354 size_t n, num;
355 bool embed = true;
356 pmix_info_t *iptr = NULL;
357
358 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
359
360 pmix_output_verbose(2, pmix_client_globals.connect_output,
361 "pmix:group_destruct_nb called");
362
363 if (pmix_globals.init_cntr <= 0) {
364 PMIX_RELEASE_THREAD(&pmix_global_lock);
365 return PMIX_ERR_INIT;
366 }
367
368
369 if (!pmix_globals.connected) {
370 PMIX_RELEASE_THREAD(&pmix_global_lock);
371 return PMIX_ERR_UNREACH;
372 }
373 PMIX_RELEASE_THREAD(&pmix_global_lock);
374
375
376 if (NULL == grp) {
377 return PMIX_ERR_BAD_PARAM;
378 }
379
380
381
382
383 for (n=0; n < ninfo; n++) {
384 if (PMIX_CHECK_KEY(&info[n], PMIX_EMBED_BARRIER)) {
385 embed = PMIX_INFO_TRUE(&info[n]);
386 break;
387 }
388 }
389 if (embed) {
390 PMIX_INFO_CREATE(iptr, ninfo + 1);
391 num = ninfo + 1;
392 for (n=0; n < ninfo; n++) {
393 PMIX_INFO_XFER(&iptr[n], &info[n]);
394 }
395 PMIX_INFO_LOAD(&iptr[ninfo], PMIX_EMBED_BARRIER, &embed, PMIX_BOOL);
396 } else {
397 iptr = (pmix_info_t*)info;
398 num = ninfo;
399 }
400
401 msg = PMIX_NEW(pmix_buffer_t);
402
403 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
404 msg, &cmd, 1, PMIX_COMMAND);
405 if (PMIX_SUCCESS != rc) {
406 PMIX_ERROR_LOG(rc);
407 goto done;
408 }
409
410
411 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
412 msg, &grp, 1, PMIX_STRING);
413 if (PMIX_SUCCESS != rc) {
414 PMIX_ERROR_LOG(rc);
415 goto done;
416 }
417
418
419 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
420 msg, &ninfo, 1, PMIX_SIZE);
421 if (PMIX_SUCCESS != rc) {
422 PMIX_ERROR_LOG(rc);
423 PMIX_RELEASE(msg);
424 goto done;
425 }
426 if (0 < ninfo) {
427 PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver,
428 msg, info, ninfo, PMIX_INFO);
429 if (PMIX_SUCCESS != rc) {
430 PMIX_ERROR_LOG(rc);
431 PMIX_RELEASE(msg);
432 goto done;
433 }
434 }
435
436
437
438
439 cb = PMIX_NEW(pmix_group_tracker_t);
440 cb->opcbfunc = cbfunc;
441 cb->cbdata = cbdata;
442
443
444 PMIX_PTL_SEND_RECV(rc, pmix_client_globals.myserver,
445 msg, grp_cbfunc, (void*)cb);
446 if (PMIX_SUCCESS != rc) {
447 PMIX_RELEASE(cb);
448 }
449
450 done:
451 if (embed && NULL != iptr) {
452 PMIX_INFO_FREE(iptr, num);
453 }
454 if (PMIX_SUCCESS != rc && NULL != msg) {
455 PMIX_RELEASE(msg);
456 }
457 return rc;
458 }
459
460 static void chaincbfunc(pmix_status_t status, void *cbdata)
461 {
462 pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
463
464 if (NULL != cb) {
465 PMIX_RELEASE(cb);
466 }
467 }
468
469 static void relcbfunc(void *cbdata)
470 {
471 pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
472
473 PMIX_RELEASE(cb);
474 }
475
476 static void invite_handler(size_t evhdlr_registration_id,
477 pmix_status_t status,
478 const pmix_proc_t *source,
479 pmix_info_t info[], size_t ninfo,
480 pmix_info_t *results, size_t nresults,
481 pmix_event_notification_cbfunc_fn_t cbfunc,
482 void *cbdata)
483 {
484 pmix_group_tracker_t *cb = NULL;
485 pmix_proc_t *affected = NULL;
486 size_t n;
487 pmix_status_t rc = PMIX_GROUP_INVITE_DECLINED;
488 size_t contextid = SIZE_MAX;
489
490
491 for (n=0; n < ninfo; n++) {
492 if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_RETURN_OBJECT)) {
493 if (PMIX_POINTER != info[n].value.type) {
494
495 }
496 cb = (pmix_group_tracker_t*)info[n].value.data.ptr;
497 } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROC)) {
498 if (PMIX_PROC != info[n].value.type) {
499
500 }
501 affected = info[n].value.data.proc;
502 } else if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_CONTEXT_ID)) {
503 PMIX_VALUE_GET_NUMBER(rc, &info[n].value, contextid, size_t);
504 }
505 }
506 if (NULL == cb) {
507 pmix_output(0, "[%s:%d] INVITE HANDLER NULL OBJECT", pmix_globals.myid.nspace, pmix_globals.myid.rank);
508
509
510 cbfunc(rc, NULL, 0, chaincbfunc, NULL, cbdata);
511 return;
512 }
513
514
515 if (PMIX_GROUP_INVITE_ACCEPTED == status) {
516 cb->accepted++;
517
518 rc = PMIX_SUCCESS;
519 goto complete;
520 }
521
522
523
524 if (PMIX_PROC_TERMINATED == status) {
525 cb->ninfo = 2;
526 PMIX_INFO_CREATE(cb->info, cb->ninfo);
527 PMIX_INFO_LOAD(&cb->info[0], PMIX_EVENT_AFFECTED_PROC, affected, PMIX_PROC);
528 PMIX_INFO_LOAD(&cb->info[1], PMIX_GROUP_CONTEXT_ID, &contextid, PMIX_SIZE);
529 rc = PMIx_Notify_event(PMIX_GROUP_INVITE_FAILED,
530 &pmix_globals.myid,
531 PMIX_RANGE_PROC_LOCAL,
532 cb->info, cb->ninfo,
533 chaincbfunc, (void*)cb);
534 if (PMIX_SUCCESS != rc) {
535
536 pmix_output(0, "[%s:%d] INVITE HANDLER ERROR", pmix_globals.myid.nspace, pmix_globals.myid.rank);
537 }
538 PMIX_INFO_FREE(cb->info, cb->ninfo);
539 goto complete;
540 }
541
542
543
544
545
546
547 complete:
548
549 if (cb->accepted == cb->nmembers) {
550
551 if (NULL != cb->cbfunc) {
552 cb->cbfunc(PMIX_SUCCESS, NULL, 0, cb->cbdata, relcbfunc, cb);
553 rc = PMIX_EVENT_ACTION_COMPLETE;
554 }
555 }
556
557
558 cbfunc(rc, cb->results, cb->nresults, chaincbfunc, cb, cbdata);
559 return;
560 }
561
562 static void regcbfunc(pmix_status_t status,
563 size_t refid,
564 void *cbdata)
565 {
566 pmix_cb_t *cb = (pmix_cb_t*)cbdata;
567
568 cb->status = status;
569 cb->errhandler_ref = refid;
570 PMIX_WAKEUP_THREAD(&cb->lock);
571 }
572
573 PMIX_EXPORT pmix_status_t PMIx_Group_invite(const char grp[],
574 const pmix_proc_t procs[], size_t nprocs,
575 const pmix_info_t info[], size_t ninfo,
576 pmix_info_t **results, size_t *nresults)
577 {
578 pmix_group_tracker_t cb;
579 pmix_status_t rc;
580 size_t n;
581
582 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
583 if (pmix_globals.init_cntr <= 0) {
584 PMIX_RELEASE_THREAD(&pmix_global_lock);
585 return PMIX_ERR_INIT;
586 }
587
588
589 if (!pmix_globals.connected) {
590 PMIX_RELEASE_THREAD(&pmix_global_lock);
591 return PMIX_ERR_UNREACH;
592 }
593 PMIX_RELEASE_THREAD(&pmix_global_lock);
594
595
596 if (NULL == grp || NULL == procs) {
597 return PMIX_ERR_BAD_PARAM;
598 }
599
600 PMIX_CONSTRUCT(&cb, pmix_group_tracker_t);
601
602 rc = PMIx_Group_invite_nb(grp, procs, nprocs, info, ninfo, info_cbfunc, (void*)&cb);
603 if (PMIX_SUCCESS != rc) {
604 PMIX_DESTRUCT(&cb);
605 return rc;
606 }
607
608
609 PMIX_WAIT_THREAD(&cb.lock);
610 rc = cb.status;
611 *results = cb.results;
612 *nresults = cb.nresults;
613 PMIX_DESTRUCT(&cb);
614
615
616
617
618 PMIX_CONSTRUCT(&cb, pmix_group_tracker_t);
619 PMIX_INFO_CREATE(cb.info, 3);
620 if (NULL == cb.info) {
621 PMIX_DESTRUCT(&cb);
622 return PMIX_ERR_NOMEM;
623 }
624 cb.ninfo = 3;
625 n = 0;
626 (void)strncpy(cb.info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN);
627 cb.info[n].value.type = PMIX_DATA_ARRAY;
628 PMIX_DATA_ARRAY_CREATE(cb.info[n].value.data.darray, nprocs, PMIX_PROC);
629 if (NULL == cb.info[n].value.data.darray ||
630 NULL == cb.info[n].value.data.darray->array) {
631 PMIX_DESTRUCT(&cb);
632 return PMIX_ERR_NOMEM;
633 }
634 memcpy(cb.info[n].value.data.darray->array, procs, nprocs * sizeof(pmix_proc_t));
635 ++n;
636
637 PMIX_INFO_LOAD(&cb.info[n], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
638 ++n;
639
640 PMIX_INFO_LOAD(&cb.info[n], PMIX_GROUP_ID, grp, PMIX_STRING);
641
642
643 rc = PMIx_Notify_event(PMIX_GROUP_CONSTRUCT_COMPLETE,
644 &pmix_globals.myid,
645 PMIX_RANGE_CUSTOM,
646 cb.info, cb.ninfo,
647 op_cbfunc, (void*)&cb);
648 if (PMIX_SUCCESS != rc) {
649 PMIX_DESTRUCT(&cb);
650 return rc;
651 }
652
653
654 PMIX_WAIT_THREAD(&cb.lock);
655 rc = cb.status;
656 PMIX_DESTRUCT(&cb);
657 return rc;
658 }
659
660 PMIX_EXPORT pmix_status_t PMIx_Group_invite_nb(const char grp[],
661 const pmix_proc_t procs[], size_t nprocs,
662 const pmix_info_t info[], size_t ninfo,
663 pmix_info_cbfunc_t cbfunc, void *cbdata)
664 {
665 pmix_group_tracker_t *cb;
666 pmix_cb_t lock;
667 pmix_status_t codes[] = {
668 PMIX_GROUP_INVITE_ACCEPTED,
669 PMIX_GROUP_INVITE_DECLINED,
670 PMIX_PROC_TERMINATED
671 };
672 size_t ncodes, n;
673 pmix_info_t myinfo[2];
674 pmix_status_t rc;
675
676 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
677 if (pmix_globals.init_cntr <= 0) {
678 PMIX_RELEASE_THREAD(&pmix_global_lock);
679 return PMIX_ERR_INIT;
680 }
681
682
683 if (!pmix_globals.connected) {
684 PMIX_RELEASE_THREAD(&pmix_global_lock);
685 return PMIX_ERR_UNREACH;
686 }
687 PMIX_RELEASE_THREAD(&pmix_global_lock);
688
689
690 if (NULL == grp || NULL == procs) {
691 return PMIX_ERR_BAD_PARAM;
692 }
693
694 cb = PMIX_NEW(pmix_group_tracker_t);
695 if (NULL == cb) {
696 return PMIX_ERR_NOMEM;
697 }
698 cb->cbfunc = cbfunc;
699 cb->cbdata = cbdata;
700 cb->accepted = 1;
701
702
703 for (n=0; n < nprocs; n++) {
704 if (PMIX_RANK_WILDCARD == procs[n].rank) {
705
706 } else {
707 cb->nmembers++;
708 }
709 }
710
711
712
713 PMIX_INFO_LOAD(&myinfo[0], PMIX_EVENT_RETURN_OBJECT, cb, PMIX_POINTER);
714 PMIX_INFO_LOAD(&myinfo[1], PMIX_EVENT_HDLR_PREPEND, NULL, PMIX_BOOL);
715 ncodes = sizeof(codes)/sizeof(pmix_status_t);
716 PMIX_CONSTRUCT(&lock, pmix_cb_t);
717 PMIx_Register_event_handler(codes, ncodes, myinfo, 2,
718 invite_handler, regcbfunc, &lock);
719 PMIX_WAIT_THREAD(&lock.lock);
720 rc = lock.status;
721 cb->ref = lock.errhandler_ref;
722 PMIX_DESTRUCT(&lock);
723 PMIX_INFO_DESTRUCT(&myinfo[0]);
724 PMIX_INFO_DESTRUCT(&myinfo[1]);
725 if (PMIX_SUCCESS != rc) {
726 PMIX_RELEASE(cb);
727 return rc;
728 }
729
730
731 if (NULL != info) {
732 for (n=0; n < ninfo; n++) {
733 if (PMIX_CHECK_KEY(&info[n], PMIX_TIMEOUT)) {
734
735 break;
736 }
737 }
738 }
739
740
741 PMIX_INFO_CREATE(cb->info, 3);
742 if (NULL == cb->info) {
743 PMIX_CONSTRUCT(&lock, pmix_cb_t);
744 PMIx_Deregister_event_handler(cb->ref,
745 op_cbfunc, &lock);
746 PMIX_WAIT_THREAD(&lock.lock);
747 PMIX_DESTRUCT(&lock);
748 PMIX_RELEASE(cb);
749 return PMIX_ERR_NOMEM;
750 }
751 cb->ninfo = 3;
752 n = 0;
753 (void)strncpy(cb->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN);
754 cb->info[n].value.type = PMIX_DATA_ARRAY;
755 PMIX_DATA_ARRAY_CREATE(cb->info[n].value.data.darray, nprocs, PMIX_PROC);
756 if (NULL == cb->info[n].value.data.darray ||
757 NULL == cb->info[n].value.data.darray->array) {
758 PMIX_CONSTRUCT(&lock, pmix_cb_t);
759 PMIx_Deregister_event_handler(cb->ref,
760 op_cbfunc, &lock);
761 PMIX_WAIT_THREAD(&lock.lock);
762 PMIX_DESTRUCT(&lock);
763 PMIX_RELEASE(cb);
764 return PMIX_ERR_NOMEM;
765 }
766 memcpy(cb->info[n].value.data.darray->array, procs, nprocs * sizeof(pmix_proc_t));
767 ++n;
768
769 PMIX_INFO_LOAD(&cb->info[n], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
770 ++n;
771
772 PMIX_INFO_LOAD(&cb->info[n], PMIX_GROUP_ID, grp, PMIX_STRING);
773
774
775 PMIX_CONSTRUCT(&lock, pmix_cb_t);
776 rc = PMIx_Notify_event(PMIX_GROUP_INVITED,
777 &pmix_globals.myid,
778 PMIX_RANGE_CUSTOM,
779 cb->info, cb->ninfo,
780 op_cbfunc, (void*)&lock);
781 PMIX_WAIT_THREAD(&lock.lock);
782 rc = lock.status;
783 PMIX_DESTRUCT(&lock);
784 if (PMIX_SUCCESS != rc) {
785 PMIX_CONSTRUCT(&lock, pmix_cb_t);
786 PMIx_Deregister_event_handler(cb->ref,
787 op_cbfunc, &lock);
788 PMIX_WAIT_THREAD(&lock.lock);
789 PMIX_DESTRUCT(&lock);
790 PMIX_RELEASE(cb);
791 }
792
793 return rc;
794 }
795
796 PMIX_EXPORT pmix_status_t PMIx_Group_join(const char grp[],
797 const pmix_proc_t *leader,
798 pmix_group_opt_t opt,
799 const pmix_info_t info[], size_t ninfo,
800 pmix_info_t **results, size_t *nresults)
801 {
802 pmix_status_t rc;
803 pmix_group_tracker_t *cb;
804
805 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
806 if (pmix_globals.init_cntr <= 0) {
807 PMIX_RELEASE_THREAD(&pmix_global_lock);
808 return PMIX_ERR_INIT;
809 }
810
811
812 if (!pmix_globals.connected) {
813 PMIX_RELEASE_THREAD(&pmix_global_lock);
814 return PMIX_ERR_UNREACH;
815 }
816 PMIX_RELEASE_THREAD(&pmix_global_lock);
817
818
819
820
821 cb = PMIX_NEW(pmix_group_tracker_t);
822
823 if (PMIX_SUCCESS != (rc = PMIx_Group_join_nb(grp, leader, opt, info, ninfo, info_cbfunc, cb))) {
824 PMIX_RELEASE(cb);
825 return rc;
826 }
827
828
829 PMIX_WAIT_THREAD(&cb->lock);
830 rc = cb->status;
831 PMIX_RELEASE(cb);
832
833 pmix_output_verbose(2, pmix_client_globals.connect_output,
834 "pmix: group construction completed");
835
836 return rc;
837 }
838
839 PMIX_EXPORT pmix_status_t PMIx_Group_join_nb(const char grp[],
840 const pmix_proc_t *leader,
841 pmix_group_opt_t opt,
842 const pmix_info_t info[], size_t ninfo,
843 pmix_info_cbfunc_t cbfunc, void *cbdata)
844 {
845 pmix_status_t rc;
846 pmix_group_tracker_t *cb;
847 pmix_status_t code;
848 size_t n;
849 pmix_data_range_t range;
850
851 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
852
853 pmix_output_verbose(2, pmix_client_globals.connect_output,
854 "[%s:%d] pmix: join nb called",
855 pmix_globals.myid.nspace, pmix_globals.myid.rank);
856
857 if (pmix_globals.init_cntr <= 0) {
858 PMIX_RELEASE_THREAD(&pmix_global_lock);
859 return PMIX_ERR_INIT;
860 }
861
862
863 if (!pmix_globals.connected) {
864 PMIX_RELEASE_THREAD(&pmix_global_lock);
865 return PMIX_ERR_UNREACH;
866 }
867 PMIX_RELEASE_THREAD(&pmix_global_lock);
868
869
870
871
872 cb = PMIX_NEW(pmix_group_tracker_t);
873 cb->cbdata = cbdata;
874
875
876 if (NULL != info) {
877 for (n=0; n < ninfo; n++) {
878 if (PMIX_CHECK_KEY(&info[n], PMIX_TIMEOUT)) {
879
880 break;
881 }
882 }
883 }
884
885
886 if (PMIX_GROUP_ACCEPT == opt) {
887 code = PMIX_GROUP_INVITE_ACCEPTED;
888 } else {
889 code = PMIX_GROUP_INVITE_DECLINED;
890 }
891
892
893 if (NULL != leader) {
894 range = PMIX_RANGE_CUSTOM;
895 PMIX_INFO_CREATE(cb->info, 1);
896 if (NULL == cb->info) {
897 PMIX_RELEASE(cb);
898 return PMIX_ERR_NOMEM;
899 }
900 PMIX_INFO_LOAD(&cb->info[0], PMIX_EVENT_CUSTOM_RANGE, leader, PMIX_PROC);
901 cb->ninfo = 1;
902 } else {
903 range = PMIX_RANGE_SESSION;
904 }
905
906 rc = PMIx_Notify_event(code,
907 &pmix_globals.myid,
908 range,
909 cb->info, cb->ninfo,
910 op_cbfunc, (void*)cb);
911 if (PMIX_SUCCESS != rc) {
912 PMIX_RELEASE(cb);
913 }
914 pmix_output_verbose(2, pmix_client_globals.connect_output,
915 "[%s:%d] pmix: group invite %s",
916 pmix_globals.myid.nspace, pmix_globals.myid.rank,
917 (PMIX_GROUP_INVITE_ACCEPTED == code) ? "ACCEPTED" : "DECLINED");
918
919 return rc;
920 }
921
922 static void op_cbfunc(pmix_status_t status, void *cbdata)
923 {
924 pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
925
926 cb->status = status;
927 if (NULL != cb->opcbfunc) {
928 cb->opcbfunc(status, cb->cbdata);
929 }
930 PMIX_POST_OBJECT(cb);
931 PMIX_WAKEUP_THREAD(&cb->lock);
932 }
933
934 static void relfn(void *cbdata)
935 {
936 pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
937 PMIX_RELEASE(cb);
938 }
939 static void grp_cbfunc(struct pmix_peer_t *pr,
940 pmix_ptl_hdr_t *hdr,
941 pmix_buffer_t *buf, void *cbdata)
942 {
943 pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
944 pmix_status_t rc;
945 pmix_status_t ret;
946 int32_t cnt;
947 size_t ctxid, ninfo=0;
948 pmix_info_t info, *iptr=NULL;
949
950 pmix_output_verbose(2, pmix_client_globals.connect_output,
951 "pmix:client recv callback activated with %d bytes",
952 (NULL == buf) ? -1 : (int)buf->bytes_used);
953
954 if (NULL == buf) {
955 ret = PMIX_ERR_BAD_PARAM;
956 goto report;
957 }
958
959
960
961 if (PMIX_BUFFER_IS_EMPTY(buf)) {
962 ret = PMIX_ERR_UNREACH;
963 goto report;
964 }
965
966
967 cnt = 1;
968 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
969 buf, &ret, &cnt, PMIX_STATUS);
970 if (PMIX_SUCCESS != rc) {
971 PMIX_ERROR_LOG(rc);
972 ret = rc;
973 }
974
975
976 cnt = 1;
977 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
978 buf, &ctxid, &cnt, PMIX_SIZE);
979 if (PMIX_SUCCESS != rc && PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
980 PMIX_ERROR_LOG(rc);
981 ret = rc;
982 } else {
983 PMIX_INFO_LOAD(&info, PMIX_GROUP_CONTEXT_ID, &ctxid, PMIX_SIZE);
984 iptr = &info;
985 ninfo = 1;
986 }
987
988 report:
989 if (NULL != cb->cbfunc) {
990 cb->cbfunc(ret, iptr, ninfo, cb->cbdata, relfn, cb);
991 return;
992 } else if (NULL != cb->opcbfunc) {
993 cb->opcbfunc(ret, cb->cbdata);
994 return;
995 }
996 PMIX_RELEASE(cb);
997 }
998
999 static void info_cbfunc(pmix_status_t status,
1000 pmix_info_t *info, size_t ninfo,
1001 void *cbdata,
1002 pmix_release_cbfunc_t release_fn,
1003 void *release_cbdata)
1004 {
1005 pmix_group_tracker_t *cb = (pmix_group_tracker_t*)cbdata;
1006 size_t n;
1007
1008
1009 cb->status = status;
1010
1011 if (NULL != info) {
1012 cb->nresults = ninfo;
1013 PMIX_INFO_CREATE(cb->results, cb->nresults);
1014 for (n=0; n < ninfo; n++) {
1015 PMIX_INFO_XFER(&cb->results[n], &info[n]);
1016 }
1017 }
1018 if (NULL != release_fn) {
1019 release_fn(release_cbdata);
1020 }
1021 PMIX_POST_OBJECT(cb);
1022 PMIX_WAKEUP_THREAD(&cb->lock);
1023 }