root/ompi/mca/io/romio321/romio/adio/ad_ntfs/ad_ntfs_iwrite.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ADIOI_NTFS_Strerror
  2. ADIOI_NTFS_aio_poll_fn
  3. ADIOI_NTFS_aio_wait_fn
  4. ADIOI_NTFS_aio_query_fn
  5. ADIOI_NTFS_aio_free_fn
  6. ADIOI_NTFS_IwriteContig
  7. ADIOI_NTFS_aio

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /* 
   3  *   Copyright (C) 1997 University of Chicago. 
   4  *   See COPYRIGHT notice in top-level directory.
   5  */
   6 
   7 #include "ad_ntfs.h"
   8 
   9 #include "../../mpi-io/mpioimpl.h"
  10 #include "../../mpi-io/mpioprof.h"
  11 #include "mpiu_greq.h"
  12 
  13 static MPIX_Grequest_class ADIOI_NTFS_greq_class = 0;
  14 
  15 /* Fills the input buffer, errMsg, with the error message 
  16    corresponding to error code, error */
  17 void ADIOI_NTFS_Strerror(int error, char *errMsg, int errMsgLen)
  18 {
  19     LPTSTR str;
  20     int num_bytes;
  21     num_bytes = FormatMessage(
  22         FORMAT_MESSAGE_FROM_SYSTEM |
  23         FORMAT_MESSAGE_ALLOCATE_BUFFER,
  24         NULL,
  25         error,
  26         0,
  27         &str,
  28         FORMAT_MESSAGE_MIN_SIZE,
  29     0);
  30     if (num_bytes == 0)
  31     {
  32         strncpy(errMsg, "\0", errMsgLen);
  33     }
  34     else
  35     {
  36         strncpy(errMsg, str, errMsgLen);
  37         LocalFree(str);
  38     }
  39 }
  40 
  41 /* poll for completion of a single outstanding AIO request */
  42 int ADIOI_NTFS_aio_poll_fn(void *extra_state, MPI_Status *status)
  43 {
  44     ADIOI_AIO_Request *aio_req;
  45     int mpi_errno = MPI_SUCCESS;
  46 
  47     /* FIXME: Validate the args -- has it already been done by the 
  48        caller ? */
  49 
  50     aio_req = (ADIOI_AIO_Request *)extra_state;
  51     
  52     /* XXX: test for AIO completion here */
  53     if(!GetOverlappedResult( aio_req->fd, aio_req->lpOvl, 
  54                             &(aio_req->nbytes), FALSE)){
  55         if(GetLastError() == ERROR_IO_INCOMPLETE){
  56         /* IO in progress */
  57             /* TODO: need to diddle with status somehow */
  58         }else{
  59         /* Error occured */
  60         /* TODO: unsure how to handle this */    
  61         }
  62     }else{
  63         mpi_errno = MPI_Grequest_complete(aio_req->req);
  64             if (mpi_errno != MPI_SUCCESS) {
  65                     mpi_errno = MPIO_Err_create_code(MPI_SUCCESS,
  66                                     MPIR_ERR_RECOVERABLE,
  67                                     "ADIOI_NTFS_aio_poll_fn", __LINE__,
  68                                     MPI_ERR_IO, "**mpi_grequest_complete",
  69                                     0);
  70             }
  71     }
  72     return mpi_errno;
  73 }
  74 
  75 
  76 /* Wait for completion of one of the outstanding AIO requests */
  77 int ADIOI_NTFS_aio_wait_fn(int count, void **array_of_states,
  78                 double timeout, MPI_Status *status)
  79 {
  80         int i, mpi_errno = MPI_SUCCESS;
  81         ADIOI_AIO_Request **aio_reqlist;
  82     LPHANDLE lpHandles;
  83     DWORD retObject=0;
  84 
  85     /* FIXME: Validate the args -- has it already been done by the 
  86        caller ? */
  87         aio_reqlist = (ADIOI_AIO_Request **)array_of_states;
  88     lpHandles = (LPHANDLE) ADIOI_Calloc(count, sizeof(HANDLE));
  89     if (lpHandles == NULL)
  90     {
  91         mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
  92             "ADIOI_NTFS_aio_wait_fn", __LINE__, MPI_ERR_IO,
  93             "**nomem", "**nomem %s", "Event handles");
  94         return mpi_errno;
  95     }
  96         /* XXX: set-up arrays of outstanding requests */
  97     for(i=0; i<count; i++){
  98         lpHandles[i] = (aio_reqlist[i])->lpOvl->hEvent;
  99     }
 100 
 101         /* XXX: wait for one request to complete */
 102     /* FIXME: Is the timeout in seconds ? */
 103     timeout = (timeout <= 0) ? INFINITE : (timeout * 1000);
 104     
 105     if((retObject = WaitForMultipleObjects(count, lpHandles,
 106                     FALSE, timeout)) != WAIT_FAILED){
 107         retObject = retObject - WAIT_OBJECT_0;
 108         if(GetOverlappedResult( aio_reqlist[retObject]->fd, 
 109                 aio_reqlist[retObject]->lpOvl, &(aio_reqlist[retObject]->nbytes), 
 110                 FALSE)){
 111                 /* XXX: mark completed requests as 'done'*/
 112             mpi_errno = MPI_Grequest_complete(aio_reqlist[retObject]->req);
 113             if (mpi_errno != MPI_SUCCESS) {
 114                     mpi_errno = MPIO_Err_create_code(MPI_SUCCESS,
 115                                     MPIR_ERR_RECOVERABLE,
 116                                     "ADIOI_NTFS_aio_wait_fn", __LINE__,
 117                                     MPI_ERR_IO, "**mpi_grequest_complete",
 118                                     0);
 119             }
 120         }else{
 121             if(GetLastError() == ERROR_IO_INCOMPLETE){
 122             /* IO in progress */
 123                 /* TODO: need to diddle with status somehow */
 124             }else{
 125             /* Error occured */
 126             /* TODO: not sure how to handle this */    
 127             }
 128         }
 129     }else{
 130         /* TODO: How to handle error while waiting ? */
 131     }
 132     ADIOI_Free(lpHandles);
 133         return mpi_errno;
 134 }
 135 
 136 int ADIOI_NTFS_aio_query_fn(void *extra_state, MPI_Status *status) 
 137 {
 138         ADIOI_AIO_Request *aio_req;
 139 
 140         aio_req = (ADIOI_AIO_Request *)extra_state;
 141 
 142 
 143         MPI_Status_set_elements(status, MPI_BYTE, aio_req->nbytes); 
 144 
 145         /* can never cancel so always true */ 
 146         MPI_Status_set_cancelled(status, 0); 
 147 
 148         /* choose not to return a value for this */ 
 149         status->MPI_SOURCE = MPI_UNDEFINED; 
 150         /* tag has no meaning for this generalized request */ 
 151         status->MPI_TAG = MPI_UNDEFINED; 
 152         /* this generalized request never fails */ 
 153         return MPI_SUCCESS; 
 154 }
 155 
 156 
 157 int ADIOI_NTFS_aio_free_fn(void *extra_state)
 158 {
 159         ADIOI_AIO_Request *aio_req;
 160     /* FIXME: Validate the args -- has it already been done by the 
 161        caller ? */
 162         aio_req = (ADIOI_AIO_Request*)extra_state;
 163     CloseHandle(aio_req->lpOvl->hEvent);
 164     ADIOI_Free(aio_req->lpOvl);
 165     ADIOI_Free(aio_req);
 166         return MPI_SUCCESS;    
 167 }
 168 
 169 void ADIOI_NTFS_IwriteContig(ADIO_File fd, void *buf, int count, 
 170                              MPI_Datatype datatype, int file_ptr_type,
 171                              ADIO_Offset offset, ADIO_Request *request,
 172                              int *error_code)  
 173 {
 174     MPI_Count len, typesize;
 175     int err;
 176     static char myname[] = "ADIOI_NTFS_IwriteContig";
 177 
 178     MPI_Type_size_x(datatype, &typesize);
 179     len = count * typesize;
 180 
 181     if (file_ptr_type == ADIO_INDIVIDUAL)
 182     {
 183         offset = fd->fp_ind;
 184     }
 185     err = ADIOI_NTFS_aio(fd, buf, len, offset, 1, request);
 186     if (file_ptr_type == ADIO_INDIVIDUAL)
 187     {
 188         fd->fp_ind += len;
 189     }
 190 
 191     /* --BEGIN ERROR HANDLING-- */
 192     if (err != MPI_SUCCESS)
 193     {
 194         *error_code = MPIO_Err_create_code(err, MPIR_ERR_RECOVERABLE,
 195                                            myname, __LINE__, MPI_ERR_IO,
 196                                            "**io", 0);
 197         return;
 198     }
 199     /* --END ERROR HANDLING-- */
 200     *error_code = MPI_SUCCESS;
 201 
 202     fd->fp_sys_posn = -1;   /* set it to null. */
 203 }
 204 
 205 
 206 /* This function is for implementation convenience. It is not user-visible.
 207  * If wr==1 write, wr==0 read.
 208  *
 209  * Returns MPI_SUCCESS on success, mpi_errno on failure.
 210  */
 211 int ADIOI_NTFS_aio(ADIO_File fd, void *buf, int len, ADIO_Offset offset,
 212                    int wr, MPI_Request *request)
 213 {
 214     static char myname[] = "ADIOI_NTFS_aio";
 215 
 216     ADIOI_AIO_Request *aio_req;
 217     static DWORD dwNumWritten, dwNumRead;
 218     BOOL ret_val = FALSE;
 219     FDTYPE fd_sys;
 220     int mpi_errno = MPI_SUCCESS;
 221     DWORD err;
 222 
 223     fd_sys = fd->fd_sys;
 224 
 225     aio_req = (ADIOI_AIO_Request *)ADIOI_Calloc(sizeof(ADIOI_AIO_Request), 1);
 226     if (aio_req == NULL)
 227     {
 228         mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 229             myname, __LINE__, MPI_ERR_IO,
 230             "**nomem", "**nomem %s", "AIO_REQ");
 231         return mpi_errno;
 232     }
 233     aio_req->lpOvl = (LPOVERLAPPED ) ADIOI_Calloc(sizeof(OVERLAPPED), 1);
 234     if (aio_req->lpOvl == NULL)
 235     {
 236         mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 237             myname, __LINE__, MPI_ERR_IO,
 238             "**nomem", "**nomem %s", "OVERLAPPED");
 239     ADIOI_Free(aio_req);
 240         return mpi_errno;
 241     }
 242     aio_req->lpOvl->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
 243     if (aio_req->lpOvl->hEvent == NULL)
 244     {
 245     char errMsg[ADIOI_NTFS_ERR_MSG_MAX];
 246         err = GetLastError();
 247     ADIOI_NTFS_Strerror(err, errMsg, ADIOI_NTFS_ERR_MSG_MAX);
 248         mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 249             myname, __LINE__, MPI_ERR_IO,
 250             "**io", "**io %s", errMsg);
 251     ADIOI_Free(aio_req->lpOvl);
 252     ADIOI_Free(aio_req);
 253         return mpi_errno;
 254     }
 255     aio_req->lpOvl->Offset = DWORDLOW(offset);
 256     aio_req->lpOvl->OffsetHigh = DWORDHIGH(offset);
 257     aio_req->fd = fd_sys;
 258     
 259     /* XXX: initiate async I/O  */
 260     if (wr)
 261     {
 262         ret_val = WriteFile(fd_sys, buf, len, &dwNumWritten, aio_req->lpOvl);
 263     }
 264     else
 265     {
 266         ret_val = ReadFile(fd_sys, buf, len, &dwNumRead, aio_req->lpOvl);
 267     }
 268 
 269     /* --BEGIN ERROR HANDLING-- */
 270     if (ret_val == FALSE) 
 271     {
 272         mpi_errno = GetLastError();
 273         if (mpi_errno != ERROR_IO_PENDING)
 274         {
 275         char errMsg[ADIOI_NTFS_ERR_MSG_MAX];
 276         ADIOI_NTFS_Strerror(mpi_errno, errMsg, ADIOI_NTFS_ERR_MSG_MAX);
 277             mpi_errno = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 278                 myname, __LINE__, MPI_ERR_IO,
 279                 "**io",
 280                 "**io %s", errMsg);
 281             return mpi_errno;
 282         }
 283         mpi_errno = MPI_SUCCESS;
 284     }
 285     /* --END ERROR HANDLING-- */
 286 
 287     /* XXX: set up generalized request class and request */
 288     if (ADIOI_NTFS_greq_class == 0) {
 289             mpi_errno = MPIX_Grequest_class_create(ADIOI_NTFS_aio_query_fn,
 290                             ADIOI_NTFS_aio_free_fn, MPIU_Greq_cancel_fn,
 291                             ADIOI_NTFS_aio_poll_fn, ADIOI_NTFS_aio_wait_fn,
 292                             &ADIOI_NTFS_greq_class);
 293         if(mpi_errno != MPI_SUCCESS){
 294         /* FIXME: Pass appropriate error code to user */
 295         }
 296     }
 297     mpi_errno = MPIX_Grequest_class_allocate(ADIOI_NTFS_greq_class, aio_req, request);
 298     if(mpi_errno != MPI_SUCCESS){
 299     /* FIXME: Pass appropriate error code to user */
 300     }
 301     memcpy(&(aio_req->req), request, sizeof(MPI_Request));
 302     return mpi_errno;
 303 }

/* [<][>][^][v][top][bottom][index][help] */