This source file includes the following definitions.
- mca_fbtl_posix_component_init_query
- mca_fbtl_posix_component_file_query
- mca_fbtl_posix_component_file_unquery
- mca_fbtl_posix_module_init
- mca_fbtl_posix_module_finalize
- mca_fbtl_posix_progress
- mca_fbtl_posix_request_free
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 #include "ompi_config.h"
  30 #include "mpi.h"
  31 
  32 #include <unistd.h>
  33 #include <sys/uio.h>
  34 #if HAVE_AIO_H
  35 #include <aio.h>
  36 #endif
  37 
  38 int fbtl_posix_max_aio_active_reqs=2048;
  39 
  40 #include "ompi/mca/fbtl/fbtl.h"
  41 #include "ompi/mca/fbtl/posix/fbtl_posix.h"
  42 
  43 
  44 
  45 
  46 
  47 
  48 static mca_fbtl_base_module_1_0_0_t posix =  {
  49     mca_fbtl_posix_module_init,     
  50     mca_fbtl_posix_module_finalize, 
  51     mca_fbtl_posix_preadv,          
  52 #if defined (FBTL_POSIX_HAVE_AIO)
  53     mca_fbtl_posix_ipreadv,         
  54 #else
  55     NULL,                           
  56 #endif
  57     mca_fbtl_posix_pwritev,         
  58 #if defined (FBTL_POSIX_HAVE_AIO)
  59     mca_fbtl_posix_ipwritev,        
  60     mca_fbtl_posix_progress,        
  61     mca_fbtl_posix_request_free     
  62 #else
  63     NULL,                           
  64     NULL,                           
  65     NULL                            
  66 #endif
  67 };
  68 
  69 
  70 
  71 
  72 
  73 
  74 int mca_fbtl_posix_component_init_query(bool enable_progress_threads,
  75                                       bool enable_mpi_threads) {
  76     
  77 
  78    return OMPI_SUCCESS;
  79 }
  80 
  81 struct mca_fbtl_base_module_1_0_0_t *
  82 mca_fbtl_posix_component_file_query (ompio_file_t *fh, int *priority) {
  83    *priority = mca_fbtl_posix_priority;
  84 
  85    if (UFS == fh->f_fstype) {
  86        if (*priority < 50) {
  87            *priority = 50;
  88        }
  89    }
  90 
  91    return &posix;
  92 }
  93 
  94 int mca_fbtl_posix_component_file_unquery (ompio_file_t *file) {
  95    
  96 
  97 
  98 
  99    return OMPI_SUCCESS;
 100 }
 101 
 102 int mca_fbtl_posix_module_init (ompio_file_t *file) {
 103 
 104 #if defined (FBTL_POSIX_HAVE_AIO)
 105     long val = sysconf(_SC_AIO_MAX);
 106     if ( -1 != val ) {
 107         fbtl_posix_max_aio_active_reqs = (int)val;
 108     }
 109 #endif
 110     return OMPI_SUCCESS;
 111 }
 112 
 113 
 114 int mca_fbtl_posix_module_finalize (ompio_file_t *file) {
 115     return OMPI_SUCCESS;
 116 }
 117 
 118 bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
 119 {
 120     bool ret=false;
 121 #if defined (FBTL_POSIX_HAVE_AIO)
 122     int i=0, lcount=0, ret_code=0;
 123     mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
 124     off_t start_offset, end_offset, total_length;
 125 
 126     for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
 127         if ( EINPROGRESS == data->aio_req_status[i] ) {
 128             data->aio_req_status[i] = aio_error ( &data->aio_reqs[i]);
 129             if ( 0 == data->aio_req_status[i]){
 130                 data->aio_open_reqs--;
 131                 lcount++;
 132                 
 133 
 134 
 135 
 136 
 137                 data->aio_total_len += aio_return (&data->aio_reqs[i]);
 138             }
 139             else if ( EINPROGRESS == data->aio_req_status[i]){
 140                 
 141                 continue;
 142             }
 143             else {
 144                 
 145 
 146                 req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
 147                 req->req_ompi.req_status._ucount = data->aio_total_len;
 148                 ret = true;
 149                 break;
 150             }
 151         }
 152         else {
 153             lcount++;
 154         }
 155     }
 156 #if 0
 157     printf("lcount=%d open_reqs=%d\n", lcount, data->aio_open_reqs );
 158 #endif
 159 
 160     if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
 161         
 162         mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 163         
 164         
 165         data->aio_first_active_req = data->aio_last_active_req;
 166         if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) {
 167             data->aio_last_active_req += data->aio_req_chunks;
 168         }
 169         else {
 170             data->aio_last_active_req = data->aio_req_count;
 171         }
 172 
 173         start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
 174         end_offset   = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
 175         total_length = (end_offset - start_offset);
 176 
 177         if ( FBTL_POSIX_READ == data->aio_req_type ) {
 178             ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
 179         }
 180         else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
 181             ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
 182         }
 183         if ( 0 < ret_code ) {
 184             opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
 185             
 186             mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 187             return OMPI_ERROR;
 188         }
 189         
 190         for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
 191             if ( FBTL_POSIX_READ == data->aio_req_type ) {
 192                 if (-1 == aio_read(&data->aio_reqs[i])) {
 193                     opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
 194                     mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 195                     return OMPI_ERROR;
 196                 }
 197             }
 198             else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
 199                 if (-1 == aio_write(&data->aio_reqs[i])) {
 200                     opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
 201                     mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 202                     return OMPI_ERROR;
 203                 }
 204             }
 205         }
 206 #if 0
 207         printf("posting new batch: first=%d last=%d\n", data->aio_first_active_req, data->aio_last_active_req );
 208 #endif
 209     }
 210 
 211     if ( 0 == data->aio_open_reqs ) {
 212         
 213         req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
 214         req->req_ompi.req_status._ucount = data->aio_total_len;
 215         mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 216         ret = true;
 217     }
 218 #endif
 219     return ret;
 220 }
 221 
 222 void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
 223 {
 224 #if defined (FBTL_POSIX_HAVE_AIO)
 225     
 226     mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
 227     if (NULL != data ) {
 228         mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
 229         if ( NULL != data->aio_reqs ) {
 230             free ( data->aio_reqs);
 231         }
 232         if ( NULL != data->aio_req_status ) {
 233             free ( data->aio_req_status );
 234         }
 235         free ( data );
 236         req->req_data = NULL;
 237     }
 238 #endif
 239   return;
 240 }