This source file includes the following definitions.
- _client_conn
- pmix_server_client_connected_fn
- _client_finalized
- pmix_server_client_finalized_fn
- _client_abort
- pmix_server_abort_fn
- _register_events
- pmix_server_register_events_fn
- _deregister_events
- pmix_server_deregister_events_fn
- _notify_release
- pmix_server_notify
- pmix_server_notify_event
- qrel
- _query
- pmix_server_query_fn
- _toolconn
- pmix_tool_connected_fn
- lgcbfn
- pmix_server_log_fn
- pmix_server_job_ctrl_fn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include "orte_config.h"
30
31 #ifdef HAVE_UNISTD_H
32 #include <unistd.h>
33 #endif
34
35 #include "opal/util/argv.h"
36 #include "opal/util/output.h"
37 #include "opal/dss/dss.h"
38 #include "opal/mca/hwloc/hwloc-internal.h"
39 #include "opal/mca/pstat/pstat.h"
40
41 #include "orte/mca/errmgr/errmgr.h"
42 #include "orte/mca/iof/iof.h"
43 #include "orte/mca/rmaps/rmaps_types.h"
44 #include "orte/mca/schizo/schizo.h"
45 #include "orte/mca/state/state.h"
46 #include "orte/util/name_fns.h"
47 #include "orte/util/threads.h"
48 #include "orte/runtime/orte_globals.h"
49 #include "orte/mca/rml/rml.h"
50 #include "orte/mca/plm/plm.h"
51 #include "orte/mca/plm/base/plm_private.h"
52
53 #include "pmix_server_internal.h"
54
55 static void _client_conn(int sd, short args, void *cbdata)
56 {
57 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
58 orte_job_t *jdata;
59 orte_proc_t *p, *ptr;
60 int i;
61
62 ORTE_ACQUIRE_OBJECT(cd);
63
64 if (NULL != cd->server_object) {
65
66 p = (orte_proc_t*)cd->server_object;
67 } else {
68
69 p = NULL;
70 if (NULL == (jdata = orte_get_job_data_object(cd->proc.jobid))) {
71 return;
72 }
73 for (i=0; i < jdata->procs->size; i++) {
74 if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
75 continue;
76 }
77 if (cd->proc.jobid != ptr->name.jobid) {
78 continue;
79 }
80 if (cd->proc.vpid == ptr->name.vpid) {
81 p = ptr;
82 break;
83 }
84 }
85 }
86 if (NULL != p) {
87 ORTE_FLAG_SET(p, ORTE_PROC_FLAG_REG);
88 ORTE_ACTIVATE_PROC_STATE(&p->name, ORTE_PROC_STATE_REGISTERED);
89 }
90 if (NULL != cd->cbfunc) {
91 cd->cbfunc(OPAL_SUCCESS, cd->cbdata);
92 }
93 OBJ_RELEASE(cd);
94 }
95
96 int pmix_server_client_connected_fn(opal_process_name_t *proc, void *server_object,
97 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
98 {
99
100
101 ORTE_PMIX_THREADSHIFT(proc, server_object, OPAL_SUCCESS, NULL,
102 NULL, _client_conn, cbfunc, cbdata);
103 return ORTE_SUCCESS;
104 }
105
106 static void _client_finalized(int sd, short args, void *cbdata)
107 {
108 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
109 orte_job_t *jdata;
110 orte_proc_t *p, *ptr;
111 int i;
112
113 ORTE_ACQUIRE_OBJECT(cd);
114
115 if (NULL != cd->server_object) {
116
117 p = (orte_proc_t*)cd->server_object;
118 } else {
119
120 p = NULL;
121 if (NULL == (jdata = orte_get_job_data_object(cd->proc.jobid))) {
122 return;
123 }
124 for (i=0; i < jdata->procs->size; i++) {
125 if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
126 continue;
127 }
128 if (cd->proc.jobid != ptr->name.jobid) {
129 continue;
130 }
131 if (cd->proc.vpid == ptr->name.vpid) {
132 p = ptr;
133 break;
134 }
135 }
136
137
138
139
140
141 ORTE_FLAG_SET(p, ORTE_PROC_FLAG_IOF_COMPLETE);
142 ORTE_FLAG_SET(p, ORTE_PROC_FLAG_WAITPID);
143 ORTE_ACTIVATE_PROC_STATE(&cd->proc, ORTE_PROC_STATE_TERMINATED);
144 }
145 if (NULL != p) {
146 ORTE_FLAG_SET(p, ORTE_PROC_FLAG_HAS_DEREG);
147
148 if (NULL != cd->cbfunc) {
149 cd->cbfunc(ORTE_SUCCESS, cd->cbdata);
150 }
151 }
152 OBJ_RELEASE(cd);
153 }
154
155 int pmix_server_client_finalized_fn(opal_process_name_t *proc, void *server_object,
156 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
157 {
158
159
160 ORTE_PMIX_THREADSHIFT(proc, server_object, OPAL_SUCCESS, NULL,
161 NULL, _client_finalized, cbfunc, cbdata);
162 return ORTE_SUCCESS;
163
164 }
165
166 static void _client_abort(int sd, short args, void *cbdata)
167 {
168 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
169 orte_job_t *jdata;
170 orte_proc_t *p, *ptr;
171 int i;
172
173 ORTE_ACQUIRE_OBJECT(cd);
174
175 if (NULL != cd->server_object) {
176 p = (orte_proc_t*)cd->server_object;
177 } else {
178
179 p = NULL;
180 if (NULL == (jdata = orte_get_job_data_object(cd->proc.jobid))) {
181 return;
182 }
183 for (i=0; i < jdata->procs->size; i++) {
184 if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
185 continue;
186 }
187 if (cd->proc.jobid != ptr->name.jobid) {
188 continue;
189 }
190 if (cd->proc.vpid == ptr->name.vpid) {
191 p = ptr;
192 break;
193 }
194 }
195 }
196 if (NULL != p) {
197 p->exit_code = cd->status;
198 ORTE_ACTIVATE_PROC_STATE(&p->name, ORTE_PROC_STATE_CALLED_ABORT);
199 }
200
201
202 if (NULL != cd->cbfunc) {
203 cd->cbfunc(OPAL_SUCCESS, cd->cbdata);
204 }
205 OBJ_RELEASE(cd);
206 }
207
208 int pmix_server_abort_fn(opal_process_name_t *proc, void *server_object,
209 int status, const char msg[],
210 opal_list_t *procs_to_abort,
211 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
212 {
213
214
215 ORTE_PMIX_THREADSHIFT(proc, server_object, status, msg,
216 procs_to_abort, _client_abort, cbfunc, cbdata);
217 return ORTE_SUCCESS;
218 }
219
220 static void _register_events(int sd, short args, void *cbdata)
221 {
222 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
223 opal_value_t *info;
224
225 ORTE_ACQUIRE_OBJECT(cd);
226
227
228
229 while (NULL != (info = (opal_value_t*)opal_list_remove_first(cd->info))) {
230
231
232 opal_list_append(&orte_pmix_server_globals.notifications, &info->super);
233 }
234
235 if (NULL != cd->cbfunc) {
236 cd->cbfunc(ORTE_SUCCESS, cd->cbdata);
237 }
238 OBJ_RELEASE(cd);
239 }
240
241
242
243
244 int pmix_server_register_events_fn(opal_list_t *info,
245 opal_pmix_op_cbfunc_t cbfunc,
246 void *cbdata)
247 {
248
249
250 ORTE_PMIX_OPERATION(NULL, info, _register_events, cbfunc, cbdata);
251 return ORTE_SUCCESS;
252 }
253
254 static void _deregister_events(int sd, short args, void *cbdata)
255 {
256 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
257 opal_value_t *info, *iptr, *nptr;
258
259 ORTE_ACQUIRE_OBJECT(cd);
260
261
262
263 while (NULL != (info = (opal_value_t*)opal_list_remove_first(cd->info))) {
264
265 OPAL_LIST_FOREACH_SAFE(iptr, nptr, &orte_pmix_server_globals.notifications, opal_value_t) {
266 if (OPAL_EQUAL == opal_dss.compare(iptr, info, OPAL_VALUE)) {
267 opal_list_remove_item(&orte_pmix_server_globals.notifications, &iptr->super);
268 OBJ_RELEASE(iptr);
269 break;
270 }
271 }
272 OBJ_RELEASE(info);
273 }
274
275 if (NULL != cd->cbfunc) {
276 cd->cbfunc(ORTE_SUCCESS, cd->cbdata);
277 }
278 OBJ_RELEASE(cd);
279 }
280
281
282 int pmix_server_deregister_events_fn(opal_list_t *info,
283 opal_pmix_op_cbfunc_t cbfunc,
284 void *cbdata)
285 {
286
287
288 ORTE_PMIX_OPERATION(NULL, info, _deregister_events, cbfunc, cbdata);
289 return ORTE_SUCCESS;
290 }
291
292 static void _notify_release(int status, void *cbdata)
293 {
294 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
295
296 ORTE_ACQUIRE_OBJECT(cd);
297
298 if (NULL != cd->info) {
299 OPAL_LIST_RELEASE(cd->info);
300 }
301 OBJ_RELEASE(cd);
302 }
303
304
305
306 void pmix_server_notify(int status, orte_process_name_t* sender,
307 opal_buffer_t *buffer,
308 orte_rml_tag_t tg, void *cbdata)
309 {
310 int cnt, rc, ret, ninfo, n;
311 opal_value_t *val;
312 orte_pmix_server_op_caddy_t *cd;
313 orte_process_name_t source;
314
315 opal_output_verbose(2, orte_pmix_server_globals.output,
316 "%s Notification received from %s",
317 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
318 ORTE_NAME_PRINT(sender));
319
320
321 cnt = 1;
322 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
323 ORTE_ERROR_LOG(rc);
324 return;
325 }
326
327
328 cnt = 1;
329 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &source, &cnt, ORTE_NAME))) {
330 ORTE_ERROR_LOG(rc);
331 return;
332 }
333
334
335 cnt = 1;
336 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &cnt, OPAL_INT))) {
337 ORTE_ERROR_LOG(rc);
338 return;
339 }
340
341
342 cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
343
344 if (0 < ninfo) {
345 cd->info = OBJ_NEW(opal_list_t);
346 for (n=0; n < ninfo; n++) {
347 val = OBJ_NEW(opal_value_t);
348 if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &val, &cnt, OPAL_VALUE))) {
349 ORTE_ERROR_LOG(rc);
350 OBJ_RELEASE(val);
351 OPAL_LIST_RELEASE(cd->info);
352 OBJ_RELEASE(cd);
353 return;
354 }
355 opal_list_append(cd->info, &val->super);
356 }
357 }
358
359
360
361 if (NULL == cd->info) {
362 cd->info = OBJ_NEW(opal_list_t);
363 }
364 val = OBJ_NEW(opal_value_t);
365 val->key = strdup("orte.notify.donotloop");
366 val->type = OPAL_BOOL;
367 val->data.flag = true;
368 opal_list_append(cd->info, &val->super);
369
370 opal_output_verbose(2, orte_pmix_server_globals.output,
371 "%s NOTIFYING PMIX SERVER OF STATUS %d",
372 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ret);
373 if (OPAL_SUCCESS != (rc = opal_pmix.server_notify_event(ret, &source, cd->info, _notify_release, cd))) {
374 ORTE_ERROR_LOG(rc);
375 if (NULL != cd->info) {
376 OPAL_LIST_RELEASE(cd->info);
377 }
378 OBJ_RELEASE(cd);
379 }
380 }
381
382 int pmix_server_notify_event(int code, opal_process_name_t *source,
383 opal_list_t *info,
384 opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
385 {
386 opal_buffer_t *buf;
387 int rc, ninfo;
388 opal_value_t *val;
389 orte_grpcomm_signature_t *sig;
390
391 opal_output_verbose(2, orte_pmix_server_globals.output,
392 "%s local process %s generated event code %d",
393 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
394 ORTE_NAME_PRINT(source), code);
395
396
397 OPAL_LIST_FOREACH(val, info, opal_value_t) {
398 if (0 == strcmp(val->key, "orte.notify.donotloop")) {
399
400 goto done;
401 }
402 }
403
404
405
406
407 buf = OBJ_NEW(opal_buffer_t);
408 if (NULL == buf) {
409 return ORTE_ERR_OUT_OF_RESOURCE;
410 }
411
412 if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &code, 1, OPAL_INT))) {
413 ORTE_ERROR_LOG(rc);
414 OBJ_RELEASE(buf);
415 return rc;
416 }
417
418 if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, source, 1, ORTE_NAME))) {
419 ORTE_ERROR_LOG(rc);
420 OBJ_RELEASE(buf);
421 return rc;
422 }
423
424
425 if (NULL == info) {
426 ninfo = 0;
427 } else {
428 ninfo = opal_list_get_size(info);
429 }
430 if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &ninfo, 1, OPAL_INT))) {
431 ORTE_ERROR_LOG(rc);
432 OBJ_RELEASE(buf);
433 return rc;
434 }
435 if (0 < ninfo) {
436 OPAL_LIST_FOREACH(val, info, opal_value_t) {
437 if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &val, 1, OPAL_VALUE))) {
438 ORTE_ERROR_LOG(rc);
439 OBJ_RELEASE(buf);
440 return rc;
441 }
442 }
443 }
444
445
446 sig = OBJ_NEW(orte_grpcomm_signature_t);
447 if (NULL == sig) {
448 OBJ_RELEASE(buf);
449 return ORTE_ERR_OUT_OF_RESOURCE;
450 }
451 sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
452 if (NULL == sig->signature) {
453 OBJ_RELEASE(buf);
454 OBJ_RELEASE(sig);
455 return ORTE_ERR_OUT_OF_RESOURCE;
456 }
457 sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
458 sig->signature[0].vpid = ORTE_VPID_WILDCARD;
459 sig->sz = 1;
460 if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_NOTIFICATION, buf))) {
461 ORTE_ERROR_LOG(rc);
462 OBJ_RELEASE(buf);
463 OBJ_RELEASE(sig);
464 return rc;
465 }
466 OBJ_RELEASE(buf);
467
468 OBJ_RELEASE(sig);
469
470 done:
471
472 if (NULL != cbfunc) {
473 cbfunc(ORTE_SUCCESS, cbdata);
474 }
475 return ORTE_SUCCESS;
476 }
477
478 static void qrel(void *cbdata)
479 {
480 opal_list_t *l = (opal_list_t*)cbdata;
481 OPAL_LIST_RELEASE(l);
482 }
483 static void _query(int sd, short args, void *cbdata)
484 {
485 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
486 opal_pmix_query_t *q;
487 opal_value_t *kv;
488 orte_jobid_t jobid;
489 orte_job_t *jdata;
490 orte_proc_t *proct;
491 orte_app_context_t *app;
492 int rc = ORTE_SUCCESS, i, k, num_replies;
493 opal_list_t *results, targets, *array;
494 size_t n;
495 uint32_t key;
496 void *nptr;
497 char **nspaces=NULL, nspace[512];
498 char **ans = NULL;
499 bool local_only;
500 orte_namelist_t *nm;
501 opal_pstats_t pstat;
502 float pss;
503
504 ORTE_ACQUIRE_OBJECT(cd);
505
506 opal_output_verbose(2, orte_pmix_server_globals.output,
507 "%s processing query",
508 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
509
510 results = OBJ_NEW(opal_list_t);
511
512
513 OPAL_LIST_FOREACH(q, cd->info, opal_pmix_query_t) {
514 for (n=0; NULL != q->keys[n]; n++) {
515 opal_output_verbose(2, orte_pmix_server_globals.output,
516 "%s processing key %s",
517 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), q->keys[n]);
518 if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_NAMESPACES)) {
519
520 rc = opal_hash_table_get_first_key_uint32(orte_job_data, &key, (void **)&jdata, &nptr);
521 while (OPAL_SUCCESS == rc) {
522 if (ORTE_PROC_MY_NAME->jobid != jdata->jobid) {
523 memset(nspace, 0, 512);
524 (void)opal_snprintf_jobid(nspace, 512, jdata->jobid);
525 opal_argv_append_nosize(&nspaces, nspace);
526 }
527 rc = opal_hash_table_get_next_key_uint32(orte_job_data, &key, (void **)&jdata, nptr, &nptr);
528 }
529
530 kv = OBJ_NEW(opal_value_t);
531 kv->key = strdup(OPAL_PMIX_QUERY_NAMESPACES);
532 kv->type = OPAL_STRING;
533 if (NULL != nspaces) {
534 kv->data.string = opal_argv_join(nspaces, ',');
535 } else {
536 kv->data.string = NULL;
537 }
538 opal_list_append(results, &kv->super);
539 } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_SPAWN_SUPPORT)) {
540 opal_argv_append_nosize(&ans, OPAL_PMIX_HOST);
541 opal_argv_append_nosize(&ans, OPAL_PMIX_HOSTFILE);
542 opal_argv_append_nosize(&ans, OPAL_PMIX_ADD_HOST);
543 opal_argv_append_nosize(&ans, OPAL_PMIX_ADD_HOSTFILE);
544 opal_argv_append_nosize(&ans, OPAL_PMIX_PREFIX);
545 opal_argv_append_nosize(&ans, OPAL_PMIX_WDIR);
546 opal_argv_append_nosize(&ans, OPAL_PMIX_MAPPER);
547 opal_argv_append_nosize(&ans, OPAL_PMIX_PPR);
548 opal_argv_append_nosize(&ans, OPAL_PMIX_MAPBY);
549 opal_argv_append_nosize(&ans, OPAL_PMIX_RANKBY);
550 opal_argv_append_nosize(&ans, OPAL_PMIX_BINDTO);
551
552 kv = OBJ_NEW(opal_value_t);
553 kv->key = strdup(OPAL_PMIX_QUERY_SPAWN_SUPPORT);
554 kv->type = OPAL_STRING;
555 kv->data.string = opal_argv_join(ans, ',');
556 opal_list_append(results, &kv->super);
557 opal_argv_free(ans);
558 ans = NULL;
559 } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_DEBUG_SUPPORT)) {
560 opal_argv_append_nosize(&ans, OPAL_PMIX_DEBUG_STOP_IN_INIT);
561 opal_argv_append_nosize(&ans, OPAL_PMIX_DEBUG_JOB);
562 opal_argv_append_nosize(&ans, OPAL_PMIX_DEBUG_WAIT_FOR_NOTIFY);
563
564 kv = OBJ_NEW(opal_value_t);
565 kv->key = strdup(OPAL_PMIX_QUERY_DEBUG_SUPPORT);
566 kv->type = OPAL_STRING;
567 kv->data.string = opal_argv_join(ans, ',');
568 opal_list_append(results, &kv->super);
569 opal_argv_free(ans);
570 ans = NULL;
571 } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_MEMORY_USAGE)) {
572 OBJ_CONSTRUCT(&targets, opal_list_t);
573
574
575
576 if (0 == opal_list_get_size(&q->qualifiers)) {
577
578
579
580
581 OPAL_LIST_DESTRUCT(&targets);
582 return;
583 }
584
585
586 local_only = false;
587 OPAL_LIST_FOREACH(kv, &q->qualifiers, opal_value_t) {
588 if (0 == strcmp(kv->key, OPAL_PMIX_QUERY_LOCAL_ONLY)) {
589 if (OPAL_UNDEF == kv->type || kv->data.flag) {
590 local_only = true;
591 }
592 } else if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) {
593
594 nm = OBJ_NEW(orte_namelist_t);
595 memcpy(&nm->name, &kv->data.name, sizeof(opal_process_name_t));
596 opal_list_append(&targets, &nm->super);
597 }
598 }
599
600
601
602 if (local_only) {
603 if (0 == opal_list_get_size(&targets)) {
604 kv = OBJ_NEW(opal_value_t);
605 kv->key = strdup(OPAL_PMIX_QUERY_MEMORY_USAGE);
606 kv->type = OPAL_PTR;
607 array = OBJ_NEW(opal_list_t);
608 kv->data.ptr = array;
609 opal_list_append(results, &kv->super);
610
611 OBJ_CONSTRUCT(&pstat, opal_pstats_t);
612 opal_pstat.query(orte_process_info.pid, &pstat, NULL);
613 kv = OBJ_NEW(opal_value_t);
614 kv->key = strdup(OPAL_PMIX_DAEMON_MEMORY);
615 kv->type = OPAL_FLOAT;
616 kv->data.fval = pstat.pss;
617 opal_list_append(array, &kv->super);
618 OBJ_DESTRUCT(&pstat);
619
620 pss = 0.0;
621 num_replies = 0;
622 for (i=0; i < orte_local_children->size; i++) {
623 if (NULL != (proct = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) &&
624 ORTE_FLAG_TEST(proct, ORTE_PROC_FLAG_ALIVE)) {
625
626 OBJ_CONSTRUCT(&pstat, opal_pstats_t);
627 if (OPAL_SUCCESS == opal_pstat.query(proct->pid, &pstat, NULL)) {
628 pss += pstat.pss;
629 ++num_replies;
630 }
631 OBJ_DESTRUCT(&pstat);
632 }
633 }
634
635 if (0 < num_replies) {
636 pss /= (float)num_replies;
637 }
638 kv = OBJ_NEW(opal_value_t);
639 kv->key = strdup(OPAL_PMIX_CLIENT_AVG_MEMORY);
640 kv->type = OPAL_FLOAT;
641 kv->data.fval = pss;
642 opal_list_append(array, &kv->super);
643 } else {
644
645 }
646 } else {
647
648
649
650 }
651 } else if (0 == strcmp(q->keys[n], OPAL_PMIX_TIME_REMAINING)) {
652 kv = OBJ_NEW(opal_value_t);
653 kv->key = strdup(OPAL_PMIX_TIME_REMAINING);
654 kv->type = OPAL_UINT32;
655 if (ORTE_SUCCESS != orte_schizo.get_remaining_time(&kv->data.uint32)) {
656 OBJ_RELEASE(kv);
657 } else {
658 opal_list_append(results, &kv->super);
659 }
660 } else if (0 == strcmp(q->keys[n], OPAL_PMIX_HWLOC_XML_V1)) {
661 if (NULL != opal_hwloc_topology) {
662 char *xmlbuffer=NULL;
663 int len;
664 kv = OBJ_NEW(opal_value_t);
665 kv->key = strdup(OPAL_PMIX_HWLOC_XML_V1);
666 #if HWLOC_API_VERSION < 0x20000
667
668 if (0 != hwloc_topology_export_xmlbuffer(opal_hwloc_topology, &xmlbuffer, &len)) {
669 OBJ_RELEASE(kv);
670 continue;
671 }
672 #else
673
674 if (0 != hwloc_topology_export_xmlbuffer(opal_hwloc_topology, &xmlbuffer, &len,
675 HWLOC_TOPOLOGY_EXPORT_XML_FLAG_V1)) {
676 OBJ_RELEASE(kv);
677 continue;
678 }
679 #endif
680 kv->data.string = xmlbuffer;
681 kv->type = OPAL_STRING;
682 opal_list_append(results, &kv->super);
683 }
684 } else if (0 == strcmp(q->keys[n], OPAL_PMIX_HWLOC_XML_V2)) {
685
686 #if HWLOC_API_VERSION >= 0x20000
687 if (NULL != opal_hwloc_topology) {
688 char *xmlbuffer=NULL;
689 int len;
690 kv = OBJ_NEW(opal_value_t);
691 kv->key = strdup(OPAL_PMIX_HWLOC_XML_V2);
692 if (0 != hwloc_topology_export_xmlbuffer(opal_hwloc_topology, &xmlbuffer, &len, 0)) {
693 OBJ_RELEASE(kv);
694 continue;
695 }
696 kv->data.string = xmlbuffer;
697 kv->type = OPAL_STRING;
698 opal_list_append(results, &kv->super);
699 }
700 #endif
701 } else if (0 == strcmp(q->keys[n], OPAL_PMIX_SERVER_URI)) {
702
703 kv = OBJ_NEW(opal_value_t);
704 kv->key = strdup(OPAL_PMIX_SERVER_URI);
705 kv->type = OPAL_STRING;
706 kv->data.string = strdup(orte_process_info.my_hnp_uri);
707 opal_list_append(results, &kv->super);
708 } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_PROC_TABLE)) {
709
710 jobid = ORTE_JOBID_INVALID;
711 OPAL_LIST_FOREACH(kv, &q->qualifiers, opal_value_t) {
712 if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) {
713
714 jobid = kv->data.name.jobid;
715 break;
716 }
717 }
718 if (ORTE_JOBID_INVALID == jobid) {
719 rc = ORTE_ERR_NOT_FOUND;
720 goto done;
721 }
722
723
724 jdata = orte_get_job_data_object(jobid);
725 if (NULL == jdata) {
726 rc = ORTE_ERR_NOT_FOUND;
727 goto done;
728 }
729
730 kv = OBJ_NEW(opal_value_t);
731 kv->key = strdup(OPAL_PMIX_QUERY_PROC_TABLE);
732 kv->type = OPAL_PTR;
733 array = OBJ_NEW(opal_list_t);
734 kv->data.ptr = array;
735 opal_list_append(results, &kv->super);
736
737 for (k=0; k < jdata->procs->size; k++) {
738 if (NULL == (proct = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, k))) {
739 continue;
740 }
741 kv = OBJ_NEW(opal_value_t);
742 kv->type = OPAL_PROC_INFO;
743 kv->data.pinfo.name.jobid = jobid;
744 kv->data.pinfo.name.vpid = proct->name.vpid;
745 if (NULL != proct->node && NULL != proct->node->name) {
746 kv->data.pinfo.hostname = strdup(proct->node->name);
747 }
748 app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, proct->app_idx);
749 if (NULL != app && NULL != app->app) {
750 kv->data.pinfo.executable_name = strdup(app->app);
751 }
752 kv->data.pinfo.pid = proct->pid;
753 kv->data.pinfo.exit_code = proct->exit_code;
754 kv->data.pinfo.state = proct->state;
755 opal_list_append(array, &kv->super);
756 }
757 } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_LOCAL_PROC_TABLE)) {
758
759 jobid = ORTE_JOBID_INVALID;
760 OPAL_LIST_FOREACH(kv, &q->qualifiers, opal_value_t) {
761 if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) {
762
763 jobid = kv->data.name.jobid;
764 break;
765 }
766 }
767 if (ORTE_JOBID_INVALID == jobid) {
768 rc = ORTE_ERR_BAD_PARAM;
769 goto done;
770 }
771
772
773 jdata = orte_get_job_data_object(jobid);
774 if (NULL == jdata) {
775 rc = ORTE_ERR_NOT_FOUND;
776 goto done;
777 }
778
779 kv = OBJ_NEW(opal_value_t);
780 kv->key = strdup(OPAL_PMIX_QUERY_LOCAL_PROC_TABLE);
781 kv->type = OPAL_PTR;
782 array = OBJ_NEW(opal_list_t);
783 kv->data.ptr = array;
784 opal_list_append(results, &kv->super);
785
786 for (k=0; k < jdata->procs->size; k++) {
787 if (NULL == (proct = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, k))) {
788 continue;
789 }
790 if (ORTE_FLAG_TEST(proct, ORTE_PROC_FLAG_LOCAL)) {
791 kv = OBJ_NEW(opal_value_t);
792 kv->type = OPAL_PROC_INFO;
793 kv->data.pinfo.name.jobid = jobid;
794 kv->data.pinfo.name.vpid = proct->name.vpid;
795 if (NULL != proct->node && NULL != proct->node->name) {
796 kv->data.pinfo.hostname = strdup(proct->node->name);
797 }
798 app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, proct->app_idx);
799 if (NULL != app && NULL != app->app) {
800 kv->data.pinfo.executable_name = strdup(app->app);
801 }
802 kv->data.pinfo.pid = proct->pid;
803 kv->data.pinfo.exit_code = proct->exit_code;
804 kv->data.pinfo.state = proct->state;
805 opal_list_append(array, &kv->super);
806 }
807 }
808 }
809 }
810 }
811
812 done:
813 if (ORTE_SUCCESS == rc) {
814 if (0 == opal_list_get_size(results)) {
815 rc = ORTE_ERR_NOT_FOUND;
816 } else if (opal_list_get_size(results) < opal_list_get_size(cd->info)) {
817 rc = ORTE_ERR_PARTIAL_SUCCESS;
818 }
819 }
820 cd->infocbfunc(rc, results, cd->cbdata, qrel, results);
821 }
822
823 int pmix_server_query_fn(opal_process_name_t *requestor,
824 opal_list_t *queries,
825 opal_pmix_info_cbfunc_t cbfunc, void *cbdata)
826 {
827 orte_pmix_server_op_caddy_t *cd;
828
829 if (NULL == queries || NULL == cbfunc) {
830 return OPAL_ERR_BAD_PARAM;
831 }
832
833
834 cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
835 cd->proc = *requestor;
836 cd->info = queries;
837 cd->infocbfunc = cbfunc;
838 cd->cbdata = cbdata;
839
840 opal_event_set(orte_event_base, &(cd->ev), -1,
841 OPAL_EV_WRITE, _query, cd);
842 opal_event_set_priority(&(cd->ev), ORTE_MSG_PRI);
843 ORTE_POST_OBJECT(cd);
844 opal_event_active(&(cd->ev), OPAL_EV_WRITE, 1);
845
846 return ORTE_SUCCESS;
847 }
848
849 static void _toolconn(int sd, short args, void *cbdata)
850 {
851 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
852 orte_job_t *jdata;
853 orte_app_context_t *app;
854 orte_proc_t *proc;
855 orte_node_t *node, *nptr;
856 char *hostname = NULL;
857 orte_process_name_t tool = {ORTE_JOBID_INVALID, ORTE_VPID_INVALID};
858 int rc, i;
859 opal_value_t *val;
860 bool flag = false, flag_given = false;;
861
862 ORTE_ACQUIRE_OBJECT(cd);
863
864 opal_output_verbose(2, orte_pmix_server_globals.output,
865 "%s TOOL CONNECTION PROCESSING",
866 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
867
868
869 if (NULL != cd->info) {
870 OPAL_LIST_FOREACH(val, cd->info, opal_value_t) {
871 if (0 == strcmp(val->key, OPAL_PMIX_EVENT_SILENT_TERMINATION)) {
872 if (OPAL_UNDEF == val->type || val->data.flag) {
873 flag = true;
874 flag_given = true;
875 }
876 } else if (0 == strcmp(val->key, OPAL_PMIX_NSPACE)) {
877 tool.jobid = val->data.name.jobid;
878 } else if (0 == strcmp(val->key, OPAL_PMIX_RANK)) {
879 tool.vpid = val->data.name.vpid;
880 } else if (0 == strcmp(val->key, OPAL_PMIX_HOSTNAME)) {
881 if (NULL != hostname) {
882
883 free(hostname);
884 }
885 hostname = strdup(val->data.string);
886 }
887 }
888 }
889
890
891
892
893
894
895 if (ORTE_JOBID_INVALID == tool.jobid ||
896 ORTE_VPID_INVALID == tool.vpid) {
897
898 if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_MASTER) {
899 jdata = OBJ_NEW(orte_job_t);
900 rc = orte_plm_base_create_jobid(jdata);
901 if (ORTE_SUCCESS != rc) {
902 OBJ_RELEASE(jdata);
903 if (NULL != cd->toolcbfunc) {
904 cd->toolcbfunc(ORTE_ERROR, tool, cd->cbdata);
905 }
906 OBJ_RELEASE(cd);
907 if (NULL != hostname) {
908 free(hostname);
909 }
910 return;
911 }
912 tool.jobid = jdata->jobid;
913 tool.vpid = 0;
914 } else {
915
916
917 if (NULL != cd->toolcbfunc) {
918 cd->toolcbfunc(ORTE_ERR_NOT_SUPPORTED, tool, cd->cbdata);
919 }
920 OBJ_RELEASE(cd);
921 if (NULL != hostname) {
922 free(hostname);
923 }
924 return;
925 }
926 } else {
927 jdata = OBJ_NEW(orte_job_t);
928 jdata->jobid = tool.jobid;
929 }
930
931 opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
932
933
934
935
936
937
938
939
940 jdata->map = OBJ_NEW(orte_job_map_t);
941
942
943 app = OBJ_NEW(orte_app_context_t);
944 app->app = strdup("tool");
945 app->num_procs = 1;
946 opal_pointer_array_add(jdata->apps, app);
947 jdata->num_apps = 1;
948
949
950
951
952
953
954 proc = OBJ_NEW(orte_proc_t);
955 proc->name.jobid = jdata->jobid;
956 proc->name.vpid = tool.vpid;
957 proc->parent = ORTE_PROC_MY_NAME->vpid;
958 ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_ALIVE);
959 ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_TOOL);
960 proc->state = ORTE_PROC_STATE_RUNNING;
961
962 proc->local_rank = 0;
963 proc->node_rank = 0;
964 proc->app_rank = 0;
965 proc->app_idx = 0;
966 if (NULL == hostname) {
967
968 node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, 0);
969 ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_LOCAL);
970 } else {
971
972 node = NULL;
973 for (i=0; i < orte_node_pool->size; i++) {
974 if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
975 continue;
976 }
977 if (0 == strcmp(hostname, nptr->name)) {
978 node = nptr;
979 break;
980 }
981 }
982 if (NULL == node) {
983
984 node = OBJ_NEW(orte_node_t);
985 node->name = strdup(hostname);
986 ORTE_FLAG_SET(node, ORTE_NODE_NON_USABLE);
987 opal_pointer_array_add(orte_node_pool, node);
988 }
989 free(hostname);
990 }
991 proc->node = node;
992 OBJ_RETAIN(node);
993 opal_pointer_array_add(jdata->procs, proc);
994 jdata->num_procs = 1;
995
996 OBJ_RETAIN(node);
997 opal_pointer_array_add(jdata->map->nodes, node);
998 jdata->map->num_nodes++;
999
1000
1001
1002
1003 OBJ_RETAIN(proc);
1004 opal_pointer_array_add(node->procs, proc);
1005
1006 if (flag_given) {
1007 orte_set_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION,
1008 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
1009 } else {
1010
1011 flag = true;
1012 orte_set_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION,
1013 ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
1014 }
1015
1016 if (NULL != cd->toolcbfunc) {
1017 cd->toolcbfunc(ORTE_ERR_NOT_SUPPORTED, tool, cd->cbdata);
1018 }
1019 OBJ_RELEASE(cd);
1020 }
1021
1022 void pmix_tool_connected_fn(opal_list_t *info,
1023 opal_pmix_tool_connection_cbfunc_t cbfunc,
1024 void *cbdata)
1025 {
1026 orte_pmix_server_op_caddy_t *cd;
1027
1028 opal_output_verbose(2, orte_pmix_server_globals.output,
1029 "%s TOOL CONNECTION REQUEST RECVD",
1030 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
1031
1032
1033 cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
1034 cd->info = info;
1035 cd->toolcbfunc = cbfunc;
1036 cd->cbdata = cbdata;
1037
1038 opal_event_set(orte_event_base, &(cd->ev), -1,
1039 OPAL_EV_WRITE, _toolconn, cd);
1040 opal_event_set_priority(&(cd->ev), ORTE_MSG_PRI);
1041 ORTE_POST_OBJECT(cd);
1042 opal_event_active(&(cd->ev), OPAL_EV_WRITE, 1);
1043
1044 }
1045
1046 static void lgcbfn(int sd, short args, void *cbdata)
1047 {
1048 orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
1049
1050 if (NULL != cd->cbfunc) {
1051 cd->cbfunc(cd->status, cd->cbdata);
1052 }
1053 OBJ_RELEASE(cd);
1054 }
1055
1056 void pmix_server_log_fn(opal_process_name_t *requestor,
1057 opal_list_t *info,
1058 opal_list_t *directives,
1059 opal_pmix_op_cbfunc_t cbfunc,
1060 void *cbdata)
1061 {
1062 opal_value_t *val;
1063 opal_buffer_t *buf;
1064 int rc;
1065
1066 opal_output_verbose(2, orte_pmix_server_globals.output,
1067 "%s logging info",
1068 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
1069
1070 OPAL_LIST_FOREACH(val, info, opal_value_t) {
1071 if (NULL == val->key) {
1072 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
1073 continue;
1074 }
1075 if (0 == strcmp(val->key, OPAL_PMIX_LOG_MSG)) {
1076
1077 if (OPAL_BYTE_OBJECT != val->type) {
1078 continue;
1079 }
1080 buf = OBJ_NEW(opal_buffer_t);
1081 opal_dss.load(buf, val->data.bo.bytes, val->data.bo.size);
1082 val->data.bo.bytes = NULL;
1083 if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf,
1084 ORTE_RML_TAG_SHOW_HELP,
1085 orte_rml_send_callback, NULL))) {
1086 ORTE_ERROR_LOG(rc);
1087 OBJ_RELEASE(buf);
1088 }
1089 } else if (0 == strcmp(val->key, OPAL_PMIX_LOG_STDERR)) {
1090 if (ORTE_SUCCESS != (rc = orte_iof.output(requestor, ORTE_IOF_STDERR, val->data.string))) {
1091 ORTE_ERROR_LOG(rc);
1092 }
1093 } else if (0 == strcmp(val->key, OPAL_PMIX_LOG_STDOUT)) {
1094 if (ORTE_SUCCESS != (rc = orte_iof.output(requestor, ORTE_IOF_STDOUT, val->data.string))) {
1095 ORTE_ERROR_LOG(rc);
1096 }
1097 }
1098 }
1099
1100
1101
1102
1103 rc = ORTE_SUCCESS;
1104 ORTE_PMIX_THREADSHIFT(requestor, NULL, rc,
1105 NULL, NULL, lgcbfn,
1106 cbfunc, cbdata);
1107 }
1108
1109 int pmix_server_job_ctrl_fn(const opal_process_name_t *requestor,
1110 opal_list_t *targets,
1111 opal_list_t *info,
1112 opal_pmix_info_cbfunc_t cbfunc,
1113 void *cbdata)
1114 {
1115 opal_value_t *val;
1116 int rc, n;
1117 orte_proc_t *proc;
1118 opal_pointer_array_t parray, *ptrarray;
1119 opal_namelist_t *nm;
1120 opal_buffer_t *cmd;
1121 orte_daemon_cmd_flag_t cmmnd = ORTE_DAEMON_HALT_VM_CMD;
1122 orte_grpcomm_signature_t *sig;
1123
1124 opal_output_verbose(2, orte_pmix_server_globals.output,
1125 "%s job control request from %s",
1126 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1127 ORTE_NAME_PRINT(requestor));
1128
1129 OPAL_LIST_FOREACH(val, info, opal_value_t) {
1130 if (NULL == val->key) {
1131 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
1132 continue;
1133 }
1134 if (0 == strcmp(val->key, OPAL_PMIX_JOB_CTRL_KILL)) {
1135
1136 if (0 == opal_list_get_size(targets)) {
1137 ptrarray = NULL;
1138 } else {
1139 OBJ_CONSTRUCT(&parray, opal_pointer_array_t);
1140 OPAL_LIST_FOREACH(nm, targets, opal_namelist_t) {
1141
1142 if (NULL == (proc = orte_get_proc_object(&nm->name))) {
1143 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
1144 continue;
1145 }
1146 OBJ_RETAIN(proc);
1147 opal_pointer_array_add(&parray, proc);
1148 }
1149 ptrarray = &parray;
1150 }
1151 if (ORTE_SUCCESS != (rc = orte_plm.terminate_procs(ptrarray))) {
1152 ORTE_ERROR_LOG(rc);
1153 }
1154 if (NULL != ptrarray) {
1155
1156 for (n=0; n < parray.size; n++) {
1157 if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(&parray, n))) {
1158 OBJ_RELEASE(proc);
1159 }
1160 }
1161 OBJ_DESTRUCT(&parray);
1162 }
1163 continue;
1164 } else if (0 == strcmp(val->key, OPAL_PMIX_JOB_CTRL_TERMINATE)) {
1165 if (0 == opal_list_get_size(targets)) {
1166
1167 cmd = OBJ_NEW(opal_buffer_t);
1168
1169 if (ORTE_SUCCESS != (rc = opal_dss.pack(cmd, &cmmnd, 1, ORTE_DAEMON_CMD))) {
1170 ORTE_ERROR_LOG(rc);
1171 OBJ_RELEASE(cmd);
1172 return rc;
1173 }
1174
1175 sig = OBJ_NEW(orte_grpcomm_signature_t);
1176 sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
1177 sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
1178 sig->signature[0].vpid = ORTE_VPID_WILDCARD;
1179 if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, cmd))) {
1180 ORTE_ERROR_LOG(rc);
1181 }
1182 OBJ_RELEASE(cmd);
1183 OBJ_RELEASE(sig);
1184 }
1185 }
1186 }
1187
1188 return ORTE_OPERATION_SUCCEEDED;
1189 }