This source file includes the following definitions:
- _release_fn
- _register_fn
- qcbfunc
- notifycbfunc
- sample
- main
1
2
3
4
5
6
7
8 #include "orte_config.h"
9
10 #include <stdio.h>
11 #include "mpi.h"
12 #include "opal/mca/pmix/pmix.h"
13 #include "opal/util/argv.h"
14 #include "opal/util/printf.h"
15 #include "orte/runtime/runtime.h"
16 #include "orte/util/proc_info.h"
17 #include "orte/util/name_fns.h"
18 #include "orte/runtime/orte_globals.h"
19 #include "orte/mca/errmgr/errmgr.h"
20
/* Size of MPI_COMM_WORLD and this process's rank in it (set in main). */
static int size;
static int rank;

/* Spin-wait flag.  Starts out true; asynchronous callbacks (notably the
 * MEMPROBE_RELEASE event handler) clear it to end loops of the form
 * `while (wait_for_release) usleep(...)`. */
static volatile bool wait_for_release = true;

/* Custom event code that rank 0 raises with notify_event once its sample is
 * finished; every process registers _release_fn for it. */
#define MEMPROBE_RELEASE 12345
24
25 static void _release_fn(int status,
26 const opal_process_name_t *source,
27 opal_list_t *info, opal_list_t *results,
28 opal_pmix_notification_complete_fn_t cbfunc,
29 void *cbdata)
30 {
31
32 if (NULL != cbfunc) {
33 cbfunc(OPAL_ERR_HANDLERS_COMPLETE, NULL, NULL, NULL, cbdata);
34 }
35
36 wait_for_release = false;
37 }
38
/*
 * Completion callback for opal_pmix.register_evhandler().
 *
 * Reports a non-zero `status` on stderr, then stores `status` into the
 * `volatile int` that `cbdata` points at; the caller spins until that value
 * changes.  NOTE(review): the caller uses -1 as its "still pending" marker,
 * so a status of exactly -1 would be indistinguishable from "not done yet".
 */
