root/ompi/mca/sharedfp/lockedfile/sharedfp_lockedfile_iread.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. mca_sharedfp_lockedfile_iread
  2. mca_sharedfp_lockedfile_read_ordered_begin
  3. mca_sharedfp_lockedfile_read_ordered_end

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2013-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2018      Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_lockedfile.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 #include "ompi/mca/common/ompio/common_ompio.h"
  31 
  32 int mca_sharedfp_lockedfile_iread(ompio_file_t *fh,
  33                                   void *buf,
  34                                   int count,
  35                                   ompi_datatype_t *datatype,
  36                                   MPI_Request * request)
  37 {
  38     int ret = OMPI_SUCCESS;
  39     OMPI_MPI_OFFSET_TYPE offset = 0;
  40     long bytesRequested = 0;
  41     size_t numofBytes;
  42     struct mca_sharedfp_base_data_t *sh = NULL;
  43 
  44     if ( NULL == fh->f_sharedfp_data ) {
  45         opal_output(ompi_sharedfp_base_framework.framework_output,
  46                     "sharedfp_lockedfile_iread: module not initialized\n");
  47         return OMPI_ERROR;
  48     }
  49 
  50     /* Calculate the number of bytes to read */
  51     opal_datatype_type_size ( &datatype->super, &numofBytes);
  52     bytesRequested = count * numofBytes;
  53 
  54     if ( mca_sharedfp_lockedfile_verbose ) {
  55         opal_output(ompi_sharedfp_base_framework.framework_output,
  56                     "sharedfp_lockedfile_iread - Bytes Requested is %ld\n",bytesRequested);
  57     }
  58 
  59 
  60     /*Retrieve the shared file data struct*/
  61     sh = fh->f_sharedfp_data;
  62 
  63     /*Request the offset to write bytesRequested bytes*/
  64     ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offset);
  65     offset /= fh->f_etype_size;
  66 
  67     if ( -1 != ret )  {
  68         if ( mca_sharedfp_lockedfile_verbose ) {
  69             opal_output(ompi_sharedfp_base_framework.framework_output,
  70                         "sharedfp_lockedfile_iread - Offset received is %lld\n",offset);
  71         }
  72 
  73         /* Read the file */
  74         ret = mca_common_ompio_file_iread_at(fh,offset,buf,count,datatype,request);
  75     }
  76 
  77     return ret;
  78 }
  79 
  80 int mca_sharedfp_lockedfile_read_ordered_begin(ompio_file_t *fh,
  81                                        void *buf,
  82                                        int count,
  83                                        struct ompi_datatype_t *datatype)
  84 {
  85     int ret = OMPI_SUCCESS;
  86     OMPI_MPI_OFFSET_TYPE offset = 0;
  87     long sendBuff = 0;
  88     long *buff=NULL;
  89     long offsetBuff;
  90     OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
  91     long bytesRequested = 0;
  92     int recvcnt = 1, sendcnt = 1;
  93     size_t numofBytes;
  94     int rank, size, i;
  95     struct mca_sharedfp_base_data_t *sh = NULL;
  96 
  97     if(fh->f_sharedfp_data==NULL){
  98         opal_output(ompi_sharedfp_base_framework.framework_output,
  99                     "sharedfp_lockedfile_read_ordered_begin: module not initialized\n");
 100         return OMPI_ERROR;
 101     }
 102 
 103 
 104     if ( true == fh->f_split_coll_in_use ) {
 105         opal_output(ompi_sharedfp_base_framework.framework_output,
 106                     "Only one split collective I/O operation allowed per file handle at any "
 107                     "given point in time!\n");
 108         return MPI_ERR_REQUEST;
 109     }
 110 
 111     /*Retrieve the new communicator*/
 112     sh = fh->f_sharedfp_data;
 113 
 114     /* Calculate the number of bytes to write*/
 115     opal_datatype_type_size ( &datatype->super, &numofBytes);
 116     sendBuff = count * numofBytes;
 117 
 118     /* Get the ranks in the communicator */
 119     rank = ompi_comm_rank ( fh->f_comm );
 120     size = ompi_comm_size ( fh->f_comm );
 121 
 122     if ( 0 == rank ) {
 123         buff = (long*) malloc (sizeof(long) * size);
 124         if ( NULL == buff ) {
 125             return OMPI_ERR_OUT_OF_RESOURCE;
 126         }
 127     }
 128 
 129     ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE, buff, recvcnt,
 130                                             OMPI_OFFSET_DATATYPE, 0, fh->f_comm,
 131                                             fh->f_comm->c_coll->coll_gather_module );
 132     if ( OMPI_SUCCESS != ret ) {
 133         goto exit;
 134     }
 135 
 136     /* All the counts are present now in the recvBuff.
 137        The size of recvBuff is sizeof_newComm
 138      */
 139     if (rank == 0) {
 140         for ( i = 0; i < size ; i ++)  {
 141             bytesRequested += buff[i];
 142             if ( mca_sharedfp_lockedfile_verbose ) {
 143                 opal_output(ompi_sharedfp_base_framework.framework_output,
 144                             "sharedfp_lockedfile_read_ordered_begin: Bytes requested are %ld\n",bytesRequested);
 145             }
 146         }
 147 
 148         /*Request the offset to write bytesRequested bytes
 149           only the root process needs to do the request,
 150           since the root process will then tell the other
 151           processes at what offset they should write their
 152           share of the data.
 153          */
 154         ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
 155         if ( OMPI_SUCCESS != ret ){
 156             goto exit;
 157         }
 158         if ( mca_sharedfp_lockedfile_verbose ) {
 159             opal_output(ompi_sharedfp_base_framework.framework_output,
 160                         "sharedfp_lockedfile_read_ordered_begin: Offset received is %lld\n",offsetReceived);
 161         }
 162         buff[0] += offsetReceived;
 163         for (i = 1 ; i < size; i++) {
 164             buff[i] += buff[i-1];
 165         }
 166     }
 167 
 168     /* Scatter the results to the other processes*/
 169     ret = fh->f_comm->c_coll->coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
 170                                              &offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
 171                                              fh->f_comm, fh->f_comm->c_coll->coll_scatter_module );
 172     if ( OMPI_SUCCESS != ret ) {
 173         goto exit;
 174     }
 175 
 176     /*Each process now has its own individual offset*/
 177     offset = offsetBuff - sendBuff;
 178     offset /= fh->f_etype_size;
 179 
 180     if ( mca_sharedfp_lockedfile_verbose ) {
 181         opal_output(ompi_sharedfp_base_framework.framework_output,
 182                     "sharedfp_lockedfile_read_ordered_begin: Offset returned is %lld\n",offset);
 183     }
 184 
 185     ret = mca_common_ompio_file_iread_at_all ( fh, offset, buf, count, datatype, &fh->f_split_coll_req );
 186     fh->f_split_coll_in_use = true;
 187 
 188 exit:
 189     if ( NULL != buff ) {
 190         free ( buff);
 191     }
 192 
 193     return ret;
 194 }
 195 
 196 
 197 int mca_sharedfp_lockedfile_read_ordered_end(ompio_file_t *fh,
 198                                               void *buf,
 199                                               ompi_status_public_t *status)
 200 {
 201     int ret = OMPI_SUCCESS;
 202     ret = ompi_request_wait ( &fh->f_split_coll_req, status );
 203 
 204     /* remove the flag again */
 205     fh->f_split_coll_in_use = false;
 206     return ret;
 207 }

/* [<][>][^][v][top][bottom][index][help] */