root/ompi/mca/io/romio321/romio/adio/common/ad_iwrite.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ADIOI_GEN_IwriteContig
  2. ADIOI_GEN_aio
  3. ADIOI_GEN_IwriteStrided
  4. ADIOI_GEN_aio_poll_fn
  5. ADIOI_GEN_aio_wait_fn
  6. ADIOI_GEN_aio_free_fn
  7. ADIOI_GEN_aio_query_fn

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /* 
   3  *
   4  *   Copyright (C) 2004 University of Chicago. 
   5  *   See COPYRIGHT notice in top-level directory.
   6  */
   7 
   8 #include "adio.h"
   9 
  10 #ifdef HAVE_UNISTD_H
  11 #include <unistd.h>
  12 #endif
  13 #ifdef HAVE_SIGNAL_H
  14 #include <signal.h>
  15 #endif
  16 #ifdef HAVE_SYS_TYPES_H
  17 #include <sys/types.h>
  18 #endif
  19 #ifdef HAVE_AIO_H
  20 #include <aio.h>
  21 #endif
  22 #ifdef HAVE_SYS_AIO_H
  23 #include <sys/aio.h>
  24 #endif
  25 #include <time.h>
  26 
  27 #include "../../mpi-io/mpioimpl.h"
  28 #include "../../mpi-io/mpioprof.h"
  29 #include "mpiu_greq.h"
  30 /* Workaround for incomplete set of definitions if __REDIRECT is not 
  31    defined and large file support is used in aio.h */
  32 #if !defined(__REDIRECT) && defined(__USE_FILE_OFFSET64)
  33 #define aiocb aiocb64
  34 #endif
  35 
  36 #ifdef ROMIO_HAVE_WORKING_AIO
  37 
  38 static MPIX_Grequest_class ADIOI_GEN_greq_class = 0;
  39 
  40 /* ADIOI_GEN_IwriteContig
  41  *
  42  * This code handles only the case where ROMIO_HAVE_WORKING_AIO is 
  43  * defined. We post an asynchronous I/O operations using the appropriate aio
  44  * routines.  Otherwise, the ADIOI_Fns_struct will point to the FAKE
  45  * version.
  46  */
  47 void ADIOI_GEN_IwriteContig(ADIO_File fd, const void *buf, int count,
  48                             MPI_Datatype datatype, int file_ptr_type,
  49                             ADIO_Offset offset, ADIO_Request *request,
  50                             int *error_code)
  51 {
  52     MPI_Count len, typesize;
  53     int aio_errno = 0;
  54     static char myname[] = "ADIOI_GEN_IWRITECONTIG";
  55 
  56     MPI_Type_size_x(datatype, &typesize);
  57     len = count * typesize;
  58     ADIOI_Assert(len == (int)((ADIO_Offset)count * (ADIO_Offset)typesize)); /* the count is an int parm */
  59 
  60     if (file_ptr_type == ADIO_INDIVIDUAL) offset = fd->fp_ind;
  61     /* Cast away the const'ness of 'buf' as ADIOI_GEN_aio is used for
  62      * both read and write calls */
  63     aio_errno = ADIOI_GEN_aio(fd, (char *) buf, len, offset, 1, request);
  64     if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind += len;
  65 
  66     fd->fp_sys_posn = -1;
  67 
  68     /* --BEGIN ERROR HANDLING-- */
  69     if (aio_errno != 0) {
  70         MPIO_ERR_CREATE_CODE_ERRNO(myname, aio_errno, error_code);
  71         return;
  72     }
  73     /* --END ERROR HANDLING-- */
  74 
  75     *error_code = MPI_SUCCESS;
  76 }
  77 /* This function is for implementation convenience.
  78  * It takes care of the differences in the interface for nonblocking I/O
  79  * on various Unix machines! If wr==1 write, wr==0 read.
  80  *
  81  * Returns 0 on success, -errno on failure.
  82  */
  83 int ADIOI_GEN_aio(ADIO_File fd, void *buf, int len, ADIO_Offset offset,
  84                   int wr, MPI_Request *request)
  85 {
  86     int err=-1, fd_sys;
  87 
  88     int error_code;
  89     struct aiocb *aiocbp=NULL;
  90     ADIOI_AIO_Request *aio_req=NULL;
  91     MPI_Status status;
  92 #if defined(ROMIO_XFS)
  93     unsigned maxiosz = wr ? fd->hints->fs_hints.xfs.write_chunk_sz :
  94             fd->hints->fs_hints.xfs.read_chunk_sz;
  95 #endif /* ROMIO_XFS */
  96 
  97     fd_sys = fd->fd_sys;
  98 
  99 #if defined(ROMIO_XFS)
 100     /* Use Direct I/O if desired and properly aligned */
 101     if (fd->fns == &ADIO_XFS_operations &&
 102          ((wr && fd->direct_write) || (!wr && fd->direct_read)) &&
 103          !(((long) buf) % fd->d_mem) && !(offset % fd->d_miniosz) && 
 104          !(len % fd->d_miniosz) && (len >= fd->d_miniosz) && 
 105          (len <= maxiosz)) {
 106             fd_sys = fd->fd_direct;
 107     }
 108 #endif /* ROMIO_XFS */
 109 
 110     aio_req = (ADIOI_AIO_Request*)ADIOI_Calloc(sizeof(ADIOI_AIO_Request), 1);
 111     aiocbp = (struct aiocb *) ADIOI_Calloc(sizeof(struct aiocb), 1);
 112     aiocbp->aio_offset = offset;
 113     aiocbp->aio_buf    = buf;
 114     aiocbp->aio_nbytes = len;
 115 
 116 #ifdef HAVE_STRUCT_AIOCB_AIO_WHENCE
 117     aiocbp->aio_whence = SEEK_SET;
 118 #endif
 119 #ifdef HAVE_STRUCT_AIOCB_AIO_FILDES
 120     aiocbp->aio_fildes = fd_sys;
 121 #endif
 122 #ifdef HAVE_STRUCT_AIOCB_AIO_SIGEVENT
 123 # ifdef AIO_SIGNOTIFY_NONE
 124     aiocbp->aio_sigevent.sigev_notify = SIGEV_NONE;
 125 # endif
 126     aiocbp->aio_sigevent.sigev_signo = 0;
 127 #endif
 128 #ifdef HAVE_STRUCT_AIOCB_AIO_REQPRIO
 129 # ifdef AIO_PRIO_DFL
 130     aiocbp->aio_reqprio = AIO_PRIO_DFL;   /* not needed in DEC Unix 4.0 */
 131 # else
 132     aiocbp->aio_reqprio = 0;
 133 # endif
 134 #endif
 135 
 136 #ifndef ROMIO_HAVE_AIO_CALLS_NEED_FILEDES
 137 #ifndef HAVE_STRUCT_AIOCB_AIO_FILDES
 138 #error 'No fildes set for aio structure'
 139 #endif
 140     if (wr) err = aio_write(aiocbp);
 141     else err = aio_read(aiocbp);
 142 #else
 143     /* Broken IBM interface */
 144     if (wr) err = aio_write(fd_sys, aiocbp);
 145     else err = aio_read(fd_sys, aiocbp);
 146 #endif
 147 
 148     if (err == -1) {
 149         if (errno == EAGAIN || errno == ENOSYS) { 
 150             /* exceeded the max. no. of outstanding requests.
 151                or, aio routines are not actually implemented 
 152             treat this as a blocking request and return.  */
 153             if (wr) 
 154                 ADIO_WriteContig(fd, buf, len, MPI_BYTE, 
 155                             ADIO_EXPLICIT_OFFSET, offset, &status, &error_code);  
 156             else
 157                 ADIO_ReadContig(fd, buf, len, MPI_BYTE,
 158                             ADIO_EXPLICIT_OFFSET, offset, &status, &error_code);  
 159                     
 160             MPIO_Completed_request_create(&fd, len, &error_code, request);
 161             if (aiocbp != NULL) ADIOI_Free(aiocbp);
 162             if (aio_req != NULL) ADIOI_Free(aio_req);
 163             return 0;
 164         } else {
 165             ADIOI_Free(aio_req);
 166             ADIOI_Free(aiocbp);
 167             return errno;
 168         }
 169     }
 170     aio_req->aiocbp = aiocbp;
 171     if (ADIOI_GEN_greq_class == 0) {
 172             MPIX_Grequest_class_create(ADIOI_GEN_aio_query_fn, 
 173                             ADIOI_GEN_aio_free_fn, MPIU_Greq_cancel_fn, 
 174                             ADIOI_GEN_aio_poll_fn, ADIOI_GEN_aio_wait_fn, 
 175                             &ADIOI_GEN_greq_class);
 176     }
 177     MPIX_Grequest_class_allocate(ADIOI_GEN_greq_class, aio_req, request);
 178     memcpy(&(aio_req->req), request, sizeof(MPI_Request));
 179     return 0;
 180 }
 181 #endif
 182 
 183 
 184 /* Generic implementation of IwriteStrided calls the blocking WriteStrided
 185  * immediately.
 186  */
 187 void ADIOI_GEN_IwriteStrided(ADIO_File fd, const void *buf, int count,
 188                              MPI_Datatype datatype, int file_ptr_type,
 189                              ADIO_Offset offset, MPI_Request *request,
 190                              int *error_code)
 191 {
 192     ADIO_Status status;
 193     MPI_Count typesize;
 194     MPI_Offset nbytes=0;
 195 
 196     /* Call the blocking function.  It will create an error code 
 197      * if necessary.
 198      */
 199     ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type, 
 200                       offset, &status, error_code);  
 201 
 202     if (*error_code == MPI_SUCCESS) {
 203         MPI_Type_size_x(datatype, &typesize);
 204         nbytes = (MPI_Offset)count * (MPI_Offset)typesize;
 205     }
 206     MPIO_Completed_request_create(&fd, nbytes, error_code, request);
 207 }
 208 
 209 #ifdef ROMIO_HAVE_WORKING_AIO
 210 /* generic POSIX aio completion test routine */
 211 int ADIOI_GEN_aio_poll_fn(void *extra_state, MPI_Status *status)
 212 {
 213     ADIOI_AIO_Request *aio_req;
 214     int errcode=MPI_SUCCESS;
 215 
 216     aio_req = (ADIOI_AIO_Request *)extra_state;
 217 
 218     /* aio_error returns an ERRNO value */
 219     errno = aio_error(aio_req->aiocbp);
 220     if (errno == EINPROGRESS) {
 221             /* TODO: need to diddle with status somehow */
 222     }
 223     else if (errno == ECANCELED) {
 224             /* TODO: unsure how to handle this */
 225     } else if (errno == 0) {
 226             ssize_t n = aio_return(aio_req->aiocbp);
 227             aio_req->nbytes = n;
 228             errcode = MPI_Grequest_complete(aio_req->req);
 229             /* --BEGIN ERROR HANDLING-- */
 230             if (errcode != MPI_SUCCESS) {
 231                     errcode = MPIO_Err_create_code(MPI_SUCCESS,
 232                                     MPIR_ERR_RECOVERABLE,
 233                                     "ADIOI_GEN_aio_poll_fn", __LINE__,
 234                                     MPI_ERR_IO, "**mpi_grequest_complete",
 235                                     0);
 236             }
 237             /* --END ERROR HANDLING-- */
 238     }
 239     return errcode;
 240 }
 241 
 242 /* wait for multiple requests to complete */
 243 int ADIOI_GEN_aio_wait_fn(int count, void ** array_of_states, 
 244                 double timeout, MPI_Status *status)
 245 {
 246         const struct aiocb **cblist;
 247         int err, errcode=MPI_SUCCESS;
 248         int nr_complete=0;
 249         double starttime;
 250         struct timespec aio_timer;
 251         struct timespec *aio_timer_p = NULL;
 252 
 253         ADIOI_AIO_Request **aio_reqlist;
 254         int i;
 255 
 256         aio_reqlist = (ADIOI_AIO_Request **)array_of_states;
 257 
 258         cblist = (const struct aiocb**) ADIOI_Calloc(count, sizeof(struct aiocb*));
 259 
 260         starttime = MPI_Wtime();
 261         if (timeout >0) {
 262             aio_timer.tv_sec = (time_t)timeout;
 263             aio_timer.tv_nsec = timeout - aio_timer.tv_sec;
 264             aio_timer_p = &aio_timer;
 265         }
 266         for (i=0; i< count; i++)
 267         {
 268                 cblist[i] = aio_reqlist[i]->aiocbp;
 269         }
 270 
 271         while(nr_complete < count) {
 272             do {
 273                 err = aio_suspend(cblist, count, aio_timer_p);
 274             } while (err < 0 && errno == EINTR);
 275             if (err == 0) 
 276             { /* run through the list of requests, and mark all the completed
 277                  ones as done */
 278                 for (i=0; i< count; i++)
 279                 {
 280                     /* aio_error returns an ERRNO value */
 281                     if (aio_reqlist[i]->aiocbp == NULL) 
 282                         continue;
 283                     errno = aio_error(aio_reqlist[i]->aiocbp);
 284                     if (errno == 0) {
 285                         ssize_t n = aio_return(aio_reqlist[i]->aiocbp);
 286                         aio_reqlist[i]->nbytes = n;
 287                         errcode = MPI_Grequest_complete(aio_reqlist[i]->req);
 288                         if (errcode != MPI_SUCCESS) {
 289                             errcode = MPIO_Err_create_code(MPI_SUCCESS,
 290                                     MPIR_ERR_RECOVERABLE,
 291                                     "ADIOI_GEN_aio_wait_fn", 
 292                                     __LINE__, MPI_ERR_IO, 
 293                                     "**mpi_grequest_complete", 0);
 294                         }
 295                         ADIOI_Free(aio_reqlist[i]->aiocbp);
 296                         aio_reqlist[i]->aiocbp = NULL;
 297                         cblist[i] = NULL;
 298                         nr_complete++;
 299                     } 
 300                     /* TODO: need to handle error conditions somehow*/
 301                 }
 302             } /* TODO: also need to handle errors here  */
 303             if ( (timeout > 0) && (timeout < (MPI_Wtime() - starttime) ))
 304                 break;
 305         }
 306 
 307         if (cblist != NULL) ADIOI_Free(cblist);
 308         return errcode;
 309 }
 310 
 311 int ADIOI_GEN_aio_free_fn(void *extra_state)
 312 {
 313         ADIOI_AIO_Request *aio_req;
 314         aio_req = (ADIOI_AIO_Request*)extra_state;
 315 
 316         if (aio_req->aiocbp != NULL)
 317                 ADIOI_Free(aio_req->aiocbp);
 318         ADIOI_Free(aio_req);
 319 
 320         return MPI_SUCCESS;
 321 }
 322 #endif /* working AIO */
 323 
 324 int ADIOI_GEN_aio_query_fn(void *extra_state, MPI_Status *status) 
 325 {
 326         ADIOI_AIO_Request *aio_req;
 327 
 328         aio_req = (ADIOI_AIO_Request *)extra_state;
 329 
 330         MPI_Status_set_elements_x(status, MPI_BYTE, aio_req->nbytes);
 331 
 332         /* can never cancel so always true */ 
 333         MPI_Status_set_cancelled(status, 0); 
 334 
 335         /* choose not to return a value for this */ 
 336         status->MPI_SOURCE = MPI_UNDEFINED; 
 337         /* tag has no meaning for this generalized request */ 
 338         status->MPI_TAG = MPI_UNDEFINED; 
 339         /* this generalized request never fails */ 
 340         return MPI_SUCCESS; 
 341 }
 342 /* 
 343  * vim: ts=8 sts=4 sw=4 noexpandtab 
 344  */

/* [<][>][^][v][top][bottom][index][help] */