This source file includes the following definitions.
- global_coord_init
- global_coord_finalize
- global_coord_setup_job
- global_coord_release_job
- global_coord_start_ckpt
- global_coord_end_ckpt
- global_init_job_structs
- global_refresh_job_structs
- snapc_full_global_start_listener
- snapc_full_global_stop_listener
- snapc_full_global_start_cmdline_listener
- snapc_full_global_stop_cmdline_listener
- snapc_full_global_cmdline_recv
- snapc_full_global_orted_recv
- snapc_full_process_request_op_cmd
- snapc_full_process_orted_update_cmd
- snapc_full_process_restart_proc_info_cmd
- global_coord_restart_proc_info
- snapc_full_process_job_update_cmd
- snapc_full_establish_snapshot_dir
- snapc_full_global_checkpoint
- snapc_full_global_notify_checkpoint
- orte_snapc_full_global_set_job_ckpt_info
- global_coord_job_state_update
- write_out_global_metadata
- find_orted_snapshot
- snapc_full_global_get_min_state
- orte_snapc_full_global_reset_coord
- snapc_full_set_time
- snapc_full_display_all_timers
- snapc_full_display_recovered_timers
- snapc_full_clear_timers
- snapc_full_get_time
- snapc_full_display_indv_timer_core
- snapc_full_report_progress
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include "orte_config.h"
20
21 #include <sys/types.h>
22 #ifdef HAVE_UNISTD_H
23 #include <unistd.h>
24 #endif
25 #include <string.h>
26
27 #include "opal/include/opal/prefetch.h"
28 #include "opal/util/output.h"
29 #include "opal/util/opal_environ.h"
30 #include "opal/util/basename.h"
31 #include "opal/util/show_help.h"
32 #include "opal/util/string_copy.h"
33 #include "orte/mca/mca.h"
34 #include "opal/mca/base/base.h"
35 #include "opal/mca/crs/crs.h"
36 #include "opal/mca/crs/base/base.h"
37
38 #include "orte/util/name_fns.h"
39 #include "orte/util/proc_info.h"
40 #include "orte/runtime/orte_globals.h"
41 #include "opal/dss/dss.h"
42 #include "orte/mca/rml/rml.h"
43 #include "orte/mca/rml/rml_types.h"
44 #include "orte/mca/rmaps/rmaps.h"
45 #include "orte/mca/rmaps/rmaps_types.h"
46 #include "orte/mca/plm/plm.h"
47 #include "orte/mca/grpcomm/grpcomm.h"
48 #include "orte/runtime/orte_wait.h"
49 #include "orte/mca/errmgr/errmgr.h"
50 #include "orte/mca/errmgr/base/base.h"
51
52 #include "orte/mca/snapc/snapc.h"
53 #include "orte/mca/snapc/base/base.h"
54
55 #include "snapc_full.h"
56
57 #include MCA_timer_IMPLEMENTATION_HEADER
58
59
60
61
/*
 * Advance the global snapshot sequence number.
 *
 * When orte_snapc_base_store_only_one_seq is set the sequence number is
 * pinned to 0; otherwise it is incremented by one.
 *
 * NOTE(review): the expansion is a bare brace block, not do { } while (0),
 * so it is not safe as the sole statement of an if/else without braces.
 */
62 #define INC_SEQ_NUM() \
63 { \
64 if(orte_snapc_base_store_only_one_seq) { \
65 orte_snapc_base_snapshot_seq_number = 0; \
66 } else { \
67 orte_snapc_base_snapshot_seq_number++; \
68 } \
69 }
70
/* Job this global coordinator is bound to; ORTE_JOBID_INVALID until
 * global_coord_setup_job() claims a job, and again after init/finalize. */
71 static orte_jobid_t current_global_jobid = ORTE_JOBID_INVALID;
/* Snapshot bookkeeping for the job: constructed in global_init_job_structs(),
 * destructed in global_coord_release_job().  Its local_snapshots list holds
 * one entry per daemon. */
72 static orte_snapc_base_global_snapshot_t global_snapshot;
/* NOTE(review): not referenced in the visible part of the file; presumably a
 * count of daemons taking part in the current checkpoint - confirm. */
73 static int current_total_orteds = 0;
/* NOTE(review): not referenced in the visible part of the file and has no
 * explicit initializer (static storage, so it starts out false). */
74 static bool updated_job_to_running;
/* Latest job-wide checkpoint state; polled by the wait loops in
 * global_coord_start_ckpt() and global_coord_end_ckpt(). */
75 static int current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
/* Reset to false at the end of a migration in global_coord_end_ckpt(). */
76 static bool cleanup_on_establish = false;
/* Set to true when a daemon recorded in global_snapshot compares equal to
 * ORTE_PROC_MY_NAME; consulted by global_coord_setup_job(). */
77 static bool global_coord_has_local_children = false;
78
/* True between global_coord_start_ckpt() (with datum->migrating set) and the
 * end of global_coord_end_ckpt(). */
79 static bool currently_migrating = false;
/* Private copies (orte_proc_t) of the processes being migrated; allocated in
 * global_coord_start_ckpt(), released in global_coord_end_ckpt(). */
80 static opal_list_t *migrating_procs = NULL;
81
82 static int global_init_job_structs(void);
83 static int global_refresh_job_structs(void);
84
/* True while the receive on ORTE_RML_TAG_SNAPC_FULL is posted (see
 * snapc_full_global_start_listener() / snapc_full_global_stop_listener()). */
85 static bool snapc_orted_recv_issued = false;
/* True once a command-line checkpoint request has been accepted and its
 * sender recorded; cleared again when the request is rejected. */
86 static bool is_orte_checkpoint_connected = false;
/* Command-line checkpoint requests are rejected while this is false.
 * NOTE(review): the code that sets it to true is outside the visible chunk. */
87 static bool is_app_checkpointable = false;
88 static int snapc_full_global_start_listener(void);
89 static int snapc_full_global_stop_listener(void);
90 static void snapc_full_global_orted_recv(int status,
91 orte_process_name_t* sender,
92 opal_buffer_t* buffer,
93 orte_rml_tag_t tag,
94 void* cbdata);
95
96 static void snapc_full_process_restart_proc_info_cmd(orte_process_name_t* sender,
97 opal_buffer_t* buffer);
98
99 static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
100 opal_buffer_t* buffer);
101
102
/* Name of the process that sent the current command-line checkpoint request;
 * orte_name_invalid when no such request is active. */
103 static orte_process_name_t orte_checkpoint_sender;
/* True while the receive on ORTE_RML_TAG_CKPT is posted; cleared by the
 * receive callback itself each time a message is delivered. */
104 static bool snapc_cmdline_recv_issued = false;
105 static int snapc_full_global_start_cmdline_listener(void);
106 static int snapc_full_global_stop_cmdline_listener(void);
107 static void snapc_full_global_cmdline_recv(int status,
108 orte_process_name_t* sender,
109 opal_buffer_t* buffer,
110 orte_rml_tag_t tag,
111 void* cbdata);
112
113 static int snapc_full_establish_snapshot_dir(bool empty_metadata);
114
115
116 static int snapc_full_global_checkpoint(opal_crs_base_ckpt_options_t *options);
117 static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
118 opal_crs_base_ckpt_options_t *options);
119 static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
120 int ckpt_state,
121 orte_sstore_base_handle_t handle,
122 bool quick,
123 opal_crs_base_ckpt_options_t *options);
124 int global_coord_job_state_update(orte_jobid_t jobid,
125 int job_ckpt_state,
126 orte_sstore_base_handle_t handle,
127 opal_crs_base_ckpt_options_t *options);
128 static void snapc_full_process_job_update_cmd(orte_process_name_t* sender,
129 opal_buffer_t* buffer,
130 bool quick);
131 static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
132 opal_buffer_t* buffer,
133 bool quick);
134 static orte_snapc_full_orted_snapshot_t *find_orted_snapshot(orte_process_name_t *name );
135
136 static int snapc_full_global_get_min_state(void);
137 static int write_out_global_metadata(void);
138
139 static int orte_snapc_full_global_reset_coord(void);
140
141
142
143
144 static void snapc_full_set_time(int idx);
145 static void snapc_full_display_all_timers(void);
146 static void snapc_full_display_recovered_timers(void);
147 static void snapc_full_clear_timers(void);
148
149 static double snapc_full_get_time(void);
150 static void snapc_full_display_indv_timer_core(double diff, char *str);
151
/*
 * Timer slots.  SNAPC_FULL_TIMER_MAX is the number of slots and sizes the
 * timer_start[] array below; the other values are presumably the indices
 * passed to snapc_full_set_time() - confirm against its definition.
 */
152 #define SNAPC_FULL_TIMER_START 0
153 #define SNAPC_FULL_TIMER_RUNNING 1
154 #define SNAPC_FULL_TIMER_FIN_LOCAL 2
155 #define SNAPC_FULL_TIMER_SS_SYNC 3
156 #define SNAPC_FULL_TIMER_ESTABLISH 4
157 #define SNAPC_FULL_TIMER_RECOVERED 5
158 #define SNAPC_FULL_TIMER_MAX 6
159
/* One recorded time per timer slot. */
160 static double timer_start[SNAPC_FULL_TIMER_MAX];
161
/*
 * The four wrappers below forward to the corresponding snapc_full_*()
 * timer routine only when orte_snapc_full_timing_enabled is set; the test
 * is marked OPAL_UNLIKELY.  Each expands to a bare brace block.
 */
162 #define SNAPC_FULL_CLEAR_TIMERS() \
163 { \
164 if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) { \
165 snapc_full_clear_timers(); \
166 } \
167 }
168
/* Record the current time for slot idx (when timing is enabled). */
169 #define SNAPC_FULL_SET_TIMER(idx) \
170 { \
171 if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) { \
172 snapc_full_set_time(idx); \
173 } \
174 }
175
/* Call snapc_full_display_all_timers() (when timing is enabled). */
176 #define SNAPC_FULL_DISPLAY_ALL_TIMERS() \
177 { \
178 if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) { \
179 snapc_full_display_all_timers(); \
180 } \
181 }
/* Call snapc_full_display_recovered_timers() (when timing is enabled). */
182 #define SNAPC_FULL_DISPLAY_RECOVERED_TIMER() \
183 { \
184 if(OPAL_UNLIKELY(orte_snapc_full_timing_enabled)) { \
185 snapc_full_display_recovered_timers(); \
186 } \
187 }
188
189
190
191
/* Progress-meter reporting routine; defined later in this file. */
192 static void snapc_full_report_progress(orte_snapc_full_orted_snapshot_t *orted_snapshot,
193 int total,
194 int min_state);
/* Progress-meter bookkeeping; both values are reset to 0 at the end of a
 * migration in global_coord_end_ckpt().
 * NOTE(review): exact meaning is defined by snapc_full_report_progress(),
 * which is outside the visible chunk. */
