This source file includes following definitions.
- ADIOI_GEN_IwriteContig
- ADIOI_GEN_aio
- ADIOI_GEN_IwriteStrided
- ADIOI_GEN_aio_poll_fn
- ADIOI_GEN_aio_wait_fn
- ADIOI_GEN_aio_free_fn
- ADIOI_GEN_aio_query_fn
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 #include "adio.h"
   9 
  10 #ifdef HAVE_UNISTD_H
  11 #include <unistd.h>
  12 #endif
  13 #ifdef HAVE_SIGNAL_H
  14 #include <signal.h>
  15 #endif
  16 #ifdef HAVE_SYS_TYPES_H
  17 #include <sys/types.h>
  18 #endif
  19 #ifdef HAVE_AIO_H
  20 #include <aio.h>
  21 #endif
  22 #ifdef HAVE_SYS_AIO_H
  23 #include <sys/aio.h>
  24 #endif
  25 #include <time.h>
  26 
  27 #include "../../mpi-io/mpioimpl.h"
  28 #include "../../mpi-io/mpioprof.h"
  29 #include "mpiu_greq.h"
  30 
  31 
  32 #if !defined(__REDIRECT) && defined(__USE_FILE_OFFSET64)
  33 #define aiocb aiocb64
  34 #endif
  35 
  36 #ifdef ROMIO_HAVE_WORKING_AIO
  37 
  38 static MPIX_Grequest_class ADIOI_GEN_greq_class = 0;
  39 
  40 
  41 
  42 
  43 
  44 
  45 
  46 
  47 void ADIOI_GEN_IwriteContig(ADIO_File fd, const void *buf, int count,
  48                             MPI_Datatype datatype, int file_ptr_type,
  49                             ADIO_Offset offset, ADIO_Request *request,
  50                             int *error_code)
  51 {
  52     MPI_Count len, typesize;
  53     int aio_errno = 0;
  54     static char myname[] = "ADIOI_GEN_IWRITECONTIG";
  55 
  56     MPI_Type_size_x(datatype, &typesize);
  57     len = count * typesize;
  58     ADIOI_Assert(len == (int)((ADIO_Offset)count * (ADIO_Offset)typesize)); 
  59 
  60     if (file_ptr_type == ADIO_INDIVIDUAL) offset = fd->fp_ind;
  61     
  62 
  63     aio_errno = ADIOI_GEN_aio(fd, (char *) buf, len, offset, 1, request);
  64     if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind += len;
  65 
  66     fd->fp_sys_posn = -1;
  67 
  68     
  69     if (aio_errno != 0) {
  70         MPIO_ERR_CREATE_CODE_ERRNO(myname, aio_errno, error_code);
  71         return;
  72     }
  73     
  74 
  75     *error_code = MPI_SUCCESS;
  76 }
  77 
  78 
  79 
  80 
  81 
  82 
  83 int ADIOI_GEN_aio(ADIO_File fd, void *buf, int len, ADIO_Offset offset,
  84                   int wr, MPI_Request *request)
  85 {
  86     int err=-1, fd_sys;
  87 
  88     int error_code;
  89     struct aiocb *aiocbp=NULL;
  90     ADIOI_AIO_Request *aio_req=NULL;
  91     MPI_Status status;
  92 #if defined(ROMIO_XFS)
  93     unsigned maxiosz = wr ? fd->hints->fs_hints.xfs.write_chunk_sz :
  94             fd->hints->fs_hints.xfs.read_chunk_sz;
  95 #endif 
  96 
  97     fd_sys = fd->fd_sys;
  98 
  99 #if defined(ROMIO_XFS)
 100     
 101     if (fd->fns == &ADIO_XFS_operations &&
 102          ((wr && fd->direct_write) || (!wr && fd->direct_read)) &&
 103          !(((long) buf) % fd->d_mem) && !(offset % fd->d_miniosz) && 
 104          !(len % fd->d_miniosz) && (len >= fd->d_miniosz) && 
 105          (len <= maxiosz)) {
 106             fd_sys = fd->fd_direct;
 107     }
 108 #endif 
 109 
 110     aio_req = (ADIOI_AIO_Request*)ADIOI_Calloc(sizeof(ADIOI_AIO_Request), 1);
 111     aiocbp = (struct aiocb *) ADIOI_Calloc(sizeof(struct aiocb), 1);
 112     aiocbp->aio_offset = offset;
 113     aiocbp->aio_buf    = buf;
 114     aiocbp->aio_nbytes = len;
 115 
 116 #ifdef HAVE_STRUCT_AIOCB_AIO_WHENCE
 117     aiocbp->aio_whence = SEEK_SET;
 118 #endif
 119 #ifdef HAVE_STRUCT_AIOCB_AIO_FILDES
 120     aiocbp->aio_fildes = fd_sys;
 121 #endif
 122 #ifdef HAVE_STRUCT_AIOCB_AIO_SIGEVENT
 123 # ifdef AIO_SIGNOTIFY_NONE
 124     aiocbp->aio_sigevent.sigev_notify = SIGEV_NONE;
 125 # endif
 126     aiocbp->aio_sigevent.sigev_signo = 0;
 127 #endif
 128 #ifdef HAVE_STRUCT_AIOCB_AIO_REQPRIO
 129 # ifdef AIO_PRIO_DFL
 130     aiocbp->aio_reqprio = AIO_PRIO_DFL;   
 131 # else
 132     aiocbp->aio_reqprio = 0;
 133 # endif
 134 #endif
 135 
 136 #ifndef ROMIO_HAVE_AIO_CALLS_NEED_FILEDES
 137 #ifndef HAVE_STRUCT_AIOCB_AIO_FILDES
 138 #error 'No fildes set for aio structure'
 139 #endif
 140     if (wr) err = aio_write(aiocbp);
 141     else err = aio_read(aiocbp);
 142 #else
 143     
 144     if (wr) err = aio_write(fd_sys, aiocbp);
 145     else err = aio_read(fd_sys, aiocbp);
 146 #endif
 147 
 148     if (err == -1) {
 149         if (errno == EAGAIN || errno == ENOSYS) { 
 150             
 151 
 152 
 153             if (wr) 
 154                 ADIO_WriteContig(fd, buf, len, MPI_BYTE, 
 155                             ADIO_EXPLICIT_OFFSET, offset, &status, &error_code);  
 156             else
 157                 ADIO_ReadContig(fd, buf, len, MPI_BYTE,
 158                             ADIO_EXPLICIT_OFFSET, offset, &status, &error_code);  
 159                     
 160             MPIO_Completed_request_create(&fd, len, &error_code, request);
 161             if (aiocbp != NULL) ADIOI_Free(aiocbp);
 162             if (aio_req != NULL) ADIOI_Free(aio_req);
 163             return 0;
 164         } else {
 165             ADIOI_Free(aio_req);
 166             ADIOI_Free(aiocbp);
 167             return errno;
 168         }
 169     }
 170     aio_req->aiocbp = aiocbp;
 171     if (ADIOI_GEN_greq_class == 0) {
 172             MPIX_Grequest_class_create(ADIOI_GEN_aio_query_fn, 
 173                             ADIOI_GEN_aio_free_fn, MPIU_Greq_cancel_fn, 
 174                             ADIOI_GEN_aio_poll_fn, ADIOI_GEN_aio_wait_fn, 
 175                             &ADIOI_GEN_greq_class);
 176     }
 177     MPIX_Grequest_class_allocate(ADIOI_GEN_greq_class, aio_req, request);
 178     memcpy(&(aio_req->req), request, sizeof(MPI_Request));
 179     return 0;
 180 }
 181 #endif
 182 
 183 
 184 
 185 
 186 
 187 void ADIOI_GEN_IwriteStrided(ADIO_File fd, const void *buf, int count,
 188                              MPI_Datatype datatype, int file_ptr_type,
 189                              ADIO_Offset offset, MPI_Request *request,
 190                              int *error_code)
 191 {
 192     ADIO_Status status;
 193     MPI_Count typesize;
 194     MPI_Offset nbytes=0;
 195 
 196     
 197 
 198 
 199     ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type, 
 200                       offset, &status, error_code);  
 201 
 202     if (*error_code == MPI_SUCCESS) {
 203         MPI_Type_size_x(datatype, &typesize);
 204         nbytes = (MPI_Offset)count * (MPI_Offset)typesize;
 205     }
 206     MPIO_Completed_request_create(&fd, nbytes, error_code, request);
 207 }
 208 
 209 #ifdef ROMIO_HAVE_WORKING_AIO
 210 
 211 int ADIOI_GEN_aio_poll_fn(void *extra_state, MPI_Status *status)
 212 {
 213     ADIOI_AIO_Request *aio_req;
 214     int errcode=MPI_SUCCESS;
 215 
 216     aio_req = (ADIOI_AIO_Request *)extra_state;
 217 
 218     
 219     errno = aio_error(aio_req->aiocbp);
 220     if (errno == EINPROGRESS) {
 221             
 222     }
 223     else if (errno == ECANCELED) {
 224             
 225     } else if (errno == 0) {
 226             ssize_t n = aio_return(aio_req->aiocbp);
 227             aio_req->nbytes = n;
 228             errcode = MPI_Grequest_complete(aio_req->req);
 229             
 230             if (errcode != MPI_SUCCESS) {
 231                     errcode = MPIO_Err_create_code(MPI_SUCCESS,
 232                                     MPIR_ERR_RECOVERABLE,
 233                                     "ADIOI_GEN_aio_poll_fn", __LINE__,
 234                                     MPI_ERR_IO, "**mpi_grequest_complete",
 235                                     0);
 236             }
 237             
 238     }
 239     return errcode;
 240 }
 241 
 242 
 243 int ADIOI_GEN_aio_wait_fn(int count, void ** array_of_states, 
 244                 double timeout, MPI_Status *status)
 245 {
 246         const struct aiocb **cblist;
 247         int err, errcode=MPI_SUCCESS;
 248         int nr_complete=0;
 249         double starttime;
 250         struct timespec aio_timer;
 251         struct timespec *aio_timer_p = NULL;
 252 
 253         ADIOI_AIO_Request **aio_reqlist;
 254         int i;
 255 
 256         aio_reqlist = (ADIOI_AIO_Request **)array_of_states;
 257 
 258         cblist = (const struct aiocb**) ADIOI_Calloc(count, sizeof(struct aiocb*));
 259 
 260         starttime = MPI_Wtime();
 261         if (timeout >0) {
 262             aio_timer.tv_sec = (time_t)timeout;
 263             aio_timer.tv_nsec = timeout - aio_timer.tv_sec;
 264             aio_timer_p = &aio_timer;
 265         }
 266         for (i=0; i< count; i++)
 267         {
 268                 cblist[i] = aio_reqlist[i]->aiocbp;
 269         }
 270 
 271         while(nr_complete < count) {
 272             do {
 273                 err = aio_suspend(cblist, count, aio_timer_p);
 274             } while (err < 0 && errno == EINTR);
 275             if (err == 0) 
 276             { 
 277 
 278                 for (i=0; i< count; i++)
 279                 {
 280                     
 281                     if (aio_reqlist[i]->aiocbp == NULL) 
 282                         continue;
 283                     errno = aio_error(aio_reqlist[i]->aiocbp);
 284                     if (errno == 0) {
 285                         ssize_t n = aio_return(aio_reqlist[i]->aiocbp);
 286                         aio_reqlist[i]->nbytes = n;
 287                         errcode = MPI_Grequest_complete(aio_reqlist[i]->req);
 288                         if (errcode != MPI_SUCCESS) {
 289                             errcode = MPIO_Err_create_code(MPI_SUCCESS,
 290                                     MPIR_ERR_RECOVERABLE,
 291                                     "ADIOI_GEN_aio_wait_fn", 
 292                                     __LINE__, MPI_ERR_IO, 
 293                                     "**mpi_grequest_complete", 0);
 294                         }
 295                         ADIOI_Free(aio_reqlist[i]->aiocbp);
 296                         aio_reqlist[i]->aiocbp = NULL;
 297                         cblist[i] = NULL;
 298                         nr_complete++;
 299                     } 
 300                     
 301                 }
 302             } 
 303             if ( (timeout > 0) && (timeout < (MPI_Wtime() - starttime) ))
 304                 break;
 305         }
 306 
 307         if (cblist != NULL) ADIOI_Free(cblist);
 308         return errcode;
 309 }
 310 
 311 int ADIOI_GEN_aio_free_fn(void *extra_state)
 312 {
 313         ADIOI_AIO_Request *aio_req;
 314         aio_req = (ADIOI_AIO_Request*)extra_state;
 315 
 316         if (aio_req->aiocbp != NULL)
 317                 ADIOI_Free(aio_req->aiocbp);
 318         ADIOI_Free(aio_req);
 319 
 320         return MPI_SUCCESS;
 321 }
 322 #endif 
 323 
 324 int ADIOI_GEN_aio_query_fn(void *extra_state, MPI_Status *status) 
 325 {
 326         ADIOI_AIO_Request *aio_req;
 327 
 328         aio_req = (ADIOI_AIO_Request *)extra_state;
 329 
 330         MPI_Status_set_elements_x(status, MPI_BYTE, aio_req->nbytes);
 331 
 332          
 333         MPI_Status_set_cancelled(status, 0); 
 334 
 335          
 336         status->MPI_SOURCE = MPI_UNDEFINED; 
 337          
 338         status->MPI_TAG = MPI_UNDEFINED; 
 339          
 340         return MPI_SUCCESS; 
 341 }
 342 
 343 
 344