This source file includes the following definitions.
- model_registration_callback
- model_callback
- opcbfunc
- infocb
- mylib
- main
1
2
3
4
5
6
7
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "mpi.h"
#include "pmix.h"
12
13 typedef struct {
14 pthread_mutex_t mutex;
15 pthread_cond_t cond;
16 volatile bool active;
17 pmix_status_t status;
18 } mylock_t;
19
/*
 * Initialise the lock pointed to by 'lk': create its mutex and condition
 * variable (default attributes), mark it active so that a subsequent wait
 * blocks, and reset its status to PMIX_SUCCESS.
 * Note: 'lk' is evaluated more than once.
 */
#define MY_CONSTRUCT_LOCK(lk)                       \
    do {                                            \
        pthread_mutex_init(&(lk)->mutex, NULL);     \
        pthread_cond_init(&(lk)->cond, NULL);       \
        (lk)->active = true;                        \
        (lk)->status = PMIX_SUCCESS;                \
    } while (0)
27
/*
 * Release the pthread resources held by the lock pointed to by 'lk'.
 * The lock must be re-constructed before it is used again.
 * Note: 'lk' is evaluated more than once.
 */
#define MY_DESTRUCT_LOCK(lk)                        \
    do {                                            \
        pthread_mutex_destroy(&(lk)->mutex);        \
        pthread_cond_destroy(&(lk)->cond);          \
    } while (0)
33
/*
 * Block the calling thread until the lock pointed to by 'lk' is no longer
 * active.  The predicate is re-checked after every wake-up, so spurious
 * wake-ups are harmless; the mutex is released again before returning.
 * Note: 'lk' is evaluated more than once.
 */
#define MY_WAIT_THREAD(lk)                                      \
    do {                                                        \
        pthread_mutex_lock(&(lk)->mutex);                       \
        while ((lk)->active) {                                  \
            pthread_cond_wait(&(lk)->cond, &(lk)->mutex);       \
        }                                                       \
        pthread_mutex_unlock(&(lk)->mutex);                     \
    } while (0)
42
/*
 * Release every thread blocked in MY_WAIT_THREAD on the lock pointed to by
 * 'lk': clear the active flag and broadcast the condition variable, both
 * while holding the mutex.
 * Note: 'lk' is evaluated more than once.
 */
#define MY_WAKEUP_THREAD(lk)                        \
    do {                                            \
        pthread_mutex_lock(&(lk)->mutex);           \
        (lk)->active = false;                       \
        pthread_cond_broadcast(&(lk)->cond);        \
        pthread_mutex_unlock(&(lk)->mutex);         \
    } while (0)