195 static int report_progress_cur_loc_finished = 0;
196 static double report_progress_last_reported_loc_finished = 0;
/* Forward to snapc_full_report_progress() only when the progress meter is
 * turned on (orte_snapc_full_progress_meter > 0). */
197 #define SNAPC_FULL_REPORT_PROGRESS(orted, total, min_state) \
198 { \
199 if(OPAL_UNLIKELY(orte_snapc_full_progress_meter > 0)) { \
200 snapc_full_report_progress(orted, total, min_state); \
201 } \
202 }
203
204
205
206
207 int global_coord_init(void)
208 {
209 current_global_jobid = ORTE_JOBID_INVALID;
210 orte_snapc_base_snapshot_seq_number = -1;
211
212 orte_checkpoint_sender = orte_name_invalid;
213
214 SNAPC_FULL_CLEAR_TIMERS();
215
216 return ORTE_SUCCESS;
217 }
218
219 int global_coord_finalize(void)
220 {
221 current_global_jobid = ORTE_JOBID_INVALID;
222 orte_snapc_base_snapshot_seq_number = -1;
223
224 SNAPC_FULL_CLEAR_TIMERS();
225
226 return ORTE_SUCCESS;
227 }
228
229 int global_coord_setup_job(orte_jobid_t jobid) {
230 int ret, exit_status = ORTE_SUCCESS;
231 orte_job_t *jdata = NULL;
232
233
234
235
236
237
238
239
240
241
242 if( ORTE_JOBID_INVALID == current_global_jobid ) {
243 current_global_jobid = jobid;
244 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
245 "Global) Setup job %s as the Global Coordinator\n",
246 ORTE_JOBID_PRINT(jobid)));
247
248 SNAPC_FULL_CLEAR_TIMERS();
249 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_START);
250 }
251
252 else if ( jobid == current_global_jobid ) {
253
254
255 if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
256 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
257 return ORTE_ERR_NOT_FOUND;
258 }
259
260 if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_RESTART)) {
261 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
262 "Global) Restarting Job %s...",
263 ORTE_JOBID_PRINT(jobid)));
264 SNAPC_FULL_CLEAR_TIMERS();
265 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_START);
266
267 if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
268 ORTE_ERROR_LOG(ret);
269 return ret;
270 }
271 if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
272 return local_coord_setup_job(jobid);
273 }
274 return ORTE_SUCCESS;
275 }
276
277
278 if( !global_coord_has_local_children ) {
279 return ORTE_SUCCESS;
280 }
281
282 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
283 "Global) Setup job %s as the Local Coordinator\n",
284 ORTE_JOBID_PRINT(jobid)));
285 orte_snapc_coord_type |= ORTE_SNAPC_LOCAL_COORD_TYPE;
286 return local_coord_setup_job(jobid);
287 }
288
289 else {
290 opal_output(mca_snapc_full_component.super.output_handle,
291 "Global) Setup of job %s Failed! Already setup job %s\n",
292 ORTE_JOBID_PRINT(jobid), ORTE_JOBID_PRINT(current_global_jobid));
293 ORTE_ERROR_LOG(ORTE_ERROR);
294 return ORTE_ERROR;
295 }
296
297
298
299
300
301 orte_snapc_base_snapshot_seq_number = -1;
302
303
304
305
306 if( ORTE_SUCCESS != (ret = global_init_job_structs()) ) {
307 ORTE_ERROR_LOG(ret);
308 exit_status = ret;
309 goto cleanup;
310 }
311
312
313
314
315 if( ORTE_SUCCESS != (ret = snapc_full_global_start_listener()) ) {
316 ORTE_ERROR_LOG(ret);
317 exit_status = ret;
318 goto cleanup;
319 }
320
321
322
323
324 if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener()) ) {
325 ORTE_ERROR_LOG(ret);
326 exit_status = ret;
327 goto cleanup;
328 }
329
330
331
332
333 #if 0
334 if(orte_snapc_base_establish_global_snapshot_dir) {
335 opal_output(0, "Global) Error: Pre-establishment of snapshot directory currently not supported!");
336 ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
337
338 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
339 "Global) Pre-establish the global snapshot directory\n"));
340 if( ORTE_SUCCESS != (ret = snapc_full_establish_snapshot_dir(true))) {
341 ORTE_ERROR_LOG(ret);
342 exit_status = ret;
343 goto cleanup;
344 }
345 }
346 #endif
347
348 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
349 "Global) Finished setup of job %s ",
350 ORTE_JOBID_PRINT(jobid)));
351
352 cleanup:
353 return exit_status;
354 }
355
356 int global_coord_release_job(orte_jobid_t jobid) {
357 int ret, exit_status = ORTE_SUCCESS;
358
359
360
361
362 if( is_orte_checkpoint_connected ) {
363 if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
364 global_snapshot.ss_handle,
365 ORTE_SNAPC_CKPT_STATE_ERROR)) ) {
366 ORTE_ERROR_LOG(ret);
367 }
368 }
369
370
371
372
373 if( ORTE_SUCCESS != (ret = snapc_full_global_stop_cmdline_listener()) ) {
374 ORTE_ERROR_LOG(ret);
375 exit_status = ret;
376 }
377
378 if( ORTE_SUCCESS != (ret = snapc_full_global_stop_listener()) ) {
379 ORTE_ERROR_LOG(ret);
380 exit_status = ret;
381 }
382
383 OBJ_DESTRUCT(&global_snapshot);
384
385 return exit_status;
386 }
387
388 int global_coord_start_ckpt(orte_snapc_base_quiesce_t *datum)
389 {
390 int ret, exit_status = ORTE_SUCCESS;
391 orte_std_cntr_t i_proc;
392 orte_proc_t *proc = NULL;
393 orte_proc_t *new_proc = NULL;
394 opal_list_item_t *item = NULL;
395 opal_crs_base_ckpt_options_t *options = NULL;
396 char *tmp_str = NULL;
397
398 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
399 "Global) Starting checkpoint (internally requested)"));
400
401 orte_checkpoint_sender = orte_name_invalid;
402
403
404
405
406 if( datum->migrating ) {
407 currently_migrating = true;
408 if( NULL != migrating_procs ) {
409 while( NULL != (item = opal_list_remove_first(migrating_procs)) ) {
410 proc = (orte_proc_t*)item;
411 OBJ_RELEASE(proc);
412 }
413 } else {
414 migrating_procs = OBJ_NEW(opal_list_t);
415 }
416
417
418
419
420 for(i_proc = 0; i_proc < opal_pointer_array_get_size(&(datum->migrating_procs)); ++i_proc) {
421 proc = (orte_proc_t*)opal_pointer_array_get_item(&(datum->migrating_procs), i_proc);
422 if( NULL == proc ) {
423 continue;
424 }
425
426 new_proc = OBJ_NEW(orte_proc_t);
427 new_proc->name.jobid = proc->name.jobid;
428 new_proc->name.vpid = proc->name.vpid;
429 new_proc->node = OBJ_NEW(orte_node_t);
430 new_proc->node->name = proc->node->name;
431 opal_list_append(migrating_procs, &new_proc->super);
432 OBJ_RETAIN(new_proc);
433 }
434
435 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
436 "Global) SnapC Migrating Processes: (%d procs) [Updated]\n",
437 (int)opal_list_get_size(migrating_procs) ));
438 for (item = opal_list_get_first(migrating_procs);
439 item != opal_list_get_end(migrating_procs);
440 item = opal_list_get_next(item)) {
441 new_proc = (orte_proc_t*)item;
442 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
443 "\t\"%s\" [%s]\n",
444 ORTE_NAME_PRINT(&new_proc->name),new_proc->node->name));
445 }
446 }
447
448
449
450
451 options = OBJ_NEW(opal_crs_base_ckpt_options_t);
452 if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
453 ORTE_ERROR_LOG(ret);
454 exit_status = ret;
455 goto cleanup;
456 }
457
458
459
460
461 while(((currently_migrating && current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_MIGRATING) ||
462 (!currently_migrating && current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL)) &&
463 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ESTABLISHED &&
464 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_RECOVERED &&
465 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
466 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
467 opal_progress();
468 }
469
470
471
472
473 datum->snapshot = OBJ_NEW(orte_snapc_base_global_snapshot_t);
474
475 datum->ss_handle = global_snapshot.ss_handle;
476 datum->ss_snapshot = OBJ_NEW(orte_sstore_base_global_snapshot_info_t);
477 if( ORTE_SUCCESS != (ret = orte_sstore.request_global_snapshot_data(&(datum->ss_handle), datum->ss_snapshot)) ) {
478 ORTE_ERROR_LOG(ret);
479 exit_status = ret;
480 goto cleanup;
481 }
482
483
484 orte_sstore.get_attr(global_snapshot.ss_handle,
485 SSTORE_METADATA_GLOBAL_SNAP_SEQ,
486 &tmp_str);
487 datum->epoch = atoi(tmp_str);
488
489 if( NULL != tmp_str ) {
490 free(tmp_str);
491 tmp_str = NULL;
492 }
493
494 cleanup:
495 if( NULL != options ) {
496 OBJ_RELEASE(options);
497 options = NULL;
498 }
499
500 return exit_status;
501 }
502
503 int global_coord_end_ckpt(orte_snapc_base_quiesce_t *datum)
504 {
505 int ret, exit_status = ORTE_SUCCESS;
506 opal_list_item_t* item = NULL;
507
508 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
509 "Global) Finishing checkpoint (internally requested) [%3d]",
510 current_job_ckpt_state));
511
512 if( currently_migrating ) {
513 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
514 "Global) End Ckpt: Flush the modex cached data\n"));
515
516
517
518
519 #if 0
520 if (OPAL_SUCCESS != (ret = opal_dstore.remove(NULL, NULL))) {
521 ORTE_ERROR_LOG(ret);
522 exit_status = ret;
523 goto cleanup;
524 }
525 #endif
526
527 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_ESTABLISH);
528 if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
529 ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL,
530 global_snapshot.ss_handle,
531 true, NULL) ) ) {
532 ORTE_ERROR_LOG(ret);
533 exit_status = ret;
534 goto cleanup;
535 }
536 }
537
538 while(current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_RECOVERED &&
539 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
540 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
541 opal_progress();
542 }
543
544
545
546
547 if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
548 ORTE_ERROR_LOG(ret);
549 exit_status = ret;
550 goto cleanup;
551 }
552
553 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
554 "Global) Finished checkpoint (internally requested) [%d]",
555 current_job_ckpt_state));
556
557 if( currently_migrating ) {
558 current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
559 cleanup_on_establish = false;
560
561 report_progress_cur_loc_finished = 0;
562 report_progress_last_reported_loc_finished = 0;
563 }
564
565 cleanup:
566
567 currently_migrating = false;
568 if( NULL != migrating_procs ) {
569 while( NULL != (item = opal_list_remove_first(migrating_procs)) ) {
570 OBJ_RELEASE(item);
571 }
572 OBJ_RELEASE(migrating_procs);
573 migrating_procs = NULL;
574 }
575
576 return exit_status;
577 }
578
579
580
581
582 static int global_init_job_structs(void)
583 {
584 orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
585 orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
586 opal_list_item_t* orted_item = NULL;
587 orte_node_t *cur_node = NULL;
588 orte_job_map_t *map = NULL;
589 orte_job_t *jdata = NULL;
590 orte_proc_t **procs = NULL;
591 orte_std_cntr_t i = 0;
592 orte_vpid_t p = 0;
593 orte_ns_cmp_bitmask_t mask;
594 bool found = false;
595
596
597 if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
598 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
599 return ORTE_ERR_NOT_FOUND;
600 }
601
602 OBJ_CONSTRUCT(&global_snapshot, orte_snapc_base_global_snapshot_t);
603
604 map = jdata->map;
605
606 for (i=0; i < map->nodes->size; i++) {
607 if (NULL == (cur_node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
608 continue;
609 }
610
611 procs = (orte_proc_t**)cur_node->procs->addr;
612
613
614
615
616
617 found = false;
618 for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
619 orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
620 orted_item = opal_list_get_next(orted_item) ) {
621 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
622
623
624
625 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
626 &(cur_node->daemon->name),
627 &(orted_snapshot->process_name) )) {
628 found = true;
629 break;
630 }
631 }
632 if( found ) {
633 OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
634 "Global) [%d] Found Daemon %s with %d procs - Duplicate!! - Should not happen!",
635 i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
636 continue;
637 }
638
639 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
640 "Global) [%d] Found Daemon %s with %d procs",
641 i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
642
643 orted_snapshot = OBJ_NEW(orte_snapc_full_orted_snapshot_t);
644
645 orted_snapshot->process_name.jobid = cur_node->daemon->name.jobid;
646 orted_snapshot->process_name.vpid = cur_node->daemon->name.vpid;
647
648 mask = ORTE_NS_CMP_JOBID;
649
650 if (OPAL_EQUAL ==
651 orte_util_compare_name_fields(mask, &orted_snapshot->process_name, ORTE_PROC_MY_NAME)) {
652 global_coord_has_local_children = true;
653 }
654
655 for(p = 0; p < cur_node->num_procs; ++p) {
656 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
657 "Global) \t [%d] Found Process %s on Daemon %s",
658 p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(cur_node->daemon->name)) ));
659
660 app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
661
662 app_snapshot->process_name.jobid = procs[p]->name.jobid;
663 app_snapshot->process_name.vpid = procs[p]->name.vpid;
664
665 opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
666 }
667
668
669 opal_list_append(&global_snapshot.local_snapshots, &(orted_snapshot->super.super));
670 }
671
672 return ORTE_SUCCESS;
673 }
674
675 static int global_refresh_job_structs(void)
676 {
677 orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
678 orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
679 opal_list_item_t* orted_item = NULL;
680 opal_list_item_t* app_item = NULL;
681 opal_list_item_t* item = NULL;
682 orte_node_t *cur_node = NULL;
683 orte_job_map_t *map = NULL;
684 orte_job_t *jdata = NULL;
685 orte_proc_t **procs = NULL;
686 orte_proc_t *new_proc = NULL;
687 orte_std_cntr_t i = 0;
688 orte_vpid_t p = 0;
689 bool found = false;
690 orte_ns_cmp_bitmask_t mask;
691
692
693 if (NULL == (jdata = orte_get_job_data_object(current_global_jobid))) {
694 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
695 return ORTE_ERR_NOT_FOUND;
696 }
697
698 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
699 "Global) Refreshing Job Structures... [%3d]",
700 current_job_ckpt_state));
701
702 if( NULL != migrating_procs ) {
703 for (item = opal_list_get_first(migrating_procs);
704 item != opal_list_get_end(migrating_procs);
705 item = opal_list_get_next(item)) {
706 new_proc = (orte_proc_t*)item;
707
708
709
710
711 found = false;
712 for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
713 orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
714 orted_item = opal_list_get_next(orted_item) ) {
715 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
716
717
718
719
720 for(app_item = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
721 app_item != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
722 app_item = opal_list_get_next(app_item) ) {
723 app_snapshot = (orte_snapc_base_local_snapshot_t*)app_item;
724
725 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
726 &(new_proc->name),
727 &(app_snapshot->process_name) )) {
728 found = true;
729 opal_list_remove_item(&(orted_snapshot->super.local_snapshots), app_item);
730 break;
731 }
732 }
733
734 if( found ) {
735 break;
736 }
737 }
738 }
739 }
740
741
742
743
744
745 map = jdata->map;
746 for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
747 orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
748 orted_item = opal_list_get_next(orted_item) ) {
749 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
750
751
752 found = false;
753 for (i=0; i < map->nodes->size; i++) {
754 if (NULL == (cur_node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
755 continue;
756 }
757
758 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
759 &(cur_node->daemon->name),
760 &(orted_snapshot->process_name) )) {
761 found = true;
762 break;
763 }
764 }
765
766 if( !found ) {
767 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
768 "Global) Found Empty Daemon %s not in map (Refresh)",
769 ORTE_NAME_PRINT(&(orted_snapshot->process_name)) ));
770 while( NULL != (item = opal_list_remove_first(&(orted_snapshot->super.local_snapshots))) ) {
771 OBJ_RELEASE(item);
772 }
773 }
774 }
775
776
777
778
779 for (i=0; i < map->nodes->size; i++) {
780 if (NULL == (cur_node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
781 continue;
782 }
783
784 procs = (orte_proc_t**)cur_node->procs->addr;
785
786
787
788
789
790 found = false;
791 for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
792 orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
793 orted_item = opal_list_get_next(orted_item) ) {
794 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
795
796 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
797 &(cur_node->daemon->name),
798 &(orted_snapshot->process_name) )) {
799 found = true;
800 break;
801 }
802 }
803 if( found ) {
804 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
805 "Global) [%d] Found Daemon %s with %d procs (Refresh)",
806 i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
807
808
809 while( NULL != (item = opal_list_remove_first(&(orted_snapshot->super.local_snapshots))) ) {
810 OBJ_RELEASE(item);
811 }
812
813
814 for(p = 0; p < cur_node->num_procs; ++p) {
815 if( NULL == procs[p] ) {
816 continue;
817 }
818
819 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
820 "Global) \t [%d] Found Process %s on Daemon %s",
821 p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(cur_node->daemon->name)) ));
822
823 app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
824
825 app_snapshot->process_name.jobid = procs[p]->name.jobid;
826 app_snapshot->process_name.vpid = procs[p]->name.vpid;
827
828 opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
829 }
830
831 continue;
832 }
833
834 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
835 "Global) [%d] Found Daemon %s with %d procs",
836 i, ORTE_NAME_PRINT(&(cur_node->daemon->name)), cur_node->num_procs));
837
838 orted_snapshot = OBJ_NEW(orte_snapc_full_orted_snapshot_t);
839
840 orted_snapshot->process_name.jobid = cur_node->daemon->name.jobid;
841 orted_snapshot->process_name.vpid = cur_node->daemon->name.vpid;
842
843 mask = ORTE_NS_CMP_ALL;
844
845 if (OPAL_EQUAL ==
846 orte_util_compare_name_fields(mask, &orted_snapshot->process_name, ORTE_PROC_MY_NAME)) {
847 global_coord_has_local_children = true;
848 }
849 for(p = 0; p < cur_node->num_procs; ++p) {
850 if( NULL == procs[p] ) {
851 continue;
852 }
853
854 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
855 "Global) \t [%d] Found Process %s on Daemon %s",
856 p, ORTE_NAME_PRINT(&(procs[p]->name)), ORTE_NAME_PRINT(&(cur_node->daemon->name)) ));
857
858 app_snapshot = OBJ_NEW(orte_snapc_base_local_snapshot_t);
859
860 app_snapshot->process_name.jobid = procs[p]->name.jobid;
861 app_snapshot->process_name.vpid = procs[p]->name.vpid;
862
863 opal_list_append(&(orted_snapshot->super.local_snapshots), &(app_snapshot->super));
864 }
865
866 opal_list_append(&global_snapshot.local_snapshots, &(orted_snapshot->super.super));
867 }
868
869 return ORTE_SUCCESS;
870 }
871
872
873
874
875 static int snapc_full_global_start_listener(void)
876 {
877 if (snapc_orted_recv_issued && ORTE_PROC_IS_HNP) {
878 return ORTE_SUCCESS;
879 }
880
881 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
882 "Global) Startup Coordinator Channel"));
883
884
885
886
887 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL,
888 ORTE_RML_PERSISTENT, snapc_full_global_orted_recv, NULL);
889
890 snapc_orted_recv_issued = true;
891
892 return ORTE_SUCCESS;
893 }
894
895 static int snapc_full_global_stop_listener(void)
896 {
897 if (!snapc_orted_recv_issued && ORTE_PROC_IS_HNP) {
898 return ORTE_SUCCESS;
899 }
900
901 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
902 "Global) Shutdown Coordinator Channel"));
903
904 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL);
905
906 snapc_orted_recv_issued = false;
907 return ORTE_SUCCESS;
908 }
909
910 static int snapc_full_global_start_cmdline_listener(void)
911 {
912 if (snapc_cmdline_recv_issued && ORTE_PROC_IS_HNP) {
913 return ORTE_SUCCESS;
914 }
915
916 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
917 "Global) Startup Command Line Channel"));
918
919
920
921
922 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_CKPT, 0,
923 snapc_full_global_cmdline_recv, NULL);
924
925 snapc_cmdline_recv_issued = true;
926 return ORTE_SUCCESS;
927 }
928
929 static int snapc_full_global_stop_cmdline_listener(void)
930 {
931 if (!snapc_cmdline_recv_issued && ORTE_PROC_IS_HNP) {
932 return ORTE_SUCCESS;
933 }
934
935 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
936 "Global) Shutdown Command Line Channel"));
937
938 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_CKPT);
939
940 snapc_cmdline_recv_issued = false;
941 return ORTE_SUCCESS;
942 }
943
944
945
946
947 static void snapc_full_global_cmdline_recv(int status,
948 orte_process_name_t* sender,
949 opal_buffer_t* buffer,
950 orte_rml_tag_t tag,
951 void* cbdata)
952 {
953 int ret;
954 orte_snapc_cmd_flag_t command;
955 orte_std_cntr_t count = 1;
956 orte_jobid_t jobid;
957 opal_crs_base_ckpt_options_t *options = NULL;
958
959 if( ORTE_RML_TAG_CKPT != tag ) {
960 opal_output(mca_snapc_full_component.super.output_handle,
961 "Global) Error: Unknown tag: Received a command message from %s (tag = %d).",
962 ORTE_NAME_PRINT(sender), tag);
963 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
964 return;
965 }
966
967 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
968 "Global) Command Line: Start a checkpoint operation [Sender = %s]",
969 ORTE_NAME_PRINT(sender)));
970
971 snapc_cmdline_recv_issued = false;
972
973 options = OBJ_NEW(opal_crs_base_ckpt_options_t);
974
975 count = 1;
976 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_CMD))) {
977 ORTE_ERROR_LOG(ret);
978 goto cleanup;
979 }
980
981
982
983
984 if (ORTE_SNAPC_GLOBAL_INIT_CMD == command) {
985 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
986 "Global) Command line requested a checkpoint [command %d]\n",
987 command));
988
989
990
991
992 if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_init_cmd(sender,
993 buffer,
994 options,
995 &jobid)) ) {
996 ORTE_ERROR_LOG(ret);
997 goto cleanup;
998 }
999
1000 orte_checkpoint_sender = *sender;
1001 is_orte_checkpoint_connected = true;
1002
1003
1004
1005
1006
1007 if( !is_app_checkpointable ) {
1008 OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
1009 "Global) request_cmd(): Checkpointing currently disabled, rejecting request"));
1010 if( ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
1011 0,
1012 ORTE_SNAPC_CKPT_STATE_ERROR))) {
1013 ORTE_ERROR_LOG(ret);
1014 }
1015
1016 orte_checkpoint_sender = orte_name_invalid;
1017 is_orte_checkpoint_connected = false;
1018
1019
1020 if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener() ) ){
1021 ORTE_ERROR_LOG(ret);
1022 }
1023
1024 goto cleanup;
1025 }
1026
1027
1028
1029
1030 if( ORTE_JOBID_INVALID != jobid && jobid != current_global_jobid) {
1031 opal_output(mca_snapc_full_component.super.output_handle,
1032 "Global) Error: Jobid %s does not match the current jobid %s",
1033 ORTE_JOBID_PRINT(jobid), ORTE_JOBID_PRINT(current_global_jobid));
1034 ORTE_ERROR_LOG(ORTE_ERROR);
1035 goto cleanup;
1036 }
1037
1038
1039
1040
1041 SNAPC_FULL_CLEAR_TIMERS();
1042 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_START);
1043
1044 if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
1045 ORTE_ERROR_LOG(ret);
1046 goto cleanup;
1047 }
1048
1049 }
1050
1051
1052
1053 else if (ORTE_SNAPC_GLOBAL_TERM_CMD == command) {
1054 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1055 "Global) Command line requested to terminate connection (command %d)\n",
1056 command));
1057 ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
1058 goto cleanup;
1059 }
1060
1061
1062
1063 else {
1064 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1065 "Global) Command line sent an unknown command (command %d)\n",
1066 command));
1067 ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
1068 goto cleanup;
1069 }
1070
1071 cleanup:
1072 if( NULL != options ) {
1073 OBJ_RELEASE(options);
1074 options = NULL;
1075 }
1076
1077 return;
1078 }
1079
1080 void snapc_full_global_orted_recv(int status,
1081 orte_process_name_t* sender,
1082 opal_buffer_t* buffer,
1083 orte_rml_tag_t tag,
1084 void* cbdata)
1085 {
1086 int ret;
1087 orte_snapc_full_cmd_flag_t command;
1088 orte_std_cntr_t count;
1089 static int num_inside = 0;
1090
1091 if( ORTE_RML_TAG_SNAPC_FULL != tag ) {
1092 opal_output(mca_snapc_full_component.super.output_handle,
1093 "Global) Error: Unknown tag: Received a command message from %s (tag = %d).",
1094 ORTE_NAME_PRINT(sender), tag);
1095 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
1096 return;
1097 }
1098
1099
1100
1101
1102 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1103 "Global) Receive a command message from %s.",
1104 ORTE_NAME_PRINT(sender)));
1105
1106 count = 1;
1107 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1108 ORTE_ERROR_LOG(ret);
1109 return;
1110 }
1111
1112 ++num_inside;
1113
1114 switch (command) {
1115 case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD:
1116 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1117 "Global) Command: Job State Update (quick)"));
1118
1119 snapc_full_process_job_update_cmd(sender, buffer, true);
1120 break;
1121
1122 case ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD:
1123 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1124 "Global) Command: Job State Update"));
1125
1126 snapc_full_process_job_update_cmd(sender, buffer, false);
1127 break;
1128
1129 case ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_QUICK_CMD:
1130 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1131 "Global) Command: Daemon State Update (quick)"));
1132
1133 snapc_full_process_orted_update_cmd(sender, buffer, true);
1134 break;
1135
1136 case ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_CMD:
1137 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1138 "Global) Command: Daemon State Update"));
1139
1140 snapc_full_process_orted_update_cmd(sender, buffer, false);
1141 break;
1142
1143 case ORTE_SNAPC_FULL_RESTART_PROC_INFO:
1144 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1145 "Global) Command: Update hostname/pid associations"));
1146
1147 snapc_full_process_restart_proc_info_cmd(sender, buffer);
1148 break;
1149
1150 case ORTE_SNAPC_FULL_REQUEST_OP_CMD:
1151 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1152 "Global) Command: Request Op"));
1153
1154 snapc_full_process_request_op_cmd(sender, buffer);
1155 break;
1156
1157 default:
1158 ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
1159 }
1160
1161 return;
1162 }
1163
1164 static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
1165 opal_buffer_t* sbuffer)
1166 {
1167 int ret;
1168 orte_std_cntr_t count = 1;
1169 orte_jobid_t jobid;
1170 int op_event, op_state;
1171 opal_crs_base_ckpt_options_t *options = NULL;
1172 opal_buffer_t *buffer = NULL;
1173 orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
1174 int seq_num = -1, i;
1175 char * global_handle = NULL, *tmp_str = NULL;
1176 orte_snapc_base_request_op_t *datum = NULL;
1177
1178 orte_checkpoint_sender = orte_name_invalid;
1179
1180 count = 1;
1181 if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &jobid, &count, ORTE_JOBID))) {
1182 ORTE_ERROR_LOG(ret);
1183 goto cleanup;
1184 }
1185
1186 count = 1;
1187 if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &op_event, &count, OPAL_INT))) {
1188 ORTE_ERROR_LOG(ret);
1189 goto cleanup;
1190 }
1191
1192 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1193 "Global) process_request_op(): Op Code %2d\n",
1194 op_event));
1195
1196
1197
1198
1199 if( ORTE_SNAPC_OP_INIT == op_event ) {
1200 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
1201 "Global) process_request_op(): Checkpointing Enabled (%2d)\n",
1202 op_event));
1203 is_app_checkpointable = true;
1204 }
1205
1206
1207
1208 else if( ORTE_SNAPC_OP_FIN == op_event ) {
1209 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
1210 "Global) process_request_op(): Checkpointing Disabled (%2d)\n",
1211 op_event));
1212 is_app_checkpointable = false;
1213
1214
1215
1216
1217 if( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1218 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
1219 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
1220 "Global) process_request_op(): Wait for ongoing checkpoint to complete..."));
1221 while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1222 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
1223 opal_progress();
1224 }
1225 }
1226
1227
1228
1229
1230 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
1231 "Global) process_request_op(): Send Finalize ACK to the job"));
1232
1233 buffer = OBJ_NEW(opal_buffer_t);
1234 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1235 ORTE_ERROR_LOG(ret);
1236 goto cleanup;
1237 }
1238
1239 op_event = ORTE_SNAPC_OP_FIN_ACK;
1240 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
1241 ORTE_ERROR_LOG(ret);
1242 goto cleanup;
1243 }
1244
1245 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
1246 orte_rml_send_callback, NULL))) {
1247 ORTE_ERROR_LOG(ret);
1248 goto cleanup;
1249 }
1250
1251 buffer = NULL;
1252 }
1253
1254
1255
1256 else if( ORTE_SNAPC_OP_CHECKPOINT == op_event ) {
1257 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1258 "Global) process_request_op(): Starting checkpoint (%2d)\n",
1259 op_event));
1260
1261 options = OBJ_NEW(opal_crs_base_ckpt_options_t);
1262 if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
1263 ORTE_ERROR_LOG(ret);
1264 goto cleanup;
1265 }
1266
1267
1268
1269
1270 while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1271 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
1272 opal_progress();
1273 }
1274
1275 if( ORTE_SNAPC_CKPT_STATE_ERROR == current_job_ckpt_state ) {
1276 op_state = -1;
1277 } else {
1278 op_state = 0;
1279 }
1280
1281
1282
1283
1284 buffer = OBJ_NEW(opal_buffer_t);
1285 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1286 ORTE_ERROR_LOG(ret);
1287 goto cleanup;
1288 }
1289
1290 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
1291 ORTE_ERROR_LOG(ret);
1292 goto cleanup;
1293 }
1294
1295 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_state, 1, OPAL_INT))) {
1296 ORTE_ERROR_LOG(ret);
1297 goto cleanup;
1298 }
1299
1300 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
1301 orte_rml_send_callback, NULL))) {
1302 ORTE_ERROR_LOG(ret);
1303 goto cleanup;
1304 }
1305
1306 buffer = NULL;
1307 }
1308
1309
1310
1311 else if( ORTE_SNAPC_OP_RESTART == op_event ) {
1312 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1313 "Global) process_request_op(): Starting restart (%2d)\n",
1314 op_event));
1315
1316 count = 1;
1317 if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &seq_num, &count, OPAL_INT))) {
1318 ORTE_ERROR_LOG(ret);
1319 orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
1320 goto cleanup;
1321 }
1322
1323 count = 1;
1324 if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &global_handle, &count, OPAL_STRING))) {
1325 ORTE_ERROR_LOG(ret);
1326 orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
1327 goto cleanup;
1328 }
1329
1330
1331
1332
1333 if( ORTE_SUCCESS != (ret = orte_errmgr_base_restart_job(current_global_jobid, global_handle, seq_num) ) ) {
1334 ORTE_ERROR_LOG(ret);
1335 orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_NO_RESTART);
1336 goto cleanup;
1337 }
1338 }
1339
1340
1341
1342 else if( ORTE_SNAPC_OP_MIGRATE == op_event ) {
1343 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1344 "Global) process_request_op(): Starting migration (%2d)\n",
1345 op_event));
1346
1347 datum = OBJ_NEW(orte_snapc_base_request_op_t);
1348
1349
1350
1351
1352 count = 1;
1353 if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &(datum->mig_num), &count, OPAL_INT))) {
1354 ORTE_ERROR_LOG(ret);
1355 goto cleanup;
1356 }
1357
1358 datum->mig_vpids = malloc(sizeof(int) * datum->mig_num);
1359 datum->mig_host_pref = malloc(sizeof(char) * datum->mig_num * OPAL_MAX_PROCESSOR_NAME);
1360 datum->mig_vpid_pref = malloc(sizeof(int) * datum->mig_num);
1361 datum->mig_off_node = malloc(sizeof(int) * datum->mig_num);
1362
1363 for( i = 0; i < datum->mig_num; ++i ) {
1364 (datum->mig_vpids)[i] = 0;
1365 (datum->mig_host_pref)[i][0] = '\0';
1366 (datum->mig_vpid_pref)[i] = 0;
1367 (datum->mig_off_node)[i] = (int)false;
1368 }
1369
1370 for( i = 0; i < datum->mig_num; ++i ) {
1371 count = 1;
1372 if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &((datum->mig_vpids)[i]), &count, OPAL_INT))) {
1373 ORTE_ERROR_LOG(ret);
1374 goto cleanup;
1375 }
1376
1377 if(NULL != tmp_str ) {
1378 free(tmp_str);
1379 tmp_str = NULL;
1380 }
1381 count = 1;
1382 if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &tmp_str, &count, OPAL_STRING))) {
1383 ORTE_ERROR_LOG(ret);
1384 goto cleanup;
1385 }
1386 opal_string_copy( ((datum->mig_host_pref)[i]), tmp_str, OPAL_MAX_PROCESSOR_NAME);
1387
1388 count = 1;
1389 if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &((datum->mig_vpid_pref)[i]), &count, OPAL_INT))) {
1390 ORTE_ERROR_LOG(ret);
1391 goto cleanup;
1392 }
1393
1394 count = 1;
1395 if (ORTE_SUCCESS != (ret = opal_dss.unpack(sbuffer, &((datum->mig_off_node)[i]), &count, OPAL_INT))) {
1396 ORTE_ERROR_LOG(ret);
1397 goto cleanup;
1398 }
1399
1400 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1401 "Global) Migration %3d/%3d: Received Rank %3d - Requested <%s> (%3d) %c\n",
1402 datum->mig_num, i,
1403 (datum->mig_vpids)[i],
1404 (datum->mig_host_pref)[i],
1405 (datum->mig_vpid_pref)[i],
1406 (OPAL_INT_TO_BOOL((datum->mig_off_node)[i]) ? 'T' : 'F')
1407 ));
1408 }
1409
1410
1411
1412
1413 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1414 "Global) ------ Kick Off Migration -----"));
1415 if( ORTE_SUCCESS != (ret = orte_errmgr_base_migrate_job(current_global_jobid, datum) ) ) {
1416 ORTE_ERROR_LOG(ret);
1417 goto cleanup;
1418 }
1419
1420
1421
1422
1423 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1424 "Global) ------ Finished Migration. Release processes (%15s )-----",
1425 ORTE_NAME_PRINT(sender) ));
1426 buffer = OBJ_NEW(opal_buffer_t);
1427 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1428 ORTE_ERROR_LOG(ret);
1429 goto cleanup;
1430 }
1431
1432 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
1433 ORTE_ERROR_LOG(ret);
1434 goto cleanup;
1435 }
1436
1437 op_state = 0;
1438 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_state, 1, OPAL_INT))) {
1439 ORTE_ERROR_LOG(ret);
1440 goto cleanup;
1441 }
1442
1443 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
1444 orte_rml_send_callback, NULL))) {
1445 ORTE_ERROR_LOG(ret);
1446 goto cleanup;
1447 }
1448
1449 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1450 "Global) ------ Finished Migration. Released processes (%15s )-----",
1451 ORTE_NAME_PRINT(sender) ));
1452 }
1453
1454
1455
1456 else if( ORTE_SNAPC_OP_QUIESCE_START == op_event) {
1457 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1458 "Global) process_request_op(): Starting quiesce (%2d)\n",
1459 op_event));
1460
1461 options = OBJ_NEW(opal_crs_base_ckpt_options_t);
1462 options->inc_prep_only = true;
1463 if( ORTE_SUCCESS != (ret = snapc_full_global_checkpoint(options) ) ) {
1464 ORTE_ERROR_LOG(ret);
1465 goto cleanup;
1466 }
1467
1468
1469
1470
1471 while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1472 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_INC_PREPED ) {
1473 opal_progress();
1474 }
1475
1476 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1477 "Global) process_request_op(): Quiesce_start finished(%2d)\n",
1478 op_event));
1479 }
1480
1481
1482
1483 else if( ORTE_SNAPC_OP_QUIESCE_END == op_event) {
1484 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1485 "Global) process_request_op(): Ending quiesce (%2d)\n",
1486 op_event));
1487
1488
1489
1490
1491 while( current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_ERROR &&
1492 current_job_ckpt_state != ORTE_SNAPC_CKPT_STATE_NONE ) {
1493 opal_progress();
1494 }
1495
1496 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1497 "Global) process_request_op(): Quiesce_end finished(%2d)\n",
1498 op_event));
1499 }
1500
1501 cleanup:
1502 if (NULL != buffer) {
1503 OBJ_RELEASE(buffer);
1504 buffer = NULL;
1505 }
1506
1507 if( NULL != options ) {
1508 OBJ_RELEASE(options);
1509 options = NULL;
1510 }
1511
1512 if(NULL != tmp_str ) {
1513 free(tmp_str);
1514 tmp_str = NULL;
1515 }
1516
1517 return;
1518 }
1519
1520 static int snapc_full_process_orted_update_cmd(orte_process_name_t* sender,
1521 opal_buffer_t* buffer,
1522 bool quick)
1523 {
1524 int ret, exit_status = ORTE_SUCCESS;
1525 orte_std_cntr_t count;
1526 int remote_ckpt_state;
1527 opal_list_item_t* item = NULL;
1528 opal_list_item_t* aitem = NULL;
1529 orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
1530 orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
1531 int loc_min_state;
1532 char *state_str = NULL;
1533
1534 orted_snapshot = find_orted_snapshot(sender);
1535 if( NULL == orted_snapshot ) {
1536 opal_output(mca_snapc_full_component.super.output_handle,
1537 "Global) Error: Unknown Daemon %s",
1538 ORTE_NAME_PRINT(sender) );
1539 exit_status = ORTE_ERROR;
1540 ORTE_ERROR_LOG(ORTE_ERROR);
1541 goto cleanup;
1542 }
1543
1544 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1545 "Global) Daemon %s: Changed state to:\n",
1546 ORTE_NAME_PRINT(&(orted_snapshot->process_name)) ));
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557 count = 1;
1558 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &remote_ckpt_state, &count, OPAL_INT))) {
1559 ORTE_ERROR_LOG(ret);
1560 exit_status = ret;
1561 goto cleanup;
1562 }
1563 orted_snapshot->state = remote_ckpt_state;
1564 orte_snapc_ckpt_state_str(&state_str, orted_snapshot->state);
1565 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1566 "Global) State: %d (%s)\n",
1567 (int)(orted_snapshot->state), state_str));
1568 free(state_str);
1569 state_str = NULL;
1570
1571
1572
1573
1574
1575
1576 if( quick ) {
1577 exit_status = ORTE_SUCCESS;
1578 goto post_process;
1579 }
1580
1581 post_process:
1582 loc_min_state = snapc_full_global_get_min_state();
1583
1584 SNAPC_FULL_REPORT_PROGRESS(orted_snapshot, current_total_orteds, loc_min_state);
1585
1586
1587
1588
1589
1590 if( ORTE_SNAPC_CKPT_STATE_RUNNING == loc_min_state &&
1591 ORTE_SNAPC_CKPT_STATE_RUNNING != current_job_ckpt_state) {
1592 current_job_ckpt_state = loc_min_state;
1593
1594 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_RUNNING);
1595
1596 if( is_orte_checkpoint_connected &&
1597 ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
1598 global_snapshot.ss_handle,
1599 current_job_ckpt_state)) ) {
1600 ORTE_ERROR_LOG(ret);
1601 exit_status = ret;
1602 goto cleanup;
1603 }
1604 }
1605
1606
1607
1608
1609 if( ORTE_SNAPC_CKPT_STATE_INC_PREPED == loc_min_state &&
1610 ORTE_SNAPC_CKPT_STATE_INC_PREPED > current_job_ckpt_state) {
1611 current_job_ckpt_state = loc_min_state;
1612
1613 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1614 "Global) All Processes have finished the INC prep!\n"));
1615 }
1616
1617
1618
1619
1620
1621 if( ORTE_SNAPC_CKPT_STATE_STOPPED == loc_min_state &&
1622 ORTE_SNAPC_CKPT_STATE_STOPPED > current_job_ckpt_state) {
1623 current_job_ckpt_state = loc_min_state;
1624
1625 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1626 "Global) All Processes have been stopped!\n"));
1627
1628 if( is_orte_checkpoint_connected &&
1629 ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
1630 global_snapshot.ss_handle,
1631 current_job_ckpt_state)) ) {
1632 ORTE_ERROR_LOG(ret);
1633 exit_status = ret;
1634 goto cleanup;
1635 }
1636
1637
1638 is_orte_checkpoint_connected = false;
1639
1640
1641
1642
1643 write_out_global_metadata();
1644 }
1645
1646
1647
1648
1649 if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL == loc_min_state &&
1650 ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL > current_job_ckpt_state) {
1651
1652 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_FIN_LOCAL);
1653
1654 if( ORTE_SNAPC_CKPT_STATE_NONE != current_job_ckpt_state ) {
1655 if( loc_min_state == current_job_ckpt_state) {
1656 opal_output(0, "Global) JJH WARNING!!: (%d) == (%d)", loc_min_state, current_job_ckpt_state);
1657 }
1658 }
1659
1660 if( currently_migrating ) {
1661 write_out_global_metadata();
1662 current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_MIGRATING;
1663 }
1664 else {
1665 current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL;
1666 }
1667
1668 if( NULL != state_str ) {
1669 free(state_str);
1670 }
1671 orte_snapc_ckpt_state_str(&state_str, current_job_ckpt_state);
1672 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1673 "Global) Job State Changed: %d (%s)\n",
1674 (int)current_job_ckpt_state, state_str ));
1675 free(state_str);
1676 state_str = NULL;
1677
1678 if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
1679 current_job_ckpt_state,
1680 global_snapshot.ss_handle,
1681 true, NULL) ) ) {
1682 ORTE_ERROR_LOG(ret);
1683 exit_status = ret;
1684 goto cleanup;
1685 }
1686
1687
1688
1689
1690
1691
1692
1693 if( !(global_snapshot.options->stop) && !currently_migrating ) {
1694 write_out_global_metadata();
1695 }
1696
1697 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_ESTABLISH);
1698
1699 if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(current_global_jobid,
1700 ORTE_SNAPC_CKPT_STATE_ESTABLISHED,
1701 global_snapshot.ss_handle,
1702 true, NULL) ) ) {
1703 ORTE_ERROR_LOG(ret);
1704 exit_status = ret;
1705 goto cleanup;
1706 }
1707 }
1708
1709
1710
1711
1712
1713
1714 if( ORTE_SNAPC_CKPT_STATE_RECOVERED == loc_min_state &&
1715 ORTE_SNAPC_CKPT_STATE_RECOVERED > current_job_ckpt_state ) {
1716
1717
1718
1719
1720 if( current_job_ckpt_state == ORTE_SNAPC_CKPT_STATE_NONE ) {
1721 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1722 "Global) Job has been successfully restarted"));
1723
1724
1725 orte_snapc_ckpt_state_notify(ORTE_SNAPC_CKPT_STATE_RECOVERED);
1726
1727 for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
1728 item != opal_list_get_end(&(global_snapshot.local_snapshots));
1729 item = opal_list_get_next(item) ) {
1730 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
1731
1732 orted_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
1733
1734 for(aitem = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
1735 aitem != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
1736 aitem = opal_list_get_next(aitem) ) {
1737 app_snapshot = (orte_snapc_base_local_snapshot_t*)aitem;
1738
1739 app_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
1740 }
1741 }
1742
1743 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_RECOVERED);
1744 SNAPC_FULL_DISPLAY_RECOVERED_TIMER();
1745 orte_snapc_base_has_recovered = true;
1746 is_app_checkpointable = true;
1747
1748 exit_status = ORTE_SUCCESS;
1749 goto cleanup;
1750 }
1751
1752
1753
1754
1755
1756 if(ORTE_SNAPC_CKPT_STATE_ESTABLISHED != current_job_ckpt_state ) {
1757 cleanup_on_establish = true;
1758 }
1759
1760 current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_RECOVERED;
1761
1762 if( NULL != state_str ) {
1763 free(state_str);
1764 }
1765 orte_snapc_ckpt_state_str(&state_str, current_job_ckpt_state);
1766 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1767 "Global) Job State Changed: %d (%s)\n",
1768 (int)current_job_ckpt_state, state_str ));
1769 free(state_str);
1770 state_str = NULL;
1771
1772
1773
1774
1775 if( is_orte_checkpoint_connected &&
1776 ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
1777 global_snapshot.ss_handle,
1778 current_job_ckpt_state)) ) {
1779 ORTE_ERROR_LOG(ret);
1780 exit_status = ret;
1781 goto cleanup;
1782 }
1783
1784 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_RECOVERED);
1785
1786
1787
1788
1789 if( !cleanup_on_establish && ORTE_SNAPC_CKPT_STATE_RECOVERED == current_job_ckpt_state) {
1790 if( ORTE_SUCCESS != (ret = orte_snapc_full_global_reset_coord()) ) {
1791 ORTE_ERROR_LOG(ret);
1792 exit_status = ret;
1793 goto cleanup;
1794 }
1795 }
1796 }
1797
1798 cleanup:
1799 if( NULL != state_str ) {
1800 free(state_str);
1801 state_str = NULL;
1802 }
1803
1804 return exit_status;
1805 }
1806
1807 static void snapc_full_process_restart_proc_info_cmd(orte_process_name_t* sender,
1808 opal_buffer_t* buffer)
1809 {
1810 int ret;
1811 orte_std_cntr_t count;
1812 size_t num_vpids = 0, i;
1813 pid_t tmp_pid;
1814 char * tmp_hostname = NULL;
1815
1816 count = 1;
1817 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &tmp_hostname, &count, OPAL_STRING))) {
1818 opal_output(mca_snapc_full_component.super.output_handle,
1819 "Global) vpid_assoc: Failed to unpack process Hostname from peer %s\n",
1820 ORTE_NAME_PRINT(sender));
1821 goto cleanup;
1822 }
1823
1824 count = 1;
1825 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_vpids, &count, OPAL_SIZE))) {
1826 opal_output(mca_snapc_full_component.super.output_handle,
1827 "Global) vpid_assoc: Failed to unpack num_vpids from peer %s\n",
1828 ORTE_NAME_PRINT(sender));
1829 goto cleanup;
1830 }
1831
1832 for(i = 0; i < num_vpids; ++i) {
1833 count = 1;
1834 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &tmp_pid, &count, OPAL_PID))) {
1835 opal_output(mca_snapc_full_component.super.output_handle,
1836 "Global) vpid_assoc: Failed to unpack process PID from peer %s\n",
1837 ORTE_NAME_PRINT(sender));
1838 goto cleanup;
1839 }
1840
1841 global_coord_restart_proc_info(tmp_pid, tmp_hostname);
1842 }
1843
1844
1845
1846
1847 fflush(stdout);
1848
1849 cleanup:
1850 return;
1851 }
1852
/*
 * Print one "MPIR_debug_info) <hostname>:<pid>" line on stdout.
 *
 * local_pid      - process id to report
 * local_hostname - host the process runs on; a NULL pointer is printed
 *                  as "(null)" instead of being passed to %s
 *
 * Always returns 0.
 */
