This source file includes the following definitions.
- orte_sstore_central_local_snapshot_info_construct
- orte_sstore_central_local_snapshot_info_destruct
- orte_sstore_central_local_app_snapshot_info_construct
- orte_sstore_central_local_app_snapshot_info_destruct
- orte_sstore_central_local_module_init
- orte_sstore_central_local_module_finalize
- orte_sstore_central_local_request_checkpoint_handle
- orte_sstore_central_local_register
- orte_sstore_central_local_get_attr
- orte_sstore_central_local_set_attr
- orte_sstore_central_local_sync
- orte_sstore_central_local_remove
- orte_sstore_central_local_pack
- orte_sstore_central_local_unpack
- orte_sstore_central_local_recv
- create_new_handle_info
- find_handle_info
- append_new_app_handle_info
- find_app_handle_info
- sstore_central_local_start_listener
- sstore_central_local_stop_listener
- process_global_pull
- process_global_push
- process_app_pull
- process_app_push
- wait_all_apps_updated
- pull_handle_info
- push_handle_info
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include "orte_config.h"
20
21 #include <string.h>
22 #include <stdlib.h>
23 #include <sys/types.h>
24 #include <sys/stat.h>
25 #include <sys/wait.h>
26 #ifdef HAVE_UNISTD_H
27 #include <unistd.h>
28 #endif
29
30 #include "orte/mca/mca.h"
31 #include "opal/mca/base/base.h"
32
33 #include "opal/mca/event/event.h"
34
35 #include "orte/constants.h"
36 #include "orte/util/show_help.h"
37 #include "opal/util/argv.h"
38 #include "opal/util/output.h"
39 #include "opal/util/opal_environ.h"
40 #include "opal/util/basename.h"
41
42 #include "opal/threads/mutex.h"
43 #include "opal/threads/condition.h"
44
45 #include "orte/util/name_fns.h"
46 #include "orte/util/proc_info.h"
47 #include "orte/runtime/orte_globals.h"
48 #include "orte/runtime/orte_wait.h"
49 #include "orte/mca/errmgr/errmgr.h"
50 #include "orte/mca/rml/rml.h"
51 #include "orte/mca/rml/rml_types.h"
52 #include "orte/mca/odls/odls_types.h"
53
54 #include "orte/mca/sstore/sstore.h"
55 #include "orte/mca/sstore/base/base.h"
56
57 #include "sstore_central.h"
58
59
60
61
/*
 * Lifecycle states stored in orte_sstore_central_local_snapshot_info_t.status.
 *   SSTORE_LOCAL_NONE   - value written by the constructor and destructor
 *   SSTORE_LOCAL_ERROR  - process_global_push() failed to apply HNP metadata
 *   SSTORE_LOCAL_INIT   - record freshly created by create_new_handle_info()
 *   SSTORE_LOCAL_READY  - handle metadata is available locally
 *   SSTORE_LOCAL_SYNCED - orte_sstore_central_local_sync() pushed results to the HNP
 */
