root/ompi/mca/fbtl/ime/fbtl_ime_nonblocking_op.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_fbtl_ime_ipreadv
  2. mca_fbtl_ime_ipwritev
  3. mca_fbtl_ime_nonblocking_op

   1 /*
   2  * Copyright (c) 2018      DataDirect Networks. All rights reserved.
   3  * $COPYRIGHT$
   4  *
   5  * Additional copyrights may follow
   6  *
   7  * $HEADER$
   8  */
   9 
  10 #include "ompi_config.h"
  11 #include "fbtl_ime.h"
  12 
  13 #include "mpi.h"
  14 #include "ompi/constants.h"
  15 #include "ompi/mca/fbtl/fbtl.h"
  16 
  17 static ssize_t  mca_fbtl_ime_nonblocking_op (ompio_file_t *fh,
  18                  ompi_request_t *request, int io_op);
  19 
  20 ssize_t mca_fbtl_ime_ipreadv (ompio_file_t *fh, ompi_request_t *request)
  21 {
  22     return mca_fbtl_ime_nonblocking_op(fh, request, FBTL_IME_READ);
  23 }
  24 ssize_t  mca_fbtl_ime_ipwritev (ompio_file_t *fh, ompi_request_t *request)
  25 {
  26     return mca_fbtl_ime_nonblocking_op(fh, request, FBTL_IME_WRITE);
  27 }
  28 
  29 static ssize_t mca_fbtl_ime_nonblocking_op (ompio_file_t *fh,
  30                                             ompi_request_t *request, int io_op)
  31 {
  32     mca_fbtl_ime_request_data_t *data;
  33     mca_ompio_request_t *req = (mca_ompio_request_t *) request;
  34     int i=0, req_index = 0, ret;
  35 
  36     data = (mca_fbtl_ime_request_data_t *) malloc ( sizeof (mca_fbtl_ime_request_data_t));
  37     if ( NULL == data ) {
  38         opal_output (1,"could not allocate memory\n");
  39         return OMPI_ERR_OUT_OF_RESOURCE;
  40     }
  41 
  42     /* We might allocate too much memory here because we don't know
  43        how many IME requests will be necessary.
  44 
  45        We will use all the iovec "slots" in the array,
  46        but maybe not all the request and request status slots.
  47        That is, because an IME request can handle several iovecs,
  48        not just one. */
  49     data->allocated_data = (void*) malloc( fh->f_num_of_io_entries *
  50         (sizeof(struct iovec) +
  51          sizeof(struct ime_aiocb) +
  52          sizeof(ssize_t)) );
  53     if (NULL == data->allocated_data) {
  54         opal_output(1, "OUT OF MEMORY\n");
  55         free(data);
  56         return OMPI_ERR_OUT_OF_RESOURCE;
  57     }
  58     data->aio_iovecs = (struct iovec *) data->allocated_data;
  59     data->aio_reqs = (struct ime_aiocb *) (data->aio_iovecs +
  60         fh->f_num_of_io_entries);
  61     data->aio_req_status = (ssize_t *) (data->aio_reqs +
  62         fh->f_num_of_io_entries);
  63 
  64     /* Fill some attributes of the OMPIO request data */
  65     data->aio_req_type  = io_op;    /* The correctness of io_op will be checked later */
  66     data->aio_req_chunks = mca_fbtl_ime_aio_reqs_max;
  67     data->aio_req_fail_count = 0;
  68     data->aio_total_len = 0;
  69     data->aio_fh = fh;
  70     data->aio_reqs[0].iovcnt = 0;
  71 
  72     /* Go through all IO entries and try to aggregate them. */
  73     for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
  74         data->aio_iovecs[i].iov_base = fh->f_io_array[i].memory_address;
  75         data->aio_iovecs[i].iov_len = fh->f_io_array[i].length;
  76 
  77         /* If the processed iovec will be the first in our ime_aiocb request,
  78            then we initialize this aio request for IME. */
  79         if (data->aio_reqs[req_index].iovcnt == 0) {
  80             data->aio_reqs[req_index].iov = &data->aio_iovecs[i];
  81             data->aio_reqs[req_index].iovcnt = 1;
  82             data->aio_reqs[req_index].file_offset  = (off_t)
  83                 fh->f_io_array[i].offset;
  84             data->aio_reqs[req_index].fd  = fh->fd;
  85             data->aio_reqs[req_index].complete_cb = &mca_fbtl_ime_complete_cb;
  86             data->aio_reqs[req_index].user_context = (intptr_t)
  87                 &data->aio_req_status[req_index];
  88             data->aio_req_status[req_index] = FBTL_IME_IN_PROGRESS;
  89         }
  90 
  91         /* Here we check if the next iovec will be appended to
  92            the current ime_aiocb request.
  93            ie: if data is contiguous 
  94                AND we don't exceed the advised number of iovecs for IME
  95            In that case, the next iovec will be appended to the IME req. */
  96         if (i+1 != fh->f_num_of_io_entries &&
  97             ((OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset +
  98              (ptrdiff_t)fh->f_io_array[i].length) ==
  99               (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i+1].offset &&
 100             data->aio_reqs[req_index].iovcnt < mca_fbtl_ime_iov_max ) {
 101             data->aio_reqs[req_index].iovcnt++;
 102         }
 103 
 104         /* Otherwise, we need to create a new request
 105            (except if there is no next iovec to process) */
 106         else if ( i+1 != fh->f_num_of_io_entries ) {
 107             req_index++;
 108             data->aio_reqs[req_index].iovcnt = 0;
 109         }
 110     }
 111 
 112     /* Fill the missing attributes of the OMPI request */
 113     data->aio_req_count = req_index + 1;
 114     data->aio_open_reqs = req_index + 1;
 115     data->aio_first_active_req = 0;
 116     if ( data->aio_req_count > data->aio_req_chunks ) {
 117         data->aio_last_active_req = data->aio_req_chunks;
 118     }
 119     else {
 120         data->aio_last_active_req = data->aio_req_count;
 121     }
 122 
 123     /* Actually start the requests (or at least the first batch).
 124        In case an error happened when one request is started, we
 125        don't send the next ones and mark the failing request as
 126        the last active one. Finally we exit as if no error happened,
 127        because some other requests might have already been started
 128        and they need to be finalized properly (via the progress function).
 129      */
 130     for (i=0; i < data->aio_last_active_req; i++) {
 131         switch(io_op) {
 132 
 133         case FBTL_IME_READ:
 134             ret = ime_native_aio_read(&data->aio_reqs[i]);
 135             if (ret < 0) {
 136                 opal_output(1, "mca_fbtl_ime_nonblocking_op: error in "
 137                                "ime_native_aio_read() error ret=%d  %s",
 138                                ret, strerror(errno));
 139                 data->aio_req_status[i] = FBTL_IME_REQ_ERROR;
 140                 data->aio_last_active_req = i + 1;
 141                 goto standard_exit;
 142             }
 143             break;
 144 
 145         case FBTL_IME_WRITE:
 146             ret = ime_native_aio_write(&data->aio_reqs[i]);
 147             if (ret < 0) {
 148                 opal_output(1, "mca_fbtl_ime_nonblocking_op: error in "
 149                                "ime_native_aio_write() error ret=%d  %s",
 150                                ret, strerror(errno));
 151                 data->aio_req_status[i] = FBTL_IME_REQ_ERROR;
 152                 data->aio_last_active_req = i + 1;
 153                 goto standard_exit;
 154             }
 155             break;
 156 
 157         default:
 158             opal_output(1, "mca_fbtl_ime_nonblocking_op: an unsupported "
 159                            "IO operation was requested. io_op=%d", io_op);
 160             goto error_exit;
 161         }
 162     }
 163 
 164 standard_exit:
 165     req->req_data = data;
 166     req->req_progress_fn = mca_fbtl_ime_progress;
 167     req->req_free_fn     = mca_fbtl_ime_request_free;
 168 
 169     return OMPI_SUCCESS;
 170 
 171 error_exit:
 172     free(data->allocated_data);
 173     free(data);
 174     return OMPI_ERROR;
 175 }

/* [<][>][^][v][top][bottom][index][help] */