This source file includes the following definitions:
- mca_common_ompio_file_open
- mca_common_ompio_file_close
- mca_common_ompio_file_get_size
- mca_common_ompio_file_get_position
- mca_common_ompio_set_file_defaults
- mca_common_ompio_file_delete
- mca_common_ompio_create_incomplete_file_handle
- mca_common_ompio_decode_datatype
- mca_common_ompio_set_callbacks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "ompi_config.h"

#include <errno.h>
#include <limits.h>
#include <math.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

#include "ompi/communicator/communicator.h"
#include "ompi/info/info.h"
#include "ompi/file/file.h"
#include "ompi/mca/io/base/base.h"
#include "ompi/mca/fs/fs.h"
#include "ompi/mca/fs/base/base.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/base.h"
#include "ompi/mca/fbtl/fbtl.h"
#include "ompi/mca/fbtl/base/base.h"
#include "ompi/mca/sharedfp/sharedfp.h"
#include "ompi/mca/sharedfp/base/base.h"

#include "common_ompio.h"
#include "ompi/mca/topo/topo.h"
44
45 static mca_common_ompio_generate_current_file_view_fn_t generate_current_file_view_fn;
46 static mca_common_ompio_get_mca_parameter_value_fn_t get_mca_parameter_value_fn;
47
48 int mca_common_ompio_file_open (ompi_communicator_t *comm,
49 const char *filename,
50 int amode,
51 opal_info_t *info,
52 ompio_file_t *ompio_fh, bool use_sharedfp)
53 {
54 int ret = OMPI_SUCCESS;
55 int remote_arch;
56
57
58 ompio_fh->f_iov_type = MPI_DATATYPE_NULL;
59 ompio_fh->f_comm = MPI_COMM_NULL;
60
61 if ( ((amode&MPI_MODE_RDONLY)?1:0) + ((amode&MPI_MODE_RDWR)?1:0) +
62 ((amode&MPI_MODE_WRONLY)?1:0) != 1 ) {
63 return MPI_ERR_AMODE;
64 }
65
66 if ((amode & MPI_MODE_RDONLY) &&
67 ((amode & MPI_MODE_CREATE) || (amode & MPI_MODE_EXCL))) {
68 return MPI_ERR_AMODE;
69 }
70
71 if ((amode & MPI_MODE_RDWR) && (amode & MPI_MODE_SEQUENTIAL)) {
72 return MPI_ERR_AMODE;
73 }
74
75 ompio_fh->f_rank = ompi_comm_rank (comm);
76 ompio_fh->f_size = ompi_comm_size (comm);
77 remote_arch = opal_local_arch;
78 ompio_fh->f_mem_convertor = opal_convertor_create (remote_arch, 0);
79 ompio_fh->f_file_convertor = opal_convertor_create (remote_arch, 0);
80
81 if ( true == use_sharedfp ) {
82 ret = ompi_comm_dup (comm, &ompio_fh->f_comm);
83 if ( OMPI_SUCCESS != ret ) {
84 goto fn_fail;
85 }
86 }
87 else {
88
89
90
91 ompio_fh->f_comm = comm;
92 }
93
94 ompio_fh->f_fstype = NONE;
95 ompio_fh->f_amode = amode;
96 ompio_fh->f_info = info;
97
98
99 ompio_fh->f_generate_current_file_view=generate_current_file_view_fn;
100 ompio_fh->f_get_mca_parameter_value=get_mca_parameter_value_fn;
101
102 ompio_fh->f_filename = filename;
103 mca_common_ompio_set_file_defaults (ompio_fh);
104
105 ompio_fh->f_split_coll_req = NULL;
106 ompio_fh->f_split_coll_in_use = false;
107
108
109 mca_common_ompio_initialize_print_queue(&ompio_fh->f_coll_write_time);
110 mca_common_ompio_initialize_print_queue(&ompio_fh->f_coll_read_time);
111
112
113
114 if ( OMPIO_MCA_GET(ompio_fh, overwrite_amode) && !(amode & MPI_MODE_SEQUENTIAL) ) {
115
116 if ((amode & MPI_MODE_WRONLY)){
117 amode -= MPI_MODE_WRONLY;
118 amode += MPI_MODE_RDWR;
119 }
120 }
121
122
123
124 if (OMPI_SUCCESS != (ret = mca_fs_base_file_select (ompio_fh,
125 NULL))) {
126 opal_output(1, "mca_fs_base_file_select() failed\n");
127 goto fn_fail;
128 }
129 if (OMPI_SUCCESS != (ret = mca_fbtl_base_file_select (ompio_fh,
130 NULL))) {
131 opal_output(1, "mca_fbtl_base_file_select() failed\n");
132 goto fn_fail;
133 }
134
135
136 ompio_fh->f_sharedfp_component = NULL;
137 ompio_fh->f_sharedfp = NULL;
138 ompio_fh->f_sharedfp_data = NULL;
139
140 if ( true == use_sharedfp ) {
141 if (OMPI_SUCCESS != (ret = mca_sharedfp_base_file_select (ompio_fh, NULL))) {
142 opal_output ( ompi_io_base_framework.framework_output,
143 "mca_sharedfp_base_file_select() failed\n");
144 ompio_fh->f_sharedfp = NULL;
145
146
147
148
149 }
150 }
151 else {
152 ompio_fh->f_flags |= OMPIO_SHAREDFP_IS_SET;
153 }
154
155 ret = ompio_fh->f_fs->fs_file_open (comm,
156 filename,
157 amode,
158 info,
159 ompio_fh);
160
161 if ( OMPI_SUCCESS != ret ) {
162 #ifdef OMPIO_DEBUG
163 opal_output(1, "fs_file failed, error code %d\n", ret);
164 #endif
165 goto fn_fail;
166 }
167
168 if ( true == use_sharedfp ) {
169
170
171
172
173 if ( NULL != ompio_fh->f_sharedfp ) {
174 ret = ompio_fh->f_sharedfp->sharedfp_file_open(comm,
175 filename,
176 amode,
177 info,
178 ompio_fh);
179
180 if ( OMPI_SUCCESS != ret ) {
181 goto fn_fail;
182 }
183 }
184 }
185
186
187 mca_common_ompio_set_view(ompio_fh,
188 0,
189 &ompi_mpi_byte.dt,
190 &ompi_mpi_byte.dt,
191 "native",
192 info);
193
194
195
196
197
198 if ( ompio_fh->f_amode & MPI_MODE_APPEND ) {
199 OMPI_MPI_OFFSET_TYPE current_size;
200 mca_sharedfp_base_module_t * shared_fp_base_module;
201
202 ompio_fh->f_fs->fs_file_get_size( ompio_fh,
203 ¤t_size);
204 mca_common_ompio_set_explicit_offset (ompio_fh, current_size);
205 if ( true == use_sharedfp ) {
206 if ( NULL != ompio_fh->f_sharedfp ) {
207 shared_fp_base_module = ompio_fh->f_sharedfp;
208 ret = shared_fp_base_module->sharedfp_seek(ompio_fh,current_size, MPI_SEEK_SET);
209 if ( MPI_SUCCESS != ret ) {
210 opal_output(1, "mca_common_ompio_file_open: Could not adjust position of "
211 "shared file pointer with MPI_MODE_APPEND\n");
212 ret = MPI_ERR_OTHER;
213 goto fn_fail;
214 }
215 }
216 }
217
218 }
219
220
221
222 return OMPI_SUCCESS;
223
224 fn_fail:
225
226
227
228
229 return ret;
230 }
231
232 int mca_common_ompio_file_close (ompio_file_t *ompio_fh)
233 {
234 int ret = OMPI_SUCCESS;
235 int delete_flag = 0;
236 char name[256];
237
238 ret = ompio_fh->f_comm->c_coll->coll_barrier ( ompio_fh->f_comm, ompio_fh->f_comm->c_coll->coll_barrier_module);
239 if ( OMPI_SUCCESS != ret ) {
240
241 opal_output (1,"mca_common_ompio_file_close: error in Barrier \n");
242 return ret;
243 }
244
245
246 if(OMPIO_MCA_GET(ompio_fh, coll_timing_info)){
247 strcpy (name, "WRITE");
248 if (!mca_common_ompio_empty_print_queue(ompio_fh->f_coll_write_time)){
249 ret = mca_common_ompio_print_time_info(ompio_fh->f_coll_write_time,
250 name,
251 ompio_fh);
252 if (OMPI_SUCCESS != ret){
253 printf("Error in print_time_info ");
254 }
255
256 }
257 strcpy (name, "READ");
258 if (!mca_common_ompio_empty_print_queue(ompio_fh->f_coll_read_time)){
259 ret = mca_common_ompio_print_time_info(ompio_fh->f_coll_read_time,
260 name,
261 ompio_fh);
262 if (OMPI_SUCCESS != ret){
263 printf("Error in print_time_info ");
264 }
265 }
266 }
267 if ( ompio_fh->f_amode & MPI_MODE_DELETE_ON_CLOSE ) {
268 delete_flag = 1;
269 }
270
271
272 if( NULL != ompio_fh->f_sharedfp ){
273 ret = ompio_fh->f_sharedfp->sharedfp_file_close(ompio_fh);
274 }
275 if ( NULL != ompio_fh->f_fs ) {
276
277
278
279
280 ret = ompio_fh->f_fs->fs_file_close (ompio_fh);
281 }
282 if ( delete_flag ) {
283 ret = mca_common_ompio_file_delete ( ompio_fh->f_filename, &(MPI_INFO_NULL->super) );
284 }
285
286 if ( NULL != ompio_fh->f_fs ) {
287 mca_fs_base_file_unselect (ompio_fh);
288 }
289 if ( NULL != ompio_fh->f_fbtl ) {
290 mca_fbtl_base_file_unselect (ompio_fh);
291 }
292
293 if ( NULL != ompio_fh->f_fcoll ) {
294 mca_fcoll_base_file_unselect (ompio_fh);
295 }
296 if ( NULL != ompio_fh->f_sharedfp) {
297 mca_sharedfp_base_file_unselect (ompio_fh);
298 }
299
300 if (NULL != ompio_fh->f_io_array) {
301 free (ompio_fh->f_io_array);
302 ompio_fh->f_io_array = NULL;
303 }
304
305 if (NULL != ompio_fh->f_init_aggr_list) {
306 free (ompio_fh->f_init_aggr_list);
307 ompio_fh->f_init_aggr_list = NULL;
308 }
309 if (NULL != ompio_fh->f_aggr_list) {
310 free (ompio_fh->f_aggr_list);
311 ompio_fh->f_aggr_list = NULL;
312 }
313 if (NULL != ompio_fh->f_init_procs_in_group) {
314 free (ompio_fh->f_init_procs_in_group);
315 ompio_fh->f_init_procs_in_group = NULL;
316 }
317 if (NULL != ompio_fh->f_procs_in_group) {
318 free (ompio_fh->f_procs_in_group);
319 ompio_fh->f_procs_in_group = NULL;
320 }
321
322 if (NULL != ompio_fh->f_decoded_iov) {
323 free (ompio_fh->f_decoded_iov);
324 ompio_fh->f_decoded_iov = NULL;
325 }
326
327 if (NULL != ompio_fh->f_mem_convertor) {
328 opal_convertor_cleanup (ompio_fh->f_mem_convertor);
329 free (ompio_fh->f_mem_convertor);
330 ompio_fh->f_mem_convertor = NULL;
331 }
332
333 if (NULL != ompio_fh->f_file_convertor) {
334 opal_convertor_cleanup (ompio_fh->f_file_convertor);
335 free (ompio_fh->f_file_convertor);
336 ompio_fh->f_file_convertor = NULL;
337 }
338
339 if (NULL != ompio_fh->f_datarep) {
340 free (ompio_fh->f_datarep);
341 ompio_fh->f_datarep = NULL;
342 }
343
344 if ( NULL != ompio_fh->f_coll_write_time ) {
345 free ( ompio_fh->f_coll_write_time );
346 ompio_fh->f_coll_write_time = NULL;
347 }
348
349 if ( NULL != ompio_fh->f_coll_read_time ) {
350 free ( ompio_fh->f_coll_read_time );
351 ompio_fh->f_coll_read_time = NULL;
352 }
353
354 if (MPI_DATATYPE_NULL != ompio_fh->f_iov_type) {
355 ompi_datatype_destroy (&ompio_fh->f_iov_type);
356 ompio_fh->f_iov_type=MPI_DATATYPE_NULL;
357 }
358
359 if ( MPI_DATATYPE_NULL != ompio_fh->f_etype ) {
360 ompi_datatype_destroy (&ompio_fh->f_etype);
361 }
362 if ( MPI_DATATYPE_NULL != ompio_fh->f_filetype ){
363 ompi_datatype_destroy (&ompio_fh->f_filetype);
364 }
365
366 if ( MPI_DATATYPE_NULL != ompio_fh->f_orig_filetype ){
367 ompi_datatype_destroy (&ompio_fh->f_orig_filetype);
368 }
369
370
371 if (MPI_COMM_NULL != ompio_fh->f_comm && !(ompio_fh->f_flags & OMPIO_SHAREDFP_IS_SET) ) {
372 ompi_comm_free (&ompio_fh->f_comm);
373 }
374
375 return ret;
376 }
377
378 int mca_common_ompio_file_get_size (ompio_file_t *ompio_fh,
379 OMPI_MPI_OFFSET_TYPE *size)
380 {
381 int ret = OMPI_SUCCESS;
382
383 ret = ompio_fh->f_fs->fs_file_get_size (ompio_fh, size);
384
385 return ret;
386 }
387
388
389 int mca_common_ompio_file_get_position (ompio_file_t *fh,
390 OMPI_MPI_OFFSET_TYPE *offset)
391 {
392 OMPI_MPI_OFFSET_TYPE off;
393
394 if ( 0 == fh->f_view_extent ||
395 0 == fh->f_view_size ||
396 0 == fh->f_etype_size ) {
397
398 *offset = 0;
399 return OMPI_SUCCESS;
400 }
401
402 off = (fh->f_offset - fh->f_disp)/fh->f_view_extent;
403
404
405 off *= (fh->f_view_size / fh->f_etype_size);
406
407
408 off += fh->f_total_bytes / fh->f_etype_size;
409
410 *offset = off;
411 return OMPI_SUCCESS;
412 }
413
414 int mca_common_ompio_set_file_defaults (ompio_file_t *fh)
415 {
416
417 if (NULL != fh) {
418 char char_stripe[MPI_MAX_INFO_VAL];
419 ompi_datatype_t *types[2];
420 int blocklen[2] = {1, 1};
421 ptrdiff_t d[2], base;
422 int i, flag;
423
424 fh->f_io_array = NULL;
425 fh->f_perm = OMPIO_PERM_NULL;
426 fh->f_flags = 0;
427
428 fh->f_bytes_per_agg = OMPIO_MCA_GET(fh, bytes_per_agg);
429 opal_info_get (fh->f_info, "cb_buffer_size", MPI_MAX_INFO_VAL, char_stripe, &flag);
430 if ( flag ) {
431
432 sscanf ( char_stripe, "%d", &fh->f_bytes_per_agg );
433 OMPIO_MCA_PRINT_INFO(fh, "cb_buffer_size", char_stripe, "");
434 }
435
436 fh->f_atomicity = 0;
437 fh->f_fs_block_size = 4096;
438
439 fh->f_offset = 0;
440 fh->f_disp = 0;
441 fh->f_position_in_file_view = 0;
442 fh->f_index_in_file_view = 0;
443 fh->f_total_bytes = 0;
444
445 fh->f_init_procs_per_group = -1;
446 fh->f_init_procs_in_group = NULL;
447
448 fh->f_procs_per_group = -1;
449 fh->f_procs_in_group = NULL;
450
451 fh->f_init_num_aggrs = -1;
452 fh->f_init_aggr_list = NULL;
453
454 fh->f_num_aggrs = -1;
455 fh->f_aggr_list = NULL;
456
457
458 fh->f_iov_type = MPI_DATATYPE_NULL;
459 fh->f_stripe_size = 0;
460
461 fh->f_decoded_iov = NULL;
462 fh->f_etype = MPI_DATATYPE_NULL;
463 fh->f_filetype = MPI_DATATYPE_NULL;
464 fh->f_orig_filetype = MPI_DATATYPE_NULL;
465 fh->f_datarep = NULL;
466
467
468 types[0] = &ompi_mpi_long.dt;
469 types[1] = &ompi_mpi_long.dt;
470
471 d[0] = (ptrdiff_t) fh->f_decoded_iov;
472 d[1] = (ptrdiff_t) &fh->f_decoded_iov[0].iov_len;
473
474 base = d[0];
475 for (i=0 ; i<2 ; i++) {
476 d[i] -= base;
477 }
478
479 ompi_datatype_create_struct (2,
480 blocklen,
481 d,
482 types,
483 &fh->f_iov_type);
484 ompi_datatype_commit (&fh->f_iov_type);
485
486 return OMPI_SUCCESS;
487 }
488 else {
489 return OMPI_ERROR;
490 }
491 }
492
493
494 int mca_common_ompio_file_delete (const char *filename,
495 struct opal_info_t *info)
496 {
497 int ret = OMPI_SUCCESS;
498 ompio_file_t *fh = NULL;
499
500
501
502
503
504
505
506
507
508
509
510 ret = mca_common_ompio_create_incomplete_file_handle(filename, &fh);
511 if (OMPI_SUCCESS != ret) {
512 return ret;
513 }
514
515 ret = mca_fs_base_file_select (fh, NULL);
516 if (OMPI_SUCCESS != ret) {
517 opal_output(1, "error in mca_common_ompio_file_delete: "
518 "mca_fs_base_file_select() failed\n");
519 free(fh);
520 return ret;
521 }
522
523 ret = fh->f_fs->fs_file_delete ( (char *)filename, NULL);
524 free(fh);
525
526 if (OMPI_SUCCESS != ret) {
527 return ret;
528 }
529 return OMPI_SUCCESS;
530 }
531
532 int mca_common_ompio_create_incomplete_file_handle (const char *filename,
533 ompio_file_t **fh)
534 {
535 ompio_file_t *file;
536
537 if (NULL == filename) {
538 opal_output(1, "error in mca_common_ompio_create_incomplete_file_handle"
539 ", filename is NULL.\n");
540 return OMPI_ERROR;
541 }
542
543 file = calloc(1, sizeof(ompio_file_t));
544 if (NULL == file) {
545 opal_output(1, "Out of memory.\n");
546 return OMPI_ERR_OUT_OF_RESOURCE;
547 }
548
549
550
551 file->f_comm = MPI_COMM_NULL;
552 file->f_rank = OMPIO_ROOT;
553
554
555
556
557
558
559
560
561
562
563 file->f_filename = filename;
564
565 *fh = file;
566 return OMPI_SUCCESS;
567 }
568
569 int mca_common_ompio_decode_datatype (struct ompio_file_t *fh,
570 ompi_datatype_t *datatype,
571 int count,
572 const void *buf,
573 size_t *max_data,
574 opal_convertor_t *conv,
575 struct iovec **iov,
576 uint32_t *iovec_count)
577 {
578
579
580
581 opal_convertor_t convertor;
582 size_t remaining_length = 0;
583 uint32_t i;
584 uint32_t temp_count;
585 struct iovec *temp_iov=NULL;
586 size_t temp_data;
587
588
589 opal_convertor_clone (conv, &convertor, 0);
590
591 if (OMPI_SUCCESS != opal_convertor_prepare_for_send (&convertor,
592 &(datatype->super),
593 count,
594 buf)) {
595 opal_output (1, "Cannot attach the datatype to a convertor\n");
596 return OMPI_ERROR;
597 }
598
599 if ( 0 == datatype->super.size ) {
600 *max_data = 0;
601 *iovec_count = 0;
602 *iov = NULL;
603 return OMPI_SUCCESS;
604 }
605
606 remaining_length = count * datatype->super.size;
607
608 temp_count = OMPIO_IOVEC_INITIAL_SIZE;
609 temp_iov = (struct iovec*)malloc(temp_count * sizeof(struct iovec));
610 if (NULL == temp_iov) {
611 opal_output (1, "OUT OF MEMORY\n");
612 return OMPI_ERR_OUT_OF_RESOURCE;
613 }
614
615 while (0 == opal_convertor_raw(&convertor,
616 temp_iov,
617 &temp_count,
618 &temp_data)) {
619 #if 0
620 printf ("%d: New raw extraction (iovec_count = %d, max_data = %lu)\n",
621 fh->f_rank,temp_count, (unsigned long)temp_data);
622 for (i = 0; i < temp_count; i++) {
623 printf ("%d: \t{%p, %lu}\n",fh->f_rank,
624 temp_iov[i].iov_base,
625 (unsigned long)temp_iov[i].iov_len);
626 }
627 #endif
628
629 *iovec_count = *iovec_count + temp_count;
630 *max_data = *max_data + temp_data;
631 *iov = (struct iovec *) realloc (*iov, *iovec_count * sizeof(struct iovec));
632 if (NULL == *iov) {
633 opal_output(1, "OUT OF MEMORY\n");
634 free(temp_iov);
635 return OMPI_ERR_OUT_OF_RESOURCE;
636 }
637 for (i=0 ; i<temp_count ; i++) {
638 (*iov)[i+(*iovec_count-temp_count)].iov_base = temp_iov[i].iov_base;
639 (*iov)[i+(*iovec_count-temp_count)].iov_len = temp_iov[i].iov_len;
640 }
641
642 remaining_length -= temp_data;
643 temp_count = OMPIO_IOVEC_INITIAL_SIZE;
644 }
645 #if 0
646 printf ("%d: LAST raw extraction (iovec_count = %d, max_data = %d)\n",
647 fh->f_rank,temp_count, temp_data);
648 for (i = 0; i < temp_count; i++) {
649 printf ("%d: \t offset[%d]: %ld; length[%d]: %ld\n", fh->f_rank,i,temp_iov[i].iov_base, i,temp_iov[i].iov_len);
650 }
651 #endif
652 *iovec_count = *iovec_count + temp_count;
653 *max_data = *max_data + temp_data;
654 if ( temp_count > 0 ) {
655 *iov = (struct iovec *) realloc (*iov, *iovec_count * sizeof(struct iovec));
656 if (NULL == *iov) {
657 opal_output(1, "OUT OF MEMORY\n");
658 free(temp_iov);
659 return OMPI_ERR_OUT_OF_RESOURCE;
660 }
661 }
662 for (i=0 ; i<temp_count ; i++) {
663 (*iov)[i+(*iovec_count-temp_count)].iov_base = temp_iov[i].iov_base;
664 (*iov)[i+(*iovec_count-temp_count)].iov_len = temp_iov[i].iov_len;
665 }
666
667 remaining_length -= temp_data;
668
669 #if 0
670 if (0 == fh->f_rank) {
671 printf ("%d Entries: \n",*iovec_count);
672 for (i=0 ; i<*iovec_count ; i++) {
673 printf ("\t{%p, %d}\n",
674 (*iov)[i].iov_base,
675 (*iov)[i].iov_len);
676 }
677 }
678 #endif
679 if (remaining_length != 0) {
680 printf( "Not all raw description was been extracted (%lu bytes missing)\n",
681 (unsigned long) remaining_length );
682 }
683
684 free (temp_iov);
685 opal_convertor_cleanup (&convertor);
686
687 return OMPI_SUCCESS;
688 }
689
690 int mca_common_ompio_set_callbacks(mca_common_ompio_generate_current_file_view_fn_t generate_current_file_view,
691 mca_common_ompio_get_mca_parameter_value_fn_t get_mca_parameter_value)
692 {
693 generate_current_file_view_fn = generate_current_file_view;
694 get_mca_parameter_value_fn = get_mca_parameter_value;
695 return OMPI_SUCCESS;
696 }