This source file includes the following definitions.
- orte_sstore_stage_local_snapshot_info_construct
- orte_sstore_stage_local_snapshot_info_destruct
- orte_sstore_stage_local_app_snapshot_info_construct
- orte_sstore_stage_local_app_snapshot_info_destruct
- orte_sstore_stage_local_module_init
- orte_sstore_stage_local_module_finalize
- orte_sstore_stage_local_request_checkpoint_handle
- orte_sstore_stage_local_register
- orte_sstore_stage_local_get_attr
- orte_sstore_stage_local_set_attr
- orte_sstore_stage_local_sync
- orte_sstore_stage_local_remove
- orte_sstore_stage_local_pack
- orte_sstore_stage_local_unpack
- orte_sstore_stage_local_fetch_app_deps
- orte_sstore_stage_local_wait_all_deps
- create_new_handle_info
- find_handle_info
- find_handle_info_ref
- append_new_app_handle_info
- find_app_handle_info
- sstore_stage_local_start_listener
- sstore_stage_local_stop_listener
- sstore_stage_local_recv
- orte_sstore_stage_local_process_cmd_action
- process_global_pull
- process_global_push
- process_global_remove
- process_app_pull
- process_app_push
- wait_all_apps_updated
- start_compression
- sstore_stage_local_compress_waitpid_cb
- wait_all_compressed
- pull_handle_info
- push_handle_info
- sstore_stage_create_local_dir
- sstore_stage_destroy_local_dir
- sstore_stage_create_cache
- sstore_stage_destroy_cache
- sstore_stage_update_cache
- orte_sstore_stage_local_preload_files
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 #include "orte_config.h"
22
23 #include <string.h>
24 #include <stdlib.h>
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <sys/wait.h>
28 #ifdef HAVE_UNISTD_H
29 #include <unistd.h>
30 #endif
31
32 #include "orte/mca/mca.h"
33 #include "opal/mca/base/base.h"
34
35 #include "opal/mca/event/event.h"
36
37 #include "orte/constants.h"
38 #include "orte/util/show_help.h"
39 #include "opal/util/argv.h"
40 #include "opal/util/output.h"
41 #include "opal/util/opal_environ.h"
42 #include "opal/util/basename.h"
43 #include "opal/util/os_dirpath.h"
44
45 #include "opal/mca/compress/compress.h"
46 #include "opal/mca/compress/base/base.h"
47
48 #include "opal/threads/mutex.h"
49 #include "opal/threads/condition.h"
50
51 #include "orte/util/name_fns.h"
52 #include "orte/util/proc_info.h"
53 #include "orte/runtime/orte_globals.h"
54 #include "orte/runtime/orte_wait.h"
55 #include "orte/mca/errmgr/errmgr.h"
56 #include "orte/mca/rml/rml.h"
57 #include "orte/mca/rml/rml_types.h"
58 #include "orte/mca/odls/odls_types.h"
59 #include "orte/mca/filem/filem.h"
60 #include "orte/mca/filem/base/base.h"
61
62 #include "orte/mca/sstore/sstore.h"
63 #include "orte/mca/sstore/base/base.h"
64
65 #include "sstore_stage.h"
66
67
68
69
/* Values held in the `status` field of a local snapshot handle record. */
70 #define SSTORE_LOCAL_NONE 0 /* assigned by the record constructor and destructor */
71 #define SSTORE_LOCAL_ERROR 1 /* ends the wait in register(); counted as finished by finalize() */
72 #define SSTORE_LOCAL_INIT 2 /* assigned by create_new_handle_info() */
73 #define SSTORE_LOCAL_READY 3 /* the state register() waits for */
74 #define SSTORE_LOCAL_SYNCED 4 /* assigned at the end of a successful sync() */
75 #define SSTORE_LOCAL_DONE 5 /* counted as finished by finalize() */
76
/*
 * Per-handle bookkeeping record kept by the local (daemon side) part of the
 * stage component. Records live on the file-scope `active_handles` list.
 */
77 struct orte_sstore_stage_local_snapshot_info_t {
78 /* List linkage: lets the record be stored on an opal_list_t */
79 opal_list_item_t super;
80
81 /* Handle this record describes (set by create_new_handle_info()) */
82 orte_sstore_base_handle_t id;
83
84 /* One of the SSTORE_LOCAL_* state values */
85 int status;
86
87 /* Sequence number; -1 until unpacked by process_global_push() */
88 int seq_num;
89
90 /* Global reference name; NULL until unpacked by process_global_push() */
91 char * global_ref_name;
92
93 /* printf-style format; expanded with a process vpid to build local_location */
94 char * location_fmt;
95
96 /* Only unpacked when caching is enabled; presumably the cache counterpart of location_fmt (confirm) */
97 char * cache_location_fmt;
98
99 /* List of orte_sstore_stage_local_app_snapshot_info_t, one per local child */
100 opal_list_t *app_info_handle;
101
102 /* Owned string, freed by the destructor; presumably the compression component name (confirm) */
103 char * compress_comp;
104
105 /* Owned string, freed by the destructor; presumably the compressed file postfix (confirm) */
106 char * compress_postfix;
107
108 /* Unpacked by process_global_push(); when true the local cache is bypassed in fetch_app_deps() */
109 bool migrating;
110 };
111 typedef struct orte_sstore_stage_local_snapshot_info_t orte_sstore_stage_local_snapshot_info_t;
112 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_snapshot_info_t);
113
114 void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info);
115 void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info);
116
117 OBJ_CLASS_INSTANCE(orte_sstore_stage_local_snapshot_info_t,
118 opal_list_item_t,
119 orte_sstore_stage_local_snapshot_info_construct,
120 orte_sstore_stage_local_snapshot_info_destruct);
121
/*
 * Per-process entry stored on a handle record's `app_info_handle` list.
 * All string members are owned by the entry and freed by its destructor.
 */
