root/opal/mca/pmix/pmix4x/pmix/examples/server.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. xfcon
  2. xfdes
  3. opcbfunc
  4. main
  5. setup_cbfunc
  6. set_namespace
  7. errhandler
  8. errhandler_reg_callbk
  9. connected
  10. finalized
  11. abcbfunc
  12. abort_fn
  13. fencenb_fn
  14. dmodex_fn
  15. publish_fn
  16. lookup_fn
  17. unpublish_fn
  18. spcbfunc
  19. spawn_fn
  20. connect_fn
  21. disconnect_fn
  22. register_event_fn
  23. deregister_events
  24. notify_event
  25. query_fn
  26. tool_connect_fn
  27. log_fn
  28. wait_signal_callback

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2012 Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2015      Research Organization for Information Science
  18  *                         and Technology (RIST). All rights reserved.
  19  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
  20  * $COPYRIGHT$
  21  *
  22  * Additional copyrights may follow
  23  *
  24  * $HEADER$
  25  *
  26  */
  27 
  28 #include <src/include/pmix_config.h>
  29 #include <pmix_server.h>
  30 #include <src/include/types.h>
  31 #include <src/include/pmix_globals.h>
  32 
  33 #include <stdio.h>
  34 #include <stdlib.h>
  35 #include <unistd.h>
  36 #include <time.h>
  37 #include <sys/types.h>
  38 #include <sys/wait.h>
  39 #include <errno.h>
  40 #include <signal.h>
  41 #include <pwd.h>
  42 #include <sys/stat.h>
  43 #include <dirent.h>
  44 
  45 #include "src/class/pmix_list.h"
  46 #include "src/util/pmix_environ.h"
  47 #include "src/util/output.h"
  48 #include "src/util/printf.h"
  49 #include "src/util/argv.h"
  50 
  51 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
  52                                pmix_op_cbfunc_t cbfunc, void *cbdata);
  53 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
  54                                pmix_op_cbfunc_t cbfunc, void *cbdata);
  55 static pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
  56                               int status, const char msg[],
  57                               pmix_proc_t procs[], size_t nprocs,
  58                               pmix_op_cbfunc_t cbfunc, void *cbdata);
  59 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
  60                                 const pmix_info_t info[], size_t ninfo,
  61                                 char *data, size_t ndata,
  62                                 pmix_modex_cbfunc_t cbfunc, void *cbdata);
  63 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
  64                                const pmix_info_t info[], size_t ninfo,
  65                                pmix_modex_cbfunc_t cbfunc, void *cbdata);
  66 static pmix_status_t publish_fn(const pmix_proc_t *proc,
  67                                 const pmix_info_t info[], size_t ninfo,
  68                                 pmix_op_cbfunc_t cbfunc, void *cbdata);
  69 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
  70                                const pmix_info_t info[], size_t ninfo,
  71                                pmix_lookup_cbfunc_t cbfunc, void *cbdata);
  72 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
  73                                   const pmix_info_t info[], size_t ninfo,
  74                                   pmix_op_cbfunc_t cbfunc, void *cbdata);
  75 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
  76                               const pmix_info_t job_info[], size_t ninfo,
  77                               const pmix_app_t apps[], size_t napps,
  78                               pmix_spawn_cbfunc_t cbfunc, void *cbdata);
  79 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
  80                                 const pmix_info_t info[], size_t ninfo,
  81                                 pmix_op_cbfunc_t cbfunc, void *cbdata);
  82 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
  83                                    const pmix_info_t info[], size_t ninfo,
  84                                    pmix_op_cbfunc_t cbfunc, void *cbdata);
  85 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
  86                                        const pmix_info_t info[], size_t ninfo,
  87                                        pmix_op_cbfunc_t cbfunc, void *cbdata);
  88 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
  89                                        pmix_op_cbfunc_t cbfunc, void *cbdata);
  90 static pmix_status_t notify_event(pmix_status_t code,
  91                                   const pmix_proc_t *source,
  92                                   pmix_data_range_t range,
  93                                   pmix_info_t info[], size_t ninfo,
  94                                   pmix_op_cbfunc_t cbfunc, void *cbdata);
  95 static pmix_status_t query_fn(pmix_proc_t *proct,
  96                               pmix_query_t *queries, size_t nqueries,
  97                               pmix_info_cbfunc_t cbfunc,
  98                               void *cbdata);
  99 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
 100                             pmix_tool_connection_cbfunc_t cbfunc,
 101                             void *cbdata);
 102 static void log_fn(const pmix_proc_t *client,
 103                    const pmix_info_t data[], size_t ndata,
 104                    const pmix_info_t directives[], size_t ndirs,
 105                    pmix_op_cbfunc_t cbfunc, void *cbdata);
 106 
