This source file includes following definitions.
- xfcon
- xfdes
- opcbfunc
- main
- set_namespace
- errhandler
- errhandler_reg_callbk
- connected
- finalized
- abcbfunc
- abort_fn
- fencenb_fn
- dmodex_fn
- publish_fn
- lookup_fn
- unpublish_fn
- spcbfunc
- spawn_fn
- connect_fn
- disconnect_fn
- register_event_fn
- deregister_events
- notify_event
- query_fn
- tool_connect_fn
- log_fn
- wait_signal_callback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 #include <src/include/pmix_config.h>
29 #include <pmix_server.h>
30 #include <src/include/types.h>
31 #include <src/include/pmix_globals.h>
32
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/types.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <signal.h>
41
42 #if PMIX_HAVE_HWLOC
43 #include <src/hwloc/hwloc-internal.h>
44 #endif
45
46 #include "src/class/pmix_list.h"
47 #include "src/util/pmix_environ.h"
48 #include "src/util/output.h"
49 #include "src/util/printf.h"
50 #include "src/util/argv.h"
51
52 #include "simptest.h"
53
54 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
55 pmix_op_cbfunc_t cbfunc, void *cbdata);
56 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
57 pmix_op_cbfunc_t cbfunc, void *cbdata);
58 static pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
59 int status, const char msg[],
60 pmix_proc_t procs[], size_t nprocs,
61 pmix_op_cbfunc_t cbfunc, void *cbdata);
62 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
63 const pmix_info_t info[], size_t ninfo,
64 char *data, size_t ndata,
65 pmix_modex_cbfunc_t cbfunc, void *cbdata);
66 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
67 const pmix_info_t info[], size_t ninfo,
68 pmix_modex_cbfunc_t cbfunc, void *cbdata);
69 static pmix_status_t publish_fn(const pmix_proc_t *proc,
70 const pmix_info_t info[], size_t ninfo,
71 pmix_op_cbfunc_t cbfunc, void *cbdata);
72 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
73 const pmix_info_t info[], size_t ninfo,
74 pmix_lookup_cbfunc_t cbfunc, void *cbdata);
75 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
76 const pmix_info_t info[], size_t ninfo,
77 pmix_op_cbfunc_t cbfunc, void *cbdata);
78 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
79 const pmix_info_t job_info[], size_t ninfo,
80 const pmix_app_t apps[], size_t napps,
81 pmix_spawn_cbfunc_t cbfunc, void *cbdata);
82 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
83 const pmix_info_t info[], size_t ninfo,
84 pmix_op_cbfunc_t cbfunc, void *cbdata);
85 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
86 const pmix_info_t info[], size_t ninfo,
87 pmix_op_cbfunc_t cbfunc, void *cbdata);
88 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
89 const pmix_info_t info[], size_t ninfo,
90 pmix_op_cbfunc_t cbfunc, void *cbdata);
91 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
92 pmix_op_cbfunc_t cbfunc, void *cbdata);
93 static pmix_status_t notify_event(pmix_status_t code,
94 const pmix_proc_t *source,
95 pmix_data_range_t range,
96 pmix_info_t info[], size_t ninfo,
97 pmix_op_cbfunc_t cbfunc, void *cbdata);
98 static pmix_status_t query_fn(pmix_proc_t *proct,
99 pmix_query_t *queries, size_t nqueries,
100 pmix_info_cbfunc_t cbfunc,
101 void *cbdata);
102 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
103 pmix_tool_connection_cbfunc_t cbfunc,
104 void *cbdata);
105 static void log_fn(const pmix_proc_t *client,
106 const pmix_info_t data[], size_t ndata,
107 const pmix_info_t directives[], size_t ndirs,
108 pmix_op_cbfunc_t cbfunc, void *cbdata);
109
110 static pmix_server_module_t mymodule = {
111 .client_connected = connected,
112 .client_finalized = finalized,
113 .abort = abort_fn,
114 .fence_nb = fencenb_fn,
115 .direct_modex = dmodex_fn,
116 .publish = publish_fn,
117 .lookup = lookup_fn,
118 .unpublish = unpublish_fn,
119 .spawn = spawn_fn,
120 .connect = connect_fn,
121 .disconnect = disconnect_fn,
122 .register_events = register_event_fn,
123 .deregister_events = deregister_events,
124 .notify_event = notify_event,
125 .query = query_fn,
126 .tool_connected = tool_connect_fn,
127 .log = log_fn
128 };
129
130 typedef struct {
131 pmix_list_item_t super;
132 pmix_pdata_t pdata;
133 } pmix_locdat_t;
134 PMIX_CLASS_INSTANCE(pmix_locdat_t,
135 pmix_list_item_t,
136 NULL, NULL);
137
138 typedef struct {
139 pmix_object_t super;
140 mylock_t lock;
141 pmix_event_t ev;
142 pmix_proc_t caller;
143 pmix_info_t *info;
144 size_t ninfo;
145 pmix_op_cbfunc_t cbfunc;
146 pmix_spawn_cbfunc_t spcbfunc;
147 pmix_release_cbfunc_t relcbfunc;
148 void *cbdata;
149 } myxfer_t;
150 static void xfcon(myxfer_t *p)
151 {
152 DEBUG_CONSTRUCT_LOCK(&p->lock);
153 p->info = NULL;
154 p->ninfo = 0;
155 p->cbfunc = NULL;
156 p->spcbfunc = NULL;
157 p->cbdata = NULL;
158 }
159 static void xfdes(myxfer_t *p)
160 {
161 DEBUG_DESTRUCT_LOCK(&p->lock);
162 if (NULL != p->info) {
163 PMIX_INFO_FREE(p->info, p->ninfo);
164 }
165 }
166 PMIX_CLASS_INSTANCE(myxfer_t,
167 pmix_object_t,
168 xfcon, xfdes);
169
170 typedef struct {
171 pmix_list_item_t super;
172 int exit_code;
173 pid_t pid;
174 } wait_tracker_t;
175 PMIX_CLASS_INSTANCE(wait_tracker_t,
176 pmix_list_item_t,
177 NULL, NULL);
178
179 static volatile int wakeup;
180 static int exit_code = 0;
181 static pmix_list_t pubdata;
182 static pmix_event_t handler;
183 static pmix_list_t children;
184 static bool istimeouttest = false;
185
186 static void set_namespace(int nprocs, char *ranks, char *nspace,
187 pmix_op_cbfunc_t cbfunc, myxfer_t *x);
188 static void errhandler(size_t evhdlr_registration_id,
189 pmix_status_t status,
190 const pmix_proc_t *source,
191 pmix_info_t info[], size_t ninfo,
192 pmix_info_t results[], size_t nresults,
193 pmix_event_notification_cbfunc_fn_t cbfunc,
194 void *cbdata);
195 static void wait_signal_callback(int fd, short event, void *arg);
196 static void errhandler_reg_callbk (pmix_status_t status,
197 size_t errhandler_ref,
198 void *cbdata);
199
200 static void opcbfunc(pmix_status_t status, void *cbdata)
201 {
202 myxfer_t *x = (myxfer_t*)cbdata;
203
204
205 if (NULL != x->cbfunc) {
206 x->cbfunc(PMIX_SUCCESS, x->cbdata);
207 }
208 DEBUG_WAKEUP_THREAD(&x->lock);
209 }
210
211 int main(int argc, char **argv)
212 {
213 char **client_env=NULL;
214 char **client_argv=NULL;
215 char *tmp, **atmp, *executable=NULL, *nspace;
216 int rc, nprocs=1, n, k;
217 uid_t myuid;
218 gid_t mygid;
219 pid_t pid;
220 myxfer_t *x;
221 pmix_proc_t proc;
222 wait_tracker_t *child;
223 pmix_info_t *info;
224 size_t ninfo;
225 mylock_t mylock;
226 int ncycles=1, m, delay=0;
227
228
229 if (PMIX_SUCCESS != 0) {
230 fprintf(stderr, "ERROR IN COMPUTING CONSTANTS: PMIX_SUCCESS = %d\n", PMIX_SUCCESS);
231 exit(1);
232 }
233
234 fprintf(stderr, "Testing version %s\n", PMIx_Get_version());
235
236
237
238 for (n=1; n < argc; n++) {
239 if (0 == strcmp("-n", argv[n]) &&
240 NULL != argv[n+1]) {
241 nprocs = strtol(argv[n+1], NULL, 10);
242 ++n;
243 } else if (0 == strcmp("-e", argv[n]) &&
244 NULL != argv[n+1]) {
245 executable = strdup(argv[n+1]);
246
247 if (NULL != strstr(executable, "quietclient")) {
248 istimeouttest = true;
249 }
250 for (k=n+2; NULL != argv[k]; k++) {
251 pmix_argv_append_nosize(&client_argv, argv[k]);
252 }
253 n += k;
254 } else if ((0 == strcmp("-reps", argv[n]) ||
255 0 == strcmp("--reps", argv[n])) &&
256 NULL != argv[n+1]) {
257 ncycles = strtol(argv[n+1], NULL, 10);
258 } else if ((0 == strcmp("-sleep", argv[n]) ||
259 0 == strcmp("--sleep", argv[n])) &&
260 NULL != argv[n+1]) {
261 delay = strtol(argv[n+1], NULL, 10);
262 } else if (0 == strcmp("-h", argv[n])) {
263
264 fprintf(stderr, "usage: simptest <options>\n");
265 fprintf(stderr, " -n N Number of clients to run\n");
266 fprintf(stderr, " -e foo Name of the client executable to run (default: simpclient\n");
267 fprintf(stderr, " -reps N Cycle for N repetitions");
268 exit(0);
269 }
270 }
271 if (NULL == executable) {
272 executable = strdup("./quietclient");
273 }
274
275 ninfo = 3;
276
277 PMIX_INFO_CREATE(info, ninfo);
278 PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TOOL_SUPPORT, NULL, PMIX_BOOL);
279 PMIX_INFO_LOAD(&info[1], PMIX_USOCK_DISABLE, NULL, PMIX_BOOL);
280 PMIX_INFO_LOAD(&info[2], PMIX_SERVER_GATEWAY, NULL, PMIX_BOOL);
281 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, ninfo))) {
282 fprintf(stderr, "Init failed with error %d\n", rc);
283 return rc;
284 }
285 PMIX_INFO_FREE(info, ninfo);
286
287
288 DEBUG_CONSTRUCT_LOCK(&mylock);
289 ninfo = 1;
290 PMIX_INFO_CREATE(info, ninfo);
291 PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "SIMPTEST-DEFAULT", PMIX_STRING);
292 PMIx_Register_event_handler(NULL, 0, info, ninfo,
293 errhandler, errhandler_reg_callbk, (void*)&mylock);
294 DEBUG_WAIT_THREAD(&mylock);
295 PMIX_INFO_FREE(info, ninfo);
296 if (PMIX_SUCCESS != mylock.status) {
297 exit(mylock.status);
298 }
299 DEBUG_DESTRUCT_LOCK(&mylock);
300
301
302 PMIX_CONSTRUCT(&pubdata, pmix_list_t);
303
304
305 PMIX_CONSTRUCT(&children, pmix_list_t);
306 pmix_event_assign(&handler, pmix_globals.evbase, SIGCHLD,
307 EV_SIGNAL|EV_PERSIST,wait_signal_callback, &handler);
308 pmix_event_add(&handler, NULL);
309
310 for (m=0; m < ncycles; m++) {
311 fprintf(stderr, "Running cycle %d\n", m);
312
313 atmp = NULL;
314 for (n=0; n < nprocs; n++) {
315 asprintf(&tmp, "%d", n);
316 pmix_argv_append_nosize(&atmp, tmp);
317 free(tmp);
318 }
319 tmp = pmix_argv_join(atmp, ',');
320 pmix_argv_free(atmp);
321 asprintf(&nspace, "foobar%d", m);
322 (void)strncpy(proc.nspace, nspace, PMIX_MAX_NSLEN);
323 x = PMIX_NEW(myxfer_t);
324 set_namespace(nprocs, tmp, nspace, opcbfunc, x);
325
326
327
328 client_env = pmix_argv_copy(environ);
329 pmix_argv_prepend_nosize(&client_argv, executable);
330
331 wakeup = nprocs;
332 myuid = getuid();
333 mygid = getgid();
334
335
336
337 DEBUG_WAIT_THREAD(&x->lock);
338 free(tmp);
339 free(nspace);
340 PMIX_RELEASE(x);
341
342
343 for (n = 0; n < nprocs; n++) {
344 proc.rank = n;
345 if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) {
346 fprintf(stderr, "Server fork setup failed with error %d\n", rc);
347 PMIx_server_finalize();
348 return rc;
349 }
350 x = PMIX_NEW(myxfer_t);
351 if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid,
352 NULL, opcbfunc, x))) {
353 fprintf(stderr, "Server register client failed with error %d\n", rc);
354 PMIx_server_finalize();
355 return rc;
356 }
357
358
359 DEBUG_WAIT_THREAD(&x->lock);
360 PMIX_RELEASE(x);
361 pid = fork();
362 if (pid < 0) {
363 fprintf(stderr, "Fork failed\n");
364 PMIx_server_finalize();
365 return -1;
366 }
367 child = PMIX_NEW(wait_tracker_t);
368 child->pid = pid;
369 pmix_list_append(&children, &child->super);
370
371 if (pid == 0) {
372 execve(executable, client_argv, client_env);
373
374 exit(0);
375 }
376 }
377 pmix_argv_free(client_argv);
378 client_argv = NULL;
379 pmix_argv_free(client_env);
380 client_env = NULL;
381
382
383 while (0 < wakeup) {
384 struct timespec ts;
385 ts.tv_sec = 0;
386 ts.tv_nsec = 100000;
387 nanosleep(&ts, NULL);
388 }
389
390
391 n=0;
392 PMIX_LIST_FOREACH(child, &children, wait_tracker_t) {
393 if (0 != child->exit_code) {
394 fprintf(stderr, "Child %d exited with status %d - test FAILED\n", n, child->exit_code);
395 goto done;
396 }
397 ++n;
398 }
399
400
401 for (n = 0; n < nprocs; n++) {
402 proc.rank = n;
403 x = PMIX_NEW(myxfer_t);
404 PMIx_server_deregister_client(&proc, opcbfunc, x);
405 DEBUG_WAIT_THREAD(&x->lock);
406 PMIX_RELEASE(x);
407 }
408
409 x = PMIX_NEW(myxfer_t);
410 PMIx_server_deregister_nspace(proc.nspace, opcbfunc, x);
411 DEBUG_WAIT_THREAD(&x->lock);
412 PMIX_RELEASE(x);
413
414 PMIX_LIST_DESTRUCT(&children);
415 PMIX_CONSTRUCT(&children, pmix_list_t);
416
417 sleep(delay);
418 }
419
420 done:
421
422 PMIx_Deregister_event_handler(0, NULL, NULL);
423
424
425 PMIX_LIST_DESTRUCT(&pubdata);
426
427 free(executable);
428
429
430 if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
431 fprintf(stderr, "Finalize failed with error %d\n", rc);
432 exit_code = rc;
433 }
434
435 if (0 == exit_code) {
436 fprintf(stderr, "Test finished OK!\n");
437 } else {
438 fprintf(stderr, "TEST FAILED WITH ERROR %d\n", exit_code);
439 }
440
441 return exit_code;
442 }
443
444 static void set_namespace(int nprocs, char *ranks, char *nspace,
445 pmix_op_cbfunc_t cbfunc, myxfer_t *x)
446 {
447 char *regex, *ppn;
448 char hostname[PMIX_MAXHOSTNAMELEN];
449
450 gethostname(hostname, sizeof(hostname));
451 x->ninfo = 7;
452
453 PMIX_INFO_CREATE(x->info, x->ninfo);
454 (void)strncpy(x->info[0].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
455 x->info[0].value.type = PMIX_UINT32;
456 x->info[0].value.data.uint32 = nprocs;
457
458 (void)strncpy(x->info[1].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
459 x->info[1].value.type = PMIX_UINT32;
460 x->info[1].value.data.uint32 = 0;
461
462 (void)strncpy(x->info[2].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
463 x->info[2].value.type = PMIX_UINT32;
464 x->info[2].value.data.uint32 = nprocs;
465
466 (void)strncpy(x->info[3].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
467 x->info[3].value.type = PMIX_STRING;
468 x->info[3].value.data.string = strdup(ranks);
469
470 PMIx_generate_regex(hostname, ®ex);
471 (void)strncpy(x->info[4].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
472 x->info[4].value.type = PMIX_STRING;
473 x->info[4].value.data.string = regex;
474
475 PMIx_generate_ppn(ranks, &ppn);
476 (void)strncpy(x->info[5].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
477 x->info[5].value.type = PMIX_STRING;
478 x->info[5].value.data.string = ppn;
479
480 (void)strncpy(x->info[6].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
481 x->info[6].value.type = PMIX_UINT32;
482 x->info[6].value.data.uint32 = nprocs;
483
484 PMIx_server_register_nspace(nspace, nprocs, x->info, x->ninfo,
485 cbfunc, x);
486 }
487
488 static void errhandler(size_t evhdlr_registration_id,
489 pmix_status_t status,
490 const pmix_proc_t *source,
491 pmix_info_t info[], size_t ninfo,
492 pmix_info_t results[], size_t nresults,
493 pmix_event_notification_cbfunc_fn_t cbfunc,
494 void *cbdata)
495 {
496 return;
497 }
498
499 static void errhandler_reg_callbk (pmix_status_t status,
500 size_t errhandler_ref,
501 void *cbdata)
502 {
503 mylock_t *lock = (mylock_t*)cbdata;
504
505 lock->status = status;
506 DEBUG_WAKEUP_THREAD(lock);
507 }
508
509 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
510 pmix_op_cbfunc_t cbfunc, void *cbdata)
511 {
512 if (NULL != cbfunc) {
513 cbfunc(PMIX_SUCCESS, cbdata);
514 }
515 return PMIX_SUCCESS;
516 }
517 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
518 pmix_op_cbfunc_t cbfunc, void *cbdata)
519 {
520
521 if (NULL != cbfunc) {
522 cbfunc(PMIX_SUCCESS, cbdata);
523 }
524 return PMIX_SUCCESS;
525 }
526
527 static void abcbfunc(pmix_status_t status, void *cbdata)
528 {
529 myxfer_t *x = (myxfer_t*)cbdata;
530
531
532 if (NULL != x->cbfunc) {
533 x->cbfunc(status, x->cbdata);
534 }
535 PMIX_RELEASE(x);
536 }
537
538 static pmix_status_t abort_fn(const pmix_proc_t *proc,
539 void *server_object,
540 int status, const char msg[],
541 pmix_proc_t procs[], size_t nprocs,
542 pmix_op_cbfunc_t cbfunc, void *cbdata)
543 {
544 pmix_status_t rc;
545 myxfer_t *x;
546
547
548
549
550
551
552 x = PMIX_NEW(myxfer_t);
553 (void)strncpy(x->caller.nspace, proc->nspace, PMIX_MAX_NSLEN);
554 x->caller.rank = proc->rank;
555
556 PMIX_INFO_CREATE(x->info, 2);
557 (void)strncpy(x->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
558 x->info[0].value.type = PMIX_INT8;
559 x->info[0].value.data.int8 = 12;
560 (void)strncpy(x->info[1].key, "VADER", PMIX_MAX_KEYLEN);
561 x->info[1].value.type = PMIX_DOUBLE;
562 x->info[1].value.data.dval = 12.34;
563 x->cbfunc = cbfunc;
564 x->cbdata = cbdata;
565
566 if (PMIX_SUCCESS != (rc = PMIx_Notify_event(status, &x->caller,
567 PMIX_RANGE_NAMESPACE,
568 x->info, 2,
569 abcbfunc, x))) {
570 pmix_output(0, "SERVER: FAILED NOTIFY ERROR %d", (int)rc);
571 }
572
573 return PMIX_SUCCESS;
574 }
575
576
577 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
578 const pmix_info_t info[], size_t ninfo,
579 char *data, size_t ndata,
580 pmix_modex_cbfunc_t cbfunc, void *cbdata)
581 {
582
583 if (NULL != cbfunc) {
584 cbfunc(PMIX_SUCCESS, data, ndata, cbdata, free, data);
585 }
586 return PMIX_SUCCESS;
587 }
588
589
590 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
591 const pmix_info_t info[], size_t ninfo,
592 pmix_modex_cbfunc_t cbfunc, void *cbdata)
593 {
594
595 if (istimeouttest) {
596 return PMIX_SUCCESS;
597 }
598
599
600
601 if (NULL != cbfunc) {
602 cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
603 }
604 return PMIX_SUCCESS;
605 }
606
607
608 static pmix_status_t publish_fn(const pmix_proc_t *proc,
609 const pmix_info_t info[], size_t ninfo,
610 pmix_op_cbfunc_t cbfunc, void *cbdata)
611 {
612 pmix_locdat_t *p;
613 size_t n;
614
615 for (n=0; n < ninfo; n++) {
616 p = PMIX_NEW(pmix_locdat_t);
617 (void)strncpy(p->pdata.proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
618 p->pdata.proc.rank = proc->rank;
619 (void)strncpy(p->pdata.key, info[n].key, PMIX_MAX_KEYLEN);
620 pmix_value_xfer(&p->pdata.value, (pmix_value_t*)&info[n].value);
621 pmix_list_append(&pubdata, &p->super);
622 }
623 if (NULL != cbfunc) {
624 cbfunc(PMIX_SUCCESS, cbdata);
625 }
626 return PMIX_SUCCESS;
627 }
628
629
630 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
631 const pmix_info_t info[], size_t ninfo,
632 pmix_lookup_cbfunc_t cbfunc, void *cbdata)
633 {
634 pmix_locdat_t *p, *p2;
635 pmix_list_t results;
636 size_t i, n;
637 pmix_pdata_t *pd = NULL;
638 pmix_status_t ret = PMIX_ERR_NOT_FOUND;
639
640 PMIX_CONSTRUCT(&results, pmix_list_t);
641
642 for (n=0; NULL != keys[n]; n++) {
643 PMIX_LIST_FOREACH(p, &pubdata, pmix_locdat_t) {
644 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
645 p2 = PMIX_NEW(pmix_locdat_t);
646 (void)strncpy(p2->pdata.proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
647 p2->pdata.proc.rank = p->pdata.proc.rank;
648 (void)strncpy(p2->pdata.key, p->pdata.key, PMIX_MAX_KEYLEN);
649 pmix_value_xfer(&p2->pdata.value, &p->pdata.value);
650 pmix_list_append(&results, &p2->super);
651 break;
652 }
653 }
654 }
655 if (0 < (n = pmix_list_get_size(&results))) {
656 ret = PMIX_SUCCESS;
657 PMIX_PDATA_CREATE(pd, n);
658 for (i=0; i < n; i++) {
659 p = (pmix_locdat_t*)pmix_list_remove_first(&results);
660 if (p) {
661 (void)strncpy(pd[i].proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
662 pd[i].proc.rank = p->pdata.proc.rank;
663 (void)strncpy(pd[i].key, p->pdata.key, PMIX_MAX_KEYLEN);
664 pmix_value_xfer(&pd[i].value, &p->pdata.value);
665 }
666 }
667 }
668 PMIX_LIST_DESTRUCT(&results);
669 if (NULL != cbfunc) {
670 cbfunc(ret, pd, n, cbdata);
671 }
672 if (0 < n) {
673 PMIX_PDATA_FREE(pd, n);
674 }
675 return PMIX_SUCCESS;
676 }
677
678
679 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
680 const pmix_info_t info[], size_t ninfo,
681 pmix_op_cbfunc_t cbfunc, void *cbdata)
682 {
683 pmix_locdat_t *p, *p2;
684 size_t n;
685
686 for (n=0; NULL != keys[n]; n++) {
687 PMIX_LIST_FOREACH_SAFE(p, p2, &pubdata, pmix_locdat_t) {
688 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
689 pmix_list_remove_item(&pubdata, &p->super);
690 PMIX_RELEASE(p);
691 break;
692 }
693 }
694 }
695 if (NULL != cbfunc) {
696 cbfunc(PMIX_SUCCESS, cbdata);
697 }
698 return PMIX_SUCCESS;
699 }
700
701 static void spcbfunc(pmix_status_t status, void *cbdata)
702 {
703 myxfer_t *x = (myxfer_t*)cbdata;
704
705 if (NULL != x->spcbfunc) {
706 x->spcbfunc(PMIX_SUCCESS, "DYNSPACE", x->cbdata);
707 }
708 }
709
710 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
711 const pmix_info_t job_info[], size_t ninfo,
712 const pmix_app_t apps[], size_t napps,
713 pmix_spawn_cbfunc_t cbfunc, void *cbdata)
714 {
715 myxfer_t *x;
716
717
718
719
720
721
722
723
724 x = PMIX_NEW(myxfer_t);
725 x->spcbfunc = cbfunc;
726 x->cbdata = cbdata;
727
728 set_namespace(2, "0,1", "DYNSPACE", spcbfunc, x);
729
730 return PMIX_SUCCESS;
731 }
732
733 static int numconnects = 0;
734
735 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
736 const pmix_info_t info[], size_t ninfo,
737 pmix_op_cbfunc_t cbfunc, void *cbdata)
738 {
739
740
741
742
743 numconnects++;
744
745 if (NULL != cbfunc) {
746 cbfunc(PMIX_SUCCESS, cbdata);
747 }
748
749 return PMIX_SUCCESS;
750 }
751
752
753 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
754 const pmix_info_t info[], size_t ninfo,
755 pmix_op_cbfunc_t cbfunc, void *cbdata)
756 {
757
758
759
760
761 if (NULL != cbfunc) {
762 cbfunc(PMIX_SUCCESS, cbdata);
763 }
764
765 return PMIX_SUCCESS;
766 }
767
768 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
769 const pmix_info_t info[], size_t ninfo,
770 pmix_op_cbfunc_t cbfunc, void *cbdata)
771 {
772 if (NULL != cbfunc) {
773 cbfunc(PMIX_SUCCESS, cbdata);
774 }
775 return PMIX_SUCCESS;
776 }
777
778 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
779 pmix_op_cbfunc_t cbfunc, void *cbdata)
780 {
781 return PMIX_SUCCESS;
782 }
783
784 static pmix_status_t notify_event(pmix_status_t code,
785 const pmix_proc_t *source,
786 pmix_data_range_t range,
787 pmix_info_t info[], size_t ninfo,
788 pmix_op_cbfunc_t cbfunc, void *cbdata)
789 {
790 return PMIX_SUCCESS;
791 }
792
793 typedef struct query_data_t {
794 pmix_info_t *data;
795 size_t ndata;
796 } query_data_t;
797
798 static pmix_status_t query_fn(pmix_proc_t *proct,
799 pmix_query_t *queries, size_t nqueries,
800 pmix_info_cbfunc_t cbfunc,
801 void *cbdata)
802 {
803 size_t n;
804 pmix_info_t *info;
805
806 if (NULL == cbfunc) {
807 return PMIX_ERROR;
808 }
809
810 PMIX_INFO_CREATE(info, nqueries);
811 for (n=0; n < nqueries; n++) {
812 (void)strncpy(info[n].key, queries[n].keys[0], PMIX_MAX_KEYLEN);
813 info[n].value.type = PMIX_STRING;
814 if (0 > asprintf(&info[n].value.data.string, "%d", (int)n)) {
815 return PMIX_ERROR;
816 }
817 }
818 cbfunc(PMIX_SUCCESS, info, nqueries, cbdata, NULL, NULL);
819 return PMIX_SUCCESS;
820 }
821
822 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
823 pmix_tool_connection_cbfunc_t cbfunc,
824 void *cbdata)
825 {
826 pmix_proc_t proc;
827
828
829 (void)strncpy(proc.nspace, "TOOL", PMIX_MAX_NSLEN);
830 proc.rank = 0;
831
832 if (NULL != cbfunc) {
833 cbfunc(PMIX_SUCCESS, &proc, cbdata);
834 }
835 }
836
837 static void log_fn(const pmix_proc_t *client,
838 const pmix_info_t data[], size_t ndata,
839 const pmix_info_t directives[], size_t ndirs,
840 pmix_op_cbfunc_t cbfunc, void *cbdata)
841 {
842 if (NULL != cbfunc) {
843 cbfunc(PMIX_SUCCESS, cbdata);
844 }
845 }
846
847 static void wait_signal_callback(int fd, short event, void *arg)
848 {
849 pmix_event_t *sig = (pmix_event_t*) arg;
850 int status;
851 pid_t pid;
852 wait_tracker_t *t2;
853
854 if (SIGCHLD != pmix_event_get_signal(sig)) {
855 return;
856 }
857
858
859
860
861 while (1) {
862 pid = waitpid(-1, &status, WNOHANG);
863 if (-1 == pid && EINTR == errno) {
864
865 continue;
866 }
867
868 if (pid <= 0) {
869 return;
870 }
871
872
873 PMIX_LIST_FOREACH(t2, &children, wait_tracker_t) {
874 if (pid == t2->pid) {
875 t2->exit_code = status;
876
877 if (0 != status && 0 == exit_code) {
878 exit_code = status;
879 }
880 --wakeup;
881 break;
882 }
883 }
884 }
885 }