root/ompi/mca/fbtl/ime/fbtl_ime.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_fbtl_ime_component_init_query
  2. mca_fbtl_ime_component_file_query
  3. mca_fbtl_ime_component_file_unquery
  4. mca_fbtl_ime_module_init
  5. mca_fbtl_ime_module_finalize
  6. mca_fbtl_ime_progress
  7. mca_fbtl_ime_request_free
  8. mca_fbtl_ime_complete_cb

   1 /*
   2  * Copyright (c) 2018      DataDirect Networks. All rights reserved.
   3  * $COPYRIGHT$
   4  *
   5  * Additional copyrights may follow
   6  *
   7  * $HEADER$
   8  */
   9 
  10 #include "ompi_config.h"
  11 #include "mpi.h"
  12 
  13 #include "ompi/mca/fbtl/fbtl.h"
  14 #include "ompi/mca/fbtl/ime/fbtl_ime.h"
  15 
  16 /*
  17  * *******************************************************************
  18  * ************************ actions structure ************************
  19  * *******************************************************************
  20  */
  21 static mca_fbtl_base_module_1_0_0_t ime =  {
  22     mca_fbtl_ime_module_init,     /* initalise after being selected */
  23     mca_fbtl_ime_module_finalize, /* close a module on a communicator */
  24     mca_fbtl_ime_preadv,          /* blocking read */
  25     mca_fbtl_ime_ipreadv,         /* non-blocking read*/
  26     mca_fbtl_ime_pwritev,         /* blocking write */
  27     mca_fbtl_ime_ipwritev,        /* non-blocking write */
  28     mca_fbtl_ime_progress,        /* module specific progress */
  29     mca_fbtl_ime_request_free     /* free module specific data items on the request */
  30 };
  31 /*
  32  * *******************************************************************
  33  * ************************* structure ends **************************
  34  * *******************************************************************
  35  */
  36 
  37 int mca_fbtl_ime_component_init_query(bool enable_progress_threads,
  38                                       bool enable_mpi_threads)
  39 {
  40     /* Nothing to do */
  41    return OMPI_SUCCESS;
  42 }
  43 
  44 struct mca_fbtl_base_module_1_0_0_t *
  45 mca_fbtl_ime_component_file_query (ompio_file_t *fh, int *priority)
  46 {
  47    *priority = mca_fbtl_ime_priority;
  48 
  49     /* Do the same as the FS component:
  50        Only return a non-null component if IME
  51        can handle the IO operations. */
  52     if (IME == fh->f_fstype) {
  53         if (*priority < FBTL_IME_INCREASED_PRIORITY) {
  54             *priority = FBTL_IME_INCREASED_PRIORITY;
  55         }
  56         return &ime;
  57     }
  58 
  59    return NULL;
  60 }
  61 
  62 int mca_fbtl_ime_component_file_unquery (ompio_file_t *file)
  63 {
  64    /* This function might be needed for some purposes later. for now it
  65     * does not have anything to do since there are no steps which need
  66     * to be undone if this module is not selected */
  67 
  68    return OMPI_SUCCESS;
  69 }
  70 
  71 int mca_fbtl_ime_module_init (ompio_file_t *file)
  72 {
  73     return OMPI_SUCCESS;
  74 }
  75 
  76 
  77 int mca_fbtl_ime_module_finalize (ompio_file_t *file)
  78 {
  79     return OMPI_SUCCESS;
  80 }
  81 
  82 bool mca_fbtl_ime_progress ( mca_ompio_request_t *req)
  83 {
  84     int i=0, lcount=0, ret_code=0;
  85     mca_fbtl_ime_request_data_t *data=(mca_fbtl_ime_request_data_t *)req->req_data;
  86 
  87     /* Go through all the requests in the current batch to check
  88      * if they have finished. */
  89     for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
  90         if ( data->aio_req_status[i] == FBTL_IME_REQ_CLOSED ) {
  91             lcount++;
  92         }
  93         else if ( data->aio_req_status[i] >= 0 ) {
  94             /* request has finished */
  95             data->aio_open_reqs--;
  96             lcount++;
  97             data->aio_total_len += data->aio_req_status[i];
  98             data->aio_req_status[i] = FBTL_IME_REQ_CLOSED;
  99         }
 100         else if ( data->aio_req_status[i] == FBTL_IME_REQ_ERROR ) {
 101             /* an error occured. */
 102             data->aio_open_reqs--;
 103             lcount++;
 104             data->aio_req_fail_count++;
 105             data->aio_req_status[i] = FBTL_IME_REQ_CLOSED;
 106         }
 107         else {
 108             /* not yet done */
 109         }
 110     }
 111 
 112     /* In case the current batch of requests terminated, exit if an error
 113      * happened for any request.
 114      */
 115     if ( data->aio_req_fail_count > 0 &&
 116          lcount == data->aio_last_active_req - data->aio_first_active_req ) {
 117         goto error_exit;
 118     }
 119 
 120     /* In case some requests are pending, and no error happened in any of the
 121      * previous requests, then the next batch of operations should be prepared.
 122      */
 123     if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs) ) {
 124 
 125         /* prepare the next batch of operations */
 126         data->aio_first_active_req = data->aio_last_active_req;
 127         if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) {
 128             data->aio_last_active_req += data->aio_req_chunks;
 129         }
 130         else {
 131             data->aio_last_active_req = data->aio_req_count;
 132         }
 133 
 134         /* Send the requests. */
 135         for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
 136             if ( FBTL_IME_READ == data->aio_req_type &&
 137                  ime_native_aio_read(&data->aio_reqs[i]) < 0 ) {
 138                 opal_output(1, "mca_fbtl_ime_progress: error in aio_read()");
 139                 data->aio_req_status[i] = FBTL_IME_REQ_ERROR;
 140                 data->aio_last_active_req = i + 1;
 141                 break;
 142             }
 143             else if ( FBTL_IME_WRITE == data->aio_req_type &&
 144                       ime_native_aio_write(&data->aio_reqs[i]) < 0 ) {
 145                 opal_output(1, "mca_fbtl_ime_progress: error in aio_write()");
 146                 data->aio_req_status[i] = FBTL_IME_REQ_ERROR;
 147                 data->aio_last_active_req = i + 1;
 148                 break;
 149             }
 150         }
 151     }
 152 
 153     if ( 0 == data->aio_open_reqs ) {
 154         /* all pending operations are finished for this request */
 155         req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
 156         req->req_ompi.req_status._ucount = data->aio_total_len;
 157         return true;
 158     }
 159     return false;
 160 
 161 error_exit:
 162     req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
 163     req->req_ompi.req_status._ucount = data->aio_total_len;
 164     return true;
 165 }
 166 
 167 void mca_fbtl_ime_request_free ( mca_ompio_request_t *req)
 168 {
 169     /* Free the fbtl specific data structures */
 170     mca_fbtl_ime_request_data_t *data=(mca_fbtl_ime_request_data_t *)req->req_data;
 171     if (NULL != data) {
 172         free (data->allocated_data);
 173         free (data);
 174         req->req_data = NULL;
 175     }
 176 }
 177 
 178 void mca_fbtl_ime_complete_cb  (struct ime_aiocb *aiocb, int err, ssize_t bytes)
 179 {
 180     ssize_t *req_status = (ssize_t *) aiocb->user_context;
 181     *req_status = err == 0 ? bytes : FBTL_IME_REQ_ERROR;

/* [<][>][^][v][top][bottom][index][help] */