root/contrib/scaling/mpi_memprobe.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. _release_fn
  2. _register_fn
  3. qcbfunc
  4. notifycbfunc
  5. sample
  6. main

   1 /* -*- C -*-
   2  *
   3  * $HEADER$
   4  *
   5  * Sample per-node memory usage (via a PMIx query) after MPI_Init and again after MPI_Barrier
   6  */
   7 
   8 #include "orte_config.h"
   9 
  10 #include <stdio.h>
  11 #include "mpi.h"
  12 #include "opal/mca/pmix/pmix.h"
  13 #include "opal/util/argv.h"
  14 #include "opal/util/printf.h"
  15 #include "orte/runtime/runtime.h"
  16 #include "orte/util/proc_info.h"
  17 #include "orte/util/name_fns.h"
  18 #include "orte/runtime/orte_globals.h"
  19 #include "orte/mca/errmgr/errmgr.h"
  20 
  /* MPI_COMM_WORLD rank and size of this process; filled in by main(). */
static int rank;
static int size;

/* Spin-wait flag: processes loop while it is true. It is cleared from PMIx
 * callbacks, e.g. by _release_fn when the MEMPROBE_RELEASE event arrives. */
static volatile bool wait_for_release = true;

/* Event code that rank 0 broadcasts to release the waiting processes. */
enum { MEMPROBE_RELEASE = 12345 };
  24 
  25 static void _release_fn(int status,
  26                         const opal_process_name_t *source,
  27                         opal_list_t *info, opal_list_t *results,
  28                         opal_pmix_notification_complete_fn_t cbfunc,
  29                         void *cbdata)
  30 {
  31     /* must let the notifier know we are done */
  32     if (NULL != cbfunc) {
  33         cbfunc(OPAL_ERR_HANDLERS_COMPLETE, NULL, NULL, NULL, cbdata);
  34     }
  35     /* flag that the debugger is complete so we can exit */
  36     wait_for_release = false;
  37 }
  38 
  /*
   * Completion callback for opal_pmix.register_evhandler().
   * A non-zero status is reported on stderr (with the handler reference).
   * In every case the status is then stored into the volatile int that
   * cbdata points to, which the registering code spins on.
   */
