root/opal/mca/pmix/pmix4x/pmix/test/simple/gwtest.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. xfcon
  2. xfdes
  3. opcbfunc
  4. sacbfunc
  5. infocbfunc
  6. main
  7. set_namespace
  8. errhandler
  9. errhandler_reg_callbk
  10. connected
  11. finalized
  12. abcbfunc
  13. abort_fn
  14. fencenb_fn
  15. dmodex_fn
  16. publish_fn
  17. lookup_fn
  18. unpublish_fn
  19. spcbfunc
  20. spawn_fn
  21. connect_fn
  22. disconnect_fn
  23. register_event_fn
  24. deregister_events
  25. notify_event
  26. query_fn
  27. tool_connect_fn
  28. log_fn
  29. wait_signal_callback

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2012 Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2015      Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  20  * $COPYRIGHT$
  21  *
  22  * Additional copyrights may follow
  23  *
  24  * $HEADER$
  25  *
  26  */
  27 
  28 #include <src/include/pmix_config.h>
  29 #include <pmix_server.h>
  30 #include <src/include/types.h>
  31 #include <src/include/pmix_globals.h>
  32 
  33 #include <stdio.h>
  34 #include <stdlib.h>
  35 #include <unistd.h>
  36 #include <time.h>
  37 #include <sys/types.h>
  38 #include <sys/wait.h>
  39 #include <errno.h>
  40 #include <signal.h>
  41 
  42 #include "src/class/pmix_list.h"
  43 #include "src/util/pmix_environ.h"
  44 #include "src/util/output.h"
  45 #include "src/util/printf.h"
  46 #include "src/util/argv.h"
  47 
  48 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
  49                                pmix_op_cbfunc_t cbfunc, void *cbdata);
  50 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
  51                                pmix_op_cbfunc_t cbfunc, void *cbdata);
  52 static pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
  53                               int status, const char msg[],
  54                               pmix_proc_t procs[], size_t nprocs,
  55                               pmix_op_cbfunc_t cbfunc, void *cbdata);
  56 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
  57                                 const pmix_info_t info[], size_t ninfo,
  58                                 char *data, size_t ndata,
  59                                 pmix_modex_cbfunc_t cbfunc, void *cbdata);
  60 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
  61                                const pmix_info_t info[], size_t ninfo,
  62                                pmix_modex_cbfunc_t cbfunc, void *cbdata);
  63 static pmix_status_t publish_fn(const pmix_proc_t *proc,
  64                                 const pmix_info_t info[], size_t ninfo,
  65                                 pmix_op_cbfunc_t cbfunc, void *cbdata);
  66 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
  67                                const pmix_info_t info[], size_t ninfo,
  68                                pmix_lookup_cbfunc_t cbfunc, void *cbdata);
  69 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
  70                                   const pmix_info_t info[], size_t ninfo,
  71                                   pmix_op_cbfunc_t cbfunc, void *cbdata);
  72 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
  73                               const pmix_info_t job_info[], size_t ninfo,
  74                               const pmix_app_t apps[], size_t napps,
  75                               pmix_spawn_cbfunc_t cbfunc, void *cbdata);
  76 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
  77                                 const pmix_info_t info[], size_t ninfo,
  78                                 pmix_op_cbfunc_t cbfunc, void *cbdata);
  79 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
  80                                    const pmix_info_t info[], size_t ninfo,
  81                                    pmix_op_cbfunc_t cbfunc, void *cbdata);
  82 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
  83                                        const pmix_info_t info[], size_t ninfo,
  84                                        pmix_op_cbfunc_t cbfunc, void *cbdata);
  85 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
  86                                        pmix_op_cbfunc_t cbfunc, void *cbdata);
  87 static pmix_status_t notify_event(pmix_status_t code,
  88                                   const pmix_proc_t *source,
  89                                   pmix_data_range_t range,
  90                                   pmix_info_t info[], size_t ninfo,
  91                                   pmix_op_cbfunc_t cbfunc, void *cbdata);
  92 static pmix_status_t query_fn(pmix_proc_t *proct,
  93                               pmix_query_t *queries, size_t nqueries,
  94                               pmix_info_cbfunc_t cbfunc,
  95                               void *cbdata);
  96 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
  97                             pmix_tool_connection_cbfunc_t cbfunc,
  98                             void *cbdata);
  99 static void log_fn(const pmix_proc_t *client,
 100                    const pmix_info_t data[], size_t ndata,
 101                    const pmix_info_t directives[], size_t ndirs,
 102                    pmix_op_cbfunc_t cbfunc, void *cbdata);
 103 
 104 static pmix_server_module_t mymodule = {
 105     .client_connected = connected,
 106     .client_finalized = finalized,
 107     .abort = abort_fn,
 108     .fence_nb = fencenb_fn,
 109     .direct_modex = dmodex_fn,
 110     .publish = publish_fn,
 111     .lookup = lookup_fn,
 112     .unpublish = unpublish_fn,
 113     .spawn = spawn_fn,
 114     .connect = connect_fn,
 115     .disconnect = disconnect_fn,
 116     .register_events = register_event_fn,
 117     .deregister_events = deregister_events,
 118     .notify_event = notify_event,
 119     .query = query_fn,
 120     .tool_connected = tool_connect_fn,
 121     .log = log_fn
 122 };
 123 
 124 typedef struct {
 125     pthread_mutex_t mutex;
 126     pthread_cond_t cond;
 127     volatile bool active;
 128     pmix_status_t status;
 129 } mylock_t;
 130 
 131 #define DEBUG_CONSTRUCT_LOCK(l)                     \
 132     do {                                            \
 133         pthread_mutex_init(&(l)->mutex, NULL);      \
 134         pthread_cond_init(&(l)->cond, NULL);        \
 135         (l)->active = true;                         \
 136         (l)->status = PMIX_SUCCESS;                 \
 137     } while(0)
 138 
 139 #define DEBUG_DESTRUCT_LOCK(l)              \
 140     do {                                    \
 141         pthread_mutex_destroy(&(l)->mutex); \
 142         pthread_cond_destroy(&(l)->cond);   \
 143     } while(0)
 144 
 145 #define DEBUG_WAIT_THREAD(lck)                                      \
 146     do {                                                            \
 147         pthread_mutex_lock(&(lck)->mutex);                          \
 148         while ((lck)->active) {                                     \
 149             pthread_cond_wait(&(lck)->cond, &(lck)->mutex);         \
 150         }                                                           \
 151         pthread_mutex_unlock(&(lck)->mutex);                        \
 152     } while(0)
 153 
 154 #define DEBUG_WAKEUP_THREAD(lck)                        \
 155     do {                                                \
 156         pthread_mutex_lock(&(lck)->mutex);              \
 157         (lck)->active = false;                          \
 158         pthread_cond_broadcast(&(lck)->cond);           \
 159         pthread_mutex_unlock(&(lck)->mutex);            \
 160     } while(0)
 161 
 162 typedef struct {
 163     pmix_list_item_t super;
 164     pmix_pdata_t pdata;
 165 } pmix_locdat_t;
 166 PMIX_CLASS_INSTANCE(pmix_locdat_t,
 167                     pmix_list_item_t,
 168                     NULL, NULL);
 169 
 170 typedef struct {
 171     pmix_object_t super;
 172     mylock_t lock;
 173     pmix_status_t status;
 174     pmix_proc_t caller;
 175     pmix_info_t *info;
 176     size_t ninfo;
 177     pmix_op_cbfunc_t cbfunc;
 178     pmix_spawn_cbfunc_t spcbfunc;
 179     void *cbdata;
 180 } myxfer_t;
 181 static void xfcon(myxfer_t *p)
 182 {
 183     DEBUG_CONSTRUCT_LOCK(&p->lock);
 184     p->info = NULL;
 185     p->ninfo = 0;
 186     p->cbfunc = NULL;
 187     p->spcbfunc = NULL;
 188     p->cbdata = NULL;
 189 }
 190 static void xfdes(myxfer_t *p)
 191 {
 192     DEBUG_DESTRUCT_LOCK(&p->lock);
 193     if (NULL != p->info) {
 194         PMIX_INFO_FREE(p->info, p->ninfo);
 195     }
 196 }
 197 PMIX_CLASS_INSTANCE(myxfer_t,
 198                     pmix_object_t,
 199                     xfcon, xfdes);
 200 
 201 typedef struct {
 202     pmix_list_item_t super;
 203     int exit_code;
 204     pid_t pid;
 205 } wait_tracker_t;
 206 PMIX_CLASS_INSTANCE(wait_tracker_t,
 207                     pmix_list_item_t,
 208                     NULL, NULL);
 209 
 210 static volatile int wakeup;
 211 static int exit_code = 0;
 212 static pmix_list_t pubdata;
 213 static pmix_event_t handler;
 214 static pmix_list_t children;
 215 static bool istimeouttest = false;
 216 
 217 static void set_namespace(int nprocs, char *ranks, char *nspace,
 218                           pmix_op_cbfunc_t cbfunc, myxfer_t *x);
 219 static void errhandler(size_t evhdlr_registration_id,
 220                        pmix_status_t status,
 221                        const pmix_proc_t *source,
 222                        pmix_info_t info[], size_t ninfo,
 223                        pmix_info_t results[], size_t nresults,
 224                        pmix_event_notification_cbfunc_fn_t cbfunc,
 225                        void *cbdata);
 226 static void wait_signal_callback(int fd, short event, void *arg);
 227 static void errhandler_reg_callbk (pmix_status_t status,
 228                                    size_t errhandler_ref,
 229                                    void *cbdata);
 230 
 231 static void opcbfunc(pmix_status_t status, void *cbdata)
 232 {
 233     myxfer_t *x = (myxfer_t*)cbdata;
 234 
 235     x->status = status;
 236     /* release the caller, if necessary */
 237     if (NULL != x->cbfunc) {
 238         x->cbfunc(PMIX_SUCCESS, x->cbdata);
 239     }
 240     DEBUG_WAKEUP_THREAD(&x->lock);
 241 }
 242 
 243 static void sacbfunc(pmix_status_t status,
 244                      pmix_info_t info[], size_t ninfo,
 245                      void *provided_cbdata,
 246                      pmix_op_cbfunc_t cbfunc, void *cbdata)
 247 {
 248     myxfer_t *x = (myxfer_t*)provided_cbdata;
 249     size_t n;
 250 
 251     x->status = status;
 252     if (NULL != info) {
 253         x->ninfo = ninfo;
 254         PMIX_INFO_CREATE(x->info, x->ninfo);
 255         for (n=0; n < ninfo; n++) {
 256             /* copy the data across */
 257             PMIX_INFO_XFER(&x->info[n], &info[n]);
 258         }
 259     }
 260     if (NULL != cbfunc) {
 261         cbfunc(PMIX_SUCCESS, cbdata);
 262     }
 263     DEBUG_WAKEUP_THREAD(&x->lock);
 264 }
 265 
 266 static void infocbfunc(pmix_status_t status,
 267                        pmix_info_t *info, size_t ninfo,
 268                        void *cbdata,
 269                        pmix_release_cbfunc_t release_fn,
 270                        void *release_cbdata)
 271 {
 272     myxfer_t *x = (myxfer_t*)cbdata;
 273     size_t n;
 274 
 275     x->status = status;
 276     if (NULL != info) {
 277         x->ninfo = ninfo;
 278         PMIX_INFO_CREATE(x->info, x->ninfo);
 279         for (n=0; n < ninfo; n++) {
 280             /* copy the data across */
 281             PMIX_INFO_XFER(&x->info[n], &info[n]);
 282         }
 283     }
 284     if (NULL != release_fn) {
 285         release_fn(release_cbdata);
 286     }
 287     DEBUG_WAKEUP_THREAD(&x->lock);
 288 }
 289 
 290 int main(int argc, char **argv)
 291 {
 292     char **client_env=NULL;
 293     char **client_argv=NULL;
 294     char *tmp, **atmp, *executable=NULL;
 295     int rc, nprocs=1, n;
 296     uid_t myuid;
 297     gid_t mygid;
 298     pid_t pid;
 299     myxfer_t *x;
 300     pmix_proc_t proc;
 301     wait_tracker_t *child;
 302     pmix_info_t *info;
 303     size_t ninfo;
 304     mylock_t mylock;
 305     pmix_data_array_t *darray;
 306     pmix_info_t *iarray;
 307 
 308     /* smoke test */
 309     if (PMIX_SUCCESS != 0) {
 310         fprintf(stderr, "ERROR IN COMPUTING CONSTANTS: PMIX_SUCCESS = %d\n", PMIX_SUCCESS);
 311         exit(1);
 312     }
 313 
 314     fprintf(stderr, "GW[%d]: Testing version %s\n", (int)getpid(), PMIx_Get_version());
 315 
 316     /* see if we were passed the number of procs to run or
 317      * the executable to use */
 318     for (n=1; n < argc; n++) {
 319         if (0 == strcmp("-n", argv[n]) &&
 320             NULL != argv[n+1]) {
 321             nprocs = strtol(argv[n+1], NULL, 10);
 322             ++n;  // step over the argument
 323         } else if (0 == strcmp("-h", argv[n])) {
 324             /* print the options and exit */
 325             fprintf(stderr, "usage: simptest <options>\n");
 326             fprintf(stderr, "    -n N     Number of clients to run\n");
 327             exit(0);
 328         }
 329     }
 330     executable = strdup("./gwclient");
 331 
 332     /* setup the server library and tell it to support tool connections */
 333     ninfo = 3;
 334     PMIX_INFO_CREATE(info, ninfo);
 335     PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TOOL_SUPPORT, NULL, PMIX_BOOL);
 336     PMIX_INFO_LOAD(&info[1], PMIX_USOCK_DISABLE, NULL, PMIX_BOOL);
 337     PMIX_INFO_LOAD(&info[2], PMIX_SERVER_GATEWAY, NULL, PMIX_BOOL);
 338     if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, ninfo))) {
 339         fprintf(stderr, "Init failed with error %d\n", rc);
 340         return rc;
 341     }
 342     PMIX_INFO_FREE(info, ninfo);
 343 
 344     /* register the default errhandler */
 345     DEBUG_CONSTRUCT_LOCK(&mylock);
 346     ninfo = 1;
 347     PMIX_INFO_CREATE(info, ninfo);
 348     PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "GWTEST-DEFAULT", PMIX_STRING);
 349     PMIx_Register_event_handler(NULL, 0, info, ninfo,
 350                                 errhandler, errhandler_reg_callbk, (void*)&mylock);
 351     DEBUG_WAIT_THREAD(&mylock);
 352     PMIX_INFO_FREE(info, ninfo);
 353     if (PMIX_SUCCESS != mylock.status) {
 354         exit(mylock.status);
 355     }
 356     DEBUG_DESTRUCT_LOCK(&mylock);
 357 
 358     /* setup the pub data, in case it is used */
 359     PMIX_CONSTRUCT(&pubdata, pmix_list_t);
 360 
 361     /* collect the inventory */
 362     x = PMIX_NEW(myxfer_t);
 363     rc = PMIx_server_collect_inventory(NULL, 0, infocbfunc, (void*)x);
 364     if (PMIX_SUCCESS != rc) {
 365         fprintf(stderr, "Collect inventory failed: %s\n", PMIx_Error_string(rc));
 366         PMIX_RELEASE(x);
 367         exit(1);
 368     }
 369     DEBUG_WAIT_THREAD(&x->lock);
 370     if (PMIX_SUCCESS != x->status) {
 371         fprintf(stderr, "Collect inventory failed: %s\n", PMIx_Error_string(x->status));
 372         PMIX_RELEASE(x);
 373         exit(1);
 374     }
 375     DEBUG_DESTRUCT_LOCK(&x->lock);
 376     /* pass the info down */
 377     DEBUG_CONSTRUCT_LOCK(&x->lock);
 378     rc = PMIx_server_deliver_inventory(x->info, x->ninfo, NULL, 0, opcbfunc, x);
 379     if (PMIX_SUCCESS != rc) {
 380         fprintf(stderr, "Deliver inventory failed: %s\n", PMIx_Error_string(rc));
 381         PMIX_RELEASE(x);
 382         exit(1);
 383     }
 384     DEBUG_WAIT_THREAD(&x->lock);
 385     if (PMIX_SUCCESS != x->status) {
 386         fprintf(stderr, "Deliver inventory failed: %s\n", PMIx_Error_string(x->status));
 387         PMIX_RELEASE(x);
 388         exit(1);
 389     }
 390     PMIX_RELEASE(x);
 391 
 392     /* setup to see sigchld on the forked tests */
 393     PMIX_CONSTRUCT(&children, pmix_list_t);
 394     pmix_event_assign(&handler, pmix_globals.evbase, SIGCHLD,
 395                       EV_SIGNAL|EV_PERSIST,wait_signal_callback, &handler);
 396     pmix_event_add(&handler, NULL);
 397 
 398     /* we have a single namespace for all clients */
 399     atmp = NULL;
 400     for (n=0; n < nprocs; n++) {
 401         asprintf(&tmp, "%d", n);
 402         pmix_argv_append_nosize(&atmp, tmp);
 403         free(tmp);
 404     }
 405     tmp = pmix_argv_join(atmp, ',');
 406     pmix_argv_free(atmp);
 407     x = PMIX_NEW(myxfer_t);
 408     set_namespace(nprocs, tmp, "foobar", opcbfunc, x);
 409 
 410     /* set common argv and env */
 411     client_env = pmix_argv_copy(environ);
 412     pmix_argv_prepend_nosize(&client_argv, executable);
 413 
 414     wakeup = nprocs;
 415     myuid = getuid();
 416     mygid = getgid();
 417 
 418     /* if the nspace registration hasn't completed yet,
 419      * wait for it here */
 420     DEBUG_WAIT_THREAD(&x->lock);
 421     free(tmp);
 422     PMIX_RELEASE(x);
 423 
 424     /* collect the launch blob */
 425     x = PMIX_NEW(myxfer_t);
 426     ninfo = 1;
 427     PMIX_INFO_CREATE(info, ninfo);
 428     /* the 2nd info is going to carry our network allocation request
 429      * consisting of:
 430      *
 431      * PMIX_ALLOC_NETWORK_ID - caller-provided key for resulting allocation
 432      * PMIX_ALLOC_NETWORK_TYPE - type of network whose resources we want
 433      * PMIX_ALLOC_NETWORK_ENDPTS - number of endpoints from that network
 434      */
 435     darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
 436     darray->type = PMIX_INFO;
 437     darray->size = 4;
 438     PMIX_INFO_CREATE(darray->array, darray->size);
 439     iarray = (pmix_info_t*)darray->array;
 440     PMIX_INFO_LOAD(&iarray[0], PMIX_ALLOC_NETWORK_ID, "my.net.key", PMIX_STRING);
 441     PMIX_INFO_LOAD(&iarray[1], PMIX_ALLOC_NETWORK_TYPE, "tcp", PMIX_STRING);
 442     PMIX_INFO_LOAD(&iarray[2], PMIX_ALLOC_NETWORK_ENDPTS, &nprocs, PMIX_SIZE);
 443     PMIX_INFO_LOAD(&iarray[3], PMIX_ALLOC_NETWORK_SEC_KEY, NULL, PMIX_BOOL);
 444     /* now load the array */
 445     PMIX_INFO_LOAD(&info[0], PMIX_ALLOC_NETWORK, darray, PMIX_DATA_ARRAY);
 446 
 447     if (PMIX_SUCCESS != (rc = PMIx_server_setup_application("foobar", info, ninfo, sacbfunc, (void*)x))) {
 448         return rc;
 449     }
 450     DEBUG_WAIT_THREAD(&x->lock);
 451     DEBUG_DESTRUCT_LOCK(&x->lock);
 452     PMIX_INFO_FREE(info, ninfo);
 453 
 454     /* pass any returned data down */
 455     DEBUG_CONSTRUCT_LOCK(&x->lock);
 456     if (PMIX_SUCCESS != (rc = PMIx_server_setup_local_support("foobar", x->info, x->ninfo, opcbfunc, x))) {
 457         return rc;
 458     }
 459     DEBUG_WAIT_THREAD(&x->lock);
 460     PMIX_RELEASE(x);
 461 
 462     /* fork/exec the test */
 463     (void)strncpy(proc.nspace, "foobar", PMIX_MAX_NSLEN);
 464     for (n = 0; n < nprocs; n++) {
 465         proc.rank = n;
 466         x = PMIX_NEW(myxfer_t);
 467         if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid,
 468                                                               NULL, opcbfunc, x))) {
 469             fprintf(stderr, "Server register client failed with error %d\n", rc);
 470             PMIx_server_finalize();
 471             return rc;
 472         }
 473         /* don't fork/exec the client until we know it is registered
 474          * so we avoid a potential race condition in the server */
 475         DEBUG_WAIT_THREAD(&x->lock);
 476         PMIX_RELEASE(x);
 477         if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) {//n
 478             fprintf(stderr, "Server fork setup failed with error %d\n", rc);
 479             PMIx_server_finalize();
 480             return rc;
 481         }
 482         pid = fork();
 483         if (pid < 0) {
 484             fprintf(stderr, "Fork failed\n");
 485             PMIx_server_finalize();
 486             return -1;
 487         }
 488         child = PMIX_NEW(wait_tracker_t);
 489         child->pid = pid;
 490         pmix_list_append(&children, &child->super);
 491 
 492         if (pid == 0) {
 493             execve(executable, client_argv, client_env);
 494             /* Does not return */
 495             exit(0);
 496         }
 497     }
 498     free(executable);
 499     pmix_argv_free(client_argv);
 500     pmix_argv_free(client_env);
 501 
 502     /* hang around until the client(s) finalize */
 503     while (0 < wakeup) {
 504         struct timespec ts;
 505         ts.tv_sec = 0;
 506         ts.tv_nsec = 100000;
 507         nanosleep(&ts, NULL);
 508     }
 509 
 510     /* see if anyone exited with non-zero status */
 511     n=0;
 512     PMIX_LIST_FOREACH(child, &children, wait_tracker_t) {
 513         if (0 != child->exit_code) {
 514             fprintf(stderr, "Child %d exited with status %d - test FAILED\n", n, child->exit_code);
 515             goto done;
 516         }
 517         ++n;
 518     }
 519 
 520     /* deregister the nspace */
 521     x = PMIX_NEW(myxfer_t);
 522     PMIx_server_deregister_nspace("foobar", opcbfunc, (void*)x);
 523     DEBUG_WAIT_THREAD(&x->lock);
 524     PMIX_RELEASE(x);
 525 
 526   done:
 527     /* deregister the event handlers */
 528     PMIx_Deregister_event_handler(0, NULL, NULL);
 529 
 530     /* release any pub data */
 531     PMIX_LIST_DESTRUCT(&pubdata);
 532 
 533     /* release the child tracker */
 534     PMIX_LIST_DESTRUCT(&children);
 535 
 536     /* finalize the server library */
 537     if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
 538         fprintf(stderr, "Finalize failed with error %d\n", rc);
 539         exit_code = rc;
 540     }
 541 
 542     if (0 == exit_code) {
 543         fprintf(stderr, "Test finished OK!\n");
 544     } else {
 545         fprintf(stderr, "TEST FAILED WITH ERROR %d\n", exit_code);
 546     }
 547 
 548     return exit_code;
 549 }
 550 
 551 static void set_namespace(int nprocs, char *ranks, char *nspace,
 552                           pmix_op_cbfunc_t cbfunc, myxfer_t *x)
 553 {
 554     char *regex, *ppn;
 555     char hostname[PMIX_MAXHOSTNAMELEN];
 556     int n;
 557     pmix_data_array_t *darray;
 558     pmix_info_t *info;
 559     pmix_rank_t rank;
 560     uint16_t lr;
 561 
 562     gethostname(hostname, sizeof(hostname));
 563     x->ninfo = 7 + nprocs;
 564 
 565     PMIX_INFO_CREATE(x->info, x->ninfo);
 566     (void)strncpy(x->info[0].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
 567     x->info[0].value.type = PMIX_UINT32;
 568     x->info[0].value.data.uint32 = nprocs;
 569 
 570     (void)strncpy(x->info[1].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
 571     x->info[1].value.type = PMIX_UINT32;
 572     x->info[1].value.data.uint32 = 0;
 573 
 574     (void)strncpy(x->info[2].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
 575     x->info[2].value.type = PMIX_UINT32;
 576     x->info[2].value.data.uint32 = nprocs;
 577 
 578     (void)strncpy(x->info[3].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
 579     x->info[3].value.type = PMIX_STRING;
 580     x->info[3].value.data.string = strdup(ranks);
 581 
 582     PMIx_generate_regex(hostname, &regex);
 583     (void)strncpy(x->info[4].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
 584     x->info[4].value.type = PMIX_STRING;
 585     x->info[4].value.data.string = regex;
 586 
 587     PMIx_generate_ppn(ranks, &ppn);
 588     (void)strncpy(x->info[5].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
 589     x->info[5].value.type = PMIX_STRING;
 590     x->info[5].value.data.string = ppn;
 591 
 592     (void)strncpy(x->info[6].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
 593     x->info[6].value.type = PMIX_UINT32;
 594     x->info[6].value.data.uint32 = nprocs;
 595 
 596     for (n=0; n < nprocs; n++) {
 597         (void)strncpy(x->info[7+n].key, PMIX_PROC_DATA, PMIX_MAX_KEYLEN);
 598         x->info[7+n].value.type = PMIX_DATA_ARRAY;
 599         darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
 600         darray->size = 2;
 601         darray->type = PMIX_INFO;
 602         PMIX_INFO_CREATE(darray->array, 2);
 603         info = (pmix_info_t*)darray->array;
 604         rank = n;
 605         PMIX_INFO_LOAD(&info[0], PMIX_RANK, &rank, PMIX_PROC_RANK);
 606         lr = n;
 607         PMIX_INFO_LOAD(&info[1], PMIX_LOCAL_RANK, &lr, PMIX_UINT16);
 608         x->info[7+n].value.data.darray = darray;
 609     }
 610 
 611     PMIx_server_register_nspace(nspace, nprocs, x->info, x->ninfo,
 612                                 cbfunc, x);
 613 }
 614 
 615 static void errhandler(size_t evhdlr_registration_id,
 616                        pmix_status_t status,
 617                        const pmix_proc_t *source,
 618                        pmix_info_t info[], size_t ninfo,
 619                        pmix_info_t results[], size_t nresults,
 620                        pmix_event_notification_cbfunc_fn_t cbfunc,
 621                        void *cbdata)
 622 {
 623     pmix_output(0, "SERVER: ERRHANDLER CALLED WITH STATUS %d", status);
 624 }
 625 
 626 static void errhandler_reg_callbk (pmix_status_t status,
 627                                    size_t errhandler_ref,
 628                                    void *cbdata)
 629 {
 630     mylock_t *lock = (mylock_t*)cbdata;
 631 
 632     pmix_output(0, "SERVER: ERRHANDLER REGISTRATION CALLBACK CALLED WITH STATUS %d, ref=%lu",
 633                 status, (unsigned long)errhandler_ref);
 634     lock->status = status;
 635     DEBUG_WAKEUP_THREAD(lock);
 636 }
 637 
 638 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
 639                                pmix_op_cbfunc_t cbfunc, void *cbdata)
 640 {
 641     if (NULL != cbfunc) {
 642         cbfunc(PMIX_SUCCESS, cbdata);
 643     }
 644     return PMIX_SUCCESS;
 645 }
 646 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
 647                      pmix_op_cbfunc_t cbfunc, void *cbdata)
 648 {
 649     pmix_output(0, "SERVER: FINALIZED %s:%d WAKEUP %d",
 650                 proc->nspace, proc->rank, wakeup);
 651     /* ensure we call the cbfunc so the proc can exit! */
 652     if (NULL != cbfunc) {
 653         cbfunc(PMIX_SUCCESS, cbdata);
 654     }
 655     return PMIX_SUCCESS;
 656 }
 657 
 658 static void abcbfunc(pmix_status_t status, void *cbdata)
 659 {
 660     myxfer_t *x = (myxfer_t*)cbdata;
 661 
 662     /* be sure to release the caller */
 663     if (NULL != x->cbfunc) {
 664         x->cbfunc(status, x->cbdata);
 665     }
 666     PMIX_RELEASE(x);
 667 }
 668 
 669 static pmix_status_t abort_fn(const pmix_proc_t *proc,
 670                               void *server_object,
 671                               int status, const char msg[],
 672                               pmix_proc_t procs[], size_t nprocs,
 673                               pmix_op_cbfunc_t cbfunc, void *cbdata)
 674 {
 675     pmix_status_t rc;
 676     myxfer_t *x;
 677 
 678     if (NULL != procs) {
 679         pmix_output(0, "SERVER: ABORT on %s:%d", procs[0].nspace, procs[0].rank);
 680     } else {
 681         pmix_output(0, "SERVER: ABORT OF ALL PROCS IN NSPACE %s", proc->nspace);
 682     }
 683 
 684     /* instead of aborting the specified procs, notify them
 685      * (if they have registered their errhandler) */
 686 
 687     /* use the myxfer_t object to ensure we release
 688      * the caller when notification has been queued */
 689     x = PMIX_NEW(myxfer_t);
 690     (void)strncpy(x->caller.nspace, proc->nspace, PMIX_MAX_NSLEN);
 691     x->caller.rank = proc->rank;
 692 
 693     PMIX_INFO_CREATE(x->info, 2);
 694     (void)strncpy(x->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
 695     x->info[0].value.type = PMIX_INT8;
 696     x->info[0].value.data.int8 = 12;
 697     (void)strncpy(x->info[1].key, "VADER", PMIX_MAX_KEYLEN);
 698     x->info[1].value.type = PMIX_DOUBLE;
 699     x->info[1].value.data.dval = 12.34;
 700     x->cbfunc = cbfunc;
 701     x->cbdata = cbdata;
 702 
 703     if (PMIX_SUCCESS != (rc = PMIx_Notify_event(status, &x->caller,
 704                                                 PMIX_RANGE_NAMESPACE,
 705                                                 x->info, 2,
 706                                                 abcbfunc, x))) {
 707         pmix_output(0, "SERVER: FAILED NOTIFY ERROR %d", (int)rc);
 708     }
 709 
 710     return PMIX_SUCCESS;
 711 }
 712 
 713 
 714 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
 715                       const pmix_info_t info[], size_t ninfo,
 716                       char *data, size_t ndata,
 717                       pmix_modex_cbfunc_t cbfunc, void *cbdata)
 718 {
 719     pmix_output(0, "SERVER: FENCENB");
 720     /* pass the provided data back to each participating proc */
 721     if (NULL != cbfunc) {
 722         cbfunc(PMIX_SUCCESS, data, ndata, cbdata, NULL, NULL);
 723     }
 724     return PMIX_SUCCESS;
 725 }
 726 
 727 
 728 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
 729                      const pmix_info_t info[], size_t ninfo,
 730                      pmix_modex_cbfunc_t cbfunc, void *cbdata)
 731 {
 732     pmix_output(0, "SERVER: DMODEX");
 733 
 734     /* if this is a timeout test, then do nothing */
 735     if (istimeouttest) {
 736         return PMIX_SUCCESS;
 737     }
 738 
 739     /* we don't have any data for remote procs as this
 740      * test only runs one server - so report accordingly */
 741     if (NULL != cbfunc) {
 742         cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
 743     }
 744     return PMIX_SUCCESS;
 745 }
 746 
 747 
 748 static pmix_status_t publish_fn(const pmix_proc_t *proc,
 749                       const pmix_info_t info[], size_t ninfo,
 750                       pmix_op_cbfunc_t cbfunc, void *cbdata)
 751 {
 752     pmix_locdat_t *p;
 753     size_t n;
 754 
 755     pmix_output(0, "SERVER: PUBLISH");
 756 
 757     for (n=0; n < ninfo; n++) {
 758         p = PMIX_NEW(pmix_locdat_t);
 759         (void)strncpy(p->pdata.proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
 760         p->pdata.proc.rank = proc->rank;
 761         (void)strncpy(p->pdata.key, info[n].key, PMIX_MAX_KEYLEN);
 762         pmix_value_xfer(&p->pdata.value, (pmix_value_t*)&info[n].value);
 763         pmix_list_append(&pubdata, &p->super);
 764     }
 765     if (NULL != cbfunc) {
 766         cbfunc(PMIX_SUCCESS, cbdata);
 767     }
 768     return PMIX_SUCCESS;
 769 }
 770 
 771 
 772 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
 773                      const pmix_info_t info[], size_t ninfo,
 774                      pmix_lookup_cbfunc_t cbfunc, void *cbdata)
 775 {
 776     pmix_locdat_t *p, *p2;
 777     pmix_list_t results;
 778     size_t i, n;
 779     pmix_pdata_t *pd = NULL;
 780     pmix_status_t ret = PMIX_ERR_NOT_FOUND;
 781 
 782     pmix_output(0, "SERVER: LOOKUP");
 783 
 784     PMIX_CONSTRUCT(&results, pmix_list_t);
 785 
 786     for (n=0; NULL != keys[n]; n++) {
 787         PMIX_LIST_FOREACH(p, &pubdata, pmix_locdat_t) {
 788             if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
 789                 p2 = PMIX_NEW(pmix_locdat_t);
 790                 (void)strncpy(p2->pdata.proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
 791                 p2->pdata.proc.rank = p->pdata.proc.rank;
 792                 (void)strncpy(p2->pdata.key, p->pdata.key, PMIX_MAX_KEYLEN);
 793                 pmix_value_xfer(&p2->pdata.value, &p->pdata.value);
 794                 pmix_list_append(&results, &p2->super);
 795                 break;
 796             }
 797         }
 798     }
 799     if (0 < (n = pmix_list_get_size(&results))) {
 800         ret = PMIX_SUCCESS;
 801         PMIX_PDATA_CREATE(pd, n);
 802         for (i=0; i < n; i++) {
 803             p = (pmix_locdat_t*)pmix_list_remove_first(&results);
 804             if (p) {
 805                 (void)strncpy(pd[i].proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
 806                 pd[i].proc.rank = p->pdata.proc.rank;
 807                 (void)strncpy(pd[i].key, p->pdata.key, PMIX_MAX_KEYLEN);
 808                 pmix_value_xfer(&pd[i].value, &p->pdata.value);
 809             }
 810         }
 811     }
 812     PMIX_LIST_DESTRUCT(&results);
 813     if (NULL != cbfunc) {
 814         cbfunc(ret, pd, n, cbdata);
 815     }
 816     if (0 < n) {
 817         PMIX_PDATA_FREE(pd, n);
 818     }
 819     return PMIX_SUCCESS;
 820 }
 821 
 822 
 823 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
 824                         const pmix_info_t info[], size_t ninfo,
 825                         pmix_op_cbfunc_t cbfunc, void *cbdata)
 826 {
 827     pmix_locdat_t *p, *p2;
 828     size_t n;
 829 
 830     pmix_output(0, "SERVER: UNPUBLISH");
 831 
 832     for (n=0; NULL != keys[n]; n++) {
 833         PMIX_LIST_FOREACH_SAFE(p, p2, &pubdata, pmix_locdat_t) {
 834             if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
 835                 pmix_list_remove_item(&pubdata, &p->super);
 836                 PMIX_RELEASE(p);
 837                 break;
 838             }
 839         }
 840     }
 841     if (NULL != cbfunc) {
 842         cbfunc(PMIX_SUCCESS, cbdata);
 843     }
 844     return PMIX_SUCCESS;
 845 }
 846 
 847 static void spcbfunc(pmix_status_t status, void *cbdata)
 848 {
 849     myxfer_t *x = (myxfer_t*)cbdata;
 850 
 851     if (NULL != x->spcbfunc) {
 852         x->spcbfunc(PMIX_SUCCESS, "DYNSPACE", x->cbdata);
 853     }
 854 }
 855 
 856 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
 857                     const pmix_info_t job_info[], size_t ninfo,
 858                     const pmix_app_t apps[], size_t napps,
 859                     pmix_spawn_cbfunc_t cbfunc, void *cbdata)
 860 {
 861     myxfer_t *x;
 862     size_t n;
 863     pmix_proc_t *pptr;
 864     bool spawned;
 865 
 866     pmix_output(0, "SERVER: SPAWN");
 867 
 868     /* check the job info for parent and spawned keys */
 869     for (n=0; n < ninfo; n++) {
 870         if (0 == strncmp(job_info[n].key, PMIX_PARENT_ID, PMIX_MAX_KEYLEN)) {
 871             pptr = job_info[n].value.data.proc;
 872             pmix_output(0, "SPAWN: Parent ID %s:%d", pptr->nspace, pptr->rank);
 873         } else if (0 == strncmp(job_info[n].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN)) {
 874             spawned = PMIX_INFO_TRUE(&job_info[n]);
 875             pmix_output(0, "SPAWN: Spawned %s", spawned ? "TRUE" : "FALSE");
 876         }
 877     }
 878 
 879     /* in practice, we would pass this request to the local
 880      * resource manager for launch, and then have that server
 881      * execute our callback function. For now, we will fake
 882      * the spawn and just pretend */
 883 
 884     /* must register the nspace for the new procs before
 885      * we return to the caller */
 886     x = PMIX_NEW(myxfer_t);
 887     x->spcbfunc = cbfunc;
 888     x->cbdata = cbdata;
 889 
 890     set_namespace(2, "0,1", "DYNSPACE", spcbfunc, x);
 891 
 892     return PMIX_SUCCESS;
 893 }
 894 
 895 static int numconnects = 0;
 896 
 897 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
 898                                 const pmix_info_t info[], size_t ninfo,
 899                                 pmix_op_cbfunc_t cbfunc, void *cbdata)
 900 {
 901     pmix_output(0, "SERVER: CONNECT");
 902 
 903     /* in practice, we would pass this request to the local
 904      * resource manager for handling */
 905 
 906     numconnects++;
 907 
 908     if (NULL != cbfunc) {
 909         cbfunc(PMIX_SUCCESS, cbdata);
 910     }
 911 
 912     return PMIX_SUCCESS;
 913 }
 914 
 915 
 916 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
 917                                    const pmix_info_t info[], size_t ninfo,
 918                                    pmix_op_cbfunc_t cbfunc, void *cbdata)
 919 {
 920     pmix_output(0, "SERVER: DISCONNECT");
 921 
 922     /* in practice, we would pass this request to the local
 923      * resource manager for handling */
 924 
 925     if (NULL != cbfunc) {
 926         cbfunc(PMIX_SUCCESS, cbdata);
 927     }
 928 
 929     return PMIX_SUCCESS;
 930 }
 931 
 932 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
 933                                        const pmix_info_t info[], size_t ninfo,
 934                                        pmix_op_cbfunc_t cbfunc, void *cbdata)
 935 {
 936     if (NULL != cbfunc) {
 937         cbfunc(PMIX_SUCCESS, cbdata);
 938     }
 939     return PMIX_SUCCESS;
 940 }
 941 
 942 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
 943                                        pmix_op_cbfunc_t cbfunc, void *cbdata)
 944 {
 945     return PMIX_SUCCESS;
 946 }
 947 
 948 static pmix_status_t notify_event(pmix_status_t code,
 949                                   const pmix_proc_t *source,
 950                                   pmix_data_range_t range,
 951                                   pmix_info_t info[], size_t ninfo,
 952                                   pmix_op_cbfunc_t cbfunc, void *cbdata)
 953 {
 954     return PMIX_SUCCESS;
 955 }
 956 
 957 typedef struct query_data_t {
 958     pmix_info_t *data;
 959     size_t ndata;
 960 } query_data_t;
 961 
 962 static pmix_status_t query_fn(pmix_proc_t *proct,
 963                               pmix_query_t *queries, size_t nqueries,
 964                               pmix_info_cbfunc_t cbfunc,
 965                               void *cbdata)
 966 {
 967     size_t n;
 968     pmix_info_t *info;
 969 
 970     pmix_output(0, "SERVER: QUERY");
 971 
 972     if (NULL == cbfunc) {
 973         return PMIX_ERROR;
 974     }
 975     /* keep this simple */
 976     PMIX_INFO_CREATE(info, nqueries);
 977     for (n=0; n < nqueries; n++) {
 978         pmix_output(0, "\tKey: %s", queries[n].keys[0]);
 979         (void)strncpy(info[n].key, queries[n].keys[0], PMIX_MAX_KEYLEN);
 980         info[n].value.type = PMIX_STRING;
 981         if (0 > asprintf(&info[n].value.data.string, "%d", (int)n)) {
 982             return PMIX_ERROR;
 983         }
 984     }
 985     cbfunc(PMIX_SUCCESS, info, nqueries, cbdata, NULL, NULL);
 986     return PMIX_SUCCESS;
 987 }
 988 
 989 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
 990                             pmix_tool_connection_cbfunc_t cbfunc,
 991                             void *cbdata)
 992 {
 993     pmix_proc_t proc;
 994 
 995     pmix_output(0, "SERVER: TOOL CONNECT");
 996 
 997     /* just pass back an arbitrary nspace */
 998     (void)strncpy(proc.nspace, "TOOL", PMIX_MAX_NSLEN);
 999     proc.rank = 0;
1000 
1001     if (NULL != cbfunc) {
1002         cbfunc(PMIX_SUCCESS, &proc, cbdata);
1003     }
1004 }
1005 
1006 static void log_fn(const pmix_proc_t *client,
1007                    const pmix_info_t data[], size_t ndata,
1008                    const pmix_info_t directives[], size_t ndirs,
1009                    pmix_op_cbfunc_t cbfunc, void *cbdata)
1010 {
1011     pmix_output(0, "SERVER: LOG");
1012 
1013     if (NULL != cbfunc) {
1014         cbfunc(PMIX_SUCCESS, cbdata);
1015     }
1016 }
1017 
1018 static void wait_signal_callback(int fd, short event, void *arg)
1019 {
1020     pmix_event_t *sig = (pmix_event_t*) arg;
1021     int status;
1022     pid_t pid;
1023     wait_tracker_t *t2;
1024 
1025     if (SIGCHLD != pmix_event_get_signal(sig)) {
1026         return;
1027     }
1028 
1029     /* we can have multiple children leave but only get one
1030      * sigchild callback, so reap all the waitpids until we
1031      * don't get anything valid back */
1032     while (1) {
1033         pid = waitpid(-1, &status, WNOHANG);
1034         if (-1 == pid && EINTR == errno) {
1035             /* try it again */
1036             continue;
1037         }
1038         /* if we got garbage, then nothing we can do */
1039         if (pid <= 0) {
1040             return;
1041         }
1042 
1043         /* we are already in an event, so it is safe to access the list */
1044         PMIX_LIST_FOREACH(t2, &children, wait_tracker_t) {
1045             if (pid == t2->pid) {
1046                 t2->exit_code = status;
1047                 /* found it! */
1048                 if (0 != status && 0 == exit_code) {
1049                     exit_code = status;
1050                 }
1051                 --wakeup;
1052                 break;
1053             }
1054         }
1055     }
1056 }

/* [<][>][^][v][top][bottom][index][help] */