This source file includes following definitions.
- exists_cb
- touch_ctl_cb
- touch_data_cb
- ADIOI_GRIDFTP_Open
1
2
3
4
5
6
7 #include "ad_gridftp.h"
8 #include "adioi.h"
9
10 static globus_mutex_t lock;
11 static globus_cond_t cond;
12
13 static globus_bool_t file_exists,exists_done;
14 static void exists_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
15 {
16 if (error)
17 {
18 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
19 }
20 else
21 {
22 file_exists=GLOBUS_TRUE;
23 }
24 exists_done=GLOBUS_TRUE;
25 }
26
27 static globus_bool_t touch_ctl_done;
28 static void touch_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
29 {
30 if (error)
31 {
32 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
33 }
34 globus_mutex_lock(&lock);
35 touch_ctl_done=GLOBUS_TRUE;
36 globus_cond_signal(&cond);
37 globus_mutex_unlock(&lock);
38 }
39
40 static void touch_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
41 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
42 globus_bool_t eof)
43 {
44 if (error)
45 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
46 globus_ftp_client_register_read(handle,buffer,length,touch_data_cb,myargs);
47 return;
48 }
49
50 void ADIOI_GRIDFTP_Open(ADIO_File fd, int *error_code)
51 {
52 static char myname[]="ADIOI_GRIDFTP_Open";
53 int myrank, nprocs, keyfound;
54 char hintval[MPI_MAX_INFO_VAL+1];
55 globus_ftp_client_handleattr_t hattr;
56 globus_result_t result;
57
58 MPI_Comm_size(fd->comm, &nprocs);
59 MPI_Comm_rank(fd->comm, &myrank);
60
61
62
63 globus_module_activate(GLOBUS_FTP_CLIENT_MODULE);
64 fd->fd_sys = num_gridftp_handles;
65
66 fd->shared_fp_fname = NULL;
67 *error_code = MPI_SUCCESS;
68
69
70
71
72
73
74
75
76
77 result=globus_ftp_client_handleattr_init(&hattr);
78 if ( result != GLOBUS_SUCCESS )
79 {
80
81
82 globus_err_handler("globus_ftp_client_handleattr_init",
83 myname,result);
84 fd->fd_sys = -1;
85 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
86 myname, __LINE__, MPI_ERR_IO,
87 "**io",
88 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
89 return;
90 }
91 result = globus_ftp_client_operationattr_init(&(oattr[fd->fd_sys]));
92 if ( result != GLOBUS_SUCCESS )
93 {
94 globus_err_handler("globus_ftp_client_operationattr_init",
95 myname,result);
96 fd->fd_sys = -1;
97 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
98 myname, __LINE__, MPI_ERR_IO,
99 "**io",
100 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
101 return;
102 }
103
104
105
106 result=globus_ftp_client_handleattr_set_cache_all(&hattr,GLOBUS_TRUE);
107 if ( result !=GLOBUS_SUCCESS )
108 globus_err_handler("globus_ftp_client_handleattr_set_cache_all",myname,result);
109
110
111 if ( (fd->access_mode&ADIO_RDONLY) &&
112 (result=globus_ftp_client_handleattr_add_cached_url(&hattr,fd->filename))!=GLOBUS_SUCCESS )
113 globus_err_handler("globus_ftp_client_handleattr_add_cached_url",myname,result);
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 if ( (fd->access_mode&ADIO_APPEND) &&
132 ((result=globus_ftp_client_operationattr_set_append(&(oattr[fd->fd_sys]),GLOBUS_TRUE))!=GLOBUS_SUCCESS) )
133 globus_err_handler("globus_ftp_client_operationattr_set_append",myname,result);
134
135
136
137 if ( fd->info!=MPI_INFO_NULL )
138 {
139 ADIOI_Info_get(fd->info,"ftp_control_mode",MPI_MAX_INFO_VAL,hintval,&keyfound);
140 if ( keyfound )
141 {
142 if ( ( !strcmp(hintval,"extended") || !strcmp(hintval,"extended_block") ) &&
143 (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_EXTENDED_BLOCK))!=GLOBUS_SUCCESS )
144 globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
145 else if ( !strcmp(hintval,"block") &&
146 (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_BLOCK))!=GLOBUS_SUCCESS )
147 globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
148 else if ( !strcmp(hintval,"compressed") &&
149 (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_COMPRESSED))!=GLOBUS_SUCCESS )
150 globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
151 else if ( !strcmp(hintval,"stream") &&
152 (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_STREAM))!=GLOBUS_SUCCESS )
153 globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
154 }
155
156 ADIOI_Info_get(fd->info,"parallelism",MPI_MAX_INFO_VAL,hintval,&keyfound);
157 if ( keyfound )
158 {
159 int nftpthreads;
160
161 if ( sscanf(hintval,"%d",&nftpthreads)==1 )
162 {
163 globus_ftp_control_parallelism_t parallelism;
164
165 parallelism.mode = GLOBUS_FTP_CONTROL_PARALLELISM_FIXED;
166 parallelism.fixed.size = nftpthreads;
167 if ( (result=globus_ftp_client_operationattr_set_parallelism(&(oattr[fd->fd_sys]),
168 ¶llelism))!=GLOBUS_SUCCESS )
169 globus_err_handler("globus_ftp_client_operationattr_set_parallelism",myname,result);
170 }
171 }
172
173 ADIOI_Info_get(fd->info,"striped_ftp",MPI_MAX_INFO_VAL,hintval,&keyfound);
174 if ( keyfound )
175 {
176
177 if ( !strncmp("true",hintval,4) || !strncmp("TRUE",hintval,4) ||
178 !strncmp("enable",hintval,4) || !strncmp("ENABLE",hintval,4) )
179 {
180 ADIOI_Info_get(fd->info,"striping_factor",MPI_MAX_INFO_VAL,hintval,&keyfound);
181 if ( keyfound )
182 {
183 int striping_factor;
184
185 if ( sscanf(hintval,"%d",&striping_factor)==1 )
186 {
187 globus_ftp_control_layout_t layout;
188
189 layout.mode = GLOBUS_FTP_CONTROL_STRIPING_BLOCKED_ROUND_ROBIN;
190 layout.round_robin.block_size = striping_factor;
191 if ( (result=globus_ftp_client_operationattr_set_layout(&(oattr[fd->fd_sys]),
192 &layout))!=GLOBUS_SUCCESS )
193 globus_err_handler("globus_ftp_client_operationattr_set_layout",
194 myname,result);
195 }
196 }
197 }
198 }
199
200 ADIOI_Info_get(fd->info,"tcp_buffer",MPI_MAX_INFO_VAL,hintval,&keyfound);
201 if ( keyfound )
202 {
203
204 int buffer_size;
205 if ( sscanf(hintval,"%d",&buffer_size)==1 )
206 {
207 globus_ftp_control_tcpbuffer_t tcpbuf;
208
209 tcpbuf.mode = GLOBUS_FTP_CONTROL_TCPBUFFER_FIXED;
210 tcpbuf.fixed.size = buffer_size;
211 if ( (result=globus_ftp_client_operationattr_set_tcp_buffer(&(oattr[fd->fd_sys]),
212 &tcpbuf))!=GLOBUS_SUCCESS )
213 globus_err_handler("globus_ftp_client_operationattr_set_tcp_buffer",myname,result);
214 }
215 }
216
217 ADIOI_Info_get(fd->info,"transfer_type",MPI_MAX_INFO_VAL,hintval,&keyfound);
218 if ( keyfound )
219 {
220 globus_ftp_control_type_t filetype;
221
222 if ( !strcmp("ascii",hintval) || !strcmp("ASCII",hintval) )
223 {
224 filetype=GLOBUS_FTP_CONTROL_TYPE_ASCII;
225 }
226 else
227 {
228 filetype=GLOBUS_FTP_CONTROL_TYPE_IMAGE;
229 }
230 if ( (result=globus_ftp_client_operationattr_set_type(&(oattr[fd->fd_sys]),filetype))!=GLOBUS_SUCCESS )
231 globus_err_handler("globus_ftp_client_operationattr_set_type",myname,result);
232 }
233 }
234 else
235 FPRINTF(stderr,"no MPI_Info object associated with %s\n",fd->filename);
236
237
238 result=globus_ftp_client_handle_init(&(gridftp_fh[fd->fd_sys]),&hattr);
239 if ( result != GLOBUS_SUCCESS )
240 {
241 globus_err_handler("globus_ftp_client_handle_init",myname,result);
242 fd->fd_sys = -1;
243 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
244 myname, __LINE__, MPI_ERR_IO,
245 "**io",
246 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
247 return;
248 }
249
250
251 globus_mutex_init(&lock, GLOBUS_NULL);
252 globus_cond_init(&cond, GLOBUS_NULL);
253 file_exists=GLOBUS_FALSE;
254 exists_done=GLOBUS_FALSE;
255 if ( myrank==0 )
256 {
257 if ( (result=globus_ftp_client_exists(&(gridftp_fh[fd->fd_sys]),
258 fd->filename,
259 &(oattr[fd->fd_sys]),
260 exists_cb,
261 GLOBUS_NULL))!=GLOBUS_SUCCESS )
262 {
263 globus_err_handler("globus_ftp_client_exists",myname,result);
264 fd->fd_sys = -1;
265 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
266 myname, __LINE__, MPI_ERR_IO,
267 "**io", "**io %s",
268 globus_object_printable_to_string(globus_error_get(result)));
269 return;
270 }
271
272 globus_mutex_lock(&lock);
273 while ( exists_done!=GLOBUS_TRUE )
274 globus_cond_wait(&cond,&lock);
275 globus_mutex_unlock(&lock);
276 }
277 MPI_Barrier(fd->comm);
278 MPI_Bcast(&file_exists,1,MPI_INT,0,fd->comm);
279
280
281 if ( (file_exists!=GLOBUS_TRUE) && (fd->access_mode&ADIO_CREATE) &&
282 !(fd->access_mode&ADIO_EXCL) && !(fd->access_mode&ADIO_RDONLY) )
283 {
284 if ( myrank==0 )
285 {
286
287 globus_byte_t touchbuf=(globus_byte_t)'\0';
288 touch_ctl_done=GLOBUS_FALSE;
289 if ( (result=globus_ftp_client_put(&(gridftp_fh[fd->fd_sys]),
290 fd->filename,
291 &(oattr[fd->fd_sys]),
292 GLOBUS_NULL,
293 touch_ctl_cb,
294 GLOBUS_NULL))!=GLOBUS_SUCCESS )
295 {
296 globus_err_handler("globus_ftp_client_put",myname,result);
297 fd->fd_sys = -1;
298 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
299 MPIR_ERR_RECOVERABLE,
300 myname, __LINE__, MPI_ERR_IO,
301 "**io", "**io %s",
302 globus_object_printable_to_string(globus_error_get(result)));
303 return;
304 }
305 result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
306 (globus_byte_t *)&touchbuf, 0,
307 (globus_off_t)0, GLOBUS_TRUE,
308 touch_data_cb, GLOBUS_NULL);
309
310 if ( result != GLOBUS_SUCCESS )
311 {
312 globus_err_handler("globus_ftp_client_register_write",myname,result);
313 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
314 MPIR_ERR_RECOVERABLE,
315 myname, __LINE__, MPI_ERR_IO,
316 "**io", "**io %s",
317 globus_object_printable_to_string(globus_error_get(result)));
318 return;
319 }
320 globus_mutex_lock(&lock);
321 while ( touch_ctl_done!=GLOBUS_TRUE )
322 globus_cond_wait(&cond,&lock);
323 globus_mutex_unlock(&lock);
324 }
325 MPI_Barrier(fd->comm);
326 }
327 else if ( (fd->access_mode&ADIO_EXCL) && (file_exists==GLOBUS_TRUE) )
328 {
329 fd->fd_sys = -1;
330 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
331 myname, __LINE__, MPI_ERR_IO,
332 "**io", 0);
333 return;
334 }
335 else if ( (fd->access_mode&ADIO_RDONLY) && (file_exists!=GLOBUS_TRUE) )
336 {
337 if ( myrank==0 )
338 {
339 FPRINTF(stderr,"WARNING: read-only file %s does not exist!\n",fd->filename);
340 }
341 }
342 num_gridftp_handles++;
343 }