root/ompi/mca/sharedfp/sm/sharedfp_sm_iread.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_sharedfp_sm_iread
  2. mca_sharedfp_sm_read_ordered_begin
  3. mca_sharedfp_sm_read_ordered_end

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2013-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2018      Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_sm.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 
  31 int mca_sharedfp_sm_iread(ompio_file_t *fh,
  32                           void *buf,
  33                           int count,
  34                           ompi_datatype_t *datatype,
  35                           MPI_Request * request)
  36 {
  37     int ret = OMPI_SUCCESS;
  38     OMPI_MPI_OFFSET_TYPE offset = 0;
  39     long bytesRequested = 0;
  40     size_t numofBytes;
  41 
  42     if( NULL == fh->f_sharedfp_data){
  43         opal_output(ompi_sharedfp_base_framework.framework_output,
  44                     "sharedfp_sm_iread: module not initialized\n");
  45         return OMPI_ERROR;
  46     }
  47 
  48     /* Calculate the number of bytes to write */
  49     opal_datatype_type_size ( &datatype->super, &numofBytes);
  50     bytesRequested = count * numofBytes;
  51 
  52     if ( mca_sharedfp_sm_verbose ) {
  53         opal_output(ompi_sharedfp_base_framework.framework_output,
  54                     "sharedfp_sm_iread: Bytes Requested is %ld\n",bytesRequested);
  55     }
  56     /*Request the offset to write bytesRequested bytes*/
  57     ret = mca_sharedfp_sm_request_position(fh,bytesRequested,&offset);
  58     offset /= fh->f_etype_size;
  59 
  60     if (  -1 != ret ) {
  61         if ( mca_sharedfp_sm_verbose ) {
  62             opal_output(ompi_sharedfp_base_framework.framework_output,
  63                         "sharedfp_sm_iread: Offset received is %lld\n",offset);
  64         }
  65         /* Read the file */
  66         ret = mca_common_ompio_file_iread_at(fh,offset,buf,count,datatype,request);
  67     }
  68 
  69     return ret;
  70 }
  71 
  72 int mca_sharedfp_sm_read_ordered_begin(ompio_file_t *fh,
  73                                        void *buf,
  74                                        int count,
  75                                        struct ompi_datatype_t *datatype)
  76 {
  77     int ret = OMPI_SUCCESS;
  78     OMPI_MPI_OFFSET_TYPE offset = 0;
  79     long sendBuff = 0;
  80     long *buff=NULL;
  81     long offsetBuff;
  82     OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
  83     long bytesRequested = 0;
  84     int recvcnt = 1, sendcnt = 1;
  85     size_t numofBytes;
  86     int i;
  87 
  88     if ( NULL == fh->f_sharedfp_data){
  89         opal_output(ompi_sharedfp_base_framework.framework_output,
  90                     "sharedfp_sm_read_ordered_begin: module not initialized \n");
  91         return OMPI_ERROR;
  92     }
  93 
  94     if ( true == fh->f_split_coll_in_use ) {
  95         opal_output(0,"Only one split collective I/O operation allowed per file "
  96                     "handle at any given point in time!\n");
  97         return MPI_ERR_REQUEST;
  98     }
  99 
 100     /* Calculate the number of bytes to read*/
 101     opal_datatype_type_size ( &datatype->super, &numofBytes);
 102     sendBuff = count * numofBytes;
 103 
 104 
 105     if ( 0  == fh->f_rank ) {
 106         buff = (long*)malloc(sizeof(long) * fh->f_size);
 107         if (  NULL == buff )
 108             return OMPI_ERR_OUT_OF_RESOURCE;
 109     }
 110 
 111     ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, 
 112                                             sendcnt, 
 113                                             OMPI_OFFSET_DATATYPE,
 114                                             buff, 
 115                                             recvcnt, 
 116                                             OMPI_OFFSET_DATATYPE, 
 117                                             0,
 118                                             fh->f_comm, 
 119                                             fh->f_comm->c_coll->coll_gather_module );
 120     if( OMPI_SUCCESS != ret){
 121         goto exit;
 122     }
 123 
 124     /* All the counts are present now in the recvBuff.
 125     ** The size of recvBuff is sizeof_newComm
 126     */
 127     if (  0 == fh->f_rank ) {
 128         for (i = 0; i < fh->f_size ; i ++) {
 129             bytesRequested += buff[i];
 130             if ( mca_sharedfp_sm_verbose ) {
 131                 opal_output(ompi_sharedfp_base_framework.framework_output,
 132                             "mca_sharedfp_sm_read_ordered_begin: Bytes requested are %ld\n",
 133                             bytesRequested);
 134             }
 135         }
 136 
 137         /* Request the offset to read bytesRequested bytes
 138         ** only the root process needs to do the request,
 139         ** since the root process will then tell the other
 140         ** processes at what offset they should read their
 141         ** share of the data.
 142         */
 143         ret = mca_sharedfp_sm_request_position(fh,bytesRequested,&offsetReceived);
 144         if( OMPI_SUCCESS != ret){
 145             goto exit;
 146         }
 147         if ( mca_sharedfp_sm_verbose ) {
 148             opal_output(ompi_sharedfp_base_framework.framework_output,
 149                         "mca_sharedfp_sm_read_ordered_begin: Offset received is %lld\n",offsetReceived);
 150         }
 151 
 152         buff[0] += offsetReceived;
 153         for (i = 1 ; i < fh->f_size; i++)  {
 154             buff[i] += buff[i-1];
 155         }
 156     }
 157 
 158     /* Scatter the results to the other processes*/
 159     ret = fh->f_comm->c_coll->coll_scatter ( buff, 
 160                                              sendcnt, 
 161                                              OMPI_OFFSET_DATATYPE,
 162                                              &offsetBuff, 
 163                                              recvcnt, 
 164                                              OMPI_OFFSET_DATATYPE, 
 165                                              0,
 166                                              fh->f_comm, 
 167                                              fh->f_comm->c_coll->coll_scatter_module );
 168     if( OMPI_SUCCESS != ret){
 169         goto exit;
 170     }
 171 
 172     /*Each process now has its own individual offset in recvBUFF*/
 173     offset = offsetBuff - sendBuff;
 174     offset /= fh->f_etype_size;
 175 
 176     if ( mca_sharedfp_sm_verbose ) {
 177         opal_output(ompi_sharedfp_base_framework.framework_output,
 178                     "mca_sharedfp_sm_read_ordered_begin: Offset returned is %lld\n",offset);
 179     }
 180 
 181     /* read to the file */
 182     ret = mca_common_ompio_file_iread_at_all(fh,offset,buf,count,datatype,
 183                                              &fh->f_split_coll_req);
 184     fh->f_split_coll_in_use = true;
 185 
 186 exit:
 187     if ( NULL != buff ) {
 188         free ( buff );
 189     }
 190 
 191     return ret;
 192 }
 193 
 194 
 195 int mca_sharedfp_sm_read_ordered_end(ompio_file_t *fh,
 196                                      void *buf,
 197                                      ompi_status_public_t *status)
 198 {
 199     int ret = OMPI_SUCCESS;
 200     ret = ompi_request_wait ( &fh->f_split_coll_req, status );
 201 
 202     /* remove the flag again */
 203     fh->f_split_coll_in_use = false;
 204     return ret;
 205 }

/* [<][>][^][v][top][bottom][index][help] */