This source file includes the following definitions.
- completion_handler
- myerr
- errreg_cbfunc
- op2cbfunc
- pmix1_server_init
- pmix1_server_finalize
- pmix1_server_gen_regex
- pmix1_server_gen_ppn
- opcbfunc
- pmix1_server_register_nspace
- pmix1_server_deregister_nspace
- pmix1_server_register_client
- pmix1_server_deregister_client
- pmix1_server_setup_fork
- dmdx_response
- pmix1_server_dmodex
- pmix1_server_notify_error
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 #include "opal_config.h"
18 #include "opal/constants.h"
19 #include "opal/types.h"
20
21 #ifdef HAVE_STRING_H
22 #include <string.h>
23 #endif
24 #ifdef HAVE_UNISTD_H
25 #include <unistd.h>
26 #endif
27
28 #include "opal/dss/dss.h"
29 #include "opal/mca/event/event.h"
30 #include "opal/mca/hwloc/base/base.h"
31 #include "opal/runtime/opal.h"
32 #include "opal/runtime/opal_progress_threads.h"
33 #include "opal/util/argv.h"
34 #include "opal/util/error.h"
35 #include "opal/util/output.h"
36 #include "opal/util/proc.h"
37 #include "opal/util/show_help.h"
38 #include "opal/util/string_copy.h"
39 #include "opal/mca/pmix/base/base.h"
40 #include "pmix1x.h"
41
42 #include "pmix.h"
43 #include "pmix_server.h"
44
45
46
47
48
49
50 extern pmix_server_module_t mymodule;
51 extern opal_pmix_server_module_t *host_module;
52 static char *dbgvalue=NULL;
53 static int errhdler_ref = 0;
54
55 static void completion_handler(int status, opal_list_t *results,
56 opal_pmix_op_cbfunc_t cbfunc, void *thiscbdata,
57 void *notification_cbdata) {
58 int * cond = (int *)notification_cbdata;
59 *cond = 0;
60 }
61
/*
 * Poll until the expression (a) evaluates to false, sleeping 10
 * microseconds between checks.
 *
 * The expansion intentionally carries no trailing semicolon: the caller
 * supplies it, so an invocation is exactly one statement and can be used
 * safely as the body of an unbraced if/else.
 */
#define PMIX_WAIT_FOR_COMPLETION(a) \
    do {                            \
        while ((a)) {               \
            usleep(10);             \
        }                           \
    } while (0)
