This source file includes following definitions.
- bufdes
- pmix_server_abort
- pmix_server_commit
- get_tracker
- new_tracker
- fence_timeout
- _collect_data
- pmix_server_fence
- opcbfunc
- pmix_server_publish
- lkcbfunc
- pmix_server_lookup
- pmix_server_unpublish
- spcbfunc
- pmix_server_spawn
- pmix_server_disconnect
- connect_timeout
- pmix_server_connect
- _check_cached_events
- regevopcbfunc
- pmix_server_register_events
- pmix_server_deregister_events
- local_cbfunc
- intermed_step
- pmix_server_event_recvd_from_client
- pmix_server_query
- logcbfn
- pmix_server_log
- pmix_server_alloc
- pmix_server_job_ctrl
- pmix_server_monitor
- pmix_server_get_credential
- pmix_server_validate_credential
- pmix_server_iofreg
- stdcbfunc
- pmix_server_iofstdin
- grp_timeout
- _grpcbfunc
- grpcbfunc
- pmix_server_grpconstruct
- pmix_server_grpdestruct
- tcon
- tdes
- cdcon
- cddes
- scadcon
- scaddes
- ncon
- ndes
- dmcon
- dmdes
- dmrqcon
- dmrqdes
- lmcon
- lmdes
- prevcon
- prevdes
- regcon
- regdes
- ilcon
- ildes
- grcon
- grdes
- iocon
- iodes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 #include <src/include/pmix_config.h>
19
20 #include <src/include/pmix_stdint.h>
21 #include <src/include/pmix_socket_errno.h>
22
23 #include <pmix_server.h>
24 #include "src/include/pmix_globals.h"
25
26 #ifdef HAVE_STRING_H
27 #include <string.h>
28 #endif
29 #ifdef HAVE_SYS_STAT_H
30 #include <sys/stat.h>
31 #endif
32 #include <fcntl.h>
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #ifdef HAVE_SYS_SOCKET_H
37 #include <sys/socket.h>
38 #endif
39 #ifdef HAVE_SYS_UN_H
40 #include <sys/un.h>
41 #endif
42 #ifdef HAVE_SYS_UIO_H
43 #include <sys/uio.h>
44 #endif
45 #ifdef HAVE_SYS_TYPES_H
46 #include <sys/types.h>
47 #endif
48 #ifdef HAVE_TIME_H
49 #include <time.h>
50 #endif
51 #include PMIX_EVENT_HEADER
52
53 #include "src/class/pmix_hotel.h"
54 #include "src/class/pmix_list.h"
55 #include "src/common/pmix_attributes.h"
56 #include "src/mca/bfrops/bfrops.h"
57 #include "src/mca/plog/plog.h"
58 #include "src/mca/psensor/psensor.h"
59 #include "src/util/argv.h"
60 #include "src/util/error.h"
61 #include "src/util/output.h"
62 #include "src/util/pmix_environ.h"
63 #include "src/mca/gds/base/base.h"
64
65 #include "pmix_server_ops.h"
66
67
68
69 typedef struct {
70 pmix_list_item_t super;
71 pmix_buffer_t *buf;
72 } rank_blob_t;
73
74 static void bufdes(rank_blob_t *p)
75 {
76 PMIX_RELEASE(p);
77 }
78 static PMIX_CLASS_INSTANCE(rank_blob_t,
79 pmix_list_item_t,
80 NULL, bufdes);
81
82 pmix_server_module_t pmix_host_server = {0};
83
84 pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,
85 pmix_op_cbfunc_t cbfunc, void *cbdata)
86 {
87 int32_t cnt;
88 pmix_status_t rc;
89 int status;
90 char *msg;
91 size_t nprocs;
92 pmix_proc_t *procs = NULL;
93 pmix_proc_t proc;
94
95 pmix_output_verbose(2, pmix_server_globals.base_output, "recvd ABORT");
96
97
98 cnt = 1;
99 PMIX_BFROPS_UNPACK(rc, peer, buf, &status, &cnt, PMIX_STATUS);
100 if (PMIX_SUCCESS != rc) {
101 return rc;
102 }
103
104 cnt = 1;
105 PMIX_BFROPS_UNPACK(rc, peer, buf, &msg, &cnt, PMIX_STRING);
106 if (PMIX_SUCCESS != rc) {
107 return rc;
108 }
109
110 cnt = 1;
111 PMIX_BFROPS_UNPACK(rc, peer, buf, &nprocs, &cnt, PMIX_SIZE);
112 if (PMIX_SUCCESS != rc) {
113 return rc;
114 }
115
116
117
118 if (0 < nprocs) {
119 PMIX_PROC_CREATE(procs, nprocs);
120 if (NULL == procs) {
121 if (NULL != msg) {
122 free(msg);
123 }
124 return PMIX_ERR_NOMEM;
125 }
126 cnt = nprocs;
127 PMIX_BFROPS_UNPACK(rc, peer, buf, procs, &cnt, PMIX_PROC);
128 if (PMIX_SUCCESS != rc) {
129 if (NULL != msg) {
130 free(msg);
131 }
132 return rc;
133 }
134 }
135
136
137 if (NULL != pmix_host_server.abort) {
138 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
139 proc.rank = peer->info->pname.rank;
140 rc = pmix_host_server.abort(&proc, peer->info->server_object, status, msg,
141 procs, nprocs, cbfunc, cbdata);
142 } else {
143 rc = PMIX_ERR_NOT_SUPPORTED;
144 }
145 PMIX_PROC_FREE(procs, nprocs);
146
147
148
149 if (NULL != msg) {
150 free(msg);
151 }
152
153 return rc;
154 }
155
156 pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
157 {
158 int32_t cnt;
159 pmix_status_t rc;
160 pmix_buffer_t b2, pbkt;
161 pmix_kval_t *kp;
162 pmix_scope_t scope;
163 pmix_namespace_t *nptr;
164 pmix_rank_info_t *info;
165 pmix_proc_t proc;
166 pmix_dmdx_remote_t *dcd, *dcdnext;
167 char *data;
168 size_t sz;
169 pmix_cb_t cb;
170
171
172 info = peer->info;
173 nptr = peer->nptr;
174 pmix_strncpy(proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
175 proc.rank = info->pname.rank;
176
177 pmix_output_verbose(2, pmix_server_globals.base_output,
178 "%s:%d EXECUTE COMMIT FOR %s:%d",
179 pmix_globals.myid.nspace,
180 pmix_globals.myid.rank,
181 nptr->nspace, info->pname.rank);
182
183
184
185
186
187 cnt = 1;
188 PMIX_BFROPS_UNPACK(rc, peer, buf, &scope, &cnt, PMIX_SCOPE);
189 while (PMIX_SUCCESS == rc) {
190
191 cnt = 1;
192 PMIX_CONSTRUCT(&b2, pmix_buffer_t);
193 PMIX_BFROPS_ASSIGN_TYPE(peer, &b2);
194 PMIX_BFROPS_UNPACK(rc, peer, buf, &b2, &cnt, PMIX_BUFFER);
195 if (PMIX_SUCCESS != rc) {
196 PMIX_ERROR_LOG(rc);
197 return rc;
198 }
199
200
201
202 kp = PMIX_NEW(pmix_kval_t);
203 cnt = 1;
204 PMIX_BFROPS_UNPACK(rc, peer, &b2, kp, &cnt, PMIX_KVAL);
205 while (PMIX_SUCCESS == rc) {
206 if( PMIX_LOCAL == scope || PMIX_GLOBAL == scope){
207 PMIX_GDS_STORE_KV(rc, peer, &proc, scope, kp);
208 if (PMIX_SUCCESS != rc) {
209 PMIX_ERROR_LOG(rc);
210 PMIX_RELEASE(kp);
211 PMIX_DESTRUCT(&b2);
212 return rc;
213 }
214 }
215 if (PMIX_REMOTE == scope || PMIX_GLOBAL == scope) {
216 PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &proc, scope, kp);
217 if (PMIX_SUCCESS != rc) {
218 PMIX_ERROR_LOG(rc);
219 PMIX_RELEASE(kp);
220 PMIX_DESTRUCT(&b2);
221 return rc;
222 }
223 }
224 PMIX_RELEASE(kp);
225 kp = PMIX_NEW(pmix_kval_t);
226 cnt = 1;
227 PMIX_BFROPS_UNPACK(rc, peer, &b2, kp, &cnt, PMIX_KVAL);
228
229 }
230 PMIX_RELEASE(kp);
231 PMIX_DESTRUCT(&b2);
232 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
233 PMIX_ERROR_LOG(rc);
234 return rc;
235 }
236 cnt = 1;
237 PMIX_BFROPS_UNPACK(rc, peer, buf, &scope, &cnt, PMIX_SCOPE);
238 }
239 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
240 PMIX_ERROR_LOG(rc);
241 return rc;
242 }
243 rc = PMIX_SUCCESS;
244
245 info->modex_recvd = true;
246
247
248 peer->commit_cnt++;
249
250
251 PMIX_LIST_FOREACH_SAFE(dcd, dcdnext, &pmix_server_globals.remote_pnd, pmix_dmdx_remote_t) {
252 if (0 != strncmp(dcd->cd->proc.nspace, nptr->nspace, PMIX_MAX_NSLEN)) {
253 continue;
254 }
255 if (dcd->cd->proc.rank == info->pname.rank) {
256
257
258
259 data = NULL;
260 sz = 0;
261 PMIX_CONSTRUCT(&cb, pmix_cb_t);
262 cb.proc = &proc;
263 cb.scope = PMIX_REMOTE;
264 cb.copy = true;
265 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
266 if (PMIX_SUCCESS == rc) {
267
268 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
269 PMIX_LIST_FOREACH(kp, &cb.kvs, pmix_kval_t) {
270
271
272 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &pbkt, kp, 1, PMIX_KVAL);
273 }
274 PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
275 }
276 PMIX_DESTRUCT(&cb);
277
278 dcd->cd->cbfunc(rc, data, sz, dcd->cd->cbdata);
279 if (NULL != data) {
280 free(data);
281 }
282
283 pmix_list_remove_item(&pmix_server_globals.remote_pnd, &dcd->super);
284 PMIX_RELEASE(dcd);
285 }
286 }
287
288 rc = pmix_pending_resolve(nptr, info->pname.rank, PMIX_SUCCESS, NULL);
289 if (PMIX_SUCCESS != rc) {
290 PMIX_ERROR_LOG(rc);
291 }
292 return rc;
293 }
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311 static pmix_server_trkr_t* get_tracker(char *id, pmix_proc_t *procs,
312 size_t nprocs, pmix_cmd_t type)
313 {
314 pmix_server_trkr_t *trk;
315 size_t i, j;
316 size_t matches;
317
318 pmix_output_verbose(5, pmix_server_globals.base_output,
319 "get_tracker called with %d procs", (int)nprocs);
320
321
322 if (NULL == procs && NULL == id) {
323 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
324 return NULL;
325 }
326
327
328
329
330
331
332
333 PMIX_LIST_FOREACH(trk, &pmix_server_globals.collectives, pmix_server_trkr_t) {
334
335
336
337
338 if (NULL != id) {
339 if (NULL != trk->id && 0 == strcmp(id, trk->id)) {
340 return trk;
341 }
342 } else {
343 if (nprocs != trk->npcs) {
344 continue;
345 }
346 if (type != trk->type) {
347 continue;
348 }
349 matches = 0;
350 for (i=0; i < nprocs; i++) {
351
352
353 for (j=0; j < trk->npcs; j++) {
354 if (0 == strcmp(procs[i].nspace, trk->pcs[j].nspace) &&
355 procs[i].rank == trk->pcs[j].rank) {
356 ++matches;
357 break;
358 }
359 }
360 }
361 if (trk->npcs == matches) {
362 return trk;
363 }
364 }
365 }
366
367 return NULL;
368 }
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386 static pmix_server_trkr_t* new_tracker(char *id, pmix_proc_t *procs,
387 size_t nprocs, pmix_cmd_t type)
388 {
389 pmix_server_trkr_t *trk;
390 size_t i;
391 bool all_def;
392 pmix_namespace_t *nptr, *ns;
393 pmix_rank_info_t *info;
394 pmix_nspace_caddy_t *nm;
395
396 pmix_output_verbose(5, pmix_server_globals.base_output,
397 "new_tracker called with %d procs", (int)nprocs);
398
399
400 if (NULL == procs) {
401 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
402 return NULL;
403 }
404
405 pmix_output_verbose(5, pmix_server_globals.base_output,
406 "adding new tracker %s with %d procs",
407 (NULL == id) ? "NO-ID" : id, (int)nprocs);
408
409
410 trk = PMIX_NEW(pmix_server_trkr_t);
411 if (NULL == trk) {
412 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
413 return NULL;
414 }
415
416 if (NULL != id) {
417 trk->id = strdup(id);
418 }
419
420 if (NULL != procs) {
421
422 PMIX_PROC_CREATE(trk->pcs, nprocs);
423 if (NULL == trk->pcs) {
424 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
425 PMIX_RELEASE(trk);
426 return NULL;
427 }
428 memcpy(trk->pcs, procs, nprocs * sizeof(pmix_proc_t));
429 trk->npcs = nprocs;
430 }
431 trk->type = type;
432
433 all_def = true;
434 for (i=0; i < nprocs; i++) {
435 if (NULL == id) {
436 pmix_strncpy(trk->pcs[i].nspace, procs[i].nspace, PMIX_MAX_NSLEN);
437 trk->pcs[i].rank = procs[i].rank;
438 }
439 if (!all_def) {
440 continue;
441 }
442
443 nptr = NULL;
444 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
445 if (0 == strcmp(procs[i].nspace, ns->nspace)) {
446 nptr = ns;
447 break;
448 }
449 }
450 if (NULL == nptr) {
451
452 pmix_output_verbose(5, pmix_server_globals.base_output,
453 "new_tracker: unknown nspace %s",
454 procs[i].nspace);
455 continue;
456 }
457
458 PMIX_LIST_FOREACH(nm, &trk->nslist, pmix_nspace_caddy_t) {
459 if (0 == strcmp(nptr->nspace, nm->ns->nspace)) {
460 break;
461 }
462 }
463 if ((pmix_nspace_caddy_t*)pmix_list_get_end(&trk->nslist) == nm) {
464 nm = PMIX_NEW(pmix_nspace_caddy_t);
465 PMIX_RETAIN(nptr);
466 nm->ns = nptr;
467 pmix_list_append(&trk->nslist, &nm->super);
468 }
469
470
471 if (!nptr->all_registered) {
472
473
474 all_def = false;
475 pmix_output_verbose(5, pmix_server_globals.base_output,
476 "new_tracker: all clients not registered nspace %s",
477 procs[i].nspace);
478
479
480
481 }
482
483 PMIX_LIST_FOREACH(info, &nptr->ranks, pmix_rank_info_t) {
484 if (procs[i].rank == info->pname.rank ||
485 PMIX_RANK_WILDCARD == procs[i].rank) {
486 pmix_output_verbose(5, pmix_server_globals.base_output,
487 "adding local proc %s.%d to tracker",
488 info->pname.nspace, info->pname.rank);
489
490 ++trk->nlocal;
491 if (PMIX_RANK_WILDCARD != procs[i].rank) {
492 break;
493 }
494 }
495 }
496 }
497 if (all_def) {
498 trk->def_complete = true;
499 }
500 pmix_list_append(&pmix_server_globals.collectives, &trk->super);
501 return trk;
502 }
503
504 static void fence_timeout(int sd, short args, void *cbdata)
505 {
506 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
507
508 pmix_output_verbose(2, pmix_server_globals.fence_output,
509 "ALERT: fence timeout fired");
510
511
512 if (NULL != cd->trk->modexcbfunc) {
513 cd->trk->modexcbfunc(PMIX_ERR_TIMEOUT, NULL, 0, cd->trk, NULL, NULL);
514 return;
515 }
516 cd->event_active = false;
517
518 pmix_list_remove_item(&cd->trk->local_cbs, &cd->super);
519 PMIX_RELEASE(cd);
520 }
521
522 static pmix_status_t _collect_data(pmix_server_trkr_t *trk,
523 pmix_buffer_t *buf)
524 {
525 pmix_buffer_t bucket, *pbkt = NULL;
526 pmix_cb_t cb;
527 pmix_kval_t *kv;
528 pmix_byte_object_t bo;
529 pmix_server_caddy_t *scd;
530 pmix_proc_t pcs;
531 pmix_status_t rc = PMIX_SUCCESS;
532 pmix_rank_t rel_rank;
533 pmix_nspace_caddy_t *nm;
534 bool found;
535 pmix_list_t rank_blobs;
536 rank_blob_t *blob;
537 uint32_t kmap_size;
538
539
540 char **kmap = NULL;
541 int i;
542 pmix_gds_modex_blob_info_t blob_info_byte = 0;
543 pmix_gds_modex_key_fmt_t kmap_type = PMIX_MODEX_KEY_INVALID;
544
545 PMIX_CONSTRUCT(&bucket, pmix_buffer_t);
546
547 if (PMIX_COLLECT_YES == trk->collect_type) {
548 pmix_output_verbose(2, pmix_server_globals.fence_output,
549 "fence - assembling data");
550
551
552
553
554
555
556 if (PMIX_MODEX_KEY_INVALID == kmap_type) {
557 size_t key_fmt_size[PMIX_MODEX_KEY_MAX] = {0};
558 pmix_value_array_t *key_count_array = PMIX_NEW(pmix_value_array_t);
559 uint32_t *key_count = NULL;
560
561 pmix_value_array_init(key_count_array, sizeof(uint32_t));
562
563 PMIX_LIST_FOREACH(scd, &trk->local_cbs, pmix_server_caddy_t) {
564 pmix_strncpy(pcs.nspace, scd->peer->info->pname.nspace,
565 PMIX_MAX_NSLEN);
566 pcs.rank = scd->peer->info->pname.rank;
567 PMIX_CONSTRUCT(&cb, pmix_cb_t);
568 cb.proc = &pcs;
569 cb.scope = PMIX_REMOTE;
570 cb.copy = true;
571 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
572 if (PMIX_SUCCESS == rc) {
573 int key_idx;
574 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
575 rc = pmix_argv_append_unique_idx(&key_idx, &kmap,
576 kv->key);
577 if (pmix_value_array_get_size(key_count_array) <
578 (size_t)(key_idx+1)) {
579 size_t new_size;
580 size_t old_size =
581 pmix_value_array_get_size(key_count_array);
582
583 pmix_value_array_set_size(key_count_array,
584 key_idx+1);
585 new_size =
586 pmix_value_array_get_size(key_count_array);
587 key_count =
588 PMIX_VALUE_ARRAY_GET_BASE(key_count_array,
589 uint32_t);
590 memset(key_count + old_size, 0, sizeof(uint32_t) *
591 (new_size - old_size));
592 }
593 key_count = PMIX_VALUE_ARRAY_GET_BASE(key_count_array,
594 uint32_t);
595 key_count[key_idx]++;
596 }
597 }
598 }
599
600 key_count = PMIX_VALUE_ARRAY_GET_BASE(key_count_array, uint32_t);
601
602 for (i = 0; i < pmix_argv_count(kmap); i++) {
603 pmix_buffer_t tmp;
604 size_t kname_size;
605 size_t kidx_size;
606
607 PMIX_CONSTRUCT(&tmp, pmix_buffer_t);
608 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &tmp, &kmap[i], 1,
609 PMIX_STRING);
610 kname_size = tmp.bytes_used;
611 PMIX_DESTRUCT(&tmp);
612 PMIX_CONSTRUCT(&tmp, pmix_buffer_t);
613 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &tmp, &i, 1,
614 PMIX_UINT32);
615 kidx_size = tmp.bytes_used;
616 PMIX_DESTRUCT(&tmp);
617
618
619 key_fmt_size[PMIX_MODEX_KEY_NATIVE_FMT] =
620 kname_size * key_count[i];
621 key_fmt_size[PMIX_MODEX_KEY_KEYMAP_FMT] =
622 kname_size + key_count[i]*kidx_size;
623 }
624 PMIX_RELEASE(key_count_array);
625
626
627 kmap_type = key_fmt_size[PMIX_MODEX_KEY_NATIVE_FMT] >
628 key_fmt_size[PMIX_MODEX_KEY_KEYMAP_FMT] ?
629 PMIX_MODEX_KEY_KEYMAP_FMT : PMIX_MODEX_KEY_NATIVE_FMT;
630 pmix_output_verbose(5, pmix_server_globals.base_output,
631 "key packing type %s",
632 kmap_type == PMIX_MODEX_KEY_KEYMAP_FMT ?
633 "kmap" : "native");
634 }
635 PMIX_CONSTRUCT(&rank_blobs, pmix_list_t);
636 PMIX_LIST_FOREACH(scd, &trk->local_cbs, pmix_server_caddy_t) {
637
638
639 pmix_strncpy(pcs.nspace, scd->peer->info->pname.nspace,
640 PMIX_MAX_NSLEN);
641 pcs.rank = scd->peer->info->pname.rank;
642 PMIX_CONSTRUCT(&cb, pmix_cb_t);
643 cb.proc = &pcs;
644 cb.scope = PMIX_REMOTE;
645 cb.copy = true;
646 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
647 if (PMIX_SUCCESS == rc) {
648
649 rel_rank = 0;
650 found = false;
651 if (pmix_list_get_size(&trk->nslist) == 1) {
652 found = true;
653 } else {
654 PMIX_LIST_FOREACH(nm, &trk->nslist, pmix_nspace_caddy_t) {
655 if (0 == strcmp(nm->ns->nspace, pcs.nspace)) {
656 found = true;
657 break;
658 }
659 rel_rank += nm->ns->nprocs;
660 }
661 }
662 if (false == found) {
663 rc = PMIX_ERR_NOT_FOUND;
664 PMIX_ERROR_LOG(rc);
665 PMIX_DESTRUCT(&cb);
666 PMIX_DESTRUCT(&rank_blobs);
667 goto cleanup;
668 }
669 rel_rank += pcs.rank;
670
671
672 pbkt = PMIX_NEW(pmix_buffer_t);
673 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, pbkt,
674 &rel_rank, 1, PMIX_PROC_RANK);
675 if (PMIX_SUCCESS != rc) {
676 PMIX_ERROR_LOG(rc);
677 PMIX_DESTRUCT(&cb);
678 PMIX_DESTRUCT(&rank_blobs);
679 PMIX_RELEASE(pbkt);
680 goto cleanup;
681 }
682
683 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
684 rc = pmix_gds_base_modex_pack_kval(kmap_type, pbkt, &kmap,
685 kv);
686 if (rc != PMIX_SUCCESS) {
687 PMIX_ERROR_LOG(rc);
688 PMIX_DESTRUCT(&cb);
689 PMIX_DESTRUCT(&rank_blobs);
690 PMIX_RELEASE(pbkt);
691 goto cleanup;
692 }
693 }
694
695
696 blob = PMIX_NEW(rank_blob_t);
697 blob->buf = pbkt;
698 pmix_list_append(&rank_blobs, &blob->super);
699 pbkt = NULL;
700 }
701 PMIX_DESTRUCT(&cb);
702 }
703
704
705
706
707
708
709 blob_info_byte |= PMIX_GDS_COLLECT_BIT;
710 if (PMIX_MODEX_KEY_KEYMAP_FMT == kmap_type) {
711 blob_info_byte |= PMIX_GDS_KEYMAP_BIT;
712 }
713
714 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
715 &blob_info_byte, 1, PMIX_BYTE);
716
717 if (PMIX_MODEX_KEY_KEYMAP_FMT == kmap_type) {
718
719
720
721 kmap_size = pmix_argv_count(kmap);
722 if (0 < kmap_size) {
723 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
724 &kmap_size, 1, PMIX_UINT32);
725 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
726 kmap, kmap_size, PMIX_STRING);
727 }
728 }
729
730 PMIX_LIST_FOREACH(blob, &rank_blobs, rank_blob_t) {
731
732 PMIX_UNLOAD_BUFFER(blob->buf, bo.bytes, bo.size);
733
734 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
735 &bo, 1, PMIX_BYTE_OBJECT);
736 PMIX_BYTE_OBJECT_DESTRUCT(&bo);
737 if (PMIX_SUCCESS != rc) {
738 PMIX_ERROR_LOG(rc);
739 goto cleanup;
740 }
741 }
742 PMIX_DESTRUCT(&rank_blobs);
743 } else {
744
745
746
747
748
749
750
751
752
753 #if PMIX_ENABLE_DEBUG
754
755 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &bucket,
756 &blob_info_byte, 1, PMIX_BYTE);
757 #endif
758 }
759 if (!PMIX_BUFFER_IS_EMPTY(&bucket)) {
760
761
762
763 PMIX_UNLOAD_BUFFER(&bucket, bo.bytes, bo.size);
764 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, buf,
765 &bo, 1, PMIX_BYTE_OBJECT);
766 PMIX_BYTE_OBJECT_DESTRUCT(&bo);
767 if (PMIX_SUCCESS != rc) {
768 PMIX_ERROR_LOG(rc);
769 }
770 }
771
772 cleanup:
773 PMIX_DESTRUCT(&bucket);
774 pmix_argv_free(kmap);
775 return rc;
776 }
777
778 pmix_status_t pmix_server_fence(pmix_server_caddy_t *cd,
779 pmix_buffer_t *buf,
780 pmix_modex_cbfunc_t modexcbfunc,
781 pmix_op_cbfunc_t opcbfunc)
782 {
783 int32_t cnt;
784 pmix_status_t rc;
785 size_t nprocs;
786 pmix_proc_t *procs=NULL, *newprocs;
787 bool collect_data = false;
788 pmix_server_trkr_t *trk;
789 char *data = NULL;
790 size_t sz = 0;
791 pmix_buffer_t bucket;
792 pmix_info_t *info = NULL;
793 size_t ninfo=0, n, nmbrs, idx;
794 struct timeval tv = {0, 0};
795 pmix_list_t expand;
796 pmix_group_caddy_t *gcd;
797 pmix_group_t *grp;
798
799 pmix_output_verbose(2, pmix_server_globals.fence_output,
800 "recvd FENCE");
801
802 if (NULL == pmix_host_server.fence_nb) {
803 PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
804 return PMIX_ERR_NOT_SUPPORTED;
805 }
806
807
808 cnt = 1;
809 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &nprocs, &cnt, PMIX_SIZE);
810 if (PMIX_SUCCESS != rc) {
811 return rc;
812 }
813 pmix_output_verbose(2, pmix_server_globals.fence_output,
814 "recvd fence from %s:%u with %d procs",
815 cd->peer->info->pname.nspace, cd->peer->info->pname.rank, (int)nprocs);
816
817
818 if (nprocs < 1) {
819 return PMIX_ERR_BAD_PARAM;
820 }
821
822
823 PMIX_PROC_CREATE(procs, nprocs);
824 if (NULL == procs) {
825 return PMIX_ERR_NOMEM;
826 }
827
828 cnt = nprocs;
829 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, procs, &cnt, PMIX_PROC);
830 if (PMIX_SUCCESS != rc) {
831 goto cleanup;
832 }
833
834
835
836 nmbrs = nprocs;
837 PMIX_CONSTRUCT(&expand, pmix_list_t);
838
839
840 PMIX_LIST_FOREACH(grp, &pmix_server_globals.groups, pmix_group_t) {
841 for (n=0; n < nprocs; n++) {
842 if (PMIX_CHECK_NSPACE(procs[n].nspace, grp->grpid)) {
843
844 gcd = PMIX_NEW(pmix_group_caddy_t);
845 gcd->grp = grp;
846 gcd->idx = n;
847 gcd->rank = procs[n].rank;
848 pmix_list_append(&expand, &gcd->super);
849
850 if (PMIX_RANK_WILDCARD == procs[n].rank) {
851 nmbrs += grp->nmbrs - 1;
852 }
853 break;
854 }
855 }
856 }
857
858 if (0 < pmix_list_get_size(&expand)) {
859 PMIX_PROC_CREATE(newprocs, nmbrs);
860 gcd = (pmix_group_caddy_t*)pmix_list_remove_first(&expand);
861 n=0;
862 idx = 0;
863 while (n < nmbrs) {
864 if (idx != gcd->idx) {
865 memcpy(&newprocs[n], &procs[idx], sizeof(pmix_proc_t));
866 ++n;
867 } else {
868
869 if (PMIX_RANK_WILDCARD != gcd->rank) {
870 memcpy(&newprocs[n], &gcd->grp->members[gcd->rank], sizeof(pmix_proc_t));
871 ++n;
872 } else {
873
874 memcpy(&newprocs[n], gcd->grp->members, gcd->grp->nmbrs * sizeof(pmix_proc_t));
875 n += gcd->grp->nmbrs;
876 }
877 PMIX_RELEASE(gcd);
878 gcd = (pmix_group_caddy_t*)pmix_list_remove_first(&expand);
879 }
880 ++idx;
881 }
882 PMIX_PROC_FREE(procs, nprocs);
883 procs = newprocs;
884 nprocs = nmbrs;
885 }
886 PMIX_LIST_DESTRUCT(&expand);
887
888
889 cnt = 1;
890 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &ninfo, &cnt, PMIX_SIZE);
891 if (PMIX_SUCCESS != rc) {
892 return rc;
893 }
894 if (0 < ninfo) {
895 PMIX_INFO_CREATE(info, ninfo);
896 if (NULL == info) {
897 PMIX_PROC_FREE(procs, nprocs);
898 return PMIX_ERR_NOMEM;
899 }
900
901 cnt = ninfo;
902 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, info, &cnt, PMIX_INFO);
903 if (PMIX_SUCCESS != rc) {
904 goto cleanup;
905 }
906
907
908 for (n=0; n < ninfo; n++) {
909 if (0 == strcmp(info[n].key, PMIX_COLLECT_DATA)) {
910 collect_data = true;
911 } else if (0 == strncmp(info[n].key, PMIX_TIMEOUT, PMIX_MAX_KEYLEN)) {
912 tv.tv_sec = info[n].value.data.uint32;
913 }
914 }
915 }
916
917
918 if (NULL == (trk = get_tracker(NULL, procs, nprocs, PMIX_FENCENB_CMD))) {
919
920 if (NULL == (trk = new_tracker(NULL, procs, nprocs, PMIX_FENCENB_CMD))) {
921
922 PMIX_ERROR_LOG(PMIX_ERROR);
923
924 if (NULL != opcbfunc) {
925 opcbfunc(PMIX_ERROR, cd);
926 }
927 rc = PMIX_ERROR;
928 goto cleanup;
929 }
930 trk->type = PMIX_FENCENB_CMD;
931 trk->modexcbfunc = modexcbfunc;
932
933 if (collect_data) {
934 trk->collect_type = PMIX_COLLECT_YES;
935 } else {
936 trk->collect_type = PMIX_COLLECT_NO;
937 }
938 } else {
939 switch (trk->collect_type) {
940 case PMIX_COLLECT_NO:
941 if (collect_data) {
942 trk->collect_type = PMIX_COLLECT_INVALID;
943 }
944 break;
945 case PMIX_COLLECT_YES:
946 if (!collect_data) {
947 trk->collect_type = PMIX_COLLECT_INVALID;
948 }
949 break;
950 default:
951 break;
952 }
953 }
954
955
956
957
958 if (NULL == trk->info) {
959 trk->info = info;
960 trk->ninfo = ninfo;
961 } else {
962
963 PMIX_INFO_FREE(info, ninfo);
964 info = NULL;
965 }
966
967
968
969 PMIX_RETAIN(cd);
970 pmix_list_append(&trk->local_cbs, &cd->super);
971
972 if (0 < tv.tv_sec) {
973 PMIX_RETAIN(trk);
974 cd->trk = trk;
975 pmix_event_evtimer_set(pmix_globals.evbase, &cd->ev,
976 fence_timeout, cd);
977 pmix_event_evtimer_add(&cd->ev, &tv);
978 cd->event_active = true;
979 }
980
981
982
983
984
985 if (trk->def_complete &&
986 pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
987 pmix_output_verbose(2, pmix_server_globals.fence_output,
988 "fence complete");
989
990
991
992
993
994
995
996 PMIX_CONSTRUCT(&bucket, pmix_buffer_t);
997 if (PMIX_SUCCESS != (rc = _collect_data(trk, &bucket))) {
998 PMIX_ERROR_LOG(rc);
999 PMIX_DESTRUCT(&bucket);
1000 goto cleanup;
1001 }
1002
1003 PMIX_UNLOAD_BUFFER(&bucket, data, sz);
1004 PMIX_DESTRUCT(&bucket);
1005 trk->host_called = true;
1006 rc = pmix_host_server.fence_nb(trk->pcs, trk->npcs,
1007 trk->info, trk->ninfo,
1008 data, sz, trk->modexcbfunc, trk);
1009 if (PMIX_SUCCESS != rc) {
1010 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
1011 PMIX_RELEASE(trk);
1012 cd->trk = NULL;
1013 }
1014 }
1015
1016 cleanup:
1017 PMIX_PROC_FREE(procs, nprocs);
1018 return rc;
1019 }
1020
1021 static void opcbfunc(pmix_status_t status, void *cbdata)
1022 {
1023 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1024
1025 if (NULL != cd->keys) {
1026 pmix_argv_free(cd->keys);
1027 }
1028 if (NULL != cd->codes) {
1029 free(cd->codes);
1030 }
1031 if (NULL != cd->info) {
1032 PMIX_INFO_FREE(cd->info, cd->ninfo);
1033 }
1034 if (NULL != cd->opcbfunc) {
1035 cd->opcbfunc(status, cd->cbdata);
1036 }
1037 PMIX_RELEASE(cd);
1038 }
1039
1040 pmix_status_t pmix_server_publish(pmix_peer_t *peer,
1041 pmix_buffer_t *buf,
1042 pmix_op_cbfunc_t cbfunc, void *cbdata)
1043 {
1044 pmix_setup_caddy_t *cd;
1045 pmix_status_t rc;
1046 int32_t cnt;
1047 size_t ninfo;
1048 pmix_proc_t proc;
1049 uint32_t uid;
1050
1051 pmix_output_verbose(2, pmix_server_globals.pub_output,
1052 "recvd PUBLISH");
1053
1054 if (NULL == pmix_host_server.publish) {
1055 return PMIX_ERR_NOT_SUPPORTED;
1056 }
1057
1058
1059 cnt=1;
1060 PMIX_BFROPS_UNPACK(rc, peer, buf, &uid, &cnt, PMIX_UINT32);
1061 if (PMIX_SUCCESS != rc) {
1062 PMIX_ERROR_LOG(rc);
1063 return rc;
1064 }
1065
1066 cnt=1;
1067 PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
1068 if (PMIX_SUCCESS != rc) {
1069 PMIX_ERROR_LOG(rc);
1070 return rc;
1071 }
1072
1073 cd = PMIX_NEW(pmix_setup_caddy_t);
1074 if (NULL == cd) {
1075 return PMIX_ERR_NOMEM;
1076 }
1077 cd->opcbfunc = cbfunc;
1078 cd->cbdata = cbdata;
1079 cd->ninfo = ninfo + 1;
1080 PMIX_INFO_CREATE(cd->info, cd->ninfo);
1081 if (NULL == cd->info) {
1082 rc = PMIX_ERR_NOMEM;
1083 goto cleanup;
1084 }
1085
1086 if (0 < cd->ninfo) {
1087 cnt=cd->ninfo;
1088 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
1089 if (PMIX_SUCCESS != rc) {
1090 PMIX_ERROR_LOG(rc);
1091 goto cleanup;
1092 }
1093 }
1094 pmix_strncpy(cd->info[cd->ninfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN);
1095 cd->info[cd->ninfo-1].value.type = PMIX_UINT32;
1096 cd->info[cd->ninfo-1].value.data.uint32 = uid;
1097
1098
1099 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
1100 proc.rank = peer->info->pname.rank;
1101 rc = pmix_host_server.publish(&proc, cd->info, cd->ninfo, opcbfunc, cd);
1102
1103 cleanup:
1104 if (PMIX_SUCCESS != rc) {
1105 if (NULL != cd->info) {
1106 PMIX_INFO_FREE(cd->info, cd->ninfo);
1107 }
1108 PMIX_RELEASE(cd);
1109 }
1110 return rc;
1111 }
1112
1113 static void lkcbfunc(pmix_status_t status,
1114 pmix_pdata_t data[], size_t ndata,
1115 void *cbdata)
1116 {
1117 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1118
1119
1120 if (NULL != cd->keys) {
1121 pmix_argv_free(cd->keys);
1122 }
1123 if (NULL != cd->info) {
1124 PMIX_INFO_FREE(cd->info, cd->ninfo);
1125 }
1126
1127
1128 if (NULL != cd->lkcbfunc) {
1129 cd->lkcbfunc(status, data, ndata, cd->cbdata);
1130 }
1131 PMIX_RELEASE(cd);
1132 }
1133 pmix_status_t pmix_server_lookup(pmix_peer_t *peer,
1134 pmix_buffer_t *buf,
1135 pmix_lookup_cbfunc_t cbfunc, void *cbdata)
1136 {
1137 pmix_setup_caddy_t *cd;
1138 int32_t cnt;
1139 pmix_status_t rc;
1140 size_t nkeys, i;
1141 char *sptr;
1142 size_t ninfo;
1143 pmix_proc_t proc;
1144 uint32_t uid;
1145
1146 pmix_output_verbose(2, pmix_server_globals.pub_output,
1147 "recvd LOOKUP");
1148
1149 if (NULL == pmix_host_server.lookup) {
1150 return PMIX_ERR_NOT_SUPPORTED;
1151 }
1152
1153
1154 cnt=1;
1155 PMIX_BFROPS_UNPACK(rc, peer, buf, &uid, &cnt, PMIX_UINT32);
1156 if (PMIX_SUCCESS != rc) {
1157 PMIX_ERROR_LOG(rc);
1158 return rc;
1159 }
1160
1161 cnt=1;
1162 PMIX_BFROPS_UNPACK(rc, peer, buf, &nkeys, &cnt, PMIX_SIZE);
1163 if (PMIX_SUCCESS != rc) {
1164 PMIX_ERROR_LOG(rc);
1165 return rc;
1166 }
1167
1168 cd = PMIX_NEW(pmix_setup_caddy_t);
1169 if (NULL == cd) {
1170 return PMIX_ERR_NOMEM;
1171 }
1172 cd->lkcbfunc = cbfunc;
1173 cd->cbdata = cbdata;
1174
1175 for (i=0; i < nkeys; i++) {
1176 cnt=1;
1177 PMIX_BFROPS_UNPACK(rc, peer, buf, &sptr, &cnt, PMIX_STRING);
1178 if (PMIX_SUCCESS != rc) {
1179 PMIX_ERROR_LOG(rc);
1180 goto cleanup;
1181 }
1182 pmix_argv_append_nosize(&cd->keys, sptr);
1183 free(sptr);
1184 }
1185
1186 cnt=1;
1187 PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
1188 if (PMIX_SUCCESS != rc) {
1189 PMIX_ERROR_LOG(rc);
1190 goto cleanup;
1191 }
1192
1193 cd->ninfo = ninfo + 1;
1194 PMIX_INFO_CREATE(cd->info, cd->ninfo);
1195 if (NULL == cd->info) {
1196 rc = PMIX_ERR_NOMEM;
1197 goto cleanup;
1198 }
1199
1200 if (0 < ninfo) {
1201 cnt=ninfo;
1202 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
1203 if (PMIX_SUCCESS != rc) {
1204 PMIX_ERROR_LOG(rc);
1205 goto cleanup;
1206 }
1207 }
1208 pmix_strncpy(cd->info[cd->ninfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN);
1209 cd->info[cd->ninfo-1].value.type = PMIX_UINT32;
1210 cd->info[cd->ninfo-1].value.data.uint32 = uid;
1211
1212
1213 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
1214 proc.rank = peer->info->pname.rank;
1215 rc = pmix_host_server.lookup(&proc, cd->keys, cd->info, cd->ninfo, lkcbfunc, cd);
1216
1217 cleanup:
1218 if (PMIX_SUCCESS != rc) {
1219 if (NULL != cd->keys) {
1220 pmix_argv_free(cd->keys);
1221 }
1222 if (NULL != cd->info) {
1223 PMIX_INFO_FREE(cd->info, cd->ninfo);
1224 }
1225 PMIX_RELEASE(cd);
1226 }
1227 return rc;
1228 }
1229
1230 pmix_status_t pmix_server_unpublish(pmix_peer_t *peer,
1231 pmix_buffer_t *buf,
1232 pmix_op_cbfunc_t cbfunc, void *cbdata)
1233 {
1234 pmix_setup_caddy_t *cd;
1235 int32_t cnt;
1236 pmix_status_t rc;
1237 size_t i, nkeys, ninfo;
1238 char *sptr;
1239 pmix_proc_t proc;
1240 uint32_t uid;
1241
1242 pmix_output_verbose(2, pmix_server_globals.pub_output,
1243 "recvd UNPUBLISH");
1244
1245 if (NULL == pmix_host_server.unpublish) {
1246 return PMIX_ERR_NOT_SUPPORTED;
1247 }
1248
1249
1250 cnt=1;
1251 PMIX_BFROPS_UNPACK(rc, peer, buf, &uid, &cnt, PMIX_UINT32);
1252 if (PMIX_SUCCESS != rc) {
1253 PMIX_ERROR_LOG(rc);
1254 return rc;
1255 }
1256
1257 cnt=1;
1258 PMIX_BFROPS_UNPACK(rc, peer, buf, &nkeys, &cnt, PMIX_SIZE);
1259 if (PMIX_SUCCESS != rc) {
1260 PMIX_ERROR_LOG(rc);
1261 return rc;
1262 }
1263
1264 cd = PMIX_NEW(pmix_setup_caddy_t);
1265 if (NULL == cd) {
1266 return PMIX_ERR_NOMEM;
1267 }
1268 cd->opcbfunc = cbfunc;
1269 cd->cbdata = cbdata;
1270
1271 for (i=0; i < nkeys; i++) {
1272 cnt=1;
1273 PMIX_BFROPS_UNPACK(rc, peer, buf, &sptr, &cnt, PMIX_STRING);
1274 if (PMIX_SUCCESS != rc) {
1275 PMIX_ERROR_LOG(rc);
1276 goto cleanup;
1277 }
1278 pmix_argv_append_nosize(&cd->keys, sptr);
1279 free(sptr);
1280 }
1281
1282 cnt=1;
1283 PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
1284 if (PMIX_SUCCESS != rc) {
1285 PMIX_ERROR_LOG(rc);
1286 goto cleanup;
1287 }
1288
1289 cd->ninfo = ninfo + 1;
1290 PMIX_INFO_CREATE(cd->info, cd->ninfo);
1291 if (NULL == cd->info) {
1292 rc = PMIX_ERR_NOMEM;
1293 goto cleanup;
1294 }
1295
1296 if (0 < ninfo) {
1297 cnt=ninfo;
1298 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
1299 if (PMIX_SUCCESS != rc) {
1300 PMIX_ERROR_LOG(rc);
1301 goto cleanup;
1302 }
1303 }
1304 pmix_strncpy(cd->info[cd->ninfo-1].key, PMIX_USERID, PMIX_MAX_KEYLEN);
1305 cd->info[cd->ninfo-1].value.type = PMIX_UINT32;
1306 cd->info[cd->ninfo-1].value.data.uint32 = uid;
1307
1308
1309 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
1310 proc.rank = peer->info->pname.rank;
1311 rc = pmix_host_server.unpublish(&proc, cd->keys, cd->info, cd->ninfo, opcbfunc, cd);
1312
1313 cleanup:
1314 if (PMIX_SUCCESS != rc) {
1315 if (NULL != cd->keys) {
1316 pmix_argv_free(cd->keys);
1317 }
1318 if (NULL != cd->info) {
1319 PMIX_INFO_FREE(cd->info, cd->ninfo);
1320 }
1321 PMIX_RELEASE(cd);
1322 }
1323 return rc;
1324 }
1325
1326 static void spcbfunc(pmix_status_t status,
1327 char nspace[], void *cbdata)
1328 {
1329 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1330 pmix_iof_req_t *req;
1331 pmix_buffer_t *msg;
1332 pmix_status_t rc;
1333 pmix_iof_cache_t *iof, *ionext;
1334
1335
1336
1337 if (PMIX_SUCCESS == status && PMIX_FWD_NO_CHANNELS != cd->channels) {
1338
1339 req = PMIX_NEW(pmix_iof_req_t);
1340 if (NULL == req) {
1341 status = PMIX_ERR_NOMEM;
1342 goto cleanup;
1343 }
1344 PMIX_RETAIN(cd->peer);
1345 req->peer = cd->peer;
1346 req->pname.nspace = strdup(nspace);
1347 req->pname.rank = PMIX_RANK_WILDCARD;
1348 req->channels = cd->channels;
1349 pmix_list_append(&pmix_globals.iof_requests, &req->super);
1350
1351 PMIX_LIST_FOREACH_SAFE(iof, ionext, &pmix_server_globals.iof, pmix_iof_cache_t) {
1352
1353 if (!(iof->channel & req->channels)) {
1354 continue;
1355 }
1356
1357 if (!PMIX_CHECK_PROCID(&iof->source, &req->pname)) {
1358 continue;
1359 }
1360
1361
1362 if (PMIX_CHECK_PROCID(&iof->source, &req->peer->info->pname)) {
1363 continue;
1364 }
1365 pmix_output_verbose(2, pmix_server_globals.iof_output,
1366 "PMIX:SERVER:SPAWN delivering cached IOF from %s:%d to %s:%d",
1367 iof->source.nspace, iof->source.rank,
1368 req->pname.nspace, req->pname.rank);
1369
1370 if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) {
1371 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
1372 rc = PMIX_ERR_OUT_OF_RESOURCE;
1373 break;
1374 }
1375
1376 PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->source, 1, PMIX_PROC);
1377 if (PMIX_SUCCESS != rc) {
1378 PMIX_ERROR_LOG(rc);
1379 PMIX_RELEASE(msg);
1380 break;
1381 }
1382
1383 PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->channel, 1, PMIX_IOF_CHANNEL);
1384 if (PMIX_SUCCESS != rc) {
1385 PMIX_ERROR_LOG(rc);
1386 PMIX_RELEASE(msg);
1387 break;
1388 }
1389
1390 PMIX_BFROPS_PACK(rc, req->peer, msg, iof->bo, 1, PMIX_BYTE_OBJECT);
1391 if (PMIX_SUCCESS != rc) {
1392 PMIX_ERROR_LOG(rc);
1393 PMIX_RELEASE(msg);
1394 break;
1395 }
1396
1397 PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF);
1398 if (PMIX_SUCCESS != rc) {
1399 PMIX_ERROR_LOG(rc);
1400 PMIX_RELEASE(msg);
1401 }
1402
1403 pmix_list_remove_item(&pmix_server_globals.iof, &iof->super);
1404 PMIX_RELEASE(iof);
1405 }
1406 }
1407
1408 cleanup:
1409
1410 if (NULL != cd->info) {
1411 PMIX_INFO_FREE(cd->info, cd->ninfo);
1412 }
1413 if (NULL != cd->apps) {
1414 PMIX_APP_FREE(cd->apps, cd->napps);
1415 }
1416 if (NULL != cd->spcbfunc) {
1417 cd->spcbfunc(status, nspace, cd->cbdata);
1418 }
1419 PMIX_RELEASE(cd);
1420 }
1421
1422 pmix_status_t pmix_server_spawn(pmix_peer_t *peer,
1423 pmix_buffer_t *buf,
1424 pmix_spawn_cbfunc_t cbfunc,
1425 void *cbdata)
1426 {
1427 pmix_setup_caddy_t *cd;
1428 int32_t cnt;
1429 pmix_status_t rc;
1430 pmix_proc_t proc;
1431 size_t ninfo, n;
1432 bool stdout_found = false, stderr_found = false, stddiag_found = false;
1433
1434 pmix_output_verbose(2, pmix_server_globals.spawn_output,
1435 "recvd SPAWN from %s:%d", peer->info->pname.nspace, peer->info->pname.rank);
1436
1437 if (NULL == pmix_host_server.spawn) {
1438 return PMIX_ERR_NOT_SUPPORTED;
1439 }
1440
1441
1442 cd = PMIX_NEW(pmix_setup_caddy_t);
1443 if (NULL == cd) {
1444 return PMIX_ERR_NOMEM;
1445 }
1446 PMIX_RETAIN(peer);
1447 cd->peer = peer;
1448 cd->spcbfunc = cbfunc;
1449 cd->cbdata = cbdata;
1450
1451
1452 cnt=1;
1453 PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
1454 if (PMIX_SUCCESS != rc) {
1455 PMIX_ERROR_LOG(rc);
1456 PMIX_RELEASE(cd);
1457 return rc;
1458 }
1459
1460
1461 cd->ninfo = ninfo + 1;
1462 PMIX_INFO_CREATE(cd->info, cd->ninfo);
1463 if (NULL == cd->info) {
1464 rc = PMIX_ERR_NOMEM;
1465 goto cleanup;
1466 }
1467
1468
1469 if (0 < ninfo) {
1470 cnt = ninfo;
1471 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
1472 if (PMIX_SUCCESS != rc) {
1473 PMIX_ERROR_LOG(rc);
1474 goto cleanup;
1475 }
1476
1477
1478
1479
1480 cd->channels = PMIX_FWD_NO_CHANNELS;
1481 for (n=0; n < cd->ninfo; n++) {
1482 if (0 == strncmp(cd->info[n].key, PMIX_FWD_STDIN, PMIX_MAX_KEYLEN)) {
1483 if (PMIX_INFO_TRUE(&cd->info[n])) {
1484 cd->channels |= PMIX_FWD_STDIN_CHANNEL;
1485 }
1486 } else if (0 == strncmp(cd->info[n].key, PMIX_FWD_STDOUT, PMIX_MAX_KEYLEN)) {
1487 stdout_found = true;
1488 if (PMIX_INFO_TRUE(&cd->info[n])) {
1489 cd->channels |= PMIX_FWD_STDOUT_CHANNEL;
1490 }
1491 } else if (0 == strncmp(cd->info[n].key, PMIX_FWD_STDERR, PMIX_MAX_KEYLEN)) {
1492 stderr_found = true;
1493 if (PMIX_INFO_TRUE(&cd->info[n])) {
1494 cd->channels |= PMIX_FWD_STDERR_CHANNEL;
1495 }
1496 } else if (0 == strncmp(cd->info[n].key, PMIX_FWD_STDDIAG, PMIX_MAX_KEYLEN)) {
1497 stddiag_found = true;
1498 if (PMIX_INFO_TRUE(&cd->info[n])) {
1499 cd->channels |= PMIX_FWD_STDDIAG_CHANNEL;
1500 }
1501 }
1502 }
1503
1504
1505 }
1506
1507 if (PMIX_PROC_IS_TOOL(peer)) {
1508 PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_REQUESTOR_IS_TOOL, NULL, PMIX_BOOL);
1509
1510
1511 if (!stdout_found) {
1512 cd->channels |= PMIX_FWD_STDOUT_CHANNEL;
1513 }
1514 if (!stderr_found) {
1515 cd->channels |= PMIX_FWD_STDERR_CHANNEL;
1516 }
1517 if (!stddiag_found) {
1518 cd->channels |= PMIX_FWD_STDDIAG_CHANNEL;
1519 }
1520 } else {
1521 PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_REQUESTOR_IS_CLIENT, NULL, PMIX_BOOL);
1522 }
1523
1524
1525 cnt=1;
1526 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->napps, &cnt, PMIX_SIZE);
1527 if (PMIX_SUCCESS != rc) {
1528 PMIX_ERROR_LOG(rc);
1529 goto cleanup;
1530 }
1531
1532 if (0 < cd->napps) {
1533 PMIX_APP_CREATE(cd->apps, cd->napps);
1534 if (NULL == cd->apps) {
1535 rc = PMIX_ERR_NOMEM;
1536 goto cleanup;
1537 }
1538 cnt = cd->napps;
1539 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->apps, &cnt, PMIX_APP);
1540 if (PMIX_SUCCESS != rc) {
1541 PMIX_ERROR_LOG(rc);
1542 goto cleanup;
1543 }
1544 }
1545
1546 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
1547 proc.rank = peer->info->pname.rank;
1548 rc = pmix_host_server.spawn(&proc, cd->info, cd->ninfo, cd->apps, cd->napps, spcbfunc, cd);
1549
1550 cleanup:
1551 if (PMIX_SUCCESS != rc) {
1552 if (NULL != cd->info) {
1553 PMIX_INFO_FREE(cd->info, cd->ninfo);
1554 }
1555 if (NULL != cd->apps) {
1556 PMIX_APP_FREE(cd->apps, cd->napps);
1557 }
1558 PMIX_RELEASE(cd);
1559 }
1560 return rc;
1561 }
1562
1563 pmix_status_t pmix_server_disconnect(pmix_server_caddy_t *cd,
1564 pmix_buffer_t *buf,
1565 pmix_op_cbfunc_t cbfunc)
1566 {
1567 int32_t cnt;
1568 pmix_status_t rc;
1569 pmix_info_t *info = NULL;
1570 size_t nprocs, ninfo;
1571 pmix_server_trkr_t *trk;
1572 pmix_proc_t *procs = NULL;
1573
1574 if (NULL == pmix_host_server.disconnect) {
1575 return PMIX_ERR_NOT_SUPPORTED;
1576 }
1577
1578
1579 cnt = 1;
1580 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &nprocs, &cnt, PMIX_SIZE);
1581 if (PMIX_SUCCESS != rc) {
1582 PMIX_ERROR_LOG(rc);
1583 goto cleanup;
1584 }
1585
1586
1587
1588
1589
1590 if (nprocs < 1) {
1591 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1592 rc = PMIX_ERR_BAD_PARAM;
1593 goto cleanup;
1594 }
1595
1596
1597 PMIX_PROC_CREATE(procs, nprocs);
1598 if (NULL == procs) {
1599 rc = PMIX_ERR_NOMEM;
1600 goto cleanup;
1601 }
1602 cnt = nprocs;
1603 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, procs, &cnt, PMIX_PROC);
1604 if (PMIX_SUCCESS != rc) {
1605 PMIX_ERROR_LOG(rc);
1606 goto cleanup;
1607 }
1608
1609
1610 cnt = 1;
1611 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &ninfo, &cnt, PMIX_SIZE);
1612 if (PMIX_SUCCESS != rc) {
1613 return rc;
1614 }
1615 if (0 < ninfo) {
1616 PMIX_INFO_CREATE(info, ninfo);
1617 if (NULL == info) {
1618 rc = PMIX_ERR_NOMEM;
1619 goto cleanup;
1620 }
1621
1622 cnt = ninfo;
1623 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, info, &cnt, PMIX_INFO);
1624 if (PMIX_SUCCESS != rc) {
1625 goto cleanup;
1626 }
1627 }
1628
1629
1630 if (NULL == (trk = get_tracker(NULL, procs, nprocs, PMIX_DISCONNECTNB_CMD))) {
1631
1632 if (NULL == (trk = new_tracker(NULL, procs, nprocs, PMIX_DISCONNECTNB_CMD))) {
1633
1634 PMIX_ERROR_LOG(PMIX_ERROR);
1635 rc = PMIX_ERROR;
1636 goto cleanup;
1637 }
1638 trk->op_cbfunc = cbfunc;
1639 }
1640
1641
1642
1643 if (NULL == trk->info && NULL != info) {
1644 trk->info = info;
1645 trk->ninfo = ninfo;
1646 info = NULL;
1647 ninfo = 0;
1648 }
1649
1650
1651
1652 pmix_list_append(&trk->local_cbs, &cd->super);
1653
1654
1655
1656
1657 if (trk->def_complete &&
1658 pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
1659 trk->host_called = true;
1660 rc = pmix_host_server.disconnect(trk->pcs, trk->npcs, trk->info, trk->ninfo, cbfunc, trk);
1661 if (PMIX_SUCCESS != rc) {
1662
1663
1664 pmix_list_remove_item(&trk->local_cbs, &cd->super);
1665 }
1666 } else {
1667 rc = PMIX_SUCCESS;
1668 }
1669
1670 cleanup:
1671 if (NULL != info) {
1672 PMIX_INFO_FREE(info, ninfo);
1673 }
1674 return rc;
1675 }
1676
1677 static void connect_timeout(int sd, short args, void *cbdata)
1678 {
1679 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
1680
1681 pmix_output_verbose(2, pmix_server_globals.connect_output,
1682 "ALERT: connect timeout fired");
1683
1684
1685 if (NULL != cd->trk->op_cbfunc) {
1686 cd->trk->op_cbfunc(PMIX_ERR_TIMEOUT, cd->trk);
1687 return;
1688 }
1689 cd->event_active = false;
1690
1691 pmix_list_remove_item(&cd->trk->local_cbs, &cd->super);
1692 PMIX_RELEASE(cd);
1693 }
1694
1695 pmix_status_t pmix_server_connect(pmix_server_caddy_t *cd,
1696 pmix_buffer_t *buf,
1697 pmix_op_cbfunc_t cbfunc)
1698 {
1699 int32_t cnt;
1700 pmix_status_t rc;
1701 pmix_proc_t *procs = NULL;
1702 pmix_info_t *info = NULL;
1703 size_t nprocs, ninfo, n;
1704 pmix_server_trkr_t *trk;
1705 struct timeval tv = {0, 0};
1706
1707 pmix_output_verbose(2, pmix_server_globals.connect_output,
1708 "recvd CONNECT from peer %s:%d",
1709 cd->peer->info->pname.nspace,
1710 cd->peer->info->pname.rank);
1711
1712 if (NULL == pmix_host_server.connect) {
1713 return PMIX_ERR_NOT_SUPPORTED;
1714 }
1715
1716
1717 cnt = 1;
1718 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &nprocs, &cnt, PMIX_SIZE);
1719 if (PMIX_SUCCESS != rc) {
1720 PMIX_ERROR_LOG(rc);
1721 goto cleanup;
1722 }
1723
1724
1725
1726
1727
1728 if (nprocs < 1) {
1729 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1730 rc = PMIX_ERR_BAD_PARAM;
1731 goto cleanup;
1732 }
1733
1734
1735 PMIX_PROC_CREATE(procs, nprocs);
1736 if (NULL == procs) {
1737 rc = PMIX_ERR_NOMEM;
1738 goto cleanup;
1739 }
1740 cnt = nprocs;
1741 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, procs, &cnt, PMIX_PROC);
1742 if (PMIX_SUCCESS != rc) {
1743 PMIX_ERROR_LOG(rc);
1744 goto cleanup;
1745 }
1746
1747
1748 cnt = 1;
1749 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, &ninfo, &cnt, PMIX_SIZE);
1750 if (PMIX_SUCCESS != rc) {
1751 return rc;
1752 }
1753 if (0 < ninfo) {
1754 PMIX_INFO_CREATE(info, ninfo);
1755 if (NULL == info) {
1756 rc = PMIX_ERR_NOMEM;
1757 goto cleanup;
1758 }
1759
1760 cnt = ninfo;
1761 PMIX_BFROPS_UNPACK(rc, cd->peer, buf, info, &cnt, PMIX_INFO);
1762 if (PMIX_SUCCESS != rc) {
1763 goto cleanup;
1764 }
1765
1766 for (n=0; n < ninfo; n++) {
1767 if (0 == strncmp(info[n].key, PMIX_TIMEOUT, PMIX_MAX_KEYLEN)) {
1768 tv.tv_sec = info[n].value.data.uint32;
1769 break;
1770 }
1771 }
1772 }
1773
1774
1775 if (NULL == (trk = get_tracker(NULL, procs, nprocs, PMIX_CONNECTNB_CMD))) {
1776
1777 if (NULL == (trk = new_tracker(NULL, procs, nprocs, PMIX_CONNECTNB_CMD))) {
1778
1779 PMIX_ERROR_LOG(PMIX_ERROR);
1780
1781 if (NULL != cbfunc) {
1782 cbfunc(PMIX_ERROR, cd);
1783 }
1784 rc = PMIX_ERROR;
1785 goto cleanup;
1786 }
1787 trk->op_cbfunc = cbfunc;
1788 }
1789
1790
1791
1792 if (NULL == trk->info && NULL != info) {
1793 trk->info = info;
1794 trk->ninfo = ninfo;
1795 info = NULL;
1796 ninfo = 0;
1797 }
1798
1799
1800
1801 pmix_list_append(&trk->local_cbs, &cd->super);
1802
1803
1804
1805
1806
1807 if (trk->def_complete &&
1808 pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
1809 trk->host_called = true;
1810 rc = pmix_host_server.connect(trk->pcs, trk->npcs, trk->info, trk->ninfo, cbfunc, trk);
1811 if (PMIX_SUCCESS != rc) {
1812
1813
1814 pmix_list_remove_item(&trk->local_cbs, &cd->super);
1815 }
1816 } else {
1817 rc = PMIX_SUCCESS;
1818 }
1819
1820 if (PMIX_SUCCESS == rc && 0 < tv.tv_sec) {
1821 PMIX_RETAIN(trk);
1822 cd->trk = trk;
1823 pmix_event_evtimer_set(pmix_globals.evbase, &cd->ev,
1824 connect_timeout, cd);
1825 pmix_event_evtimer_add(&cd->ev, &tv);
1826 cd->event_active = true;
1827 }
1828
1829 cleanup:
1830 if (NULL != procs) {
1831 PMIX_PROC_FREE(procs, nprocs);
1832 }
1833 if (NULL != info) {
1834 PMIX_INFO_FREE(info, ninfo);
1835 }
1836 return rc;
1837 }
1838
1839 static void _check_cached_events(int sd, short args, void *cbdata)
1840 {
1841 pmix_setup_caddy_t *scd = (pmix_setup_caddy_t*)cbdata;
1842 pmix_notify_caddy_t *cd;
1843 pmix_range_trkr_t rngtrk;
1844 pmix_proc_t proc;
1845 int i;
1846 size_t k, n;
1847 bool found, matched;
1848 pmix_buffer_t *relay;
1849 pmix_status_t ret = PMIX_SUCCESS;
1850 pmix_cmd_t cmd = PMIX_NOTIFY_CMD;
1851
1852
1853 rngtrk.procs = NULL;
1854 rngtrk.nprocs = 0;
1855 for (i=0; i < pmix_globals.max_events; i++) {
1856 pmix_hotel_knock(&pmix_globals.notifications, i, (void**)&cd);
1857 if (NULL == cd) {
1858 continue;
1859 }
1860 found = false;
1861 if (NULL == scd->codes) {
1862 if (!cd->nondefault) {
1863
1864 found = true;
1865 }
1866 } else {
1867 for (k=0; k < scd->ncodes; k++) {
1868 if (scd->codes[k] == cd->status) {
1869 found = true;
1870 break;
1871 }
1872 }
1873 }
1874 if (!found) {
1875 continue;
1876 }
1877
1878
1879 if (!pmix_notify_check_affected(cd->affected, cd->naffected,
1880 scd->procs, scd->nprocs)) {
1881 continue;
1882 }
1883
1884 if (NULL == cd->targets) {
1885 rngtrk.procs = &cd->source;
1886 rngtrk.nprocs = 1;
1887 } else {
1888 rngtrk.procs = cd->targets;
1889 rngtrk.nprocs = cd->ntargets;
1890 }
1891 rngtrk.range = cd->range;
1892 PMIX_LOAD_PROCID(&proc, scd->peer->info->pname.nspace, scd->peer->info->pname.rank);
1893 if (!pmix_notify_check_range(&rngtrk, &proc)) {
1894 continue;
1895 }
1896
1897 found = false;
1898 if (NULL != cd->targets) {
1899 matched = false;
1900 for (n=0; n < cd->ntargets; n++) {
1901
1902
1903
1904 if (PMIX_CHECK_PROCID(&cd->source, &scd->peer->info->pname)) {
1905 continue;
1906 }
1907 if (PMIX_CHECK_PROCID(&scd->peer->info->pname, &cd->targets[n])) {
1908 matched = true;
1909
1910 --cd->nleft;
1911
1912
1913 if (0 == cd->nleft) {
1914 pmix_hotel_checkout(&pmix_globals.notifications, cd->room);
1915 found = true;
1916 }
1917 break;
1918 }
1919 }
1920 if (!matched) {
1921
1922 continue;
1923 }
1924 }
1925
1926
1927 relay = PMIX_NEW(pmix_buffer_t);
1928 if (NULL == relay) {
1929
1930 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1931 ret = PMIX_ERR_NOMEM;
1932 break;
1933 }
1934
1935 PMIX_BFROPS_PACK(ret, scd->peer, relay, &cmd, 1, PMIX_COMMAND);
1936 if (PMIX_SUCCESS != ret) {
1937 PMIX_ERROR_LOG(ret);
1938 break;
1939 }
1940 PMIX_BFROPS_PACK(ret, scd->peer, relay, &cd->status, 1, PMIX_STATUS);
1941 if (PMIX_SUCCESS != ret) {
1942 PMIX_ERROR_LOG(ret);
1943 break;
1944 }
1945 PMIX_BFROPS_PACK(ret, scd->peer, relay, &cd->source, 1, PMIX_PROC);
1946 if (PMIX_SUCCESS != ret) {
1947 PMIX_ERROR_LOG(ret);
1948 break;
1949 }
1950 PMIX_BFROPS_PACK(ret, scd->peer, relay, &cd->ninfo, 1, PMIX_SIZE);
1951 if (PMIX_SUCCESS != ret) {
1952 PMIX_ERROR_LOG(ret);
1953 break;
1954 }
1955 if (0 < cd->ninfo) {
1956 PMIX_BFROPS_PACK(ret, scd->peer, relay, cd->info, cd->ninfo, PMIX_INFO);
1957 if (PMIX_SUCCESS != ret) {
1958 PMIX_ERROR_LOG(ret);
1959 break;
1960 }
1961 }
1962 PMIX_SERVER_QUEUE_REPLY(ret, scd->peer, 0, relay);
1963 if (PMIX_SUCCESS != ret) {
1964 PMIX_RELEASE(relay);
1965 }
1966 if (found) {
1967 PMIX_RELEASE(cd);
1968 }
1969 }
1970
1971 if (NULL != scd->codes) {
1972 free(scd->codes);
1973 }
1974 if (NULL != scd->info) {
1975 PMIX_INFO_FREE(scd->info, scd->ninfo);
1976 }
1977 if (NULL != scd->opcbfunc) {
1978 scd->opcbfunc(ret, scd->cbdata);
1979 }
1980 PMIX_RELEASE(scd);
1981 }
1982
1983
1984
1985 static void regevopcbfunc(pmix_status_t status, void *cbdata)
1986 {
1987 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1988
1989
1990 if (PMIX_SUCCESS == status) {
1991 _check_cached_events(0, 0, cd);
1992 return;
1993 }
1994
1995
1996
1997 if (NULL != cd->codes) {
1998 free(cd->codes);
1999 }
2000 if (NULL != cd->info) {
2001 PMIX_INFO_FREE(cd->info, cd->ninfo);
2002 }
2003 if (NULL != cd->opcbfunc) {
2004 cd->opcbfunc(status, cd->cbdata);
2005 }
2006 PMIX_RELEASE(cd);
2007 }
2008
2009
2010 pmix_status_t pmix_server_register_events(pmix_peer_t *peer,
2011 pmix_buffer_t *buf,
2012 pmix_op_cbfunc_t cbfunc,
2013 void *cbdata)
2014 {
2015 int32_t cnt;
2016 pmix_status_t rc;
2017 pmix_status_t *codes = NULL;
2018 pmix_info_t *info = NULL;
2019 size_t ninfo=0, ncodes, n;
2020 pmix_regevents_info_t *reginfo;
2021 pmix_peer_events_info_t *prev = NULL;
2022 pmix_setup_caddy_t *scd;
2023 bool enviro_events = false;
2024 bool found;
2025 pmix_proc_t *affected = NULL;
2026 size_t naffected = 0;
2027
2028 pmix_output_verbose(2, pmix_server_globals.event_output,
2029 "recvd register events for peer %s:%d",
2030 peer->info->pname.nspace, peer->info->pname.rank);
2031
2032
2033 cnt=1;
2034 PMIX_BFROPS_UNPACK(rc, peer, buf, &ncodes, &cnt, PMIX_SIZE);
2035 if (PMIX_SUCCESS != rc) {
2036 PMIX_ERROR_LOG(rc);
2037 return rc;
2038 }
2039
2040 if (0 < ncodes) {
2041 codes = (pmix_status_t*)malloc(ncodes * sizeof(pmix_status_t));
2042 if (NULL == codes) {
2043 rc = PMIX_ERR_NOMEM;
2044 goto cleanup;
2045 }
2046 cnt=ncodes;
2047 PMIX_BFROPS_UNPACK(rc, peer, buf, codes, &cnt, PMIX_STATUS);
2048 if (PMIX_SUCCESS != rc) {
2049 PMIX_ERROR_LOG(rc);
2050 goto cleanup;
2051 }
2052 }
2053
2054
2055 cnt=1;
2056 PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
2057 if (PMIX_SUCCESS != rc) {
2058 PMIX_ERROR_LOG(rc);
2059 return rc;
2060 }
2061
2062 if (0 < ninfo) {
2063 PMIX_INFO_CREATE(info, ninfo);
2064 if (NULL == info) {
2065 rc = PMIX_ERR_NOMEM;
2066 goto cleanup;
2067 }
2068 cnt=ninfo;
2069 PMIX_BFROPS_UNPACK(rc, peer, buf, info, &cnt, PMIX_INFO);
2070 if (PMIX_SUCCESS != rc) {
2071 PMIX_ERROR_LOG(rc);
2072 goto cleanup;
2073 }
2074 }
2075
2076
2077 for (n=0; n < ninfo; n++) {
2078 if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROC)) {
2079 if (NULL != affected) {
2080 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
2081 rc = PMIX_ERR_BAD_PARAM;
2082 goto cleanup;
2083 }
2084 naffected = 1;
2085 PMIX_PROC_CREATE(affected, naffected);
2086 memcpy(affected, info[n].value.data.proc, sizeof(pmix_proc_t));
2087 } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROCS)) {
2088 if (NULL != affected) {
2089 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
2090 rc = PMIX_ERR_BAD_PARAM;
2091 goto cleanup;
2092 }
2093 naffected = info[n].value.data.darray->size;
2094 PMIX_PROC_CREATE(affected, naffected);
2095 memcpy(affected, info[n].value.data.darray->array, naffected * sizeof(pmix_proc_t));
2096 }
2097 }
2098
2099
2100 for (n=0; n < ncodes; n++) {
2101 if (PMIX_SYSTEM_EVENT(codes[n])) {
2102 enviro_events = true;
2103 break;
2104 }
2105 }
2106
2107
2108
2109 if (enviro_events && NULL == pmix_host_server.register_events) {
2110 enviro_events = false;
2111 rc = PMIX_ERR_NOT_SUPPORTED;
2112 goto cleanup;
2113 }
2114
2115
2116
2117
2118 if (0 == ncodes) {
2119 PMIX_LIST_FOREACH(reginfo, &pmix_server_globals.events, pmix_regevents_info_t) {
2120 if (PMIX_MAX_ERR_CONSTANT == reginfo->code) {
2121
2122 prev = PMIX_NEW(pmix_peer_events_info_t);
2123 if (NULL == prev) {
2124 rc = PMIX_ERR_NOMEM;
2125 goto cleanup;
2126 }
2127 PMIX_RETAIN(peer);
2128 prev->peer = peer;
2129 if (NULL != affected) {
2130 PMIX_PROC_CREATE(prev->affected, naffected);
2131 prev->naffected = naffected;
2132 memcpy(prev->affected, affected, naffected * sizeof(pmix_proc_t));
2133 }
2134 pmix_list_append(®info->peers, &prev->super);
2135 break;
2136 }
2137 }
2138 rc = PMIX_OPERATION_SUCCEEDED;
2139 goto cleanup;
2140 }
2141
2142
2143
2144 for (n=0; n < ncodes; n++) {
2145 found = false;
2146 PMIX_LIST_FOREACH(reginfo, &pmix_server_globals.events, pmix_regevents_info_t) {
2147 if (NULL == codes) {
2148 if (PMIX_MAX_ERR_CONSTANT == reginfo->code) {
2149
2150 found = true;
2151 break;
2152 } else {
2153 continue;
2154 }
2155 } else {
2156 if (PMIX_MAX_ERR_CONSTANT == reginfo->code) {
2157 continue;
2158 } else if (codes[n] == reginfo->code) {
2159 found = true;
2160 break;
2161 }
2162 }
2163 }
2164 if (found) {
2165
2166 prev = PMIX_NEW(pmix_peer_events_info_t);
2167 if (NULL == prev) {
2168 rc = PMIX_ERR_NOMEM;
2169 goto cleanup;
2170 }
2171 PMIX_RETAIN(peer);
2172 prev->peer = peer;
2173 if (NULL != affected) {
2174 PMIX_PROC_CREATE(prev->affected, naffected);
2175 prev->naffected = naffected;
2176 memcpy(prev->affected, affected, naffected * sizeof(pmix_proc_t));
2177 }
2178 prev->enviro_events = enviro_events;
2179 pmix_list_append(®info->peers, &prev->super);
2180 } else {
2181
2182 reginfo = PMIX_NEW(pmix_regevents_info_t);
2183 if (NULL == reginfo) {
2184 rc = PMIX_ERR_NOMEM;
2185 goto cleanup;
2186 }
2187 if (NULL == codes) {
2188 reginfo->code = PMIX_MAX_ERR_CONSTANT;
2189 } else {
2190 reginfo->code = codes[n];
2191 }
2192 pmix_list_append(&pmix_server_globals.events, ®info->super);
2193 prev = PMIX_NEW(pmix_peer_events_info_t);
2194 if (NULL == prev) {
2195 rc = PMIX_ERR_NOMEM;
2196 goto cleanup;
2197 }
2198 PMIX_RETAIN(peer);
2199 prev->peer = peer;
2200 if (NULL != affected) {
2201 PMIX_PROC_CREATE(prev->affected, naffected);
2202 prev->naffected = naffected;
2203 memcpy(prev->affected, affected, naffected * sizeof(pmix_proc_t));
2204 }
2205 prev->enviro_events = enviro_events;
2206 pmix_list_append(®info->peers, &prev->super);
2207 }
2208 }
2209
2210
2211 if (enviro_events) {
2212
2213 if (NULL == pmix_host_server.register_events) {
2214 rc = PMIX_ERR_NOT_SUPPORTED;
2215 goto cleanup;
2216 }
2217
2218
2219 scd = PMIX_NEW(pmix_setup_caddy_t);
2220 if (NULL == scd) {
2221 rc = PMIX_ERR_NOMEM;
2222 goto cleanup;
2223 }
2224 PMIX_RETAIN(peer);
2225 scd->peer = peer;
2226 scd->codes = codes;
2227 scd->ncodes = ncodes;
2228 scd->info = info;
2229 scd->ninfo = ninfo;
2230 scd->opcbfunc = cbfunc;
2231 scd->cbdata = cbdata;
2232 if (PMIX_SUCCESS == (rc = pmix_host_server.register_events(scd->codes, scd->ncodes, scd->info, scd->ninfo, regevopcbfunc, scd))) {
2233
2234 pmix_output_verbose(2, pmix_server_globals.event_output,
2235 "server register events: host server processing event registration");
2236 if (NULL != affected) {
2237 free(affected);
2238 }
2239 return rc;
2240 } else if (PMIX_OPERATION_SUCCEEDED == rc) {
2241
2242
2243
2244
2245
2246
2247
2248 PMIX_RETAIN(peer);
2249 scd->peer = peer;
2250 scd->procs = affected;
2251 scd->nprocs = naffected;
2252 scd->opcbfunc = NULL;
2253 scd->cbdata = NULL;
2254 PMIX_THREADSHIFT(scd, _check_cached_events);
2255 return rc;
2256 } else {
2257
2258 pmix_output_verbose(2, pmix_server_globals.event_output,
2259 "server register events: host server reg events returned rc =%d", rc);
2260 PMIX_RELEASE(scd);
2261 goto cleanup;
2262 }
2263 } else {
2264 rc = PMIX_OPERATION_SUCCEEDED;
2265
2266
2267
2268
2269
2270
2271
2272 scd = PMIX_NEW(pmix_setup_caddy_t);
2273 PMIX_RETAIN(peer);
2274 scd->peer = peer;
2275 scd->codes = codes;
2276 scd->ncodes = ncodes;
2277 scd->procs = affected;
2278 scd->nprocs = naffected;
2279 scd->opcbfunc = NULL;
2280 scd->cbdata = NULL;
2281 PMIX_THREADSHIFT(scd, _check_cached_events);
2282 if (NULL != info) {
2283 PMIX_INFO_FREE(info, ninfo);
2284 }
2285 return rc;
2286 }
2287
2288 cleanup:
2289 pmix_output_verbose(2, pmix_server_globals.event_output,
2290 "server register events: ninfo =%lu rc =%d", ninfo, rc);
2291 if (NULL != info) {
2292 PMIX_INFO_FREE(info, ninfo);
2293 }
2294 if (NULL != codes) {
2295 free(codes);
2296 }
2297 if (NULL != affected) {
2298 PMIX_PROC_FREE(affected, naffected);
2299 }
2300 return rc;
2301 }
2302
2303 void pmix_server_deregister_events(pmix_peer_t *peer,
2304 pmix_buffer_t *buf)
2305 {
2306 int32_t cnt;
2307 pmix_status_t rc, code;
2308 pmix_regevents_info_t *reginfo = NULL;
2309 pmix_regevents_info_t *reginfo_next;
2310 pmix_peer_events_info_t *prev;
2311
2312 pmix_output_verbose(2, pmix_server_globals.event_output,
2313 "recvd deregister events");
2314
2315
2316 cnt=1;
2317 PMIX_BFROPS_UNPACK(rc, peer, buf, &code, &cnt, PMIX_STATUS);
2318 while (PMIX_SUCCESS == rc) {
2319 PMIX_LIST_FOREACH_SAFE(reginfo, reginfo_next, &pmix_server_globals.events, pmix_regevents_info_t) {
2320 if (code == reginfo->code) {
2321
2322 PMIX_LIST_FOREACH(prev, ®info->peers, pmix_peer_events_info_t) {
2323 if (prev->peer == peer) {
2324
2325 pmix_list_remove_item(®info->peers, &prev->super);
2326 PMIX_RELEASE(prev);
2327 break;
2328 }
2329 }
2330
2331 if (0 == pmix_list_get_size(®info->peers)) {
2332 pmix_list_remove_item(&pmix_server_globals.events, ®info->super);
2333
2334 PMIX_RELEASE(reginfo);
2335 }
2336 }
2337 }
2338 cnt=1;
2339 PMIX_BFROPS_UNPACK(rc, peer, buf, &code, &cnt, PMIX_STATUS);
2340 }
2341 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
2342 PMIX_ERROR_LOG(rc);
2343 }
2344 }
2345
2346
2347 static void local_cbfunc(pmix_status_t status, void *cbdata)
2348 {
2349 pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
2350
2351 if (NULL != cd->cbfunc) {
2352 cd->cbfunc(status, cd->cbdata);
2353 }
2354 PMIX_RELEASE(cd);
2355 }
2356
2357 static void intermed_step(pmix_status_t status, void *cbdata)
2358 {
2359 pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
2360 pmix_status_t rc;
2361
2362 if (PMIX_SUCCESS != status) {
2363 rc = status;
2364 goto complete;
2365 }
2366
2367
2368
2369
2370 if (PMIX_RANGE_LOCAL == cd->range) {
2371 rc = PMIX_SUCCESS;
2372 goto complete;
2373 }
2374
2375 if (NULL == pmix_host_server.notify_event) {
2376 rc = PMIX_ERR_NOT_SUPPORTED;
2377 goto complete;
2378 }
2379
2380
2381
2382
2383
2384
2385 PMIX_INFO_LOAD(&cd->info[cd->ninfo-1], PMIX_EVENT_PROXY, &pmix_globals.myid, PMIX_PROC);
2386
2387
2388 rc = pmix_host_server.notify_event(cd->status, &cd->source, cd->range,
2389 cd->info, cd->ninfo, local_cbfunc, cd);
2390 if (PMIX_SUCCESS == rc) {
2391
2392 return;
2393 }
2394 if (PMIX_OPERATION_SUCCEEDED == rc) {
2395 rc = PMIX_SUCCESS;
2396 }
2397
2398 complete:
2399 if (NULL != cd->cbfunc) {
2400 cd->cbfunc(rc, cd->cbdata);
2401 }
2402 PMIX_RELEASE(cd);
2403 }
2404
2405
2406
2407
2408
2409
2410 pmix_status_t pmix_server_event_recvd_from_client(pmix_peer_t *peer,
2411 pmix_buffer_t *buf,
2412 pmix_op_cbfunc_t cbfunc,
2413 void *cbdata)
2414 {
2415 int32_t cnt;
2416 pmix_status_t rc;
2417 pmix_notify_caddy_t *cd;
2418 size_t ninfo, n;
2419
2420 pmix_output_verbose(2, pmix_server_globals.event_output,
2421 "%s:%d recvd event notification from client %s:%d",
2422 pmix_globals.myid.nspace, pmix_globals.myid.rank,
2423 peer->info->pname.nspace, peer->info->pname.rank);
2424
2425 cd = PMIX_NEW(pmix_notify_caddy_t);
2426 if (NULL == cd) {
2427 return PMIX_ERR_NOMEM;
2428 }
2429 cd->cbfunc = cbfunc;
2430 cd->cbdata = cbdata;
2431
2432 PMIX_LOAD_PROCID(&cd->source, peer->info->pname.nspace, peer->info->pname.rank);
2433
2434
2435 cnt = 1;
2436 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->status, &cnt, PMIX_STATUS);
2437 if (PMIX_SUCCESS != rc) {
2438 PMIX_ERROR_LOG(rc);
2439 goto exit;
2440 }
2441
2442
2443 cnt = 1;
2444 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->range, &cnt, PMIX_DATA_RANGE);
2445 if (PMIX_SUCCESS != rc) {
2446 PMIX_ERROR_LOG(rc);
2447 goto exit;
2448 }
2449
2450
2451 cnt = 1;
2452 PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
2453 if (PMIX_SUCCESS != rc) {
2454 PMIX_ERROR_LOG(rc);
2455 goto exit;
2456 }
2457 cd->ninfo = ninfo + 1;
2458 PMIX_INFO_CREATE(cd->info, cd->ninfo);
2459 if (NULL == cd->info) {
2460 rc = PMIX_ERR_NOMEM;
2461 goto exit;
2462 }
2463 if (0 < ninfo) {
2464 cnt = ninfo;
2465 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
2466 if (PMIX_SUCCESS != rc) {
2467 PMIX_ERROR_LOG(rc);
2468 goto exit;
2469 }
2470 }
2471
2472
2473
2474
2475 for (n=0; n < ninfo; n++) {
2476 if (PMIX_CHECK_KEY(&cd->info[n], PMIX_SERVER_INTERNAL_NOTIFY)) {
2477
2478 rc = PMIX_OPERATION_SUCCEEDED;
2479 goto exit;
2480 }
2481 }
2482
2483
2484 PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_SERVER_INTERNAL_NOTIFY, NULL, PMIX_BOOL);
2485
2486 if (PMIX_SUCCESS != (rc = pmix_server_notify_client_of_event(cd->status,
2487 &cd->source,
2488 cd->range,
2489 cd->info, cd->ninfo,
2490 intermed_step, cd))) {
2491 goto exit;
2492 }
2493 if (PMIX_SUCCESS != rc) {
2494 PMIX_RELEASE(cd);
2495 }
2496 return rc;
2497
2498 exit:
2499 PMIX_RELEASE(cd);
2500 return rc;
2501 }
2502
2503
2504 pmix_status_t pmix_server_query(pmix_peer_t *peer,
2505 pmix_buffer_t *buf,
2506 pmix_info_cbfunc_t cbfunc,
2507 void *cbdata)
2508 {
2509 int32_t cnt;
2510 pmix_status_t rc;
2511 pmix_query_caddy_t *cd;
2512 pmix_proc_t proc;
2513 pmix_cb_t cb;
2514 size_t n, p;
2515 pmix_list_t results;
2516 pmix_kval_t *kv, *kvnxt;
2517
2518 pmix_output_verbose(2, pmix_server_globals.base_output,
2519 "recvd query from client");
2520
2521 cd = PMIX_NEW(pmix_query_caddy_t);
2522 if (NULL == cd) {
2523 return PMIX_ERR_NOMEM;
2524 }
2525 cd->cbdata = cbdata;
2526
2527 cnt = 1;
2528 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->nqueries, &cnt, PMIX_SIZE);
2529 if (PMIX_SUCCESS != rc) {
2530 PMIX_ERROR_LOG(rc);
2531 PMIX_RELEASE(cd);
2532 return rc;
2533 }
2534
2535 if (0 < cd->nqueries) {
2536 PMIX_QUERY_CREATE(cd->queries, cd->nqueries);
2537 if (NULL == cd->queries) {
2538 rc = PMIX_ERR_NOMEM;
2539 PMIX_RELEASE(cd);
2540 return rc;
2541 }
2542 cnt = cd->nqueries;
2543 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->queries, &cnt, PMIX_QUERY);
2544 if (PMIX_SUCCESS != rc) {
2545 PMIX_ERROR_LOG(rc);
2546 PMIX_RELEASE(cd);
2547 return rc;
2548 }
2549 }
2550
2551
2552
2553
2554
2555
2556
2557 memset(proc.nspace, 0, PMIX_MAX_NSLEN+1);
2558 proc.rank = PMIX_RANK_INVALID;
2559 PMIX_CONSTRUCT(&results, pmix_list_t);
2560
2561 for (n=0; n < cd->nqueries; n++) {
2562
2563 if (0 == strcmp(cd->queries[n].keys[0], PMIX_QUERY_ATTRIBUTE_SUPPORT)) {
2564
2565 cd->cbfunc = cbfunc;
2566 PMIX_RETAIN(cd);
2567 PMIX_THREADSHIFT(cd, pmix_attrs_query_support);
2568 PMIX_LIST_DESTRUCT(&results);
2569 return PMIX_SUCCESS;
2570 }
2571 for (p=0; p < cd->queries[n].nqual; p++) {
2572 if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_QUERY_REFRESH_CACHE)) {
2573 if (PMIX_INFO_TRUE(&cd->queries[n].qualifiers[p])) {
2574 PMIX_LIST_DESTRUCT(&results);
2575 goto query;
2576 }
2577 } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_PROCID)) {
2578 PMIX_LOAD_NSPACE(proc.nspace, cd->queries[n].qualifiers[p].value.data.proc->nspace);
2579 proc.rank = cd->queries[n].qualifiers[p].value.data.proc->rank;
2580 } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_NSPACE)) {
2581 PMIX_LOAD_NSPACE(proc.nspace, cd->queries[n].qualifiers[p].value.data.string);
2582 } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_RANK)) {
2583 proc.rank = cd->queries[n].qualifiers[p].value.data.rank;
2584 } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_HOSTNAME)) {
2585 if (0 != strcmp(cd->queries[n].qualifiers[p].value.data.string, pmix_globals.hostname)) {
2586
2587 PMIX_LIST_DESTRUCT(&results);
2588 goto query;
2589 }
2590 }
2591 }
2592
2593
2594 PMIX_CONSTRUCT(&cb, pmix_cb_t);
2595 cb.copy = false;
2596
2597 if (PMIX_RANK_INVALID == proc.rank &&
2598 0 == strlen(proc.nspace)) {
2599
2600 cb.proc = &pmix_globals.myid;
2601 } else {
2602 if (0 == strlen(proc.nspace)) {
2603
2604 PMIX_LOAD_NSPACE(cb.proc->nspace, pmix_globals.myid.nspace);
2605 }
2606 if (PMIX_RANK_INVALID == proc.rank) {
2607
2608 proc.rank = PMIX_RANK_WILDCARD;
2609 }
2610 cb.proc = &proc;
2611 }
2612 for (p=0; NULL != cd->queries[n].keys[p]; p++) {
2613 cb.key = cd->queries[n].keys[p];
2614 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
2615 if (PMIX_SUCCESS != rc) {
2616
2617 PMIX_LIST_DESTRUCT(&results);
2618 PMIX_DESTRUCT(&cb);
2619 goto query;
2620 }
2621
2622 PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &cb.kvs, pmix_kval_t) {
2623 pmix_list_remove_item(&cb.kvs, &kv->super);
2624 pmix_list_append(&results, &kv->super);
2625 }
2626 PMIX_DESTRUCT(&cb);
2627 }
2628 }
2629
2630
2631
2632 rc = PMIX_ERR_NOT_FOUND;
2633 if (0 < (cd->ninfo = pmix_list_get_size(&results))) {
2634 PMIX_INFO_CREATE(cd->info, cd->ninfo);
2635 n = 0;
2636 PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &results, pmix_kval_t) {
2637 PMIX_LOAD_KEY(cd->info[n].key, kv->key);
2638 rc = pmix_value_xfer(&cd->info[n].value, kv->value);
2639 if (PMIX_SUCCESS != rc) {
2640 PMIX_INFO_FREE(cd->info, cd->ninfo);
2641 cd->info = NULL;
2642 cd->ninfo = 0;
2643 break;
2644 }
2645 ++n;
2646 }
2647 }
2648
2649 PMIX_LIST_DESTRUCT(&results);
2650
2651
2652
2653 cbfunc(PMIX_SUCCESS, cd->info, cd->ninfo, cd, NULL, NULL);
2654 return PMIX_SUCCESS;
2655
2656 query:
2657 if (NULL == pmix_host_server.query) {
2658 PMIX_RELEASE(cd);
2659 return PMIX_ERR_NOT_SUPPORTED;
2660 }
2661
2662
2663 PMIX_LOAD_PROCID(&proc, peer->info->pname.nspace, peer->info->pname.rank);
2664
2665
2666 if (PMIX_SUCCESS != (rc = pmix_host_server.query(&proc, cd->queries, cd->nqueries,
2667 cbfunc, cd))) {
2668 PMIX_RELEASE(cd);
2669 }
2670 return rc;
2671 }
2672
2673 static void logcbfn(pmix_status_t status, void *cbdata)
2674 {
2675 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
2676
2677 if (NULL != cd->cbfunc.opcbfn) {
2678 cd->cbfunc.opcbfn(status, cd->cbdata);
2679 }
2680 PMIX_RELEASE(cd);
2681 }
2682 pmix_status_t pmix_server_log(pmix_peer_t *peer,
2683 pmix_buffer_t *buf,
2684 pmix_op_cbfunc_t cbfunc,
2685 void *cbdata)
2686 {
2687 int32_t cnt;
2688 pmix_status_t rc;
2689 pmix_shift_caddy_t *cd;
2690 pmix_proc_t proc;
2691 time_t timestamp;
2692
2693 pmix_output_verbose(2, pmix_server_globals.base_output,
2694 "recvd log from client");
2695
2696
2697
2698
2699
2700
2701 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
2702 proc.rank = peer->info->pname.rank;
2703
2704 cd = PMIX_NEW(pmix_shift_caddy_t);
2705 if (NULL == cd) {
2706 return PMIX_ERR_NOMEM;
2707 }
2708 cd->cbfunc.opcbfn = cbfunc;
2709 cd->cbdata = cbdata;
2710
2711 cnt = 1;
2712 PMIX_BFROPS_UNPACK(rc, peer, buf, ×tamp, &cnt, PMIX_TIME);
2713 if (PMIX_SUCCESS != rc) {
2714 PMIX_ERROR_LOG(rc);
2715 goto exit;
2716 }
2717
2718
2719 cnt = 1;
2720 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
2721 if (PMIX_SUCCESS != rc) {
2722 PMIX_ERROR_LOG(rc);
2723 goto exit;
2724 }
2725 cnt = cd->ninfo;
2726 PMIX_INFO_CREATE(cd->info, cd->ninfo);
2727
2728 if (0 < cnt) {
2729 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
2730 if (PMIX_SUCCESS != rc) {
2731 PMIX_ERROR_LOG(rc);
2732 goto exit;
2733 }
2734 }
2735
2736 cnt = 1;
2737 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ndirs, &cnt, PMIX_SIZE);
2738 if (PMIX_SUCCESS != rc) {
2739 PMIX_ERROR_LOG(rc);
2740 goto exit;
2741 }
2742 cnt = cd->ndirs;
2743
2744
2745
2746 cd->ndirs = cnt + 1;
2747
2748 if (0 < timestamp) {
2749 cd->ndirs++;
2750 PMIX_INFO_CREATE(cd->directives, cd->ndirs);
2751 PMIX_INFO_LOAD(&cd->directives[cnt], PMIX_LOG_SOURCE, &proc, PMIX_PROC);
2752 PMIX_INFO_LOAD(&cd->directives[cnt+1], PMIX_LOG_TIMESTAMP, ×tamp, PMIX_TIME);
2753 } else {
2754 PMIX_INFO_CREATE(cd->directives, cd->ndirs);
2755 PMIX_INFO_LOAD(&cd->directives[cnt], PMIX_LOG_SOURCE, &proc, PMIX_PROC);
2756 }
2757
2758
2759 if (0 < cnt) {
2760 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->directives, &cnt, PMIX_INFO);
2761 if (PMIX_SUCCESS != rc) {
2762 PMIX_ERROR_LOG(rc);
2763 goto exit;
2764 }
2765 }
2766
2767
2768 rc = pmix_plog.log(&proc, cd->info, cd->ninfo,
2769 cd->directives, cd->ndirs,
2770 logcbfn, cd);
2771 return rc;
2772
2773 exit:
2774 PMIX_RELEASE(cd);
2775 return rc;
2776 }
2777
2778 pmix_status_t pmix_server_alloc(pmix_peer_t *peer,
2779 pmix_buffer_t *buf,
2780 pmix_info_cbfunc_t cbfunc,
2781 void *cbdata)
2782 {
2783 int32_t cnt;
2784 pmix_status_t rc;
2785 pmix_query_caddy_t *cd;
2786 pmix_proc_t proc;
2787 pmix_alloc_directive_t directive;
2788
2789 pmix_output_verbose(2, pmix_server_globals.base_output,
2790 "recvd query from client");
2791
2792 if (NULL == pmix_host_server.allocate) {
2793 return PMIX_ERR_NOT_SUPPORTED;
2794 }
2795
2796 cd = PMIX_NEW(pmix_query_caddy_t);
2797 if (NULL == cd) {
2798 return PMIX_ERR_NOMEM;
2799 }
2800 cd->cbdata = cbdata;
2801
2802
2803 cnt = 1;
2804 PMIX_BFROPS_UNPACK(rc, peer, buf, &directive, &cnt, PMIX_ALLOC_DIRECTIVE);
2805 if (PMIX_SUCCESS != rc) {
2806 PMIX_ERROR_LOG(rc);
2807 goto exit;
2808 }
2809
2810
2811 cnt = 1;
2812 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
2813 if (PMIX_SUCCESS != rc) {
2814 PMIX_ERROR_LOG(rc);
2815 goto exit;
2816 }
2817
2818 if (0 < cd->ninfo) {
2819 PMIX_INFO_CREATE(cd->info, cd->ninfo);
2820 cnt = cd->ninfo;
2821 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
2822 if (PMIX_SUCCESS != rc) {
2823 PMIX_ERROR_LOG(rc);
2824 goto exit;
2825 }
2826 }
2827
2828
2829 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
2830 proc.rank = peer->info->pname.rank;
2831
2832
2833 if (PMIX_SUCCESS != (rc = pmix_host_server.allocate(&proc, directive,
2834 cd->info, cd->ninfo,
2835 cbfunc, cd))) {
2836 goto exit;
2837 }
2838 return PMIX_SUCCESS;
2839
2840 exit:
2841 PMIX_RELEASE(cd);
2842 return rc;
2843 }
2844
2845 typedef struct {
2846 pmix_list_item_t super;
2847 pmix_epilog_t *epi;
2848 } pmix_srvr_epi_caddy_t;
2849 static PMIX_CLASS_INSTANCE(pmix_srvr_epi_caddy_t,
2850 pmix_list_item_t,
2851 NULL, NULL);
2852
2853 pmix_status_t pmix_server_job_ctrl(pmix_peer_t *peer,
2854 pmix_buffer_t *buf,
2855 pmix_info_cbfunc_t cbfunc,
2856 void *cbdata)
2857 {
2858 int32_t cnt, m;
2859 pmix_status_t rc;
2860 pmix_query_caddy_t *cd;
2861 pmix_namespace_t *nptr, *tmp;
2862 pmix_peer_t *pr;
2863 pmix_proc_t proc;
2864 size_t n;
2865 bool recurse = false, leave_topdir = false, duplicate;
2866 pmix_list_t cachedirs, cachefiles, ignorefiles, epicache;
2867 pmix_srvr_epi_caddy_t *epicd = NULL;
2868 pmix_cleanup_file_t *cf, *cf2, *cfptr;
2869 pmix_cleanup_dir_t *cdir, *cdir2, *cdirptr;
2870
2871 pmix_output_verbose(2, pmix_server_globals.base_output,
2872 "recvd job control request from client");
2873
2874 if (NULL == pmix_host_server.job_control) {
2875 return PMIX_ERR_NOT_SUPPORTED;
2876 }
2877
2878 cd = PMIX_NEW(pmix_query_caddy_t);
2879 if (NULL == cd) {
2880 return PMIX_ERR_NOMEM;
2881 }
2882 cd->cbdata = cbdata;
2883
2884 PMIX_CONSTRUCT(&epicache, pmix_list_t);
2885
2886
2887 cnt = 1;
2888 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ntargets, &cnt, PMIX_SIZE);
2889 if (PMIX_SUCCESS != rc) {
2890 PMIX_ERROR_LOG(rc);
2891 goto exit;
2892 }
2893 if (0 < cd->ntargets) {
2894 PMIX_PROC_CREATE(cd->targets, cd->ntargets);
2895 cnt = cd->ntargets;
2896 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->targets, &cnt, PMIX_PROC);
2897 if (PMIX_SUCCESS != rc) {
2898 PMIX_ERROR_LOG(rc);
2899 goto exit;
2900 }
2901 }
2902
2903
2904 if (NULL == cd->targets) {
2905 epicd = PMIX_NEW(pmix_srvr_epi_caddy_t);
2906 epicd->epi = &peer->nptr->epilog;
2907 pmix_list_append(&epicache, &epicd->super);
2908 } else {
2909 for (n=0; n < cd->ntargets; n++) {
2910
2911 nptr = NULL;
2912 PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
2913 if (0 == strcmp(tmp->nspace, cd->targets[n].nspace)) {
2914 nptr = tmp;
2915 break;
2916 }
2917 }
2918 if (NULL == nptr) {
2919 nptr = PMIX_NEW(pmix_namespace_t);
2920 if (NULL == nptr) {
2921 rc = PMIX_ERR_NOMEM;
2922 goto exit;
2923 }
2924 nptr->nspace = strdup(cd->targets[n].nspace);
2925 pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
2926 }
2927
2928 if (PMIX_RANK_WILDCARD == cd->targets[n].rank) {
2929 epicd = PMIX_NEW(pmix_srvr_epi_caddy_t);
2930 epicd->epi = &nptr->epilog;
2931 pmix_list_append(&epicache, &epicd->super);
2932 } else {
2933
2934
2935 for (m=0; m < pmix_server_globals.clients.size; m++) {
2936 if (NULL == (pr = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, m))) {
2937 continue;
2938 }
2939 if (0 != strncmp(pr->info->pname.nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
2940 continue;
2941 }
2942 if (pr->info->pname.rank == cd->targets[n].rank) {
2943 epicd = PMIX_NEW(pmix_srvr_epi_caddy_t);
2944 epicd->epi = &pr->epilog;
2945 pmix_list_append(&epicache, &epicd->super);
2946 break;
2947 }
2948 }
2949 }
2950 }
2951 }
2952
2953
2954 cnt = 1;
2955 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
2956 if (PMIX_SUCCESS != rc) {
2957 PMIX_ERROR_LOG(rc);
2958 goto exit;
2959 }
2960
2961 if (0 < cd->ninfo) {
2962 PMIX_INFO_CREATE(cd->info, cd->ninfo);
2963 cnt = cd->ninfo;
2964 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
2965 if (PMIX_SUCCESS != rc) {
2966 PMIX_ERROR_LOG(rc);
2967 goto exit;
2968 }
2969 }
2970
2971
2972
2973 PMIX_CONSTRUCT(&cachedirs, pmix_list_t);
2974 PMIX_CONSTRUCT(&cachefiles, pmix_list_t);
2975 PMIX_CONSTRUCT(&ignorefiles, pmix_list_t);
2976
2977 cnt = 0;
2978 for (n=0; n < cd->ninfo; n++) {
2979 if (0 == strncmp(cd->info[n].key, PMIX_REGISTER_CLEANUP, PMIX_MAX_KEYLEN)) {
2980 ++cnt;
2981 if (PMIX_STRING != cd->info[n].value.type ||
2982 NULL == cd->info[n].value.data.string) {
2983
2984 rc = PMIX_ERR_BAD_PARAM;
2985 goto exit;
2986 }
2987 cf = PMIX_NEW(pmix_cleanup_file_t);
2988 if (NULL == cf) {
2989
2990 rc = PMIX_ERR_NOMEM;
2991 goto exit;
2992 }
2993 cf->path = strdup(cd->info[n].value.data.string);
2994 pmix_list_append(&cachefiles, &cf->super);
2995 } else if (0 == strncmp(cd->info[n].key, PMIX_REGISTER_CLEANUP_DIR, PMIX_MAX_KEYLEN)) {
2996 ++cnt;
2997 if (PMIX_STRING != cd->info[n].value.type ||
2998 NULL == cd->info[n].value.data.string) {
2999
3000 rc = PMIX_ERR_BAD_PARAM;
3001 goto exit;
3002 }
3003 cdir = PMIX_NEW(pmix_cleanup_dir_t);
3004 if (NULL == cdir) {
3005
3006 rc = PMIX_ERR_NOMEM;
3007 goto exit;
3008 }
3009 cdir->path = strdup(cd->info[n].value.data.string);
3010 pmix_list_append(&cachedirs, &cdir->super);
3011 } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_RECURSIVE, PMIX_MAX_KEYLEN)) {
3012 recurse = PMIX_INFO_TRUE(&cd->info[n]);
3013 ++cnt;
3014 } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_IGNORE, PMIX_MAX_KEYLEN)) {
3015 if (PMIX_STRING != cd->info[n].value.type ||
3016 NULL == cd->info[n].value.data.string) {
3017
3018 rc = PMIX_ERR_BAD_PARAM;
3019 goto exit;
3020 }
3021 cf = PMIX_NEW(pmix_cleanup_file_t);
3022 if (NULL == cf) {
3023
3024 rc = PMIX_ERR_NOMEM;
3025 goto exit;
3026 }
3027 cf->path = strdup(cd->info[n].value.data.string);
3028 pmix_list_append(&ignorefiles, &cf->super);
3029 ++cnt;
3030 } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_LEAVE_TOPDIR, PMIX_MAX_KEYLEN)) {
3031 leave_topdir = PMIX_INFO_TRUE(&cd->info[n]);
3032 ++cnt;
3033 }
3034 }
3035 if (0 < cnt) {
3036
3037 PMIX_LIST_FOREACH(cf, &ignorefiles, pmix_cleanup_file_t) {
3038 PMIX_LIST_FOREACH(epicd, &epicache, pmix_srvr_epi_caddy_t) {
3039
3040 duplicate = false;
3041 PMIX_LIST_FOREACH(cf2, &epicd->epi->cleanup_files, pmix_cleanup_file_t) {
3042 if (0 == strcmp(cf2->path, cf->path)) {
3043 duplicate = true;
3044 break;
3045 }
3046 }
3047 if (!duplicate) {
3048
3049 cfptr = PMIX_NEW(pmix_cleanup_file_t);
3050 cfptr->path = strdup(cf->path);
3051 pmix_list_append(&epicd->epi->ignores, &cf->super);
3052 }
3053 }
3054 }
3055 PMIX_LIST_DESTRUCT(&ignorefiles);
3056
3057 PMIX_LIST_FOREACH(cdir, &cachedirs, pmix_cleanup_dir_t) {
3058 PMIX_LIST_FOREACH(epicd, &epicache, pmix_srvr_epi_caddy_t) {
3059
3060 duplicate = false;
3061 PMIX_LIST_FOREACH(cdir2, &epicd->epi->cleanup_dirs, pmix_cleanup_dir_t) {
3062 if (0 == strcmp(cdir2->path, cdir->path)) {
3063
3064
3065 if (!cdir->recurse && recurse) {
3066 cdir->recurse = recurse;
3067 }
3068 if (!cdir->leave_topdir && leave_topdir) {
3069 cdir->leave_topdir = leave_topdir;
3070 }
3071 duplicate = true;
3072 break;
3073 }
3074 }
3075 if (!duplicate) {
3076
3077 PMIX_LIST_FOREACH(cf, &epicd->epi->ignores, pmix_cleanup_file_t) {
3078 if (0 == strcmp(cf->path, cdir->path)) {
3079
3080 rc = PMIX_ERR_CONFLICTING_CLEANUP_DIRECTIVES;
3081 PMIX_LIST_DESTRUCT(&cachedirs);
3082 PMIX_LIST_DESTRUCT(&cachefiles);
3083 goto exit;
3084 }
3085 }
3086
3087 cdirptr = PMIX_NEW(pmix_cleanup_dir_t);
3088 cdirptr->path = strdup(cdir->path);
3089 cdirptr->recurse = recurse;
3090 cdirptr->leave_topdir = leave_topdir;
3091 pmix_list_append(&epicd->epi->cleanup_dirs, &cdirptr->super);
3092 }
3093 }
3094 }
3095 PMIX_LIST_DESTRUCT(&cachedirs);
3096 PMIX_LIST_FOREACH(cf, &cachefiles, pmix_cleanup_file_t) {
3097 PMIX_LIST_FOREACH(epicd, &epicache, pmix_srvr_epi_caddy_t) {
3098
3099 duplicate = false;
3100 PMIX_LIST_FOREACH(cf2, &epicd->epi->cleanup_files, pmix_cleanup_file_t) {
3101 if (0 == strcmp(cf2->path, cf->path)) {
3102 duplicate = true;
3103 break;
3104 }
3105 }
3106 if (!duplicate) {
3107
3108 PMIX_LIST_FOREACH(cf2, &epicd->epi->ignores, pmix_cleanup_file_t) {
3109 if (0 == strcmp(cf->path, cf2->path)) {
3110
3111 rc = PMIX_ERR_CONFLICTING_CLEANUP_DIRECTIVES;
3112 PMIX_LIST_DESTRUCT(&cachedirs);
3113 PMIX_LIST_DESTRUCT(&cachefiles);
3114 goto exit;
3115 }
3116 }
3117
3118 cfptr = PMIX_NEW(pmix_cleanup_file_t);
3119 cfptr->path = strdup(cf->path);
3120 pmix_list_append(&epicd->epi->cleanup_files, &cfptr->super);
3121 }
3122 }
3123 }
3124 PMIX_LIST_DESTRUCT(&cachefiles);
3125 if (cnt == (int)cd->ninfo) {
3126
3127 rc = PMIX_OPERATION_SUCCEEDED;
3128 goto exit;
3129 }
3130 }
3131
3132
3133 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3134 proc.rank = peer->info->pname.rank;
3135
3136
3137 if (PMIX_SUCCESS != (rc = pmix_host_server.job_control(&proc,
3138 cd->targets, cd->ntargets,
3139 cd->info, cd->ninfo,
3140 cbfunc, cd))) {
3141 goto exit;
3142 }
3143 PMIX_LIST_DESTRUCT(&epicache);
3144 return PMIX_SUCCESS;
3145
3146 exit:
3147 PMIX_RELEASE(cd);
3148 PMIX_LIST_DESTRUCT(&epicache);
3149 return rc;
3150 }
3151
3152 pmix_status_t pmix_server_monitor(pmix_peer_t *peer,
3153 pmix_buffer_t *buf,
3154 pmix_info_cbfunc_t cbfunc,
3155 void *cbdata)
3156 {
3157 int32_t cnt;
3158 pmix_info_t monitor;
3159 pmix_status_t rc, error;
3160 pmix_query_caddy_t *cd;
3161 pmix_proc_t proc;
3162
3163 pmix_output_verbose(2, pmix_server_globals.base_output,
3164 "recvd monitor request from client");
3165
3166
3167 cd = PMIX_NEW(pmix_query_caddy_t);
3168 if (NULL == cd) {
3169 return PMIX_ERR_NOMEM;
3170 }
3171 cd->cbdata = cbdata;
3172
3173
3174 PMIX_INFO_CONSTRUCT(&monitor);
3175 cnt = 1;
3176 PMIX_BFROPS_UNPACK(rc, peer, buf, &monitor, &cnt, PMIX_INFO);
3177 if (PMIX_SUCCESS != rc) {
3178 PMIX_ERROR_LOG(rc);
3179 goto exit;
3180 }
3181
3182
3183 cnt = 1;
3184 PMIX_BFROPS_UNPACK(rc, peer, buf, &error, &cnt, PMIX_STATUS);
3185 if (PMIX_SUCCESS != rc) {
3186 PMIX_ERROR_LOG(rc);
3187 goto exit;
3188 }
3189
3190
3191 cnt = 1;
3192 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3193 if (PMIX_SUCCESS != rc) {
3194 PMIX_ERROR_LOG(rc);
3195 goto exit;
3196 }
3197
3198 if (0 < cd->ninfo) {
3199 PMIX_INFO_CREATE(cd->info, cd->ninfo);
3200 cnt = cd->ninfo;
3201 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3202 if (PMIX_SUCCESS != rc) {
3203 PMIX_ERROR_LOG(rc);
3204 goto exit;
3205 }
3206 }
3207
3208
3209
3210 rc = pmix_psensor.start(peer, error, &monitor, cd->info, cd->ninfo);
3211 if (PMIX_SUCCESS == rc) {
3212 rc = PMIX_OPERATION_SUCCEEDED;
3213 goto exit;
3214 }
3215 if (PMIX_ERR_NOT_SUPPORTED != rc) {
3216 goto exit;
3217 }
3218
3219
3220
3221 if (NULL == pmix_host_server.monitor) {
3222 rc = PMIX_ERR_NOT_SUPPORTED;
3223 goto exit;
3224 }
3225
3226
3227 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3228 proc.rank = peer->info->pname.rank;
3229
3230
3231 if (PMIX_SUCCESS != (rc = pmix_host_server.monitor(&proc, &monitor, error,
3232 cd->info, cd->ninfo,
3233 cbfunc, cd))) {
3234 goto exit;
3235 }
3236 return PMIX_SUCCESS;
3237
3238 exit:
3239 PMIX_INFO_DESTRUCT(&monitor);
3240 PMIX_RELEASE(cd);
3241 return rc;
3242 }
3243
3244 pmix_status_t pmix_server_get_credential(pmix_peer_t *peer,
3245 pmix_buffer_t *buf,
3246 pmix_credential_cbfunc_t cbfunc,
3247 void *cbdata)
3248 {
3249 int32_t cnt;
3250 pmix_status_t rc;
3251 pmix_query_caddy_t *cd;
3252 pmix_proc_t proc;
3253
3254 pmix_output_verbose(2, pmix_globals.debug_output,
3255 "recvd get credential request from client");
3256
3257 if (NULL == pmix_host_server.get_credential) {
3258 return PMIX_ERR_NOT_SUPPORTED;
3259 }
3260
3261 cd = PMIX_NEW(pmix_query_caddy_t);
3262 if (NULL == cd) {
3263 return PMIX_ERR_NOMEM;
3264 }
3265 cd->cbdata = cbdata;
3266
3267
3268 cnt = 1;
3269 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3270 if (PMIX_SUCCESS != rc) {
3271 PMIX_ERROR_LOG(rc);
3272 goto exit;
3273 }
3274
3275 if (0 < cd->ninfo) {
3276 PMIX_INFO_CREATE(cd->info, cd->ninfo);
3277 cnt = cd->ninfo;
3278 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3279 if (PMIX_SUCCESS != rc) {
3280 PMIX_ERROR_LOG(rc);
3281 goto exit;
3282 }
3283 }
3284
3285
3286 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3287 proc.rank = peer->info->pname.rank;
3288
3289
3290 if (PMIX_SUCCESS != (rc = pmix_host_server.get_credential(&proc, cd->info, cd->ninfo,
3291 cbfunc, cd))) {
3292 goto exit;
3293 }
3294 return PMIX_SUCCESS;
3295
3296 exit:
3297 PMIX_RELEASE(cd);
3298 return rc;
3299 }
3300
3301 pmix_status_t pmix_server_validate_credential(pmix_peer_t *peer,
3302 pmix_buffer_t *buf,
3303 pmix_validation_cbfunc_t cbfunc,
3304 void *cbdata)
3305 {
3306 int32_t cnt;
3307 pmix_status_t rc;
3308 pmix_query_caddy_t *cd;
3309 pmix_proc_t proc;
3310
3311 pmix_output_verbose(2, pmix_globals.debug_output,
3312 "recvd validate credential request from client");
3313
3314 if (NULL == pmix_host_server.validate_credential) {
3315 return PMIX_ERR_NOT_SUPPORTED;
3316 }
3317
3318 cd = PMIX_NEW(pmix_query_caddy_t);
3319 if (NULL == cd) {
3320 return PMIX_ERR_NOMEM;
3321 }
3322 cd->cbdata = cbdata;
3323
3324
3325 cnt = 1;
3326 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->bo, &cnt, PMIX_BYTE_OBJECT);
3327 if (PMIX_SUCCESS != rc) {
3328 PMIX_ERROR_LOG(rc);
3329 goto exit;
3330 }
3331
3332
3333 cnt = 1;
3334 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3335 if (PMIX_SUCCESS != rc) {
3336 PMIX_ERROR_LOG(rc);
3337 goto exit;
3338 }
3339
3340 if (0 < cd->ninfo) {
3341 PMIX_INFO_CREATE(cd->info, cd->ninfo);
3342 cnt = cd->ninfo;
3343 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3344 if (PMIX_SUCCESS != rc) {
3345 PMIX_ERROR_LOG(rc);
3346 goto exit;
3347 }
3348 }
3349
3350
3351 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3352 proc.rank = peer->info->pname.rank;
3353
3354
3355 if (PMIX_SUCCESS != (rc = pmix_host_server.validate_credential(&proc, &cd->bo,
3356 cd->info, cd->ninfo,
3357 cbfunc, cd))) {
3358 goto exit;
3359 }
3360 return PMIX_SUCCESS;
3361
3362 exit:
3363 PMIX_RELEASE(cd);
3364 return rc;
3365 }
3366
3367 pmix_status_t pmix_server_iofreg(pmix_peer_t *peer,
3368 pmix_buffer_t *buf,
3369 pmix_op_cbfunc_t cbfunc,
3370 void *cbdata)
3371 {
3372 int32_t cnt;
3373 pmix_status_t rc;
3374 pmix_setup_caddy_t *cd;
3375 pmix_iof_req_t *req;
3376 bool notify, match;
3377 size_t n;
3378 pmix_buffer_t *msg;
3379 pmix_iof_cache_t *iof, *ionext;
3380
3381 pmix_output_verbose(2, pmix_server_globals.iof_output,
3382 "recvd IOF PULL request from client");
3383
3384 if (NULL == pmix_host_server.iof_pull) {
3385 return PMIX_ERR_NOT_SUPPORTED;
3386 }
3387
3388 cd = PMIX_NEW(pmix_setup_caddy_t);
3389 if (NULL == cd) {
3390 return PMIX_ERR_NOMEM;
3391 }
3392 cd->cbdata = cbdata;
3393
3394
3395 cnt = 1;
3396 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->nprocs, &cnt, PMIX_SIZE);
3397 if (PMIX_SUCCESS != rc) {
3398 PMIX_ERROR_LOG(rc);
3399 goto exit;
3400 }
3401
3402 if (0 < cd->nprocs) {
3403 PMIX_PROC_CREATE(cd->procs, cd->nprocs);
3404 cnt = cd->nprocs;
3405 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->procs, &cnt, PMIX_PROC);
3406 if (PMIX_SUCCESS != rc) {
3407 PMIX_ERROR_LOG(rc);
3408 goto exit;
3409 }
3410 }
3411
3412
3413 cnt = 1;
3414 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3415 if (PMIX_SUCCESS != rc) {
3416 PMIX_ERROR_LOG(rc);
3417 goto exit;
3418 }
3419
3420 if (0 < cd->ninfo) {
3421 PMIX_INFO_CREATE(cd->info, cd->ninfo);
3422 cnt = cd->ninfo;
3423 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3424 if (PMIX_SUCCESS != rc) {
3425 PMIX_ERROR_LOG(rc);
3426 goto exit;
3427 }
3428 }
3429
3430
3431 cnt = 1;
3432 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->channels, &cnt, PMIX_IOF_CHANNEL);
3433 if (PMIX_SUCCESS != rc) {
3434 PMIX_ERROR_LOG(rc);
3435 goto exit;
3436 }
3437
3438
3439 notify = false;
3440 for (n=0; n < cd->nprocs; n++) {
3441 match = false;
3442 PMIX_LIST_FOREACH(req, &pmix_globals.iof_requests, pmix_iof_req_t) {
3443
3444 if (peer != req->peer) {
3445 continue;
3446 }
3447
3448 if (0 == strncmp(cd->procs[n].nspace, req->pname.nspace, PMIX_MAX_NSLEN) &&
3449 (PMIX_RANK_WILDCARD == req->pname.rank || cd->procs[n].rank == req->pname.rank)) {
3450 match = true;
3451 if ((req->channels & cd->channels) != cd->channels) {
3452
3453 req->channels |= cd->channels;
3454
3455 notify = true;
3456 }
3457 break;
3458 }
3459 }
3460
3461 if (!match) {
3462
3463 req = PMIX_NEW(pmix_iof_req_t);
3464 if (NULL == req) {
3465 rc = PMIX_ERR_NOMEM;
3466 goto exit;
3467 }
3468 PMIX_RETAIN(peer);
3469 req->peer = peer;
3470 req->pname.nspace = strdup(cd->procs[n].nspace);
3471 req->pname.rank = cd->procs[n].rank;
3472 req->channels = cd->channels;
3473 pmix_list_append(&pmix_globals.iof_requests, &req->super);
3474 }
3475
3476 PMIX_LIST_FOREACH_SAFE(iof, ionext, &pmix_server_globals.iof, pmix_iof_cache_t) {
3477
3478 if (!(iof->channel & req->channels)) {
3479 continue;
3480 }
3481
3482 if (!PMIX_CHECK_PROCID(&iof->source, &req->pname)) {
3483 continue;
3484 }
3485
3486
3487 if (PMIX_CHECK_PROCID(&iof->source, &req->peer->info->pname)) {
3488 continue;
3489 }
3490 pmix_output_verbose(2, pmix_server_globals.iof_output,
3491 "PMIX:SERVER:IOFREQ delivering cached IOF from %s:%d to %s:%d",
3492 iof->source.nspace, iof->source.rank,
3493 req->peer->info->pname.nspace, req->peer->info->pname.rank);
3494
3495 if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) {
3496 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
3497 rc = PMIX_ERR_OUT_OF_RESOURCE;
3498 break;
3499 }
3500
3501 PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->source, 1, PMIX_PROC);
3502 if (PMIX_SUCCESS != rc) {
3503 PMIX_ERROR_LOG(rc);
3504 PMIX_RELEASE(msg);
3505 break;
3506 }
3507
3508 PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->channel, 1, PMIX_IOF_CHANNEL);
3509 if (PMIX_SUCCESS != rc) {
3510 PMIX_ERROR_LOG(rc);
3511 PMIX_RELEASE(msg);
3512 break;
3513 }
3514
3515 PMIX_BFROPS_PACK(rc, req->peer, msg, iof->bo, 1, PMIX_BYTE_OBJECT);
3516 if (PMIX_SUCCESS != rc) {
3517 PMIX_ERROR_LOG(rc);
3518 PMIX_RELEASE(msg);
3519 break;
3520 }
3521
3522 PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF);
3523 if (PMIX_SUCCESS != rc) {
3524 PMIX_ERROR_LOG(rc);
3525 PMIX_RELEASE(msg);
3526 }
3527
3528 pmix_list_remove_item(&pmix_server_globals.iof, &iof->super);
3529 PMIX_RELEASE(iof);
3530 }
3531 }
3532 if (notify) {
3533
3534 if (PMIX_SUCCESS != (rc = pmix_host_server.iof_pull(cd->procs, cd->nprocs,
3535 cd->info, cd->ninfo,
3536 cd->channels,
3537 cbfunc, cd))) {
3538 goto exit;
3539 }
3540 }
3541 return PMIX_SUCCESS;
3542
3543 exit:
3544 PMIX_RELEASE(cd);
3545 return rc;
3546 }
3547
3548 static void stdcbfunc(pmix_status_t status, void *cbdata)
3549 {
3550 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
3551
3552 if (NULL != cd->opcbfunc) {
3553 cd->opcbfunc(status, cd->cbdata);
3554 }
3555 if (NULL != cd->procs) {
3556 PMIX_PROC_FREE(cd->procs, cd->nprocs);
3557 }
3558 if (NULL != cd->info) {
3559 PMIX_INFO_FREE(cd->info, cd->ninfo);
3560 }
3561 if (NULL != cd->bo) {
3562 PMIX_BYTE_OBJECT_FREE(cd->bo, 1);
3563 }
3564 PMIX_RELEASE(cd);
3565 }
3566
3567 pmix_status_t pmix_server_iofstdin(pmix_peer_t *peer,
3568 pmix_buffer_t *buf,
3569 pmix_op_cbfunc_t cbfunc,
3570 void *cbdata)
3571 {
3572 int32_t cnt;
3573 pmix_status_t rc;
3574 pmix_proc_t source;
3575 pmix_setup_caddy_t *cd;
3576
3577 pmix_output_verbose(2, pmix_server_globals.iof_output,
3578 "recvd stdin IOF data from tool");
3579
3580 if (NULL == pmix_host_server.push_stdin) {
3581 return PMIX_ERR_NOT_SUPPORTED;
3582 }
3583
3584 cd = PMIX_NEW(pmix_setup_caddy_t);
3585 if (NULL == cd) {
3586 return PMIX_ERR_NOMEM;
3587 }
3588 cd->opcbfunc = cbfunc;
3589 cd->cbdata = cbdata;
3590
3591
3592 cnt = 1;
3593 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->nprocs, &cnt, PMIX_SIZE);
3594 if (PMIX_SUCCESS != rc) {
3595 PMIX_ERROR_LOG(rc);
3596 goto error;
3597 }
3598 if (0 < cd->nprocs) {
3599 PMIX_PROC_CREATE(cd->procs, cd->nprocs);
3600 if (NULL == cd->procs) {
3601 rc = PMIX_ERR_NOMEM;
3602 goto error;
3603 }
3604 cnt = cd->nprocs;
3605 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->procs, &cnt, PMIX_PROC);
3606 if (PMIX_SUCCESS != rc) {
3607 PMIX_ERROR_LOG(rc);
3608 goto error;
3609 }
3610 }
3611
3612
3613 cnt = 1;
3614 PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
3615 if (PMIX_SUCCESS != rc) {
3616 PMIX_ERROR_LOG(rc);
3617 goto error;
3618 }
3619 if (0 < cd->ninfo) {
3620 PMIX_INFO_CREATE(cd->info, cd->ninfo);
3621 if (NULL == cd->info) {
3622 rc = PMIX_ERR_NOMEM;
3623 goto error;
3624 }
3625 cnt = cd->ninfo;
3626 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
3627 if (PMIX_SUCCESS != rc) {
3628 PMIX_ERROR_LOG(rc);
3629 goto error;
3630 }
3631 }
3632
3633
3634 PMIX_BYTE_OBJECT_CREATE(cd->bo, 1);
3635 if (NULL == cd->bo) {
3636 rc = PMIX_ERR_NOMEM;
3637 goto error;
3638 }
3639
3640 cnt = 1;
3641 PMIX_BFROPS_UNPACK(rc, peer, buf, cd->bo, &cnt, PMIX_BYTE_OBJECT);
3642 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
3643
3644 PMIX_BYTE_OBJECT_FREE(cd->bo, 1);
3645 } else if (PMIX_SUCCESS != rc) {
3646 PMIX_ERROR_LOG(rc);
3647 goto error;
3648 }
3649
3650
3651 pmix_strncpy(source.nspace, peer->nptr->nspace, PMIX_MAX_NSLEN);
3652 source.rank = peer->info->pname.rank;
3653 if (PMIX_SUCCESS != (rc = pmix_host_server.push_stdin(&source, cd->procs, cd->nprocs,
3654 cd->info, cd->ninfo, cd->bo,
3655 stdcbfunc, cd))) {
3656 if (PMIX_OPERATION_SUCCEEDED != rc) {
3657 goto error;
3658 }
3659 }
3660 return rc;
3661
3662 error:
3663 PMIX_RELEASE(cd);
3664 return rc;
3665 }
3666
3667 static void grp_timeout(int sd, short args, void *cbdata)
3668 {
3669 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
3670 pmix_buffer_t *reply;
3671 pmix_status_t ret, rc = PMIX_ERR_TIMEOUT;
3672
3673 pmix_output_verbose(2, pmix_server_globals.fence_output,
3674 "ALERT: grp construct timeout fired");
3675
3676
3677 reply = PMIX_NEW(pmix_buffer_t);
3678 if (NULL == reply) {
3679 goto error;
3680 }
3681
3682 PMIX_BFROPS_PACK(ret, cd->peer, reply, &rc, 1, PMIX_STATUS);
3683 if (PMIX_SUCCESS != ret) {
3684 PMIX_ERROR_LOG(ret);
3685 PMIX_RELEASE(reply);
3686 goto error;
3687 }
3688 pmix_output_verbose(2, pmix_server_globals.base_output,
3689 "server:grp_timeout reply being sent to %s:%u",
3690 cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
3691 PMIX_SERVER_QUEUE_REPLY(ret, cd->peer, cd->hdr.tag, reply);
3692 if (PMIX_SUCCESS != ret) {
3693 PMIX_RELEASE(reply);
3694 }
3695
3696 error:
3697 cd->event_active = false;
3698
3699 pmix_list_remove_item(&cd->trk->local_cbs, &cd->super);
3700 PMIX_RELEASE(cd);
3701 }
3702
3703 static void _grpcbfunc(int sd, short argc, void *cbdata)
3704 {
3705 pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
3706 pmix_server_trkr_t *trk = scd->tracker;
3707 pmix_server_caddy_t *cd;
3708 pmix_buffer_t *reply, xfer;
3709 pmix_status_t ret;
3710 size_t n, ctxid = SIZE_MAX;
3711 pmix_group_t *grp = (pmix_group_t*)trk->cbdata;
3712 pmix_byte_object_t *bo = NULL;
3713 pmix_nspace_caddy_t *nptr;
3714 pmix_list_t nslist;
3715 bool found;
3716
3717 PMIX_ACQUIRE_OBJECT(scd);
3718
3719 pmix_output_verbose(2, pmix_server_globals.connect_output,
3720 "server:grpcbfunc processing WITH %d MEMBERS",
3721 (NULL == trk) ? 0 : (int)pmix_list_get_size(&trk->local_cbs));
3722
3723 if (NULL == trk) {
3724
3725
3726 if (NULL != scd->cbfunc.relfn) {
3727 scd->cbfunc.relfn(scd->cbdata);
3728 }
3729 PMIX_RELEASE(scd);
3730 return;
3731 }
3732
3733
3734 if (trk->event_active) {
3735 pmix_event_del(&trk->ev);
3736 }
3737
3738
3739
3740 if (trk->hybrid) {
3741
3742 if (NULL != grp) {
3743 pmix_list_remove_item(&pmix_server_globals.groups, &grp->super);
3744 PMIX_RELEASE(grp);
3745 }
3746 } else {
3747
3748 for (n=0; n < scd->ninfo; n++) {
3749 if (PMIX_CHECK_KEY(&scd->info[n], PMIX_GROUP_CONTEXT_ID)) {
3750 PMIX_VALUE_GET_NUMBER(ret, &scd->info[n].value, ctxid, size_t);
3751 } else if (PMIX_CHECK_KEY(&scd->info[n], PMIX_GROUP_ENDPT_DATA)) {
3752 bo = &scd->info[n].value.data.bo;
3753 }
3754 }
3755 }
3756
3757
3758
3759 if (NULL != bo) {
3760 PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
3761 PMIX_CONSTRUCT(&nslist, pmix_list_t);
3762
3763
3764
3765 PMIX_LIST_FOREACH(cd, &trk->local_cbs, pmix_server_caddy_t) {
3766
3767 found = false;
3768 PMIX_LIST_FOREACH(nptr, &nslist, pmix_nspace_caddy_t) {
3769 if (0 == strcmp(nptr->ns->compat.gds->name,
3770 cd->peer->nptr->compat.gds->name)) {
3771 found = true;
3772 break;
3773 }
3774 }
3775 if (!found) {
3776
3777 nptr = PMIX_NEW(pmix_nspace_caddy_t);
3778 PMIX_RETAIN(cd->peer->nptr);
3779 nptr->ns = cd->peer->nptr;
3780 pmix_list_append(&nslist, &nptr->super);
3781 }
3782 }
3783
3784 PMIX_LIST_FOREACH(nptr, &nslist, pmix_nspace_caddy_t) {
3785 PMIX_LOAD_BUFFER(pmix_globals.mypeer, &xfer, bo->bytes, bo->size);
3786 PMIX_GDS_STORE_MODEX(ret, nptr->ns, &xfer, trk);
3787 if (PMIX_SUCCESS != ret) {
3788 PMIX_ERROR_LOG(ret);
3789 break;
3790 }
3791 }
3792 }
3793
3794
3795 PMIX_LIST_FOREACH(cd, &trk->local_cbs, pmix_server_caddy_t) {
3796 reply = PMIX_NEW(pmix_buffer_t);
3797 if (NULL == reply) {
3798 break;
3799 }
3800
3801 PMIX_BFROPS_PACK(ret, cd->peer, reply, &scd->status, 1, PMIX_STATUS);
3802 if (PMIX_SUCCESS != ret) {
3803 PMIX_ERROR_LOG(ret);
3804 PMIX_RELEASE(reply);
3805 break;
3806 }
3807 if (!trk->hybrid) {
3808
3809 PMIX_BFROPS_PACK(ret, cd->peer, reply, &ctxid, 1, PMIX_SIZE);
3810 if (PMIX_SUCCESS != ret) {
3811 PMIX_ERROR_LOG(ret);
3812 PMIX_RELEASE(reply);
3813 break;
3814 }
3815 }
3816 pmix_output_verbose(2, pmix_server_globals.connect_output,
3817 "server:grp_cbfunc reply being sent to %s:%u",
3818 cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
3819 PMIX_SERVER_QUEUE_REPLY(ret, cd->peer, cd->hdr.tag, reply);
3820 if (PMIX_SUCCESS != ret) {
3821 PMIX_RELEASE(reply);
3822 }
3823 }
3824
3825
3826 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
3827 PMIX_RELEASE(trk);
3828
3829
3830 if (NULL != scd->cbfunc.relfn) {
3831 scd->cbfunc.relfn(scd->cbdata);
3832 }
3833 PMIX_RELEASE(scd);
3834 }
3835
3836
3837 static void grpcbfunc(pmix_status_t status,
3838 pmix_info_t *info, size_t ninfo,
3839 void *cbdata,
3840 pmix_release_cbfunc_t relfn,
3841 void *relcbd)
3842 {
3843 pmix_server_trkr_t *tracker = (pmix_server_trkr_t*)cbdata;
3844 pmix_shift_caddy_t *scd;
3845
3846 pmix_output_verbose(2, pmix_server_globals.connect_output,
3847 "server:grpcbfunc called with %d info", (int)ninfo);
3848
3849 if (NULL == tracker) {
3850
3851
3852 if (NULL != relfn) {
3853 relfn(relcbd);
3854 }
3855 return;
3856 }
3857
3858
3859 scd = PMIX_NEW(pmix_shift_caddy_t);
3860 if (NULL == scd) {
3861
3862 if (NULL != relfn) {
3863 relfn(cbdata);
3864 }
3865 return;
3866 }
3867 scd->status = status;
3868 scd->info = info;
3869 scd->ninfo = ninfo;
3870 scd->tracker = tracker;
3871 scd->cbfunc.relfn = relfn;
3872 scd->cbdata = relcbd;
3873 PMIX_THREADSHIFT(scd, _grpcbfunc);
3874 }
3875
3876
3877
3878 pmix_status_t pmix_server_grpconstruct(pmix_server_caddy_t *cd,
3879 pmix_buffer_t *buf)
3880 {
3881 pmix_peer_t *peer = (pmix_peer_t*)cd->peer;
3882 pmix_peer_t *pr;
3883 int32_t cnt, m;
3884 pmix_status_t rc;
3885 char *grpid;
3886 pmix_proc_t *procs;
3887 pmix_group_t *grp, *pgrp;
3888 pmix_info_t *info = NULL, *iptr;
3889 size_t n, ninfo, nprocs, n2;
3890 pmix_server_trkr_t *trk;
3891 struct timeval tv = {0, 0};
3892 bool need_cxtid = false;
3893 bool match, force_local = false;
3894 bool embed_barrier = false;
3895 bool barrier_directive_included = false;
3896 pmix_buffer_t bucket;
3897 pmix_byte_object_t bo;
3898 pmix_list_t mbrs;
3899 pmix_namelist_t *nm;
3900 bool expanded = false;
3901
3902 pmix_output_verbose(2, pmix_server_globals.connect_output,
3903 "recvd grpconstruct cmd");
3904
3905
3906 cnt = 1;
3907 PMIX_BFROPS_UNPACK(rc, peer, buf, &grpid, &cnt, PMIX_STRING);
3908 if (PMIX_SUCCESS != rc) {
3909 PMIX_ERROR_LOG(rc);
3910 goto error;
3911 }
3912
3913
3914 grp = NULL;
3915 PMIX_LIST_FOREACH(pgrp, &pmix_server_globals.groups, pmix_group_t) {
3916 if (0 == strcmp(grpid, pgrp->grpid)) {
3917 grp = pgrp;
3918 break;
3919 }
3920 }
3921 if (NULL == grp) {
3922
3923 grp = PMIX_NEW(pmix_group_t);
3924 if (NULL == grp) {
3925 rc = PMIX_ERR_NOMEM;
3926 goto error;
3927 }
3928 grp->grpid = grpid;
3929 pmix_list_append(&pmix_server_globals.groups, &grp->super);
3930 } else {
3931 free(grpid);
3932 }
3933
3934
3935 cnt = 1;
3936 PMIX_BFROPS_UNPACK(rc, peer, buf, &nprocs, &cnt, PMIX_SIZE);
3937 if (PMIX_SUCCESS != rc) {
3938 PMIX_ERROR_LOG(rc);
3939 goto error;
3940 }
3941 if (0 == nprocs) {
3942 return PMIX_ERR_BAD_PARAM;
3943 }
3944 PMIX_PROC_CREATE(procs, nprocs);
3945 if (NULL == procs) {
3946 rc = PMIX_ERR_NOMEM;
3947 goto error;
3948 }
3949 cnt = nprocs;
3950 PMIX_BFROPS_UNPACK(rc, peer, buf, procs, &cnt, PMIX_PROC);
3951 if (PMIX_SUCCESS != rc) {
3952 PMIX_ERROR_LOG(rc);
3953 PMIX_PROC_FREE(procs, nprocs);
3954 goto error;
3955 }
3956 if (NULL == grp->members) {
3957
3958
3959
3960 PMIX_CONSTRUCT(&mbrs, pmix_list_t);
3961 for (n=0; n < nprocs; n++) {
3962 if (PMIX_RANK_LOCAL_PEERS == procs[n].rank) {
3963 expanded = true;
3964
3965 for (m=0; m < pmix_server_globals.clients.size; m++) {
3966 if (NULL == (pr = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, m))) {
3967 continue;
3968 }
3969 if (PMIX_CHECK_NSPACE(procs[n].nspace, pr->info->pname.nspace)) {
3970 nm = PMIX_NEW(pmix_namelist_t);
3971 nm->pname = &pr->info->pname;
3972 pmix_list_append(&mbrs, &nm->super);
3973 }
3974 }
3975 } else if (PMIX_RANK_LOCAL_NODE == procs[n].rank) {
3976 expanded = true;
3977
3978 for (m=0; m < pmix_server_globals.clients.size; m++) {
3979 if (NULL == (pr = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, m))) {
3980 continue;
3981 }
3982 nm = PMIX_NEW(pmix_namelist_t);
3983 nm->pname = &pr->info->pname;
3984 pmix_list_append(&mbrs, &nm->super);
3985 }
3986 } else {
3987 nm = PMIX_NEW(pmix_namelist_t);
3988
3989 nm->pname = (pmix_name_t*)malloc(sizeof(pmix_name_t));
3990 nm->pname->nspace = strdup(procs[n].nspace);
3991 nm->pname->rank = procs[n].rank;
3992 pmix_list_append(&mbrs, &nm->super);
3993 }
3994 }
3995 if (expanded) {
3996 PMIX_PROC_FREE(procs, nprocs);
3997 nprocs = pmix_list_get_size(&mbrs);
3998 PMIX_PROC_CREATE(procs, nprocs);
3999 n=0;
4000 while (NULL != (nm = (pmix_namelist_t*)pmix_list_remove_first(&mbrs))) {
4001 PMIX_LOAD_PROCID(&procs[n], nm->pname->nspace, nm->pname->rank);
4002 PMIX_RELEASE(nm);
4003 }
4004 PMIX_DESTRUCT(&mbrs);
4005 }
4006 grp->members = procs;
4007 grp->nmbrs = nprocs;
4008 } else {
4009 PMIX_PROC_FREE(procs, nprocs);
4010 }
4011
4012
4013 cnt = 1;
4014 PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
4015 if (PMIX_SUCCESS != rc) {
4016 PMIX_ERROR_LOG(rc);
4017 goto error;
4018 }
4019 if (0 < ninfo) {
4020 PMIX_INFO_CREATE(info, ninfo);
4021 cnt = ninfo;
4022 PMIX_BFROPS_UNPACK(rc, peer, buf, info, &cnt, PMIX_INFO);
4023 if (PMIX_SUCCESS != rc) {
4024 PMIX_ERROR_LOG(rc);
4025 goto error;
4026 }
4027 }
4028
4029
4030 if (NULL == (trk = get_tracker(grp->grpid, grp->members, grp->nmbrs, PMIX_GROUP_CONSTRUCT_CMD))) {
4031
4032 if (NULL == (trk = new_tracker(grp->grpid, grp->members, grp->nmbrs, PMIX_GROUP_CONSTRUCT_CMD))) {
4033
4034 PMIX_ERROR_LOG(PMIX_ERROR);
4035 rc = PMIX_ERROR;
4036 goto error;
4037 }
4038
4039
4040 trk->collect_type = PMIX_COLLECT_YES;
4041
4042 trk->hybrid = false;
4043
4044 trk->cbdata = grp;
4045
4046
4047
4048 trk->info = info;
4049 trk->ninfo = ninfo;
4050
4051
4052
4053 for (n=0; n < ninfo; n++) {
4054 if (PMIX_CHECK_KEY(&info[n], PMIX_TIMEOUT)) {
4055 tv.tv_sec = info[n].value.data.uint32;
4056 } else if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_ASSIGN_CONTEXT_ID)) {
4057 need_cxtid = PMIX_INFO_TRUE(&info[n]);
4058 } else if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_LOCAL_ONLY)) {
4059 force_local = PMIX_INFO_TRUE(&info[n]);
4060 } else if (PMIX_CHECK_KEY(&info[n], PMIX_EMBED_BARRIER)) {
4061 embed_barrier = PMIX_INFO_TRUE(&info[n]);
4062 barrier_directive_included = true;
4063 }
4064 }
4065
4066
4067
4068
4069
4070 if (force_local) {
4071 trk->local = true;
4072 } else if (need_cxtid) {
4073 trk->local = false;
4074 } else {
4075 trk->local = true;
4076 for (n=0; n < grp->nmbrs; n++) {
4077
4078
4079 if (PMIX_RANK_LOCAL_PEERS == grp->members[n].rank ||
4080 PMIX_RANK_LOCAL_NODE == grp->members[n].rank) {
4081 continue;
4082 }
4083
4084 match = false;
4085 for (m=0; m < pmix_server_globals.clients.size; m++) {
4086 if (NULL == (pr = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, m))) {
4087 continue;
4088 }
4089 if (PMIX_CHECK_PROCID(&grp->members[n], &pr->info->pname)) {
4090 match = true;
4091 break;
4092 }
4093 }
4094 if (!match) {
4095
4096 trk->local = false;
4097 break;
4098 }
4099 }
4100 }
4101 } else {
4102
4103 PMIX_INFO_FREE(info, ninfo);
4104 info = NULL;
4105 }
4106
4107
4108
4109 pmix_list_append(&trk->local_cbs, &cd->super);
4110
4111
4112 if (0 < tv.tv_sec) {
4113 pmix_event_evtimer_set(pmix_globals.evbase, &trk->ev,
4114 grp_timeout, trk);
4115 pmix_event_evtimer_add(&trk->ev, &tv);
4116 trk->event_active = true;
4117 }
4118
4119
4120
4121
4122
4123 if (trk->def_complete &&
4124 pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
4125 pmix_output_verbose(2, pmix_server_globals.base_output,
4126 "local group op complete with %d procs", (int)trk->npcs);
4127
4128 if (trk->local) {
4129
4130
4131
4132 grpcbfunc(PMIX_SUCCESS, NULL, 0, trk, NULL, NULL);
4133 return PMIX_SUCCESS;
4134 }
4135
4136
4137 if (NULL == pmix_host_server.group) {
4138
4139 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
4140 PMIX_RELEASE(trk);
4141 return PMIX_ERR_NOT_SUPPORTED;
4142 }
4143
4144
4145
4146 if (!barrier_directive_included ||
4147 (barrier_directive_included && embed_barrier)) {
4148
4149 PMIX_CONSTRUCT(&bucket, pmix_buffer_t);
4150 rc = _collect_data(trk, &bucket);
4151 if (PMIX_SUCCESS != rc) {
4152 if (trk->event_active) {
4153 pmix_event_del(&trk->ev);
4154 }
4155
4156 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
4157 PMIX_RELEASE(trk);
4158 PMIX_DESTRUCT(&bucket);
4159 return rc;
4160 }
4161
4162 PMIX_UNLOAD_BUFFER(&bucket, bo.bytes, bo.size);
4163 PMIX_DESTRUCT(&bucket);
4164
4165
4166 n2 = trk->ninfo + 1;
4167 PMIX_INFO_CREATE(iptr, n2);
4168 for (n=0; n < trk->ninfo; n++) {
4169 PMIX_INFO_XFER(&iptr[n], &trk->info[n]);
4170 }
4171 PMIX_INFO_LOAD(&iptr[ninfo], PMIX_GROUP_ENDPT_DATA, &bo, PMIX_BYTE_OBJECT);
4172 PMIX_BYTE_OBJECT_DESTRUCT(&bo);
4173 PMIX_INFO_FREE(trk->info, trk->ninfo);
4174 trk->info = iptr;
4175 trk->ninfo = n2;
4176 }
4177 rc = pmix_host_server.group(PMIX_GROUP_CONSTRUCT, grp->grpid,
4178 trk->pcs, trk->npcs,
4179 trk->info, trk->ninfo,
4180 grpcbfunc, trk);
4181 if (PMIX_SUCCESS != rc) {
4182 if (trk->event_active) {
4183 pmix_event_del(&trk->ev);
4184 }
4185 if (PMIX_OPERATION_SUCCEEDED == rc) {
4186
4187 grpcbfunc(PMIX_SUCCESS, NULL, 0, trk, NULL, NULL);
4188 return PMIX_SUCCESS;
4189 }
4190
4191 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
4192 PMIX_RELEASE(trk);
4193 return rc;
4194 }
4195 }
4196
4197 return PMIX_SUCCESS;
4198
4199 error:
4200 if (NULL != info) {
4201 PMIX_INFO_FREE(info, ninfo);
4202 }
4203 return rc;
4204 }
4205
4206
4207
4208 pmix_status_t pmix_server_grpdestruct(pmix_server_caddy_t *cd,
4209 pmix_buffer_t *buf)
4210 {
4211 pmix_peer_t *peer = (pmix_peer_t*)cd->peer;
4212 int32_t cnt;
4213 pmix_status_t rc;
4214 char *grpid;
4215 pmix_info_t *info = NULL;
4216 size_t n, ninfo;
4217 pmix_server_trkr_t *trk;
4218 pmix_group_t *grp, *pgrp;
4219 struct timeval tv = {0, 0};
4220
4221 pmix_output_verbose(2, pmix_server_globals.iof_output,
4222 "recvd grpdestruct cmd");
4223
4224 if (NULL == pmix_host_server.group) {
4225 PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
4226 return PMIX_ERR_NOT_SUPPORTED;
4227 }
4228
4229
4230 cnt = 1;
4231 PMIX_BFROPS_UNPACK(rc, peer, buf, &grpid, &cnt, PMIX_STRING);
4232 if (PMIX_SUCCESS != rc) {
4233 PMIX_ERROR_LOG(rc);
4234 goto error;
4235 }
4236
4237
4238 grp = NULL;
4239 PMIX_LIST_FOREACH(pgrp, &pmix_server_globals.groups, pmix_group_t) {
4240 if (0 == strcmp(grpid, pgrp->grpid)) {
4241 grp = pgrp;
4242 break;
4243 }
4244 }
4245 free(grpid);
4246
4247
4248
4249 if (NULL == grp) {
4250 rc = PMIX_ERR_NOT_FOUND;
4251 goto error;
4252 }
4253
4254
4255 cnt = 1;
4256 PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
4257 if (PMIX_SUCCESS != rc) {
4258 PMIX_ERROR_LOG(rc);
4259 goto error;
4260 }
4261 if (0 < ninfo) {
4262 PMIX_INFO_CREATE(info, ninfo);
4263 cnt = ninfo;
4264 PMIX_BFROPS_UNPACK(rc, peer, buf, info, &cnt, PMIX_INFO);
4265 if (PMIX_SUCCESS != rc) {
4266 PMIX_ERROR_LOG(rc);
4267 goto error;
4268 }
4269
4270
4271 for (n=0; n < ninfo; n++) {
4272 if (PMIX_CHECK_KEY(&info[n], PMIX_TIMEOUT)) {
4273 tv.tv_sec = info[n].value.data.uint32;
4274 break;
4275 }
4276 }
4277 }
4278
4279
4280 if (NULL == (trk = get_tracker(grp->grpid, grp->members, grp->nmbrs, PMIX_GROUP_DESTRUCT_CMD))) {
4281
4282 if (NULL == (trk = new_tracker(grp->grpid, grp->members, grp->nmbrs, PMIX_GROUP_DESTRUCT_CMD))) {
4283
4284 PMIX_ERROR_LOG(PMIX_ERROR);
4285 rc = PMIX_ERROR;
4286 goto error;
4287 }
4288 trk->collect_type = PMIX_COLLECT_NO;
4289
4290 trk->hybrid = true;
4291
4292 trk->cbdata = grp;
4293 }
4294
4295
4296
4297
4298 if (NULL == trk->info) {
4299 trk->info = info;
4300 trk->ninfo = ninfo;
4301 } else {
4302
4303 PMIX_INFO_FREE(info, ninfo);
4304 info = NULL;
4305 }
4306
4307
4308
4309 pmix_list_append(&trk->local_cbs, &cd->super);
4310
4311
4312 if (0 < tv.tv_sec) {
4313 pmix_event_evtimer_set(pmix_globals.evbase, &trk->ev,
4314 grp_timeout, trk);
4315 pmix_event_evtimer_add(&trk->ev, &tv);
4316 trk->event_active = true;
4317 }
4318
4319
4320
4321
4322
4323 if (trk->def_complete &&
4324 pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
4325 pmix_output_verbose(2, pmix_server_globals.base_output,
4326 "local group op complete %d", (int)trk->nlocal);
4327
4328 rc = pmix_host_server.group(PMIX_GROUP_DESTRUCT, grp->grpid,
4329 grp->members, grp->nmbrs,
4330 trk->info, trk->ninfo,
4331 grpcbfunc, trk);
4332 if (PMIX_SUCCESS != rc) {
4333 if (trk->event_active) {
4334 pmix_event_del(&trk->ev);
4335 }
4336 if (PMIX_OPERATION_SUCCEEDED == rc) {
4337
4338 grpcbfunc(PMIX_SUCCESS, NULL, 0, trk, NULL, NULL);
4339 return PMIX_SUCCESS;
4340 }
4341
4342 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
4343 PMIX_RELEASE(trk);
4344 return rc;
4345 }
4346 }
4347
4348 return PMIX_SUCCESS;
4349
4350 error:
4351 if (NULL != info) {
4352 PMIX_INFO_FREE(info, ninfo);
4353 }
4354 return rc;
4355 }
4356
4357
4358 static void tcon(pmix_server_trkr_t *t)
4359 {
4360 t->event_active = false;
4361 t->host_called = false;
4362 t->local = false;
4363 t->id = NULL;
4364 memset(t->pname.nspace, 0, PMIX_MAX_NSLEN+1);
4365 t->pname.rank = PMIX_RANK_UNDEF;
4366 t->pcs = NULL;
4367 t->npcs = 0;
4368 PMIX_CONSTRUCT(&t->nslist, pmix_list_t);
4369 PMIX_CONSTRUCT_LOCK(&t->lock);
4370 t->def_complete = false;
4371 PMIX_CONSTRUCT(&t->local_cbs, pmix_list_t);
4372 t->nlocal = 0;
4373 t->local_cnt = 0;
4374 t->info = NULL;
4375 t->ninfo = 0;
4376
4377 t->collect_type = PMIX_COLLECT_INVALID;
4378 t->modexcbfunc = NULL;
4379 t->op_cbfunc = NULL;
4380 t->hybrid = false;
4381 t->cbdata = NULL;
4382 }
4383 static void tdes(pmix_server_trkr_t *t)
4384 {
4385 if (NULL != t->id) {
4386 free(t->id);
4387 }
4388 PMIX_DESTRUCT_LOCK(&t->lock);
4389 if (NULL != t->pcs) {
4390 free(t->pcs);
4391 }
4392 PMIX_LIST_DESTRUCT(&t->local_cbs);
4393 if (NULL != t->info) {
4394 PMIX_INFO_FREE(t->info, t->ninfo);
4395 }
4396 PMIX_DESTRUCT(&t->nslist);
4397 }
4398 PMIX_CLASS_INSTANCE(pmix_server_trkr_t,
4399 pmix_list_item_t,
4400 tcon, tdes);
4401
4402 static void cdcon(pmix_server_caddy_t *cd)
4403 {
4404 memset(&cd->ev, 0, sizeof(pmix_event_t));
4405 cd->event_active = false;
4406 cd->trk = NULL;
4407 cd->peer = NULL;
4408 }
4409 static void cddes(pmix_server_caddy_t *cd)
4410 {
4411 if (cd->event_active) {
4412 pmix_event_del(&cd->ev);
4413 }
4414 if (NULL != cd->trk) {
4415 PMIX_RELEASE(cd->trk);
4416 }
4417 if (NULL != cd->peer) {
4418 PMIX_RELEASE(cd->peer);
4419 }
4420 }
4421 PMIX_CLASS_INSTANCE(pmix_server_caddy_t,
4422 pmix_list_item_t,
4423 cdcon, cddes);
4424
4425
4426 static void scadcon(pmix_setup_caddy_t *p)
4427 {
4428 p->peer = NULL;
4429 memset(&p->proc, 0, sizeof(pmix_proc_t));
4430 PMIX_CONSTRUCT_LOCK(&p->lock);
4431 p->nspace = NULL;
4432 p->codes = NULL;
4433 p->ncodes = 0;
4434 p->procs = NULL;
4435 p->nprocs = 0;
4436 p->apps = NULL;
4437 p->napps = 0;
4438 p->server_object = NULL;
4439 p->nlocalprocs = 0;
4440 p->info = NULL;
4441 p->ninfo = 0;
4442 p->keys = NULL;
4443 p->channels = PMIX_FWD_NO_CHANNELS;
4444 p->bo = NULL;
4445 p->nbo = 0;
4446 p->cbfunc = NULL;
4447 p->opcbfunc = NULL;
4448 p->setupcbfunc = NULL;
4449 p->lkcbfunc = NULL;
4450 p->spcbfunc = NULL;
4451 p->cbdata = NULL;
4452 }
4453 static void scaddes(pmix_setup_caddy_t *p)
4454 {
4455 if (NULL != p->peer) {
4456 PMIX_RELEASE(p->peer);
4457 }
4458 PMIX_PROC_FREE(p->procs, p->nprocs);
4459 if (NULL != p->apps) {
4460 PMIX_APP_FREE(p->apps, p->napps);
4461 }
4462 if (NULL != p->bo) {
4463 PMIX_BYTE_OBJECT_FREE(p->bo, p->nbo);
4464 }
4465 PMIX_DESTRUCT_LOCK(&p->lock);
4466 }
4467 PMIX_EXPORT PMIX_CLASS_INSTANCE(pmix_setup_caddy_t,
4468 pmix_object_t,
4469 scadcon, scaddes);
4470
4471 static void ncon(pmix_notify_caddy_t *p)
4472 {
4473 PMIX_CONSTRUCT_LOCK(&p->lock);
4474 #if defined(__linux__) && PMIX_HAVE_CLOCK_GETTIME
4475 struct timespec tp;
4476 (void) clock_gettime(CLOCK_MONOTONIC, &tp);
4477 p->ts = tp.tv_sec;
4478 #else
4479
4480 struct timeval tv;
4481 gettimeofday(&tv, NULL);
4482 p->ts = tv.tv_sec;
4483 #endif
4484 p->room = -1;
4485 memset(p->source.nspace, 0, PMIX_MAX_NSLEN+1);
4486 p->source.rank = PMIX_RANK_UNDEF;
4487 p->range = PMIX_RANGE_UNDEF;
4488 p->targets = NULL;
4489 p->ntargets = 0;
4490 p->nleft = SIZE_MAX;
4491 p->affected = NULL;
4492 p->naffected = 0;
4493 p->nondefault = false;
4494 p->info = NULL;
4495 p->ninfo = 0;
4496 }
4497 static void ndes(pmix_notify_caddy_t *p)
4498 {
4499 PMIX_DESTRUCT_LOCK(&p->lock);
4500 if (NULL != p->info) {
4501 PMIX_INFO_FREE(p->info, p->ninfo);
4502 }
4503 PMIX_PROC_FREE(p->affected, p->naffected);
4504 if (NULL != p->targets) {
4505 free(p->targets);
4506 }
4507 }
4508 PMIX_CLASS_INSTANCE(pmix_notify_caddy_t,
4509 pmix_object_t,
4510 ncon, ndes);
4511
4512
4513 PMIX_CLASS_INSTANCE(pmix_trkr_caddy_t,
4514 pmix_object_t,
4515 NULL, NULL);
4516
4517 static void dmcon(pmix_dmdx_remote_t *p)
4518 {
4519 p->cd = NULL;
4520 }
4521 static void dmdes(pmix_dmdx_remote_t *p)
4522 {
4523 if (NULL != p->cd) {
4524 PMIX_RELEASE(p->cd);
4525 }
4526 }
4527 PMIX_CLASS_INSTANCE(pmix_dmdx_remote_t,
4528 pmix_list_item_t,
4529 dmcon, dmdes);
4530
4531 static void dmrqcon(pmix_dmdx_request_t *p)
4532 {
4533 memset(&p->ev, 0, sizeof(pmix_event_t));
4534 p->event_active = false;
4535 p->lcd = NULL;
4536 }
4537 static void dmrqdes(pmix_dmdx_request_t *p)
4538 {
4539 if (p->event_active) {
4540 pmix_event_del(&p->ev);
4541 }
4542 if (NULL != p->lcd) {
4543 PMIX_RELEASE(p->lcd);
4544 }
4545 }
4546 PMIX_CLASS_INSTANCE(pmix_dmdx_request_t,
4547 pmix_list_item_t,
4548 dmrqcon, dmrqdes);
4549
4550 static void lmcon(pmix_dmdx_local_t *p)
4551 {
4552 memset(&p->proc, 0, sizeof(pmix_proc_t));
4553 PMIX_CONSTRUCT(&p->loc_reqs, pmix_list_t);
4554 p->info = NULL;
4555 p->ninfo = 0;
4556 }
4557 static void lmdes(pmix_dmdx_local_t *p)
4558 {
4559 if (NULL != p->info) {
4560 PMIX_INFO_FREE(p->info, p->ninfo);
4561 }
4562 PMIX_LIST_DESTRUCT(&p->loc_reqs);
4563 }
4564 PMIX_CLASS_INSTANCE(pmix_dmdx_local_t,
4565 pmix_list_item_t,
4566 lmcon, lmdes);
4567
4568 static void prevcon(pmix_peer_events_info_t *p)
4569 {
4570 p->peer = NULL;
4571 p->affected = NULL;
4572 p->naffected = 0;
4573 }
4574 static void prevdes(pmix_peer_events_info_t *p)
4575 {
4576 if (NULL != p->peer) {
4577 PMIX_RELEASE(p->peer);
4578 }
4579 if (NULL != p->affected) {
4580 PMIX_PROC_FREE(p->affected, p->naffected);
4581 }
4582 }
4583 PMIX_CLASS_INSTANCE(pmix_peer_events_info_t,
4584 pmix_list_item_t,
4585 prevcon, prevdes);
4586
4587 static void regcon(pmix_regevents_info_t *p)
4588 {
4589 PMIX_CONSTRUCT(&p->peers, pmix_list_t);
4590 }
4591 static void regdes(pmix_regevents_info_t *p)
4592 {
4593 PMIX_LIST_DESTRUCT(&p->peers);
4594 }
4595 PMIX_CLASS_INSTANCE(pmix_regevents_info_t,
4596 pmix_list_item_t,
4597 regcon, regdes);
4598
4599 static void ilcon(pmix_inventory_rollup_t *p)
4600 {
4601 PMIX_CONSTRUCT_LOCK(&p->lock);
4602 p->lock.active = false;
4603 p->status = PMIX_SUCCESS;
4604 p->requests = 0;
4605 p->replies = 0;
4606 PMIX_CONSTRUCT(&p->payload, pmix_list_t);
4607 p->info = NULL;
4608 p->ninfo = 0;
4609 p->cbfunc = NULL;
4610 p->infocbfunc = NULL;
4611 p->opcbfunc = NULL;
4612 p->cbdata = NULL;
4613 }
4614 static void ildes(pmix_inventory_rollup_t *p)
4615 {
4616 PMIX_DESTRUCT_LOCK(&p->lock);
4617 PMIX_LIST_DESTRUCT(&p->payload);
4618 }
4619 PMIX_CLASS_INSTANCE(pmix_inventory_rollup_t,
4620 pmix_object_t,
4621 ilcon, ildes);
4622
4623 static void grcon(pmix_group_t *p)
4624 {
4625 p->grpid = NULL;
4626 p->members = NULL;
4627 p->nmbrs = 0;
4628 }
4629 static void grdes(pmix_group_t *p)
4630 {
4631 if (NULL != p->grpid) {
4632 free(p->grpid);
4633 }
4634 if (NULL != p->members) {
4635 PMIX_PROC_FREE(p->members, p->nmbrs);
4636 }
4637 }
4638 PMIX_CLASS_INSTANCE(pmix_group_t,
4639 pmix_list_item_t,
4640 grcon, grdes);
4641
4642 PMIX_CLASS_INSTANCE(pmix_group_caddy_t,
4643 pmix_list_item_t,
4644 NULL, NULL);
4645
4646 static void iocon(pmix_iof_cache_t *p)
4647 {
4648 p->bo = NULL;
4649 }
4650 static void iodes(pmix_iof_cache_t *p)
4651 {
4652 PMIX_BYTE_OBJECT_FREE(p->bo, 1);
4653 }
4654 PMIX_CLASS_INSTANCE(pmix_iof_cache_t,
4655 pmix_list_item_t,
4656 iocon, iodes);