This source file includes the following definitions:
- ADIOI_GEN_IwriteContig
- ADIOI_GEN_aio
- ADIOI_GEN_IwriteStrided
- ADIOI_GEN_aio_poll_fn
- ADIOI_GEN_aio_wait_fn
- ADIOI_GEN_aio_free_fn
- ADIOI_GEN_aio_query_fn
1
2
3
4
5
6
7
8 #include "adio.h"
9
10 #ifdef HAVE_UNISTD_H
11 #include <unistd.h>
12 #endif
13 #ifdef HAVE_SIGNAL_H
14 #include <signal.h>
15 #endif
16 #ifdef HAVE_SYS_TYPES_H
17 #include <sys/types.h>
18 #endif
19 #ifdef HAVE_AIO_H
20 #include <aio.h>
21 #endif
22 #ifdef HAVE_SYS_AIO_H
23 #include <sys/aio.h>
24 #endif
25 #include <time.h>
26
27 #include "../../mpi-io/mpioimpl.h"
28 #include "../../mpi-io/mpioprof.h"
29 #include "mpiu_greq.h"
30
31
32 #if !defined(__REDIRECT) && defined(__USE_FILE_OFFSET64)
33 #define aiocb aiocb64
34 #endif
35
36 #ifdef ROMIO_HAVE_WORKING_AIO
37
38 static MPIX_Grequest_class ADIOI_GEN_greq_class = 0;
39
40
41
42
43
44
45
46
47 void ADIOI_GEN_IwriteContig(ADIO_File fd, const void *buf, int count,
48 MPI_Datatype datatype, int file_ptr_type,
49 ADIO_Offset offset, ADIO_Request *request,
50 int *error_code)
51 {
52 MPI_Count len, typesize;
53 int aio_errno = 0;
54 static char myname[] = "ADIOI_GEN_IWRITECONTIG";
55
56 MPI_Type_size_x(datatype, &typesize);
57 len = count * typesize;
58 ADIOI_Assert(len == (int)((ADIO_Offset)count * (ADIO_Offset)typesize));
59
60 if (file_ptr_type == ADIO_INDIVIDUAL) offset = fd->fp_ind;
61
62
63 aio_errno = ADIOI_GEN_aio(fd, (char *) buf, len, offset, 1, request);
64 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind += len;
65
66 fd->fp_sys_posn = -1;
67
68
69 if (aio_errno != 0) {
70 MPIO_ERR_CREATE_CODE_ERRNO(myname, aio_errno, error_code);
71 return;
72 }
73
74
75 *error_code = MPI_SUCCESS;
76 }
77
78
79
80
81
82
83 int ADIOI_GEN_aio(ADIO_File fd, void *buf, int len, ADIO_Offset offset,
84 int wr, MPI_Request *request)
85 {
86 int err=-1, fd_sys;
87
88 int error_code;
89 struct aiocb *aiocbp=NULL;
90 ADIOI_AIO_Request *aio_req=NULL;
91 MPI_Status status;
92 #if defined(ROMIO_XFS)
93 unsigned maxiosz = wr ? fd->hints->fs_hints.xfs.write_chunk_sz :
94 fd->hints->fs_hints.xfs.read_chunk_sz;
95 #endif
96
97 fd_sys = fd->fd_sys;
98
99 #if defined(ROMIO_XFS)
100
101 if (fd->fns == &ADIO_XFS_operations &&
102 ((wr && fd->direct_write) || (!wr && fd->direct_read)) &&
103 !(((long) buf) % fd->d_mem) && !(offset % fd->d_miniosz) &&
104 !(len % fd->d_miniosz) && (len >= fd->d_miniosz) &&
105 (len <= maxiosz)) {
106 fd_sys = fd->fd_direct;
107 }
108 #endif
109
110 aio_req = (ADIOI_AIO_Request*)ADIOI_Calloc(sizeof(ADIOI_AIO_Request), 1);
111 aiocbp = (struct aiocb *) ADIOI_Calloc(sizeof(struct aiocb), 1);
112 aiocbp->aio_offset = offset;
113 aiocbp->aio_buf = buf;
114 aiocbp->aio_nbytes = len;
115
116 #ifdef HAVE_STRUCT_AIOCB_AIO_WHENCE
117 aiocbp->aio_whence = SEEK_SET;
118 #endif
119 #ifdef HAVE_STRUCT_AIOCB_AIO_FILDES
120 aiocbp->aio_fildes = fd_sys;
121 #endif
122 #ifdef HAVE_STRUCT_AIOCB_AIO_SIGEVENT
123 # ifdef AIO_SIGNOTIFY_NONE
124 aiocbp->aio_sigevent.sigev_notify = SIGEV_NONE;
125 # endif
126 aiocbp->aio_sigevent.sigev_signo = 0;
127 #endif
128 #ifdef HAVE_STRUCT_AIOCB_AIO_REQPRIO
129 # ifdef AIO_PRIO_DFL
130 aiocbp->aio_reqprio = AIO_PRIO_DFL;
131 # else
132 aiocbp->aio_reqprio = 0;
133 # endif
134 #endif
135
136 #ifndef ROMIO_HAVE_AIO_CALLS_NEED_FILEDES
137 #ifndef HAVE_STRUCT_AIOCB_AIO_FILDES
138 #error 'No fildes set for aio structure'
139 #endif
140 if (wr) err = aio_write(aiocbp);
141 else err = aio_read(aiocbp);
142 #else
143
144 if (wr) err = aio_write(fd_sys, aiocbp);
145 else err = aio_read(fd_sys, aiocbp);
146 #endif
147
148 if (err == -1) {
149 if (errno == EAGAIN || errno == ENOSYS) {
150
151
152
153 if (wr)
154 ADIO_WriteContig(fd, buf, len, MPI_BYTE,
155 ADIO_EXPLICIT_OFFSET, offset, &status, &error_code);
156 else
157 ADIO_ReadContig(fd, buf, len, MPI_BYTE,
158 ADIO_EXPLICIT_OFFSET, offset, &status, &error_code);
159
160 MPIO_Completed_request_create(&fd, len, &error_code, request);
161 if (aiocbp != NULL) ADIOI_Free(aiocbp);
162 if (aio_req != NULL) ADIOI_Free(aio_req);
163 return 0;
164 } else {
165 ADIOI_Free(aio_req);
166 ADIOI_Free(aiocbp);
167 return errno;
168 }
169 }
170 aio_req->aiocbp = aiocbp;
171 if (ADIOI_GEN_greq_class == 0) {
172 MPIX_Grequest_class_create(ADIOI_GEN_aio_query_fn,
173 ADIOI_GEN_aio_free_fn, MPIU_Greq_cancel_fn,
174 ADIOI_GEN_aio_poll_fn, ADIOI_GEN_aio_wait_fn,
175 &ADIOI_GEN_greq_class);
176 }
177 MPIX_Grequest_class_allocate(ADIOI_GEN_greq_class, aio_req, request);
178 memcpy(&(aio_req->req), request, sizeof(MPI_Request));
179 return 0;
180 }
181 #endif
182
183
184
185
186
187 void ADIOI_GEN_IwriteStrided(ADIO_File fd, const void *buf, int count,
188 MPI_Datatype datatype, int file_ptr_type,
189 ADIO_Offset offset, MPI_Request *request,
190 int *error_code)
191 {
192 ADIO_Status status;
193 MPI_Count typesize;
194 MPI_Offset nbytes=0;
195
196
197
198
199 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
200 offset, &status, error_code);
201
202 if (*error_code == MPI_SUCCESS) {
203 MPI_Type_size_x(datatype, &typesize);
204 nbytes = (MPI_Offset)count * (MPI_Offset)typesize;
205 }
206 MPIO_Completed_request_create(&fd, nbytes, error_code, request);
207 }
208
209 #ifdef ROMIO_HAVE_WORKING_AIO
210
211 int ADIOI_GEN_aio_poll_fn(void *extra_state, MPI_Status *status)
212 {
213 ADIOI_AIO_Request *aio_req;
214 int errcode=MPI_SUCCESS;
215
216 aio_req = (ADIOI_AIO_Request *)extra_state;
217
218
219 errno = aio_error(aio_req->aiocbp);
220 if (errno == EINPROGRESS) {
221
222 }
223 else if (errno == ECANCELED) {
224
225 } else if (errno == 0) {
226 ssize_t n = aio_return(aio_req->aiocbp);
227 aio_req->nbytes = n;
228 errcode = MPI_Grequest_complete(aio_req->req);
229
230 if (errcode != MPI_SUCCESS) {
231 errcode = MPIO_Err_create_code(MPI_SUCCESS,
232 MPIR_ERR_RECOVERABLE,
233 "ADIOI_GEN_aio_poll_fn", __LINE__,
234 MPI_ERR_IO, "**mpi_grequest_complete",
235 0);
236 }
237
238 }
239 return errcode;
240 }
241
242
243 int ADIOI_GEN_aio_wait_fn(int count, void ** array_of_states,
244 double timeout, MPI_Status *status)
245 {
246 const struct aiocb **cblist;
247 int err, errcode=MPI_SUCCESS;
248 int nr_complete=0;
249 double starttime;
250 struct timespec aio_timer;
251 struct timespec *aio_timer_p = NULL;
252
253 ADIOI_AIO_Request **aio_reqlist;
254 int i;
255
256 aio_reqlist = (ADIOI_AIO_Request **)array_of_states;
257
258 cblist = (const struct aiocb**) ADIOI_Calloc(count, sizeof(struct aiocb*));
259
260 starttime = MPI_Wtime();
261 if (timeout >0) {
262 aio_timer.tv_sec = (time_t)timeout;
263 aio_timer.tv_nsec = timeout - aio_timer.tv_sec;
264 aio_timer_p = &aio_timer;
265 }
266 for (i=0; i< count; i++)
267 {
268 cblist[i] = aio_reqlist[i]->aiocbp;
269 }
270
271 while(nr_complete < count) {
272 do {
273 err = aio_suspend(cblist, count, aio_timer_p);
274 } while (err < 0 && errno == EINTR);
275 if (err == 0)
276 {
277
278 for (i=0; i< count; i++)
279 {
280
281 if (aio_reqlist[i]->aiocbp == NULL)
282 continue;
283 errno = aio_error(aio_reqlist[i]->aiocbp);
284 if (errno == 0) {
285 ssize_t n = aio_return(aio_reqlist[i]->aiocbp);
286 aio_reqlist[i]->nbytes = n;
287 errcode = MPI_Grequest_complete(aio_reqlist[i]->req);
288 if (errcode != MPI_SUCCESS) {
289 errcode = MPIO_Err_create_code(MPI_SUCCESS,
290 MPIR_ERR_RECOVERABLE,
291 "ADIOI_GEN_aio_wait_fn",
292 __LINE__, MPI_ERR_IO,
293 "**mpi_grequest_complete", 0);
294 }
295 ADIOI_Free(aio_reqlist[i]->aiocbp);
296 aio_reqlist[i]->aiocbp = NULL;
297 cblist[i] = NULL;
298 nr_complete++;
299 }
300
301 }
302 }
303 if ( (timeout > 0) && (timeout < (MPI_Wtime() - starttime) ))
304 break;
305 }
306
307 if (cblist != NULL) ADIOI_Free(cblist);
308 return errcode;
309 }
310
311 int ADIOI_GEN_aio_free_fn(void *extra_state)
312 {
313 ADIOI_AIO_Request *aio_req;
314 aio_req = (ADIOI_AIO_Request*)extra_state;
315
316 if (aio_req->aiocbp != NULL)
317 ADIOI_Free(aio_req->aiocbp);
318 ADIOI_Free(aio_req);
319
320 return MPI_SUCCESS;
321 }
322 #endif
323
324 int ADIOI_GEN_aio_query_fn(void *extra_state, MPI_Status *status)
325 {
326 ADIOI_AIO_Request *aio_req;
327
328 aio_req = (ADIOI_AIO_Request *)extra_state;
329
330 MPI_Status_set_elements_x(status, MPI_BYTE, aio_req->nbytes);
331
332
333 MPI_Status_set_cancelled(status, 0);
334
335
336 status->MPI_SOURCE = MPI_UNDEFINED;
337
338 status->MPI_TAG = MPI_UNDEFINED;
339
340 return MPI_SUCCESS;
341 }
342
343
344