root/ompi/mca/common/ompio/common_ompio_file_read.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_common_ompio_file_read
  2. mca_common_ompio_file_read_at
  3. mca_common_ompio_file_iread
  4. mca_common_ompio_file_iread_at
  5. mca_common_ompio_file_read_all
  6. mca_common_ompio_file_read_at_all
  7. mca_common_ompio_file_iread_all
  8. mca_common_ompio_file_iread_at_all
  9. mca_common_ompio_set_explicit_offset

   1 /*
   2  *  Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                          University Research and Technology
   4  *                          Corporation.  All rights reserved.
   5  *  Copyright (c) 2004-2016 The University of Tennessee and The University
   6  *                          of Tennessee Research Foundation.  All rights
   7  *                          reserved.
   8  *  Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                          University of Stuttgart.  All rights reserved.
  10  *  Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                          All rights reserved.
  12  *  Copyright (c) 2008-2019 University of Houston. All rights reserved.
  13  *  Copyright (c) 2018      Research Organization for Information Science
  14  *                          and Technology (RIST). All rights reserved.
  15  *  $COPYRIGHT$
  16  *
  17  *  Additional copyrights may follow
  18  *
  19  *  $HEADER$
  20  */
  21 
  22 #include "ompi_config.h"
  23 
  24 #include "ompi/communicator/communicator.h"
  25 #include "ompi/info/info.h"
  26 #include "ompi/file/file.h"
  27 #include "ompi/mca/fs/fs.h"
  28 #include "ompi/mca/fs/base/base.h"
  29 #include "ompi/mca/fcoll/fcoll.h"
  30 #include "ompi/mca/fcoll/base/base.h"
  31 #include "ompi/mca/fbtl/fbtl.h"
  32 #include "ompi/mca/fbtl/base/base.h"
  33 
  34 #include "common_ompio.h"
  35 #include "common_ompio_request.h"
  36 #include "common_ompio_buffer.h"
  37 #include <unistd.h>
  38 #include <math.h>
  39 
  40 
