This source file includes following definitions.
- xfcon
- xfdes
- opcbfunc
- main
- setup_cbfunc
- set_namespace
- errhandler
- errhandler_reg_callbk
- connected
- finalized
- abcbfunc
- abort_fn
- fencenb_fn
- dmodex_fn
- publish_fn
- lookup_fn
- unpublish_fn
- spcbfunc
- spawn_fn
- connect_fn
- disconnect_fn
- register_event_fn
- deregister_events
- notify_event
- query_fn
- tool_connect_fn
- log_fn
- wait_signal_callback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 #include <src/include/pmix_config.h>
29 #include <pmix_server.h>
30 #include <src/include/types.h>
31 #include <src/include/pmix_globals.h>
32
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/types.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <signal.h>
41 #include <pwd.h>
42 #include <sys/stat.h>
43 #include <dirent.h>
44
45 #include "src/class/pmix_list.h"
46 #include "src/util/pmix_environ.h"
47 #include "src/util/output.h"
48 #include "src/util/printf.h"
49 #include "src/util/argv.h"
50
51 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
52 pmix_op_cbfunc_t cbfunc, void *cbdata);
53 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
54 pmix_op_cbfunc_t cbfunc, void *cbdata);
55 static pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
56 int status, const char msg[],
57 pmix_proc_t procs[], size_t nprocs,
58 pmix_op_cbfunc_t cbfunc, void *cbdata);
59 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
60 const pmix_info_t info[], size_t ninfo,
61 char *data, size_t ndata,
62 pmix_modex_cbfunc_t cbfunc, void *cbdata);
63 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
64 const pmix_info_t info[], size_t ninfo,
65 pmix_modex_cbfunc_t cbfunc, void *cbdata);
66 static pmix_status_t publish_fn(const pmix_proc_t *proc,
67 const pmix_info_t info[], size_t ninfo,
68 pmix_op_cbfunc_t cbfunc, void *cbdata);
69 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
70 const pmix_info_t info[], size_t ninfo,
71 pmix_lookup_cbfunc_t cbfunc, void *cbdata);
72 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
73 const pmix_info_t info[], size_t ninfo,
74 pmix_op_cbfunc_t cbfunc, void *cbdata);
75 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
76 const pmix_info_t job_info[], size_t ninfo,
77 const pmix_app_t apps[], size_t napps,
78 pmix_spawn_cbfunc_t cbfunc, void *cbdata);
79 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
80 const pmix_info_t info[], size_t ninfo,
81 pmix_op_cbfunc_t cbfunc, void *cbdata);
82 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
83 const pmix_info_t info[], size_t ninfo,
84 pmix_op_cbfunc_t cbfunc, void *cbdata);
85 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
86 const pmix_info_t info[], size_t ninfo,
87 pmix_op_cbfunc_t cbfunc, void *cbdata);
88 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
89 pmix_op_cbfunc_t cbfunc, void *cbdata);
90 static pmix_status_t notify_event(pmix_status_t code,
91 const pmix_proc_t *source,
92 pmix_data_range_t range,
93 pmix_info_t info[], size_t ninfo,
94 pmix_op_cbfunc_t cbfunc, void *cbdata);
95 static pmix_status_t query_fn(pmix_proc_t *proct,
96 pmix_query_t *queries, size_t nqueries,
97 pmix_info_cbfunc_t cbfunc,
98 void *cbdata);
99 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
100 pmix_tool_connection_cbfunc_t cbfunc,
101 void *cbdata);
102 static void log_fn(const pmix_proc_t *client,
103 const pmix_info_t data[], size_t ndata,
104 const pmix_info_t directives[], size_t ndirs,
105 pmix_op_cbfunc_t cbfunc, void *cbdata);
106
107 static pmix_server_module_t mymodule = {
108 .client_connected = connected,
109 .client_finalized = finalized,
110 .abort = abort_fn,
111 .fence_nb = fencenb_fn,
112 .direct_modex = dmodex_fn,
113 .publish = publish_fn,
114 .lookup = lookup_fn,
115 .unpublish = unpublish_fn,
116 .spawn = spawn_fn,
117 .connect = connect_fn,
118 .disconnect = disconnect_fn,
119 .register_events = register_event_fn,
120 .deregister_events = deregister_events,
121 .notify_event = notify_event,
122 .query = query_fn,
123 .tool_connected = tool_connect_fn,
124 .log = log_fn
125 };
126
127 typedef struct {
128 pmix_list_item_t super;
129 pmix_pdata_t pdata;
130 } pmix_locdat_t;
131 PMIX_CLASS_INSTANCE(pmix_locdat_t,
132 pmix_list_item_t,
133 NULL, NULL);
134
135 #define PMIX_WAIT_FOR_COMPLETION(a) \
136 do { \
137 while ((a)) { \
138 usleep(10); \
139 } \
140 PMIX_ACQUIRE_OBJECT((a)); \
141 } while (0)
142
143 typedef struct {
144 pmix_object_t super;
145 volatile bool active;
146 pmix_proc_t caller;
147 pmix_info_t *info;
148 size_t ninfo;
149 pmix_op_cbfunc_t cbfunc;
150 pmix_spawn_cbfunc_t spcbfunc;
151 void *cbdata;
152 } myxfer_t;
153 static void xfcon(myxfer_t *p)
154 {
155 p->info = NULL;
156 p->ninfo = 0;
157 p->active = true;
158 p->cbfunc = NULL;
159 p->spcbfunc = NULL;
160 p->cbdata = NULL;
161 }
162 static void xfdes(myxfer_t *p)
163 {
164 if (NULL != p->info) {
165 PMIX_INFO_FREE(p->info, p->ninfo);
166 }
167 }
168 PMIX_CLASS_INSTANCE(myxfer_t,
169 pmix_object_t,
170 xfcon, xfdes);
171
172 typedef struct {
173 pmix_list_item_t super;
174 pid_t pid;
175 } wait_tracker_t;
176 PMIX_CLASS_INSTANCE(wait_tracker_t,
177 pmix_list_item_t,
178 NULL, NULL);
179
180 static volatile int wakeup;
181 static pmix_list_t pubdata;
182 static pmix_event_t handler;
183 static pmix_list_t children;
184
185 static void set_namespace(int nprocs, char *ranks, char *nspace,
186 pmix_op_cbfunc_t cbfunc, myxfer_t *x);
187 static void errhandler(size_t evhdlr_registration_id,
188 pmix_status_t status,
189 const pmix_proc_t *source,
190 pmix_info_t info[], size_t ninfo,
191 pmix_info_t results[], size_t nresults,
192 pmix_event_notification_cbfunc_fn_t cbfunc,
193 void *cbdata);
194 static void wait_signal_callback(int fd, short event, void *arg);
195 static void errhandler_reg_callbk (pmix_status_t status,
196 size_t errhandler_ref,
197 void *cbdata);
198
199 static void opcbfunc(pmix_status_t status, void *cbdata)
200 {
201 myxfer_t *x = (myxfer_t*)cbdata;
202
203
204 if (NULL != x->cbfunc) {
205 x->cbfunc(PMIX_SUCCESS, x->cbdata);
206 }
207 x->active = false;
208 }
209
210 int main(int argc, char **argv)
211 {
212 char **client_env=NULL;
213 char **client_argv=NULL;
214 char *tmp, **atmp, *executable=NULL, *tmpdir, *cleanup;
215 int rc, nprocs=1, n, k;
216 uid_t myuid;
217 gid_t mygid;
218 pid_t pid;
219 myxfer_t *x;
220 pmix_proc_t proc;
221 wait_tracker_t *child;
222 char *tdir;
223 uid_t uid = geteuid();
224 pmix_info_t *info;
225 struct stat buf;
226
227
228 if (NULL == (tdir = getenv("TMPDIR"))) {
229 if (NULL == (tdir = getenv("TEMP"))) {
230 if (NULL == (tdir = getenv("TMP"))) {
231 tdir = "/tmp";
232 }
233 }
234 }
235 if (0 > asprintf(&tmpdir, "%s/pmix.%lu", tdir, (long unsigned)uid)) {
236 fprintf(stderr, "Out of memory\n");
237 exit(1);
238 }
239
240 if (0 != stat(tmpdir, &buf)) {
241
242 if (0 != mkdir(tmpdir, S_IRWXU)) {
243 fprintf(stderr, "Cannot make tmpdir %s", tmpdir);
244 exit(1);
245 }
246 }
247 asprintf(&cleanup, "rm -rf %s", tmpdir);
248 PMIX_INFO_CREATE(info, 1);
249 PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TMPDIR, tmpdir, PMIX_STRING);
250
251
252 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, 1))) {
253 fprintf(stderr, "Init failed with error %d\n", rc);
254 return rc;
255 }
256 PMIX_INFO_FREE(info, 1);
257
258
259 PMIx_Register_event_handler(NULL, 0, NULL, 0,
260 errhandler, errhandler_reg_callbk, NULL);
261
262
263 PMIX_CONSTRUCT(&pubdata, pmix_list_t);
264
265
266 PMIX_CONSTRUCT(&children, pmix_list_t);
267 pmix_event_assign(&handler, pmix_globals.evbase, SIGCHLD,
268 EV_SIGNAL|EV_PERSIST,wait_signal_callback, &handler);
269 pmix_event_add(&handler, NULL);
270
271
272
273 for (n=1; n < (argc-1); n++) {
274 if (0 == strcmp("-n", argv[n]) &&
275 NULL != argv[n+1]) {
276 nprocs = strtol(argv[n+1], NULL, 10);
277 ++n;
278 } else if (0 == strcmp("-e", argv[n]) &&
279 NULL != argv[n+1]) {
280 executable = strdup(argv[n+1]);
281 for (k=n+2; NULL != argv[k]; k++) {
282 pmix_argv_append_nosize(&client_argv, argv[k]);
283 }
284 n += k;
285 }
286 }
287 if (NULL == executable) {
288 executable = strdup("./simpclient");
289 }
290
291
292 atmp = NULL;
293 for (n=0; n < nprocs; n++) {
294 asprintf(&tmp, "%d", n);
295 pmix_argv_append_nosize(&atmp, tmp);
296 free(tmp);
297 }
298 tmp = pmix_argv_join(atmp, ',');
299 pmix_argv_free(atmp);
300
301 x = PMIX_NEW(myxfer_t);
302 set_namespace(nprocs, tmp, "foobar", opcbfunc, x);
303
304
305 client_env = pmix_argv_copy(environ);
306 pmix_argv_prepend_nosize(&client_argv, executable);
307
308 wakeup = nprocs;
309 myuid = getuid();
310 mygid = getgid();
311
312
313
314 PMIX_WAIT_FOR_COMPLETION(x->active);
315 free(tmp);
316 PMIX_RELEASE(x);
317
318
319 x = PMIX_NEW(myxfer_t);
320 if (PMIX_SUCCESS != (rc = PMIx_server_setup_local_support("foobar", NULL, 0, opcbfunc, x))) {
321 fprintf(stderr, "Setup local support failed: %d\n", rc);
322 PMIx_server_finalize();
323 system(cleanup);
324 return rc;
325 }
326 PMIX_WAIT_FOR_COMPLETION(x->active);
327 PMIX_RELEASE(x);
328
329
330 (void)strncpy(proc.nspace, "foobar", PMIX_MAX_NSLEN);
331 for (n = 0; n < nprocs; n++) {
332 proc.rank = n;
333 if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) {
334 fprintf(stderr, "Server fork setup failed with error %d\n", rc);
335 PMIx_server_finalize();
336 system(cleanup);
337 return rc;
338 }
339 x = PMIX_NEW(myxfer_t);
340 if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid,
341 NULL, opcbfunc, x))) {
342 fprintf(stderr, "Server fork setup failed with error %d\n", rc);
343 PMIx_server_finalize();
344 system(cleanup);
345 return rc;
346 }
347
348
349 PMIX_WAIT_FOR_COMPLETION(x->active);
350 PMIX_RELEASE(x);
351 pid = fork();
352 if (pid < 0) {
353 fprintf(stderr, "Fork failed\n");
354 PMIx_server_finalize();
355 system(cleanup);
356 return -1;
357 }
358 child = PMIX_NEW(wait_tracker_t);
359 child->pid = pid;
360 pmix_list_append(&children, &child->super);
361
362 if (pid == 0) {
363 execve(executable, client_argv, client_env);
364
365 exit(0);
366 }
367 }
368 free(executable);
369 pmix_argv_free(client_argv);
370 pmix_argv_free(client_env);
371
372
373 while (0 < wakeup) {
374 struct timespec ts;
375 ts.tv_sec = 0;
376 ts.tv_nsec = 100000;
377 nanosleep(&ts, NULL);
378 }
379
380
381 PMIx_Deregister_event_handler(0, NULL, NULL);
382
383
384 PMIX_LIST_DESTRUCT(&pubdata);
385
386
387 if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
388 fprintf(stderr, "Finalize failed with error %d\n", rc);
389 }
390
391 fprintf(stderr, "Test finished OK!\n");
392 system(cleanup);
393
394 return rc;
395 }
396
397 static void setup_cbfunc(pmix_status_t status,
398 pmix_info_t info[], size_t ninfo,
399 void *provided_cbdata,
400 pmix_op_cbfunc_t cbfunc, void *cbdata)
401 {
402 myxfer_t *myxfer = (myxfer_t*)provided_cbdata;
403 size_t i;
404
405 if (PMIX_SUCCESS == status && 0 < ninfo) {
406 myxfer->ninfo = ninfo;
407 PMIX_INFO_CREATE(myxfer->info, ninfo);
408 for (i=0; i < ninfo; i++) {
409 PMIX_INFO_XFER(&myxfer->info[i], &info[i]);
410 }
411 }
412 if (NULL != cbfunc) {
413 cbfunc(PMIX_SUCCESS, cbdata);
414 }
415 myxfer->active = false;
416 }
417
418 static void set_namespace(int nprocs, char *ranks, char *nspace,
419 pmix_op_cbfunc_t cbfunc, myxfer_t *x)
420 {
421 char *regex, *ppn;
422 char hostname[PMIX_MAXHOSTNAMELEN];
423 pmix_status_t rc;
424 myxfer_t myxfer;
425 size_t i;
426
427 gethostname(hostname, sizeof(hostname));
428
429
430
431 PMIX_CONSTRUCT(&myxfer, myxfer_t);
432 myxfer.active = true;
433 if (PMIX_SUCCESS != (rc = PMIx_server_setup_application(nspace, NULL, 0, setup_cbfunc, &myxfer))) {
434 PMIX_DESTRUCT(&myxfer);
435 fprintf(stderr, "Failed to setup application: %d\n", rc);
436 exit(1);
437 }
438 PMIX_WAIT_FOR_COMPLETION(myxfer.active);
439 x->ninfo = myxfer.ninfo + 7;
440
441 PMIX_INFO_CREATE(x->info, x->ninfo);
442 if (0 < myxfer.ninfo) {
443 for (i=0; i < myxfer.ninfo; i++) {
444 PMIX_INFO_XFER(&x->info[i], &myxfer.info[i]);
445 }
446 }
447 PMIX_DESTRUCT(&myxfer);
448
449 (void)strncpy(x->info[i].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
450 x->info[i].value.type = PMIX_UINT32;
451 x->info[i].value.data.uint32 = nprocs;
452
453 ++i;
454 (void)strncpy(x->info[i].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
455 x->info[i].value.type = PMIX_UINT32;
456 x->info[i].value.data.uint32 = 0;
457
458 ++i;
459 (void)strncpy(x->info[i].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
460 x->info[i].value.type = PMIX_UINT32;
461 x->info[i].value.data.uint32 = nprocs;
462
463 ++i;
464 (void)strncpy(x->info[i].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
465 x->info[i].value.type = PMIX_STRING;
466 x->info[i].value.data.string = strdup(ranks);
467
468 ++i;
469 PMIx_generate_regex(hostname, ®ex);
470 (void)strncpy(x->info[i].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
471 x->info[i].value.type = PMIX_STRING;
472 x->info[i].value.data.string = regex;
473
474 ++i;
475 PMIx_generate_ppn(ranks, &ppn);
476 (void)strncpy(x->info[i].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
477 x->info[i].value.type = PMIX_STRING;
478 x->info[i].value.data.string = ppn;
479
480 ++i;
481 (void)strncpy(x->info[i].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
482 x->info[i].value.type = PMIX_UINT32;
483 x->info[i].value.data.uint32 = nprocs;
484
485 PMIx_server_register_nspace(nspace, nprocs, x->info, x->ninfo,
486 cbfunc, x);
487 }
488
489 static void errhandler(size_t evhdlr_registration_id,
490 pmix_status_t status,
491 const pmix_proc_t *source,
492 pmix_info_t info[], size_t ninfo,
493 pmix_info_t results[], size_t nresults,
494 pmix_event_notification_cbfunc_fn_t cbfunc,
495 void *cbdata)
496 {
497 pmix_output(0, "SERVER: ERRHANDLER CALLED WITH STATUS %d", status);
498 }
499
500 static void errhandler_reg_callbk (pmix_status_t status,
501 size_t errhandler_ref,
502 void *cbdata)
503 {
504 return;
505 }
506
507 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
508 pmix_op_cbfunc_t cbfunc, void *cbdata)
509 {
510 if (NULL != cbfunc) {
511 cbfunc(PMIX_SUCCESS, cbdata);
512 }
513 return PMIX_SUCCESS;
514 }
515 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
516 pmix_op_cbfunc_t cbfunc, void *cbdata)
517 {
518 pmix_output(0, "SERVER: FINALIZED %s:%d",
519 proc->nspace, proc->rank);
520 --wakeup;
521
522 if (NULL != cbfunc) {
523 cbfunc(PMIX_SUCCESS, cbdata);
524 }
525 return PMIX_SUCCESS;
526 }
527
528 static void abcbfunc(pmix_status_t status, void *cbdata)
529 {
530 myxfer_t *x = (myxfer_t*)cbdata;
531
532
533 if (NULL != x->cbfunc) {
534 x->cbfunc(status, x->cbdata);
535 }
536 PMIX_RELEASE(x);
537 }
538
539 static pmix_status_t abort_fn(const pmix_proc_t *proc,
540 void *server_object,
541 int status, const char msg[],
542 pmix_proc_t procs[], size_t nprocs,
543 pmix_op_cbfunc_t cbfunc, void *cbdata)
544 {
545 pmix_status_t rc;
546 myxfer_t *x;
547
548 if (NULL != procs) {
549 pmix_output(0, "SERVER: ABORT on %s:%d", procs[0].nspace, procs[0].rank);
550 } else {
551 pmix_output(0, "SERVER: ABORT OF ALL PROCS IN NSPACE %s", proc->nspace);
552 }
553
554
555
556
557
558
559 x = PMIX_NEW(myxfer_t);
560 (void)strncpy(x->caller.nspace, proc->nspace, PMIX_MAX_NSLEN);
561 x->caller.rank = proc->rank;
562
563 PMIX_INFO_CREATE(x->info, 2);
564 (void)strncpy(x->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
565 x->info[0].value.type = PMIX_INT8;
566 x->info[0].value.data.int8 = 12;
567 (void)strncpy(x->info[1].key, "VADER", PMIX_MAX_KEYLEN);
568 x->info[1].value.type = PMIX_DOUBLE;
569 x->info[1].value.data.dval = 12.34;
570 x->cbfunc = cbfunc;
571 x->cbdata = cbdata;
572
573 if (PMIX_SUCCESS != (rc = PMIx_Notify_event(status, &x->caller,
574 PMIX_RANGE_NAMESPACE,
575 x->info, 2,
576 abcbfunc, x))) {
577 pmix_output(0, "SERVER: FAILED NOTIFY ERROR %d", (int)rc);
578 }
579
580 return PMIX_SUCCESS;
581 }
582
583
584 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
585 const pmix_info_t info[], size_t ninfo,
586 char *data, size_t ndata,
587 pmix_modex_cbfunc_t cbfunc, void *cbdata)
588 {
589 pmix_output(0, "SERVER: FENCENB");
590
591 if (NULL != cbfunc) {
592 cbfunc(PMIX_SUCCESS, data, ndata, cbdata, NULL, NULL);
593 }
594 return PMIX_SUCCESS;
595 }
596
597
598 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
599 const pmix_info_t info[], size_t ninfo,
600 pmix_modex_cbfunc_t cbfunc, void *cbdata)
601 {
602 pmix_output(0, "SERVER: DMODEX");
603
604
605
606 if (NULL != cbfunc) {
607 cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
608 }
609 return PMIX_SUCCESS;
610 }
611
612
613 static pmix_status_t publish_fn(const pmix_proc_t *proc,
614 const pmix_info_t info[], size_t ninfo,
615 pmix_op_cbfunc_t cbfunc, void *cbdata)
616 {
617 pmix_locdat_t *p;
618 size_t n;
619
620 pmix_output(0, "SERVER: PUBLISH");
621
622 for (n=0; n < ninfo; n++) {
623 p = PMIX_NEW(pmix_locdat_t);
624 (void)strncpy(p->pdata.proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
625 p->pdata.proc.rank = proc->rank;
626 (void)strncpy(p->pdata.key, info[n].key, PMIX_MAX_KEYLEN);
627 pmix_value_xfer(&p->pdata.value, (pmix_value_t*)&info[n].value);
628 pmix_list_append(&pubdata, &p->super);
629 }
630 if (NULL != cbfunc) {
631 cbfunc(PMIX_SUCCESS, cbdata);
632 }
633 return PMIX_SUCCESS;
634 }
635
636
637 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
638 const pmix_info_t info[], size_t ninfo,
639 pmix_lookup_cbfunc_t cbfunc, void *cbdata)
640 {
641 pmix_locdat_t *p, *p2;
642 pmix_list_t results;
643 size_t i, n;
644 pmix_pdata_t *pd = NULL;
645 pmix_status_t ret = PMIX_ERR_NOT_FOUND;
646
647 pmix_output(0, "SERVER: LOOKUP");
648
649 PMIX_CONSTRUCT(&results, pmix_list_t);
650
651 for (n=0; NULL != keys[n]; n++) {
652 PMIX_LIST_FOREACH(p, &pubdata, pmix_locdat_t) {
653 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
654 p2 = PMIX_NEW(pmix_locdat_t);
655 (void)strncpy(p2->pdata.proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
656 p2->pdata.proc.rank = p->pdata.proc.rank;
657 (void)strncpy(p2->pdata.key, p->pdata.key, PMIX_MAX_KEYLEN);
658 pmix_value_xfer(&p2->pdata.value, &p->pdata.value);
659 pmix_list_append(&results, &p2->super);
660 break;
661 }
662 }
663 }
664 if (0 < (n = pmix_list_get_size(&results))) {
665 ret = PMIX_SUCCESS;
666 PMIX_PDATA_CREATE(pd, n);
667 for (i=0; i < n; i++) {
668 p = (pmix_locdat_t*)pmix_list_remove_first(&results);
669 if (p) {
670 (void)strncpy(pd[i].proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
671 pd[i].proc.rank = p->pdata.proc.rank;
672 (void)strncpy(pd[i].key, p->pdata.key, PMIX_MAX_KEYLEN);
673 pmix_value_xfer(&pd[i].value, &p->pdata.value);
674 }
675 }
676 }
677 PMIX_LIST_DESTRUCT(&results);
678 if (NULL != cbfunc) {
679 cbfunc(ret, pd, n, cbdata);
680 }
681 if (0 < n) {
682 PMIX_PDATA_FREE(pd, n);
683 }
684 return PMIX_SUCCESS;
685 }
686
687
688 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
689 const pmix_info_t info[], size_t ninfo,
690 pmix_op_cbfunc_t cbfunc, void *cbdata)
691 {
692 pmix_locdat_t *p, *p2;
693 size_t n;
694
695 pmix_output(0, "SERVER: UNPUBLISH");
696
697 for (n=0; NULL != keys[n]; n++) {
698 PMIX_LIST_FOREACH_SAFE(p, p2, &pubdata, pmix_locdat_t) {
699 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
700 pmix_list_remove_item(&pubdata, &p->super);
701 PMIX_RELEASE(p);
702 break;
703 }
704 }
705 }
706 if (NULL != cbfunc) {
707 cbfunc(PMIX_SUCCESS, cbdata);
708 }
709 return PMIX_SUCCESS;
710 }
711
712 static void spcbfunc(pmix_status_t status, void *cbdata)
713 {
714 myxfer_t *x = (myxfer_t*)cbdata;
715
716 if (NULL != x->spcbfunc) {
717 x->spcbfunc(PMIX_SUCCESS, "DYNSPACE", x->cbdata);
718 }
719 }
720
721 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
722 const pmix_info_t job_info[], size_t ninfo,
723 const pmix_app_t apps[], size_t napps,
724 pmix_spawn_cbfunc_t cbfunc, void *cbdata)
725 {
726 myxfer_t *x;
727
728 pmix_output(0, "SERVER: SPAWN");
729
730
731
732
733
734
735
736
737 x = PMIX_NEW(myxfer_t);
738 x->spcbfunc = cbfunc;
739 x->cbdata = cbdata;
740
741 set_namespace(2, "0,1", "DYNSPACE", spcbfunc, x);
742
743 return PMIX_SUCCESS;
744 }
745
746
747 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
748 const pmix_info_t info[], size_t ninfo,
749 pmix_op_cbfunc_t cbfunc, void *cbdata)
750 {
751 pmix_output(0, "SERVER: CONNECT");
752
753
754
755
756 if (NULL != cbfunc) {
757 cbfunc(PMIX_SUCCESS, cbdata);
758 }
759
760 return PMIX_SUCCESS;
761 }
762
763
764 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
765 const pmix_info_t info[], size_t ninfo,
766 pmix_op_cbfunc_t cbfunc, void *cbdata)
767 {
768 pmix_output(0, "SERVER: DISCONNECT");
769
770
771
772
773 if (NULL != cbfunc) {
774 cbfunc(PMIX_SUCCESS, cbdata);
775 }
776
777 return PMIX_SUCCESS;
778 }
779
780 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
781 const pmix_info_t info[], size_t ninfo,
782 pmix_op_cbfunc_t cbfunc, void *cbdata)
783 {
784 if (NULL != cbfunc) {
785 cbfunc(PMIX_SUCCESS, cbdata);
786 }
787 return PMIX_SUCCESS;
788 }
789
790 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
791 pmix_op_cbfunc_t cbfunc, void *cbdata)
792 {
793 return PMIX_SUCCESS;
794 }
795
796 static pmix_status_t notify_event(pmix_status_t code,
797 const pmix_proc_t *source,
798 pmix_data_range_t range,
799 pmix_info_t info[], size_t ninfo,
800 pmix_op_cbfunc_t cbfunc, void *cbdata)
801 {
802 return PMIX_SUCCESS;
803 }
804
805 typedef struct query_data_t {
806 pmix_info_t *data;
807 size_t ndata;
808 } query_data_t;
809
810 static pmix_status_t query_fn(pmix_proc_t *proct,
811 pmix_query_t *queries, size_t nqueries,
812 pmix_info_cbfunc_t cbfunc,
813 void *cbdata)
814 {
815 size_t n;
816 pmix_info_t *info;
817
818 pmix_output(0, "SERVER: QUERY");
819
820 if (NULL == cbfunc) {
821 return PMIX_ERROR;
822 }
823
824 PMIX_INFO_CREATE(info, nqueries);
825 for (n=0; n < nqueries; n++) {
826 (void)strncpy(info[n].key, queries[n].keys[0], PMIX_MAX_KEYLEN);
827 info[n].value.type = PMIX_STRING;
828 if (0 > asprintf(&info[n].value.data.string, "%d", (int)n)) {
829 return PMIX_ERROR;
830 }
831 }
832 cbfunc(PMIX_SUCCESS, info, nqueries, cbdata, NULL, NULL);
833 return PMIX_SUCCESS;
834 }
835
836 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
837 pmix_tool_connection_cbfunc_t cbfunc,
838 void *cbdata)
839 {
840 pmix_proc_t proc;
841
842 pmix_output(0, "SERVER: TOOL CONNECT");
843
844
845 (void)strncpy(proc.nspace, "TOOL", PMIX_MAX_NSLEN);
846 proc.rank = 0;
847
848 if (NULL != cbfunc) {
849 cbfunc(PMIX_SUCCESS, &proc, cbdata);
850 }
851 }
852
853 static void log_fn(const pmix_proc_t *client,
854 const pmix_info_t data[], size_t ndata,
855 const pmix_info_t directives[], size_t ndirs,
856 pmix_op_cbfunc_t cbfunc, void *cbdata)
857 {
858 pmix_output(0, "SERVER: LOG");
859
860 if (NULL != cbfunc) {
861 cbfunc(PMIX_SUCCESS, cbdata);
862 }
863 }
864
865 static void wait_signal_callback(int fd, short event, void *arg)
866 {
867 pmix_event_t *sig = (pmix_event_t*) arg;
868 int status;
869 pid_t pid;
870 wait_tracker_t *t2;
871
872 if (SIGCHLD != pmix_event_get_signal(sig)) {
873 return;
874 }
875
876
877
878
879 while (1) {
880 pid = waitpid(-1, &status, WNOHANG);
881 if (-1 == pid && EINTR == errno) {
882
883 continue;
884 }
885
886 if (pid <= 0) {
887 return;
888 }
889
890
891 PMIX_LIST_FOREACH(t2, &children, wait_tracker_t) {
892 if (pid == t2->pid) {
893
894 --wakeup;
895 break;
896 }
897 }
898 }
899 }