This source file includes the following definitions:
- mca_fbtl_posix_component_init_query
- mca_fbtl_posix_component_file_query
- mca_fbtl_posix_component_file_unquery
- mca_fbtl_posix_module_init
- mca_fbtl_posix_module_finalize
- mca_fbtl_posix_progress
- mca_fbtl_posix_request_free
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include "ompi_config.h"
#include "mpi.h"

#include <limits.h>
#include <unistd.h>
#include <sys/uio.h>
#if HAVE_AIO_H
#include <aio.h>
#endif
37
/* Maximum number of simultaneously active AIO requests.  Defaults to 2048;
   mca_fbtl_posix_module_init() may replace it with sysconf(_SC_AIO_MAX).
   NOTE(review): presumably consumed as the AIO batch size by the
   ipreadv/ipwritev paths -- confirm. */
int fbtl_posix_max_aio_active_reqs = 2048;
39
40 #include "ompi/mca/fbtl/fbtl.h"
41 #include "ompi/mca/fbtl/posix/fbtl_posix.h"
42
43
44
45
46
47
48 static mca_fbtl_base_module_1_0_0_t posix = {
49 mca_fbtl_posix_module_init,
50 mca_fbtl_posix_module_finalize,
51 mca_fbtl_posix_preadv,
52 #if defined (FBTL_POSIX_HAVE_AIO)
53 mca_fbtl_posix_ipreadv,
54 #else
55 NULL,
56 #endif
57 mca_fbtl_posix_pwritev,
58 #if defined (FBTL_POSIX_HAVE_AIO)
59 mca_fbtl_posix_ipwritev,
60 mca_fbtl_posix_progress,
61 mca_fbtl_posix_request_free
62 #else
63 NULL,
64 NULL,
65 NULL
66 #endif
67 };
68
69
70
71
72
73
74 int mca_fbtl_posix_component_init_query(bool enable_progress_threads,
75 bool enable_mpi_threads) {
76
77
78 return OMPI_SUCCESS;
79 }
80
81 struct mca_fbtl_base_module_1_0_0_t *
82 mca_fbtl_posix_component_file_query (ompio_file_t *fh, int *priority) {
83 *priority = mca_fbtl_posix_priority;
84
85 if (UFS == fh->f_fstype) {
86 if (*priority < 50) {
87 *priority = 50;
88 }
89 }
90
91 return &posix;
92 }
93
94 int mca_fbtl_posix_component_file_unquery (ompio_file_t *file) {
95
96
97
98
99 return OMPI_SUCCESS;
100 }
101
102 int mca_fbtl_posix_module_init (ompio_file_t *file) {
103
104 #if defined (FBTL_POSIX_HAVE_AIO)
105 long val = sysconf(_SC_AIO_MAX);
106 if ( -1 != val ) {
107 fbtl_posix_max_aio_active_reqs = (int)val;
108 }
109 #endif
110 return OMPI_SUCCESS;
111 }
112
113
114 int mca_fbtl_posix_module_finalize (ompio_file_t *file) {
115 return OMPI_SUCCESS;
116 }
117
118 bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
119 {
120 bool ret=false;
121 #if defined (FBTL_POSIX_HAVE_AIO)
122 int i=0, lcount=0, ret_code=0;
123 mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
124 off_t start_offset, end_offset, total_length;
125
126 for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
127 if ( EINPROGRESS == data->aio_req_status[i] ) {
128 data->aio_req_status[i] = aio_error ( &data->aio_reqs[i]);
129 if ( 0 == data->aio_req_status[i]){
130 data->aio_open_reqs--;
131 lcount++;
132
133
134
135
136
137 data->aio_total_len += aio_return (&data->aio_reqs[i]);
138 }
139 else if ( EINPROGRESS == data->aio_req_status[i]){
140
141 continue;
142 }
143 else {
144
145
146 req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
147 req->req_ompi.req_status._ucount = data->aio_total_len;
148 ret = true;
149 break;
150 }
151 }
152 else {
153 lcount++;
154 }
155 }
156 #if 0
157 printf("lcount=%d open_reqs=%d\n", lcount, data->aio_open_reqs );
158 #endif
159
160 if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
161
162 mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
163
164
165 data->aio_first_active_req = data->aio_last_active_req;
166 if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) {
167 data->aio_last_active_req += data->aio_req_chunks;
168 }
169 else {
170 data->aio_last_active_req = data->aio_req_count;
171 }
172
173 start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
174 end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
175 total_length = (end_offset - start_offset);
176
177 if ( FBTL_POSIX_READ == data->aio_req_type ) {
178 ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
179 }
180 else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
181 ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
182 }
183 if ( 0 < ret_code ) {
184 opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
185
186 mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
187 return OMPI_ERROR;
188 }
189
190 for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
191 if ( FBTL_POSIX_READ == data->aio_req_type ) {
192 if (-1 == aio_read(&data->aio_reqs[i])) {
193 opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
194 mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
195 return OMPI_ERROR;
196 }
197 }
198 else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
199 if (-1 == aio_write(&data->aio_reqs[i])) {
200 opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
201 mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
202 return OMPI_ERROR;
203 }
204 }
205 }
206 #if 0
207 printf("posting new batch: first=%d last=%d\n", data->aio_first_active_req, data->aio_last_active_req );
208 #endif
209 }
210
211 if ( 0 == data->aio_open_reqs ) {
212
213 req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
214 req->req_ompi.req_status._ucount = data->aio_total_len;
215 mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
216 ret = true;
217 }
218 #endif
219 return ret;
220 }
221
222 void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
223 {
224 #if defined (FBTL_POSIX_HAVE_AIO)
225
226 mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
227 if (NULL != data ) {
228 mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
229 if ( NULL != data->aio_reqs ) {
230 free ( data->aio_reqs);
231 }
232 if ( NULL != data->aio_req_status ) {
233 free ( data->aio_req_status );
234 }
235 free ( data );
236 req->req_data = NULL;
237 }
238 #endif
239 return;
240 }