root/ompi/mca/sharedfp/lockedfile/sharedfp_lockedfile_read.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_sharedfp_lockedfile_read
  2. mca_sharedfp_lockedfile_read_ordered

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2013-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2018      Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_lockedfile.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 
  31 int mca_sharedfp_lockedfile_read ( ompio_file_t *fh,
  32                                    void *buf, int count, MPI_Datatype datatype, MPI_Status *status)
  33 {
  34     int ret = OMPI_SUCCESS;
  35     OMPI_MPI_OFFSET_TYPE offset = 0;
  36     long bytesRequested = 0;
  37     size_t numofBytes;
  38     struct mca_sharedfp_base_data_t *sh = NULL;
  39 
  40     if ( fh->f_sharedfp_data == NULL ) {
  41         if ( mca_sharedfp_lockedfile_verbose ) {
  42             opal_output(ompi_sharedfp_base_framework.framework_output,
  43                         "sharedfp_lockedfile_read: module not initialized\n");
  44         }
  45         return OMPI_ERROR;
  46     }
  47 
  48     /* Calculate the number of bytes to read */
  49     opal_datatype_type_size ( &datatype->super, &numofBytes);
  50     bytesRequested = count * numofBytes;
  51 
  52     if ( mca_sharedfp_lockedfile_verbose ) {
  53         opal_output(ompi_sharedfp_base_framework.framework_output,
  54                     "sharedfp_lockedfile_read: Bytes Requested is %ld\n",bytesRequested);
  55     }
  56 
  57     /*Retrieve the shared file data struct*/
  58     sh = fh->f_sharedfp_data;
  59 
  60     /*Request the offset to write bytesRequested bytes*/
  61     ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offset);
  62     offset /= fh->f_etype_size;
  63 
  64     if (-1 != ret )  {
  65         if ( mca_sharedfp_lockedfile_verbose ) {
  66             opal_output(ompi_sharedfp_base_framework.framework_output,
  67                         "sharedfp_lockedfile_read: Offset received is %lld\n",offset);
  68         }
  69 
  70         /* Read the file */
  71         ret = mca_common_ompio_file_read_at(fh,offset,buf,count,datatype,status);
  72     }
  73 
  74     return ret;
  75 }
  76 
  77 int mca_sharedfp_lockedfile_read_ordered (ompio_file_t *fh,
  78                                            void *buf,
  79                                            int count,
  80                                            struct ompi_datatype_t *datatype,
  81                                            ompi_status_public_t *status)
  82 {
  83     int ret = OMPI_SUCCESS;
  84     OMPI_MPI_OFFSET_TYPE offset = 0;
  85     long sendBuff = 0;
  86     long *buff=NULL;
  87     long offsetBuff;
  88     OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
  89     long bytesRequested = 0;
  90     int recvcnt = 1, sendcnt = 1;
  91     size_t numofBytes;
  92     int rank, size, i;
  93     struct mca_sharedfp_base_data_t *sh = NULL;
  94 
  95     if ( fh->f_sharedfp_data == NULL){
  96         opal_output(ompi_sharedfp_base_framework.framework_output,
  97                     "sharedfp_lockedfile_read_ordered: module not initialized\n");
  98         return OMPI_ERROR;
  99     }
 100 
 101     /*Retrieve the new communicator*/
 102     sh = fh->f_sharedfp_data;
 103 
 104     /* Calculate the number of bytes to read*/
 105     opal_datatype_type_size ( &datatype->super, &numofBytes );
 106     sendBuff = count * numofBytes;
 107 
 108     /* Get the ranks in the communicator */
 109     rank = ompi_comm_rank ( fh->f_comm );
 110     size = ompi_comm_size ( fh->f_comm );
 111 
 112     if ( 0 == rank ) {
 113         buff = (long*)malloc(sizeof(long) * size);
 114         if ( NULL == buff )
 115             return OMPI_ERR_OUT_OF_RESOURCE;
 116     }
 117 
 118     ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE,
 119                                             buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
 120                                             fh->f_comm, fh->f_comm->c_coll->coll_gather_module );
 121     if ( OMPI_SUCCESS != ret ) {
 122         goto exit;
 123     }
 124 
 125     /* All the counts are present now in the recvBuff.
 126        The size of recvBuff is sizeof_newComm
 127      */
 128     if ( 0 == rank ) {
 129         for (i = 0; i < size ; i ++)  {
 130             bytesRequested += buff[i];
 131             if ( mca_sharedfp_lockedfile_verbose ) {
 132                 opal_output(ompi_sharedfp_base_framework.framework_output,
 133                             "sharedfp_lockedfile_read_ordered: Bytes requested are %ld\n",bytesRequested);
 134             }
 135         }
 136 
 137         /*Request the offset to read bytesRequested bytes
 138           only the root process needs to do the request,
 139           since the root process will then tell the other
 140           processes at what offset they should read their
 141           share of the data.
 142          */
 143         ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
 144         if( OMPI_SUCCESS != ret ){
 145             goto exit;
 146         }
 147         if ( mca_sharedfp_lockedfile_verbose ) {
 148             opal_output(ompi_sharedfp_base_framework.framework_output,
 149                         "sharedfp_lockedfile_read_ordered: Offset received is %lld\n",offsetReceived);
 150         }
 151         buff[0] += offsetReceived;
 152 
 153         for (i = 1 ; i < size; i++) {
 154             buff[i] += buff[i-1];
 155         }
 156     }
 157 
 158     /* Scatter the results to the other processes*/
 159     ret = fh->f_comm->c_coll->coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
 160                                              &offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
 161                                              fh->f_comm, fh->f_comm->c_coll->coll_scatter_module );
 162 
 163     /*Each process now has its own individual offset in recvBUFF*/
 164     offset = offsetBuff - sendBuff;
 165     offset /= fh->f_etype_size;
 166 
 167     if ( mca_sharedfp_lockedfile_verbose ) {
 168         opal_output(ompi_sharedfp_base_framework.framework_output,
 169                     "sharedfp_lockedfile_read_ordered: Offset returned is %lld\n",offset);
 170     }
 171 
 172     /* read to the file */
 173     ret = mca_common_ompio_file_read_at_all(fh,offset,buf,count,datatype,status);
 174 
 175 exit:
 176     if ( NULL != buff ) {
 177         free ( buff );
 178     }
 179 
 180     return ret;
 181 }

/* [<][>][^][v][top][bottom][index][help] */