root/ompi/mca/sharedfp/lockedfile/sharedfp_lockedfile_iwrite.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_sharedfp_lockedfile_iwrite
  2. mca_sharedfp_lockedfile_write_ordered_begin
  3. mca_sharedfp_lockedfile_write_ordered_end

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2013-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2015-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_lockedfile.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 #include "ompi/mca/common/ompio/common_ompio.h"
  31 
  32 int mca_sharedfp_lockedfile_iwrite(ompio_file_t *fh,
  33                                    const void *buf,
  34                                    int count,
  35                                    ompi_datatype_t *datatype,
  36                                    MPI_Request * request)
  37 {
  38     int ret = OMPI_SUCCESS;
  39     OMPI_MPI_OFFSET_TYPE offset = 0;
  40     long bytesRequested = 0;
  41     size_t numofBytes;
  42     struct mca_sharedfp_base_data_t *sh = NULL;
  43 
  44     if(fh->f_sharedfp_data==NULL){
  45         opal_output(ompi_sharedfp_base_framework.framework_output,
  46                     "sharedfp_lockedfile_iwrite: module not initialized \n");
  47         return OMPI_ERROR;
  48     }
  49 
  50     /*Calculate the number of bytes to write*/
  51     opal_datatype_type_size ( &datatype->super, &numofBytes);
  52     bytesRequested = count * numofBytes;
  53     if ( mca_sharedfp_lockedfile_verbose ) {
  54         opal_output(ompi_sharedfp_base_framework.framework_output,
  55                     "sharedfp_lockedfile_iwrite: Bytes Requested is %ld\n",bytesRequested);
  56     }
  57 
  58     /*Retrieve the shared file data struct*/
  59     sh = fh->f_sharedfp_data;
  60 
  61     /*Request the offset to write bytesRequested bytes*/
  62     ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offset);
  63     offset /= fh->f_etype_size;
  64 
  65     if ( -1 != ret) {
  66         if ( mca_sharedfp_lockedfile_verbose ) {
  67             opal_output(ompi_sharedfp_base_framework.framework_output,
  68                         "sharedfp_lockedfile_iwrite: Offset received is %lld\n",offset);
  69         }
  70 
  71         /* Write to the file */
  72         ret = mca_common_ompio_file_iwrite_at(fh,offset,buf,count,datatype,request);
  73     }
  74 
  75     return ret;
  76 }
  77 
  78 int mca_sharedfp_lockedfile_write_ordered_begin(ompio_file_t *fh,
  79                                                 const void *buf,
  80                                                 int count,
  81                                                 struct ompi_datatype_t *datatype)
  82 {
  83     int ret = OMPI_SUCCESS;
  84     OMPI_MPI_OFFSET_TYPE offset = 0;
  85     long sendBuff = 0;
  86     long *buff=NULL;
  87     long offsetBuff;
  88     OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
  89     long bytesRequested = 0;
  90     int recvcnt = 1, sendcnt = 1;
  91     size_t numofBytes;
  92     int rank, size, i;
  93     struct mca_sharedfp_base_data_t *sh = NULL;
  94 
  95     if(fh->f_sharedfp_data==NULL){
  96         opal_output(ompi_sharedfp_base_framework.framework_output,
  97                     "sharedfp_lockedfile_write_ordered_begin: module not initialized \n");
  98         return OMPI_ERROR;
  99     }
 100 
 101 
 102     if ( true == fh->f_split_coll_in_use ) {
 103         opal_output(0, "Only one split collective I/O operation allowed per file handle at "
 104                     "any given point in time!\n");
 105         return MPI_ERR_REQUEST;
 106     }
 107 
 108     /*Retrieve the new communicator*/
 109     sh = fh->f_sharedfp_data;
 110 
 111     /* Calculate the number of bytes to write*/
 112     opal_datatype_type_size ( &datatype->super, &numofBytes);
 113     sendBuff = count * numofBytes;
 114 
 115     /* Get the ranks in the communicator */
 116     rank = ompi_comm_rank ( fh->f_comm );
 117     size = ompi_comm_size ( fh->f_comm );
 118 
 119     if ( 0 == rank ) {
 120         buff = (long*) malloc (sizeof(long) * size);
 121         if ( NULL == buff ) {
 122             return OMPI_ERR_OUT_OF_RESOURCE;
 123         }
 124     }
 125 
 126     ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, 
 127                                             sendcnt, 
 128                                             OMPI_OFFSET_DATATYPE, 
 129                                             buff, 
 130                                             recvcnt,
 131                                             OMPI_OFFSET_DATATYPE, 
 132                                             0, 
 133                                             fh->f_comm,
 134                                             fh->f_comm->c_coll->coll_gather_module );
 135     if ( OMPI_SUCCESS != ret ) {
 136         goto exit;
 137     }
 138 
 139     /* All the counts are present now in the recvBuff.
 140        The size of recvBuff is sizeof_newComm
 141      */
 142     if (rank == 0) {
 143         for ( i = 0; i < size ; i ++)  {
 144             bytesRequested += buff[i];
 145             if ( mca_sharedfp_lockedfile_verbose ) {
 146                 opal_output(ompi_sharedfp_base_framework.framework_output,
 147                             "sharedfp_lockedfile_write_ordered_begin: Bytes requested are %ld\n",bytesRequested);
 148             }
 149         }
 150 
 151         /*Request the offset to write bytesRequested bytes
 152           only the root process needs to do the request,
 153           since the root process will then tell the other
 154           processes at what offset they should write their
 155           share of the data.
 156          */
 157         ret = mca_sharedfp_lockedfile_request_position(sh,bytesRequested,&offsetReceived);
 158         if ( OMPI_SUCCESS != ret ){
 159             goto exit;
 160         }
 161         if ( mca_sharedfp_lockedfile_verbose ) {
 162             opal_output(ompi_sharedfp_base_framework.framework_output,
 163                         "sharedfp_lockedfile_write_ordered_begin: Offset received is %lld\n",offsetReceived);
 164         }
 165         buff[0] += offsetReceived;
 166         for (i = 1 ; i < size; i++) {
 167             buff[i] += buff[i-1];
 168         }
 169     }
 170 
 171     /* Scatter the results to the other processes*/
 172     ret = fh->f_comm->c_coll->coll_scatter ( buff, 
 173                                              sendcnt, 
 174                                              OMPI_OFFSET_DATATYPE,
 175                                              &offsetBuff, 
 176                                              recvcnt, 
 177                                              OMPI_OFFSET_DATATYPE, 
 178                                              0,
 179                                              fh->f_comm, 
 180                                              fh->f_comm->c_coll->coll_scatter_module );
 181     if ( OMPI_SUCCESS != ret ) {
 182         goto exit;
 183     }
 184 
 185     /*Each process now has its own individual offset*/
 186     offset = offsetBuff - sendBuff;
 187     offset /= fh->f_etype_size;
 188 
 189     if ( mca_sharedfp_lockedfile_verbose ) {
 190         opal_output(ompi_sharedfp_base_framework.framework_output,
 191                     "sharedfp_lockedfile_write_ordered_begin: Offset returned is %lld\n",offset);
 192      }
 193 
 194     ret = mca_common_ompio_file_iwrite_at_all ( fh, offset, buf, count, datatype, &fh->f_split_coll_req );
 195     fh->f_split_coll_in_use = true;
 196 
 197 exit:
 198     if ( NULL != buff ) {
 199         free ( buff);
 200     }
 201 
 202     return ret;
 203 }
 204 
 205 
 206 
 207 int mca_sharedfp_lockedfile_write_ordered_end(ompio_file_t *fh,
 208                                               const void *buf,
 209                                               ompi_status_public_t *status)
 210 {
 211     int ret = OMPI_SUCCESS;
 212     ret = ompi_request_wait ( &fh->f_split_coll_req, status );
 213 
 214     /* remove the flag again */
 215     fh->f_split_coll_in_use = false;
 216     return ret;
 217 }

/* [<][>][^][v][top][bottom][index][help] */