This source file includes the following definitions.
- mca_io_ompio_file_open
- mca_io_ompio_file_close
- mca_io_ompio_file_preallocate
- mca_io_ompio_file_set_size
- mca_io_ompio_file_get_size
- mca_io_ompio_file_get_amode
- mca_io_ompio_file_get_type_extent
- mca_io_ompio_file_set_atomicity
- mca_io_ompio_file_get_atomicity
- mca_io_ompio_file_sync
- mca_io_ompio_file_seek
- mca_io_ompio_file_get_position
- mca_io_ompio_file_get_byte_offset
- mca_io_ompio_file_seek_shared
- mca_io_ompio_file_get_position_shared
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 #include "ompi_config.h"
  25 
  26 #include "ompi/communicator/communicator.h"
  27 #include "ompi/info/info.h"
  28 #include "ompi/file/file.h"
  29 #include "ompi/mca/io/base/base.h"
  30 #include "ompi/mca/fs/fs.h"
  31 #include "ompi/mca/fs/base/base.h"
  32 #include "ompi/mca/fcoll/fcoll.h"
  33 #include "ompi/mca/fcoll/base/base.h"
  34 #include "ompi/mca/fbtl/fbtl.h"
  35 #include "ompi/mca/fbtl/base/base.h"
  36 #include "ompi/mca/sharedfp/sharedfp.h"
  37 #include "ompi/mca/sharedfp/base/base.h"
  38 
  39 #include <unistd.h>
  40 #include <math.h>
  41 #include "io_ompio.h"
  42 #include "ompi/mca/common/ompio/common_ompio_request.h"
  43 #include "ompi/mca/topo/topo.h"
  44 
  45 int mca_io_ompio_file_open (ompi_communicator_t *comm,
  46                             const char *filename,
  47                             int amode,
  48                             opal_info_t *info,
  49                             ompi_file_t *fh)
  50 {
  51     int ret = OMPI_SUCCESS;
  52     mca_common_ompio_data_t *data=NULL;
  53     bool use_sharedfp = true;
  54 
  55 
  56     
  57 
  58 
  59 
  60     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
  61     if ( NULL == data ) {
  62         return  OMPI_ERR_OUT_OF_RESOURCE;
  63     }
  64 
  65 
  66     
  67     data->ompio_fh.f_fh = fh;
  68     
  69 
  70 
  71     ret = mca_common_ompio_file_open(comm,filename,amode,info,&data->ompio_fh,use_sharedfp);
  72 
  73     if ( OMPI_SUCCESS == ret ) {
  74         fh->f_flags |= OMPIO_FILE_IS_OPEN;
  75     }
  76 
  77 
  78 
  79 
  80     return ret;
  81 }
  82 
  83 int mca_io_ompio_file_close (ompi_file_t *fh)
  84 {
  85     int ret = OMPI_SUCCESS;
  86     mca_common_ompio_data_t *data;
  87 
  88     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
  89     if ( NULL == data ) {
  90         
  91         return ret;
  92     }
  93     
  94 
  95 
  96 
  97     ret = mca_common_ompio_file_close(&data->ompio_fh);
  98 
  99     if ( NULL != data ) {
 100       free ( data );
 101     }
 102 
 103     return ret;
 104 }
 105 
 106 int mca_io_ompio_file_preallocate (ompi_file_t *fh,
 107                                    OMPI_MPI_OFFSET_TYPE diskspace)
 108 {
 109     int ret = OMPI_SUCCESS, cycles, i;
 110     OMPI_MPI_OFFSET_TYPE tmp, current_size, size, written, len;
 111     mca_common_ompio_data_t *data;
 112     char *buf = NULL;
 113     ompi_status_public_t *status = NULL;
 114 
 115     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 116 
 117     OPAL_THREAD_LOCK(&fh->f_lock);
 118     tmp = diskspace;
 119 
 120     ret = data->ompio_fh.f_comm->c_coll->coll_bcast (&tmp,
 121                                                     1,
 122                                                     OMPI_OFFSET_DATATYPE,
 123                                                     OMPIO_ROOT,
 124                                                     data->ompio_fh.f_comm,
 125                                                     data->ompio_fh.f_comm->c_coll->coll_bcast_module);
 126     if ( OMPI_SUCCESS != ret ) {
 127         OPAL_THREAD_UNLOCK(&fh->f_lock);
 128         return OMPI_ERROR;
 129     }
 130 
 131     if (tmp != diskspace) {
 132         OPAL_THREAD_UNLOCK(&fh->f_lock);
 133         return OMPI_ERROR;
 134     }
 135     ret = data->ompio_fh.f_fs->fs_file_get_size (&data->ompio_fh,
 136                                                  ¤t_size);
 137     if ( OMPI_SUCCESS != ret ) {
 138         OPAL_THREAD_UNLOCK(&fh->f_lock);
 139         return OMPI_ERROR;
 140     }
 141     
 142     if ( current_size > diskspace ) {
 143         OPAL_THREAD_UNLOCK(&fh->f_lock);
 144         return OMPI_SUCCESS;
 145     }
 146 
 147 
 148     
 149 
 150 
 151 
 152 
 153 
 154 
 155     if (OMPIO_ROOT == data->ompio_fh.f_rank) {
 156         OMPI_MPI_OFFSET_TYPE prev_offset;
 157         mca_common_ompio_file_get_position (&data->ompio_fh, &prev_offset );
 158 
 159         size = diskspace;
 160         if (size > current_size) {
 161             size = current_size;
 162         }
 163 
 164         cycles = (size + OMPIO_PREALLOC_MAX_BUF_SIZE - 1)/
 165             OMPIO_PREALLOC_MAX_BUF_SIZE;
 166         buf = (char *) malloc (OMPIO_PREALLOC_MAX_BUF_SIZE);
 167         if (NULL == buf) {
 168             opal_output(1, "OUT OF MEMORY\n");
 169             ret = OMPI_ERR_OUT_OF_RESOURCE;
 170             goto exit;
 171         }
 172         written = 0;
 173 
 174         for (i=0; i<cycles; i++) {
 175             len = OMPIO_PREALLOC_MAX_BUF_SIZE;
 176             if (len > size-written) {
 177                 len = size - written;
 178             }
 179             ret = mca_common_ompio_file_read (&data->ompio_fh, buf, len, MPI_BYTE, status);
 180             if (ret != OMPI_SUCCESS) {
 181                 goto exit;
 182             }
 183             ret = mca_common_ompio_file_write (&data->ompio_fh, buf, len, MPI_BYTE, status);
 184             if (ret != OMPI_SUCCESS) {
 185                 goto exit;
 186             }
 187             written += len;
 188         }
 189 
 190         if (diskspace > current_size) {
 191             memset(buf, 0, OMPIO_PREALLOC_MAX_BUF_SIZE);
 192             size = diskspace - current_size;
 193             cycles = (size + OMPIO_PREALLOC_MAX_BUF_SIZE - 1) /
 194                 OMPIO_PREALLOC_MAX_BUF_SIZE;
 195             for (i=0; i<cycles; i++) {
 196                 len = OMPIO_PREALLOC_MAX_BUF_SIZE;
 197                 if (len > diskspace-written) {
 198                     len = diskspace - written;
 199                 }
 200                 ret = mca_common_ompio_file_write (&data->ompio_fh, buf, len, MPI_BYTE, status);
 201                 if (ret != OMPI_SUCCESS) {
 202                     goto exit;
 203                 }
 204                 written += len;
 205             }
 206         }
 207 
 208         
 209         mca_common_ompio_set_explicit_offset ( &data->ompio_fh, prev_offset);
 210     }
 211 
 212 exit:     
 213     free ( buf );
 214     fh->f_comm->c_coll->coll_bcast ( &ret, 1, MPI_INT, OMPIO_ROOT, fh->f_comm,
 215                                    fh->f_comm->c_coll->coll_bcast_module);
 216     
 217     if ( diskspace > current_size ) {
 218         data->ompio_fh.f_fs->fs_file_set_size (&data->ompio_fh, diskspace);
 219     }
 220     OPAL_THREAD_UNLOCK(&fh->f_lock);
 221 
 222     return ret;
 223 }
 224 
 225 int mca_io_ompio_file_set_size (ompi_file_t *fh,
 226                                 OMPI_MPI_OFFSET_TYPE size)
 227 {
 228     int ret = OMPI_SUCCESS;
 229     OMPI_MPI_OFFSET_TYPE tmp;
 230     mca_common_ompio_data_t *data;
 231 
 232     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 233 
 234     tmp = size;
 235     OPAL_THREAD_LOCK(&fh->f_lock);
 236     ret = data->ompio_fh.f_comm->c_coll->coll_bcast (&tmp,
 237                                                     1,
 238                                                     OMPI_OFFSET_DATATYPE,
 239                                                     OMPIO_ROOT,
 240                                                     data->ompio_fh.f_comm,
 241                                                     data->ompio_fh.f_comm->c_coll->coll_bcast_module);
 242     if ( OMPI_SUCCESS != ret ) {
 243         opal_output(1, ",mca_io_ompio_file_set_size: error in bcast\n");
 244         OPAL_THREAD_UNLOCK(&fh->f_lock);
 245         return ret;
 246     }
 247     
 248 
 249     if (tmp != size) {
 250         OPAL_THREAD_UNLOCK(&fh->f_lock);
 251         return OMPI_ERROR;
 252     }
 253 
 254     ret = data->ompio_fh.f_fs->fs_file_set_size (&data->ompio_fh, size);
 255     if ( OMPI_SUCCESS != ret ) {
 256         opal_output(1, ",mca_io_ompio_file_set_size: error in fs->set_size\n");
 257         OPAL_THREAD_UNLOCK(&fh->f_lock);
 258         return ret;
 259     }
 260     
 261     ret = data->ompio_fh.f_comm->c_coll->coll_barrier (data->ompio_fh.f_comm,
 262                                                       data->ompio_fh.f_comm->c_coll->coll_barrier_module);
 263     if ( OMPI_SUCCESS != ret ) {
 264         opal_output(1, ",mca_io_ompio_file_set_size: error in barrier\n");
 265         OPAL_THREAD_UNLOCK(&fh->f_lock);
 266         return ret;
 267     }
 268     OPAL_THREAD_UNLOCK(&fh->f_lock);
 269 
 270     return ret;
 271 }
 272 
 273 int mca_io_ompio_file_get_size (ompi_file_t *fh,
 274                             OMPI_MPI_OFFSET_TYPE *size)
 275 {
 276     int ret = OMPI_SUCCESS;
 277     mca_common_ompio_data_t *data;
 278 
 279     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 280     OPAL_THREAD_LOCK(&fh->f_lock);
 281     ret = mca_common_ompio_file_get_size(&data->ompio_fh,size);
 282     OPAL_THREAD_UNLOCK(&fh->f_lock);
 283 
 284     return ret;
 285 }
 286 
 287 
 288 int mca_io_ompio_file_get_amode (ompi_file_t *fh,
 289                                  int *amode)
 290 {
 291     mca_common_ompio_data_t *data;
 292 
 293     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 294     
 295 
 296     *amode = data->ompio_fh.f_amode;
 297 
 298     return OMPI_SUCCESS;
 299 }
 300 
 301 
 302 int mca_io_ompio_file_get_type_extent (ompi_file_t *fh,
 303                                        struct ompi_datatype_t *datatype,
 304                                        MPI_Aint *extent)
 305 {
 306     opal_datatype_type_extent (&datatype->super, extent);
 307     return OMPI_SUCCESS;
 308 }
 309 
 310 
 311 int mca_io_ompio_file_set_atomicity (ompi_file_t *fh,
 312                                      int flag)
 313 {
 314     int tmp;
 315     mca_common_ompio_data_t *data;
 316 
 317     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 318 
 319     OPAL_THREAD_LOCK(&fh->f_lock);
 320     if (flag) {
 321         flag = 1;
 322     }
 323 
 324     
 325     tmp = flag;
 326     data->ompio_fh.f_comm->c_coll->coll_bcast (&tmp,
 327                                               1,
 328                                               MPI_INT,
 329                                               OMPIO_ROOT,
 330                                               data->ompio_fh.f_comm,
 331                                               data->ompio_fh.f_comm->c_coll->coll_bcast_module);
 332 
 333     if (tmp != flag) {
 334         OPAL_THREAD_UNLOCK(&fh->f_lock);
 335         return OMPI_ERROR;
 336     }
 337 
 338     data->ompio_fh.f_atomicity = flag;
 339     OPAL_THREAD_UNLOCK(&fh->f_lock);
 340 
 341     return OMPI_SUCCESS;
 342 }
 343 
 344 int mca_io_ompio_file_get_atomicity (ompi_file_t *fh,
 345                                      int *flag)
 346 {
 347     mca_common_ompio_data_t *data;
 348 
 349     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 350 
 351     OPAL_THREAD_LOCK(&fh->f_lock);
 352     *flag = data->ompio_fh.f_atomicity;
 353     OPAL_THREAD_UNLOCK(&fh->f_lock);
 354 
 355     return OMPI_SUCCESS;
 356 }
 357 
 358 int mca_io_ompio_file_sync (ompi_file_t *fh)
 359 {
 360     int ret = OMPI_SUCCESS;
 361     mca_common_ompio_data_t *data;
 362 
 363     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 364 
 365     OPAL_THREAD_LOCK(&fh->f_lock);
 366     if ( !opal_list_is_empty (&mca_common_ompio_pending_requests) ) {
 367         OPAL_THREAD_UNLOCK(&fh->f_lock);
 368         return MPI_ERR_OTHER;
 369     }
 370 
 371     if ( data->ompio_fh.f_amode & MPI_MODE_RDONLY ) {
 372         OPAL_THREAD_UNLOCK(&fh->f_lock);
 373         return MPI_ERR_ACCESS;
 374     }        
 375     
 376     ret = data->ompio_fh.f_comm->c_coll->coll_barrier (data->ompio_fh.f_comm,
 377                                                        data->ompio_fh.f_comm->c_coll->coll_barrier_module);
 378     if ( MPI_SUCCESS != ret ) {
 379         OPAL_THREAD_UNLOCK(&fh->f_lock);
 380         return ret;
 381     }
 382     ret = data->ompio_fh.f_fs->fs_file_sync (&data->ompio_fh);
 383     OPAL_THREAD_UNLOCK(&fh->f_lock);
 384 
 385     return ret;
 386 }
 387 
 388 
 389 int mca_io_ompio_file_seek (ompi_file_t *fh,
 390                             OMPI_MPI_OFFSET_TYPE off,
 391                             int whence)
 392 {
 393     int ret = OMPI_SUCCESS;
 394     mca_common_ompio_data_t *data;
 395     OMPI_MPI_OFFSET_TYPE offset, temp_offset;
 396 
 397     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 398 
 399     OPAL_THREAD_LOCK(&fh->f_lock);
 400     offset = off * data->ompio_fh.f_etype_size;
 401 
 402     switch(whence) {
 403     case MPI_SEEK_SET:
 404         if (offset < 0) {
 405             OPAL_THREAD_UNLOCK(&fh->f_lock);
 406             return OMPI_ERROR;
 407         }
 408         break;
 409     case MPI_SEEK_CUR:
 410         ret = mca_common_ompio_file_get_position (&data->ompio_fh,
 411                                                   &temp_offset);
 412         offset += temp_offset;
 413         if (offset < 0) {
 414             OPAL_THREAD_UNLOCK(&fh->f_lock);
 415             return OMPI_ERROR;
 416         }
 417         break;
 418     case MPI_SEEK_END:
 419         ret = data->ompio_fh.f_fs->fs_file_get_size (&data->ompio_fh,
 420                                                      &temp_offset);
 421         offset += temp_offset;
 422         if (offset < 0 || OMPI_SUCCESS != ret) {
 423             OPAL_THREAD_UNLOCK(&fh->f_lock);
 424             return OMPI_ERROR;
 425         }
 426         break;
 427     default:
 428         OPAL_THREAD_UNLOCK(&fh->f_lock);
 429         return OMPI_ERROR;
 430     }
 431 
 432     ret = mca_common_ompio_set_explicit_offset (&data->ompio_fh,
 433                                              offset/data->ompio_fh.f_etype_size);
 434     OPAL_THREAD_UNLOCK(&fh->f_lock);
 435 
 436     return ret;
 437 }
 438 
 439 int mca_io_ompio_file_get_position (ompi_file_t *fd,
 440                                     OMPI_MPI_OFFSET_TYPE *offset)
 441 {
 442     int ret=OMPI_SUCCESS;
 443     mca_common_ompio_data_t *data=NULL;
 444     ompio_file_t *fh=NULL;
 445 
 446     data = (mca_common_ompio_data_t *) fd->f_io_selected_data;
 447     fh = &data->ompio_fh;
 448 
 449     OPAL_THREAD_LOCK(&fd->f_lock);
 450     ret = mca_common_ompio_file_get_position (fh, offset);
 451     OPAL_THREAD_UNLOCK(&fd->f_lock);
 452 
 453     return ret;
 454 }
 455 
 456 
 457 int mca_io_ompio_file_get_byte_offset (ompi_file_t *fh,
 458                                        OMPI_MPI_OFFSET_TYPE offset,
 459                                        OMPI_MPI_OFFSET_TYPE *disp)
 460 {
 461     mca_common_ompio_data_t *data;
 462     int i, k, index;
 463     long temp_offset;
 464 
 465     data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
 466 
 467     OPAL_THREAD_LOCK(&fh->f_lock);
 468     temp_offset = (long) data->ompio_fh.f_view_extent *
 469         (offset*data->ompio_fh.f_etype_size / data->ompio_fh.f_view_size);
 470     if ( 0 > temp_offset ) {
 471         OPAL_THREAD_UNLOCK(&fh->f_lock);
 472         return MPI_ERR_ARG;
 473     }
 474     
 475     i = (offset*data->ompio_fh.f_etype_size) % data->ompio_fh.f_view_size;
 476     index = 0;
 477     k = 0;
 478 
 479     while (1) {
 480         k = data->ompio_fh.f_decoded_iov[index].iov_len;
 481         if (i >= k) {
 482             i -= k;
 483             index++;
 484             if ( 0 == i ) {
 485                 k=0;
 486                 break;
 487             }
 488         }
 489         else {
 490             k=i;
 491             break;
 492         }
 493     }
 494 
 495     *disp = data->ompio_fh.f_disp + temp_offset +
 496         (OMPI_MPI_OFFSET_TYPE)(intptr_t)data->ompio_fh.f_decoded_iov[index].iov_base + k;
 497     OPAL_THREAD_UNLOCK(&fh->f_lock);
 498 
 499     return OMPI_SUCCESS;
 500 }
 501 
 502 int mca_io_ompio_file_seek_shared (ompi_file_t *fp,
 503                                    OMPI_MPI_OFFSET_TYPE offset,
 504                                    int whence)
 505 {
 506     int ret = OMPI_SUCCESS;
 507     mca_common_ompio_data_t *data;
 508     ompio_file_t *fh;
 509     mca_sharedfp_base_module_t * shared_fp_base_module;
 510 
 511     data = (mca_common_ompio_data_t *) fp->f_io_selected_data;
 512     fh = &data->ompio_fh;
 513 
 514     
 515     shared_fp_base_module = fh->f_sharedfp;
 516     if ( NULL == shared_fp_base_module ){
 517         opal_output(0, "No shared file pointer component found for this communicator. Can not execute\n");
 518         return OMPI_ERROR;
 519     }
 520 
 521     OPAL_THREAD_LOCK(&fp->f_lock);
 522     ret = shared_fp_base_module->sharedfp_seek(fh,offset,whence);
 523     OPAL_THREAD_UNLOCK(&fp->f_lock);
 524 
 525     return ret;
 526 }
 527 
 528 
 529 int mca_io_ompio_file_get_position_shared (ompi_file_t *fp,
 530                                            OMPI_MPI_OFFSET_TYPE * offset)
 531 {
 532     int ret = OMPI_SUCCESS;
 533     mca_common_ompio_data_t *data;
 534     ompio_file_t *fh;
 535     mca_sharedfp_base_module_t * shared_fp_base_module;
 536 
 537     data = (mca_common_ompio_data_t *) fp->f_io_selected_data;
 538     fh = &data->ompio_fh;
 539 
 540     
 541     shared_fp_base_module = fh->f_sharedfp;
 542     if ( NULL == shared_fp_base_module ){
 543         opal_output(0, "No shared file pointer component found for this communicator. Can not execute\n");
 544         return OMPI_ERROR;
 545     }
 546     OPAL_THREAD_LOCK(&fp->f_lock);
 547     ret = shared_fp_base_module->sharedfp_get_position(fh,offset);
 548     *offset = *offset / fh->f_etype_size;
 549     OPAL_THREAD_UNLOCK(&fp->f_lock);
 550 
 551     return ret;
 552 }
 553