50
51
52 static size_t interlibhandler_id = SIZE_MAX;
53 static mylock_t thread_complete;
54 static pmix_proc_t myproc;
55
56 static void model_registration_callback(pmix_status_t status,
57 size_t errhandler_ref,
58 void *cbdata)
59 {
60 mylock_t *lock = (mylock_t*)cbdata;
61
62 interlibhandler_id = errhandler_ref;
63 MY_WAKEUP_THREAD(lock);
64 }
65 static void model_callback(size_t evhdlr_registration_id,
66 pmix_status_t status,
67 const pmix_proc_t *source,
68 pmix_info_t info[], size_t ninfo,
69 pmix_info_t *results, size_t nresults,
70 pmix_event_notification_cbfunc_fn_t cbfunc,
71 void *cbdata)
72 {
73 size_t n;
74
75
76
77 if (NULL != info) {
78 for (n=0; n < ninfo; n++) {
79 if (0 == strcmp(info[n].key, PMIX_PROGRAMMING_MODEL) &&
80 0 == strcmp(info[n].value.data.string, "OpenMP")) {
81 goto cback;
82 }
83 if (PMIX_STRING == info[n].value.type) {
84 fprintf(stderr, "Thread Model Callback Key: %s Val %s\n", info[n].key, info[n].value.data.string);
85 }
86 }
87 }
88
89
90 cback:
91
92
93
94 if (NULL != cbfunc) {
95 cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
96 }
97 MY_WAKEUP_THREAD(&thread_complete);
98 }
99
100 static void opcbfunc(pmix_status_t status, void *cbdata)
101 {
102 mylock_t *lock = (mylock_t*)cbdata;
103 MY_WAKEUP_THREAD(lock);
104 }
105
106 static void infocb(pmix_status_t status,
107 pmix_info_t *info, size_t ninfo,
108 void *cbdata,
109 pmix_release_cbfunc_t release_fn,
110 void *release_cbdata)
111 {
112 mylock_t *lock = (mylock_t*)cbdata;
113 size_t n;
114
115 for (n=0; n < ninfo; n++) {
116 fprintf(stderr, "QUERY DATA KEY: %s VALUE %s\n", info[n].key, info[n].value.data.string);
117 }
118 if (NULL != release_fn) {
119 release_fn(release_cbdata);
120 }
121 MY_WAKEUP_THREAD(lock);
122 }
123
124 static void *mylib(void *ptr)
125 {
126 pmix_info_t *info, *directives;
127 pmix_status_t ret;
128 mylock_t lock;
129 bool init = false, flag;
130 pmix_query_t *query;
131 pmix_pdata_t *pdata;
132 pmix_status_t code = PMIX_MODEL_DECLARED;
133 pmix_value_t *val;
134 int wait = 0;
135
136 MY_CONSTRUCT_LOCK(&thread_complete);
137
138
139 PMIX_INFO_CREATE(info, 5);
140 PMIX_INFO_LOAD(&info[0], PMIX_PROGRAMMING_MODEL, "OpenMP", PMIX_STRING);
141 PMIX_INFO_LOAD(&info[1], PMIX_MODEL_LIBRARY_NAME, "foobar", PMIX_STRING);
142 PMIX_INFO_LOAD(&info[2], PMIX_MODEL_LIBRARY_VERSION, "1.2.3.4", PMIX_STRING);
143 PMIX_INFO_LOAD(&info[3], PMIX_THREADING_MODEL, "PTHREAD", PMIX_STRING);
144
145
146 flag = true;
147 PMIX_INFO_LOAD(&info[4], PMIX_EVENT_NON_DEFAULT, &flag, PMIX_BOOL);
148
149
150
151
152
153
154 if (PMIx_Initialized()) {
155
156
157 MY_CONSTRUCT_LOCK(&lock);
158 ret = PMIx_Notify_event(code, &myproc,
159 PMIX_RANGE_PROC_LOCAL,
160 info, 5,
161 opcbfunc, (void*)&lock);
162 MY_WAIT_THREAD(&lock);
163 MY_DESTRUCT_LOCK(&lock);
164 } else {
165
166 ret = PMIx_Init(&myproc, info, 5);
167 init = true;
168 }
169 PMIX_INFO_FREE(info, 5);
170
171
172 PMIX_INFO_CREATE(directives, 1);
173
174 PMIX_INFO_LOAD(&directives[0], PMIX_EVENT_HDLR_NAME, "My-Declarations", PMIX_STRING);
175
176
177
178
179
180 MY_CONSTRUCT_LOCK(&lock);
181 PMIx_Register_event_handler(&code, 1, directives, 1,
182 model_callback,
183 model_registration_callback,
184 (void*)&lock);
185 MY_WAIT_THREAD(&lock);
186 MY_DESTRUCT_LOCK(&lock);
187 PMIX_INFO_FREE(directives, 1);
188
189
190 MY_WAIT_THREAD(&thread_complete);
191
192
193
194 PMIX_QUERY_CREATE(query, 1);
195 PMIX_ARGV_APPEND(ret, query->keys, PMIX_QUERY_NAMESPACES);
196
197 MY_CONSTRUCT_LOCK(&lock);
198 PMIx_Query_info_nb(query, 1, infocb, &lock);
199 MY_WAIT_THREAD(&lock);
200 MY_DESTRUCT_LOCK(&lock);
201 PMIX_QUERY_FREE(query, 1);
202
203
204 val = NULL;
205 PMIx_Get(&myproc, "WASSUP", NULL, 0, &val);
206 if (NULL == val) {
207 fprintf(stderr, "ERROR GETTING WASSUP\n");
208 } else {
209 fprintf(stderr, "THREAD WASSUP: %s\n", val->data.string);
210 PMIX_VALUE_FREE(val, 1);
211 }
212
213
214 PMIX_PDATA_CREATE(pdata, 1);
215 PMIX_PDATA_LOAD(&pdata[0], &myproc, "SOMETHING", NULL, PMIX_BOOL);
216
217
218 PMIX_INFO_CREATE(directives, 1);
219 PMIX_INFO_LOAD(&directives[0], PMIX_WAIT, &wait, PMIX_INT);
220
221 if (PMIX_SUCCESS != PMIx_Lookup(pdata, 1, directives, 1)) {
222 fprintf(stderr, "LOOKUP FAILED\n");
223 } else {
224 fprintf(stderr, "LOOKUP RETURNED %s\n", pdata[0].value.data.string);
225 }
226 PMIX_PDATA_FREE(pdata, 1);
227 PMIX_INFO_FREE(directives, 1);
228
229 if (init) {
230
231 PMIx_Finalize(NULL, 0);
232 }
233
234
235 return NULL;
236 }
237
238 int main(int argc, char* argv[])
239 {
240 int rank, size, rc;
241 pid_t pid;
242 pthread_t mythread;
243 bool before = false;
244 pmix_info_t *info;
245 pmix_value_t value;
246 char *valstring;
247 pmix_data_range_t range = PMIX_RANGE_LOCAL;
248
249 if (1 < argc) {
250 if (0 == strcmp(argv[1], "-b") || 0 == strcmp(argv[1], "--before")) {
251 before = true;
252 }
253 }
254
255 if (before) {
256
257 if (pthread_create(&mythread, NULL, mylib, NULL)) {
258 fprintf(stderr, "Error creating thread\n");
259 goto done;
260 }
261 }
262
263 MPI_Init(&argc, &argv);
264 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
265 MPI_Comm_size(MPI_COMM_WORLD, &size);
266 pid = getpid();
267
268 if (!before) {
269
270 if (pthread_create(&mythread, NULL, mylib, NULL)) {
271 fprintf(stderr, "Error creating thread\n");
272 goto done;
273 }
274 }
275
276
277 PMIX_VALUE_CONSTRUCT(&value);
278 value.type = PMIX_STRING;
279 value.data.string = strdup("nothing");
280 PMIx_Put(PMIX_LOCAL, "WASSUP", &value);
281 PMIX_VALUE_DESTRUCT(&value);
282
283
284 printf("Hello, World, I am %d of %d\n", rank, size);
285
286
287 PMIX_INFO_CREATE(info, 2);
288 PMIX_INFO_LOAD(&info[0], "SOMETHING", "foobar", PMIX_STRING);
289 PMIX_INFO_LOAD(&info[1], PMIX_RANGE, &range, PMIX_DATA_RANGE);
290 PMIx_Publish(info, 2);
291 PMIX_INFO_FREE(info, 2);
292
293
294 if (pthread_join(mythread, NULL)) {
295 fprintf(stderr, "Error joining thread\n");
296 }
297
298 done:
299 MPI_Finalize();
300 return 0;
301 }