root/ompi/mca/io/romio321/romio/adio/ad_pvfs2/ad_pvfs2_aio.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ADIOI_PVFS2_IReadContig
  2. ADIOI_PVFS2_IWriteContig
  3. ADIOI_PVFS2_AIO_contig
  4. ADIOI_PVFS2_aio_free_fn
  5. ADIOI_PVFS2_aio_poll_fn
  6. ADIOI_PVFS2_aio_wait_fn

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- 
   2  *     vim: ts=8 sts=4 sw=4 noexpandtab 
   3  * 
   4  *   Copyright (C) 1997 University of Chicago. 
   5  *   See COPYRIGHT notice in top-level directory.
   6  */
   7 
   8 #include "adio.h"
   9 #include "adio_extern.h"
  10 #include "ad_pvfs2.h"
  11 #include <string.h>
  12 
  13 #include "ad_pvfs2_common.h"
  14 #include "mpiu_greq.h"
  15 #include "../../mpi-io/mpioimpl.h"
  16 
  17 #define READ 0
  18 #define WRITE 1
  19 
  20 static int ADIOI_PVFS2_greq_class = 0;
  21 int ADIOI_PVFS2_aio_free_fn(void *extra_state);
  22 int ADIOI_PVFS2_aio_poll_fn(void *extra_state, MPI_Status *status);
  23 int ADIOI_PVFS2_aio_wait_fn(int count, void ** array_of_states, 
  24                 double timeout, MPI_Status *status);
  25 
  26 void ADIOI_PVFS2_IReadContig(ADIO_File fd, void *buf, int count, 
  27                             MPI_Datatype datatype, int file_ptr_type,
  28                             ADIO_Offset offset, MPI_Request *request,
  29                             int *error_code)
  30 {
  31     ADIOI_PVFS2_AIO_contig(fd, buf, count, datatype, file_ptr_type,
  32             offset, request, READ, error_code);
  33 }
  34 
  35 void ADIOI_PVFS2_IWriteContig(ADIO_File fd, const void *buf, int count,
  36                             MPI_Datatype datatype, int file_ptr_type,
  37                             ADIO_Offset offset, MPI_Request *request,
  38                             int *error_code)
  39 {
  40     ADIOI_PVFS2_AIO_contig(fd, (void *)buf, count, datatype, file_ptr_type,
  41             offset, request, WRITE, error_code);
  42 }
  43 
  44 void ADIOI_PVFS2_AIO_contig(ADIO_File fd, void *buf, int count, 
  45                             MPI_Datatype datatype, int file_ptr_type,
  46                             ADIO_Offset offset, MPI_Request *request,
  47                             int flag, int *error_code)
  48 {
  49 
  50     int ret;
  51     MPI_Count datatype_size, len;
  52     ADIOI_PVFS2_fs *pvfs_fs;
  53     ADIOI_AIO_Request *aio_req;
  54     static char myname[] = "ADIOI_PVFS2_AIO_contig";
  55 
  56     pvfs_fs = (ADIOI_PVFS2_fs*)fd->fs_ptr;
  57 
  58     aio_req = (ADIOI_AIO_Request*)ADIOI_Calloc(sizeof(ADIOI_AIO_Request), 1);
  59 
  60     MPI_Type_size_x(datatype, &datatype_size);
  61     len = datatype_size * count;
  62 
  63     ret = PVFS_Request_contiguous(len, PVFS_BYTE, &(aio_req->mem_req));
  64     /* --BEGIN ERROR HANDLING-- */
  65     if (ret != 0) {
  66         *error_code = MPIO_Err_create_code(MPI_SUCCESS,
  67                                            MPIR_ERR_RECOVERABLE,
  68                                            myname, __LINE__,
  69                                            ADIOI_PVFS2_error_convert(ret),
  70                                            "Error in pvfs_request_contig (memory)", 0);
  71         return;
  72     }
  73     /* --END ERROR HANDLING-- */
  74 
  75     ret = PVFS_Request_contiguous(len, PVFS_BYTE, &(aio_req->file_req));
  76     /* --BEGIN ERROR HANDLING-- */
  77     if (ret != 0) {
  78         *error_code = MPIO_Err_create_code(MPI_SUCCESS,
  79                                            MPIR_ERR_RECOVERABLE,
  80                                            myname, __LINE__,
  81                                            ADIOI_PVFS2_error_convert(ret),
  82                                            "Error in pvfs_request_contig (file)", 0);
  83         return;
  84     }
  85     /* --END ERROR HANDLING-- */
  86 
  87     if (file_ptr_type == ADIO_INDIVIDUAL) {
  88         /* copy individual file pointer into offset variable, continue */
  89         offset = fd->fp_ind;
  90     } 
  91     if (flag == READ) {
  92 #ifdef ADIOI_MPE_LOGGING
  93         MPE_Log_event( ADIOI_MPE_iread_a, 0, NULL );
  94 #endif
  95         ret = PVFS_isys_read(pvfs_fs->object_ref, aio_req->file_req, offset, 
  96                 buf, aio_req->mem_req, &(pvfs_fs->credentials), 
  97                 &(aio_req->resp_io), &(aio_req->op_id), NULL);
  98 #ifdef ADIOI_MPE_LOGGING
  99         MPE_Log_event( ADIOI_MPE_iread_b, 0, NULL );
 100 #endif
 101     } else if (flag == WRITE) {
 102 #ifdef ADIOI_MPE_LOGGING
 103         MPE_Log_event( ADIOI_MPE_iwrite_a, 0, NULL );
 104 #endif
 105         ret = PVFS_isys_write(pvfs_fs->object_ref, aio_req->file_req, offset, 
 106                 buf, aio_req->mem_req, &(pvfs_fs->credentials), 
 107                 &(aio_req->resp_io), &(aio_req->op_id), NULL);
 108 #ifdef ADIOI_MPE_LOGGING
 109         MPE_Log_event( ADIOI_MPE_iwrite_b, 0, NULL );
 110 #endif 
 111     } 
 112 
 113     /* --BEGIN ERROR HANDLING-- */
 114     if (ret < 0 ) {
 115         *error_code = MPIO_Err_create_code(MPI_SUCCESS,
 116                                            MPIR_ERR_RECOVERABLE,
 117                                            myname, __LINE__,
 118                                            ADIOI_PVFS2_error_convert(ret),
 119                                            "Error in PVFS_isys_io", 0);
 120         goto fn_exit;
 121     }
 122     /* --END ERROR HANDLING-- */
 123 
 124     /* posted. defered completion */
 125     if (ret == 0) { 
 126         if (ADIOI_PVFS2_greq_class == 0) {
 127             MPIX_Grequest_class_create(ADIOI_GEN_aio_query_fn, 
 128                     ADIOI_PVFS2_aio_free_fn, MPIU_Greq_cancel_fn,
 129                     ADIOI_PVFS2_aio_poll_fn, ADIOI_PVFS2_aio_wait_fn,
 130                     &ADIOI_PVFS2_greq_class);
 131         }
 132         MPIX_Grequest_class_allocate(ADIOI_PVFS2_greq_class, aio_req, request);
 133         memcpy(&(aio_req->req), request, sizeof(*request));
 134     }
 135 
 136     /* immediate completion */
 137     if (ret == 1) {
 138         MPIO_Completed_request_create(&fd, len, error_code, request);
 139     }
 140 
 141     if (file_ptr_type == ADIO_INDIVIDUAL) {
 142         fd->fp_ind += len;
 143     }
 144     fd->fp_sys_posn = offset + len;
 145 
 146     *error_code = MPI_SUCCESS;
 147 fn_exit:
 148     return;
 149 }
 150 
 151 int ADIOI_PVFS2_aio_free_fn(void *extra_state)
 152 {
 153     ADIOI_AIO_Request *aio_req;
 154     aio_req = (ADIOI_AIO_Request*)extra_state;
 155 
 156     PVFS_Request_free(&(aio_req->mem_req));
 157     PVFS_Request_free(&(aio_req->file_req));
 158     ADIOI_Free(aio_req);
 159 
 160     return MPI_SUCCESS;
 161 }
 162 
 163 int ADIOI_PVFS2_aio_poll_fn(void *extra_state, MPI_Status *status)
 164 {
 165     ADIOI_AIO_Request *aio_req;
 166     int ret, error;
 167 
 168     aio_req = (ADIOI_AIO_Request *)extra_state;
 169 
 170     /* BUG: cannot PVFS_sys_testsome: does not work for a specific request */
 171     ret = PVFS_sys_wait(aio_req->op_id, "ADIOI_PVFS2_aio_poll_fn", &error);
 172     if (ret == 0) {
 173         aio_req->nbytes = aio_req->resp_io.total_completed;
 174         MPI_Grequest_complete(aio_req->req);
 175         return MPI_SUCCESS;
 176     } else
 177         return MPI_UNDEFINED; /* TODO: what's this error? */
 178 }
 179 
 180 /* wait for multiple requests to complete */
 181 int ADIOI_PVFS2_aio_wait_fn(int count, void ** array_of_states, 
 182                 double timeout, MPI_Status *status)
 183 {
 184 
 185     ADIOI_AIO_Request **aio_reqlist;
 186     PVFS_sys_op_id *op_id_array;
 187     int i,j, greq_count, completed_count=0;
 188     int *error_array;
 189 
 190     aio_reqlist = (ADIOI_AIO_Request **)array_of_states;
 191 
 192     op_id_array = (PVFS_sys_op_id*)ADIOI_Calloc(count, sizeof(PVFS_sys_op_id));
 193     error_array = (int *)ADIOI_Calloc(count, sizeof(int));
 194     greq_count = count;
 195 
 196 
 197     /* PVFS-2.6: testsome actually tests all requests and fills in op_id_array
 198      * with the ones that have completed.  count is an in/out parameter.
 199      * returns with the number of completed operations.  what a mess! */
 200     while (completed_count < greq_count ) {
 201         count = greq_count;
 202         PVFS_sys_testsome(op_id_array, &count, NULL, error_array, INT_MAX);
 203         completed_count += count;
 204         for (i=0; i< count; i++) {
 205             for (j=0; j<greq_count; j++) {
 206                 if (op_id_array[i] == aio_reqlist[j]->op_id) {
 207                     aio_reqlist[j]->nbytes = 
 208                         aio_reqlist[j]->resp_io.total_completed;
 209                     MPI_Grequest_complete(aio_reqlist[j]->req);
 210                 }
 211             }
 212         }
 213     }
 214     return MPI_SUCCESS; /* TODO: no idea how to deal with errors */
 215 }
 216 
 217 
 218 /*
 219  * vim: ts=8 sts=4 sw=4 noexpandtab 
 220  */

/* [<][>][^][v][top][bottom][index][help] */