This source file includes the following definitions.
- mca_io_ompio_file_open
- mca_io_ompio_file_close
- mca_io_ompio_file_preallocate
- mca_io_ompio_file_set_size
- mca_io_ompio_file_get_size
- mca_io_ompio_file_get_amode
- mca_io_ompio_file_get_type_extent
- mca_io_ompio_file_set_atomicity
- mca_io_ompio_file_get_atomicity
- mca_io_ompio_file_sync
- mca_io_ompio_file_seek
- mca_io_ompio_file_get_position
- mca_io_ompio_file_get_byte_offset
- mca_io_ompio_file_seek_shared
- mca_io_ompio_file_get_position_shared
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 #include "ompi_config.h"
25
26 #include "ompi/communicator/communicator.h"
27 #include "ompi/info/info.h"
28 #include "ompi/file/file.h"
29 #include "ompi/mca/io/base/base.h"
30 #include "ompi/mca/fs/fs.h"
31 #include "ompi/mca/fs/base/base.h"
32 #include "ompi/mca/fcoll/fcoll.h"
33 #include "ompi/mca/fcoll/base/base.h"
34 #include "ompi/mca/fbtl/fbtl.h"
35 #include "ompi/mca/fbtl/base/base.h"
36 #include "ompi/mca/sharedfp/sharedfp.h"
37 #include "ompi/mca/sharedfp/base/base.h"
38
39 #include <unistd.h>
40 #include <math.h>
41 #include "io_ompio.h"
42 #include "ompi/mca/common/ompio/common_ompio_request.h"
43 #include "ompi/mca/topo/topo.h"
44
45 int mca_io_ompio_file_open (ompi_communicator_t *comm,
46 const char *filename,
47 int amode,
48 opal_info_t *info,
49 ompi_file_t *fh)
50 {
51 int ret = OMPI_SUCCESS;
52 mca_common_ompio_data_t *data=NULL;
53 bool use_sharedfp = true;
54
55
56
57
58
59
60 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
61 if ( NULL == data ) {
62 return OMPI_ERR_OUT_OF_RESOURCE;
63 }
64
65
66
67 data->ompio_fh.f_fh = fh;
68
69
70
71 ret = mca_common_ompio_file_open(comm,filename,amode,info,&data->ompio_fh,use_sharedfp);
72
73 if ( OMPI_SUCCESS == ret ) {
74 fh->f_flags |= OMPIO_FILE_IS_OPEN;
75 }
76
77
78
79
80 return ret;
81 }
82
83 int mca_io_ompio_file_close (ompi_file_t *fh)
84 {
85 int ret = OMPI_SUCCESS;
86 mca_common_ompio_data_t *data;
87
88 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
89 if ( NULL == data ) {
90
91 return ret;
92 }
93
94
95
96
97 ret = mca_common_ompio_file_close(&data->ompio_fh);
98
99 if ( NULL != data ) {
100 free ( data );
101 }
102
103 return ret;
104 }
105
106 int mca_io_ompio_file_preallocate (ompi_file_t *fh,
107 OMPI_MPI_OFFSET_TYPE diskspace)
108 {
109 int ret = OMPI_SUCCESS, cycles, i;
110 OMPI_MPI_OFFSET_TYPE tmp, current_size, size, written, len;
111 mca_common_ompio_data_t *data;
112 char *buf = NULL;
113 ompi_status_public_t *status = NULL;
114
115 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
116
117 OPAL_THREAD_LOCK(&fh->f_lock);
118 tmp = diskspace;
119
120 ret = data->ompio_fh.f_comm->c_coll->coll_bcast (&tmp,
121 1,
122 OMPI_OFFSET_DATATYPE,
123 OMPIO_ROOT,
124 data->ompio_fh.f_comm,
125 data->ompio_fh.f_comm->c_coll->coll_bcast_module);
126 if ( OMPI_SUCCESS != ret ) {
127 OPAL_THREAD_UNLOCK(&fh->f_lock);
128 return OMPI_ERROR;
129 }
130
131 if (tmp != diskspace) {
132 OPAL_THREAD_UNLOCK(&fh->f_lock);
133 return OMPI_ERROR;
134 }
135 ret = data->ompio_fh.f_fs->fs_file_get_size (&data->ompio_fh,
136 ¤t_size);
137 if ( OMPI_SUCCESS != ret ) {
138 OPAL_THREAD_UNLOCK(&fh->f_lock);
139 return OMPI_ERROR;
140 }
141
142 if ( current_size > diskspace ) {
143 OPAL_THREAD_UNLOCK(&fh->f_lock);
144 return OMPI_SUCCESS;
145 }
146
147
148
149
150
151
152
153
154
155 if (OMPIO_ROOT == data->ompio_fh.f_rank) {
156 OMPI_MPI_OFFSET_TYPE prev_offset;
157 mca_common_ompio_file_get_position (&data->ompio_fh, &prev_offset );
158
159 size = diskspace;
160 if (size > current_size) {
161 size = current_size;
162 }
163
164 cycles = (size + OMPIO_PREALLOC_MAX_BUF_SIZE - 1)/
165 OMPIO_PREALLOC_MAX_BUF_SIZE;
166 buf = (char *) malloc (OMPIO_PREALLOC_MAX_BUF_SIZE);
167 if (NULL == buf) {
168 opal_output(1, "OUT OF MEMORY\n");
169 ret = OMPI_ERR_OUT_OF_RESOURCE;
170 goto exit;
171 }
172 written = 0;
173
174 for (i=0; i<cycles; i++) {
175 len = OMPIO_PREALLOC_MAX_BUF_SIZE;
176 if (len > size-written) {
177 len = size - written;
178 }
179 ret = mca_common_ompio_file_read (&data->ompio_fh, buf, len, MPI_BYTE, status);
180 if (ret != OMPI_SUCCESS) {
181 goto exit;
182 }
183 ret = mca_common_ompio_file_write (&data->ompio_fh, buf, len, MPI_BYTE, status);
184 if (ret != OMPI_SUCCESS) {
185 goto exit;
186 }
187 written += len;
188 }
189
190 if (diskspace > current_size) {
191 memset(buf, 0, OMPIO_PREALLOC_MAX_BUF_SIZE);
192 size = diskspace - current_size;
193 cycles = (size + OMPIO_PREALLOC_MAX_BUF_SIZE - 1) /
194 OMPIO_PREALLOC_MAX_BUF_SIZE;
195 for (i=0; i<cycles; i++) {
196 len = OMPIO_PREALLOC_MAX_BUF_SIZE;
197 if (len > diskspace-written) {
198 len = diskspace - written;
199 }
200 ret = mca_common_ompio_file_write (&data->ompio_fh, buf, len, MPI_BYTE, status);
201 if (ret != OMPI_SUCCESS) {
202 goto exit;
203 }
204 written += len;
205 }
206 }
207
208
209 mca_common_ompio_set_explicit_offset ( &data->ompio_fh, prev_offset);
210 }
211
212 exit:
213 free ( buf );
214 fh->f_comm->c_coll->coll_bcast ( &ret, 1, MPI_INT, OMPIO_ROOT, fh->f_comm,
215 fh->f_comm->c_coll->coll_bcast_module);
216
217 if ( diskspace > current_size ) {
218 data->ompio_fh.f_fs->fs_file_set_size (&data->ompio_fh, diskspace);
219 }
220 OPAL_THREAD_UNLOCK(&fh->f_lock);
221
222 return ret;
223 }
224
225 int mca_io_ompio_file_set_size (ompi_file_t *fh,
226 OMPI_MPI_OFFSET_TYPE size)
227 {
228 int ret = OMPI_SUCCESS;
229 OMPI_MPI_OFFSET_TYPE tmp;
230 mca_common_ompio_data_t *data;
231
232 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
233
234 tmp = size;
235 OPAL_THREAD_LOCK(&fh->f_lock);
236 ret = data->ompio_fh.f_comm->c_coll->coll_bcast (&tmp,
237 1,
238 OMPI_OFFSET_DATATYPE,
239 OMPIO_ROOT,
240 data->ompio_fh.f_comm,
241 data->ompio_fh.f_comm->c_coll->coll_bcast_module);
242 if ( OMPI_SUCCESS != ret ) {
243 opal_output(1, ",mca_io_ompio_file_set_size: error in bcast\n");
244 OPAL_THREAD_UNLOCK(&fh->f_lock);
245 return ret;
246 }
247
248
249 if (tmp != size) {
250 OPAL_THREAD_UNLOCK(&fh->f_lock);
251 return OMPI_ERROR;
252 }
253
254 ret = data->ompio_fh.f_fs->fs_file_set_size (&data->ompio_fh, size);
255 if ( OMPI_SUCCESS != ret ) {
256 opal_output(1, ",mca_io_ompio_file_set_size: error in fs->set_size\n");
257 OPAL_THREAD_UNLOCK(&fh->f_lock);
258 return ret;
259 }
260
261 ret = data->ompio_fh.f_comm->c_coll->coll_barrier (data->ompio_fh.f_comm,
262 data->ompio_fh.f_comm->c_coll->coll_barrier_module);
263 if ( OMPI_SUCCESS != ret ) {
264 opal_output(1, ",mca_io_ompio_file_set_size: error in barrier\n");
265 OPAL_THREAD_UNLOCK(&fh->f_lock);
266 return ret;
267 }
268 OPAL_THREAD_UNLOCK(&fh->f_lock);
269
270 return ret;
271 }
272
273 int mca_io_ompio_file_get_size (ompi_file_t *fh,
274 OMPI_MPI_OFFSET_TYPE *size)
275 {
276 int ret = OMPI_SUCCESS;
277 mca_common_ompio_data_t *data;
278
279 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
280 OPAL_THREAD_LOCK(&fh->f_lock);
281 ret = mca_common_ompio_file_get_size(&data->ompio_fh,size);
282 OPAL_THREAD_UNLOCK(&fh->f_lock);
283
284 return ret;
285 }
286
287
288 int mca_io_ompio_file_get_amode (ompi_file_t *fh,
289 int *amode)
290 {
291 mca_common_ompio_data_t *data;
292
293 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
294
295
296 *amode = data->ompio_fh.f_amode;
297
298 return OMPI_SUCCESS;
299 }
300
301
302 int mca_io_ompio_file_get_type_extent (ompi_file_t *fh,
303 struct ompi_datatype_t *datatype,
304 MPI_Aint *extent)
305 {
306 opal_datatype_type_extent (&datatype->super, extent);
307 return OMPI_SUCCESS;
308 }
309
310
311 int mca_io_ompio_file_set_atomicity (ompi_file_t *fh,
312 int flag)
313 {
314 int tmp;
315 mca_common_ompio_data_t *data;
316
317 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
318
319 OPAL_THREAD_LOCK(&fh->f_lock);
320 if (flag) {
321 flag = 1;
322 }
323
324
325 tmp = flag;
326 data->ompio_fh.f_comm->c_coll->coll_bcast (&tmp,
327 1,
328 MPI_INT,
329 OMPIO_ROOT,
330 data->ompio_fh.f_comm,
331 data->ompio_fh.f_comm->c_coll->coll_bcast_module);
332
333 if (tmp != flag) {
334 OPAL_THREAD_UNLOCK(&fh->f_lock);
335 return OMPI_ERROR;
336 }
337
338 data->ompio_fh.f_atomicity = flag;
339 OPAL_THREAD_UNLOCK(&fh->f_lock);
340
341 return OMPI_SUCCESS;
342 }
343
344 int mca_io_ompio_file_get_atomicity (ompi_file_t *fh,
345 int *flag)
346 {
347 mca_common_ompio_data_t *data;
348
349 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
350
351 OPAL_THREAD_LOCK(&fh->f_lock);
352 *flag = data->ompio_fh.f_atomicity;
353 OPAL_THREAD_UNLOCK(&fh->f_lock);
354
355 return OMPI_SUCCESS;
356 }
357
358 int mca_io_ompio_file_sync (ompi_file_t *fh)
359 {
360 int ret = OMPI_SUCCESS;
361 mca_common_ompio_data_t *data;
362
363 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
364
365 OPAL_THREAD_LOCK(&fh->f_lock);
366 if ( !opal_list_is_empty (&mca_common_ompio_pending_requests) ) {
367 OPAL_THREAD_UNLOCK(&fh->f_lock);
368 return MPI_ERR_OTHER;
369 }
370
371 if ( data->ompio_fh.f_amode & MPI_MODE_RDONLY ) {
372 OPAL_THREAD_UNLOCK(&fh->f_lock);
373 return MPI_ERR_ACCESS;
374 }
375
376 ret = data->ompio_fh.f_comm->c_coll->coll_barrier (data->ompio_fh.f_comm,
377 data->ompio_fh.f_comm->c_coll->coll_barrier_module);
378 if ( MPI_SUCCESS != ret ) {
379 OPAL_THREAD_UNLOCK(&fh->f_lock);
380 return ret;
381 }
382 ret = data->ompio_fh.f_fs->fs_file_sync (&data->ompio_fh);
383 OPAL_THREAD_UNLOCK(&fh->f_lock);
384
385 return ret;
386 }
387
388
389 int mca_io_ompio_file_seek (ompi_file_t *fh,
390 OMPI_MPI_OFFSET_TYPE off,
391 int whence)
392 {
393 int ret = OMPI_SUCCESS;
394 mca_common_ompio_data_t *data;
395 OMPI_MPI_OFFSET_TYPE offset, temp_offset;
396
397 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
398
399 OPAL_THREAD_LOCK(&fh->f_lock);
400 offset = off * data->ompio_fh.f_etype_size;
401
402 switch(whence) {
403 case MPI_SEEK_SET:
404 if (offset < 0) {
405 OPAL_THREAD_UNLOCK(&fh->f_lock);
406 return OMPI_ERROR;
407 }
408 break;
409 case MPI_SEEK_CUR:
410 ret = mca_common_ompio_file_get_position (&data->ompio_fh,
411 &temp_offset);
412 offset += temp_offset;
413 if (offset < 0) {
414 OPAL_THREAD_UNLOCK(&fh->f_lock);
415 return OMPI_ERROR;
416 }
417 break;
418 case MPI_SEEK_END:
419 ret = data->ompio_fh.f_fs->fs_file_get_size (&data->ompio_fh,
420 &temp_offset);
421 offset += temp_offset;
422 if (offset < 0 || OMPI_SUCCESS != ret) {
423 OPAL_THREAD_UNLOCK(&fh->f_lock);
424 return OMPI_ERROR;
425 }
426 break;
427 default:
428 OPAL_THREAD_UNLOCK(&fh->f_lock);
429 return OMPI_ERROR;
430 }
431
432 ret = mca_common_ompio_set_explicit_offset (&data->ompio_fh,
433 offset/data->ompio_fh.f_etype_size);
434 OPAL_THREAD_UNLOCK(&fh->f_lock);
435
436 return ret;
437 }
438
439 int mca_io_ompio_file_get_position (ompi_file_t *fd,
440 OMPI_MPI_OFFSET_TYPE *offset)
441 {
442 int ret=OMPI_SUCCESS;
443 mca_common_ompio_data_t *data=NULL;
444 ompio_file_t *fh=NULL;
445
446 data = (mca_common_ompio_data_t *) fd->f_io_selected_data;
447 fh = &data->ompio_fh;
448
449 OPAL_THREAD_LOCK(&fd->f_lock);
450 ret = mca_common_ompio_file_get_position (fh, offset);
451 OPAL_THREAD_UNLOCK(&fd->f_lock);
452
453 return ret;
454 }
455
456
457 int mca_io_ompio_file_get_byte_offset (ompi_file_t *fh,
458 OMPI_MPI_OFFSET_TYPE offset,
459 OMPI_MPI_OFFSET_TYPE *disp)
460 {
461 mca_common_ompio_data_t *data;
462 int i, k, index;
463 long temp_offset;
464
465 data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
466
467 OPAL_THREAD_LOCK(&fh->f_lock);
468 temp_offset = (long) data->ompio_fh.f_view_extent *
469 (offset*data->ompio_fh.f_etype_size / data->ompio_fh.f_view_size);
470 if ( 0 > temp_offset ) {
471 OPAL_THREAD_UNLOCK(&fh->f_lock);
472 return MPI_ERR_ARG;
473 }
474
475 i = (offset*data->ompio_fh.f_etype_size) % data->ompio_fh.f_view_size;
476 index = 0;
477 k = 0;
478
479 while (1) {
480 k = data->ompio_fh.f_decoded_iov[index].iov_len;
481 if (i >= k) {
482 i -= k;
483 index++;
484 if ( 0 == i ) {
485 k=0;
486 break;
487 }
488 }
489 else {
490 k=i;
491 break;
492 }
493 }
494
495 *disp = data->ompio_fh.f_disp + temp_offset +
496 (OMPI_MPI_OFFSET_TYPE)(intptr_t)data->ompio_fh.f_decoded_iov[index].iov_base + k;
497 OPAL_THREAD_UNLOCK(&fh->f_lock);
498
499 return OMPI_SUCCESS;
500 }
501
502 int mca_io_ompio_file_seek_shared (ompi_file_t *fp,
503 OMPI_MPI_OFFSET_TYPE offset,
504 int whence)
505 {
506 int ret = OMPI_SUCCESS;
507 mca_common_ompio_data_t *data;
508 ompio_file_t *fh;
509 mca_sharedfp_base_module_t * shared_fp_base_module;
510
511 data = (mca_common_ompio_data_t *) fp->f_io_selected_data;
512 fh = &data->ompio_fh;
513
514
515 shared_fp_base_module = fh->f_sharedfp;
516 if ( NULL == shared_fp_base_module ){
517 opal_output(0, "No shared file pointer component found for this communicator. Can not execute\n");
518 return OMPI_ERROR;
519 }
520
521 OPAL_THREAD_LOCK(&fp->f_lock);
522 ret = shared_fp_base_module->sharedfp_seek(fh,offset,whence);
523 OPAL_THREAD_UNLOCK(&fp->f_lock);
524
525 return ret;
526 }
527
528
529 int mca_io_ompio_file_get_position_shared (ompi_file_t *fp,
530 OMPI_MPI_OFFSET_TYPE * offset)
531 {
532 int ret = OMPI_SUCCESS;
533 mca_common_ompio_data_t *data;
534 ompio_file_t *fh;
535 mca_sharedfp_base_module_t * shared_fp_base_module;
536
537 data = (mca_common_ompio_data_t *) fp->f_io_selected_data;
538 fh = &data->ompio_fh;
539
540
541 shared_fp_base_module = fh->f_sharedfp;
542 if ( NULL == shared_fp_base_module ){
543 opal_output(0, "No shared file pointer component found for this communicator. Can not execute\n");
544 return OMPI_ERROR;
545 }
546 OPAL_THREAD_LOCK(&fp->f_lock);
547 ret = shared_fp_base_module->sharedfp_get_position(fh,offset);
548 *offset = *offset / fh->f_etype_size;
549 OPAL_THREAD_UNLOCK(&fp->f_lock);
550
551 return ret;
552 }
553