This source file includes the following definitions.
- mca_sharedfp_lockedfile_iread
- mca_sharedfp_lockedfile_read_ordered_begin
- mca_sharedfp_lockedfile_read_ordered_end
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_lockedfile.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 #include "ompi/mca/common/ompio/common_ompio.h"
  31 
  32 int mca_sharedfp_lockedfile_iread(ompio_file_t *fh,
  33                                   void *buf,
  34                                   int count,
  35                                   ompi_datatype_t *datatype,
  36                                   MPI_Request * request)
  37 {
  38     int ret = OMPI_SUCCESS;
  39     OMPI_MPI_OFFSET_TYPE offset = 0;
  40     long bytesRequested = 0;
  41     size_t numofBytes;
  42     struct mca_sharedfp_base_data_t *sh = NULL;
  43 
  44     if ( NULL == fh->f_sharedfp_data ) {
  45         opal_output(ompi_sharedfp_base_framework.framework_output,
  46                     "sharedfp_lockedfile_iread: module not initialized\n");
  47         return OMPI_ERROR;
  48     }
  49 
  50     
  51     opal_datatype_type_size ( &datatype->super, &numofBytes);
  52     bytesRequested = count * numofBytes;
  53 
  54     if ( mca_sharedfp_lockedfile_verbose ) {
  55         opal_output(ompi_sharedfp_base_framework.framework_output,
  56                     "sharedfp_lockedfile_iread - Bytes Requested is %ld\n",bytesRequested);
  57     }
  58 
  59 
  60     
  61     sh = fh->f_sharedfp_data;
  62 
  63     
  64     ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offset);
  65     offset /= fh->f_etype_size;
  66 
  67     if ( -1 != ret )  {
  68         if ( mca_sharedfp_lockedfile_verbose ) {
  69             opal_output(ompi_sharedfp_base_framework.framework_output,
  70                         "sharedfp_lockedfile_iread - Offset received is %lld\n",offset);
  71         }
  72 
  73         
  74         ret = mca_common_ompio_file_iread_at(fh,offset,buf,count,datatype,request);
  75     }
  76 
  77     return ret;
  78 }
  79 
  80 int mca_sharedfp_lockedfile_read_ordered_begin(ompio_file_t *fh,
  81                                        void *buf,
  82                                        int count,
  83                                        struct ompi_datatype_t *datatype)
  84 {
  85     int ret = OMPI_SUCCESS;
  86     OMPI_MPI_OFFSET_TYPE offset = 0;
  87     long sendBuff = 0;
  88     long *buff=NULL;
  89     long offsetBuff;
  90     OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
  91     long bytesRequested = 0;
  92     int recvcnt = 1, sendcnt = 1;
  93     size_t numofBytes;
  94     int rank, size, i;
  95     struct mca_sharedfp_base_data_t *sh = NULL;
  96 
  97     if(fh->f_sharedfp_data==NULL){
  98         opal_output(ompi_sharedfp_base_framework.framework_output,
  99                     "sharedfp_lockedfile_read_ordered_begin: module not initialized\n");
 100         return OMPI_ERROR;
 101     }
 102 
 103 
 104     if ( true == fh->f_split_coll_in_use ) {
 105         opal_output(ompi_sharedfp_base_framework.framework_output,
 106                     "Only one split collective I/O operation allowed per file handle at any "
 107                     "given point in time!\n");
 108         return MPI_ERR_REQUEST;
 109     }
 110 
 111     
 112     sh = fh->f_sharedfp_data;
 113 
 114     
 115     opal_datatype_type_size ( &datatype->super, &numofBytes);
 116     sendBuff = count * numofBytes;
 117 
 118     
 119     rank = ompi_comm_rank ( fh->f_comm );
 120     size = ompi_comm_size ( fh->f_comm );
 121 
 122     if ( 0 == rank ) {
 123         buff = (long*) malloc (sizeof(long) * size);
 124         if ( NULL == buff ) {
 125             return OMPI_ERR_OUT_OF_RESOURCE;
 126         }
 127     }
 128 
 129     ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE, buff, recvcnt,
 130                                             OMPI_OFFSET_DATATYPE, 0, fh->f_comm,
 131                                             fh->f_comm->c_coll->coll_gather_module );
 132     if ( OMPI_SUCCESS != ret ) {
 133         goto exit;
 134     }
 135 
 136     
 137 
 138 
 139     if (rank == 0) {
 140         for ( i = 0; i < size ; i ++)  {
 141             bytesRequested += buff[i];
 142             if ( mca_sharedfp_lockedfile_verbose ) {
 143                 opal_output(ompi_sharedfp_base_framework.framework_output,
 144                             "sharedfp_lockedfile_read_ordered_begin: Bytes requested are %ld\n",bytesRequested);
 145             }
 146         }
 147 
 148         
 149 
 150 
 151 
 152 
 153 
 154         ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
 155         if ( OMPI_SUCCESS != ret ){
 156             goto exit;
 157         }
 158         if ( mca_sharedfp_lockedfile_verbose ) {
 159             opal_output(ompi_sharedfp_base_framework.framework_output,
 160                         "sharedfp_lockedfile_read_ordered_begin: Offset received is %lld\n",offsetReceived);
 161         }
 162         buff[0] += offsetReceived;
 163         for (i = 1 ; i < size; i++) {
 164             buff[i] += buff[i-1];
 165         }
 166     }
 167 
 168     
 169     ret = fh->f_comm->c_coll->coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
 170                                              &offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
 171                                              fh->f_comm, fh->f_comm->c_coll->coll_scatter_module );
 172     if ( OMPI_SUCCESS != ret ) {
 173         goto exit;
 174     }
 175 
 176     
 177     offset = offsetBuff - sendBuff;
 178     offset /= fh->f_etype_size;
 179 
 180     if ( mca_sharedfp_lockedfile_verbose ) {
 181         opal_output(ompi_sharedfp_base_framework.framework_output,
 182                     "sharedfp_lockedfile_read_ordered_begin: Offset returned is %lld\n",offset);
 183     }
 184 
 185     ret = mca_common_ompio_file_iread_at_all ( fh, offset, buf, count, datatype, &fh->f_split_coll_req );
 186     fh->f_split_coll_in_use = true;
 187 
 188 exit:
 189     if ( NULL != buff ) {
 190         free ( buff);
 191     }
 192 
 193     return ret;
 194 }
 195 
 196 
 197 int mca_sharedfp_lockedfile_read_ordered_end(ompio_file_t *fh,
 198                                               void *buf,
 199                                               ompi_status_public_t *status)
 200 {
 201     int ret = OMPI_SUCCESS;
 202     ret = ompi_request_wait ( &fh->f_split_coll_req, status );
 203 
 204     
 205     fh->f_split_coll_in_use = false;
 206     return ret;
 207 }