static void _register_fn(int status,
                         size_t evhandler_ref,
                         void *cbdata)
{
    volatile int *const result = cbdata;

    if (status != 0) {
        fprintf(stderr,
                "Client EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n",
                status, (unsigned long)evhandler_ref);
    }

    /* Publishing the status is what releases the caller's spin-wait. */
    *result = status;
}
51
52 static void qcbfunc(int status,
53 opal_list_t *info,
54 void *cbdata,
55 opal_pmix_release_cbfunc_t release_fn,
56 void *release_cbdata)
57 {
58 opal_list_t *results = (opal_list_t*)cbdata;
59 opal_value_t *kv;
60
61 if (NULL != info) {
62 while (NULL != (kv = (opal_value_t*)opal_list_remove_first(info))) {
63 opal_list_append(results, &kv->super);
64 }
65 }
66 if (NULL != release_fn) {
67 release_fn(release_cbdata);
68 }
69 wait_for_release = false;
70 }
71
72 static void notifycbfunc(int status, void *cbdata)
73 {
74 volatile int *active = (volatile int*)cbdata;
75 *active = status;
76 }
77
78 static void sample(void)
79 {
80 opal_value_t *kv, *ival;
81 opal_pmix_query_t *q;
82 opal_list_t query, response, *lt;
83 volatile int active;
84 char **answer = NULL, *tmp, *msg;
85
86 OBJ_CONSTRUCT(&query, opal_list_t);
87 OBJ_CONSTRUCT(&response, opal_list_t);
88 q = OBJ_NEW(opal_pmix_query_t);
89 opal_list_append(&query, &q->super);
90 opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE);
91
92 kv = OBJ_NEW(opal_value_t);
93 kv->key = strdup(OPAL_PMIX_QUERY_LOCAL_ONLY);
94 kv->type = OPAL_BOOL;
95 kv->data.flag = true;
96 opal_list_append(&q->qualifiers, &kv->super);
97 kv = OBJ_NEW(opal_value_t);
98 kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG);
99 kv->type = OPAL_BOOL;
100 kv->data.flag = true;
101 opal_list_append(&q->qualifiers, &kv->super);
102 kv = OBJ_NEW(opal_value_t);
103 kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX);
104 kv->type = OPAL_BOOL;
105 kv->data.flag = true;
106 opal_list_append(&q->qualifiers, &kv->super);
107
108 wait_for_release = true;
109 opal_pmix.query(&query, qcbfunc, (void*)&response);
110
111 while (wait_for_release) {
112 usleep(10);
113 }
114 wait_for_release = true;
115
116
117 opal_asprintf(&tmp, "Data for node %s", orte_process_info.nodename);
118 opal_argv_append_nosize(&answer, tmp);
119 free(tmp);
120 OPAL_LIST_FOREACH(kv, &response, opal_value_t) {
121 lt = (opal_list_t*)kv->data.ptr;
122 if (NULL != lt) {
123 OPAL_LIST_FOREACH(ival, lt, opal_value_t) {
124 if (0 == strcmp(ival->key, OPAL_PMIX_DAEMON_MEMORY)) {
125 opal_asprintf(&tmp, "\tDaemon: %f", ival->data.fval);
126 opal_argv_append_nosize(&answer, tmp);
127 free(tmp);
128 } else if (0 == strcmp(ival->key, OPAL_PMIX_CLIENT_AVG_MEMORY)) {
129 opal_asprintf(&tmp, "\tClient: %f", ival->data.fval);
130 opal_argv_append_nosize(&answer, tmp);
131 free(tmp);
132 } else {
133 fprintf(stderr, "\tUnknown key: %s", ival->key);
134 }
135 }
136 }
137 }
138 opal_argv_append_nosize(&answer, "\n");
139 OPAL_LIST_DESTRUCT(&response);
140
141
142 OBJ_CONSTRUCT(&response, opal_list_t);
143 kv = OBJ_NEW(opal_value_t);
144 kv->key = strdup(OPAL_PMIX_LOG_STDOUT);
145 kv->type = OPAL_STRING;
146 kv->data.string = opal_argv_join(answer, '\n');
147 opal_list_append(&response, &kv->super);
148 opal_argv_free(answer);
149 active = -1;
150 opal_pmix.log(&response, notifycbfunc, (void*)&active);
151 while (-1 == active) {
152 usleep(10);
153 }
154 OPAL_LIST_DESTRUCT(&response);
155
156 if (0 == rank) {
157
158 wait_for_release = true;
159 OBJ_CONSTRUCT(&response, opal_list_t);
160 kv = OBJ_NEW(opal_value_t);
161 kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
162 kv->type = OPAL_BOOL;
163 kv->data.flag = true;
164 opal_list_append(&response, &kv->super);
165 active = -1;
166 if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL,
167 OPAL_PMIX_RANGE_GLOBAL, &response,
168 NULL, NULL)) {
169 fprintf(stderr, "Notify event failed\n");
170 exit(1);
171 }
172 } else {
173
174 while (wait_for_release) {
175 usleep(10);
176 }
177 }
178 }
179
180 int main(int argc, char* argv[])
181 {
182 opal_list_t *codes;
183 opal_value_t *kv;
184 volatile int active;
185
186 MPI_Init(&argc, &argv);
187 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
188 MPI_Comm_size(MPI_COMM_WORLD, &size);
189
190 if (0 == rank) {
191 fprintf(stderr, "Sampling memory usage after MPI_Init\n");
192 }
193
194
195 codes = OBJ_NEW(opal_list_t);
196 kv = OBJ_NEW(opal_value_t);
197 kv->key = strdup("errorcode");
198 kv->type = OPAL_INT;
199 kv->data.integer = MEMPROBE_RELEASE;
200 opal_list_append(codes, &kv->super);
201
202 active = -1;
203 opal_pmix.register_evhandler(codes, NULL, _release_fn, _register_fn, (void*)&active);
204 while (-1 == active) {
205 usleep(10);
206 }
207
208
209
210
211 if (0 == orte_process_info.my_local_rank) {
212 sample();
213 } else {
214
215 while (wait_for_release) {
216 usleep(10);
217 }
218 }
219 wait_for_release = true;
220
221
222
223 MPI_Barrier(MPI_COMM_WORLD);
224
225 if (0 == rank) {
226 fprintf(stderr, "\n\nSampling memory usage after MPI_Barrier\n");
227 }
228
229 if (0 == orte_process_info.my_local_rank) {
230 if (0 != rank) {
231
232 usleep(1000);
233 }
234 sample();
235 } else {
236
237 while (wait_for_release) {
238 usleep(10);
239 }
240 }
241
242 MPI_Finalize();
243 return 0;
244 }