root/ompi/mca/sharedfp/individual/sharedfp_individual_write.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. mca_sharedfp_individual_write
  2. mca_sharedfp_individual_write_ordered

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2013-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2015-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_individual.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 
  31 int mca_sharedfp_individual_write (ompio_file_t *fh,
  32                                        const void *buf,
  33                                        int count,
  34                                        struct ompi_datatype_t *datatype,
  35                                        ompi_status_public_t *status)
  36 {
  37     int ret = OMPI_SUCCESS;
  38     size_t numofbytes = 0;
  39     size_t totalbytes = 0;
  40     mca_sharedfp_individual_header_record *headnode = NULL;
  41     struct mca_sharedfp_base_data_t *sh = NULL;
  42 
  43     if ( NULL == fh->f_sharedfp_data ) {
  44         opal_output(ompi_sharedfp_base_framework.framework_output,
  45                     "sharedfp_individual_write: module not initialized \n");
  46         return OMPI_ERROR;
  47     }
  48     mca_sharedfp_individual_usage_counter++;
  49 
  50     /* Calculate the number of bytes of data that need to be written*/
  51     opal_datatype_type_size ( &datatype->super, &numofbytes);
  52     totalbytes = count * numofbytes;
  53 
  54     /*Retrieve data structure for shared file pointer operations*/
  55     sh = fh->f_sharedfp_data;
  56     headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
  57 
  58     if (headnode)  {
  59         /*Insert metadata record into a queue*/
  60         mca_sharedfp_individual_insert_metadata(OMPI_FILE_WRITE_SHARED, totalbytes, sh);
  61 
  62         /*Write the data into individual file*/
  63         ret = mca_common_ompio_file_write_at ( headnode->datafilehandle,
  64                                                headnode->datafile_offset,
  65                                                buf, count, datatype, status);
  66         if ( OMPI_SUCCESS != ret ) {
  67             opal_output(0,"mca_sharedfp_individual_write: Error while writing the datafile \n");
  68             return -1;
  69         }
  70 
  71         /* Update the datafileoffset*/
  72         headnode->datafile_offset = headnode->datafile_offset + totalbytes;
  73     }
  74 
  75     return ret;
  76 }
  77 
  78 int mca_sharedfp_individual_write_ordered (ompio_file_t *fh,
  79                                            const void *buf,
  80                                            int count,
  81                                            struct ompi_datatype_t *datatype,
  82                                            ompi_status_public_t *status)
  83 {
  84     int ret = OMPI_SUCCESS;
  85     int i = 0;
  86     size_t numofbytes = 0;
  87     size_t totalbytes = 0;
  88     OMPI_MPI_OFFSET_TYPE *offbuff=NULL;
  89     OMPI_MPI_OFFSET_TYPE global_offset = 0;
  90     OMPI_MPI_OFFSET_TYPE prev_offset = 0;
  91     OMPI_MPI_OFFSET_TYPE temp = 0, offset = 0;
  92     mca_sharedfp_individual_header_record *headnode = NULL;
  93     struct mca_sharedfp_base_data_t *sh = NULL;
  94 
  95 
  96     if(fh->f_sharedfp_data==NULL){
  97         opal_output(ompi_sharedfp_base_framework.framework_output,
  98                     "sharedfp_individual_write_ordered: module not initialized \n");
  99         return OMPI_ERROR;
 100     }
 101 
 102     mca_sharedfp_individual_usage_counter++;
 103 
 104     /*Retrieve the sharedfp data structures*/
 105     sh = fh->f_sharedfp_data;
 106 
 107     /* Calculate the number of bytes of data that needs to be written*/
 108     opal_datatype_type_size ( &datatype->super, &numofbytes);
 109     totalbytes = count * numofbytes;
 110 
 111     headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
 112     if ( NULL == headnode)  {
 113         opal_output (0, "sharedfp_individual_write_ordered: headnode is NULL but file is open\n");
 114         return OMPI_ERROR;
 115     }
 116 
 117     /* Data from all the metadata is combined and written to the main file */
 118     ret  = mca_sharedfp_individual_collaborate_data ( sh, fh );
 119     if ( OMPI_SUCCESS != ret)  {
 120         return ret;
 121     }
 122 
 123     if ( 0 == fh->f_rank )  {
 124         offbuff = (OMPI_MPI_OFFSET_TYPE *)malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) * fh->f_size);
 125         if (NULL == offbuff ) {
 126             return OMPI_ERR_OUT_OF_RESOURCE;
 127         }
 128     }
 129 
 130     /*collect the total bytes to be written*/
 131     ret = fh->f_comm->c_coll->coll_gather ( &totalbytes, 
 132                                             1, 
 133                                             OMPI_OFFSET_DATATYPE,
 134                                             offbuff, 
 135                                             1, 
 136                                             OMPI_OFFSET_DATATYPE, 
 137                                             0,
 138                                             fh->f_comm, 
 139                                             fh->f_comm->c_coll->coll_gather_module );
 140 
 141     if ( OMPI_SUCCESS != ret ) {
 142         opal_output(0,"sharedfp_individual_write_ordered: Error in gathering offsets \n");
 143         goto exit;
 144     }
 145 
 146     if ( 0 == fh->f_rank ) {
 147         prev_offset = offbuff[0];
 148         offbuff[0]   = sh->global_offset;
 149 
 150         for (i = 1; i < fh->f_size ; i++){
 151             temp = offbuff[i];
 152             offbuff[i] = offbuff[i - 1] + prev_offset;
 153             prev_offset = temp;
 154         }
 155 
 156         for (i = 0; i < fh->f_size; i++){
 157             global_offset = offbuff[fh->f_size - 1] + prev_offset;
 158         }
 159     }
 160 
 161 
 162     /* Scatter the results to the other processes */
 163     ret = fh->f_comm->c_coll->coll_scatter ( offbuff, 
 164                                              1, 
 165                                              OMPI_OFFSET_DATATYPE,
 166                                              &offset, 
 167                                              1, 
 168                                              OMPI_OFFSET_DATATYPE, 
 169                                              0,
 170                                              fh->f_comm, 
 171                                              fh->f_comm->c_coll->coll_scatter_module );
 172     if ( OMPI_SUCCESS != ret )  {
 173         opal_output(0,"sharedfp_individual_write_ordered: Error in scattering offsets \n");
 174         goto exit;
 175     }
 176 
 177     ret = fh->f_comm->c_coll->coll_bcast ( &global_offset, 
 178                                            1, 
 179                                            OMPI_OFFSET_DATATYPE,
 180                                            0, 
 181                                            fh->f_comm, 
 182                                            fh->f_comm->c_coll->coll_bcast_module );
 183     if ( OMPI_SUCCESS != ret )  {
 184         opal_output(0,"sharedfp_individual_write_ordered: Error while bcasting global offset \n");
 185         goto exit;
 186     }
 187 
 188     sh->global_offset = global_offset;
 189 
 190     /*use file_write_at_all to ensure the order*/
 191     ret = mca_common_ompio_file_write_at_all(fh, offset, buf,count,datatype,status);
 192     if ( OMPI_SUCCESS != ret )  {
 193         opal_output(0,"sharedfp_individual_write_ordered: Error while writing the datafile \n");
 194     }
 195 
 196 exit:
 197     if ( NULL != offbuff ) {
 198         free ( offbuff);
 199     }
 200 
 201     return ret;
 202 }

/* [<][>][^][v][top][bottom][index][help] */