int global_coord_restart_proc_info(pid_t local_pid, char * local_hostname)
{
    /* pid_t has no printf conversion of its own, so cast it to match %d */
    printf("MPIR_debug_info) %s:%d\n",
           (NULL != local_hostname) ? local_hostname : "(null)",
           (int)local_pid);
    return 0;
}
1858
1859 static void snapc_full_process_job_update_cmd(orte_process_name_t* sender,
1860 opal_buffer_t* buffer,
1861 bool quick)
1862 {
1863 int ret;
1864 orte_std_cntr_t count;
1865 orte_jobid_t jobid;
1866 int job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
1867 opal_crs_base_ckpt_options_t *options = NULL;
1868 bool loc_migrating = false;
1869 size_t loc_num_procs = 0;
1870 orte_proc_t *proc = NULL;
1871 size_t i;
1872 orte_sstore_base_handle_t ss_handle;
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884 count = 1;
1885 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &jobid, &count, ORTE_JOBID))) {
1886 ORTE_ERROR_LOG(ret);
1887 return;
1888 }
1889
1890 count = 1;
1891 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &job_ckpt_state, &count, OPAL_INT))) {
1892 ORTE_ERROR_LOG(ret);
1893 return;
1894 }
1895
1896 if( !quick ) {
1897 if (ORTE_SUCCESS != (ret = orte_sstore.unpack_handle(sender, buffer, &ss_handle)) ) {
1898 ORTE_ERROR_LOG(ret);
1899 return;
1900 }
1901
1902 options = OBJ_NEW(opal_crs_base_ckpt_options_t);
1903 if( ORTE_SUCCESS != (ret = orte_snapc_base_unpack_options(buffer, options)) ) {
1904 ORTE_ERROR_LOG(ret);
1905 return;
1906 }
1907
1908
1909
1910 opal_crs_base_copy_options(options, global_snapshot.options);
1911
1912 count = 1;
1913 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(loc_migrating), &count, OPAL_BOOL))) {
1914 ORTE_ERROR_LOG(ret);
1915 goto cleanup;
1916 }
1917
1918 if( loc_migrating ) {
1919 count = 1;
1920 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_num_procs, &count, OPAL_SIZE))) {
1921 ORTE_ERROR_LOG(ret);
1922 goto cleanup;
1923 }
1924
1925 for( i = 0; i < loc_num_procs; ++i ) {
1926 count = 1;
1927 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &proc, &count, ORTE_NAME))) {
1928 ORTE_ERROR_LOG(ret);
1929 goto cleanup;
1930 }
1931
1932 }
1933 }
1934 }
1935
1936 if( ORTE_SUCCESS != (ret = global_coord_job_state_update(jobid,
1937 job_ckpt_state,
1938 ss_handle,
1939 global_snapshot.options) ) ) {
1940 ORTE_ERROR_LOG(ret);
1941 }
1942
1943 cleanup:
1944 if( NULL != options ) {
1945 OBJ_RELEASE(options);
1946 options = NULL;
1947 }
1948
1949 return;
1950 }
1951
1952 static int snapc_full_establish_snapshot_dir(bool empty_metadata)
1953 {
1954 char **value = NULL;
1955 int idx = 0;
1956
1957
1958
1959
1960 INC_SEQ_NUM();
1961 orte_sstore.request_checkpoint_handle(&(global_snapshot.ss_handle),
1962 orte_snapc_base_snapshot_seq_number,
1963 current_global_jobid);
1964 if( currently_migrating ) {
1965 orte_sstore.set_attr(global_snapshot.ss_handle,
1966 SSTORE_METADATA_GLOBAL_MIGRATING,
1967 "1");
1968 }
1969 orte_sstore.register_handle(global_snapshot.ss_handle);
1970
1971
1972
1973
1974 if( 0 > (idx = mca_base_var_find("opal", "mca", "base", "param_file_prefix")) ) {
1975 opal_show_help("help-orte-restart.txt", "amca_param_not_found", true);
1976 }
1977 if( 0 < idx ) {
1978 mca_base_var_get_value (idx, &value, NULL, NULL);
1979
1980 if (*value) {
1981 orte_sstore.set_attr(global_snapshot.ss_handle,
1982 SSTORE_METADATA_GLOBAL_AMCA_PARAM,
1983 *value);
1984
1985 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1986 "Global) AMCA Parameter Preserved: %s",
1987 *value));
1988 }
1989 }
1990
1991
1992
1993
1994 if( 0 > (idx = mca_base_var_find("opal", "mca", "base", "envar_file_prefix")) ) {
1995 opal_show_help("help-orte-restart.txt", "tune_param_not_found", true);
1996 }
1997 if( 0 < idx ) {
1998 mca_base_var_get_value (idx, &value, NULL, NULL);
1999
2000 if (*value) {
2001 orte_sstore.set_attr(global_snapshot.ss_handle,
2002 SSTORE_METADATA_GLOBAL_TUNE_PARAM,
2003 *value);
2004
2005 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2006 "Global) TUNE Parameter Preserved: %s",
2007 *value));
2008 }
2009 }
2010
2011 return ORTE_SUCCESS;
2012 }
2013
2014 static int snapc_full_global_checkpoint(opal_crs_base_ckpt_options_t *options)
2015 {
2016 int ret, exit_status = ORTE_SUCCESS;
2017
2018 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2019 "Global) Checkpoint of job %s has been requested\n",
2020 ORTE_JOBID_PRINT(current_global_jobid)));
2021
2022
2023
2024 current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_REQUEST;
2025
2026
2027
2028
2029 if( ORTE_SUCCESS != (ret = snapc_full_establish_snapshot_dir(false))) {
2030 ORTE_ERROR_LOG(ret);
2031 exit_status = ret;
2032 goto cleanup;
2033 }
2034
2035
2036
2037
2038 updated_job_to_running = false;
2039 if( is_orte_checkpoint_connected &&
2040 ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
2041 global_snapshot.ss_handle,
2042 current_job_ckpt_state) ) ) {
2043 ORTE_ERROR_LOG(ret);
2044 exit_status = ret;
2045 goto cleanup;
2046 }
2047
2048
2049
2050
2051 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
2052 "Global) Notifying the Local Coordinators\n"));
2053
2054 if( ORTE_SUCCESS != (ret = snapc_full_global_notify_checkpoint(current_global_jobid, options)) ) {
2055 ORTE_ERROR_LOG(ret);
2056 exit_status = ret;
2057 goto cleanup;
2058 }
2059
2060 cleanup:
2061 return exit_status;
2062 }
2063
2064 static int snapc_full_global_notify_checkpoint(orte_jobid_t jobid,
2065 opal_crs_base_ckpt_options_t *options)
2066 {
2067 int ret, exit_status = ORTE_SUCCESS;
2068 orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2069 opal_list_item_t* item = NULL;
2070 int ckpt_state;
2071
2072 ckpt_state = ORTE_SNAPC_CKPT_STATE_PENDING;
2073
2074
2075
2076
2077 opal_crs_base_copy_options(options, global_snapshot.options);
2078
2079
2080
2081
2082 for(item = opal_list_get_first(&global_snapshot.local_snapshots);
2083 item != opal_list_get_end(&global_snapshot.local_snapshots);
2084 item = opal_list_get_next(item) ) {
2085 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2086
2087 orted_snapshot->state = ckpt_state;
2088 }
2089
2090
2091
2092
2093 if( ORTE_SUCCESS != (ret = orte_snapc_full_global_set_job_ckpt_info(jobid,
2094 ckpt_state,
2095 global_snapshot.ss_handle,
2096 false, options) ) ) {
2097 ORTE_ERROR_LOG(ret);
2098 exit_status = ret;
2099 goto cleanup;
2100 }
2101
2102 cleanup:
2103 return exit_status;
2104 }
2105
2106
2107
2108
2109 static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
2110 int ckpt_state,
2111 orte_sstore_base_handle_t handle,
2112 bool quick,
2113 opal_crs_base_ckpt_options_t *options)
2114 {
2115 int ret, exit_status = ORTE_SUCCESS;
2116 orte_snapc_full_cmd_flag_t command;
2117 opal_buffer_t *buffer = NULL;
2118 char * state_str = NULL;
2119 orte_proc_t *proc = NULL;
2120 opal_list_item_t *item = NULL;
2121 size_t num_procs;
2122 orte_grpcomm_signature_t *sig;
2123
2124
2125
2126
2127 buffer = OBJ_NEW(opal_buffer_t);
2128
2129 if( quick ) {
2130 command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD;
2131 } else {
2132 command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD;
2133 }
2134
2135 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
2136 ORTE_ERROR_LOG(ret);
2137 exit_status = ret;
2138 goto cleanup;
2139 }
2140
2141 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &jobid, 1, ORTE_JOBID))) {
2142 ORTE_ERROR_LOG(ret);
2143 exit_status = ret;
2144 goto cleanup;
2145 }
2146
2147 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &ckpt_state, 1, OPAL_INT))) {
2148 ORTE_ERROR_LOG(ret);
2149 exit_status = ret;
2150 goto cleanup;
2151 }
2152
2153 if( quick ) {
2154 goto process_msg;
2155 }
2156
2157 if (ORTE_SUCCESS != (ret = orte_sstore.pack_handle(NULL, buffer, handle))) {
2158 ORTE_ERROR_LOG(ret);
2159 exit_status = ret;
2160 goto cleanup;
2161 }
2162
2163 if(ORTE_SUCCESS != (ret = orte_snapc_base_pack_options(buffer, options))) {
2164 ORTE_ERROR_LOG(ret);
2165 exit_status = ret;
2166 goto cleanup;
2167 }
2168
2169 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(currently_migrating), 1, OPAL_BOOL))) {
2170 ORTE_ERROR_LOG(ret);
2171 exit_status = ret;
2172 goto cleanup;
2173 }
2174
2175 if( currently_migrating ) {
2176 num_procs = opal_list_get_size(migrating_procs);
2177
2178 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &num_procs, 1, OPAL_SIZE))) {
2179 ORTE_ERROR_LOG(ret);
2180 exit_status = ret;
2181 goto cleanup;
2182 }
2183
2184 for (item = opal_list_get_first(migrating_procs);
2185 item != opal_list_get_end(migrating_procs);
2186 item = opal_list_get_next(item)) {
2187 proc = (orte_proc_t*)item;
2188 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(proc->name), 1, ORTE_NAME))) {
2189 ORTE_ERROR_LOG(ret);
2190 exit_status = ret;
2191 goto cleanup;
2192 }
2193 }
2194 }
2195
2196 process_msg:
2197 orte_snapc_ckpt_state_str(&state_str, ckpt_state);
2198 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
2199 "Global) Notify Local Coordinators of job %s state change to %d (%s)\n",
2200 ORTE_JOBID_PRINT(jobid), (int)ckpt_state, state_str ));
2201 free(state_str);
2202 state_str = NULL;
2203
2204
2205 sig = OBJ_NEW(orte_grpcomm_signature_t);
2206 sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
2207 sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
2208 sig->signature[0].vpid = ORTE_VPID_WILDCARD;
2209 if (ORTE_SUCCESS != (ret = orte_grpcomm.xcast(sig, ORTE_RML_TAG_SNAPC_FULL, buffer))) {
2210 ORTE_ERROR_LOG(ret);
2211 exit_status = ret;
2212 goto cleanup;
2213 }
2214
2215
2216
2217
2218
2219 cleanup:
2220 if( NULL != state_str ) {
2221 free(state_str);
2222 state_str = NULL;
2223 }
2224
2225 OBJ_RELEASE(buffer);
2226 OBJ_RELEASE(sig);
2227
2228 return exit_status;
2229 }
2230
2231 int global_coord_job_state_update(orte_jobid_t jobid,
2232 int job_ckpt_state,
2233 orte_sstore_base_handle_t ss_handle,
2234 opal_crs_base_ckpt_options_t *options)
2235 {
2236 int ret, exit_status = ORTE_SUCCESS;
2237 char * state_str = NULL;
2238
2239 orte_snapc_ckpt_state_str(&state_str, job_ckpt_state);
2240 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
2241 "Global) Job update command: jobid %s -> state %d (%s)\n",
2242 ORTE_JOBID_PRINT(jobid), (int)job_ckpt_state, state_str ));
2243 free(state_str);
2244 state_str = NULL;
2245
2246
2247
2248
2249 current_job_ckpt_state = job_ckpt_state;
2250 if( is_orte_checkpoint_connected &&
2251 ORTE_SUCCESS != (ret = orte_snapc_base_global_coord_ckpt_update_cmd(&orte_checkpoint_sender,
2252 global_snapshot.ss_handle,
2253 current_job_ckpt_state)) ) {
2254 ORTE_ERROR_LOG(ret);
2255 exit_status = ret;
2256 goto cleanup;
2257 }
2258
2259
2260
2261
2262 if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
2263 if( ORTE_SUCCESS != (ret = local_coord_job_state_update(jobid,
2264 job_ckpt_state,
2265 ss_handle,
2266 options)) ) {
2267 ORTE_ERROR_LOG(ret);
2268 exit_status = ret;
2269 goto cleanup;
2270 }
2271 }
2272
2273
2274
2275
2276 if(ORTE_SNAPC_CKPT_STATE_ESTABLISHED == job_ckpt_state ) {
2277
2278
2279
2280
2281 if( cleanup_on_establish ) {
2282 if( ORTE_SUCCESS != (ret = orte_snapc_full_global_reset_coord()) ) {
2283 ORTE_ERROR_LOG(ret);
2284 exit_status = ret;
2285 goto cleanup;
2286 }
2287 }
2288 }
2289 else if(ORTE_SNAPC_CKPT_STATE_ERROR == job_ckpt_state ) {
2290 opal_output(mca_snapc_full_component.super.output_handle,
2291 "Error: Checkpoint failed!");
2292 }
2293
2294
2295
2296 else if(ORTE_SNAPC_CKPT_STATE_STOPPED == job_ckpt_state ) {
2297 ;
2298 }
2299
2300
2301
2302 else if(ORTE_SNAPC_CKPT_STATE_REQUEST == job_ckpt_state ) {
2303 opal_output(mca_snapc_full_component.super.output_handle,
2304 "ERROR: Internal Checkpoint request not implemented.");
2305 ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
2306 }
2307
2308 cleanup:
2309 if( NULL != state_str) {
2310 free(state_str);
2311 state_str = NULL;
2312 }
2313
2314 return exit_status;
2315 }
2316
2317 static int write_out_global_metadata(void)
2318 {
2319 orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2320 opal_list_item_t* orted_item = NULL;
2321
2322 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
2323 "Global) Updating Metadata"));
2324
2325
2326
2327
2328
2329
2330 for(orted_item = opal_list_get_first(&(global_snapshot.local_snapshots));
2331 orted_item != opal_list_get_end(&(global_snapshot.local_snapshots));
2332 orted_item = opal_list_get_next(orted_item) ) {
2333 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)orted_item;
2334
2335 if( ORTE_SNAPC_CKPT_STATE_ERROR == orted_snapshot->state ) {
2336 return ORTE_ERROR;
2337 }
2338 }
2339
2340
2341
2342
2343 orte_sstore.sync(global_snapshot.ss_handle);
2344
2345 SNAPC_FULL_SET_TIMER(SNAPC_FULL_TIMER_SS_SYNC);
2346
2347 return ORTE_SUCCESS;
2348 }
2349
2350 static orte_snapc_full_orted_snapshot_t *find_orted_snapshot(orte_process_name_t *name )
2351 {
2352 int ret;
2353
2354 orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2355 opal_list_item_t* item = NULL;
2356 orte_ns_cmp_bitmask_t mask;
2357
2358 for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
2359 item != opal_list_get_end(&(global_snapshot.local_snapshots));
2360 item = opal_list_get_next(item) ) {
2361 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2362
2363 mask = ORTE_NS_CMP_ALL;
2364
2365 if (OPAL_EQUAL ==
2366 orte_util_compare_name_fields(mask, name, &orted_snapshot->process_name)) {
2367 return orted_snapshot;
2368 }
2369 }
2370
2371
2372
2373
2374 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
2375 "Global) find_orted(%s) failed. Refreshing and trying again...",
2376 ORTE_NAME_PRINT(name) ));
2377
2378 if( ORTE_SUCCESS != (ret = global_refresh_job_structs()) ) {
2379 ORTE_ERROR_LOG(ret);
2380 return NULL;
2381 }
2382
2383 for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
2384 item != opal_list_get_end(&(global_snapshot.local_snapshots));
2385 item = opal_list_get_next(item) ) {
2386 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2387
2388 mask = ORTE_NS_CMP_ALL;
2389
2390 if (OPAL_EQUAL ==
2391 orte_util_compare_name_fields(mask, name, &orted_snapshot->process_name)) {
2392 return orted_snapshot;
2393 }
2394 }
2395
2396 return NULL;
2397 }
2398
2399 static int snapc_full_global_get_min_state(void)
2400 {
2401 int min_state = ORTE_SNAPC_CKPT_MAX;
2402 orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2403 opal_list_item_t* item = NULL;
2404 char * state_str_a = NULL;
2405 char * state_str_b = NULL;
2406
2407 current_total_orteds = 0;
2408
2409 for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
2410 item != opal_list_get_end(&(global_snapshot.local_snapshots));
2411 item = opal_list_get_next(item) ) {
2412 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2413
2414
2415 if( 0 >= opal_list_get_size(&(orted_snapshot->super.local_snapshots)) ) {
2416 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2417 "Global) ... %s Skipping - (no children)",
2418 ORTE_NAME_PRINT(&orted_snapshot->process_name) ));
2419 continue;
2420 }
2421
2422 current_total_orteds++;
2423
2424 if( NULL != state_str_a ) {
2425 free(state_str_a);
2426 state_str_a = NULL;
2427 }
2428 if( NULL != state_str_b ) {
2429 free(state_str_b);
2430 state_str_b = NULL;
2431 }
2432
2433 orte_snapc_ckpt_state_str(&state_str_a, orted_snapshot->state);
2434 orte_snapc_ckpt_state_str(&state_str_b, min_state);
2435
2436 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2437 "Global) ... %s Checking [%d %s] vs [%d %s]",
2438 ORTE_NAME_PRINT(&orted_snapshot->process_name),
2439 (int)orted_snapshot->state, state_str_a,
2440 min_state, state_str_b ));
2441
2442 if( (int)min_state > (int)orted_snapshot->state ) {
2443 min_state = orted_snapshot->state;
2444
2445 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2446 "Global) ... %s Update --> Min State [%d %s]",
2447 ORTE_NAME_PRINT(&orted_snapshot->process_name),
2448 (int)min_state, state_str_a ));
2449 }
2450 }
2451
2452 if( NULL != state_str_b ) {
2453 free(state_str_b);
2454 state_str_b = NULL;
2455 }
2456 orte_snapc_ckpt_state_str(&state_str_b, min_state);
2457 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
2458 "Global) ... Min State [%d %s]",
2459 (int)min_state, state_str_b ));
2460
2461 if( NULL != state_str_a ) {
2462 free(state_str_a);
2463 state_str_a = NULL;
2464 }
2465 if( NULL != state_str_b ) {
2466 free(state_str_b);
2467 state_str_b = NULL;
2468 }
2469
2470 return min_state;
2471 }
2472
2473 static int orte_snapc_full_global_reset_coord(void)
2474 {
2475 int ret, exit_status = ORTE_SUCCESS;
2476 opal_list_item_t* item = NULL;
2477 opal_list_item_t* aitem = NULL;
2478 orte_snapc_full_orted_snapshot_t *orted_snapshot = NULL;
2479 orte_snapc_base_local_snapshot_t *app_snapshot = NULL;
2480
2481
2482
2483
2484
2485
2486
2487 if( global_snapshot.options->term ) {
2488 SNAPC_FULL_DISPLAY_ALL_TIMERS();
2489 orte_plm.terminate_job(current_global_jobid);
2490 } else {
2491 SNAPC_FULL_DISPLAY_ALL_TIMERS();
2492 }
2493
2494
2495
2496
2497 opal_crs_base_clear_options(global_snapshot.options);
2498
2499
2500
2501
2502 for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
2503 item != opal_list_get_end(&(global_snapshot.local_snapshots));
2504 item = opal_list_get_next(item) ) {
2505 orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2506
2507 orted_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
2508
2509 for(aitem = opal_list_get_first(&(orted_snapshot->super.local_snapshots));
2510 aitem != opal_list_get_end(&(orted_snapshot->super.local_snapshots));
2511 aitem = opal_list_get_next(aitem) ) {
2512 app_snapshot = (orte_snapc_base_local_snapshot_t*)aitem;
2513
2514 app_snapshot->state = ORTE_SNAPC_CKPT_STATE_NONE;
2515 }
2516 }
2517
2518
2519
2520
2521 is_orte_checkpoint_connected = false;
2522 if( ORTE_SUCCESS != (ret = snapc_full_global_start_cmdline_listener() ) ){
2523 ORTE_ERROR_LOG(ret);
2524 exit_status = ret;
2525 }
2526
2527 current_job_ckpt_state = ORTE_SNAPC_CKPT_STATE_NONE;
2528 cleanup_on_establish = false;
2529
2530 report_progress_cur_loc_finished = 0;
2531 report_progress_last_reported_loc_finished = 0;
2532
2533 return exit_status;
2534 }
2535
2536
2537
2538
2539 static void snapc_full_set_time(int idx)
2540 {
2541 if(idx < SNAPC_FULL_TIMER_MAX ) {
2542 if( timer_start[idx] <= 0.0 ) {
2543 timer_start[idx] = snapc_full_get_time();
2544 }
2545 }
2546 }
2547
2548 static void snapc_full_display_all_timers(void)
2549 {
2550 double diff = 0.0;
2551 char * label = NULL;
2552
2553 opal_output(0, "Snapshot Coordination Timing: ******************** Summary Begin\n");
2554
2555
2556 label = strdup("Running");
2557 diff = timer_start[SNAPC_FULL_TIMER_RUNNING] - timer_start[SNAPC_FULL_TIMER_START];
2558 snapc_full_display_indv_timer_core(diff, label);
2559 free(label);
2560
2561
2562 label = strdup("Finish Locally");
2563 diff = timer_start[SNAPC_FULL_TIMER_FIN_LOCAL] - timer_start[SNAPC_FULL_TIMER_RUNNING];
2564 snapc_full_display_indv_timer_core(diff, label);
2565 free(label);
2566
2567 if( timer_start[SNAPC_FULL_TIMER_SS_SYNC] <= timer_start[SNAPC_FULL_TIMER_RECOVERED] ) {
2568
2569 label = strdup("SStore Sync");
2570 diff = timer_start[SNAPC_FULL_TIMER_SS_SYNC] - timer_start[SNAPC_FULL_TIMER_FIN_LOCAL];
2571 snapc_full_display_indv_timer_core(diff, label);
2572 free(label);
2573
2574
2575 label = strdup("Establish");
2576 diff = timer_start[SNAPC_FULL_TIMER_ESTABLISH] - timer_start[SNAPC_FULL_TIMER_SS_SYNC];
2577 snapc_full_display_indv_timer_core(diff, label);
2578 free(label);
2579
2580
2581 label = strdup("Continue/Recover");
2582 diff = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_ESTABLISH];
2583 snapc_full_display_indv_timer_core(diff, label);
2584 free(label);
2585 } else {
2586
2587 label = strdup("SStore Sync*");
2588 diff = timer_start[SNAPC_FULL_TIMER_SS_SYNC] - timer_start[SNAPC_FULL_TIMER_RECOVERED];
2589 snapc_full_display_indv_timer_core(diff, label);
2590 free(label);
2591
2592
2593 label = strdup("Establish*");
2594 diff = timer_start[SNAPC_FULL_TIMER_ESTABLISH] - timer_start[SNAPC_FULL_TIMER_SS_SYNC];
2595 snapc_full_display_indv_timer_core(diff, label);
2596 free(label);
2597
2598
2599 label = strdup("Continue/Recover*");
2600 diff = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_FIN_LOCAL];
2601 snapc_full_display_indv_timer_core(diff, label);
2602 free(label);
2603 }
2604
2605 opal_output(0, "Snapshot Coordination Timing: ******************** Summary End\n");
2606 }
2607
2608 static void snapc_full_display_recovered_timers(void)
2609 {
2610 double diff = 0.0;
2611 char * label = NULL;
2612
2613 opal_output(0, "Snapshot Coordination Timing: ******************** Summary Begin\n");
2614
2615
2616 label = strdup("Recover");
2617 diff = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_START];
2618 snapc_full_display_indv_timer_core(diff, label);
2619 free(label);
2620
2621 opal_output(0, "Snapshot Coordination Timing: ******************** Summary End\n");
2622 }
2623
2624 static void snapc_full_clear_timers(void)
2625 {
2626 int i;
2627 for(i = 0; i < SNAPC_FULL_TIMER_MAX; ++i) {
2628 timer_start[i] = 0.0;
2629 }
2630 }
2631
/*
 * Return the current time in seconds as a double.
 *
 * Uses the OPAL microsecond timer when OPAL_TIMER_USEC_NATIVE is set,
 * and gettimeofday() otherwise.
 */
