root/opal/mca/pmix/pmix4x/pmix/test/simple/stability.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. xfcon
  2. xfdes
  3. opcbfunc
  4. main
  5. set_namespace
  6. errhandler
  7. errhandler_reg_callbk
  8. connected
  9. finalized
  10. abcbfunc
  11. abort_fn
  12. fencenb_fn
  13. dmodex_fn
  14. publish_fn
  15. lookup_fn
  16. unpublish_fn
  17. spcbfunc
  18. spawn_fn
  19. connect_fn
  20. disconnect_fn
  21. register_event_fn
  22. deregister_events
  23. notify_event
  24. query_fn
  25. tool_connect_fn
  26. log_fn
  27. wait_signal_callback

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2012 Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2015-2018 Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  20  * $COPYRIGHT$
  21  *
  22  * Additional copyrights may follow
  23  *
  24  * $HEADER$
  25  *
  26  */
  27 
  28 #include <src/include/pmix_config.h>
  29 #include <pmix_server.h>
  30 #include <src/include/types.h>
  31 #include <src/include/pmix_globals.h>
  32 
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>
  41 
  42 #if PMIX_HAVE_HWLOC
  43 #include <src/hwloc/hwloc-internal.h>
  44 #endif
  45 
  46 #include "src/class/pmix_list.h"
  47 #include "src/util/pmix_environ.h"
  48 #include "src/util/output.h"
  49 #include "src/util/printf.h"
  50 #include "src/util/argv.h"
  51 
  52 #include "simptest.h"
  53 
/*
 * Forward declarations of the host-server callbacks. Their addresses are
 * stored in the pmix_server_module_t table ("mymodule") below; the
 * definitions follow main() later in this file.
 */
