This source file includes the following definitions.
- errreg_cbfunc
- opcbfunc
- lkcbfunc
- ext2x_server_init
- dereg_cbfunc
- ext2x_server_finalize
- ext2x_server_gen_regex
- ext2x_server_gen_ppn
- ext2x_server_register_nspace
- ext2x_server_deregister_nspace
- ext2x_server_register_client
- ext2x_server_deregister_client
- ext2x_server_setup_fork
- dmdx_response
- ext2x_server_dmodex
- ext2x_server_notify_event
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #include "opal_config.h"
23 #include "opal/constants.h"
24 #include "opal/types.h"
25
26 #ifdef HAVE_STRING_H
27 #include <string.h>
28 #endif
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32
33 #include "opal/dss/dss.h"
34 #include "opal/mca/event/event.h"
35 #include "opal/mca/hwloc/base/base.h"
36 #include "opal/runtime/opal.h"
37 #include "opal/runtime/opal_progress_threads.h"
38 #include "opal/threads/threads.h"
39 #include "opal/util/argv.h"
40 #include "opal/util/error.h"
41 #include "opal/util/output.h"
42 #include "opal/util/opal_environ.h"
43 #include "opal/util/proc.h"
44 #include "opal/util/show_help.h"
45 #include "opal/util/string_copy.h"
46 #include "opal/mca/pmix/base/base.h"
47 #include "ext2x.h"
48
49 #include "pmix.h"
50 #include "pmix_server.h"
51
52
53
54
55
56
/* PMIx server callback table passed to PMIx_server_init(); defined in another file */
extern pmix_server_module_t mymodule;
/* host-supplied OPAL server module; assigned in ext2x_server_init() */
extern opal_pmix_server_module_t *host_module;
/* holds the "PMIX_DEBUG=<n>" string that ext2x_server_init() hands to putenv() */
static char *dbgvalue=NULL;
60
61 static void errreg_cbfunc (pmix_status_t status,
62 size_t errhandler_ref,
63 void *cbdata)
64 {
65 opal_ext2x_event_t *ev = (opal_ext2x_event_t*)cbdata;
66
67 OPAL_ACQUIRE_OBJECT(ev);
68 ev->index = errhandler_ref;
69 opal_output_verbose(5, opal_pmix_base_framework.framework_output,
70 "PMIX server errreg_cbfunc - error handler registered status=%d, reference=%lu",
71 status, (unsigned long)errhandler_ref);
72 OPAL_POST_OBJECT(ev);
73 OPAL_PMIX_WAKEUP_THREAD(&ev->lock);
74 }
75
76 static void opcbfunc(pmix_status_t status, void *cbdata)
77 {
78 ext2x_opcaddy_t *op = (ext2x_opcaddy_t*)cbdata;
79
80 OPAL_ACQUIRE_OBJECT(op);
81
82 if (NULL != op->opcbfunc) {
83 op->opcbfunc(ext2x_convert_rc(status), op->cbdata);
84 }
85 OBJ_RELEASE(op);
86 }
87
88 static void lkcbfunc(pmix_status_t status, void *cbdata)
89 {
90 opal_pmix_lock_t *lk = (opal_pmix_lock_t*)cbdata;
91
92 OPAL_POST_OBJECT(lk);
93 OPAL_PMIX_WAKEUP_THREAD(lk);
94 }
95
96 int ext2x_server_init(opal_pmix_server_module_t *module,
97 opal_list_t *info)
98 {
99 pmix_status_t rc;
100 int dbg;
101 opal_value_t *kv;
102 pmix_info_t *pinfo;
103 size_t sz, n;
104 opal_ext2x_event_t *event;
105 opal_ext2x_jobid_trkr_t *job;
106 opal_pmix_lock_t lk;
107
108 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
109
110 if (0 == opal_pmix_base.initialized) {
111 if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) {
112 asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg);
113 putenv(dbgvalue);
114 }
115
116 if (OPAL_SUCCESS != (dbg = opal_pmix_ext2x_check_evars())) {
117 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
118 return dbg;
119 }
120 }
121 ++opal_pmix_base.initialized;
122
123
124 sz = 2 + ((NULL==info)?0:opal_list_get_size(info));
125 PMIX_INFO_CREATE(pinfo, sz);
126 n = 0;
127 if (NULL != info) {
128 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
129 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
130 ext2x_value_load(&pinfo[n].value, kv);
131 ++n;
132 }
133 }
134
135
136
137 job = OBJ_NEW(opal_ext2x_jobid_trkr_t);
138 (void)opal_snprintf_jobid(job->nspace, PMIX_MAX_NSLEN, OPAL_PROC_MY_NAME.jobid);
139 job->jobid = OPAL_PROC_MY_NAME.jobid;
140 opal_list_append(&mca_pmix_ext2x_component.jobids, &job->super);
141 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
142
143
144 PMIX_INFO_LOAD(&pinfo[sz-2], PMIX_SERVER_NSPACE, job->nspace, PMIX_STRING);
145 PMIX_INFO_LOAD(&pinfo[sz-1], PMIX_SERVER_RANK, &OPAL_PROC_MY_NAME.vpid, PMIX_PROC_RANK);
146 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, pinfo, sz))) {
147 PMIX_INFO_FREE(pinfo, sz);
148 return ext2x_convert_rc(rc);
149 }
150 PMIX_INFO_FREE(pinfo, sz);
151
152
153 host_module = module;
154
155
156 event = OBJ_NEW(opal_ext2x_event_t);
157 opal_list_append(&mca_pmix_ext2x_component.events, &event->super);
158 PMIX_INFO_CREATE(pinfo, 1);
159 PMIX_INFO_LOAD(&pinfo[0], PMIX_EVENT_HDLR_NAME, "OPAL-PMIX-2X-SERVER-DEFAULT", PMIX_STRING);
160 PMIx_Register_event_handler(NULL, 0, pinfo, 1, ext2x_event_hdlr, errreg_cbfunc, (void*)event);
161 OPAL_PMIX_WAIT_THREAD(&event->lock);
162 PMIX_INFO_FREE(pinfo, 1);
163
164
165
166 OPAL_PMIX_CONSTRUCT_LOCK(&lk);
167 PMIX_INFO_CREATE(pinfo, 1);
168 PMIX_INFO_LOAD(&pinfo[0], PMIX_REGISTER_NODATA, NULL, PMIX_BOOL);
169 PMIx_server_register_nspace(job->nspace, 1, pinfo, 1, lkcbfunc, (void*)&lk);
170 OPAL_PMIX_WAIT_THREAD(&lk);
171 OPAL_PMIX_DESTRUCT_LOCK(&lk);
172 PMIX_INFO_FREE(pinfo, 1);
173
174 return OPAL_SUCCESS;
175 }
176
177 static void dereg_cbfunc(pmix_status_t st, void *cbdata)
178 {
179 opal_ext2x_event_t *ev = (opal_ext2x_event_t*)cbdata;
180 OPAL_PMIX_WAKEUP_THREAD(&ev->lock);
181 }
182
183 int ext2x_server_finalize(void)
184 {
185 pmix_status_t rc;
186 opal_ext2x_event_t *event, *ev2;
187 opal_list_t evlist;
188 OBJ_CONSTRUCT(&evlist, opal_list_t);
189
190 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
191 --opal_pmix_base.initialized;
192
193 if (0 < opal_pmix_base.initialized) {
194
195 OPAL_LIST_FOREACH_SAFE(event, ev2, &mca_pmix_ext2x_component.events, opal_ext2x_event_t) {
196 OPAL_PMIX_DESTRUCT_LOCK(&event->lock);
197 OPAL_PMIX_CONSTRUCT_LOCK(&event->lock);
198 PMIx_Deregister_event_handler(event->index, dereg_cbfunc, (void*)event);
199 opal_list_remove_item(&mca_pmix_ext2x_component.events, &event->super);
200
201
202 opal_list_append(&evlist, &event->super);
203 }
204 }
205 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
206 OPAL_LIST_FOREACH_SAFE(event, ev2, &evlist, opal_ext2x_event_t) {
207 OPAL_PMIX_WAIT_THREAD(&event->lock);
208 opal_list_remove_item(&evlist, &event->super);
209 OBJ_RELEASE(event);
210 }
211 OBJ_DESTRUCT(&evlist);
212 rc = PMIx_server_finalize();
213 return ext2x_convert_rc(rc);
214 }
215
216 int ext2x_server_gen_regex(const char *input, char **regex)
217 {
218 pmix_status_t rc;
219
220 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
221 if (0 >= opal_pmix_base.initialized) {
222 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
223 return OPAL_ERR_NOT_INITIALIZED;
224 }
225 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
226
227 rc = PMIx_generate_regex(input, regex);
228 return ext2x_convert_rc(rc);
229 }
230
231
232 int ext2x_server_gen_ppn(const char *input, char **ppn)
233 {
234 pmix_status_t rc;
235
236 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
237 if (0 >= opal_pmix_base.initialized) {
238 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
239 return OPAL_ERR_NOT_INITIALIZED;
240 }
241 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
242
243 rc = PMIx_generate_ppn(input, ppn);
244 return ext2x_convert_rc(rc);
245 }
246
247 int ext2x_server_register_nspace(opal_jobid_t jobid,
248 int nlocalprocs,
249 opal_list_t *info,
250 opal_pmix_op_cbfunc_t cbfunc,
251 void *cbdata)
252 {
253 opal_value_t *kv, *k2;
254 pmix_info_t *pinfo = NULL, *pmap;
255 size_t sz, szmap, m, n;
256 char nspace[PMIX_MAX_NSLEN];
257 pmix_status_t rc;
258 opal_list_t *pmapinfo;
259 opal_ext2x_jobid_trkr_t *job;
260 opal_pmix_lock_t lock;
261 int ret;
262
263 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
264 if (0 >= opal_pmix_base.initialized) {
265 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
266 return OPAL_ERR_NOT_INITIALIZED;
267 }
268
269
270 (void)opal_snprintf_jobid(nspace, PMIX_MAX_NSLEN, jobid);
271
272
273 job = OBJ_NEW(opal_ext2x_jobid_trkr_t);
274 (void)opal_string_copy(job->nspace, nspace, PMIX_MAX_NSLEN);
275 job->jobid = jobid;
276 opal_list_append(&mca_pmix_ext2x_component.jobids, &job->super);
277 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
278
279
280 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
281 PMIX_INFO_CREATE(pinfo, sz);
282 n = 0;
283 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
284 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
285 if (0 == strcmp(kv->key, OPAL_PMIX_PROC_DATA)) {
286 pinfo[n].value.type = PMIX_DATA_ARRAY;
287
288
289 pmapinfo = (opal_list_t*)kv->data.ptr;
290 szmap = opal_list_get_size(pmapinfo);
291 if (0 < szmap) {
292 PMIX_INFO_CREATE(pmap, szmap);
293 pinfo[n].value.data.darray = (pmix_data_array_t*)calloc(1, sizeof(pmix_data_array_t));
294 pinfo[n].value.data.darray->type = PMIX_INFO;
295 pinfo[n].value.data.darray->array = (struct pmix_info_t*)pmap;
296 pinfo[n].value.data.darray->size = szmap;
297 m = 0;
298 OPAL_LIST_FOREACH(k2, pmapinfo, opal_value_t) {
299 (void)opal_string_copy(pmap[m].key, k2->key, PMIX_MAX_KEYLEN);
300 ext2x_value_load(&pmap[m].value, k2);
301 ++m;
302 }
303 }
304 OPAL_LIST_RELEASE(pmapinfo);
305 } else {
306 ext2x_value_load(&pinfo[n].value, kv);
307 }
308 ++n;
309 }
310 } else {
311 sz = 0;
312 pinfo = NULL;
313 }
314
315 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
316 rc = PMIx_server_register_nspace(nspace, nlocalprocs, pinfo, sz,
317 lkcbfunc, (void*)&lock);
318 if (PMIX_SUCCESS == rc) {
319 OPAL_PMIX_WAIT_THREAD(&lock);
320 }
321 OPAL_PMIX_DESTRUCT_LOCK(&lock);
322
323 if (NULL != pinfo) {
324 PMIX_INFO_FREE(pinfo, sz);
325 }
326
327 ret = ext2x_convert_rc(rc);
328
329
330 if (NULL != cbfunc) {
331 cbfunc(ret, cbdata);
332 }
333 return ret;
334 }
335
336 void ext2x_server_deregister_nspace(opal_jobid_t jobid,
337 opal_pmix_op_cbfunc_t cbfunc,
338 void *cbdata)
339 {
340 opal_ext2x_jobid_trkr_t *jptr;
341 opal_pmix_lock_t lock;
342
343 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
344 if (0 >= opal_pmix_base.initialized) {
345 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
346
347 if (NULL != cbfunc) {
348 cbfunc(OPAL_ERR_NOT_INITIALIZED, cbdata);
349 }
350 return;
351 }
352
353
354 OPAL_LIST_FOREACH(jptr, &mca_pmix_ext2x_component.jobids, opal_ext2x_jobid_trkr_t) {
355 if (jptr->jobid == jobid) {
356
357 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
358 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
359 PMIx_server_deregister_nspace(jptr->nspace, lkcbfunc, (void*)&lock);
360 OPAL_PMIX_WAIT_THREAD(&lock);
361 OPAL_PMIX_DESTRUCT_LOCK(&lock);
362
363 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
364 opal_list_remove_item(&mca_pmix_ext2x_component.jobids, &jptr->super);
365 OBJ_RELEASE(jptr);
366 break;
367 }
368 }
369
370 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
371
372 if (NULL != cbfunc) {
373 cbfunc(OPAL_SUCCESS, cbdata);
374 }
375 }
376
377 int ext2x_server_register_client(const opal_process_name_t *proc,
378 uid_t uid, gid_t gid,
379 void *server_object,
380 opal_pmix_op_cbfunc_t cbfunc,
381 void *cbdata)
382 {
383 pmix_status_t rc;
384 pmix_proc_t p;
385 opal_pmix_lock_t lock;
386
387 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
388 if (0 >= opal_pmix_base.initialized) {
389 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
390 return OPAL_ERR_NOT_INITIALIZED;
391 }
392 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
393
394
395 (void)opal_snprintf_jobid(p.nspace, PMIX_MAX_NSLEN, proc->jobid);
396 p.rank = ext2x_convert_opalrank(proc->vpid);
397
398 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
399 rc = PMIx_server_register_client(&p, uid, gid, server_object,
400 lkcbfunc, (void*)&lock);
401 if (PMIX_SUCCESS == rc) {
402 OPAL_PMIX_WAIT_THREAD(&lock);
403 }
404 OPAL_PMIX_DESTRUCT_LOCK(&lock);
405 return ext2x_convert_rc(rc);
406 }
407
408
409
410 void ext2x_server_deregister_client(const opal_process_name_t *proc,
411 opal_pmix_op_cbfunc_t cbfunc,
412 void *cbdata)
413 {
414 opal_ext2x_jobid_trkr_t *jptr;
415 pmix_proc_t p;
416 opal_pmix_lock_t lock;
417
418 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
419 if (0 >= opal_pmix_base.initialized) {
420 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
421 if (NULL != cbfunc) {
422 cbfunc(OPAL_ERR_NOT_INITIALIZED, cbdata);
423 }
424 return;
425 }
426
427
428 OPAL_LIST_FOREACH(jptr, &mca_pmix_ext2x_component.jobids, opal_ext2x_jobid_trkr_t) {
429 if (jptr->jobid == proc->jobid) {
430
431 (void)opal_string_copy(p.nspace, jptr->nspace, PMIX_MAX_NSLEN);
432 p.rank = ext2x_convert_opalrank(proc->vpid);
433 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
434 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
435 PMIx_server_deregister_client(&p, lkcbfunc, (void*)&lock);
436 OPAL_PMIX_WAIT_THREAD(&lock);
437 OPAL_PMIX_DESTRUCT_LOCK(&lock);
438 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
439 break;
440 }
441 }
442 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
443 if (NULL != cbfunc) {
444 cbfunc(OPAL_SUCCESS, cbdata);
445 }
446 }
447
448
449 int ext2x_server_setup_fork(const opal_process_name_t *proc, char ***env)
450 {
451 pmix_status_t rc;
452 pmix_proc_t p;
453
454 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
455 if (0 >= opal_pmix_base.initialized) {
456 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
457 return OPAL_ERR_NOT_INITIALIZED;
458 }
459 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
460
461
462 (void)opal_snprintf_jobid(p.nspace, PMIX_MAX_NSLEN, proc->jobid);
463 p.rank = ext2x_convert_opalrank(proc->vpid);
464
465 rc = PMIx_server_setup_fork(&p, env);
466 return ext2x_convert_rc(rc);
467 }
468
469
470
471
472 static void dmdx_response(pmix_status_t status, char *data, size_t sz, void *cbdata)
473 {
474 int rc;
475 ext2x_opcaddy_t *op = (ext2x_opcaddy_t*)cbdata;
476
477 rc = ext2x_convert_rc(status);
478 if (NULL != op->mdxcbfunc) {
479 op->mdxcbfunc(rc, data, sz, op->cbdata, NULL, NULL);
480 }
481 OBJ_RELEASE(op);
482 }
483
484
485 int ext2x_server_dmodex(const opal_process_name_t *proc,
486 opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
487 {
488 ext2x_opcaddy_t *op;
489 pmix_status_t rc;
490
491 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
492 if (0 >= opal_pmix_base.initialized) {
493 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
494 return OPAL_ERR_NOT_INITIALIZED;
495 }
496 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
497
498
499 op = OBJ_NEW(ext2x_opcaddy_t);
500 op->mdxcbfunc = cbfunc;
501 op->cbdata = cbdata;
502
503
504 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, proc->jobid);
505 op->p.rank = ext2x_convert_opalrank(proc->vpid);
506
507
508 rc = PMIx_server_dmodex_request(&op->p, dmdx_response, op);
509 if (PMIX_SUCCESS != rc) {
510 OBJ_RELEASE(op);
511 }
512 return ext2x_convert_rc(rc);
513 }
514
515
516 int ext2x_server_notify_event(int status,
517 const opal_process_name_t *source,
518 opal_list_t *info,
519 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
520 {
521 opal_value_t *kv;
522 pmix_info_t *pinfo;
523 size_t sz, n;
524 pmix_status_t rc;
525 ext2x_opcaddy_t *op;
526
527 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
528 if (0 >= opal_pmix_base.initialized) {
529 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
530 return OPAL_ERR_NOT_INITIALIZED;
531 }
532 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
533
534
535 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
536 PMIX_INFO_CREATE(pinfo, sz);
537 n = 0;
538 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
539 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
540 ext2x_value_load(&pinfo[n].value, kv);
541 ++n;
542 }
543 } else {
544 sz = 0;
545 pinfo = NULL;
546 }
547
548 op = OBJ_NEW(ext2x_opcaddy_t);
549 op->info = pinfo;
550 op->sz = sz;
551 op->opcbfunc = cbfunc;
552 op->cbdata = cbdata;
553
554 if (NULL == source) {
555 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, OPAL_JOBID_INVALID);
556 op->p.rank = ext2x_convert_opalrank(OPAL_VPID_INVALID);
557 } else {
558 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, source->jobid);
559 op->p.rank = ext2x_convert_opalrank(source->vpid);
560 }
561
562
563 rc = ext2x_convert_opalrc(status);
564
565
566 rc = PMIx_Notify_event(rc, &op->p, PMIX_RANGE_SESSION,
567 pinfo, sz, opcbfunc, op);
568 if (PMIX_SUCCESS != rc) {
569 OBJ_RELEASE(op);
570 }
571 return ext2x_convert_rc(rc);
572 }