/* Table of host-server entry points handed to PMIx_server_init() in main().
 * Each member is bound to the static handler declared above and defined
 * later in this file. */
 107 static pmix_server_module_t mymodule = {
 108     .client_connected = connected,
 109     .client_finalized = finalized,
 110     .abort = abort_fn,
 111     .fence_nb = fencenb_fn,
 112     .direct_modex = dmodex_fn,
 113     .publish = publish_fn,
 114     .lookup = lookup_fn,
 115     .unpublish = unpublish_fn,
 116     .spawn = spawn_fn,
 117     .connect = connect_fn,
 118     .disconnect = disconnect_fn,
 119     .register_events = register_event_fn,
 120     .deregister_events = deregister_events,
 121     .notify_event = notify_event,
 122     .query = query_fn,
 123     .tool_connected = tool_connect_fn,
 124     .log = log_fn
 125 };
 126 
/* List element wrapping one published record (pdata). publish_fn appends
 * items of this type to the pubdata list; lookup_fn and unpublish_fn search
 * that list by pdata.key. The class is registered without a constructor or
 * destructor (both NULL). */
 127 typedef struct {
 128     pmix_list_item_t super;
 129     pmix_pdata_t pdata;
 130 } pmix_locdat_t;
 131 PMIX_CLASS_INSTANCE(pmix_locdat_t,
 132                     pmix_list_item_t,
 133                     NULL, NULL);
 134 
/* Poll the flag expression (a) until it evaluates false, calling usleep(10)
 * between checks, then apply PMIX_ACQUIRE_OBJECT to it. The argument is
 * re-evaluated on every iteration, so it must be free of side effects. */
 135 #define PMIX_WAIT_FOR_COMPLETION(a)             \
 136     do {                                        \
 137         while ((a)) {                           \
 138             usleep(10);                         \
 139         }                                       \
 140         PMIX_ACQUIRE_OBJECT((a));               \
 141     } while (0)
 142 
 143 typedef struct {
 144     pmix_object_t super;
 145     volatile bool active;
 146     pmix_proc_t caller;
 147     pmix_info_t *info;
 148     size_t ninfo;
 149     pmix_op_cbfunc_t cbfunc;
 150     pmix_spawn_cbfunc_t spcbfunc;
 151     void *cbdata;
 152 } myxfer_t;
 153 static void xfcon(myxfer_t *p)
 154 {
 155     p->info = NULL;
 156     p->ninfo = 0;
 157     p->active = true;
 158     p->cbfunc = NULL;
 159     p->spcbfunc = NULL;
 160     p->cbdata = NULL;
 161 }
 162 static void xfdes(myxfer_t *p)
 163 {
 164     if (NULL != p->info) {
 165         PMIX_INFO_FREE(p->info, p->ninfo);
 166     }
 167 }
 168 PMIX_CLASS_INSTANCE(myxfer_t,
 169                     pmix_object_t,
 170                     xfcon, xfdes);
 171 
/* List element recording the pid of one forked client; main() appends one
 * per fork to the children list and wait_signal_callback matches reaped
 * pids against it. Registered without constructor or destructor. */
 172 typedef struct {
 173     pmix_list_item_t super;
 174     pid_t pid;
 175 } wait_tracker_t;
 176 PMIX_CLASS_INSTANCE(wait_tracker_t,
 177                     pmix_list_item_t,
 178                     NULL, NULL);
 179 
/* Count of clients still outstanding: main() sets it to nprocs and then
 * sleeps in a loop until it is no longer positive. */
 180 static volatile int wakeup;
/* Records stored by publish_fn (elements are pmix_locdat_t). */
 181 static pmix_list_t pubdata;
/* SIGCHLD event registered in main() with wait_signal_callback. */
 182 static pmix_event_t handler;
