root/ompi/mca/sharedfp/sm/sharedfp_sm_iwrite.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. mca_sharedfp_sm_iwrite
  2. mca_sharedfp_sm_write_ordered_begin
  3. mca_sharedfp_sm_write_ordered_end

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2013-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2015-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_sm.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 
  31 int mca_sharedfp_sm_iwrite(ompio_file_t *fh,
  32                            const void *buf,
  33                            int count,
  34                            ompi_datatype_t *datatype,
  35                            MPI_Request * request)
  36 {
  37      int ret = OMPI_SUCCESS;
  38      OMPI_MPI_OFFSET_TYPE offset = 0;
  39      long bytesRequested = 0;
  40      size_t numofBytes;
  41 
  42      if( NULL == fh->f_sharedfp_data){
  43          opal_output(ompi_sharedfp_base_framework.framework_output,
  44                      "sharedfp_sm_iwrite - module not initialized\n");
  45          return OMPI_ERROR;
  46      }
  47 
  48     /* Calculate the number of bytes to write */
  49      opal_datatype_type_size ( &datatype->super, &numofBytes);
  50      bytesRequested = count * numofBytes;
  51 
  52      if ( mca_sharedfp_sm_verbose ) {
  53          opal_output(ompi_sharedfp_base_framework.framework_output,
  54                      "sharedfp_sm_iwrite: Bytes Requested is %ld\n",bytesRequested);
  55      }
  56     /* Request the offset to write bytesRequested bytes */
  57      ret = mca_sharedfp_sm_request_position(fh,bytesRequested,&offset);
  58      offset /= fh->f_etype_size;
  59 
  60      if ( -1 != ret ) {
  61         if ( mca_sharedfp_sm_verbose ) {
  62             opal_output(ompi_sharedfp_base_framework.framework_output,
  63                         "sharedfp_sm_iwrite: Offset received is %lld\n",offset);
  64         }
  65         /* Write to the file */
  66         ret = mca_common_ompio_file_iwrite_at(fh,offset,buf,count,datatype,request);
  67     }
  68 
  69     return ret;
  70 
  71 }
  72 
  73 int mca_sharedfp_sm_write_ordered_begin(ompio_file_t *fh,
  74                                         const void *buf,
  75                                         int count,
  76                                         struct ompi_datatype_t *datatype)
  77 {
  78     int ret = OMPI_SUCCESS;
  79     OMPI_MPI_OFFSET_TYPE offset = 0;
  80     long sendBuff = 0;
  81     long *buff=NULL;
  82     long offsetBuff;
  83     OMPI_MPI_OFFSET_TYPE offsetReceived = 0;
  84     long bytesRequested = 0;
  85     int recvcnt = 1, sendcnt = 1;
  86     size_t numofBytes;
  87     int i;
  88 
  89     if ( NULL == fh->f_sharedfp_data){
  90         opal_output(ompi_sharedfp_base_framework.framework_output,
  91                     "sharedfp_sm_write_ordered_begin: module not initialized\n");
  92         return OMPI_ERROR;
  93     }
  94 
  95     if ( true == fh->f_split_coll_in_use ) {
  96         opal_output(0, "Only one split collective I/O operation allowed per file "
  97                     "handle at any given point in time!\n");
  98         return MPI_ERR_REQUEST;
  99     }
 100 
 101     /* Calculate the number of bytes to read*/
 102     opal_datatype_type_size ( &datatype->super, &numofBytes);
 103     sendBuff = count * numofBytes;
 104 
 105     if ( 0  == fh->f_rank ) {
 106         buff = (long*)malloc(sizeof(long) * fh->f_size);
 107         if (  NULL == buff )
 108             return OMPI_ERR_OUT_OF_RESOURCE;
 109     }
 110 
 111     ret = fh->f_comm->c_coll->coll_gather ( &sendBuff, sendcnt, OMPI_OFFSET_DATATYPE,
 112                                             buff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
 113                                             fh->f_comm, fh->f_comm->c_coll->coll_gather_module );
 114     if( OMPI_SUCCESS != ret){
 115         goto exit;
 116     }
 117 
 118     /* All the counts are present now in the recvBuff.
 119     ** The size of recvBuff is sizeof_newComm
 120     */
 121     if (  0 == fh->f_rank ) {
 122         for (i = 0; i < fh->f_size ; i ++) {
 123             bytesRequested += buff[i];
 124             if ( mca_sharedfp_sm_verbose ) {
 125                 opal_output(ompi_sharedfp_base_framework.framework_output,
 126                             "mca_sharedfp_sm_write_ordered_begin: Bytes requested are %ld\n",
 127                             bytesRequested);
 128             }
 129         }
 130 
 131         /* Request the offset to read bytesRequested bytes
 132         ** only the root process needs to do the request,
 133         ** since the root process will then tell the other
 134         ** processes at what offset they should read their
 135         ** share of the data.
 136         */
 137         ret = mca_sharedfp_sm_request_position(fh,bytesRequested,&offsetReceived);
 138         if( OMPI_SUCCESS != ret){
 139             goto exit;
 140         }
 141         if ( mca_sharedfp_sm_verbose ) {
 142             opal_output(ompi_sharedfp_base_framework.framework_output,
 143                         "mca_sharedfp_sm_write_ordered_begin: Offset received is %lld\n",offsetReceived);
 144         }
 145 
 146         buff[0] += offsetReceived;
 147         for (i = 1 ; i < fh->f_size; i++)  {
 148             buff[i] += buff[i-1];
 149         }
 150     }
 151 
 152     /* Scatter the results to the other processes*/
 153     ret = fh->f_comm->c_coll->coll_scatter ( buff, sendcnt, OMPI_OFFSET_DATATYPE,
 154                                              &offsetBuff, recvcnt, OMPI_OFFSET_DATATYPE, 0,
 155                                              fh->f_comm, fh->f_comm->c_coll->coll_scatter_module );
 156     if( OMPI_SUCCESS != ret){
 157         goto exit;
 158     }
 159 
 160     /*Each process now has its own individual offset in recvBUFF*/
 161     offset = offsetBuff - sendBuff;
 162     offset /= fh->f_etype_size;
 163 
 164     if ( mca_sharedfp_sm_verbose ) {
 165         opal_output(ompi_sharedfp_base_framework.framework_output,
 166                     "mca_sharedfp_sm_write_ordered_begin: Offset returned is %lld\n",offset);
 167     }
 168 
 169     /* read to the file */
 170     ret = mca_common_ompio_file_iwrite_at_all(fh,offset,buf,count,datatype,
 171                                            &fh->f_split_coll_req);
 172     fh->f_split_coll_in_use = true;
 173 
 174 exit:
 175     if ( NULL != buff ) {
 176         free ( buff );
 177     }
 178 
 179     return ret;
 180 }
 181 
 182 
 183 int mca_sharedfp_sm_write_ordered_end(ompio_file_t *fh,
 184                                       const void *buf,
 185                                       ompi_status_public_t *status)
 186 {
 187     int ret = OMPI_SUCCESS;
 188     ret = ompi_request_wait ( &fh->f_split_coll_req, status );
 189 
 190     /* remove the flag again */
 191     fh->f_split_coll_in_use = false;
 192     return ret;
 193 }

/* [<][>][^][v][top][bottom][index][help] */