root/ompi/mca/fbtl/posix/fbtl_posix.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. mca_fbtl_posix_component_init_query
  2. mca_fbtl_posix_component_file_query
  3. mca_fbtl_posix_component_file_unquery
  4. mca_fbtl_posix_module_init
  5. mca_fbtl_posix_module_finalize
  6. mca_fbtl_posix_progress
  7. mca_fbtl_posix_request_free

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2006 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2008-2015 University of Houston. All rights reserved.
  13  * Copyright (c) 2018      Cisco Systems, Inc.  All rights reserved
  14  * Copyright (c) 2018      Research Organization for Information Science
  15  *                         and Technology (RIST). All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  *
  22  * These symbols are in a file by themselves to provide nice linker
  23  * semantics. Since linkers generally pull in symbols by object fules,
  24  * keeping these symbols as the only symbols in this file prevents
  25  * utility programs such as "ompi_info" from having to import entire
  26  * modules just to query their version and parameters
  27  */
  28 
  29 #include "ompi_config.h"
  30 #include "mpi.h"
  31 
  32 #include <unistd.h>
  33 #include <sys/uio.h>
  34 #if HAVE_AIO_H
  35 #include <aio.h>
  36 #endif
  37 
  38 int fbtl_posix_max_aio_active_reqs=2048;
  39 
  40 #include "ompi/mca/fbtl/fbtl.h"
  41 #include "ompi/mca/fbtl/posix/fbtl_posix.h"
  42 
  43 /*
  44  * *******************************************************************
  45  * ************************ actions structure ************************
  46  * *******************************************************************
  47  */
  48 static mca_fbtl_base_module_1_0_0_t posix =  {
  49     mca_fbtl_posix_module_init,     /* initalise after being selected */
  50     mca_fbtl_posix_module_finalize, /* close a module on a communicator */
  51     mca_fbtl_posix_preadv,          /* blocking read */
  52 #if defined (FBTL_POSIX_HAVE_AIO)
  53     mca_fbtl_posix_ipreadv,         /* non-blocking read*/
  54 #else
  55     NULL,                           /* non-blocking read */
  56 #endif
  57     mca_fbtl_posix_pwritev,         /* blocking write */
  58 #if defined (FBTL_POSIX_HAVE_AIO)
  59     mca_fbtl_posix_ipwritev,        /* non-blocking write */
  60     mca_fbtl_posix_progress,        /* module specific progress */
  61     mca_fbtl_posix_request_free     /* free module specific data items on the request */
  62 #else
  63     NULL,                           /* non-blocking write */
  64     NULL,                           /* module specific progress */
  65     NULL                            /* free module specific data items on the request */
  66 #endif
  67 };
  68 /*
  69  * *******************************************************************
  70  * ************************* structure ends **************************
  71  * *******************************************************************
  72  */
  73 
  74 int mca_fbtl_posix_component_init_query(bool enable_progress_threads,
  75                                       bool enable_mpi_threads) {
  76     /* Nothing to do */
  77 
  78    return OMPI_SUCCESS;
  79 }
  80 
  81 struct mca_fbtl_base_module_1_0_0_t *
  82 mca_fbtl_posix_component_file_query (ompio_file_t *fh, int *priority) {
  83    *priority = mca_fbtl_posix_priority;
  84 
  85    if (UFS == fh->f_fstype) {
  86        if (*priority < 50) {
  87            *priority = 50;
  88        }
  89    }
  90 
  91    return &posix;
  92 }
  93 
  94 int mca_fbtl_posix_component_file_unquery (ompio_file_t *file) {
  95    /* This function might be needed for some purposes later. for now it
  96     * does not have anything to do since there are no steps which need
  97     * to be undone if this module is not selected */
  98 
  99    return OMPI_SUCCESS;
 100 }
 101 
 102 int mca_fbtl_posix_module_init (ompio_file_t *file) {
 103 
 104 #if defined (FBTL_POSIX_HAVE_AIO)
 105     long val = sysconf(_SC_AIO_MAX);
 106     if ( -1 != val ) {
 107         fbtl_posix_max_aio_active_reqs = (int)val;
 108     }
 109 #endif
 110     return OMPI_SUCCESS;
 111 }
 112 
 113 
 114 int mca_fbtl_posix_module_finalize (ompio_file_t *file) {
 115     return OMPI_SUCCESS;
 116 }
 117 
 118 bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
 119 {
 120     bool ret=false;
 121 #if defined (FBTL_POSIX_HAVE_AIO)
 122     int i=0, lcount=0, ret_code=0;
 123     mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
 124     off_t start_offset, end_offset, total_length;
 125 
 126     for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
 127         if ( EINPROGRESS == data->aio_req_status[i] ) {
 128             data->aio_req_status[i] = aio_error ( &data->aio_reqs[i]);
 129             if ( 0 == data->aio_req_status[i]){
 130                 data->aio_open_reqs--;
 131                 lcount++;
 132                 /* assuming right now that aio_return will return
 133                 ** the number of bytes written/read and not an error code,
 134                 ** since aio_error should have returned an error in that
 135                 ** case and not 0 ( which means request is complete)
 136                 */
 137                 data->aio_total_len += aio_return (&data->aio_reqs[i]);
 138             }
 139             else if ( EINPROGRESS == data->aio_req_status[i]){
 140                 /* not yet done */
 141                 continue;
 142             }
 143             else {
 144                 /* an error occured. Mark the request done, but
 145                    set an error code in the status */
 146                 req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
 147                 req->req_ompi.req_status._ucount = data->aio_total_len;
 148                 ret = true;
 149                 break;
 150             }
 151         }
 152         else {
 153             lcount++;
 154         }
 155     }
 156 #if 0
 157     printf("lcount=%d open_reqs=%d\n", lcount, data->aio_open_reqs );
 158 #endif
 159 
 160     if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
 161         /* release the lock of the previous operations */
 162         mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 163         
 164         /* post the next batch of operations */
 165         data->aio_first_active_req = data->aio_last_active_req;
 166         if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) {
 167             data->aio_last_active_req += data->aio_req_chunks;
 168         }
 169         else {
 170             data->aio_last_active_req = data->aio_req_count;
 171         }
 172 
 173         start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
 174         end_offset   = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
 175         total_length = (end_offset - start_offset);
 176 
 177         if ( FBTL_POSIX_READ == data->aio_req_type ) {
 178             ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
 179         }
 180         else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
 181             ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
 182         }
 183         if ( 0 < ret_code ) {
 184             opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
 185             /* Just in case some part of the lock actually succeeded. */
 186             mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 187             return OMPI_ERROR;
 188         }
 189         
 190         for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
 191             if ( FBTL_POSIX_READ == data->aio_req_type ) {
 192                 if (-1 == aio_read(&data->aio_reqs[i])) {
 193                     opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
 194                     mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 195                     return OMPI_ERROR;
 196                 }
 197             }
 198             else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
 199                 if (-1 == aio_write(&data->aio_reqs[i])) {
 200                     opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
 201                     mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 202                     return OMPI_ERROR;
 203                 }
 204             }
 205         }
 206 #if 0
 207         printf("posting new batch: first=%d last=%d\n", data->aio_first_active_req, data->aio_last_active_req );
 208 #endif
 209     }
 210 
 211     if ( 0 == data->aio_open_reqs ) {
 212         /* all pending operations are finished for this request */
 213         req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
 214         req->req_ompi.req_status._ucount = data->aio_total_len;
 215         mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 216         ret = true;
 217     }
 218 #endif
 219     return ret;
 220 }
 221 
 222 void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
 223 {
 224 #if defined (FBTL_POSIX_HAVE_AIO)
 225     /* Free the fbtl specific data structures */
 226     mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
 227     if (NULL != data ) {
 228         mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 229         if ( NULL != data->aio_reqs ) {
 230             free ( data->aio_reqs);
 231         }
 232         if ( NULL != data->aio_req_status ) {
 233             free ( data->aio_req_status );
 234         }
 235         free ( data );
 236         req->req_data = NULL;
 237     }
 238 #endif
 239   return;
 240 }

/* [<][>][^][v][top][bottom][index][help] */