This source file includes the following definitions:
- orte_sstore_central_global_snapshot_info_construct
- orte_sstore_central_global_snapshot_info_destruct
- orte_sstore_central_global_module_init
- orte_sstore_central_global_module_finalize
- orte_sstore_central_global_request_checkpoint_handle
- orte_sstore_central_global_request_restart_handle
- orte_sstore_central_global_request_global_snapshot_data
- orte_sstore_central_global_register
- orte_sstore_central_global_get_attr
- orte_sstore_central_global_set_attr
- orte_sstore_central_global_sync
- orte_sstore_central_global_remove
- orte_sstore_central_global_pack
- orte_sstore_central_global_unpack
- create_new_handle_info
- find_handle_info
- find_handle_info_from_ref
- sstore_central_global_start_listener
- sstore_central_global_stop_listener
- sstore_central_global_recv
- process_local_pull
- process_local_push
- init_global_snapshot_directory
- metadata_open
- metadata_close
- metadata_write_int
- metadata_write_str
- metadata_write_timestamp
- orte_sstore_central_extract_global_metadata
- central_snapshot_sort_compare_fn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include "orte_config.h"
20
21 #include <string.h>
22 #include <stdlib.h>
23 #include <sys/types.h>
24 #include <sys/stat.h>
25 #include <sys/wait.h>
26 #ifdef HAVE_UNISTD_H
27 #include <unistd.h>
28 #endif
29
30 #include "orte/mca/mca.h"
31 #include "opal/mca/base/base.h"
32
33 #include "opal/mca/event/event.h"
34
35 #include "orte/constants.h"
36 #include "orte/util/show_help.h"
37 #include "opal/util/argv.h"
38 #include "opal/util/output.h"
39 #include "opal/util/opal_environ.h"
40 #include "opal/util/basename.h"
41 #include "opal/util/os_dirpath.h"
42
43 #include "opal/threads/mutex.h"
44 #include "opal/threads/condition.h"
45
46 #include "orte/util/name_fns.h"
47 #include "orte/util/proc_info.h"
48 #include "orte/runtime/orte_globals.h"
49 #include "orte/runtime/orte_wait.h"
50 #include "orte/mca/errmgr/errmgr.h"
51 #include "orte/mca/ess/ess.h"
52 #include "orte/mca/rml/rml.h"
53 #include "orte/mca/rml/rml_types.h"
54 #include "orte/mca/snapc/snapc.h"
55 #include "orte/mca/snapc/base/base.h"
56
57 #include "orte/mca/sstore/sstore.h"
58 #include "orte/mca/sstore/base/base.h"
59
60 #include "sstore_central.h"
61
/* Kind of request a global snapshot handle was created for (handle_type field). */
enum {
    SSTORE_HANDLE_TYPE_NONE    = 0,
    SSTORE_HANDLE_TYPE_CKPT    = 1,
    SSTORE_HANDLE_TYPE_RESTART = 2
};

