This source file includes the following definitions:
- exists_cb
- touch_ctl_cb
- touch_data_cb
- ADIOI_GRIDFTP_Open
   1 
   2 
   3 
   4 
   5 
   6 
   7 #include "ad_gridftp.h"
   8 #include "adioi.h"
   9 
  10 static globus_mutex_t lock;
  11 static globus_cond_t cond;
  12 
  13 static globus_bool_t file_exists,exists_done;
  14 static void exists_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  15 {    
  16     if (error)
  17         {
  18             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  19         }
  20     else
  21         {
  22             file_exists=GLOBUS_TRUE;
  23         }
  24     exists_done=GLOBUS_TRUE;
  25 }
  26 
  27 static globus_bool_t touch_ctl_done;
  28 static void touch_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  29 {
  30     if (error)
  31         {
  32             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  33         }
  34     globus_mutex_lock(&lock);
  35     touch_ctl_done=GLOBUS_TRUE;
  36     globus_cond_signal(&cond);
  37     globus_mutex_unlock(&lock);
  38 }
  39 
  40 static void touch_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
  41                           globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
  42                           globus_bool_t eof)
  43 {
  44     if (error)
  45         FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  46     globus_ftp_client_register_read(handle,buffer,length,touch_data_cb,myargs);
  47     return;
  48 }
  49 
  50 void ADIOI_GRIDFTP_Open(ADIO_File fd, int *error_code)
  51 {
  52     static char myname[]="ADIOI_GRIDFTP_Open";
  53     int myrank, nprocs, keyfound;
  54     char hintval[MPI_MAX_INFO_VAL+1];
  55     globus_ftp_client_handleattr_t hattr;
  56     globus_result_t result;
  57 
  58     MPI_Comm_size(fd->comm, &nprocs);
  59     MPI_Comm_rank(fd->comm, &myrank);
  60 
  61     
  62 
  63     globus_module_activate(GLOBUS_FTP_CLIENT_MODULE);
  64     fd->fd_sys = num_gridftp_handles;
  65     
  66     fd->shared_fp_fname = NULL;
  67     *error_code = MPI_SUCCESS;
  68 
  69     
  70 
  71 
  72 
  73 
  74 
  75 
  76 
  77     result=globus_ftp_client_handleattr_init(&hattr);
  78     if ( result != GLOBUS_SUCCESS )
  79         {
  80             
  81 
  82             globus_err_handler("globus_ftp_client_handleattr_init",
  83                                myname,result);
  84             fd->fd_sys = -1;
  85             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
  86                     myname, __LINE__, MPI_ERR_IO,
  87                     "**io",
  88                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
  89             return;
  90         }
  91     result = globus_ftp_client_operationattr_init(&(oattr[fd->fd_sys]));
  92     if ( result != GLOBUS_SUCCESS )
  93         {
  94             globus_err_handler("globus_ftp_client_operationattr_init",
  95                                myname,result);
  96             fd->fd_sys = -1;
  97             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
  98                     myname, __LINE__, MPI_ERR_IO,
  99                     "**io",
 100                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 101             return;
 102         }
 103 
 104 
 105     
 106     result=globus_ftp_client_handleattr_set_cache_all(&hattr,GLOBUS_TRUE);
 107     if ( result !=GLOBUS_SUCCESS )
 108         globus_err_handler("globus_ftp_client_handleattr_set_cache_all",myname,result);
 109 
 110     
 111     if ( (fd->access_mode&ADIO_RDONLY) &&
 112          (result=globus_ftp_client_handleattr_add_cached_url(&hattr,fd->filename))!=GLOBUS_SUCCESS )
 113         globus_err_handler("globus_ftp_client_handleattr_add_cached_url",myname,result);
 114 
 115     
 116 
 117 
 118 
 119 
 120 
 121     
 122 
 123 
 124 
 125 
 126 
 127 
 128 
 129 
 130     
 131     if ( (fd->access_mode&ADIO_APPEND) && 
 132          ((result=globus_ftp_client_operationattr_set_append(&(oattr[fd->fd_sys]),GLOBUS_TRUE))!=GLOBUS_SUCCESS) )
 133         globus_err_handler("globus_ftp_client_operationattr_set_append",myname,result);
 134 
 135     
 136 
 137     if ( fd->info!=MPI_INFO_NULL )
 138         {
 139             ADIOI_Info_get(fd->info,"ftp_control_mode",MPI_MAX_INFO_VAL,hintval,&keyfound);
 140             if ( keyfound )
 141                 {
 142                     if ( ( !strcmp(hintval,"extended") || !strcmp(hintval,"extended_block") ) && 
 143                          (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_EXTENDED_BLOCK))!=GLOBUS_SUCCESS )
 144                         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 145                     else if ( !strcmp(hintval,"block") && 
 146                               (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_BLOCK))!=GLOBUS_SUCCESS )
 147                         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 148                     else if ( !strcmp(hintval,"compressed") && 
 149                               (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_COMPRESSED))!=GLOBUS_SUCCESS )
 150                         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 151                     else if ( !strcmp(hintval,"stream") && 
 152                               (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_STREAM))!=GLOBUS_SUCCESS )
 153                         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 154                 }
 155 
 156             ADIOI_Info_get(fd->info,"parallelism",MPI_MAX_INFO_VAL,hintval,&keyfound);
 157             if ( keyfound )
 158                 {
 159                     int nftpthreads;
 160                     
 161                     if ( sscanf(hintval,"%d",&nftpthreads)==1 )
 162                         {
 163                             globus_ftp_control_parallelism_t parallelism;
 164 
 165                             parallelism.mode = GLOBUS_FTP_CONTROL_PARALLELISM_FIXED;
 166                             parallelism.fixed.size = nftpthreads;
 167                             if ( (result=globus_ftp_client_operationattr_set_parallelism(&(oattr[fd->fd_sys]),
 168                                                                                          ¶llelism))!=GLOBUS_SUCCESS )
 169                                 globus_err_handler("globus_ftp_client_operationattr_set_parallelism",myname,result);
 170                         }
 171                 }
 172 
 173             ADIOI_Info_get(fd->info,"striped_ftp",MPI_MAX_INFO_VAL,hintval,&keyfound);
 174             if ( keyfound )
 175                 {
 176                     
 177                     if ( !strncmp("true",hintval,4) || !strncmp("TRUE",hintval,4) ||
 178                          !strncmp("enable",hintval,4) || !strncmp("ENABLE",hintval,4) )
 179                         {
 180                             ADIOI_Info_get(fd->info,"striping_factor",MPI_MAX_INFO_VAL,hintval,&keyfound);
 181                             if ( keyfound )
 182                                 {
 183                                     int striping_factor;
 184 
 185                                     if ( sscanf(hintval,"%d",&striping_factor)==1 )
 186                                         {
 187                                             globus_ftp_control_layout_t layout;
 188 
 189                                             layout.mode = GLOBUS_FTP_CONTROL_STRIPING_BLOCKED_ROUND_ROBIN;
 190                                             layout.round_robin.block_size = striping_factor;
 191                                             if ( (result=globus_ftp_client_operationattr_set_layout(&(oattr[fd->fd_sys]),
 192                                                                                                     &layout))!=GLOBUS_SUCCESS  )
 193                                                 globus_err_handler("globus_ftp_client_operationattr_set_layout",
 194                                                                    myname,result);
 195                                         }
 196                                 }
 197                         }
 198                 }
 199 
 200             ADIOI_Info_get(fd->info,"tcp_buffer",MPI_MAX_INFO_VAL,hintval,&keyfound);
 201             if ( keyfound )
 202                 {
 203                     
 204                     int buffer_size;
 205                     if ( sscanf(hintval,"%d",&buffer_size)==1 )
 206                         {
 207                             globus_ftp_control_tcpbuffer_t tcpbuf;
 208 
 209                             tcpbuf.mode = GLOBUS_FTP_CONTROL_TCPBUFFER_FIXED;
 210                             tcpbuf.fixed.size = buffer_size;
 211                             if ( (result=globus_ftp_client_operationattr_set_tcp_buffer(&(oattr[fd->fd_sys]),
 212                                                                                         &tcpbuf))!=GLOBUS_SUCCESS )
 213                                 globus_err_handler("globus_ftp_client_operationattr_set_tcp_buffer",myname,result);
 214                         }
 215                 }
 216 
 217             ADIOI_Info_get(fd->info,"transfer_type",MPI_MAX_INFO_VAL,hintval,&keyfound);
 218             if ( keyfound )
 219                 {
 220                     globus_ftp_control_type_t filetype;
 221                     
 222                     if ( !strcmp("ascii",hintval) || !strcmp("ASCII",hintval) )
 223                         {
 224                             filetype=GLOBUS_FTP_CONTROL_TYPE_ASCII;
 225                         }
 226                     else
 227                         {
 228                             filetype=GLOBUS_FTP_CONTROL_TYPE_IMAGE;
 229                         }
 230                     if ( (result=globus_ftp_client_operationattr_set_type(&(oattr[fd->fd_sys]),filetype))!=GLOBUS_SUCCESS )
 231                         globus_err_handler("globus_ftp_client_operationattr_set_type",myname,result);
 232                 }
 233         }
 234     else
 235         FPRINTF(stderr,"no MPI_Info object associated with %s\n",fd->filename);
 236 
 237     
 238     result=globus_ftp_client_handle_init(&(gridftp_fh[fd->fd_sys]),&hattr);
 239     if ( result != GLOBUS_SUCCESS )
 240         {
 241             globus_err_handler("globus_ftp_client_handle_init",myname,result);
 242             fd->fd_sys = -1;
 243             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 244                     myname, __LINE__, MPI_ERR_IO,
 245                     "**io",
 246                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 247             return;
 248         }
 249 
 250     
 251     globus_mutex_init(&lock, GLOBUS_NULL);
 252     globus_cond_init(&cond, GLOBUS_NULL);
 253     file_exists=GLOBUS_FALSE;
 254     exists_done=GLOBUS_FALSE;
 255     if ( myrank==0 )
 256         {
 257             if ( (result=globus_ftp_client_exists(&(gridftp_fh[fd->fd_sys]),
 258                                                   fd->filename,
 259                                                   &(oattr[fd->fd_sys]),
 260                                                   exists_cb,
 261                                                   GLOBUS_NULL))!=GLOBUS_SUCCESS )
 262                 {
 263                     globus_err_handler("globus_ftp_client_exists",myname,result);
 264                     fd->fd_sys = -1; 
 265                     *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 266                                     myname, __LINE__, MPI_ERR_IO,
 267                                     "**io", "**io %s", 
 268                                     globus_object_printable_to_string(globus_error_get(result)));
 269                     return;
 270                 }
 271             
 272             globus_mutex_lock(&lock);
 273             while ( exists_done!=GLOBUS_TRUE )
 274                 globus_cond_wait(&cond,&lock);
 275             globus_mutex_unlock(&lock);
 276         }
 277     MPI_Barrier(fd->comm);
 278     MPI_Bcast(&file_exists,1,MPI_INT,0,fd->comm);
 279 
 280     
 281     if ( (file_exists!=GLOBUS_TRUE) && (fd->access_mode&ADIO_CREATE) &&
 282          !(fd->access_mode&ADIO_EXCL) && !(fd->access_mode&ADIO_RDONLY) )
 283         {
 284             if ( myrank==0 )
 285                 {
 286                     
 287                     globus_byte_t touchbuf=(globus_byte_t)'\0';
 288                     touch_ctl_done=GLOBUS_FALSE;
 289                     if ( (result=globus_ftp_client_put(&(gridftp_fh[fd->fd_sys]),
 290                                                        fd->filename,
 291                                                        &(oattr[fd->fd_sys]),
 292                                                        GLOBUS_NULL,
 293                                                        touch_ctl_cb,
 294                                                        GLOBUS_NULL))!=GLOBUS_SUCCESS )
 295                         {
 296                             globus_err_handler("globus_ftp_client_put",myname,result);
 297                             fd->fd_sys = -1;
 298                             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 299                                 MPIR_ERR_RECOVERABLE,
 300                                 myname, __LINE__, MPI_ERR_IO,
 301                                 "**io", "**io %s", 
 302                                 globus_object_printable_to_string(globus_error_get(result)));
 303                             return;
 304                         }
 305                     result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
 306                                   (globus_byte_t *)&touchbuf, 0,
 307                                   (globus_off_t)0, GLOBUS_TRUE,
 308                                   touch_data_cb, GLOBUS_NULL);
 309 
 310                     if ( result != GLOBUS_SUCCESS )
 311                         {
 312                             globus_err_handler("globus_ftp_client_register_write",myname,result);
 313                             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 314                                 MPIR_ERR_RECOVERABLE,
 315                                 myname, __LINE__, MPI_ERR_IO,
 316                                 "**io", "**io %s", 
 317                                 globus_object_printable_to_string(globus_error_get(result)));
 318                             return;
 319                         }
 320                     globus_mutex_lock(&lock);
 321                     while ( touch_ctl_done!=GLOBUS_TRUE )
 322                         globus_cond_wait(&cond,&lock);
 323                     globus_mutex_unlock(&lock);
 324                 }
 325             MPI_Barrier(fd->comm);
 326         }
 327     else if ( (fd->access_mode&ADIO_EXCL) && (file_exists==GLOBUS_TRUE) )
 328         {
 329             fd->fd_sys = -1;
 330             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 331                             myname, __LINE__, MPI_ERR_IO, 
 332                             "**io", 0);
 333             return;
 334         }
 335     else if ( (fd->access_mode&ADIO_RDONLY) && (file_exists!=GLOBUS_TRUE) )
 336         {
 337             if ( myrank==0 )
 338                 {
 339                     FPRINTF(stderr,"WARNING:  read-only file %s does not exist!\n",fd->filename);
 340                 }
 341         }
 342     num_gridftp_handles++;
 343 }