This source file includes the following definitions.
- orte_sstore_stage_global_snapshot_info_construct
- orte_sstore_stage_global_snapshot_info_destruct
- orte_sstore_stage_global_module_init
- orte_sstore_stage_global_module_finalize
- orte_sstore_stage_global_request_checkpoint_handle
- orte_sstore_stage_global_request_global_snapshot_data
- orte_sstore_stage_global_register
- orte_sstore_stage_global_get_attr
- orte_sstore_stage_global_set_attr
- orte_sstore_stage_global_sync
- sync_global_dir
- orte_sstore_stage_global_remove
- orte_sstore_stage_global_pack
- orte_sstore_stage_global_unpack
- create_new_handle_info
- find_handle_info
- sstore_stage_global_start_listener
- sstore_stage_global_stop_listener
- sstore_stage_global_recv
- process_local_pull
- process_local_push
- process_local_done
- init_global_snapshot_directory
- wait_all_filem
- xcast_remove_all
- metadata_open
- metadata_close
- metadata_write_int
- metadata_write_str
- metadata_write_timestamp
- orte_sstore_stage_extract_global_metadata
- stage_snapshot_sort_compare_fn
- sstore_stage_report_progress
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include "orte_config.h"
20
21 #include <string.h>
22 #include <stdlib.h>
23 #include <sys/types.h>
24 #include <sys/stat.h>
25 #include <sys/wait.h>
26 #ifdef HAVE_UNISTD_H
27 #include <unistd.h>
28 #endif
29
30 #include "orte/mca/mca.h"
31 #include "opal/mca/base/base.h"
32
33 #include "opal/mca/event/event.h"
34
35 #include "orte/constants.h"
36 #include "opal/util/argv.h"
37 #include "opal/util/output.h"
38 #include "opal/util/show_help.h"
39 #include "opal/util/opal_environ.h"
40 #include "opal/util/basename.h"
41 #include "opal/util/os_dirpath.h"
42 #include "opal/util/opal_getcwd.h"
43
44 #include "opal/threads/mutex.h"
45 #include "opal/threads/condition.h"
46
47 #include "orte/util/show_help.h"
48 #include "orte/util/name_fns.h"
49 #include "orte/util/proc_info.h"
50 #include "orte/runtime/orte_globals.h"
51 #include "orte/runtime/orte_wait.h"
52
53 #include "orte/mca/errmgr/errmgr.h"
54 #include "orte/mca/errmgr/base/base.h"
55 #include "orte/mca/errmgr/base/errmgr_private.h"
56 #include "orte/mca/ess/ess.h"
57 #include "orte/mca/rml/rml.h"
58 #include "orte/mca/rml/rml_types.h"
59 #include "orte/mca/filem/filem.h"
60 #include "orte/mca/grpcomm/grpcomm.h"
61 #include "orte/mca/snapc/snapc.h"
62 #include "orte/mca/snapc/base/base.h"
63
64 #include "orte/mca/sstore/sstore.h"
65 #include "orte/mca/sstore/base/base.h"
66
67 #include "sstore_stage.h"
68
/*
 * Values stored in the 'handle_type' field of a snapshot info object.
 */
#define SSTORE_HANDLE_TYPE_NONE     0
#define SSTORE_HANDLE_TYPE_CKPT     1
#define SSTORE_HANDLE_TYPE_RESTART  2

/*
 * Values stored in the 'state' field of a snapshot info object.
 */
#define SSTORE_GLOBAL_NONE     0
#define SSTORE_GLOBAL_ERROR    1
#define SSTORE_GLOBAL_INIT     2
#define SSTORE_GLOBAL_REG      3
#define SSTORE_GLOBAL_SYNCING  4
#define SSTORE_GLOBAL_SYNCED   5
79
80
81
82
83 struct orte_sstore_stage_global_snapshot_info_t {
84
85 opal_list_item_t super;
86
87
88 orte_sstore_base_handle_t id;
89
90
91 orte_jobid_t jobid;
92
93
94 int state;
95
96
97 int handle_type;
98
99
100 int seq_num;
101
102
103 char * ref_name;
104
105
106 char * local_location;
107
108
109 char * app_global_location_fmt;
110
111
112 char * app_local_location_fmt;
113
114
115 char * app_local_cache_location_fmt;
116
117
118 char * base_location;
119
120
121 char *metadata_filename;
122
123
124 FILE *metadata;
125
126
127 int num_procs_synced;
128
129
130 int num_procs_done;
131
132
133 int num_procs_total;
134
135
136 opal_list_t *filem_requests;
137
138
139 bool migrating;
140
141
142 char * compress_comp;
143 char * compress_postfix;
144
145
146 double last_progress_report;
147 };
148 typedef struct orte_sstore_stage_global_snapshot_info_t orte_sstore_stage_global_snapshot_info_t;
149 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_global_snapshot_info_t);
150
151 void orte_sstore_stage_global_snapshot_info_construct(orte_sstore_stage_global_snapshot_info_t *info);
152 void orte_sstore_stage_global_snapshot_info_destruct( orte_sstore_stage_global_snapshot_info_t *info);
153
154 OBJ_CLASS_INSTANCE(orte_sstore_stage_global_snapshot_info_t,
155 opal_list_item_t,
156 orte_sstore_stage_global_snapshot_info_construct,
157 orte_sstore_stage_global_snapshot_info_destruct);
158
159
160
161
162
163 static bool is_global_listener_active = false;
164 static int sstore_stage_global_start_listener(void);
165 static int sstore_stage_global_stop_listener(void);
166 static void sstore_stage_global_recv(int status,
167 orte_process_name_t* sender,
168 opal_buffer_t* buffer,
169 orte_rml_tag_t tag,
170 void* cbdata);
171
172 static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
173 static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
174 static int process_local_done(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
175 static int xcast_remove_all(orte_sstore_stage_global_snapshot_info_t *handle_info);
176
177 static orte_sstore_stage_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid);
178 static orte_sstore_stage_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
179
180 static int metadata_open(orte_sstore_stage_global_snapshot_info_t * handle_info);
181 static int metadata_close(orte_sstore_stage_global_snapshot_info_t * handle_info);
182 static int metadata_write_int(orte_sstore_stage_global_snapshot_info_t * handle_info, char * key, int value);
183 static int metadata_write_str(orte_sstore_stage_global_snapshot_info_t * handle_info, char * key, char *value);
184 static int metadata_write_timestamp(orte_sstore_stage_global_snapshot_info_t * handle_info);
185
186 static int init_global_snapshot_directory(orte_sstore_stage_global_snapshot_info_t *handle_info);
187 static int stage_snapshot_sort_compare_fn(opal_list_item_t **a,
188 opal_list_item_t **b);
189 static int orte_sstore_stage_extract_global_metadata(orte_sstore_stage_global_snapshot_info_t * handle_info,
190 orte_sstore_base_global_snapshot_info_t *global_snapshot);
191
192 static int wait_all_filem(orte_sstore_stage_global_snapshot_info_t *handle_info);
193 static void sync_global_dir(orte_sstore_stage_global_snapshot_info_t *handle_info);
194
195 static int next_handle_id = 1;
196 static opal_list_t *active_handles = NULL;
197
198
199
200
201 static void sstore_stage_report_progress(orte_sstore_stage_global_snapshot_info_t *handle_info);
202
/*
 * Call sstore_stage_report_progress() for 'handle_info', but only when
 * orte_sstore_stage_progress_meter is positive.
 *
 * The body is wrapped in do { } while(0) so that the expansion followed by
 * a ';' is exactly one statement and is safe in an unbraced if/else.
 */
