This source file includes the following definitions.
- notification_fn
- op_callbk
- errhandler_reg_callbk
- main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #include <time.h>
31 #include <pthread.h>
32
33 #include <pmix.h>
34
35 typedef struct {
36 pthread_mutex_t mutex;
37 pthread_cond_t cond;
38 volatile bool active;
39 pmix_status_t status;
40 } mylock_t;
41
/*
 * Prepare a latch for use: the status starts as PMIX_SUCCESS, the latch is
 * marked active (so a subsequent DEBUG_WAIT_THREAD blocks), and the mutex
 * and condition variable are created with default attributes.
 * Note: the argument is evaluated more than once.
 */
#define DEBUG_CONSTRUCT_LOCK(lck)                       \
    do {                                                \
        (lck)->status = PMIX_SUCCESS;                   \
        (lck)->active = true;                           \
        pthread_mutex_init(&(lck)->mutex, NULL);        \
        pthread_cond_init(&(lck)->cond, NULL);          \
    } while (0)
49
/*
 * Tear down the mutex and condition variable of a latch.
 * The `active` and `status` fields are left untouched.
 * Note: the argument is evaluated more than once.
 */
#define DEBUG_DESTRUCT_LOCK(lck)                        \
    do {                                                \
        pthread_cond_destroy(&(lck)->cond);             \
        pthread_mutex_destroy(&(lck)->mutex);           \
    } while (0)
55
/*
 * Block the calling thread until the latch's `active` flag is false.
 * The flag is re-checked after every wakeup, so spurious wakeups of
 * pthread_cond_wait are tolerated. Returns immediately if the latch
 * has already been released.
 * Note: the argument is evaluated more than once.
 */
#define DEBUG_WAIT_THREAD(lck)                                  \
    do {                                                        \
        pthread_mutex_lock(&(lck)->mutex);                      \
        for (;;) {                                              \
            if (!(lck)->active) {                               \
                break;                                          \
            }                                                   \
            pthread_cond_wait(&(lck)->cond, &(lck)->mutex);     \
        }                                                       \
        pthread_mutex_unlock(&(lck)->mutex);                    \
    } while (0)
64
/*
 * Release a latch: with the mutex held, clear `active` and broadcast on
 * the condition variable so every thread blocked in DEBUG_WAIT_THREAD
 * re-checks the flag and proceeds.
 * Note: the argument is evaluated more than once.
 */
#define DEBUG_WAKEUP_THREAD(latch)                      \
    do {                                                \
        pthread_mutex_lock(&(latch)->mutex);            \
        (latch)->active = false;                        \
        pthread_cond_broadcast(&(latch)->cond);         \
        pthread_mutex_unlock(&(latch)->mutex);          \
    } while (0)