enum {
    SSTORE_LOCAL_NONE   = 0,
    SSTORE_LOCAL_ERROR  = 1,
    SSTORE_LOCAL_INIT   = 2,
    SSTORE_LOCAL_READY  = 3,
    SSTORE_LOCAL_SYNCED = 4
};
67
68 struct orte_sstore_central_local_snapshot_info_t {
69
70 opal_list_item_t super;
71
72
73 orte_sstore_base_handle_t id;
74
75
76 int status;
77
78
79 int seq_num;
80
81
82 char * global_ref_name;
83
84
85 char * location_fmt;
86
87
88 opal_list_t *app_info_handle;
89 };
90 typedef struct orte_sstore_central_local_snapshot_info_t orte_sstore_central_local_snapshot_info_t;
91 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_central_local_snapshot_info_t);
92
93 void orte_sstore_central_local_snapshot_info_construct(orte_sstore_central_local_snapshot_info_t *info);
94 void orte_sstore_central_local_snapshot_info_destruct( orte_sstore_central_local_snapshot_info_t *info);
95
96 OBJ_CLASS_INSTANCE(orte_sstore_central_local_snapshot_info_t,
97 opal_list_item_t,
98 orte_sstore_central_local_snapshot_info_construct,
99 orte_sstore_central_local_snapshot_info_destruct);
100
101 struct orte_sstore_central_local_app_snapshot_info_t {
102
103 opal_list_item_t super;
104
105
106 orte_process_name_t name;
107
108
109 char * local_location;
110
111
112 char * metadata_filename;
113
114
115 char * crs_comp;
116
117
118 bool ckpt_skipped;
119 };
120 typedef struct orte_sstore_central_local_app_snapshot_info_t orte_sstore_central_local_app_snapshot_info_t;
121 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_central_local_app_snapshot_info_t);
122
123 void orte_sstore_central_local_app_snapshot_info_construct(orte_sstore_central_local_app_snapshot_info_t *info);
124 void orte_sstore_central_local_app_snapshot_info_destruct( orte_sstore_central_local_app_snapshot_info_t *info);
125
126 OBJ_CLASS_INSTANCE(orte_sstore_central_local_app_snapshot_info_t,
127 opal_list_item_t,
128 orte_sstore_central_local_app_snapshot_info_construct,
129 orte_sstore_central_local_app_snapshot_info_destruct);
130
131
132
133
134
135
136 static bool is_global_listener_active = false;
137 static int sstore_central_local_start_listener(void);
138 static int sstore_central_local_stop_listener(void);
139
140 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info);
141 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info);
142 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info);
143 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info);
144
145 static orte_sstore_central_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
146 static orte_sstore_central_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
147
148 static int append_new_app_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info,
149 orte_process_name_t *name);
150 static orte_sstore_central_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info,
151 orte_process_name_t *name);
152
153 static int pull_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info );
154 static int push_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info );
155
156 static int wait_all_apps_updated(orte_sstore_central_local_snapshot_info_t *handle_info);
157
158
159 static opal_list_t *active_handles = NULL;
160
161
162
163
164 void orte_sstore_central_local_snapshot_info_construct(orte_sstore_central_local_snapshot_info_t *info)
165 {
166 info->id = 0;
167
168 info->status = SSTORE_LOCAL_NONE;
169
170 info->seq_num = -1;
171
172 info->global_ref_name = NULL;
173
174 info->location_fmt = NULL;
175
176 info->app_info_handle = OBJ_NEW(opal_list_t);
177 }
178
179 void orte_sstore_central_local_snapshot_info_destruct( orte_sstore_central_local_snapshot_info_t *info)
180 {
181 info->id = 0;
182
183 info->status = SSTORE_LOCAL_NONE;
184
185 info->seq_num = -1;
186
187 if( NULL != info->global_ref_name ) {
188 free( info->global_ref_name );
189 info->global_ref_name = NULL;
190 }
191
192 if( NULL != info->location_fmt ) {
193 free( info->location_fmt );
194 info->location_fmt = NULL;
195 }
196
197 if( NULL != info->app_info_handle ) {
198 OBJ_RELEASE(info->app_info_handle);
199 info->app_info_handle = NULL;
200 }
201 }
202
203 void orte_sstore_central_local_app_snapshot_info_construct(orte_sstore_central_local_app_snapshot_info_t *info)
204 {
205 info->name.jobid = ORTE_JOBID_INVALID;
206 info->name.vpid = ORTE_VPID_INVALID;
207
208 info->local_location = NULL;
209 info->metadata_filename = NULL;
210 info->crs_comp = NULL;
211 info->ckpt_skipped = false;
212 }
213
214 void orte_sstore_central_local_app_snapshot_info_destruct( orte_sstore_central_local_app_snapshot_info_t *info)
215 {
216 info->name.jobid = ORTE_JOBID_INVALID;
217 info->name.vpid = ORTE_VPID_INVALID;
218
219 if( NULL != info->local_location ) {
220 free(info->local_location);
221 info->local_location = NULL;
222 }
223
224 if( NULL != info->metadata_filename ) {
225 free(info->metadata_filename);
226 info->metadata_filename = NULL;
227 }
228
229 if( NULL != info->crs_comp ) {
230 free(info->crs_comp);
231 info->crs_comp = NULL;
232 }
233
234 info->ckpt_skipped = false;
235 }
236
237
238
239
240 int orte_sstore_central_local_module_init(void)
241 {
242 int ret, exit_status = ORTE_SUCCESS;
243
244 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
245 "sstore:central:(local): init()"));
246
247 if( NULL == active_handles ) {
248 active_handles = OBJ_NEW(opal_list_t);
249 }
250
251
252
253
254
255 if( !ORTE_PROC_IS_HNP ) {
256 if( ORTE_SUCCESS != (ret = sstore_central_local_start_listener()) ) {
257 ORTE_ERROR_LOG(ret);
258 exit_status = ret;
259 goto cleanup;
260 }
261 }
262
263 cleanup:
264 return exit_status;
265 }
266
267 int orte_sstore_central_local_module_finalize(void)
268 {
269 int ret, exit_status = ORTE_SUCCESS;
270
271 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
272 "sstore:central:(local): finalize()"));
273
274 if( NULL != active_handles ) {
275 OBJ_RELEASE(active_handles);
276 }
277
278
279
280
281
282 if( !ORTE_PROC_IS_HNP ) {
283 if( ORTE_SUCCESS != (ret = sstore_central_local_stop_listener()) ) {
284 ORTE_ERROR_LOG(ret);
285 exit_status = ret;
286 goto cleanup;
287 }
288 }
289
290 cleanup:
291 return exit_status;
292 }
293
294 int orte_sstore_central_local_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
295 {
296 opal_output(0, "sstore:central:(local): request_checkpoint_handle() Not implemented!");
297 return ORTE_ERR_NOT_IMPLEMENTED;
298 }
299
300 int orte_sstore_central_local_register(orte_sstore_base_handle_t handle)
301 {
302 int ret, exit_status = ORTE_SUCCESS;
303 orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
304
305 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
306 "sstore:central:(local): register()"));
307
308
309
310
311 if( NULL == (handle_info = find_handle_info(handle)) ) {
312 handle_info = create_new_handle_info(handle);
313 }
314
315
316
317
318 if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
319 ORTE_ERROR_LOG(ret);
320 exit_status = ret;
321 goto cleanup;
322 }
323
324
325
326
327 while(SSTORE_LOCAL_READY != handle_info->status &&
328 SSTORE_LOCAL_ERROR != handle_info->status ) {
329 opal_progress();
330 }
331
332 cleanup:
333 return exit_status;
334 }
335
336 int orte_sstore_central_local_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
337 {
338 opal_output(0, "sstore:central:(local): get_attr() Not implemented!");
339 return ORTE_ERR_NOT_IMPLEMENTED;
340 }
341
342 int orte_sstore_central_local_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
343 {
344 opal_output(0, "sstore:central:(local): set_attr() Not implemented!");
345 return ORTE_ERR_NOT_IMPLEMENTED;
346 }
347
348 int orte_sstore_central_local_sync(orte_sstore_base_handle_t handle)
349 {
350 int ret, exit_status = ORTE_SUCCESS;
351 orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
352
353 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
354 "sstore:central:(local): sync()"));
355
356
357
358
359 handle_info = find_handle_info(handle);
360
361
362
363
364 if( ORTE_SUCCESS != (ret = wait_all_apps_updated(handle_info))) {
365 ORTE_ERROR_LOG(ret);
366 exit_status = ret;
367 goto cleanup;
368 }
369
370
371
372
373 if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
374 ORTE_ERROR_LOG(ret);
375 exit_status = ret;
376 goto cleanup;
377 }
378
379 handle_info->status = SSTORE_LOCAL_SYNCED;
380
381 cleanup:
382 return exit_status;
383 }
384
385 int orte_sstore_central_local_remove(orte_sstore_base_handle_t handle)
386 {
387 opal_output(0, "sstore:central:(local): remove() Not implemented!");
388 return ORTE_ERR_NOT_IMPLEMENTED;
389 }
390
391 int orte_sstore_central_local_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
392 {
393 int ret, exit_status = ORTE_SUCCESS;
394
395 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
396 "sstore:central:(local): pack()"));
397
398
399
400
401
402
403
404
405
406 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
407 ORTE_ERROR_LOG(ret);
408 exit_status = ret;
409 goto cleanup;
410 }
411
412
413
414
415
416 cleanup:
417 return exit_status;
418 }
419
420 int orte_sstore_central_local_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
421 {
422 int ret, exit_status = ORTE_SUCCESS;
423 orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
424 orte_std_cntr_t count;
425
426 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
427 "sstore:central:(local): unpack()"));
428
429
430
431
432 count = 1;
433 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, handle, &count, ORTE_SSTORE_HANDLE))) {
434 ORTE_ERROR_LOG(ret);
435 exit_status = ret;
436 goto cleanup;
437 }
438
439
440
441
442 if( NULL == (handle_info = find_handle_info(*handle)) ) {
443 handle_info = create_new_handle_info(*handle);
444 }
445
446
447
448
449 if( ORTE_SUCCESS != (ret = process_global_push(peer, buffer, handle_info))) {
450 ORTE_ERROR_LOG(ret);
451 exit_status = ret;
452 goto cleanup;
453 }
454
455 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
456 "sstore:central:(local): unpack(%d, %d, %s)",
457 handle_info->id,
458 handle_info->seq_num,
459 handle_info->global_ref_name));
460
461 cleanup:
462 return exit_status;
463 }
464
465 void orte_sstore_central_local_recv(int status,
466 orte_process_name_t* sender,
467 opal_buffer_t* buffer,
468 orte_rml_tag_t tag,
469 void* cbdata)
470 {
471 int ret;
472 orte_sstore_central_cmd_flag_t command;
473 orte_std_cntr_t count;
474 orte_sstore_base_handle_t loc_id;
475 orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
476
477 if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
478 return;
479 }
480
481 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
482 "sstore:central:(local): process_cmd(%s)",
483 ORTE_NAME_PRINT(sender)));
484
485 count = 1;
486 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
487 ORTE_ERROR_LOG(ret);
488 goto cleanup;
489 }
490
491 count = 1;
492 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
493 ORTE_ERROR_LOG(ret);
494 goto cleanup;
495 }
496
497
498
499
500 if(NULL == (handle_info = find_handle_info(loc_id)) ) {
501 handle_info = create_new_handle_info(loc_id);
502 }
503
504
505
506
507 if( ORTE_SSTORE_CENTRAL_PULL == command ) {
508 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
509 process_global_pull(sender, buffer, handle_info);
510 } else {
511 process_app_pull(sender, buffer, handle_info);
512 }
513 }
514 else if( ORTE_SSTORE_CENTRAL_PUSH == command ) {
515 if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
516 process_global_push(sender, buffer, handle_info);
517 } else {
518 process_app_push(sender, buffer, handle_info);
519 }
520 }
521
522 cleanup:
523 return;
524 }
525
526
527
528
529 static orte_sstore_central_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
530 {
531 orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
532 int i;
533 orte_proc_t *child = NULL;
534
535 if( NULL == active_handles ) {
536 active_handles = OBJ_NEW(opal_list_t);
537 }
538
539 handle_info = OBJ_NEW(orte_sstore_central_local_snapshot_info_t);
540
541 handle_info->id = handle;
542
543 opal_list_append(active_handles, &(handle_info->super));
544
545
546
547
548 for (i=0; i < orte_local_children->size; i++) {
549 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
550 continue;
551 }
552 append_new_app_handle_info(handle_info, &child->name);
553 }
554
555 handle_info->status = SSTORE_LOCAL_INIT;
556
557 return handle_info;
558 }
559
560 static orte_sstore_central_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
561 {
562 orte_sstore_central_local_snapshot_info_t *handle_info = NULL;
563 opal_list_item_t* item = NULL;
564
565 if( NULL == active_handles ) {
566 return NULL;
567 }
568
569 for(item = opal_list_get_first(active_handles);
570 item != opal_list_get_end(active_handles);
571 item = opal_list_get_next(item) ) {
572 handle_info = (orte_sstore_central_local_snapshot_info_t*)item;
573
574 if( handle_info->id == handle ) {
575 return handle_info;
576 }
577 }
578
579 return NULL;
580 }
581
582 static int append_new_app_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info,
583 orte_process_name_t *name)
584 {
585 orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
586
587 app_info = OBJ_NEW(orte_sstore_central_local_app_snapshot_info_t);
588
589 app_info->name.jobid = name->jobid;
590 app_info->name.vpid = name->vpid;
591
592 opal_list_append(handle_info->app_info_handle, &(app_info->super));
593
594 return ORTE_SUCCESS;
595 }
596
597 static orte_sstore_central_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info,
598 orte_process_name_t *name)
599 {
600 orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
601 opal_list_item_t* item = NULL;
602 orte_ns_cmp_bitmask_t mask;
603
604 for(item = opal_list_get_first(handle_info->app_info_handle);
605 item != opal_list_get_end(handle_info->app_info_handle);
606 item = opal_list_get_next(item) ) {
607 app_info = (orte_sstore_central_local_app_snapshot_info_t*)item;
608
609 mask = ORTE_NS_CMP_ALL;
610
611 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &app_info->name, name)) {
612 return app_info;
613 }
614 }
615
616 return NULL;
617 }
618
619 static int sstore_central_local_start_listener(void)
620 {
621 if( is_global_listener_active ) {
622 return ORTE_SUCCESS;
623 }
624
625 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
626 ORTE_RML_PERSISTENT, orte_sstore_central_local_recv, NULL);
627
628 is_global_listener_active = true;
629 return ORTE_SUCCESS;
630 }
631
632 static int sstore_central_local_stop_listener(void)
633 {
634 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
635
636 is_global_listener_active = false;
637 return ORTE_SUCCESS;
638 }
639
640 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info)
641 {
642
643 opal_output(0, "sstore:central:(local): process_global_pull() Not implemented!");
644 return ORTE_ERR_NOT_IMPLEMENTED;
645 }
646
647 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info)
648 {
649 int ret, exit_status = ORTE_SUCCESS;
650 orte_std_cntr_t count;
651 orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
652 opal_list_item_t* item = NULL;
653
654 count = 1;
655 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
656 ORTE_ERROR_LOG(ret);
657 exit_status = ret;
658 goto cleanup;
659 }
660
661 count = 1;
662 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
663 ORTE_ERROR_LOG(ret);
664 exit_status = ret;
665 goto cleanup;
666 }
667
668 count = 1;
669 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->location_fmt), &count, OPAL_STRING))) {
670 ORTE_ERROR_LOG(ret);
671 exit_status = ret;
672 goto cleanup;
673 }
674
675
676
677
678 for(item = opal_list_get_first(handle_info->app_info_handle);
679 item != opal_list_get_end(handle_info->app_info_handle);
680 item = opal_list_get_next(item) ) {
681 app_info = (orte_sstore_central_local_app_snapshot_info_t*)item;
682
683 if( NULL != app_info->local_location ) {
684 free(app_info->local_location);
685 app_info->local_location = NULL;
686 }
687 opal_asprintf(&(app_info->local_location), handle_info->location_fmt, app_info->name.vpid);
688
689 if( NULL != app_info->metadata_filename ) {
690 free(app_info->metadata_filename);
691 app_info->metadata_filename = NULL;
692 }
693 opal_asprintf(&(app_info->metadata_filename), "%s/%s",
694 app_info->local_location,
695 orte_sstore_base_local_metadata_filename);
696 }
697
698 cleanup:
699 if( ORTE_SUCCESS == exit_status ) {
700 handle_info->status = SSTORE_LOCAL_READY;
701 } else {
702 handle_info->status = SSTORE_LOCAL_ERROR;
703 }
704
705 return exit_status;
706 }
707
708 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info)
709 {
710 int ret, exit_status = ORTE_SUCCESS;
711 opal_buffer_t *loc_buffer = NULL;
712 orte_sstore_central_cmd_flag_t command;
713 orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
714
715
716
717
718 app_info = find_app_handle_info(handle_info, peer);
719
720
721
722
723 loc_buffer = OBJ_NEW(opal_buffer_t);
724
725 command = ORTE_SSTORE_CENTRAL_PUSH;
726 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
727 ORTE_ERROR_LOG(ret);
728 exit_status = ret;
729 goto cleanup;
730 }
731
732 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
733 ORTE_ERROR_LOG(ret);
734 exit_status = ret;
735 goto cleanup;
736 }
737
738 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
739 ORTE_ERROR_LOG(ret);
740 exit_status = ret;
741 goto cleanup;
742 }
743
744 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
745 ORTE_ERROR_LOG(ret);
746 exit_status = ret;
747 goto cleanup;
748 }
749
750 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
751 ORTE_ERROR_LOG(ret);
752 exit_status = ret;
753 goto cleanup;
754 }
755
756 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
757 ORTE_ERROR_LOG(ret);
758 exit_status = ret;
759 goto cleanup;
760 }
761
762 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
763 orte_rml_send_callback, NULL))) {
764 ORTE_ERROR_LOG(ret);
765 exit_status = ret;
766 goto cleanup;
767 }
768
769 loc_buffer = NULL;
770
771 cleanup:
772 if (NULL != loc_buffer) {
773 OBJ_RELEASE(loc_buffer);
774 loc_buffer = NULL;
775 }
776
777 return exit_status;
778 }
779
780 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info)
781 {
782 int ret, exit_status = ORTE_SUCCESS;
783 orte_std_cntr_t count;
784 orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
785
786
787
788
789 app_info = find_app_handle_info(handle_info, peer);
790
791
792
793
794 count = 1;
795 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->ckpt_skipped), &count, OPAL_BOOL))) {
796 ORTE_ERROR_LOG(ret);
797 exit_status = ret;
798 goto cleanup;
799 }
800
801 if( !app_info->ckpt_skipped ) {
802 count = 1;
803 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->crs_comp), &count, OPAL_STRING))) {
804 ORTE_ERROR_LOG(ret);
805 exit_status = ret;
806 goto cleanup;
807 }
808 }
809
810 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
811 "sstore:central:(local): app_push(%s, skip=%s, %s)",
812 ORTE_NAME_PRINT(&(app_info->name)),
813 (app_info->ckpt_skipped ? "T" : "F"),
814 app_info->crs_comp));
815
816 cleanup:
817 return exit_status;
818 }
819
820 static int wait_all_apps_updated(orte_sstore_central_local_snapshot_info_t *handle_info)
821 {
822 orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
823 opal_list_item_t *item = NULL;
824 bool is_done = true;
825
826 do {
827 is_done = true;
828 for(item = opal_list_get_first(handle_info->app_info_handle);
829 item != opal_list_get_end(handle_info->app_info_handle);
830 item = opal_list_get_next(item) ) {
831 app_info = (orte_sstore_central_local_app_snapshot_info_t*)item;
832
833 if( NULL == app_info->crs_comp && !app_info->ckpt_skipped ) {
834 is_done = false;
835 break;
836 }
837 }
838
839 if( !is_done ) {
840 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
841 "sstore:central:(local): Waiting for appliccation %s",
842 ORTE_NAME_PRINT(&(app_info->name)) ));
843 opal_progress();
844 }
845 } while(!is_done);
846
847 return ORTE_SUCCESS;
848 }
849
850 static int pull_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info )
851 {
852 int ret, exit_status = ORTE_SUCCESS;
853 opal_buffer_t *buffer = NULL;
854 orte_sstore_central_cmd_flag_t command;
855
856
857
858
859
860 if( 0 <= handle_info->seq_num &&
861 NULL != handle_info->global_ref_name &&
862 NULL != handle_info->location_fmt ) {
863 handle_info->status = SSTORE_LOCAL_READY;
864 return ORTE_SUCCESS;
865 }
866
867 buffer = OBJ_NEW(opal_buffer_t);
868
869
870
871
872 command = ORTE_SSTORE_CENTRAL_PULL;
873 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
874 ORTE_ERROR_LOG(ret);
875 exit_status = ret;
876 goto cleanup;
877 }
878
879 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
880 ORTE_ERROR_LOG(ret);
881 exit_status = ret;
882 goto cleanup;
883 }
884
885 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
886 ORTE_RML_TAG_SSTORE_INTERNAL,
887 orte_rml_send_callback, NULL))) {
888 ORTE_ERROR_LOG(ret);
889 exit_status = ret;
890 goto cleanup;
891 }
892
893
894 buffer = NULL;
895
896 cleanup:
897 if (NULL != buffer) {
898 OBJ_RELEASE(buffer);
899 buffer = NULL;
900 }
901
902 return exit_status;
903 }
904
905 static int push_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info )
906 {
907 int ret, exit_status = ORTE_SUCCESS;
908 opal_buffer_t *buffer = NULL;
909 orte_sstore_central_cmd_flag_t command;
910 orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
911 opal_list_item_t *item = NULL;
912 size_t list_size;
913
914 buffer = OBJ_NEW(opal_buffer_t);
915
916 command = ORTE_SSTORE_CENTRAL_PUSH;
917 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
918 ORTE_ERROR_LOG(ret);
919 exit_status = ret;
920 goto cleanup;
921 }
922
923 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
924 ORTE_ERROR_LOG(ret);
925 exit_status = ret;
926 goto cleanup;
927 }
928
929 list_size = opal_list_get_size(handle_info->app_info_handle);
930 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &list_size, 1, OPAL_SIZE))) {
931 ORTE_ERROR_LOG(ret);
932 exit_status = ret;
933 goto cleanup;
934 }
935
936
937
938
939 for(item = opal_list_get_first(handle_info->app_info_handle);
940 item != opal_list_get_end(handle_info->app_info_handle);
941 item = opal_list_get_next(item) ) {
942 app_info = (orte_sstore_central_local_app_snapshot_info_t*)item;
943
944 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->name), 1, ORTE_NAME))) {
945 ORTE_ERROR_LOG(ret);
946 exit_status = ret;
947 goto cleanup;
948 }
949
950 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->ckpt_skipped), 1, OPAL_BOOL))) {
951 ORTE_ERROR_LOG(ret);
952 exit_status = ret;
953 goto cleanup;
954 }
955
956 if( !app_info->ckpt_skipped ) {
957 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->crs_comp), 1, OPAL_STRING))) {
958 ORTE_ERROR_LOG(ret);
959 exit_status = ret;
960 goto cleanup;
961 }
962 }
963 }
964
965 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
966 ORTE_RML_TAG_SSTORE_INTERNAL,
967 orte_rml_send_callback, NULL))) {
968 ORTE_ERROR_LOG(ret);
969 exit_status = ret;
970 goto cleanup;
971 }
972
973
974 buffer = NULL;
975
976 cleanup:
977 if (NULL != buffer) {
978 OBJ_RELEASE(buffer);
979 buffer = NULL;
980 }
981
982 return exit_status;
983 }