static void _register_fn(int status,
                         size_t evhandler_ref,
                         void *cbdata)
{
    volatile int *const result = cbdata;

    if (status != 0) {
        fprintf(stderr,
                "Client EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n",
                status, (unsigned long)evhandler_ref);
    }

    /* publish the outcome to the waiting caller */
    *result = status;
}
  51 
  52 static void qcbfunc(int status,
  53                     opal_list_t *info,
  54                     void *cbdata,
  55                     opal_pmix_release_cbfunc_t release_fn,
  56                     void *release_cbdata)
  57 {
  58     opal_list_t *results = (opal_list_t*)cbdata;
  59     opal_value_t *kv;
  60 
  61     if (NULL != info) {
  62         while (NULL != (kv = (opal_value_t*)opal_list_remove_first(info))) {
  63             opal_list_append(results, &kv->super);
  64         }
  65     }
  66     if (NULL != release_fn) {
  67         release_fn(release_cbdata);
  68     }
  69     wait_for_release = false;
  70 }
  71 
  72 static void notifycbfunc(int status, void *cbdata)
  73 {
  74     volatile int *active = (volatile int*)cbdata;
  75     *active = status;
  76 }
  77 
  78 static void sample(void)
  79 {
  80     opal_value_t *kv, *ival;
  81     opal_pmix_query_t *q;
  82     opal_list_t query, response, *lt;
  83     volatile int active;
  84     char **answer = NULL, *tmp, *msg;
  85 
  86     OBJ_CONSTRUCT(&query, opal_list_t);
  87     OBJ_CONSTRUCT(&response, opal_list_t);
  88     q = OBJ_NEW(opal_pmix_query_t);
  89     opal_list_append(&query, &q->super);
  90     opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE);
  91     /* qualify that we just want local avg, min/max values reported */
  92     kv = OBJ_NEW(opal_value_t);
  93     kv->key = strdup(OPAL_PMIX_QUERY_LOCAL_ONLY);
  94     kv->type = OPAL_BOOL;
  95     kv->data.flag = true;
  96     opal_list_append(&q->qualifiers, &kv->super);
  97     kv = OBJ_NEW(opal_value_t);
  98     kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG);
  99     kv->type = OPAL_BOOL;
 100     kv->data.flag = true;
 101     opal_list_append(&q->qualifiers, &kv->super);
 102     kv = OBJ_NEW(opal_value_t);
 103     kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX);
 104     kv->type = OPAL_BOOL;
 105     kv->data.flag = true;
 106     opal_list_append(&q->qualifiers, &kv->super);
 107     /* issue the request */
 108     wait_for_release = true;
 109     opal_pmix.query(&query, qcbfunc, (void*)&response);
 110     /* wait for the query to complete */
 111     while (wait_for_release) {
 112         usleep(10);
 113     }
 114     wait_for_release = true;
 115     /* log my own results as a single string so the output
 116      * doesn't get garbled on the other end */
 117     opal_asprintf(&tmp, "Data for node %s", orte_process_info.nodename);
 118     opal_argv_append_nosize(&answer, tmp);
 119     free(tmp);
 120     OPAL_LIST_FOREACH(kv, &response, opal_value_t) {
 121         lt = (opal_list_t*)kv->data.ptr;
 122         if (NULL != lt) {
 123             OPAL_LIST_FOREACH(ival, lt, opal_value_t) {
 124                 if (0 == strcmp(ival->key, OPAL_PMIX_DAEMON_MEMORY)) {
 125                     opal_asprintf(&tmp, "\tDaemon: %f", ival->data.fval);
 126                     opal_argv_append_nosize(&answer, tmp);
 127                     free(tmp);
 128                 } else if (0 == strcmp(ival->key, OPAL_PMIX_CLIENT_AVG_MEMORY)) {
 129                     opal_asprintf(&tmp, "\tClient: %f", ival->data.fval);
 130                     opal_argv_append_nosize(&answer, tmp);
 131                     free(tmp);
 132                 } else {
 133                     fprintf(stderr, "\tUnknown key: %s", ival->key);
 134                 }
 135             }
 136         }
 137     }
 138     opal_argv_append_nosize(&answer, "\n");
 139     OPAL_LIST_DESTRUCT(&response);
 140 
 141     /* construct the log output */
 142     OBJ_CONSTRUCT(&response, opal_list_t);
 143     kv = OBJ_NEW(opal_value_t);
 144     kv->key = strdup(OPAL_PMIX_LOG_STDOUT);
 145     kv->type = OPAL_STRING;
 146     kv->data.string = opal_argv_join(answer, '\n');
 147     opal_list_append(&response, &kv->super);
 148     opal_argv_free(answer);
 149     active = -1;
 150     opal_pmix.log(&response, notifycbfunc, (void*)&active);
 151     while (-1 == active) {
 152         usleep(10);
 153     }
 154     OPAL_LIST_DESTRUCT(&response);
 155 
 156     if (0 == rank) {
 157         /* send the notification to release the other procs */
 158         wait_for_release = true;
 159         OBJ_CONSTRUCT(&response, opal_list_t);
 160         kv = OBJ_NEW(opal_value_t);
 161         kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
 162         kv->type = OPAL_BOOL;
 163         kv->data.flag = true;
 164         opal_list_append(&response, &kv->super);
 165         active = -1;
 166         if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL,
 167                                                    OPAL_PMIX_RANGE_GLOBAL, &response,
 168                                                    NULL, NULL)) {
 169             fprintf(stderr, "Notify event failed\n");
 170             exit(1);
 171         }
 172     } else {
 173         /* now wait for notification */
 174         while (wait_for_release) {
 175             usleep(10);
 176         }
 177     }
 178 }
 179 
 180 int main(int argc, char* argv[])
 181 {
 182     opal_list_t *codes;
 183     opal_value_t *kv;
 184     volatile int active;
 185 
 186     MPI_Init(&argc, &argv);
 187     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 188     MPI_Comm_size(MPI_COMM_WORLD, &size);
 189 
 190     if (0 == rank) {
 191         fprintf(stderr, "Sampling memory usage after MPI_Init\n");
 192     }
 193 
 194     /* everyone registers their event handler */
 195     codes = OBJ_NEW(opal_list_t);
 196     kv = OBJ_NEW(opal_value_t);
 197     kv->key = strdup("errorcode");
 198     kv->type = OPAL_INT;
 199     kv->data.integer = MEMPROBE_RELEASE;
 200     opal_list_append(codes, &kv->super);
 201 
 202     active = -1;
 203     opal_pmix.register_evhandler(codes, NULL, _release_fn, _register_fn, (void*)&active);
 204     while (-1 == active) {
 205         usleep(10);
 206     }
 207 
 208     /* if I am the local leader (i.e., local_rank=0), then I ask
 209      * my daemon to report the local memory usage, and send it
 210      * to rank=0 */
 211     if (0 == orte_process_info.my_local_rank) {
 212         sample();
 213     } else {
 214         /* now wait for notification */
 215         while (wait_for_release) {
 216             usleep(10);
 217         }
 218     }
 219     wait_for_release = true;
 220 
 221     /* perform a barrier so some communication will occur, thus
 222      * requiring exchange of endpoint info */
 223     MPI_Barrier(MPI_COMM_WORLD);
 224 
 225     if (0 == rank) {
 226         fprintf(stderr, "\n\nSampling memory usage after MPI_Barrier\n");
 227     }
 228 
 229     if (0 == orte_process_info.my_local_rank) {
 230         if (0 != rank) {
 231             /* wait a little */
 232             usleep(1000);
 233         }
 234         sample();
 235     } else {
 236         /* wait again while memory is sampled */
 237         while (wait_for_release) {
 238             usleep(10);
 239         }
 240     }
 241 
 242     MPI_Finalize();
 243     return 0;
 244 }

/* [<][>][^][v][top][bottom][index][help] */