This source file includes the following definitions.
- ADIOI_PVFS2_IReadContig
- ADIOI_PVFS2_IWriteContig
- ADIOI_PVFS2_AIO_contig
- ADIOI_PVFS2_aio_free_fn
- ADIOI_PVFS2_aio_poll_fn
- ADIOI_PVFS2_aio_wait_fn
1
2
3
4
5
6
7
8 #include "adio.h"
9 #include "adio_extern.h"
10 #include "ad_pvfs2.h"
11 #include <string.h>
12
13 #include "ad_pvfs2_common.h"
14 #include "mpiu_greq.h"
15 #include "../../mpi-io/mpioimpl.h"
16
/* Direction selectors passed as the `flag` argument of
 * ADIOI_PVFS2_AIO_contig(). */
#define READ 0
#define WRITE 1

/* Handle of the generalized-request class used for PVFS2 nonblocking I/O.
 * It starts at 0; ADIOI_PVFS2_AIO_contig() creates the class the first time
 * it finds the value still 0. */
static int ADIOI_PVFS2_greq_class = 0;
/* Callbacks registered with that generalized-request class. */
int ADIOI_PVFS2_aio_free_fn(void *extra_state);
int ADIOI_PVFS2_aio_poll_fn(void *extra_state, MPI_Status *status);
int ADIOI_PVFS2_aio_wait_fn(int count, void ** array_of_states,
        double timeout, MPI_Status *status);