/* One wait_tracker_t per forked client. */
 183 static pmix_list_t children;
 184 
 185 static void set_namespace(int nprocs, char *ranks, char *nspace,
 186                           pmix_op_cbfunc_t cbfunc, myxfer_t *x);
 187 static void errhandler(size_t evhdlr_registration_id,
 188                        pmix_status_t status,
 189                        const pmix_proc_t *source,
 190                        pmix_info_t info[], size_t ninfo,
 191                        pmix_info_t results[], size_t nresults,
 192                        pmix_event_notification_cbfunc_fn_t cbfunc,
 193                        void *cbdata);
 194 static void wait_signal_callback(int fd, short event, void *arg);
 195 static void errhandler_reg_callbk (pmix_status_t status,
 196                                    size_t errhandler_ref,
 197                                    void *cbdata);
 198 
 199 static void opcbfunc(pmix_status_t status, void *cbdata)
 200 {
 201     myxfer_t *x = (myxfer_t*)cbdata;
 202 
 203     /* release the caller, if necessary */
 204     if (NULL != x->cbfunc) {
 205         x->cbfunc(PMIX_SUCCESS, x->cbdata);
 206     }
 207     x->active = false;
 208 }
 209 
 210 int main(int argc, char **argv)
 211 {
 212     char **client_env=NULL;
 213     char **client_argv=NULL;
 214     char *tmp, **atmp, *executable=NULL, *tmpdir, *cleanup;
 215     int rc, nprocs=1, n, k;
 216     uid_t myuid;
 217     gid_t mygid;
 218     pid_t pid;
 219     myxfer_t *x;
 220     pmix_proc_t proc;
 221     wait_tracker_t *child;
 222     char *tdir;
 223     uid_t uid = geteuid();
 224     pmix_info_t *info;
 225     struct stat buf;
 226 
 227     /* define and pass a personal tmpdir to protect the system */
 228     if (NULL == (tdir = getenv("TMPDIR"))) {
 229         if (NULL == (tdir = getenv("TEMP"))) {
 230             if (NULL == (tdir = getenv("TMP"))) {
 231                 tdir = "/tmp";
 232             }
 233         }
 234     }
 235     if (0 > asprintf(&tmpdir, "%s/pmix.%lu", tdir, (long unsigned)uid)) {
 236         fprintf(stderr, "Out of memory\n");
 237         exit(1);
 238     }
 239     /* create the directory */
 240     if (0 != stat(tmpdir, &buf)) {
 241         /* try to make directory */
 242         if (0 != mkdir(tmpdir, S_IRWXU)) {
 243             fprintf(stderr, "Cannot make tmpdir %s", tmpdir);
 244             exit(1);
 245         }
 246     }
 247     asprintf(&cleanup, "rm -rf %s", tmpdir);
 248     PMIX_INFO_CREATE(info, 1);
 249     PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TMPDIR, tmpdir, PMIX_STRING);
 250 
 251     /* setup the server library */
 252     if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, 1))) {
 253         fprintf(stderr, "Init failed with error %d\n", rc);
 254         return rc;
 255     }
 256     PMIX_INFO_FREE(info, 1);
 257 
 258     /* register the errhandler */
 259     PMIx_Register_event_handler(NULL, 0, NULL, 0,
 260                                 errhandler, errhandler_reg_callbk, NULL);
 261 
 262     /* setup the pub data, in case it is used */
 263     PMIX_CONSTRUCT(&pubdata, pmix_list_t);
 264 
 265     /* setup to see sigchld on the forked tests */
 266     PMIX_CONSTRUCT(&children, pmix_list_t);
 267     pmix_event_assign(&handler, pmix_globals.evbase, SIGCHLD,
 268                       EV_SIGNAL|EV_PERSIST,wait_signal_callback, &handler);
 269     pmix_event_add(&handler, NULL);
 270 
 271     /* see if we were passed the number of procs to run or
 272      * the executable to use */
 273     for (n=1; n < (argc-1); n++) {
 274         if (0 == strcmp("-n", argv[n]) &&
 275             NULL != argv[n+1]) {
 276             nprocs = strtol(argv[n+1], NULL, 10);
 277             ++n;  // step over the argument
 278         } else if (0 == strcmp("-e", argv[n]) &&
 279                    NULL != argv[n+1]) {
 280             executable = strdup(argv[n+1]);
 281             for (k=n+2; NULL != argv[k]; k++) {
 282                 pmix_argv_append_nosize(&client_argv, argv[k]);
 283             }
 284             n += k;
 285         }
 286     }
 287     if (NULL == executable) {
 288         executable = strdup("./simpclient");
 289     }
 290 
 291     /* we have a single namespace for all clients */
 292     atmp = NULL;
 293     for (n=0; n < nprocs; n++) {
 294         asprintf(&tmp, "%d", n);
 295         pmix_argv_append_nosize(&atmp, tmp);
 296         free(tmp);
 297     }
 298     tmp = pmix_argv_join(atmp, ',');
 299     pmix_argv_free(atmp);
 300     /* register the nspace */
 301     x = PMIX_NEW(myxfer_t);
 302     set_namespace(nprocs, tmp, "foobar", opcbfunc, x);
 303 
 304     /* set common argv and env */
 305     client_env = pmix_argv_copy(environ);
 306     pmix_argv_prepend_nosize(&client_argv, executable);
 307 
 308     wakeup = nprocs;
 309     myuid = getuid();
 310     mygid = getgid();
 311 
 312     /* if the nspace registration hasn't completed yet,
 313      * wait for it here */
 314     PMIX_WAIT_FOR_COMPLETION(x->active);
 315     free(tmp);
 316     PMIX_RELEASE(x);
 317 
 318     /* prep the local node for launch */
 319     x = PMIX_NEW(myxfer_t);
 320     if (PMIX_SUCCESS != (rc = PMIx_server_setup_local_support("foobar", NULL, 0, opcbfunc, x))) {
 321         fprintf(stderr, "Setup local support failed: %d\n", rc);
 322         PMIx_server_finalize();
 323         system(cleanup);
 324         return rc;
 325     }
 326     PMIX_WAIT_FOR_COMPLETION(x->active);
 327     PMIX_RELEASE(x);
 328 
 329     /* fork/exec the test */
 330     (void)strncpy(proc.nspace, "foobar", PMIX_MAX_NSLEN);
 331     for (n = 0; n < nprocs; n++) {
 332         proc.rank = n;
 333         if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) {//n
 334             fprintf(stderr, "Server fork setup failed with error %d\n", rc);
 335             PMIx_server_finalize();
 336             system(cleanup);
 337             return rc;
 338         }
 339         x = PMIX_NEW(myxfer_t);
 340         if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid,
 341                                                               NULL, opcbfunc, x))) {
 342             fprintf(stderr, "Server fork setup failed with error %d\n", rc);
 343             PMIx_server_finalize();
 344             system(cleanup);
 345             return rc;
 346         }
 347         /* don't fork/exec the client until we know it is registered
 348          * so we avoid a potential race condition in the server */
 349         PMIX_WAIT_FOR_COMPLETION(x->active);
 350         PMIX_RELEASE(x);
 351         pid = fork();
 352         if (pid < 0) {
 353             fprintf(stderr, "Fork failed\n");
 354             PMIx_server_finalize();
 355             system(cleanup);
 356             return -1;
 357         }
 358         child = PMIX_NEW(wait_tracker_t);
 359         child->pid = pid;
 360         pmix_list_append(&children, &child->super);
 361 
 362         if (pid == 0) {
 363             execve(executable, client_argv, client_env);
 364             /* Does not return */
 365             exit(0);
 366         }
 367     }
 368     free(executable);
 369     pmix_argv_free(client_argv);
 370     pmix_argv_free(client_env);
 371 
 372     /* hang around until the client(s) finalize */
 373     while (0 < wakeup) {
 374         struct timespec ts;
 375         ts.tv_sec = 0;
 376         ts.tv_nsec = 100000;
 377         nanosleep(&ts, NULL);
 378     }
 379 
 380     /* deregister the errhandler */
 381     PMIx_Deregister_event_handler(0, NULL, NULL);
 382 
 383     /* release any pub data */
 384     PMIX_LIST_DESTRUCT(&pubdata);
 385 
 386     /* finalize the server library */
 387     if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
 388         fprintf(stderr, "Finalize failed with error %d\n", rc);
 389     }
 390 
 391     fprintf(stderr, "Test finished OK!\n");
 392     system(cleanup);
 393 
 394     return rc;
 395 }
 396 
 397 static void setup_cbfunc(pmix_status_t status,
 398                          pmix_info_t info[], size_t ninfo,
 399                          void *provided_cbdata,
 400                          pmix_op_cbfunc_t cbfunc, void *cbdata)
 401 {
 402     myxfer_t *myxfer = (myxfer_t*)provided_cbdata;
 403     size_t i;
 404 
 405     if (PMIX_SUCCESS == status && 0 < ninfo) {
 406         myxfer->ninfo = ninfo;
 407         PMIX_INFO_CREATE(myxfer->info, ninfo);
 408         for (i=0; i < ninfo; i++) {
 409             PMIX_INFO_XFER(&myxfer->info[i], &info[i]);
 410         }
 411     }
 412     if (NULL != cbfunc) {
 413         cbfunc(PMIX_SUCCESS, cbdata);
 414     }
 415     myxfer->active = false;
 416 }
 417 
 418 static void set_namespace(int nprocs, char *ranks, char *nspace,
 419                           pmix_op_cbfunc_t cbfunc, myxfer_t *x)
 420 {
 421     char *regex, *ppn;
 422     char hostname[PMIX_MAXHOSTNAMELEN];
 423     pmix_status_t rc;
 424     myxfer_t myxfer;
 425     size_t i;
 426 
 427     gethostname(hostname, sizeof(hostname));
 428 
 429     /* request application setup information - e.g., network
 430      * security keys or endpoint info */
 431     PMIX_CONSTRUCT(&myxfer, myxfer_t);
 432     myxfer.active = true;
 433     if (PMIX_SUCCESS != (rc = PMIx_server_setup_application(nspace, NULL, 0, setup_cbfunc, &myxfer))) {
 434         PMIX_DESTRUCT(&myxfer);
 435         fprintf(stderr, "Failed to setup application: %d\n", rc);
 436         exit(1);
 437     }
 438     PMIX_WAIT_FOR_COMPLETION(myxfer.active);
 439     x->ninfo = myxfer.ninfo + 7;
 440 
 441     PMIX_INFO_CREATE(x->info, x->ninfo);
 442     if (0 < myxfer.ninfo) {
 443         for (i=0; i < myxfer.ninfo; i++) {
 444             PMIX_INFO_XFER(&x->info[i], &myxfer.info[i]);
 445         }
 446     }
 447     PMIX_DESTRUCT(&myxfer);
 448 
 449     (void)strncpy(x->info[i].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
 450     x->info[i].value.type = PMIX_UINT32;
 451     x->info[i].value.data.uint32 = nprocs;
 452 
 453     ++i;
 454     (void)strncpy(x->info[i].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
 455     x->info[i].value.type = PMIX_UINT32;
 456     x->info[i].value.data.uint32 = 0;
 457 
 458     ++i;
 459     (void)strncpy(x->info[i].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
 460     x->info[i].value.type = PMIX_UINT32;
 461     x->info[i].value.data.uint32 = nprocs;
 462 
 463     ++i;
 464     (void)strncpy(x->info[i].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
 465     x->info[i].value.type = PMIX_STRING;
 466     x->info[i].value.data.string = strdup(ranks);
 467 
 468     ++i;
 469     PMIx_generate_regex(hostname, &regex);
 470     (void)strncpy(x->info[i].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
 471     x->info[i].value.type = PMIX_STRING;
 472     x->info[i].value.data.string = regex;
 473 
 474     ++i;
 475     PMIx_generate_ppn(ranks, &ppn);
 476     (void)strncpy(x->info[i].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
 477     x->info[i].value.type = PMIX_STRING;
 478     x->info[i].value.data.string = ppn;
 479 
 480     ++i;
 481     (void)strncpy(x->info[i].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
 482     x->info[i].value.type = PMIX_UINT32;
 483     x->info[i].value.data.uint32 = nprocs;
 484 
 485     PMIx_server_register_nspace(nspace, nprocs, x->info, x->ninfo,
 486                                 cbfunc, x);
 487 }
 488 
 489 static void errhandler(size_t evhdlr_registration_id,
 490                        pmix_status_t status,
 491                        const pmix_proc_t *source,
 492                        pmix_info_t info[], size_t ninfo,
 493                        pmix_info_t results[], size_t nresults,
 494                        pmix_event_notification_cbfunc_fn_t cbfunc,
 495                        void *cbdata)
 496 {
 497     pmix_output(0, "SERVER: ERRHANDLER CALLED WITH STATUS %d", status);
 498 }
 499 
 500 static void errhandler_reg_callbk (pmix_status_t status,
 501                                    size_t errhandler_ref,
 502                                    void *cbdata)
 503 {
 504     return;
 505 }
 506 
 507 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
 508                                pmix_op_cbfunc_t cbfunc, void *cbdata)
 509 {
 510     if (NULL != cbfunc) {
 511         cbfunc(PMIX_SUCCESS, cbdata);
 512     }
 513     return PMIX_SUCCESS;
 514 }
 515 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
 516                      pmix_op_cbfunc_t cbfunc, void *cbdata)
 517 {
 518     pmix_output(0, "SERVER: FINALIZED %s:%d",
 519                 proc->nspace, proc->rank);
 520     --wakeup;
 521     /* ensure we call the cbfunc so the proc can exit! */
 522     if (NULL != cbfunc) {
 523         cbfunc(PMIX_SUCCESS, cbdata);
 524     }
 525     return PMIX_SUCCESS;
 526 }
 527 
 528 static void abcbfunc(pmix_status_t status, void *cbdata)
 529 {
 530     myxfer_t *x = (myxfer_t*)cbdata;
 531 
 532     /* be sure to release the caller */
 533     if (NULL != x->cbfunc) {
 534         x->cbfunc(status, x->cbdata);
 535     }
 536     PMIX_RELEASE(x);
 537 }
 538 
 539 static pmix_status_t abort_fn(const pmix_proc_t *proc,
 540                               void *server_object,
 541                               int status, const char msg[],
 542                               pmix_proc_t procs[], size_t nprocs,
 543                               pmix_op_cbfunc_t cbfunc, void *cbdata)
 544 {
 545     pmix_status_t rc;
 546     myxfer_t *x;
 547 
 548     if (NULL != procs) {
 549         pmix_output(0, "SERVER: ABORT on %s:%d", procs[0].nspace, procs[0].rank);
 550     } else {
 551         pmix_output(0, "SERVER: ABORT OF ALL PROCS IN NSPACE %s", proc->nspace);
 552     }
 553 
 554     /* instead of aborting the specified procs, notify them
 555      * (if they have registered their errhandler) */
 556 
 557     /* use the myxfer_t object to ensure we release
 558      * the caller when notification has been queued */
 559     x = PMIX_NEW(myxfer_t);
 560     (void)strncpy(x->caller.nspace, proc->nspace, PMIX_MAX_NSLEN);
 561     x->caller.rank = proc->rank;
 562 
 563     PMIX_INFO_CREATE(x->info, 2);
 564     (void)strncpy(x->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
 565     x->info[0].value.type = PMIX_INT8;
 566     x->info[0].value.data.int8 = 12;
 567     (void)strncpy(x->info[1].key, "VADER", PMIX_MAX_KEYLEN);
 568     x->info[1].value.type = PMIX_DOUBLE;
 569     x->info[1].value.data.dval = 12.34;
 570     x->cbfunc = cbfunc;
 571     x->cbdata = cbdata;
 572 
 573     if (PMIX_SUCCESS != (rc = PMIx_Notify_event(status, &x->caller,
 574                                                 PMIX_RANGE_NAMESPACE,
 575                                                 x->info, 2,
 576                                                 abcbfunc, x))) {
 577         pmix_output(0, "SERVER: FAILED NOTIFY ERROR %d", (int)rc);
 578     }
 579 
 580     return PMIX_SUCCESS;
 581 }
 582 
 583 
 584 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
 585                       const pmix_info_t info[], size_t ninfo,
 586                       char *data, size_t ndata,
 587                       pmix_modex_cbfunc_t cbfunc, void *cbdata)
 588 {
 589     pmix_output(0, "SERVER: FENCENB");
 590     /* pass the provided data back to each participating proc */
 591     if (NULL != cbfunc) {
 592         cbfunc(PMIX_SUCCESS, data, ndata, cbdata, NULL, NULL);
 593     }
 594     return PMIX_SUCCESS;
 595 }
 596 
 597 
 598 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
 599                      const pmix_info_t info[], size_t ninfo,
 600                      pmix_modex_cbfunc_t cbfunc, void *cbdata)
 601 {
 602     pmix_output(0, "SERVER: DMODEX");
 603 
 604     /* we don't have any data for remote procs as this
 605      * test only runs one server - so report accordingly */
 606     if (NULL != cbfunc) {
 607         cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
 608     }
 609     return PMIX_SUCCESS;
 610 }
 611 
 612 
 613 static pmix_status_t publish_fn(const pmix_proc_t *proc,
 614                       const pmix_info_t info[], size_t ninfo,
 615                       pmix_op_cbfunc_t cbfunc, void *cbdata)
 616 {
 617     pmix_locdat_t *p;
 618     size_t n;
 619 
 620     pmix_output(0, "SERVER: PUBLISH");
 621 
 622     for (n=0; n < ninfo; n++) {
 623         p = PMIX_NEW(pmix_locdat_t);
 624         (void)strncpy(p->pdata.proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
 625         p->pdata.proc.rank = proc->rank;
 626         (void)strncpy(p->pdata.key, info[n].key, PMIX_MAX_KEYLEN);
 627         pmix_value_xfer(&p->pdata.value, (pmix_value_t*)&info[n].value);
 628         pmix_list_append(&pubdata, &p->super);
 629     }
 630     if (NULL != cbfunc) {
 631         cbfunc(PMIX_SUCCESS, cbdata);
 632     }
 633     return PMIX_SUCCESS;
 634 }
 635 
 636 
 637 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
 638                      const pmix_info_t info[], size_t ninfo,
 639                      pmix_lookup_cbfunc_t cbfunc, void *cbdata)
 640 {
 641     pmix_locdat_t *p, *p2;
 642     pmix_list_t results;
 643     size_t i, n;
 644     pmix_pdata_t *pd = NULL;
 645     pmix_status_t ret = PMIX_ERR_NOT_FOUND;
 646 
 647     pmix_output(0, "SERVER: LOOKUP");
 648 
 649     PMIX_CONSTRUCT(&results, pmix_list_t);
 650 
 651     for (n=0; NULL != keys[n]; n++) {
 652         PMIX_LIST_FOREACH(p, &pubdata, pmix_locdat_t) {
 653             if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
 654                 p2 = PMIX_NEW(pmix_locdat_t);
 655                 (void)strncpy(p2->pdata.proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
 656                 p2->pdata.proc.rank = p->pdata.proc.rank;
 657                 (void)strncpy(p2->pdata.key, p->pdata.key, PMIX_MAX_KEYLEN);
 658                 pmix_value_xfer(&p2->pdata.value, &p->pdata.value);
 659                 pmix_list_append(&results, &p2->super);
 660                 break;
 661             }
 662         }
 663     }
 664     if (0 < (n = pmix_list_get_size(&results))) {
 665         ret = PMIX_SUCCESS;
 666         PMIX_PDATA_CREATE(pd, n);
 667         for (i=0; i < n; i++) {
 668             p = (pmix_locdat_t*)pmix_list_remove_first(&results);
 669             if (p) {
 670                 (void)strncpy(pd[i].proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
 671                 pd[i].proc.rank = p->pdata.proc.rank;
 672                 (void)strncpy(pd[i].key, p->pdata.key, PMIX_MAX_KEYLEN);
 673                 pmix_value_xfer(&pd[i].value, &p->pdata.value);
 674             }
 675         }
 676     }
 677     PMIX_LIST_DESTRUCT(&results);
 678     if (NULL != cbfunc) {
 679         cbfunc(ret, pd, n, cbdata);
 680     }
 681     if (0 < n) {
 682         PMIX_PDATA_FREE(pd, n);
 683     }
 684     return PMIX_SUCCESS;
 685 }
 686 
 687 
 688 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
 689                         const pmix_info_t info[], size_t ninfo,
 690                         pmix_op_cbfunc_t cbfunc, void *cbdata)
 691 {
 692     pmix_locdat_t *p, *p2;
 693     size_t n;
 694 
 695     pmix_output(0, "SERVER: UNPUBLISH");
 696 
 697     for (n=0; NULL != keys[n]; n++) {
 698         PMIX_LIST_FOREACH_SAFE(p, p2, &pubdata, pmix_locdat_t) {
 699             if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
 700                 pmix_list_remove_item(&pubdata, &p->super);
 701                 PMIX_RELEASE(p);
 702                 break;
 703             }
 704         }
 705     }
 706     if (NULL != cbfunc) {
 707         cbfunc(PMIX_SUCCESS, cbdata);
 708     }
 709     return PMIX_SUCCESS;
 710 }
 711 
 712 static void spcbfunc(pmix_status_t status, void *cbdata)
 713 {
 714     myxfer_t *x = (myxfer_t*)cbdata;
 715 
 716     if (NULL != x->spcbfunc) {
 717         x->spcbfunc(PMIX_SUCCESS, "DYNSPACE", x->cbdata);
 718     }
 719 }
 720 
 721 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
 722                     const pmix_info_t job_info[], size_t ninfo,
 723                     const pmix_app_t apps[], size_t napps,
 724                     pmix_spawn_cbfunc_t cbfunc, void *cbdata)
 725 {
 726     myxfer_t *x;
 727 
 728     pmix_output(0, "SERVER: SPAWN");
 729 
 730     /* in practice, we would pass this request to the local
 731      * resource manager for launch, and then have that server
 732      * execute our callback function. For now, we will fake
 733      * the spawn and just pretend */
 734 
 735     /* must register the nspace for the new procs before
 736      * we return to the caller */
 737     x = PMIX_NEW(myxfer_t);
 738     x->spcbfunc = cbfunc;
 739     x->cbdata = cbdata;
 740 
 741     set_namespace(2, "0,1", "DYNSPACE", spcbfunc, x);
 742 
 743     return PMIX_SUCCESS;
 744 }
 745 
 746 
 747 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
 748                                 const pmix_info_t info[], size_t ninfo,
 749                                 pmix_op_cbfunc_t cbfunc, void *cbdata)
 750 {
 751     pmix_output(0, "SERVER: CONNECT");
 752 
 753     /* in practice, we would pass this request to the local
 754      * resource manager for handling */
 755 
 756     if (NULL != cbfunc) {
 757         cbfunc(PMIX_SUCCESS, cbdata);
 758     }
 759 
 760     return PMIX_SUCCESS;
 761 }
 762 
 763 
 764 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
 765                                    const pmix_info_t info[], size_t ninfo,
 766                                    pmix_op_cbfunc_t cbfunc, void *cbdata)
 767 {
 768     pmix_output(0, "SERVER: DISCONNECT");
 769 
 770     /* in practice, we would pass this request to the local
 771      * resource manager for handling */
 772 
 773     if (NULL != cbfunc) {
 774         cbfunc(PMIX_SUCCESS, cbdata);
 775     }
 776 
 777     return PMIX_SUCCESS;
 778 }
 779 
 780 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
 781                                        const pmix_info_t info[], size_t ninfo,
 782                                        pmix_op_cbfunc_t cbfunc, void *cbdata)
 783 {
 784     if (NULL != cbfunc) {
 785         cbfunc(PMIX_SUCCESS, cbdata);
 786     }
 787     return PMIX_SUCCESS;
 788 }
 789 
 790 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
 791                                        pmix_op_cbfunc_t cbfunc, void *cbdata)
 792 {
 793     return PMIX_SUCCESS;
 794 }
 795 
 796 static pmix_status_t notify_event(pmix_status_t code,
 797                                   const pmix_proc_t *source,
 798                                   pmix_data_range_t range,
 799                                   pmix_info_t info[], size_t ninfo,
 800                                   pmix_op_cbfunc_t cbfunc, void *cbdata)
 801 {
 802     return PMIX_SUCCESS;
 803 }
 804 