72
73
74 static pmix_proc_t myproc;
75
76 static void notification_fn(size_t evhdlr_registration_id,
77 pmix_status_t status,
78 const pmix_proc_t *source,
79 pmix_info_t info[], size_t ninfo,
80 pmix_info_t results[], size_t nresults,
81 pmix_event_notification_cbfunc_fn_t cbfunc,
82 void *cbdata)
83 {
84 fprintf(stderr, "Client %s:%d NOTIFIED with status %d\n", myproc.nspace, myproc.rank, status);
85 }
86
87 static void op_callbk(pmix_status_t status,
88 void *cbdata)
89 {
90 mylock_t *lock = (mylock_t*)cbdata;
91
92 fprintf(stderr, "Client %s:%d OP CALLBACK CALLED WITH STATUS %d\n", myproc.nspace, myproc.rank, status);
93 lock->status = status;
94 DEBUG_WAKEUP_THREAD(lock);
95 }
96
97 static void errhandler_reg_callbk(pmix_status_t status,
98 size_t errhandler_ref,
99 void *cbdata)
100 {
101 mylock_t *lock = (mylock_t*)cbdata;
102
103 fprintf(stderr, "Client %s:%d ERRHANDLER REGISTRATION CALLBACK CALLED WITH STATUS %d, ref=%lu\n",
104 myproc.nspace, myproc.rank, status, (unsigned long)errhandler_ref);
105 lock->status = status;
106 DEBUG_WAKEUP_THREAD(lock);
107 }
108
109 int main(int argc, char **argv)
110 {
111 int rc;
112 pmix_value_t value;
113 pmix_value_t *val = &value;
114 pmix_proc_t proc, *procs;
115 uint32_t nprocs;
116 mylock_t lock;
117 pmix_info_t *results, info;
118 size_t nresults, cid;
119
120
121 if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, NULL, 0))) {
122 fprintf(stderr, "Client ns %s rank %d: PMIx_Init failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
123 exit(0);
124 }
125 fprintf(stderr, "Client ns %s rank %d: Running\n", myproc.nspace, myproc.rank);
126
127 PMIX_PROC_CONSTRUCT(&proc);
128 (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
129 proc.rank = PMIX_RANK_WILDCARD;
130
131
132 if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, &val))) {
133 fprintf(stderr, "Client ns %s rank %d: PMIx_Get universe size failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
134 goto done;
135 }
136 nprocs = val->data.uint32;
137 PMIX_VALUE_RELEASE(val);
138 if (nprocs < 4) {
139 if (0 == myproc.rank) {
140 fprintf(stderr, "This example requires a minimum of 4 processes\n");
141 }
142 goto done;
143 }
144 fprintf(stderr, "Client %s:%d universe size %d\n", myproc.nspace, myproc.rank, nprocs);
145
146
147 DEBUG_CONSTRUCT_LOCK(&lock);
148 PMIx_Register_event_handler(NULL, 0, NULL, 0,
149 notification_fn, errhandler_reg_callbk, (void*)&lock);
150 DEBUG_WAIT_THREAD(&lock);
151 rc = lock.status;
152 DEBUG_DESTRUCT_LOCK(&lock);
153 if (PMIX_SUCCESS != rc) {
154 goto done;
155 }
156
157
158 PMIX_PROC_CONSTRUCT(&proc);
159 (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
160 proc.rank = PMIX_RANK_WILDCARD;
161 if (PMIX_SUCCESS != (rc = PMIx_Fence(&proc, 1, NULL, 0))) {
162 fprintf(stderr, "Client ns %s rank %d: PMIx_Fence failed: %d\n", myproc.nspace, myproc.rank, rc);
163 goto done;
164 }
165
166
167 if (0 == myproc.rank || 2 == myproc.rank || 3 == myproc.rank) {
168 fprintf(stderr, "%d executing Group_construct\n", myproc.rank);
169 nprocs = 3;
170 PMIX_PROC_CREATE(procs, nprocs);
171 PMIX_PROC_LOAD(&procs[0], myproc.nspace, 0);
172 PMIX_PROC_LOAD(&procs[1], myproc.nspace, 2);
173 PMIX_PROC_LOAD(&procs[2], myproc.nspace, 3);
174 PMIX_INFO_LOAD(&info, PMIX_GROUP_ASSIGN_CONTEXT_ID, NULL, PMIX_BOOL);
175 rc = PMIx_Group_construct("ourgroup", procs, nprocs, &info, 1, &results, &nresults);
176 if (PMIX_SUCCESS != rc) {
177 fprintf(stderr, "Client ns %s rank %d: PMIx_Group_construct failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
178 goto done;
179 }
180
181 if (NULL != results) {
182 cid = 0;
183 PMIX_VALUE_GET_NUMBER(rc, &results[0].value, cid, size_t);
184 fprintf(stderr, "%d Group construct complete with status %s KEY %s CID %d\n",
185 myproc.rank, PMIx_Error_string(rc), results[0].key, (int)cid);
186 } else {
187 fprintf(stderr, "%d Group construct complete, but no CID returned\n", myproc.rank);
188 }
189 PMIX_PROC_FREE(procs, nprocs);
190 fprintf(stderr, "%d executing Group_destruct\n", myproc.rank);
191 rc = PMIx_Group_destruct("ourgroup", NULL, 0);
192 if (PMIX_SUCCESS != rc) {
193 fprintf(stderr, "Client ns %s rank %d: PMIx_Group_destruct failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
194 goto done;
195 }
196 }
197
198 done:
199
200 DEBUG_CONSTRUCT_LOCK(&lock);
201 PMIx_Deregister_event_handler(1, op_callbk, &lock);
202 DEBUG_WAIT_THREAD(&lock);
203 DEBUG_DESTRUCT_LOCK(&lock);
204
205 fprintf(stderr, "Client ns %s rank %d: Finalizing\n", myproc.nspace, myproc.rank);
206 if (PMIX_SUCCESS != (rc = PMIx_Finalize(NULL, 0))) {
207 fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize failed: %s\n", myproc.nspace, myproc.rank, PMIx_Error_string(rc));
208 } else {
209 fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize successfully completed\n", myproc.nspace, myproc.rank);
210 }
211 fprintf(stderr, "%s:%d COMPLETE\n", myproc.nspace, myproc.rank);
212 fflush(stderr);
213 return(0);
214 }