#define SSTORE_STAGE_REPORT_PROGRESS(handle_info)                    \
    do {                                                             \
        if(OPAL_UNLIKELY(orte_sstore_stage_progress_meter > 0)) {    \
            sstore_stage_report_progress(handle_info);               \
        }                                                            \
    } while(0)
209
210
211
212
213 void orte_sstore_stage_global_snapshot_info_construct(orte_sstore_stage_global_snapshot_info_t *info)
214 {
215 info->id = next_handle_id;
216 next_handle_id++;
217
218 info->jobid = ORTE_JOBID_INVALID;
219
220 info->state = SSTORE_GLOBAL_NONE;
221
222 info->handle_type = SSTORE_HANDLE_TYPE_NONE;
223
224 info->seq_num = -1;
225
226 info->base_location = strdup(orte_sstore_base_global_snapshot_dir);
227
228 info->ref_name = NULL;
229 info->local_location = NULL;
230 info->app_global_location_fmt = NULL;
231 info->app_local_location_fmt = NULL;
232 info->app_local_cache_location_fmt = NULL;
233
234 info->metadata_filename = NULL;
235 info->metadata = NULL;
236
237 info->filem_requests = OBJ_NEW(opal_list_t);
238
239 info->num_procs_synced = 0;
240 info->num_procs_done = 0;
241 info->num_procs_total = 0;
242
243 info->migrating = false;
244
245 info->compress_comp = NULL;
246 info->compress_postfix = NULL;
247
248 info->last_progress_report = 0.0;
249 }
250
251 void orte_sstore_stage_global_snapshot_info_destruct( orte_sstore_stage_global_snapshot_info_t *info)
252 {
253 info->id = 0;
254 info->seq_num = -1;
255
256 info->jobid = ORTE_JOBID_INVALID;
257
258 info->state = SSTORE_GLOBAL_NONE;
259
260 info->handle_type = SSTORE_HANDLE_TYPE_NONE;
261
262 if( NULL != info->ref_name ) {
263 free( info->ref_name );
264 info->ref_name = NULL;
265 }
266
267 if( NULL != info->local_location ) {
268 free( info->local_location );
269 info->local_location = NULL;
270 }
271
272 if( NULL != info->app_global_location_fmt ) {
273 free( info->app_global_location_fmt );
274 info->app_global_location_fmt = NULL;
275 }
276
277 if( NULL != info->app_local_location_fmt ) {
278 free( info->app_local_location_fmt );
279 info->app_local_location_fmt = NULL;
280 }
281
282 if( NULL != info->app_local_cache_location_fmt ) {
283 free( info->app_local_cache_location_fmt );
284 info->app_local_cache_location_fmt = NULL;
285 }
286
287 if( NULL != info->base_location ) {
288 free( info->base_location );
289 info->base_location = NULL;
290 }
291
292 if( NULL != info->metadata_filename ) {
293 free( info->metadata_filename ) ;
294 info->metadata_filename = NULL;
295 }
296
297 if( NULL != info->metadata ) {
298 fclose(info->metadata);
299 info->metadata = NULL;
300 }
301
302 if( NULL != info->filem_requests ) {
303 OBJ_RELEASE(info->filem_requests);
304 info->filem_requests = NULL;
305 }
306
307 info->num_procs_synced = 0;
308 info->num_procs_done = 0;
309 info->num_procs_total = 0;
310
311 info->migrating = false;
312
313 if( NULL != info->compress_comp ) {
314 free(info->compress_comp);
315 info->compress_comp = NULL;
316 }
317
318 if( NULL != info->compress_postfix ) {
319 free(info->compress_postfix);
320 info->compress_postfix = NULL;
321 }
322
323 info->last_progress_report = 0.0;
324 }
325
326
327
328
329 int orte_sstore_stage_global_module_init(void)
330 {
331 int ret, exit_status = ORTE_SUCCESS;
332
333 if( NULL == active_handles ) {
334 active_handles = OBJ_NEW(opal_list_t);
335 }
336
337
338
339
340
341 if( orte_sstore_stage_enabled_caching && !orte_enable_recovery ) {
342 opal_show_help("help-orte-sstore-stage.txt", "caching_no_recovery", true);
343 }
344
345
346
347
348 if( ORTE_SUCCESS != (ret = sstore_stage_global_start_listener()) ) {
349 ORTE_ERROR_LOG(ret);
350 exit_status = ret;
351 goto cleanup;
352 }
353
354 exit_status = orte_sstore_stage_local_module_init();
355
356 cleanup:
357 return exit_status;
358 }
359
360 int orte_sstore_stage_global_module_finalize(void)
361 {
362 int ret, exit_status = ORTE_SUCCESS;
363 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
364 opal_list_item_t* item = NULL;
365 bool done = false;
366 int cur_time = 0, max_time = 120;
367
368
369
370
371 done = false;
372 while( 0 < opal_list_get_size(active_handles) && !done ) {
373 done = true;
374 for(item = opal_list_get_first(active_handles);
375 item != opal_list_get_end(active_handles);
376 item = opal_list_get_next(item) ) {
377 handle_info = (orte_sstore_stage_global_snapshot_info_t*)item;
378 if( SSTORE_GLOBAL_SYNCED != handle_info->state &&
379 SSTORE_GLOBAL_NONE != handle_info->state ) {
380 done = false;
381 break;
382 }
383 }
384 if( done ) {
385 break;
386 }
387 else {
388 if( cur_time != 0 && cur_time % 30 == 0 ) {
389 opal_output(0, "---> Waiting for sync(): %3d / %3d\n",
390 cur_time, max_time);
391 }
392
393 opal_progress();
394 if( cur_time >= max_time ) {
395 break;
396 } else {
397 sleep(1);
398 }
399 cur_time++;
400 }
401 }
402
403 exit_status = orte_sstore_stage_local_module_finalize();
404
405 if( NULL != active_handles ) {
406 OBJ_RELEASE(active_handles);
407 }
408
409
410
411
412 if( ORTE_SUCCESS != (ret = sstore_stage_global_stop_listener()) ) {
413 ORTE_ERROR_LOG(ret);
414 exit_status = ret;
415 goto cleanup;
416 }
417
418 cleanup:
419 return exit_status;
420 }
421
422 int orte_sstore_stage_global_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
423 {
424 int ret, exit_status = ORTE_SUCCESS;
425 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
426
427 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
428 "sstore:stage:(global): request_checkpoint_handle()"));
429
430
431
432
433
434 handle_info = create_new_handle_info(seq, SSTORE_HANDLE_TYPE_CKPT, jobid);
435
436
437
438
439 if( ORTE_SUCCESS != (ret = init_global_snapshot_directory(handle_info)) ) {
440 ORTE_ERROR_LOG(ret);
441 exit_status = ret;
442 goto cleanup;
443 }
444
445
446
447
448 *handle = handle_info->id;
449
450 cleanup:
451 return exit_status;
452 }
453
454 int orte_sstore_stage_global_request_global_snapshot_data(orte_sstore_base_handle_t *handle,
455 orte_sstore_base_global_snapshot_info_t *snapshot)
456 {
457 int ret, exit_status = ORTE_SUCCESS;
458 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
459
460 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
461 "sstore:stage:(global): request_global_snapshot_data()"));
462
463
464
465
466 if( NULL != handle ) {
467 handle_info = find_handle_info(*handle);
468 snapshot->ss_handle = *handle;
469 } else {
470 handle_info = find_handle_info(orte_sstore_handle_last_stable);
471 snapshot->ss_handle = orte_sstore_handle_last_stable;
472 }
473
474
475
476
477 snapshot->seq_num = handle_info->seq_num;
478 snapshot->reference = strdup(handle_info->ref_name);
479 snapshot->basedir = strdup(handle_info->base_location);
480 snapshot->metadata_filename = strdup(handle_info->metadata_filename);
481
482
483 if( orte_sstore_handle_current == snapshot->ss_handle ) {
484 if( ORTE_SUCCESS != (ret = orte_sstore_stage_extract_global_metadata(handle_info, snapshot)) ) {
485 ORTE_ERROR_LOG(ret);
486 exit_status = ret;
487 goto cleanup;
488 }
489 }
490
491 else {
492 if( ORTE_SUCCESS != (ret = orte_sstore_base_extract_global_metadata(snapshot)) ) {
493 ORTE_ERROR_LOG(ret);
494 exit_status = ret;
495 goto cleanup;
496 }
497 }
498
499 opal_list_sort(&snapshot->local_snapshots, stage_snapshot_sort_compare_fn);
500
501 cleanup:
502 return exit_status;
503 }
504
505 int orte_sstore_stage_global_register(orte_sstore_base_handle_t handle)
506 {
507 int ret, exit_status = ORTE_SUCCESS;
508 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
509
510 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
511 "sstore:stage:(global): register(%d) - Global", handle));
512
513
514
515
516 handle_info = find_handle_info(handle);
517 if( SSTORE_GLOBAL_REG != handle_info->state ) {
518 handle_info->state = SSTORE_GLOBAL_REG;
519 } else {
520 return orte_sstore_stage_local_register(handle);
521 }
522
523 orte_sstore_handle_current = handle;
524
525
526
527
528 if( handle_info->migrating ) {
529 if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
530 SSTORE_METADATA_INTERNAL_MIG_SEQ_STR,
531 handle_info->seq_num)) ) {
532 ORTE_ERROR_LOG(ret);
533 exit_status = ret;
534 goto cleanup;
535 }
536 } else {
537 if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
538 SSTORE_METADATA_GLOBAL_SNAP_SEQ_STR,
539 handle_info->seq_num)) ) {
540 ORTE_ERROR_LOG(ret);
541 exit_status = ret;
542 goto cleanup;
543 }
544 }
545
546 if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
547 SSTORE_METADATA_LOCAL_SNAP_REF_FMT_STR,
548 orte_sstore_base_local_snapshot_fmt)) ) {
549 ORTE_ERROR_LOG(ret);
550 exit_status = ret;
551 goto cleanup;
552 }
553
554 if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
555 ORTE_ERROR_LOG(ret);
556 exit_status = ret;
557 goto cleanup;
558 }
559
560 cleanup:
561 return exit_status;
562 }
563
564 int orte_sstore_stage_global_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
565 {
566 int exit_status = ORTE_SUCCESS;
567 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
568
569 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
570 "sstore:stage:(global): get_attr()"));
571
572
573
574
575 handle_info = find_handle_info(handle);
576
577
578
579
580
581 if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
582 *value = strdup(handle_info->ref_name);
583 }
584
585 else if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
586 opal_asprintf(value, "%d", handle_info->seq_num);
587 }
588
589 else if( SSTORE_METADATA_LOCAL_SNAP_LOC == key ) {
590 opal_asprintf(value, "%s/%s/%d",
591 handle_info->base_location,
592 handle_info->ref_name,
593 handle_info->seq_num);
594 }
595
596 else if( SSTORE_METADATA_LOCAL_SNAP_REF_FMT == key ) {
597 *value = strdup(orte_sstore_base_local_snapshot_fmt);
598 }
599
600 else if( SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT == key ) {
601 opal_asprintf(value, "%s/%s/%d/%s",
602 handle_info->base_location,
603 handle_info->ref_name,
604 handle_info->seq_num,
605 orte_sstore_base_local_snapshot_fmt);
606 }
607 else {
608 exit_status = ORTE_ERR_NOT_SUPPORTED;
609 }
610
611 return exit_status;
612 }
613
614 int orte_sstore_stage_global_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
615 {
616 int ret, exit_status = ORTE_SUCCESS;
617 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
618 char *key_str = NULL;
619
620 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
621 "sstore:stage:(global): set_attr()"));
622
623
624
625
626 handle_info = find_handle_info(handle);
627
628
629
630
631 if( key == SSTORE_METADATA_GLOBAL_MIGRATING ) {
632 handle_info->migrating = true;
633 }
634 else {
635 orte_sstore_base_convert_key_to_string(key, &key_str);
636 if( NULL == key_str ) {
637 ORTE_ERROR_LOG(ORTE_ERROR);
638 exit_status = ORTE_ERROR;
639 goto cleanup;
640 }
641
642 if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
643 ORTE_ERROR_LOG(ret);
644 exit_status = ret;
645 goto cleanup;
646 }
647 }
648
649 cleanup:
650 if( NULL != key_str ) {
651 free(key_str);
652 key_str = NULL;
653 }
654
655 return exit_status;
656 }
657
658 int orte_sstore_stage_global_sync(orte_sstore_base_handle_t handle)
659 {
660 int ret, exit_status = ORTE_SUCCESS;
661 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
662
663 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
664 "sstore:stage:(global): sync()"));
665
666
667
668
669 handle_info = find_handle_info(handle);
670 if( SSTORE_GLOBAL_SYNCING != handle_info->state ) {
671 handle_info->state = SSTORE_GLOBAL_SYNCING;
672 if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
673 return orte_sstore_stage_local_sync(handle);
674 }
675 }
676
677
678
679
680 while(handle_info->num_procs_synced < handle_info->num_procs_total) {
681 opal_progress();
682 }
683
684
685
686
687
688 if( !orte_sstore_stage_skip_filem ) {
689 if( ORTE_SUCCESS != (ret = wait_all_filem(handle_info))) {
690 ORTE_ERROR_LOG(ret);
691 exit_status = ret;
692 goto cleanup;
693 }
694 }
695
696
697
698
699 if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
700 ORTE_ERROR_LOG(ret);
701 exit_status = ret;
702 goto cleanup;
703 }
704
705 if( handle_info->migrating ) {
706 if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
707 SSTORE_METADATA_INTERNAL_DONE_MIG_SEQ_STR,
708 handle_info->seq_num)) ) {
709 ORTE_ERROR_LOG(ret);
710 exit_status = ret;
711 goto cleanup;
712 }
713 } else {
714 if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
715 SSTORE_METADATA_INTERNAL_DONE_SEQ_STR,
716 handle_info->seq_num)) ) {
717 ORTE_ERROR_LOG(ret);
718 exit_status = ret;
719 goto cleanup;
720 }
721 }
722
723 if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
724 ORTE_ERROR_LOG(ret);
725 exit_status = ret;
726 goto cleanup;
727 }
728
729
730 if( !handle_info->migrating ) {
731 orte_sstore_base_is_checkpoint_available = true;
732 orte_sstore_handle_last_stable = orte_sstore_handle_current;
733 }
734
735 handle_info->state = SSTORE_GLOBAL_SYNCED;
736
737 cleanup:
738 return exit_status;
739 }
740
741 static void sync_global_dir(orte_sstore_stage_global_snapshot_info_t *handle_info)
742 {
743 opal_list_item_t* item = NULL, *f_item = NULL;
744 orte_filem_base_request_t *filem_request = NULL;
745 orte_filem_base_file_set_t * f_set = NULL;
746 char * fs_str = NULL;
747 char cwd[OPAL_PATH_MAX];
748
749 opal_getcwd(cwd, OPAL_PATH_MAX);
750
751
752
753
754 opal_asprintf(&fs_str, "%s/%s/%d",
755 handle_info->base_location,
756 handle_info->ref_name,
757 handle_info->seq_num);
758 OPAL_OUTPUT_VERBOSE((20, mca_sstore_stage_component.super.output_handle,
759 "sstore:stage:(global): sync_dir(): Sync'ing on %s",
760 fs_str));
761 if( 0 != chdir(fs_str) ) {
762 opal_output(0, "sstore:stage:(global): Failed to chdir(%s)",
763 fs_str);
764 goto cleanup;
765 }
766 system("sync ; sync ; ls > /dev/null");
767
768
769
770
771
772 if( orte_sstore_stage_enabled_compression ) {
773 goto cleanup;
774 }
775
776 for(f_item = opal_list_get_first(handle_info->filem_requests);
777 f_item != opal_list_get_end(handle_info->filem_requests);
778 f_item = opal_list_get_next(f_item) ) {
779 filem_request = (orte_filem_base_request_t *)f_item;
780
781 for(item = opal_list_get_first(&(filem_request->file_sets));
782 item != opal_list_get_end(&(filem_request->file_sets));
783 item = opal_list_get_next(item) ) {
784 f_set = (orte_filem_base_file_set_t *) item;
785
786 if( NULL != fs_str ) {
787 free(fs_str);
788 fs_str = NULL;
789 }
790
791 if( ORTE_FILEM_TYPE_FILE != f_set->target_flag ) {
792 OPAL_OUTPUT_VERBOSE((20, mca_sstore_stage_component.super.output_handle,
793 "sstore:stage:(global): sync_dir(): Sync'ing on %s",
794 f_set->local_target));
795 if( 0 != chdir(f_set->local_target) ) {
796 opal_output(0, "sstore:stage:(global): Failed to chdir(%s)",
797 f_set->local_target);
798 } else {
799 system("sync ; sync ");
800 }
801 }
802 }
803 }
804
805 cleanup:
806 chdir(cwd);
807
808 if( NULL != fs_str ) {
809 free(fs_str);
810 fs_str = NULL;
811 }
812
813 return;
814 }
815
816 int orte_sstore_stage_global_remove(orte_sstore_base_handle_t handle)
817 {
818 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
819 "sstore:stage:(global): remove()"));
820
821
822
823
824
825 return ORTE_SUCCESS;
826 }
827
828 int orte_sstore_stage_global_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
829 {
830 int ret, exit_status = ORTE_SUCCESS;
831 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
832
833 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
834 "sstore:stage:(global): pack()"));
835
836
837
838
839 handle_info = find_handle_info(handle);
840
841
842
843
844 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
845 ORTE_ERROR_LOG(ret);
846 exit_status = ret;
847 goto cleanup;
848 }
849
850
851
852
853 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
854 ORTE_ERROR_LOG(ret);
855 exit_status = ret;
856 goto cleanup;
857 }
858
859 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
860 ORTE_ERROR_LOG(ret);
861 exit_status = ret;
862 goto cleanup;
863 }
864
865 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING )) ) {
866 ORTE_ERROR_LOG(ret);
867 exit_status = ret;
868 goto cleanup;
869 }
870
871 if( orte_sstore_stage_enabled_caching ) {
872 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING )) ) {
873 ORTE_ERROR_LOG(ret);
874 exit_status = ret;
875 goto cleanup;
876 }
877 }
878
879 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->migrating), 1, OPAL_BOOL )) ) {
880 ORTE_ERROR_LOG(ret);
881 exit_status = ret;
882 goto cleanup;
883 }
884
885 cleanup:
886 return exit_status;
887 }
888
889 int orte_sstore_stage_global_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
890 {
891 int ret, exit_status = ORTE_SUCCESS;
892
893 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
894 "sstore:stage:(global): unpack()"));
895
896
897
898
899 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
900 ORTE_PROC_MY_NAME,
901 peer)) {
902
903
904
905 if( ORTE_SUCCESS != (ret = orte_sstore_stage_local_unpack(peer, buffer, handle)) ) {
906 ORTE_ERROR_LOG(ret);
907 exit_status = ret;
908 goto cleanup;
909 }
910 }
911
912 cleanup:
913 return exit_status;
914 }
915
916
917
918
919 static orte_sstore_stage_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid)
920 {
921 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
922 orte_job_t *jdata = NULL;
923
924 handle_info = OBJ_NEW(orte_sstore_stage_global_snapshot_info_t);
925
926 handle_info->jobid = jobid;
927
928 handle_info->state = SSTORE_GLOBAL_INIT;
929
930 handle_info->handle_type = type;
931
932 handle_info->seq_num = seq;
933
934 orte_sstore_base_get_global_snapshot_ref(&(handle_info->ref_name), getpid());
935
936 opal_asprintf(&(handle_info->local_location), "%s/%d",
937 handle_info->ref_name, handle_info->seq_num);
938
939
940 opal_asprintf(&(handle_info->app_local_location_fmt), "%s/%s/%s/%s",
941 orte_sstore_stage_local_snapshot_dir,
942 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
943 ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME,
944 orte_sstore_base_local_snapshot_fmt);
945
946 if( orte_sstore_stage_enabled_caching ) {
947 opal_asprintf(&(handle_info->app_local_cache_location_fmt), "%s/%s/%s/%d/%s",
948 orte_sstore_stage_local_snapshot_dir,
949 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
950 ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME,
951 handle_info->seq_num,
952 orte_sstore_base_local_snapshot_fmt);
953 }
954
955
956 opal_asprintf(&(handle_info->app_global_location_fmt), "%s/%s/%s",
957 handle_info->base_location,
958 handle_info->local_location,
959 orte_sstore_base_local_snapshot_fmt);
960
961 opal_asprintf(&(handle_info->metadata_filename), "%s/%s/%s",
962 handle_info->base_location,
963 handle_info->ref_name,
964 orte_sstore_base_global_metadata_filename);
965
966 jdata = orte_get_job_data_object(handle_info->jobid);
967 handle_info->num_procs_total = (int)jdata->num_procs;
968
969 opal_list_append(active_handles, &(handle_info->super));
970
971 return handle_info;
972 }
973
974 static orte_sstore_stage_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
975 {
976 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
977 opal_list_item_t* item = NULL;
978
979 for(item = opal_list_get_first(active_handles);
980 item != opal_list_get_end(active_handles);
981 item = opal_list_get_next(item) ) {
982 handle_info = (orte_sstore_stage_global_snapshot_info_t*)item;
983
984 if( handle_info->id == handle ) {
985 return handle_info;
986 }
987 }
988
989 return NULL;
990 }
991
992 static int sstore_stage_global_start_listener(void)
993 {
994 if( is_global_listener_active ) {
995 return ORTE_SUCCESS;
996 }
997
998 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
999 ORTE_RML_PERSISTENT, sstore_stage_global_recv, NULL);
1000
1001 is_global_listener_active = true;
1002 return ORTE_SUCCESS;
1003 }
1004
1005 static int sstore_stage_global_stop_listener(void)
1006 {
1007 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1008
1009 is_global_listener_active = false;
1010 return ORTE_SUCCESS;
1011 }
1012
1013 static void sstore_stage_global_recv(int status,
1014 orte_process_name_t* sender,
1015 opal_buffer_t* buffer,
1016 orte_rml_tag_t tag,
1017 void* cbdata)
1018 {
1019 int ret;
1020 orte_sstore_stage_cmd_flag_t command;
1021 orte_std_cntr_t count;
1022 orte_sstore_base_handle_t loc_id;
1023 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
1024
1025 if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
1026 return;
1027 }
1028
1029 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1030 "sstore:stage:(global): process_cmd(%s)",
1031 ORTE_NAME_PRINT(sender)));
1032
1033 count = 1;
1034 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
1035 ORTE_ERROR_LOG(ret);
1036 goto cleanup;
1037 }
1038
1039 count = 1;
1040 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
1041 ORTE_ERROR_LOG(ret);
1042 goto cleanup;
1043 }
1044
1045
1046
1047
1048
1049 if(OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
1050 ORTE_PROC_MY_NAME,
1051 sender)) {
1052
1053 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1054 "sstore:stage:(local): process_cmd(%s)",
1055 ORTE_NAME_PRINT(sender)));
1056
1057 orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
1058 return;
1059 }
1060
1061
1062
1063
1064 if(NULL == (handle_info = find_handle_info(loc_id)) ) {
1065 ;
1066 }
1067
1068 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1069 "sstore:stage:(global): process_cmd(%s) - Command = %s",
1070 ORTE_NAME_PRINT(sender),
1071 (ORTE_SSTORE_STAGE_PULL == command ? "Pull" :
1072 (ORTE_SSTORE_STAGE_PUSH == command ? "Push" :
1073 (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
1074 (ORTE_SSTORE_STAGE_DONE == command ? "Done" : "Unknown")))) ));
1075
1076
1077
1078
1079 if( ORTE_SSTORE_STAGE_PULL == command ) {
1080 process_local_pull(sender, buffer, handle_info);
1081 }
1082 else if( ORTE_SSTORE_STAGE_PUSH == command ) {
1083 process_local_push(sender, buffer, handle_info);
1084 }
1085 else if( ORTE_SSTORE_STAGE_REMOVE == command ) {
1086
1087 orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
1088 }
1089 else if( ORTE_SSTORE_STAGE_DONE == command ) {
1090 process_local_done(sender, buffer, handle_info);
1091 }
1092
1093 cleanup:
1094 return;
1095 }
1096
1097 static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
1098 {
1099 int ret, exit_status = ORTE_SUCCESS;
1100 opal_buffer_t *loc_buffer = NULL;
1101 orte_sstore_stage_cmd_flag_t command;
1102
1103
1104
1105
1106 loc_buffer = OBJ_NEW(opal_buffer_t);
1107
1108 command = ORTE_SSTORE_STAGE_PUSH;
1109 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1110 ORTE_ERROR_LOG(ret);
1111 exit_status = ret;
1112 goto cleanup;
1113 }
1114
1115 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1116 ORTE_ERROR_LOG(ret);
1117 exit_status = ret;
1118 goto cleanup;
1119 }
1120
1121 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
1122 ORTE_ERROR_LOG(ret);
1123 exit_status = ret;
1124 goto cleanup;
1125 }
1126
1127 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
1128 ORTE_ERROR_LOG(ret);
1129 exit_status = ret;
1130 goto cleanup;
1131 }
1132
1133 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING))) {
1134 ORTE_ERROR_LOG(ret);
1135 exit_status = ret;
1136 goto cleanup;
1137 }
1138
1139 if( orte_sstore_stage_enabled_caching ) {
1140 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING))) {
1141 ORTE_ERROR_LOG(ret);
1142 exit_status = ret;
1143 goto cleanup;
1144 }
1145 }
1146
1147 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->migrating), 1, OPAL_BOOL))) {
1148 ORTE_ERROR_LOG(ret);
1149 exit_status = ret;
1150 goto cleanup;
1151 }
1152
1153 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1154 orte_rml_send_callback, NULL))) {
1155 ORTE_ERROR_LOG(ret);
1156 exit_status = ret;
1157 goto cleanup;
1158 }
1159
1160 loc_buffer = NULL;
1161
1162 cleanup:
1163 if (NULL != loc_buffer) {
1164 OBJ_RELEASE(loc_buffer);
1165 loc_buffer = NULL;
1166 }
1167
1168 return exit_status;
1169 }
1170
1171 static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
1172 {
1173 int ret, exit_status = ORTE_SUCCESS;
1174 orte_std_cntr_t count;
1175 size_t num_entries, i;
1176 orte_process_name_t name;
1177 bool ckpt_skipped = false;
1178 char * crs_comp = NULL;
1179 char * compress_comp = NULL;
1180 char * compress_postfix = NULL;
1181 char * proc_name = NULL;
1182 char * tmp_str = NULL;
1183 orte_filem_base_request_t *filem_request = NULL;
1184 orte_filem_base_process_set_t *p_set = NULL;
1185 orte_filem_base_file_set_t * f_set = NULL;
1186
1187 if( !orte_sstore_stage_skip_filem ) {
1188 filem_request = OBJ_NEW(orte_filem_base_request_t);
1189
1190
1191
1192
1193 p_set = OBJ_NEW(orte_filem_base_process_set_t);
1194 p_set->source.jobid = peer->jobid;
1195 p_set->source.vpid = peer->vpid;
1196 p_set->sink.jobid = ORTE_PROC_MY_NAME->jobid;
1197 p_set->sink.vpid = ORTE_PROC_MY_NAME->vpid;
1198 opal_list_append(&(filem_request->process_sets), &(p_set->super) );
1199 }
1200
1201
1202
1203
1204 count = 1;
1205 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
1206 ORTE_ERROR_LOG(ret);
1207 exit_status = ret;
1208 goto cleanup;
1209 }
1210
1211 for(i = 0; i < num_entries; ++i ) {
1212 count = 1;
1213 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &name, &count, ORTE_NAME))) {
1214 ORTE_ERROR_LOG(ret);
1215 exit_status = ret;
1216 goto cleanup;
1217 }
1218
1219 count = 1;
1220 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &ckpt_skipped, &count, OPAL_BOOL))) {
1221 ORTE_ERROR_LOG(ret);
1222 exit_status = ret;
1223 goto cleanup;
1224 }
1225
1226 if( !ckpt_skipped ) {
1227 count = 1;
1228 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crs_comp, &count, OPAL_STRING))) {
1229 ORTE_ERROR_LOG(ret);
1230 exit_status = ret;
1231 goto cleanup;
1232 }
1233
1234 if( orte_sstore_stage_enabled_compression ) {
1235 count = 1;
1236 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &compress_comp, &count, OPAL_STRING))) {
1237 ORTE_ERROR_LOG(ret);
1238 exit_status = ret;
1239 goto cleanup;
1240 }
1241 if( NULL == handle_info->compress_comp ) {
1242 handle_info->compress_comp = strdup(compress_comp);
1243 }
1244
1245 count = 1;
1246 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &compress_postfix, &count, OPAL_STRING))) {
1247 ORTE_ERROR_LOG(ret);
1248 exit_status = ret;
1249 goto cleanup;
1250 }
1251 if( NULL == handle_info->compress_postfix ) {
1252 handle_info->compress_postfix = strdup(compress_postfix);
1253 }
1254 }
1255
1256 if( !orte_sstore_stage_skip_filem ) {
1257
1258
1259
1260 f_set = OBJ_NEW(orte_filem_base_file_set_t);
1261 if( orte_sstore_stage_enabled_compression ) {
1262 f_set->target_flag = ORTE_FILEM_TYPE_FILE;
1263 } else {
1264 f_set->target_flag = ORTE_FILEM_TYPE_DIR;
1265 }
1266
1267 if( orte_sstore_stage_enabled_compression ) {
1268 opal_asprintf(&tmp_str,
1269 handle_info->app_global_location_fmt,
1270 name.vpid);
1271 opal_asprintf(&(f_set->local_target), "%s%s",
1272 tmp_str,
1273 compress_postfix);
1274 } else {
1275 opal_asprintf(&(f_set->local_target),
1276 handle_info->app_global_location_fmt,
1277 name.vpid);
1278 }
1279
1280 if( orte_sstore_stage_global_is_shared ) {
1281 f_set->local_hint = ORTE_FILEM_HINT_SHARED;
1282 }
1283
1284 if( orte_sstore_stage_enabled_compression ) {
1285 opal_asprintf(&tmp_str,
1286 handle_info->app_local_location_fmt,
1287 name.vpid);
1288 opal_asprintf(&(f_set->remote_target), "%s%s",
1289 tmp_str,
1290 compress_postfix);
1291 } else {
1292 opal_asprintf(&(f_set->remote_target),
1293 handle_info->app_local_location_fmt,
1294 name.vpid);
1295 }
1296
1297 opal_list_append(&(filem_request->file_sets), &(f_set->super) );
1298
1299 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1300 "sstore:stage:(global): push(): Pulling remote file <%s> to <%s>",
1301 f_set->remote_target,
1302 f_set->local_target));
1303 }
1304
1305
1306
1307
1308 orte_util_convert_process_name_to_string(&proc_name, &name);
1309
1310 metadata_write_str(handle_info,
1311 SSTORE_METADATA_INTERNAL_PROCESS_STR,
1312 proc_name);
1313 metadata_write_str(handle_info,
1314 SSTORE_METADATA_LOCAL_CRS_COMP_STR,
1315 crs_comp);
1316 if( orte_sstore_stage_enabled_compression ) {
1317 metadata_write_str(handle_info,
1318 SSTORE_METADATA_LOCAL_COMPRESS_COMP_STR,
1319 compress_comp);
1320 metadata_write_str(handle_info,
1321 SSTORE_METADATA_LOCAL_COMPRESS_POSTFIX_STR,
1322 compress_postfix);
1323 }
1324 }
1325
1326 if( NULL != crs_comp ) {
1327 free(crs_comp);
1328 crs_comp = NULL;
1329 }
1330 if( NULL != compress_comp ) {
1331 free(compress_comp);
1332 compress_comp = NULL;
1333 }
1334 if( NULL != compress_postfix ) {
1335 free(compress_postfix);
1336 compress_postfix = NULL;
1337 }
1338 if( NULL != proc_name ) {
1339 free(proc_name);
1340 proc_name = NULL;
1341 }
1342 if( NULL != tmp_str ) {
1343 free(tmp_str);
1344 tmp_str = NULL;
1345 }
1346
1347 (handle_info->num_procs_synced)++;
1348 }
1349
1350 if( !orte_sstore_stage_skip_filem && 0 < opal_list_get_size(&(filem_request->file_sets)) ) {
1351
1352
1353
1354 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1355 "sstore:stage:(global): push(): Pulling remote files from %s (%3d of %3d done)",
1356 ORTE_NAME_PRINT(peer),
1357 handle_info->num_procs_synced,
1358 handle_info->num_procs_total));
1359 opal_list_append(handle_info->filem_requests, &(filem_request->super));
1360 if(ORTE_SUCCESS != (ret = orte_filem.get_nb(filem_request)) ) {
1361 ORTE_ERROR_LOG(ret);
1362 exit_status = ret;
1363 goto cleanup;
1364 }
1365 }
1366
1367 cleanup:
1368 if( NULL != crs_comp ) {
1369 free(crs_comp);
1370 crs_comp = NULL;
1371 }
1372 if( NULL != compress_comp ) {
1373 free(compress_comp);
1374 compress_comp = NULL;
1375 }
1376 if( NULL != compress_postfix ) {
1377 free(compress_postfix);
1378 compress_postfix = NULL;
1379 }
1380 if( NULL != proc_name ) {
1381 free(proc_name);
1382 proc_name = NULL;
1383 }
1384 if( NULL != tmp_str ) {
1385 free(tmp_str);
1386 tmp_str = NULL;
1387 }
1388
1389 return exit_status;
1390 }
1391
1392 static int process_local_done(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
1393 {
1394 int ret, exit_status = ORTE_SUCCESS;
1395 orte_std_cntr_t count;
1396 size_t num_entries;
1397
1398
1399
1400
1401 count = 1;
1402 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
1403 ORTE_ERROR_LOG(ret);
1404 exit_status = ret;
1405 goto cleanup;
1406 }
1407
1408 (handle_info->num_procs_done) += (int)num_entries;
1409
1410 SSTORE_STAGE_REPORT_PROGRESS(handle_info);
1411
1412 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1413 "sstore:stage:(global): done(): [Peer %s] Moved %d files (%3d of %3d reported as done)",
1414 ORTE_NAME_PRINT(peer),
1415 (int)num_entries,
1416 handle_info->num_procs_done,
1417 handle_info->num_procs_total));
1418
1419 cleanup:
1420 return exit_status;
1421 }
1422
1423 static int init_global_snapshot_directory(orte_sstore_stage_global_snapshot_info_t *handle_info)
1424 {
1425 int ret, exit_status = ORTE_SUCCESS;
1426 char * dir_name = NULL;
1427 mode_t my_mode = S_IRWXU;
1428
1429
1430
1431
1432 opal_asprintf(&dir_name, "%s/%s",
1433 handle_info->base_location,
1434 handle_info->local_location);
1435 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(dir_name, my_mode)) ) {
1436 ORTE_ERROR_LOG(ret);
1437 exit_status = ret;
1438 goto cleanup;
1439 }
1440
1441
1442
1443
1444 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1445 ORTE_ERROR_LOG(ret);
1446 exit_status = ret;
1447 goto cleanup;
1448 }
1449
1450 cleanup:
1451 if(NULL != dir_name) {
1452 free(dir_name);
1453 dir_name = NULL;
1454 }
1455
1456 return exit_status;
1457 }
1458
1459 static int wait_all_filem(orte_sstore_stage_global_snapshot_info_t *handle_info)
1460 {
1461 int ret, exit_status = ORTE_SUCCESS;
1462 opal_list_item_t* item = NULL;
1463
1464 if( orte_sstore_stage_skip_filem ) {
1465 return exit_status;
1466 }
1467
1468
1469
1470
1471 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1472 "sstore:stage:(global): wait_all_filem(): Waiting on all outstanding FileM requests (%d)",
1473 (int)opal_list_get_size(handle_info->filem_requests) ));
1474
1475 if(ORTE_SUCCESS != (ret = orte_filem.wait_all(handle_info->filem_requests)) ) {
1476 ORTE_ERROR_LOG(ret);
1477 exit_status = ret;
1478 goto cleanup;
1479 }
1480
1481
1482
1483
1484 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1485 "sstore:stage:(global): wait_all_filem(): Removing all local files"));
1486 if( ORTE_SUCCESS != (ret = xcast_remove_all(handle_info))) {
1487 ORTE_ERROR_LOG(ret);
1488 exit_status = ret;
1489 goto cleanup;
1490 }
1491
1492
1493
1494
1495 sync_global_dir(handle_info);
1496
1497
1498
1499
1500 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1501 "sstore:stage:(global): wait_all_filem(): Waiting for remove to finish..."));
1502 while(handle_info->num_procs_done < handle_info->num_procs_total) {
1503 opal_progress();
1504 }
1505
1506 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1507 "sstore:stage:(global): wait_all_filem(): All files have been transfered"));
1508
1509 cleanup:
1510 while (NULL != (item = opal_list_remove_first(handle_info->filem_requests) ) ) {
1511 OBJ_RELEASE(item);
1512 }
1513 OBJ_DESTRUCT(handle_info->filem_requests);
1514
1515 return exit_status;
1516 }
1517
1518 static int xcast_remove_all(orte_sstore_stage_global_snapshot_info_t *handle_info)
1519 {
1520 int ret, exit_status = ORTE_SUCCESS;
1521 opal_buffer_t *loc_buffer = NULL;
1522 orte_sstore_stage_cmd_flag_t command;
1523 orte_grpcomm_signature_t *sig;
1524
1525 handle_info->num_procs_done = 0;
1526
1527 loc_buffer = OBJ_NEW(opal_buffer_t);
1528
1529 command = ORTE_SSTORE_STAGE_REMOVE;
1530 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1531 ORTE_ERROR_LOG(ret);
1532 exit_status = ret;
1533 goto cleanup;
1534 }
1535
1536 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1537 ORTE_ERROR_LOG(ret);
1538 exit_status = ret;
1539 goto cleanup;
1540 }
1541
1542
1543 sig = OBJ_NEW(orte_grpcomm_signature_t);
1544 sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
1545 sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
1546 sig->signature[0].vpid = ORTE_VPID_WILDCARD;
1547 if (ORTE_SUCCESS != (ret = orte_grpcomm.xcast(sig, ORTE_RML_TAG_SSTORE_INTERNAL, loc_buffer))) {
1548 ORTE_ERROR_LOG(ret);
1549 exit_status = ret;
1550 goto cleanup;
1551 }
1552
1553
1554 loc_buffer = NULL;
1555
1556 cleanup:
1557 if (NULL != loc_buffer) {
1558 OBJ_RELEASE(loc_buffer);
1559 loc_buffer = NULL;
1560 }
1561
1562 OBJ_RELEASE(sig);
1563
1564 return exit_status;
1565 }
1566
1567
1568
1569
1570 static int metadata_open(orte_sstore_stage_global_snapshot_info_t * handle_info)
1571 {
1572
1573 if( NULL != handle_info->metadata ) {
1574 return ORTE_SUCCESS;
1575 }
1576
1577 if (NULL == (handle_info->metadata = fopen(handle_info->metadata_filename, "a")) ) {
1578 opal_output(orte_sstore_base_framework.framework_output,
1579 "sstore:stage:(global):init_dir() Unable to open the file (%s)\n",
1580 handle_info->metadata_filename);
1581 ORTE_ERROR_LOG(ORTE_ERROR);
1582 return ORTE_ERROR;
1583 }
1584
1585 return ORTE_SUCCESS;
1586 }
1587
1588 static int metadata_close(orte_sstore_stage_global_snapshot_info_t * handle_info)
1589 {
1590
1591 if( NULL == handle_info->metadata ) {
1592 return ORTE_SUCCESS;
1593 }
1594
1595 fclose(handle_info->metadata);
1596 handle_info->metadata = NULL;
1597
1598 return ORTE_SUCCESS;
1599 }
1600
1601 static int metadata_write_int(orte_sstore_stage_global_snapshot_info_t * handle_info, char *key, int value)
1602 {
1603 int ret, exit_status = ORTE_SUCCESS;
1604
1605
1606 if( NULL == handle_info->metadata ) {
1607 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1608 ORTE_ERROR_LOG(ret);
1609 exit_status = ret;
1610 goto cleanup;
1611 }
1612 }
1613
1614 fprintf(handle_info->metadata, "%s%d\n", key, value);
1615
1616 cleanup:
1617 return exit_status;
1618 }
1619
1620 static int metadata_write_str(orte_sstore_stage_global_snapshot_info_t * handle_info, char *key, char *value)
1621 {
1622 int ret, exit_status = ORTE_SUCCESS;
1623
1624
1625 if( NULL == handle_info->metadata ) {
1626 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1627 ORTE_ERROR_LOG(ret);
1628 exit_status = ret;
1629 goto cleanup;
1630 }
1631 }
1632
1633 fprintf(handle_info->metadata, "%s%s\n", key, value);
1634
1635 cleanup:
1636 return exit_status;
1637 }
1638
1639 static int metadata_write_timestamp(orte_sstore_stage_global_snapshot_info_t * handle_info)
1640 {
1641 int ret, exit_status = ORTE_SUCCESS;
1642 time_t timestamp;
1643
1644
1645 if( NULL == handle_info->metadata ) {
1646 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1647 ORTE_ERROR_LOG(ret);
1648 exit_status = ret;
1649 goto cleanup;
1650 }
1651 }
1652
1653 timestamp = time(NULL);
1654 fprintf(handle_info->metadata, "%s%s",
1655 SSTORE_METADATA_INTERNAL_TIME_STR,
1656 ctime(×tamp));
1657
1658 cleanup:
1659 return exit_status;
1660 }
1661
1662 static int orte_sstore_stage_extract_global_metadata(orte_sstore_stage_global_snapshot_info_t * handle_info,
1663 orte_sstore_base_global_snapshot_info_t *global_snapshot)
1664 {
1665 int exit_status = ORTE_SUCCESS;
1666 orte_sstore_base_local_snapshot_info_t *vpid_snapshot = NULL;
1667 opal_list_item_t* item = NULL;
1668 int i = 0;
1669
1670
1671
1672
1673 while (NULL != (item = opal_list_remove_first(&global_snapshot->local_snapshots))) {
1674 OBJ_RELEASE(item);
1675 }
1676
1677 if( NULL != global_snapshot->start_time ) {
1678 free( global_snapshot->start_time );
1679 global_snapshot->start_time = NULL;
1680 }
1681
1682 if( NULL != global_snapshot->end_time ) {
1683 free( global_snapshot->end_time );
1684 global_snapshot->end_time = NULL;
1685 }
1686
1687
1688
1689
1690 for(i = 0; i < handle_info->num_procs_total; ++i) {
1691 vpid_snapshot = OBJ_NEW(orte_sstore_base_local_snapshot_info_t);
1692 vpid_snapshot->ss_handle = handle_info->id;
1693
1694 vpid_snapshot->process_name.jobid = handle_info->jobid;
1695 vpid_snapshot->process_name.vpid = i;
1696
1697
1698
1699
1700
1701 vpid_snapshot->crs_comp = NULL;
1702 if( NULL != handle_info->compress_comp ) {
1703 vpid_snapshot->compress_comp = strdup(handle_info->compress_comp);
1704 } else {
1705 vpid_snapshot->compress_comp = NULL;
1706 }
1707 if( NULL != handle_info->compress_postfix ) {
1708 vpid_snapshot->compress_postfix = strdup(handle_info->compress_postfix);
1709 } else {
1710 vpid_snapshot->compress_postfix = NULL;
1711 }
1712 vpid_snapshot->start_time = NULL;
1713 vpid_snapshot->end_time = NULL;
1714
1715 opal_list_append(&global_snapshot->local_snapshots, &(vpid_snapshot->super));
1716 }
1717
1718 return exit_status;
1719 }
1720
1721 static int stage_snapshot_sort_compare_fn(opal_list_item_t **a,
1722 opal_list_item_t **b)
1723 {
1724 orte_sstore_base_local_snapshot_info_t *snap_a, *snap_b;
1725
1726 snap_a = (orte_sstore_base_local_snapshot_info_t*)(*a);
1727 snap_b = (orte_sstore_base_local_snapshot_info_t*)(*b);
1728
1729 if( snap_a->process_name.vpid > snap_b->process_name.vpid ) {
1730 return 1;
1731 }
1732 else if( snap_a->process_name.vpid == snap_b->process_name.vpid ) {
1733 return 0;
1734 }
1735 else {
1736 return -1;
1737 }
1738 }
1739
1740 static void sstore_stage_report_progress(orte_sstore_stage_global_snapshot_info_t *handle_info)
1741 {
1742 double perc_done;
1743
1744 perc_done = (handle_info->num_procs_total - handle_info->num_procs_done);
1745 perc_done = perc_done / (1.0 * handle_info->num_procs_total);
1746 perc_done = (perc_done-1)*(-100.0);
1747
1748 if( perc_done >= (handle_info->last_progress_report + orte_sstore_stage_progress_meter ) ||
1749 handle_info->last_progress_report == 0.0 ) {
1750 handle_info->last_progress_report = perc_done;
1751 opal_output(0, "sstore:stage: progress: %10.2f %c Finished\n",
1752 perc_done, '%');
1753 }
1754
1755 return;
1756 }