This source file includes following definitions.
- readcontig_ctl_cb
- readcontig_data_cb
- readdiscontig_ctl_cb
- readdiscontig_data_cb
- ADIOI_GRIDFTP_ReadContig
- ADIOI_GRIDFTP_ReadDiscontig
- ADIOI_GRIDFTP_ReadStrided
1
2
3
4
5
6
7
8 #include "ad_gridftp.h"
9 #include "adioi.h"
10 #include "adio_extern.h"
11
12 static globus_mutex_t readcontig_ctl_lock;
13 static globus_cond_t readcontig_ctl_cond;
14 static globus_bool_t readcontig_ctl_done;
15 static void readcontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
16 {
17 if (error)
18 {
19 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
20 }
21 globus_mutex_lock(&readcontig_ctl_lock);
22 if ( readcontig_ctl_done!=GLOBUS_TRUE )
23 readcontig_ctl_done=GLOBUS_TRUE;
24 globus_cond_signal(&readcontig_ctl_cond);
25 globus_mutex_unlock(&readcontig_ctl_lock);
26 return;
27 }
28
29 static void readcontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
30 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
31 globus_bool_t eof)
32 {
33 globus_size_t *bytes_read;
34
35 bytes_read=(globus_size_t *)myargs;
36 if (error)
37 {
38 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
39 }
40 *bytes_read+=length;
41
42
43
44
45
46
47
48
49
50
51
52
53 if ( !eof )
54 globus_ftp_client_register_read(handle,
55 buffer+length,
56 length,
57 readcontig_data_cb,
58 (void *)(bytes_read));
59 return;
60 }
61
62 static globus_mutex_t readdiscontig_ctl_lock;
63 static globus_cond_t readdiscontig_ctl_cond;
64 static globus_bool_t readdiscontig_ctl_done;
65 static void readdiscontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
66 {
67 if (error)
68 {
69 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
70 }
71 globus_mutex_lock(&readdiscontig_ctl_lock);
72 if ( readdiscontig_ctl_done!=GLOBUS_TRUE )
73 readdiscontig_ctl_done=GLOBUS_TRUE;
74 globus_cond_signal(&readdiscontig_ctl_cond);
75 globus_mutex_unlock(&readdiscontig_ctl_lock);
76 return;
77 }
78
79 static void readdiscontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
80 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
81 globus_bool_t eof)
82 {
83 globus_size_t *bytes_read;
84
85 bytes_read=(globus_size_t *)myargs;
86 if (error)
87 {
88 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
89 }
90 *bytes_read+=length;
91
92
93
94 if ( !eof )
95 globus_ftp_client_register_read(handle,
96 buffer,
97 length,
98 readdiscontig_data_cb,
99 (void *)(bytes_read));
100 return;
101 }
102
103 void ADIOI_GRIDFTP_ReadContig(ADIO_File fd, void *buf, int count,
104 MPI_Datatype datatype, int file_ptr_type,
105 ADIO_Offset offset, ADIO_Status *status, int
106 *error_code)
107 {
108 static char myname[]="ADIOI_GRIDFTP_ReadContig";
109 int myrank, nprocs;
110 MPI_Count datatype_size;
111 globus_size_t len,bytes_read=0;
112 globus_off_t goff;
113 globus_result_t result;
114
115 if ( fd->access_mode&ADIO_WRONLY )
116 {
117 *error_code=MPIR_ERR_MODE_WRONLY;
118 return;
119 }
120
121 *error_code = MPI_SUCCESS;
122
123 MPI_Comm_size(fd->comm, &nprocs);
124 MPI_Comm_rank(fd->comm, &myrank);
125 MPI_Type_size_x(datatype, &datatype_size);
126
127 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
128 {
129 offset = fd->fp_ind;
130 }
131
132
133 goff = (globus_off_t)offset;
134 len = ((globus_size_t)datatype_size)*((globus_size_t)count);
135
136 globus_mutex_init(&readcontig_ctl_lock, GLOBUS_NULL);
137 globus_cond_init(&readcontig_ctl_cond, GLOBUS_NULL);
138 readcontig_ctl_done=GLOBUS_FALSE;
139 if ( (result=globus_ftp_client_partial_get(&(gridftp_fh[fd->fd_sys]),
140 fd->filename,
141 &(oattr[fd->fd_sys]),
142 GLOBUS_NULL,
143 goff,
144 goff+(globus_off_t)len,
145 readcontig_ctl_cb,
146 GLOBUS_NULL))!=GLOBUS_SUCCESS )
147 {
148 globus_err_handler("globus_ftp_client_partial_get",myname,result);
149 *error_code=MPI_ERR_IO;
150 ADIOI_Error(fd,*error_code,myname);
151 return;
152 }
153 result=globus_ftp_client_register_read(&(gridftp_fh[fd->fd_sys]),
154 (globus_byte_t *)buf, len, readcontig_data_cb,
155 (void *)(&bytes_read));
156 if ( result != GLOBUS_SUCCESS )
157 {
158 globus_err_handler("globus_ftp_client_register_read",myname,result);
159 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
160 MPIR_ERR_RECOVERABLE, myname, __LINE__,
161 MPI_ERR_IO, "**io", "**io %s",
162 globus_object_printable_to_string(globus_error_get(result)));
163 return;
164 }
165
166
167
168
169 globus_mutex_lock(&readcontig_ctl_lock);
170 while ( readcontig_ctl_done!=GLOBUS_TRUE )
171 globus_cond_wait(&readcontig_ctl_cond,&readcontig_ctl_lock);
172 globus_mutex_unlock(&readcontig_ctl_lock);
173
174 globus_mutex_destroy(&readcontig_ctl_lock);
175 globus_cond_destroy(&readcontig_ctl_cond);
176
177 #ifdef HAVE_STATUS_SET_BYTES
178 MPIR_Status_set_bytes(status, datatype, bytes_read);
179 #endif
180 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
181 {
182 fd->fp_ind += bytes_read;
183 fd->fp_sys_posn = fd->fp_ind;
184 }
185 else {
186 fd->fp_sys_posn = offset + bytes_read;
187 }
188 }
189
190 void ADIOI_GRIDFTP_ReadDiscontig(ADIO_File fd, void *buf, int count,
191 MPI_Datatype datatype, int file_ptr_type,
192 ADIO_Offset offset, ADIO_Status *status, int
193 *error_code)
194 {
195 char myname[]="ADIOI_GRIDFTP_ReadDiscontig";
196 int myrank,nprocs;
197
198 MPI_Aint btype_size,btype_extent,btype_lb;
199
200 MPI_Aint ftype_size,ftype_extent,ftype_lb;
201
202 MPI_Aint etype_size;
203 MPI_Aint extent;
204 ADIOI_Flatlist_node *flat_file;
205 int i,buf_contig,boff,nblks;
206 globus_off_t start,end,goff;
207 globus_size_t bytes_read;
208 globus_result_t result;
209 globus_byte_t *tmp;
210
211 if ( fd->access_mode&ADIO_WRONLY )
212 {
213 *error_code=MPIR_ERR_MODE_WRONLY;
214 return;
215 }
216
217 *error_code=MPI_SUCCESS;
218
219 MPI_Comm_rank(fd->comm,&myrank);
220 MPI_Comm_size(fd->comm,&nprocs);
221
222 etype_size=fd->etype_size;
223 MPI_Type_size_x(fd->filetype,&ftype_size);
224 MPI_Type_get_extent(fd->filetype,&ftype_lb,&ftype_extent);
225
226
227 MPI_Type_size_x(datatype,&btype_size);
228 MPI_Type_get_extent(datatype,&btype_lb,&btype_extent);
229 ADIOI_Datatype_iscontig(datatype,&buf_contig);
230
231 if ( ( btype_extent!=btype_size ) || ( ! buf_contig ) )
232 {
233 FPRINTF(stderr,"[%d/%d] %s called with discontigous memory buffer\n",
234 myrank,nprocs,myname);
235 fflush(stderr);
236 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
237 MPIR_ERR_RECOVERABLE, myname, __LINE__,
238 MPI_ERR_IO, "**io", 0 );
239 return;
240 }
241
242
243
244 flat_file = ADIOI_Flatten_and_find(fd->filetype);
245
246
247 start=(globus_off_t)(offset*etype_size);
248 goff=start;
249 boff=0;
250 extent=0;
251 nblks=0;
252 while ( boff < (count*btype_size) )
253 {
254 int blklen=0;
255
256 for (i=0;i<flat_file->count;i++)
257 {
258
259 if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
260 blklen=flat_file->blocklens[i];
261 else
262 blklen=(count*btype_size)-boff;
263
264 boff+=blklen;
265
266
267
268 extent=MAX(extent,nblks*ftype_extent+flat_file->indices[i]+blklen);
269 if ( boff>=(count*btype_size) )
270 break;
271 }
272 nblks++;
273 }
274 if ( extent < count*btype_size )
275 {
276 FPRINTF(stderr,"[%d/%d] %s error in computing extent -- extent %d is smaller than total bytes requested %d!\n",
277 myrank,nprocs,myname,extent,count*btype_size);
278 fflush(stderr);
279 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
280 MPIR_ERR_RECOVERABLE, myname, __LINE__,
281 MPI_ERR_IO, "**io", 0);
282 return;
283 }
284 end=start+(globus_off_t)extent;
285 tmp=(globus_byte_t *)ADIOI_Malloc((size_t)extent*sizeof(globus_byte_t));
286
287
288 globus_mutex_init(&readdiscontig_ctl_lock, GLOBUS_NULL);
289 globus_cond_init(&readdiscontig_ctl_cond, GLOBUS_NULL);
290 readdiscontig_ctl_done=GLOBUS_FALSE;
291 if ( (result=globus_ftp_client_partial_get(&(gridftp_fh[fd->fd_sys]),
292 fd->filename,
293 &(oattr[fd->fd_sys]),
294 GLOBUS_NULL,
295 start,
296 end,
297 readdiscontig_ctl_cb,
298 GLOBUS_NULL))!=GLOBUS_SUCCESS )
299 {
300 globus_err_handler("globus_ftp_client_partial_get",myname,result);
301 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
302 MPIR_ERR_RECOVERABLE, myname, __LINE__,
303 MPI_ERR_IO, "**io", "**io %s",
304 globus_object_printable_to_string(globus_error_get(result)));
305 return;
306 }
307
308
309
310
311
312
313
314
315
316 if ( (result=globus_ftp_client_register_read(&(gridftp_fh[fd->fd_sys]),
317 tmp,
318 (globus_size_t)extent,
319 readdiscontig_data_cb,
320 (void *)(&bytes_read)))!=GLOBUS_SUCCESS )
321 {
322 globus_err_handler("globus_ftp_client_register_read",myname,result);
323 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
324 myname, __LINE__, MPI_ERR_IO,
325 "**io",
326 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
327 return;
328 }
329
330
331 globus_mutex_lock(&readdiscontig_ctl_lock);
332 while ( readdiscontig_ctl_done!=GLOBUS_TRUE )
333 globus_cond_wait(&readdiscontig_ctl_cond,&readdiscontig_ctl_lock);
334 globus_mutex_unlock(&readdiscontig_ctl_lock);
335
336 globus_mutex_destroy(&readdiscontig_ctl_lock);
337 globus_cond_destroy(&readdiscontig_ctl_cond);
338
339 boff=0;
340 nblks=0;
341 goff=0;
342 while ( boff < (count*btype_size) )
343 {
344 int i,blklen;
345
346 for (i=0;i<flat_file->count;i++)
347 {
348 if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
349 blklen=flat_file->blocklens[i];
350 else
351 blklen=(count*btype_size)-boff;
352 if ( blklen > 0 )
353 {
354 goff=nblks*ftype_extent+flat_file->indices[i];
355 memcpy((globus_byte_t *)buf+boff,tmp+goff,(size_t)blklen);
356 boff+=blklen;
357 if ( boff>=(count*btype_size) )
358 break;
359 }
360 }
361 nblks++;
362 }
363 ADIOI_Free(tmp);
364
365 #ifdef HAVE_STATUS_SET_BYTES
366 MPIR_Status_set_bytes(status, datatype, bytes_read);
367 #endif
368 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
369 {
370 fd->fp_ind += extent;
371 fd->fp_sys_posn = fd->fp_ind;
372 }
373 else {
374 fd->fp_sys_posn = offset + extent;
375 }
376 }
377
378 void ADIOI_GRIDFTP_ReadStrided(ADIO_File fd, void *buf, int count,
379 MPI_Datatype datatype, int file_ptr_type,
380 ADIO_Offset offset, ADIO_Status *status, int
381 *error_code)
382 {
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402 char myname[]="ADIOI_GRIDFTP_ReadStrided";
403 int myrank, nprocs;
404 int i,j;
405 int buf_contig,file_contig;
406 MPI_Aint btype_size,bufsize;
407 globus_off_t start,disp;
408 globus_size_t bytes_read;
409 globus_byte_t *intermediate;
410
411 *error_code = MPI_SUCCESS;
412
413 MPI_Comm_size(fd->comm, &nprocs);
414 MPI_Comm_rank(fd->comm, &myrank);
415
416 MPI_Type_size_x(datatype,&btype_size);
417 bufsize=count*btype_size;
418 ADIOI_Datatype_iscontig(fd->filetype,&file_contig);
419 ADIOI_Datatype_iscontig(datatype,&buf_contig);
420 if ( buf_contig && !file_contig )
421 {
422
423 ADIOI_GRIDFTP_ReadDiscontig(fd, buf, count, datatype,
424 file_ptr_type, offset, status, error_code);
425 }
426 else if ( !buf_contig && file_contig )
427 {
428
429 int posn=0;
430
431
432 intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
433 ADIOI_GRIDFTP_ReadContig(fd, intermediate, bufsize, MPI_BYTE,
434 file_ptr_type, offset, status, error_code);
435
436
437 MPI_Unpack(intermediate,bufsize,&posn,buf,count,datatype,fd->comm);
438
439 ADIOI_Free(intermediate);
440 }
441 else if ( !buf_contig && !file_contig )
442 {
443
444 int posn=0;
445
446
447 intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
448 ADIOI_GRIDFTP_ReadDiscontig(fd, intermediate, bufsize, MPI_BYTE,
449 file_ptr_type, offset, status, error_code);
450
451
452 posn=0;
453 MPI_Unpack(intermediate,bufsize,&posn,buf,count,datatype,fd->comm);
454
455 ADIOI_Free(intermediate);
456 }
457 else
458 {
459
460 ADIOI_GRIDFTP_ReadContig(fd, buf, count, datatype,
461 file_ptr_type, offset, status, error_code);
462 }
463
464 }
465