This source file includes the following definitions.
- tcon
- tdes
- connected
- finalized
- abort_fn
- fencenb_fn
- dmodex_fn
- publish_fn
- lookup_fn
- unpublish_fn
- _release_cb
- release_cb
- spawn_fn
- connect_fn
- disconnect_fn
- regevents_fn
- deregevents_fn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include <pthread.h>
17 #include <stdio.h>
18 #include "server_callbacks.h"
19 #include "src/util/argv.h"
20 #include "test_server.h"
21
22 extern bool spawn_wait;
23
24 pmix_server_module_t mymodule = {
25 .client_connected = connected,
26 .client_finalized = finalized,
27 .abort = abort_fn,
28 .fence_nb = fencenb_fn,
29 .direct_modex = dmodex_fn,
30 .publish = publish_fn,
31 .lookup = lookup_fn,
32 .unpublish = unpublish_fn,
33 .spawn = spawn_fn,
34 .connect = connect_fn,
35 .disconnect = disconnect_fn,
36 .register_events = regevents_fn,
37 .deregister_events = deregevents_fn
38 };
39
40 typedef struct {
41 pmix_list_item_t super;
42 pmix_info_t data;
43 char *namespace_published;
44 int rank_published;
45 } pmix_test_info_t;
46
47 static void tcon(pmix_test_info_t *p)
48 {
49 PMIX_INFO_CONSTRUCT(&p->data);
50 }
51
52 static void tdes(pmix_test_info_t *p)
53 {
54 PMIX_INFO_DESTRUCT(&p->data);
55 }
56
57 PMIX_CLASS_INSTANCE(pmix_test_info_t,
58 pmix_list_item_t,
59 tcon, tdes);
60
61 pmix_list_t *pmix_test_published_list = NULL;
62
63 static int finalized_count = 0;
64
65 pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
66 pmix_op_cbfunc_t cbfunc, void *cbdata)
67 {
68 if (NULL != cbfunc) {
69 cbfunc(PMIX_SUCCESS, cbdata);
70 }
71 return PMIX_SUCCESS;
72 }
73
74 pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
75 pmix_op_cbfunc_t cbfunc, void *cbdata)
76 {
77 cli_info_t *cli = NULL;
78 int i;
79 for (i = 0; i < cli_info_cnt; i++) {
80 if((proc->rank == cli_info[i].rank) &&
81 (0 == strcmp(proc->nspace, cli_info[i].ns))){
82 cli = &cli_info[i];
83 break;
84 }
85 }
86 if (NULL == cli) {
87 TEST_ERROR(("cannot found rank %d", proc->rank));
88 return PMIX_SUCCESS;
89 }
90 if( CLI_TERM <= cli->state ){
91 TEST_ERROR(("double termination of rank %d", proc->rank));
92 return PMIX_SUCCESS;
93 }
94 TEST_VERBOSE(("Rank %s:%d terminated", proc->nspace, proc->rank));
95 cli_finalize(cli);
96 finalized_count++;
97 if (finalized_count == cli_info_cnt) {
98 if (NULL != pmix_test_published_list) {
99 PMIX_LIST_RELEASE(pmix_test_published_list);
100 }
101 }
102 if (NULL != cbfunc) {
103 cbfunc(PMIX_SUCCESS, cbdata);
104 }
105 return PMIX_SUCCESS;
106 }
107
108 pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
109 int status, const char msg[],
110 pmix_proc_t procs[], size_t nprocs,
111 pmix_op_cbfunc_t cbfunc, void *cbdata)
112 {
113 if (NULL != cbfunc) {
114 cbfunc(PMIX_SUCCESS, cbdata);
115 }
116 TEST_VERBOSE(("Abort is called with status = %d, msg = %s",
117 status, msg));
118 test_abort = true;
119 return PMIX_SUCCESS;
120 }
121
122 pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
123 const pmix_info_t info[], size_t ninfo,
124 char *data, size_t ndata,
125 pmix_modex_cbfunc_t cbfunc, void *cbdata)
126 {
127 TEST_VERBOSE(("Getting data for %s:%d",
128 procs[0].nspace, procs[0].rank));
129
130 if ((pmix_list_get_size(server_list) == 1) && (my_server_id == 0)) {
131 if (NULL != cbfunc) {
132 cbfunc(PMIX_SUCCESS, data, ndata, cbdata, NULL, NULL);
133 }
134 return PMIX_SUCCESS;
135 }
136 return server_fence_contrib(data, ndata, cbfunc, cbdata);
137 }
138
139 pmix_status_t dmodex_fn(const pmix_proc_t *proc,
140 const pmix_info_t info[], size_t ninfo,
141 pmix_modex_cbfunc_t cbfunc, void *cbdata)
142 {
143 TEST_VERBOSE(("Getting data for %s:%d", proc->nspace, proc->rank));
144
145
146 if ((pmix_list_get_size(server_list) == 1) && (my_server_id == 0)) {
147 return PMIX_ERR_NOT_FOUND;
148 }
149
150 return server_dmdx_get(proc->nspace, proc->rank, cbfunc, cbdata);
151 }
152
153 pmix_status_t publish_fn(const pmix_proc_t *proc,
154 const pmix_info_t info[], size_t ninfo,
155 pmix_op_cbfunc_t cbfunc, void *cbdata)
156 {
157 size_t i;
158 int found;
159 pmix_test_info_t *new_info, *old_info;
160 if (NULL == pmix_test_published_list) {
161 pmix_test_published_list = PMIX_NEW(pmix_list_t);
162 }
163 for (i = 0; i < ninfo; i++) {
164 found = 0;
165 PMIX_LIST_FOREACH(old_info, pmix_test_published_list, pmix_test_info_t) {
166 if (!strcmp(old_info->data.key, info[i].key)) {
167 found = 1;
168 break;
169 }
170 }
171 if (!found) {
172 new_info = PMIX_NEW(pmix_test_info_t);
173 strncpy(new_info->data.key, info[i].key, strlen(info[i].key)+1);
174 pmix_value_xfer(&new_info->data.value, (pmix_value_t*)&info[i].value);
175 new_info->namespace_published = strdup(proc->nspace);
176 new_info->rank_published = proc->rank;
177 pmix_list_append(pmix_test_published_list, &new_info->super);
178 }
179 }
180 if (NULL != cbfunc) {
181 cbfunc(PMIX_SUCCESS, cbdata);
182 }
183 return PMIX_SUCCESS;
184 }
185
186 pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
187 const pmix_info_t info[], size_t ninfo,
188 pmix_lookup_cbfunc_t cbfunc, void *cbdata)
189 {
190 size_t i, ndata, ret;
191 pmix_status_t rc = PMIX_SUCCESS;
192 pmix_pdata_t *pdata;
193 pmix_test_info_t *tinfo;
194 if (NULL == pmix_test_published_list) {
195 return PMIX_ERR_NOT_FOUND;
196 }
197 ndata = pmix_argv_count(keys);
198 PMIX_PDATA_CREATE(pdata, ndata);
199 ret = 0;
200 for (i = 0; i < ndata; i++) {
201 PMIX_LIST_FOREACH(tinfo, pmix_test_published_list, pmix_test_info_t) {
202 if (0 == strcmp(tinfo->data.key, keys[i])) {
203 (void)strncpy(pdata[i].proc.nspace, tinfo->namespace_published, PMIX_MAX_NSLEN);
204 pdata[i].proc.rank = tinfo->rank_published;
205 memset(pdata[i].key, 0, PMIX_MAX_KEYLEN+1);
206 (void)strncpy(pdata[i].key, keys[i], PMIX_MAX_KEYLEN);
207 pmix_value_xfer(&pdata[i].value, &tinfo->data.value);
208 ret++;
209 break;
210 }
211 }
212 }
213 if (ret != ndata) {
214 rc = PMIX_ERR_NOT_FOUND;
215 goto error;
216 }
217 if (NULL != cbfunc) {
218 cbfunc(PMIX_SUCCESS, pdata, ndata, cbdata);
219 }
220 error:
221 PMIX_PDATA_FREE(pdata, ndata);
222 return rc;
223 }
224
225 pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
226 const pmix_info_t info[], size_t ninfo,
227 pmix_op_cbfunc_t cbfunc, void *cbdata)
228 {
229 size_t i;
230 pmix_test_info_t *iptr, *next;
231 if (NULL == pmix_test_published_list) {
232 return PMIX_ERR_NOT_FOUND;
233 }
234 PMIX_LIST_FOREACH_SAFE(iptr, next, pmix_test_published_list, pmix_test_info_t) {
235 if (1) {
236 if (NULL == keys) {
237 pmix_list_remove_item(pmix_test_published_list, &iptr->super);
238 PMIX_RELEASE(iptr);
239 } else {
240 ninfo = pmix_argv_count(keys);
241 for (i = 0; i < ninfo; i++) {
242 if (!strcmp(iptr->data.key, keys[i])) {
243 pmix_list_remove_item(pmix_test_published_list, &iptr->super);
244 PMIX_RELEASE(iptr);
245 break;
246 }
247 }
248 }
249 }
250 }
251 if (NULL != cbfunc) {
252 cbfunc(PMIX_SUCCESS, cbdata);
253 }
254 return PMIX_SUCCESS;
255 }
256
257 typedef struct {
258 pmix_status_t status;
259 pmix_spawn_cbfunc_t cbfunc;
260 void *cbdata;
261 } release_cbdata;
262
263
264 static void * _release_cb(void *arg)
265 {
266 release_cbdata *cb = (release_cbdata*)arg;
267 if (NULL != cb->cbfunc) {
268 cb->cbfunc(cb->status, "foobar", cb->cbdata);
269 }
270 free(cb);
271 spawn_wait = false;
272 pthread_exit(NULL);
273 }
274
275 static void release_cb(pmix_status_t status, void *cbdata)
276 {
277 pthread_t thread;
278
279 if (0 > pthread_create(&thread, NULL, _release_cb, cbdata)) {
280 spawn_wait = false;
281 return;
282 }
283 pthread_detach(thread);
284 }
285
286 pmix_status_t spawn_fn(const pmix_proc_t *proc,
287 const pmix_info_t job_info[], size_t ninfo,
288 const pmix_app_t apps[], size_t napps,
289 pmix_spawn_cbfunc_t cbfunc, void *cbdata)
290 {
291 release_cbdata *cb = malloc(sizeof(release_cbdata));
292
293 cb->status = PMIX_SUCCESS;
294 cb->cbfunc = cbfunc;
295 cb->cbdata = cbdata;
296
297 spawn_wait = true;
298 PMIx_server_register_nspace("foobar", napps, NULL, 0, release_cb, (void*)cb);
299 return PMIX_SUCCESS;
300 }
301 static int numconnect = 0;
302
303 pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
304 const pmix_info_t info[], size_t ninfo,
305 pmix_op_cbfunc_t cbfunc, void *cbdata)
306 {
307 if (NULL != cbfunc) {
308 cbfunc(PMIX_SUCCESS, cbdata);
309 }
310 numconnect++;
311 return PMIX_SUCCESS;
312 }
313
314 pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
315 const pmix_info_t info[], size_t ninfo,
316 pmix_op_cbfunc_t cbfunc, void *cbdata)
317 {
318 if (NULL != cbfunc) {
319 cbfunc(PMIX_SUCCESS, cbdata);
320 }
321 return PMIX_SUCCESS;
322 }
323
324 pmix_status_t regevents_fn (pmix_status_t *codes, size_t ncodes,
325 const pmix_info_t info[], size_t ninfo,
326 pmix_op_cbfunc_t cbfunc, void *cbdata)
327 {
328 TEST_VERBOSE ((" pmix host server regevents_fn called "));
329 if (NULL != cbfunc) {
330 cbfunc(PMIX_SUCCESS, cbdata);
331 }
332 return PMIX_SUCCESS;
333 }
334
335 pmix_status_t deregevents_fn (pmix_status_t *codes, size_t ncodes,
336 pmix_op_cbfunc_t cbfunc, void *cbdata)
337 {
338 TEST_VERBOSE ((" pmix host server deregevents_fn called "));
339 if (NULL != cbfunc) {
340 cbfunc(PMIX_SUCCESS, cbdata);
341 }
342 return PMIX_SUCCESS;
343 }