68
69 static void myerr(pmix_status_t status,
70 pmix_proc_t procs[], size_t nprocs,
71 pmix_info_t info[], size_t ninfo)
72 {
73 int rc;
74 opal_list_t plist, ilist;
75 opal_namelist_t *nm;
76 opal_value_t *iptr;
77 volatile int cond = 1;
78 size_t n;
79
80
81 rc = pmix1_convert_rc(status);
82
83
84 OBJ_CONSTRUCT(&plist, opal_list_t);
85 for (n=0; n < nprocs; n++) {
86 nm = OBJ_NEW(opal_namelist_t);
87 nm->name.jobid = strtoul(procs[n].nspace, NULL, 10);
88 nm->name.vpid = procs[n].rank;
89 opal_list_append(&plist, &nm->super);
90 }
91
92
93 OBJ_CONSTRUCT(&ilist, opal_list_t);
94 for (n=0; n < ninfo; n++) {
95 iptr = OBJ_NEW(opal_value_t);
96 iptr->key = strdup(info[n].key);
97 pmix1_value_unload(iptr, &info[n].value);
98 opal_list_append(&plist, &iptr->super);
99 }
100
101
102 opal_pmix_base_evhandler(rc, &OPAL_PROC_MY_NAME, &plist, &ilist, completion_handler, (void *)&cond);
103 PMIX_WAIT_FOR_COMPLETION(cond);
104
105 OPAL_LIST_DESTRUCT(&plist);
106 OPAL_LIST_DESTRUCT(&ilist);
107 }
108
109 static void errreg_cbfunc(pmix_status_t status,
110 int errhandler_ref,
111 void *cbdata)
112 {
113 errhdler_ref = errhandler_ref;
114 opal_output_verbose(5, opal_pmix_base_framework.framework_output,
115 "PMIX server errreg_cbfunc - error handler registered status=%d, reference=%d",
116 status, errhandler_ref);
117 }
118
119 static void op2cbfunc(pmix_status_t status, void *cbdata)
120 {
121 volatile bool *active = (volatile bool*)cbdata;
122 if (active)
123 *active = false;
124 }
125
126 int pmix1_server_init(opal_pmix_server_module_t *module,
127 opal_list_t *info)
128 {
129 pmix_status_t rc;
130 int dbg;
131 opal_value_t *kv;
132 pmix_info_t *pinfo;
133 size_t sz, n;
134 opal_pmix1_jobid_trkr_t *job;
135 volatile bool active;
136
137 if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) {
138 asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg);
139 putenv(dbgvalue);
140 }
141
142
143 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
144 PMIX_INFO_CREATE(pinfo, sz);
145 n = 0;
146 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
147 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
148 pmix1_value_load(&pinfo[n].value, kv);
149 ++n;
150 }
151 } else {
152 sz = 0;
153 pinfo = NULL;
154 }
155
156
157
158 job = OBJ_NEW(opal_pmix1_jobid_trkr_t);
159 (void)opal_snprintf_jobid(job->nspace, PMIX_MAX_NSLEN, OPAL_PROC_MY_NAME.jobid);
160 job->jobid = OPAL_PROC_MY_NAME.jobid;
161 opal_list_append(&mca_pmix_ext1x_component.jobids, &job->super);
162
163 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, pinfo, sz))) {
164 PMIX_INFO_FREE(pinfo, sz);
165 return pmix1_convert_rc(rc);
166 }
167 PMIX_INFO_FREE(pinfo, sz);
168
169
170 host_module = module;
171
172
173 PMIx_Register_errhandler(NULL, 0, myerr, errreg_cbfunc, NULL);
174
175
176
177 active = true;
178 PMIx_server_register_nspace(job->nspace, 1, NULL, 0, op2cbfunc, (void*)&active);
179 PMIX_WAIT_FOR_COMPLETION(active);
180
181 return OPAL_SUCCESS;
182 }
183
184 int pmix1_server_finalize(void)
185 {
186 pmix_status_t rc;
187
188
189 PMIx_Deregister_errhandler(errhdler_ref, NULL, NULL);
190
191 rc = PMIx_server_finalize();
192 return pmix1_convert_rc(rc);
193 }
194
/*
 * Thin wrapper around PMIx_generate_regex(): forwards input and the regex
 * output pointer unchanged and returns the converted PMIx status.
 */
int pmix1_server_gen_regex(const char *input, char **regex)
{
    return pmix1_convert_rc(PMIx_generate_regex(input, regex));
}
202
203
/*
 * Thin wrapper around PMIx_generate_ppn(): forwards input and the ppn
 * output pointer unchanged and returns the converted PMIx status.
 */
