This source file includes following definitions.
- xfcon
- xfdes
- opcbfunc
- sacbfunc
- infocbfunc
- main
- set_namespace
- errhandler
- errhandler_reg_callbk
- connected
- finalized
- abcbfunc
- abort_fn
- fencenb_fn
- dmodex_fn
- publish_fn
- lookup_fn
- unpublish_fn
- spcbfunc
- spawn_fn
- connect_fn
- disconnect_fn
- register_event_fn
- deregister_events
- notify_event
- query_fn
- tool_connect_fn
- log_fn
- wait_signal_callback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 #include <src/include/pmix_config.h>
29 #include <pmix_server.h>
30 #include <src/include/types.h>
31 #include <src/include/pmix_globals.h>
32
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/types.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <signal.h>
41
42 #include "src/class/pmix_list.h"
43 #include "src/util/pmix_environ.h"
44 #include "src/util/output.h"
45 #include "src/util/printf.h"
46 #include "src/util/argv.h"
47
48 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
49 pmix_op_cbfunc_t cbfunc, void *cbdata);
50 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
51 pmix_op_cbfunc_t cbfunc, void *cbdata);
52 static pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
53 int status, const char msg[],
54 pmix_proc_t procs[], size_t nprocs,
55 pmix_op_cbfunc_t cbfunc, void *cbdata);
56 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
57 const pmix_info_t info[], size_t ninfo,
58 char *data, size_t ndata,
59 pmix_modex_cbfunc_t cbfunc, void *cbdata);
60 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
61 const pmix_info_t info[], size_t ninfo,
62 pmix_modex_cbfunc_t cbfunc, void *cbdata);
63 static pmix_status_t publish_fn(const pmix_proc_t *proc,
64 const pmix_info_t info[], size_t ninfo,
65 pmix_op_cbfunc_t cbfunc, void *cbdata);
66 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
67 const pmix_info_t info[], size_t ninfo,
68 pmix_lookup_cbfunc_t cbfunc, void *cbdata);
69 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
70 const pmix_info_t info[], size_t ninfo,
71 pmix_op_cbfunc_t cbfunc, void *cbdata);
72 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
73 const pmix_info_t job_info[], size_t ninfo,
74 const pmix_app_t apps[], size_t napps,
75 pmix_spawn_cbfunc_t cbfunc, void *cbdata);
76 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
77 const pmix_info_t info[], size_t ninfo,
78 pmix_op_cbfunc_t cbfunc, void *cbdata);
79 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
80 const pmix_info_t info[], size_t ninfo,
81 pmix_op_cbfunc_t cbfunc, void *cbdata);
82 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
83 const pmix_info_t info[], size_t ninfo,
84 pmix_op_cbfunc_t cbfunc, void *cbdata);
85 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
86 pmix_op_cbfunc_t cbfunc, void *cbdata);
87 static pmix_status_t notify_event(pmix_status_t code,
88 const pmix_proc_t *source,
89 pmix_data_range_t range,
90 pmix_info_t info[], size_t ninfo,
91 pmix_op_cbfunc_t cbfunc, void *cbdata);
92 static pmix_status_t query_fn(pmix_proc_t *proct,
93 pmix_query_t *queries, size_t nqueries,
94 pmix_info_cbfunc_t cbfunc,
95 void *cbdata);
96 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
97 pmix_tool_connection_cbfunc_t cbfunc,
98 void *cbdata);
99 static void log_fn(const pmix_proc_t *client,
100 const pmix_info_t data[], size_t ndata,
101 const pmix_info_t directives[], size_t ndirs,
102 pmix_op_cbfunc_t cbfunc, void *cbdata);
103
104 static pmix_server_module_t mymodule = {
105 .client_connected = connected,
106 .client_finalized = finalized,
107 .abort = abort_fn,
108 .fence_nb = fencenb_fn,
109 .direct_modex = dmodex_fn,
110 .publish = publish_fn,
111 .lookup = lookup_fn,
112 .unpublish = unpublish_fn,
113 .spawn = spawn_fn,
114 .connect = connect_fn,
115 .disconnect = disconnect_fn,
116 .register_events = register_event_fn,
117 .deregister_events = deregister_events,
118 .notify_event = notify_event,
119 .query = query_fn,
120 .tool_connected = tool_connect_fn,
121 .log = log_fn
122 };
123
124 typedef struct {
125 pthread_mutex_t mutex;
126 pthread_cond_t cond;
127 volatile bool active;
128 pmix_status_t status;
129 } mylock_t;
130
131 #define DEBUG_CONSTRUCT_LOCK(l) \
132 do { \
133 pthread_mutex_init(&(l)->mutex, NULL); \
134 pthread_cond_init(&(l)->cond, NULL); \
135 (l)->active = true; \
136 (l)->status = PMIX_SUCCESS; \
137 } while(0)
138
139 #define DEBUG_DESTRUCT_LOCK(l) \
140 do { \
141 pthread_mutex_destroy(&(l)->mutex); \
142 pthread_cond_destroy(&(l)->cond); \
143 } while(0)
144
145 #define DEBUG_WAIT_THREAD(lck) \
146 do { \
147 pthread_mutex_lock(&(lck)->mutex); \
148 while ((lck)->active) { \
149 pthread_cond_wait(&(lck)->cond, &(lck)->mutex); \
150 } \
151 pthread_mutex_unlock(&(lck)->mutex); \
152 } while(0)
153
154 #define DEBUG_WAKEUP_THREAD(lck) \
155 do { \
156 pthread_mutex_lock(&(lck)->mutex); \
157 (lck)->active = false; \
158 pthread_cond_broadcast(&(lck)->cond); \
159 pthread_mutex_unlock(&(lck)->mutex); \
160 } while(0)
161
162 typedef struct {
163 pmix_list_item_t super;
164 pmix_pdata_t pdata;
165 } pmix_locdat_t;
166 PMIX_CLASS_INSTANCE(pmix_locdat_t,
167 pmix_list_item_t,
168 NULL, NULL);
169
170 typedef struct {
171 pmix_object_t super;
172 mylock_t lock;
173 pmix_status_t status;
174 pmix_proc_t caller;
175 pmix_info_t *info;
176 size_t ninfo;
177 pmix_op_cbfunc_t cbfunc;
178 pmix_spawn_cbfunc_t spcbfunc;
179 void *cbdata;
180 } myxfer_t;
181 static void xfcon(myxfer_t *p)
182 {
183 DEBUG_CONSTRUCT_LOCK(&p->lock);
184 p->info = NULL;
185 p->ninfo = 0;
186 p->cbfunc = NULL;
187 p->spcbfunc = NULL;
188 p->cbdata = NULL;
189 }
190 static void xfdes(myxfer_t *p)
191 {
192 DEBUG_DESTRUCT_LOCK(&p->lock);
193 if (NULL != p->info) {
194 PMIX_INFO_FREE(p->info, p->ninfo);
195 }
196 }
197 PMIX_CLASS_INSTANCE(myxfer_t,
198 pmix_object_t,
199 xfcon, xfdes);
200
201 typedef struct {
202 pmix_list_item_t super;
203 int exit_code;
204 pid_t pid;
205 } wait_tracker_t;
206 PMIX_CLASS_INSTANCE(wait_tracker_t,
207 pmix_list_item_t,
208 NULL, NULL);
209
210 static volatile int wakeup;
211 static int exit_code = 0;
212 static pmix_list_t pubdata;
213 static pmix_event_t handler;
214 static pmix_list_t children;
215 static bool istimeouttest = false;
216
217 static void set_namespace(int nprocs, char *ranks, char *nspace,
218 pmix_op_cbfunc_t cbfunc, myxfer_t *x);
219 static void errhandler(size_t evhdlr_registration_id,
220 pmix_status_t status,
221 const pmix_proc_t *source,
222 pmix_info_t info[], size_t ninfo,
223 pmix_info_t results[], size_t nresults,
224 pmix_event_notification_cbfunc_fn_t cbfunc,
225 void *cbdata);
226 static void wait_signal_callback(int fd, short event, void *arg);
227 static void errhandler_reg_callbk (pmix_status_t status,
228 size_t errhandler_ref,
229 void *cbdata);
230
231 static void opcbfunc(pmix_status_t status, void *cbdata)
232 {
233 myxfer_t *x = (myxfer_t*)cbdata;
234
235 x->status = status;
236
237 if (NULL != x->cbfunc) {
238 x->cbfunc(PMIX_SUCCESS, x->cbdata);
239 }
240 DEBUG_WAKEUP_THREAD(&x->lock);
241 }
242
243 static void sacbfunc(pmix_status_t status,
244 pmix_info_t info[], size_t ninfo,
245 void *provided_cbdata,
246 pmix_op_cbfunc_t cbfunc, void *cbdata)
247 {
248 myxfer_t *x = (myxfer_t*)provided_cbdata;
249 size_t n;
250
251 x->status = status;
252 if (NULL != info) {
253 x->ninfo = ninfo;
254 PMIX_INFO_CREATE(x->info, x->ninfo);
255 for (n=0; n < ninfo; n++) {
256
257 PMIX_INFO_XFER(&x->info[n], &info[n]);
258 }
259 }
260 if (NULL != cbfunc) {
261 cbfunc(PMIX_SUCCESS, cbdata);
262 }
263 DEBUG_WAKEUP_THREAD(&x->lock);
264 }
265
266 static void infocbfunc(pmix_status_t status,
267 pmix_info_t *info, size_t ninfo,
268 void *cbdata,
269 pmix_release_cbfunc_t release_fn,
270 void *release_cbdata)
271 {
272 myxfer_t *x = (myxfer_t*)cbdata;
273 size_t n;
274
275 x->status = status;
276 if (NULL != info) {
277 x->ninfo = ninfo;
278 PMIX_INFO_CREATE(x->info, x->ninfo);
279 for (n=0; n < ninfo; n++) {
280
281 PMIX_INFO_XFER(&x->info[n], &info[n]);
282 }
283 }
284 if (NULL != release_fn) {
285 release_fn(release_cbdata);
286 }
287 DEBUG_WAKEUP_THREAD(&x->lock);
288 }
289
290 int main(int argc, char **argv)
291 {
292 char **client_env=NULL;
293 char **client_argv=NULL;
294 char *tmp, **atmp, *executable=NULL;
295 int rc, nprocs=1, n;
296 uid_t myuid;
297 gid_t mygid;
298 pid_t pid;
299 myxfer_t *x;
300 pmix_proc_t proc;
301 wait_tracker_t *child;
302 pmix_info_t *info;
303 size_t ninfo;
304 mylock_t mylock;
305 pmix_data_array_t *darray;
306 pmix_info_t *iarray;
307
308
309 if (PMIX_SUCCESS != 0) {
310 fprintf(stderr, "ERROR IN COMPUTING CONSTANTS: PMIX_SUCCESS = %d\n", PMIX_SUCCESS);
311 exit(1);
312 }
313
314 fprintf(stderr, "GW[%d]: Testing version %s\n", (int)getpid(), PMIx_Get_version());
315
316
317
318 for (n=1; n < argc; n++) {
319 if (0 == strcmp("-n", argv[n]) &&
320 NULL != argv[n+1]) {
321 nprocs = strtol(argv[n+1], NULL, 10);
322 ++n;
323 } else if (0 == strcmp("-h", argv[n])) {
324
325 fprintf(stderr, "usage: simptest <options>\n");
326 fprintf(stderr, " -n N Number of clients to run\n");
327 exit(0);
328 }
329 }
330 executable = strdup("./gwclient");
331
332
333 ninfo = 3;
334 PMIX_INFO_CREATE(info, ninfo);
335 PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TOOL_SUPPORT, NULL, PMIX_BOOL);
336 PMIX_INFO_LOAD(&info[1], PMIX_USOCK_DISABLE, NULL, PMIX_BOOL);
337 PMIX_INFO_LOAD(&info[2], PMIX_SERVER_GATEWAY, NULL, PMIX_BOOL);
338 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, ninfo))) {
339 fprintf(stderr, "Init failed with error %d\n", rc);
340 return rc;
341 }
342 PMIX_INFO_FREE(info, ninfo);
343
344
345 DEBUG_CONSTRUCT_LOCK(&mylock);
346 ninfo = 1;
347 PMIX_INFO_CREATE(info, ninfo);
348 PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "GWTEST-DEFAULT", PMIX_STRING);
349 PMIx_Register_event_handler(NULL, 0, info, ninfo,
350 errhandler, errhandler_reg_callbk, (void*)&mylock);
351 DEBUG_WAIT_THREAD(&mylock);
352 PMIX_INFO_FREE(info, ninfo);
353 if (PMIX_SUCCESS != mylock.status) {
354 exit(mylock.status);
355 }
356 DEBUG_DESTRUCT_LOCK(&mylock);
357
358
359 PMIX_CONSTRUCT(&pubdata, pmix_list_t);
360
361
362 x = PMIX_NEW(myxfer_t);
363 rc = PMIx_server_collect_inventory(NULL, 0, infocbfunc, (void*)x);
364 if (PMIX_SUCCESS != rc) {
365 fprintf(stderr, "Collect inventory failed: %s\n", PMIx_Error_string(rc));
366 PMIX_RELEASE(x);
367 exit(1);
368 }
369 DEBUG_WAIT_THREAD(&x->lock);
370 if (PMIX_SUCCESS != x->status) {
371 fprintf(stderr, "Collect inventory failed: %s\n", PMIx_Error_string(x->status));
372 PMIX_RELEASE(x);
373 exit(1);
374 }
375 DEBUG_DESTRUCT_LOCK(&x->lock);
376
377 DEBUG_CONSTRUCT_LOCK(&x->lock);
378 rc = PMIx_server_deliver_inventory(x->info, x->ninfo, NULL, 0, opcbfunc, x);
379 if (PMIX_SUCCESS != rc) {
380 fprintf(stderr, "Deliver inventory failed: %s\n", PMIx_Error_string(rc));
381 PMIX_RELEASE(x);
382 exit(1);
383 }
384 DEBUG_WAIT_THREAD(&x->lock);
385 if (PMIX_SUCCESS != x->status) {
386 fprintf(stderr, "Deliver inventory failed: %s\n", PMIx_Error_string(x->status));
387 PMIX_RELEASE(x);
388 exit(1);
389 }
390 PMIX_RELEASE(x);
391
392
393 PMIX_CONSTRUCT(&children, pmix_list_t);
394 pmix_event_assign(&handler, pmix_globals.evbase, SIGCHLD,
395 EV_SIGNAL|EV_PERSIST,wait_signal_callback, &handler);
396 pmix_event_add(&handler, NULL);
397
398
399 atmp = NULL;
400 for (n=0; n < nprocs; n++) {
401 asprintf(&tmp, "%d", n);
402 pmix_argv_append_nosize(&atmp, tmp);
403 free(tmp);
404 }
405 tmp = pmix_argv_join(atmp, ',');
406 pmix_argv_free(atmp);
407 x = PMIX_NEW(myxfer_t);
408 set_namespace(nprocs, tmp, "foobar", opcbfunc, x);
409
410
411 client_env = pmix_argv_copy(environ);
412 pmix_argv_prepend_nosize(&client_argv, executable);
413
414 wakeup = nprocs;
415 myuid = getuid();
416 mygid = getgid();
417
418
419
420 DEBUG_WAIT_THREAD(&x->lock);
421 free(tmp);
422 PMIX_RELEASE(x);
423
424
425 x = PMIX_NEW(myxfer_t);
426 ninfo = 1;
427 PMIX_INFO_CREATE(info, ninfo);
428
429
430
431
432
433
434
435 darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
436 darray->type = PMIX_INFO;
437 darray->size = 4;
438 PMIX_INFO_CREATE(darray->array, darray->size);
439 iarray = (pmix_info_t*)darray->array;
440 PMIX_INFO_LOAD(&iarray[0], PMIX_ALLOC_NETWORK_ID, "my.net.key", PMIX_STRING);
441 PMIX_INFO_LOAD(&iarray[1], PMIX_ALLOC_NETWORK_TYPE, "tcp", PMIX_STRING);
442 PMIX_INFO_LOAD(&iarray[2], PMIX_ALLOC_NETWORK_ENDPTS, &nprocs, PMIX_SIZE);
443 PMIX_INFO_LOAD(&iarray[3], PMIX_ALLOC_NETWORK_SEC_KEY, NULL, PMIX_BOOL);
444
445 PMIX_INFO_LOAD(&info[0], PMIX_ALLOC_NETWORK, darray, PMIX_DATA_ARRAY);
446
447 if (PMIX_SUCCESS != (rc = PMIx_server_setup_application("foobar", info, ninfo, sacbfunc, (void*)x))) {
448 return rc;
449 }
450 DEBUG_WAIT_THREAD(&x->lock);
451 DEBUG_DESTRUCT_LOCK(&x->lock);
452 PMIX_INFO_FREE(info, ninfo);
453
454
455 DEBUG_CONSTRUCT_LOCK(&x->lock);
456 if (PMIX_SUCCESS != (rc = PMIx_server_setup_local_support("foobar", x->info, x->ninfo, opcbfunc, x))) {
457 return rc;
458 }
459 DEBUG_WAIT_THREAD(&x->lock);
460 PMIX_RELEASE(x);
461
462
463 (void)strncpy(proc.nspace, "foobar", PMIX_MAX_NSLEN);
464 for (n = 0; n < nprocs; n++) {
465 proc.rank = n;
466 x = PMIX_NEW(myxfer_t);
467 if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid,
468 NULL, opcbfunc, x))) {
469 fprintf(stderr, "Server register client failed with error %d\n", rc);
470 PMIx_server_finalize();
471 return rc;
472 }
473
474
475 DEBUG_WAIT_THREAD(&x->lock);
476 PMIX_RELEASE(x);
477 if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) {
478 fprintf(stderr, "Server fork setup failed with error %d\n", rc);
479 PMIx_server_finalize();
480 return rc;
481 }
482 pid = fork();
483 if (pid < 0) {
484 fprintf(stderr, "Fork failed\n");
485 PMIx_server_finalize();
486 return -1;
487 }
488 child = PMIX_NEW(wait_tracker_t);
489 child->pid = pid;
490 pmix_list_append(&children, &child->super);
491
492 if (pid == 0) {
493 execve(executable, client_argv, client_env);
494
495 exit(0);
496 }
497 }
498 free(executable);
499 pmix_argv_free(client_argv);
500 pmix_argv_free(client_env);
501
502
503 while (0 < wakeup) {
504 struct timespec ts;
505 ts.tv_sec = 0;
506 ts.tv_nsec = 100000;
507 nanosleep(&ts, NULL);
508 }
509
510
511 n=0;
512 PMIX_LIST_FOREACH(child, &children, wait_tracker_t) {
513 if (0 != child->exit_code) {
514 fprintf(stderr, "Child %d exited with status %d - test FAILED\n", n, child->exit_code);
515 goto done;
516 }
517 ++n;
518 }
519
520
521 x = PMIX_NEW(myxfer_t);
522 PMIx_server_deregister_nspace("foobar", opcbfunc, (void*)x);
523 DEBUG_WAIT_THREAD(&x->lock);
524 PMIX_RELEASE(x);
525
526 done:
527
528 PMIx_Deregister_event_handler(0, NULL, NULL);
529
530
531 PMIX_LIST_DESTRUCT(&pubdata);
532
533
534 PMIX_LIST_DESTRUCT(&children);
535
536
537 if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
538 fprintf(stderr, "Finalize failed with error %d\n", rc);
539 exit_code = rc;
540 }
541
542 if (0 == exit_code) {
543 fprintf(stderr, "Test finished OK!\n");
544 } else {
545 fprintf(stderr, "TEST FAILED WITH ERROR %d\n", exit_code);
546 }
547
548 return exit_code;
549 }
550
551 static void set_namespace(int nprocs, char *ranks, char *nspace,
552 pmix_op_cbfunc_t cbfunc, myxfer_t *x)
553 {
554 char *regex, *ppn;
555 char hostname[PMIX_MAXHOSTNAMELEN];
556 int n;
557 pmix_data_array_t *darray;
558 pmix_info_t *info;
559 pmix_rank_t rank;
560 uint16_t lr;
561
562 gethostname(hostname, sizeof(hostname));
563 x->ninfo = 7 + nprocs;
564
565 PMIX_INFO_CREATE(x->info, x->ninfo);
566 (void)strncpy(x->info[0].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
567 x->info[0].value.type = PMIX_UINT32;
568 x->info[0].value.data.uint32 = nprocs;
569
570 (void)strncpy(x->info[1].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
571 x->info[1].value.type = PMIX_UINT32;
572 x->info[1].value.data.uint32 = 0;
573
574 (void)strncpy(x->info[2].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
575 x->info[2].value.type = PMIX_UINT32;
576 x->info[2].value.data.uint32 = nprocs;
577
578 (void)strncpy(x->info[3].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
579 x->info[3].value.type = PMIX_STRING;
580 x->info[3].value.data.string = strdup(ranks);
581
582 PMIx_generate_regex(hostname, ®ex);
583 (void)strncpy(x->info[4].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
584 x->info[4].value.type = PMIX_STRING;
585 x->info[4].value.data.string = regex;
586
587 PMIx_generate_ppn(ranks, &ppn);
588 (void)strncpy(x->info[5].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
589 x->info[5].value.type = PMIX_STRING;
590 x->info[5].value.data.string = ppn;
591
592 (void)strncpy(x->info[6].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
593 x->info[6].value.type = PMIX_UINT32;
594 x->info[6].value.data.uint32 = nprocs;
595
596 for (n=0; n < nprocs; n++) {
597 (void)strncpy(x->info[7+n].key, PMIX_PROC_DATA, PMIX_MAX_KEYLEN);
598 x->info[7+n].value.type = PMIX_DATA_ARRAY;
599 darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
600 darray->size = 2;
601 darray->type = PMIX_INFO;
602 PMIX_INFO_CREATE(darray->array, 2);
603 info = (pmix_info_t*)darray->array;
604 rank = n;
605 PMIX_INFO_LOAD(&info[0], PMIX_RANK, &rank, PMIX_PROC_RANK);
606 lr = n;
607 PMIX_INFO_LOAD(&info[1], PMIX_LOCAL_RANK, &lr, PMIX_UINT16);
608 x->info[7+n].value.data.darray = darray;
609 }
610
611 PMIx_server_register_nspace(nspace, nprocs, x->info, x->ninfo,
612 cbfunc, x);
613 }
614
615 static void errhandler(size_t evhdlr_registration_id,
616 pmix_status_t status,
617 const pmix_proc_t *source,
618 pmix_info_t info[], size_t ninfo,
619 pmix_info_t results[], size_t nresults,
620 pmix_event_notification_cbfunc_fn_t cbfunc,
621 void *cbdata)
622 {
623 pmix_output(0, "SERVER: ERRHANDLER CALLED WITH STATUS %d", status);
624 }
625
626 static void errhandler_reg_callbk (pmix_status_t status,
627 size_t errhandler_ref,
628 void *cbdata)
629 {
630 mylock_t *lock = (mylock_t*)cbdata;
631
632 pmix_output(0, "SERVER: ERRHANDLER REGISTRATION CALLBACK CALLED WITH STATUS %d, ref=%lu",
633 status, (unsigned long)errhandler_ref);
634 lock->status = status;
635 DEBUG_WAKEUP_THREAD(lock);
636 }
637
638 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
639 pmix_op_cbfunc_t cbfunc, void *cbdata)
640 {
641 if (NULL != cbfunc) {
642 cbfunc(PMIX_SUCCESS, cbdata);
643 }
644 return PMIX_SUCCESS;
645 }
646 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
647 pmix_op_cbfunc_t cbfunc, void *cbdata)
648 {
649 pmix_output(0, "SERVER: FINALIZED %s:%d WAKEUP %d",
650 proc->nspace, proc->rank, wakeup);
651
652 if (NULL != cbfunc) {
653 cbfunc(PMIX_SUCCESS, cbdata);
654 }
655 return PMIX_SUCCESS;
656 }
657
658 static void abcbfunc(pmix_status_t status, void *cbdata)
659 {
660 myxfer_t *x = (myxfer_t*)cbdata;
661
662
663 if (NULL != x->cbfunc) {
664 x->cbfunc(status, x->cbdata);
665 }
666 PMIX_RELEASE(x);
667 }
668
669 static pmix_status_t abort_fn(const pmix_proc_t *proc,
670 void *server_object,
671 int status, const char msg[],
672 pmix_proc_t procs[], size_t nprocs,
673 pmix_op_cbfunc_t cbfunc, void *cbdata)
674 {
675 pmix_status_t rc;
676 myxfer_t *x;
677
678 if (NULL != procs) {
679 pmix_output(0, "SERVER: ABORT on %s:%d", procs[0].nspace, procs[0].rank);
680 } else {
681 pmix_output(0, "SERVER: ABORT OF ALL PROCS IN NSPACE %s", proc->nspace);
682 }
683
684
685
686
687
688
689 x = PMIX_NEW(myxfer_t);
690 (void)strncpy(x->caller.nspace, proc->nspace, PMIX_MAX_NSLEN);
691 x->caller.rank = proc->rank;
692
693 PMIX_INFO_CREATE(x->info, 2);
694 (void)strncpy(x->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
695 x->info[0].value.type = PMIX_INT8;
696 x->info[0].value.data.int8 = 12;
697 (void)strncpy(x->info[1].key, "VADER", PMIX_MAX_KEYLEN);
698 x->info[1].value.type = PMIX_DOUBLE;
699 x->info[1].value.data.dval = 12.34;
700 x->cbfunc = cbfunc;
701 x->cbdata = cbdata;
702
703 if (PMIX_SUCCESS != (rc = PMIx_Notify_event(status, &x->caller,
704 PMIX_RANGE_NAMESPACE,
705 x->info, 2,
706 abcbfunc, x))) {
707 pmix_output(0, "SERVER: FAILED NOTIFY ERROR %d", (int)rc);
708 }
709
710 return PMIX_SUCCESS;
711 }
712
713
714 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
715 const pmix_info_t info[], size_t ninfo,
716 char *data, size_t ndata,
717 pmix_modex_cbfunc_t cbfunc, void *cbdata)
718 {
719 pmix_output(0, "SERVER: FENCENB");
720
721 if (NULL != cbfunc) {
722 cbfunc(PMIX_SUCCESS, data, ndata, cbdata, NULL, NULL);
723 }
724 return PMIX_SUCCESS;
725 }
726
727
728 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
729 const pmix_info_t info[], size_t ninfo,
730 pmix_modex_cbfunc_t cbfunc, void *cbdata)
731 {
732 pmix_output(0, "SERVER: DMODEX");
733
734
735 if (istimeouttest) {
736 return PMIX_SUCCESS;
737 }
738
739
740
741 if (NULL != cbfunc) {
742 cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
743 }
744 return PMIX_SUCCESS;
745 }
746
747
748 static pmix_status_t publish_fn(const pmix_proc_t *proc,
749 const pmix_info_t info[], size_t ninfo,
750 pmix_op_cbfunc_t cbfunc, void *cbdata)
751 {
752 pmix_locdat_t *p;
753 size_t n;
754
755 pmix_output(0, "SERVER: PUBLISH");
756
757 for (n=0; n < ninfo; n++) {
758 p = PMIX_NEW(pmix_locdat_t);
759 (void)strncpy(p->pdata.proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
760 p->pdata.proc.rank = proc->rank;
761 (void)strncpy(p->pdata.key, info[n].key, PMIX_MAX_KEYLEN);
762 pmix_value_xfer(&p->pdata.value, (pmix_value_t*)&info[n].value);
763 pmix_list_append(&pubdata, &p->super);
764 }
765 if (NULL != cbfunc) {
766 cbfunc(PMIX_SUCCESS, cbdata);
767 }
768 return PMIX_SUCCESS;
769 }
770
771
772 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
773 const pmix_info_t info[], size_t ninfo,
774 pmix_lookup_cbfunc_t cbfunc, void *cbdata)
775 {
776 pmix_locdat_t *p, *p2;
777 pmix_list_t results;
778 size_t i, n;
779 pmix_pdata_t *pd = NULL;
780 pmix_status_t ret = PMIX_ERR_NOT_FOUND;
781
782 pmix_output(0, "SERVER: LOOKUP");
783
784 PMIX_CONSTRUCT(&results, pmix_list_t);
785
786 for (n=0; NULL != keys[n]; n++) {
787 PMIX_LIST_FOREACH(p, &pubdata, pmix_locdat_t) {
788 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
789 p2 = PMIX_NEW(pmix_locdat_t);
790 (void)strncpy(p2->pdata.proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
791 p2->pdata.proc.rank = p->pdata.proc.rank;
792 (void)strncpy(p2->pdata.key, p->pdata.key, PMIX_MAX_KEYLEN);
793 pmix_value_xfer(&p2->pdata.value, &p->pdata.value);
794 pmix_list_append(&results, &p2->super);
795 break;
796 }
797 }
798 }
799 if (0 < (n = pmix_list_get_size(&results))) {
800 ret = PMIX_SUCCESS;
801 PMIX_PDATA_CREATE(pd, n);
802 for (i=0; i < n; i++) {
803 p = (pmix_locdat_t*)pmix_list_remove_first(&results);
804 if (p) {
805 (void)strncpy(pd[i].proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
806 pd[i].proc.rank = p->pdata.proc.rank;
807 (void)strncpy(pd[i].key, p->pdata.key, PMIX_MAX_KEYLEN);
808 pmix_value_xfer(&pd[i].value, &p->pdata.value);
809 }
810 }
811 }
812 PMIX_LIST_DESTRUCT(&results);
813 if (NULL != cbfunc) {
814 cbfunc(ret, pd, n, cbdata);
815 }
816 if (0 < n) {
817 PMIX_PDATA_FREE(pd, n);
818 }
819 return PMIX_SUCCESS;
820 }
821
822
823 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
824 const pmix_info_t info[], size_t ninfo,
825 pmix_op_cbfunc_t cbfunc, void *cbdata)
826 {
827 pmix_locdat_t *p, *p2;
828 size_t n;
829
830 pmix_output(0, "SERVER: UNPUBLISH");
831
832 for (n=0; NULL != keys[n]; n++) {
833 PMIX_LIST_FOREACH_SAFE(p, p2, &pubdata, pmix_locdat_t) {
834 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
835 pmix_list_remove_item(&pubdata, &p->super);
836 PMIX_RELEASE(p);
837 break;
838 }
839 }
840 }
841 if (NULL != cbfunc) {
842 cbfunc(PMIX_SUCCESS, cbdata);
843 }
844 return PMIX_SUCCESS;
845 }
846
847 static void spcbfunc(pmix_status_t status, void *cbdata)
848 {
849 myxfer_t *x = (myxfer_t*)cbdata;
850
851 if (NULL != x->spcbfunc) {
852 x->spcbfunc(PMIX_SUCCESS, "DYNSPACE", x->cbdata);
853 }
854 }
855
856 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
857 const pmix_info_t job_info[], size_t ninfo,
858 const pmix_app_t apps[], size_t napps,
859 pmix_spawn_cbfunc_t cbfunc, void *cbdata)
860 {
861 myxfer_t *x;
862 size_t n;
863 pmix_proc_t *pptr;
864 bool spawned;
865
866 pmix_output(0, "SERVER: SPAWN");
867
868
869 for (n=0; n < ninfo; n++) {
870 if (0 == strncmp(job_info[n].key, PMIX_PARENT_ID, PMIX_MAX_KEYLEN)) {
871 pptr = job_info[n].value.data.proc;
872 pmix_output(0, "SPAWN: Parent ID %s:%d", pptr->nspace, pptr->rank);
873 } else if (0 == strncmp(job_info[n].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN)) {
874 spawned = PMIX_INFO_TRUE(&job_info[n]);
875 pmix_output(0, "SPAWN: Spawned %s", spawned ? "TRUE" : "FALSE");
876 }
877 }
878
879
880
881
882
883
884
885
886 x = PMIX_NEW(myxfer_t);
887 x->spcbfunc = cbfunc;
888 x->cbdata = cbdata;
889
890 set_namespace(2, "0,1", "DYNSPACE", spcbfunc, x);
891
892 return PMIX_SUCCESS;
893 }
894
895 static int numconnects = 0;
896
897 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
898 const pmix_info_t info[], size_t ninfo,
899 pmix_op_cbfunc_t cbfunc, void *cbdata)
900 {
901 pmix_output(0, "SERVER: CONNECT");
902
903
904
905
906 numconnects++;
907
908 if (NULL != cbfunc) {
909 cbfunc(PMIX_SUCCESS, cbdata);
910 }
911
912 return PMIX_SUCCESS;
913 }
914
915
916 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
917 const pmix_info_t info[], size_t ninfo,
918 pmix_op_cbfunc_t cbfunc, void *cbdata)
919 {
920 pmix_output(0, "SERVER: DISCONNECT");
921
922
923
924
925 if (NULL != cbfunc) {
926 cbfunc(PMIX_SUCCESS, cbdata);
927 }
928
929 return PMIX_SUCCESS;
930 }
931
932 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
933 const pmix_info_t info[], size_t ninfo,
934 pmix_op_cbfunc_t cbfunc, void *cbdata)
935 {
936 if (NULL != cbfunc) {
937 cbfunc(PMIX_SUCCESS, cbdata);
938 }
939 return PMIX_SUCCESS;
940 }
941
942 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
943 pmix_op_cbfunc_t cbfunc, void *cbdata)
944 {
945 return PMIX_SUCCESS;
946 }
947
948 static pmix_status_t notify_event(pmix_status_t code,
949 const pmix_proc_t *source,
950 pmix_data_range_t range,
951 pmix_info_t info[], size_t ninfo,
952 pmix_op_cbfunc_t cbfunc, void *cbdata)
953 {
954 return PMIX_SUCCESS;
955 }
956
957 typedef struct query_data_t {
958 pmix_info_t *data;
959 size_t ndata;
960 } query_data_t;
961
962 static pmix_status_t query_fn(pmix_proc_t *proct,
963 pmix_query_t *queries, size_t nqueries,
964 pmix_info_cbfunc_t cbfunc,
965 void *cbdata)
966 {
967 size_t n;
968 pmix_info_t *info;
969
970 pmix_output(0, "SERVER: QUERY");
971
972 if (NULL == cbfunc) {
973 return PMIX_ERROR;
974 }
975
976 PMIX_INFO_CREATE(info, nqueries);
977 for (n=0; n < nqueries; n++) {
978 pmix_output(0, "\tKey: %s", queries[n].keys[0]);
979 (void)strncpy(info[n].key, queries[n].keys[0], PMIX_MAX_KEYLEN);
980 info[n].value.type = PMIX_STRING;
981 if (0 > asprintf(&info[n].value.data.string, "%d", (int)n)) {
982 return PMIX_ERROR;
983 }
984 }
985 cbfunc(PMIX_SUCCESS, info, nqueries, cbdata, NULL, NULL);
986 return PMIX_SUCCESS;
987 }
988
989 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
990 pmix_tool_connection_cbfunc_t cbfunc,
991 void *cbdata)
992 {
993 pmix_proc_t proc;
994
995 pmix_output(0, "SERVER: TOOL CONNECT");
996
997
998 (void)strncpy(proc.nspace, "TOOL", PMIX_MAX_NSLEN);
999 proc.rank = 0;
1000
1001 if (NULL != cbfunc) {
1002 cbfunc(PMIX_SUCCESS, &proc, cbdata);
1003 }
1004 }
1005
1006 static void log_fn(const pmix_proc_t *client,
1007 const pmix_info_t data[], size_t ndata,
1008 const pmix_info_t directives[], size_t ndirs,
1009 pmix_op_cbfunc_t cbfunc, void *cbdata)
1010 {
1011 pmix_output(0, "SERVER: LOG");
1012
1013 if (NULL != cbfunc) {
1014 cbfunc(PMIX_SUCCESS, cbdata);
1015 }
1016 }
1017
1018 static void wait_signal_callback(int fd, short event, void *arg)
1019 {
1020 pmix_event_t *sig = (pmix_event_t*) arg;
1021 int status;
1022 pid_t pid;
1023 wait_tracker_t *t2;
1024
1025 if (SIGCHLD != pmix_event_get_signal(sig)) {
1026 return;
1027 }
1028
1029
1030
1031
1032 while (1) {
1033 pid = waitpid(-1, &status, WNOHANG);
1034 if (-1 == pid && EINTR == errno) {
1035
1036 continue;
1037 }
1038
1039 if (pid <= 0) {
1040 return;
1041 }
1042
1043
1044 PMIX_LIST_FOREACH(t2, &children, wait_tracker_t) {
1045 if (pid == t2->pid) {
1046 t2->exit_code = status;
1047
1048 if (0 != status && 0 == exit_code) {
1049 exit_code = status;
1050 }
1051 --wakeup;
1052 break;
1053 }
1054 }
1055 }
1056 }