root/orte/orted/pmix/pmix_server_dyn.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. pmix_server_launch_resp
  2. spawn
  3. pmix_server_spawn_fn
  4. _cnlk
  5. _cnct
  6. pmix_server_connect_fn
  7. mdxcbfunc
  8. pmix_server_disconnect_fn
  9. pmix_server_alloc_fn

   1 /*
   2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2011 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
  13  *                         All rights reserved.
  14  * Copyright (c) 2009-2017 Cisco Systems, Inc.  All rights reserved
  15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
  16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
  17  * Copyright (c) 2014      Mellanox Technologies, Inc.
  18  *                         All rights reserved.
  19  * Copyright (c) 2014-2016 Research Organization for Information Science
  20  *                         and Technology (RIST). All rights reserved.
  21  * $COPYRIGHT$
  22  *
  23  * Additional copyrights may follow
  24  *
  25  * $HEADER$
  26  *
  27  */
  28 
  29 #include "orte_config.h"
  30 
  31 #ifdef HAVE_UNISTD_H
  32 #include <unistd.h>
  33 #endif
  34 
  35 #include "opal/util/argv.h"
  36 #include "opal/util/opal_getcwd.h"
  37 #include "opal/util/os_path.h"
  38 #include "opal/util/output.h"
  39 #include "opal/util/path.h"
  40 #include "opal/dss/dss.h"
  41 #include "opal/mca/hwloc/hwloc-internal.h"
  42 
  43 #include "orte/mca/errmgr/errmgr.h"
  44 #include "orte/mca/rmaps/base/base.h"
  45 #include "orte/mca/rml/base/rml_contact.h"
  46 #include "orte/mca/state/state.h"
  47 #include "orte/util/name_fns.h"
  48 #include "orte/util/show_help.h"
  49 #include "orte/util/threads.h"
  50 #include "orte/runtime/orte_globals.h"
  51 #include "orte/mca/rml/rml.h"
  52 
  53 #include "orte/orted/pmix/pmix_server.h"
  54 #include "orte/orted/pmix/pmix_server_internal.h"
  55 
  56 void pmix_server_launch_resp(int status, orte_process_name_t* sender,
  57                              opal_buffer_t *buffer,
  58                              orte_rml_tag_t tg, void *cbdata)
  59 {
  60     pmix_server_req_t *req;
  61     int rc, room;
  62     int32_t ret, cnt;
  63     orte_jobid_t jobid;
  64     orte_job_t *jdata;
  65 
  66     /* unpack the status */
  67     cnt = 1;
  68     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT32))) {
  69         ORTE_ERROR_LOG(rc);
  70         return;
  71     }
  72 
  73     /* unpack the jobid */
  74     cnt = 1;
  75     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &jobid, &cnt, ORTE_JOBID))) {
  76         ORTE_ERROR_LOG(rc);
  77         return;
  78     }
  79 
  80     /* unpack our tracking room number */
  81     cnt = 1;
  82     if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &room, &cnt, OPAL_INT))) {
  83         ORTE_ERROR_LOG(rc);
  84         return;
  85     }
  86 
  87     /* retrieve the request */
  88     opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room, (void**)&req);
  89     if (NULL == req) {
  90         /* we are hosed */
  91         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
  92         return;
  93     }
  94 
  95     /* execute the callback */
  96     if (NULL != req->spcbfunc) {
  97         req->spcbfunc(ret, jobid, req->cbdata);
  98     }
  99     /* if we failed to launch, then ensure we cleanup */
 100     if (ORTE_SUCCESS != ret) {
 101         jdata = orte_get_job_data_object(jobid);
 102         ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
 103     }
 104     /* cleanup */
 105     OBJ_RELEASE(req);
 106 }
 107 
 108 static void spawn(int sd, short args, void *cbdata)
 109 {
 110     pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
 111     int rc;
 112     opal_buffer_t *buf;
 113     orte_plm_cmd_flag_t command;
 114 
 115     ORTE_ACQUIRE_OBJECT(req);
 116 
 117     /* add this request to our tracker hotel */
 118     if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
 119         orte_show_help("help-orted.txt", "noroom", true, req->operation, orte_pmix_server_globals.num_rooms);
 120         goto callback;
 121     }
 122 
 123     /* include the request room number for quick retrieval */
 124     orte_set_attribute(&req->jdata->attributes, ORTE_JOB_ROOM_NUM,
 125                        ORTE_ATTR_GLOBAL, &req->room_num, OPAL_INT);
 126 
 127     /* construct a spawn message */
 128     buf = OBJ_NEW(opal_buffer_t);
 129     command = ORTE_PLM_LAUNCH_JOB_CMD;
 130     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_PLM_CMD))) {
 131         ORTE_ERROR_LOG(rc);
 132         OBJ_RELEASE(buf);
 133         opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
 134         goto callback;
 135     }
 136 
 137     /* pack the jdata object */
 138     if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &req->jdata, 1, ORTE_JOB))) {
 139         ORTE_ERROR_LOG(rc);
 140         opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
 141         OBJ_RELEASE(buf);
 142         goto callback;
 143 
 144     }
 145 
 146     /* send it to the HNP for processing - might be myself! */
 147     if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buf,
 148                                                       ORTE_RML_TAG_PLM,
 149                                                       orte_rml_send_callback, NULL))) {
 150         ORTE_ERROR_LOG(rc);
 151         opal_hotel_checkout(&orte_pmix_server_globals.reqs, req->room_num);
 152         OBJ_RELEASE(buf);
 153         goto callback;
 154     }
 155     return;
 156 
 157   callback:
 158     /* this section gets executed solely upon an error */
 159     if (NULL != req->mdxcbfunc) {
 160         req->mdxcbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
 161     }
 162     OBJ_RELEASE(req);
 163 }
 164 
 165 int pmix_server_spawn_fn(opal_process_name_t *requestor,
 166                          opal_list_t *job_info, opal_list_t *apps,
 167                          opal_pmix_spawn_cbfunc_t cbfunc, void *cbdata)
 168 {
 169     orte_job_t *jdata;
 170     orte_app_context_t *app;
 171     opal_pmix_app_t *papp;
 172     opal_value_t *info, *next;
 173     opal_list_t *cache;
 174     int rc, i;
 175     char cwd[OPAL_PATH_MAX];
 176     bool flag;
 177 
 178     opal_output_verbose(2, orte_pmix_server_globals.output,
 179                         "%s spawn called from proc %s",
 180                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 181                         ORTE_NAME_PRINT(requestor));
 182 
 183     /* create the job object */
 184     jdata = OBJ_NEW(orte_job_t);
 185     jdata->map = OBJ_NEW(orte_job_map_t);
 186 
 187     /* transfer the apps across */
 188     OPAL_LIST_FOREACH(papp, apps, opal_pmix_app_t) {
 189         app = OBJ_NEW(orte_app_context_t);
 190         app->idx = opal_pointer_array_add(jdata->apps, app);
 191         jdata->num_apps++;
 192         if (NULL != papp->cmd) {
 193             app->app = strdup(papp->cmd);
 194         } else if (NULL == papp->argv ||
 195                    NULL == papp->argv[0]) {
 196             ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
 197             OBJ_RELEASE(jdata);
 198             return ORTE_ERR_BAD_PARAM;
 199         } else {
 200             app->app = strdup(papp->argv[0]);
 201         }
 202         if (NULL != papp->argv) {
 203             app->argv = opal_argv_copy(papp->argv);
 204         }
 205         if (NULL != papp->env) {
 206             app->env = opal_argv_copy(papp->env);
 207         }
 208         if (NULL != papp->cwd) {
 209             app->cwd = strdup(papp->cwd);
 210         }
 211         app->num_procs = papp->maxprocs;
 212         OPAL_LIST_FOREACH(info, &papp->info, opal_value_t) {
 213             if (0 == strcmp(info->key, OPAL_PMIX_HOST)) {
 214                 orte_set_attribute(&app->attributes, ORTE_APP_DASH_HOST,
 215                                    ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 216             } else if (0 == strcmp(info->key, OPAL_PMIX_HOSTFILE)) {
 217                 orte_set_attribute(&app->attributes, ORTE_APP_HOSTFILE,
 218                                    ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 219             } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_HOSTFILE)) {
 220                 orte_set_attribute(&app->attributes, ORTE_APP_ADD_HOSTFILE,
 221                                    ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 222             } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_HOST)) {
 223                 orte_set_attribute(&app->attributes, ORTE_APP_ADD_HOST,
 224                                    ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 225             } else if (0 == strcmp(info->key, OPAL_PMIX_PREFIX)) {
 226                 orte_set_attribute(&app->attributes, ORTE_APP_PREFIX_DIR,
 227                                    ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 228             } else if (0 == strcmp(info->key, OPAL_PMIX_WDIR)) {
 229                 /* if this is a relative path, convert it to an absolute path */
 230                 if (opal_path_is_absolute(info->data.string)) {
 231                     app->cwd = strdup(info->data.string);
 232                 } else {
 233                     /* get the cwd */
 234                     if (OPAL_SUCCESS != (rc = opal_getcwd(cwd, sizeof(cwd)))) {
 235                         orte_show_help("help-orted.txt", "cwd", true, "spawn", rc);
 236                         OBJ_RELEASE(jdata);
 237                         return rc;
 238                     }
 239                     /* construct the absolute path */
 240                     app->cwd = opal_os_path(false, cwd, info->data.string, NULL);
 241                 }
 242             } else if (0 == strcmp(info->key, OPAL_PMIX_PRELOAD_BIN)) {
 243                 OPAL_CHECK_BOOL(info, flag);
 244                 orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_BIN,
 245                                    ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 246             } else if (0 == strcmp(info->key, OPAL_PMIX_PRELOAD_FILES)) {
 247                 orte_set_attribute(&app->attributes, ORTE_APP_PRELOAD_FILES,
 248                                    ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 249 
 250             /***   ENVIRONMENTAL VARIABLE DIRECTIVES   ***/
 251             /* there can be multiple of these, so we add them to the attribute list */
 252             } else if (0 == strcmp(info->key, OPAL_PMIX_SET_ENVAR)) {
 253                 orte_add_attribute(&app->attributes, ORTE_APP_SET_ENVAR,
 254                                    ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
 255             } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_ENVAR)) {
 256                 orte_add_attribute(&app->attributes, ORTE_APP_ADD_ENVAR,
 257                                    ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
 258             } else if (0 == strcmp(info->key, OPAL_PMIX_UNSET_ENVAR)) {
 259                 orte_add_attribute(&app->attributes, ORTE_APP_UNSET_ENVAR,
 260                                    ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 261             } else if (0 == strcmp(info->key, OPAL_PMIX_PREPEND_ENVAR)) {
 262                 orte_add_attribute(&app->attributes, ORTE_APP_PREPEND_ENVAR,
 263                                    ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
 264             } else if (0 == strcmp(info->key, OPAL_PMIX_APPEND_ENVAR)) {
 265                 orte_add_attribute(&app->attributes, ORTE_APP_APPEND_ENVAR,
 266                                    ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
 267                 } else if (0 == strcmp(info->key, OPAL_PMIX_PSET_NAME)) {
 268                     orte_set_attribute(&app->attributes, ORTE_APP_PSET_NAME,
 269                                        ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 270 
 271             } else {
 272                 /* unrecognized key */
 273                 orte_show_help("help-orted.txt", "bad-key",
 274                                true, "spawn", "application", info->key);
 275            }
 276         }
 277     }
 278 
 279     /* transfer the job info across */
 280     OPAL_LIST_FOREACH_SAFE(info, next, job_info, opal_value_t) {
 281         /***   PERSONALITY   ***/
 282         if (0 == strcmp(info->key, OPAL_PMIX_PERSONALITY)) {
 283             jdata->personality = opal_argv_split(info->data.string, ',');
 284 
 285         /***   REQUESTED MAPPER   ***/
 286         } else if (0 == strcmp(info->key, OPAL_PMIX_MAPPER)) {
 287             jdata->map->req_mapper = strdup(info->data.string);
 288 
 289         /***   DISPLAY MAP   ***/
 290         } else if (0 == strcmp(info->key, OPAL_PMIX_DISPLAY_MAP)) {
 291             OPAL_CHECK_BOOL(info, jdata->map->display_map);
 292 
 293         /***   PPR (PROCS-PER-RESOURCE)   ***/
 294         } else if (0 == strcmp(info->key, OPAL_PMIX_PPR)) {
 295             if (ORTE_MAPPING_POLICY_IS_SET(jdata->map->mapping)) {
 296                 /* not allowed to provide multiple mapping policies */
 297                 orte_show_help("help-orte-rmaps-base.txt", "redefining-policy",
 298                                true, "mapping", info->data.string,
 299                                orte_rmaps_base_print_mapping(orte_rmaps_base.mapping));
 300                 return ORTE_ERR_BAD_PARAM;
 301             }
 302             ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_PPR);
 303             jdata->map->ppr = strdup(info->data.string);
 304 
 305         /***   MAP-BY   ***/
 306         } else if (0 == strcmp(info->key, OPAL_PMIX_MAPBY)) {
 307             if (ORTE_MAPPING_POLICY_IS_SET(jdata->map->mapping)) {
 308                 /* not allowed to provide multiple mapping policies */
 309                 orte_show_help("help-orte-rmaps-base.txt", "redefining-policy",
 310                                true, "mapping", info->data.string,
 311                                orte_rmaps_base_print_mapping(orte_rmaps_base.mapping));
 312                 return ORTE_ERR_BAD_PARAM;
 313             }
 314             rc = orte_rmaps_base_set_mapping_policy(jdata, &jdata->map->mapping,
 315                                                     NULL, info->data.string);
 316             if (ORTE_SUCCESS != rc) {
 317                 return rc;
 318             }
 319 
 320         /***   RANK-BY   ***/
 321         } else if (0 == strcmp(info->key, OPAL_PMIX_RANKBY)) {
 322             if (ORTE_RANKING_POLICY_IS_SET(jdata->map->ranking)) {
 323                 /* not allowed to provide multiple ranking policies */
 324                 orte_show_help("help-orte-rmaps-base.txt", "redefining-policy",
 325                                true, "ranking", info->data.string,
 326                                orte_rmaps_base_print_ranking(orte_rmaps_base.ranking));
 327                 return ORTE_ERR_BAD_PARAM;
 328             }
 329             rc = orte_rmaps_base_set_ranking_policy(&jdata->map->ranking,
 330                                                     jdata->map->mapping,
 331                                                     info->data.string);
 332             if (ORTE_SUCCESS != rc) {
 333                 return rc;
 334             }
 335 
 336         /***   BIND-TO   ***/
 337         } else if (0 == strcmp(info->key, OPAL_PMIX_BINDTO)) {
 338             if (OPAL_BINDING_POLICY_IS_SET(jdata->map->binding)) {
 339                 /* not allowed to provide multiple mapping policies */
 340                 orte_show_help("help-opal-hwloc-base.txt", "redefining-policy", true,
 341                                info->data.string,
 342                                opal_hwloc_base_print_binding(opal_hwloc_binding_policy));
 343                 return ORTE_ERR_BAD_PARAM;
 344             }
 345             rc = opal_hwloc_base_set_binding_policy(&jdata->map->binding,
 346                                                     info->data.string);
 347             if (ORTE_SUCCESS != rc) {
 348                 return rc;
 349             }
 350 
 351         /***   CPUS/RANK   ***/
 352         } else if (0 == strcmp(info->key, OPAL_PMIX_CPUS_PER_PROC)) {
 353             jdata->map->cpus_per_rank = info->data.uint32;
 354 
 355         /***   NO USE LOCAL   ***/
 356         } else if (0 == strcmp(info->key, OPAL_PMIX_NO_PROCS_ON_HEAD)) {
 357             OPAL_CHECK_BOOL(info, flag);
 358             if (flag) {
 359                 ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_USE_LOCAL);
 360             } else {
 361                 ORTE_UNSET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_USE_LOCAL);
 362             }
 363             /* mark that the user specified it */
 364             ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_LOCAL_GIVEN);
 365 
 366         /***   OVERSUBSCRIBE   ***/
 367         } else if (0 == strcmp(info->key, OPAL_PMIX_NO_OVERSUBSCRIBE)) {
 368             OPAL_CHECK_BOOL(info, flag);
 369             if (flag) {
 370                 ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
 371             } else {
 372                 ORTE_UNSET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
 373             }
 374             /* mark that the user specified it */
 375             ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_SUBSCRIBE_GIVEN);
 376 
 377         /***   REPORT BINDINGS  ***/
 378         } else if (0 == strcmp(info->key, OPAL_PMIX_REPORT_BINDINGS)) {
 379             OPAL_CHECK_BOOL(info, flag);
 380             orte_set_attribute(&jdata->attributes, ORTE_JOB_REPORT_BINDINGS,
 381                                ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 382 
 383         /***   CPU LIST  ***/
 384         } else if (0 == strcmp(info->key, OPAL_PMIX_CPU_LIST)) {
 385             orte_set_attribute(&jdata->attributes, ORTE_JOB_CPU_LIST,
 386                                ORTE_ATTR_GLOBAL, info->data.string, OPAL_BOOL);
 387 
 388         /***   RECOVERABLE  ***/
 389         } else if (0 == strcmp(info->key, OPAL_PMIX_JOB_RECOVERABLE)) {
 390             OPAL_CHECK_BOOL(info, flag);
 391             if (flag) {
 392                 ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_RECOVERABLE);
 393             } else {
 394                 ORTE_FLAG_UNSET(jdata, ORTE_JOB_FLAG_RECOVERABLE);
 395             }
 396 
 397         /***   MAX RESTARTS  ***/
 398         } else if (0 == strcmp(info->key, OPAL_PMIX_MAX_RESTARTS)) {
 399             for (i=0; i < jdata->apps->size; i++) {
 400                 if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
 401                     continue;
 402                 }
 403                 orte_set_attribute(&app->attributes, ORTE_APP_MAX_RESTARTS,
 404                                    ORTE_ATTR_GLOBAL, &info->data.uint32, OPAL_INT32);
 405             }
 406 
 407         /***   CONTINUOUS OPERATION  ***/
 408         } else if (0 == strcmp(info->key, OPAL_PMIX_JOB_CONTINUOUS)) {
 409             OPAL_CHECK_BOOL(info, flag);
 410             orte_set_attribute(&jdata->attributes, ORTE_JOB_CONTINUOUS_OP,
 411                                ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 412 
 413         /***   NON-PMI JOB   ***/
 414         } else if (0 == strcmp(info->key, OPAL_PMIX_NON_PMI)) {
 415             OPAL_CHECK_BOOL(info, flag);
 416             orte_set_attribute(&jdata->attributes, ORTE_JOB_NON_ORTE_JOB,
 417                                ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 418 
 419         /***   SPAWN REQUESTOR IS TOOL   ***/
 420         } else if (0 == strcmp(info->key, OPAL_PMIX_REQUESTOR_IS_TOOL)) {
 421             OPAL_CHECK_BOOL(info, flag);
 422             orte_set_attribute(&jdata->attributes, ORTE_JOB_DVM_JOB,
 423                                ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 424             if (flag) {
 425                 /* request that IO be forwarded to the requesting tool */
 426                 orte_set_attribute(&jdata->attributes, ORTE_JOB_FWDIO_TO_TOOL,
 427                                    ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 428             }
 429 
 430         /***   NOTIFY UPON JOB COMPLETION   ***/
 431         } else if (0 == strcmp(info->key, OPAL_PMIX_NOTIFY_COMPLETION)) {
 432             OPAL_CHECK_BOOL(info, flag);
 433             orte_set_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION,
 434                                ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 435 
 436         /***   STOP ON EXEC FOR DEBUGGER   ***/
 437         } else if (0 == strcmp(info->key, OPAL_PMIX_DEBUG_STOP_ON_EXEC)) {
 438             /* we don't know how to do this */
 439             return ORTE_ERR_NOT_SUPPORTED;
 440 
 441         /***   TAG STDOUT   ***/
 442         } else if (0 == strcmp(info->key, OPAL_PMIX_TAG_OUTPUT)) {
 443             OPAL_CHECK_BOOL(info, flag);
 444             orte_set_attribute(&jdata->attributes, ORTE_JOB_TAG_OUTPUT,
 445                                ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 446 
 447         /***   TIMESTAMP OUTPUT   ***/
 448         } else if (0 == strcmp(info->key, OPAL_PMIX_TIMESTAMP_OUTPUT)) {
 449             OPAL_CHECK_BOOL(info, flag);
 450             orte_set_attribute(&jdata->attributes, ORTE_JOB_TIMESTAMP_OUTPUT,
 451                                ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 452 
 453         /***   OUTPUT TO FILES   ***/
 454         } else if (0 == strcmp(info->key, OPAL_PMIX_OUTPUT_TO_FILE)) {
 455             orte_set_attribute(&jdata->attributes, ORTE_JOB_OUTPUT_TO_FILE,
 456                                ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 457 
 458         /***   MERGE STDERR TO STDOUT   ***/
 459         } else if (0 == strcmp(info->key, OPAL_PMIX_MERGE_STDERR_STDOUT)) {
 460             OPAL_CHECK_BOOL(info, flag);
 461             orte_set_attribute(&jdata->attributes, ORTE_JOB_MERGE_STDERR_STDOUT,
 462                                ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 463 
 464         /***   STDIN TARGET   ***/
 465         } else if (0 == strcmp(info->key, OPAL_PMIX_STDIN_TGT)) {
 466             if (0 == strcmp(info->data.string, "all")) {
 467                 jdata->stdin_target = ORTE_VPID_WILDCARD;
 468             } else if (0 == strcmp(info->data.string, "none")) {
 469                 jdata->stdin_target = ORTE_VPID_INVALID;
 470             } else {
 471                 jdata->stdin_target = strtoul(info->data.string, NULL, 10);
 472             }
 473 
 474         /***   INDEX ARGV   ***/
 475         } else if (0 == strcmp(info->key, OPAL_PMIX_INDEX_ARGV)) {
 476             OPAL_CHECK_BOOL(info, flag);
 477             orte_set_attribute(&jdata->attributes, ORTE_JOB_INDEX_ARGV,
 478                                ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
 479 
 480         /***   DEBUGGER DAEMONS   ***/
 481         } else if (0 == strcmp(info->key, OPAL_PMIX_DEBUGGER_DAEMONS)) {
 482             ORTE_FLAG_SET(jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON);
 483             ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_DEBUGGER);
 484 
 485         /***   ENVIRONMENTAL VARIABLE DIRECTIVES   ***/
 486         /* there can be multiple of these, so we add them to the attribute list */
 487         } else if (0 == strcmp(info->key, OPAL_PMIX_SET_ENVAR)) {
 488             orte_add_attribute(&jdata->attributes, ORTE_JOB_SET_ENVAR,
 489                                ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
 490         } else if (0 == strcmp(info->key, OPAL_PMIX_ADD_ENVAR)) {
 491             orte_add_attribute(&jdata->attributes, ORTE_JOB_ADD_ENVAR,
 492                                ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
 493         } else if (0 == strcmp(info->key, OPAL_PMIX_UNSET_ENVAR)) {
 494             orte_add_attribute(&jdata->attributes, ORTE_JOB_UNSET_ENVAR,
 495                                ORTE_ATTR_GLOBAL, info->data.string, OPAL_STRING);
 496         } else if (0 == strcmp(info->key, OPAL_PMIX_PREPEND_ENVAR)) {
 497             orte_add_attribute(&jdata->attributes, ORTE_JOB_PREPEND_ENVAR,
 498                                ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
 499         } else if (0 == strcmp(info->key, OPAL_PMIX_APPEND_ENVAR)) {
 500             orte_add_attribute(&jdata->attributes, ORTE_JOB_APPEND_ENVAR,
 501                                ORTE_ATTR_GLOBAL, &info->data.envar, OPAL_ENVAR);
 502 
 503         /***   DEFAULT - CACHE FOR INCLUSION WITH JOB INFO   ***/
 504         } else {
 505             /* cache for inclusion with job info at registration */
 506             cache = NULL;
 507             opal_list_remove_item(job_info, &info->super);
 508             if (orte_get_attribute(&jdata->attributes, ORTE_JOB_INFO_CACHE, (void**)&cache, OPAL_PTR) &&
 509                 NULL != cache) {
 510                 opal_list_append(cache, &info->super);
 511             } else {
 512                 cache = OBJ_NEW(opal_list_t);
 513                 opal_list_append(cache, &info->super);
 514                 orte_set_attribute(&jdata->attributes, ORTE_JOB_INFO_CACHE, ORTE_ATTR_LOCAL, (void*)cache, OPAL_PTR);
 515             }
 516         }
 517     }
 518     /* if the job is missing a personality setting, add it */
 519     if (NULL == jdata->personality) {
 520         opal_argv_append_nosize(&jdata->personality, "ompi");
 521     }
 522 
 523     /* indicate the requestor so bookmarks can be correctly set */
 524     orte_set_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY,
 525                        ORTE_ATTR_GLOBAL, requestor, OPAL_NAME);
 526 
 527     /* setup a spawn tracker so we know who to call back when this is done
 528      * and thread-shift the entire thing so it can be safely added to
 529      * our tracking list */
 530     ORTE_SPN_REQ(jdata, spawn, cbfunc, cbdata);
 531 
 532     return OPAL_SUCCESS;
 533 }
 534 
 535 static void _cnct(int sd, short args, void *cbdata);
 536 
 537 static void _cnlk(int status, opal_list_t *data, void *cbdata)
 538 {
 539     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 540     int rc, cnt;
 541     opal_pmix_pdata_t *pdat;
 542     orte_job_t *jdata;
 543     orte_node_t *node;
 544     orte_proc_t *proc;
 545     opal_buffer_t buf, bucket;
 546     opal_byte_object_t *bo;
 547     orte_process_name_t dmn, pname;
 548     char *uri;
 549     opal_value_t val;
 550     opal_list_t nodes;
 551 
 552     ORTE_ACQUIRE_OBJECT(cd);
 553 
 554     /* if we failed to get the required data, then just inform
 555      * the embedded server that the connect cannot succeed */
 556     if (ORTE_SUCCESS != status || NULL == data) {
 557         if (NULL != cd->cbfunc) {
 558             rc = status;
 559             goto release;
 560         }
 561     }
 562 
 563     /* register the returned data with the embedded PMIx server */
 564     pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
 565     if (OPAL_BYTE_OBJECT != pdat->value.type) {
 566         rc = ORTE_ERR_BAD_PARAM;
 567         ORTE_ERROR_LOG(rc);
 568         goto release;
 569     }
 570     /* the data will consist of a packed buffer with the job data in it */
 571     OBJ_CONSTRUCT(&buf, opal_buffer_t);
 572     opal_dss.load(&buf, pdat->value.data.bo.bytes, pdat->value.data.bo.size);
 573     pdat->value.data.bo.bytes = NULL;
 574     pdat->value.data.bo.size = 0;
 575     cnt = 1;
 576     if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
 577         ORTE_ERROR_LOG(rc);
 578         OBJ_DESTRUCT(&buf);
 579         goto release;
 580     }
 581 
 582     /* unpack the byte object containing the daemon uri's */
 583     cnt=1;
 584     if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
 585         ORTE_ERROR_LOG(rc);
 586         OBJ_DESTRUCT(&buf);
 587         goto release;
 588     }
 589     /* load it into a buffer */
 590     OBJ_CONSTRUCT(&bucket, opal_buffer_t);
 591     opal_dss.load(&bucket, bo->bytes, bo->size);
 592     bo->bytes = NULL;
 593     free(bo);
 594     /* prep a list to save the nodes */
 595     OBJ_CONSTRUCT(&nodes, opal_list_t);
 596     /* unpack and store the URI's */
 597     cnt = 1;
 598     while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &uri, &cnt, OPAL_STRING))) {
 599         rc = orte_rml_base_parse_uris(uri, &dmn, NULL);
 600         if (ORTE_SUCCESS != rc) {
 601             OBJ_DESTRUCT(&buf);
 602             OBJ_DESTRUCT(&bucket);
 603             goto release;
 604         }
 605         /* save a node object for this daemon */
 606         node = OBJ_NEW(orte_node_t);
 607         node->daemon = OBJ_NEW(orte_proc_t);
 608         memcpy(&node->daemon->name, &dmn, sizeof(orte_process_name_t));
 609         opal_list_append(&nodes, &node->super);
 610         /* register the URI */
 611         OBJ_CONSTRUCT(&val, opal_value_t);
 612         val.key = OPAL_PMIX_PROC_URI;
 613         val.type = OPAL_STRING;
 614         val.data.string = uri;
 615         if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&dmn, &val))) {
 616             ORTE_ERROR_LOG(rc);
 617             val.key = NULL;
 618             val.data.string = NULL;
 619             OBJ_DESTRUCT(&val);
 620             OBJ_DESTRUCT(&buf);
 621             OBJ_DESTRUCT(&bucket);
 622             goto release;
 623         }
 624         val.key = NULL;
 625         val.data.string = NULL;
 626         OBJ_DESTRUCT(&val);
 627         cnt = 1;
 628     }
 629     OBJ_DESTRUCT(&bucket);
 630 
 631     /* unpack the proc-to-daemon map */
 632     cnt=1;
 633     if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
 634         ORTE_ERROR_LOG(rc);
 635         OBJ_DESTRUCT(&buf);
 636         goto release;
 637     }
 638     /* load it into a buffer */
 639     OBJ_CONSTRUCT(&bucket, opal_buffer_t);
 640     opal_dss.load(&bucket, bo->bytes, bo->size);
 641     bo->bytes = NULL;
 642     free(bo);
 643     /* unpack and store the map */
 644     cnt = 1;
 645     while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &pname, &cnt, ORTE_NAME))) {
 646         /* get the name of the daemon hosting it */
 647         if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &dmn, &cnt, ORTE_NAME))) {
 648             OBJ_DESTRUCT(&buf);
 649             OBJ_DESTRUCT(&bucket);
 650             goto release;
 651         }
 652         /* create the proc object */
 653         proc = OBJ_NEW(orte_proc_t);
 654         memcpy(&proc->name, &pname, sizeof(orte_process_name_t));
 655         opal_pointer_array_set_item(jdata->procs, pname.vpid, proc);
 656         /* find the daemon */
 657         OPAL_LIST_FOREACH(node, &nodes, orte_node_t) {
 658             if (node->daemon->name.vpid == dmn.vpid) {
 659                 OBJ_RETAIN(node);
 660                 proc->node = node;
 661                 break;
 662             }
 663         }
 664     }
 665     OBJ_DESTRUCT(&bucket);
 666     OPAL_LIST_DESTRUCT(&nodes);
 667     OBJ_DESTRUCT(&buf);
 668 
 669     /* register the nspace */
 670     if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
 671         ORTE_ERROR_LOG(rc);
 672         OBJ_RELEASE(jdata);
 673         goto release;
 674     }
 675 
 676     /* save the job object so we don't endlessly cycle */
 677     opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
 678 
 679     /* restart the cnct processor */
 680     ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
 681     /* protect the re-referenced data */
 682     cd->procs = NULL;
 683     cd->info = NULL;
 684     OBJ_RELEASE(cd);
 685     return;
 686 
 687   release:
 688     if (NULL != cd->cbfunc) {
 689         cd->cbfunc(rc, cd->cbdata);
 690     }
 691     OBJ_RELEASE(cd);
 692 }
 693 
 694 static void _cnct(int sd, short args, void *cbdata)
 695 {
 696     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 697     orte_namelist_t *nm;
 698     char **keys = NULL, *key;
 699     orte_job_t *jdata;
 700     int rc = ORTE_SUCCESS;
 701     opal_value_t *kv;
 702 
 703     ORTE_ACQUIRE_OBJECT(cd);
 704 
 705     /* at some point, we need to add bookeeping to track which
 706      * procs are "connected" so we know who to notify upon
 707      * termination or failure. For now, we have to ensure
 708      * that we have registered all participating nspaces so
 709      * the embedded PMIx server can provide them to the client.
 710      * Otherwise, the client will receive an error as it won't
 711      * be able to resolve any of the required data for the
 712      * missing nspaces */
 713 
 714     /* cycle thru the procs */
 715     OPAL_LIST_FOREACH(nm, cd->procs, orte_namelist_t) {
 716         /* see if we have the job object for this job */
 717         if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) {
 718             /* we don't know about this job. If our "global" data
 719              * server is just our HNP, then we have no way of finding
 720              * out about it, and all we can do is return an error */
 721             if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
 722                 orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
 723                 ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
 724                 rc = ORTE_ERR_NOT_SUPPORTED;
 725                 goto release;
 726             }
 727             /* ask the global data server for the data - if we get it,
 728              * then we can complete the request */
 729             orte_util_convert_jobid_to_string(&key, nm->name.jobid);
 730             opal_argv_append_nosize(&keys, key);
 731             free(key);
 732             /* we have to add the user's id to our list of info */
 733             kv = OBJ_NEW(opal_value_t);
 734             kv->key = strdup(OPAL_PMIX_USERID);
 735             kv->type = OPAL_UINT32;
 736             kv->data.uint32 = geteuid();
 737             opal_list_append(cd->info, &kv->super);
 738             if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
 739                 ORTE_ERROR_LOG(rc);
 740                 opal_argv_free(keys);
 741                 goto release;
 742             }
 743             opal_argv_free(keys);
 744             /* the callback function on this lookup will return us to this
 745              * routine so we can continue the process */
 746             return;
 747         }
 748         /* we know about the job - check to ensure it has been
 749          * registered with the local PMIx server */
 750         if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
 751             /* it hasn't been registered yet, so register it now */
 752             if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
 753                 ORTE_ERROR_LOG(rc);
 754                 goto release;
 755             }
 756         }
 757     }
 758 
 759   release:
 760     if (NULL != cd->cbfunc) {
 761         cd->cbfunc(rc, cd->cbdata);
 762     }
 763     OBJ_RELEASE(cd);
 764 }
 765 
 766 int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
 767                            opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
 768 {
 769     opal_output_verbose(2, orte_pmix_server_globals.output,
 770                         "%s connect called with %d procs",
 771                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 772                         (int)opal_list_get_size(procs));
 773 
 774     /* protect ourselves */
 775     if (NULL == procs || 0 == opal_list_get_size(procs)) {
 776         return ORTE_ERR_BAD_PARAM;
 777     }
 778     /* must thread shift this as we will be accessing global data */
 779     ORTE_PMIX_OPERATION(procs, info, _cnct, cbfunc, cbdata);
 780     return ORTE_SUCCESS;
 781 }
 782 
 783 static void mdxcbfunc(int status,
 784                       const char *data, size_t ndata, void *cbdata,
 785                       opal_pmix_release_cbfunc_t relcbfunc, void *relcbdata)
 786 {
 787     orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
 788 
 789     ORTE_ACQUIRE_OBJECT(cd);
 790 
 791     /* ack the call */
 792     if (NULL != cd->cbfunc) {
 793         cd->cbfunc(status, cd->cbdata);
 794     }
 795     OBJ_RELEASE(cd);
 796 }
 797 
 798 int pmix_server_disconnect_fn(opal_list_t *procs, opal_list_t *info,
 799                               opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
 800 {
 801     orte_pmix_server_op_caddy_t *cd;
 802     int rc;
 803 
 804     opal_output_verbose(2, orte_pmix_server_globals.output,
 805                         "%s disconnect called with %d procs",
 806                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 807                         (int)opal_list_get_size(procs));
 808 
 809     /* at some point, we need to add bookeeping to track which
 810      * procs are "connected" so we know who to notify upon
 811      * termination or failure. For now, just execute a fence
 812      * Note that we do not need to thread-shift here as the
 813      * fence function will do it for us */
 814     cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
 815     cd->cbfunc = cbfunc;
 816     cd->cbdata = cbdata;
 817 
 818     if (ORTE_SUCCESS != (rc = pmix_server_fencenb_fn(procs, info, NULL, 0,
 819                                                      mdxcbfunc, cd))) {
 820         OBJ_RELEASE(cd);
 821     }
 822 
 823     return rc;
 824 }
 825 
 826 int pmix_server_alloc_fn(const opal_process_name_t *requestor,
 827                          opal_pmix_alloc_directive_t dir,
 828                          opal_list_t *info,
 829                          opal_pmix_info_cbfunc_t cbfunc,
 830                          void *cbdata)
 831 {
 832     /* ORTE currently has no way of supporting allocation requests */
 833     return ORTE_ERR_NOT_SUPPORTED;
 834 }

/* [<][>][^][v][top][bottom][index][help] */