/* Values stored in the handle's state field. */
enum {
    SSTORE_GLOBAL_NONE    = 0,
    SSTORE_GLOBAL_ERROR   = 1,
    SSTORE_GLOBAL_INIT    = 2,
    SSTORE_GLOBAL_REG     = 3,
    SSTORE_GLOBAL_SYNCING = 4,
    SSTORE_GLOBAL_SYNCED  = 5
};
72
73
74
75
76 struct orte_sstore_central_global_snapshot_info_t {
77
78 opal_list_item_t super;
79
80
81 orte_sstore_base_handle_t id;
82
83
84 orte_jobid_t jobid;
85
86
87 int state;
88
89
90 int handle_type;
91
92
93 int seq_num;
94
95
96 char * ref_name;
97
98
99 char * local_location;
100
101
102 char * app_location_fmt;
103
104
105 char * base_location;
106
107
108 char *metadata_filename;
109
110
111 FILE *metadata;
112
113
114 int num_procs_total;
115
116
117 int num_procs_synced;
118
119
120 bool migrating;
121 };
122 typedef struct orte_sstore_central_global_snapshot_info_t orte_sstore_central_global_snapshot_info_t;
123 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_central_global_snapshot_info_t);
124
125 void orte_sstore_central_global_snapshot_info_construct(orte_sstore_central_global_snapshot_info_t *info);
126 void orte_sstore_central_global_snapshot_info_destruct( orte_sstore_central_global_snapshot_info_t *info);
127
128 OBJ_CLASS_INSTANCE(orte_sstore_central_global_snapshot_info_t,
129 opal_list_item_t,
130 orte_sstore_central_global_snapshot_info_construct,
131 orte_sstore_central_global_snapshot_info_destruct);
132
133
134
135
136
137 static bool is_global_listener_active = false;
138 static int sstore_central_global_start_listener(void);
139 static int sstore_central_global_stop_listener(void);
140 static void sstore_central_global_recv(int status,
141 orte_process_name_t* sender,
142 opal_buffer_t* buffer,
143 orte_rml_tag_t tag,
144 void* cbdata);
145 static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info);
146 static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info);
147
148 static orte_sstore_central_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid);
149 static orte_sstore_central_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
150 static orte_sstore_central_global_snapshot_info_t *find_handle_info_from_ref(char *ref, int seq);
151
152 static int metadata_open(orte_sstore_central_global_snapshot_info_t * handle_info);
153 static int metadata_close(orte_sstore_central_global_snapshot_info_t * handle_info);
154 static int metadata_write_int(orte_sstore_central_global_snapshot_info_t * handle_info, char * key, int value);
155 static int metadata_write_str(orte_sstore_central_global_snapshot_info_t * handle_info, char * key, char *value);
156 static int metadata_write_timestamp(orte_sstore_central_global_snapshot_info_t * handle_info);
157
158 static int init_global_snapshot_directory(orte_sstore_central_global_snapshot_info_t *handle_info);
159 static int central_snapshot_sort_compare_fn(opal_list_item_t **a,
160 opal_list_item_t **b);
161 static int orte_sstore_central_extract_global_metadata(orte_sstore_central_global_snapshot_info_t * handle_info,
162 orte_sstore_base_global_snapshot_info_t *global_snapshot);
163
164 static int next_handle_id = 1;
165
166 static opal_list_t *active_handles = NULL;
167
168
169
170
171 void orte_sstore_central_global_snapshot_info_construct(orte_sstore_central_global_snapshot_info_t *info)
172 {
173 info->id = next_handle_id;
174 next_handle_id++;
175
176 info->jobid = ORTE_JOBID_INVALID;
177
178 info->state = SSTORE_GLOBAL_NONE;
179
180 info->handle_type = SSTORE_HANDLE_TYPE_NONE;
181
182 info->seq_num = -1;
183
184 info->base_location = strdup(orte_sstore_base_global_snapshot_dir);
185
186 info->ref_name = NULL;
187 info->local_location = NULL;
188 info->app_location_fmt = NULL;
189
190 info->metadata_filename = NULL;
191 info->metadata = NULL;
192
193 info->num_procs_total = 0;
194 info->num_procs_synced = 0;
195
196 info->migrating = false;
197 }
198
199 void orte_sstore_central_global_snapshot_info_destruct( orte_sstore_central_global_snapshot_info_t *info)
200 {
201 info->id = 0;
202 info->seq_num = -1;
203
204 info->jobid = ORTE_JOBID_INVALID;
205
206 info->state = SSTORE_GLOBAL_NONE;
207
208 info->handle_type = SSTORE_HANDLE_TYPE_NONE;
209
210 if( NULL != info->ref_name ) {
211 free( info->ref_name );
212 info->ref_name = NULL;
213 }
214
215 if( NULL != info->local_location ) {
216 free( info->local_location );
217 info->local_location = NULL;
218 }
219
220 if( NULL != info->app_location_fmt ) {
221 free( info->app_location_fmt );
222 info->app_location_fmt = NULL;
223 }
224
225 if( NULL != info->base_location ) {
226 free( info->base_location );
227 info->base_location = NULL;
228 }
229
230 if( NULL != info->metadata_filename ) {
231 free( info->metadata_filename ) ;
232 info->metadata_filename = NULL;
233 }
234
235 if( NULL != info->metadata ) {
236 fclose(info->metadata);
237 info->metadata = NULL;
238 }
239
240 info->num_procs_total = 0;
241 info->num_procs_synced = 0;
242
243 info->migrating = false;
244 }
245
246
247
248
249 int orte_sstore_central_global_module_init(void)
250 {
251 int ret, exit_status = ORTE_SUCCESS;
252
253 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
254 "sstore:central:(global): init()"));
255
256 if( NULL == active_handles ) {
257 active_handles = OBJ_NEW(opal_list_t);
258 }
259
260
261
262
263 if( ORTE_SUCCESS != (ret = sstore_central_global_start_listener()) ) {
264 ORTE_ERROR_LOG(ret);
265 exit_status = ret;
266 goto cleanup;
267 }
268
269 exit_status = orte_sstore_central_local_module_init();
270
271 cleanup:
272 return exit_status;
273 }
274
275 int orte_sstore_central_global_module_finalize(void)
276 {
277 int ret, exit_status = ORTE_SUCCESS;
278
279 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
280 "sstore:central:(global): finalize()"));
281
282 exit_status = orte_sstore_central_local_module_finalize();
283
284 if( NULL != active_handles ) {
285 OBJ_RELEASE(active_handles);
286 }
287
288
289
290
291 if( ORTE_SUCCESS != (ret = sstore_central_global_stop_listener()) ) {
292 ORTE_ERROR_LOG(ret);
293 exit_status = ret;
294 goto cleanup;
295 }
296
297 cleanup:
298 return exit_status;
299 }
300
301 int orte_sstore_central_global_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
302 {
303 int ret, exit_status = ORTE_SUCCESS;
304 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
305
306 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
307 "sstore:central:(global): request_checkpoint_handle()"));
308
309
310
311
312
313 handle_info = create_new_handle_info(seq, SSTORE_HANDLE_TYPE_CKPT, jobid);
314
315
316
317
318 if( ORTE_SUCCESS != (ret = init_global_snapshot_directory(handle_info)) ) {
319 ORTE_ERROR_LOG(ret);
320 exit_status = ret;
321 goto cleanup;
322 }
323
324
325
326
327 *handle = handle_info->id;
328
329 cleanup:
330 return exit_status;
331 }
332
333 int orte_sstore_central_global_request_restart_handle(orte_sstore_base_handle_t *handle, char *basedir,
334 char *ref, int seq,
335 orte_sstore_base_global_snapshot_info_t *snapshot)
336 {
337 int ret, exit_status = ORTE_SUCCESS;
338 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
339
340 handle_info = find_handle_info_from_ref(ref, seq);
341 if( NULL == handle_info ) {
342 ret = ORTE_ERROR;
343 ORTE_ERROR_LOG(ret);
344 exit_status = ret;
345 goto cleanup;
346 }
347
348 *handle = handle_info->id;
349
350 cleanup:
351 return exit_status;
352 }
353
354 int orte_sstore_central_global_request_global_snapshot_data(orte_sstore_base_handle_t *handle,
355 orte_sstore_base_global_snapshot_info_t *snapshot)
356 {
357 int ret, exit_status = ORTE_SUCCESS;
358 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
359
360 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
361 "sstore:central:(global): request_global_snapshot_data()"));
362
363
364
365
366 if( NULL != handle ) {
367 handle_info = find_handle_info(*handle);
368 snapshot->ss_handle = *handle;
369 } else {
370 handle_info = find_handle_info(orte_sstore_handle_last_stable);
371 snapshot->ss_handle = orte_sstore_handle_last_stable;
372 }
373
374
375
376
377 snapshot->seq_num = handle_info->seq_num;
378 snapshot->reference = strdup(handle_info->ref_name);
379 snapshot->basedir = strdup(handle_info->base_location);
380 snapshot->metadata_filename = strdup(handle_info->metadata_filename);
381
382
383 if( orte_sstore_handle_current == snapshot->ss_handle ) {
384 if( ORTE_SUCCESS != (ret = orte_sstore_central_extract_global_metadata(handle_info, snapshot)) ) {
385 ORTE_ERROR_LOG(ret);
386 exit_status = ret;
387 goto cleanup;
388 }
389 }
390
391 else {
392 if( ORTE_SUCCESS != (ret = orte_sstore_base_extract_global_metadata(snapshot)) ) {
393 ORTE_ERROR_LOG(ret);
394 exit_status = ret;
395 goto cleanup;
396 }
397 }
398
399 opal_list_sort(&snapshot->local_snapshots, central_snapshot_sort_compare_fn);
400
401 cleanup:
402 return exit_status;
403 }
404
405 int orte_sstore_central_global_register(orte_sstore_base_handle_t handle)
406 {
407 int ret, exit_status = ORTE_SUCCESS;
408 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
409
410 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
411 "sstore:central:(global): register(%d) - Global", handle));
412
413
414
415
416 handle_info = find_handle_info(handle);
417 if( SSTORE_GLOBAL_REG != handle_info->state ) {
418 handle_info->state = SSTORE_GLOBAL_REG;
419 } else {
420 return orte_sstore_central_local_register(handle);
421 }
422
423 orte_sstore_handle_current = handle;
424
425
426
427
428 if( handle_info->migrating ) {
429 if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
430 SSTORE_METADATA_INTERNAL_MIG_SEQ_STR,
431 handle_info->seq_num)) ) {
432 ORTE_ERROR_LOG(ret);
433 exit_status = ret;
434 goto cleanup;
435 }
436 } else {
437 if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
438 SSTORE_METADATA_GLOBAL_SNAP_SEQ_STR,
439 handle_info->seq_num)) ) {
440 ORTE_ERROR_LOG(ret);
441 exit_status = ret;
442 goto cleanup;
443 }
444 }
445
446 if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
447 SSTORE_METADATA_LOCAL_SNAP_REF_FMT_STR,
448 orte_sstore_base_local_snapshot_fmt)) ) {
449 ORTE_ERROR_LOG(ret);
450 exit_status = ret;
451 goto cleanup;
452 }
453
454 if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
455 ORTE_ERROR_LOG(ret);
456 exit_status = ret;
457 goto cleanup;
458 }
459
460 cleanup:
461 return exit_status;
462 }
463
464 int orte_sstore_central_global_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
465 {
466 int exit_status = ORTE_SUCCESS;
467 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
468
469 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
470 "sstore:central:(global): get_attr()"));
471
472
473
474
475 handle_info = find_handle_info(handle);
476
477
478
479
480 if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
481 *value = strdup(handle_info->ref_name);
482 }
483 else if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
484 opal_asprintf(value, "%d", handle_info->seq_num);
485 }
486 else if( SSTORE_METADATA_LOCAL_SNAP_REF_FMT == key ) {
487 *value = strdup(orte_sstore_base_local_snapshot_fmt);
488 }
489
490 else if( SSTORE_METADATA_LOCAL_SNAP_LOC == key ) {
491 opal_asprintf(value, "%s/%s/%d",
492 handle_info->base_location,
493 handle_info->ref_name,
494 handle_info->seq_num);
495 }
496 else if( SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT == key ) {
497 opal_asprintf(value, "%s/%s/%d/%s",
498 handle_info->base_location,
499 handle_info->ref_name,
500 handle_info->seq_num,
501 orte_sstore_base_local_snapshot_fmt);
502 }
503 else {
504 exit_status = ORTE_ERR_NOT_SUPPORTED;
505 }
506
507 return exit_status;
508 }
509
510 int orte_sstore_central_global_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
511 {
512 int ret, exit_status = ORTE_SUCCESS;
513 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
514 char *key_str = NULL;
515
516 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
517 "sstore:central:(global): set_attr()"));
518
519
520
521
522 handle_info = find_handle_info(handle);
523
524
525
526
527 if( key == SSTORE_METADATA_GLOBAL_MIGRATING ) {
528 handle_info->migrating = true;
529 }
530 else {
531 orte_sstore_base_convert_key_to_string(key, &key_str);
532 if( NULL == key_str ) {
533 ORTE_ERROR_LOG(ORTE_ERROR);
534 exit_status = ORTE_ERROR;
535 goto cleanup;
536 }
537
538 if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
539 ORTE_ERROR_LOG(ret);
540 exit_status = ret;
541 goto cleanup;
542 }
543 }
544
545 cleanup:
546 if( NULL != key_str ) {
547 free(key_str);
548 key_str = NULL;
549 }
550
551 return exit_status;
552 }
553
554 int orte_sstore_central_global_sync(orte_sstore_base_handle_t handle)
555 {
556 int ret, exit_status = ORTE_SUCCESS;
557 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
558
559 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
560 "sstore:central:(global): sync()"));
561
562
563
564
565 handle_info = find_handle_info(handle);
566 if( SSTORE_GLOBAL_SYNCING != handle_info->state ) {
567 handle_info->state = SSTORE_GLOBAL_SYNCING;
568 if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
569 return orte_sstore_central_local_sync(handle);
570 }
571 }
572
573
574
575
576 while(handle_info->num_procs_synced < handle_info->num_procs_total) {
577 opal_progress();
578 }
579
580
581
582
583 if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
584 ORTE_ERROR_LOG(ret);
585 exit_status = ret;
586 goto cleanup;
587 }
588
589 if( handle_info->migrating ) {
590 if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
591 SSTORE_METADATA_INTERNAL_DONE_MIG_SEQ_STR,
592 handle_info->seq_num)) ) {
593 ORTE_ERROR_LOG(ret);
594 exit_status = ret;
595 goto cleanup;
596 }
597 } else {
598 if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
599 SSTORE_METADATA_INTERNAL_DONE_SEQ_STR,
600 handle_info->seq_num)) ) {
601 ORTE_ERROR_LOG(ret);
602 exit_status = ret;
603 goto cleanup;
604 }
605 }
606
607 if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
608 ORTE_ERROR_LOG(ret);
609 exit_status = ret;
610 goto cleanup;
611 }
612
613
614 if( !handle_info->migrating ) {
615 orte_sstore_base_is_checkpoint_available = true;
616 orte_sstore_handle_last_stable = orte_sstore_handle_current;
617 }
618
619 cleanup:
620 return exit_status;
621 }
622
623 int orte_sstore_central_global_remove(orte_sstore_base_handle_t handle)
624 {
625 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
626 "sstore:central:(global): remove()"));
627
628
629
630
631
632 return ORTE_SUCCESS;
633 }
634
635 int orte_sstore_central_global_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
636 {
637 int ret, exit_status = ORTE_SUCCESS;
638 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
639
640 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
641 "sstore:central:(global): pack()"));
642
643
644
645
646 handle_info = find_handle_info(handle);
647
648
649
650
651 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
652 ORTE_ERROR_LOG(ret);
653 exit_status = ret;
654 goto cleanup;
655 }
656
657 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
658 "sstore:central:(global): pack(%d, %d, %s)",
659 handle_info->id,
660 handle_info->seq_num,
661 handle_info->ref_name));
662
663
664
665
666 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
667 ORTE_ERROR_LOG(ret);
668 exit_status = ret;
669 goto cleanup;
670 }
671
672 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
673 ORTE_ERROR_LOG(ret);
674 exit_status = ret;
675 goto cleanup;
676 }
677
678 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING )) ) {
679 ORTE_ERROR_LOG(ret);
680 exit_status = ret;
681 goto cleanup;
682 }
683
684 cleanup:
685 return exit_status;
686 }
687
688 int orte_sstore_central_global_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
689 {
690 int ret, exit_status = ORTE_SUCCESS;
691
692 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
693 "sstore:central:(global): unpack()"));
694
695
696
697
698 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
699 ORTE_PROC_MY_NAME,
700 peer)) {
701
702
703
704 if( ORTE_SUCCESS != (ret = orte_sstore_central_local_unpack(peer, buffer, handle)) ) {
705 ORTE_ERROR_LOG(ret);
706 exit_status = ret;
707 goto cleanup;
708 }
709 }
710
711 cleanup:
712 return exit_status;
713 }
714
715
716
717
718 static orte_sstore_central_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid)
719 {
720 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
721 orte_job_t *jdata = NULL;
722
723 handle_info = OBJ_NEW(orte_sstore_central_global_snapshot_info_t);
724
725 handle_info->jobid = jobid;
726
727 handle_info->state = SSTORE_GLOBAL_INIT;
728
729 handle_info->handle_type = type;
730
731 handle_info->seq_num = seq;
732
733 orte_sstore_base_get_global_snapshot_ref(&(handle_info->ref_name), getpid());
734
735 opal_asprintf(&(handle_info->local_location), "%s/%d",
736 handle_info->ref_name, handle_info->seq_num);
737
738 opal_asprintf(&(handle_info->app_location_fmt), "%s/%s/%s",
739 handle_info->base_location,
740 handle_info->local_location,
741 orte_sstore_base_local_snapshot_fmt);
742
743 opal_asprintf(&(handle_info->metadata_filename), "%s/%s/%s",
744 handle_info->base_location,
745 handle_info->ref_name,
746 orte_sstore_base_global_metadata_filename);
747
748 jdata = orte_get_job_data_object(handle_info->jobid);
749 handle_info->num_procs_total = (int)jdata->num_procs;
750
751 opal_list_append(active_handles, &(handle_info->super));
752
753 return handle_info;
754 }
755
756 static orte_sstore_central_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
757 {
758 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
759 opal_list_item_t* item = NULL;
760
761 for(item = opal_list_get_first(active_handles);
762 item != opal_list_get_end(active_handles);
763 item = opal_list_get_next(item) ) {
764 handle_info = (orte_sstore_central_global_snapshot_info_t*)item;
765
766 if( handle_info->id == handle ) {
767 return handle_info;
768 }
769 }
770
771 return NULL;
772 }
773
774 static orte_sstore_central_global_snapshot_info_t *find_handle_info_from_ref(char *ref, int seq)
775 {
776 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
777 opal_list_item_t* item = NULL;
778
779 for(item = opal_list_get_first(active_handles);
780 item != opal_list_get_end(active_handles);
781 item = opal_list_get_next(item) ) {
782 handle_info = (orte_sstore_central_global_snapshot_info_t*)item;
783
784 if( handle_info->seq_num == seq ) {
785 if( NULL != ref &&
786 strncmp(handle_info->ref_name, ref, strlen(ref)) ) {
787 return handle_info;
788 } else {
789 return handle_info;
790 }
791 }
792 }
793
794 return NULL;
795 }
796
797 static int sstore_central_global_start_listener(void)
798 {
799 if( is_global_listener_active ) {
800 return ORTE_SUCCESS;
801 }
802
803 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
804 ORTE_RML_PERSISTENT, sstore_central_global_recv, NULL);
805
806 is_global_listener_active = true;
807 return ORTE_SUCCESS;
808 }
809
810 static int sstore_central_global_stop_listener(void)
811 {
812 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
813
814 is_global_listener_active = false;
815 return ORTE_SUCCESS;
816 }
817
818 static void sstore_central_global_recv(int status,
819 orte_process_name_t* sender,
820 opal_buffer_t* buffer,
821 orte_rml_tag_t tag,
822 void* cbdata)
823 {
824 int ret;
825 orte_sstore_central_cmd_flag_t command;
826 orte_std_cntr_t count;
827 orte_sstore_base_handle_t loc_id;
828 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
829
830 if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
831 return;
832 }
833
834
835
836
837
838 if(OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
839 ORTE_PROC_MY_NAME,
840 sender)) {
841 orte_sstore_central_local_recv(status, sender, buffer, tag, cbdata);
842 return;
843 }
844
845
846 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
847 "sstore:central:(global): process_cmd(%s)",
848 ORTE_NAME_PRINT(sender)));
849
850 count = 1;
851 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
852 ORTE_ERROR_LOG(ret);
853 goto cleanup;
854 }
855
856 count = 1;
857 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
858 ORTE_ERROR_LOG(ret);
859 goto cleanup;
860 }
861
862
863
864
865 if(NULL == (handle_info = find_handle_info(loc_id)) ) {
866 ;
867 }
868
869
870
871
872 if( ORTE_SSTORE_CENTRAL_PULL == command ) {
873 process_local_pull(sender, buffer, handle_info);
874 }
875 else if( ORTE_SSTORE_CENTRAL_PUSH == command ) {
876 process_local_push(sender, buffer, handle_info);
877 }
878
879 cleanup:
880 return;
881 }
882
883 static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info)
884 {
885 int ret, exit_status = ORTE_SUCCESS;
886 opal_buffer_t *loc_buffer = NULL;
887 orte_sstore_central_cmd_flag_t command;
888
889
890
891
892 loc_buffer = OBJ_NEW(opal_buffer_t);
893
894 command = ORTE_SSTORE_CENTRAL_PUSH;
895 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
896 ORTE_ERROR_LOG(ret);
897 exit_status = ret;
898 goto cleanup;
899 }
900
901 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
902 ORTE_ERROR_LOG(ret);
903 exit_status = ret;
904 goto cleanup;
905 }
906
907 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
908 ORTE_ERROR_LOG(ret);
909 exit_status = ret;
910 goto cleanup;
911 }
912
913 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
914 ORTE_ERROR_LOG(ret);
915 exit_status = ret;
916 goto cleanup;
917 }
918
919 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING))) {
920 ORTE_ERROR_LOG(ret);
921 exit_status = ret;
922 goto cleanup;
923 }
924
925 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
926 orte_rml_send_callback, NULL))) {
927 ORTE_ERROR_LOG(ret);
928 exit_status = ret;
929 goto cleanup;
930 }
931
932 loc_buffer = NULL;
933
934 cleanup:
935 if (NULL != loc_buffer) {
936 OBJ_RELEASE(loc_buffer);
937 loc_buffer = NULL;
938 }
939
940 return exit_status;
941 }
942
943 static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info)
944 {
945 int ret, exit_status = ORTE_SUCCESS;
946 orte_std_cntr_t count;
947 size_t num_entries, i;
948 orte_process_name_t name;
949 bool ckpt_skipped = false;
950 char * crs_comp = NULL;
951 char * proc_name = NULL;
952
953
954
955
956 count = 1;
957 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
958 ORTE_ERROR_LOG(ret);
959 exit_status = ret;
960 goto cleanup;
961 }
962
963 for(i = 0; i < num_entries; ++i ) {
964 count = 1;
965 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &name, &count, ORTE_NAME))) {
966 ORTE_ERROR_LOG(ret);
967 exit_status = ret;
968 goto cleanup;
969 }
970
971 count = 1;
972 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &ckpt_skipped, &count, OPAL_BOOL))) {
973 ORTE_ERROR_LOG(ret);
974 exit_status = ret;
975 goto cleanup;
976 }
977
978 if( !ckpt_skipped ) {
979 count = 1;
980 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crs_comp, &count, OPAL_STRING))) {
981 ORTE_ERROR_LOG(ret);
982 exit_status = ret;
983 goto cleanup;
984 }
985
986
987
988
989 orte_util_convert_process_name_to_string(&proc_name, &name);
990
991 metadata_write_str(handle_info,
992 SSTORE_METADATA_INTERNAL_PROCESS_STR,
993 proc_name);
994 metadata_write_str(handle_info,
995 SSTORE_METADATA_LOCAL_CRS_COMP_STR,
996 crs_comp);
997 }
998
999 if( NULL != crs_comp ) {
1000 free(crs_comp);
1001 crs_comp = NULL;
1002 }
1003 if( NULL != proc_name ) {
1004 free(proc_name);
1005 proc_name = NULL;
1006 }
1007
1008 (handle_info->num_procs_synced)++;
1009 }
1010
1011 cleanup:
1012 if( NULL != crs_comp ) {
1013 free(crs_comp);
1014 crs_comp = NULL;
1015 }
1016 if( NULL != proc_name ) {
1017 free(proc_name);
1018 proc_name = NULL;
1019 }
1020
1021 return exit_status;
1022 }
1023
1024 static int init_global_snapshot_directory(orte_sstore_central_global_snapshot_info_t *handle_info)
1025 {
1026 int ret, exit_status = ORTE_SUCCESS;
1027 char * dir_name = NULL;
1028 mode_t my_mode = S_IRWXU;
1029
1030
1031
1032
1033 opal_asprintf(&dir_name, "%s/%s",
1034 handle_info->base_location,
1035 handle_info->local_location);
1036 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(dir_name, my_mode)) ) {
1037 ORTE_ERROR_LOG(ret);
1038 exit_status = ret;
1039 goto cleanup;
1040 }
1041
1042
1043
1044
1045 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1046 ORTE_ERROR_LOG(ret);
1047 exit_status = ret;
1048 goto cleanup;
1049 }
1050
1051 cleanup:
1052 if(NULL != dir_name) {
1053 free(dir_name);
1054 dir_name = NULL;
1055 }
1056
1057 return exit_status;
1058 }
1059
1060
1061
1062
1063 static int metadata_open(orte_sstore_central_global_snapshot_info_t * handle_info)
1064 {
1065
1066 if( NULL != handle_info->metadata ) {
1067 return ORTE_SUCCESS;
1068 }
1069
1070 if (NULL == (handle_info->metadata = fopen(handle_info->metadata_filename, "a")) ) {
1071 opal_output(orte_sstore_base_framework.framework_output,
1072 "sstore:central:(global):init_dir() Unable to open the file (%s)\n",
1073 handle_info->metadata_filename);
1074 ORTE_ERROR_LOG(ORTE_ERROR);
1075 return ORTE_ERROR;
1076 }
1077
1078 return ORTE_SUCCESS;
1079 }
1080
1081 static int metadata_close(orte_sstore_central_global_snapshot_info_t * handle_info)
1082 {
1083
1084 if( NULL == handle_info->metadata ) {
1085 return ORTE_SUCCESS;
1086 }
1087
1088 fclose(handle_info->metadata);
1089 handle_info->metadata = NULL;
1090
1091 return ORTE_SUCCESS;
1092 }
1093
1094 static int metadata_write_int(orte_sstore_central_global_snapshot_info_t * handle_info, char *key, int value)
1095 {
1096 int ret, exit_status = ORTE_SUCCESS;
1097
1098
1099 if( NULL == handle_info->metadata ) {
1100 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1101 ORTE_ERROR_LOG(ret);
1102 exit_status = ret;
1103 goto cleanup;
1104 }
1105 }
1106
1107 fprintf(handle_info->metadata, "%s%d\n", key, value);
1108
1109 cleanup:
1110 return exit_status;
1111 }
1112
1113 static int metadata_write_str(orte_sstore_central_global_snapshot_info_t * handle_info, char *key, char *value)
1114 {
1115 int ret, exit_status = ORTE_SUCCESS;
1116
1117
1118 if( NULL == handle_info->metadata ) {
1119 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1120 ORTE_ERROR_LOG(ret);
1121 exit_status = ret;
1122 goto cleanup;
1123 }
1124 }
1125
1126 fprintf(handle_info->metadata, "%s%s\n", key, value);
1127
1128 cleanup:
1129 return exit_status;
1130 }
1131
1132 static int metadata_write_timestamp(orte_sstore_central_global_snapshot_info_t * handle_info)
1133 {
1134 int ret, exit_status = ORTE_SUCCESS;
1135 time_t timestamp;
1136
1137
1138 if( NULL == handle_info->metadata ) {
1139 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1140 ORTE_ERROR_LOG(ret);
1141 exit_status = ret;
1142 goto cleanup;
1143 }
1144 }
1145
1146 timestamp = time(NULL);
1147 fprintf(handle_info->metadata, "%s%s",
1148 SSTORE_METADATA_INTERNAL_TIME_STR,
1149 ctime(×tamp));
1150
1151 cleanup:
1152 return exit_status;
1153 }
1154
1155 static int orte_sstore_central_extract_global_metadata(orte_sstore_central_global_snapshot_info_t * handle_info,
1156 orte_sstore_base_global_snapshot_info_t *global_snapshot)
1157 {
1158 int exit_status = ORTE_SUCCESS;
1159 orte_sstore_base_local_snapshot_info_t *vpid_snapshot = NULL;
1160 opal_list_item_t* item = NULL;
1161 int i = 0;
1162
1163
1164
1165
1166 while (NULL != (item = opal_list_remove_first(&global_snapshot->local_snapshots))) {
1167 OBJ_RELEASE(item);
1168 }
1169
1170 if( NULL != global_snapshot->start_time ) {
1171 free( global_snapshot->start_time );
1172 global_snapshot->start_time = NULL;
1173 }
1174
1175 if( NULL != global_snapshot->end_time ) {
1176 free( global_snapshot->end_time );
1177 global_snapshot->end_time = NULL;
1178 }
1179
1180
1181
1182
1183 for(i = 0; i < handle_info->num_procs_total; ++i) {
1184 vpid_snapshot = OBJ_NEW(orte_sstore_base_local_snapshot_info_t);
1185 vpid_snapshot->ss_handle = handle_info->id;
1186
1187 vpid_snapshot->process_name.jobid = handle_info->jobid;
1188 vpid_snapshot->process_name.vpid = i;
1189
1190 vpid_snapshot->crs_comp = NULL;
1191 global_snapshot->start_time = NULL;
1192 global_snapshot->end_time = NULL;
1193
1194 opal_list_append(&global_snapshot->local_snapshots, &(vpid_snapshot->super));
1195 }
1196
1197 return exit_status;
1198 }
1199
1200 static int central_snapshot_sort_compare_fn(opal_list_item_t **a,
1201 opal_list_item_t **b)
1202 {
1203 orte_sstore_base_local_snapshot_info_t *snap_a, *snap_b;
1204
1205 snap_a = (orte_sstore_base_local_snapshot_info_t*)(*a);
1206 snap_b = (orte_sstore_base_local_snapshot_info_t*)(*b);
1207
1208 if( snap_a->process_name.vpid > snap_b->process_name.vpid ) {
1209 return 1;
1210 }
1211 else if( snap_a->process_name.vpid == snap_b->process_name.vpid ) {
1212 return 0;
1213 }
1214 else {
1215 return -1;
1216 }
1217 }