root/orte/test/mpi/pinterlib.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. model_registration_callback
  2. model_callback
  3. opcbfunc
  4. infocb
  5. mylib
  6. main

   1 /* -*- C -*-
   2  *
   3  * $HEADER$
   4  *
   5  * The most basic of MPI applications
   6  */
   7 
   8 #include <stdio.h>
   9 #include <pthread.h>
  10 #include "mpi.h"
  11 #include "pmix.h"
  12 
  13 typedef struct {
  14     pthread_mutex_t mutex;
  15     pthread_cond_t cond;
  16     volatile bool active;
  17     pmix_status_t status;
  18 } mylock_t;
  19 
  20 #define MY_CONSTRUCT_LOCK(l)                     \
  21     do {                                            \
  22         pthread_mutex_init(&(l)->mutex, NULL);      \
  23         pthread_cond_init(&(l)->cond, NULL);        \
  24         (l)->active = true;                         \
  25         (l)->status = PMIX_SUCCESS;                 \
  26     } while(0)
  27 
  28 #define MY_DESTRUCT_LOCK(l)              \
  29     do {                                    \
  30         pthread_mutex_destroy(&(l)->mutex); \
  31         pthread_cond_destroy(&(l)->cond);   \
  32     } while(0)
  33 
  34 #define MY_WAIT_THREAD(lck)                                      \
  35     do {                                                            \
  36         pthread_mutex_lock(&(lck)->mutex);                          \
  37         while ((lck)->active) {                                     \
  38             pthread_cond_wait(&(lck)->cond, &(lck)->mutex);         \
  39         }                                                           \
  40         pthread_mutex_unlock(&(lck)->mutex);                        \
  41     } while(0)
  42 
  43 #define MY_WAKEUP_THREAD(lck)                        \
  44     do {                                                \
  45         pthread_mutex_lock(&(lck)->mutex);              \
  46         (lck)->active = false;                          \
  47         pthread_cond_broadcast(&(lck)->cond);           \
  48         pthread_mutex_unlock(&(lck)->mutex);            \
  49     } while(0)
  50 
  51 
  52 static size_t interlibhandler_id = SIZE_MAX;
  53 static mylock_t thread_complete;
  54 static pmix_proc_t myproc;
  55 
  56 static void model_registration_callback(pmix_status_t status,
  57                                         size_t errhandler_ref,
  58                                         void *cbdata)
  59 {
  60     mylock_t *lock = (mylock_t*)cbdata;
  61 
  62     interlibhandler_id = errhandler_ref;
  63     MY_WAKEUP_THREAD(lock);
  64 }
  65 static void model_callback(size_t evhdlr_registration_id,
  66                            pmix_status_t status,
  67                            const pmix_proc_t *source,
  68                            pmix_info_t info[], size_t ninfo,
  69                            pmix_info_t *results, size_t nresults,
  70                            pmix_event_notification_cbfunc_fn_t cbfunc,
  71                            void *cbdata)
  72 {
  73     size_t n;
  74 
  75     /* we can ignore our own callback as we obviously
  76      * know that we are OpenMP */
  77     if (NULL != info) {
  78         for (n=0; n < ninfo; n++) {
  79             if (0 == strcmp(info[n].key, PMIX_PROGRAMMING_MODEL) &&
  80                 0 == strcmp(info[n].value.data.string, "OpenMP")) {
  81                 goto cback;
  82             }
  83             if (PMIX_STRING == info[n].value.type) {
  84                 fprintf(stderr, "Thread Model Callback Key: %s Val %s\n", info[n].key, info[n].value.data.string);
  85             }
  86         }
  87     }
  88     /* otherwise, do something clever here */
  89 
  90   cback:
  91     /* we must NOT tell the event handler state machine that we
  92      * are the last step as that will prevent it from notifying
  93      * anyone else that might be listening for declarations */
  94     if (NULL != cbfunc) {
  95         cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
  96     }
  97     MY_WAKEUP_THREAD(&thread_complete);
  98 }
  99 
 100 static void opcbfunc(pmix_status_t status, void *cbdata)
 101 {
 102     mylock_t *lock = (mylock_t*)cbdata;
 103     MY_WAKEUP_THREAD(lock);
 104 }
 105 
 106 static void infocb(pmix_status_t status,
 107                    pmix_info_t *info, size_t ninfo,
 108                    void *cbdata,
 109                    pmix_release_cbfunc_t release_fn,
 110                    void *release_cbdata)
 111 {
 112     mylock_t *lock = (mylock_t*)cbdata;
 113     size_t n;
 114 
 115     for (n=0; n < ninfo; n++) {
 116         fprintf(stderr, "QUERY DATA KEY: %s VALUE %s\n", info[n].key, info[n].value.data.string);
 117     }
 118     if (NULL != release_fn) {
 119         release_fn(release_cbdata);
 120     }
 121     MY_WAKEUP_THREAD(lock);
 122 }
 123 
 124 static void *mylib(void *ptr)
 125 {
 126     pmix_info_t *info, *directives;
 127     pmix_status_t ret;
 128     mylock_t lock;
 129     bool init = false, flag;
 130     pmix_query_t *query;
 131     pmix_pdata_t *pdata;
 132     pmix_status_t code = PMIX_MODEL_DECLARED;
 133     pmix_value_t *val;
 134     int wait = 0;
 135 
 136     MY_CONSTRUCT_LOCK(&thread_complete);
 137 
 138     /* declare that we are present and active */
 139     PMIX_INFO_CREATE(info, 5);
 140     PMIX_INFO_LOAD(&info[0], PMIX_PROGRAMMING_MODEL, "OpenMP", PMIX_STRING);
 141     PMIX_INFO_LOAD(&info[1], PMIX_MODEL_LIBRARY_NAME, "foobar", PMIX_STRING);
 142     PMIX_INFO_LOAD(&info[2], PMIX_MODEL_LIBRARY_VERSION, "1.2.3.4", PMIX_STRING);
 143     PMIX_INFO_LOAD(&info[3], PMIX_THREADING_MODEL, "PTHREAD", PMIX_STRING);
 144     /* mark that this isn't to go to any default event handler - pmix_init
 145      * takes care of that for us, but we have to explicitly do it here */
 146     flag = true;
 147     PMIX_INFO_LOAD(&info[4], PMIX_EVENT_NON_DEFAULT, &flag, PMIX_BOOL);
 148 
 149     /* see if pmix is already initialized - note that if we
 150      * don't know our process identifier at this point (e.g.,
 151      * we don't store it in some global location), then we
 152      * could always call PMIx_Init anyway as it is just
 153      * reference counted. */
 154     if (PMIx_Initialized()) {
 155         /* it is, so let's just use the event notification
 156          * API to let everyone know we are here */
 157         MY_CONSTRUCT_LOCK(&lock);
 158         ret = PMIx_Notify_event(code, &myproc,
 159                                 PMIX_RANGE_PROC_LOCAL,
 160                                 info, 5,
 161                                 opcbfunc, (void*)&lock);
 162         MY_WAIT_THREAD(&lock);
 163         MY_DESTRUCT_LOCK(&lock);
 164     } else {
 165         /* call pmix to initialize these values */
 166         ret = PMIx_Init(&myproc, info, 5);
 167         init = true;
 168     }
 169     PMIX_INFO_FREE(info, 5);
 170 
 171     /* register to receive model callbacks */
 172     PMIX_INFO_CREATE(directives, 1);
 173     /* give the event a name so we can distinguish it */
 174     PMIX_INFO_LOAD(&directives[0], PMIX_EVENT_HDLR_NAME, "My-Declarations", PMIX_STRING);
 175 
 176     /* we could constrain the range to proc_local - technically, this
 177      * isn't required so long as the code that generates
 178      * the event stipulates its range as proc_local. We rely
 179      * on that here */
 180     MY_CONSTRUCT_LOCK(&lock);
 181     PMIx_Register_event_handler(&code, 1, directives, 1,
 182                                 model_callback,
 183                                 model_registration_callback,
 184                                 (void*)&lock);
 185     MY_WAIT_THREAD(&lock);
 186     MY_DESTRUCT_LOCK(&lock);
 187     PMIX_INFO_FREE(directives, 1);
 188 
 189     /* wait for the model callback */
 190     MY_WAIT_THREAD(&thread_complete);
 191 
 192     /* let's do a couple of operations just to verify we can,
 193      * starting with a query */
 194     PMIX_QUERY_CREATE(query, 1);
 195     PMIX_ARGV_APPEND(ret, query->keys, PMIX_QUERY_NAMESPACES);
 196 
 197     MY_CONSTRUCT_LOCK(&lock);
 198     PMIx_Query_info_nb(query, 1, infocb, &lock);
 199     MY_WAIT_THREAD(&lock);
 200     MY_DESTRUCT_LOCK(&lock);
 201     PMIX_QUERY_FREE(query, 1);
 202 
 203     /* Get something */
 204     val = NULL;
 205     PMIx_Get(&myproc, "WASSUP", NULL, 0, &val);
 206     if (NULL == val) {
 207         fprintf(stderr, "ERROR GETTING WASSUP\n");
 208     } else {
 209         fprintf(stderr, "THREAD WASSUP: %s\n", val->data.string);
 210         PMIX_VALUE_FREE(val, 1);
 211     }
 212 
 213     /* lookup something published by the main thread */
 214     PMIX_PDATA_CREATE(pdata, 1);
 215     PMIX_PDATA_LOAD(&pdata[0], &myproc, "SOMETHING", NULL, PMIX_BOOL);
 216 
 217     /* tell the call to wait for the data to be published */
 218     PMIX_INFO_CREATE(directives, 1);
 219     PMIX_INFO_LOAD(&directives[0], PMIX_WAIT, &wait, PMIX_INT);
 220 
 221     if (PMIX_SUCCESS != PMIx_Lookup(pdata, 1, directives, 1)) {
 222         fprintf(stderr, "LOOKUP FAILED\n");
 223     } else {
 224         fprintf(stderr, "LOOKUP RETURNED %s\n", pdata[0].value.data.string);
 225     }
 226     PMIX_PDATA_FREE(pdata, 1);
 227     PMIX_INFO_FREE(directives, 1);
 228 
 229     if (init) {
 230         /* need to finalize to maintain refcount */
 231         PMIx_Finalize(NULL, 0);
 232     }
 233 
 234     /* done */
 235     return NULL;
 236 }
 237 
 238 int main(int argc, char* argv[])
 239 {
 240     int rank, size, rc;
 241     pid_t pid;
 242     pthread_t mythread;
 243     bool before = false;
 244     pmix_info_t *info;
 245     pmix_value_t value;
 246     char *valstring;
 247     pmix_data_range_t range = PMIX_RANGE_LOCAL;
 248 
 249     if (1 < argc) {
 250         if (0 == strcmp(argv[1], "-b") || 0 == strcmp(argv[1], "--before")) {
 251             before = true;
 252         }
 253     }
 254 
 255     if (before) {
 256         /* spin up a thread */
 257         if (pthread_create(&mythread, NULL, mylib, NULL)) {
 258             fprintf(stderr, "Error creating thread\n");
 259             goto done;
 260         }
 261     }
 262 
 263     MPI_Init(&argc, &argv);
 264     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 265     MPI_Comm_size(MPI_COMM_WORLD, &size);
 266     pid = getpid();
 267 
 268     if (!before) {
 269         /* spin up a thread */
 270         if (pthread_create(&mythread, NULL, mylib, NULL)) {
 271             fprintf(stderr, "Error creating thread\n");
 272             goto done;
 273         }
 274     }
 275 
 276     /* push something the thread can recognize */
 277     PMIX_VALUE_CONSTRUCT(&value);
 278     value.type = PMIX_STRING;
 279     value.data.string = strdup("nothing");
 280     PMIx_Put(PMIX_LOCAL, "WASSUP", &value);
 281     PMIX_VALUE_DESTRUCT(&value);
 282     /* no need to commit it as this is strictly within ourselves */
 283 
 284     printf("Hello, World, I am %d of %d\n", rank, size);
 285 
 286     /* publish something */
 287     PMIX_INFO_CREATE(info, 2);
 288     PMIX_INFO_LOAD(&info[0], "SOMETHING", "foobar", PMIX_STRING);
 289     PMIX_INFO_LOAD(&info[1], PMIX_RANGE, &range, PMIX_DATA_RANGE);
 290     PMIx_Publish(info, 2);
 291     PMIX_INFO_FREE(info, 2);
 292 
 293     /* wait for the thread to finish */
 294     if (pthread_join(mythread, NULL)) {
 295         fprintf(stderr, "Error joining thread\n");
 296     }
 297 
 298   done:
 299     MPI_Finalize();
 300     return 0;
 301 }

/* [<][>][^][v][top][bottom][index][help] */