This source file includes following definitions.
- errreg_cbfunc
- opcbfunc
- lkcbfunc
- ext3x_server_init
- dereg_cbfunc
- ext3x_server_finalize
- ext3x_server_gen_regex
- ext3x_server_gen_ppn
- ext3x_server_register_nspace
- ext3x_server_deregister_nspace
- ext3x_server_register_client
- ext3x_server_deregister_client
- ext3x_server_setup_fork
- dmdx_response
- ext3x_server_dmodex
- ext3x_server_notify_event
- ext3x_server_iof_push
- final_cleanup
- setup_cbfunc
- ext3x_server_setup_application
- ext3x_server_setup_local_support
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "opal_config.h"
24 #include "opal/constants.h"
25 #include "opal/types.h"
26
27 #ifdef HAVE_STRING_H
28 #include <string.h>
29 #endif
30 #ifdef HAVE_UNISTD_H
31 #include <unistd.h>
32 #endif
33
34 #include "opal/dss/dss.h"
35 #include "opal/mca/event/event.h"
36 #include "opal/mca/hwloc/base/base.h"
37 #include "opal/runtime/opal.h"
38 #include "opal/runtime/opal_progress_threads.h"
39 #include "opal/threads/threads.h"
40 #include "opal/util/argv.h"
41 #include "opal/util/error.h"
42 #include "opal/util/output.h"
43 #include "opal/util/opal_environ.h"
44 #include "opal/util/proc.h"
45 #include "opal/util/show_help.h"
46 #include "opal/util/string_copy.h"
47 #include "opal/mca/pmix/base/base.h"
48 #include "ext3x.h"
49
50 #include "pmix.h"
51 #include "pmix_server.h"
52
53
54
55
56
57
58 extern pmix_server_module_t mymodule;
59 extern opal_pmix_server_module_t *host_module;
60 static char *dbgvalue=NULL;
61
62 static void errreg_cbfunc (pmix_status_t status,
63 size_t errhandler_ref,
64 void *cbdata)
65 {
66 opal_ext3x_event_t *ev = (opal_ext3x_event_t*)cbdata;
67
68 OPAL_ACQUIRE_OBJECT(ev);
69 ev->index = errhandler_ref;
70 opal_output_verbose(5, opal_pmix_base_framework.framework_output,
71 "PMIX server errreg_cbfunc - error handler registered status=%d, reference=%lu",
72 status, (unsigned long)errhandler_ref);
73 OPAL_POST_OBJECT(ev);
74 OPAL_PMIX_WAKEUP_THREAD(&ev->lock);
75 }
76
77 static void opcbfunc(pmix_status_t status, void *cbdata)
78 {
79 ext3x_opcaddy_t *op = (ext3x_opcaddy_t*)cbdata;
80
81 OPAL_ACQUIRE_OBJECT(op);
82
83 if (NULL != op->opcbfunc) {
84 op->opcbfunc(ext3x_convert_rc(status), op->cbdata);
85 }
86 OBJ_RELEASE(op);
87 }
88
89 static void lkcbfunc(pmix_status_t status, void *cbdata)
90 {
91 opal_pmix_lock_t *lk = (opal_pmix_lock_t*)cbdata;
92
93 OPAL_POST_OBJECT(lk);
94 lk->status = ext3x_convert_rc(status);
95 OPAL_PMIX_WAKEUP_THREAD(lk);
96 }
97
98 int ext3x_server_init(opal_pmix_server_module_t *module,
99 opal_list_t *info)
100 {
101 pmix_status_t rc;
102 int dbg;
103 opal_value_t *kv;
104 pmix_info_t *pinfo;
105 size_t sz, n;
106 opal_ext3x_event_t *event;
107 opal_ext3x_jobid_trkr_t *job;
108 opal_pmix_lock_t lk;
109
110 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
111
112 if (0 == opal_pmix_base.initialized) {
113 if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) {
114 asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg);
115 putenv(dbgvalue);
116 }
117
118 if (OPAL_SUCCESS != (dbg = opal_pmix_ext3x_check_evars())) {
119 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
120 return dbg;
121 }
122 }
123 ++opal_pmix_base.initialized;
124
125
126 sz = 2 + ((NULL==info)?0:opal_list_get_size(info));
127 PMIX_INFO_CREATE(pinfo, sz);
128 n = 0;
129 if (NULL != info) {
130 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
131 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
132 ext3x_value_load(&pinfo[n].value, kv);
133 ++n;
134 }
135 }
136
137
138
139 job = OBJ_NEW(opal_ext3x_jobid_trkr_t);
140 (void)opal_snprintf_jobid(job->nspace, PMIX_MAX_NSLEN, OPAL_PROC_MY_NAME.jobid);
141 job->jobid = OPAL_PROC_MY_NAME.jobid;
142 opal_list_append(&mca_pmix_ext3x_component.jobids, &job->super);
143 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
144
145
146 PMIX_INFO_LOAD(&pinfo[sz-2], PMIX_SERVER_NSPACE, job->nspace, PMIX_STRING);
147 PMIX_INFO_LOAD(&pinfo[sz-1], PMIX_SERVER_RANK, &OPAL_PROC_MY_NAME.vpid, PMIX_PROC_RANK);
148 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, pinfo, sz))) {
149 PMIX_INFO_FREE(pinfo, sz);
150 return ext3x_convert_rc(rc);
151 }
152 PMIX_INFO_FREE(pinfo, sz);
153
154
155 host_module = module;
156
157
158 event = OBJ_NEW(opal_ext3x_event_t);
159 opal_list_append(&mca_pmix_ext3x_component.events, &event->super);
160 PMIX_INFO_CREATE(pinfo, 1);
161 PMIX_INFO_LOAD(&pinfo[0], PMIX_EVENT_HDLR_NAME, "OPAL-PMIX-2X-SERVER-DEFAULT", PMIX_STRING);
162 PMIx_Register_event_handler(NULL, 0, pinfo, 1, ext3x_event_hdlr, errreg_cbfunc, (void*)event);
163 OPAL_PMIX_WAIT_THREAD(&event->lock);
164 PMIX_INFO_FREE(pinfo, 1);
165
166
167
168 OPAL_PMIX_CONSTRUCT_LOCK(&lk);
169 PMIX_INFO_CREATE(pinfo, 1);
170 PMIX_INFO_LOAD(&pinfo[0], PMIX_REGISTER_NODATA, NULL, PMIX_BOOL);
171 PMIx_server_register_nspace(job->nspace, 1, pinfo, 1, lkcbfunc, (void*)&lk);
172 OPAL_PMIX_WAIT_THREAD(&lk);
173 OPAL_PMIX_DESTRUCT_LOCK(&lk);
174 PMIX_INFO_FREE(pinfo, 1);
175
176 return OPAL_SUCCESS;
177 }
178
179 static void dereg_cbfunc(pmix_status_t st, void *cbdata)
180 {
181 opal_ext3x_event_t *ev = (opal_ext3x_event_t*)cbdata;
182 OPAL_PMIX_WAKEUP_THREAD(&ev->lock);
183 }
184
185 int ext3x_server_finalize(void)
186 {
187 pmix_status_t rc;
188 opal_ext3x_event_t *event, *ev2;
189 opal_list_t evlist;
190 OBJ_CONSTRUCT(&evlist, opal_list_t);
191
192 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
193 --opal_pmix_base.initialized;
194
195 if (0 < opal_pmix_base.initialized) {
196
197 OPAL_LIST_FOREACH_SAFE(event, ev2, &mca_pmix_ext3x_component.events, opal_ext3x_event_t) {
198 OPAL_PMIX_DESTRUCT_LOCK(&event->lock);
199 OPAL_PMIX_CONSTRUCT_LOCK(&event->lock);
200 PMIx_Deregister_event_handler(event->index, dereg_cbfunc, (void*)event);
201 opal_list_remove_item(&mca_pmix_ext3x_component.events, &event->super);
202
203
204 opal_list_append(&evlist, &event->super);
205 }
206 }
207 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
208 OPAL_LIST_FOREACH_SAFE(event, ev2, &evlist, opal_ext3x_event_t) {
209 OPAL_PMIX_WAIT_THREAD(&event->lock);
210 opal_list_remove_item(&evlist, &event->super);
211 OBJ_RELEASE(event);
212 }
213 OBJ_DESTRUCT(&evlist);
214 rc = PMIx_server_finalize();
215 return ext3x_convert_rc(rc);
216 }
217
218 int ext3x_server_gen_regex(const char *input, char **regex)
219 {
220 pmix_status_t rc;
221
222 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
223 if (0 >= opal_pmix_base.initialized) {
224 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
225 return OPAL_ERR_NOT_INITIALIZED;
226 }
227 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
228
229 rc = PMIx_generate_regex(input, regex);
230 return ext3x_convert_rc(rc);
231 }
232
233
234 int ext3x_server_gen_ppn(const char *input, char **ppn)
235 {
236 pmix_status_t rc;
237
238 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
239 if (0 >= opal_pmix_base.initialized) {
240 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
241 return OPAL_ERR_NOT_INITIALIZED;
242 }
243 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
244
245 rc = PMIx_generate_ppn(input, ppn);
246 return ext3x_convert_rc(rc);
247 }
248
249 int ext3x_server_register_nspace(opal_jobid_t jobid,
250 int nlocalprocs,
251 opal_list_t *info,
252 opal_pmix_op_cbfunc_t cbfunc,
253 void *cbdata)
254 {
255 opal_value_t *kv, *k2;
256 pmix_info_t *pinfo = NULL, *pmap;
257 size_t sz, szmap, m, n;
258 char nspace[PMIX_MAX_NSLEN];
259 pmix_status_t rc;
260 opal_list_t *pmapinfo;
261 opal_ext3x_jobid_trkr_t *job;
262 opal_pmix_lock_t lock;
263 int ret;
264
265 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
266 if (0 >= opal_pmix_base.initialized) {
267 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
268 return OPAL_ERR_NOT_INITIALIZED;
269 }
270
271
272 (void)opal_snprintf_jobid(nspace, PMIX_MAX_NSLEN, jobid);
273
274
275 job = OBJ_NEW(opal_ext3x_jobid_trkr_t);
276 (void)opal_string_copy(job->nspace, nspace, PMIX_MAX_NSLEN);
277 job->jobid = jobid;
278 opal_list_append(&mca_pmix_ext3x_component.jobids, &job->super);
279 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
280
281
282 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
283 PMIX_INFO_CREATE(pinfo, sz);
284 n = 0;
285 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
286 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
287 if (0 == strcmp(kv->key, OPAL_PMIX_PROC_DATA)) {
288 pinfo[n].value.type = PMIX_DATA_ARRAY;
289
290
291 pmapinfo = (opal_list_t*)kv->data.ptr;
292 szmap = opal_list_get_size(pmapinfo);
293 if (0 < szmap) {
294 PMIX_INFO_CREATE(pmap, szmap);
295 pinfo[n].value.data.darray = (pmix_data_array_t*)calloc(1, sizeof(pmix_data_array_t));
296 pinfo[n].value.data.darray->type = PMIX_INFO;
297 pinfo[n].value.data.darray->array = (struct pmix_info_t*)pmap;
298 pinfo[n].value.data.darray->size = szmap;
299 m = 0;
300 OPAL_LIST_FOREACH(k2, pmapinfo, opal_value_t) {
301 (void)opal_string_copy(pmap[m].key, k2->key, PMIX_MAX_KEYLEN);
302 ext3x_value_load(&pmap[m].value, k2);
303 ++m;
304 }
305 }
306 OPAL_LIST_RELEASE(pmapinfo);
307 } else {
308 ext3x_value_load(&pinfo[n].value, kv);
309 }
310 ++n;
311 }
312 } else {
313 sz = 0;
314 pinfo = NULL;
315 }
316
317 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
318 rc = PMIx_server_register_nspace(nspace, nlocalprocs, pinfo, sz,
319 lkcbfunc, (void*)&lock);
320 if (PMIX_SUCCESS == rc) {
321 OPAL_PMIX_WAIT_THREAD(&lock);
322 }
323 OPAL_PMIX_DESTRUCT_LOCK(&lock);
324
325 if (NULL != pinfo) {
326 PMIX_INFO_FREE(pinfo, sz);
327 }
328
329 ret = ext3x_convert_rc(rc);
330
331
332 if (NULL != cbfunc) {
333 cbfunc(ret, cbdata);
334 }
335 return ret;
336 }
337
338 void ext3x_server_deregister_nspace(opal_jobid_t jobid,
339 opal_pmix_op_cbfunc_t cbfunc,
340 void *cbdata)
341 {
342 opal_ext3x_jobid_trkr_t *jptr;
343 opal_pmix_lock_t lock;
344
345 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
346 if (0 >= opal_pmix_base.initialized) {
347 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
348
349 if (NULL != cbfunc) {
350 cbfunc(OPAL_ERR_NOT_INITIALIZED, cbdata);
351 }
352 return;
353 }
354
355
356 OPAL_LIST_FOREACH(jptr, &mca_pmix_ext3x_component.jobids, opal_ext3x_jobid_trkr_t) {
357 if (jptr->jobid == jobid) {
358
359 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
360 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
361 PMIx_server_deregister_nspace(jptr->nspace, lkcbfunc, (void*)&lock);
362 OPAL_PMIX_WAIT_THREAD(&lock);
363 OPAL_PMIX_DESTRUCT_LOCK(&lock);
364
365 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
366 opal_list_remove_item(&mca_pmix_ext3x_component.jobids, &jptr->super);
367 OBJ_RELEASE(jptr);
368 break;
369 }
370 }
371
372 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
373
374 if (NULL != cbfunc) {
375 cbfunc(OPAL_SUCCESS, cbdata);
376 }
377 }
378
379 int ext3x_server_register_client(const opal_process_name_t *proc,
380 uid_t uid, gid_t gid,
381 void *server_object,
382 opal_pmix_op_cbfunc_t cbfunc,
383 void *cbdata)
384 {
385 pmix_status_t rc;
386 pmix_proc_t p;
387 opal_pmix_lock_t lock;
388
389 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
390 if (0 >= opal_pmix_base.initialized) {
391 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
392 return OPAL_ERR_NOT_INITIALIZED;
393 }
394 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
395
396
397 (void)opal_snprintf_jobid(p.nspace, PMIX_MAX_NSLEN, proc->jobid);
398 p.rank = ext3x_convert_opalrank(proc->vpid);
399
400 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
401 rc = PMIx_server_register_client(&p, uid, gid, server_object,
402 lkcbfunc, (void*)&lock);
403 if (PMIX_SUCCESS == rc) {
404 OPAL_PMIX_WAIT_THREAD(&lock);
405 }
406 OPAL_PMIX_DESTRUCT_LOCK(&lock);
407 return ext3x_convert_rc(rc);
408 }
409
410
411
412 void ext3x_server_deregister_client(const opal_process_name_t *proc,
413 opal_pmix_op_cbfunc_t cbfunc,
414 void *cbdata)
415 {
416 opal_ext3x_jobid_trkr_t *jptr;
417 pmix_proc_t p;
418 opal_pmix_lock_t lock;
419
420 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
421 if (0 >= opal_pmix_base.initialized) {
422 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
423 if (NULL != cbfunc) {
424 cbfunc(OPAL_ERR_NOT_INITIALIZED, cbdata);
425 }
426 return;
427 }
428
429
430 OPAL_LIST_FOREACH(jptr, &mca_pmix_ext3x_component.jobids, opal_ext3x_jobid_trkr_t) {
431 if (jptr->jobid == proc->jobid) {
432
433 (void)opal_string_copy(p.nspace, jptr->nspace, PMIX_MAX_NSLEN);
434 p.rank = ext3x_convert_opalrank(proc->vpid);
435 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
436 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
437 PMIx_server_deregister_client(&p, lkcbfunc, (void*)&lock);
438 OPAL_PMIX_WAIT_THREAD(&lock);
439 OPAL_PMIX_DESTRUCT_LOCK(&lock);
440 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
441 break;
442 }
443 }
444 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
445 if (NULL != cbfunc) {
446 cbfunc(OPAL_SUCCESS, cbdata);
447 }
448 }
449
450
451 int ext3x_server_setup_fork(const opal_process_name_t *proc, char ***env)
452 {
453 pmix_status_t rc;
454 pmix_proc_t p;
455
456 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
457 if (0 >= opal_pmix_base.initialized) {
458 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
459 return OPAL_ERR_NOT_INITIALIZED;
460 }
461 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
462
463
464 (void)opal_snprintf_jobid(p.nspace, PMIX_MAX_NSLEN, proc->jobid);
465 p.rank = ext3x_convert_opalrank(proc->vpid);
466
467 rc = PMIx_server_setup_fork(&p, env);
468 return ext3x_convert_rc(rc);
469 }
470
471
472
473
474 static void dmdx_response(pmix_status_t status, char *data, size_t sz, void *cbdata)
475 {
476 int rc;
477 ext3x_opcaddy_t *op = (ext3x_opcaddy_t*)cbdata;
478
479 rc = ext3x_convert_rc(status);
480 if (NULL != op->mdxcbfunc) {
481 op->mdxcbfunc(rc, data, sz, op->cbdata, NULL, NULL);
482 }
483 OBJ_RELEASE(op);
484 }
485
486
487 int ext3x_server_dmodex(const opal_process_name_t *proc,
488 opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
489 {
490 ext3x_opcaddy_t *op;
491 pmix_status_t rc;
492
493 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
494 if (0 >= opal_pmix_base.initialized) {
495 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
496 return OPAL_ERR_NOT_INITIALIZED;
497 }
498 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
499
500
501 op = OBJ_NEW(ext3x_opcaddy_t);
502 op->mdxcbfunc = cbfunc;
503 op->cbdata = cbdata;
504
505
506 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, proc->jobid);
507 op->p.rank = ext3x_convert_opalrank(proc->vpid);
508
509
510 rc = PMIx_server_dmodex_request(&op->p, dmdx_response, op);
511 if (PMIX_SUCCESS != rc) {
512 OBJ_RELEASE(op);
513 }
514 return ext3x_convert_rc(rc);
515 }
516
517
518 int ext3x_server_notify_event(int status,
519 const opal_process_name_t *source,
520 opal_list_t *info,
521 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
522 {
523 opal_value_t *kv;
524 pmix_info_t *pinfo;
525 size_t sz, n;
526 pmix_status_t rc;
527 ext3x_opcaddy_t *op;
528
529 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
530 if (0 >= opal_pmix_base.initialized) {
531 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
532 return OPAL_ERR_NOT_INITIALIZED;
533 }
534 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
535
536
537 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
538 PMIX_INFO_CREATE(pinfo, sz);
539 n = 0;
540 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
541 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
542 if (0 == strcmp(kv->key, OPAL_PMIX_JOB_TERM_STATUS)) {
543 pinfo[n].value.type = PMIX_STATUS;
544 pinfo[n].value.data.status = ext3x_convert_opalrc(kv->data.integer);
545 } else {
546 ext3x_value_load(&pinfo[n].value, kv);
547 }
548 ++n;
549 }
550 } else {
551 sz = 0;
552 pinfo = NULL;
553 }
554
555 op = OBJ_NEW(ext3x_opcaddy_t);
556 op->info = pinfo;
557 op->sz = sz;
558 op->opcbfunc = cbfunc;
559 op->cbdata = cbdata;
560
561 if (NULL == source) {
562 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, OPAL_JOBID_INVALID);
563 op->p.rank = ext3x_convert_opalrank(OPAL_VPID_INVALID);
564 } else {
565 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, source->jobid);
566 op->p.rank = ext3x_convert_opalrank(source->vpid);
567 }
568
569
570 rc = ext3x_convert_opalrc(status);
571
572
573 rc = PMIx_Notify_event(rc, &op->p, PMIX_RANGE_SESSION,
574 pinfo, sz, opcbfunc, op);
575 if (PMIX_SUCCESS != rc) {
576 OBJ_RELEASE(op);
577 }
578 return ext3x_convert_rc(rc);
579 }
580
581 int ext3x_server_iof_push(const opal_process_name_t *source,
582 opal_pmix_iof_channel_t channel,
583 unsigned char *data, size_t nbytes)
584 {
585 ext3x_opcaddy_t *op;
586 pmix_byte_object_t bo;
587 pmix_iof_channel_t pchan;
588 opal_pmix_lock_t lock;
589 pmix_status_t rc;
590 int ret;
591
592 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
593 "%s IOF push from %s with %d bytes",
594 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
595 OPAL_NAME_PRINT(*source), (int)nbytes);
596
597 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
598 if (0 >= opal_pmix_base.initialized) {
599 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
600 return OPAL_ERR_NOT_INITIALIZED;
601 }
602 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
603
604
605 op = OBJ_NEW(ext3x_opcaddy_t);
606
607 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, source->jobid);
608 op->p.rank = ext3x_convert_opalrank(source->vpid);
609
610 pchan = 0;
611 if (OPAL_PMIX_FWD_STDIN_CHANNEL & channel) {
612 pchan |= PMIX_FWD_STDIN_CHANNEL;
613 }
614 if (OPAL_PMIX_FWD_STDOUT_CHANNEL & channel) {
615 pchan |= PMIX_FWD_STDOUT_CHANNEL;
616 }
617 if (OPAL_PMIX_FWD_STDERR_CHANNEL & channel) {
618 pchan |= PMIX_FWD_STDERR_CHANNEL;
619 }
620 if (OPAL_PMIX_FWD_STDDIAG_CHANNEL & channel) {
621 pchan |= PMIX_FWD_STDDIAG_CHANNEL;
622 }
623
624
625 PMIX_BYTE_OBJECT_CONSTRUCT(&bo);
626 if (0 < nbytes) {
627 bo.bytes = (char*)data;
628 }
629 bo.size = nbytes;
630
631
632 OPAL_PMIX_CONSTRUCT_LOCK(&lock);
633 rc = PMIx_server_IOF_deliver(&op->p, pchan, &bo, NULL, 0, lkcbfunc, (void*)&lock);
634 if (PMIX_SUCCESS != rc) {
635 ret = ext3x_convert_rc(rc);
636 } else {
637
638 OPAL_PMIX_WAIT_THREAD(&lock);
639 ret = lock.status;
640 OPAL_PMIX_DESTRUCT_LOCK(&lock);
641 }
642
643 OBJ_RELEASE(op);
644
645 return ret;
646 }
647
648 static void final_cleanup(int status, void *cbdata)
649 {
650 ext3x_opalcaddy_t *opalcaddy = (ext3x_opalcaddy_t*)cbdata;
651 OBJ_RELEASE(opalcaddy);
652 }
653
654 static void setup_cbfunc(pmix_status_t status,
655 pmix_info_t info[], size_t ninfo,
656 void *provided_cbdata,
657 pmix_op_cbfunc_t cbfunc, void *cbdata)
658 {
659 ext3x_opcaddy_t *op = (ext3x_opcaddy_t*)provided_cbdata;
660 ext3x_opalcaddy_t *opalcaddy;
661 size_t n;
662 opal_value_t *iptr;
663 int rc;
664 pmix_status_t ret = PMIX_SUCCESS;
665
666
667 opalcaddy = OBJ_NEW(ext3x_opalcaddy_t);
668
669 rc = ext3x_convert_rc(status);
670 if (OPAL_SUCCESS == rc && NULL != info) {
671
672 for (n=0; n < ninfo; n++) {
673 iptr = OBJ_NEW(opal_value_t);
674 opal_list_append(&opalcaddy->info, &iptr->super);
675 iptr->key = strdup(info[n].key);
676 if (OPAL_SUCCESS != (rc = ext3x_value_unload(iptr, &info[n].value))) {
677 OBJ_RELEASE(opalcaddy);
678 ret = ext3x_convert_opalrc(rc);
679 goto done;
680 }
681 }
682 }
683
684 done:
685
686 if (NULL != cbfunc) {
687 cbfunc(ret, cbdata);
688 }
689
690 if (NULL != op->setupcbfunc) {
691 op->setupcbfunc(rc, &opalcaddy->info, op->cbdata,
692 final_cleanup, opalcaddy);
693 }
694 OBJ_RELEASE(op);
695 }
696
697 int ext3x_server_setup_application(opal_jobid_t jobid,
698 opal_list_t *info,
699 opal_pmix_setup_application_cbfunc_t cbfunc, void *cbdata)
700 {
701 opal_value_t *kv;
702 pmix_info_t *pinfo;
703 size_t sz, n;
704 pmix_status_t rc;
705 ext3x_opcaddy_t *op;
706
707 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
708 "%s setup application for job %s",
709 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
710 OPAL_JOBID_PRINT(jobid));
711
712 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
713 if (0 >= opal_pmix_base.initialized) {
714 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
715 return OPAL_ERR_NOT_INITIALIZED;
716 }
717 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
718
719
720 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
721 PMIX_INFO_CREATE(pinfo, sz);
722 n = 0;
723 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
724 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
725 ext3x_value_load(&pinfo[n].value, kv);
726 ++n;
727 }
728 } else {
729 sz = 0;
730 pinfo = NULL;
731 }
732
733 op = OBJ_NEW(ext3x_opcaddy_t);
734 op->info = pinfo;
735 op->sz = sz;
736 op->setupcbfunc = cbfunc;
737 op->cbdata = cbdata;
738
739 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, jobid);
740
741 rc = PMIx_server_setup_application(op->p.nspace, op->info, op->sz,
742 setup_cbfunc, op);
743 if (PMIX_SUCCESS != rc) {
744 OBJ_RELEASE(op);
745 }
746 return ext3x_convert_rc(rc);
747 }
748
749 int ext3x_server_setup_local_support(opal_jobid_t jobid,
750 opal_list_t *info,
751 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
752 {
753 opal_value_t *kv;
754 pmix_info_t *pinfo;
755 size_t sz, n;
756 pmix_status_t rc;
757 ext3x_opcaddy_t *op;
758
759 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
760 "%s setup local support for job %s",
761 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
762 OPAL_JOBID_PRINT(jobid));
763
764 OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
765 if (0 >= opal_pmix_base.initialized) {
766 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
767 return OPAL_ERR_NOT_INITIALIZED;
768 }
769 OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
770
771
772 if (NULL != info && 0 < (sz = opal_list_get_size(info))) {
773 PMIX_INFO_CREATE(pinfo, sz);
774 n = 0;
775 OPAL_LIST_FOREACH(kv, info, opal_value_t) {
776 (void)opal_string_copy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN);
777 ext3x_value_load(&pinfo[n].value, kv);
778 ++n;
779 }
780 } else {
781 sz = 0;
782 pinfo = NULL;
783 }
784
785 op = OBJ_NEW(ext3x_opcaddy_t);
786 op->info = pinfo;
787 op->sz = sz;
788 op->opcbfunc = cbfunc;
789 op->cbdata = cbdata;
790
791 (void)opal_snprintf_jobid(op->p.nspace, PMIX_MAX_NSLEN, jobid);
792
793 rc = PMIx_server_setup_local_support(op->p.nspace, op->info, op->sz,
794 opcbfunc, op);
795 if (PMIX_SUCCESS != rc) {
796 OBJ_RELEASE(op);
797 }
798 return ext3x_convert_rc(rc);
799 }