This source file includes the following definitions.
- xfcon
- xfdes
- opcbfunc
- dlcbfunc
- infocbfunc
- model_callback
- model_registration_callback
- set_handler_default
- main
- set_namespace
- errhandler
- errhandler_reg_callbk
- connected
- finalized
- abcbfunc
- abort_fn
- fencbfn
- fencenb_fn
- dmodex_fn
- publish_fn
- lkcbfn
- lookup_fn
- unpublish_fn
- spcbfunc
- spawn_fn
- connect_fn
- disconnect_fn
- register_event_fn
- deregister_events
- notify_event
- qfn
- query_fn
- tool_connect_fn
- foobar
- log_fn
- alloc_fn
- jctrl_fn
- mon_fn
- wait_signal_callback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 #include <src/include/pmix_config.h>
29 #include <pmix_server.h>
30 #include <src/include/types.h>
31 #include <src/include/pmix_globals.h>
32
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/types.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <signal.h>
41
42 #if PMIX_HAVE_HWLOC
43 #include <src/hwloc/hwloc-internal.h>
44 #endif
45
46 #include "src/class/pmix_list.h"
47 #include "src/util/pmix_environ.h"
48 #include "src/util/output.h"
49 #include "src/util/printf.h"
50 #include "src/util/argv.h"
51
52 #include "simptest.h"
53
54 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
55 pmix_op_cbfunc_t cbfunc, void *cbdata);
56 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
57 pmix_op_cbfunc_t cbfunc, void *cbdata);
58 static pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
59 int status, const char msg[],
60 pmix_proc_t procs[], size_t nprocs,
61 pmix_op_cbfunc_t cbfunc, void *cbdata);
62 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
63 const pmix_info_t info[], size_t ninfo,
64 char *data, size_t ndata,
65 pmix_modex_cbfunc_t cbfunc, void *cbdata);
66 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
67 const pmix_info_t info[], size_t ninfo,
68 pmix_modex_cbfunc_t cbfunc, void *cbdata);
69 static pmix_status_t publish_fn(const pmix_proc_t *proc,
70 const pmix_info_t info[], size_t ninfo,
71 pmix_op_cbfunc_t cbfunc, void *cbdata);
72 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
73 const pmix_info_t info[], size_t ninfo,
74 pmix_lookup_cbfunc_t cbfunc, void *cbdata);
75 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
76 const pmix_info_t info[], size_t ninfo,
77 pmix_op_cbfunc_t cbfunc, void *cbdata);
78 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
79 const pmix_info_t job_info[], size_t ninfo,
80 const pmix_app_t apps[], size_t napps,
81 pmix_spawn_cbfunc_t cbfunc, void *cbdata);
82 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
83 const pmix_info_t info[], size_t ninfo,
84 pmix_op_cbfunc_t cbfunc, void *cbdata);
85 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
86 const pmix_info_t info[], size_t ninfo,
87 pmix_op_cbfunc_t cbfunc, void *cbdata);
88 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
89 const pmix_info_t info[], size_t ninfo,
90 pmix_op_cbfunc_t cbfunc, void *cbdata);
91 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
92 pmix_op_cbfunc_t cbfunc, void *cbdata);
93 static pmix_status_t notify_event(pmix_status_t code,
94 const pmix_proc_t *source,
95 pmix_data_range_t range,
96 pmix_info_t info[], size_t ninfo,
97 pmix_op_cbfunc_t cbfunc, void *cbdata);
98 static pmix_status_t query_fn(pmix_proc_t *proct,
99 pmix_query_t *queries, size_t nqueries,
100 pmix_info_cbfunc_t cbfunc,
101 void *cbdata);
102 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
103 pmix_tool_connection_cbfunc_t cbfunc,
104 void *cbdata);
105 static void log_fn(const pmix_proc_t *client,
106 const pmix_info_t data[], size_t ndata,
107 const pmix_info_t directives[], size_t ndirs,
108 pmix_op_cbfunc_t cbfunc, void *cbdata);
109 static pmix_status_t alloc_fn(const pmix_proc_t *client,
110 pmix_alloc_directive_t directive,
111 const pmix_info_t data[], size_t ndata,
112 pmix_info_cbfunc_t cbfunc, void *cbdata);
113 static pmix_status_t jctrl_fn(const pmix_proc_t *requestor,
114 const pmix_proc_t targets[], size_t ntargets,
115 const pmix_info_t directives[], size_t ndirs,
116 pmix_info_cbfunc_t cbfunc, void *cbdata);
117 static pmix_status_t mon_fn(const pmix_proc_t *requestor,
118 const pmix_info_t *monitor, pmix_status_t error,
119 const pmix_info_t directives[], size_t ndirs,
120 pmix_info_cbfunc_t cbfunc, void *cbdata);
121
122 static pmix_server_module_t mymodule = {
123 .client_connected = connected,
124 .client_finalized = finalized,
125 .abort = abort_fn,
126 .fence_nb = fencenb_fn,
127 .direct_modex = dmodex_fn,
128 .publish = publish_fn,
129 .lookup = lookup_fn,
130 .unpublish = unpublish_fn,
131 .spawn = spawn_fn,
132 .connect = connect_fn,
133 .disconnect = disconnect_fn,
134 .register_events = register_event_fn,
135 .deregister_events = deregister_events,
136 .notify_event = notify_event,
137 .query = query_fn,
138 .tool_connected = tool_connect_fn,
139 .log = log_fn,
140 .allocate = alloc_fn,
141 .job_control = jctrl_fn,
142 .monitor = mon_fn
143 };
144
145 typedef struct {
146 pmix_list_item_t super;
147 pmix_pdata_t pdata;
148 } pmix_locdat_t;
149 PMIX_CLASS_INSTANCE(pmix_locdat_t,
150 pmix_list_item_t,
151 NULL, NULL);
152
153 typedef struct {
154 pmix_object_t super;
155 mylock_t lock;
156 pmix_event_t ev;
157 pmix_proc_t caller;
158 pmix_info_t *info;
159 size_t ninfo;
160 pmix_op_cbfunc_t cbfunc;
161 pmix_spawn_cbfunc_t spcbfunc;
162 pmix_release_cbfunc_t relcbfunc;
163 void *cbdata;
164 } myxfer_t;
165 static void xfcon(myxfer_t *p)
166 {
167 DEBUG_CONSTRUCT_LOCK(&p->lock);
168 p->info = NULL;
169 p->ninfo = 0;
170 p->cbfunc = NULL;
171 p->spcbfunc = NULL;
172 p->cbdata = NULL;
173 }
174 static void xfdes(myxfer_t *p)
175 {
176 DEBUG_DESTRUCT_LOCK(&p->lock);
177 if (NULL != p->info) {
178 PMIX_INFO_FREE(p->info, p->ninfo);
179 }
180 }
181 PMIX_CLASS_INSTANCE(myxfer_t,
182 pmix_object_t,
183 xfcon, xfdes);
184
185 typedef struct {
186 pmix_list_item_t super;
187 int exit_code;
188 pid_t pid;
189 } wait_tracker_t;
190 PMIX_CLASS_INSTANCE(wait_tracker_t,
191 pmix_list_item_t,
192 NULL, NULL);
193
194 static volatile int wakeup;
195 static int exit_code = 0;
196 static pmix_list_t pubdata;
197 static pmix_event_t handler;
198 static pmix_list_t children;
199 static bool istimeouttest = false;
200 static mylock_t globallock;
201
202 static void set_namespace(int nprocs, char *ranks, char *nspace,
203 pmix_op_cbfunc_t cbfunc, myxfer_t *x);
204 static void errhandler(size_t evhdlr_registration_id,
205 pmix_status_t status,
206 const pmix_proc_t *source,
207 pmix_info_t info[], size_t ninfo,
208 pmix_info_t results[], size_t nresults,
209 pmix_event_notification_cbfunc_fn_t cbfunc,
210 void *cbdata);
211 static void wait_signal_callback(int fd, short event, void *arg);
212 static void errhandler_reg_callbk (pmix_status_t status,
213 size_t errhandler_ref,
214 void *cbdata);
215
216 static void opcbfunc(pmix_status_t status, void *cbdata)
217 {
218 myxfer_t *x = (myxfer_t*)cbdata;
219
220
221 if (NULL != x->cbfunc) {
222 x->cbfunc(PMIX_SUCCESS, x->cbdata);
223 }
224 DEBUG_WAKEUP_THREAD(&x->lock);
225 }
226
227
228 static void dlcbfunc(int sd, short flags, void *cbdata)
229 {
230 myxfer_t *x = (myxfer_t*)cbdata;
231
232 pmix_output(0, "INVENTORY READY FOR DELIVERY");
233
234 PMIx_server_deliver_inventory(x->info, x->ninfo, NULL, 0, opcbfunc, (void*)x);
235 }
236
237 static void infocbfunc(pmix_status_t status,
238 pmix_info_t *info, size_t ninfo,
239 void *cbdata,
240 pmix_release_cbfunc_t release_fn,
241 void *release_cbdata)
242 {
243 mylock_t *lock = (mylock_t*)cbdata;
244 myxfer_t *x;
245 size_t n;
246
247 pmix_output(0, "INVENTORY RECEIVED");
248
249
250
251
252
253
254 x = PMIX_NEW(myxfer_t);
255 x->ninfo = ninfo;
256 PMIX_INFO_CREATE(x->info, x->ninfo);
257 for (n=0; n < ninfo; n++) {
258 PMIX_INFO_XFER(&x->info[n], &info[n]);
259 }
260 PMIX_THREADSHIFT(x, dlcbfunc);
261
262 if (NULL != release_fn) {
263 release_fn(release_cbdata);
264 }
265 lock->status = status;
266 DEBUG_WAKEUP_THREAD(lock);
267 }
268
269
270
271
272
273
274
275
276 static void model_callback(size_t evhdlr_registration_id,
277 pmix_status_t status,
278 const pmix_proc_t *source,
279 pmix_info_t info[], size_t ninfo,
280 pmix_info_t results[], size_t nresults,
281 pmix_event_notification_cbfunc_fn_t cbfunc,
282 void *cbdata)
283 {
284 size_t n;
285
286
287 fprintf(stderr, "SIMPTEST: Model event handler called with status %d(%s)\n",
288 status, PMIx_Error_string(status));
289 for (n=0; n < ninfo; n++) {
290 if (PMIX_STRING == info[n].value.type) {
291 fprintf(stderr, "\t%s:\t%s\n", info[n].key, info[n].value.data.string);
292 }
293 }
294
295
296
297
298 if (NULL != cbfunc) {
299 cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
300 }
301 DEBUG_WAKEUP_THREAD(&globallock);
302 }
303
304
305 static void model_registration_callback(pmix_status_t status,
306 size_t evhandler_ref,
307 void *cbdata)
308 {
309 mylock_t *lock = (mylock_t*)cbdata;
310
311 if (PMIX_SUCCESS != status) {
312 fprintf(stderr, "simptest EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n",
313 status, (unsigned long)evhandler_ref);
314 }
315 lock->status = status;
316 DEBUG_WAKEUP_THREAD(lock);
317 }
318
319 static void set_handler_default(int sig)
320 {
321 struct sigaction act;
322
323 act.sa_handler = SIG_DFL;
324 act.sa_flags = 0;
325 sigemptyset(&act.sa_mask);
326
327 sigaction(sig, &act, (struct sigaction *)0);
328 }
329
330 int main(int argc, char **argv)
331 {
332 char **client_env=NULL;
333 char **client_argv=NULL;
334 char *tmp, **atmp, *executable=NULL;
335 int rc, nprocs=1, n, k;
336 uid_t myuid;
337 gid_t mygid;
338 pid_t pid;
339 myxfer_t *x;
340 pmix_proc_t proc;
341 wait_tracker_t *child;
342 pmix_info_t *info;
343 size_t ninfo;
344 bool cross_version = false;
345 bool usock = true;
346 bool hwloc = false;
347 #if PMIX_HAVE_HWLOC
348 char *hwloc_file = NULL;
349 #endif
350 mylock_t mylock;
351 pmix_status_t code;
352 sigset_t unblock;
353
354
355 if (PMIX_SUCCESS != 0) {
356 fprintf(stderr, "ERROR IN COMPUTING CONSTANTS: PMIX_SUCCESS = %d\n", PMIX_SUCCESS);
357 exit(1);
358 }
359
360
361
362 for (n=1; n < argc; n++) {
363 if (0 == strcmp("-n", argv[n]) &&
364 NULL != argv[n+1]) {
365 nprocs = strtol(argv[n+1], NULL, 10);
366 ++n;
367 } else if (0 == strcmp("-e", argv[n]) &&
368 NULL != argv[n+1]) {
369 executable = strdup(argv[n+1]);
370
371 if (NULL != strstr(executable, "simptimeout")) {
372 istimeouttest = true;
373 }
374 for (k=n+2; NULL != argv[k]; k++) {
375 pmix_argv_append_nosize(&client_argv, argv[k]);
376 }
377 n += k;
378 } else if (0 == strcmp("-x", argv[n])) {
379
380
381 cross_version = true;
382 usock = false;
383 } else if (0 == strcmp("-u", argv[n])) {
384
385 usock = false;
386 #if PMIX_HAVE_HWLOC
387 } else if (0 == strcmp("-hwloc", argv[n]) ||
388 0 == strcmp("--hwloc", argv[n])) {
389
390 hwloc = true;
391 } else if (0 == strcmp("-hwloc-file", argv[n]) ||
392 0 == strcmp("--hwloc-file", argv[n])) {
393 if (NULL == argv[n+1]) {
394 fprintf(stderr, "The --hwloc-file option requires an argument\n");
395 exit(1);
396 }
397 hwloc_file = strdup(argv[n+1]);
398 hwloc = true;
399 ++n;
400 #endif
401 } else if (0 == strcmp("-h", argv[n])) {
402
403 fprintf(stderr, "usage: simptest <options>\n");
404 fprintf(stderr, " -n N Number of clients to run\n");
405 fprintf(stderr, " -e foo Name of the client executable to run (default: simpclient\n");
406 fprintf(stderr, " -x Test cross-version support\n");
407 fprintf(stderr, " -u Enable legacy usock support\n");
408 fprintf(stderr, " -hwloc Test hwloc support\n");
409 fprintf(stderr, " -hwloc-file FILE Use file to import topology\n");
410 exit(0);
411 }
412 }
413 if (NULL == executable) {
414 executable = strdup("./simpclient");
415 }
416 if (cross_version && nprocs < 2) {
417 fprintf(stderr, "Cross-version testing requires at least two clients\n");
418 exit(1);
419 }
420
421 #if !PMIX_HAVE_HWLOC
422 if (hwloc) {
423 fprintf(stderr, "PMIx was not configured with HWLOC support - cannot continue\n");
424 exit(1);
425 }
426 #endif
427
428 fprintf(stderr, "Testing version %s\n", PMIx_Get_version());
429
430
431 if (0 != sigemptyset(&unblock)) {
432 fprintf(stderr, "SIGEMPTYSET FAILED\n");
433 exit(1);
434 }
435 if (0 != sigaddset(&unblock, SIGCHLD)) {
436 fprintf(stderr, "SIGADDSET FAILED\n");
437 exit(1);
438 }
439 if (0 != sigprocmask(SIG_UNBLOCK, &unblock, NULL)) {
440 fprintf(stderr, "SIG_UNBLOCK FAILED\n");
441 exit(1);
442 }
443
444
445
446 #if PMIX_HAVE_HWLOC
447 if (hwloc) {
448 #if HWLOC_API_VERSION < 0x20000
449 ninfo = 3;
450 #else
451 ninfo = 4;
452 #endif
453 } else {
454 ninfo = 2;
455 }
456 #else
457 ninfo = 2;
458 #endif
459
460 PMIX_INFO_CREATE(info, ninfo);
461 PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TOOL_SUPPORT, NULL, PMIX_BOOL);
462 PMIX_INFO_LOAD(&info[1], PMIX_SERVER_GATEWAY, NULL, PMIX_BOOL);
463 #if PMIX_HAVE_HWLOC
464 if (hwloc) {
465 if (NULL != hwloc_file) {
466 PMIX_INFO_LOAD(&info[2], PMIX_TOPOLOGY_FILE, hwloc_file, PMIX_STRING);
467 } else {
468 PMIX_INFO_LOAD(&info[2], PMIX_TOPOLOGY, NULL, PMIX_STRING);
469 }
470 #if HWLOC_API_VERSION >= 0x20000
471 PMIX_INFO_LOAD(&info[3], PMIX_HWLOC_SHARE_TOPO, NULL, PMIX_BOOL);
472 #endif
473 }
474 #endif
475 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, ninfo))) {
476 fprintf(stderr, "Init failed with error %d\n", rc);
477 return rc;
478 }
479 PMIX_INFO_FREE(info, ninfo);
480
481
482 DEBUG_CONSTRUCT_LOCK(&mylock);
483 ninfo = 1;
484 PMIX_INFO_CREATE(info, ninfo);
485 PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "SIMPTEST-DEFAULT", PMIX_STRING);
486 PMIx_Register_event_handler(NULL, 0, info, ninfo,
487 errhandler, errhandler_reg_callbk, (void*)&mylock);
488 DEBUG_WAIT_THREAD(&mylock);
489 PMIX_INFO_FREE(info, ninfo);
490 if (PMIX_SUCCESS != mylock.status) {
491 exit(mylock.status);
492 }
493 DEBUG_DESTRUCT_LOCK(&mylock);
494
495
496 DEBUG_CONSTRUCT_LOCK(&mylock);
497 ninfo = 1;
498 PMIX_INFO_CREATE(info, ninfo);
499 PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "SIMPTEST-MODEL", PMIX_STRING);
500 code = PMIX_MODEL_DECLARED;
501 PMIx_Register_event_handler(&code, 1, info, ninfo,
502 model_callback, model_registration_callback, (void*)&mylock);
503 DEBUG_WAIT_THREAD(&mylock);
504 PMIX_INFO_FREE(info, ninfo);
505 if (PMIX_SUCCESS != mylock.status) {
506 exit(mylock.status);
507 }
508 DEBUG_DESTRUCT_LOCK(&mylock);
509
510
511 PMIX_CONSTRUCT(&pubdata, pmix_list_t);
512
513
514 PMIX_CONSTRUCT(&children, pmix_list_t);
515 pmix_event_assign(&handler, pmix_globals.evbase, SIGCHLD,
516 EV_SIGNAL|EV_PERSIST,wait_signal_callback, &handler);
517 pmix_event_add(&handler, NULL);
518
519
520 atmp = NULL;
521 for (n=0; n < nprocs; n++) {
522 asprintf(&tmp, "%d", n);
523 pmix_argv_append_nosize(&atmp, tmp);
524 free(tmp);
525 }
526 tmp = pmix_argv_join(atmp, ',');
527 pmix_argv_free(atmp);
528 x = PMIX_NEW(myxfer_t);
529 set_namespace(nprocs, tmp, "foobar", opcbfunc, x);
530
531
532 client_env = pmix_argv_copy(environ);
533 pmix_argv_prepend_nosize(&client_argv, executable);
534
535 wakeup = nprocs;
536 myuid = getuid();
537 mygid = getgid();
538
539
540 DEBUG_CONSTRUCT_LOCK(&mylock);
541 fprintf(stderr, "Collecting inventory\n");
542 if (PMIX_SUCCESS != (rc = PMIx_server_collect_inventory(NULL, 0, infocbfunc, (void*)&mylock))) {
543 fprintf(stderr, "Collect inventory failed: %d\n", rc);
544 DEBUG_DESTRUCT_LOCK(&mylock);
545 goto done;
546 }
547 DEBUG_WAIT_THREAD(&mylock);
548 fprintf(stderr, "Inventory collected: %d\n", mylock.status);
549 if (PMIX_SUCCESS != mylock.status) {
550 exit(mylock.status);
551 }
552 DEBUG_DESTRUCT_LOCK(&mylock);
553
554
555
556 DEBUG_WAIT_THREAD(&x->lock);
557 free(tmp);
558 PMIX_RELEASE(x);
559
560
561 (void)strncpy(proc.nspace, "foobar", PMIX_MAX_NSLEN);
562 for (n = 0; n < nprocs; n++) {
563 proc.rank = n;
564 if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) {
565 fprintf(stderr, "Server fork setup failed with error %d\n", rc);
566 PMIx_server_finalize();
567 return rc;
568 }
569
570
571 if (cross_version) {
572 if (0 == n % 2) {
573 pmix_setenv("PMIX_MCA_ptl", "tcp", true, &client_env);
574 } else {
575 pmix_setenv("PMIX_MCA_ptl", "usock", true, &client_env);
576 }
577 } else if (!usock) {
578
579 pmix_setenv("PMIX_MCA_ptl", "usock", true, &client_env);
580 }
581 x = PMIX_NEW(myxfer_t);
582 if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid,
583 NULL, opcbfunc, x))) {
584 fprintf(stderr, "Server register client failed with error %d\n", rc);
585 PMIx_server_finalize();
586 return rc;
587 }
588
589
590 DEBUG_WAIT_THREAD(&x->lock);
591 PMIX_RELEASE(x);
592 pid = fork();
593 if (pid < 0) {
594 fprintf(stderr, "Fork failed\n");
595 PMIx_server_finalize();
596 return -1;
597 }
598 if (pid == 0) {
599 sigset_t sigs;
600 set_handler_default(SIGTERM);
601 set_handler_default(SIGINT);
602 set_handler_default(SIGHUP);
603 set_handler_default(SIGPIPE);
604 set_handler_default(SIGCHLD);
605 sigprocmask(0, 0, &sigs);
606 sigprocmask(SIG_UNBLOCK, &sigs, 0);
607 execve(executable, client_argv, client_env);
608
609 exit(0);
610 } else {
611 child = PMIX_NEW(wait_tracker_t);
612 child->pid = pid;
613 pmix_list_append(&children, &child->super);
614 }
615 }
616 pmix_argv_free(client_argv);
617 pmix_argv_free(client_env);
618
619
620 while (0 < wakeup) {
621 struct timespec ts;
622 ts.tv_sec = 0;
623 ts.tv_nsec = 100000;
624 nanosleep(&ts, NULL);
625 }
626
627
628
629 if (NULL == strstr(executable, "simpdie")) {
630 n=0;
631 PMIX_LIST_FOREACH(child, &children, wait_tracker_t) {
632 if (0 != child->exit_code) {
633 fprintf(stderr, "Child %d [%d] exited with status %d - test FAILED\n", n, child->pid, child->exit_code);
634 }
635 ++n;
636 }
637 } else if (1 == exit_code) {
638 exit_code = 0;
639 }
640 free(executable);
641
642
643 ninfo = 3;
644 PMIX_INFO_CREATE(info, ninfo);
645 PMIX_INFO_LOAD(&info[0], PMIX_PROGRAMMING_MODEL, "PMIX", PMIX_STRING);
646 PMIX_INFO_LOAD(&info[1], PMIX_MODEL_LIBRARY_NAME, "test", PMIX_STRING);
647
648 PMIX_INFO_LOAD(&info[2], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
649 DEBUG_CONSTRUCT_LOCK(&globallock);
650 PMIx_Notify_event(PMIX_MODEL_DECLARED,
651 &pmix_globals.myid, PMIX_RANGE_PROC_LOCAL,
652 info, ninfo, NULL, NULL);
653 DEBUG_WAIT_THREAD(&globallock);
654 DEBUG_DESTRUCT_LOCK(&globallock);
655 PMIX_INFO_FREE(info, ninfo);
656
657 #if 0
658 fprintf(stderr, "TEST NONDEFAULT NOTIFICATION\n");
659
660 ninfo = 1;
661 PMIX_INFO_CREATE(info, ninfo);
662
663 PMIX_INFO_LOAD(&info[0], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
664 PMIx_Notify_event(PMIX_ERR_DEBUGGER_RELEASE,
665 &pmix_globals.myid, PMIX_RANGE_LOCAL,
666 info, ninfo, NULL, NULL);
667 PMIX_INFO_FREE(info, ninfo);
668
669 for (ninfo=0; ninfo < 100000; ninfo++) {
670 struct timespec t = {0, 100};
671 nanosleep(&t, NULL);
672 }
673 #endif
674
675 done:
676
677 PMIx_Deregister_event_handler(0, NULL, NULL);
678
679
680 PMIX_LIST_DESTRUCT(&pubdata);
681
682
683 PMIX_LIST_DESTRUCT(&children);
684
685
686 if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
687 fprintf(stderr, "Finalize failed with error %d\n", rc);
688 exit_code = rc;
689 }
690
691 if (0 == exit_code) {
692 fprintf(stderr, "Test finished OK!\n");
693 } else {
694 fprintf(stderr, "TEST FAILED WITH ERROR %d\n", exit_code);
695 }
696
697 return exit_code;
698 }
699
700 static void set_namespace(int nprocs, char *ranks, char *nspace,
701 pmix_op_cbfunc_t cbfunc, myxfer_t *x)
702 {
703 char *regex, *ppn;
704 char hostname[PMIX_MAXHOSTNAMELEN];
705
706 gethostname(hostname, sizeof(hostname));
707 x->ninfo = 7;
708
709 PMIX_INFO_CREATE(x->info, x->ninfo);
710 (void)strncpy(x->info[0].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
711 x->info[0].value.type = PMIX_UINT32;
712 x->info[0].value.data.uint32 = nprocs;
713
714 (void)strncpy(x->info[1].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
715 x->info[1].value.type = PMIX_UINT32;
716 x->info[1].value.data.uint32 = 0;
717
718 (void)strncpy(x->info[2].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
719 x->info[2].value.type = PMIX_UINT32;
720 x->info[2].value.data.uint32 = nprocs;
721
722 (void)strncpy(x->info[3].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
723 x->info[3].value.type = PMIX_STRING;
724 x->info[3].value.data.string = strdup(ranks);
725
726 PMIx_generate_regex(hostname, ®ex);
727 (void)strncpy(x->info[4].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
728 x->info[4].value.type = PMIX_STRING;
729 x->info[4].value.data.string = regex;
730
731 PMIx_generate_ppn(ranks, &ppn);
732 (void)strncpy(x->info[5].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
733 x->info[5].value.type = PMIX_STRING;
734 x->info[5].value.data.string = ppn;
735
736 (void)strncpy(x->info[6].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
737 x->info[6].value.type = PMIX_UINT32;
738 x->info[6].value.data.uint32 = nprocs;
739
740 PMIx_server_register_nspace(nspace, nprocs, x->info, x->ninfo,
741 cbfunc, x);
742 }
743
744 static void errhandler(size_t evhdlr_registration_id,
745 pmix_status_t status,
746 const pmix_proc_t *source,
747 pmix_info_t info[], size_t ninfo,
748 pmix_info_t results[], size_t nresults,
749 pmix_event_notification_cbfunc_fn_t cbfunc,
750 void *cbdata)
751 {
752 pmix_output(0, "SERVER: ERRHANDLER CALLED WITH STATUS %d", status);
753
754
755
756 if (NULL != cbfunc) {
757 cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
758 }
759 }
760
761 static void errhandler_reg_callbk (pmix_status_t status,
762 size_t errhandler_ref,
763 void *cbdata)
764 {
765 mylock_t *lock = (mylock_t*)cbdata;
766
767 pmix_output(0, "SERVER: ERRHANDLER REGISTRATION CALLBACK CALLED WITH STATUS %d, ref=%lu",
768 status, (unsigned long)errhandler_ref);
769 lock->status = status;
770 DEBUG_WAKEUP_THREAD(lock);
771 }
772
773 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
774 pmix_op_cbfunc_t cbfunc, void *cbdata)
775 {
776 return PMIX_OPERATION_SUCCEEDED;
777 }
778 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
779 pmix_op_cbfunc_t cbfunc, void *cbdata)
780 {
781 pmix_output(0, "SERVER: FINALIZED %s:%d WAKEUP %d",
782 proc->nspace, proc->rank, wakeup);
783 return PMIX_OPERATION_SUCCEEDED;
784 }
785
786 static void abcbfunc(pmix_status_t status, void *cbdata)
787 {
788 myxfer_t *x = (myxfer_t*)cbdata;
789
790
791 if (NULL != x->cbfunc) {
792 x->cbfunc(status, x->cbdata);
793 }
794 PMIX_RELEASE(x);
795 }
796
797 static pmix_status_t abort_fn(const pmix_proc_t *proc,
798 void *server_object,
799 int status, const char msg[],
800 pmix_proc_t procs[], size_t nprocs,
801 pmix_op_cbfunc_t cbfunc, void *cbdata)
802 {
803 pmix_status_t rc;
804 myxfer_t *x;
805
806 if (NULL != procs) {
807 pmix_output(0, "SERVER: ABORT on %s:%d", procs[0].nspace, procs[0].rank);
808 } else {
809 pmix_output(0, "SERVER: ABORT OF ALL PROCS IN NSPACE %s", proc->nspace);
810 }
811
812
813
814
815
816
817 x = PMIX_NEW(myxfer_t);
818 (void)strncpy(x->caller.nspace, proc->nspace, PMIX_MAX_NSLEN);
819 x->caller.rank = proc->rank;
820
821 PMIX_INFO_CREATE(x->info, 2);
822 (void)strncpy(x->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
823 x->info[0].value.type = PMIX_INT8;
824 x->info[0].value.data.int8 = 12;
825 (void)strncpy(x->info[1].key, "VADER", PMIX_MAX_KEYLEN);
826 x->info[1].value.type = PMIX_DOUBLE;
827 x->info[1].value.data.dval = 12.34;
828 x->cbfunc = cbfunc;
829 x->cbdata = cbdata;
830
831 if (PMIX_SUCCESS != (rc = PMIx_Notify_event(status, &x->caller,
832 PMIX_RANGE_NAMESPACE,
833 x->info, 2,
834 abcbfunc, x))) {
835 pmix_output(0, "SERVER: FAILED NOTIFY ERROR %d", (int)rc);
836 }
837
838 return PMIX_SUCCESS;
839 }
840
841 static void fencbfn(int sd, short args, void *cbdata)
842 {
843 pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
844
845
846 if (NULL != scd->cbfunc.modexcbfunc) {
847 scd->cbfunc.modexcbfunc(scd->status, scd->data, scd->ndata, scd->cbdata, NULL, NULL);
848 }
849 PMIX_RELEASE(scd);
850 }
851 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
852 const pmix_info_t info[], size_t ninfo,
853 char *data, size_t ndata,
854 pmix_modex_cbfunc_t cbfunc, void *cbdata)
855 {
856 pmix_shift_caddy_t *scd;
857
858 pmix_output(0, "SERVER: FENCENB");
859 scd = PMIX_NEW(pmix_shift_caddy_t);
860 scd->status = PMIX_SUCCESS;
861 scd->data = data;
862 scd->ndata = ndata;
863 scd->cbfunc.modexcbfunc = cbfunc;
864 scd->cbdata = cbdata;
865 PMIX_THREADSHIFT(scd, fencbfn);
866 return PMIX_SUCCESS;
867 }
868
869
870 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
871 const pmix_info_t info[], size_t ninfo,
872 pmix_modex_cbfunc_t cbfunc, void *cbdata)
873 {
874 pmix_shift_caddy_t *scd;
875
876 pmix_output(0, "SERVER: DMODEX");
877
878
879 if (istimeouttest) {
880 return PMIX_SUCCESS;
881 }
882
883 scd = PMIX_NEW(pmix_shift_caddy_t);
884 scd->status = PMIX_ERR_NOT_FOUND;
885 scd->cbfunc.modexcbfunc = cbfunc;
886 scd->cbdata = cbdata;
887 PMIX_THREADSHIFT(scd, fencbfn);
888
889 return PMIX_SUCCESS;
890 }
891
892
893 static pmix_status_t publish_fn(const pmix_proc_t *proc,
894 const pmix_info_t info[], size_t ninfo,
895 pmix_op_cbfunc_t cbfunc, void *cbdata)
896 {
897 pmix_locdat_t *p;
898 size_t n;
899
900 pmix_output(0, "SERVER: PUBLISH");
901
902 for (n=0; n < ninfo; n++) {
903 p = PMIX_NEW(pmix_locdat_t);
904 (void)strncpy(p->pdata.proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
905 p->pdata.proc.rank = proc->rank;
906 (void)strncpy(p->pdata.key, info[n].key, PMIX_MAX_KEYLEN);
907 pmix_value_xfer(&p->pdata.value, (pmix_value_t*)&info[n].value);
908 pmix_list_append(&pubdata, &p->super);
909 }
910
911 return PMIX_OPERATION_SUCCEEDED;
912 }
913
914 typedef struct {
915 pmix_event_t ev;
916 pmix_pdata_t *pd;
917 size_t n;
918 pmix_lookup_cbfunc_t cbfunc;
919 void *cbdata;
920 } lkobj_t;
921
922 static void lkcbfn(int sd, short args, void *cbdata)
923 {
924 lkobj_t *lk = (lkobj_t*)cbdata;
925
926 lk->cbfunc(PMIX_SUCCESS, lk->pd, lk->n, lk->cbdata);
927 PMIX_PDATA_FREE(lk->pd, lk->n);
928 free(lk);
929 }
930
931 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
932 const pmix_info_t info[], size_t ninfo,
933 pmix_lookup_cbfunc_t cbfunc, void *cbdata)
934 {
935 pmix_locdat_t *p, *p2;
936 pmix_list_t results;
937 size_t i, n;
938 pmix_pdata_t *pd = NULL;
939 pmix_status_t ret = PMIX_ERR_NOT_FOUND;
940 lkobj_t *lk;
941
942 pmix_output(0, "SERVER: LOOKUP");
943
944 PMIX_CONSTRUCT(&results, pmix_list_t);
945
946 for (n=0; NULL != keys[n]; n++) {
947 PMIX_LIST_FOREACH(p, &pubdata, pmix_locdat_t) {
948 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
949 p2 = PMIX_NEW(pmix_locdat_t);
950 (void)strncpy(p2->pdata.proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
951 p2->pdata.proc.rank = p->pdata.proc.rank;
952 (void)strncpy(p2->pdata.key, p->pdata.key, PMIX_MAX_KEYLEN);
953 pmix_value_xfer(&p2->pdata.value, &p->pdata.value);
954 pmix_list_append(&results, &p2->super);
955 break;
956 }
957 }
958 }
959 if (0 < (n = pmix_list_get_size(&results))) {
960 ret = PMIX_SUCCESS;
961 PMIX_PDATA_CREATE(pd, n);
962 for (i=0; i < n; i++) {
963 p = (pmix_locdat_t*)pmix_list_remove_first(&results);
964 if (p) {
965 (void)strncpy(pd[i].proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
966 pd[i].proc.rank = p->pdata.proc.rank;
967 (void)strncpy(pd[i].key, p->pdata.key, PMIX_MAX_KEYLEN);
968 pmix_value_xfer(&pd[i].value, &p->pdata.value);
969 }
970 }
971 }
972 PMIX_LIST_DESTRUCT(&results);
973 if (PMIX_SUCCESS == ret) {
974 lk = (lkobj_t*)malloc(sizeof(lkobj_t));
975 lk->pd = pd;
976 lk->n = n;
977 lk->cbfunc = cbfunc;
978 lk->cbdata = cbdata;
979 PMIX_THREADSHIFT(lk, lkcbfn);
980 }
981
982 return ret;
983 }
984
985
986 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
987 const pmix_info_t info[], size_t ninfo,
988 pmix_op_cbfunc_t cbfunc, void *cbdata)
989 {
990 pmix_locdat_t *p, *p2;
991 size_t n;
992
993 pmix_output(0, "SERVER: UNPUBLISH");
994
995 for (n=0; NULL != keys[n]; n++) {
996 PMIX_LIST_FOREACH_SAFE(p, p2, &pubdata, pmix_locdat_t) {
997 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
998 pmix_list_remove_item(&pubdata, &p->super);
999 PMIX_RELEASE(p);
1000 break;
1001 }
1002 }
1003 }
1004 return PMIX_OPERATION_SUCCEEDED;
1005 }
1006
1007 static void spcbfunc(pmix_status_t status, void *cbdata)
1008 {
1009 myxfer_t *x = (myxfer_t*)cbdata;
1010
1011 if (NULL != x->spcbfunc) {
1012 x->spcbfunc(PMIX_SUCCESS, "DYNSPACE", x->cbdata);
1013 }
1014 }
1015
1016 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
1017 const pmix_info_t job_info[], size_t ninfo,
1018 const pmix_app_t apps[], size_t napps,
1019 pmix_spawn_cbfunc_t cbfunc, void *cbdata)
1020 {
1021 myxfer_t *x;
1022 size_t n;
1023 pmix_proc_t *pptr;
1024 bool spawned;
1025
1026 pmix_output(0, "SERVER: SPAWN");
1027
1028
1029 for (n=0; n < ninfo; n++) {
1030 if (0 == strncmp(job_info[n].key, PMIX_PARENT_ID, PMIX_MAX_KEYLEN)) {
1031 pptr = job_info[n].value.data.proc;
1032 pmix_output(0, "SPAWN: Parent ID %s:%d", pptr->nspace, pptr->rank);
1033 } else if (0 == strncmp(job_info[n].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN)) {
1034 spawned = PMIX_INFO_TRUE(&job_info[n]);
1035 pmix_output(0, "SPAWN: Spawned %s", spawned ? "TRUE" : "FALSE");
1036 }
1037 }
1038
1039
1040
1041
1042
1043
1044
1045
1046 x = PMIX_NEW(myxfer_t);
1047 x->spcbfunc = cbfunc;
1048 x->cbdata = cbdata;
1049
1050 set_namespace(2, "0,1", "DYNSPACE", spcbfunc, x);
1051
1052 return PMIX_SUCCESS;
1053 }
1054
1055 static int numconnects = 0;
1056
1057 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
1058 const pmix_info_t info[], size_t ninfo,
1059 pmix_op_cbfunc_t cbfunc, void *cbdata)
1060 {
1061 pmix_output(0, "SERVER: CONNECT");
1062
1063
1064
1065
1066 numconnects++;
1067
1068 return PMIX_OPERATION_SUCCEEDED;
1069 }
1070
1071
1072 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
1073 const pmix_info_t info[], size_t ninfo,
1074 pmix_op_cbfunc_t cbfunc, void *cbdata)
1075 {
1076 pmix_output(0, "SERVER: DISCONNECT");
1077
1078 return PMIX_OPERATION_SUCCEEDED;
1079 }
1080
1081 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
1082 const pmix_info_t info[], size_t ninfo,
1083 pmix_op_cbfunc_t cbfunc, void *cbdata)
1084 {
1085 return PMIX_OPERATION_SUCCEEDED;
1086 }
1087
1088 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
1089 pmix_op_cbfunc_t cbfunc, void *cbdata)
1090 {
1091 return PMIX_OPERATION_SUCCEEDED;
1092 }
1093
1094 static pmix_status_t notify_event(pmix_status_t code,
1095 const pmix_proc_t *source,
1096 pmix_data_range_t range,
1097 pmix_info_t info[], size_t ninfo,
1098 pmix_op_cbfunc_t cbfunc, void *cbdata)
1099 {
1100 pmix_output(0, "SERVER: NOTIFY EVENT");
1101 return PMIX_OPERATION_SUCCEEDED;
1102 }
1103
1104 typedef struct query_data_t {
1105 pmix_event_t ev;
1106 pmix_info_t *data;
1107 size_t ndata;
1108 pmix_info_cbfunc_t cbfunc;
1109 void *cbdata;
1110 } query_data_t;
1111
1112 static void qfn(int sd, short args, void *cbdata)
1113 {
1114 query_data_t *qd = (query_data_t*)cbdata;
1115
1116 qd->cbfunc(PMIX_SUCCESS, qd->data, qd->ndata, qd->cbdata, NULL, NULL);
1117 PMIX_INFO_FREE(qd->data, qd->ndata);
1118 }
1119
1120 static pmix_status_t query_fn(pmix_proc_t *proct,
1121 pmix_query_t *queries, size_t nqueries,
1122 pmix_info_cbfunc_t cbfunc,
1123 void *cbdata)
1124 {
1125 size_t n;
1126 pmix_info_t *info;
1127 query_data_t qd;
1128
1129 pmix_output(0, "SERVER: QUERY");
1130
1131 if (NULL == cbfunc) {
1132 return PMIX_ERROR;
1133 }
1134
1135 PMIX_INFO_CREATE(info, nqueries);
1136 for (n=0; n < nqueries; n++) {
1137 pmix_output(0, "\tKey: %s", queries[n].keys[0]);
1138 (void)strncpy(info[n].key, queries[n].keys[0], PMIX_MAX_KEYLEN);
1139 info[n].value.type = PMIX_STRING;
1140 if (0 > asprintf(&info[n].value.data.string, "%d", (int)n)) {
1141 return PMIX_ERROR;
1142 }
1143 }
1144 qd.data = info;
1145 qd.ndata = nqueries;
1146 qd.cbfunc = cbfunc;
1147 qd.cbdata = cbdata;
1148 PMIX_THREADSHIFT(&qd, qfn);
1149 return PMIX_SUCCESS;
1150 }
1151
1152 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
1153 pmix_tool_connection_cbfunc_t cbfunc,
1154 void *cbdata)
1155 {
1156 pmix_proc_t proc;
1157
1158 pmix_output(0, "SERVER: TOOL CONNECT");
1159
1160
1161 (void)strncpy(proc.nspace, "TOOL", PMIX_MAX_NSLEN);
1162 proc.rank = 0;
1163
1164 if (NULL != cbfunc) {
1165 cbfunc(PMIX_SUCCESS, &proc, cbdata);
1166 }
1167 }
1168
1169 typedef struct {
1170 pmix_event_t ev;
1171 pmix_op_cbfunc_t cbfunc;
1172 void *cbdata;
1173 } mylog_t;
1174
1175 static void foobar(int sd, short args, void *cbdata)
1176 {
1177 mylog_t *lg = (mylog_t*)cbdata;
1178 lg->cbfunc(PMIX_SUCCESS, lg->cbdata);
1179 }
1180 static void log_fn(const pmix_proc_t *client,
1181 const pmix_info_t data[], size_t ndata,
1182 const pmix_info_t directives[], size_t ndirs,
1183 pmix_op_cbfunc_t cbfunc, void *cbdata)
1184 {
1185 mylog_t *lg = (mylog_t *)malloc(sizeof(mylog_t));
1186
1187 pmix_output(0, "SERVER: LOG");
1188
1189 lg->cbfunc = cbfunc;
1190 lg->cbdata = cbdata;
1191 PMIX_THREADSHIFT(lg, foobar);
1192 }
1193
1194 static pmix_status_t alloc_fn(const pmix_proc_t *client,
1195 pmix_alloc_directive_t directive,
1196 const pmix_info_t data[], size_t ndata,
1197 pmix_info_cbfunc_t cbfunc, void *cbdata)
1198 {
1199 return PMIX_OPERATION_SUCCEEDED;
1200 }
1201
1202 static pmix_status_t jctrl_fn(const pmix_proc_t *requestor,
1203 const pmix_proc_t targets[], size_t ntargets,
1204 const pmix_info_t directives[], size_t ndirs,
1205 pmix_info_cbfunc_t cbfunc, void *cbdata)
1206 {
1207 return PMIX_OPERATION_SUCCEEDED;
1208 }
1209
1210 static pmix_status_t mon_fn(const pmix_proc_t *requestor,
1211 const pmix_info_t *monitor, pmix_status_t error,
1212 const pmix_info_t directives[], size_t ndirs,
1213 pmix_info_cbfunc_t cbfunc, void *cbdata)
1214 {
1215 return PMIX_ERR_NOT_SUPPORTED;
1216 }
1217
1218
1219 static void wait_signal_callback(int fd, short event, void *arg)
1220 {
1221 pmix_event_t *sig = (pmix_event_t*) arg;
1222 int status;
1223 pid_t pid;
1224 wait_tracker_t *t2;
1225
1226 if (SIGCHLD != pmix_event_get_signal(sig)) {
1227 return;
1228 }
1229
1230
1231
1232
1233 while (1) {
1234 pid = waitpid(-1, &status, WNOHANG);
1235 if (-1 == pid && EINTR == errno) {
1236
1237 continue;
1238 }
1239
1240 if (pid <= 0) {
1241 return;
1242 }
1243
1244
1245 PMIX_LIST_FOREACH(t2, &children, wait_tracker_t) {
1246 if (pid == t2->pid) {
1247
1248 if (WIFEXITED(status)) {
1249 t2->exit_code = WEXITSTATUS(status);
1250 } else {
1251 if (WIFSIGNALED(status)) {
1252 t2->exit_code = WTERMSIG(status) + 128;
1253 }
1254 }
1255 if (0 != t2->exit_code && 0 == exit_code) {
1256 exit_code = t2->exit_code;
1257 }
1258 --wakeup;
1259 break;
1260 }
1261 }
1262 }
1263 fprintf(stderr, "ENDLOOP\n");
1264 }