This source file includes following definitions.
- plm_tm_init
- plm_tm_launch_job
- launch_daemons
- poll_spawns
- plm_tm_terminate_orteds
- plm_tm_signal_job
- plm_tm_finalize
- plm_tm_connect
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 #include "orte_config.h"
  32 #include "orte/constants.h"
  33 #include "orte/types.h"
  34 
  35 #include <string.h>
  36 
  37 #ifdef HAVE_UNISTD_H
  38 #include <unistd.h>
  39 #endif
  40 #include <signal.h>
  41 #ifdef HAVE_SYS_TYPES_H
  42 #include <sys/types.h>
  43 #endif
  44 #ifdef HAVE_SYS_STAT_H
  45 #include <sys/stat.h>
  46 #endif
  47 #ifdef HAVE_SYS_WAIT_H
  48 #include <sys/wait.h>
  49 #endif
  50 #ifdef HAVE_SCHED_H
  51 #include <sched.h>
  52 #endif
  53 #ifdef HAVE_SYS_TIME_H
  54 #include <sys/time.h>
  55 #endif
  56 #include <errno.h>
  57 #include <tm.h>
  58 
  59 #include "opal/mca/installdirs/installdirs.h"
  60 #include "opal/mca/event/event.h"
  61 #include "opal/util/argv.h"
  62 #include "opal/util/output.h"
  63 #include "orte/util/show_help.h"
  64 #include "opal/util/opal_environ.h"
  65 #include "opal/util/basename.h"
  66 #include "opal/util/printf.h"
  67 
  68 #include "orte/util/name_fns.h"
  69 #include "orte/util/threads.h"
  70 #include "orte/runtime/orte_globals.h"
  71 #include "orte/runtime/orte_wait.h"
  72 #include "orte/mca/errmgr/errmgr.h"
  73 #include "orte/mca/rmaps/rmaps.h"
  74 #include "orte/mca/state/state.h"
  75 
  76 #include "orte/mca/plm/plm.h"
  77 #include "orte/mca/plm/base/plm_private.h"
  78 #include "plm_tm.h"
  79 
  80 
  81 
  82 
  83 
  84 
  85 static int plm_tm_init(void);
  86 static int plm_tm_launch_job(orte_job_t *jdata);
  87 static int plm_tm_terminate_orteds(void);
  88 static int plm_tm_signal_job(orte_jobid_t jobid, int32_t signal);
  89 static int plm_tm_finalize(void);
  90 
  91 
  92 
  93 
  94 static orte_std_cntr_t launched = 0;
  95 static bool connected = false;
  96 
  97 
  98 
  99 
 100 orte_plm_base_module_t orte_plm_tm_module = {
 101     plm_tm_init,
 102     orte_plm_base_set_hnp_name,
 103     plm_tm_launch_job,
 104     NULL,
 105     orte_plm_base_orted_terminate_job,
 106     plm_tm_terminate_orteds,
 107     orte_plm_base_orted_kill_local_procs,
 108     plm_tm_signal_job,
 109     plm_tm_finalize
 110 };
 111 
 112 
 113 static int plm_tm_connect(void);
 114 static void launch_daemons(int fd, short args, void *cbdata);
 115 static void poll_spawns(int fd, short args, void *cbdata);
 116 
 117 
 118 
 119 
 120 
 121 static int plm_tm_init(void)
 122 {
 123     int rc;
 124 
 125     if (ORTE_SUCCESS != (rc = orte_plm_base_comm_start())) {
 126         ORTE_ERROR_LOG(rc);
 127     }
 128 
 129     
 130     orte_plm_globals.daemon_nodes_assigned_at_launch = true;
 131 
 132     
 133     if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_LAUNCH_DAEMONS,
 134                                                        launch_daemons, ORTE_SYS_PRI))) {
 135         ORTE_ERROR_LOG(rc);
 136         return rc;
 137     }
 138 
 139     
 140 
 141 
 142     if (ORTE_SUCCESS != (rc = orte_state.set_job_state_callback(ORTE_JOB_STATE_DAEMONS_LAUNCHED,
 143                                                                 poll_spawns))) {
 144         ORTE_ERROR_LOG(rc);
 145         return rc;
 146     }
 147 
 148     return rc;
 149 }
 150 
 151 
 152 static int plm_tm_launch_job(orte_job_t *jdata)
 153 {
 154     if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_RESTART)) {
 155         
 156         ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP);
 157     } else {
 158         
 159         ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_INIT);
 160     }
 161     return ORTE_SUCCESS;
 162 }
 163 
 164 
 165 
 166 
 167 
 168 static void launch_daemons(int fd, short args, void *cbdata)
 169 {
 170     orte_job_map_t *map = NULL;
 171     orte_app_context_t *app;
 172     orte_node_t *node;
 173     int proc_vpid_index;
 174     char *param;
 175     char **env = NULL;
 176     char *var;
 177     char **argv = NULL;
 178     int argc = 0;
 179     int rc;
 180     orte_std_cntr_t i;
 181     char *bin_base = NULL, *lib_base = NULL;
 182     tm_event_t *tm_events = NULL;
 183     tm_task_id *tm_task_ids = NULL;
 184     bool failed_launch = true;
 185     mode_t current_umask;
 186     char* vpid_string;
 187     orte_job_t *daemons, *jdata;
 188     orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata;
 189     int32_t launchid, *ldptr;
 190     char *prefix_dir = NULL;
 191 
 192     ORTE_ACQUIRE_OBJECT(state);
 193 
 194     jdata = state->jdata;
 195 
 196     
 197 
 198 
 199     if (ORTE_FLAG_TEST(state->jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
 200         jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
 201         ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
 202         OBJ_RELEASE(state);
 203         return;
 204     }
 205 
 206     
 207     daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
 208     if (ORTE_SUCCESS != (rc = orte_plm_base_setup_virtual_machine(jdata))) {
 209         ORTE_ERROR_LOG(rc);
 210         goto cleanup;
 211     }
 212 
 213     
 214 
 215 
 216 
 217     if (orte_do_not_launch) {
 218         
 219 
 220 
 221 
 222         jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
 223         ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
 224         OBJ_RELEASE(state);
 225         return;
 226     }
 227 
 228     
 229     if (NULL == (map = daemons->map)) {
 230         ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
 231         rc = ORTE_ERR_NOT_FOUND;
 232         goto cleanup;
 233     }
 234 
 235     if (0 == map->num_new_daemons) {
 236         
 237 
 238 
 239 
 240         jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
 241         ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
 242         OBJ_RELEASE(state);
 243         return;
 244     }
 245 
 246     OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
 247                          "%s plm:tm: launching vm",
 248                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 249 
 250     
 251     tm_events = malloc(sizeof(tm_event_t) * map->num_new_daemons);
 252     if (NULL == tm_events) {
 253         rc = ORTE_ERR_OUT_OF_RESOURCE;
 254         ORTE_ERROR_LOG(rc);
 255         goto cleanup;
 256     }
 257     tm_task_ids = malloc(sizeof(tm_task_id) * map->num_new_daemons);
 258     if (NULL == tm_task_ids) {
 259         rc = ORTE_ERR_OUT_OF_RESOURCE;
 260         ORTE_ERROR_LOG(rc);
 261         goto cleanup;
 262     }
 263 
 264     
 265     orte_plm_base_setup_orted_cmd(&argc, &argv);
 266 
 267     
 268     orte_plm_base_orted_append_basic_args(&argc, &argv, "tm",
 269                                           &proc_vpid_index);
 270 
 271     if (0 < opal_output_get_verbosity(orte_plm_base_framework.framework_output)) {
 272         param = opal_argv_join(argv, ' ');
 273         OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
 274                              "%s plm:tm: final top-level argv:\n\t%s",
 275                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 276                              (NULL == param) ? "NULL" : param));
 277         if (NULL != param) free(param);
 278     }
 279 
 280     if (!connected) {
 281         if (ORTE_SUCCESS != plm_tm_connect()) {
 282             goto cleanup;
 283         }
 284         connected = true;
 285     }
 286 
 287     
 288 
 289 
 290     lib_base = opal_basename(opal_install_dirs.libdir);
 291     bin_base = opal_basename(opal_install_dirs.bindir);
 292 
 293     
 294     env = opal_argv_copy(orte_launch_environ);
 295 
 296     
 297     (void) mca_base_var_env_name ("plm", &var);
 298     opal_setenv(var, "rsh", true, &env);
 299     free(var);
 300 
 301     
 302     current_umask = umask(0);
 303     umask(current_umask);
 304     opal_asprintf(&var, "0%o", current_umask);
 305     opal_setenv("ORTE_DAEMON_UMASK_VALUE", var, true, &env);
 306     free(var);
 307 
 308     
 309 
 310 
 311 
 312 
 313 
 314     app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, 0);
 315     orte_get_attribute(&app->attributes, ORTE_APP_PREFIX_DIR, (void**)&prefix_dir, OPAL_STRING);
 316     if (NULL != prefix_dir) {
 317         char *newenv;
 318 
 319         for (i = 0; NULL != env && NULL != env[i]; ++i) {
 320             
 321             if (0 == strncmp("PATH=", env[i], 5)) {
 322                 opal_asprintf(&newenv, "%s/%s:%s",
 323                                prefix_dir, bin_base, env[i] + 5);
 324                 OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
 325                                      "%s plm:tm: resetting PATH: %s",
 326                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 327                                      newenv));
 328                 opal_setenv("PATH", newenv, true, &env);
 329                 free(newenv);
 330             }
 331 
 332             
 333             else if (0 == strncmp("LD_LIBRARY_PATH=", env[i], 16)) {
 334                 opal_asprintf(&newenv, "%s/%s:%s",
 335                                prefix_dir, lib_base, env[i] + 16);
 336                 OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
 337                                      "%s plm:tm: resetting LD_LIBRARY_PATH: %s",
 338                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 339                                      newenv));
 340                 opal_setenv("LD_LIBRARY_PATH", newenv, true, &env);
 341                 free(newenv);
 342             }
 343         }
 344         free(prefix_dir);
 345     }
 346 
 347     
 348 
 349 
 350     ldptr = &launchid;
 351     for (i = 0; i < map->nodes->size; i++) {
 352         if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
 353             continue;
 354         }
 355         
 356         if (ORTE_FLAG_TEST(node, ORTE_NODE_FLAG_DAEMON_LAUNCHED)) {
 357             continue;
 358         }
 359 
 360         OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
 361                              "%s plm:tm: launching on node %s",
 362                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 363                              node->name));
 364 
 365         
 366         rc = orte_util_convert_vpid_to_string(&vpid_string, node->daemon->name.vpid);
 367         if (ORTE_SUCCESS != rc) {
 368             opal_output(0, "plm:tm: unable to get daemon vpid as string");
 369             exit(-1);
 370         }
 371         free(argv[proc_vpid_index]);
 372         argv[proc_vpid_index] = strdup(vpid_string);
 373         free(vpid_string);
 374 
 375         
 376         if (0 < opal_output_get_verbosity(orte_plm_base_framework.framework_output)) {
 377             param = opal_argv_join(argv, ' ');
 378             OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
 379                                  "%s plm:tm: executing:\n\t%s",
 380                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
 381                                  (NULL == param) ? "NULL" : param));
 382             if (NULL != param) free(param);
 383         }
 384 
 385         launchid = 0;
 386         if (!orte_get_attribute(&node->attributes, ORTE_NODE_LAUNCH_ID, (void**)&ldptr, OPAL_INT32)) {
 387             orte_show_help("help-plm-tm.txt", "tm-spawn-failed", true, argv[0], node->name, 0);
 388             rc = ORTE_ERROR;
 389             goto cleanup;
 390         }
 391         rc = tm_spawn(argc, argv, env, launchid, tm_task_ids + launched, tm_events + launched);
 392         if (TM_SUCCESS != rc) {
 393             orte_show_help("help-plm-tm.txt", "tm-spawn-failed", true, argv[0], node->name, launchid);
 394             rc = ORTE_ERROR;
 395             goto cleanup;
 396         }
 397 
 398         launched++;
 399     }
 400 
 401     
 402     state->jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
 403     daemons->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
 404 
 405     
 406     failed_launch = false;
 407 
 408     OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
 409                          "%s plm:tm:launch: finished spawning orteds",
 410                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 411 
 412   cleanup:
 413     
 414     OBJ_RELEASE(state);
 415 
 416     
 417     if (failed_launch) {
 418         ORTE_ACTIVATE_JOB_STATE(daemons, ORTE_JOB_STATE_FAILED_TO_START);
 419     }
 420 }
 421 
 422 static void poll_spawns(int fd, short args, void *cbdata)
 423 {
 424     orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata;
 425     int i, rc;
 426     bool failed_launch = true;
 427     int local_err;
 428     tm_event_t event;
 429 
 430     ORTE_ACQUIRE_OBJECT(state);
 431 
 432     
 433     for (i = 0; i < launched; ++i) {
 434         rc = tm_poll(TM_NULL_EVENT, &event, 1, &local_err);
 435         if (TM_SUCCESS != rc) {
 436             opal_output(0, "plm:tm: failed to poll for a spawned daemon, return status = %d", rc);
 437             goto cleanup;
 438         }
 439         if (TM_SUCCESS != local_err) {
 440             opal_output(0, "plm:tm: failed to spawn daemon, error code = %d", local_err );
 441             goto cleanup;
 442         }
 443     }
 444     failed_launch = false;
 445 
 446   cleanup:
 447     
 448     OBJ_RELEASE(state);
 449 
 450     
 451     if (failed_launch) {
 452         ORTE_ACTIVATE_JOB_STATE(state->jdata, ORTE_JOB_STATE_FAILED_TO_START);
 453     }
 454 }
 455 
 456 
 457 
 458 
 459 
 460 int plm_tm_terminate_orteds(void)
 461 {
 462     int rc;
 463 
 464     if (ORTE_SUCCESS != (rc = orte_plm_base_orted_exit(ORTE_DAEMON_EXIT_CMD))) {
 465         ORTE_ERROR_LOG(rc);
 466     }
 467 
 468     return rc;
 469 }
 470 
 471 static int plm_tm_signal_job(orte_jobid_t jobid, int32_t signal)
 472 {
 473     int rc;
 474 
 475     
 476     if (ORTE_SUCCESS != (rc = orte_plm_base_orted_signal_local_procs(jobid, signal))) {
 477         ORTE_ERROR_LOG(rc);
 478     }
 479 
 480     return rc;
 481 }
 482 
 483 
 484 
 485 
 486 
 487 static int plm_tm_finalize(void)
 488 {
 489     int rc;
 490 
 491     
 492     if (ORTE_SUCCESS != (rc = orte_plm_base_comm_stop())) {
 493         ORTE_ERROR_LOG(rc);
 494     }
 495 
 496     if (connected) {
 497         tm_finalize();
 498         connected = false;
 499     }
 500 
 501     return ORTE_SUCCESS;
 502 }
 503 
 504 
 505 static int plm_tm_connect(void)
 506 {
 507     int ret;
 508     struct tm_roots tm_root;
 509     int count;
 510     struct timespec tp = {0, 100};
 511 
 512     
 513 
 514     for (count = 0 ; count < 10; ++count) {
 515         ret = tm_init(NULL, &tm_root);
 516         if (TM_SUCCESS == ret) {
 517             return ORTE_SUCCESS;
 518         }
 519 
 520         
 521 
 522 
 523         nanosleep(&tp, NULL);
 524 #ifdef HAVE_SCHED_H
 525         sched_yield();
 526 #endif
 527     }
 528 
 529     return ORTE_ERR_RESOURCE_BUSY;
 530 }