/* Read and write routines are split into two interfaces.
**   The
**   mca_io_ompio_file_read/write[_at]
**
** routines are the ones registered with the ompio modules.
** The
**
** mca_common_ompio_file_read/write[_at]
**
** routines are used e.g. from the shared file pointer modules.
** The main difference is that the first one takes an ompi_file_t
** as a file pointer argument, while the second uses the ompio internal
** ompio_file_t structure.
*/
  55 
  56 int mca_common_ompio_file_read (ompio_file_t *fh,
  57                               void *buf,
  58                               int count,
  59                               struct ompi_datatype_t *datatype,
  60                               ompi_status_public_t *status)
  61 {
  62     int ret = OMPI_SUCCESS;
  63 
  64     size_t total_bytes_read = 0;       /* total bytes that have been read*/
  65     size_t bytes_per_cycle = 0;        /* total read in each cycle by each process*/
  66     int index = 0;
  67     int cycles = 0;
  68 
  69     uint32_t iov_count = 0;
  70     struct iovec *decoded_iov = NULL;
  71 
  72     size_t max_data=0, real_bytes_read=0;
  73     size_t spc=0;
  74     ssize_t ret_code=0;
  75     int i = 0; /* index into the decoded iovec of the buffer */
  76     int j = 0; /* index into the file vie iovec */
  77 
  78     if (fh->f_amode & MPI_MODE_WRONLY){
  79 //      opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n");
  80         ret = MPI_ERR_ACCESS;
  81       return ret;
  82     }
  83 
  84     if ( 0 == count ) {
  85         if ( MPI_STATUS_IGNORE != status ) {
  86             status->_ucount = 0;
  87         }
  88         return ret;
  89     }
  90 
  91     bool need_to_copy = false;    
  92     opal_convertor_t convertor;
  93 #if OPAL_CUDA_SUPPORT
  94     int is_gpu, is_managed;
  95     mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
  96     if ( is_gpu && !is_managed ) {
  97         need_to_copy = true;
  98     }
  99 #endif
 100 
 101     if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
 102          !(datatype == &ompi_mpi_byte.dt  ||
 103            datatype == &ompi_mpi_char.dt   )) {
 104         /* only need to copy if any of these conditions are given:
 105            1. buffer is an unmanaged CUDA buffer (checked above).
 106            2. Datarepresentation is anything other than 'native' and
 107            3. datatype is not byte or char (i.e it does require some actual
 108               work to be done e.g. for external32.
 109         */
 110         need_to_copy = true;
 111     }         
 112     
 113     if ( need_to_copy ) {
 114         char *tbuf=NULL;
 115 
 116         OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);        
 117     }
 118     else {
 119         mca_common_ompio_decode_datatype (fh,
 120                                           datatype,
 121                                           count,
 122                                           buf,
 123                                           &max_data,
 124                                           fh->f_mem_convertor,
 125                                           &decoded_iov,
 126                                           &iov_count);
 127     }
 128 
 129     if ( 0 < max_data && 0 == fh->f_iov_count  ) {
 130         if ( MPI_STATUS_IGNORE != status ) {
 131             status->_ucount = 0;
 132         }
 133         if (NULL != decoded_iov) {
 134             free (decoded_iov);
 135             decoded_iov = NULL;
 136         }
 137         return OMPI_SUCCESS;
 138     }
 139 
 140     if ( -1 == OMPIO_MCA_GET(fh, cycle_buffer_size )) {
 141         bytes_per_cycle = max_data;
 142     }
 143     else {
 144         bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size);
 145     }
 146     cycles = ceil((double)max_data/bytes_per_cycle);
 147     
 148 #if 0
 149         printf ("Bytes per Cycle: %d   Cycles: %d max_data:%d \n",bytes_per_cycle, cycles, max_data);
 150 #endif
 151 
 152     j = fh->f_index_in_file_view;
 153 
 154     for (index = 0; index < cycles; index++) {
 155 
 156         mca_common_ompio_build_io_array ( fh,
 157                                           index,
 158                                           cycles,
 159                                           bytes_per_cycle,
 160                                           max_data,
 161                                           iov_count,
 162                                           decoded_iov,
 163                                           &i,
 164                                           &j,
 165                                           &total_bytes_read, 
 166                                           &spc,
 167                                           &fh->f_io_array,
 168                                           &fh->f_num_of_io_entries);
 169 
 170         if (fh->f_num_of_io_entries) {
 171             ret_code = fh->f_fbtl->fbtl_preadv (fh);
 172             if ( 0<= ret_code ) {
 173                 real_bytes_read+=(size_t)ret_code;
 174             }
 175         }
 176 
 177         fh->f_num_of_io_entries = 0;
 178         if (NULL != fh->f_io_array) {
 179             free (fh->f_io_array);
 180             fh->f_io_array = NULL;
 181         }
 182     }
 183 
 184     if ( need_to_copy ) {
 185         size_t pos=0;
 186 
 187         opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );
 188         opal_convertor_cleanup (&convertor);
 189         mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
 190     }
 191 
 192     if (NULL != decoded_iov) {
 193         free (decoded_iov);
 194         decoded_iov = NULL;
 195     }
 196 
 197     if ( MPI_STATUS_IGNORE != status ) {
 198         status->_ucount = real_bytes_read;
 199     }
 200 
 201     return ret;
 202 }
 203 
 204 int mca_common_ompio_file_read_at (ompio_file_t *fh,
 205                                  OMPI_MPI_OFFSET_TYPE offset,
 206                                  void *buf,
 207                                  int count,
 208                                  struct ompi_datatype_t *datatype,
 209                                  ompi_status_public_t * status)
 210 {
 211     int ret = OMPI_SUCCESS;
 212     OMPI_MPI_OFFSET_TYPE prev_offset;
 213 
 214     mca_common_ompio_file_get_position (fh, &prev_offset );
 215 
 216     mca_common_ompio_set_explicit_offset (fh, offset);
 217     ret = mca_common_ompio_file_read (fh,
 218                                     buf,
 219                                     count,
 220                                     datatype,
 221                                     status);
 222 
 223     // An explicit offset file operation is not suppsed to modify
 224     // the internal file pointer. So reset the pointer
 225     // to the previous value
 226     mca_common_ompio_set_explicit_offset (fh, prev_offset);
 227 
 228     return ret;
 229 }
 230 
 231 
 232 int mca_common_ompio_file_iread (ompio_file_t *fh,
 233                                void *buf,
 234                                int count,
 235                                struct ompi_datatype_t *datatype,
 236                                ompi_request_t **request)
 237 {
 238     int ret = OMPI_SUCCESS;
 239     mca_ompio_request_t *ompio_req=NULL;
 240     size_t spc=0;
 241 
 242     if (fh->f_amode & MPI_MODE_WRONLY){
 243 //      opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n");
 244         ret = MPI_ERR_ACCESS;
 245       return ret;
 246     }
 247 
 248     mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ);
 249 
 250     if ( 0 == count ) {
 251         ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
 252         ompio_req->req_ompi.req_status._ucount = 0;
 253         ompi_request_complete (&ompio_req->req_ompi, false);
 254         *request = (ompi_request_t *) ompio_req;
 255         
 256         return OMPI_SUCCESS;
 257     }
 258 
 259     if ( NULL != fh->f_fbtl->fbtl_ipreadv ) {
 260         // This fbtl has support for non-blocking operations
 261 
 262         size_t total_bytes_read = 0;       /* total bytes that have been read*/
 263         uint32_t iov_count = 0;
 264         struct iovec *decoded_iov = NULL;
 265         
 266         size_t max_data = 0;
 267         int i = 0; /* index into the decoded iovec of the buffer */
 268         int j = 0; /* index into the file vie iovec */
 269         
 270         bool need_to_copy = false;    
 271     
 272 #if OPAL_CUDA_SUPPORT
 273         int is_gpu, is_managed;
 274         mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
 275         if ( is_gpu && !is_managed ) {
 276             need_to_copy = true;
 277         }
 278 #endif
 279 
 280         if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
 281              !(datatype == &ompi_mpi_byte.dt  ||
 282                datatype == &ompi_mpi_char.dt   )) {
 283             /* only need to copy if any of these conditions are given:
 284                1. buffer is an unmanaged CUDA buffer (checked above).
 285                2. Datarepresentation is anything other than 'native' and
 286                3. datatype is not byte or char (i.e it does require some actual
 287                work to be done e.g. for external32.
 288             */
 289             need_to_copy = true;
 290         }         
 291         
 292         if ( need_to_copy ) {
 293             char *tbuf=NULL;
 294             
 295             OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&ompio_req->req_convertor,max_data,decoded_iov,iov_count); 
 296             
 297             ompio_req->req_tbuf = tbuf;
 298             ompio_req->req_size = max_data;
 299         }
 300         else {
 301             mca_common_ompio_decode_datatype (fh,
 302                                               datatype,
 303                                               count,
 304                                               buf,
 305                                               &max_data,
 306                                               fh->f_mem_convertor,
 307                                               &decoded_iov,
 308                                               &iov_count);
 309         }
 310     
 311         if ( 0 < max_data && 0 == fh->f_iov_count  ) {
 312             ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
 313             ompio_req->req_ompi.req_status._ucount = 0;
 314             ompi_request_complete (&ompio_req->req_ompi, false);
 315             *request = (ompi_request_t *) ompio_req;
 316             if (NULL != decoded_iov) {
 317                 free (decoded_iov);
 318                 decoded_iov = NULL;
 319             }
 320 
 321             return OMPI_SUCCESS;
 322         }
 323 
 324         // Non-blocking operations have to occur in a single cycle
 325         j = fh->f_index_in_file_view;
 326         
 327         mca_common_ompio_build_io_array ( fh,
 328                                           0,         // index
 329                                           1,         // no. of cyces
 330                                           max_data,  // setting bytes per cycle to match data
 331                                           max_data,
 332                                           iov_count,
 333                                           decoded_iov,
 334                                           &i,
 335                                           &j,
 336                                           &total_bytes_read, 
 337                                           &spc,
 338                                           &fh->f_io_array,
 339                                           &fh->f_num_of_io_entries);
 340 
 341         if (fh->f_num_of_io_entries) {
 342           fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req);
 343         }
 344 
 345         mca_common_ompio_register_progress ();
 346 
 347         fh->f_num_of_io_entries = 0;
 348         if (NULL != fh->f_io_array) {
 349             free (fh->f_io_array);
 350             fh->f_io_array = NULL;
 351         }
 352 
 353         if (NULL != decoded_iov) {
 354             free (decoded_iov);
 355             decoded_iov = NULL;
 356         }
 357     }
 358     else {
 359         // This fbtl does not  support non-blocking operations
 360         ompi_status_public_t status;
 361         ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status);
 362 
 363         ompio_req->req_ompi.req_status.MPI_ERROR = ret;
 364         ompio_req->req_ompi.req_status._ucount = status._ucount;
 365         ompi_request_complete (&ompio_req->req_ompi, false);
 366     }
 367 
 368     *request = (ompi_request_t *) ompio_req;
 369     return ret;
 370 }
 371 
 372 
 373 int mca_common_ompio_file_iread_at (ompio_file_t *fh,
 374                                   OMPI_MPI_OFFSET_TYPE offset,
 375                                   void *buf,
 376                                   int count,
 377                                   struct ompi_datatype_t *datatype,
 378                                   ompi_request_t **request)
 379 {
 380     int ret = OMPI_SUCCESS;
 381     OMPI_MPI_OFFSET_TYPE prev_offset;
 382     mca_common_ompio_file_get_position (fh, &prev_offset );
 383 
 384     mca_common_ompio_set_explicit_offset (fh, offset);
 385     ret = mca_common_ompio_file_iread (fh,
 386                                     buf,
 387                                     count,
 388                                     datatype,
 389                                     request);
 390 
 391     /* An explicit offset file operation is not suppsed to modify
 392     ** the internal file pointer. So reset the pointer
 393     ** to the previous value
 394     ** It is OK to reset the position already here, althgouth
 395     ** the operation might still be pending/ongoing, since
 396     ** the entire array of <offset, length, memaddress> have
 397     ** already been constructed in the file_iread operation
 398     */
 399     mca_common_ompio_set_explicit_offset (fh, prev_offset);
 400 
 401     return ret;
 402 }
 403 
 404 
 405 /* Infrastructure for collective operations  */
 406 int mca_common_ompio_file_read_all (ompio_file_t *fh,
 407                                     void *buf,
 408                                     int count,
 409                                     struct ompi_datatype_t *datatype,
 410                                     ompi_status_public_t * status)
 411 {
 412     int ret = OMPI_SUCCESS;
 413 
 414 
 415     if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
 416          !(datatype == &ompi_mpi_byte.dt  ||
 417            datatype == &ompi_mpi_char.dt   )) {
 418         /* No need to check for GPU buffer for collective I/O.
 419            Most algorithms copy data from aggregators, and send/recv
 420            to/from GPU buffers works if ompi was compiled was GPU support.
 421            
 422            If the individual fcoll component is used: there are no aggregators 
 423            in that concept. However, since they call common_ompio_file_write, 
 424            CUDA buffers are handled by that routine.
 425 
 426            Thus, we only check for
 427            1. Datarepresentation is anything other than 'native' and
 428            2. datatype is not byte or char (i.e it does require some actual
 429               work to be done e.g. for external32.
 430         */
 431         size_t pos=0, max_data=0;
 432         char *tbuf=NULL;
 433         opal_convertor_t convertor;
 434         struct iovec *decoded_iov = NULL;
 435         uint32_t iov_count = 0;
 436 
 437         OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);   
 438         ret = fh->f_fcoll->fcoll_file_read_all (fh,
 439                                                 decoded_iov->iov_base,
 440                                                 decoded_iov->iov_len,
 441                                                 MPI_BYTE,
 442                                                 status);
 443         opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );
 444 
 445         opal_convertor_cleanup (&convertor);
 446         mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
 447         if (NULL != decoded_iov) {
 448             free (decoded_iov);
 449             decoded_iov = NULL;
 450         }
 451     }
 452     else {
 453         ret = fh->f_fcoll->fcoll_file_read_all (fh,
 454                                                 buf,
 455                                                 count,
 456                                                 datatype,
 457                                                 status);
 458     }
 459     return ret;
 460 }
 461 
 462 int mca_common_ompio_file_read_at_all (ompio_file_t *fh,
 463                                      OMPI_MPI_OFFSET_TYPE offset,
 464                                      void *buf,
 465                                      int count,
 466                                      struct ompi_datatype_t *datatype,
 467                                      ompi_status_public_t * status)
 468 {
 469     int ret = OMPI_SUCCESS;
 470     OMPI_MPI_OFFSET_TYPE prev_offset;
 471     mca_common_ompio_file_get_position (fh, &prev_offset );
 472 
 473     mca_common_ompio_set_explicit_offset (fh, offset);
 474     ret = mca_common_ompio_file_read_all (fh,
 475                                           buf,
 476                                           count,
 477                                           datatype,
 478                                           status);
 479     
 480     mca_common_ompio_set_explicit_offset (fh, prev_offset);
 481     return ret;
 482 }
 483 
 484 int mca_common_ompio_file_iread_all (ompio_file_t *fp,
 485                                      void *buf,
 486                                      int count,
 487                                      struct ompi_datatype_t *datatype,
 488                                      ompi_request_t **request)
 489 {
 490     int ret = OMPI_SUCCESS;
 491 
 492     if ( NULL != fp->f_fcoll->fcoll_file_iread_all ) {
 493         ret = fp->f_fcoll->fcoll_file_iread_all (fp,
 494                                                  buf,
 495                                                  count,
 496                                                  datatype,
 497                                                  request);
 498     }
 499     else {
 500         /* this fcoll component does not support non-blocking
 501            collective I/O operations. WE fake it with
 502            individual non-blocking I/O operations. */
 503         ret = mca_common_ompio_file_iread ( fp, buf, count, datatype, request );
 504     }
 505 
 506     return ret;
 507 }
 508 
 509 int mca_common_ompio_file_iread_at_all (ompio_file_t *fp,
 510                                       OMPI_MPI_OFFSET_TYPE offset,
 511                                       void *buf,
 512                                       int count,
 513                                       struct ompi_datatype_t *datatype,
 514                                       ompi_request_t **request)
 515 {
 516     int ret = OMPI_SUCCESS;
 517     OMPI_MPI_OFFSET_TYPE prev_offset;
 518 
 519     mca_common_ompio_file_get_position (fp, &prev_offset );
 520     mca_common_ompio_set_explicit_offset (fp, offset);
 521 
 522     ret = mca_common_ompio_file_iread_all (fp,
 523                                            buf,
 524                                            count,
 525                                            datatype,
 526                                            request);
 527     
 528     mca_common_ompio_set_explicit_offset (fp, prev_offset);
 529     return ret;
 530 }
 531 
 532 
 533 int mca_common_ompio_set_explicit_offset (ompio_file_t *fh,
 534                                           OMPI_MPI_OFFSET_TYPE offset)
 535 {
 536     int i = 0;
 537     int k = 0;
 538 
 539     if ( fh->f_view_size  > 0 ) {
 540         /* starting offset of the current copy of the filew view */
 541         fh->f_offset = (fh->f_view_extent *
 542                         ((offset*fh->f_etype_size) / fh->f_view_size)) + fh->f_disp;
 543 
 544 
 545         /* number of bytes used within the current copy of the file view */
 546         fh->f_total_bytes = (offset*fh->f_etype_size) % fh->f_view_size;
 547         i = fh->f_total_bytes;
 548 
 549 
 550         /* Initialize the block id and the starting offset of the current block
 551            within the current copy of the file view to zero */
 552         fh->f_index_in_file_view = 0;
 553         fh->f_position_in_file_view = 0;
 554 
 555         /* determine block id that the offset is located in and
 556            the starting offset of that block */
 557         k = fh->f_decoded_iov[fh->f_index_in_file_view].iov_len;
 558         while (i >= k) {
 559             fh->f_position_in_file_view = k;
 560             fh->f_index_in_file_view++;
 561             k += fh->f_decoded_iov[fh->f_index_in_file_view].iov_len;
 562         }
 563     }
 564 
 565     return OMPI_SUCCESS;
 566 }

/* [<][>][^][v][top][bottom][index][help] */