root/orte/test/mpi/interlib.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. model_registration_callback
  2. model_callback
  3. opcbfunc
  4. infocb
  5. mylib
  6. main

   1 /* -*- C -*-
   2  *
   3  * $HEADER$
   4  *
   5  * The most basic of MPI applications
   6  */
   7 
   8 #include <stdio.h>
   9 #include <pthread.h>
  10 
  11 #include "opal/mca/hwloc/base/base.h"
  12 #include "mpi.h"
  13 
  14 #include "orte/util/proc_info.h"
  15 #include "opal/mca/pmix/base/base.h"
  16 
  17 static size_t interlibhandler_id = SIZE_MAX;
  18 static opal_pmix_lock_t thread_complete;
  19 
  20 static void model_registration_callback(int status,
  21                                         size_t errhandler_ref,
  22                                         void *cbdata)
  23 {
  24     opal_pmix_lock_t *lock = (opal_pmix_lock_t*)cbdata;
  25 
  26     interlibhandler_id = errhandler_ref;
  27     OPAL_PMIX_WAKEUP_THREAD(lock);
  28 }
  29 static void model_callback(int status,
  30                            const opal_process_name_t *source,
  31                            opal_list_t *info, opal_list_t *results,
  32                            opal_pmix_notification_complete_fn_t cbfunc,
  33                            void *cbdata)
  34 {
  35     opal_value_t *val;
  36 
  37     /* we can ignore our own callback as we obviously
  38      * know that we are OpenMP */
  39     if (NULL != info) {
  40         OPAL_LIST_FOREACH(val, info, opal_value_t) {
  41             if (0 == strcmp(val->key, OPAL_PMIX_PROGRAMMING_MODEL) &&
  42                 0 == strcmp(val->data.string, "OpenMP")) {
  43                 goto cback;
  44             }
  45             if (OPAL_STRING == val->type) {
  46                 opal_output(0, "Thread Model Callback Key: %s Val %s", val->key, val->data.string);
  47             }
  48         }
  49     }
  50     /* otherwise, do something clever here */
  51 
  52   cback:
  53     /* we must NOT tell the event handler state machine that we
  54      * are the last step as that will prevent it from notifying
  55      * anyone else that might be listening for declarations */
  56     if (NULL != cbfunc) {
  57         cbfunc(OPAL_SUCCESS, NULL, NULL, NULL, cbdata);
  58     }
  59     OPAL_PMIX_WAKEUP_THREAD(&thread_complete);
  60 }
  61 
  62 static void opcbfunc(int status, void *cbdata)
  63 {
  64     opal_pmix_lock_t *lock = (opal_pmix_lock_t*)cbdata;
  65     OPAL_PMIX_WAKEUP_THREAD(lock);
  66 }
  67 
  68 static void infocb(int status,
  69                    opal_list_t *info,
  70                    void *cbdata,
  71                    opal_pmix_release_cbfunc_t release_fn,
  72                    void *release_cbdata)
  73 {
  74     opal_pmix_lock_t *lock = (opal_pmix_lock_t*)cbdata;
  75     opal_value_t *kv;
  76 
  77     OPAL_LIST_FOREACH(kv, info, opal_value_t) {
  78         opal_output(0, "QUERY DATA KEY: %s VALUE %s", kv->key, kv->data.string);
  79     }
  80     if (NULL != release_fn) {
  81         release_fn(release_cbdata);
  82     }
  83     OPAL_PMIX_WAKEUP_THREAD(lock);
  84 }
  85 
  86 static void *mylib(void *ptr)
  87 {
  88     opal_list_t info, directives;
  89     opal_value_t *kv;
  90     int ret;
  91     opal_pmix_lock_t lock;
  92     bool init = false;
  93     opal_pmix_query_t *query;
  94     opal_pmix_pdata_t *pdata;
  95 
  96     OPAL_PMIX_CONSTRUCT_LOCK(&thread_complete);
  97 
  98     /* declare that we are present and active */
  99     OBJ_CONSTRUCT(&info, opal_list_t);
 100     kv = OBJ_NEW(opal_value_t);
 101     kv->key = strdup(OPAL_PMIX_PROGRAMMING_MODEL);
 102     kv->type = OPAL_STRING;
 103     kv->data.string = strdup("OpenMP");
 104     opal_list_append(&info, &kv->super);
 105     kv = OBJ_NEW(opal_value_t);
 106     kv->key = strdup(OPAL_PMIX_MODEL_LIBRARY_NAME);
 107     kv->type = OPAL_STRING;
 108     kv->data.string = strdup("foobar");
 109     opal_list_append(&info, &kv->super);
 110     kv = OBJ_NEW(opal_value_t);
 111     kv->key = strdup(OPAL_PMIX_MODEL_LIBRARY_VERSION);
 112     kv->type = OPAL_STRING;
 113     kv->data.string = strdup("1.2.3.4");
 114     opal_list_append(&info, &kv->super);
 115     kv = OBJ_NEW(opal_value_t);
 116     kv->key = strdup(OPAL_PMIX_THREADING_MODEL);
 117     kv->type = OPAL_STRING;
 118     kv->data.string = strdup("PTHREAD");
 119     opal_list_append(&info, &kv->super);
 120 
 121     /* see if pmix is already initialized */
 122     if (opal_pmix.initialized()) {
 123         /* mark that this isn't to go to any default event handler - pmix_init
 124          * takes care of that for us, but we have to explicitly do it here */
 125         kv = OBJ_NEW(opal_value_t);
 126         kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
 127         kv->type = OPAL_BOOL;
 128         kv->data.flag = true;
 129         opal_list_append(&info, &kv->super);
 130         /* it is, so let's just use the event notification
 131          * API to let everyone know we are here */
 132         OPAL_PMIX_CONSTRUCT_LOCK(&lock);
 133         ret = opal_pmix.notify_event(OPAL_ERR_MODEL_DECLARED,
 134                                      &orte_process_info.my_name,
 135                                      OPAL_PMIX_RANGE_PROC_LOCAL, &info,
 136                                      opcbfunc, &lock);
 137         OPAL_PMIX_WAIT_THREAD(&lock);
 138         OPAL_PMIX_DESTRUCT_LOCK(&lock);
 139         OPAL_LIST_DESTRUCT(&info);
 140     } else {
 141         /* call pmix to initialize these values */
 142         ret = opal_pmix.init(&info);
 143         OPAL_LIST_DESTRUCT(&info);
 144         init = true;
 145     }
 146 
 147     /* register to receive model callbacks */
 148 
 149     /* give it a name so we can distinguish it */
 150     OBJ_CONSTRUCT(&directives, opal_list_t);
 151     kv = OBJ_NEW(opal_value_t);
 152     kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME);
 153     kv->type = OPAL_STRING;
 154     kv->data.string = strdup("My-Declarations");
 155     opal_list_append(&directives, &kv->super);
 156     /* specify the event code */
 157     OBJ_CONSTRUCT(&info, opal_list_t);
 158     kv = OBJ_NEW(opal_value_t);
 159     kv->key = strdup("status");   // the key here is irrelevant
 160     kv->type = OPAL_INT;
 161     kv->data.integer = OPAL_ERR_MODEL_DECLARED;
 162     opal_list_append(&info, &kv->super);
 163     /* we could constrain the range to proc_local - technically, this
 164      * isn't required so long as the code that generates
 165      * the event stipulates its range as proc_local. We rely
 166      * on that here */
 167     OPAL_PMIX_CONSTRUCT_LOCK(&lock);
 168     opal_pmix.register_evhandler(&info, &directives, model_callback,
 169                                  model_registration_callback,
 170                                  (void*)&lock);
 171     OPAL_PMIX_WAIT_THREAD(&lock);
 172     OPAL_PMIX_DESTRUCT_LOCK(&lock);
 173     OPAL_LIST_DESTRUCT(&info);
 174     OPAL_LIST_DESTRUCT(&directives);
 175 
 176     /* wait for the model callback */
 177     OPAL_PMIX_WAIT_THREAD(&thread_complete);
 178 
 179     /* let's do a couple of operations just to verify we can,
 180      * starting with a query */
 181     OBJ_CONSTRUCT(&info, opal_list_t);
 182     query = OBJ_NEW(opal_pmix_query_t);
 183     opal_argv_append_nosize(&query->keys, OPAL_PMIX_QUERY_NAMESPACES);
 184     opal_list_append(&info, &query->super);
 185     OPAL_PMIX_CONSTRUCT_LOCK(&lock);
 186     opal_pmix.query(&info, infocb, &lock);
 187     OPAL_PMIX_WAIT_THREAD(&lock);
 188     OPAL_PMIX_DESTRUCT_LOCK(&lock);
 189     OPAL_LIST_DESTRUCT(&info);
 190 
 191     /* Get something */
 192     opal_pmix.get(&orte_process_info.my_name,
 193                   "WASSUP", NULL, &kv);
 194     if (NULL == kv) {
 195         fprintf(stderr, "ERROR GETTING WASSUP\n");
 196     } else {
 197         fprintf(stderr, "THREAD WASSUP: %s\n", kv->data.string);
 198         OBJ_RELEASE(kv);
 199     }
 200 
 201     /* lookup something published by the main thread */
 202     OBJ_CONSTRUCT(&info, opal_list_t);
 203     pdata = OBJ_NEW(opal_pmix_pdata_t);
 204     pdata->proc = orte_process_info.my_name;
 205     pdata->value.key = strdup("SOMETHING");
 206     opal_list_append(&info, &pdata->super);
 207     /* tell the call to wait for the data to be published */
 208     OBJ_CONSTRUCT(&directives, opal_list_t);
 209     kv = OBJ_NEW(opal_value_t);
 210     kv->key = strdup(OPAL_PMIX_WAIT);
 211     kv->type = OPAL_INT;
 212     kv->data.integer = 0;  // wait for all
 213     opal_list_append(&directives, &kv->super);
 214 
 215     if (OPAL_SUCCESS != opal_pmix.lookup(&info, &directives)) {
 216         fprintf(stderr, "LOOKUP FAILED\n");
 217     } else {
 218         pdata = (opal_pmix_pdata_t*)opal_list_get_first(&info);
 219         fprintf(stderr, "LOOKUP RETURNED %s\n", pdata->value.data.string);
 220     }
 221     OPAL_LIST_DESTRUCT(&info);
 222     OPAL_LIST_DESTRUCT(&directives);
 223 
 224     if (init) {
 225         /* need to finalize to maintain refcount */
 226         opal_pmix.finalize();
 227     }
 228 
 229     /* done */
 230     return NULL;
 231 }
 232 
 233 int main(int argc, char* argv[])
 234 {
 235     int rank, size, rc;
 236     hwloc_cpuset_t cpus;
 237     char *bindings = NULL;
 238     pid_t pid;
 239     pthread_t mythread;
 240     opal_value_t kv, *kptr;
 241     opal_list_t list;
 242 
 243     MPI_Init(&argc, &argv);
 244     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 245     MPI_Comm_size(MPI_COMM_WORLD, &size);
 246     pid = getpid();
 247 
 248     /* push something the thread can recognize */
 249     OBJ_CONSTRUCT(&kv, opal_value_t);
 250     kv.key = strdup("WASSUP");
 251     kv.type = OPAL_STRING;
 252     kv.data.string = strdup("nothing");
 253     opal_pmix.put(OPAL_PMIX_LOCAL, &kv);
 254     OBJ_DESTRUCT(&kv);
 255     /* no need to commit it as this is strictly within ourselves */
 256 
 257     /* spin up a thread */
 258     if (pthread_create(&mythread, NULL, mylib, NULL)) {
 259         fprintf(stderr, "Error creating thread\n");
 260         goto done;
 261     }
 262 
 263     printf("[%lu] Rank %d: getting topology\n", (unsigned long)pid, rank);
 264     fflush(stdout);
 265     if (OPAL_SUCCESS == opal_hwloc_base_get_topology()) {
 266         cpus = hwloc_bitmap_alloc();
 267         rc = hwloc_get_cpubind(opal_hwloc_topology, cpus, HWLOC_CPUBIND_PROCESS);
 268         hwloc_bitmap_list_asprintf(&bindings, cpus);
 269     }
 270 
 271     printf("Hello, World, I am %d of %d [%d local peers]: get_cpubind: %d bitmap %s\n",
 272            rank, size, orte_process_info.num_local_peers, rc,
 273            (NULL == bindings) ? "NULL" : bindings);
 274 
 275     /* publish something */
 276     OBJ_CONSTRUCT(&list, opal_list_t);
 277     kptr = OBJ_NEW(opal_value_t);
 278     kptr->key = strdup("SOMETHING");
 279     kptr->type = OPAL_STRING;
 280     kptr->data.string = strdup("SILLY-THING");
 281     opal_list_append(&list, &kptr->super);
 282     opal_pmix.publish(&list);
 283     OPAL_LIST_DESTRUCT(&list);
 284 
 285     /* wait for the thread to finish */
 286     if (pthread_join(mythread, NULL)) {
 287         fprintf(stderr, "Error joining thread\n");
 288     }
 289 
 290   done:
 291     MPI_Finalize();
 292     return 0;
 293 }

/* [<][>][^][v][top][bottom][index][help] */