static double snapc_full_get_time(void)
{
#if OPAL_TIMER_USEC_NATIVE
    /* Microseconds -> seconds */
    return (double)opal_timer_base_get_usec() / 1000000.0;
#else
    struct timeval now;
    double seconds;

    gettimeofday(&now, NULL);

    /* Whole seconds plus the microsecond fraction */
    seconds = now.tv_sec;
    seconds += (double)now.tv_usec / 1000000.0;

    return seconds;
#endif
}
2647
2648 static void snapc_full_display_indv_timer_core(double diff, char *str)
2649 {
2650 double total = 0;
2651 double perc = 0;
2652
2653 if( timer_start[SNAPC_FULL_TIMER_SS_SYNC] <= timer_start[SNAPC_FULL_TIMER_RECOVERED] ) {
2654 total = timer_start[SNAPC_FULL_TIMER_RECOVERED] - timer_start[SNAPC_FULL_TIMER_START];
2655 } else {
2656 total = timer_start[SNAPC_FULL_TIMER_ESTABLISH] - timer_start[SNAPC_FULL_TIMER_START];
2657 }
2658 perc = (diff/total) * 100;
2659
2660 opal_output(0,
2661 "snapc_full: timing: %-20s = %10.2f s\t%10.2f s\t%6.2f\n",
2662 str,
2663 diff,
2664 total,
2665 perc);
2666 return;
2667 }
2668
2669 static void snapc_full_report_progress(orte_snapc_full_orted_snapshot_t *orted_snapshot, int total, int min_state)
2670 {
2671 orte_snapc_full_orted_snapshot_t *loc_orted_snapshot = NULL;
2672 opal_list_item_t* item = NULL;
2673 double perc_done;
2674
2675 if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL != orted_snapshot->state ) {
2676 return;
2677 }
2678
2679 report_progress_cur_loc_finished++;
2680 perc_done = (total-report_progress_cur_loc_finished)/(total*1.0);
2681 perc_done = (perc_done-1)*(-100.0);
2682
2683 if( perc_done >= (report_progress_last_reported_loc_finished + orte_snapc_full_progress_meter) ||
2684 report_progress_last_reported_loc_finished == 0.0 ) {
2685 report_progress_last_reported_loc_finished = perc_done;
2686 opal_output(0, "snapc_full: progress: %10.2f %c Locally Finished\n",
2687 perc_done, '%');
2688 }
2689
2690 if( perc_done > 95.0 ) {
2691 opal_output(0, "snapc_full: progress: Waiting on the following daemons (%10.2f %c):", perc_done, '%');
2692
2693 for(item = opal_list_get_first(&(global_snapshot.local_snapshots));
2694 item != opal_list_get_end(&(global_snapshot.local_snapshots));
2695 item = opal_list_get_next(item) ) {
2696 loc_orted_snapshot = (orte_snapc_full_orted_snapshot_t*)item;
2697
2698 if( ORTE_SNAPC_CKPT_STATE_FINISHED_LOCAL != loc_orted_snapshot->state ) {
2699 opal_output(0, "snapc_full: progress: Daemon %s",
2700 ORTE_NAME_PRINT(&loc_orted_snapshot->process_name));
2701 }
2702 }
2703 }
2704
2705 return;
2706 }