root/orte/orted/pmix/pmix_server_gen.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. _client_conn
  2. pmix_server_client_connected_fn
  3. _client_finalized
  4. pmix_server_client_finalized_fn
  5. _client_abort
  6. pmix_server_abort_fn
  7. _register_events
  8. pmix_server_register_events_fn
  9. _deregister_events
  10. pmix_server_deregister_events_fn
  11. _notify_release
  12. pmix_server_notify
  13. pmix_server_notify_event
  14. qrel
  15. _query
  16. pmix_server_query_fn
  17. _toolconn
  18. pmix_tool_connected_fn
  19. lgcbfn
  20. pmix_server_log_fn
  21. pmix_server_job_ctrl_fn

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009      Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2014-2017 Mellanox Technologies, Inc.
  18  *                         All rights reserved.
  19  * Copyright (c) 2014      Research Organization for Information Science
  20  *                         and Technology (RIST). All rights reserved.
  21  * $COPYRIGHT$
  22  *
  23  * Additional copyrights may follow
  24  *
  25  * $HEADER$
  26  *
  27  */
  28 
  29 #include "orte_config.h"
  30 
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif
  34 
  35 #include "opal/util/argv.h"
  36 #include "opal/util/output.h"
  37 #include "opal/dss/dss.h"
  38 #include "opal/mca/hwloc/hwloc-internal.h"
  39 #include "opal/mca/pstat/pstat.h"
  40 
  41 #include "orte/mca/errmgr/errmgr.h"
  42 #include "orte/mca/iof/iof.h"
  43 #include "orte/mca/rmaps/rmaps_types.h"
  44 #include "orte/mca/schizo/schizo.h"
  45 #include "orte/mca/state/state.h"
  46 #include "orte/util/name_fns.h"
  47 #include "orte/util/threads.h"
  48 #include "orte/runtime/orte_globals.h"
  49 #include "orte/mca/rml/rml.h"
  50 #include "orte/mca/plm/plm.h"
  51 #include "orte/mca/plm/base/plm_private.h"
  52 
  53 #include "pmix_server_internal.h"
  54 
  55 static void _client_conn(int sd, short args, void *cbdata)
  56 {
  57     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
  58     orte_job_t *jdata;
  59     orte_proc_t *p, *ptr;
  60     int i;
  61 
  62     ORTE_ACQUIRE_OBJECT(cd);
  63 
  64     if (NULL != cd->server_object) {
  65         /* we were passed back the orte_proc_t */
  66         p = (orte_proc_t*)cd->server_object;
  67     } else {
  68         /* find the named process */
  69         p = NULL;
  70         if (NULL == (jdata = orte_get_job_data_object(cd->proc.jobid))) {
  71             return;
  72         }
  73         for (i=0; i < jdata->procs->size; i++) {
  74             if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
  75                 continue;
  76             }
  77             if (cd->proc.jobid != ptr->name.jobid) {
  78                 continue;
  79             }
  80             if (cd->proc.vpid == ptr->name.vpid) {
  81                 p = ptr;
  82                 break;
  83             }
  84         }
  85     }
  86     if (NULL != p) {
  87         ORTE_FLAG_SET(p, ORTE_PROC_FLAG_REG);
  88         ORTE_ACTIVATE_PROC_STATE(&p->name, ORTE_PROC_STATE_REGISTERED);
  89     }
  90     if (NULL != cd->cbfunc) {
  91         cd->cbfunc(OPAL_SUCCESS, cd->cbdata);
  92     }
  93     OBJ_RELEASE(cd);
  94 }
  95 
  96 int pmix_server_client_connected_fn(opal_process_name_t *proc, void *server_object,
  97                                     opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
  98 {
  99     /* need to thread-shift this request as we are going
 100      * to access our global list of registered events */
 101     ORTE_PMIX_THREADSHIFT(proc, server_object, OPAL_SUCCESS, NULL,
 102                           NULL, _client_conn, cbfunc, cbdata);
 103     return ORTE_SUCCESS;
 104 }
 105 
 106 static void _client_finalized(int sd, short args, void *cbdata)
 107 {
 108     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 109     orte_job_t *jdata;
 110     orte_proc_t *p, *ptr;
 111     int i;
 112 
 113     ORTE_ACQUIRE_OBJECT(cd);
 114 
 115     if (NULL != cd->server_object) {
 116         /* we were passed back the orte_proc_t */
 117         p = (orte_proc_t*)cd->server_object;
 118     } else {
 119         /* find the named process */
 120         p = NULL;
 121         if (NULL == (jdata = orte_get_job_data_object(cd->proc.jobid))) {
 122             return;
 123         }
 124         for (i=0; i < jdata->procs->size; i++) {
 125             if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
 126                 continue;
 127             }
 128             if (cd->proc.jobid != ptr->name.jobid) {
 129                 continue;
 130             }
 131             if (cd->proc.vpid == ptr->name.vpid) {
 132                 p = ptr;
 133                 break;
 134             }
 135         }
 136         /* if we came thru this code path, then this client must be an
 137          * independent tool that connected to us - i.e., it wasn't
 138          * something we spawned. For accounting purposes, we have to
 139          * ensure the job complete procedure is run - otherwise, slots
 140          * and other resources won't correctly be released */
 141         ORTE_FLAG_SET(p, ORTE_PROC_FLAG_IOF_COMPLETE);
 142         ORTE_FLAG_SET(p, ORTE_PROC_FLAG_WAITPID);
 143         ORTE_ACTIVATE_PROC_STATE(&cd->proc, ORTE_PROC_STATE_TERMINATED);
 144     }
 145     if (NULL != p) {
 146         ORTE_FLAG_SET(p, ORTE_PROC_FLAG_HAS_DEREG);
 147         /* release the caller */
 148         if (NULL != cd->cbfunc) {
 149             cd->cbfunc(ORTE_SUCCESS, cd->cbdata);
 150         }
 151     }
 152     OBJ_RELEASE(cd);
 153 }
 154 
 155 int pmix_server_client_finalized_fn(opal_process_name_t *proc, void *server_object,
 156                                     opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
 157 {
 158     /* need to thread-shift this request as we are going
 159      * to access our global list of registered events */
 160     ORTE_PMIX_THREADSHIFT(proc, server_object, OPAL_SUCCESS, NULL,
 161                           NULL, _client_finalized, cbfunc, cbdata);
 162     return ORTE_SUCCESS;
 163 
 164 }
 165 
 166 static void _client_abort(int sd, short args, void *cbdata)
 167 {
 168     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 169     orte_job_t *jdata;
 170     orte_proc_t *p, *ptr;
 171     int i;
 172 
 173     ORTE_ACQUIRE_OBJECT(cd);
 174 
 175     if (NULL != cd->server_object) {
 176         p = (orte_proc_t*)cd->server_object;
 177     } else {
 178         /* find the named process */
 179         p = NULL;
 180         if (NULL == (jdata = orte_get_job_data_object(cd->proc.jobid))) {
 181             return;
 182         }
 183         for (i=0; i < jdata->procs->size; i++) {
 184             if (NULL == (ptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
 185                 continue;
 186             }
 187             if (cd->proc.jobid != ptr->name.jobid) {
 188                 continue;
 189             }
 190             if (cd->proc.vpid == ptr->name.vpid) {
 191                 p = ptr;
 192                 break;
 193             }
 194         }
 195     }
 196     if (NULL != p) {
 197         p->exit_code = cd->status;
 198         ORTE_ACTIVATE_PROC_STATE(&p->name, ORTE_PROC_STATE_CALLED_ABORT);
 199     }
 200 
 201     /* release the caller */
 202     if (NULL != cd->cbfunc) {
 203         cd->cbfunc(OPAL_SUCCESS, cd->cbdata);
 204     }
 205     OBJ_RELEASE(cd);
 206 }
 207 
 208 int pmix_server_abort_fn(opal_process_name_t *proc, void *server_object,
 209                          int status, const char msg[],
 210                          opal_list_t *procs_to_abort,
 211                          opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
 212 {
 213     /* need to thread-shift this request as we are going
 214      * to access our global list of registered events */
 215     ORTE_PMIX_THREADSHIFT(proc, server_object, status, msg,
 216                           procs_to_abort, _client_abort, cbfunc, cbdata);
 217     return ORTE_SUCCESS;
 218 }
 219 
 220 static void _register_events(int sd, short args, void *cbdata)
 221 {
 222     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 223     opal_value_t *info;
 224 
 225     ORTE_ACQUIRE_OBJECT(cd);
 226 
 227     /* the OPAL layer "owns" the list, but let's deconstruct it
 228      * here so we don't have to duplicate the data */
 229     while (NULL != (info = (opal_value_t*)opal_list_remove_first(cd->info))) {
 230         /* don't worry about duplication as the underlying host
 231          * server is already protecting us from it */
 232         opal_list_append(&orte_pmix_server_globals.notifications, &info->super);
 233     }
 234 
 235     if (NULL != cd->cbfunc) {
 236         cd->cbfunc(ORTE_SUCCESS, cd->cbdata);
 237     }
 238     OBJ_RELEASE(cd);
 239 }
 240 
 241 /* hook for the local PMIX server to pass event registrations
 242  * up to us - we will assume the responsibility for providing
 243  * notifications for registered events */
 244 int pmix_server_register_events_fn(opal_list_t *info,
 245                                    opal_pmix_op_cbfunc_t cbfunc,
 246                                    void *cbdata)
 247 {
 248     /* need to thread-shift this request as we are going
 249      * to access our global list of registered events */
 250     ORTE_PMIX_OPERATION(NULL, info, _register_events, cbfunc, cbdata);
 251     return ORTE_SUCCESS;
 252 }
 253 
 254 static void _deregister_events(int sd, short args, void *cbdata)
 255 {
 256     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 257     opal_value_t *info, *iptr, *nptr;
 258 
 259     ORTE_ACQUIRE_OBJECT(cd);
 260 
 261     /* the OPAL layer "owns" the list, but let's deconstruct it
 262      * here for consistency */
 263     while (NULL != (info = (opal_value_t*)opal_list_remove_first(cd->info))) {
 264         /* search for matching requests */
 265         OPAL_LIST_FOREACH_SAFE(iptr, nptr, &orte_pmix_server_globals.notifications, opal_value_t) {
 266             if (OPAL_EQUAL == opal_dss.compare(iptr, info, OPAL_VALUE)) {
 267                 opal_list_remove_item(&orte_pmix_server_globals.notifications, &iptr->super);
 268                 OBJ_RELEASE(iptr);
 269                 break;
 270             }
 271         }
 272         OBJ_RELEASE(info);
 273     }
 274 
 275     if (NULL != cd->cbfunc) {
 276         cd->cbfunc(ORTE_SUCCESS, cd->cbdata);
 277     }
 278     OBJ_RELEASE(cd);
 279 }
 280 /* hook for the local PMIX server to pass event deregistrations
 281  * up to us */
 282 int pmix_server_deregister_events_fn(opal_list_t *info,
 283                                      opal_pmix_op_cbfunc_t cbfunc,
 284                                      void *cbdata)
 285 {
 286     /* need to thread-shift this request as we are going
 287      * to access our global list of registered events */
 288     ORTE_PMIX_OPERATION(NULL, info, _deregister_events, cbfunc, cbdata);
 289     return ORTE_SUCCESS;
 290 }
 291 
 292 static void _notify_release(int status, void *cbdata)
 293 {
 294     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 295 
 296     ORTE_ACQUIRE_OBJECT(cd);
 297 
 298     if (NULL != cd->info) {
 299         OPAL_LIST_RELEASE(cd->info);
 300     }
 301     OBJ_RELEASE(cd);
 302 }
 303 
 304 /* someone has sent us an event that we need to distribute
 305  * to our local clients */
 306 void pmix_server_notify(int status, orte_process_name_t* sender,
 307                         opal_buffer_t *buffer,
 308                         orte_rml_tag_t tg, void *cbdata)
 309 {
 310     int cnt, rc, ret, ninfo, n;
 311     opal_value_t *val;
 312     orte_pmix_server_op_caddy_t *cd;
 313     orte_process_name_t source;
 314 
 315     opal_output_verbose(2, orte_pmix_server_globals.output,
 316                         "%s Notification received from %s",
 317                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 318                         ORTE_NAME_PRINT(sender));
 319 
 320     /* unpack the status */
 321     cnt = 1;
 322     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
 323         ORTE_ERROR_LOG(rc);
 324         return;
 325     }
 326 
 327     /* unpack the source */
 328     cnt = 1;
 329     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &source, &cnt, ORTE_NAME))) {
 330         ORTE_ERROR_LOG(rc);
 331         return;
 332     }
 333 
 334     /* unpack the infos that were provided */
 335     cnt = 1;
 336     if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &ninfo, &cnt, OPAL_INT))) {
 337         ORTE_ERROR_LOG(rc);
 338         return;
 339     }
 340 
 341     /* if any were provided, add them to the list */
 342     cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
 343 
 344     if (0 < ninfo) {
 345         cd->info = OBJ_NEW(opal_list_t);
 346         for (n=0; n < ninfo; n++) {
 347             val = OBJ_NEW(opal_value_t);
 348             if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &val, &cnt, OPAL_VALUE))) {
 349                 ORTE_ERROR_LOG(rc);
 350                 OBJ_RELEASE(val);
 351                 OPAL_LIST_RELEASE(cd->info);
 352                 OBJ_RELEASE(cd);
 353                 return;
 354             }
 355             opal_list_append(cd->info, &val->super);
 356         }
 357     }
 358 
 359     /* protect against infinite loops by marking that this notification was
 360      * passed down to the server by me */
 361     if (NULL == cd->info) {
 362         cd->info = OBJ_NEW(opal_list_t);
 363     }
 364     val = OBJ_NEW(opal_value_t);
 365     val->key = strdup("orte.notify.donotloop");
 366     val->type = OPAL_BOOL;
 367     val->data.flag = true;
 368     opal_list_append(cd->info, &val->super);
 369 
 370     opal_output_verbose(2, orte_pmix_server_globals.output,
 371                         "%s NOTIFYING PMIX SERVER OF STATUS %d",
 372                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ret);
 373     if (OPAL_SUCCESS != (rc = opal_pmix.server_notify_event(ret, &source, cd->info, _notify_release, cd))) {
 374         ORTE_ERROR_LOG(rc);
 375         if (NULL != cd->info) {
 376             OPAL_LIST_RELEASE(cd->info);
 377         }
 378         OBJ_RELEASE(cd);
 379     }
 380 }
 381 
 382 int pmix_server_notify_event(int code, opal_process_name_t *source,
 383                              opal_list_t *info,
 384                              opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
 385 {
 386     opal_buffer_t *buf;
 387     int rc, ninfo;
 388     opal_value_t *val;
 389     orte_grpcomm_signature_t *sig;
 390 
 391     opal_output_verbose(2, orte_pmix_server_globals.output,
 392                         "%s local process %s generated event code %d",
 393                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 394                         ORTE_NAME_PRINT(source), code);
 395 
 396     /* check to see if this is one we sent down */
 397     OPAL_LIST_FOREACH(val, info, opal_value_t) {
 398         if (0 == strcmp(val->key, "orte.notify.donotloop")) {
 399             /* yep - do not process */
 400             goto done;
 401         }
 402     }
 403 
 404     /* a local process has generated an event - we need to xcast it
 405      * to all the daemons so it can be passed down to their local
 406      * procs */
 407     buf = OBJ_NEW(opal_buffer_t);
 408     if (NULL == buf) {
 409         return ORTE_ERR_OUT_OF_RESOURCE;
 410     }
 411     /* pack the status code */
 412     if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &code, 1, OPAL_INT))) {
 413         ORTE_ERROR_LOG(rc);
 414         OBJ_RELEASE(buf);
 415         return rc;
 416     }
 417     /* pack the source */
 418     if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, source, 1, ORTE_NAME))) {
 419         ORTE_ERROR_LOG(rc);
 420         OBJ_RELEASE(buf);
 421         return rc;
 422     }
 423 
 424     /* pack the number of infos */
 425     if (NULL == info) {
 426         ninfo = 0;
 427     } else {
 428         ninfo = opal_list_get_size(info);
 429     }
 430     if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &ninfo, 1, OPAL_INT))) {
 431         ORTE_ERROR_LOG(rc);
 432         OBJ_RELEASE(buf);
 433         return rc;
 434     }
 435     if (0 < ninfo) {
 436         OPAL_LIST_FOREACH(val, info, opal_value_t) {
 437             if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &val, 1, OPAL_VALUE))) {
 438                 ORTE_ERROR_LOG(rc);
 439                 OBJ_RELEASE(buf);
 440                 return rc;
 441             }
 442         }
 443     }
 444 
 445     /* goes to all daemons */
 446     sig = OBJ_NEW(orte_grpcomm_signature_t);
 447     if (NULL == sig) {
 448         OBJ_RELEASE(buf);
 449         return ORTE_ERR_OUT_OF_RESOURCE;
 450     }
 451     sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
 452     if (NULL == sig->signature) {
 453         OBJ_RELEASE(buf);
 454         OBJ_RELEASE(sig);
 455         return ORTE_ERR_OUT_OF_RESOURCE;
 456     }
 457     sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
 458     sig->signature[0].vpid = ORTE_VPID_WILDCARD;
 459     sig->sz = 1;
 460     if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_NOTIFICATION, buf))) {
 461         ORTE_ERROR_LOG(rc);
 462         OBJ_RELEASE(buf);
 463         OBJ_RELEASE(sig);
 464         return rc;
 465     }
 466     OBJ_RELEASE(buf);
 467     /* maintain accounting */
 468     OBJ_RELEASE(sig);
 469 
 470   done:
 471     /* execute the callback */
 472     if (NULL != cbfunc) {
 473         cbfunc(ORTE_SUCCESS, cbdata);
 474     }
 475     return ORTE_SUCCESS;
 476 }
 477 
 478 static void qrel(void *cbdata)
 479 {
 480     opal_list_t *l = (opal_list_t*)cbdata;
 481     OPAL_LIST_RELEASE(l);
 482 }
 483 static void _query(int sd, short args, void *cbdata)
 484 {
 485     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 486     opal_pmix_query_t *q;
 487     opal_value_t *kv;
 488     orte_jobid_t jobid;
 489     orte_job_t *jdata;
 490     orte_proc_t *proct;
 491     orte_app_context_t *app;
 492     int rc = ORTE_SUCCESS, i, k, num_replies;
 493     opal_list_t *results, targets, *array;
 494     size_t n;
 495     uint32_t key;
 496     void *nptr;
 497     char **nspaces=NULL, nspace[512];
 498     char **ans = NULL;
 499     bool local_only;
 500     orte_namelist_t *nm;
 501     opal_pstats_t pstat;
 502     float pss;
 503 
 504     ORTE_ACQUIRE_OBJECT(cd);
 505 
 506     opal_output_verbose(2, orte_pmix_server_globals.output,
 507                         "%s processing query",
 508                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 509 
 510     results = OBJ_NEW(opal_list_t);
 511 
 512     /* see what they wanted */
 513     OPAL_LIST_FOREACH(q, cd->info, opal_pmix_query_t) {
 514         for (n=0; NULL != q->keys[n]; n++) {
 515             opal_output_verbose(2, orte_pmix_server_globals.output,
 516                                 "%s processing key %s",
 517                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), q->keys[n]);
 518             if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_NAMESPACES)) {
 519                 /* get the current jobids */
 520                 rc = opal_hash_table_get_first_key_uint32(orte_job_data, &key, (void **)&jdata, &nptr);
 521                 while (OPAL_SUCCESS == rc) {
 522                     if (ORTE_PROC_MY_NAME->jobid != jdata->jobid) {
 523                         memset(nspace, 0, 512);
 524                         (void)opal_snprintf_jobid(nspace, 512, jdata->jobid);
 525                         opal_argv_append_nosize(&nspaces, nspace);
 526                     }
 527                     rc = opal_hash_table_get_next_key_uint32(orte_job_data, &key, (void **)&jdata, nptr, &nptr);
 528                 }
 529                 /* join the results into a single comma-delimited string */
 530                 kv = OBJ_NEW(opal_value_t);
 531                 kv->key = strdup(OPAL_PMIX_QUERY_NAMESPACES);
 532                 kv->type = OPAL_STRING;
 533                 if (NULL != nspaces) {
 534                     kv->data.string = opal_argv_join(nspaces, ',');
 535                 } else {
 536                     kv->data.string = NULL;
 537                 }
 538                 opal_list_append(results, &kv->super);
 539             } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_SPAWN_SUPPORT)) {
 540                 opal_argv_append_nosize(&ans, OPAL_PMIX_HOST);
 541                 opal_argv_append_nosize(&ans, OPAL_PMIX_HOSTFILE);
 542                 opal_argv_append_nosize(&ans, OPAL_PMIX_ADD_HOST);
 543                 opal_argv_append_nosize(&ans, OPAL_PMIX_ADD_HOSTFILE);
 544                 opal_argv_append_nosize(&ans, OPAL_PMIX_PREFIX);
 545                 opal_argv_append_nosize(&ans, OPAL_PMIX_WDIR);
 546                 opal_argv_append_nosize(&ans, OPAL_PMIX_MAPPER);
 547                 opal_argv_append_nosize(&ans, OPAL_PMIX_PPR);
 548                 opal_argv_append_nosize(&ans, OPAL_PMIX_MAPBY);
 549                 opal_argv_append_nosize(&ans, OPAL_PMIX_RANKBY);
 550                 opal_argv_append_nosize(&ans, OPAL_PMIX_BINDTO);
 551                 /* create the return kv */
 552                 kv = OBJ_NEW(opal_value_t);
 553                 kv->key = strdup(OPAL_PMIX_QUERY_SPAWN_SUPPORT);
 554                 kv->type = OPAL_STRING;
 555                 kv->data.string = opal_argv_join(ans, ',');
 556                 opal_list_append(results, &kv->super);
 557                 opal_argv_free(ans);
 558                 ans = NULL;
 559             } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_DEBUG_SUPPORT)) {
 560                 opal_argv_append_nosize(&ans, OPAL_PMIX_DEBUG_STOP_IN_INIT);
 561                 opal_argv_append_nosize(&ans, OPAL_PMIX_DEBUG_JOB);
 562                 opal_argv_append_nosize(&ans, OPAL_PMIX_DEBUG_WAIT_FOR_NOTIFY);
 563                 /* create the return kv */
 564                 kv = OBJ_NEW(opal_value_t);
 565                 kv->key = strdup(OPAL_PMIX_QUERY_DEBUG_SUPPORT);
 566                 kv->type = OPAL_STRING;
 567                 kv->data.string = opal_argv_join(ans, ',');
 568                 opal_list_append(results, &kv->super);
 569                 opal_argv_free(ans);
 570                 ans = NULL;
 571             } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_MEMORY_USAGE)) {
 572                 OBJ_CONSTRUCT(&targets, opal_list_t);
 573                 /* check the qualifiers to find the procs they want to
 574                  * know about - if qualifiers are NULL, then get it for
 575                  * the daemons + all active jobs */
 576                 if (0 == opal_list_get_size(&q->qualifiers)) {
 577                     /* create a request tracker */
 578                     /* xcast a request for all memory usage */
 579                     /* return success - the callback will be done
 580                      * once we get the results */
 581                     OPAL_LIST_DESTRUCT(&targets);
 582                     return;
 583                 }
 584 
 585                 /* scan the qualifiers */
 586                 local_only = false;
 587                 OPAL_LIST_FOREACH(kv, &q->qualifiers, opal_value_t) {
 588                     if (0 == strcmp(kv->key, OPAL_PMIX_QUERY_LOCAL_ONLY)) {
 589                         if (OPAL_UNDEF == kv->type || kv->data.flag) {
 590                             local_only = true;
 591                         }
 592                     } else if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) {
 593                         /* save this directive on our list of targets */
 594                         nm = OBJ_NEW(orte_namelist_t);
 595                         memcpy(&nm->name, &kv->data.name, sizeof(opal_process_name_t));
 596                         opal_list_append(&targets, &nm->super);
 597                     }
 598                 }
 599 
 600                 /* if they have asked for only our local procs or daemon,
 601                  * then we can just get the data directly */
 602                 if (local_only) {
 603                     if (0 == opal_list_get_size(&targets)) {
 604                         kv = OBJ_NEW(opal_value_t);
 605                         kv->key = strdup(OPAL_PMIX_QUERY_MEMORY_USAGE);
 606                         kv->type = OPAL_PTR;
 607                         array = OBJ_NEW(opal_list_t);
 608                         kv->data.ptr = array;
 609                         opal_list_append(results, &kv->super);
 610                         /* collect my memory usage */
 611                         OBJ_CONSTRUCT(&pstat, opal_pstats_t);
 612                         opal_pstat.query(orte_process_info.pid, &pstat, NULL);
 613                         kv = OBJ_NEW(opal_value_t);
 614                         kv->key = strdup(OPAL_PMIX_DAEMON_MEMORY);
 615                         kv->type = OPAL_FLOAT;
 616                         kv->data.fval = pstat.pss;
 617                         opal_list_append(array, &kv->super);
 618                         OBJ_DESTRUCT(&pstat);
 619                         /* collect the memory usage of all my children */
 620                         pss = 0.0;
 621                         num_replies = 0;
 622                         for (i=0; i < orte_local_children->size; i++) {
 623                             if (NULL != (proct = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) &&
 624                                 ORTE_FLAG_TEST(proct, ORTE_PROC_FLAG_ALIVE)) {
 625                                 /* collect the stats on this proc */
 626                                 OBJ_CONSTRUCT(&pstat, opal_pstats_t);
 627                                 if (OPAL_SUCCESS == opal_pstat.query(proct->pid, &pstat, NULL)) {
 628                                     pss += pstat.pss;
 629                                     ++num_replies;
 630                                 }
 631                                 OBJ_DESTRUCT(&pstat);
 632                             }
 633                         }
 634                         /* compute the average value */
 635                         if (0 < num_replies) {
 636                             pss /= (float)num_replies;
 637                         }
 638                         kv = OBJ_NEW(opal_value_t);
 639                         kv->key = strdup(OPAL_PMIX_CLIENT_AVG_MEMORY);
 640                         kv->type = OPAL_FLOAT;
 641                         kv->data.fval = pss;
 642                         opal_list_append(array, &kv->super);
 643                     } else {
 644 
 645                     }
 646                 } else {
 647                     /* if they want it for remote procs, see who is hosting them
 648                      * and ask directly for the info - if rank=wildcard, then
 649                      * we need to xcast the request and collect the results */
 650                 }
 651             } else if (0 == strcmp(q->keys[n], OPAL_PMIX_TIME_REMAINING)) {
 652                 kv = OBJ_NEW(opal_value_t);
 653                 kv->key = strdup(OPAL_PMIX_TIME_REMAINING);
 654                 kv->type = OPAL_UINT32;
 655                 if (ORTE_SUCCESS != orte_schizo.get_remaining_time(&kv->data.uint32)) {
 656                     OBJ_RELEASE(kv);
 657                 } else {
 658                     opal_list_append(results, &kv->super);
 659                 }
 660             } else if (0 == strcmp(q->keys[n], OPAL_PMIX_HWLOC_XML_V1)) {
 661                 if (NULL != opal_hwloc_topology) {
 662                     char *xmlbuffer=NULL;
 663                     int len;
 664                     kv = OBJ_NEW(opal_value_t);
 665                     kv->key = strdup(OPAL_PMIX_HWLOC_XML_V1);
 666                     #if HWLOC_API_VERSION < 0x20000
 667                         /* get this from the v1.x API */
 668                         if (0 != hwloc_topology_export_xmlbuffer(opal_hwloc_topology, &xmlbuffer, &len)) {
 669                             OBJ_RELEASE(kv);
 670                             continue;
 671                         }
 672                     #else
 673                         /* get it from the v2 API */
 674                         if (0 != hwloc_topology_export_xmlbuffer(opal_hwloc_topology, &xmlbuffer, &len,
 675                                                                  HWLOC_TOPOLOGY_EXPORT_XML_FLAG_V1)) {
 676                             OBJ_RELEASE(kv);
 677                             continue;
 678                         }
 679                     #endif
 680                     kv->data.string = xmlbuffer;
 681                     kv->type = OPAL_STRING;
 682                     opal_list_append(results, &kv->super);
 683                 }
 684             } else if (0 == strcmp(q->keys[n], OPAL_PMIX_HWLOC_XML_V2)) {
 685                 /* we cannot provide it if we are using v1.x */
 686                 #if HWLOC_API_VERSION >= 0x20000
 687                     if (NULL != opal_hwloc_topology) {
 688                         char *xmlbuffer=NULL;
 689                         int len;
 690                         kv = OBJ_NEW(opal_value_t);
 691                         kv->key = strdup(OPAL_PMIX_HWLOC_XML_V2);
 692                         if (0 != hwloc_topology_export_xmlbuffer(opal_hwloc_topology, &xmlbuffer, &len, 0)) {
 693                             OBJ_RELEASE(kv);
 694                             continue;
 695                         }
 696                         kv->data.string = xmlbuffer;
 697                         kv->type = OPAL_STRING;
 698                         opal_list_append(results, &kv->super);
 699                     }
 700                 #endif
 701             } else if (0 == strcmp(q->keys[n], OPAL_PMIX_SERVER_URI)) {
 702                 /* they want our URI */
 703                 kv = OBJ_NEW(opal_value_t);
 704                 kv->key = strdup(OPAL_PMIX_SERVER_URI);
 705                 kv->type = OPAL_STRING;
 706                 kv->data.string = strdup(orte_process_info.my_hnp_uri);
 707                 opal_list_append(results, &kv->super);
 708             } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_PROC_TABLE)) {
 709                 /* the job they are asking about is in the qualifiers */
 710                 jobid = ORTE_JOBID_INVALID;
 711                 OPAL_LIST_FOREACH(kv, &q->qualifiers, opal_value_t) {
 712                     if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) {
 713                         /* save the id */
 714                         jobid = kv->data.name.jobid;
 715                         break;
 716                     }
 717                 }
 718                 if (ORTE_JOBID_INVALID == jobid) {
 719                     rc = ORTE_ERR_NOT_FOUND;
 720                     goto done;
 721                 }
 722                 /* construct a list of values with opal_proc_info_t
 723                  * entries for each proc in the indicated job */
 724                 jdata = orte_get_job_data_object(jobid);
 725                 if (NULL == jdata) {
 726                     rc = ORTE_ERR_NOT_FOUND;
 727                     goto done;
 728                 }
 729                 /* setup the reply */
 730                 kv = OBJ_NEW(opal_value_t);
 731                 kv->key = strdup(OPAL_PMIX_QUERY_PROC_TABLE);
 732                 kv->type = OPAL_PTR;
 733                 array = OBJ_NEW(opal_list_t);
 734                 kv->data.ptr = array;
 735                 opal_list_append(results, &kv->super);
 736                 /* cycle thru the job and create an entry for each proc */
 737                 for (k=0; k < jdata->procs->size; k++) {
 738                     if (NULL == (proct = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, k))) {
 739                         continue;
 740                     }
 741                     kv = OBJ_NEW(opal_value_t);
 742                     kv->type = OPAL_PROC_INFO;
 743                     kv->data.pinfo.name.jobid = jobid;
 744                     kv->data.pinfo.name.vpid = proct->name.vpid;
 745                     if (NULL != proct->node && NULL != proct->node->name) {
 746                         kv->data.pinfo.hostname = strdup(proct->node->name);
 747                     }
 748                     app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, proct->app_idx);
 749                     if (NULL != app && NULL != app->app) {
 750                         kv->data.pinfo.executable_name = strdup(app->app);
 751                     }
 752                     kv->data.pinfo.pid = proct->pid;
 753                     kv->data.pinfo.exit_code = proct->exit_code;
 754                     kv->data.pinfo.state = proct->state;
 755                     opal_list_append(array, &kv->super);
 756                 }
 757             } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_LOCAL_PROC_TABLE)) {
 758                 /* the job they are asking about is in the qualifiers */
 759                 jobid = ORTE_JOBID_INVALID;
 760                 OPAL_LIST_FOREACH(kv, &q->qualifiers, opal_value_t) {
 761                     if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) {
 762                         /* save the id */
 763                         jobid = kv->data.name.jobid;
 764                         break;
 765                     }
 766                 }
 767                 if (ORTE_JOBID_INVALID == jobid) {
 768                     rc = ORTE_ERR_BAD_PARAM;
 769                     goto done;
 770                 }
 771                 /* construct a list of values with opal_proc_info_t
 772                  * entries for each LOCAL proc in the indicated job */
 773                 jdata = orte_get_job_data_object(jobid);
 774                 if (NULL == jdata) {
 775                     rc = ORTE_ERR_NOT_FOUND;
 776                     goto done;
 777                 }
 778                 /* setup the reply */
 779                 kv = OBJ_NEW(opal_value_t);
 780                 kv->key = strdup(OPAL_PMIX_QUERY_LOCAL_PROC_TABLE);
 781                 kv->type = OPAL_PTR;
 782                 array = OBJ_NEW(opal_list_t);
 783                 kv->data.ptr = array;
 784                 opal_list_append(results, &kv->super);
 785                 /* cycle thru the job and create an entry for each proc */
 786                 for (k=0; k < jdata->procs->size; k++) {
 787                     if (NULL == (proct = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, k))) {
 788                         continue;
 789                     }
 790                     if (ORTE_FLAG_TEST(proct, ORTE_PROC_FLAG_LOCAL)) {
 791                         kv = OBJ_NEW(opal_value_t);
 792                         kv->type = OPAL_PROC_INFO;
 793                         kv->data.pinfo.name.jobid = jobid;
 794                         kv->data.pinfo.name.vpid = proct->name.vpid;
 795                         if (NULL != proct->node && NULL != proct->node->name) {
 796                             kv->data.pinfo.hostname = strdup(proct->node->name);
 797                         }
 798                         app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, proct->app_idx);
 799                         if (NULL != app && NULL != app->app) {
 800                             kv->data.pinfo.executable_name = strdup(app->app);
 801                         }
 802                         kv->data.pinfo.pid = proct->pid;
 803                         kv->data.pinfo.exit_code = proct->exit_code;
 804                         kv->data.pinfo.state = proct->state;
 805                         opal_list_append(array, &kv->super);
 806                     }
 807                 }
 808             }
 809         }
 810     }
 811 
 812   done:
 813     if (ORTE_SUCCESS == rc) {
 814         if (0 == opal_list_get_size(results)) {
 815             rc = ORTE_ERR_NOT_FOUND;
 816         } else if (opal_list_get_size(results) < opal_list_get_size(cd->info)) {
 817             rc = ORTE_ERR_PARTIAL_SUCCESS;
 818         }
 819     }
 820     cd->infocbfunc(rc, results, cd->cbdata, qrel, results);
 821 }
 822 
 823 int pmix_server_query_fn(opal_process_name_t *requestor,
 824                          opal_list_t *queries,
 825                          opal_pmix_info_cbfunc_t cbfunc, void *cbdata)
 826 {
 827     orte_pmix_server_op_caddy_t *cd;
 828 
 829     if (NULL == queries || NULL == cbfunc) {
 830         return OPAL_ERR_BAD_PARAM;
 831     }
 832 
 833     /* need to threadshift this request */
 834     cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
 835     cd->proc = *requestor;
 836     cd->info = queries;
 837     cd->infocbfunc = cbfunc;
 838     cd->cbdata = cbdata;
 839 
 840     opal_event_set(orte_event_base, &(cd->ev), -1,
 841                    OPAL_EV_WRITE, _query, cd);
 842     opal_event_set_priority(&(cd->ev), ORTE_MSG_PRI);
 843     ORTE_POST_OBJECT(cd);
 844     opal_event_active(&(cd->ev), OPAL_EV_WRITE, 1);
 845 
 846     return ORTE_SUCCESS;
 847 }
 848 
 849 static void _toolconn(int sd, short args, void *cbdata)
 850 {
 851     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 852     orte_job_t *jdata;
 853     orte_app_context_t *app;
 854     orte_proc_t *proc;
 855     orte_node_t *node, *nptr;
 856     char *hostname = NULL;
 857     orte_process_name_t tool = {ORTE_JOBID_INVALID, ORTE_VPID_INVALID};
 858     int rc, i;
 859     opal_value_t *val;
 860     bool flag = false, flag_given = false;;
 861 
 862     ORTE_ACQUIRE_OBJECT(cd);
 863 
 864     opal_output_verbose(2, orte_pmix_server_globals.output,
 865                         "%s TOOL CONNECTION PROCESSING",
 866                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 867 
 868    /* check for directives */
 869    if (NULL != cd->info) {
 870        OPAL_LIST_FOREACH(val, cd->info, opal_value_t) {
 871             if (0 == strcmp(val->key, OPAL_PMIX_EVENT_SILENT_TERMINATION)) {
 872                 if (OPAL_UNDEF == val->type || val->data.flag) {
 873                     flag = true;
 874                     flag_given = true;
 875                 }
 876             } else if (0 == strcmp(val->key, OPAL_PMIX_NSPACE)) {
 877                 tool.jobid = val->data.name.jobid;
 878             } else if (0 == strcmp(val->key, OPAL_PMIX_RANK)) {
 879                 tool.vpid = val->data.name.vpid;
 880             } else if (0 == strcmp(val->key, OPAL_PMIX_HOSTNAME)) {
 881                 if (NULL != hostname) {
 882                     /* shouldn't happen, but just take the last one */
 883                     free(hostname);
 884                 }
 885                 hostname = strdup(val->data.string);
 886             }
 887         }
 888     }
 889 
 890     /* if we are not the HNP or master, and the tool doesn't
 891      * already have a name (i.e., we didn't spawn it), then
 892      * there is nothing we can currently do.
 893      * Eventually, when we switch to nspace instead of an
 894      * integer jobid, we'll just locally assign this value */
 895     if (ORTE_JOBID_INVALID == tool.jobid ||
 896         ORTE_VPID_INVALID == tool.vpid) {
 897         /* if we are the HNP, we can directly assign the jobid */
 898         if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_MASTER) {
 899             jdata = OBJ_NEW(orte_job_t);
 900             rc = orte_plm_base_create_jobid(jdata);
 901             if (ORTE_SUCCESS != rc) {
 902                 OBJ_RELEASE(jdata);
 903                 if (NULL != cd->toolcbfunc) {
 904                     cd->toolcbfunc(ORTE_ERROR, tool, cd->cbdata);
 905                 }
 906                 OBJ_RELEASE(cd);
 907                 if (NULL != hostname) {
 908                     free(hostname);
 909                 }
 910                 return;
 911             }
 912             tool.jobid = jdata->jobid;
 913             tool.vpid = 0;
 914         } else {
 915             /* we currently do not support connections to non-HNP/master
 916              * daemons from tools that were not spawned by a daemon */
 917             if (NULL != cd->toolcbfunc) {
 918                 cd->toolcbfunc(ORTE_ERR_NOT_SUPPORTED, tool, cd->cbdata);
 919             }
 920             OBJ_RELEASE(cd);
 921             if (NULL != hostname) {
 922                 free(hostname);
 923             }
 924             return;
 925         }
 926     } else {
 927         jdata = OBJ_NEW(orte_job_t);
 928         jdata->jobid = tool.jobid;
 929     }
 930 
 931     opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
 932     /* setup some required job-level fields in case this
 933      * tool calls spawn, or uses some other functions that
 934      * need them */
 935     /* must create a map for it (even though it has no
 936      * info in it) so that the job info will be picked
 937      * up in subsequent pidmaps or other daemons won't
 938      * know how to route
 939      */
 940     jdata->map = OBJ_NEW(orte_job_map_t);
 941 
 942     /* setup an app_context for the singleton */
 943     app = OBJ_NEW(orte_app_context_t);
 944     app->app = strdup("tool");
 945     app->num_procs = 1;
 946     opal_pointer_array_add(jdata->apps, app);
 947     jdata->num_apps = 1;
 948 
 949     /* setup a proc object for the singleton - since we
 950      * -must- be the HNP, and therefore we stored our
 951      * node on the global node pool, and since the singleton
 952      * -must- be on the same node as us, indicate that
 953      */
 954     proc = OBJ_NEW(orte_proc_t);
 955     proc->name.jobid = jdata->jobid;
 956     proc->name.vpid = tool.vpid;
 957     proc->parent = ORTE_PROC_MY_NAME->vpid;
 958     ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_ALIVE);
 959     ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_TOOL);
 960     proc->state = ORTE_PROC_STATE_RUNNING;
 961     /* set the trivial */
 962     proc->local_rank = 0;
 963     proc->node_rank = 0;
 964     proc->app_rank = 0;
 965     proc->app_idx = 0;
 966     if (NULL == hostname) {
 967         /* it is on my node */
 968         node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, 0);
 969         ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_LOCAL);
 970     } else {
 971         /* we need to locate it */
 972         node = NULL;
 973         for (i=0; i < orte_node_pool->size; i++) {
 974             if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
 975                 continue;
 976             }
 977             if (0 == strcmp(hostname, nptr->name)) {
 978                 node = nptr;
 979                 break;
 980             }
 981         }
 982         if (NULL == node) {
 983             /* not in our allocation - which is still okay */
 984             node = OBJ_NEW(orte_node_t);
 985             node->name = strdup(hostname);
 986             ORTE_FLAG_SET(node, ORTE_NODE_NON_USABLE);
 987             opal_pointer_array_add(orte_node_pool, node);
 988         }
 989         free(hostname);  // no longer needed
 990     }
 991     proc->node = node;
 992     OBJ_RETAIN(node);  /* keep accounting straight */
 993     opal_pointer_array_add(jdata->procs, proc);
 994     jdata->num_procs = 1;
 995     /* add the node to the job map */
 996     OBJ_RETAIN(node);
 997     opal_pointer_array_add(jdata->map->nodes, node);
 998     jdata->map->num_nodes++;
 999     /* and it obviously is on the node - note that
1000      * we do _not_ increment the #procs on the node
1001      * as the tool doesn't count against the slot
1002      * allocation */
1003     OBJ_RETAIN(proc);
1004     opal_pointer_array_add(node->procs, proc);
1005     /* if they indicated a preference for termination, set it */
1006     if (flag_given) {
1007         orte_set_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION,
1008                            ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
1009     } else {
1010         /* we default to silence */
1011         flag = true;
1012         orte_set_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION,
1013                            ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
1014     }
1015 
1016     if (NULL != cd->toolcbfunc) {
1017         cd->toolcbfunc(ORTE_ERR_NOT_SUPPORTED, tool, cd->cbdata);
1018     }
1019     OBJ_RELEASE(cd);
1020 }
1021 
1022 void pmix_tool_connected_fn(opal_list_t *info,
1023                             opal_pmix_tool_connection_cbfunc_t cbfunc,
1024                             void *cbdata)
1025 {
1026     orte_pmix_server_op_caddy_t *cd;
1027 
1028     opal_output_verbose(2, orte_pmix_server_globals.output,
1029                         "%s TOOL CONNECTION REQUEST RECVD",
1030                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
1031 
1032     /* need to threadshift this request */
1033     cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
1034     cd->info = info;
1035     cd->toolcbfunc = cbfunc;
1036     cd->cbdata = cbdata;
1037 
1038     opal_event_set(orte_event_base, &(cd->ev), -1,
1039                    OPAL_EV_WRITE, _toolconn, cd);
1040     opal_event_set_priority(&(cd->ev), ORTE_MSG_PRI);
1041     ORTE_POST_OBJECT(cd);
1042     opal_event_active(&(cd->ev), OPAL_EV_WRITE, 1);
1043 
1044 }
1045 
1046 static void lgcbfn(int sd, short args, void *cbdata)
1047 {
1048     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
1049 
1050     if (NULL != cd->cbfunc) {
1051         cd->cbfunc(cd->status, cd->cbdata);
1052     }
1053     OBJ_RELEASE(cd);
1054 }
1055 
1056 void pmix_server_log_fn(opal_process_name_t *requestor,
1057                         opal_list_t *info,
1058                         opal_list_t *directives,
1059                         opal_pmix_op_cbfunc_t cbfunc,
1060                         void *cbdata)
1061 {
1062     opal_value_t *val;
1063     opal_buffer_t *buf;
1064     int rc;
1065 
1066     opal_output_verbose(2, orte_pmix_server_globals.output,
1067                         "%s logging info",
1068                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
1069 
1070     OPAL_LIST_FOREACH(val, info, opal_value_t) {
1071         if (NULL == val->key) {
1072             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
1073             continue;
1074         }
1075         if (0 == strcmp(val->key, OPAL_PMIX_LOG_MSG)) {
1076             /* pull out the blob */
1077             if (OPAL_BYTE_OBJECT != val->type) {
1078                 continue;
1079             }
1080             buf = OBJ_NEW(opal_buffer_t);
1081             opal_dss.load(buf, val->data.bo.bytes, val->data.bo.size);
1082             val->data.bo.bytes = NULL;
1083             if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf,
1084                                                               ORTE_RML_TAG_SHOW_HELP,
1085                                                               orte_rml_send_callback, NULL))) {
1086                 ORTE_ERROR_LOG(rc);
1087                 OBJ_RELEASE(buf);
1088             }
1089         } else if (0 == strcmp(val->key, OPAL_PMIX_LOG_STDERR)) {
1090             if (ORTE_SUCCESS != (rc = orte_iof.output(requestor, ORTE_IOF_STDERR, val->data.string))) {
1091                 ORTE_ERROR_LOG(rc);
1092             }
1093         } else if (0 == strcmp(val->key, OPAL_PMIX_LOG_STDOUT)) {
1094             if (ORTE_SUCCESS != (rc = orte_iof.output(requestor, ORTE_IOF_STDOUT, val->data.string))) {
1095                 ORTE_ERROR_LOG(rc);
1096             }
1097         }
1098     }
1099 
1100     /* we cannot directly execute the callback here
1101      * as it would threadlock - so shift to somewhere
1102      * safe */
1103     rc = ORTE_SUCCESS;  // unused - silence compiler warning
1104     ORTE_PMIX_THREADSHIFT(requestor, NULL, rc,
1105                           NULL, NULL, lgcbfn,
1106                           cbfunc, cbdata);
1107 }
1108 
1109 int pmix_server_job_ctrl_fn(const opal_process_name_t *requestor,
1110                             opal_list_t *targets,
1111                             opal_list_t *info,
1112                             opal_pmix_info_cbfunc_t cbfunc,
1113                             void *cbdata)
1114 {
1115     opal_value_t *val;
1116     int rc, n;
1117     orte_proc_t *proc;
1118     opal_pointer_array_t parray, *ptrarray;
1119     opal_namelist_t *nm;
1120     opal_buffer_t *cmd;
1121     orte_daemon_cmd_flag_t cmmnd = ORTE_DAEMON_HALT_VM_CMD;
1122     orte_grpcomm_signature_t *sig;
1123 
1124     opal_output_verbose(2, orte_pmix_server_globals.output,
1125                         "%s job control request from %s",
1126                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1127                         ORTE_NAME_PRINT(requestor));
1128 
1129     OPAL_LIST_FOREACH(val, info, opal_value_t) {
1130         if (NULL == val->key) {
1131             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
1132             continue;
1133         }
1134         if (0 == strcmp(val->key, OPAL_PMIX_JOB_CTRL_KILL)) {
1135             /* convert the list of targets to a pointer array */
1136             if (0 == opal_list_get_size(targets)) {
1137                 ptrarray = NULL;
1138             } else {
1139                 OBJ_CONSTRUCT(&parray, opal_pointer_array_t);
1140                 OPAL_LIST_FOREACH(nm, targets, opal_namelist_t) {
1141                     /* get the proc object for this proc */
1142                     if (NULL == (proc = orte_get_proc_object(&nm->name))) {
1143                         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
1144                         continue;
1145                     }
1146                     OBJ_RETAIN(proc);
1147                     opal_pointer_array_add(&parray, proc);
1148                 }
1149                 ptrarray = &parray;
1150             }
1151             if (ORTE_SUCCESS != (rc = orte_plm.terminate_procs(ptrarray))) {
1152                 ORTE_ERROR_LOG(rc);
1153             }
1154             if (NULL != ptrarray) {
1155                 /* cleanup the array */
1156                 for (n=0; n < parray.size; n++) {
1157                     if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(&parray, n))) {
1158                         OBJ_RELEASE(proc);
1159                     }
1160                 }
1161                 OBJ_DESTRUCT(&parray);
1162             }
1163             continue;
1164         } else if (0 == strcmp(val->key, OPAL_PMIX_JOB_CTRL_TERMINATE)) {
1165             if (0 == opal_list_get_size(targets)) {
1166                 /* terminate the daemons and all running jobs */
1167                 cmd = OBJ_NEW(opal_buffer_t);
1168                 /* pack the command */
1169                 if (ORTE_SUCCESS != (rc = opal_dss.pack(cmd, &cmmnd, 1, ORTE_DAEMON_CMD))) {
1170                     ORTE_ERROR_LOG(rc);
1171                     OBJ_RELEASE(cmd);
1172                     return rc;
1173                 }
1174                 /* goes to all daemons */
1175                 sig = OBJ_NEW(orte_grpcomm_signature_t);
1176                 sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
1177                 sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
1178                 sig->signature[0].vpid = ORTE_VPID_WILDCARD;
1179                 if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, cmd))) {
1180                     ORTE_ERROR_LOG(rc);
1181                 }
1182                 OBJ_RELEASE(cmd);
1183                 OBJ_RELEASE(sig);
1184             }
1185         }
1186     }
1187 
1188     return ORTE_OPERATION_SUCCEEDED;
1189 }

/* [<][>][^][v][top][bottom][index][help] */