This source file includes following definitions.
- plm_tm_init
- plm_tm_launch_job
- launch_daemons
- poll_spawns
- plm_tm_terminate_orteds
- plm_tm_signal_job
- plm_tm_finalize
- plm_tm_connect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 #include "orte_config.h"
32 #include "orte/constants.h"
33 #include "orte/types.h"
34
35 #include <string.h>
36
37 #ifdef HAVE_UNISTD_H
38 #include <unistd.h>
39 #endif
40 #include <signal.h>
41 #ifdef HAVE_SYS_TYPES_H
42 #include <sys/types.h>
43 #endif
44 #ifdef HAVE_SYS_STAT_H
45 #include <sys/stat.h>
46 #endif
47 #ifdef HAVE_SYS_WAIT_H
48 #include <sys/wait.h>
49 #endif
50 #ifdef HAVE_SCHED_H
51 #include <sched.h>
52 #endif
53 #ifdef HAVE_SYS_TIME_H
54 #include <sys/time.h>
55 #endif
56 #include <errno.h>
57 #include <tm.h>
58
59 #include "opal/mca/installdirs/installdirs.h"
60 #include "opal/mca/event/event.h"
61 #include "opal/util/argv.h"
62 #include "opal/util/output.h"
63 #include "orte/util/show_help.h"
64 #include "opal/util/opal_environ.h"
65 #include "opal/util/basename.h"
66 #include "opal/util/printf.h"
67
68 #include "orte/util/name_fns.h"
69 #include "orte/util/threads.h"
70 #include "orte/runtime/orte_globals.h"
71 #include "orte/runtime/orte_wait.h"
72 #include "orte/mca/errmgr/errmgr.h"
73 #include "orte/mca/rmaps/rmaps.h"
74 #include "orte/mca/state/state.h"
75
76 #include "orte/mca/plm/plm.h"
77 #include "orte/mca/plm/base/plm_private.h"
78 #include "plm_tm.h"
79
80
81
82
83
84
85 static int plm_tm_init(void);
86 static int plm_tm_launch_job(orte_job_t *jdata);
87 static int plm_tm_terminate_orteds(void);
88 static int plm_tm_signal_job(orte_jobid_t jobid, int32_t signal);
89 static int plm_tm_finalize(void);
90
91
92
93
94 static orte_std_cntr_t launched = 0;
95 static bool connected = false;
96
97
98
99
100 orte_plm_base_module_t orte_plm_tm_module = {
101 plm_tm_init,
102 orte_plm_base_set_hnp_name,
103 plm_tm_launch_job,
104 NULL,
105 orte_plm_base_orted_terminate_job,
106 plm_tm_terminate_orteds,
107 orte_plm_base_orted_kill_local_procs,
108 plm_tm_signal_job,
109 plm_tm_finalize
110 };
111
112
113 static int plm_tm_connect(void);
114 static void launch_daemons(int fd, short args, void *cbdata);
115 static void poll_spawns(int fd, short args, void *cbdata);
116
117
118
119
120
121 static int plm_tm_init(void)
122 {
123 int rc;
124
125 if (ORTE_SUCCESS != (rc = orte_plm_base_comm_start())) {
126 ORTE_ERROR_LOG(rc);
127 }
128
129
130 orte_plm_globals.daemon_nodes_assigned_at_launch = true;
131
132
133 if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_LAUNCH_DAEMONS,
134 launch_daemons, ORTE_SYS_PRI))) {
135 ORTE_ERROR_LOG(rc);
136 return rc;
137 }
138
139
140
141
142 if (ORTE_SUCCESS != (rc = orte_state.set_job_state_callback(ORTE_JOB_STATE_DAEMONS_LAUNCHED,
143 poll_spawns))) {
144 ORTE_ERROR_LOG(rc);
145 return rc;
146 }
147
148 return rc;
149 }
150
151
152 static int plm_tm_launch_job(orte_job_t *jdata)
153 {
154 if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_RESTART)) {
155
156 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP);
157 } else {
158
159 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_INIT);
160 }
161 return ORTE_SUCCESS;
162 }
163
164
165
166
167
168 static void launch_daemons(int fd, short args, void *cbdata)
169 {
170 orte_job_map_t *map = NULL;
171 orte_app_context_t *app;
172 orte_node_t *node;
173 int proc_vpid_index;
174 char *param;
175 char **env = NULL;
176 char *var;
177 char **argv = NULL;
178 int argc = 0;
179 int rc;
180 orte_std_cntr_t i;
181 char *bin_base = NULL, *lib_base = NULL;
182 tm_event_t *tm_events = NULL;
183 tm_task_id *tm_task_ids = NULL;
184 bool failed_launch = true;
185 mode_t current_umask;
186 char* vpid_string;
187 orte_job_t *daemons, *jdata;
188 orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata;
189 int32_t launchid, *ldptr;
190 char *prefix_dir = NULL;
191
192 ORTE_ACQUIRE_OBJECT(state);
193
194 jdata = state->jdata;
195
196
197
198
199 if (ORTE_FLAG_TEST(state->jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
200 jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
201 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
202 OBJ_RELEASE(state);
203 return;
204 }
205
206
207 daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
208 if (ORTE_SUCCESS != (rc = orte_plm_base_setup_virtual_machine(jdata))) {
209 ORTE_ERROR_LOG(rc);
210 goto cleanup;
211 }
212
213
214
215
216
217 if (orte_do_not_launch) {
218
219
220
221
222 jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
223 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
224 OBJ_RELEASE(state);
225 return;
226 }
227
228
229 if (NULL == (map = daemons->map)) {
230 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
231 rc = ORTE_ERR_NOT_FOUND;
232 goto cleanup;
233 }
234
235 if (0 == map->num_new_daemons) {
236
237
238
239
240 jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
241 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
242 OBJ_RELEASE(state);
243 return;
244 }
245
246 OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
247 "%s plm:tm: launching vm",
248 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
249
250
251 tm_events = malloc(sizeof(tm_event_t) * map->num_new_daemons);
252 if (NULL == tm_events) {
253 rc = ORTE_ERR_OUT_OF_RESOURCE;
254 ORTE_ERROR_LOG(rc);
255 goto cleanup;
256 }
257 tm_task_ids = malloc(sizeof(tm_task_id) * map->num_new_daemons);
258 if (NULL == tm_task_ids) {
259 rc = ORTE_ERR_OUT_OF_RESOURCE;
260 ORTE_ERROR_LOG(rc);
261 goto cleanup;
262 }
263
264
265 orte_plm_base_setup_orted_cmd(&argc, &argv);
266
267
268 orte_plm_base_orted_append_basic_args(&argc, &argv, "tm",
269 &proc_vpid_index);
270
271 if (0 < opal_output_get_verbosity(orte_plm_base_framework.framework_output)) {
272 param = opal_argv_join(argv, ' ');
273 OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
274 "%s plm:tm: final top-level argv:\n\t%s",
275 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
276 (NULL == param) ? "NULL" : param));
277 if (NULL != param) free(param);
278 }
279
280 if (!connected) {
281 if (ORTE_SUCCESS != plm_tm_connect()) {
282 goto cleanup;
283 }
284 connected = true;
285 }
286
287
288
289
290 lib_base = opal_basename(opal_install_dirs.libdir);
291 bin_base = opal_basename(opal_install_dirs.bindir);
292
293
294 env = opal_argv_copy(orte_launch_environ);
295
296
297 (void) mca_base_var_env_name ("plm", &var);
298 opal_setenv(var, "rsh", true, &env);
299 free(var);
300
301
302 current_umask = umask(0);
303 umask(current_umask);
304 opal_asprintf(&var, "0%o", current_umask);
305 opal_setenv("ORTE_DAEMON_UMASK_VALUE", var, true, &env);
306 free(var);
307
308
309
310
311
312
313
314 app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, 0);
315 orte_get_attribute(&app->attributes, ORTE_APP_PREFIX_DIR, (void**)&prefix_dir, OPAL_STRING);
316 if (NULL != prefix_dir) {
317 char *newenv;
318
319 for (i = 0; NULL != env && NULL != env[i]; ++i) {
320
321 if (0 == strncmp("PATH=", env[i], 5)) {
322 opal_asprintf(&newenv, "%s/%s:%s",
323 prefix_dir, bin_base, env[i] + 5);
324 OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
325 "%s plm:tm: resetting PATH: %s",
326 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
327 newenv));
328 opal_setenv("PATH", newenv, true, &env);
329 free(newenv);
330 }
331
332
333 else if (0 == strncmp("LD_LIBRARY_PATH=", env[i], 16)) {
334 opal_asprintf(&newenv, "%s/%s:%s",
335 prefix_dir, lib_base, env[i] + 16);
336 OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
337 "%s plm:tm: resetting LD_LIBRARY_PATH: %s",
338 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
339 newenv));
340 opal_setenv("LD_LIBRARY_PATH", newenv, true, &env);
341 free(newenv);
342 }
343 }
344 free(prefix_dir);
345 }
346
347
348
349
350 ldptr = &launchid;
351 for (i = 0; i < map->nodes->size; i++) {
352 if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
353 continue;
354 }
355
356 if (ORTE_FLAG_TEST(node, ORTE_NODE_FLAG_DAEMON_LAUNCHED)) {
357 continue;
358 }
359
360 OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
361 "%s plm:tm: launching on node %s",
362 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
363 node->name));
364
365
366 rc = orte_util_convert_vpid_to_string(&vpid_string, node->daemon->name.vpid);
367 if (ORTE_SUCCESS != rc) {
368 opal_output(0, "plm:tm: unable to get daemon vpid as string");
369 exit(-1);
370 }
371 free(argv[proc_vpid_index]);
372 argv[proc_vpid_index] = strdup(vpid_string);
373 free(vpid_string);
374
375
376 if (0 < opal_output_get_verbosity(orte_plm_base_framework.framework_output)) {
377 param = opal_argv_join(argv, ' ');
378 OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
379 "%s plm:tm: executing:\n\t%s",
380 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
381 (NULL == param) ? "NULL" : param));
382 if (NULL != param) free(param);
383 }
384
385 launchid = 0;
386 if (!orte_get_attribute(&node->attributes, ORTE_NODE_LAUNCH_ID, (void**)&ldptr, OPAL_INT32)) {
387 orte_show_help("help-plm-tm.txt", "tm-spawn-failed", true, argv[0], node->name, 0);
388 rc = ORTE_ERROR;
389 goto cleanup;
390 }
391 rc = tm_spawn(argc, argv, env, launchid, tm_task_ids + launched, tm_events + launched);
392 if (TM_SUCCESS != rc) {
393 orte_show_help("help-plm-tm.txt", "tm-spawn-failed", true, argv[0], node->name, launchid);
394 rc = ORTE_ERROR;
395 goto cleanup;
396 }
397
398 launched++;
399 }
400
401
402 state->jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
403 daemons->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
404
405
406 failed_launch = false;
407
408 OPAL_OUTPUT_VERBOSE((1, orte_plm_base_framework.framework_output,
409 "%s plm:tm:launch: finished spawning orteds",
410 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
411
412 cleanup:
413
414 OBJ_RELEASE(state);
415
416
417 if (failed_launch) {
418 ORTE_ACTIVATE_JOB_STATE(daemons, ORTE_JOB_STATE_FAILED_TO_START);
419 }
420 }
421
422 static void poll_spawns(int fd, short args, void *cbdata)
423 {
424 orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata;
425 int i, rc;
426 bool failed_launch = true;
427 int local_err;
428 tm_event_t event;
429
430 ORTE_ACQUIRE_OBJECT(state);
431
432
433 for (i = 0; i < launched; ++i) {
434 rc = tm_poll(TM_NULL_EVENT, &event, 1, &local_err);
435 if (TM_SUCCESS != rc) {
436 opal_output(0, "plm:tm: failed to poll for a spawned daemon, return status = %d", rc);
437 goto cleanup;
438 }
439 if (TM_SUCCESS != local_err) {
440 opal_output(0, "plm:tm: failed to spawn daemon, error code = %d", local_err );
441 goto cleanup;
442 }
443 }
444 failed_launch = false;
445
446 cleanup:
447
448 OBJ_RELEASE(state);
449
450
451 if (failed_launch) {
452 ORTE_ACTIVATE_JOB_STATE(state->jdata, ORTE_JOB_STATE_FAILED_TO_START);
453 }
454 }
455
456
457
458
459
460 int plm_tm_terminate_orteds(void)
461 {
462 int rc;
463
464 if (ORTE_SUCCESS != (rc = orte_plm_base_orted_exit(ORTE_DAEMON_EXIT_CMD))) {
465 ORTE_ERROR_LOG(rc);
466 }
467
468 return rc;
469 }
470
471 static int plm_tm_signal_job(orte_jobid_t jobid, int32_t signal)
472 {
473 int rc;
474
475
476 if (ORTE_SUCCESS != (rc = orte_plm_base_orted_signal_local_procs(jobid, signal))) {
477 ORTE_ERROR_LOG(rc);
478 }
479
480 return rc;
481 }
482
483
484
485
486
487 static int plm_tm_finalize(void)
488 {
489 int rc;
490
491
492 if (ORTE_SUCCESS != (rc = orte_plm_base_comm_stop())) {
493 ORTE_ERROR_LOG(rc);
494 }
495
496 if (connected) {
497 tm_finalize();
498 connected = false;
499 }
500
501 return ORTE_SUCCESS;
502 }
503
504
505 static int plm_tm_connect(void)
506 {
507 int ret;
508 struct tm_roots tm_root;
509 int count;
510 struct timespec tp = {0, 100};
511
512
513
514 for (count = 0 ; count < 10; ++count) {
515 ret = tm_init(NULL, &tm_root);
516 if (TM_SUCCESS == ret) {
517 return ORTE_SUCCESS;
518 }
519
520
521
522
523 nanosleep(&tp, NULL);
524 #ifdef HAVE_SCHED_H
525 sched_yield();
526 #endif
527 }
528
529 return ORTE_ERR_RESOURCE_BUSY;
530 }