This source file includes the following definitions.
- pmix_server_launch_resp
- spawn
- pmix_server_spawn_fn
- _cnlk
- _cnct
- pmix_server_connect_fn
- mdxcbfunc
- pmix_server_disconnect_fn
- pmix_server_alloc_fn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include "orte_config.h"
30
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34
35 #include "opal/util/argv.h"
36 #include "opal/util/opal_getcwd.h"
37 #include "opal/util/os_path.h"
38 #include "opal/util/output.h"
39 #include "opal/util/path.h"
40 #include "opal/dss/dss.h"
41 #include "opal/mca/hwloc/hwloc-internal.h"
42
43 #include "orte/mca/errmgr/errmgr.h"
44 #include "orte/mca/rmaps/base/base.h"
45 #include "orte/mca/rml/base/rml_contact.h"
46 #include "orte/mca/state/state.h"
47 #include "orte/util/name_fns.h"
48 #include "orte/util/show_help.h"
49 #include "orte/util/threads.h"
50 #include "orte/runtime/orte_globals.h"
51 #include "orte/mca/rml/rml.h"
52
53 #include "orte/orted/pmix/pmix_server.h"
54 #include "orte/orted/pmix/pmix_server_internal.h"
55
56 void pmix_server_launch_resp(int status, orte_process_name_t* sender,
57 opal_buffer_t *buffer,
58 orte_rml_tag_t tg, void *cbdata)
59 {
60 pmix_server_req_t *req;
61 int rc, room;
62 int32_t ret, cnt;
63 orte_jobid_t jobid;
64 orte_job_t *jdata;
65
66
67 cnt = 1;
68 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT32))) {
69 ORTE_ERROR_LOG(rc);
70 return;
71 }
72
73
74 cnt = 1;
75 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &jobid, &cnt, ORTE_JOBID))) {
76 ORTE_ERROR_LOG(rc);
77 return;
78 }
79
80
81 cnt = 1;
82 if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room, &cnt, OPAL_INT))) {
83 ORTE_ERROR_LOG(rc);
84 return;
85 }
86
87
88 opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room, (void**)&req);
89 if (NULL == req) {
90
91 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
92 return;
93 }
94
95
96 if (NULL != req->spcbfunc) {
97 req->spcbfunc(ret, jobid, req->cbdata);
98 }
99
100 if (ORTE_SUCCESS != ret) {
101 jdata = orte_get_job_data_object(jobid);
102 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
103 }
104
105 OBJ_RELEASE(req);
106 }
107
108 static void spawn(int sd, short args, void *cbdata)
109 {
110 pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
111 int rc;
112 opal_buffer_t *buf;
113 orte_plm_cmd_flag_t command;
114
115 ORTE_ACQUIRE_OBJECT(req);
116
117
118 if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
119 orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
120 goto callback;
121 }
122
123
124 orte_set_attribute(&req->jdata->attributes, ORTE_JOB_ROOM_NUM,
125 ORTE_ATTR_GLOBAL, &req->room_num, OPAL_INT);
126
127
128 buf = OBJ_NEW(opal_buffer_t);
129 command = ORTE_PLM_LAUNCH_JOB_CMD;
130 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_PLM_CMD))) {
131 ORTE_ERROR_LOG(rc);
132 OBJ_RELEASE(buf);
133 opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
134 goto callback;
135 }
136
137
138 if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &req->jdata, 1, ORTE_JOB))) {
139 ORTE_ERROR_LOG(rc);
140 opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
141 OBJ_RELEASE(buf);
142 goto callback;
143
144 }
145
146
147 if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf,
148 ORTE_RML_TAG_PLM,
149 orte_rml_send_callback, NULL))) {
150 ORTE_ERROR_LOG(rc);
151 opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
152 OBJ_RELEASE(buf);
153 goto callback;
154 }
155 return;
156
157 callback:
158
159 if (NULL != req->mdxcbfunc) {
160 req->mdxcbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
161 }
162 OBJ_RELEASE(req);
163 }
164
165 int pmix_server_spawn_fn(opal_process_name_t *requestor,
166 opal_list_t *job_info, opal_list_t *apps,
167 opal_pmix_spawn_cbfunc_t cbfunc, void *cbdata)
168 {
169 orte_job_t *jdata;
170 orte_app_context_t *app;
171 opal_pmix_app_t *papp;
172 opal_value_t *info, *next;
173 opal_list_t *cache;
174 int rc, i;
175 char cwd[OPAL_PATH_MAX];
176 bool flag;
177
178 opal_output_verbose(2, orte_pmix_server_globals.output,
179 "%s spawn called from proc %s",
180 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
181 ORTE_NAME_PRINT(requestor));
182
183
184 jdata = OBJ_NEW(orte_job_t);
185 jdata->map = OBJ_NEW(orte_job_map_t);
186
187
188 OPAL_LIST_FOREACH(papp, apps, opal_pmix_app_t) {
189 app = OBJ_NEW(orte_app_context_t);
190 app->idx = opal_pointer_array_add(jdata->apps, app);
191 jdata->num_apps++;
192 if (NULL != papp->cmd) {
193 app->app = strdup(papp->cmd);
194 } else if (NULL == papp->argv ||
195 NULL == papp->argv[0]) {
196 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
197 OBJ_RELEASE(jdata);
198 return ORTE_ERR_BAD_PARAM;
199 } else {
200 app->app = strdup(papp->argv[0]);
201 }
202 if (NULL != papp->argv) {
203 app->argv = opal_argv_copy(papp->argv);
204 }
205 if (NULL != papp->env) {
206 app->env = opal_argv_copy(papp->env);
207 }
208 if (NULL != papp->cwd) {
209 app->cwd = strdup(papp->cwd);
210 }
211 app->num_procs = papp->maxprocs;
212 OPAL_LIST_FOREACH(info, &papp->info, opal_value_t) {
213 if (0 == strcmp(info->key, OPAL_PMIX_HOST)) {
214 orte_set_attribute(&app->attributes, ORTE_APP_DASH_HOST,
215 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
216 } else if (0 == strcmp(info->key, OPAL_PMIX_HOSTFILE)) {
217 orte_set_attribute(&app->attributes, ORTE_APP_HOSTFILE,
218 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
219 } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_HOSTFILE)) {
220 orte_set_attribute(&app->attributes, ORTE_APP_ADD_HOSTFILE,
221 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
222 } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_HOST)) {
223 orte_set_attribute(&app->attributes, ORTE_APP_ADD_HOST,
224 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
225 } else if (0 == strcmp(info->key, OPAL_PMIX_PREFIX)) {
226 orte_set_attribute(&app->attributes, ORTE_APP_PREFIX_DIR,
227 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
228 } else if (0 == strcmp(info->key, OPAL_PMIX_WDIR)) {
229
230 if (opal_path_is_absolute(info->data.string)) {
231 app->cwd = strdup(info->data.string);
232 } else {
233
234 if (OPAL_SUCCESS != (rc = opal_getcwd(cwd, sizeof(cwd)))) {
235 orte_show_help("help-orted.txt", "cwd", true, "spawn", rc);
236 OBJ_RELEASE(jdata);
237 return rc;
238 }
239
240 app->cwd = opal_os_path(false, cwd, info->data.string, NULL);
241 }
242 } else if (0 == strcmp(info->key, OPAL_PMIX_PRELOAD_BIN)) {
243 OPAL_CHECK_BOOL(info, flag);
244 orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN,
245 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
246 } else if (0 == strcmp(info->key, OPAL_PMIX_PRELOAD_FILES)) {
247 orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES,
248 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
249
250
251
252 } else if (0 == strcmp(info->key, OPAL_PMIX_SET_ENVAR)) {
253 orte_add_attribute(&app->attributes, ORTE_APP_SET_ENVAR,
254 ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
255 } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_ENVAR)) {
256 orte_add_attribute(&app->attributes, ORTE_APP_ADD_ENVAR,
257 ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
258 } else if (0 == strcmp(info->key, OPAL_PMIX_UNSET_ENVAR)) {
259 orte_add_attribute(&app->attributes, ORTE_APP_UNSET_ENVAR,
260 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
261 } else if (0 == strcmp(info->key, OPAL_PMIX_PREPEND_ENVAR)) {
262 orte_add_attribute(&app->attributes, ORTE_APP_PREPEND_ENVAR,
263 ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
264 } else if (0 == strcmp(info->key, OPAL_PMIX_APPEND_ENVAR)) {
265 orte_add_attribute(&app->attributes, ORTE_APP_APPEND_ENVAR,
266 ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
267 } else if (0 == strcmp(info->key, OPAL_PMIX_PSET_NAME)) {
268 orte_set_attribute(&app->attributes, ORTE_APP_PSET_NAME,
269 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
270
271 } else {
272
273 orte_show_help("help-orted.txt", "bad-key",
274 true, "spawn", "application", info->key);
275 }
276 }
277 }
278
279
280 OPAL_LIST_FOREACH_SAFE(info, next, job_info, opal_value_t) {
281
282 if (0 == strcmp(info->key, OPAL_PMIX_PERSONALITY)) {
283 jdata->personality = opal_argv_split(info->data.string, ',');
284
285
286 } else if (0 == strcmp(info->key, OPAL_PMIX_MAPPER)) {
287 jdata->map->req_mapper = strdup(info->data.string);
288
289
290 } else if (0 == strcmp(info->key, OPAL_PMIX_DISPLAY_MAP)) {
291 OPAL_CHECK_BOOL(info, jdata->map->display_map);
292
293
294 } else if (0 == strcmp(info->key, OPAL_PMIX_PPR)) {
295 if (ORTE_MAPPING_POLICY_IS_SET(jdata->map->mapping)) {
296
297 orte_show_help("help-orte-rmaps-base.txt", "redefining-policy",
298 true, "mapping", info->data.string,
299 orte_rmaps_base_print_mapping(orte_rmaps_base.mapping));
300 return ORTE_ERR_BAD_PARAM;
301 }
302 ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_PPR);
303 jdata->map->ppr = strdup(info->data.string);
304
305
306 } else if (0 == strcmp(info->key, OPAL_PMIX_MAPBY)) {
307 if (ORTE_MAPPING_POLICY_IS_SET(jdata->map->mapping)) {
308
309 orte_show_help("help-orte-rmaps-base.txt", "redefining-policy",
310 true, "mapping", info->data.string,
311 orte_rmaps_base_print_mapping(orte_rmaps_base.mapping));
312 return ORTE_ERR_BAD_PARAM;
313 }
314 rc = orte_rmaps_base_set_mapping_policy(jdata, &jdata->map->mapping,
315 NULL, info->data.string);
316 if (ORTE_SUCCESS != rc) {
317 return rc;
318 }
319
320
321 } else if (0 == strcmp(info->key, OPAL_PMIX_RANKBY)) {
322 if (ORTE_RANKING_POLICY_IS_SET(jdata->map->ranking)) {
323
324 orte_show_help("help-orte-rmaps-base.txt", "redefining-policy",
325 true, "ranking", info->data.string,
326 orte_rmaps_base_print_ranking(orte_rmaps_base.ranking));
327 return ORTE_ERR_BAD_PARAM;
328 }
329 rc = orte_rmaps_base_set_ranking_policy(&jdata->map->ranking,
330 jdata->map->mapping,
331 info->data.string);
332 if (ORTE_SUCCESS != rc) {
333 return rc;
334 }
335
336
337 } else if (0 == strcmp(info->key, OPAL_PMIX_BINDTO)) {
338 if (OPAL_BINDING_POLICY_IS_SET(jdata->map->binding)) {
339
340 orte_show_help("help-opal-hwloc-base.txt", "redefining-policy", true,
341 info->data.string,
342 opal_hwloc_base_print_binding(opal_hwloc_binding_policy));
343 return ORTE_ERR_BAD_PARAM;
344 }
345 rc = opal_hwloc_base_set_binding_policy(&jdata->map->binding,
346 info->data.string);
347 if (ORTE_SUCCESS != rc) {
348 return rc;
349 }
350
351
352 } else if (0 == strcmp(info->key, OPAL_PMIX_CPUS_PER_PROC)) {
353 jdata->map->cpus_per_rank = info->data.uint32;
354
355
356 } else if (0 == strcmp(info->key, OPAL_PMIX_NO_PROCS_ON_HEAD)) {
357 OPAL_CHECK_BOOL(info, flag);
358 if (flag) {
359 ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_USE_LOCAL);
360 } else {
361 ORTE_UNSET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_USE_LOCAL);
362 }
363
364 ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_LOCAL_GIVEN);
365
366
367 } else if (0 == strcmp(info->key, OPAL_PMIX_NO_OVERSUBSCRIBE)) {
368 OPAL_CHECK_BOOL(info, flag);
369 if (flag) {
370 ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
371 } else {
372 ORTE_UNSET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
373 }
374
375 ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_SUBSCRIBE_GIVEN);
376
377
378 } else if (0 == strcmp(info->key, OPAL_PMIX_REPORT_BINDINGS)) {
379 OPAL_CHECK_BOOL(info, flag);
380 orte_set_attribute(&jdata->attributes, ORTE_JOB_REPORT_BINDINGS,
381 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
382
383
384 } else if (0 == strcmp(info->key, OPAL_PMIX_CPU_LIST)) {
385 orte_set_attribute(&jdata->attributes, ORTE_JOB_CPU_LIST,
386 ORTE_ATTR_GLOBAL, info->data.string, OPAL_BOOL);
387
388
389 } else if (0 == strcmp(info->key, OPAL_PMIX_JOB_RECOVERABLE)) {
390 OPAL_CHECK_BOOL(info, flag);
391 if (flag) {
392 ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_RECOVERABLE);
393 } else {
394 ORTE_FLAG_UNSET(jdata, ORTE_JOB_FLAG_RECOVERABLE);
395 }
396
397
398 } else if (0 == strcmp(info->key, OPAL_PMIX_MAX_RESTARTS)) {
399 for (i=0; i < jdata->apps->size; i++) {
400 if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
401 continue;
402 }
403 orte_set_attribute(&app->attributes, ORTE_APP_MAX_RESTARTS,
404 ORTE_ATTR_GLOBAL, &info->data.uint32, OPAL_INT32);
405 }
406
407
408 } else if (0 == strcmp(info->key, OPAL_PMIX_JOB_CONTINUOUS)) {
409 OPAL_CHECK_BOOL(info, flag);
410 orte_set_attribute(&jdata->attributes, ORTE_JOB_CONTINUOUS_OP,
411 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
412
413
414 } else if (0 == strcmp(info->key, OPAL_PMIX_NON_PMI)) {
415 OPAL_CHECK_BOOL(info, flag);
416 orte_set_attribute(&jdata->attributes, ORTE_JOB_NON_ORTE_JOB,
417 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
418
419
420 } else if (0 == strcmp(info->key, OPAL_PMIX_REQUESTOR_IS_TOOL)) {
421 OPAL_CHECK_BOOL(info, flag);
422 orte_set_attribute(&jdata->attributes, ORTE_JOB_DVM_JOB,
423 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
424 if (flag) {
425
426 orte_set_attribute(&jdata->attributes, ORTE_JOB_FWDIO_TO_TOOL,
427 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
428 }
429
430
431 } else if (0 == strcmp(info->key, OPAL_PMIX_NOTIFY_COMPLETION)) {
432 OPAL_CHECK_BOOL(info, flag);
433 orte_set_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION,
434 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
435
436
437 } else if (0 == strcmp(info->key, OPAL_PMIX_DEBUG_STOP_ON_EXEC)) {
438
439 return ORTE_ERR_NOT_SUPPORTED;
440
441
442 } else if (0 == strcmp(info->key, OPAL_PMIX_TAG_OUTPUT)) {
443 OPAL_CHECK_BOOL(info, flag);
444 orte_set_attribute(&jdata->attributes, ORTE_JOB_TAG_OUTPUT,
445 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
446
447
448 } else if (0 == strcmp(info->key, OPAL_PMIX_TIMESTAMP_OUTPUT)) {
449 OPAL_CHECK_BOOL(info, flag);
450 orte_set_attribute(&jdata->attributes, ORTE_JOB_TIMESTAMP_OUTPUT,
451 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
452
453
454 } else if (0 == strcmp(info->key, OPAL_PMIX_OUTPUT_TO_FILE)) {
455 orte_set_attribute(&jdata->attributes, ORTE_JOB_OUTPUT_TO_FILE,
456 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
457
458
459 } else if (0 == strcmp(info->key, OPAL_PMIX_MERGE_STDERR_STDOUT)) {
460 OPAL_CHECK_BOOL(info, flag);
461 orte_set_attribute(&jdata->attributes, ORTE_JOB_MERGE_STDERR_STDOUT,
462 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
463
464
465 } else if (0 == strcmp(info->key, OPAL_PMIX_STDIN_TGT)) {
466 if (0 == strcmp(info->data.string, "all")) {
467 jdata->stdin_target = ORTE_VPID_WILDCARD;
468 } else if (0 == strcmp(info->data.string, "none")) {
469 jdata->stdin_target = ORTE_VPID_INVALID;
470 } else {
471 jdata->stdin_target = strtoul(info->data.string, NULL, 10);
472 }
473
474
475 } else if (0 == strcmp(info->key, OPAL_PMIX_INDEX_ARGV)) {
476 OPAL_CHECK_BOOL(info, flag);
477 orte_set_attribute(&jdata->attributes, ORTE_JOB_INDEX_ARGV,
478 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
479
480
481 } else if (0 == strcmp(info->key, OPAL_PMIX_DEBUGGER_DAEMONS)) {
482 ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON);
483 ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_DEBUGGER);
484
485
486
487 } else if (0 == strcmp(info->key, OPAL_PMIX_SET_ENVAR)) {
488 orte_add_attribute(&jdata->attributes, ORTE_JOB_SET_ENVAR,
489 ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
490 } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_ENVAR)) {
491 orte_add_attribute(&jdata->attributes, ORTE_JOB_ADD_ENVAR,
492 ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
493 } else if (0 == strcmp(info->key, OPAL_PMIX_UNSET_ENVAR)) {
494 orte_add_attribute(&jdata->attributes, ORTE_JOB_UNSET_ENVAR,
495 ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
496 } else if (0 == strcmp(info->key, OPAL_PMIX_PREPEND_ENVAR)) {
497 orte_add_attribute(&jdata->attributes, ORTE_JOB_PREPEND_ENVAR,
498 ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
499 } else if (0 == strcmp(info->key, OPAL_PMIX_APPEND_ENVAR)) {
500 orte_add_attribute(&jdata->attributes, ORTE_JOB_APPEND_ENVAR,
501 ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
502
503
504 } else {
505
506 cache = NULL;
507 opal_list_remove_item(job_info, &info->super);
508 if (orte_get_attribute(&jdata->attributes, ORTE_JOB_INFO_CACHE, (void**)&cache, OPAL_PTR) &&
509 NULL != cache) {
510 opal_list_append(cache, &info->super);
511 } else {
512 cache = OBJ_NEW(opal_list_t);
513 opal_list_append(cache, &info->super);
514 orte_set_attribute(&jdata->attributes, ORTE_JOB_INFO_CACHE, ORTE_ATTR_LOCAL, (void*)cache, OPAL_PTR);
515 }
516 }
517 }
518
519 if (NULL == jdata->personality) {
520 opal_argv_append_nosize(&jdata->personality, "ompi");
521 }
522
523
524 orte_set_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY,
525 ORTE_ATTR_GLOBAL, requestor, OPAL_NAME);
526
527
528
529
530 ORTE_SPN_REQ(jdata, spawn, cbfunc, cbdata);
531
532 return OPAL_SUCCESS;
533 }
534
535 static void _cnct(int sd, short args, void *cbdata);
536
537 static void _cnlk(int status, opal_list_t *data, void *cbdata)
538 {
539 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
540 int rc, cnt;
541 opal_pmix_pdata_t *pdat;
542 orte_job_t *jdata;
543 orte_node_t *node;
544 orte_proc_t *proc;
545 opal_buffer_t buf, bucket;
546 opal_byte_object_t *bo;
547 orte_process_name_t dmn, pname;
548 char *uri;
549 opal_value_t val;
550 opal_list_t nodes;
551
552 ORTE_ACQUIRE_OBJECT(cd);
553
554
555
556 if (ORTE_SUCCESS != status || NULL == data) {
557 if (NULL != cd->cbfunc) {
558 rc = status;
559 goto release;
560 }
561 }
562
563
564 pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
565 if (OPAL_BYTE_OBJECT != pdat->value.type) {
566 rc = ORTE_ERR_BAD_PARAM;
567 ORTE_ERROR_LOG(rc);
568 goto release;
569 }
570
571 OBJ_CONSTRUCT(&buf, opal_buffer_t);
572 opal_dss.load(&buf, pdat->value.data.bo.bytes, pdat->value.data.bo.size);
573 pdat->value.data.bo.bytes = NULL;
574 pdat->value.data.bo.size = 0;
575 cnt = 1;
576 if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
577 ORTE_ERROR_LOG(rc);
578 OBJ_DESTRUCT(&buf);
579 goto release;
580 }
581
582
583 cnt=1;
584 if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
585 ORTE_ERROR_LOG(rc);
586 OBJ_DESTRUCT(&buf);
587 goto release;
588 }
589
590 OBJ_CONSTRUCT(&bucket, opal_buffer_t);
591 opal_dss.load(&bucket, bo->bytes, bo->size);
592 bo->bytes = NULL;
593 free(bo);
594
595 OBJ_CONSTRUCT(&nodes, opal_list_t);
596
597 cnt = 1;
598 while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &uri, &cnt, OPAL_STRING))) {
599 rc = orte_rml_base_parse_uris(uri, &dmn, NULL);
600 if (ORTE_SUCCESS != rc) {
601 OBJ_DESTRUCT(&buf);
602 OBJ_DESTRUCT(&bucket);
603 goto release;
604 }
605
606 node = OBJ_NEW(orte_node_t);
607 node->daemon = OBJ_NEW(orte_proc_t);
608 memcpy(&node->daemon->name, &dmn, sizeof(orte_process_name_t));
609 opal_list_append(&nodes, &node->super);
610
611 OBJ_CONSTRUCT(&val, opal_value_t);
612 val.key = OPAL_PMIX_PROC_URI;
613 val.type = OPAL_STRING;
614 val.data.string = uri;
615 if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&dmn, &val))) {
616 ORTE_ERROR_LOG(rc);
617 val.key = NULL;
618 val.data.string = NULL;
619 OBJ_DESTRUCT(&val);
620 OBJ_DESTRUCT(&buf);
621 OBJ_DESTRUCT(&bucket);
622 goto release;
623 }
624 val.key = NULL;
625 val.data.string = NULL;
626 OBJ_DESTRUCT(&val);
627 cnt = 1;
628 }
629 OBJ_DESTRUCT(&bucket);
630
631
632 cnt=1;
633 if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
634 ORTE_ERROR_LOG(rc);
635 OBJ_DESTRUCT(&buf);
636 goto release;
637 }
638
639 OBJ_CONSTRUCT(&bucket, opal_buffer_t);
640 opal_dss.load(&bucket, bo->bytes, bo->size);
641 bo->bytes = NULL;
642 free(bo);
643
644 cnt = 1;
645 while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &pname, &cnt, ORTE_NAME))) {
646
647 if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &dmn, &cnt, ORTE_NAME))) {
648 OBJ_DESTRUCT(&buf);
649 OBJ_DESTRUCT(&bucket);
650 goto release;
651 }
652
653 proc = OBJ_NEW(orte_proc_t);
654 memcpy(&proc->name, &pname, sizeof(orte_process_name_t));
655 opal_pointer_array_set_item(jdata->procs, pname.vpid, proc);
656
657 OPAL_LIST_FOREACH(node, &nodes, orte_node_t) {
658 if (node->daemon->name.vpid == dmn.vpid) {
659 OBJ_RETAIN(node);
660 proc->node = node;
661 break;
662 }
663 }
664 }
665 OBJ_DESTRUCT(&bucket);
666 OPAL_LIST_DESTRUCT(&nodes);
667 OBJ_DESTRUCT(&buf);
668
669
670 if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
671 ORTE_ERROR_LOG(rc);
672 OBJ_RELEASE(jdata);
673 goto release;
674 }
675
676
677 opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
678
679
680 ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
681
682 cd->procs = NULL;
683 cd->info = NULL;
684 OBJ_RELEASE(cd);
685 return;
686
687 release:
688 if (NULL != cd->cbfunc) {
689 cd->cbfunc(rc, cd->cbdata);
690 }
691 OBJ_RELEASE(cd);
692 }
693
694 static void _cnct(int sd, short args, void *cbdata)
695 {
696 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
697 orte_namelist_t *nm;
698 char **keys = NULL, *key;
699 orte_job_t *jdata;
700 int rc = ORTE_SUCCESS;
701 opal_value_t *kv;
702
703 ORTE_ACQUIRE_OBJECT(cd);
704
705
706
707
708
709
710
711
712
713
714
715 OPAL_LIST_FOREACH(nm, cd->procs, orte_namelist_t) {
716
717 if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) {
718
719
720
721 if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
722 orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
723 ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
724 rc = ORTE_ERR_NOT_SUPPORTED;
725 goto release;
726 }
727
728
729 orte_util_convert_jobid_to_string(&key, nm->name.jobid);
730 opal_argv_append_nosize(&keys, key);
731 free(key);
732
733 kv = OBJ_NEW(opal_value_t);
734 kv->key = strdup(OPAL_PMIX_USERID);
735 kv->type = OPAL_UINT32;
736 kv->data.uint32 = geteuid();
737 opal_list_append(cd->info, &kv->super);
738 if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
739 ORTE_ERROR_LOG(rc);
740 opal_argv_free(keys);
741 goto release;
742 }
743 opal_argv_free(keys);
744
745
746 return;
747 }
748
749
750 if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
751
752 if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
753 ORTE_ERROR_LOG(rc);
754 goto release;
755 }
756 }
757 }
758
759 release:
760 if (NULL != cd->cbfunc) {
761 cd->cbfunc(rc, cd->cbdata);
762 }
763 OBJ_RELEASE(cd);
764 }
765
766 int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
767 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
768 {
769 opal_output_verbose(2, orte_pmix_server_globals.output,
770 "%s connect called with %d procs",
771 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
772 (int)opal_list_get_size(procs));
773
774
775 if (NULL == procs || 0 == opal_list_get_size(procs)) {
776 return ORTE_ERR_BAD_PARAM;
777 }
778
779 ORTE_PMIX_OPERATION(procs, info, _cnct, cbfunc, cbdata);
780 return ORTE_SUCCESS;
781 }
782
783 static void mdxcbfunc(int status,
784 const char *data, size_t ndata, void *cbdata,
785 opal_pmix_release_cbfunc_t relcbfunc, void *relcbdata)
786 {
787 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
788
789 ORTE_ACQUIRE_OBJECT(cd);
790
791
792 if (NULL != cd->cbfunc) {
793 cd->cbfunc(status, cd->cbdata);
794 }
795 OBJ_RELEASE(cd);
796 }
797
798 int pmix_server_disconnect_fn(opal_list_t *procs, opal_list_t *info,
799 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
800 {
801 orte_pmix_server_op_caddy_t *cd;
802 int rc;
803
804 opal_output_verbose(2, orte_pmix_server_globals.output,
805 "%s disconnect called with %d procs",
806 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
807 (int)opal_list_get_size(procs));
808
809
810
811
812
813
814 cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
815 cd->cbfunc = cbfunc;
816 cd->cbdata = cbdata;
817
818 if (ORTE_SUCCESS != (rc = pmix_server_fencenb_fn(procs, info, NULL, 0,
819 mdxcbfunc, cd))) {
820 OBJ_RELEASE(cd);
821 }
822
823 return rc;
824 }
825
826 int pmix_server_alloc_fn(const opal_process_name_t *requestor,
827 opal_pmix_alloc_directive_t dir,
828 opal_list_t *info,
829 opal_pmix_info_cbfunc_t cbfunc,
830 void *cbdata)
831 {
832
833 return ORTE_ERR_NOT_SUPPORTED;
834 }