root/ompi/mca/io/ompio/io_ompio_file_open.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_io_ompio_file_open
  2. mca_io_ompio_file_close
  3. mca_io_ompio_file_preallocate
  4. mca_io_ompio_file_set_size
  5. mca_io_ompio_file_get_size
  6. mca_io_ompio_file_get_amode
  7. mca_io_ompio_file_get_type_extent
  8. mca_io_ompio_file_set_atomicity
  9. mca_io_ompio_file_get_atomicity
  10. mca_io_ompio_file_sync
  11. mca_io_ompio_file_seek
  12. mca_io_ompio_file_get_position
  13. mca_io_ompio_file_get_byte_offset
  14. mca_io_ompio_file_seek_shared
  15. mca_io_ompio_file_get_position_shared

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2008-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2015-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * Copyright (c) 2016      Cisco Systems, Inc.  All rights reserved.
  16  * Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
  17  * $COPYRIGHT$
  18  *
  19  * Additional copyrights may follow
  20  *
  21  * $HEADER$
  22  */
  23 
  24 #include "ompi_config.h"
  25 
  26 #include "ompi/communicator/communicator.h"
  27 #include "ompi/info/info.h"
  28 #include "ompi/file/file.h"
  29 #include "ompi/mca/io/base/base.h"
  30 #include "ompi/mca/fs/fs.h"
  31 #include "ompi/mca/fs/base/base.h"
  32 #include "ompi/mca/fcoll/fcoll.h"
  33 #include "ompi/mca/fcoll/base/base.h"
  34 #include "ompi/mca/fbtl/fbtl.h"
  35 #include "ompi/mca/fbtl/base/base.h"
  36 #include "ompi/mca/sharedfp/sharedfp.h"
  37 #include "ompi/mca/sharedfp/base/base.h"
  38 
  39 #include <unistd.h>
  40 #include <math.h>
  41 #include "io_ompio.h"
  42 #include "ompi/mca/common/ompio/common_ompio_request.h"
  43 #include "ompi/mca/topo/topo.h"
  44 
  45 int mca_io_ompio_file_open (ompi_communicator_t *comm,
  46                             const char *filename,
  47                             int amode,
  48                             opal_info_t *info,
  49                             ompi_file_t *fh)
  50 {
  51     int ret = OMPI_SUCCESS;
  52     mca_common_ompio_data_t *data=NULL;
  53     bool use_sharedfp = true;
  54 
  55 
  56     /* No locking required for file_open according to my understanding
  57        There is virtually no way on how to reach this point from multiple
  58        threads simultaniously 
  59     */
  60     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
  61     if ( NULL == data ) {
  62         return  OMPI_ERR_OUT_OF_RESOURCE;
  63     }
  64 
  65 
  66     /*save pointer back to the file_t structure */
  67     data->ompio_fh.f_fh = fh;
  68     /* No lock required for file_open even in multi-threaded scenarios,
  69        since only one collective operation per communicator
  70        is allowed anyway */
  71     ret = mca_common_ompio_file_open(comm,filename,amode,info,&data->ompio_fh,use_sharedfp);
  72 
  73     if ( OMPI_SUCCESS == ret ) {
  74         fh->f_flags |= OMPIO_FILE_IS_OPEN;
  75     }
  76 
  77 
  78 
  79 
  80     return ret;
  81 }
  82 
  83 int mca_io_ompio_file_close (ompi_file_t *fh)
  84 {
  85     int ret = OMPI_SUCCESS;
  86     mca_common_ompio_data_t *data;
  87 
  88     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
  89     if ( NULL == data ) {
  90         /* structure has already been freed, this is an erroneous call to file_close */
  91         return ret;
  92     }
  93     /* No locking required for file_close according to my understanding.
  94        Multiple threads closing the same file handle at the same time
  95        is a clear user error.
  96     */
  97     ret = mca_common_ompio_file_close(&data->ompio_fh);
  98 
  99     if ( NULL != data ) {
 100       free ( data );
 101     }
 102 
 103     return ret;
 104 }
 105 
 106 int mca_io_ompio_file_preallocate (ompi_file_t *fh,
 107                                    OMPI_MPI_OFFSET_TYPE diskspace)
 108 {
 109     int ret = OMPI_SUCCESS, cycles, i;
 110     OMPI_MPI_OFFSET_TYPE tmp, current_size, size, written, len;
 111     mca_common_ompio_data_t *data;
 112     char *buf = NULL;
 113     ompi_status_public_t *status = NULL;
 114 
 115     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 116 
 117     OPAL_THREAD_LOCK(&fh->f_lock);
 118     tmp = diskspace;
 119 
 120     ret = data->ompio_fh.f_comm->c_coll->coll_bcast (&tmp,
 121                                                     1,
 122                                                     OMPI_OFFSET_DATATYPE,
 123                                                     OMPIO_ROOT,
 124                                                     data->ompio_fh.f_comm,
 125                                                     data->ompio_fh.f_comm->c_coll->coll_bcast_module);
 126     if ( OMPI_SUCCESS != ret ) {
 127         OPAL_THREAD_UNLOCK(&fh->f_lock);
 128         return OMPI_ERROR;
 129     }
 130 
 131     if (tmp != diskspace) {
 132         OPAL_THREAD_UNLOCK(&fh->f_lock);
 133         return OMPI_ERROR;
 134     }
 135     ret = data->ompio_fh.f_fs->fs_file_get_size (&data->ompio_fh,
 136                                                  &current_size);
 137     if ( OMPI_SUCCESS != ret ) {
 138         OPAL_THREAD_UNLOCK(&fh->f_lock);
 139         return OMPI_ERROR;
 140     }
 141     
 142     if ( current_size > diskspace ) {
 143         OPAL_THREAD_UNLOCK(&fh->f_lock);
 144         return OMPI_SUCCESS;
 145     }
 146 
 147 
 148     /* ROMIO explanation
 149        On file systems with no preallocation function, we have to
 150        explicitly write to allocate space. Since there could be holes in the file,
 151        we need to read up to the current file size, write it back,
 152        and then write beyond that depending on how much
 153        preallocation is needed.
 154     */
 155     if (OMPIO_ROOT == data->ompio_fh.f_rank) {
 156         OMPI_MPI_OFFSET_TYPE prev_offset;
 157         mca_common_ompio_file_get_position (&data->ompio_fh, &prev_offset );
 158 
 159         size = diskspace;
 160         if (size > current_size) {
 161             size = current_size;
 162         }
 163 
 164         cycles = (size + OMPIO_PREALLOC_MAX_BUF_SIZE - 1)/
 165             OMPIO_PREALLOC_MAX_BUF_SIZE;
 166         buf = (char *) malloc (OMPIO_PREALLOC_MAX_BUF_SIZE);
 167         if (NULL == buf) {
 168             opal_output(1, "OUT OF MEMORY\n");
 169             ret = OMPI_ERR_OUT_OF_RESOURCE;
 170             goto exit;
 171         }
 172         written = 0;
 173 
 174         for (i=0; i<cycles; i++) {
 175             len = OMPIO_PREALLOC_MAX_BUF_SIZE;
 176             if (len > size-written) {
 177                 len = size - written;
 178             }
 179             ret = mca_common_ompio_file_read (&data->ompio_fh, buf, len, MPI_BYTE, status);
 180             if (ret != OMPI_SUCCESS) {
 181                 goto exit;
 182             }
 183             ret = mca_common_ompio_file_write (&data->ompio_fh, buf, len, MPI_BYTE, status);
 184             if (ret != OMPI_SUCCESS) {
 185                 goto exit;
 186             }
 187             written += len;
 188         }
 189 
 190         if (diskspace > current_size) {
 191             memset(buf, 0, OMPIO_PREALLOC_MAX_BUF_SIZE);
 192             size = diskspace - current_size;
 193             cycles = (size + OMPIO_PREALLOC_MAX_BUF_SIZE - 1) /
 194                 OMPIO_PREALLOC_MAX_BUF_SIZE;
 195             for (i=0; i<cycles; i++) {
 196                 len = OMPIO_PREALLOC_MAX_BUF_SIZE;
 197                 if (len > diskspace-written) {
 198                     len = diskspace - written;
 199                 }
 200                 ret = mca_common_ompio_file_write (&data->ompio_fh, buf, len, MPI_BYTE, status);
 201                 if (ret != OMPI_SUCCESS) {
 202                     goto exit;
 203                 }
 204                 written += len;
 205             }
 206         }
 207 
 208         // This operation should not affect file pointer position.
 209         mca_common_ompio_set_explicit_offset ( &data->ompio_fh, prev_offset);
 210     }
 211 
 212 exit:     
 213     free ( buf );
 214     fh->f_comm->c_coll->coll_bcast ( &ret, 1, MPI_INT, OMPIO_ROOT, fh->f_comm,
 215                                    fh->f_comm->c_coll->coll_bcast_module);
 216     
 217     if ( diskspace > current_size ) {
 218         data->ompio_fh.f_fs->fs_file_set_size (&data->ompio_fh, diskspace);
 219     }
 220     OPAL_THREAD_UNLOCK(&fh->f_lock);
 221 
 222     return ret;
 223 }
 224 
 225 int mca_io_ompio_file_set_size (ompi_file_t *fh,
 226                                 OMPI_MPI_OFFSET_TYPE size)
 227 {
 228     int ret = OMPI_SUCCESS;
 229     OMPI_MPI_OFFSET_TYPE tmp;
 230     mca_common_ompio_data_t *data;
 231 
 232     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 233 
 234     tmp = size;
 235     OPAL_THREAD_LOCK(&fh->f_lock);
 236     ret = data->ompio_fh.f_comm->c_coll->coll_bcast (&tmp,
 237                                                     1,
 238                                                     OMPI_OFFSET_DATATYPE,
 239                                                     OMPIO_ROOT,
 240                                                     data->ompio_fh.f_comm,
 241                                                     data->ompio_fh.f_comm->c_coll->coll_bcast_module);
 242     if ( OMPI_SUCCESS != ret ) {
 243         opal_output(1, ",mca_io_ompio_file_set_size: error in bcast\n");
 244         OPAL_THREAD_UNLOCK(&fh->f_lock);
 245         return ret;
 246     }
 247     
 248 
 249     if (tmp != size) {
 250         OPAL_THREAD_UNLOCK(&fh->f_lock);
 251         return OMPI_ERROR;
 252     }
 253 
 254     ret = data->ompio_fh.f_fs->fs_file_set_size (&data->ompio_fh, size);
 255     if ( OMPI_SUCCESS != ret ) {
 256         opal_output(1, ",mca_io_ompio_file_set_size: error in fs->set_size\n");
 257         OPAL_THREAD_UNLOCK(&fh->f_lock);
 258         return ret;
 259     }
 260     
 261     ret = data->ompio_fh.f_comm->c_coll->coll_barrier (data->ompio_fh.f_comm,
 262                                                       data->ompio_fh.f_comm->c_coll->coll_barrier_module);
 263     if ( OMPI_SUCCESS != ret ) {
 264         opal_output(1, ",mca_io_ompio_file_set_size: error in barrier\n");
 265         OPAL_THREAD_UNLOCK(&fh->f_lock);
 266         return ret;
 267     }
 268     OPAL_THREAD_UNLOCK(&fh->f_lock);
 269 
 270     return ret;
 271 }
 272 
 273 int mca_io_ompio_file_get_size (ompi_file_t *fh,
 274                             OMPI_MPI_OFFSET_TYPE *size)
 275 {
 276     int ret = OMPI_SUCCESS;
 277     mca_common_ompio_data_t *data;
 278 
 279     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 280     OPAL_THREAD_LOCK(&fh->f_lock);
 281     ret = mca_common_ompio_file_get_size(&data->ompio_fh,size);
 282     OPAL_THREAD_UNLOCK(&fh->f_lock);
 283 
 284     return ret;
 285 }
 286 
 287 
 288 int mca_io_ompio_file_get_amode (ompi_file_t *fh,
 289                                  int *amode)
 290 {
 291     mca_common_ompio_data_t *data;
 292 
 293     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 294     /* No lock necessary in this case, amode is set in file_open, and 
 295        not modified later on*/
 296     *amode = data->ompio_fh.f_amode;
 297 
 298     return OMPI_SUCCESS;
 299 }
 300 
 301 
 302 int mca_io_ompio_file_get_type_extent (ompi_file_t *fh,
 303                                        struct ompi_datatype_t *datatype,
 304                                        MPI_Aint *extent)
 305 {
 306     opal_datatype_type_extent (&datatype->super, extent);
 307     return OMPI_SUCCESS;
 308 }
 309 
 310 
 311 int mca_io_ompio_file_set_atomicity (ompi_file_t *fh,
 312                                      int flag)
 313 {
 314     int tmp;
 315     mca_common_ompio_data_t *data;
 316 
 317     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 318 
 319     OPAL_THREAD_LOCK(&fh->f_lock);
 320     if (flag) {
 321         flag = 1;
 322     }
 323 
 324     /* check if the atomicity flag is the same on all processes */
 325     tmp = flag;
 326     data->ompio_fh.f_comm->c_coll->coll_bcast (&tmp,
 327                                               1,
 328                                               MPI_INT,
 329                                               OMPIO_ROOT,
 330                                               data->ompio_fh.f_comm,
 331                                               data->ompio_fh.f_comm->c_coll->coll_bcast_module);
 332 
 333     if (tmp != flag) {
 334         OPAL_THREAD_UNLOCK(&fh->f_lock);
 335         return OMPI_ERROR;
 336     }
 337 
 338     data->ompio_fh.f_atomicity = flag;
 339     OPAL_THREAD_UNLOCK(&fh->f_lock);
 340 
 341     return OMPI_SUCCESS;
 342 }
 343 
 344 int mca_io_ompio_file_get_atomicity (ompi_file_t *fh,
 345                                      int *flag)
 346 {
 347     mca_common_ompio_data_t *data;
 348 
 349     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 350 
 351     OPAL_THREAD_LOCK(&fh->f_lock);
 352     *flag = data->ompio_fh.f_atomicity;
 353     OPAL_THREAD_UNLOCK(&fh->f_lock);
 354 
 355     return OMPI_SUCCESS;
 356 }
 357 
 358 int mca_io_ompio_file_sync (ompi_file_t *fh)
 359 {
 360     int ret = OMPI_SUCCESS;
 361     mca_common_ompio_data_t *data;
 362 
 363     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 364 
 365     OPAL_THREAD_LOCK(&fh->f_lock);
 366     if ( !opal_list_is_empty (&mca_common_ompio_pending_requests) ) {
 367         OPAL_THREAD_UNLOCK(&fh->f_lock);
 368         return MPI_ERR_OTHER;
 369     }
 370 
 371     if ( data->ompio_fh.f_amode & MPI_MODE_RDONLY ) {
 372         OPAL_THREAD_UNLOCK(&fh->f_lock);
 373         return MPI_ERR_ACCESS;
 374     }        
 375     // Make sure all processes reach this point before syncing the file.
 376     ret = data->ompio_fh.f_comm->c_coll->coll_barrier (data->ompio_fh.f_comm,
 377                                                        data->ompio_fh.f_comm->c_coll->coll_barrier_module);
 378     if ( MPI_SUCCESS != ret ) {
 379         OPAL_THREAD_UNLOCK(&fh->f_lock);
 380         return ret;
 381     }
 382     ret = data->ompio_fh.f_fs->fs_file_sync (&data->ompio_fh);
 383     OPAL_THREAD_UNLOCK(&fh->f_lock);
 384 
 385     return ret;
 386 }
 387 
 388 
 389 int mca_io_ompio_file_seek (ompi_file_t *fh,
 390                             OMPI_MPI_OFFSET_TYPE off,
 391                             int whence)
 392 {
 393     int ret = OMPI_SUCCESS;
 394     mca_common_ompio_data_t *data;
 395     OMPI_MPI_OFFSET_TYPE offset, temp_offset;
 396 
 397     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 398 
 399     OPAL_THREAD_LOCK(&fh->f_lock);
 400     offset = off * data->ompio_fh.f_etype_size;
 401 
 402     switch(whence) {
 403     case MPI_SEEK_SET:
 404         if (offset < 0) {
 405             OPAL_THREAD_UNLOCK(&fh->f_lock);
 406             return OMPI_ERROR;
 407         }
 408         break;
 409     case MPI_SEEK_CUR:
 410         ret = mca_common_ompio_file_get_position (&data->ompio_fh,
 411                                                   &temp_offset);
 412         offset += temp_offset;
 413         if (offset < 0) {
 414             OPAL_THREAD_UNLOCK(&fh->f_lock);
 415             return OMPI_ERROR;
 416         }
 417         break;
 418     case MPI_SEEK_END:
 419         ret = data->ompio_fh.f_fs->fs_file_get_size (&data->ompio_fh,
 420                                                      &temp_offset);
 421         offset += temp_offset;
 422         if (offset < 0 || OMPI_SUCCESS != ret) {
 423             OPAL_THREAD_UNLOCK(&fh->f_lock);
 424             return OMPI_ERROR;
 425         }
 426         break;
 427     default:
 428         OPAL_THREAD_UNLOCK(&fh->f_lock);
 429         return OMPI_ERROR;
 430     }
 431 
 432     ret = mca_common_ompio_set_explicit_offset (&data->ompio_fh,
 433                                              offset/data->ompio_fh.f_etype_size);
 434     OPAL_THREAD_UNLOCK(&fh->f_lock);
 435 
 436     return ret;
 437 }
 438 
 439 int mca_io_ompio_file_get_position (ompi_file_t *fd,
 440                                     OMPI_MPI_OFFSET_TYPE *offset)
 441 {
 442     int ret=OMPI_SUCCESS;
 443     mca_common_ompio_data_t *data=NULL;
 444     ompio_file_t *fh=NULL;
 445 
 446     data = (mca_common_ompio_data_t *) fd->f_io_selected_data;
 447     fh = &data->ompio_fh;
 448 
 449     OPAL_THREAD_LOCK(&fd->f_lock);
 450     ret = mca_common_ompio_file_get_position (fh, offset);
 451     OPAL_THREAD_UNLOCK(&fd->f_lock);
 452 
 453     return ret;
 454 }
 455 
 456 
 457 int mca_io_ompio_file_get_byte_offset (ompi_file_t *fh,
 458                                        OMPI_MPI_OFFSET_TYPE offset,
 459                                        OMPI_MPI_OFFSET_TYPE *disp)
 460 {
 461     mca_common_ompio_data_t *data;
 462     int i, k, index;
 463     long temp_offset;
 464 
 465     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 466 
 467     OPAL_THREAD_LOCK(&fh->f_lock);
 468     temp_offset = (long) data->ompio_fh.f_view_extent *
 469         (offset*data->ompio_fh.f_etype_size / data->ompio_fh.f_view_size);
 470     if ( 0 > temp_offset ) {
 471         OPAL_THREAD_UNLOCK(&fh->f_lock);
 472         return MPI_ERR_ARG;
 473     }
 474     
 475     i = (offset*data->ompio_fh.f_etype_size) % data->ompio_fh.f_view_size;
 476     index = 0;
 477     k = 0;
 478 
 479     while (1) {
 480         k = data->ompio_fh.f_decoded_iov[index].iov_len;
 481         if (i >= k) {
 482             i -= k;
 483             index++;
 484             if ( 0 == i ) {
 485                 k=0;
 486                 break;
 487             }
 488         }
 489         else {
 490             k=i;
 491             break;
 492         }
 493     }
 494 
 495     *disp = data->ompio_fh.f_disp + temp_offset +
 496         (OMPI_MPI_OFFSET_TYPE)(intptr_t)data->ompio_fh.f_decoded_iov[index].iov_base + k;
 497     OPAL_THREAD_UNLOCK(&fh->f_lock);
 498 
 499     return OMPI_SUCCESS;
 500 }
 501 
 502 int mca_io_ompio_file_seek_shared (ompi_file_t *fp,
 503                                    OMPI_MPI_OFFSET_TYPE offset,
 504                                    int whence)
 505 {
 506     int ret = OMPI_SUCCESS;
 507     mca_common_ompio_data_t *data;
 508     ompio_file_t *fh;
 509     mca_sharedfp_base_module_t * shared_fp_base_module;
 510 
 511     data = (mca_common_ompio_data_t *) fp->f_io_selected_data;
 512     fh = &data->ompio_fh;
 513 
 514     /*get the shared fp module associated with this file*/
 515     shared_fp_base_module = fh->f_sharedfp;
 516     if ( NULL == shared_fp_base_module ){
 517         opal_output(0, "No shared file pointer component found for this communicator. Can not execute\n");
 518         return OMPI_ERROR;
 519     }
 520 
 521     OPAL_THREAD_LOCK(&fp->f_lock);
 522     ret = shared_fp_base_module->sharedfp_seek(fh,offset,whence);
 523     OPAL_THREAD_UNLOCK(&fp->f_lock);
 524 
 525     return ret;
 526 }
 527 
 528 
 529 int mca_io_ompio_file_get_position_shared (ompi_file_t *fp,
 530                                            OMPI_MPI_OFFSET_TYPE * offset)
 531 {
 532     int ret = OMPI_SUCCESS;
 533     mca_common_ompio_data_t *data;
 534     ompio_file_t *fh;
 535     mca_sharedfp_base_module_t * shared_fp_base_module;
 536 
 537     data = (mca_common_ompio_data_t *) fp->f_io_selected_data;
 538     fh = &data->ompio_fh;
 539 
 540     /*get the shared fp module associated with this file*/
 541     shared_fp_base_module = fh->f_sharedfp;
 542     if ( NULL == shared_fp_base_module ){
 543         opal_output(0, "No shared file pointer component found for this communicator. Can not execute\n");
 544         return OMPI_ERROR;
 545     }
 546     OPAL_THREAD_LOCK(&fp->f_lock);
 547     ret = shared_fp_base_module->sharedfp_get_position(fh,offset);
 548     *offset = *offset / fh->f_etype_size;
 549     OPAL_THREAD_UNLOCK(&fp->f_lock);
 550 
 551     return ret;
 552 }
 553 

/* [<][>][^][v][top][bottom][index][help] */