int pmix1_server_gen_ppn(const char *input, char **ppn)
{
    return pmix1_convert_rc(PMIx_generate_ppn(input, ppn));
}
211
212 static void opcbfunc(pmix_status_t status, void *cbdata)
213 {
214 pmix1_opcaddy_t *op = (pmix1_opcaddy_t*)cbdata;
215
216 if (NULL != op->opcbfunc) {
217 op->opcbfunc(pmix1_convert_rc(status), op->cbdata);
218 }
219 if (op->active) {
220 op->active = false;
221 } else {
222 OBJ_RELEASE(op);
223 }
224 }
225
226 int pmix1_server_register_nspace(opal_jobid_t jobid,
227 int nlocalprocs,
228 opal_list_t *info,
229 opal_pmix_op_cbfunc_t cbfunc,
230 void *cbdata)
231 {
232 opal_value_t *kv, *k2;
233 pmix_info_t *pinfo, *pmap;
234 size_t sz, szmap, m, n;
235 char nspace[PMIX_MAX_NSLEN];
236 pmix_status_t rc;
237 pmix1_opcaddy_t op;
238 opal_list_t *pmapinfo;
239 opal_pmix1_jobid_trkr_t *job;
240
241
242 (void)opal_snprintf_jobid(nspace, PMIX_MAX_NSLEN, jobid);
243
244
245 job = OBJ_NEW(opal_pmix1_jobid_trkr_t);
246 (void)opal_string_copy(job->nspace, nspace, PMIX_MAX_NSLEN);
247 job->jobid = jobid;
248 opal_list_append(&mca_pmix_ext1x_component.jobids, &job->super);
249
250
251 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
252 PMIX_INFO_CREATE(pinfo, sz);
253 n = 0;
254 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
255 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
256 if (0 == strcmp(kv->key, OPAL_PMIX_PROC_DATA)) {
257 pinfo[n].value.type = PMIX_INFO_ARRAY;
258
259
260 pmapinfo = (opal_list_t*)kv->data.ptr;
261 szmap = opal_list_get_size(pmapinfo);
262 if (0 < szmap) {
263 PMIX_INFO_CREATE(pmap, szmap);
264 pinfo[n].value.data.array.array = (struct pmix_info_t*)pmap;
265 pinfo[n].value.data.array.size = szmap;
266 m = 0;
267 OPAL_LIST_FOREACH(k2, pmapinfo, opal_value_t) {
268 (void)opal_string_copy(pmap[m].key, k2->key, PMIX_MAX_KEYLEN);
269 pmix1_value_load(&pmap[m].value, k2);
270 ++m;
271 }
272 }
273 OPAL_LIST_RELEASE(pmapinfo);
274 } else {
275 pmix1_value_load(&pinfo[n].value, kv);
276 }
277 ++n;
278 }
279 } else {
280 sz = 0;
281 pinfo = NULL;
282 }
283
284
285 OBJ_CONSTRUCT(&op, pmix1_opcaddy_t);
286 op.info = pinfo;
287 op.sz = sz;
288 op.opcbfunc = cbfunc;
289 op.cbdata = cbdata;
290 op.active = true;
291 rc = PMIx_server_register_nspace(nspace, nlocalprocs, pinfo, sz,
292 opcbfunc, &op);
293 if (PMIX_SUCCESS == rc) {
294 PMIX_WAIT_FOR_COMPLETION(op.active);
295 }
296 PMIX_INFO_FREE(pinfo, sz);
297 return pmix1_convert_rc(rc);
298 }
299
300 void pmix1_server_deregister_nspace(opal_jobid_t jobid,
301 opal_pmix_op_cbfunc_t cbfunc,
302 void *cbdata)
303 {
304 opal_pmix1_jobid_trkr_t *jptr;
305
306
307 OPAL_LIST_FOREACH(jptr, &mca_pmix_ext1x_component.jobids, opal_pmix1_jobid_trkr_t) {
308 if (jptr->jobid == jobid) {
309
310 PMIx_server_deregister_nspace(jptr->nspace);
311
312 opal_list_remove_item(&mca_pmix_ext1x_component.jobids, &jptr->super);
313 OBJ_RELEASE(jptr);
314 return;
315 }
316 }
317 }
318
319 int pmix1_server_register_client(const opal_process_name_t *proc,
320 uid_t uid, gid_t gid,
321 void *server_object,
322 opal_pmix_op_cbfunc_t cbfunc,
323 void *cbdata)
324 {
325 pmix_status_t rc;
326 pmix1_opcaddy_t *op;
327
328
329 op = OBJ_NEW(pmix1_opcaddy_t);
330 op->opcbfunc = cbfunc;
331 op->cbdata = cbdata;
332
333
334 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, proc->jobid);
335 op->p.rank = proc->vpid;
336
337 rc = PMIx_server_register_client(&op->p, uid, gid, server_object,
338 opcbfunc, op);
339 if (PMIX_SUCCESS != rc) {
340 OBJ_RELEASE(op);
341 }
342 return pmix1_convert_rc(rc);
343 }
344
345 void pmix1_server_deregister_client(const opal_process_name_t *proc,
346 opal_pmix_op_cbfunc_t cbfunc,
347 void *cbdata)
348 {
349 opal_pmix1_jobid_trkr_t *jptr;
350 pmix_proc_t p;
351
352
353 OPAL_LIST_FOREACH(jptr, &mca_pmix_ext1x_component.jobids, opal_pmix1_jobid_trkr_t) {
354 if (jptr->jobid == proc->jobid) {
355
356 (void)opal_string_copy(p.nspace, jptr->nspace, PMIX_MAX_NSLEN);
357 p.rank = proc->vpid;
358 PMIx_server_deregister_client(&p);
359 return;
360 }
361 }
362 }
363
364
365 int pmix1_server_setup_fork(const opal_process_name_t *proc, char ***env)
366 {
367 pmix_status_t rc;
368 pmix_proc_t p;
369
370
371 (void)opal_snprintf_jobid(p.nspace, PMIX_MAX_NSLEN, proc->jobid);
372 p.rank = proc->vpid;
373
374 rc = PMIx_server_setup_fork(&p, env);
375 return pmix1_convert_rc(rc);
376 }
377
378
379
380
381 static void dmdx_response(pmix_status_t status, char *data, size_t sz, void *cbdata)
382 {
383 int rc;
384 pmix1_opcaddy_t *op = (pmix1_opcaddy_t*)cbdata;
385
386 rc = pmix1_convert_rc(status);
387 if (NULL != op->mdxcbfunc) {
388 op->mdxcbfunc(rc, data, sz, op->cbdata, NULL, NULL);
389 }
390 OBJ_RELEASE(op);
391 }
392
393 int pmix1_server_dmodex(const opal_process_name_t *proc,
394 opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
395 {
396 pmix1_opcaddy_t *op;
397 pmix_status_t rc;
398
399
400 op = OBJ_NEW(pmix1_opcaddy_t);
401 op->mdxcbfunc = cbfunc;
402 op->cbdata = cbdata;
403
404
405 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, proc->jobid);
406 op->p.rank = proc->vpid;
407
408
409 rc = PMIx_server_dmodex_request(&op->p, dmdx_response, op);
410 if (PMIX_SUCCESS != rc) {
411 OBJ_RELEASE(op);
412 }
413 return pmix1_convert_rc(rc);
414 }
415
416 int pmix1_server_notify_error(int status,
417 const opal_process_name_t *source,
418 opal_list_t *info,
419 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
420 {
421 opal_value_t *kv;
422 pmix_info_t *pinfo;
423 size_t sz, n;
424 pmix_status_t rc;
425 pmix1_opcaddy_t *op;
426
427
428 op = OBJ_NEW(pmix1_opcaddy_t);
429 op->opcbfunc = cbfunc;
430 op->cbdata = cbdata;
431
432
433 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
434 PMIX_INFO_CREATE(pinfo, sz);
435 n = 0;
436 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
437 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
438 pmix1_value_load(&pinfo[n].value, kv);
439 }
440 } else {
441 sz = 0;
442 pinfo = NULL;
443 }
444 op->info = pinfo;
445 op->sz = sz;
446
447 rc = pmix1_convert_opalrc(status);
448 rc = PMIx_Notify_error(rc, NULL, 0, NULL, 0,
449 pinfo, sz, opcbfunc, op);
450 if (PMIX_SUCCESS != rc) {
451 OBJ_RELEASE(op);
452 }
453 return pmix1_convert_rc(rc);
454 }