static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
                               pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
                               pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
                              int status, const char msg[],
                              pmix_proc_t procs[], size_t nprocs,
                              pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
                                const pmix_info_t info[], size_t ninfo,
                                char *data, size_t ndata,
                                pmix_modex_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
                               const pmix_info_t info[], size_t ninfo,
                               pmix_modex_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t publish_fn(const pmix_proc_t *proc,
                                const pmix_info_t info[], size_t ninfo,
                                pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
                               const pmix_info_t info[], size_t ninfo,
                               pmix_lookup_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
                                  const pmix_info_t info[], size_t ninfo,
                                  pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t spawn_fn(const pmix_proc_t *proc,
                              const pmix_info_t job_info[], size_t ninfo,
                              const pmix_app_t apps[], size_t napps,
                              pmix_spawn_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
                                const pmix_info_t info[], size_t ninfo,
                                pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
                                   const pmix_info_t info[], size_t ninfo,
                                   pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
                                       const pmix_info_t info[], size_t ninfo,
                                       pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
                                       pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t notify_event(pmix_status_t code,
                                  const pmix_proc_t *source,
                                  pmix_data_range_t range,
                                  pmix_info_t info[], size_t ninfo,
                                  pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t query_fn(pmix_proc_t *proct,
                              pmix_query_t *queries, size_t nqueries,
                              pmix_info_cbfunc_t cbfunc,
                              void *cbdata);
static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
                            pmix_tool_connection_cbfunc_t cbfunc,
                            void *cbdata);
static void log_fn(const pmix_proc_t *client,
                   const pmix_info_t data[], size_t ndata,
                   const pmix_info_t directives[], size_t ndirs,
                   pmix_op_cbfunc_t cbfunc, void *cbdata);
 109 
/*
 * Host-server callback table handed to PMIx_server_init() in main().
 * Each member points at the matching static function defined in this file.
 */
static pmix_server_module_t mymodule = {
    .client_connected = connected,
    .client_finalized = finalized,
    .abort = abort_fn,
    .fence_nb = fencenb_fn,
    .direct_modex = dmodex_fn,
    .publish = publish_fn,
    .lookup = lookup_fn,
    .unpublish = unpublish_fn,
    .spawn = spawn_fn,
    .connect = connect_fn,
    .disconnect = disconnect_fn,
    .register_events = register_event_fn,
    .deregister_events = deregister_events,
    .notify_event = notify_event,
    .query = query_fn,
    .tool_connected = tool_connect_fn,
    .log = log_fn
};
 129 
 130 typedef struct {
 131     pmix_list_item_t super;
 132     pmix_pdata_t pdata;
 133 } pmix_locdat_t;
 134 PMIX_CLASS_INSTANCE(pmix_locdat_t,
 135                     pmix_list_item_t,
 136                     NULL, NULL);
 137 
 138 typedef struct {
 139     pmix_object_t super;
 140     mylock_t lock;
 141     pmix_event_t ev;
 142     pmix_proc_t caller;
 143     pmix_info_t *info;
 144     size_t ninfo;
 145     pmix_op_cbfunc_t cbfunc;
 146     pmix_spawn_cbfunc_t spcbfunc;
 147     pmix_release_cbfunc_t relcbfunc;
 148     void *cbdata;
 149 } myxfer_t;
 150 static void xfcon(myxfer_t *p)
 151 {
 152     DEBUG_CONSTRUCT_LOCK(&p->lock);
 153     p->info = NULL;
 154     p->ninfo = 0;
 155     p->cbfunc = NULL;
 156     p->spcbfunc = NULL;
 157     p->cbdata = NULL;
 158 }
 159 static void xfdes(myxfer_t *p)
 160 {
 161     DEBUG_DESTRUCT_LOCK(&p->lock);
 162     if (NULL != p->info) {
 163         PMIX_INFO_FREE(p->info, p->ninfo);
 164     }
 165 }
 166 PMIX_CLASS_INSTANCE(myxfer_t,
 167                     pmix_object_t,
 168                     xfcon, xfdes);
 169 
 170 typedef struct {
 171     pmix_list_item_t super;
 172     int exit_code;
 173     pid_t pid;
 174 } wait_tracker_t;
 175 PMIX_CLASS_INSTANCE(wait_tracker_t,
 176                     pmix_list_item_t,
 177                     NULL, NULL);
 178 
/* number of clients still running; set in main(), decremented in
 * wait_signal_callback() */
static volatile int wakeup;
/* overall result returned by main(); first non-zero child status wins */
static int exit_code = 0;
/* list of pmix_locdat_t items created by publish_fn() */
static pmix_list_t pubdata;
/* SIGCHLD event registered in main() */
static pmix_event_t handler;
/* list of wait_tracker_t items, one per forked client */
static pmix_list_t children;
/* true when the client executable name contains "quietclient";
 * makes dmodex_fn() skip its callback */
static bool istimeouttest = false;
 185 
/* Forward declarations of local helpers defined after main(). */
static void set_namespace(int nprocs, char *ranks, char *nspace,
                          pmix_op_cbfunc_t cbfunc, myxfer_t *x);
static void errhandler(size_t evhdlr_registration_id,
                       pmix_status_t status,
                       const pmix_proc_t *source,
                       pmix_info_t info[], size_t ninfo,
                       pmix_info_t results[], size_t nresults,
                       pmix_event_notification_cbfunc_fn_t cbfunc,
                       void *cbdata);
static void wait_signal_callback(int fd, short event, void *arg);
static void errhandler_reg_callbk (pmix_status_t status,
                                   size_t errhandler_ref,
                                   void *cbdata);
 199 
 200 static void opcbfunc(pmix_status_t status, void *cbdata)
 201 {
 202     myxfer_t *x = (myxfer_t*)cbdata;
 203 
 204     /* release the caller, if necessary */
 205     if (NULL != x->cbfunc) {
 206         x->cbfunc(PMIX_SUCCESS, x->cbdata);
 207     }
 208     DEBUG_WAKEUP_THREAD(&x->lock);
 209 }
 210 
 211 int main(int argc, char **argv)
 212 {
 213     char **client_env=NULL;
 214     char **client_argv=NULL;
 215     char *tmp, **atmp, *executable=NULL, *nspace;
 216     int rc, nprocs=1, n, k;
 217     uid_t myuid;
 218     gid_t mygid;
 219     pid_t pid;
 220     myxfer_t *x;
 221     pmix_proc_t proc;
 222     wait_tracker_t *child;
 223     pmix_info_t *info;
 224     size_t ninfo;
 225     mylock_t mylock;
 226     int ncycles=1, m, delay=0;
 227 
 228     /* smoke test */
 229     if (PMIX_SUCCESS != 0) {
 230         fprintf(stderr, "ERROR IN COMPUTING CONSTANTS: PMIX_SUCCESS = %d\n", PMIX_SUCCESS);
 231         exit(1);
 232     }
 233 
 234     fprintf(stderr, "Testing version %s\n", PMIx_Get_version());
 235 
 236     /* see if we were passed the number of procs to run or
 237      * the executable to use */
 238     for (n=1; n < argc; n++) {
 239         if (0 == strcmp("-n", argv[n]) &&
 240             NULL != argv[n+1]) {
 241             nprocs = strtol(argv[n+1], NULL, 10);
 242             ++n;  // step over the argument
 243         } else if (0 == strcmp("-e", argv[n]) &&
 244                    NULL != argv[n+1]) {
 245             executable = strdup(argv[n+1]);
 246             /* check for timeout test */
 247             if (NULL != strstr(executable, "quietclient")) {
 248                 istimeouttest = true;
 249             }
 250             for (k=n+2; NULL != argv[k]; k++) {
 251                 pmix_argv_append_nosize(&client_argv, argv[k]);
 252             }
 253             n += k;
 254         } else if ((0 == strcmp("-reps", argv[n]) ||
 255                    0 == strcmp("--reps", argv[n])) &&
 256                    NULL != argv[n+1]) {
 257             ncycles = strtol(argv[n+1], NULL, 10);
 258         } else if ((0 == strcmp("-sleep", argv[n]) ||
 259                    0 == strcmp("--sleep", argv[n])) &&
 260                    NULL != argv[n+1]) {
 261             delay = strtol(argv[n+1], NULL, 10);
 262         } else if (0 == strcmp("-h", argv[n])) {
 263             /* print the options and exit */
 264             fprintf(stderr, "usage: simptest <options>\n");
 265             fprintf(stderr, "    -n N     Number of clients to run\n");
 266             fprintf(stderr, "    -e foo   Name of the client executable to run (default: simpclient\n");
 267             fprintf(stderr, "    -reps N  Cycle for N repetitions");
 268             exit(0);
 269         }
 270     }
 271     if (NULL == executable) {
 272         executable = strdup("./quietclient");
 273     }
 274     /* setup the server library and tell it to support tool connections */
 275     ninfo = 3;
 276 
 277     PMIX_INFO_CREATE(info, ninfo);
 278     PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TOOL_SUPPORT, NULL, PMIX_BOOL);
 279     PMIX_INFO_LOAD(&info[1], PMIX_USOCK_DISABLE, NULL, PMIX_BOOL);
 280     PMIX_INFO_LOAD(&info[2], PMIX_SERVER_GATEWAY, NULL, PMIX_BOOL);
 281     if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, ninfo))) {
 282         fprintf(stderr, "Init failed with error %d\n", rc);
 283         return rc;
 284     }
 285     PMIX_INFO_FREE(info, ninfo);
 286 
 287     /* register the default errhandler */
 288     DEBUG_CONSTRUCT_LOCK(&mylock);
 289     ninfo = 1;
 290     PMIX_INFO_CREATE(info, ninfo);
 291     PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "SIMPTEST-DEFAULT", PMIX_STRING);
 292     PMIx_Register_event_handler(NULL, 0, info, ninfo,
 293                                 errhandler, errhandler_reg_callbk, (void*)&mylock);
 294     DEBUG_WAIT_THREAD(&mylock);
 295     PMIX_INFO_FREE(info, ninfo);
 296     if (PMIX_SUCCESS != mylock.status) {
 297         exit(mylock.status);
 298     }
 299     DEBUG_DESTRUCT_LOCK(&mylock);
 300 
 301     /* setup the pub data, in case it is used */
 302     PMIX_CONSTRUCT(&pubdata, pmix_list_t);
 303 
 304     /* setup to see sigchld on the forked tests */
 305     PMIX_CONSTRUCT(&children, pmix_list_t);
 306     pmix_event_assign(&handler, pmix_globals.evbase, SIGCHLD,
 307                       EV_SIGNAL|EV_PERSIST,wait_signal_callback, &handler);
 308     pmix_event_add(&handler, NULL);
 309 
 310     for (m=0; m < ncycles; m++) {
 311         fprintf(stderr, "Running cycle %d\n", m);
 312         /* we have a single namespace for all clients */
 313         atmp = NULL;
 314         for (n=0; n < nprocs; n++) {
 315             asprintf(&tmp, "%d", n);
 316             pmix_argv_append_nosize(&atmp, tmp);
 317             free(tmp);
 318         }
 319         tmp = pmix_argv_join(atmp, ',');
 320         pmix_argv_free(atmp);
 321         asprintf(&nspace, "foobar%d", m);
 322         (void)strncpy(proc.nspace, nspace, PMIX_MAX_NSLEN);
 323         x = PMIX_NEW(myxfer_t);
 324         set_namespace(nprocs, tmp, nspace, opcbfunc, x);
 325 
 326 
 327         /* set common argv and env */
 328         client_env = pmix_argv_copy(environ);
 329         pmix_argv_prepend_nosize(&client_argv, executable);
 330 
 331         wakeup = nprocs;
 332         myuid = getuid();
 333         mygid = getgid();
 334 
 335         /* if the nspace registration hasn't completed yet,
 336          * wait for it here */
 337         DEBUG_WAIT_THREAD(&x->lock);
 338         free(tmp);
 339         free(nspace);
 340         PMIX_RELEASE(x);
 341 
 342         /* fork/exec the test */
 343         for (n = 0; n < nprocs; n++) {
 344             proc.rank = n;
 345             if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) {
 346                 fprintf(stderr, "Server fork setup failed with error %d\n", rc);
 347                 PMIx_server_finalize();
 348                 return rc;
 349             }
 350             x = PMIX_NEW(myxfer_t);
 351             if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid,
 352                                                                   NULL, opcbfunc, x))) {
 353                 fprintf(stderr, "Server register client failed with error %d\n", rc);
 354                 PMIx_server_finalize();
 355                 return rc;
 356             }
 357             /* don't fork/exec the client until we know it is registered
 358              * so we avoid a potential race condition in the server */
 359             DEBUG_WAIT_THREAD(&x->lock);
 360             PMIX_RELEASE(x);
 361             pid = fork();
 362             if (pid < 0) {
 363                 fprintf(stderr, "Fork failed\n");
 364                 PMIx_server_finalize();
 365                 return -1;
 366             }
 367             child = PMIX_NEW(wait_tracker_t);
 368             child->pid = pid;
 369             pmix_list_append(&children, &child->super);
 370 
 371             if (pid == 0) {
 372                 execve(executable, client_argv, client_env);
 373                 /* Does not return */
 374                 exit(0);
 375             }
 376         }
 377         pmix_argv_free(client_argv);
 378         client_argv = NULL;
 379         pmix_argv_free(client_env);
 380         client_env = NULL;
 381 
 382         /* hang around until the client(s) finalize */
 383         while (0 < wakeup) {
 384             struct timespec ts;
 385             ts.tv_sec = 0;
 386             ts.tv_nsec = 100000;
 387             nanosleep(&ts, NULL);
 388         }
 389 
 390         /* see if anyone exited with non-zero status */
 391         n=0;
 392         PMIX_LIST_FOREACH(child, &children, wait_tracker_t) {
 393             if (0 != child->exit_code) {
 394                 fprintf(stderr, "Child %d exited with status %d - test FAILED\n", n, child->exit_code);
 395                 goto done;
 396             }
 397             ++n;
 398         }
 399 
 400         /* deregister the clients */
 401         for (n = 0; n < nprocs; n++) {
 402             proc.rank = n;
 403             x = PMIX_NEW(myxfer_t);
 404             PMIx_server_deregister_client(&proc, opcbfunc, x);
 405             DEBUG_WAIT_THREAD(&x->lock);
 406             PMIX_RELEASE(x);
 407         }
 408         /* deregister the nspace */
 409         x = PMIX_NEW(myxfer_t);
 410         PMIx_server_deregister_nspace(proc.nspace, opcbfunc, x);
 411         DEBUG_WAIT_THREAD(&x->lock);
 412         PMIX_RELEASE(x);
 413 
 414         PMIX_LIST_DESTRUCT(&children);
 415         PMIX_CONSTRUCT(&children, pmix_list_t);
 416 
 417         sleep(delay);
 418     }
 419 
 420   done:
 421     /* deregister the event handlers */
 422     PMIx_Deregister_event_handler(0, NULL, NULL);
 423 
 424     /* release any pub data */
 425     PMIX_LIST_DESTRUCT(&pubdata);
 426 
 427     free(executable);
 428 
 429     /* finalize the server library */
 430     if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
 431         fprintf(stderr, "Finalize failed with error %d\n", rc);
 432         exit_code = rc;
 433     }
 434 
 435     if (0 == exit_code) {
 436         fprintf(stderr, "Test finished OK!\n");
 437     } else {
 438         fprintf(stderr, "TEST FAILED WITH ERROR %d\n", exit_code);
 439     }
 440 
 441     return exit_code;
 442 }
 443 
 444 static void set_namespace(int nprocs, char *ranks, char *nspace,
 445                           pmix_op_cbfunc_t cbfunc, myxfer_t *x)
 446 {
 447     char *regex, *ppn;
 448     char hostname[PMIX_MAXHOSTNAMELEN];
 449 
 450     gethostname(hostname, sizeof(hostname));
 451     x->ninfo = 7;
 452 
 453     PMIX_INFO_CREATE(x->info, x->ninfo);
 454     (void)strncpy(x->info[0].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
 455     x->info[0].value.type = PMIX_UINT32;
 456     x->info[0].value.data.uint32 = nprocs;
 457 
 458     (void)strncpy(x->info[1].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
 459     x->info[1].value.type = PMIX_UINT32;
 460     x->info[1].value.data.uint32 = 0;
 461 
 462     (void)strncpy(x->info[2].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
 463     x->info[2].value.type = PMIX_UINT32;
 464     x->info[2].value.data.uint32 = nprocs;
 465 
 466     (void)strncpy(x->info[3].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
 467     x->info[3].value.type = PMIX_STRING;
 468     x->info[3].value.data.string = strdup(ranks);
 469 
 470     PMIx_generate_regex(hostname, &regex);
 471     (void)strncpy(x->info[4].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
 472     x->info[4].value.type = PMIX_STRING;
 473     x->info[4].value.data.string = regex;
 474 
 475     PMIx_generate_ppn(ranks, &ppn);
 476     (void)strncpy(x->info[5].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
 477     x->info[5].value.type = PMIX_STRING;
 478     x->info[5].value.data.string = ppn;
 479 
 480     (void)strncpy(x->info[6].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
 481     x->info[6].value.type = PMIX_UINT32;
 482     x->info[6].value.data.uint32 = nprocs;
 483 
 484     PMIx_server_register_nspace(nspace, nprocs, x->info, x->ninfo,
 485                                 cbfunc, x);
 486 }
 487 
 488 static void errhandler(size_t evhdlr_registration_id,
 489                        pmix_status_t status,
 490                        const pmix_proc_t *source,
 491                        pmix_info_t info[], size_t ninfo,
 492                        pmix_info_t results[], size_t nresults,
 493                        pmix_event_notification_cbfunc_fn_t cbfunc,
 494                        void *cbdata)
 495 {
 496     return;
 497 }
 498 
 499 static void errhandler_reg_callbk (pmix_status_t status,
 500                                    size_t errhandler_ref,
 501                                    void *cbdata)
 502 {
 503     mylock_t *lock = (mylock_t*)cbdata;
 504 
 505     lock->status = status;
 506     DEBUG_WAKEUP_THREAD(lock);
 507 }
 508 
 509 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
 510                                pmix_op_cbfunc_t cbfunc, void *cbdata)
 511 {
 512     if (NULL != cbfunc) {
 513         cbfunc(PMIX_SUCCESS, cbdata);
 514     }
 515     return PMIX_SUCCESS;
 516 }
 517 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
 518                      pmix_op_cbfunc_t cbfunc, void *cbdata)
 519 {
 520     /* ensure we call the cbfunc so the proc can exit! */
 521     if (NULL != cbfunc) {
 522         cbfunc(PMIX_SUCCESS, cbdata);
 523     }
 524     return PMIX_SUCCESS;
 525 }
 526 
 527 static void abcbfunc(pmix_status_t status, void *cbdata)
 528 {
 529     myxfer_t *x = (myxfer_t*)cbdata;
 530 
 531     /* be sure to release the caller */
 532     if (NULL != x->cbfunc) {
 533         x->cbfunc(status, x->cbdata);
 534     }
 535     PMIX_RELEASE(x);
 536 }
 537 
 538 static pmix_status_t abort_fn(const pmix_proc_t *proc,
 539                               void *server_object,
 540                               int status, const char msg[],
 541                               pmix_proc_t procs[], size_t nprocs,
 542                               pmix_op_cbfunc_t cbfunc, void *cbdata)
 543 {
 544     pmix_status_t rc;
 545     myxfer_t *x;
 546 
 547     /* instead of aborting the specified procs, notify them
 548      * (if they have registered their errhandler) */
 549 
 550     /* use the myxfer_t object to ensure we release
 551      * the caller when notification has been queued */
 552     x = PMIX_NEW(myxfer_t);
 553     (void)strncpy(x->caller.nspace, proc->nspace, PMIX_MAX_NSLEN);
 554     x->caller.rank = proc->rank;
 555 
 556     PMIX_INFO_CREATE(x->info, 2);
 557     (void)strncpy(x->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
 558     x->info[0].value.type = PMIX_INT8;
 559     x->info[0].value.data.int8 = 12;
 560     (void)strncpy(x->info[1].key, "VADER", PMIX_MAX_KEYLEN);
 561     x->info[1].value.type = PMIX_DOUBLE;
 562     x->info[1].value.data.dval = 12.34;
 563     x->cbfunc = cbfunc;
 564     x->cbdata = cbdata;
 565 
 566     if (PMIX_SUCCESS != (rc = PMIx_Notify_event(status, &x->caller,
 567                                                 PMIX_RANGE_NAMESPACE,
 568                                                 x->info, 2,
 569                                                 abcbfunc, x))) {
 570         pmix_output(0, "SERVER: FAILED NOTIFY ERROR %d", (int)rc);
 571     }
 572 
 573     return PMIX_SUCCESS;
 574 }
 575 
 576 
 577 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
 578                       const pmix_info_t info[], size_t ninfo,
 579                       char *data, size_t ndata,
 580                       pmix_modex_cbfunc_t cbfunc, void *cbdata)
 581 {
 582     /* pass the provided data back to each participating proc */
 583     if (NULL != cbfunc) {
 584         cbfunc(PMIX_SUCCESS, data, ndata, cbdata, free, data);
 585     }
 586     return PMIX_SUCCESS;
 587 }
 588 
 589 
 590 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
 591                      const pmix_info_t info[], size_t ninfo,
 592                      pmix_modex_cbfunc_t cbfunc, void *cbdata)
 593 {
 594     /* if this is a timeout test, then do nothing */
 595     if (istimeouttest) {
 596         return PMIX_SUCCESS;
 597     }
 598 
 599     /* we don't have any data for remote procs as this
 600      * test only runs one server - so report accordingly */
 601     if (NULL != cbfunc) {
 602         cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
 603     }
 604     return PMIX_SUCCESS;
 605 }
 606 
 607 
 608 static pmix_status_t publish_fn(const pmix_proc_t *proc,
 609                       const pmix_info_t info[], size_t ninfo,
 610                       pmix_op_cbfunc_t cbfunc, void *cbdata)
 611 {
 612     pmix_locdat_t *p;
 613     size_t n;
 614 
 615     for (n=0; n < ninfo; n++) {
 616         p = PMIX_NEW(pmix_locdat_t);
 617         (void)strncpy(p->pdata.proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
 618         p->pdata.proc.rank = proc->rank;
 619         (void)strncpy(p->pdata.key, info[n].key, PMIX_MAX_KEYLEN);
 620         pmix_value_xfer(&p->pdata.value, (pmix_value_t*)&info[n].value);
 621         pmix_list_append(&pubdata, &p->super);
 622     }
 623     if (NULL != cbfunc) {
 624         cbfunc(PMIX_SUCCESS, cbdata);
 625     }
 626     return PMIX_SUCCESS;
 627 }
 628 
 629 
 630 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
 631                      const pmix_info_t info[], size_t ninfo,
 632                      pmix_lookup_cbfunc_t cbfunc, void *cbdata)
 633 {
 634     pmix_locdat_t *p, *p2;
 635     pmix_list_t results;
 636     size_t i, n;
 637     pmix_pdata_t *pd = NULL;
 638     pmix_status_t ret = PMIX_ERR_NOT_FOUND;
 639 
 640     PMIX_CONSTRUCT(&results, pmix_list_t);
 641 
 642     for (n=0; NULL != keys[n]; n++) {
 643         PMIX_LIST_FOREACH(p, &pubdata, pmix_locdat_t) {
 644             if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
 645                 p2 = PMIX_NEW(pmix_locdat_t);
 646                 (void)strncpy(p2->pdata.proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
 647                 p2->pdata.proc.rank = p->pdata.proc.rank;
 648                 (void)strncpy(p2->pdata.key, p->pdata.key, PMIX_MAX_KEYLEN);
 649                 pmix_value_xfer(&p2->pdata.value, &p->pdata.value);
 650                 pmix_list_append(&results, &p2->super);
 651                 break;
 652             }
 653         }
 654     }
 655     if (0 < (n = pmix_list_get_size(&results))) {
 656         ret = PMIX_SUCCESS;
 657         PMIX_PDATA_CREATE(pd, n);
 658         for (i=0; i < n; i++) {
 659             p = (pmix_locdat_t*)pmix_list_remove_first(&results);
 660             if (p) {
 661                 (void)strncpy(pd[i].proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
 662                 pd[i].proc.rank = p->pdata.proc.rank;
 663                 (void)strncpy(pd[i].key, p->pdata.key, PMIX_MAX_KEYLEN);
 664                 pmix_value_xfer(&pd[i].value, &p->pdata.value);
 665             }
 666         }
 667     }
 668     PMIX_LIST_DESTRUCT(&results);
 669     if (NULL != cbfunc) {
 670         cbfunc(ret, pd, n, cbdata);
 671     }
 672     if (0 < n) {
 673         PMIX_PDATA_FREE(pd, n);
 674     }
 675     return PMIX_SUCCESS;
 676 }
 677 
 678 
 679 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
 680                         const pmix_info_t info[], size_t ninfo,
 681                         pmix_op_cbfunc_t cbfunc, void *cbdata)
 682 {
 683     pmix_locdat_t *p, *p2;
 684     size_t n;
 685 
 686     for (n=0; NULL != keys[n]; n++) {
 687         PMIX_LIST_FOREACH_SAFE(p, p2, &pubdata, pmix_locdat_t) {
 688             if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
 689                 pmix_list_remove_item(&pubdata, &p->super);
 690                 PMIX_RELEASE(p);
 691                 break;
 692             }
 693         }
 694     }
 695     if (NULL != cbfunc) {
 696         cbfunc(PMIX_SUCCESS, cbdata);
 697     }
 698     return PMIX_SUCCESS;
 699 }
 700 
 701 static void spcbfunc(pmix_status_t status, void *cbdata)
 702 {
 703     myxfer_t *x = (myxfer_t*)cbdata;
 704 
 705     if (NULL != x->spcbfunc) {
 706         x->spcbfunc(PMIX_SUCCESS, "DYNSPACE", x->cbdata);
 707     }
 708 }
 709 
 710 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
 711                     const pmix_info_t job_info[], size_t ninfo,
 712                     const pmix_app_t apps[], size_t napps,
 713                     pmix_spawn_cbfunc_t cbfunc, void *cbdata)
 714 {
 715     myxfer_t *x;
 716 
 717     /* in practice, we would pass this request to the local
 718      * resource manager for launch, and then have that server
 719      * execute our callback function. For now, we will fake
 720      * the spawn and just pretend */
 721 
 722     /* must register the nspace for the new procs before
 723      * we return to the caller */
 724     x = PMIX_NEW(myxfer_t);
 725     x->spcbfunc = cbfunc;
 726     x->cbdata = cbdata;
 727 
 728     set_namespace(2, "0,1", "DYNSPACE", spcbfunc, x);
 729 
 730     return PMIX_SUCCESS;
 731 }
 732 
 733 static int numconnects = 0;
 734 
 735 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
 736                                 const pmix_info_t info[], size_t ninfo,
 737                                 pmix_op_cbfunc_t cbfunc, void *cbdata)
 738 {
 739 
 740     /* in practice, we would pass this request to the local
 741      * resource manager for handling */
 742 
 743     numconnects++;
 744 
 745     if (NULL != cbfunc) {
 746         cbfunc(PMIX_SUCCESS, cbdata);
 747     }
 748 
 749     return PMIX_SUCCESS;
 750 }
 751 
 752 
 753 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
 754                                    const pmix_info_t info[], size_t ninfo,
 755                                    pmix_op_cbfunc_t cbfunc, void *cbdata)
 756 {
 757 
 758     /* in practice, we would pass this request to the local
 759      * resource manager for handling */
 760 
 761     if (NULL != cbfunc) {
 762         cbfunc(PMIX_SUCCESS, cbdata);
 763     }
 764 
 765     return PMIX_SUCCESS;
 766 }
 767 
 768 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
 769                                        const pmix_info_t info[], size_t ninfo,
 770                                        pmix_op_cbfunc_t cbfunc, void *cbdata)
 771 {
 772     if (NULL != cbfunc) {
 773         cbfunc(PMIX_SUCCESS, cbdata);
 774     }
 775     return PMIX_SUCCESS;
 776 }
 777 
 778 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
 779                                        pmix_op_cbfunc_t cbfunc, void *cbdata)
 780 {
 781     return PMIX_SUCCESS;
 782 }
 783 
 784 static pmix_status_t notify_event(pmix_status_t code,
 785                                   const pmix_proc_t *source,
 786                                   pmix_data_range_t range,
 787                                   pmix_info_t info[], size_t ninfo,
 788                                   pmix_op_cbfunc_t cbfunc, void *cbdata)
 789 {
 790     return PMIX_SUCCESS;
 791 }
 792 
 793 typedef struct query_data_t {
 794     pmix_info_t *data;
 795     size_t ndata;
 796 } query_data_t;
 797 
 798 static pmix_status_t query_fn(pmix_proc_t *proct,
 799                               pmix_query_t *queries, size_t nqueries,
 800                               pmix_info_cbfunc_t cbfunc,
 801                               void *cbdata)
 802 {
 803     size_t n;
 804     pmix_info_t *info;
 805 
 806     if (NULL == cbfunc) {
 807         return PMIX_ERROR;
 808     }
 809     /* keep this simple */
 810     PMIX_INFO_CREATE(info, nqueries);
 811     for (n=0; n < nqueries; n++) {
 812         (void)strncpy(info[n].key, queries[n].keys[0], PMIX_MAX_KEYLEN);
 813         info[n].value.type = PMIX_STRING;
 814         if (0 > asprintf(&info[n].value.data.string, "%d", (int)n)) {
 815             return PMIX_ERROR;
 816         }
 817     }
 818     cbfunc(PMIX_SUCCESS, info, nqueries, cbdata, NULL, NULL);
 819     return PMIX_SUCCESS;
 820 }
 821 
 822 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
 823                             pmix_tool_connection_cbfunc_t cbfunc,
 824                             void *cbdata)
 825 {
 826     pmix_proc_t proc;
 827 
 828     /* just pass back an arbitrary nspace */
 829     (void)strncpy(proc.nspace, "TOOL", PMIX_MAX_NSLEN);
 830     proc.rank = 0;
 831 
 832     if (NULL != cbfunc) {
 833         cbfunc(PMIX_SUCCESS, &proc, cbdata);
 834     }
 835 }
 836 
 837 static void log_fn(const pmix_proc_t *client,
 838                    const pmix_info_t data[], size_t ndata,
 839                    const pmix_info_t directives[], size_t ndirs,
 840                    pmix_op_cbfunc_t cbfunc, void *cbdata)
 841 {
 842     if (NULL != cbfunc) {
 843         cbfunc(PMIX_SUCCESS, cbdata);
 844     }
 845 }
 846 
 847 static void wait_signal_callback(int fd, short event, void *arg)
 848 {
 849     pmix_event_t *sig = (pmix_event_t*) arg;
 850     int status;
 851     pid_t pid;
 852     wait_tracker_t *t2;
 853 
 854     if (SIGCHLD != pmix_event_get_signal(sig)) {
 855         return;
 856     }
 857 
 858     /* we can have multiple children leave but only get one
 859      * sigchild callback, so reap all the waitpids until we
 860      * don't get anything valid back */
 861     while (1) {
 862         pid = waitpid(-1, &status, WNOHANG);
 863         if (-1 == pid && EINTR == errno) {
 864             /* try it again */
 865             continue;
 866         }
 867         /* if we got garbage, then nothing we can do */
 868         if (pid <= 0) {
 869             return;
 870         }
 871 
 872         /* we are already in an event, so it is safe to access the list */
 873         PMIX_LIST_FOREACH(t2, &children, wait_tracker_t) {
 874             if (pid == t2->pid) {
 875                 t2->exit_code = status;
 876                 /* found it! */
 877                 if (0 != status && 0 == exit_code) {
 878                     exit_code = status;
 879                 }
 880                 --wakeup;
 881                 break;
 882             }
 883         }
 884     }
 885 }

/* [<][>][^][v][top][bottom][index][help] */