root/ompi/mca/sharedfp/individual/sharedfp_individual_iwrite.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. mca_sharedfp_individual_iwrite
  2. mca_sharedfp_individual_write_ordered_begin
  3. mca_sharedfp_individual_write_ordered_end

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2013-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2015-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_individual.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 
  31 int mca_sharedfp_individual_iwrite(ompio_file_t *fh,
  32                                    const void *buf,
  33                                    int count,
  34                                    ompi_datatype_t *datatype,
  35                                    MPI_Request * request)
  36 {
  37     int ret = OMPI_SUCCESS;
  38     size_t numofbytes = 0;
  39     OMPI_MPI_OFFSET_TYPE totalbytes = 0;
  40     mca_sharedfp_individual_header_record *headnode = NULL;
  41     struct mca_sharedfp_base_data_t *sh = NULL;
  42 
  43     if(fh->f_sharedfp_data==NULL){
  44         opal_output(ompi_sharedfp_base_framework.framework_output,
  45                     "mca_sharedfp_individual_iwrite: module not initialized \n");
  46         return OMPI_ERROR;
  47     }
  48     mca_sharedfp_individual_usage_counter++;
  49 
  50     /* Calculate the number of bytes of data that needs to be written*/
  51     opal_datatype_type_size ( &datatype->super, &numofbytes);
  52     totalbytes = count * numofbytes;
  53 
  54     sh = fh->f_sharedfp_data;
  55 
  56     headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
  57     if ( NULL == headnode)  {
  58         opal_output (0, "sharedfp_individual_iwrite: headnode is NULL but file is open\n");
  59         return OMPI_ERROR;
  60     }
  61 
  62     /*Insert metadata record into a queue*/
  63     ret = mca_sharedfp_individual_insert_metadata(OMPI_FILE_WRITE_SHARED,totalbytes,sh);
  64 
  65 
  66     /*Write the data into individual file*/
  67     ret = mca_common_ompio_file_iwrite_at ( headnode->datafilehandle, headnode->datafile_offset,
  68                                             buf, count, datatype, request);
  69     if ( OMPI_SUCCESS != ret )  {
  70         opal_output(0,"sharedfp_individual_iwrite: Error while iwriting the datafile \n");
  71         return ret;
  72     }
  73 
  74     /* Update the datafileoffset */
  75     headnode->datafile_offset = headnode->datafile_offset + totalbytes;
  76 
  77     return ret;
  78 }
  79 
  80 int mca_sharedfp_individual_write_ordered_begin(ompio_file_t *fh,
  81                                                 const void *buf,
  82                                                 int count,
  83                                                 struct ompi_datatype_t *datatype)
  84 {
  85     int ret = OMPI_SUCCESS;
  86     int i = 0;
  87     size_t numofbytes = 0;
  88     size_t totalbytes = 0;
  89     OMPI_MPI_OFFSET_TYPE *offbuff=NULL;
  90     OMPI_MPI_OFFSET_TYPE global_offset = 0;
  91     OMPI_MPI_OFFSET_TYPE prev_offset = 0;
  92     OMPI_MPI_OFFSET_TYPE temp = 0, offset = 0;
  93     mca_sharedfp_individual_header_record *headnode = NULL;
  94     struct mca_sharedfp_base_data_t *sh = NULL;
  95 
  96     if(fh->f_sharedfp_data==NULL){
  97         opal_output(ompi_sharedfp_base_framework.framework_output,
  98                     "sharedfp_individual_write_ordered_begin - module not initialized\n");
  99         return OMPI_ERROR;
 100     }
 101 
 102     if ( true == fh->f_split_coll_in_use ) {
 103         opal_output(0, "Only one split collective I/O operation allowed per file handle "
 104                     "at any given point in time!\n");
 105         return MPI_ERR_REQUEST;
 106     }
 107     mca_sharedfp_individual_usage_counter++;
 108 
 109     /*Retrieve the sharedfp data structures*/
 110     sh = fh->f_sharedfp_data;
 111 
 112     /* Calculate the number of bytes of data that needs to be written*/
 113     opal_datatype_type_size ( &datatype->super, &numofbytes);
 114     totalbytes = count * numofbytes;
 115 
 116     headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
 117     if ( NULL == headnode)  {
 118         opal_output (0, "sharedfp_individual_write_ordered_begin: headnode is NULL but file is open\n");
 119         return OMPI_ERROR;
 120     }
 121 
 122     /* Data from all the metadata is combined and written to the main file */
 123     ret  = mca_sharedfp_individual_collaborate_data ( sh, fh );
 124     if ( OMPI_SUCCESS != ret)  {
 125         return ret;
 126     }
 127 
 128     if ( 0 == fh->f_rank )  {
 129         offbuff = (OMPI_MPI_OFFSET_TYPE *)malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) * fh->f_size);
 130         if (NULL == offbuff ) {
 131             return OMPI_ERR_OUT_OF_RESOURCE;
 132         }
 133     }
 134 
 135     /*collect the total bytes to be written*/
 136     ret = fh->f_comm->c_coll->coll_gather ( &totalbytes, 
 137                                             1, 
 138                                             OMPI_OFFSET_DATATYPE,
 139                                             offbuff, 
 140                                             1, 
 141                                             OMPI_OFFSET_DATATYPE, 
 142                                             0,
 143                                             fh->f_comm, 
 144                                             fh->f_comm->c_coll->coll_gather_module );
 145 
 146     if ( OMPI_SUCCESS != ret ) {
 147         opal_output(0,"sharedfp_individual_write_ordered_begin: Error in gatherring offsets \n");
 148         goto exit;
 149     }
 150     
 151     if ( 0 == fh->f_rank ) {
 152         prev_offset = offbuff[0];
 153         offbuff[0]   = sh->global_offset;
 154 
 155         for (i = 1; i < fh->f_size ; i++){
 156             temp = offbuff[i];
 157             offbuff[i] = offbuff[i - 1] + prev_offset;
 158             prev_offset = temp;
 159         }
 160 
 161         for (i = 0; i < fh->f_size; i++){
 162             global_offset = offbuff[fh->f_size - 1] + prev_offset;
 163         }
 164     }
 165 
 166 
 167     /* Scatter the results to the other processes */
 168     ret = fh->f_comm->c_coll->coll_scatter ( offbuff, 
 169                                              1, 
 170                                              OMPI_OFFSET_DATATYPE,
 171                                              &offset, 
 172                                              1, 
 173                                              OMPI_OFFSET_DATATYPE, 
 174                                              0,
 175                                              fh->f_comm, 
 176                                              fh->f_comm->c_coll->coll_scatter_module );
 177     if ( OMPI_SUCCESS != ret )  {
 178         opal_output(0,"sharedfp_individual_write_ordered_begin: Error in scattering offsets \n");
 179         goto exit;
 180     }
 181 
 182     ret = fh->f_comm->c_coll->coll_bcast ( &global_offset, 
 183                                            1, 
 184                                            OMPI_OFFSET_DATATYPE,
 185                                            0, 
 186                                            fh->f_comm, 
 187                                            fh->f_comm->c_coll->coll_bcast_module );
 188     if ( OMPI_SUCCESS != ret )  {
 189         opal_output(0,"sharedfp_individual_write_ordered_begin: Error while bcasting global offset \n");
 190         goto exit;
 191     }
 192 
 193     sh->global_offset = global_offset;
 194 
 195     /*use file_write_at_all to ensure the order*/
 196     ret = mca_common_ompio_file_iwrite_at_all(fh, offset, buf, count, datatype,
 197                                               &fh->f_split_coll_req);
 198     fh->f_split_coll_in_use = true;
 199     if ( OMPI_SUCCESS != ret )  {
 200         opal_output(0,"sharedfp_individual_write_ordered_begin: Error while writing the datafile \n");
 201     }
 202 
 203 exit:
 204     if ( NULL != offbuff ) {
 205         free ( offbuff);
 206     }
 207 
 208     return ret;
 209 }
 210 
 211 int mca_sharedfp_individual_write_ordered_end(ompio_file_t *fh,
 212                                               const void *buf,
 213                                               ompi_status_public_t *status)
 214 {
 215     int ret = OMPI_SUCCESS;
 216     ret = ompi_request_wait ( &fh->f_split_coll_req, status );
 217 
 218     /* remove the flag again */
 219     fh->f_split_coll_in_use = false;
 220     return ret;
 221 }

/* [<][>][^][v][top][bottom][index][help] */