This source file includes following definitions.
- mca_sharedfp_lockedfile_iwrite
- mca_sharedfp_lockedfile_write_ordered_begin
- mca_sharedfp_lockedfile_write_ordered_end
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_lockedfile.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 #include "ompi/mca/common/ompio/common_ompio.h"
  31 
  32 int mca_sharedfp_lockedfile_iwrite(ompio_file_t *fh,
  33                                    const void *buf,
  34                                    int count,
  35                                    ompi_datatype_t *datatype,
  36                                    MPI_Request * request)
  37 {
  38     int ret = OMPI_SUCCESS;
  39     OMPI_MPI_OFFSET_TYPE offset = 0;
  40     long bytesRequested = 0;
  41     size_t numofBytes;
  42     struct mca_sharedfp_base_data_t *sh = NULL;
  43 
  44     if(fh->f_sharedfp_data==NULL){
  45         opal_output(ompi_sharedfp_base_framework.framework_output,
  46                     "sharedfp_lockedfile_iwrite: module not initialized \n");
  47         return OMPI_ERROR;
  48     }
  49 
  50     
  51     opal_datatype_type_size ( &datatype->super, &numofBytes);
  52     bytesRequested = count * numofBytes;
  53     if ( mca_sharedfp_lockedfile_verbose ) {
  54         opal_output(ompi_sharedfp_base_framework.framework_output,
  55                     "sharedfp_lockedfile_iwrite: Bytes Requested is %ld\n",bytesRequested);
  56     }
  57 
  58     
  59     sh = fh->f_sharedfp_data;
  60 
  61     
  62     ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offset);
  63     offset /= fh->f_etype_size;
  64 
  65     if ( -1 != ret) {
  66         if ( mca_sharedfp_lockedfile_verbose ) {
  67             opal_output(ompi_sharedfp_base_framework.framework_output,
  68                         "sharedfp_lockedfile_iwrite: Offset received is %lld\n",offset);
  69         }
  70 
  71         
  72         ret = mca_common_ompio_file_iwrite_at(fh,offset,buf,count,datatype,request);
  73     }
  74 
  75     return ret;
  76 }
  77 
  78 int mca_sharedfp_lockedfile_write_ordered_begin(ompio_file_t *fh,
  79                                                 const void *buf,
  80                                                 int count,
  81                                                 struct ompi_datatype_t *datatype)
  82 {
  83     int ret = OMPI_SUCCESS;
  84     OMPI_MPI_OFFSET_TYPE offset = 0;
  85     long sendBuff = 0;
  86     long *buff=NULL;
  87     long offsetBuff;
  88     OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
  89     long bytesRequested = 0;
  90     int recvcnt = 1, sendcnt = 1;
  91     size_t numofBytes;
  92     int rank, size, i;
  93     struct mca_sharedfp_base_data_t *sh = NULL;
  94 
  95     if(fh->f_sharedfp_data==NULL){
  96         opal_output(ompi_sharedfp_base_framework.framework_output,
  97                     "sharedfp_lockedfile_write_ordered_begin: module not initialized \n");
  98         return OMPI_ERROR;
  99     }
 100 
 101 
 102     if ( true == fh->f_split_coll_in_use ) {
 103         opal_output(0, "Only one split collective I/O operation allowed per file handle at "
 104                     "any given point in time!\n");
 105         return MPI_ERR_REQUEST;
 106     }
 107 
 108     
 109     sh = fh->f_sharedfp_data;
 110 
 111     
 112     opal_datatype_type_size ( &datatype->super, &numofBytes);
 113     sendBuff = count * numofBytes;
 114 
 115     
 116     rank = ompi_comm_rank ( fh->f_comm );
 117     size = ompi_comm_size ( fh->f_comm );
 118 
 119     if ( 0 == rank ) {
 120         buff = (long*) malloc (sizeof(long) * size);
 121         if ( NULL == buff ) {
 122             return OMPI_ERR_OUT_OF_RESOURCE;
 123         }
 124     }
 125 
 126     ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, 
 127                                             sendcnt, 
 128                                             OMPI_OFFSET_DATATYPE, 
 129                                             buff, 
 130                                             recvcnt,
 131                                             OMPI_OFFSET_DATATYPE, 
 132                                             0, 
 133                                             fh->f_comm,
 134                                             fh->f_comm->c_coll->coll_gather_module );
 135     if ( OMPI_SUCCESS != ret ) {
 136         goto exit;
 137     }
 138 
 139     
 140 
 141 
 142     if (rank == 0) {
 143         for ( i = 0; i < size ; i ++)  {
 144             bytesRequested += buff[i];
 145             if ( mca_sharedfp_lockedfile_verbose ) {
 146                 opal_output(ompi_sharedfp_base_framework.framework_output,
 147                             "sharedfp_lockedfile_write_ordered_begin: Bytes requested are %ld\n",bytesRequested);
 148             }
 149         }
 150 
 151         
 152 
 153 
 154 
 155 
 156 
 157         ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
 158         if ( OMPI_SUCCESS != ret ){
 159             goto exit;
 160         }
 161         if ( mca_sharedfp_lockedfile_verbose ) {
 162             opal_output(ompi_sharedfp_base_framework.framework_output,
 163                         "sharedfp_lockedfile_write_ordered_begin: Offset received is %lld\n",offsetReceived);
 164         }
 165         buff[0] += offsetReceived;
 166         for (i = 1 ; i < size; i++) {
 167             buff[i] += buff[i-1];
 168         }
 169     }
 170 
 171     
 172     ret = fh->f_comm->c_coll->coll_scatter ( buff, 
 173                                              sendcnt, 
 174                                              OMPI_OFFSET_DATATYPE,
 175                                              &offsetBuff, 
 176                                              recvcnt, 
 177                                              OMPI_OFFSET_DATATYPE, 
 178                                              0,
 179                                              fh->f_comm, 
 180                                              fh->f_comm->c_coll->coll_scatter_module );
 181     if ( OMPI_SUCCESS != ret ) {
 182         goto exit;
 183     }
 184 
 185     
 186     offset = offsetBuff - sendBuff;
 187     offset /= fh->f_etype_size;
 188 
 189     if ( mca_sharedfp_lockedfile_verbose ) {
 190         opal_output(ompi_sharedfp_base_framework.framework_output,
 191                     "sharedfp_lockedfile_write_ordered_begin: Offset returned is %lld\n",offset);
 192      }
 193 
 194     ret = mca_common_ompio_file_iwrite_at_all ( fh, offset, buf, count, datatype, &fh->f_split_coll_req );
 195     fh->f_split_coll_in_use = true;
 196 
 197 exit:
 198     if ( NULL != buff ) {
 199         free ( buff);
 200     }
 201 
 202     return ret;
 203 }
 204 
 205 
 206 
 207 int mca_sharedfp_lockedfile_write_ordered_end(ompio_file_t *fh,
 208                                               const void *buf,
 209                                               ompi_status_public_t *status)
 210 {
 211     int ret = OMPI_SUCCESS;
 212     ret = ompi_request_wait ( &fh->f_split_coll_req, status );
 213 
 214     
 215     fh->f_split_coll_in_use = false;
 216     return ret;
 217 }