/* Pairs a pmix_info_t array (data) with its element count (ndata). */
 805 typedef struct query_data_t {
 806     pmix_info_t *data;
 807     size_t ndata;
 808 } query_data_t;
 809 
 810 static pmix_status_t query_fn(pmix_proc_t *proct,
 811                               pmix_query_t *queries, size_t nqueries,
 812                               pmix_info_cbfunc_t cbfunc,
 813                               void *cbdata)
 814 {
 815     size_t n;
 816     pmix_info_t *info;
 817 
 818     pmix_output(0, "SERVER: QUERY");
 819 
 820     if (NULL == cbfunc) {
 821         return PMIX_ERROR;
 822     }
 823     /* keep this simple */
 824     PMIX_INFO_CREATE(info, nqueries);
 825     for (n=0; n < nqueries; n++) {
 826         (void)strncpy(info[n].key, queries[n].keys[0], PMIX_MAX_KEYLEN);
 827         info[n].value.type = PMIX_STRING;
 828         if (0 > asprintf(&info[n].value.data.string, "%d", (int)n)) {
 829             return PMIX_ERROR;
 830         }
 831     }
 832     cbfunc(PMIX_SUCCESS, info, nqueries, cbdata, NULL, NULL);
 833     return PMIX_SUCCESS;
 834 }
 835 
 836 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
 837                             pmix_tool_connection_cbfunc_t cbfunc,
 838                             void *cbdata)
 839 {
 840     pmix_proc_t proc;
 841 
 842     pmix_output(0, "SERVER: TOOL CONNECT");
 843 
 844     /* just pass back an arbitrary nspace */
 845     (void)strncpy(proc.nspace, "TOOL", PMIX_MAX_NSLEN);
 846     proc.rank = 0;
 847 
 848     if (NULL != cbfunc) {
 849         cbfunc(PMIX_SUCCESS, &proc, cbdata);
 850     }
 851 }
 852 
 853 static void log_fn(const pmix_proc_t *client,
 854                    const pmix_info_t data[], size_t ndata,
 855                    const pmix_info_t directives[], size_t ndirs,
 856                    pmix_op_cbfunc_t cbfunc, void *cbdata)
 857 {
 858     pmix_output(0, "SERVER: LOG");
 859 
 860     if (NULL != cbfunc) {
 861         cbfunc(PMIX_SUCCESS, cbdata);
 862     }
 863 }
 864 
 865 static void wait_signal_callback(int fd, short event, void *arg)
 866 {
 867     pmix_event_t *sig = (pmix_event_t*) arg;
 868     int status;
 869     pid_t pid;
 870     wait_tracker_t *t2;
 871 
 872     if (SIGCHLD != pmix_event_get_signal(sig)) {
 873         return;
 874     }
 875 
 876     /* we can have multiple children leave but only get one
 877      * sigchild callback, so reap all the waitpids until we
 878      * don't get anything valid back */
 879     while (1) {
 880         pid = waitpid(-1, &status, WNOHANG);
 881         if (-1 == pid && EINTR == errno) {
 882             /* try it again */
 883             continue;
 884         }
 885         /* if we got garbage, then nothing we can do */
 886         if (pid <= 0) {
 887             return;
 888         }
 889 
 890         /* we are already in an event, so it is safe to access the list */
 891         PMIX_LIST_FOREACH(t2, &children, wait_tracker_t) {
 892             if (pid == t2->pid) {
 893                 /* found it! */
 894                 --wakeup;
 895                 break;
 896             }
 897         }
 898     }
 899 }

/* [<][>][^][v][top][bottom][index][help] */