122 struct orte_sstore_stage_local_app_snapshot_info_t {
123 /* List linkage: lets the entry be stored on an opal_list_t */
124 opal_list_item_t super;
125
126 /* Process this entry belongs to (copied in append_new_app_handle_info()) */
127 orte_process_name_t name;
128
129 /* Built in process_global_push() from the handle's location_fmt and this process' vpid */
130 char * local_location;
131
132 /* NOTE(review): not used in the visible part of the file; presumably the compressed snapshot path */
133 char * compressed_local_location;
134
135 /* Passed to the application as "-c <path>" by fetch_app_deps() when the path is accessible */
136 char * local_cache_location;
137
138 /* NOTE(review): not used in the visible part of the file; presumably the snapshot metadata file */
139 char * metadata_filename;
140
141 /* NOTE(review): not used in the visible part of the file; presumably the CRS component name */
142 char * crs_comp;
143
144 /* Starts out false; NOTE(review): presumably set when the process skipped the checkpoint */
145 bool ckpt_skipped;
146
147 /* Starts out 0; NOTE(review): presumably the pid of the compression child process */
148 pid_t compress_pid;
149 };
150 typedef struct orte_sstore_stage_local_app_snapshot_info_t orte_sstore_stage_local_app_snapshot_info_t;
151 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_app_snapshot_info_t);
152
153 void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info);
154 void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info);
155
156 OBJ_CLASS_INSTANCE(orte_sstore_stage_local_app_snapshot_info_t,
157 opal_list_item_t,
158 orte_sstore_stage_local_app_snapshot_info_construct,
159 orte_sstore_stage_local_app_snapshot_info_destruct);
160
161
162
163
164
165
/* Listener for ORTE_RML_TAG_SSTORE_INTERNAL messages; the flag prevents posting the receive twice */
166 static bool is_global_listener_active = false;
167 static int sstore_stage_local_start_listener(void);
168 static int sstore_stage_local_stop_listener(void);
169 static void sstore_stage_local_recv(int status,
170 orte_process_name_t* sender,
171 opal_buffer_t* buffer,
172 orte_rml_tag_t tag,
173 void* cbdata);
174 /* Command handlers: "global" variants serve the HNP, "app" variants serve any other sender */
175 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
176 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
177 static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
178 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
179 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
180 /* Creation and lookup of handle records on `active_handles` */
181 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
182 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
183 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq);
184 /* Creation and lookup of per-process entries inside a handle record */
185 static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
186 orte_process_name_t *name);
187 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
188 orte_process_name_t *name);
189 /* Called by register() and sync() respectively */
190 static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
191 static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
192 /* First wait performed by sync() */
193 static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info);
194 /* Compression helpers; sync() calls wait_all_compressed() when compression is enabled */
195 static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
196 orte_sstore_stage_local_app_snapshot_info_t *app_info);
197 static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata);
198 static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info);
199 /* Used by fetch_app_deps(); reports the local path and whether the transfer was skipped */
200 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
201 char *global_loc, char *ref, char *postfix, int seq);
202 /* Local staging directory, created in module_init() and destroyed in module_finalize() */
203 static int sstore_stage_create_local_dir(void);
204 static int sstore_stage_destroy_local_dir(void);
205 /* Local cache, only created/destroyed when caching is enabled */
206 static int sstore_stage_create_cache(void);
207 static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info);
208 static int sstore_stage_destroy_cache(void);
209 /* All known handle records; path string built in module_init() */
210 static opal_list_t *active_handles = NULL;
211 static char * sstore_stage_local_basedir = NULL;
212 /* Cache base path, built in module_init() when caching is enabled */
213 static char * sstore_stage_cache_basedir = NULL;
214 /* NOTE(review): not used in the visible part of the file; presumably the current and previous cache directories */
215 static char * sstore_stage_cache_current_dir = NULL;
216 static char * sstore_stage_cache_last_dir = NULL;
217 /* Request list handed to orte_filem.wait_all() by wait_all_deps() */
218 static opal_list_t * preload_filem_requests = NULL;
219
220
221
222
223 void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info)
224 {
225 info->id = 0;
226
227 info->status = SSTORE_LOCAL_NONE;
228
229 info->seq_num = -1;
230
231 info->global_ref_name = NULL;
232
233 info->location_fmt = NULL;
234
235 info->cache_location_fmt = NULL;
236
237 info->app_info_handle = OBJ_NEW(opal_list_t);
238
239 info->compress_comp = NULL;
240
241 info->compress_postfix = NULL;
242
243 info->migrating = false;
244 }
245
246 void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info)
247 {
248 info->id = 0;
249
250 info->status = SSTORE_LOCAL_NONE;
251
252 info->seq_num = -1;
253
254 if( NULL != info->global_ref_name ) {
255 free( info->global_ref_name );
256 info->global_ref_name = NULL;
257 }
258
259 if( NULL != info->location_fmt ) {
260 free( info->location_fmt );
261 info->location_fmt = NULL;
262 }
263
264 if( NULL != info->cache_location_fmt ) {
265 free( info->cache_location_fmt );
266 info->cache_location_fmt = NULL;
267 }
268
269 if( NULL != info->app_info_handle ) {
270 OBJ_RELEASE(info->app_info_handle);
271 info->app_info_handle = NULL;
272 }
273
274 if( NULL != info->compress_comp ) {
275 free(info->compress_comp);
276 info->compress_comp = NULL;
277 }
278
279 if( NULL != info->compress_postfix ) {
280 free(info->compress_postfix);
281 info->compress_postfix = NULL;
282 }
283
284 info->migrating = false;
285 }
286
287 void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info)
288 {
289 info->name.jobid = ORTE_JOBID_INVALID;
290 info->name.vpid = ORTE_VPID_INVALID;
291
292 info->local_location = NULL;
293 info->compressed_local_location = NULL;
294 info->local_cache_location = NULL;
295 info->metadata_filename = NULL;
296 info->crs_comp = NULL;
297 info->ckpt_skipped = false;
298 info->compress_pid = 0;
299 }
300
301 void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info)
302 {
303 info->name.jobid = ORTE_JOBID_INVALID;
304 info->name.vpid = ORTE_VPID_INVALID;
305
306 if( NULL != info->local_location ) {
307 free(info->local_location);
308 info->local_location = NULL;
309 }
310
311 if( NULL != info->compressed_local_location ) {
312 free(info->compressed_local_location);
313 info->compressed_local_location = NULL;
314 }
315
316 if( NULL != info->local_cache_location ) {
317 free(info->local_cache_location);
318 info->local_cache_location = NULL;
319 }
320
321 if( NULL != info->metadata_filename ) {
322 free(info->metadata_filename);
323 info->metadata_filename = NULL;
324 }
325
326 if( NULL != info->crs_comp ) {
327 free(info->crs_comp);
328 info->crs_comp = NULL;
329 }
330
331 info->ckpt_skipped = false;
332
333 info->compress_pid = 0;
334 }
335
336
337
338
339 int orte_sstore_stage_local_module_init(void)
340 {
341 int ret, exit_status = ORTE_SUCCESS;
342
343 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
344 "sstore:stage:(local): init()"));
345
346 if( NULL == active_handles ) {
347 active_handles = OBJ_NEW(opal_list_t);
348 }
349
350 if( NULL == preload_filem_requests ) {
351 preload_filem_requests = OBJ_NEW(opal_list_t);
352 }
353
354
355
356
357 opal_asprintf(&sstore_stage_local_basedir, "%s/%s/%s",
358 orte_sstore_stage_local_snapshot_dir,
359 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
360 ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME);
361 if( ORTE_SUCCESS != (ret = sstore_stage_create_local_dir()) ) {
362 ORTE_ERROR_LOG(ret);
363 exit_status = ret;
364 goto cleanup;
365 }
366
367
368
369
370 if( orte_sstore_stage_enabled_caching ) {
371 opal_asprintf(&sstore_stage_cache_basedir, "%s/%s/%s",
372 orte_sstore_stage_local_snapshot_dir,
373 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
374 ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME);
375
376 if( ORTE_SUCCESS != (ret = sstore_stage_create_cache()) ) {
377 ORTE_ERROR_LOG(ret);
378 exit_status = ret;
379 goto cleanup;
380 }
381 }
382
383
384
385
386
387 if( !ORTE_PROC_IS_HNP ) {
388 if( ORTE_SUCCESS != (ret = sstore_stage_local_start_listener()) ) {
389 ORTE_ERROR_LOG(ret);
390 exit_status = ret;
391 goto cleanup;
392 }
393 }
394
395 cleanup:
396 return exit_status;
397 }
398
399 int orte_sstore_stage_local_module_finalize(void)
400 {
401 int ret, exit_status = ORTE_SUCCESS;
402 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
403 opal_list_item_t* item = NULL;
404 bool done = false;
405 int cur_time = 0, max_time = 120;
406
407 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
408 "sstore:stage:(local): finalize()"));
409
410
411
412
413 if( !ORTE_PROC_IS_HNP ) {
414 done = false;
415 while( 0 < opal_list_get_size(active_handles) && !done ) {
416 done = true;
417 for(item = opal_list_get_first(active_handles);
418 item != opal_list_get_end(active_handles);
419 item = opal_list_get_next(item) ) {
420 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
421 if( SSTORE_LOCAL_DONE != handle_info->status &&
422 SSTORE_LOCAL_NONE != handle_info->status &&
423 SSTORE_LOCAL_ERROR != handle_info->status ) {
424 done = false;
425 break;
426 }
427 }
428 if( done ) {
429 break;
430 }
431 else {
432 if( cur_time != 0 && cur_time % 30 == 0 ) {
433 opal_output(0, "---> Waiting for fin(): %3d / %3d\n",
434 cur_time, max_time);
435 }
436
437 opal_progress();
438 if( cur_time >= max_time ) {
439 break;
440 } else {
441 sleep(1);
442 }
443 cur_time++;
444 }
445 }
446 }
447
448 if( NULL != active_handles ) {
449 OBJ_RELEASE(active_handles);
450 }
451
452 if( NULL != preload_filem_requests ) {
453 OBJ_RELEASE(preload_filem_requests);
454 }
455
456
457
458
459
460 if( !ORTE_PROC_IS_HNP ) {
461 if( ORTE_SUCCESS != (ret = sstore_stage_local_stop_listener()) ) {
462 ORTE_ERROR_LOG(ret);
463 exit_status = ret;
464 goto cleanup;
465 }
466 }
467
468
469
470
471 if( orte_sstore_stage_enabled_caching ) {
472 if( ORTE_SUCCESS != (ret = sstore_stage_destroy_cache()) ) {
473 ORTE_ERROR_LOG(ret);
474 exit_status = ret;
475 goto cleanup;
476 }
477 }
478
479
480
481
482 if( ORTE_SUCCESS != (ret = sstore_stage_destroy_local_dir()) ) {
483 ORTE_ERROR_LOG(ret);
484 exit_status = ret;
485 goto cleanup;
486 }
487
488 cleanup:
489 if( orte_sstore_stage_enabled_caching ) {
490 if( NULL != sstore_stage_cache_basedir ) {
491 free(sstore_stage_cache_basedir);
492 sstore_stage_cache_basedir = NULL;
493 }
494 }
495
496 if( NULL != sstore_stage_local_basedir ) {
497 free(sstore_stage_local_basedir);
498 sstore_stage_local_basedir = NULL;
499 }
500
501 return exit_status;
502 }
503
504 int orte_sstore_stage_local_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
505 {
506 opal_output(0, "sstore:stage:(local): request_checkpoint_handle() Not implemented!");
507 return ORTE_ERR_NOT_IMPLEMENTED;
508 }
509
510 int orte_sstore_stage_local_register(orte_sstore_base_handle_t handle)
511 {
512 int ret, exit_status = ORTE_SUCCESS;
513 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
514
515 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
516 "sstore:stage:(local): register()"));
517
518
519
520
521 if( NULL == (handle_info = find_handle_info(handle)) ) {
522 handle_info = create_new_handle_info(handle);
523 }
524
525
526
527
528 if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
529 ORTE_ERROR_LOG(ret);
530 exit_status = ret;
531 goto cleanup;
532 }
533
534
535
536
537 while(SSTORE_LOCAL_READY != handle_info->status &&
538 SSTORE_LOCAL_ERROR != handle_info->status ) {
539 opal_progress();
540 }
541
542 cleanup:
543 return exit_status;
544 }
545
546 int orte_sstore_stage_local_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
547 {
548 opal_output(0, "sstore:stage:(local): get_attr() Not implemented!");
549 return ORTE_ERR_NOT_IMPLEMENTED;
550 }
551
552 int orte_sstore_stage_local_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
553 {
554 opal_output(0, "sstore:stage:(local): set_attr() Not implemented!");
555 return ORTE_ERR_NOT_IMPLEMENTED;
556 }
557
558 int orte_sstore_stage_local_sync(orte_sstore_base_handle_t handle)
559 {
560 int ret, exit_status = ORTE_SUCCESS;
561 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
562
563 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
564 "sstore:stage:(local): sync()"));
565
566
567
568
569 handle_info = find_handle_info(handle);
570
571
572
573
574 if( ORTE_SUCCESS != (ret = wait_all_apps_updated(handle_info))) {
575 ORTE_ERROR_LOG(ret);
576 exit_status = ret;
577 goto cleanup;
578 }
579
580
581
582
583 if( orte_sstore_stage_enabled_compression ) {
584 if( ORTE_SUCCESS != (ret = wait_all_compressed(handle_info))) {
585 ORTE_ERROR_LOG(ret);
586 exit_status = ret;
587 goto cleanup;
588 }
589 }
590
591
592
593
594 if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
595 ORTE_ERROR_LOG(ret);
596 exit_status = ret;
597 goto cleanup;
598 }
599
600 handle_info->status = SSTORE_LOCAL_SYNCED;
601
602 cleanup:
603 return exit_status;
604 }
605
606 int orte_sstore_stage_local_remove(orte_sstore_base_handle_t handle)
607 {
608 opal_output(0, "sstore:stage:(local): remove() Not implemented!");
609 return ORTE_ERR_NOT_IMPLEMENTED;
610 }
611
612 int orte_sstore_stage_local_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
613 {
614 int ret, exit_status = ORTE_SUCCESS;
615
616 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
617 "sstore:stage:(local): pack()"));
618
619
620
621
622
623
624
625
626
627 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
628 ORTE_ERROR_LOG(ret);
629 exit_status = ret;
630 goto cleanup;
631 }
632
633
634
635
636
637 cleanup:
638 return exit_status;
639 }
640
641 int orte_sstore_stage_local_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
642 {
643 int ret, exit_status = ORTE_SUCCESS;
644 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
645 orte_std_cntr_t count;
646
647 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
648 "sstore:stage:(local): unpack()"));
649
650
651
652
653 count = 1;
654 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, handle, &count, ORTE_SSTORE_HANDLE))) {
655 ORTE_ERROR_LOG(ret);
656 exit_status = ret;
657 goto cleanup;
658 }
659
660
661
662
663 if( NULL == (handle_info = find_handle_info(*handle)) ) {
664 handle_info = create_new_handle_info(*handle);
665 }
666
667
668
669
670 if( ORTE_SUCCESS != (ret = process_global_push(peer, buffer, handle_info))) {
671 ORTE_ERROR_LOG(ret);
672 exit_status = ret;
673 goto cleanup;
674 }
675
676 cleanup:
677 return exit_status;
678 }
679
680 int orte_sstore_stage_local_fetch_app_deps(orte_app_context_t *app)
681 {
682 int ret, exit_status = ORTE_SUCCESS;
683 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
684 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
685 char **sstore_args = NULL;
686 char * req_snap_loc = NULL;
687 char * req_snap_global_ref = NULL;
688 char * req_snap_ref = NULL;
689 char * req_snap_postfix = NULL;
690 char * local_location = NULL;
691 char * req_snap_compress = NULL;
692 char * compress_local_location = NULL;
693 char * compress_ref = NULL;
694 char * tmp_str = NULL;
695 int req_snap_seq = 0;
696 int i;
697 orte_proc_t *child = NULL;
698 int loc_argc = 0;
699 bool skip_xfer = false;
700 char *sload = NULL;
701
702 orte_get_attribute(&app->attributes, ORTE_APP_SSTORE_LOAD, (void **)&sload, OPAL_STRING);
703
704 if(!ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) || NULL == sload) {
705 OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
706 "sstore:stage:(local): fetch_app_deps(%3d): Not for this daemon (%s, %d, %s)",
707 app->idx, (ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) ? "T" : "F"),
708 (int)app->num_procs, sload));
709
710 goto cleanup;
711 }
712
713 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
714 "sstore:stage:(local): fetch_app_deps(%3d): %s",
715 app->idx, sload));
716
717
718
719
720 sstore_args = opal_argv_split(sload, ':');
721 req_snap_loc = strdup(sstore_args[0]);
722 req_snap_global_ref = strdup(sstore_args[1]);
723 req_snap_ref = strdup(sstore_args[2]);
724 if( NULL == sstore_args[4] ) {
725 req_snap_seq = atoi( sstore_args[3]);
726 } else {
727 req_snap_compress = strdup(sstore_args[3]);
728 req_snap_postfix = strdup(sstore_args[4]);
729 req_snap_seq = atoi( sstore_args[5]);
730 }
731
732 handle_info = find_handle_info_ref(req_snap_global_ref, req_snap_seq);
733 if( NULL == handle_info ) {
734
735 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
736 "sstore:stage:(local): fetch_app_deps(%3d): No known checkpoint [%s, %d]",
737 app->idx,
738 req_snap_ref,
739 req_snap_seq));
740 goto filem_preload;
741 }
742
743
744
745
746
747
748 if( orte_sstore_stage_enabled_caching && !handle_info->migrating ) {
749
750
751
752 for (i=0; i < orte_local_children->size; i++) {
753 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
754 continue;
755 }
756
757 if( app->idx == child->app_idx ) {
758
759
760
761 app_info = find_app_handle_info(handle_info, &child->name);
762 break;
763 }
764 }
765
766 if( NULL == app_info ) {
767 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
768 "sstore:stage:(local): fetch_app_deps(%3d): No processes known for this app context",
769 app->idx));
770 goto filem_preload;
771 }
772
773
774
775
776 if( NULL != app_info->local_cache_location &&
777 0 == (ret = access(app_info->local_cache_location, F_OK)) ) {
778 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
779 "sstore:stage:(local): fetch_app_deps(%3d): Using local cache. (%s)",
780 app->idx,
781 app_info->local_cache_location));
782
783 opal_argv_append(&loc_argc, &(app->argv), "-c");
784 opal_argv_append(&loc_argc, &(app->argv), app_info->local_cache_location);
785 goto cleanup;
786 } else {
787 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
788 "sstore:stage:(local): fetch_app_deps(%3d): No cache available for %s. (%s)",
789 app->idx,
790 ORTE_NAME_PRINT(&app_info->name),
791 app_info->local_cache_location));
792 }
793 }
794
795 filem_preload:
796 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
797 "sstore:stage:(local): fetch_app_deps(%3d): Fetch files from Central storage",
798 app->idx));
799
800
801
802
803
804 if( ORTE_SUCCESS != (ret = orte_sstore_stage_local_preload_files(&local_location,
805 &skip_xfer,
806 req_snap_loc,
807 req_snap_ref,
808 req_snap_postfix,
809 req_snap_seq)) ) {
810 ORTE_ERROR_LOG(ret);
811 exit_status = ret;
812 goto cleanup;
813 }
814 opal_argv_append(&loc_argc, &(app->argv), "-l");
815 opal_argv_append(&loc_argc, &(app->argv), local_location);
816
817
818
819
820
821 if( !skip_xfer ) {
822 if( NULL != req_snap_compress && 0 < strlen(req_snap_compress) ) {
823 opal_argv_append(&loc_argc, &(app->argv), "-d");
824 opal_argv_append(&loc_argc, &(app->argv), req_snap_compress);
825 }
826 if( NULL != req_snap_postfix && 0 < strlen(req_snap_postfix) ) {
827 opal_argv_append(&loc_argc, &(app->argv), "-p");
828 opal_argv_append(&loc_argc, &(app->argv), req_snap_postfix);
829 }
830 }
831
832 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
833 "sstore:stage:(local): fetch_app_deps(%3d): Fetching to (%s)",
834 app->idx,
835 local_location));
836
837 cleanup:
838 if( NULL != req_snap_compress ) {
839 free(req_snap_compress);
840 req_snap_compress = NULL;
841 }
842
843 if( NULL != tmp_str ) {
844 free(tmp_str);
845 tmp_str = NULL;
846 }
847
848 if( NULL != compress_local_location ) {
849 free(compress_local_location);
850 compress_local_location = NULL;
851 }
852
853 if( NULL != compress_ref ) {
854 free(compress_ref);
855 compress_ref = NULL;
856 }
857
858 if( NULL != sstore_args ) {
859 opal_argv_free(sstore_args);
860 sstore_args = NULL;
861 }
862
863 if( NULL != req_snap_ref ) {
864 free(req_snap_ref);
865 req_snap_ref = NULL;
866 }
867
868 if( NULL != req_snap_postfix ) {
869 free(req_snap_postfix);
870 req_snap_postfix = NULL;
871 }
872
873 if( NULL != req_snap_loc ) {
874 free(req_snap_loc);
875 req_snap_loc = NULL;
876 }
877
878 if( NULL != req_snap_global_ref ) {
879 free(req_snap_global_ref);
880 req_snap_global_ref = NULL;
881 }
882
883 return exit_status;
884 }
885
886 int orte_sstore_stage_local_wait_all_deps(void)
887 {
888 int ret, exit_status = ORTE_SUCCESS;
889 opal_list_item_t* item = NULL;
890
891
892 if( 0 >= opal_list_get_size(preload_filem_requests) ) {
893 return ORTE_SUCCESS;
894 }
895
896
897
898
899 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
900 "sstore:stage:(local): wait_all_deps(): Waiting on %d requests",
901 (int)opal_list_get_size(preload_filem_requests)));
902
903 if(ORTE_SUCCESS != (ret = orte_filem.wait_all(preload_filem_requests)) ) {
904 ORTE_ERROR_LOG(ret);
905 exit_status = ret;
906 goto cleanup;
907 }
908
909
910
911
912
913
914
915
916
917
918 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
919 "sstore:stage:(local): wait_all_deps(): Finished waiting on %d requests!",
920 (int)opal_list_get_size(preload_filem_requests)));
921
922 cleanup:
923 while (NULL != (item = opal_list_remove_first(preload_filem_requests) ) ) {
924 OBJ_RELEASE(item);
925 }
926
927 return exit_status;
928 }
929
930
931
932
933 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
934 {
935 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
936 int i;
937 orte_proc_t *child = NULL;
938
939 if( NULL == active_handles ) {
940 active_handles = OBJ_NEW(opal_list_t);
941 }
942
943 handle_info = OBJ_NEW(orte_sstore_stage_local_snapshot_info_t);
944
945 handle_info->id = handle;
946
947 opal_list_append(active_handles, &(handle_info->super));
948
949
950
951
952 for (i=0; i < orte_local_children->size; i++) {
953 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
954 continue;
955 }
956 append_new_app_handle_info(handle_info, &child->name);
957 }
958
959 handle_info->status = SSTORE_LOCAL_INIT;
960
961 return handle_info;
962 }
963
964 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
965 {
966 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
967 opal_list_item_t* item = NULL;
968
969 if( NULL == active_handles ) {
970 return NULL;
971 }
972
973 for(item = opal_list_get_first(active_handles);
974 item != opal_list_get_end(active_handles);
975 item = opal_list_get_next(item) ) {
976 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
977
978 if( handle_info->id == handle ) {
979 return handle_info;
980 }
981 }
982
983 return NULL;
984 }
985
986 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq)
987 {
988 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
989 opal_list_item_t* item = NULL;
990
991 if( NULL == active_handles ) {
992 return NULL;
993 }
994
995 for(item = opal_list_get_first(active_handles);
996 item != opal_list_get_end(active_handles);
997 item = opal_list_get_next(item) ) {
998 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
999
1000 if( 0 == strncmp(handle_info->global_ref_name, ref, strlen(ref)) &&
1001 handle_info->seq_num == seq ) {
1002 return handle_info;
1003 }
1004 }
1005
1006 return NULL;
1007 }
1008
1009 static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1010 orte_process_name_t *name)
1011 {
1012 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1013
1014 app_info = OBJ_NEW(orte_sstore_stage_local_app_snapshot_info_t);
1015
1016 app_info->name.jobid = name->jobid;
1017 app_info->name.vpid = name->vpid;
1018
1019 opal_list_append(handle_info->app_info_handle, &(app_info->super));
1020
1021 return ORTE_SUCCESS;
1022 }
1023
1024 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1025 orte_process_name_t *name)
1026 {
1027 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1028 opal_list_item_t* item = NULL;
1029 orte_ns_cmp_bitmask_t mask;
1030
1031 for(item = opal_list_get_first(handle_info->app_info_handle);
1032 item != opal_list_get_end(handle_info->app_info_handle);
1033 item = opal_list_get_next(item) ) {
1034 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1035
1036 mask = ORTE_NS_CMP_ALL;
1037
1038 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &app_info->name, name)) {
1039 return app_info;
1040 }
1041 }
1042
1043 return NULL;
1044 }
1045
1046 static int sstore_stage_local_start_listener(void)
1047 {
1048 if( is_global_listener_active ) {
1049 return ORTE_SUCCESS;
1050 }
1051
1052 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
1053 ORTE_RML_PERSISTENT, sstore_stage_local_recv, NULL);
1054
1055 is_global_listener_active = true;
1056 return ORTE_SUCCESS;
1057 }
1058
1059 static int sstore_stage_local_stop_listener(void)
1060 {
1061 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1062 is_global_listener_active = false;
1063 return ORTE_SUCCESS;
1064 }
1065
1066 static void sstore_stage_local_recv(int status,
1067 orte_process_name_t* sender,
1068 opal_buffer_t* buffer,
1069 orte_rml_tag_t tag,
1070 void* cbdata)
1071 {
1072 int ret;
1073 orte_sstore_stage_cmd_flag_t command;
1074 orte_std_cntr_t count;
1075 orte_sstore_base_handle_t loc_id;
1076
1077 if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
1078 return;
1079 }
1080
1081 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1082 "sstore:stage:(local): process_cmd(%s)",
1083 ORTE_NAME_PRINT(sender)));
1084
1085 count = 1;
1086 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
1087 ORTE_ERROR_LOG(ret);
1088 goto cleanup;
1089 }
1090
1091 count = 1;
1092 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
1093 ORTE_ERROR_LOG(ret);
1094 goto cleanup;
1095 }
1096
1097 orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
1098
1099 cleanup:
1100 return;
1101 }
1102
1103 int orte_sstore_stage_local_process_cmd_action(orte_process_name_t *sender,
1104 orte_sstore_stage_cmd_flag_t command,
1105 orte_sstore_base_handle_t loc_id,
1106 opal_buffer_t* buffer)
1107 {
1108 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
1109
1110
1111
1112
1113 if(NULL == (handle_info = find_handle_info(loc_id)) ) {
1114 handle_info = create_new_handle_info(loc_id);
1115 }
1116
1117 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1118 "sstore:stage:(local): process_cmd(%s) - Command = %s",
1119 ORTE_NAME_PRINT(sender),
1120 (ORTE_SSTORE_STAGE_PULL == command ? "Pull" :
1121 (ORTE_SSTORE_STAGE_PUSH == command ? "Push" :
1122 (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
1123 (ORTE_SSTORE_STAGE_DONE == command ? "Done" : "Unknown")))) ));
1124
1125
1126
1127
1128 if( ORTE_SSTORE_STAGE_PULL == command ) {
1129 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
1130 process_global_pull(sender, buffer, handle_info);
1131 } else {
1132 process_app_pull(sender, buffer, handle_info);
1133 }
1134 }
1135 else if( ORTE_SSTORE_STAGE_PUSH == command ) {
1136 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
1137 process_global_push(sender, buffer, handle_info);
1138 } else {
1139 process_app_push(sender, buffer, handle_info);
1140 }
1141 }
1142 else if( ORTE_SSTORE_STAGE_REMOVE == command ) {
1143
1144
1145 process_global_remove(ORTE_PROC_MY_HNP, buffer, handle_info);
1146 }
1147
1148 return ORTE_SUCCESS;
1149 }
1150
1151 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1152 {
1153
1154 opal_output(0, "sstore:stage:(local): process_global_pull() Not implemented!");
1155 return ORTE_ERR_NOT_IMPLEMENTED;
1156 }
1157
1158 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1159 {
1160 int ret, exit_status = ORTE_SUCCESS;
1161 orte_std_cntr_t count;
1162 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1163 opal_list_item_t* item = NULL;
1164
1165 count = 1;
1166 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
1167 ORTE_ERROR_LOG(ret);
1168 exit_status = ret;
1169 goto cleanup;
1170 }
1171
1172 count = 1;
1173 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
1174 ORTE_ERROR_LOG(ret);
1175 exit_status = ret;
1176 goto cleanup;
1177 }
1178
1179 count = 1;
1180 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->location_fmt), &count, OPAL_STRING))) {
1181 ORTE_ERROR_LOG(ret);
1182 exit_status = ret;
1183 goto cleanup;
1184 }
1185
1186 if( orte_sstore_stage_enabled_caching ) {
1187 count = 1;
1188 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->cache_location_fmt), &count, OPAL_STRING))) {
1189 ORTE_ERROR_LOG(ret);
1190 exit_status = ret;
1191 goto cleanup;
1192 }
1193 }
1194
1195 count = 1;
1196 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->migrating), &count, OPAL_BOOL))) {
1197 ORTE_ERROR_LOG(ret);
1198 exit_status = ret;
1199 goto cleanup;
1200 }
1201
1202
1203
1204
1205 for(item = opal_list_get_first(handle_info->app_info_handle);
1206 item != opal_list_get_end(handle_info->app_info_handle);
1207 item = opal_list_get_next(item) ) {
1208 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1209
1210 if( NULL != app_info->local_location ) {
1211 free(app_info->local_location);
1212 app_info->local_location = NULL;
1213 }
1214 opal_asprintf(&(app_info->local_location), handle_info->location_fmt, app_info->name.vpid);
1215
1216 if( orte_sstore_stage_enabled_caching ) {
1217 if( NULL != app_info->local_cache_location ) {
1218 free(app_info->local_cache_location);
1219 app_info->local_cache_location = NULL;
1220 }
1221 opal_asprintf(&(app_info->local_cache_location), handle_info->cache_location_fmt, app_info->name.vpid);
1222 }
1223
1224 if( NULL != app_info->metadata_filename ) {
1225 free(app_info->metadata_filename);
1226 app_info->metadata_filename = NULL;
1227 }
1228 opal_asprintf(&(app_info->metadata_filename), "%s/%s",
1229 app_info->local_location,
1230 orte_sstore_base_local_metadata_filename);
1231 }
1232
1233 cleanup:
1234 if( ORTE_SUCCESS == exit_status ) {
1235 handle_info->status = SSTORE_LOCAL_READY;
1236 } else {
1237 handle_info->status = SSTORE_LOCAL_ERROR;
1238 }
1239
1240 return exit_status;
1241 }
1242
1243 static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1244 {
1245 int ret, exit_status = ORTE_SUCCESS;
1246 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1247 opal_list_item_t* item = NULL;
1248 opal_buffer_t *loc_buffer = NULL;
1249 orte_sstore_stage_cmd_flag_t command;
1250 size_t list_size;
1251 char * cmd = NULL;
1252
1253
1254
1255
1256
1257
1258 if( !orte_sstore_stage_enabled_caching || handle_info->migrating ) {
1259 for(item = opal_list_get_first(handle_info->app_info_handle);
1260 item != opal_list_get_end(handle_info->app_info_handle);
1261 item = opal_list_get_next(item) ) {
1262 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1263
1264 opal_asprintf(&cmd, "rm -rf %s", app_info->local_location);
1265 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1266 "sstore:stage:(local): update_cache(): Removing with command (%s)",
1267 cmd));
1268 system(cmd);
1269
1270 if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
1271 free(cmd);
1272 cmd = NULL;
1273
1274 opal_asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location);
1275 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1276 "sstore:stage:(local): update_cache(): Removing with command (%s)",
1277 cmd));
1278 system(cmd);
1279 }
1280 }
1281 }
1282 else {
1283
1284
1285
1286 if( ORTE_SUCCESS != (ret = sstore_stage_update_cache(handle_info)) ) {
1287 ORTE_ERROR_LOG(ret);
1288 exit_status = ret;
1289 goto cleanup;
1290 }
1291 }
1292
1293 loc_buffer = OBJ_NEW(opal_buffer_t);
1294
1295 command = ORTE_SSTORE_STAGE_DONE;
1296 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1297 ORTE_ERROR_LOG(ret);
1298 exit_status = ret;
1299 goto cleanup;
1300 }
1301
1302 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1303 ORTE_ERROR_LOG(ret);
1304 exit_status = ret;
1305 goto cleanup;
1306 }
1307
1308 list_size = opal_list_get_size(handle_info->app_info_handle);
1309 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &list_size, 1, OPAL_SIZE))) {
1310 ORTE_ERROR_LOG(ret);
1311 exit_status = ret;
1312 goto cleanup;
1313 }
1314
1315 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1316 orte_rml_send_callback, NULL))) {
1317 ORTE_ERROR_LOG(ret);
1318 exit_status = ret;
1319 goto cleanup;
1320 }
1321
1322 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1323 "sstore:stage:(local): remove(): Sent done for %d files to %s",
1324 (int)list_size,
1325 ORTE_NAME_PRINT(peer)));
1326
1327 handle_info->status = SSTORE_LOCAL_DONE;
1328
1329 loc_buffer = NULL;
1330
1331 cleanup:
1332 if( NULL != cmd ) {
1333 free(cmd);
1334 cmd = NULL;
1335 }
1336
1337 if (NULL != loc_buffer) {
1338 OBJ_RELEASE(loc_buffer);
1339 loc_buffer = NULL;
1340 }
1341
1342 return exit_status;
1343 }
1344
1345 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1346 {
1347 int ret, exit_status = ORTE_SUCCESS;
1348 opal_buffer_t *loc_buffer = NULL;
1349 orte_sstore_stage_cmd_flag_t command;
1350 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1351
1352
1353
1354
1355 app_info = find_app_handle_info(handle_info, peer);
1356
1357
1358
1359
1360 loc_buffer = OBJ_NEW(opal_buffer_t);
1361
1362 command = ORTE_SSTORE_STAGE_PUSH;
1363 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1364 ORTE_ERROR_LOG(ret);
1365 exit_status = ret;
1366 goto cleanup;
1367 }
1368
1369 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1370 ORTE_ERROR_LOG(ret);
1371 exit_status = ret;
1372 goto cleanup;
1373 }
1374
1375 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
1376 ORTE_ERROR_LOG(ret);
1377 exit_status = ret;
1378 goto cleanup;
1379 }
1380
1381 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
1382 ORTE_ERROR_LOG(ret);
1383 exit_status = ret;
1384 goto cleanup;
1385 }
1386
1387 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
1388 ORTE_ERROR_LOG(ret);
1389 exit_status = ret;
1390 goto cleanup;
1391 }
1392
1393 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
1394 ORTE_ERROR_LOG(ret);
1395 exit_status = ret;
1396 goto cleanup;
1397 }
1398
1399 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1400 orte_rml_send_callback, NULL))) {
1401 ORTE_ERROR_LOG(ret);
1402 exit_status = ret;
1403 goto cleanup;
1404 }
1405
1406
1407 loc_buffer = NULL;
1408
1409 cleanup:
1410 if (NULL != loc_buffer) {
1411 OBJ_RELEASE(loc_buffer);
1412 loc_buffer = NULL;
1413 }
1414
1415 return exit_status;
1416 }
1417
1418 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1419 {
1420 int ret, exit_status = ORTE_SUCCESS;
1421 orte_std_cntr_t count;
1422 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1423
1424
1425
1426
1427 app_info = find_app_handle_info(handle_info, peer);
1428
1429
1430
1431
1432 count = 1;
1433 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->ckpt_skipped), &count, OPAL_BOOL))) {
1434 ORTE_ERROR_LOG(ret);
1435 exit_status = ret;
1436 goto cleanup;
1437 }
1438
1439 if( !app_info->ckpt_skipped ) {
1440 count = 1;
1441 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->crs_comp), &count, OPAL_STRING))) {
1442 ORTE_ERROR_LOG(ret);
1443 exit_status = ret;
1444 goto cleanup;
1445 }
1446 }
1447
1448 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1449 "sstore:stage:(local): app_push(%s, skip=%s, %s)",
1450 ORTE_NAME_PRINT(&(app_info->name)),
1451 (app_info->ckpt_skipped ? "T" : "F"),
1452 app_info->crs_comp));
1453
1454
1455
1456 cleanup:
1457 return exit_status;
1458 }
1459
1460 static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info)
1461 {
1462 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1463 opal_list_item_t *item = NULL;
1464 bool is_done = true;
1465
1466 do {
1467 is_done = true;
1468 for(item = opal_list_get_first(handle_info->app_info_handle);
1469 item != opal_list_get_end(handle_info->app_info_handle);
1470 item = opal_list_get_next(item) ) {
1471 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1472
1473 if( NULL == app_info->crs_comp && !app_info->ckpt_skipped ) {
1474 is_done = false;
1475 break;
1476 }
1477 }
1478
1479 if( !is_done ) {
1480 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1481 "sstore:stage:(local): Waiting for appliccation %s",
1482 ORTE_NAME_PRINT(&(app_info->name)) ));
1483 opal_progress();
1484 }
1485 } while(!is_done);
1486
1487 return ORTE_SUCCESS;
1488 }
1489
1490 static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
1491 orte_sstore_stage_local_app_snapshot_info_t *app_info)
1492 {
1493 int ret, exit_status = ORTE_SUCCESS;
1494 char * postfix = NULL;
1495 orte_proc_t *proc;
1496
1497
1498 if( !orte_sstore_stage_enabled_compression ) {
1499 goto cleanup;
1500 }
1501
1502 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1503 "sstore:stage:(local): start_compression() Starting compression for process %s of (%s)",
1504 ORTE_NAME_PRINT(&(app_info->name)),
1505 app_info->local_location ));
1506
1507
1508
1509
1510 if( ORTE_SUCCESS != (ret = opal_compress.compress_nb(app_info->local_location,
1511 &(app_info->compressed_local_location),
1512 &(postfix),
1513 &(app_info->compress_pid))) ) {
1514 ORTE_ERROR_LOG(ret);
1515 exit_status = ret;
1516 goto cleanup;
1517 }
1518 if( app_info->compress_pid <= 0 ) {
1519 ORTE_ERROR_LOG(ORTE_ERROR);
1520 exit_status = ORTE_ERROR;
1521 goto cleanup;
1522 }
1523
1524 if( NULL == handle_info->compress_comp ) {
1525 handle_info->compress_comp = strdup(opal_compress_base_selected_component.base_version.mca_component_name);
1526 handle_info->compress_postfix = strdup(postfix);
1527 }
1528
1529
1530
1531
1532 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1533 "sstore:stage:(local): start_compression() Waiting for compression (%d) for process %s",
1534 app_info->compress_pid,
1535 ORTE_NAME_PRINT(&(app_info->name)) ));
1536
1537 proc = OBJ_NEW(orte_proc_t);
1538 proc->pid = app_info->compress_pid;
1539
1540 ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_ALIVE);
1541
1542 orte_wait_cb(proc, sstore_stage_local_compress_waitpid_cb, app_info);
1543
1544 cleanup:
1545 if( NULL != postfix ) {
1546 free(postfix);
1547 postfix = NULL;
1548 }
1549
1550 return exit_status;
1551 }
1552
1553 static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata)
1554 {
1555 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1556 orte_wait_tracker_t *t2 = (orte_wait_tracker_t *)cbdata;
1557
1558 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)t2->cbdata;
1559
1560 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1561 "sstore:stage:(local): waitpid(%6d) Compression finished for Process %s",
1562 (int)proc->pid,
1563 ORTE_NAME_PRINT(&(app_info->name)) ));
1564
1565 app_info->compress_pid = 0;
1566 OBJ_RELEASE(proc);
1567 OBJ_RELEASE(t2);
1568 }
1569
1570 static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info)
1571 {
1572 int ret, exit_status = ORTE_SUCCESS;
1573 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1574 opal_list_item_t *item = NULL;
1575 bool is_done = true;
1576 int usleep_time = 1000;
1577 int s_time = 0, max_wait_time;
1578
1579
1580 if( !orte_sstore_stage_enabled_compression ) {
1581 return ORTE_SUCCESS;
1582 }
1583
1584
1585
1586
1587 if( orte_sstore_stage_compress_delay > 0 ) {
1588 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1589 "sstore:stage:(local): Delaying %d second before starting compression...",
1590 orte_sstore_stage_compress_delay));
1591 max_wait_time = orte_sstore_stage_compress_delay * (1000000/usleep_time);
1592 for( s_time = 0; s_time < max_wait_time; ++s_time) {
1593 opal_progress();
1594 usleep(1000);
1595 }
1596 }
1597
1598 for(item = opal_list_get_first(handle_info->app_info_handle);
1599 item != opal_list_get_end(handle_info->app_info_handle);
1600 item = opal_list_get_next(item) ) {
1601 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1602
1603 if( ORTE_SUCCESS != (ret = start_compression(handle_info, app_info)) ) {
1604 ORTE_ERROR_LOG(ret);
1605 exit_status = ret;
1606 goto cleanup;
1607 }
1608 }
1609
1610
1611
1612
1613 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1614 "sstore:stage:(local): Waiting for compression to finish..."));
1615 do {
1616 is_done = true;
1617 for(item = opal_list_get_first(handle_info->app_info_handle);
1618 item != opal_list_get_end(handle_info->app_info_handle);
1619 item = opal_list_get_next(item) ) {
1620 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1621
1622 if( 0 < app_info->compress_pid ) {
1623 is_done = false;
1624 break;
1625 }
1626 }
1627
1628 if( !is_done ) {
1629 OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
1630 "sstore:stage:(local): Waiting for compression to finish for appliccation %s",
1631 ORTE_NAME_PRINT(&(app_info->name)) ));
1632 opal_progress();
1633 }
1634 } while(!is_done);
1635
1636 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1637 "sstore:stage:(local): Compression finished!"));
1638 cleanup:
1639 return exit_status;
1640 }
1641
1642 static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
1643 {
1644 int ret, exit_status = ORTE_SUCCESS;
1645 opal_buffer_t *buffer = NULL;
1646 orte_sstore_stage_cmd_flag_t command;
1647
1648
1649
1650
1651
1652 if( 0 <= handle_info->seq_num &&
1653 NULL != handle_info->global_ref_name &&
1654 NULL != handle_info->location_fmt ) {
1655 handle_info->status = SSTORE_LOCAL_READY;
1656 return ORTE_SUCCESS;
1657 }
1658
1659 buffer = OBJ_NEW(opal_buffer_t);
1660
1661
1662
1663
1664 command = ORTE_SSTORE_STAGE_PULL;
1665 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1666 ORTE_ERROR_LOG(ret);
1667 exit_status = ret;
1668 goto cleanup;
1669 }
1670
1671 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1672 ORTE_ERROR_LOG(ret);
1673 exit_status = ret;
1674 goto cleanup;
1675 }
1676
1677 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
1678 ORTE_RML_TAG_SSTORE_INTERNAL,
1679 orte_rml_send_callback, NULL))) {
1680 ORTE_ERROR_LOG(ret);
1681 exit_status = ret;
1682 goto cleanup;
1683 }
1684
1685
1686 buffer = NULL;
1687
1688 cleanup:
1689 if (NULL != buffer) {
1690 OBJ_RELEASE(buffer);
1691 buffer = NULL;
1692 }
1693
1694 return exit_status;
1695 }
1696
1697 static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
1698 {
1699 int ret, exit_status = ORTE_SUCCESS;
1700 opal_buffer_t *buffer = NULL;
1701 orte_sstore_stage_cmd_flag_t command;
1702 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1703 opal_list_item_t *item = NULL;
1704 size_t list_size;
1705
1706 buffer = OBJ_NEW(opal_buffer_t);
1707
1708 command = ORTE_SSTORE_STAGE_PUSH;
1709 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1710 ORTE_ERROR_LOG(ret);
1711 exit_status = ret;
1712 goto cleanup;
1713 }
1714
1715 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1716 ORTE_ERROR_LOG(ret);
1717 exit_status = ret;
1718 goto cleanup;
1719 }
1720
1721 list_size = opal_list_get_size(handle_info->app_info_handle);
1722 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &list_size, 1, OPAL_SIZE))) {
1723 ORTE_ERROR_LOG(ret);
1724 exit_status = ret;
1725 goto cleanup;
1726 }
1727
1728
1729
1730
1731 for(item = opal_list_get_first(handle_info->app_info_handle);
1732 item != opal_list_get_end(handle_info->app_info_handle);
1733 item = opal_list_get_next(item) ) {
1734 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1735
1736 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->name), 1, ORTE_NAME))) {
1737 ORTE_ERROR_LOG(ret);
1738 exit_status = ret;
1739 goto cleanup;
1740 }
1741
1742 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->ckpt_skipped), 1, OPAL_BOOL))) {
1743 ORTE_ERROR_LOG(ret);
1744 exit_status = ret;
1745 goto cleanup;
1746 }
1747
1748 if( !app_info->ckpt_skipped ) {
1749 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->crs_comp), 1, OPAL_STRING))) {
1750 ORTE_ERROR_LOG(ret);
1751 exit_status = ret;
1752 goto cleanup;
1753 }
1754
1755 if( orte_sstore_stage_enabled_compression ) {
1756 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->compress_comp), 1, OPAL_STRING))) {
1757 ORTE_ERROR_LOG(ret);
1758 exit_status = ret;
1759 goto cleanup;
1760 }
1761 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->compress_postfix), 1, OPAL_STRING))) {
1762 ORTE_ERROR_LOG(ret);
1763 exit_status = ret;
1764 goto cleanup;
1765 }
1766 }
1767 }
1768 }
1769
1770 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1771 orte_rml_send_callback, NULL))) {
1772 ORTE_ERROR_LOG(ret);
1773 exit_status = ret;
1774 goto cleanup;
1775 }
1776
1777
1778 buffer = NULL;
1779
1780 cleanup:
1781 if (NULL != buffer) {
1782 OBJ_RELEASE(buffer);
1783 buffer = NULL;
1784 }
1785
1786 return exit_status;
1787 }
1788
1789 static int sstore_stage_create_local_dir(void)
1790 {
1791 int ret, exit_status = ORTE_SUCCESS;
1792 mode_t my_mode = S_IRWXU;
1793
1794 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_local_basedir, my_mode)) ) {
1795 ORTE_ERROR_LOG(ret);
1796 exit_status = ret;
1797 goto cleanup;
1798 }
1799
1800 cleanup:
1801 return exit_status;
1802 }
1803
1804 static int sstore_stage_destroy_local_dir(void)
1805 {
1806 int ret, exit_status = ORTE_SUCCESS;
1807 char * basedir_root = NULL;
1808
1809 opal_asprintf(&basedir_root, "%s/%s",
1810 orte_sstore_stage_local_snapshot_dir,
1811 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME);
1812
1813 if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(basedir_root, true, NULL)) ) {
1814 ORTE_ERROR_LOG(ret);
1815 exit_status = ret;
1816 goto cleanup;
1817 }
1818
1819 cleanup:
1820 if( NULL != basedir_root ) {
1821 free(basedir_root);
1822 basedir_root = NULL;
1823 }
1824
1825 return exit_status;
1826 }
1827
1828 static int sstore_stage_create_cache(void)
1829 {
1830 int ret, exit_status = ORTE_SUCCESS;
1831 mode_t my_mode = S_IRWXU;
1832
1833
1834 if( !orte_sstore_stage_enabled_caching ) {
1835 goto cleanup;
1836 }
1837
1838 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_cache_basedir, my_mode)) ) {
1839 ORTE_ERROR_LOG(ret);
1840 exit_status = ret;
1841 goto cleanup;
1842 }
1843
1844 cleanup:
1845 return exit_status;
1846 }
1847
1848 static int sstore_stage_destroy_cache(void)
1849 {
1850 int ret, exit_status = ORTE_SUCCESS;
1851
1852
1853 if( !orte_sstore_stage_enabled_caching ) {
1854 goto cleanup;
1855 }
1856
1857 if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(sstore_stage_cache_basedir, true, NULL)) ) {
1858 ORTE_ERROR_LOG(ret);
1859 exit_status = ret;
1860 goto cleanup;
1861 }
1862
1863 cleanup:
1864 return exit_status;
1865 }
1866
1867 static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info)
1868 {
1869 int ret, exit_status = ORTE_SUCCESS;
1870 char *cmd = NULL;
1871 mode_t my_mode = S_IRWXU;
1872 char *cache_dirname = NULL;
1873 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1874 opal_list_item_t* item = NULL;
1875 size_t list_size;
1876
1877
1878 if( !orte_sstore_stage_enabled_caching || handle_info->migrating) {
1879 goto cleanup;
1880 }
1881
1882 list_size = opal_list_get_size(handle_info->app_info_handle);
1883 if( 0 >= list_size ) {
1884
1885 exit_status = ORTE_SUCCESS;
1886 goto cleanup;
1887 }
1888
1889 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)opal_list_get_first(handle_info->app_info_handle);
1890 if( NULL == app_info ) {
1891 ORTE_ERROR_LOG(ORTE_ERROR);
1892 exit_status = ORTE_ERROR;
1893 goto cleanup;
1894 }
1895
1896
1897
1898
1899 cache_dirname = opal_dirname(app_info->local_cache_location);
1900 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(cache_dirname, my_mode)) ) {
1901 ORTE_ERROR_LOG(ret);
1902 exit_status = ret;
1903 goto cleanup;
1904 }
1905
1906
1907
1908
1909
1910 for(item = opal_list_get_first(handle_info->app_info_handle);
1911 item != opal_list_get_end(handle_info->app_info_handle);
1912 item = opal_list_get_next(item) ) {
1913 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1914
1915 opal_asprintf(&cmd, "mv %s %s", app_info->local_location, cache_dirname);
1916 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1917 "sstore:stage:(local): update_cache(): Caching snapshot for process %s [%s]",
1918 ORTE_NAME_PRINT(&app_info->name),
1919 cmd));
1920 system(cmd);
1921
1922
1923 if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
1924 free(cmd);
1925 cmd = NULL;
1926
1927 opal_asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location);
1928 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1929 "sstore:stage:(local): update_cache(): Removing with command (%s)",
1930 cmd));
1931 system(cmd);
1932 }
1933 }
1934
1935
1936
1937
1938 if( NULL != sstore_stage_cache_last_dir ) {
1939 opal_asprintf(&cmd, "rm -rf %s", sstore_stage_cache_last_dir);
1940 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1941 "sstore:stage:(local): update_cache(): Removing old cache dir command (%s)",
1942 sstore_stage_cache_last_dir));
1943 system(cmd);
1944 }
1945
1946
1947
1948
1949 if( NULL != sstore_stage_cache_last_dir ) {
1950 free(sstore_stage_cache_last_dir);
1951 sstore_stage_cache_last_dir = NULL;
1952 }
1953 if( NULL != sstore_stage_cache_current_dir ) {
1954 sstore_stage_cache_last_dir = strdup(sstore_stage_cache_current_dir);
1955 }
1956
1957
1958
1959
1960 if( NULL != sstore_stage_cache_current_dir ) {
1961 free(sstore_stage_cache_current_dir);
1962 sstore_stage_cache_current_dir = NULL;
1963 }
1964 sstore_stage_cache_current_dir = strdup(cache_dirname);
1965
1966 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1967 "sstore:stage:(local): update_cache(): Cache Pointers cur(%s), last(%s)",
1968 sstore_stage_cache_current_dir, sstore_stage_cache_last_dir));
1969
1970 cleanup:
1971 if( NULL != cmd ) {
1972 free(cmd);
1973 cmd = NULL;
1974 }
1975
1976 return exit_status;
1977 }
1978
1979 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
1980 char *global_loc, char *ref, char *postfix, int seq)
1981 {
1982 int ret, exit_status = ORTE_SUCCESS;
1983 mode_t my_mode = S_IRWXU;
1984 orte_filem_base_request_t *filem_request;
1985 orte_filem_base_process_set_t *p_set = NULL;
1986 orte_filem_base_file_set_t * f_set = NULL;
1987 char * full_local_location = NULL;
1988
1989 *skip_xfer = false;
1990
1991 if( NULL != *local_location) {
1992 free(*local_location);
1993 *local_location = NULL;
1994 }
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009 #if 0
2010 if( orte_sstore_stage_global_is_shared &&
2011 (NULL == postfix || 0 >= strlen(postfix) ) ) {
2012 *local_location = strdup(global_loc);
2013 *skip_xfer = true;
2014 goto cleanup;
2015 }
2016 #endif
2017
2018 opal_asprintf(local_location, "%s/%s/%s/%d",
2019 orte_sstore_stage_local_snapshot_dir,
2020 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
2021 ORTE_SSTORE_LOCAL_SNAPSHOT_RESTART_DIR_NAME,
2022 seq);
2023 opal_asprintf(&full_local_location, "%s/%s",
2024 *local_location,
2025 ref);
2026
2027
2028
2029
2030
2031 if( 0 == (ret = access(full_local_location, F_OK)) ) {
2032 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
2033 "sstore:stage:(local): preload_files() Local snapshot already exists, reuse it (%s)",
2034 full_local_location));
2035 *skip_xfer = true;
2036 goto cleanup;
2037 }
2038
2039
2040
2041
2042 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(*local_location, my_mode)) ) {
2043 ORTE_ERROR_LOG(ret);
2044 exit_status = ret;
2045 goto cleanup;
2046 }
2047
2048
2049
2050
2051 filem_request = OBJ_NEW(orte_filem_base_request_t);
2052
2053
2054 p_set = OBJ_NEW(orte_filem_base_process_set_t);
2055 if( ORTE_PROC_IS_HNP ) {
2056
2057 p_set->source.jobid = ORTE_PROC_MY_NAME->jobid;
2058 p_set->source.vpid = ORTE_PROC_MY_NAME->vpid;
2059 }
2060 else {
2061
2062 p_set->source.jobid = ORTE_PROC_MY_HNP->jobid;
2063 p_set->source.vpid = ORTE_PROC_MY_HNP->vpid;
2064 }
2065 p_set->sink.jobid = ORTE_PROC_MY_NAME->jobid;
2066 p_set->sink.vpid = ORTE_PROC_MY_NAME->vpid;
2067 opal_list_append(&(filem_request->process_sets), &(p_set->super) );
2068
2069
2070 f_set = OBJ_NEW(orte_filem_base_file_set_t);
2071
2072 f_set->local_target = strdup(*local_location);
2073 if( NULL != postfix && 0 < strlen(postfix) ) {
2074 opal_asprintf(&(f_set->remote_target), "%s/%s%s",
2075 global_loc,
2076 ref,
2077 postfix);
2078 } else {
2079 opal_asprintf(&(f_set->remote_target), "%s/%s",
2080 global_loc,
2081 ref);
2082 }
2083 if( NULL != postfix && 0 < strlen(postfix) ) {
2084 f_set->target_flag = ORTE_FILEM_TYPE_FILE;
2085 } else {
2086 f_set->target_flag = ORTE_FILEM_TYPE_DIR;
2087 }
2088
2089 opal_list_append(&(filem_request->file_sets), &(f_set->super) );
2090
2091
2092 opal_list_append(preload_filem_requests, &(filem_request->super));
2093 if(ORTE_SUCCESS != (ret = orte_filem.get_nb(filem_request)) ) {
2094 ORTE_ERROR_LOG(ret);
2095 exit_status = ret;
2096 goto cleanup;
2097 }
2098
2099 cleanup:
2100 if( NULL != full_local_location ) {
2101 free(full_local_location);
2102 full_local_location = NULL;
2103 }
2104
2105 return exit_status;
2106 }