This source file includes the following definitions.
- resize_cb
- resize_wrdata_cb
- ADIOI_GRIDFTP_Resize
1
2
3
4
5
6
7
8 #include "ad_gridftp.h"
9 #include "adioi.h"
10
11 static globus_mutex_t resize_lock;
12 static globus_cond_t resize_cond;
13 static globus_bool_t resize_done;
14 static globus_bool_t resize_success;
15
16 void resize_cb(void *myargs, globus_ftp_client_handle_t *handle,
17 globus_object_t *error)
18 {
19 if (error)
20 {
21 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
22 globus_mutex_lock(&resize_lock);
23 resize_success=GLOBUS_FALSE;
24 globus_mutex_unlock(&resize_lock);
25 }
26 else
27 {
28 globus_mutex_lock(&resize_lock);
29 resize_success=GLOBUS_TRUE;
30 globus_mutex_unlock(&resize_lock);
31 }
32 globus_mutex_lock(&resize_lock);
33 resize_done=GLOBUS_TRUE;
34 globus_cond_signal(&resize_cond);
35 globus_mutex_unlock(&resize_lock);
36 }
37
38
39 static void resize_wrdata_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
40 globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
41 globus_bool_t eof)
42 {
43 if (error)
44 FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
45 if (!eof)
46 globus_ftp_client_register_read(handle,
47 buffer,
48 length,
49 resize_wrdata_cb,
50 myargs);
51 return;
52 }
53
54
55 void ADIOI_GRIDFTP_Resize(ADIO_File fd, ADIO_Offset size, int *error_code)
56 {
57 int myrank, nprocs;
58 char myname[]="ADIOI_GRIDFTP_Resize";
59 globus_off_t fsize;
60 globus_result_t result;
61
62 *error_code = MPI_SUCCESS;
63
64 MPI_Comm_size(fd->comm, &nprocs);
65 MPI_Comm_rank(fd->comm, &myrank);
66
67
68 if ( fd->access_mode&ADIO_RDONLY )
69 {
70 FPRINTF(stderr,"%s: attempt to resize read-only file %s!\n",
71 myname,fd->filename);
72 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
73 myname, __LINE__, MPI_ERR_IO,
74 "**io", 0);
75 return;
76 }
77
78
79
80 globus_mutex_init(&resize_lock,GLOBUS_NULL);
81 globus_cond_init(&resize_cond,GLOBUS_NULL);
82 resize_done=GLOBUS_FALSE;
83 if ( (result=globus_ftp_client_size(&(gridftp_fh[fd->fd_sys]),
84 fd->filename,
85 &(oattr[fd->fd_sys]),
86 &(fsize),
87 resize_cb,
88 GLOBUS_NULL))!=GLOBUS_SUCCESS )
89 {
90 globus_err_handler("globus_ftp_client_size",myname,result);
91 *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
92 myname, __LINE__, MPI_ERR_IO,
93 "**io",
94 "**io %s", globus_object_printable_to_string(globus_error_get(result)));
95 return;
96 }
97 globus_mutex_lock(&resize_lock);
98 while ( resize_done!=GLOBUS_TRUE )
99 globus_cond_wait(&resize_lock,&resize_cond);
100 if ( fsize < (globus_off_t)size )
101 {
102
103
104 globus_byte_t touchbuf=(globus_byte_t)'\0';
105 resize_done=GLOBUS_FALSE;
106 if ( (result=globus_ftp_client_partial_put(&(gridftp_fh[fd->fd_sys]),
107 fd->filename,
108 &(oattr[fd->fd_sys]),
109 GLOBUS_NULL,
110 (globus_off_t)size,
111 (globus_off_t)size,
112 resize_cb,
113 GLOBUS_NULL))!=GLOBUS_SUCCESS )
114 {
115 globus_err_handler("globus_ftp_client_partial_put",myname,result);
116 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
117 MPIR_ERR_RECOVERABLE, myname, __LINE__,
118 MPI_ERR_IO, "**io", "**io %s",
119 globus_object_printable_to_string(globus_error_get(result)));
120 return;
121 }
122
123 if ( (result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
124 (globus_byte_t *)&touchbuf,
125 0,
126 (globus_off_t)0,
127 GLOBUS_TRUE,
128 resize_wrdata_cb,
129 GLOBUS_NULL))!=GLOBUS_SUCCESS )
130 {
131 globus_err_handler("globus_ftp_client_register_write",myname,result);
132 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
133 MPIR_ERR_RECOVERABLE, myname, __LINE__,
134 MPI_ERR_IO, "**io", "**io %s",
135 globus_object_printable_to_string(globus_error_get(result)));
136 return;
137 }
138 globus_mutex_lock(&resize_lock);
139 while ( resize_done!=GLOBUS_TRUE )
140 globus_cond_wait(&resize_cond,&resize_lock);
141 globus_mutex_unlock(&resize_lock);
142 }
143 else if ( fsize > (globus_off_t)size )
144 {
145
146
147
148 char *urlold;
149 size_t urllen;
150
151 urllen=strlen(fd->filename);
152 urlold=(char *)ADIOI_Malloc(urllen+5);
153 ADIOI_Snprintf(urlold,urllen+5,"%s.old",fd->filename);
154 resize_done=GLOBUS_FALSE;
155 resize_success=GLOBUS_FALSE;
156 if ( (result=globus_ftp_client_move(&(gridftp_fh[fd->fd_sys]),
157 fd->filename,
158 urlold,
159 &(oattr[fd->fd_sys]),
160 resize_cb,
161 GLOBUS_NULL))!=GLOBUS_SUCCESS )
162 {
163 globus_err_handler("globus_ftp_client_move",myname,result);
164 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
165 MPIR_ERR_RECOVERABLE, myname, __LINE__,
166 MPI_ERR_IO, "**io", "**io %s",
167 globus_object_printable_to_string(globus_error_get(result)));
168 return;
169 }
170 globus_mutex_lock(&resize_lock);
171 while ( resize_done!=GLOBUS_TRUE )
172 globus_cond_wait(&resize_cond,&resize_lock);
173 globus_mutex_unlock(&resize_lock);
174 if ( resize_success!=GLOBUS_TRUE )
175 {
176 *error_code = MPI_ERR_IO;
177 return;
178 }
179 resize_done=GLOBUS_FALSE;
180 if ( (result=globus_ftp_client_partial_third_party_transfer(&(gridftp_fh[fd->fd_sys]),
181 urlold,
182 &(oattr[fd->fd_sys]),
183 fd->filename,
184 &(oattr[fd->fd_sys]),
185 GLOBUS_NULL,
186 0,
187 (globus_off_t)size,
188 resize_cb,
189 GLOBUS_NULL))!=GLOBUS_SUCCESS )
190 {
191 globus_err_handler("globus_ftp_client_partial_third_party_transfer",myname,result);
192 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
193 MPIR_ERR_RECOVERABLE, myname, __LINE__,
194 MPI_ERR_IO, "**io", "**io %s",
195 globus_object_printable_to_string(globus_error_get(result)));
196 return;
197 }
198 globus_mutex_lock(&resize_lock);
199 while ( resize_done!=GLOBUS_TRUE )
200 globus_cond_wait(&resize_cond,&resize_lock);
201 globus_mutex_unlock(&resize_lock);
202 if ( resize_success!=GLOBUS_TRUE )
203 {
204 *error_code = MPI_ERR_IO;
205 ADIOI_Error(fd,*error_code,myname);
206 return;
207 }
208 resize_done=GLOBUS_FALSE;
209 if ( (result=globus_ftp_client_delete(&(gridftp_fh[fd->fd_sys]),
210 urlold,
211 &(oattr[fd->fd_sys]),
212 resize_cb,
213 GLOBUS_NULL))!=GLOBUS_SUCCESS )
214 {
215 globus_err_handler("globus_ftp_client_delete",myname,result);
216 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
217 MPIR_ERR_RECOVERABLE, myname, __LINE__,
218 MPI_ERR_IO, "**io", "**io %s",
219 globus_object_printable_to_string(globus_error_get(result)));
220 return;
221 }
222 globus_mutex_lock(&resize_lock);
223 while ( resize_done!=GLOBUS_TRUE )
224 globus_cond_wait(&resize_cond,&resize_lock);
225 globus_mutex_unlock(&resize_lock);
226 if ( resize_success!=GLOBUS_TRUE )
227 {
228 *error_code = MPI_ERR_IO;
229 ADIOI_Error(fd,*error_code,myname);
230 return;
231 }
232 ADIOI_Free(urlold);
233 }
234 globus_mutex_destroy(&resize_lock);
235 globus_cond_destroy(&resize_cond);
236 }
237
238
239
240
241