This source file includes the following definitions.
- errreg_cbfunc
- opcbfunc
- lkcbfunc
- pmix4x_server_init
- dereg_cbfunc
- pmix4x_server_finalize
- pmix4x_server_gen_regex
- pmix4x_server_gen_ppn
- pmix4x_server_register_nspace
- pmix4x_server_deregister_nspace
- pmix4x_server_register_client
- pmix4x_server_deregister_client
- pmix4x_server_setup_fork
- dmdx_response
- pmix4x_server_dmodex
- pmix4x_server_notify_event
- pmix4x_server_iof_push
- final_cleanup
- setup_cbfunc
- pmix4x_server_setup_application
- pmix4x_server_setup_local_support
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "opal_config.h"
24 #include "opal/constants.h"
25 #include "opal/types.h"
26
27 #ifdef HAVE_STRING_H
28 #include <string.h>
29 #endif
30 #ifdef HAVE_UNISTD_H
31 #include <unistd.h>
32 #endif
33
34 #include "opal/dss/dss.h"
35 #include "opal/mca/event/event.h"
36 #include "opal/mca/hwloc/base/base.h"
37 #include "opal/runtime/opal.h"
38 #include "opal/runtime/opal_progress_threads.h"
39 #include "opal/threads/threads.h"
40 #include "opal/util/argv.h"
41 #include "opal/util/error.h"
42 #include "opal/util/output.h"
43 #include "opal/util/opal_environ.h"
44 #include "opal/util/proc.h"
45 #include "opal/util/show_help.h"
46 #include "opal/mca/pmix/base/base.h"
47 #include "pmix4x.h"
48
49 #include "pmix.h"
50 #include "pmix_server.h"
51
52
53
54
55
56
57 extern pmix_server_module_t mymodule;
58 extern opal_pmix_server_module_t *host_module;
59 static char *dbgvalue=NULL;
60
61 static void errreg_cbfunc (pmix_status_t status,
62 size_t errhandler_ref,
63 void *cbdata)
64 {
65 opal_pmix4x_event_t *ev = (opal_pmix4x_event_t*)cbdata;
66
67 OPAL_ACQUIRE_OBJECT(ev);
68 ev->index = errhandler_ref;
69 opal_output_verbose(5, opal_pmix_base_framework.framework_output,
70 "PMIX server errreg_cbfunc - error handler registered status=%d, reference=%lu",
71 status, (unsigned long)errhandler_ref);
72 OPAL_POST_OBJECT(ev);
73 OPAL_PMIX_WAKEUP_THREAD(&ev->lock);
74 }
75
76 static void opcbfunc(pmix_status_t status, void *cbdata)
77 {
78 pmix4x_opcaddy_t *op = (pmix4x_opcaddy_t*)cbdata;
79
80 OPAL_ACQUIRE_OBJECT(op);
81
82 if (NULL != op->opcbfunc) {
83 op->opcbfunc(pmix4x_convert_rc(status), op->cbdata);
84 }
85 OBJ_RELEASE(op);
86 }
87
88 static void lkcbfunc(pmix_status_t status, void *cbdata)
89 {
90 opal_pmix_lock_t *lk = (opal_pmix_lock_t*)cbdata;
91
92 OPAL_POST_OBJECT(lk);
93 lk->status = pmix4x_convert_rc(status);
94 OPAL_PMIX_WAKEUP_THREAD(lk);
95 }
96
97 int pmix4x_server_init(opal_pmix_server_module_t *module,
98 opal_list_t *info)
99 {
100 pmix_status_t rc;
101 int dbg;
102 opal_value_t *kv;
103 pmix_info_t *pinfo;
104 size_t sz, n;
105 opal_pmix4x_event_t *event;
106 opal_pmix4x_jobid_trkr_t *job;
107 opal_pmix_lock_t lk;
108
109 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
110
111 if (0 == opal_pmix_base.initialized) {
112 if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) {
113 asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg);
114 putenv(dbgvalue);
115 }
116
117 if (OPAL_SUCCESS != (dbg = opal_pmix_pmix4x_check_evars())) {
118 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
119 return dbg;
120 }
121 }
122 ++opal_pmix_base.initialized;
123
124
125 sz = 2 + ((NULL==info)?0:opal_list_get_size(info));
126 PMIX_INFO_CREATE(pinfo, sz);
127 n = 0;
128 if (NULL != info) {
129 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
130 (void)strncpy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
131 pmix4x_value_load(&pinfo[n].value, kv);
132 ++n;
133 }
134 }
135
136
137
138 job = OBJ_NEW(opal_pmix4x_jobid_trkr_t);
139 (void)opal_snprintf_jobid(job->nspace, PMIX_MAX_NSLEN, OPAL_PROC_MY_NAME.jobid);
140 job->jobid = OPAL_PROC_MY_NAME.jobid;
141 opal_list_append(&mca_pmix_pmix4x_component.jobids, &job->super);
142 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
143
144
145 PMIX_INFO_LOAD(&pinfo[sz-2], PMIX_SERVER_NSPACE, job->nspace, PMIX_STRING);
146 PMIX_INFO_LOAD(&pinfo[sz-1], PMIX_SERVER_RANK, &OPAL_PROC_MY_NAME.vpid, PMIX_PROC_RANK);
147 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, pinfo, sz))) {
148 PMIX_INFO_FREE(pinfo, sz);
149 return pmix4x_convert_rc(rc);
150 }
151 PMIX_INFO_FREE(pinfo, sz);
152
153
154 host_module = module;
155
156
157 event = OBJ_NEW(opal_pmix4x_event_t);
158 opal_list_append(&mca_pmix_pmix4x_component.events, &event->super);
159 PMIX_INFO_CREATE(pinfo, 1);
160 PMIX_INFO_LOAD(&pinfo[0], PMIX_EVENT_HDLR_NAME, "OPAL-PMIX-2X-SERVER-DEFAULT", PMIX_STRING);
161 PMIx_Register_event_handler(NULL, 0, pinfo, 1, pmix4x_event_hdlr, errreg_cbfunc, (void*)event);
162 OPAL_PMIX_WAIT_THREAD(&event->lock);
163 PMIX_INFO_FREE(pinfo, 1);
164
165
166
167 OPAL_PMIX_CONSTRUCT_LOCK(&lk);
168 PMIX_INFO_CREATE(pinfo, 1);
169 PMIX_INFO_LOAD(&pinfo[0], PMIX_REGISTER_NODATA, NULL, PMIX_BOOL);
170 PMIx_server_register_nspace(job->nspace, 1, pinfo, 1, lkcbfunc, (void*)&lk);
171 OPAL_PMIX_WAIT_THREAD(&lk);
172 OPAL_PMIX_DESTRUCT_LOCK(&lk);
173 PMIX_INFO_FREE(pinfo, 1);
174
175 return OPAL_SUCCESS;
176 }
177
178 static void dereg_cbfunc(pmix_status_t st, void *cbdata)
179 {
180 opal_pmix4x_event_t *ev = (opal_pmix4x_event_t*)cbdata;
181 OPAL_PMIX_WAKEUP_THREAD(&ev->lock);
182 }
183
184 int pmix4x_server_finalize(void)
185 {
186 pmix_status_t rc;
187 opal_pmix4x_event_t *event, *ev2;
188 opal_list_t evlist;
189 OBJ_CONSTRUCT(&evlist, opal_list_t);
190
191 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
192 --opal_pmix_base.initialized;
193
194 if (0 < opal_pmix_base.initialized) {
195
196 OPAL_LIST_FOREACH_SAFE(event, ev2, &mca_pmix_pmix4x_component.events, opal_pmix4x_event_t) {
197 OPAL_PMIX_DESTRUCT_LOCK(&event->lock);
198 OPAL_PMIX_CONSTRUCT_LOCK(&event->lock);
199 PMIx_Deregister_event_handler(event->index, dereg_cbfunc, (void*)event);
200 opal_list_remove_item(&mca_pmix_pmix4x_component.events, &event->super);
201
202
203 opal_list_append(&evlist, &event->super);
204 }
205 }
206 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
207 OPAL_LIST_FOREACH_SAFE(event, ev2, &evlist, opal_pmix4x_event_t) {
208 OPAL_PMIX_WAIT_THREAD(&event->lock);
209 opal_list_remove_item(&evlist, &event->super);
210 OBJ_RELEASE(event);
211 }
212 OBJ_DESTRUCT(&evlist);
213 rc = PMIx_server_finalize();
214 return pmix4x_convert_rc(rc);
215 }
216
217 int pmix4x_server_gen_regex(const char *input, char **regex)
218 {
219 pmix_status_t rc;
220
221 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
222 if (0 >= opal_pmix_base.initialized) {
223 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
224 return OPAL_ERR_NOT_INITIALIZED;
225 }
226 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
227
228 rc = PMIx_generate_regex(input, regex);
229 return pmix4x_convert_rc(rc);
230 }
231
232
233 int pmix4x_server_gen_ppn(const char *input, char **ppn)
234 {
235 pmix_status_t rc;
236
237 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
238 if (0 >= opal_pmix_base.initialized) {
239 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
240 return OPAL_ERR_NOT_INITIALIZED;
241 }
242 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
243
244 rc = PMIx_generate_ppn(input, ppn);
245 return pmix4x_convert_rc(rc);
246 }
247
248 int pmix4x_server_register_nspace(opal_jobid_t jobid,
249 int nlocalprocs,
250 opal_list_t *info,
251 opal_pmix_op_cbfunc_t cbfunc,
252 void *cbdata)
253 {
254 opal_value_t *kv, *k2;
255 pmix_info_t *pinfo = NULL, *pmap;
256 size_t sz, szmap, m, n;
257 char nspace[PMIX_MAX_NSLEN];
258 pmix_status_t rc;
259 opal_list_t *pmapinfo;
260 opal_pmix4x_jobid_trkr_t *job;
261 opal_pmix_lock_t lock;
262 int ret;
263
264 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
265 if (0 >= opal_pmix_base.initialized) {
266 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
267 return OPAL_ERR_NOT_INITIALIZED;
268 }
269
270
271 (void)opal_snprintf_jobid(nspace, PMIX_MAX_NSLEN, jobid);
272
273
274 job = OBJ_NEW(opal_pmix4x_jobid_trkr_t);
275 (void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN);
276 job->jobid = jobid;
277 opal_list_append(&mca_pmix_pmix4x_component.jobids, &job->super);
278 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
279
280
281 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
282 PMIX_INFO_CREATE(pinfo, sz);
283 n = 0;
284 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
285 (void)strncpy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
286 if (0 == strcmp(kv->key, OPAL_PMIX_PROC_DATA)) {
287 pinfo[n].value.type = PMIX_DATA_ARRAY;
288
289
290 pmapinfo = (opal_list_t*)kv->data.ptr;
291 szmap = opal_list_get_size(pmapinfo);
292 if (0 < szmap) {
293 PMIX_INFO_CREATE(pmap, szmap);
294 pinfo[n].value.data.darray = (pmix_data_array_t*)calloc(1, sizeof(pmix_data_array_t));
295 pinfo[n].value.data.darray->type = PMIX_INFO;
296 pinfo[n].value.data.darray->array = (struct pmix_info_t*)pmap;
297 pinfo[n].value.data.darray->size = szmap;
298 m = 0;
299 OPAL_LIST_FOREACH(k2, pmapinfo, opal_value_t) {
300 (void)strncpy(pmap[m].key, k2->key, PMIX_MAX_KEYLEN);
301 pmix4x_value_load(&pmap[m].value, k2);
302 ++m;
303 }
304 }
305 OPAL_LIST_RELEASE(pmapinfo);
306 } else {
307 pmix4x_value_load(&pinfo[n].value, kv);
308 }
309 ++n;
310 }
311 } else {
312 sz = 0;
313 pinfo = NULL;
314 }
315
316 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
317 rc = PMIx_server_register_nspace(nspace, nlocalprocs, pinfo, sz,
318 lkcbfunc, (void*)&lock);
319 if (PMIX_SUCCESS == rc) {
320 OPAL_PMIX_WAIT_THREAD(&lock);
321 }
322 OPAL_PMIX_DESTRUCT_LOCK(&lock);
323
324 if (NULL != pinfo) {
325 PMIX_INFO_FREE(pinfo, sz);
326 }
327
328 ret = pmix4x_convert_rc(rc);
329
330
331 if (NULL != cbfunc) {
332 cbfunc(ret, cbdata);
333 }
334 return ret;
335 }
336
337 void pmix4x_server_deregister_nspace(opal_jobid_t jobid,
338 opal_pmix_op_cbfunc_t cbfunc,
339 void *cbdata)
340 {
341 opal_pmix4x_jobid_trkr_t *jptr;
342 opal_pmix_lock_t lock;
343
344 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
345 if (0 >= opal_pmix_base.initialized) {
346 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
347
348 if (NULL != cbfunc) {
349 cbfunc(OPAL_ERR_NOT_INITIALIZED, cbdata);
350 }
351 return;
352 }
353
354
355 OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix4x_component.jobids, opal_pmix4x_jobid_trkr_t) {
356 if (jptr->jobid == jobid) {
357
358 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
359 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
360 PMIx_server_deregister_nspace(jptr->nspace, lkcbfunc, (void*)&lock);
361 OPAL_PMIX_WAIT_THREAD(&lock);
362 OPAL_PMIX_DESTRUCT_LOCK(&lock);
363
364 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
365 opal_list_remove_item(&mca_pmix_pmix4x_component.jobids, &jptr->super);
366 OBJ_RELEASE(jptr);
367 break;
368 }
369 }
370
371 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
372
373 if (NULL != cbfunc) {
374 cbfunc(OPAL_SUCCESS, cbdata);
375 }
376 }
377
378 int pmix4x_server_register_client(const opal_process_name_t *proc,
379 uid_t uid, gid_t gid,
380 void *server_object,
381 opal_pmix_op_cbfunc_t cbfunc,
382 void *cbdata)
383 {
384 pmix_status_t rc;
385 pmix_proc_t p;
386 opal_pmix_lock_t lock;
387
388 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
389 if (0 >= opal_pmix_base.initialized) {
390 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
391 return OPAL_ERR_NOT_INITIALIZED;
392 }
393 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
394
395
396 (void)opal_snprintf_jobid(p.nspace, PMIX_MAX_NSLEN, proc->jobid);
397 p.rank = pmix4x_convert_opalrank(proc->vpid);
398
399 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
400 rc = PMIx_server_register_client(&p, uid, gid, server_object,
401 lkcbfunc, (void*)&lock);
402 if (PMIX_SUCCESS == rc) {
403 OPAL_PMIX_WAIT_THREAD(&lock);
404 }
405 OPAL_PMIX_DESTRUCT_LOCK(&lock);
406 return pmix4x_convert_rc(rc);
407 }
408
409
410
411 void pmix4x_server_deregister_client(const opal_process_name_t *proc,
412 opal_pmix_op_cbfunc_t cbfunc,
413 void *cbdata)
414 {
415 opal_pmix4x_jobid_trkr_t *jptr;
416 pmix_proc_t p;
417 opal_pmix_lock_t lock;
418
419 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
420 if (0 >= opal_pmix_base.initialized) {
421 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
422 if (NULL != cbfunc) {
423 cbfunc(OPAL_ERR_NOT_INITIALIZED, cbdata);
424 }
425 return;
426 }
427
428
429 OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix4x_component.jobids, opal_pmix4x_jobid_trkr_t) {
430 if (jptr->jobid == proc->jobid) {
431
432 (void)strncpy(p.nspace, jptr->nspace, PMIX_MAX_NSLEN);
433 p.rank = pmix4x_convert_opalrank(proc->vpid);
434 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
435 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
436 PMIx_server_deregister_client(&p, lkcbfunc, (void*)&lock);
437 OPAL_PMIX_WAIT_THREAD(&lock);
438 OPAL_PMIX_DESTRUCT_LOCK(&lock);
439 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
440 break;
441 }
442 }
443 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
444 if (NULL != cbfunc) {
445 cbfunc(OPAL_SUCCESS, cbdata);
446 }
447 }
448
449
450 int pmix4x_server_setup_fork(const opal_process_name_t *proc, char ***env)
451 {
452 pmix_status_t rc;
453 pmix_proc_t p;
454
455 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
456 if (0 >= opal_pmix_base.initialized) {
457 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
458 return OPAL_ERR_NOT_INITIALIZED;
459 }
460 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
461
462
463 (void)opal_snprintf_jobid(p.nspace, PMIX_MAX_NSLEN, proc->jobid);
464 p.rank = pmix4x_convert_opalrank(proc->vpid);
465
466 rc = PMIx_server_setup_fork(&p, env);
467 return pmix4x_convert_rc(rc);
468 }
469
470
471
472
473 static void dmdx_response(pmix_status_t status, char *data, size_t sz, void *cbdata)
474 {
475 int rc;
476 pmix4x_opcaddy_t *op = (pmix4x_opcaddy_t*)cbdata;
477
478 rc = pmix4x_convert_rc(status);
479 if (NULL != op->mdxcbfunc) {
480 op->mdxcbfunc(rc, data, sz, op->cbdata, NULL, NULL);
481 }
482 OBJ_RELEASE(op);
483 }
484
485
486 int pmix4x_server_dmodex(const opal_process_name_t *proc,
487 opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
488 {
489 pmix4x_opcaddy_t *op;
490 pmix_status_t rc;
491
492 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
493 if (0 >= opal_pmix_base.initialized) {
494 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
495 return OPAL_ERR_NOT_INITIALIZED;
496 }
497 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
498
499
500 op = OBJ_NEW(pmix4x_opcaddy_t);
501 op->mdxcbfunc = cbfunc;
502 op->cbdata = cbdata;
503
504
505 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, proc->jobid);
506 op->p.rank = pmix4x_convert_opalrank(proc->vpid);
507
508
509 rc = PMIx_server_dmodex_request(&op->p, dmdx_response, op);
510 if (PMIX_SUCCESS != rc) {
511 OBJ_RELEASE(op);
512 }
513 return pmix4x_convert_rc(rc);
514 }
515
516
517 int pmix4x_server_notify_event(int status,
518 const opal_process_name_t *source,
519 opal_list_t *info,
520 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
521 {
522 opal_value_t *kv;
523 pmix_info_t *pinfo;
524 size_t sz, n;
525 pmix_status_t rc;
526 pmix4x_opcaddy_t *op;
527 pmix_data_range_t range = PMIX_RANGE_SESSION;
528
529 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
530 if (0 >= opal_pmix_base.initialized) {
531 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
532 return OPAL_ERR_NOT_INITIALIZED;
533 }
534 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
535
536
537 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
538 PMIX_INFO_CREATE(pinfo, sz);
539 n = 0;
540 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
541 (void)strncpy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
542 if (0 == strcmp(kv->key, OPAL_PMIX_JOB_TERM_STATUS)) {
543 pinfo[n].value.type = PMIX_STATUS;
544 pinfo[n].value.data.status = pmix4x_convert_opalrc(kv->data.integer);
545 } else {
546 pmix4x_value_load(&pinfo[n].value, kv);
547 if (0 == strcmp(kv->key, OPAL_PMIX_EVENT_CUSTOM_RANGE)) {
548 range = PMIX_RANGE_CUSTOM;
549 }
550 }
551 ++n;
552 }
553 } else {
554 sz = 0;
555 pinfo = NULL;
556 }
557
558 op = OBJ_NEW(pmix4x_opcaddy_t);
559 op->info = pinfo;
560 op->sz = sz;
561 op->opcbfunc = cbfunc;
562 op->cbdata = cbdata;
563
564 if (NULL == source) {
565 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, OPAL_JOBID_INVALID);
566 op->p.rank = pmix4x_convert_opalrank(OPAL_VPID_INVALID);
567 } else {
568 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, source->jobid);
569 op->p.rank = pmix4x_convert_opalrank(source->vpid);
570 }
571
572
573 rc = pmix4x_convert_opalrc(status);
574
575
576 rc = PMIx_Notify_event(rc, &op->p, range,
577 pinfo, sz, opcbfunc, op);
578 if (PMIX_SUCCESS != rc) {
579 OBJ_RELEASE(op);
580 }
581 return pmix4x_convert_rc(rc);
582 }
583
584 int pmix4x_server_iof_push(const opal_process_name_t *source,
585 opal_pmix_iof_channel_t channel,
586 unsigned char *data, size_t nbytes)
587 {
588 pmix4x_opcaddy_t *op;
589 pmix_byte_object_t bo;
590 pmix_iof_channel_t pchan;
591 opal_pmix_lock_t lock;
592 pmix_status_t rc;
593 int ret;
594
595 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
596 "%s IOF push from %s with %d bytes",
597 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
598 OPAL_NAME_PRINT(*source), (int)nbytes);
599
600 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
601 if (0 >= opal_pmix_base.initialized) {
602 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
603 return OPAL_ERR_NOT_INITIALIZED;
604 }
605 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
606
607
608 op = OBJ_NEW(pmix4x_opcaddy_t);
609
610 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, source->jobid);
611 op->p.rank = pmix4x_convert_opalrank(source->vpid);
612
613 pchan = 0;
614 if (OPAL_PMIX_FWD_STDIN_CHANNEL & channel) {
615 pchan |= PMIX_FWD_STDIN_CHANNEL;
616 }
617 if (OPAL_PMIX_FWD_STDOUT_CHANNEL & channel) {
618 pchan |= PMIX_FWD_STDOUT_CHANNEL;
619 }
620 if (OPAL_PMIX_FWD_STDERR_CHANNEL & channel) {
621 pchan |= PMIX_FWD_STDERR_CHANNEL;
622 }
623 if (OPAL_PMIX_FWD_STDDIAG_CHANNEL & channel) {
624 pchan |= PMIX_FWD_STDDIAG_CHANNEL;
625 }
626
627
628 PMIX_BYTE_OBJECT_CONSTRUCT(&bo);
629 if (0 < nbytes) {
630 bo.bytes = (char*)data;
631 }
632 bo.size = nbytes;
633
634
635 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
636 rc = PMIx_server_IOF_deliver(&op->p, pchan, &bo, NULL, 0, lkcbfunc, (void*)&lock);
637 if (PMIX_SUCCESS != rc) {
638 ret = pmix4x_convert_rc(rc);
639 } else {
640
641 OPAL_PMIX_WAIT_THREAD(&lock);
642 ret = lock.status;
643 OPAL_PMIX_DESTRUCT_LOCK(&lock);
644 }
645
646 OBJ_RELEASE(op);
647
648 return ret;
649 }
650
651 static void final_cleanup(int status, void *cbdata)
652 {
653 pmix4x_opalcaddy_t *opalcaddy = (pmix4x_opalcaddy_t*)cbdata;
654 OBJ_RELEASE(opalcaddy);
655 }
656
657 static void setup_cbfunc(pmix_status_t status,
658 pmix_info_t info[], size_t ninfo,
659 void *provided_cbdata,
660 pmix_op_cbfunc_t cbfunc, void *cbdata)
661 {
662 pmix4x_opcaddy_t *op = (pmix4x_opcaddy_t*)provided_cbdata;
663 pmix4x_opalcaddy_t *opalcaddy;
664 size_t n;
665 opal_value_t *iptr;
666 int rc;
667 pmix_status_t ret = PMIX_SUCCESS;
668
669
670 opalcaddy = OBJ_NEW(pmix4x_opalcaddy_t);
671
672 rc = pmix4x_convert_rc(status);
673 if (OPAL_SUCCESS == rc && NULL != info) {
674
675 for (n=0; n < ninfo; n++) {
676 iptr = OBJ_NEW(opal_value_t);
677 opal_list_append(&opalcaddy->info, &iptr->super);
678 iptr->key = strdup(info[n].key);
679 if (OPAL_SUCCESS != (rc = pmix4x_value_unload(iptr, &info[n].value))) {
680 OBJ_RELEASE(opalcaddy);
681 ret = pmix4x_convert_opalrc(rc);
682 goto done;
683 }
684 }
685 }
686
687 done:
688
689 if (NULL != cbfunc) {
690 cbfunc(ret, cbdata);
691 }
692
693 if (NULL != op->setupcbfunc) {
694 op->setupcbfunc(rc, &opalcaddy->info, op->cbdata,
695 final_cleanup, opalcaddy);
696 }
697 OBJ_RELEASE(op);
698 }
699
700 int pmix4x_server_setup_application(opal_jobid_t jobid,
701 opal_list_t *info,
702 opal_pmix_setup_application_cbfunc_t cbfunc, void *cbdata)
703 {
704 opal_value_t *kv;
705 pmix_info_t *pinfo;
706 size_t sz, n;
707 pmix_status_t rc;
708 pmix4x_opcaddy_t *op;
709
710 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
711 "%s setup application for job %s",
712 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
713 OPAL_JOBID_PRINT(jobid));
714
715 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
716 if (0 >= opal_pmix_base.initialized) {
717 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
718 return OPAL_ERR_NOT_INITIALIZED;
719 }
720 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
721
722
723 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
724 PMIX_INFO_CREATE(pinfo, sz);
725 n = 0;
726 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
727 (void)strncpy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
728 pmix4x_value_load(&pinfo[n].value, kv);
729 ++n;
730 }
731 } else {
732 sz = 0;
733 pinfo = NULL;
734 }
735
736 op = OBJ_NEW(pmix4x_opcaddy_t);
737 op->info = pinfo;
738 op->sz = sz;
739 op->setupcbfunc = cbfunc;
740 op->cbdata = cbdata;
741
742 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, jobid);
743
744 rc = PMIx_server_setup_application(op->p.nspace, op->info, op->sz,
745 setup_cbfunc, op);
746 if (PMIX_SUCCESS != rc) {
747 OBJ_RELEASE(op);
748 }
749 return pmix4x_convert_rc(rc);
750 }
751
752 int pmix4x_server_setup_local_support(opal_jobid_t jobid,
753 opal_list_t *info,
754 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
755 {
756 opal_value_t *kv;
757 pmix_info_t *pinfo;
758 size_t sz, n;
759 pmix_status_t rc;
760 pmix4x_opcaddy_t *op;
761
762 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
763 "%s setup local support for job %s",
764 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
765 OPAL_JOBID_PRINT(jobid));
766
767 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
768 if (0 >= opal_pmix_base.initialized) {
769 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
770 return OPAL_ERR_NOT_INITIALIZED;
771 }
772 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
773
774
775 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
776 PMIX_INFO_CREATE(pinfo, sz);
777 n = 0;
778 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
779 (void)strncpy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
780 pmix4x_value_load(&pinfo[n].value, kv);
781 ++n;
782 }
783 } else {
784 sz = 0;
785 pinfo = NULL;
786 }
787
788 op = OBJ_NEW(pmix4x_opcaddy_t);
789 op->info = pinfo;
790 op->sz = sz;
791 op->opcbfunc = cbfunc;
792 op->cbdata = cbdata;
793
794 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, jobid);
795
796 rc = PMIx_server_setup_local_support(op->p.nspace, op->info, op->sz,
797 opcbfunc, op);
798 if (PMIX_SUCCESS != rc) {
799 OBJ_RELEASE(op);
800 }
801 return pmix4x_convert_rc(rc);
802 }