This source file includes the following definitions.
- pmix_server_initialize
- PMIx_server_init
- PMIx_server_finalize
- _register_nspace
- PMIx_server_register_nspace
- pmix_server_purge_events
- _deregister_nspace
- PMIx_server_deregister_nspace
- pmix_server_execute_collective
- _register_client
- PMIx_server_register_client
- _deregister_client
- PMIx_server_deregister_client
- PMIx_server_setup_fork
- _dmodex_req
- PMIx_server_dmodex_request
- _store_internal
- PMIx_Store_internal
- PMIx_generate_regex
- PMIx_generate_ppn
- _setup_op
- _setup_app
- PMIx_server_setup_application
- _setup_local_support
- PMIx_server_setup_local_support
- _iofdeliver
- PMIx_server_IOF_deliver
- cirelease
- clct_complete
- clct
- PMIx_server_collect_inventory
- dlinv_complete
- dlinv
- PMIx_server_deliver_inventory
- op_cbfunc
- connection_cleanup
- op_cbfunc2
- _spcb
- spawn_cbfunc
- lookup_cbfunc
- _mdxcbfunc
- modex_cbfunc
- get_cbfunc
- _cnct
- cnct_cbfunc
- _discnct
- discnct_cbfunc
- regevents_cbfunc
- notifyerror_cbfunc
- alloc_cbfunc
- query_cbfunc
- jctrl_cbfunc
- monitor_cbfunc
- cred_cbfunc
- validate_cbfunc
- _iofreg
- iof_cbfunc
- server_switchyard
- pmix_server_message_handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include <src/include/pmix_config.h>
20
21 #include <src/include/pmix_stdint.h>
22 #include <src/include/pmix_socket_errno.h>
23
24 #include <pmix_server.h>
25 #include <pmix_common.h>
26 #include <pmix_rename.h>
27
28 #include "src/include/pmix_globals.h"
29
30 #ifdef HAVE_STRING_H
31 #include <string.h>
32 #endif
33 #include <fcntl.h>
34 #ifdef HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37 #ifdef HAVE_SYS_SOCKET_H
38 #include <sys/socket.h>
39 #endif
40 #ifdef HAVE_SYS_UN_H
41 #include <sys/un.h>
42 #endif
43 #ifdef HAVE_SYS_UIO_H
44 #include <sys/uio.h>
45 #endif
46 #ifdef HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49 #include <ctype.h>
50 #include <sys/stat.h>
51
52
53 #include "src/common/pmix_attributes.h"
54 #include "src/util/argv.h"
55 #include "src/util/error.h"
56 #include "src/util/name_fns.h"
57 #include "src/util/output.h"
58 #include "src/util/pmix_environ.h"
59 #include "src/util/show_help.h"
60 #include "src/mca/base/base.h"
61 #include "src/mca/base/pmix_mca_base_var.h"
62 #include "src/mca/pinstalldirs/base/base.h"
63 #include "src/mca/pnet/base/base.h"
64 #include "src/runtime/pmix_progress_threads.h"
65 #include "src/runtime/pmix_rte.h"
66 #include "src/mca/bfrops/base/base.h"
67 #include "src/mca/gds/base/base.h"
68 #include "src/mca/preg/preg.h"
69 #include "src/mca/psensor/base/base.h"
70 #include "src/mca/ptl/base/base.h"
71 #include "src/hwloc/hwloc-internal.h"
72
73
74
75 #include "src/client/pmix_client_ops.h"
76 #include "pmix_server_ops.h"
77
78
79 pmix_server_globals_t pmix_server_globals = {{{0}}};
80
81
82 static char *security_mode = NULL;
83 static char *ptl_mode = NULL;
84 static char *bfrops_mode = NULL;
85 static char *gds_mode = NULL;
86 static pid_t mypid;
87
88
89 pmix_status_t pmix_server_initialize(void)
90 {
91
92 PMIX_CONSTRUCT(&pmix_server_globals.clients, pmix_pointer_array_t);
93 pmix_pointer_array_init(&pmix_server_globals.clients, 1, INT_MAX, 1);
94 PMIX_CONSTRUCT(&pmix_server_globals.collectives, pmix_list_t);
95 PMIX_CONSTRUCT(&pmix_server_globals.remote_pnd, pmix_list_t);
96 PMIX_CONSTRUCT(&pmix_server_globals.gdata, pmix_list_t);
97 PMIX_CONSTRUCT(&pmix_server_globals.events, pmix_list_t);
98 PMIX_CONSTRUCT(&pmix_server_globals.local_reqs, pmix_list_t);
99 PMIX_CONSTRUCT(&pmix_server_globals.nspaces, pmix_list_t);
100 PMIX_CONSTRUCT(&pmix_server_globals.groups, pmix_list_t);
101 PMIX_CONSTRUCT(&pmix_server_globals.iof, pmix_list_t);
102
103 pmix_output_verbose(2, pmix_server_globals.base_output,
104 "pmix:server init called");
105
106
107 if (0 < pmix_server_globals.get_verbose) {
108
109 pmix_server_globals.get_output = pmix_output_open(NULL);
110 pmix_output_set_verbosity(pmix_server_globals.get_output,
111 pmix_server_globals.get_verbose);
112 }
113 if (0 < pmix_server_globals.connect_verbose) {
114
115 pmix_server_globals.connect_output = pmix_output_open(NULL);
116 pmix_output_set_verbosity(pmix_server_globals.connect_output,
117 pmix_server_globals.connect_verbose);
118 }
119 if (0 < pmix_server_globals.fence_verbose) {
120
121 pmix_server_globals.fence_output = pmix_output_open(NULL);
122 pmix_output_set_verbosity(pmix_server_globals.fence_output,
123 pmix_server_globals.fence_verbose);
124 }
125 if (0 < pmix_server_globals.pub_verbose) {
126
127 pmix_server_globals.pub_output = pmix_output_open(NULL);
128 pmix_output_set_verbosity(pmix_server_globals.pub_output,
129 pmix_server_globals.pub_verbose);
130 }
131 if (0 < pmix_server_globals.spawn_verbose) {
132
133 pmix_server_globals.spawn_output = pmix_output_open(NULL);
134 pmix_output_set_verbosity(pmix_server_globals.spawn_output,
135 pmix_server_globals.spawn_verbose);
136 }
137 if (0 < pmix_server_globals.event_verbose) {
138
139 pmix_server_globals.event_output = pmix_output_open(NULL);
140 pmix_output_set_verbosity(pmix_server_globals.event_output,
141 pmix_server_globals.event_verbose);
142 }
143 if (0 < pmix_server_globals.iof_verbose) {
144
145 pmix_server_globals.iof_output = pmix_output_open(NULL);
146 pmix_output_set_verbosity(pmix_server_globals.iof_output,
147 pmix_server_globals.iof_verbose);
148 }
149
150 if (0 < pmix_server_globals.base_verbose) {
151
152 pmix_server_globals.base_output = pmix_output_open(NULL);
153 pmix_output_set_verbosity(pmix_server_globals.base_output,
154 pmix_server_globals.base_verbose);
155 }
156
157 return PMIX_SUCCESS;
158 }
159
160 PMIX_EXPORT pmix_status_t PMIx_server_init(pmix_server_module_t *module,
161 pmix_info_t info[], size_t ninfo)
162 {
163 pmix_ptl_posted_recv_t *req;
164 pmix_status_t rc;
165 size_t n, m;
166 pmix_kval_t *kv;
167 bool protect, nspace_given = false, rank_given = false;
168 pmix_info_t ginfo;
169 char *protected[] = {
170 PMIX_USERID,
171 PMIX_GRPID,
172 PMIX_SOCKET_MODE,
173 PMIX_SERVER_TOOL_SUPPORT,
174 PMIX_SERVER_SYSTEM_SUPPORT,
175 PMIX_SERVER_GATEWAY,
176 NULL
177 };
178 char *evar;
179 pmix_rank_info_t *rinfo;
180 pmix_proc_type_t ptype = PMIX_PROC_SERVER;
181
182 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
183
184 pmix_output_verbose(2, pmix_server_globals.base_output,
185 "pmix:server init called");
186
187
188 pmix_host_server = *module;
189
190 if (NULL != info) {
191 for (n=0; n < ninfo; n++) {
192 if (0 == strncmp(info[n].key, PMIX_SERVER_GATEWAY, PMIX_MAX_KEYLEN)) {
193 if (PMIX_INFO_TRUE(&info[n])) {
194 ptype |= PMIX_PROC_GATEWAY;
195 }
196 } else if (0 == strncmp(info[n].key, PMIX_SERVER_TMPDIR, PMIX_MAX_KEYLEN)) {
197 pmix_server_globals.tmpdir = strdup(info[n].value.data.string);
198 } else if (0 == strncmp(info[n].key, PMIX_SYSTEM_TMPDIR, PMIX_MAX_KEYLEN)) {
199 pmix_server_globals.system_tmpdir = strdup(info[n].value.data.string);
200 }
201 }
202 }
203 if (NULL == pmix_server_globals.tmpdir) {
204 if (NULL == (evar = getenv("PMIX_SERVER_TMPDIR"))) {
205 pmix_server_globals.tmpdir = strdup(pmix_tmp_directory());
206 } else {
207 pmix_server_globals.tmpdir = strdup(evar);
208 }
209 }
210 if (NULL == pmix_server_globals.system_tmpdir) {
211 if (NULL == (evar = getenv("PMIX_SYSTEM_TMPDIR"))) {
212 pmix_server_globals.system_tmpdir = strdup(pmix_tmp_directory());
213 } else {
214 pmix_server_globals.system_tmpdir = strdup(evar);
215 }
216 }
217
218
219
220 if (PMIX_SUCCESS != (rc = pmix_rte_init(ptype, info, ninfo, NULL))) {
221 PMIX_ERROR_LOG(rc);
222 PMIX_RELEASE_THREAD(&pmix_global_lock);
223 return rc;
224 }
225
226
227 if (PMIX_SUCCESS != (rc = pmix_server_initialize())) {
228 PMIX_ERROR_LOG(rc);
229 PMIX_RELEASE_THREAD(&pmix_global_lock);
230 return rc;
231 }
232
233
234 pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module(NULL);
235 if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
236 PMIX_ERROR_LOG(rc);
237 PMIX_RELEASE_THREAD(&pmix_global_lock);
238 return rc;
239 }
240
241 pmix_globals.mypeer->nptr->compat.type = pmix_bfrops_globals.default_type;
242
243
244 pmix_globals.mypeer->nptr->compat.psec = pmix_psec_base_assign_module(NULL);
245 if (NULL == pmix_globals.mypeer->nptr->compat.psec) {
246 PMIX_ERROR_LOG(rc);
247 PMIX_RELEASE_THREAD(&pmix_global_lock);
248 return rc;
249 }
250
251
252 pmix_globals.mypeer->nptr->compat.ptl = pmix_ptl_base_assign_module();
253 if (NULL == pmix_globals.mypeer->nptr->compat.ptl) {
254 PMIX_ERROR_LOG(rc);
255 PMIX_RELEASE_THREAD(&pmix_global_lock);
256 return rc;
257 }
258
259
260 PMIX_INFO_LOAD(&ginfo, PMIX_GDS_MODULE, "hash", PMIX_STRING);
261 pmix_globals.mypeer->nptr->compat.gds = pmix_gds_base_assign_module(&ginfo, 1);
262 if (NULL == pmix_globals.mypeer->nptr->compat.gds) {
263 PMIX_ERROR_LOG(rc);
264 PMIX_RELEASE_THREAD(&pmix_global_lock);
265 return rc;
266 }
267 PMIX_INFO_DESTRUCT(&ginfo);
268
269
270
271 pmix_client_globals.myserver = PMIX_NEW(pmix_peer_t);
272 PMIX_RETAIN(pmix_globals.mypeer->nptr);
273 pmix_client_globals.myserver->nptr = pmix_globals.mypeer->nptr;
274
275
276 security_mode = pmix_psec_base_get_available_modules();
277
278
279 ptl_mode = pmix_ptl_base_get_available_modules();
280
281
282 bfrops_mode = pmix_bfrops_base_get_available_modules();
283
284
285 gds_mode = pmix_gds_base_get_available_modules();
286
287
288
289
290 if (NULL != info) {
291 for (n=0; n < ninfo; n++) {
292 if (0 == strncmp(info[n].key, PMIX_SERVER_NSPACE, PMIX_MAX_KEYLEN)) {
293 pmix_strncpy(pmix_globals.myid.nspace, info[n].value.data.string, PMIX_MAX_NSLEN);
294 nspace_given = true;
295 } else if (0 == strncmp(info[n].key, PMIX_SERVER_RANK, PMIX_MAX_KEYLEN)) {
296 pmix_globals.myid.rank = info[n].value.data.rank;
297 rank_given = true;
298 } else {
299
300 protect = false;
301 for (m=0; NULL != protected[m]; m++) {
302 if (0 == strcmp(info[n].key, protected[m])) {
303 protect = true;
304 break;
305 }
306 }
307 if (protect) {
308 continue;
309 }
310
311 kv = PMIX_NEW(pmix_kval_t);
312 kv->key = strdup(info[n].key);
313 PMIX_VALUE_CREATE(kv->value, 1);
314 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
315 kv->value, &info[n].value);
316 if (PMIX_SUCCESS != rc) {
317 PMIX_RELEASE(kv);
318 PMIX_ERROR_LOG(rc);
319 PMIX_RELEASE_THREAD(&pmix_global_lock);
320 return rc;
321 }
322 pmix_list_append(&pmix_server_globals.gdata, &kv->super);
323 }
324 }
325 }
326
327 if (!nspace_given) {
328
329 if (NULL == (evar = getenv("PMIX_SERVER_NAMESPACE"))) {
330
331 pmix_strncpy(pmix_globals.myid.nspace, "pmix-server", PMIX_MAX_NSLEN);
332 } else {
333 pmix_strncpy(pmix_globals.myid.nspace, evar, PMIX_MAX_NSLEN);
334 }
335 }
336 if (!rank_given) {
337
338 mypid = getpid();
339 if (NULL == (evar = getenv("PMIX_SERVER_RANK"))) {
340
341 pmix_globals.myid.rank = mypid;
342 } else {
343 pmix_globals.myid.rank = strtol(evar, NULL, 10);
344 }
345 }
346
347
348 if (NULL == pmix_globals.mypeer->info) {
349 rinfo = PMIX_NEW(pmix_rank_info_t);
350 pmix_globals.mypeer->info = rinfo;
351 } else {
352 rinfo = pmix_globals.mypeer->info;
353 }
354 if (NULL == pmix_globals.mypeer->nptr) {
355 pmix_globals.mypeer->nptr = PMIX_NEW(pmix_namespace_t);
356
357 PMIX_RETAIN(pmix_globals.mypeer->nptr);
358 pmix_list_prepend(&pmix_server_globals.nspaces, &pmix_globals.mypeer->nptr->super);
359 }
360 pmix_globals.mypeer->nptr->nspace = strdup(pmix_globals.myid.nspace);
361 rinfo->pname.nspace = strdup(pmix_globals.mypeer->nptr->nspace);
362 rinfo->pname.rank = pmix_globals.myid.rank;
363 rinfo->uid = pmix_globals.uid;
364 rinfo->gid = pmix_globals.gid;
365 PMIX_RETAIN(pmix_globals.mypeer->info);
366 pmix_client_globals.myserver->info = pmix_globals.mypeer->info;
367
368
369 if (PMIX_SUCCESS != (rc = pmix_mca_base_framework_open(&pmix_pnet_base_framework, 0))) {
370 PMIX_RELEASE_THREAD(&pmix_global_lock);
371 return rc;
372 }
373 if (PMIX_SUCCESS != (rc = pmix_pnet_base_select())) {
374 PMIX_RELEASE_THREAD(&pmix_global_lock);
375 return rc;
376 }
377
378
379 if (PMIX_SUCCESS != (rc = pmix_hwloc_get_topology(info, ninfo))) {
380 PMIX_RELEASE_THREAD(&pmix_global_lock);
381 return rc;
382 }
383
384
385 if (PMIX_SUCCESS != (rc = pmix_mca_base_framework_open(&pmix_psensor_base_framework, 0))) {
386 PMIX_RELEASE_THREAD(&pmix_global_lock);
387 return rc;
388 }
389 if (PMIX_SUCCESS != (rc = pmix_psensor_base_select())) {
390 PMIX_RELEASE_THREAD(&pmix_global_lock);
391 return rc;
392 }
393
394
395 req = PMIX_NEW(pmix_ptl_posted_recv_t);
396 req->tag = UINT32_MAX;
397 req->cbfunc = pmix_server_message_handler;
398
399 pmix_list_append(&pmix_ptl_globals.posted_recvs, &req->super);
400
401
402 if (PMIX_PROC_IS_GATEWAY(pmix_globals.mypeer)) {
403
404 PMIX_IOF_SINK_DEFINE(&pmix_client_globals.iof_stdout, &pmix_globals.myid,
405 1, PMIX_FWD_STDOUT_CHANNEL, pmix_iof_write_handler);
406 PMIX_IOF_SINK_DEFINE(&pmix_client_globals.iof_stderr, &pmix_globals.myid,
407 2, PMIX_FWD_STDERR_CHANNEL, pmix_iof_write_handler);
408 }
409
410
411 if (PMIX_SUCCESS != (rc = pmix_register_server_attrs())) {
412 PMIX_RELEASE_THREAD(&pmix_global_lock);
413 return rc;
414 }
415
416
417 if (PMIX_SUCCESS != pmix_ptl_base_start_listening(info, ninfo)) {
418 pmix_show_help("help-pmix-server.txt", "listener-thread-start", true);
419 PMIX_RELEASE_THREAD(&pmix_global_lock);
420 PMIx_server_finalize();
421 return PMIX_ERR_INIT;
422 }
423
424 ++pmix_globals.init_cntr;
425 PMIX_RELEASE_THREAD(&pmix_global_lock);
426
427 return PMIX_SUCCESS;
428 }
429
430 PMIX_EXPORT pmix_status_t PMIx_server_finalize(void)
431 {
432 int i;
433 pmix_peer_t *peer;
434 pmix_namespace_t *ns;
435
436 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
437 if (pmix_globals.init_cntr <= 0) {
438 PMIX_RELEASE_THREAD(&pmix_global_lock);
439 return PMIX_ERR_INIT;
440 }
441
442 if (1 != pmix_globals.init_cntr) {
443 --pmix_globals.init_cntr;
444 PMIX_RELEASE_THREAD(&pmix_global_lock);
445 return PMIX_SUCCESS;
446 }
447 pmix_globals.init_cntr = 0;
448
449 pmix_output_verbose(2, pmix_server_globals.base_output,
450 "pmix:server finalize called");
451
452 if (!pmix_globals.external_evbase) {
453
454
455
456
457 (void)pmix_progress_thread_pause(NULL);
458 }
459
460 pmix_ptl_base_stop_listening();
461
462 for (i=0; i < pmix_server_globals.clients.size; i++) {
463 if (NULL != (peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, i))) {
464
465
466
467 pmix_execute_epilog(&peer->epilog);
468 PMIX_RELEASE(peer);
469 }
470 }
471 PMIX_DESTRUCT(&pmix_server_globals.clients);
472 PMIX_LIST_DESTRUCT(&pmix_server_globals.collectives);
473 PMIX_LIST_DESTRUCT(&pmix_server_globals.remote_pnd);
474 PMIX_LIST_DESTRUCT(&pmix_server_globals.local_reqs);
475 PMIX_LIST_DESTRUCT(&pmix_server_globals.gdata);
476 PMIX_LIST_DESTRUCT(&pmix_server_globals.events);
477 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
478
479
480
481 pmix_execute_epilog(&ns->epilog);
482 }
483 PMIX_LIST_DESTRUCT(&pmix_server_globals.nspaces);
484 PMIX_LIST_DESTRUCT(&pmix_server_globals.groups);
485 PMIX_LIST_DESTRUCT(&pmix_server_globals.iof);
486
487 pmix_hwloc_cleanup();
488
489 if (NULL != security_mode) {
490 free(security_mode);
491 }
492
493 if (NULL != ptl_mode) {
494 free(ptl_mode);
495 }
496
497 if (NULL != bfrops_mode) {
498 free(bfrops_mode);
499 }
500
501 if (NULL != gds_mode) {
502 free(gds_mode);
503 }
504 if (NULL != pmix_server_globals.tmpdir) {
505 free(pmix_server_globals.tmpdir);
506 }
507
508 (void)pmix_mca_base_framework_close(&pmix_psensor_base_framework);
509
510 (void)pmix_mca_base_framework_close(&pmix_pnet_base_framework);
511
512 PMIX_RELEASE_THREAD(&pmix_global_lock);
513 PMIX_DESTRUCT_LOCK(&pmix_global_lock);
514
515 pmix_rte_finalize();
516 if (NULL != pmix_globals.mypeer) {
517 PMIX_RELEASE(pmix_globals.mypeer);
518 }
519
520 pmix_output_verbose(2, pmix_server_globals.base_output,
521 "pmix:server finalize complete");
522
523
524 pmix_class_finalize();
525
526 return PMIX_SUCCESS;
527 }
528
529 static void _register_nspace(int sd, short args, void *cbdata)
530 {
531 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
532 pmix_namespace_t *nptr, *tmp;
533 pmix_status_t rc;
534 size_t i;
535
536 PMIX_ACQUIRE_OBJECT(caddy);
537
538 pmix_output_verbose(2, pmix_server_globals.base_output,
539 "pmix:server _register_nspace %s", cd->proc.nspace);
540
541
542 nptr = NULL;
543 PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
544 if (0 == strcmp(tmp->nspace, cd->proc.nspace)) {
545 nptr = tmp;
546 break;
547 }
548 }
549 if (NULL == nptr) {
550 nptr = PMIX_NEW(pmix_namespace_t);
551 if (NULL == nptr) {
552 rc = PMIX_ERR_NOMEM;
553 goto release;
554 }
555 nptr->nspace = strdup(cd->proc.nspace);
556 pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
557 }
558 nptr->nlocalprocs = cd->nlocalprocs;
559
560
561 if (nptr->nlocalprocs == pmix_list_get_size(&nptr->ranks)) {
562 nptr->all_registered = true;
563 }
564
565
566 for (i=0; i < cd->ninfo; i++) {
567 if (0 == strcmp(cd->info[i].key, PMIX_REGISTER_NODATA)) {
568
569 rc = PMIX_SUCCESS;
570 goto release;
571 }
572 }
573
574
575 PMIX_GDS_ADD_NSPACE(rc, nptr->nspace, cd->info, cd->ninfo);
576 if (PMIX_SUCCESS != rc) {
577 goto release;
578 }
579
580
581
582
583
584 PMIX_GDS_CACHE_JOB_INFO(rc, pmix_globals.mypeer, nptr,
585 cd->info, cd->ninfo);
586
587 release:
588 if (NULL != cd->opcbfunc) {
589 cd->opcbfunc(rc, cd->cbdata);
590 }
591 PMIX_RELEASE(cd);
592 }
593
594
595 PMIX_EXPORT pmix_status_t PMIx_server_register_nspace(const pmix_nspace_t nspace, int nlocalprocs,
596 pmix_info_t info[], size_t ninfo,
597 pmix_op_cbfunc_t cbfunc, void *cbdata)
598 {
599 pmix_setup_caddy_t *cd;
600
601 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
602 if (pmix_globals.init_cntr <= 0) {
603 PMIX_RELEASE_THREAD(&pmix_global_lock);
604 return PMIX_ERR_INIT;
605 }
606 PMIX_RELEASE_THREAD(&pmix_global_lock);
607
608 cd = PMIX_NEW(pmix_setup_caddy_t);
609 pmix_strncpy(cd->proc.nspace, nspace, PMIX_MAX_NSLEN);
610 cd->nlocalprocs = nlocalprocs;
611 cd->opcbfunc = cbfunc;
612 cd->cbdata = cbdata;
613
614 if (0 < ninfo) {
615 cd->ninfo = ninfo;
616 cd->info = info;
617 }
618
619
620
621 PMIX_THREADSHIFT(cd, _register_nspace);
622 return PMIX_SUCCESS;
623 }
624
625 void pmix_server_purge_events(pmix_peer_t *peer,
626 pmix_proc_t *proc)
627 {
628 pmix_regevents_info_t *reginfo, *regnext;
629 pmix_peer_events_info_t *prev, *pnext;
630 pmix_iof_req_t *req, *nxt;
631 int i;
632 pmix_notify_caddy_t *ncd;
633 size_t n, m, p, ntgs;
634 pmix_proc_t *tgs, *tgt;
635 pmix_dmdx_local_t *dlcd, *dnxt;
636
637
638
639 PMIX_LIST_FOREACH_SAFE(reginfo, regnext, &pmix_server_globals.events, pmix_regevents_info_t) {
640 PMIX_LIST_FOREACH_SAFE(prev, pnext, ®info->peers, pmix_peer_events_info_t) {
641 if ((NULL != peer && prev->peer == peer) ||
642 (NULL != proc && PMIX_CHECK_PROCID(proc, &prev->peer->info->pname))) {
643 pmix_list_remove_item(®info->peers, &prev->super);
644 PMIX_RELEASE(prev);
645 if (0 == pmix_list_get_size(®info->peers)) {
646 pmix_list_remove_item(&pmix_server_globals.events, ®info->super);
647 PMIX_RELEASE(reginfo);
648 break;
649 }
650 }
651 }
652 }
653
654
655
656 PMIX_LIST_FOREACH_SAFE(req, nxt, &pmix_globals.iof_requests, pmix_iof_req_t) {
657 if ((NULL != peer && PMIX_CHECK_PROCID(&req->peer->info->pname, &peer->info->pname)) ||
658 (NULL != proc && PMIX_CHECK_PROCID(&req->peer->info->pname, proc))) {
659 pmix_list_remove_item(&pmix_globals.iof_requests, &req->super);
660 PMIX_RELEASE(req);
661 }
662 }
663
664
665 PMIX_LIST_FOREACH_SAFE(dlcd, dnxt, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
666 if ((NULL != peer && PMIX_CHECK_PROCID(&peer->info->pname, &dlcd->proc)) ||
667 (NULL != proc && PMIX_CHECK_PROCID(proc, &dlcd->proc))) {
668
669 pmix_list_remove_item(&pmix_server_globals.local_reqs, &dlcd->super);
670
671
672
673 PMIX_RELEASE(dlcd);
674 }
675 }
676
677
678 for (i=0; i < pmix_globals.max_events; i++) {
679 pmix_hotel_knock(&pmix_globals.notifications, i, (void**)&ncd);
680 if (NULL != ncd && NULL != ncd->targets && 0 < ncd->ntargets) {
681 tgt = NULL;
682 for (n=0; n < ncd->ntargets; n++) {
683 if ((NULL != peer && PMIX_CHECK_PROCID(&peer->info->pname, &ncd->targets[n])) ||
684 (NULL != proc && PMIX_CHECK_PROCID(proc, &ncd->targets[n]))) {
685 tgt = &ncd->targets[n];
686 break;
687 }
688 }
689 if (NULL != tgt) {
690
691
692 if (1 == ncd->ntargets) {
693 pmix_hotel_checkout(&pmix_globals.notifications, i);
694 PMIX_RELEASE(ncd);
695 } else if (PMIX_RANK_WILDCARD == tgt->rank &&
696 NULL != proc && PMIX_RANK_WILDCARD == proc->rank) {
697
698 ntgs = ncd->ntargets - 1;
699 PMIX_PROC_CREATE(tgs, ntgs);
700 p=0;
701 for (m=0; m < ncd->ntargets; m++) {
702 if (tgt != &ncd->targets[m]) {
703 memcpy(&tgs[p], &ncd->targets[n], sizeof(pmix_proc_t));
704 ++p;
705 }
706 }
707 PMIX_PROC_FREE(ncd->targets, ncd->ntargets);
708 ncd->targets = tgs;
709 ncd->ntargets = ntgs;
710 }
711 }
712 }
713 }
714
715 if (NULL != peer) {
716
717 pmix_execute_epilog(&peer->epilog);
718 }
719 }
720
721 static void _deregister_nspace(int sd, short args, void *cbdata)
722 {
723 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
724 pmix_namespace_t *tmp;
725 pmix_status_t rc;
726
727 PMIX_ACQUIRE_OBJECT(cd);
728
729 pmix_output_verbose(2, pmix_server_globals.base_output,
730 "pmix:server _deregister_nspace %s",
731 cd->proc.nspace);
732
733
734 pmix_pnet.deregister_nspace(cd->proc.nspace);
735
736
737 PMIX_GDS_DEL_NSPACE(rc, cd->proc.nspace);
738
739
740
741 pmix_server_purge_events(NULL, &cd->proc);
742
743
744 PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
745 if (PMIX_CHECK_NSPACE(tmp->nspace, cd->proc.nspace)) {
746
747 pmix_execute_epilog(&tmp->epilog);
748
749 pmix_list_remove_item(&pmix_server_globals.nspaces, &tmp->super);
750 PMIX_RELEASE(tmp);
751 break;
752 }
753 }
754
755
756 if (NULL != cd->opcbfunc) {
757 cd->opcbfunc(rc, cd->cbdata);
758 }
759 PMIX_RELEASE(cd);
760 }
761
762 PMIX_EXPORT void PMIx_server_deregister_nspace(const pmix_nspace_t nspace,
763 pmix_op_cbfunc_t cbfunc,
764 void *cbdata)
765 {
766 pmix_setup_caddy_t *cd;
767
768 pmix_output_verbose(2, pmix_server_globals.base_output,
769 "pmix:server deregister nspace %s",
770 nspace);
771
772 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
773 if (pmix_globals.init_cntr <= 0) {
774 PMIX_RELEASE_THREAD(&pmix_global_lock);
775 if (NULL != cbfunc) {
776 cbfunc(PMIX_ERR_INIT, cbdata);
777 }
778 return;
779 }
780 PMIX_RELEASE_THREAD(&pmix_global_lock);
781
782 cd = PMIX_NEW(pmix_setup_caddy_t);
783 PMIX_LOAD_PROCID(&cd->proc, nspace, PMIX_RANK_WILDCARD);
784 cd->opcbfunc = cbfunc;
785 cd->cbdata = cbdata;
786
787
788
789 PMIX_THREADSHIFT(cd, _deregister_nspace);
790 }
791
792 void pmix_server_execute_collective(int sd, short args, void *cbdata)
793 {
794 pmix_trkr_caddy_t *tcd = (pmix_trkr_caddy_t*)cbdata;
795 pmix_server_trkr_t *trk = tcd->trk;
796 pmix_server_caddy_t *cd;
797 pmix_peer_t *peer;
798 char *data = NULL;
799 size_t sz = 0;
800 pmix_byte_object_t bo;
801 pmix_buffer_t bucket, pbkt;
802 pmix_kval_t *kv;
803 pmix_proc_t proc;
804 bool first;
805 pmix_status_t rc;
806 pmix_list_t pnames;
807 pmix_namelist_t *pn;
808 bool found;
809 pmix_cb_t cb;
810
811 PMIX_ACQUIRE_OBJECT(tcd);
812
813
814
815 if (PMIX_FENCENB_CMD == trk->type) {
816
817
818
819
820
821
822
823 if (trk->hybrid) {
824
825
826
827
828 peer = pmix_globals.mypeer;
829 } else {
830
831
832
833 if (0 == pmix_list_get_size(&trk->local_cbs)) {
834 pmix_host_server.fence_nb(trk->pcs, trk->npcs,
835 trk->info, trk->ninfo,
836 data, sz, trk->modexcbfunc, trk);
837 PMIX_RELEASE(tcd);
838 return;
839 }
840
841 cd = (pmix_server_caddy_t*)pmix_list_get_first(&trk->local_cbs);
842 peer = cd->peer;
843 }
844 PMIX_CONSTRUCT(&bucket, pmix_buffer_t);
845
846 unsigned char tmp = (unsigned char)trk->collect_type;
847 PMIX_BFROPS_PACK(rc, peer, &bucket, &tmp, 1, PMIX_BYTE);
848
849 if (PMIX_COLLECT_YES == trk->collect_type) {
850 pmix_output_verbose(2, pmix_server_globals.base_output,
851 "fence - assembling data");
852 first = true;
853 PMIX_CONSTRUCT(&pnames, pmix_list_t);
854 PMIX_LIST_FOREACH(cd, &trk->local_cbs, pmix_server_caddy_t) {
855
856
857 found = false;
858 PMIX_LIST_FOREACH(pn, &pnames, pmix_namelist_t) {
859 if (pn->pname == &cd->peer->info->pname) {
860
861 found = true;
862 break;
863 }
864 }
865 if (found) {
866 continue;
867 } else {
868 pn = PMIX_NEW(pmix_namelist_t);
869 pn->pname = &cd->peer->info->pname;
870 }
871 if (trk->hybrid || first) {
872
873 pmix_strncpy(proc.nspace, cd->peer->info->pname.nspace, PMIX_MAX_NSLEN);
874 first = false;
875 }
876 proc.rank = cd->peer->info->pname.rank;
877
878
879 PMIX_CONSTRUCT(&cb, pmix_cb_t);
880 cb.proc = &proc;
881 cb.scope = PMIX_REMOTE;
882 cb.copy = true;
883 PMIX_GDS_FETCH_KV(rc, peer, &cb);
884 if (PMIX_SUCCESS == rc) {
885
886 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
887
888 PMIX_BFROPS_PACK(rc, peer, &pbkt, &proc, 1, PMIX_PROC);
889 if (PMIX_SUCCESS != rc) {
890 PMIX_ERROR_LOG(rc);
891 PMIX_DESTRUCT(&cb);
892 PMIX_DESTRUCT(&pbkt);
893 PMIX_DESTRUCT(&bucket);
894 return;
895 }
896 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
897 PMIX_BFROPS_PACK(rc, peer, &pbkt, kv, 1, PMIX_KVAL);
898 if (PMIX_SUCCESS != rc) {
899 PMIX_ERROR_LOG(rc);
900 PMIX_DESTRUCT(&cb);
901 PMIX_DESTRUCT(&pbkt);
902 PMIX_DESTRUCT(&bucket);
903 return;
904 }
905 }
906
907 PMIX_UNLOAD_BUFFER(&pbkt, bo.bytes, bo.size);
908 PMIX_DESTRUCT(&pbkt);
909
910 PMIX_BFROPS_PACK(rc, peer, &bucket, &bo, 1, PMIX_BYTE_OBJECT);
911 if (PMIX_SUCCESS != rc) {
912 PMIX_ERROR_LOG(rc);
913 PMIX_DESTRUCT(&cb);
914 PMIX_BYTE_OBJECT_DESTRUCT(&bo);
915 PMIX_DESTRUCT(&bucket);
916 PMIX_RELEASE(tcd);
917 return;
918 }
919 }
920 PMIX_DESTRUCT(&cb);
921 }
922 PMIX_LIST_DESTRUCT(&pnames);
923 }
924 PMIX_UNLOAD_BUFFER(&bucket, data, sz);
925 PMIX_DESTRUCT(&bucket);
926 pmix_host_server.fence_nb(trk->pcs, trk->npcs,
927 trk->info, trk->ninfo,
928 data, sz, trk->modexcbfunc, trk);
929 } else if (PMIX_CONNECTNB_CMD == trk->type) {
930 pmix_host_server.connect(trk->pcs, trk->npcs,
931 trk->info, trk->ninfo,
932 trk->op_cbfunc, trk);
933 } else if (PMIX_DISCONNECTNB_CMD == trk->type) {
934 pmix_host_server.disconnect(trk->pcs, trk->npcs,
935 trk->info, trk->ninfo,
936 trk->op_cbfunc, trk);
937 } else {
938
939 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
940 pmix_list_remove_item(&pmix_server_globals.collectives, &trk->super);
941 PMIX_RELEASE(trk);
942 }
943 PMIX_RELEASE(tcd);
944 }
945
946 static void _register_client(int sd, short args, void *cbdata)
947 {
948 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
949 pmix_rank_info_t *info, *iptr;
950 pmix_namespace_t *nptr, *ns;
951 pmix_server_trkr_t *trk;
952 pmix_trkr_caddy_t *tcd;
953 bool all_def;
954 size_t i;
955 pmix_status_t rc;
956
957 PMIX_ACQUIRE_OBJECT(cd);
958
959 pmix_output_verbose(2, pmix_server_globals.base_output,
960 "pmix:server _register_client for nspace %s rank %d %s object",
961 cd->proc.nspace, cd->proc.rank,
962 (NULL == cd->server_object) ? "NULL" : "NON-NULL");
963
964
965 nptr = NULL;
966 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
967 if (0 == strcmp(ns->nspace, cd->proc.nspace)) {
968 nptr = ns;
969 break;
970 }
971 }
972 if (NULL == nptr) {
973 nptr = PMIX_NEW(pmix_namespace_t);
974 if (NULL == nptr) {
975 rc = PMIX_ERR_NOMEM;
976 goto cleanup;
977 }
978 nptr->nspace = strdup(cd->proc.nspace);
979 pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
980 }
981
982
983
984 info = PMIX_NEW(pmix_rank_info_t);
985 if (NULL == info) {
986 rc = PMIX_ERR_NOMEM;
987 goto cleanup;
988 }
989 info->pname.nspace = strdup(nptr->nspace);
990 info->pname.rank = cd->proc.rank;
991 info->uid = cd->uid;
992 info->gid = cd->gid;
993 info->server_object = cd->server_object;
994 pmix_list_append(&nptr->ranks, &info->super);
995
996 if (nptr->nlocalprocs == pmix_list_get_size(&nptr->ranks)) {
997 nptr->all_registered = true;
998
999
1000
1001
1002
1003 all_def = true;
1004 PMIX_LIST_FOREACH(trk, &pmix_server_globals.collectives, pmix_server_trkr_t) {
1005
1006
1007 if (trk->def_complete) {
1008 continue;
1009 }
1010
1011
1012
1013
1014
1015 for (i=0; i < trk->npcs; i++) {
1016
1017
1018 if (all_def) {
1019
1020 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
1021 if (0 < ns->nlocalprocs &&
1022 0 == strcmp(trk->pcs[i].nspace, ns->nspace)) {
1023 all_def = ns->all_registered;
1024 break;
1025 }
1026 }
1027 }
1028
1029 if (0 != strncmp(trk->pcs[i].nspace, nptr->nspace, PMIX_MAX_NSLEN)) {
1030 continue;
1031 }
1032
1033 PMIX_LIST_FOREACH(iptr, &nptr->ranks, pmix_rank_info_t) {
1034 if (PMIX_RANK_WILDCARD == trk->pcs[i].rank ||
1035 iptr->pname.rank == trk->pcs[i].rank) {
1036
1037 ++trk->nlocal;
1038 break;
1039 }
1040 }
1041 }
1042
1043 trk->def_complete = all_def;
1044
1045 if (trk->def_complete && pmix_list_get_size(&trk->local_cbs) == trk->nlocal) {
1046
1047
1048
1049
1050 PMIX_EXECUTE_COLLECTIVE(tcd, trk, pmix_server_execute_collective);
1051 }
1052 }
1053
1054
1055
1056
1057 pmix_pending_nspace_requests(nptr);
1058 }
1059 rc = PMIX_SUCCESS;
1060
1061 cleanup:
1062
1063 if (NULL != cd->opcbfunc) {
1064 cd->opcbfunc(rc, cd->cbdata);
1065 }
1066 PMIX_RELEASE(cd);
1067 }
1068
1069 PMIX_EXPORT pmix_status_t PMIx_server_register_client(const pmix_proc_t *proc,
1070 uid_t uid, gid_t gid, void *server_object,
1071 pmix_op_cbfunc_t cbfunc, void *cbdata)
1072 {
1073 pmix_setup_caddy_t *cd;
1074
1075 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1076 if (pmix_globals.init_cntr <= 0) {
1077 PMIX_RELEASE_THREAD(&pmix_global_lock);
1078 return PMIX_ERR_INIT;
1079 }
1080 PMIX_RELEASE_THREAD(&pmix_global_lock);
1081
1082 pmix_output_verbose(2, pmix_server_globals.base_output,
1083 "pmix:server register client %s:%d",
1084 proc->nspace, proc->rank);
1085
1086 cd = PMIX_NEW(pmix_setup_caddy_t);
1087 if (NULL == cd) {
1088 return PMIX_ERR_NOMEM;
1089 }
1090 pmix_strncpy(cd->proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
1091 cd->proc.rank = proc->rank;
1092 cd->uid = uid;
1093 cd->gid = gid;
1094 cd->server_object = server_object;
1095 cd->opcbfunc = cbfunc;
1096 cd->cbdata = cbdata;
1097
1098
1099
1100 PMIX_THREADSHIFT(cd, _register_client);
1101 return PMIX_SUCCESS;
1102 }
1103
1104 static void _deregister_client(int sd, short args, void *cbdata)
1105 {
1106 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1107 pmix_rank_info_t *info;
1108 pmix_namespace_t *nptr, *tmp;
1109 pmix_peer_t *peer;
1110
1111 PMIX_ACQUIRE_OBJECT(cd);
1112
1113 pmix_output_verbose(2, pmix_server_globals.base_output,
1114 "pmix:server _deregister_client for nspace %s rank %d",
1115 cd->proc.nspace, cd->proc.rank);
1116
1117
1118 nptr = NULL;
1119 PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
1120 if (0 == strcmp(tmp->nspace, cd->proc.nspace)) {
1121 nptr = tmp;
1122 break;
1123 }
1124 }
1125 if (NULL == nptr) {
1126
1127 goto cleanup;
1128 }
1129
1130 PMIX_LIST_FOREACH(info, &nptr->ranks, pmix_rank_info_t) {
1131 if (info->pname.rank == cd->proc.rank) {
1132
1133
1134 if (NULL == (peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, info->peerid))) {
1135
1136
1137 nptr->nfinalized++;
1138
1139
1140 pmix_pnet.child_finalized(&cd->proc);
1141 } else {
1142 if (!peer->finalized) {
1143
1144
1145
1146
1147
1148
1149
1150
1151 peer->finalized = true;
1152 nptr->nfinalized++;
1153 }
1154
1155
1156
1157 if (!PMIX_PROC_IS_TOOL(peer)) {
1158 pmix_pnet.child_finalized(&cd->proc);
1159 pmix_psensor.stop(peer, NULL);
1160 }
1161
1162 pmix_execute_epilog(&peer->epilog);
1163
1164
1165
1166 CLOSE_THE_SOCKET(peer->sd);
1167 }
1168 if (nptr->nlocalprocs == nptr->nfinalized) {
1169 pmix_pnet.local_app_finalized(nptr);
1170 }
1171 pmix_list_remove_item(&nptr->ranks, &info->super);
1172 PMIX_RELEASE(info);
1173 break;
1174 }
1175 }
1176
1177 cleanup:
1178 if (NULL != cd->opcbfunc) {
1179 cd->opcbfunc(PMIX_SUCCESS, cd->cbdata);
1180 }
1181 PMIX_RELEASE(cd);
1182 }
1183
1184 PMIX_EXPORT void PMIx_server_deregister_client(const pmix_proc_t *proc,
1185 pmix_op_cbfunc_t cbfunc, void *cbdata)
1186 {
1187 pmix_setup_caddy_t *cd;
1188
1189 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1190 if (pmix_globals.init_cntr <= 0) {
1191 PMIX_RELEASE_THREAD(&pmix_global_lock);
1192 if (NULL != cbfunc) {
1193 cbfunc(PMIX_ERR_INIT, cbdata);
1194 }
1195 return;
1196 }
1197 PMIX_RELEASE_THREAD(&pmix_global_lock);
1198
1199 pmix_output_verbose(2, pmix_server_globals.base_output,
1200 "pmix:server deregister client %s:%d",
1201 proc->nspace, proc->rank);
1202
1203 cd = PMIX_NEW(pmix_setup_caddy_t);
1204 if (NULL == cd) {
1205 if (NULL != cbfunc) {
1206 cbfunc(PMIX_ERR_NOMEM, cbdata);
1207 }
1208 return;
1209 }
1210 pmix_strncpy(cd->proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
1211 cd->proc.rank = proc->rank;
1212 cd->opcbfunc = cbfunc;
1213 cd->cbdata = cbdata;
1214
1215
1216
1217 PMIX_THREADSHIFT(cd, _deregister_client);
1218 }
1219
1220
1221 PMIX_EXPORT pmix_status_t PMIx_server_setup_fork(const pmix_proc_t *proc, char ***env)
1222 {
1223 char rankstr[128];
1224 pmix_listener_t *lt;
1225 pmix_status_t rc;
1226 char **varnames;
1227 int n;
1228
1229 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1230 if (pmix_globals.init_cntr <= 0) {
1231 PMIX_RELEASE_THREAD(&pmix_global_lock);
1232 return PMIX_ERR_INIT;
1233 }
1234 PMIX_RELEASE_THREAD(&pmix_global_lock);
1235
1236 pmix_output_verbose(2, pmix_server_globals.base_output,
1237 "pmix:server setup_fork for nspace %s rank %d",
1238 proc->nspace, proc->rank);
1239
1240
1241 pmix_setenv("PMIX_NAMESPACE", proc->nspace, true, env);
1242
1243 (void)snprintf(rankstr, 127, "%d", proc->rank);
1244 pmix_setenv("PMIX_RANK", rankstr, true, env);
1245
1246 PMIX_LIST_FOREACH(lt, &pmix_ptl_globals.listeners, pmix_listener_t) {
1247 if (NULL != lt->uri && NULL != lt->varname) {
1248 varnames = pmix_argv_split(lt->varname, ':');
1249 for (n=0; NULL != varnames[n]; n++) {
1250 pmix_setenv(varnames[n], lt->uri, true, env);
1251 }
1252 pmix_argv_free(varnames);
1253 }
1254 }
1255
1256 pmix_setenv("PMIX_SECURITY_MODE", security_mode, true, env);
1257
1258 pmix_setenv("PMIX_PTL_MODULE", ptl_mode, true, env);
1259
1260 if (PMIX_BFROP_BUFFER_FULLY_DESC == pmix_globals.mypeer->nptr->compat.type) {
1261 pmix_setenv("PMIX_BFROP_BUFFER_TYPE", "PMIX_BFROP_BUFFER_FULLY_DESC", true, env);
1262 } else {
1263 pmix_setenv("PMIX_BFROP_BUFFER_TYPE", "PMIX_BFROP_BUFFER_NON_DESC", true, env);
1264 }
1265
1266 pmix_setenv("PMIX_GDS_MODULE", gds_mode, true, env);
1267
1268
1269 if (PMIX_SUCCESS != (rc = pmix_ptl_base_setup_fork(proc, env))) {
1270 PMIX_ERROR_LOG(rc);
1271 return rc;
1272 }
1273
1274
1275 if (PMIX_SUCCESS != (rc = pmix_pnet.setup_fork(proc, env))) {
1276 PMIX_ERROR_LOG(rc);
1277 return rc;
1278 }
1279
1280
1281 if (PMIX_SUCCESS != (rc = pmix_gds_base_setup_fork(proc, env))) {
1282 PMIX_ERROR_LOG(rc);
1283 return rc;
1284 }
1285
1286 return PMIX_SUCCESS;
1287 }
1288
1289
1290
1291
1292
1293
1294 static void _dmodex_req(int sd, short args, void *cbdata)
1295 {
1296 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1297 pmix_rank_info_t *info, *iptr;
1298 pmix_namespace_t *nptr, *ns;
1299 char *data = NULL;
1300 size_t sz = 0;
1301 pmix_dmdx_remote_t *dcd;
1302 pmix_status_t rc;
1303 pmix_buffer_t pbkt;
1304 pmix_kval_t *kv;
1305 pmix_cb_t cb;
1306
1307 PMIX_ACQUIRE_OBJECT(cd);
1308
1309 pmix_output_verbose(2, pmix_server_globals.base_output,
1310 "DMODX LOOKING FOR %s:%d",
1311 cd->proc.nspace, cd->proc.rank);
1312
1313
1314
1315
1316
1317 nptr = NULL;
1318 PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
1319 if (0 == strcmp(ns->nspace, cd->proc.nspace)) {
1320 nptr = ns;
1321 break;
1322 }
1323 }
1324 if (NULL == nptr) {
1325
1326
1327
1328 dcd = PMIX_NEW(pmix_dmdx_remote_t);
1329 if (NULL == dcd) {
1330 rc = PMIX_ERR_NOMEM;
1331 goto cleanup;
1332 }
1333 dcd->cd = cd;
1334 pmix_list_append(&pmix_server_globals.remote_pnd, &dcd->super);
1335 return;
1336 }
1337
1338
1339 if (cd->proc.rank == PMIX_RANK_WILDCARD) {
1340
1341
1342
1343 PMIX_CONSTRUCT(&cb, pmix_cb_t);
1344 cb.proc = &cd->proc;
1345 cb.scope = PMIX_REMOTE;
1346 cb.copy = true;
1347 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
1348 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
1349 if (PMIX_SUCCESS == rc) {
1350
1351 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
1352 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &pbkt, kv, 1, PMIX_KVAL);
1353 if (PMIX_SUCCESS != rc) {
1354 PMIX_DESTRUCT(&pbkt);
1355 PMIX_DESTRUCT(&cb);
1356 goto cleanup;
1357 }
1358 }
1359 }
1360 PMIX_DESTRUCT(&cb);
1361 PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
1362 PMIX_DESTRUCT(&pbkt);
1363 goto cleanup;
1364 }
1365
1366
1367 info = NULL;
1368 PMIX_LIST_FOREACH(iptr, &nptr->ranks, pmix_rank_info_t) {
1369 if (iptr->pname.rank == cd->proc.rank) {
1370 info = iptr;
1371 break;
1372 }
1373 }
1374 if (NULL == info) {
1375
1376
1377 dcd = PMIX_NEW(pmix_dmdx_remote_t);
1378 dcd->cd = cd;
1379 pmix_list_append(&pmix_server_globals.remote_pnd, &dcd->super);
1380 return;
1381 }
1382
1383
1384
1385 if (!info->modex_recvd) {
1386
1387
1388 dcd = PMIX_NEW(pmix_dmdx_remote_t);
1389 dcd->cd = cd;
1390 pmix_list_append(&pmix_server_globals.remote_pnd, &dcd->super);
1391 return;
1392 }
1393
1394
1395 PMIX_CONSTRUCT(&cb, pmix_cb_t);
1396 cb.proc = &cd->proc;
1397 cb.scope = PMIX_REMOTE;
1398 cb.copy = true;
1399 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
1400 if (PMIX_SUCCESS == rc) {
1401
1402 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
1403 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
1404 PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &pbkt, kv, 1, PMIX_KVAL);
1405 if (PMIX_SUCCESS != rc) {
1406 PMIX_DESTRUCT(&pbkt);
1407 PMIX_DESTRUCT(&cb);
1408 goto cleanup;
1409 }
1410 }
1411 PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
1412 PMIX_DESTRUCT(&pbkt);
1413 }
1414 PMIX_DESTRUCT(&cb);
1415
1416 cleanup:
1417
1418 cd->cbfunc(rc, data, sz, cd->cbdata);
1419 if (NULL != data) {
1420 free(data);
1421 }
1422 PMIX_RELEASE(cd);
1423 }
1424
1425 PMIX_EXPORT pmix_status_t PMIx_server_dmodex_request(const pmix_proc_t *proc,
1426 pmix_dmodex_response_fn_t cbfunc,
1427 void *cbdata)
1428 {
1429 pmix_setup_caddy_t *cd;
1430
1431 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1432 if (pmix_globals.init_cntr <= 0) {
1433 PMIX_RELEASE_THREAD(&pmix_global_lock);
1434 return PMIX_ERR_INIT;
1435 }
1436 PMIX_RELEASE_THREAD(&pmix_global_lock);
1437
1438
1439 if (NULL == cbfunc || NULL == proc) {
1440 return PMIX_ERR_BAD_PARAM;
1441 }
1442
1443 pmix_output_verbose(2, pmix_server_globals.base_output,
1444 "pmix:server dmodex request%s:%d",
1445 proc->nspace, proc->rank);
1446
1447 cd = PMIX_NEW(pmix_setup_caddy_t);
1448 pmix_strncpy(cd->proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
1449 cd->proc.rank = proc->rank;
1450 cd->cbfunc = cbfunc;
1451 cd->cbdata = cbdata;
1452
1453
1454
1455 PMIX_THREADSHIFT(cd, _dmodex_req);
1456 return PMIX_SUCCESS;
1457 }
1458
1459 static void _store_internal(int sd, short args, void *cbdata)
1460 {
1461 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
1462 pmix_proc_t proc;
1463
1464 PMIX_ACQUIRE_OBJECT(cd);
1465
1466 pmix_strncpy(proc.nspace, cd->pname.nspace, PMIX_MAX_NSLEN);
1467 proc.rank = cd->pname.rank;
1468 PMIX_GDS_STORE_KV(cd->status, pmix_globals.mypeer,
1469 &proc, PMIX_INTERNAL, cd->kv);
1470 if (cd->lock.active) {
1471 PMIX_WAKEUP_THREAD(&cd->lock);
1472 }
1473 }
1474
1475 PMIX_EXPORT pmix_status_t PMIx_Store_internal(const pmix_proc_t *proc,
1476 const pmix_key_t key, pmix_value_t *val)
1477 {
1478 pmix_shift_caddy_t *cd;
1479 pmix_status_t rc;
1480
1481 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1482 if (pmix_globals.init_cntr <= 0) {
1483 PMIX_RELEASE_THREAD(&pmix_global_lock);
1484 return PMIX_ERR_INIT;
1485 }
1486 PMIX_RELEASE_THREAD(&pmix_global_lock);
1487
1488
1489 cd = PMIX_NEW(pmix_shift_caddy_t);
1490 if (NULL == cd) {
1491 return PMIX_ERR_NOMEM;
1492 }
1493 cd->pname.nspace = strdup(proc->nspace);
1494 cd->pname.rank = proc->rank;
1495
1496 cd->kv = PMIX_NEW(pmix_kval_t);
1497 if (NULL == cd->kv) {
1498 PMIX_RELEASE(cd);
1499 return PMIX_ERR_NOMEM;
1500 }
1501 cd->kv->key = strdup((char*)key);
1502 cd->kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1503 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, cd->kv->value, val);
1504 if (PMIX_SUCCESS != rc) {
1505 PMIX_ERROR_LOG(rc);
1506 PMIX_RELEASE(cd);
1507 return rc;
1508 }
1509
1510 PMIX_THREADSHIFT(cd, _store_internal);
1511 PMIX_WAIT_THREAD(&cd->lock);
1512 rc = cd->status;
1513 PMIX_RELEASE(cd);
1514
1515 return rc;
1516 }
1517
1518 PMIX_EXPORT pmix_status_t PMIx_generate_regex(const char *input, char **regexp)
1519 {
1520 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1521 if (pmix_globals.init_cntr <= 0) {
1522 PMIX_RELEASE_THREAD(&pmix_global_lock);
1523 return PMIX_ERR_INIT;
1524 }
1525 PMIX_RELEASE_THREAD(&pmix_global_lock);
1526
1527 return pmix_preg.generate_node_regex(input, regexp);
1528 }
1529
1530 PMIX_EXPORT pmix_status_t PMIx_generate_ppn(const char *input, char **regexp)
1531 {
1532 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1533 if (pmix_globals.init_cntr <= 0) {
1534 PMIX_RELEASE_THREAD(&pmix_global_lock);
1535 return PMIX_ERR_INIT;
1536 }
1537 PMIX_RELEASE_THREAD(&pmix_global_lock);
1538
1539 return pmix_preg.generate_ppn(input, regexp);
1540 }
1541
1542 static void _setup_op(pmix_status_t rc, void *cbdata)
1543 {
1544 pmix_setup_caddy_t *fcd = (pmix_setup_caddy_t*)cbdata;
1545
1546 if (NULL != fcd->info) {
1547 PMIX_INFO_FREE(fcd->info, fcd->ninfo);
1548 }
1549 PMIX_RELEASE(fcd);
1550 }
1551
1552 static void _setup_app(int sd, short args, void *cbdata)
1553 {
1554 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1555 pmix_setup_caddy_t *fcd = NULL;
1556 pmix_status_t rc;
1557 pmix_list_t ilist;
1558 pmix_kval_t *kv;
1559 size_t n;
1560
1561 PMIX_ACQUIRE_OBJECT(cd);
1562
1563 PMIX_CONSTRUCT(&ilist, pmix_list_t);
1564
1565
1566 if (PMIX_SUCCESS != (rc = pmix_pnet.allocate(cd->nspace,
1567 cd->info, cd->ninfo,
1568 &ilist))) {
1569 goto depart;
1570 }
1571
1572
1573 fcd = PMIX_NEW(pmix_setup_caddy_t);
1574 if (NULL == fcd) {
1575 rc = PMIX_ERR_NOMEM;
1576 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1577 goto depart;
1578 }
1579
1580
1581 if (0 < (fcd->ninfo = pmix_list_get_size(&ilist))) {
1582 PMIX_INFO_CREATE(fcd->info, fcd->ninfo);
1583 if (NULL == fcd->info) {
1584 rc = PMIX_ERR_NOMEM;
1585 PMIX_RELEASE(fcd);
1586 goto depart;
1587 }
1588 n = 0;
1589 PMIX_LIST_FOREACH(kv, &ilist, pmix_kval_t) {
1590 pmix_strncpy(fcd->info[n].key, kv->key, PMIX_MAX_KEYLEN);
1591 pmix_value_xfer(&fcd->info[n].value, kv->value);
1592 ++n;
1593 }
1594 }
1595
1596 depart:
1597
1598 if (NULL != cd->setupcbfunc) {
1599 if (NULL == fcd) {
1600 cd->setupcbfunc(rc, NULL, 0, cd->cbdata, NULL, NULL);
1601 } else {
1602 cd->setupcbfunc(rc, fcd->info, fcd->ninfo, cd->cbdata, _setup_op, fcd);
1603 }
1604 }
1605
1606
1607 PMIX_LIST_DESTRUCT(&ilist);
1608 if (NULL != cd->nspace) {
1609 free(cd->nspace);
1610 }
1611 PMIX_RELEASE(cd);
1612 }
1613
1614 pmix_status_t PMIx_server_setup_application(const pmix_nspace_t nspace,
1615 pmix_info_t info[], size_t ninfo,
1616 pmix_setup_application_cbfunc_t cbfunc, void *cbdata)
1617 {
1618 pmix_setup_caddy_t *cd;
1619
1620 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1621 if (pmix_globals.init_cntr <= 0) {
1622 PMIX_RELEASE_THREAD(&pmix_global_lock);
1623 return PMIX_ERR_INIT;
1624 }
1625 PMIX_RELEASE_THREAD(&pmix_global_lock);
1626
1627
1628 cd = PMIX_NEW(pmix_setup_caddy_t);
1629 if (NULL == cd) {
1630 return PMIX_ERR_NOMEM;
1631 }
1632 if (NULL != nspace) {
1633 cd->nspace = strdup(nspace);
1634 }
1635 cd->info = info;
1636 cd->ninfo = ninfo;
1637 cd->setupcbfunc = cbfunc;
1638 cd->cbdata = cbdata;
1639 PMIX_THREADSHIFT(cd, _setup_app);
1640
1641 return PMIX_SUCCESS;
1642 }
1643
1644 static void _setup_local_support(int sd, short args, void *cbdata)
1645 {
1646 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1647 pmix_status_t rc;
1648
1649 PMIX_ACQUIRE_OBJECT(cd);
1650
1651
1652 rc = pmix_pnet.setup_local_network(cd->nspace, cd->info, cd->ninfo);
1653
1654
1655 if (NULL != cd->opcbfunc) {
1656 cd->opcbfunc(rc, cd->cbdata);
1657 }
1658
1659 if (NULL != cd->nspace) {
1660 free(cd->nspace);
1661 }
1662 PMIX_RELEASE(cd);
1663 }
1664
1665 pmix_status_t PMIx_server_setup_local_support(const pmix_nspace_t nspace,
1666 pmix_info_t info[], size_t ninfo,
1667 pmix_op_cbfunc_t cbfunc, void *cbdata)
1668 {
1669 pmix_setup_caddy_t *cd;
1670
1671 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1672 if (pmix_globals.init_cntr <= 0) {
1673 PMIX_RELEASE_THREAD(&pmix_global_lock);
1674 return PMIX_ERR_INIT;
1675 }
1676 PMIX_RELEASE_THREAD(&pmix_global_lock);
1677
1678
1679 cd = PMIX_NEW(pmix_setup_caddy_t);
1680 if (NULL == cd) {
1681 return PMIX_ERR_NOMEM;
1682 }
1683 if (NULL != nspace) {
1684 cd->nspace = strdup(nspace);
1685 }
1686 cd->info = info;
1687 cd->ninfo = ninfo;
1688 cd->opcbfunc = cbfunc;
1689 cd->cbdata = cbdata;
1690 PMIX_THREADSHIFT(cd, _setup_local_support);
1691
1692 return PMIX_SUCCESS;
1693 }
1694
1695 static void _iofdeliver(int sd, short args, void *cbdata)
1696 {
1697 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
1698 pmix_iof_req_t *req;
1699 pmix_status_t rc;
1700 pmix_buffer_t *msg;
1701 bool found = false;
1702 bool cached = false;
1703 pmix_iof_cache_t *iof;
1704
1705 pmix_output_verbose(2, pmix_server_globals.iof_output,
1706 "PMIX:SERVER delivering IOF from %s on channel %0x",
1707 PMIX_NAME_PRINT(cd->procs), cd->channels);
1708
1709
1710
1711 PMIX_LIST_FOREACH(req, &pmix_globals.iof_requests, pmix_iof_req_t) {
1712
1713 if (!(cd->channels & req->channels)) {
1714 continue;
1715 }
1716
1717 if (!PMIX_CHECK_PROCID(cd->procs, &req->pname)) {
1718 continue;
1719 }
1720
1721
1722
1723 if (NULL == req->peer->info || req->peer->finalized) {
1724 continue;
1725 }
1726 if (PMIX_CHECK_PROCID(cd->procs, &req->peer->info->pname)) {
1727 continue;
1728 }
1729 found = true;
1730
1731 if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) {
1732 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
1733 rc = PMIX_ERR_OUT_OF_RESOURCE;
1734 break;
1735 }
1736
1737 PMIX_BFROPS_PACK(rc, req->peer, msg, cd->procs, 1, PMIX_PROC);
1738 if (PMIX_SUCCESS != rc) {
1739 PMIX_ERROR_LOG(rc);
1740 PMIX_RELEASE(msg);
1741 break;
1742 }
1743
1744 PMIX_BFROPS_PACK(rc, req->peer, msg, &cd->channels, 1, PMIX_IOF_CHANNEL);
1745 if (PMIX_SUCCESS != rc) {
1746 PMIX_ERROR_LOG(rc);
1747 PMIX_RELEASE(msg);
1748 break;
1749 }
1750
1751 PMIX_BFROPS_PACK(rc, req->peer, msg, cd->bo, 1, PMIX_BYTE_OBJECT);
1752 if (PMIX_SUCCESS != rc) {
1753 PMIX_ERROR_LOG(rc);
1754 PMIX_RELEASE(msg);
1755 break;
1756 }
1757
1758 PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF);
1759 if (PMIX_SUCCESS != rc) {
1760 PMIX_ERROR_LOG(rc);
1761 PMIX_RELEASE(msg);
1762 }
1763 }
1764
1765
1766 if (!found) {
1767 pmix_output_verbose(2, pmix_server_globals.iof_output,
1768 "PMIx:SERVER caching IOF");
1769 if (pmix_server_globals.max_iof_cache == pmix_list_get_size(&pmix_server_globals.iof)) {
1770
1771 iof = (pmix_iof_cache_t*)pmix_list_remove_first(&pmix_server_globals.iof);
1772 PMIX_RELEASE(iof);
1773 }
1774
1775
1776 iof = PMIX_NEW(pmix_iof_cache_t);
1777 memcpy(&iof->source, cd->procs, sizeof(pmix_proc_t));
1778 iof->channel = cd->channels;
1779 iof->bo = cd->bo;
1780 cd->bo = NULL;
1781 pmix_list_append(&pmix_server_globals.iof, &iof->super);
1782 }
1783
1784
1785 if (NULL != cd->opcbfunc) {
1786 cd->opcbfunc(rc, cd->cbdata);
1787 }
1788 if (!cached) {
1789 PMIX_RELEASE(cd);
1790 }
1791 }
1792
1793 pmix_status_t PMIx_server_IOF_deliver(const pmix_proc_t *source,
1794 pmix_iof_channel_t channel,
1795 const pmix_byte_object_t *bo,
1796 const pmix_info_t info[], size_t ninfo,
1797 pmix_op_cbfunc_t cbfunc, void *cbdata)
1798 {
1799 pmix_setup_caddy_t *cd;
1800 size_t n;
1801
1802
1803 cd = PMIX_NEW(pmix_setup_caddy_t);
1804 if (NULL == cd) {
1805 return PMIX_ERR_NOMEM;
1806 }
1807
1808
1809 PMIX_PROC_CREATE(cd->procs, 1);
1810 if (NULL == cd->procs) {
1811 PMIX_RELEASE(cd);
1812 return PMIX_ERR_NOMEM;
1813 }
1814 cd->nprocs = 1;
1815 pmix_strncpy(cd->procs[0].nspace, source->nspace, PMIX_MAX_NSLEN);
1816 cd->procs[0].rank = source->rank;
1817 cd->channels = channel;
1818 PMIX_BYTE_OBJECT_CREATE(cd->bo, 1);
1819 if (NULL == cd->bo) {
1820 PMIX_RELEASE(cd);
1821 return PMIX_ERR_NOMEM;
1822 }
1823 cd->nbo = 1;
1824 cd->bo[0].bytes = (char*)malloc(bo->size);
1825 if (NULL == cd->bo[0].bytes) {
1826 PMIX_RELEASE(cd);
1827 return PMIX_ERR_NOMEM;
1828 }
1829 memcpy(cd->bo[0].bytes, bo->bytes, bo->size);
1830 cd->bo[0].size = bo->size;
1831 if (0 < ninfo) {
1832 PMIX_INFO_CREATE(cd->info, ninfo);
1833 if (NULL == cd->info) {
1834 PMIX_RELEASE(cd);
1835 return PMIX_ERR_NOMEM;
1836 }
1837 cd->ninfo = ninfo;
1838 for (n=0; n < ninfo; n++) {
1839 PMIX_INFO_XFER(&cd->info[n], (pmix_info_t*)&info[n]);
1840 }
1841 }
1842 cd->opcbfunc = cbfunc;
1843 cd->cbdata = cbdata;
1844 PMIX_THREADSHIFT(cd, _iofdeliver);
1845 return PMIX_SUCCESS;
1846 }
1847
1848 static void cirelease(void *cbdata)
1849 {
1850 pmix_inventory_rollup_t *rollup = (pmix_inventory_rollup_t*)cbdata;
1851 if (NULL != rollup->info) {
1852 PMIX_INFO_FREE(rollup->info, rollup->ninfo);
1853 }
1854 PMIX_RELEASE(rollup);
1855 }
1856
1857 static void clct_complete(pmix_status_t status,
1858 pmix_list_t *inventory,
1859 void *cbdata)
1860 {
1861 pmix_inventory_rollup_t *cd = (pmix_inventory_rollup_t*)cbdata;
1862 pmix_kval_t *kv;
1863 size_t n;
1864 pmix_status_t rc;
1865
1866 PMIX_ACQUIRE_THREAD(&cd->lock);
1867
1868
1869 if (NULL != inventory) {
1870 while (NULL != (kv = (pmix_kval_t*)pmix_list_remove_first(inventory))) {
1871 pmix_list_append(&cd->payload, &kv->super);
1872 }
1873 }
1874 if (PMIX_SUCCESS != status && PMIX_SUCCESS == cd->status) {
1875 cd->status = status;
1876 }
1877
1878 cd->replies++;
1879 if (cd->replies == cd->requests) {
1880
1881 cd->info = NULL;
1882 cd->ninfo = 0;
1883 if (NULL != cd->infocbfunc) {
1884
1885 cd->ninfo = pmix_list_get_size(&cd->payload);
1886 if (0 < cd->ninfo) {
1887 PMIX_INFO_CREATE(cd->info, cd->ninfo);
1888 if (NULL == cd->info) {
1889 cd->status = PMIX_ERR_NOMEM;
1890 cd->ninfo = 0;
1891 PMIX_RELEASE_THREAD(&cd->lock);
1892 goto error;
1893 }
1894
1895 n=0;
1896 PMIX_LIST_FOREACH(kv, &cd->payload, pmix_kval_t) {
1897 pmix_strncpy(cd->info[n].key, kv->key, PMIX_MAX_KEYLEN);
1898 rc = pmix_value_xfer(&cd->info[n].value, kv->value);
1899 if (PMIX_SUCCESS != rc) {
1900 PMIX_INFO_FREE(cd->info, cd->ninfo);
1901 cd->status = rc;
1902 break;
1903 }
1904 ++n;
1905 }
1906 }
1907
1908 PMIX_RELEASE_THREAD(&cd->lock);
1909 cd->infocbfunc(cd->status, cd->info, cd->ninfo, cd->cbdata, cirelease, cd);
1910 return;
1911 }
1912 }
1913
1914 PMIX_RELEASE_THREAD(&cd->lock);
1915 return;
1916
1917 error:
1918
1919 if (NULL != cd->infocbfunc) {
1920 cd->infocbfunc(cd->status, NULL, 0, cd->cbdata, NULL, NULL);
1921 }
1922 PMIX_RELEASE(cd);
1923
1924
1925 }
1926 static void clct(int sd, short args, void *cbdata)
1927 {
1928 pmix_inventory_rollup_t *cd = (pmix_inventory_rollup_t*)cbdata;
1929
1930 #if PMIX_HAVE_HWLOC
1931
1932 pmix_status_t rc;
1933 if (NULL == pmix_hwloc_topology) {
1934 if (PMIX_SUCCESS != (rc = pmix_hwloc_get_topology(NULL, 0))) {
1935 PMIX_ERROR_LOG(rc);
1936 return;
1937 }
1938 }
1939 #endif
1940
1941
1942 cd->requests = 1;
1943
1944
1945 pmix_pnet.collect_inventory(cd->info, cd->ninfo,
1946 clct_complete, cd);
1947
1948 return;
1949 }
1950
1951 pmix_status_t PMIx_server_collect_inventory(pmix_info_t directives[], size_t ndirs,
1952 pmix_info_cbfunc_t cbfunc, void *cbdata)
1953 {
1954 pmix_inventory_rollup_t *cd;
1955
1956 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
1957 if (pmix_globals.init_cntr <= 0) {
1958 PMIX_RELEASE_THREAD(&pmix_global_lock);
1959 return PMIX_ERR_INIT;
1960 }
1961 PMIX_RELEASE_THREAD(&pmix_global_lock);
1962
1963
1964 cd = PMIX_NEW(pmix_inventory_rollup_t);
1965 if (NULL == cd) {
1966 return PMIX_ERR_NOMEM;
1967 }
1968 cd->info = directives;
1969 cd->ninfo = ndirs;
1970 cd->infocbfunc = cbfunc;
1971 cd->cbdata = cbdata;
1972 PMIX_THREADSHIFT(cd, clct);
1973
1974 return PMIX_SUCCESS;
1975 }
1976
1977 static void dlinv_complete(pmix_status_t status, void *cbdata)
1978 {
1979 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
1980
1981
1982 PMIX_ACQUIRE_THREAD(&cd->lock);
1983
1984
1985 cd->ndata++;
1986
1987 if (PMIX_SUCCESS != status && PMIX_SUCCESS == cd->status) {
1988 cd->status = status;
1989 }
1990 if (cd->ncodes == cd->ndata) {
1991
1992 PMIX_RELEASE_THREAD(&cd->lock);
1993 if (NULL != cd->cbfunc.opcbfn) {
1994 cd->cbfunc.opcbfn(cd->status, cd->cbdata);
1995 }
1996 PMIX_RELEASE(cd);
1997 return;
1998 }
1999
2000 PMIX_RELEASE_THREAD(&cd->lock);
2001 return;
2002 }
2003
2004
2005 static void dlinv(int sd, short args, void *cbdata)
2006 {
2007 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
2008
2009
2010
2011 cd->ncodes = 1;
2012
2013 pmix_pnet.deliver_inventory(cd->info, cd->ninfo,
2014 cd->directives, cd->ndirs,
2015 dlinv_complete, cd);
2016
2017 return;
2018
2019 }
2020 pmix_status_t PMIx_server_deliver_inventory(pmix_info_t info[], size_t ninfo,
2021 pmix_info_t directives[], size_t ndirs,
2022 pmix_op_cbfunc_t cbfunc, void *cbdata)
2023 {
2024 pmix_shift_caddy_t *cd;
2025
2026 PMIX_ACQUIRE_THREAD(&pmix_global_lock);
2027 if (pmix_globals.init_cntr <= 0) {
2028 PMIX_RELEASE_THREAD(&pmix_global_lock);
2029 return PMIX_ERR_INIT;
2030 }
2031 PMIX_RELEASE_THREAD(&pmix_global_lock);
2032
2033
2034 cd = PMIX_NEW(pmix_shift_caddy_t);
2035 if (NULL == cd) {
2036 return PMIX_ERR_NOMEM;
2037 }
2038 cd->lock.active = false;
2039 cd->info = info;
2040 cd->ninfo = ninfo;
2041 cd->directives = directives;
2042 cd->ndirs = ndirs;
2043 cd->cbfunc.opcbfn = cbfunc;
2044 cd->cbdata = cbdata;
2045 PMIX_THREADSHIFT(cd, dlinv);
2046
2047 return PMIX_SUCCESS;
2048
2049 }
2050
2051
2052
2053
2054
2055
2056
2057 static void op_cbfunc(pmix_status_t status, void *cbdata)
2058 {
2059 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2060 pmix_buffer_t *reply;
2061 pmix_status_t rc;
2062
2063
2064
2065
2066
2067 if (NULL == (reply = PMIX_NEW(pmix_buffer_t))) {
2068 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
2069 PMIX_RELEASE(cd);
2070 return;
2071 }
2072 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2073 if (PMIX_SUCCESS != rc) {
2074 PMIX_ERROR_LOG(rc);
2075 PMIX_RELEASE(reply);
2076 PMIX_RELEASE(cd);
2077 return;
2078 }
2079
2080
2081
2082
2083 PMIX_PTL_SEND_ONEWAY(rc, cd->peer, reply, cd->hdr.tag);
2084 if (PMIX_SUCCESS != rc) {
2085 PMIX_ERROR_LOG(rc);
2086 PMIX_RELEASE(reply);
2087 }
2088
2089
2090 PMIX_RELEASE(cd);
2091 }
2092
2093 static void connection_cleanup(int sd, short args, void *cbdata)
2094 {
2095 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2096
2097
2098
2099
2100 cd->peer->finalized = true;
2101 pmix_ptl_base_lost_connection(cd->peer, PMIX_SUCCESS);
2102
2103 PMIX_RELEASE(cd);
2104 }
2105
2106 static void op_cbfunc2(pmix_status_t status, void *cbdata)
2107 {
2108 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2109 pmix_buffer_t *reply;
2110 pmix_status_t rc;
2111
2112
2113
2114
2115
2116 if (NULL == (reply = PMIX_NEW(pmix_buffer_t))) {
2117 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
2118 PMIX_RELEASE(cd);
2119 return;
2120 }
2121 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2122 if (PMIX_SUCCESS != rc) {
2123 PMIX_ERROR_LOG(rc);
2124 PMIX_RELEASE(reply);
2125 PMIX_RELEASE(cd);
2126 return;
2127 }
2128
2129
2130
2131
2132 PMIX_PTL_SEND_ONEWAY(rc, cd->peer, reply, cd->hdr.tag);
2133 if (PMIX_SUCCESS != rc) {
2134 PMIX_ERROR_LOG(rc);
2135 PMIX_RELEASE(reply);
2136 }
2137
2138
2139
2140
2141
2142
2143
2144
2145 PMIX_THREADSHIFT(cd, connection_cleanup);
2146 }
2147
2148 static void _spcb(int sd, short args, void *cbdata)
2149 {
2150 pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
2151 pmix_buffer_t *reply;
2152 pmix_status_t rc;
2153 pmix_proc_t proc;
2154 pmix_cb_t cb;
2155 pmix_kval_t *kv;
2156
2157 PMIX_ACQUIRE_OBJECT(cd);
2158
2159
2160 if (NULL == (reply = PMIX_NEW(pmix_buffer_t))) {
2161 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
2162 goto cleanup;
2163 }
2164 PMIX_BFROPS_PACK(rc, cd->cd->peer, reply, &cd->status, 1, PMIX_STATUS);
2165 if (PMIX_SUCCESS != rc) {
2166 PMIX_ERROR_LOG(rc);
2167 PMIX_RELEASE(reply);
2168 goto cleanup;
2169 }
2170
2171 PMIX_BFROPS_PACK(rc, cd->cd->peer, reply, &cd->pname.nspace, 1, PMIX_STRING);
2172 if (PMIX_SUCCESS != rc) {
2173 PMIX_ERROR_LOG(rc);
2174 PMIX_RELEASE(reply);
2175 goto cleanup;
2176 }
2177
2178 pmix_strncpy(proc.nspace, cd->pname.nspace, PMIX_MAX_NSLEN);
2179 proc.rank = PMIX_RANK_WILDCARD;
2180
2181
2182
2183 PMIX_CONSTRUCT(&cb, pmix_cb_t);
2184 cb.proc = &proc;
2185 cb.scope = PMIX_SCOPE_UNDEF;
2186 cb.copy = false;
2187 PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
2188 if (PMIX_SUCCESS == rc) {
2189 PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
2190 PMIX_BFROPS_PACK(rc, cd->cd->peer, reply, kv, 1, PMIX_KVAL);
2191 if (PMIX_SUCCESS != rc) {
2192 PMIX_ERROR_LOG(rc);
2193 PMIX_RELEASE(reply);
2194 PMIX_DESTRUCT(&cb);
2195 goto cleanup;
2196 }
2197 }
2198 PMIX_DESTRUCT(&cb);
2199 }
2200
2201
2202
2203
2204 PMIX_SERVER_QUEUE_REPLY(rc, cd->cd->peer, cd->cd->hdr.tag, reply);
2205 if (PMIX_SUCCESS != rc) {
2206 PMIX_RELEASE(reply);
2207 }
2208
2209 cleanup:
2210
2211 PMIX_RELEASE(cd->cd);
2212 PMIX_RELEASE(cd);
2213 }
2214
2215 static void spawn_cbfunc(pmix_status_t status, char *nspace, void *cbdata)
2216 {
2217 pmix_shift_caddy_t *cd;
2218
2219
2220 cd = PMIX_NEW(pmix_shift_caddy_t);
2221 cd->status = status;
2222 cd->pname.nspace = strdup(nspace);
2223 cd->cd = (pmix_server_caddy_t*)cbdata;;
2224
2225 PMIX_THREADSHIFT(cd, _spcb);
2226 }
2227
2228 static void lookup_cbfunc(pmix_status_t status, pmix_pdata_t pdata[], size_t ndata,
2229 void *cbdata)
2230 {
2231 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2232 pmix_buffer_t *reply;
2233 pmix_status_t rc;
2234
2235
2236
2237 if (NULL == (reply = PMIX_NEW(pmix_buffer_t))) {
2238 PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
2239 PMIX_RELEASE(cd);
2240 return;
2241 }
2242 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2243 if (PMIX_SUCCESS != rc) {
2244 PMIX_ERROR_LOG(rc);
2245 PMIX_RELEASE(reply);
2246 return;
2247 }
2248 if (PMIX_SUCCESS == status) {
2249
2250 PMIX_BFROPS_PACK(rc, cd->peer, reply, &ndata, 1, PMIX_SIZE);
2251 if (PMIX_SUCCESS != rc) {
2252 PMIX_ERROR_LOG(rc);
2253 PMIX_RELEASE(reply);
2254 return;
2255 }
2256 PMIX_BFROPS_PACK(rc, cd->peer, reply, pdata, ndata, PMIX_PDATA);
2257 if (PMIX_SUCCESS != rc) {
2258 PMIX_ERROR_LOG(rc);
2259 PMIX_RELEASE(reply);
2260 return;
2261 }
2262 }
2263
2264
2265
2266
2267 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2268 if (PMIX_SUCCESS != rc) {
2269 PMIX_RELEASE(reply);
2270 }
2271
2272 PMIX_RELEASE(cd);
2273 }
2274
2275
2276
2277
2278
2279
2280 static void _mdxcbfunc(int sd, short argc, void *cbdata)
2281 {
2282 pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
2283 pmix_server_trkr_t *tracker = scd->tracker;
2284 pmix_buffer_t xfer, *reply;
2285 pmix_server_caddy_t *cd, *nxt;
2286 pmix_status_t rc = PMIX_SUCCESS, ret;
2287 pmix_nspace_caddy_t *nptr;
2288 pmix_list_t nslist;
2289 bool found;
2290
2291 PMIX_ACQUIRE_OBJECT(scd);
2292
2293 if (NULL == tracker) {
2294
2295
2296 if (NULL != scd->cbfunc.relfn) {
2297 scd->cbfunc.relfn(scd->cbdata);
2298 }
2299 PMIX_RELEASE(scd);
2300 return;
2301 }
2302
2303
2304
2305
2306
2307 if (tracker->event_active) {
2308 pmix_event_del(&tracker->ev);
2309 }
2310
2311
2312 PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
2313 PMIX_CONSTRUCT(&nslist, pmix_list_t);
2314
2315 if (PMIX_SUCCESS != scd->status) {
2316 rc = scd->status;
2317 goto finish_collective;
2318 }
2319
2320 if (PMIX_COLLECT_INVALID == tracker->collect_type) {
2321 rc = PMIX_ERR_INVALID_ARG;
2322 goto finish_collective;
2323 }
2324
2325
2326 if (PMIX_COLLECT_YES != tracker->collect_type) {
2327 rc = PMIX_SUCCESS;
2328 goto finish_collective;
2329 }
2330
2331
2332
2333
2334 PMIX_LIST_FOREACH(cd, &tracker->local_cbs, pmix_server_caddy_t) {
2335
2336 found = false;
2337 PMIX_LIST_FOREACH(nptr, &nslist, pmix_nspace_caddy_t) {
2338 if (0 == strcmp(nptr->ns->compat.gds->name,
2339 cd->peer->nptr->compat.gds->name)) {
2340 found = true;
2341 break;
2342 }
2343 }
2344 if (!found) {
2345
2346 nptr = PMIX_NEW(pmix_nspace_caddy_t);
2347 PMIX_RETAIN(cd->peer->nptr);
2348 nptr->ns = cd->peer->nptr;
2349 pmix_list_append(&nslist, &nptr->super);
2350 }
2351 }
2352 PMIX_LIST_FOREACH(nptr, &nslist, pmix_nspace_caddy_t) {
2353
2354 PMIX_LOAD_BUFFER(pmix_globals.mypeer, &xfer, scd->data, scd->ndata);
2355 PMIX_GDS_STORE_MODEX(rc, nptr->ns, &xfer, tracker);
2356 if (PMIX_SUCCESS != rc) {
2357 PMIX_ERROR_LOG(rc);
2358 break;
2359 }
2360 }
2361
2362 finish_collective:
2363
2364 PMIX_LIST_FOREACH_SAFE(cd, nxt, &tracker->local_cbs, pmix_server_caddy_t) {
2365 reply = PMIX_NEW(pmix_buffer_t);
2366 if (NULL == reply) {
2367 rc = PMIX_ERR_NOMEM;
2368 break;
2369 }
2370
2371 PMIX_BFROPS_PACK(ret, cd->peer, reply, &rc, 1, PMIX_STATUS);
2372 if (PMIX_SUCCESS != ret) {
2373 PMIX_ERROR_LOG(ret);
2374 goto cleanup;
2375 }
2376 pmix_output_verbose(2, pmix_server_globals.base_output,
2377 "server:modex_cbfunc reply being sent to %s:%u",
2378 cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
2379 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2380 if (PMIX_SUCCESS != rc) {
2381 PMIX_RELEASE(reply);
2382 }
2383
2384 pmix_list_remove_item(&tracker->local_cbs, &cd->super);
2385 PMIX_RELEASE(cd);
2386 }
2387
2388 cleanup:
2389
2390
2391
2392
2393
2394 xfer.base_ptr = NULL;
2395 xfer.bytes_used = 0;
2396 PMIX_DESTRUCT(&xfer);
2397
2398 pmix_list_remove_item(&pmix_server_globals.collectives, &tracker->super);
2399 PMIX_RELEASE(tracker);
2400 PMIX_LIST_DESTRUCT(&nslist);
2401
2402
2403 if (NULL != scd->cbfunc.relfn) {
2404 scd->cbfunc.relfn(scd->cbdata);
2405 }
2406 PMIX_RELEASE(scd);
2407 }
2408
2409 static void modex_cbfunc(pmix_status_t status, const char *data, size_t ndata, void *cbdata,
2410 pmix_release_cbfunc_t relfn, void *relcbd)
2411 {
2412 pmix_server_trkr_t *tracker = (pmix_server_trkr_t*)cbdata;
2413 pmix_shift_caddy_t *scd;
2414
2415 pmix_output_verbose(2, pmix_server_globals.base_output,
2416 "server:modex_cbfunc called with %d bytes", (int)ndata);
2417
2418
2419 scd = PMIX_NEW(pmix_shift_caddy_t);
2420 if (NULL == scd) {
2421
2422 if (NULL != relfn) {
2423 relfn(cbdata);
2424 }
2425 return;
2426 }
2427 scd->status = status;
2428 scd->data = data;
2429 scd->ndata = ndata;
2430 scd->tracker = tracker;
2431 scd->cbfunc.relfn = relfn;
2432 scd->cbdata = relcbd;
2433 PMIX_THREADSHIFT(scd, _mdxcbfunc);
2434 }
2435
2436 static void get_cbfunc(pmix_status_t status, const char *data, size_t ndata, void *cbdata,
2437 pmix_release_cbfunc_t relfn, void *relcbd)
2438 {
2439 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
2440 pmix_buffer_t *reply, buf;
2441 pmix_status_t rc;
2442
2443 pmix_output_verbose(2, pmix_server_globals.base_output,
2444 "server:get_cbfunc called with %d bytes", (int)ndata);
2445
2446
2447
2448
2449
2450
2451 if (NULL == cd) {
2452
2453
2454 if (NULL != relfn) {
2455 relfn(relcbd);
2456 }
2457 return;
2458 }
2459
2460
2461 reply = PMIX_NEW(pmix_buffer_t);
2462 if (NULL == reply) {
2463 rc = PMIX_ERR_NOMEM;
2464 goto cleanup;
2465 }
2466 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2467 if (PMIX_SUCCESS != rc) {
2468 PMIX_ERROR_LOG(rc);
2469 goto cleanup;
2470 }
2471
2472 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
2473 PMIX_LOAD_BUFFER(cd->peer, &buf, data, ndata);
2474 PMIX_BFROPS_COPY_PAYLOAD(rc, cd->peer, reply, &buf);
2475 buf.base_ptr = NULL;
2476 buf.bytes_used = 0;
2477 PMIX_DESTRUCT(&buf);
2478
2479 pmix_output_verbose(2, pmix_server_globals.base_output,
2480 "server:get_cbfunc reply being sent to %s:%u",
2481 cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
2482 pmix_output_hexdump(10, pmix_server_globals.base_output,
2483 reply->base_ptr, (reply->bytes_used < 256 ? reply->bytes_used : 256));
2484
2485 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2486 if (PMIX_SUCCESS != rc) {
2487 PMIX_RELEASE(reply);
2488 }
2489
2490 cleanup:
2491
2492 if (NULL != relfn) {
2493 relfn(relcbd);
2494 }
2495 PMIX_RELEASE(cd);
2496 }
2497
2498 static void _cnct(int sd, short args, void *cbdata)
2499 {
2500 pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
2501 pmix_server_trkr_t *tracker = scd->tracker;
2502 pmix_buffer_t *reply, pbkt;
2503 pmix_byte_object_t bo;
2504 pmix_status_t rc;
2505 int i;
2506 pmix_server_caddy_t *cd;
2507 char **nspaces=NULL;
2508 bool found;
2509 pmix_proc_t proc;
2510 pmix_cb_t cb;
2511 pmix_kval_t *kptr;
2512
2513 PMIX_ACQUIRE_OBJECT(scd);
2514
2515 if (NULL == tracker) {
2516
2517 return;
2518 }
2519
2520
2521
2522
2523
2524 if (tracker->event_active) {
2525 pmix_event_del(&tracker->ev);
2526 }
2527
2528
2529 PMIX_LIST_FOREACH(cd, &tracker->local_cbs, pmix_server_caddy_t) {
2530 if (NULL == nspaces) {
2531 pmix_argv_append_nosize(&nspaces, cd->peer->info->pname.nspace);
2532 } else {
2533 found = false;
2534 for (i=0; NULL != nspaces[i]; i++) {
2535 if (0 == strcmp(nspaces[i], cd->peer->info->pname.nspace)) {
2536 found = true;
2537 break;
2538 }
2539 }
2540 if (!found) {
2541 pmix_argv_append_nosize(&nspaces, cd->peer->info->pname.nspace);
2542 }
2543 }
2544 }
2545
2546
2547 PMIX_LIST_FOREACH(cd, &tracker->local_cbs, pmix_server_caddy_t) {
2548
2549 reply = PMIX_NEW(pmix_buffer_t);
2550 if (NULL == reply) {
2551 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2552 rc = PMIX_ERR_NOMEM;
2553 goto cleanup;
2554 }
2555
2556 PMIX_BFROPS_PACK(rc, cd->peer, reply, &scd->status, 1, PMIX_STATUS);
2557 if (PMIX_SUCCESS != rc) {
2558 PMIX_ERROR_LOG(rc);
2559 PMIX_RELEASE(reply);
2560 goto cleanup;
2561 }
2562 if (PMIX_SUCCESS == scd->status) {
2563
2564
2565 for (i=0; NULL != nspaces[i]; i++) {
2566
2567
2568 if (0 == strncmp(nspaces[i], cd->peer->info->pname.nspace, PMIX_MAX_NSLEN)) {
2569 continue;
2570 }
2571
2572
2573
2574
2575
2576 proc.rank = PMIX_RANK_WILDCARD;
2577 pmix_strncpy(proc.nspace, nspaces[i], PMIX_MAX_NSLEN);
2578 PMIX_CONSTRUCT(&cb, pmix_cb_t);
2579
2580
2581
2582 cb.proc = &proc;
2583 cb.scope = PMIX_SCOPE_UNDEF;
2584 cb.copy = false;
2585 PMIX_GDS_FETCH_KV(rc, cd->peer, &cb);
2586 if (PMIX_SUCCESS != rc) {
2587 PMIX_ERROR_LOG(rc);
2588 PMIX_RELEASE(reply);
2589 PMIX_DESTRUCT(&cb);
2590 goto cleanup;
2591 }
2592 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
2593
2594 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, &nspaces[i], 1, PMIX_STRING);
2595 if (PMIX_SUCCESS != rc) {
2596 PMIX_ERROR_LOG(rc);
2597 PMIX_RELEASE(reply);
2598 PMIX_DESTRUCT(&pbkt);
2599 PMIX_DESTRUCT(&cb);
2600 goto cleanup;
2601 }
2602 PMIX_LIST_FOREACH(kptr, &cb.kvs, pmix_kval_t) {
2603 PMIX_BFROPS_PACK(rc, cd->peer, &pbkt, kptr, 1, PMIX_KVAL);
2604 if (PMIX_SUCCESS != rc) {
2605 PMIX_ERROR_LOG(rc);
2606 PMIX_RELEASE(reply);
2607 PMIX_DESTRUCT(&pbkt);
2608 PMIX_DESTRUCT(&cb);
2609 goto cleanup;
2610 }
2611 }
2612 PMIX_DESTRUCT(&cb);
2613
2614 if (PMIX_PROC_IS_V1(cd->peer) || PMIX_PROC_IS_V20(cd->peer)) {
2615 PMIX_BFROPS_PACK(rc, cd->peer, reply, &pbkt, 1, PMIX_BUFFER);
2616 if (PMIX_SUCCESS != rc) {
2617 PMIX_ERROR_LOG(rc);
2618 PMIX_RELEASE(reply);
2619 PMIX_DESTRUCT(&pbkt);
2620 PMIX_DESTRUCT(&cb);
2621 goto cleanup;
2622 }
2623 } else {
2624 PMIX_UNLOAD_BUFFER(&pbkt, bo.bytes, bo.size);
2625 PMIX_BFROPS_PACK(rc, cd->peer, reply, &bo, 1, PMIX_BYTE_OBJECT);
2626 if (PMIX_SUCCESS != rc) {
2627 PMIX_ERROR_LOG(rc);
2628 PMIX_RELEASE(reply);
2629 PMIX_DESTRUCT(&pbkt);
2630 PMIX_DESTRUCT(&cb);
2631 goto cleanup;
2632 }
2633 }
2634
2635 PMIX_DESTRUCT(&pbkt);
2636 }
2637 }
2638 pmix_output_verbose(2, pmix_server_globals.base_output,
2639 "server:cnct_cbfunc reply being sent to %s:%u",
2640 cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
2641 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2642 if (PMIX_SUCCESS != rc) {
2643 PMIX_RELEASE(reply);
2644 }
2645 }
2646
2647 cleanup:
2648 if (NULL != nspaces) {
2649 pmix_argv_free(nspaces);
2650 }
2651 pmix_list_remove_item(&pmix_server_globals.collectives, &tracker->super);
2652 PMIX_RELEASE(tracker);
2653
2654
2655 PMIX_RELEASE(scd);
2656 }
2657
2658 static void cnct_cbfunc(pmix_status_t status, void *cbdata)
2659 {
2660 pmix_server_trkr_t *tracker = (pmix_server_trkr_t*)cbdata;
2661 pmix_shift_caddy_t *scd;
2662
2663 pmix_output_verbose(2, pmix_server_globals.base_output,
2664 "server:cnct_cbfunc called");
2665
2666
2667 scd = PMIX_NEW(pmix_shift_caddy_t);
2668 if (NULL == scd) {
2669
2670 return;
2671 }
2672 scd->status = status;
2673 scd->tracker = tracker;
2674 PMIX_THREADSHIFT(scd, _cnct);
2675 }
2676
2677 static void _discnct(int sd, short args, void *cbdata)
2678 {
2679 pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
2680 pmix_server_trkr_t *tracker = scd->tracker;
2681 pmix_buffer_t *reply;
2682 pmix_status_t rc;
2683 pmix_server_caddy_t *cd;
2684
2685 PMIX_ACQUIRE_OBJECT(scd);
2686
2687 if (NULL == tracker) {
2688
2689 return;
2690 }
2691
2692
2693
2694
2695
2696 if (tracker->event_active) {
2697 pmix_event_del(&tracker->ev);
2698 }
2699
2700
2701 PMIX_LIST_FOREACH(cd, &tracker->local_cbs, pmix_server_caddy_t) {
2702
2703 reply = PMIX_NEW(pmix_buffer_t);
2704 if (NULL == reply) {
2705 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2706 rc = PMIX_ERR_NOMEM;
2707 goto cleanup;
2708 }
2709
2710 PMIX_BFROPS_PACK(rc, cd->peer, reply, &scd->status, 1, PMIX_STATUS);
2711 if (PMIX_SUCCESS != rc) {
2712 PMIX_ERROR_LOG(rc);
2713 PMIX_RELEASE(reply);
2714 goto cleanup;
2715 }
2716 pmix_output_verbose(2, pmix_server_globals.base_output,
2717 "server:cnct_cbfunc reply being sent to %s:%u",
2718 cd->peer->info->pname.nspace, cd->peer->info->pname.rank);
2719 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2720 if (PMIX_SUCCESS != rc) {
2721 PMIX_RELEASE(reply);
2722 }
2723 }
2724
2725 cleanup:
2726
2727
2728 pmix_list_remove_item(&pmix_server_globals.collectives, &tracker->super);
2729 PMIX_RELEASE(tracker);
2730
2731
2732 PMIX_RELEASE(scd);
2733 }
2734
2735 static void discnct_cbfunc(pmix_status_t status, void *cbdata)
2736 {
2737 pmix_server_trkr_t *tracker = (pmix_server_trkr_t*)cbdata;
2738 pmix_shift_caddy_t *scd;
2739
2740 pmix_output_verbose(2, pmix_server_globals.base_output,
2741 "server:discnct_cbfunc called on nspace %s",
2742 (NULL == tracker) ? "NULL" : tracker->pname.nspace);
2743
2744
2745 scd = PMIX_NEW(pmix_shift_caddy_t);
2746 if (NULL == scd) {
2747
2748 return;
2749 }
2750 scd->status = status;
2751 scd->tracker = tracker;
2752 PMIX_THREADSHIFT(scd, _discnct);
2753 }
2754
2755
2756 static void regevents_cbfunc(pmix_status_t status, void *cbdata)
2757 {
2758 pmix_status_t rc;
2759 pmix_server_caddy_t *cd = (pmix_server_caddy_t*) cbdata;
2760 pmix_buffer_t *reply;
2761
2762 pmix_output_verbose(2, pmix_server_globals.base_output,
2763 "server:regevents_cbfunc called status = %d", status);
2764
2765 reply = PMIX_NEW(pmix_buffer_t);
2766 if (NULL == reply) {
2767 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2768 PMIX_RELEASE(cd);
2769 return;
2770 }
2771 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2772 if (PMIX_SUCCESS != rc) {
2773 PMIX_ERROR_LOG(rc);
2774 }
2775
2776 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2777 if (PMIX_SUCCESS != rc) {
2778 PMIX_RELEASE(reply);
2779 }
2780 PMIX_RELEASE(cd);
2781 }
2782
2783 static void notifyerror_cbfunc (pmix_status_t status, void *cbdata)
2784 {
2785 pmix_status_t rc;
2786 pmix_server_caddy_t *cd = (pmix_server_caddy_t*) cbdata;
2787 pmix_buffer_t *reply;
2788
2789 pmix_output_verbose(2, pmix_server_globals.base_output,
2790 "server:notifyerror_cbfunc called status = %d", status);
2791
2792 reply = PMIX_NEW(pmix_buffer_t);
2793 if (NULL == reply) {
2794 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2795 PMIX_RELEASE(cd);
2796 return;
2797 }
2798 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2799 if (PMIX_SUCCESS != rc) {
2800 PMIX_ERROR_LOG(rc);
2801 }
2802
2803 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2804 if (PMIX_SUCCESS != rc) {
2805 PMIX_RELEASE(reply);
2806 }
2807 PMIX_RELEASE(cd);
2808 }
2809
2810 static void alloc_cbfunc(pmix_status_t status,
2811 pmix_info_t *info, size_t ninfo,
2812 void *cbdata,
2813 pmix_release_cbfunc_t release_fn,
2814 void *release_cbdata)
2815 {
2816 pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
2817 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
2818 pmix_buffer_t *reply;
2819 pmix_status_t rc;
2820
2821 pmix_output_verbose(2, pmix_server_globals.base_output,
2822 "pmix:alloc callback with status %d", status);
2823
2824 reply = PMIX_NEW(pmix_buffer_t);
2825 if (NULL == reply) {
2826 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2827 PMIX_RELEASE(cd);
2828 return;
2829 }
2830 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2831 if (PMIX_SUCCESS != rc) {
2832 PMIX_ERROR_LOG(rc);
2833 goto complete;
2834 }
2835
2836 PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
2837 if (PMIX_SUCCESS != rc) {
2838 PMIX_ERROR_LOG(rc);
2839 goto complete;
2840 }
2841 if (0 < ninfo) {
2842 PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
2843 if (PMIX_SUCCESS != rc) {
2844 PMIX_ERROR_LOG(rc);
2845 }
2846 }
2847
2848 complete:
2849
2850 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2851 if (PMIX_SUCCESS != rc) {
2852 PMIX_RELEASE(reply);
2853 }
2854
2855
2856 if (NULL != qcd->queries) {
2857 PMIX_QUERY_FREE(qcd->queries, qcd->nqueries);
2858 }
2859 if (NULL != qcd->info) {
2860 PMIX_INFO_FREE(qcd->info, qcd->ninfo);
2861 }
2862 PMIX_RELEASE(qcd);
2863 PMIX_RELEASE(cd);
2864 if (NULL != release_fn) {
2865 release_fn(release_cbdata);
2866 }
2867 }
2868
2869 static void query_cbfunc(pmix_status_t status,
2870 pmix_info_t *info, size_t ninfo,
2871 void *cbdata,
2872 pmix_release_cbfunc_t release_fn,
2873 void *release_cbdata)
2874 {
2875 pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
2876 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
2877 pmix_buffer_t *reply;
2878 pmix_status_t rc;
2879
2880 pmix_output_verbose(2, pmix_server_globals.base_output,
2881 "pmix:query callback with status %s", PMIx_Error_string(status));
2882
2883 reply = PMIX_NEW(pmix_buffer_t);
2884 if (NULL == reply) {
2885 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2886 PMIX_RELEASE(cd);
2887 return;
2888 }
2889 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2890 if (PMIX_SUCCESS != rc) {
2891 PMIX_ERROR_LOG(rc);
2892 goto complete;
2893 }
2894
2895 PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
2896 if (PMIX_SUCCESS != rc) {
2897 PMIX_ERROR_LOG(rc);
2898 goto complete;
2899 }
2900 if (0 < ninfo) {
2901 PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
2902 if (PMIX_SUCCESS != rc) {
2903 PMIX_ERROR_LOG(rc);
2904 }
2905 }
2906
2907
2908
2909 complete:
2910
2911 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2912 if (PMIX_SUCCESS != rc) {
2913 PMIX_RELEASE(reply);
2914 }
2915
2916
2917 if (NULL != qcd->queries) {
2918 PMIX_QUERY_FREE(qcd->queries, qcd->nqueries);
2919 }
2920 if (NULL != qcd->info) {
2921 PMIX_INFO_FREE(qcd->info, qcd->ninfo);
2922 }
2923 PMIX_RELEASE(qcd);
2924 PMIX_RELEASE(cd);
2925 if (NULL != release_fn) {
2926 release_fn(release_cbdata);
2927 }
2928 }
2929
2930 static void jctrl_cbfunc(pmix_status_t status,
2931 pmix_info_t *info, size_t ninfo,
2932 void *cbdata,
2933 pmix_release_cbfunc_t release_fn,
2934 void *release_cbdata)
2935 {
2936 pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
2937 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
2938 pmix_buffer_t *reply;
2939 pmix_status_t rc;
2940
2941 pmix_output_verbose(2, pmix_server_globals.base_output,
2942 "pmix:jctrl callback with status %d", status);
2943
2944 reply = PMIX_NEW(pmix_buffer_t);
2945 if (NULL == reply) {
2946 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2947 PMIX_RELEASE(cd);
2948 return;
2949 }
2950 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
2951 if (PMIX_SUCCESS != rc) {
2952 PMIX_ERROR_LOG(rc);
2953 goto complete;
2954 }
2955
2956 PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
2957 if (PMIX_SUCCESS != rc) {
2958 PMIX_ERROR_LOG(rc);
2959 goto complete;
2960 }
2961 if (0 < ninfo) {
2962 PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
2963 if (PMIX_SUCCESS != rc) {
2964 PMIX_ERROR_LOG(rc);
2965 }
2966 }
2967
2968 complete:
2969
2970 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
2971 if (PMIX_SUCCESS != rc) {
2972 PMIX_RELEASE(reply);
2973 }
2974
2975
2976 if (NULL != qcd->queries) {
2977 PMIX_QUERY_FREE(qcd->queries, qcd->nqueries);
2978 }
2979 if (NULL != qcd->info) {
2980 PMIX_INFO_FREE(qcd->info, qcd->ninfo);
2981 }
2982 PMIX_RELEASE(qcd);
2983 PMIX_RELEASE(cd);
2984 if (NULL != release_fn) {
2985 release_fn(release_cbdata);
2986 }
2987 }
2988
2989 static void monitor_cbfunc(pmix_status_t status,
2990 pmix_info_t *info, size_t ninfo,
2991 void *cbdata,
2992 pmix_release_cbfunc_t release_fn,
2993 void *release_cbdata)
2994 {
2995 pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
2996 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
2997 pmix_buffer_t *reply;
2998 pmix_status_t rc;
2999
3000 pmix_output_verbose(2, pmix_server_globals.base_output,
3001 "pmix:monitor callback with status %d", status);
3002
3003 reply = PMIX_NEW(pmix_buffer_t);
3004 if (NULL == reply) {
3005 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3006 PMIX_RELEASE(cd);
3007 return;
3008 }
3009 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
3010 if (PMIX_SUCCESS != rc) {
3011 PMIX_ERROR_LOG(rc);
3012 goto complete;
3013 }
3014
3015 PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
3016 if (PMIX_SUCCESS != rc) {
3017 PMIX_ERROR_LOG(rc);
3018 goto complete;
3019 }
3020 if (0 < ninfo) {
3021 PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
3022 if (PMIX_SUCCESS != rc) {
3023 PMIX_ERROR_LOG(rc);
3024 }
3025 }
3026
3027 complete:
3028
3029 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
3030 if (PMIX_SUCCESS != rc) {
3031 PMIX_RELEASE(reply);
3032 }
3033
3034
3035 if (NULL != qcd->queries) {
3036 PMIX_QUERY_FREE(qcd->queries, qcd->nqueries);
3037 }
3038 if (NULL != qcd->info) {
3039 PMIX_INFO_FREE(qcd->info, qcd->ninfo);
3040 }
3041 PMIX_RELEASE(qcd);
3042 PMIX_RELEASE(cd);
3043 if (NULL != release_fn) {
3044 release_fn(release_cbdata);
3045 }
3046 }
3047
3048 static void cred_cbfunc(pmix_status_t status,
3049 pmix_byte_object_t *credential,
3050 pmix_info_t info[], size_t ninfo,
3051 void *cbdata)
3052 {
3053 pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
3054 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
3055 pmix_buffer_t *reply;
3056 pmix_status_t rc;
3057
3058 pmix_output_verbose(2, pmix_globals.debug_output,
3059 "pmix:get credential callback with status %d", status);
3060
3061 reply = PMIX_NEW(pmix_buffer_t);
3062 if (NULL == reply) {
3063 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3064 PMIX_RELEASE(cd);
3065 return;
3066 }
3067
3068
3069 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
3070 if (PMIX_SUCCESS != rc) {
3071 PMIX_ERROR_LOG(rc);
3072 goto complete;
3073 }
3074
3075 if (PMIX_SUCCESS == status) {
3076
3077 PMIX_BFROPS_PACK(rc, cd->peer, reply, credential, 1, PMIX_BYTE_OBJECT);
3078 if (PMIX_SUCCESS != rc) {
3079 PMIX_ERROR_LOG(rc);
3080 goto complete;
3081 }
3082
3083
3084 PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
3085 if (PMIX_SUCCESS != rc) {
3086 PMIX_ERROR_LOG(rc);
3087 goto complete;
3088 }
3089 if (0 < ninfo) {
3090 PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
3091 if (PMIX_SUCCESS != rc) {
3092 PMIX_ERROR_LOG(rc);
3093 }
3094 }
3095 }
3096
3097 complete:
3098
3099 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
3100 if (PMIX_SUCCESS != rc) {
3101 PMIX_RELEASE(reply);
3102 }
3103
3104
3105 if (NULL != qcd->info) {
3106 PMIX_INFO_FREE(qcd->info, qcd->ninfo);
3107 }
3108 PMIX_RELEASE(qcd);
3109 PMIX_RELEASE(cd);
3110 }
3111
3112 static void validate_cbfunc(pmix_status_t status,
3113 pmix_info_t info[], size_t ninfo,
3114 void *cbdata)
3115 {
3116 pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata;
3117 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata;
3118 pmix_buffer_t *reply;
3119 pmix_status_t rc;
3120
3121 pmix_output_verbose(2, pmix_globals.debug_output,
3122 "pmix:validate credential callback with status %d", status);
3123
3124 reply = PMIX_NEW(pmix_buffer_t);
3125 if (NULL == reply) {
3126 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3127 PMIX_RELEASE(cd);
3128 return;
3129 }
3130 PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS);
3131 if (PMIX_SUCCESS != rc) {
3132 PMIX_ERROR_LOG(rc);
3133 goto complete;
3134 }
3135
3136 PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE);
3137 if (PMIX_SUCCESS != rc) {
3138 PMIX_ERROR_LOG(rc);
3139 goto complete;
3140 }
3141 if (0 < ninfo) {
3142 PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO);
3143 if (PMIX_SUCCESS != rc) {
3144 PMIX_ERROR_LOG(rc);
3145 }
3146 }
3147
3148 complete:
3149
3150 PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply);
3151 if (PMIX_SUCCESS != rc) {
3152 PMIX_RELEASE(reply);
3153 }
3154
3155 if (NULL != qcd->info) {
3156 PMIX_INFO_FREE(qcd->info, qcd->ninfo);
3157 }
3158 PMIX_RELEASE(qcd);
3159 PMIX_RELEASE(cd);
3160 }
3161
3162
3163 static void _iofreg(int sd, short args, void *cbdata)
3164 {
3165 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
3166 pmix_server_caddy_t *scd = (pmix_server_caddy_t*)cd->cbdata;
3167 pmix_buffer_t *reply;
3168 pmix_status_t rc;
3169
3170 PMIX_ACQUIRE_OBJECT(cd);
3171
3172
3173 reply = PMIX_NEW(pmix_buffer_t);
3174 if (NULL == reply) {
3175 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3176 rc = PMIX_ERR_NOMEM;
3177 goto cleanup;
3178 }
3179
3180 PMIX_BFROPS_PACK(rc, scd->peer, reply, &cd->status, 1, PMIX_STATUS);
3181 if (PMIX_SUCCESS != rc) {
3182 PMIX_ERROR_LOG(rc);
3183 PMIX_RELEASE(reply);
3184 goto cleanup;
3185 }
3186
3187
3188 if (PMIX_SUCCESS != cd->status) {
3189
3190 }
3191
3192 pmix_output_verbose(2, pmix_server_globals.iof_output,
3193 "server:_iofreg reply being sent to %s:%u",
3194 scd->peer->info->pname.nspace, scd->peer->info->pname.rank);
3195 PMIX_SERVER_QUEUE_REPLY(rc, scd->peer, scd->hdr.tag, reply);
3196 if (PMIX_SUCCESS != rc) {
3197 PMIX_RELEASE(reply);
3198 }
3199
3200 cleanup:
3201
3202 if (NULL != cd->procs) {
3203 PMIX_PROC_FREE(cd->procs, cd->nprocs);
3204 }
3205 PMIX_INFO_FREE(cd->info, cd->ninfo);
3206
3207 PMIX_RELEASE(cd);
3208 }
3209
3210 static void iof_cbfunc(pmix_status_t status,
3211 void *cbdata)
3212 {
3213 pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
3214
3215 pmix_output_verbose(2, pmix_server_globals.iof_output,
3216 "server:iof_cbfunc called with status %d",
3217 status);
3218
3219 if (NULL == cd) {
3220
3221 return;
3222 }
3223 cd->status = status;
3224
3225
3226 PMIX_THREADSHIFT(cd, _iofreg);
3227 }
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246 static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag,
3247 pmix_buffer_t *buf)
3248 {
3249 pmix_status_t rc=PMIX_ERR_NOT_SUPPORTED;
3250 int32_t cnt;
3251 pmix_cmd_t cmd;
3252 pmix_server_caddy_t *cd;
3253 pmix_proc_t proc;
3254 pmix_buffer_t *reply;
3255
3256
3257 cnt = 1;
3258 PMIX_BFROPS_UNPACK(rc, peer, buf, &cmd, &cnt, PMIX_COMMAND);
3259 if (PMIX_SUCCESS != rc) {
3260 PMIX_ERROR_LOG(rc);
3261 return rc;
3262 }
3263 pmix_output_verbose(2, pmix_server_globals.base_output,
3264 "recvd pmix cmd %s from %s:%u",
3265 pmix_command_string(cmd), peer->info->pname.nspace, peer->info->pname.rank);
3266
3267 if (PMIX_REQ_CMD == cmd) {
3268 reply = PMIX_NEW(pmix_buffer_t);
3269 if (NULL == reply) {
3270 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3271 return PMIX_ERR_NOMEM;
3272 }
3273 PMIX_GDS_REGISTER_JOB_INFO(rc, peer, reply);
3274 if (PMIX_SUCCESS != rc) {
3275 PMIX_ERROR_LOG(rc);
3276 return rc;
3277 }
3278 PMIX_SERVER_QUEUE_REPLY(rc, peer, tag, reply);
3279 if (PMIX_SUCCESS != rc) {
3280 PMIX_RELEASE(reply);
3281 }
3282 peer->nptr->ndelivered++;
3283 return PMIX_SUCCESS;
3284 }
3285
3286 if (PMIX_ABORT_CMD == cmd) {
3287 PMIX_GDS_CADDY(cd, peer, tag);
3288 if (PMIX_SUCCESS != (rc = pmix_server_abort(peer, buf, op_cbfunc, cd))) {
3289 PMIX_RELEASE(cd);
3290 }
3291 return rc;
3292 }
3293
3294 if (PMIX_COMMIT_CMD == cmd) {
3295 rc = pmix_server_commit(peer, buf);
3296 if (!PMIX_PROC_IS_V1(peer)) {
3297 reply = PMIX_NEW(pmix_buffer_t);
3298 if (NULL == reply) {
3299 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3300 return PMIX_ERR_NOMEM;
3301 }
3302 PMIX_BFROPS_PACK(rc, peer, reply, &rc, 1, PMIX_STATUS);
3303 if (PMIX_SUCCESS != rc) {
3304 PMIX_ERROR_LOG(rc);
3305 }
3306 PMIX_SERVER_QUEUE_REPLY(rc, peer, tag, reply);
3307 if (PMIX_SUCCESS != rc) {
3308 PMIX_RELEASE(reply);
3309 }
3310 }
3311 return PMIX_SUCCESS;
3312 }
3313
3314 if (PMIX_FENCENB_CMD == cmd) {
3315 PMIX_GDS_CADDY(cd, peer, tag);
3316 if (PMIX_SUCCESS != (rc = pmix_server_fence(cd, buf, modex_cbfunc, op_cbfunc))) {
3317 PMIX_RELEASE(cd);
3318 }
3319 return rc;
3320 }
3321
3322 if (PMIX_GETNB_CMD == cmd) {
3323 PMIX_GDS_CADDY(cd, peer, tag);
3324 if (PMIX_SUCCESS != (rc = pmix_server_get(buf, get_cbfunc, cd))) {
3325 PMIX_RELEASE(cd);
3326 }
3327 return rc;
3328 }
3329
3330 if (PMIX_FINALIZE_CMD == cmd) {
3331 pmix_output_verbose(2, pmix_server_globals.base_output,
3332 "recvd FINALIZE");
3333 peer->nptr->nfinalized++;
3334
3335 pmix_server_purge_events(peer, NULL);
3336
3337
3338 if (peer->recv_ev_active) {
3339 pmix_event_del(&peer->recv_event);
3340 peer->recv_ev_active = false;
3341 }
3342 PMIX_GDS_CADDY(cd, peer, tag);
3343
3344 if (NULL != pmix_host_server.client_finalized) {
3345 pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN);
3346 proc.rank = peer->info->pname.rank;
3347
3348 rc = pmix_host_server.client_finalized(&proc, peer->info->server_object,
3349 op_cbfunc2, cd);
3350 if (PMIX_SUCCESS == rc) {
3351
3352
3353 return rc;
3354 } else if (PMIX_OPERATION_SUCCEEDED == rc) {
3355
3356 rc = PMIX_SUCCESS;
3357 }
3358
3359
3360
3361
3362
3363
3364 op_cbfunc2(rc, cd);
3365
3366
3367 return PMIX_SUCCESS;
3368 }
3369
3370
3371
3372
3373
3374
3375 op_cbfunc2(PMIX_SUCCESS, cd);
3376
3377
3378 return PMIX_SUCCESS;
3379 }
3380
3381
3382 if (PMIX_PUBLISHNB_CMD == cmd) {
3383 PMIX_GDS_CADDY(cd, peer, tag);
3384 if (PMIX_SUCCESS != (rc = pmix_server_publish(peer, buf, op_cbfunc, cd))) {
3385 PMIX_RELEASE(cd);
3386 }
3387 return rc;
3388 }
3389
3390
3391 if (PMIX_LOOKUPNB_CMD == cmd) {
3392 PMIX_GDS_CADDY(cd, peer, tag);
3393 if (PMIX_SUCCESS != (rc = pmix_server_lookup(peer, buf, lookup_cbfunc, cd))) {
3394 PMIX_RELEASE(cd);
3395 }
3396 return rc;
3397 }
3398
3399
3400 if (PMIX_UNPUBLISHNB_CMD == cmd) {
3401 PMIX_GDS_CADDY(cd, peer, tag);
3402 if (PMIX_SUCCESS != (rc = pmix_server_unpublish(peer, buf, op_cbfunc, cd))) {
3403 PMIX_RELEASE(cd);
3404 }
3405 return rc;
3406 }
3407
3408
3409 if (PMIX_SPAWNNB_CMD == cmd) {
3410 PMIX_GDS_CADDY(cd, peer, tag);
3411 if (PMIX_SUCCESS != (rc = pmix_server_spawn(peer, buf, spawn_cbfunc, cd))) {
3412 PMIX_RELEASE(cd);
3413 }
3414 return rc;
3415 }
3416
3417
3418 if (PMIX_CONNECTNB_CMD == cmd) {
3419 PMIX_GDS_CADDY(cd, peer, tag);
3420 rc = pmix_server_connect(cd, buf, cnct_cbfunc);
3421 if (PMIX_SUCCESS != rc) {
3422 PMIX_RELEASE(cd);
3423 }
3424 return rc;
3425 }
3426
3427 if (PMIX_DISCONNECTNB_CMD == cmd) {
3428 PMIX_GDS_CADDY(cd, peer, tag);
3429 rc = pmix_server_disconnect(cd, buf, discnct_cbfunc);
3430 if (PMIX_SUCCESS != rc) {
3431 PMIX_RELEASE(cd);
3432 }
3433 return rc;
3434 }
3435
3436 if (PMIX_REGEVENTS_CMD == cmd) {
3437 PMIX_GDS_CADDY(cd, peer, tag);
3438 if (PMIX_SUCCESS != (rc = pmix_server_register_events(peer, buf, regevents_cbfunc, cd))) {
3439 PMIX_RELEASE(cd);
3440 }
3441 return rc;
3442 }
3443
3444 if (PMIX_DEREGEVENTS_CMD == cmd) {
3445 pmix_server_deregister_events(peer, buf);
3446 return PMIX_SUCCESS;
3447 }
3448
3449 if (PMIX_NOTIFY_CMD == cmd) {
3450 PMIX_GDS_CADDY(cd, peer, tag);
3451 if (PMIX_SUCCESS != (rc = pmix_server_event_recvd_from_client(peer, buf, notifyerror_cbfunc, cd))) {
3452 PMIX_RELEASE(cd);
3453 }
3454 return rc;
3455 }
3456
3457 if (PMIX_QUERY_CMD == cmd) {
3458 PMIX_GDS_CADDY(cd, peer, tag);
3459 if (PMIX_SUCCESS != (rc = pmix_server_query(peer, buf, query_cbfunc, cd))) {
3460 PMIX_RELEASE(cd);
3461 }
3462 return rc;
3463 }
3464
3465 if (PMIX_LOG_CMD == cmd) {
3466 PMIX_GDS_CADDY(cd, peer, tag);
3467 if (PMIX_SUCCESS != (rc = pmix_server_log(peer, buf, op_cbfunc, cd))) {
3468 PMIX_RELEASE(cd);
3469 }
3470 return rc;
3471 }
3472
3473 if (PMIX_ALLOC_CMD == cmd) {
3474 PMIX_GDS_CADDY(cd, peer, tag);
3475 if (PMIX_SUCCESS != (rc = pmix_server_alloc(peer, buf, alloc_cbfunc, cd))) {
3476 PMIX_RELEASE(cd);
3477 }
3478 return rc;
3479 }
3480
3481 if (PMIX_JOB_CONTROL_CMD == cmd) {
3482 PMIX_GDS_CADDY(cd, peer, tag);
3483 if (PMIX_SUCCESS != (rc = pmix_server_job_ctrl(peer, buf, jctrl_cbfunc, cd))) {
3484 PMIX_RELEASE(cd);
3485 }
3486 return rc;
3487 }
3488
3489 if (PMIX_MONITOR_CMD == cmd) {
3490 PMIX_GDS_CADDY(cd, peer, tag);
3491 if (PMIX_SUCCESS != (rc = pmix_server_monitor(peer, buf, monitor_cbfunc, cd))) {
3492 PMIX_RELEASE(cd);
3493 }
3494 return rc;
3495 }
3496
3497 if (PMIX_GET_CREDENTIAL_CMD == cmd) {
3498 PMIX_GDS_CADDY(cd, peer, tag);
3499 if (PMIX_SUCCESS != (rc = pmix_server_get_credential(peer, buf, cred_cbfunc, cd))) {
3500 PMIX_RELEASE(cd);
3501 }
3502 return rc;
3503 }
3504
3505 if (PMIX_VALIDATE_CRED_CMD == cmd) {
3506 PMIX_GDS_CADDY(cd, peer, tag);
3507 if (PMIX_SUCCESS != (rc = pmix_server_validate_credential(peer, buf, validate_cbfunc, cd))) {
3508 PMIX_RELEASE(cd);
3509 }
3510 return rc;
3511 }
3512
3513 if (PMIX_IOF_PULL_CMD == cmd) {
3514 PMIX_GDS_CADDY(cd, peer, tag);
3515 if (PMIX_SUCCESS != (rc = pmix_server_iofreg(peer, buf, iof_cbfunc, cd))) {
3516 PMIX_RELEASE(cd);
3517 }
3518 return rc;
3519 }
3520
3521 if (PMIX_IOF_PUSH_CMD == cmd) {
3522 PMIX_GDS_CADDY(cd, peer, tag);
3523 if (PMIX_SUCCESS != (rc = pmix_server_iofstdin(peer, buf, op_cbfunc, cd))) {
3524 PMIX_RELEASE(cd);
3525 }
3526 return rc;
3527 }
3528
3529 if (PMIX_GROUP_CONSTRUCT_CMD == cmd) {
3530 PMIX_GDS_CADDY(cd, peer, tag);
3531 if (PMIX_SUCCESS != (rc = pmix_server_grpconstruct(cd, buf))) {
3532 PMIX_RELEASE(cd);
3533 }
3534 return rc;
3535 }
3536
3537 if (PMIX_GROUP_DESTRUCT_CMD == cmd) {
3538 PMIX_GDS_CADDY(cd, peer, tag);
3539 if (PMIX_SUCCESS != (rc = pmix_server_grpdestruct(cd, buf))) {
3540 PMIX_RELEASE(cd);
3541 }
3542 return rc;
3543 }
3544
3545 return PMIX_ERR_NOT_SUPPORTED;
3546 }
3547
3548 void pmix_server_message_handler(struct pmix_peer_t *pr,
3549 pmix_ptl_hdr_t *hdr,
3550 pmix_buffer_t *buf, void *cbdata)
3551 {
3552 pmix_peer_t *peer = (pmix_peer_t*)pr;
3553 pmix_buffer_t *reply;
3554 pmix_status_t rc, ret;
3555
3556 pmix_output_verbose(2, pmix_server_globals.base_output,
3557 "SWITCHYARD for %s:%u:%d",
3558 peer->info->pname.nspace,
3559 peer->info->pname.rank, peer->sd);
3560
3561 ret = server_switchyard(peer, hdr->tag, buf);
3562
3563 if (PMIX_SUCCESS != ret) {
3564 reply = PMIX_NEW(pmix_buffer_t);
3565 if (NULL == reply) {
3566 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
3567 return;
3568 }
3569 if (PMIX_OPERATION_SUCCEEDED == ret) {
3570 ret = PMIX_SUCCESS;
3571 }
3572 PMIX_BFROPS_PACK(rc, pr, reply, &ret, 1, PMIX_STATUS);
3573 if (PMIX_SUCCESS != rc) {
3574 PMIX_ERROR_LOG(rc);
3575 }
3576 PMIX_SERVER_QUEUE_REPLY(rc, peer, hdr->tag, reply);
3577 if (PMIX_SUCCESS != rc) {
3578 PMIX_RELEASE(reply);
3579 }
3580 }
3581 }