This source file includes following definitions.
- notification_fn
- op_callbk
- errhandler_reg_callbk
- grpcomplete
- invitefn
- main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #include <time.h>
31 #include <pthread.h>
32
33 #include <pmix.h>
34
35 typedef struct {
36 pthread_mutex_t mutex;
37 pthread_cond_t cond;
38 volatile bool active;
39 pmix_status_t status;
40 } mylock_t;
41
42 #define DEBUG_CONSTRUCT_LOCK(l) \
43 do { \
44 pthread_mutex_init(&(l)->mutex, NULL); \
45 pthread_cond_init(&(l)->cond, NULL); \
46 (l)->active = true; \
47 (l)->status = PMIX_SUCCESS; \
48 } while(0)
49
50 #define DEBUG_DESTRUCT_LOCK(l) \
51 do { \
52 pthread_mutex_destroy(&(l)->mutex); \
53 pthread_cond_destroy(&(l)->cond); \
54 } while(0)
55
56 #define DEBUG_WAIT_THREAD(lck) \
57 do { \
58 pthread_mutex_lock(&(lck)->mutex); \
59 while ((lck)->active) { \
60 pthread_cond_wait(&(lck)->cond, &(lck)->mutex); \
61 } \
62 pthread_mutex_unlock(&(lck)->mutex); \
63 } while(0)
64
65 #define DEBUG_WAKEUP_THREAD(lck) \
66 do { \
67 pthread_mutex_lock(&(lck)->mutex); \
68 (lck)->active = false; \
69 pthread_cond_broadcast(&(lck)->cond); \
70 pthread_mutex_unlock(&(lck)->mutex); \
71 } while(0)
72
73
74 static pmix_proc_t myproc;
75 static mylock_t invitedlock;
76
77 static void notification_fn(size_t evhdlr_registration_id,
78 pmix_status_t status,
79 const pmix_proc_t *source,
80 pmix_info_t info[], size_t ninfo,
81 pmix_info_t results[], size_t nresults,
82 pmix_event_notification_cbfunc_fn_t cbfunc,
83 void *cbdata)
84 {
85 fprintf(stderr, "Client %s:%d NOTIFIED with status %d\n", myproc.nspace, myproc.rank, status);
86 if (NULL != cbfunc) {
87 cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
88 }
89 }
90
91 static void op_callbk(pmix_status_t status,
92 void *cbdata)
93 {
94 mylock_t *lock = (mylock_t*)cbdata;
95
96 lock->status = status;
97 DEBUG_WAKEUP_THREAD(lock);
98 }
99
100 static void errhandler_reg_callbk(pmix_status_t status,
101 size_t errhandler_ref,
102 void *cbdata)
103 {
104 mylock_t *lock = (mylock_t*)cbdata;
105
106 lock->status = status;
107 DEBUG_WAKEUP_THREAD(lock);
108 }
109
110 static void grpcomplete(pmix_status_t status,
111 pmix_info_t *info, size_t ninfo,
112 void *cbdata,
113 pmix_release_cbfunc_t release_fn,
114 void *release_cbdata)
115 {
116 fprintf(stderr, "%s:%d GRPCOMPLETE\n", myproc.nspace, myproc.rank);
117 DEBUG_WAKEUP_THREAD(&invitedlock);
118 }
119
120 static void invitefn(size_t evhdlr_registration_id,
121 pmix_status_t status,
122 const pmix_proc_t *source,
123 pmix_info_t info[], size_t ninfo,
124 pmix_info_t results[], size_t nresults,
125 pmix_event_notification_cbfunc_fn_t cbfunc,
126 void *cbdata)
127 {
128 size_t n;
129 char *grp;
130 pmix_status_t rc;
131
132
133 if (PMIX_CHECK_PROCID(source, &myproc)) {
134 fprintf(stderr, "%s:%d INVITED, BUT LEADER\n", myproc.nspace, myproc.rank);
135
136 if (NULL != cbfunc) {
137 cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
138 }
139 return;
140 }
141
142
143 for (n=0; n < ninfo; n++) {
144 if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_ID)) {
145 grp = info[n].value.data.string;
146 break;
147 }
148 }
149 fprintf(stderr, "Client %s:%d INVITED by source %s:%d\n",
150 myproc.nspace, myproc.rank,
151 source->nspace, source->rank);
152 invitedlock.status = status;
153 fprintf(stderr, "%s:%d ACCEPTING INVITE\n", myproc.nspace, myproc.rank);
154 rc = PMIx_Group_join_nb(grp, source, PMIX_GROUP_ACCEPT, NULL, 0, grpcomplete, NULL);
155 if (PMIX_SUCCESS != rc) {
156 fprintf(stderr, "%s:%d Error in Group_join_nb: %sn", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
157 }
158
159
160 if (NULL != cbfunc) {
161 cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
162 }
163 }
164
165
166 int main(int argc, char **argv)
167 {
168 int rc;
169 pmix_value_t value;
170 pmix_value_t *val = &value;
171 pmix_proc_t proc, *procs;
172 uint32_t nprocs;
173 mylock_t lock;
174 pmix_status_t code;
175 pmix_info_t *results;
176 size_t nresults;
177
178
179 if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, NULL, 0))) {
180 fprintf(stderr, "Client ns %s rank %d: PMIx_Init failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
181 exit(0);
182 }
183 fprintf(stderr, "[%d] Client ns %s rank %d: Running\n", (int)getpid(), myproc.nspace, myproc.rank);
184
185 DEBUG_CONSTRUCT_LOCK(&invitedlock);
186
187 PMIX_PROC_CONSTRUCT(&proc);
188 (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
189 proc.rank = PMIX_RANK_WILDCARD;
190
191
192 if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, &val))) {
193 fprintf(stderr, "Client ns %s rank %d: PMIx_Get universe size failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
194 goto done;
195 }
196 nprocs = val->data.uint32;
197 PMIX_VALUE_RELEASE(val);
198 if (nprocs < 4) {
199 if (0 == myproc.rank) {
200 fprintf(stderr, "This example requires a minimum of 4 processes\n");
201 }
202 goto done;
203 }
204 fprintf(stderr, "Client %s:%d universe size %d\n", myproc.nspace, myproc.rank, nprocs);
205
206
207 DEBUG_CONSTRUCT_LOCK(&lock);
208 PMIx_Register_event_handler(NULL, 0, NULL, 0,
209 notification_fn, errhandler_reg_callbk, (void*)&lock);
210 DEBUG_WAIT_THREAD(&lock);
211 rc = lock.status;
212 DEBUG_DESTRUCT_LOCK(&lock);
213 if (PMIX_SUCCESS != rc) {
214 goto done;
215 }
216
217
218 DEBUG_CONSTRUCT_LOCK(&lock);
219 code = PMIX_GROUP_INVITED;
220 PMIx_Register_event_handler(&code, 1, NULL, 0,
221 invitefn, errhandler_reg_callbk, (void*)&lock);
222 DEBUG_WAIT_THREAD(&lock);
223 rc = lock.status;
224 DEBUG_DESTRUCT_LOCK(&lock);
225 if (PMIX_SUCCESS != rc) {
226 goto done;
227 }
228
229
230 PMIX_PROC_CONSTRUCT(&proc);
231 (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
232 proc.rank = PMIX_RANK_WILDCARD;
233 if (PMIX_SUCCESS != (rc = PMIx_Fence(&proc, 1, NULL, 0))) {
234 fprintf(stderr, "Client ns %s rank %d: PMIx_Fence failed: %d\n", myproc.nspace, myproc.rank, rc);
235 goto done;
236 }
237
238
239 if (0 == myproc.rank) {
240 fprintf(stderr, "%d executing Group_invite\n", myproc.rank);
241 nprocs = 3;
242 PMIX_PROC_CREATE(procs, nprocs);
243 PMIX_PROC_LOAD(&procs[0], myproc.nspace, 0);
244 PMIX_PROC_LOAD(&procs[1], myproc.nspace, 2);
245 PMIX_PROC_LOAD(&procs[2], myproc.nspace, 3);
246 rc = PMIx_Group_invite("ourgroup", procs, nprocs, NULL, 0, &results, &nresults);
247 if (PMIX_SUCCESS != rc) {
248 fprintf(stderr, "Client ns %s rank %d: PMIx_Group_invite failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
249 goto done;
250 }
251 PMIX_PROC_FREE(procs, nprocs);
252 fprintf(stderr, "%s:%d Execute fence across group\n", myproc.nspace, myproc.rank);
253 PMIX_PROC_LOAD(&proc, "ourgroup", PMIX_RANK_WILDCARD);
254 rc = PMIx_Fence(&proc, 1, NULL, 0);
255 if (PMIX_SUCCESS != rc) {
256 fprintf(stderr, "Client ns %s rank %d: PMIx_Fence across group failed: %d\n", myproc.nspace, myproc.rank, rc);
257 goto done;
258 }
259 fprintf(stderr, "%d executing Group_destruct\n", myproc.rank);
260 rc = PMIx_Group_destruct("ourgroup", NULL, 0);
261 if (PMIX_SUCCESS != rc) {
262 fprintf(stderr, "Client ns %s rank %d: PMIx_Group_destruct failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
263 goto done;
264 }
265 } else if (2 == myproc.rank || 3 == myproc.rank) {
266
267 fprintf(stderr, "%s:%d waiting to be invited\n", myproc.nspace, myproc.rank);
268 DEBUG_WAIT_THREAD(&invitedlock);
269 DEBUG_DESTRUCT_LOCK(&invitedlock);
270 fprintf(stderr, "%s:%d Execute fence across group\n", myproc.nspace, myproc.rank);
271 PMIX_PROC_LOAD(&proc, "ourgroup", PMIX_RANK_WILDCARD);
272 rc = PMIx_Fence(&proc, 1, NULL, 0);
273 if (PMIX_SUCCESS != rc) {
274 fprintf(stderr, "Client ns %s rank %d: PMIx_Fence across group failed: %d\n", myproc.nspace, myproc.rank, rc);
275 goto done;
276 }
277 fprintf(stderr, "%d executing Group_destruct\n", myproc.rank);
278 rc = PMIx_Group_destruct("ourgroup", NULL, 0);
279 if (PMIX_SUCCESS != rc) {
280 fprintf(stderr, "Client ns %s rank %d: PMIx_Group_destruct failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
281 goto done;
282 }
283 }
284
285
286 PMIX_PROC_CONSTRUCT(&proc);
287 (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
288 proc.rank = PMIX_RANK_WILDCARD;
289 if (PMIX_SUCCESS != (rc = PMIx_Fence(&proc, 1, NULL, 0))) {
290 fprintf(stderr, "Client ns %s rank %d: PMIx_Fence failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
291 goto done;
292 }
293
294 done:
295
296 DEBUG_CONSTRUCT_LOCK(&lock);
297 PMIx_Deregister_event_handler(1, op_callbk, &lock);
298 DEBUG_WAIT_THREAD(&lock);
299 DEBUG_DESTRUCT_LOCK(&lock);
300
301 fprintf(stderr, "Client ns %s rank %d: Finalizing\n", myproc.nspace, myproc.rank);
302 if (PMIX_SUCCESS != (rc = PMIx_Finalize(NULL, 0))) {
303 fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
304 } else {
305 fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize successfully completed\n", myproc.nspace, myproc.rank);
306 }
307 fprintf(stderr, "%s:%d COMPLETE\n", myproc.nspace, myproc.rank);
308 fflush(stderr);
309 return(0);
310 }