This source file includes following definitions.
- writecontig_ctl_cb
- writecontig_data_cb
- writediscontig_ctl_cb
- writediscontig_data_cb
- ADIOI_GRIDFTP_WriteContig
- ADIOI_GRIDFTP_WriteDiscontig
- ADIOI_GRIDFTP_WriteStrided
1
2
3
4
5
6
7
8 #include "ad_gridftp.h"
9 #include "adioi.h"
10 #include "adio_extern.h"
11
12 static globus_mutex_t writecontig_ctl_lock;
13 static globus_cond_t writecontig_ctl_cond;
14 static globus_bool_t writecontig_ctl_done;
15 static void writecontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
16 {
17 if (error)
18 {
19 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
20 }
21 globus_mutex_lock(&writecontig_ctl_lock);
22 if ( writecontig_ctl_done!=GLOBUS_TRUE )
23 writecontig_ctl_done=GLOBUS_TRUE;
24 globus_cond_signal(&writecontig_ctl_cond);
25 globus_mutex_unlock(&writecontig_ctl_lock);
26 #ifdef PRINT_ERR_MSG
27 FPRINTF(stderr,"finished with contig write transaction\n");
28 #endif
29 return;
30 }
31
32 static void writecontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
33 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
34 globus_bool_t eof)
35 {
36 globus_size_t *bytes_written;
37
38 bytes_written=(globus_size_t *)myargs;
39 if (error)
40 {
41 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
42 }
43 *bytes_written+=length;
44
45
46
47 if ( !eof )
48 {
49 globus_ftp_client_register_write(handle,
50 buffer,
51 length,
52 offset,
53 GLOBUS_TRUE,
54 writecontig_data_cb,
55 (void *)(bytes_written));
56 }
57 #ifdef PRINT_ERR_MSG
58 FPRINTF(stderr,"wrote %Ld bytes...",(long long)length);
59 #endif
60 return;
61 }
62
63
64 static globus_mutex_t writediscontig_ctl_lock;
65 static globus_cond_t writediscontig_ctl_cond;
66 static globus_bool_t writediscontig_ctl_done;
67 static void writediscontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
68 {
69 if (error)
70 {
71 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
72 }
73 globus_mutex_lock(&writediscontig_ctl_lock);
74 if ( writediscontig_ctl_done!=GLOBUS_TRUE )
75 writediscontig_ctl_done=GLOBUS_TRUE;
76 globus_cond_signal(&writediscontig_ctl_cond);
77 globus_mutex_unlock(&writediscontig_ctl_lock);
78 return;
79 }
80
81 static void writediscontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
82 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
83 globus_bool_t eof)
84 {
85 globus_size_t *bytes_written;
86
87 bytes_written=(globus_size_t *)myargs;
88 if (error)
89 {
90 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
91 }
92 *bytes_written+=length;
93
94
95
96 if ( !eof )
97 globus_ftp_client_register_write(handle,
98 buffer,
99 length,
100 offset,
101 eof,
102 writediscontig_data_cb,
103 (void *)(bytes_written));
104 FPRINTF(stderr,"wrote %Ld bytes...",(long long)length);
105 return;
106 }
107
108
109 void ADIOI_GRIDFTP_WriteContig(ADIO_File fd, void *buf, int count,
110 MPI_Datatype datatype, int file_ptr_type,
111 ADIO_Offset offset, ADIO_Status *status, int
112 *error_code)
113 {
114 char myname[]="ADIOI_GRIDFTP_WriteContig";
115 int myrank, nprocs;
116 MPI_Count datatype_size;
117 globus_size_t len,bytes_written=0;
118 globus_off_t goff;
119 globus_result_t result;
120
121 if ( fd->access_mode&ADIO_RDONLY )
122 {
123 *error_code=MPI_ERR_AMODE;
124 return;
125 }
126
127 *error_code = MPI_SUCCESS;
128
129 MPI_Comm_size(fd->comm, &nprocs);
130 MPI_Comm_rank(fd->comm, &myrank);
131 MPI_Type_size_x(datatype, &datatype_size);
132
133 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
134 {
135 offset = fd->fp_ind;
136 }
137
138
139 goff = (globus_off_t)offset;
140 len = ((globus_size_t)datatype_size)*((globus_size_t)count);
141
142 globus_mutex_init(&writecontig_ctl_lock, GLOBUS_NULL);
143 globus_cond_init(&writecontig_ctl_cond, GLOBUS_NULL);
144 writecontig_ctl_done=GLOBUS_FALSE;
145 if ( (result=globus_ftp_client_partial_put(&(gridftp_fh[fd->fd_sys]),
146 fd->filename,
147 &(oattr[fd->fd_sys]),
148 GLOBUS_NULL,
149 goff,
150 goff+(globus_off_t)len,
151 writecontig_ctl_cb,
152 GLOBUS_NULL))!=GLOBUS_SUCCESS )
153 {
154 globus_err_handler("globus_ftp_client_partial_put",myname,result);
155 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
156 myname, __LINE__, MPI_ERR_IO,
157 "**io",
158 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
159 return;
160 }
161 if ( (result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
162 (globus_byte_t *)buf,
163 len,
164 goff,
165 GLOBUS_TRUE,
166 writecontig_data_cb,
167 (void *)(&bytes_written)))!=GLOBUS_SUCCESS )
168 {
169 globus_err_handler("globus_ftp_client_register_write",myname,result);
170 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
171 myname, __LINE__, MPI_ERR_IO,
172 "**io",
173 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
174 return;
175 }
176
177
178
179
180 globus_mutex_lock(&writecontig_ctl_lock);
181 while ( writecontig_ctl_done!=GLOBUS_TRUE )
182 globus_cond_wait(&writecontig_ctl_cond,&writecontig_ctl_lock);
183 globus_mutex_unlock(&writecontig_ctl_lock);
184
185 globus_mutex_destroy(&writecontig_ctl_lock);
186 globus_cond_destroy(&writecontig_ctl_cond);
187
188 #ifdef HAVE_STATUS_SET_BYTES
189 MPIR_Status_set_bytes(status, datatype, bytes_written);
190 #endif
191 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
192 {
193 offset = fd->fp_ind;
194 fd->fp_ind += bytes_written;
195 fd->fp_sys_posn = fd->fp_ind;
196 }
197 else {
198 fd->fp_sys_posn = offset + bytes_written;
199 }
200 }
201
202
203 void ADIOI_GRIDFTP_WriteDiscontig(ADIO_File fd, void *buf, int count,
204 MPI_Datatype datatype, int file_ptr_type,
205 ADIO_Offset offset, ADIO_Status *status, int
206 *error_code)
207 {
208 char myname[]="ADIOI_GRIDFTP_WriteDiscontig";
209 int myrank,nprocs;
210 MPI_Aint btype_size,btype_extent,btype_lb;
211 MPI_Aint ftype_size,ftype_extent,ftype_lb;
212 MPI_Aint etype_size;
213 MPI_Aint extent;
214 ADIOI_Flatlist_node *flat_file;
215 int buf_contig,boff,i,nblks;
216 globus_off_t start,end,goff;
217 globus_size_t bytes_written;
218 globus_result_t result;
219
220 MPI_Comm_rank(fd->comm,&myrank);
221 MPI_Comm_size(fd->comm,&nprocs);
222 etype_size=fd->etype_size;
223 MPI_Type_size_x(fd->filetype,&ftype_size);
224 MPI_Type_get_extent(fd->filetype,&ftype_lb,&ftype_extent);
225
226
227 MPI_Type_size_x(datatype,&btype_size);
228 MPI_Type_get_extent(datatype,&btype_lb,&btype_extent);
229 ADIOI_Datatype_iscontig(datatype,&buf_contig);
230
231 if ( ( btype_extent!=btype_size ) || ( ! buf_contig ) )
232 {
233 FPRINTF(stderr,"[%d/%d] %s called with discontigous memory buffer\n",
234 myrank,nprocs,myname);
235 fflush(stderr);
236 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
237 myname, __LINE__, MPI_ERR_IO,
238 "**io",
239 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
240 return;
241 }
242
243
244
245 flat_file = ADIOI_Flatten_and_find(fd->filetype);
246
247
248
249 start=(globus_off_t)(offset*etype_size);
250 goff=start;
251 boff=0;
252 extent=0;
253 nblks=0;
254 while ( boff < (count*btype_size) )
255 {
256 int blklen;
257
258 for (i=0;i<flat_file->count;i++)
259 {
260 if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
261 blklen=flat_file->blocklens[i];
262 else
263 blklen=(count*btype_size)-boff;
264 boff+=blklen;
265 extent=MAX(extent,nblks*ftype_extent+flat_file->indices[i]+blklen);
266 if ( boff>=(count*btype_size) )
267 break;
268 }
269 nblks++;
270 }
271 if ( extent < count*btype_size )
272 {
273 FPRINTF(stderr,"[%d/%d] %s error in computing extent -- extent %d is smaller than total bytes requested %d!\n",
274 myrank,nprocs,myname,extent,count*btype_size);
275 fflush(stderr);
276 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
277 myname, __LINE__, MPI_ERR_IO,
278 "**io",
279 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
280 return;
281 }
282 end=start+(globus_off_t)extent;
283 FPRINTF(stderr,"[%d/%d] %s writing %d bytes into extent of %d bytes starting at offset %Ld\n",
284 myrank,nprocs,myname,count*btype_size,extent,(long long)start);
285 fflush(stderr);
286
287
288 globus_mutex_init(&writediscontig_ctl_lock, GLOBUS_NULL);
289 globus_cond_init(&writediscontig_ctl_cond, GLOBUS_NULL);
290 writediscontig_ctl_done=GLOBUS_FALSE;
291 if ( (result=globus_ftp_client_partial_put(&(gridftp_fh[fd->fd_sys]),
292 fd->filename,
293 &(oattr[fd->fd_sys]),
294 GLOBUS_NULL,
295 start,
296 end,
297 writediscontig_ctl_cb,
298 GLOBUS_NULL))!=GLOBUS_SUCCESS )
299 {
300 globus_err_handler("globus_ftp_client_partial_get",myname,result);
301 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
302 myname, __LINE__, MPI_ERR_IO,
303 "**io",
304 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
305 return;
306 }
307
308
309 boff=0;
310 nblks=0;
311 while ( boff < (count*btype_size) )
312 {
313 int i,blklen;
314
315 for (i=0;i<flat_file->count;i++)
316 {
317 if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
318 blklen=flat_file->blocklens[i];
319 else
320 blklen=(count*btype_size)-boff;
321 if ( blklen > 0 )
322 {
323 goff=start+nblks*ftype_extent+((globus_off_t)flat_file->indices[i]);
324
325
326
327 if ( (result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
328 ((globus_byte_t *)buf)+boff,
329 (globus_size_t)blklen,
330 goff,
331 GLOBUS_TRUE,
332 writediscontig_data_cb,
333 (void *)(&bytes_written)))!=GLOBUS_SUCCESS )
334 {
335 globus_err_handler("globus_ftp_client_register_write",myname,result);
336 *error_code=MPI_ERR_IO;
337 ADIOI_Error(fd,*error_code,myname);
338 return;
339 }
340 boff+=blklen;
341 if ( boff>=(count*btype_size) )
342 break;
343 }
344 }
345 nblks++;
346 }
347
348
349
350
351 globus_mutex_lock(&writediscontig_ctl_lock);
352 while ( writediscontig_ctl_done!=GLOBUS_TRUE )
353 globus_cond_wait(&writediscontig_ctl_cond,&writediscontig_ctl_lock);
354 globus_mutex_unlock(&writediscontig_ctl_lock);
355 globus_mutex_destroy(&writediscontig_ctl_lock);
356 globus_cond_destroy(&writediscontig_ctl_cond);
357
358 #ifdef HAVE_STATUS_SET_BYTES
359 MPIR_Status_set_bytes(status, datatype, bytes_written);
360 #endif
361 if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
362 {
363 fd->fp_ind += extent;
364 fd->fp_sys_posn = fd->fp_ind;
365 }
366 else {
367 fd->fp_sys_posn = offset + extent;
368 }
369 }
370
371
372 #define GRIDFTP_USE_GENERIC_STRIDED
373 void ADIOI_GRIDFTP_WriteStrided(ADIO_File fd, void *buf, int count,
374 MPI_Datatype datatype, int file_ptr_type,
375 ADIO_Offset offset, ADIO_Status *status,
376 int *error_code)
377 {
378 #ifdef GRIDFTP_USE_GENERIC_STRIDED
379 int myrank, nprocs;
380
381 if ( fd->access_mode&ADIO_RDONLY )
382 {
383 *error_code=MPI_ERR_AMODE;
384 return;
385 }
386
387 *error_code = MPI_SUCCESS;
388
389 MPI_Comm_size(fd->comm, &nprocs);
390 MPI_Comm_rank(fd->comm, &myrank);
391
392 ADIOI_GEN_WriteStrided(fd, buf, count, datatype, file_ptr_type, offset,
393 status, error_code);
394 return;
395 #else
396 char myname[]="ADIOI_GRIDFTP_WriteStrided";
397 int myrank, nprocs;
398 int buf_contig,file_contig;
399 MPI_Aint btype_size,bufsize;
400 globus_byte_t *intermediate;
401
402 *error_code = MPI_SUCCESS;
403
404 MPI_Comm_size(fd->comm, &nprocs);
405 MPI_Comm_rank(fd->comm, &myrank);
406
407 MPI_Type_size_x(datatype,&btype_size);
408 bufsize=count*btype_size;
409 ADIOI_Datatype_iscontig(fd->filetype,&file_contig);
410 ADIOI_Datatype_iscontig(datatype,&buf_contig);
411 if ( buf_contig && !file_contig )
412 {
413
414 FPRINTF(stderr,"[%d/%d] %s called w/ contig mem, discontig file\n",
415 myrank,nprocs,myname);
416 fflush(stderr);
417
418 ADIOI_GRIDFTP_WriteDiscontig(fd, buf, count, datatype,
419 file_ptr_type, offset, status, error_code);
420 }
421 else if ( !buf_contig && file_contig )
422 {
423
424 int posn=0;
425
426 FPRINTF(stderr,"[%d/%d] %s called w/ discontig mem, contig file\n",
427 myrank,nprocs,myname);
428 fflush(stderr);
429
430
431
432 intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
433 MPI_Pack(buf,count,datatype,intermediate,bufsize,&posn,fd->comm);
434
435
436 ADIOI_GRIDFTP_WriteContig(fd, intermediate, bufsize, MPI_BYTE,
437 file_ptr_type, offset, status, error_code);
438
439 ADIOI_Free(intermediate);
440 }
441 else if ( !buf_contig && !file_contig )
442 {
443
444 int posn=0;
445
446 FPRINTF(stderr,"[%d/%d] %s called w/ discontig mem, discontig file\n",
447 myrank,nprocs,myname);
448 fflush(stderr);
449
450
451 intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
452 MPI_Pack(buf,count,datatype,intermediate,bufsize,&posn,fd->comm);
453
454
455 ADIOI_GRIDFTP_WriteDiscontig(fd, intermediate, bufsize, MPI_BYTE,
456 file_ptr_type, offset, status, error_code);
457
458 ADIOI_Free(intermediate);
459 }
460 else
461 {
462
463 FPRINTF(stderr,"[%d/%d] Why the heck did you call %s with contiguous buffer *and* file types?\n",
464 myrank,nprocs,myname);
465 ADIOI_GRIDFTP_WriteContig(fd, buf, count, datatype,
466 file_ptr_type, offset, status, error_code);
467 }
468 #endif
469 }
470