25
26 void ADIOI_PVFS2_IReadContig(ADIO_File fd, void *buf, int count,
27 MPI_Datatype datatype, int file_ptr_type,
28 ADIO_Offset offset, MPI_Request *request,
29 int *error_code)
30 {
31 ADIOI_PVFS2_AIO_contig(fd, buf, count, datatype, file_ptr_type,
32 offset, request, READ, error_code);
33 }
34
35 void ADIOI_PVFS2_IWriteContig(ADIO_File fd, const void *buf, int count,
36 MPI_Datatype datatype, int file_ptr_type,
37 ADIO_Offset offset, MPI_Request *request,
38 int *error_code)
39 {
40 ADIOI_PVFS2_AIO_contig(fd, (void *)buf, count, datatype, file_ptr_type,
41 offset, request, WRITE, error_code);
42 }
43
44 void ADIOI_PVFS2_AIO_contig(ADIO_File fd, void *buf, int count,
45 MPI_Datatype datatype, int file_ptr_type,
46 ADIO_Offset offset, MPI_Request *request,
47 int flag, int *error_code)
48 {
49
50 int ret;
51 MPI_Count datatype_size, len;
52 ADIOI_PVFS2_fs *pvfs_fs;
53 ADIOI_AIO_Request *aio_req;
54 static char myname[] = "ADIOI_PVFS2_AIO_contig";
55
56 pvfs_fs = (ADIOI_PVFS2_fs*)fd->fs_ptr;
57
58 aio_req = (ADIOI_AIO_Request*)ADIOI_Calloc(sizeof(ADIOI_AIO_Request), 1);
59
60 MPI_Type_size_x(datatype, &datatype_size);
61 len = datatype_size * count;
62
63 ret = PVFS_Request_contiguous(len, PVFS_BYTE, &(aio_req->mem_req));
64
65 if (ret != 0) {
66 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
67 MPIR_ERR_RECOVERABLE,
68 myname, __LINE__,
69 ADIOI_PVFS2_error_convert(ret),
70 "Error in pvfs_request_contig (memory)", 0);
71 return;
72 }
73
74
75 ret = PVFS_Request_contiguous(len, PVFS_BYTE, &(aio_req->file_req));
76
77 if (ret != 0) {
78 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
79 MPIR_ERR_RECOVERABLE,
80 myname, __LINE__,
81 ADIOI_PVFS2_error_convert(ret),
82 "Error in pvfs_request_contig (file)", 0);
83 return;
84 }
85
86
87 if (file_ptr_type == ADIO_INDIVIDUAL) {
88
89 offset = fd->fp_ind;
90 }
91 if (flag == READ) {
92 #ifdef ADIOI_MPE_LOGGING
93 MPE_Log_event( ADIOI_MPE_iread_a, 0, NULL );
94 #endif
95 ret = PVFS_isys_read(pvfs_fs->object_ref, aio_req->file_req, offset,
96 buf, aio_req->mem_req, &(pvfs_fs->credentials),
97 &(aio_req->resp_io), &(aio_req->op_id), NULL);
98 #ifdef ADIOI_MPE_LOGGING
99 MPE_Log_event( ADIOI_MPE_iread_b, 0, NULL );
100 #endif
101 } else if (flag == WRITE) {
102 #ifdef ADIOI_MPE_LOGGING
103 MPE_Log_event( ADIOI_MPE_iwrite_a, 0, NULL );
104 #endif
105 ret = PVFS_isys_write(pvfs_fs->object_ref, aio_req->file_req, offset,
106 buf, aio_req->mem_req, &(pvfs_fs->credentials),
107 &(aio_req->resp_io), &(aio_req->op_id), NULL);
108 #ifdef ADIOI_MPE_LOGGING
109 MPE_Log_event( ADIOI_MPE_iwrite_b, 0, NULL );
110 #endif
111 }
112
113
114 if (ret < 0 ) {
115 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
116 MPIR_ERR_RECOVERABLE,
117 myname, __LINE__,
118 ADIOI_PVFS2_error_convert(ret),
119 "Error in PVFS_isys_io", 0);
120 goto fn_exit;
121 }
122
123
124
125 if (ret == 0) {
126 if (ADIOI_PVFS2_greq_class == 0) {
127 MPIX_Grequest_class_create(ADIOI_GEN_aio_query_fn,
128 ADIOI_PVFS2_aio_free_fn, MPIU_Greq_cancel_fn,
129 ADIOI_PVFS2_aio_poll_fn, ADIOI_PVFS2_aio_wait_fn,
130 &ADIOI_PVFS2_greq_class);
131 }
132 MPIX_Grequest_class_allocate(ADIOI_PVFS2_greq_class, aio_req, request);
133 memcpy(&(aio_req->req), request, sizeof(*request));
134 }
135
136
137 if (ret == 1) {
138 MPIO_Completed_request_create(&fd, len, error_code, request);
139 }
140
141 if (file_ptr_type == ADIO_INDIVIDUAL) {
142 fd->fp_ind += len;
143 }
144 fd->fp_sys_posn = offset + len;
145
146 *error_code = MPI_SUCCESS;
147 fn_exit:
148 return;
149 }
150
151 int ADIOI_PVFS2_aio_free_fn(void *extra_state)
152 {
153 ADIOI_AIO_Request *aio_req;
154 aio_req = (ADIOI_AIO_Request*)extra_state;
155
156 PVFS_Request_free(&(aio_req->mem_req));
157 PVFS_Request_free(&(aio_req->file_req));
158 ADIOI_Free(aio_req);
159
160 return MPI_SUCCESS;
161 }
162
163 int ADIOI_PVFS2_aio_poll_fn(void *extra_state, MPI_Status *status)
164 {
165 ADIOI_AIO_Request *aio_req;
166 int ret, error;
167
168 aio_req = (ADIOI_AIO_Request *)extra_state;
169
170
171 ret = PVFS_sys_wait(aio_req->op_id, "ADIOI_PVFS2_aio_poll_fn", &error);
172 if (ret == 0) {
173 aio_req->nbytes = aio_req->resp_io.total_completed;
174 MPI_Grequest_complete(aio_req->req);
175 return MPI_SUCCESS;
176 } else
177 return MPI_UNDEFINED;
178 }
179
180
181 int ADIOI_PVFS2_aio_wait_fn(int count, void ** array_of_states,
182 double timeout, MPI_Status *status)
183 {
184
185 ADIOI_AIO_Request **aio_reqlist;
186 PVFS_sys_op_id *op_id_array;
187 int i,j, greq_count, completed_count=0;
188 int *error_array;
189
190 aio_reqlist = (ADIOI_AIO_Request **)array_of_states;
191
192 op_id_array = (PVFS_sys_op_id*)ADIOI_Calloc(count, sizeof(PVFS_sys_op_id));
193 error_array = (int *)ADIOI_Calloc(count, sizeof(int));
194 greq_count = count;
195
196
197
198
199
200 while (completed_count < greq_count ) {
201 count = greq_count;
202 PVFS_sys_testsome(op_id_array, &count, NULL, error_array, INT_MAX);
203 completed_count += count;
204 for (i=0; i< count; i++) {
205 for (j=0; j<greq_count; j++) {
206 if (op_id_array[i] == aio_reqlist[j]->op_id) {
207 aio_reqlist[j]->nbytes =
208 aio_reqlist[j]->resp_io.total_completed;
209 MPI_Grequest_complete(aio_reqlist[j]->req);
210 }
211 }
212 }
213 }
214 return MPI_SUCCESS;
215 }
216
217
218
219
220