root/orte/orted/pmix/pmix_server.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. pmix_server_register_params
  2. eviction_cbfunc
  3. pmix_server_init
  4. pmix_server_start
  5. pmix_server_finalize
  6. send_error
  7. _mdxresp
  8. modex_resp
  9. pmix_server_dmdx_recv
  10. dccon
  11. dcdes
  12. relcbfunc
  13. pmix_server_dmdx_resp
  14. opcon
  15. rqcon
  16. rqdes
  17. mdcon
  18. mddes

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2012 Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2014-2017 Mellanox Technologies, Inc.
  18  *                         All rights reserved.
  19  * Copyright (c) 2014-2015 Research Organization for Information Science
  20  *                         and Technology (RIST). All rights reserved.
  21  * $COPYRIGHT$
  22  *
  23  * Additional copyrights may follow
  24  *
  25  * $HEADER$
  26  *
  27  */
  28 
  29 #include "orte_config.h"
  30 #include "orte/types.h"
  31 #include "opal/types.h"
  32 
  33 #ifdef HAVE_UNISTD_H
  34 #include <unistd.h>
  35 #endif
  36 #ifdef HAVE_SYS_TYPES_H
  37 #include <sys/types.h>
  38 #endif
  39 #include <fcntl.h>
  40 #ifdef HAVE_NETINET_IN_H
  41 #include <netinet/in.h>
  42 #endif
  43 #ifdef HAVE_ARPA_INET_H
  44 #include <arpa/inet.h>
  45 #endif
  46 #ifdef HAVE_NETDB_H
  47 #include <netdb.h>
  48 #endif
  49 #include <ctype.h>
  50 
  51 #include "opal_stdint.h"
  52 #include "opal/class/opal_hotel.h"
  53 #include "opal/class/opal_list.h"
  54 #include "opal/mca/base/mca_base_var.h"
  55 #include "opal/mca/pmix/pmix.h"
  56 #include "opal/util/opal_environ.h"
  57 #include "opal/util/show_help.h"
  58 #include "opal/util/error.h"
  59 #include "opal/util/output.h"
  60 #include "opal/util/os_path.h"
  61 #include "opal/util/argv.h"
  62 #include "opal/util/printf.h"
  63 
  64 #include "orte/mca/errmgr/errmgr.h"
  65 #include "orte/mca/grpcomm/grpcomm.h"
  66 #include "orte/mca/rml/rml.h"
  67 #include "orte/mca/rml/base/rml_contact.h"
  68 #include "orte/util/name_fns.h"
  69 #include "orte/util/proc_info.h"
  70 #include "orte/util/session_dir.h"
  71 #include "orte/util/show_help.h"
  72 #include "orte/util/threads.h"
  73 #include "orte/runtime/orte_globals.h"
  74 #include "orte/runtime/orte_data_server.h"
  75 
  76 #include "pmix_server.h"
  77 #include "pmix_server_internal.h"
  78 
  79 /*
  80  * Local utility functions
  81  */
  82 static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
  83                                   opal_buffer_t *buffer,
  84                                   orte_rml_tag_t tg, void *cbdata);
  85 static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
  86                                   opal_buffer_t *buffer,
  87                                   orte_rml_tag_t tg, void *cbdata);
  88 
  89 #define ORTE_PMIX_SERVER_MIN_ROOMS    4096
  90 
  91 pmix_server_globals_t orte_pmix_server_globals = {0};
  92 
  93 static opal_pmix_server_module_t pmix_server = {
  94     .client_connected = pmix_server_client_connected_fn,
  95     .client_finalized = pmix_server_client_finalized_fn,
  96     .abort = pmix_server_abort_fn,
  97     .fence_nb = pmix_server_fencenb_fn,
  98     .direct_modex = pmix_server_dmodex_req_fn,
  99     .publish = pmix_server_publish_fn,
 100     .lookup = pmix_server_lookup_fn,
 101     .unpublish = pmix_server_unpublish_fn,
 102     .spawn = pmix_server_spawn_fn,
 103     .connect = pmix_server_connect_fn,
 104     .disconnect = pmix_server_disconnect_fn,
 105     .register_events = pmix_server_register_events_fn,
 106     .deregister_events = pmix_server_deregister_events_fn,
 107     .notify_event = pmix_server_notify_event,
 108     .query = pmix_server_query_fn,
 109     .tool_connected = pmix_tool_connected_fn,
 110     .log = pmix_server_log_fn,
 111     .allocate = pmix_server_alloc_fn,
 112     .job_control = pmix_server_job_ctrl_fn
 113 };
 114 
 115 void pmix_server_register_params(void)
 116 {
 117     /* register a verbosity */
 118     orte_pmix_server_globals.verbosity = -1;
 119     (void) mca_base_var_register ("orte", "pmix", NULL, "server_verbose",
 120                                   "Debug verbosity for PMIx server",
 121                                   MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
 122                                   OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
 123                                   &orte_pmix_server_globals.verbosity);
 124     if (0 <= orte_pmix_server_globals.verbosity) {
 125         orte_pmix_server_globals.output = opal_output_open(NULL);
 126         opal_output_set_verbosity(orte_pmix_server_globals.output,
 127                                   orte_pmix_server_globals.verbosity);
 128     }
 129     /* specify the size of the hotel */
 130     orte_pmix_server_globals.num_rooms = -1;
 131     (void) mca_base_var_register ("orte", "pmix", NULL, "server_max_reqs",
 132                                   "Maximum number of backlogged PMIx server direct modex requests",
 133                                   MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
 134                                   OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
 135                                   &orte_pmix_server_globals.num_rooms);
 136     /* specify the timeout for the hotel */
 137     orte_pmix_server_globals.timeout = 2;
 138     (void) mca_base_var_register ("orte", "pmix", NULL, "server_max_wait",
 139                                   "Maximum time (in seconds) the PMIx server should wait to service direct modex requests",
 140                                   MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
 141                                   OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
 142                                   &orte_pmix_server_globals.timeout);
 143 
 144     /* whether or not to wait for the universal server */
 145     orte_pmix_server_globals.wait_for_server = false;
 146     (void) mca_base_var_register ("orte", "pmix", NULL, "wait_for_server",
 147                                   "Whether or not to wait for the session-level server to start",
 148                                   MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
 149                                   OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
 150                                   &orte_pmix_server_globals.wait_for_server);
 151 
 152     /* whether or not to support legacy usock connections as well as tcp */
 153     orte_pmix_server_globals.legacy = false;
 154     (void) mca_base_var_register ("orte", "pmix", NULL, "server_usock_connections",
 155                                   "Whether or not to support legacy usock connections",
 156                                   MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
 157                                   OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
 158                                   &orte_pmix_server_globals.legacy);
 159 
 160     /* whether or not to drop a session-level tool rendezvous point */
 161     orte_pmix_server_globals.session_server = false;
 162     (void) mca_base_var_register ("orte", "pmix", NULL, "session_server",
 163                                   "Whether or not to drop a session-level tool rendezvous point",
 164                                   MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
 165                                   OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
 166                                   &orte_pmix_server_globals.session_server);
 167 
 168     /* whether or not to drop a system-level tool rendezvous point */
 169     orte_pmix_server_globals.system_server = false;
 170     (void) mca_base_var_register ("orte", "pmix", NULL, "system_server",
 171                                   "Whether or not to drop a system-level tool rendezvous point",
 172                                   MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
 173                                   OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_ALL,
 174                                   &orte_pmix_server_globals.system_server);
 175 }
 176 
 177 static void eviction_cbfunc(struct opal_hotel_t *hotel,
 178                             int room_num, void *occupant)
 179 {
 180     pmix_server_req_t *req = (pmix_server_req_t*)occupant;
 181     bool timeout = false;
 182     int rc=OPAL_ERR_TIMEOUT;
 183 
 184     /* decrement the request timeout */
 185     req->timeout -= orte_pmix_server_globals.timeout;
 186     if (req->timeout > 0) {
 187         req->timeout -= orte_pmix_server_globals.timeout;
 188         if (0 >= req->timeout) {
 189             timeout = true;
 190         }
 191     }
 192     if (!timeout) {
 193         /* not done yet - check us back in */
 194         if (OPAL_SUCCESS == (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
 195             return;
 196         }
 197         ORTE_ERROR_LOG(rc);
 198         /* fall thru and return an error so the caller doesn't hang */
 199     } else {
 200         orte_show_help("help-orted.txt", "timedout", true, req->operation);
 201     }
 202     /* don't let the caller hang */
 203     if (NULL != req->opcbfunc) {
 204         req->opcbfunc(OPAL_ERR_TIMEOUT, req->cbdata);
 205     } else if (NULL != req->mdxcbfunc) {
 206         req->mdxcbfunc(OPAL_ERR_TIMEOUT, NULL, 0, req->cbdata, NULL, NULL);
 207     } else if (NULL != req->spcbfunc) {
 208         req->spcbfunc(OPAL_ERR_TIMEOUT, ORTE_JOBID_INVALID, req->cbdata);
 209     } else if (NULL != req->lkcbfunc) {
 210         req->lkcbfunc(OPAL_ERR_TIMEOUT, NULL, req->cbdata);
 211     }
 212     OBJ_RELEASE(req);
 213 }
 214 
 215 /*
 216  * Initialize global variables used w/in the server.
 217  */
 218 int pmix_server_init(void)
 219 {
 220     int rc;
 221     opal_list_t info;
 222     opal_value_t *kv;
 223 
 224     if (orte_pmix_server_globals.initialized) {
 225         return ORTE_SUCCESS;
 226     }
 227     orte_pmix_server_globals.initialized = true;
 228 
 229     /* setup the server's state variables */
 230     OBJ_CONSTRUCT(&orte_pmix_server_globals.reqs, opal_hotel_t);
 231     /* by the time we init the server, we should know how many nodes we
 232      * have in our environment - with the exception of mpirun. If the
 233      * user specified the size of the hotel, then use that value. Otherwise,
 234      * set the value to something large to avoid running out of rooms on
 235      * large machines */
 236     if (-1 == orte_pmix_server_globals.num_rooms) {
 237         orte_pmix_server_globals.num_rooms = orte_process_info.num_procs * 2;
 238         if (orte_pmix_server_globals.num_rooms < ORTE_PMIX_SERVER_MIN_ROOMS) {
 239             orte_pmix_server_globals.num_rooms = ORTE_PMIX_SERVER_MIN_ROOMS;
 240         }
 241     }
 242     if (OPAL_SUCCESS != (rc = opal_hotel_init(&orte_pmix_server_globals.reqs,
 243                                               orte_pmix_server_globals.num_rooms,
 244                                               orte_event_base, orte_pmix_server_globals.timeout*1000000,
 245                                               ORTE_ERROR_PRI, eviction_cbfunc))) {
 246         ORTE_ERROR_LOG(rc);
 247         return rc;
 248     }
 249     OBJ_CONSTRUCT(&orte_pmix_server_globals.notifications, opal_list_t);
 250     orte_pmix_server_globals.server = *ORTE_NAME_INVALID;
 251 
 252     OBJ_CONSTRUCT(&info, opal_list_t);
 253     /* tell the server our temp directory */
 254     kv = OBJ_NEW(opal_value_t);
 255     kv->key = strdup(OPAL_PMIX_SERVER_TMPDIR);
 256     kv->type = OPAL_STRING;
 257     kv->data.string = opal_os_path(false, orte_process_info.jobfam_session_dir, NULL);
 258     opal_list_append(&info, &kv->super);
 259     if (!orte_pmix_server_globals.legacy) {
 260         /* use only one listener */
 261         kv = OBJ_NEW(opal_value_t);
 262         kv->key = strdup(OPAL_PMIX_SINGLE_LISTENER);
 263         kv->type = OPAL_BOOL;
 264         kv->data.flag = true;
 265         opal_list_append(&info, &kv->super);
 266     }
 267     /* tell the server to use its own internal monitoring */
 268     kv = OBJ_NEW(opal_value_t);
 269     kv->key = strdup(OPAL_PMIX_SERVER_ENABLE_MONITORING);
 270     kv->type = OPAL_BOOL;
 271     kv->data.flag = true;
 272     opal_list_append(&info, &kv->super);
 273     /* if requested, tell the server to drop a session-level
 274      * PMIx connection point */
 275     if (orte_pmix_server_globals.session_server) {
 276         kv = OBJ_NEW(opal_value_t);
 277         kv->key = strdup(OPAL_PMIX_SERVER_TOOL_SUPPORT);
 278         kv->type = OPAL_BOOL;
 279         kv->data.flag = true;
 280         opal_list_append(&info, &kv->super);
 281     }
 282 
 283     /* if requested, tell the server to drop a system-level
 284      * PMIx connection point - only do this for the HNP as, in
 285      * at least one case, a daemon can be colocated with the
 286      * HNP and would overwrite the server rendezvous file */
 287     if (orte_pmix_server_globals.system_server &&
 288         (ORTE_PROC_IS_HNP || ORTE_PROC_IS_MASTER)) {
 289         kv = OBJ_NEW(opal_value_t);
 290         kv->key = strdup(OPAL_PMIX_SERVER_SYSTEM_SUPPORT);
 291         kv->type = OPAL_BOOL;
 292         kv->data.flag = true;
 293         opal_list_append(&info, &kv->super);
 294     }
 295 
 296     /* if we are the HNP or MASTER, then we are a gateway */
 297     if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_MASTER) {
 298         kv = OBJ_NEW(opal_value_t);
 299         kv->key = strdup(OPAL_PMIX_SERVER_GATEWAY);
 300         kv->type = OPAL_BOOL;
 301         kv->data.flag = true;
 302         opal_list_append(&info, &kv->super);
 303     }
 304 
 305     /* setup the local server */
 306     if (ORTE_SUCCESS != (rc = opal_pmix.server_init(&pmix_server, &info))) {
 307         /* pmix will provide a nice show_help output here */
 308         return rc;
 309     }
 310     OPAL_LIST_DESTRUCT(&info);
 311 
 312     return rc;
 313 }
 314 
 315 void pmix_server_start(void)
 316 {
 317     /* setup our local data server */
 318     orte_data_server_init();
 319 
 320     /* setup recv for direct modex requests */
 321      orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX,
 322                              ORTE_RML_PERSISTENT, pmix_server_dmdx_recv, NULL);
 323 
 324      /* setup recv for replies to direct modex requests */
 325      orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP,
 326                              ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL);
 327 
 328      /* setup recv for replies to proxy launch requests */
 329      orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP,
 330                              ORTE_RML_PERSISTENT, pmix_server_launch_resp, NULL);
 331 
 332      /* setup recv for replies from data server */
 333      orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT,
 334                              ORTE_RML_PERSISTENT, pmix_server_keyval_client, NULL);
 335 
 336      /* setup recv for notifications */
 337      orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION,
 338                              ORTE_RML_PERSISTENT, pmix_server_notify, NULL);
 339 }
 340 
 341 void pmix_server_finalize(void)
 342 {
 343     if (!orte_pmix_server_globals.initialized) {
 344         return;
 345     }
 346 
 347     opal_output_verbose(2, orte_pmix_server_globals.output,
 348                         "%s Finalizing PMIX server",
 349                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
 350 
 351     /* stop receives */
 352     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX);
 353     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP);
 354     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_LAUNCH_RESP);
 355     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT);
 356     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_NOTIFICATION);
 357 
 358     /* finalize our local data server */
 359     orte_data_server_finalize();
 360 
 361     /* shutdown the local server */
 362     opal_pmix.server_finalize();
 363 
 364     /* cleanup collectives */
 365     OBJ_DESTRUCT(&orte_pmix_server_globals.reqs);
 366     OPAL_LIST_DESTRUCT(&orte_pmix_server_globals.notifications);
 367 }
 368 
 369 static void send_error(int status, opal_process_name_t *idreq,
 370                        orte_process_name_t *remote, int remote_room)
 371 {
 372     opal_buffer_t *reply;
 373     int rc;
 374 
 375     reply = OBJ_NEW(opal_buffer_t);
 376     /* pack the status */
 377     if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &status, 1, OPAL_INT))) {
 378         ORTE_ERROR_LOG(rc);
 379         goto error;
 380     }
 381     /* pack the id of the requested proc */
 382     if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, idreq, 1, OPAL_NAME))) {
 383         ORTE_ERROR_LOG(rc);
 384         goto error;
 385     }
 386 
 387     /* pack the remote daemon's request room number */
 388     if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &remote_room, 1, OPAL_INT))) {
 389         ORTE_ERROR_LOG(rc);
 390         goto error;
 391     }
 392 
 393     /* send the response */
 394     orte_rml.send_buffer_nb(remote, reply,
 395                             ORTE_RML_TAG_DIRECT_MODEX_RESP,
 396                             orte_rml_send_callback, NULL);
 397     return;
 398 error:
 399     OBJ_RELEASE(reply);
 400     return;
 401 }
 402 
 403 static void _mdxresp(int sd, short args, void *cbdata)
 404 {
 405     pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
 406     int rc;
 407     opal_buffer_t *reply;
 408 
 409     ORTE_ACQUIRE_OBJECT(req);
 410 
 411     /* check us out of the hotel */
 412     opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
 413 
 414     reply = OBJ_NEW(opal_buffer_t);
 415     /* return the status */
 416     if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &req->status, 1, OPAL_INT))) {
 417         ORTE_ERROR_LOG(rc);
 418         OBJ_RELEASE(reply);
 419         goto done;
 420     }
 421     /* pack the id of the requested proc */
 422     if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &req->target, 1, OPAL_NAME))) {
 423         ORTE_ERROR_LOG(rc);
 424         OBJ_RELEASE(reply);
 425         goto done;
 426     }
 427     /* pack the remote daemon's request room number */
 428     if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &req->remote_room_num, 1, OPAL_INT))) {
 429         ORTE_ERROR_LOG(rc);
 430         OBJ_RELEASE(reply);
 431         goto done;
 432     }
 433     /* return any provided data */
 434     opal_dss.copy_payload(reply, &req->msg);
 435 
 436     /* send the response */
 437     orte_rml.send_buffer_nb(&req->proxy, reply,
 438                             ORTE_RML_TAG_DIRECT_MODEX_RESP,
 439                             orte_rml_send_callback, NULL);
 440 
 441   done:
 442     /* if they asked for a release, give it to them */
 443     if (NULL != req->rlcbfunc) {
 444         req->rlcbfunc(req->cbdata);
 445     }
 446     OBJ_RELEASE(req);
 447     return;
 448 }
 449 /* the modex_resp function takes place in the local PMIx server's
 450  * progress thread - we must therefore thread-shift it so we can
 451  * access our global data */
 452 static void modex_resp(int status,
 453                        const char *data, size_t sz, void *cbdata,
 454                        opal_pmix_release_cbfunc_t relcbfunc, void *relcbdata)
 455 {
 456     pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
 457     opal_buffer_t xfer;
 458 
 459     ORTE_ACQUIRE_OBJECT(req);
 460 
 461     req->status = status;
 462     /* we need to preserve the data as the caller
 463      * will free it upon our return */
 464     OBJ_CONSTRUCT(&xfer, opal_buffer_t);
 465     opal_dss.load(&xfer, (void*)data, sz);
 466     opal_dss.copy_payload(&req->msg, &xfer);
 467     xfer.base_ptr = NULL; // protect the incoming data
 468     OBJ_DESTRUCT(&xfer);
 469     /* point to the callback */
 470     req->rlcbfunc = relcbfunc;
 471     req->cbdata = relcbdata;
 472     opal_event_set(orte_event_base, &(req->ev),
 473                    -1, OPAL_EV_WRITE, _mdxresp, req);
 474     opal_event_set_priority(&(req->ev), ORTE_MSG_PRI);
 475     ORTE_POST_OBJECT(req);
 476     opal_event_active(&(req->ev), OPAL_EV_WRITE, 1);
 477 }
 478 static void pmix_server_dmdx_recv(int status, orte_process_name_t* sender,
 479                                   opal_buffer_t *buffer,
 480                                   orte_rml_tag_t tg, void *cbdata)
 481 {
 482     int rc, room_num;
 483     int32_t cnt;
 484     opal_process_name_t idreq;
 485     orte_process_name_t name;
 486     orte_job_t *jdata;
 487     orte_proc_t *proc;
 488     pmix_server_req_t *req;
 489 
 490 
 491     /* unpack the id of the proc whose data is being requested */
 492     cnt = 1;
 493     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &idreq, &cnt, OPAL_NAME))) {
 494         ORTE_ERROR_LOG(rc);
 495         return;
 496     }
 497     opal_output_verbose(2, orte_pmix_server_globals.output,
 498                         "%s dmdx:recv request from proc %s for proc %s",
 499                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 500                         ORTE_NAME_PRINT(sender),
 501                         ORTE_NAME_PRINT(&idreq));
 502     /* and the remote daemon's tracking room number */
 503     cnt = 1;
 504     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) {
 505         ORTE_ERROR_LOG(rc);
 506         return;
 507     }
 508     /* is this proc one of mine? */
 509     memcpy((char*)&name, (char*)&idreq, sizeof(orte_process_name_t));
 510     if (NULL == (jdata = orte_get_job_data_object(name.jobid))) {
 511         /* not having the jdata means that we haven't unpacked the
 512          * the launch message for this job yet - this is a race
 513          * condition, so just log the request and we will fill
 514          * it later */
 515         req = OBJ_NEW(pmix_server_req_t);
 516         opal_asprintf(&req->operation, "DMDX: %s:%d", __FILE__, __LINE__);
 517         req->proxy = *sender;
 518         req->target = idreq;
 519         req->remote_room_num = room_num;
 520         /* adjust the timeout to reflect the size of the job as it can take some
 521          * amount of time to start the job */
 522         ORTE_ADJUST_TIMEOUT(req);
 523         if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
 524             orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
 525             OBJ_RELEASE(req);
 526             send_error(rc, &idreq, sender, room_num);
 527         }
 528         return;
 529     }
 530     if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, name.vpid))) {
 531         /* this is truly an error, so notify the sender */
 532         send_error(ORTE_ERR_NOT_FOUND, &idreq, sender, room_num);
 533         return;
 534     }
 535     if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) {
 536         /* send back an error - they obviously have made a mistake */
 537         send_error(ORTE_ERR_NOT_FOUND, &idreq, sender, room_num);
 538         return;
 539     }
 540     /* track the request since the call down to the PMIx server
 541      * is asynchronous */
 542     req = OBJ_NEW(pmix_server_req_t);
 543     opal_asprintf(&req->operation, "DMDX: %s:%d", __FILE__, __LINE__);
 544     req->proxy = *sender;
 545     req->target = idreq;
 546     req->remote_room_num = room_num;
 547     /* adjust the timeout to reflect the size of the job as it can take some
 548      * amount of time to start the job */
 549     ORTE_ADJUST_TIMEOUT(req);
 550     if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
 551         orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
 552         OBJ_RELEASE(req);
 553         send_error(rc, &idreq, sender, room_num);
 554         return;
 555     }
 556 
 557     /* ask our local pmix server for the data */
 558     if (OPAL_SUCCESS != (rc = opal_pmix.server_dmodex_request(&idreq, modex_resp, req))) {
 559         ORTE_ERROR_LOG(rc);
 560         opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
 561         OBJ_RELEASE(req);
 562         send_error(rc, &idreq, sender, room_num);
 563         return;
 564     }
 565     return;
 566 }
 567 
 568 typedef struct {
 569     opal_object_t super;
 570     char *data;
 571     int32_t ndata;
 572 } datacaddy_t;
 573 static void dccon(datacaddy_t *p)
 574 {
 575     p->data = NULL;
 576 }
 577 static void dcdes(datacaddy_t *p)
 578 {
 579     if (NULL != p->data) {
 580         free(p->data);
 581     }
 582 }
 583 static OBJ_CLASS_INSTANCE(datacaddy_t,
 584                           opal_object_t,
 585                           dccon, dcdes);
 586 
 587 static void relcbfunc(void *relcbdata)
 588 {
 589     datacaddy_t *d = (datacaddy_t*)relcbdata;
 590 
 591     OBJ_RELEASE(d);
 592 }
 593 
 594 static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
 595                                   opal_buffer_t *buffer,
 596                                   orte_rml_tag_t tg, void *cbdata)
 597 {
 598     int rc, ret, room_num, rnum;
 599     int32_t cnt;
 600     opal_process_name_t target;
 601     pmix_server_req_t *req;
 602     datacaddy_t *d;
 603 
 604     opal_output_verbose(2, orte_pmix_server_globals.output,
 605                         "%s dmdx:recv response from proc %s",
 606                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 607                         ORTE_NAME_PRINT(sender));
 608 
 609     /* unpack the status */
 610     cnt = 1;
 611     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
 612         ORTE_ERROR_LOG(rc);
 613         return;
 614     }
 615 
 616     /* unpack the id of the target whose info we just received */
 617     cnt = 1;
 618     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &target, &cnt, OPAL_NAME))) {
 619         ORTE_ERROR_LOG(rc);
 620         return;
 621     }
 622 
 623     /* unpack our tracking room number */
 624     cnt = 1;
 625     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room_num, &cnt, OPAL_INT))) {
 626         ORTE_ERROR_LOG(rc);
 627         return;
 628     }
 629 
 630     /* unload the remainder of the buffer */
 631     d = OBJ_NEW(datacaddy_t);
 632     if (OPAL_SUCCESS != (rc = opal_dss.unload(buffer, (void**)&d->data, &d->ndata))) {
 633         ORTE_ERROR_LOG(rc);
 634         return;
 635     }
 636 
 637     /* check the request out of the tracking hotel */
 638     opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
 639     /* return the returned data to the requestor */
 640     if (NULL != req) {
 641         if (NULL != req->mdxcbfunc) {
 642             OBJ_RETAIN(d);
 643             req->mdxcbfunc(ret, d->data, d->ndata, req->cbdata, relcbfunc, d);
 644         }
 645         OBJ_RELEASE(req);
 646     }
 647 
 648     /* now see if anyone else was waiting for data from this target */
 649     for (rnum=0; rnum < orte_pmix_server_globals.reqs.num_rooms; rnum++) {
 650         opal_hotel_knock(&orte_pmix_server_globals.reqs, rnum, (void**)&req);
 651         if (NULL == req) {
 652             continue;
 653         }
 654         if (req->target.jobid == target.jobid &&
 655             req->target.vpid == target.vpid) {
 656             if (NULL != req->mdxcbfunc) {
 657                 OBJ_RETAIN(d);
 658                 req->mdxcbfunc(ret, d->data, d->ndata, req->cbdata, relcbfunc, d);
 659             }
 660             opal_hotel_checkout(&orte_pmix_server_globals.reqs, rnum);
 661             OBJ_RELEASE(req);
 662         }
 663     }
 664     OBJ_RELEASE(d);  // maintain accounting
 665 }
 666 
 667 static void opcon(orte_pmix_server_op_caddy_t *p)
 668 {
 669     p->procs = NULL;
 670     p->eprocs = NULL;
 671     p->info = NULL;
 672     p->cbfunc = NULL;
 673     p->infocbfunc = NULL;
 674     p->toolcbfunc = NULL;
 675     p->cbdata = NULL;
 676 }
 677 OBJ_CLASS_INSTANCE(orte_pmix_server_op_caddy_t,
 678                    opal_object_t,
 679                    opcon, NULL);
 680 
 681 static void rqcon(pmix_server_req_t *p)
 682 {
 683     p->operation = NULL;
 684     p->range = OPAL_PMIX_RANGE_SESSION;
 685     p->proxy = *ORTE_NAME_INVALID;
 686     p->target = *ORTE_NAME_INVALID;
 687     p->timeout = orte_pmix_server_globals.timeout;
 688     p->jdata = NULL;
 689     OBJ_CONSTRUCT(&p->msg, opal_buffer_t);
 690     p->opcbfunc = NULL;
 691     p->mdxcbfunc = NULL;
 692     p->spcbfunc = NULL;
 693     p->lkcbfunc = NULL;
 694     p->rlcbfunc = NULL;
 695     p->cbdata = NULL;
 696 }
 697 static void rqdes(pmix_server_req_t *p)
 698 {
 699     if (NULL != p->operation) {
 700         free(p->operation);
 701     }
 702     if (NULL != p->jdata) {
 703         OBJ_RELEASE(p->jdata);
 704     }
 705     OBJ_DESTRUCT(&p->msg);
 706 }
 707 OBJ_CLASS_INSTANCE(pmix_server_req_t,
 708                    opal_object_t,
 709                    rqcon, rqdes);
 710 
 711 static void mdcon(orte_pmix_mdx_caddy_t *p)
 712 {
 713     p->sig = NULL;
 714     p->cbfunc = NULL;
 715     p->cbdata = NULL;
 716 }
 717 static void mddes(orte_pmix_mdx_caddy_t *p)
 718 {
 719     if (NULL != p->sig) {
 720         OBJ_RELEASE(p->sig);
 721     }
 722 }
 723 OBJ_CLASS_INSTANCE(orte_pmix_mdx_caddy_t,
 724                    opal_object_t,
 725                    mdcon, mddes);

/* [<][>][^][v][top][bottom][index][help] */