root/ompi/mca/io/romio321/romio/adio/ad_gridftp/ad_gridftp_open.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. exists_cb
  2. touch_ctl_cb
  3. touch_data_cb
  4. ADIOI_GRIDFTP_Open

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /* 
   3  *   Copyright (C) 2003 University of Chicago, Ohio Supercomputer Center.
   4  *   See COPYRIGHT notice in top-level directory.
   5  */
   6 
   7 #include "ad_gridftp.h"
   8 #include "adioi.h"
   9 
/* Mutex/condition pair shared by ADIOI_GRIDFTP_Open and the callbacks in this
   file.  ADIOI_GRIDFTP_Open initialises both (globus_mutex_init /
   globus_cond_init) and then blocks on them with globus_cond_wait until a
   callback reports completion through one of the *_done flags. */
static globus_mutex_t lock;
static globus_cond_t cond;
  12 
  13 static globus_bool_t file_exists,exists_done;
  14 static void exists_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  15 {    
  16     if (error)
  17         {
  18             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  19         }
  20     else
  21         {
  22             file_exists=GLOBUS_TRUE;
  23         }
  24     exists_done=GLOBUS_TRUE;
  25 }
  26 
  27 static globus_bool_t touch_ctl_done;
  28 static void touch_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  29 {
  30     if (error)
  31         {
  32             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  33         }
  34     globus_mutex_lock(&lock);
  35     touch_ctl_done=GLOBUS_TRUE;
  36     globus_cond_signal(&cond);
  37     globus_mutex_unlock(&lock);
  38 }
  39 
  40 static void touch_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
  41                           globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
  42                           globus_bool_t eof)
  43 {
  44     if (error)
  45         FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  46     globus_ftp_client_register_read(handle,buffer,length,touch_data_cb,myargs);
  47     return;
  48 }
  49 
  50 void ADIOI_GRIDFTP_Open(ADIO_File fd, int *error_code)
  51 {
  52     static char myname[]="ADIOI_GRIDFTP_Open";
  53     int myrank, nprocs, keyfound;
  54     char hintval[MPI_MAX_INFO_VAL+1];
  55     globus_ftp_client_handleattr_t hattr;
  56     globus_result_t result;
  57 
  58     MPI_Comm_size(fd->comm, &nprocs);
  59     MPI_Comm_rank(fd->comm, &myrank);
  60 
  61     /* activate Globus ftp client module -- can be called multiple times, so
  62        it's safest to call once per file/connection */
  63     globus_module_activate(GLOBUS_FTP_CLIENT_MODULE);
  64     fd->fd_sys = num_gridftp_handles;
  65     /* No shared file pointers for now */
  66     fd->shared_fp_fname = NULL;
  67     *error_code = MPI_SUCCESS;
  68 
  69     /* Access modes here mean something very different here than they
  70        would on a "real" filesystem...  As a result, the amode and hint
  71        processing here is intermingled and a little weird because many
  72        of them have to do with the connection rather than the file itself.
  73        The thing that sucks about this is that read and write ops will
  74        have to check themselves if the file is being accessed rdonly, rdwr,
  75        or wronly.
  76        */
  77     result=globus_ftp_client_handleattr_init(&hattr);
  78     if ( result != GLOBUS_SUCCESS )
  79         {
  80             
  81 
  82             globus_err_handler("globus_ftp_client_handleattr_init",
  83                                myname,result);
  84             fd->fd_sys = -1;
  85             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
  86                     myname, __LINE__, MPI_ERR_IO,
  87                     "**io",
  88                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
  89             return;
  90         }
  91     result = globus_ftp_client_operationattr_init(&(oattr[fd->fd_sys]));
  92     if ( result != GLOBUS_SUCCESS )
  93         {
  94             globus_err_handler("globus_ftp_client_operationattr_init",
  95                                myname,result);
  96             fd->fd_sys = -1;
  97             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
  98                     myname, __LINE__, MPI_ERR_IO,
  99                     "**io",
 100                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 101             return;
 102         }
 103 
 104 
 105     /* Always use connection caching unless told otherwise */
 106     result=globus_ftp_client_handleattr_set_cache_all(&hattr,GLOBUS_TRUE);
 107     if ( result !=GLOBUS_SUCCESS )
 108         globus_err_handler("globus_ftp_client_handleattr_set_cache_all",myname,result);
 109 
 110     /* Assume that it's safe to cache a file if it's read-only */
 111     if ( (fd->access_mode&ADIO_RDONLY) &&
 112          (result=globus_ftp_client_handleattr_add_cached_url(&hattr,fd->filename))!=GLOBUS_SUCCESS )
 113         globus_err_handler("globus_ftp_client_handleattr_add_cached_url",myname,result);
 114 
 115     /* Since we're (almost by definition) doing things that FTP S (stream)
 116        control mode can't handle, default to E (extended block) control mode
 117        for gsiftp:// URLs.  ftp:// URLs use standard stream control mode 
 118        by default.  This behavior can be overridden by the ftp_control_mode
 119        hint. */
 120 
 121     /*
 122     if ( !strncmp(fd->filename,"gsiftp:",7) && 
 123          (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_EXTENDED_BLOCK))!=GLOBUS_SUCCESS )
 124         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 125     else if ( !strncmp(fd->filename,"ftp:",4) && 
 126               (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_STREAM))!=GLOBUS_SUCCESS )
 127         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 128     */
 129 
 130     /* Set append mode if necessary */
 131     if ( (fd->access_mode&ADIO_APPEND) && 
 132          ((result=globus_ftp_client_operationattr_set_append(&(oattr[fd->fd_sys]),GLOBUS_TRUE))!=GLOBUS_SUCCESS) )
 133         globus_err_handler("globus_ftp_client_operationattr_set_append",myname,result);
 134 
 135     /* Other hint and amode processing that would affect hattr and/or 
 136        oattr[] (eg. parallelism, striping, etc.) goes here */
 137     if ( fd->info!=MPI_INFO_NULL )
 138         {
 139             ADIOI_Info_get(fd->info,"ftp_control_mode",MPI_MAX_INFO_VAL,hintval,&keyfound);
 140             if ( keyfound )
 141                 {
 142                     if ( ( !strcmp(hintval,"extended") || !strcmp(hintval,"extended_block") ) && 
 143                          (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_EXTENDED_BLOCK))!=GLOBUS_SUCCESS )
 144                         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 145                     else if ( !strcmp(hintval,"block") && 
 146                               (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_BLOCK))!=GLOBUS_SUCCESS )
 147                         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 148                     else if ( !strcmp(hintval,"compressed") && 
 149                               (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_COMPRESSED))!=GLOBUS_SUCCESS )
 150                         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 151                     else if ( !strcmp(hintval,"stream") && 
 152                               (result=globus_ftp_client_operationattr_set_mode(&(oattr[fd->fd_sys]),GLOBUS_FTP_CONTROL_MODE_STREAM))!=GLOBUS_SUCCESS )
 153                         globus_err_handler("globus_ftp_client_operationattr_set_mode",myname,result);
 154                 }
 155 
 156             ADIOI_Info_get(fd->info,"parallelism",MPI_MAX_INFO_VAL,hintval,&keyfound);
 157             if ( keyfound )
 158                 {
 159                     int nftpthreads;
 160                     
 161                     if ( sscanf(hintval,"%d",&nftpthreads)==1 )
 162                         {
 163                             globus_ftp_control_parallelism_t parallelism;
 164 
 165                             parallelism.mode = GLOBUS_FTP_CONTROL_PARALLELISM_FIXED;
 166                             parallelism.fixed.size = nftpthreads;
 167                             if ( (result=globus_ftp_client_operationattr_set_parallelism(&(oattr[fd->fd_sys]),
 168                                                                                          &parallelism))!=GLOBUS_SUCCESS )
 169                                 globus_err_handler("globus_ftp_client_operationattr_set_parallelism",myname,result);
 170                         }
 171                 }
 172 
 173             ADIOI_Info_get(fd->info,"striped_ftp",MPI_MAX_INFO_VAL,hintval,&keyfound);
 174             if ( keyfound )
 175                 {
 176                     /* if set to "true" or "enable", set up round-robin block layout */
 177                     if ( !strncmp("true",hintval,4) || !strncmp("TRUE",hintval,4) ||
 178                          !strncmp("enable",hintval,4) || !strncmp("ENABLE",hintval,4) )
 179                         {
 180                             ADIOI_Info_get(fd->info,"striping_factor",MPI_MAX_INFO_VAL,hintval,&keyfound);
 181                             if ( keyfound )
 182                                 {
 183                                     int striping_factor;
 184 
 185                                     if ( sscanf(hintval,"%d",&striping_factor)==1 )
 186                                         {
 187                                             globus_ftp_control_layout_t layout;
 188 
 189                                             layout.mode = GLOBUS_FTP_CONTROL_STRIPING_BLOCKED_ROUND_ROBIN;
 190                                             layout.round_robin.block_size = striping_factor;
 191                                             if ( (result=globus_ftp_client_operationattr_set_layout(&(oattr[fd->fd_sys]),
 192                                                                                                     &layout))!=GLOBUS_SUCCESS  )
 193                                                 globus_err_handler("globus_ftp_client_operationattr_set_layout",
 194                                                                    myname,result);
 195                                         }
 196                                 }
 197                         }
 198                 }
 199 
 200             ADIOI_Info_get(fd->info,"tcp_buffer",MPI_MAX_INFO_VAL,hintval,&keyfound);
 201             if ( keyfound )
 202                 {
 203                     /* set tcp buffer size */
 204                     int buffer_size;
 205                     if ( sscanf(hintval,"%d",&buffer_size)==1 )
 206                         {
 207                             globus_ftp_control_tcpbuffer_t tcpbuf;
 208 
 209                             tcpbuf.mode = GLOBUS_FTP_CONTROL_TCPBUFFER_FIXED;
 210                             tcpbuf.fixed.size = buffer_size;
 211                             if ( (result=globus_ftp_client_operationattr_set_tcp_buffer(&(oattr[fd->fd_sys]),
 212                                                                                         &tcpbuf))!=GLOBUS_SUCCESS )
 213                                 globus_err_handler("globus_ftp_client_operationattr_set_tcp_buffer",myname,result);
 214                         }
 215                 }
 216 
 217             ADIOI_Info_get(fd->info,"transfer_type",MPI_MAX_INFO_VAL,hintval,&keyfound);
 218             if ( keyfound )
 219                 {
 220                     globus_ftp_control_type_t filetype;
 221                     /* set transfer type (i.e. ASCII or binary) */
 222                     if ( !strcmp("ascii",hintval) || !strcmp("ASCII",hintval) )
 223                         {
 224                             filetype=GLOBUS_FTP_CONTROL_TYPE_ASCII;
 225                         }
 226                     else
 227                         {
 228                             filetype=GLOBUS_FTP_CONTROL_TYPE_IMAGE;
 229                         }
 230                     if ( (result=globus_ftp_client_operationattr_set_type(&(oattr[fd->fd_sys]),filetype))!=GLOBUS_SUCCESS )
 231                         globus_err_handler("globus_ftp_client_operationattr_set_type",myname,result);
 232                 }
 233         }
 234     else
 235         FPRINTF(stderr,"no MPI_Info object associated with %s\n",fd->filename);
 236 
 237     /* Create the ftp handle */
 238     result=globus_ftp_client_handle_init(&(gridftp_fh[fd->fd_sys]),&hattr);
 239     if ( result != GLOBUS_SUCCESS )
 240         {
 241             globus_err_handler("globus_ftp_client_handle_init",myname,result);
 242             fd->fd_sys = -1;
 243             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 244                     myname, __LINE__, MPI_ERR_IO,
 245                     "**io",
 246                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 247             return;
 248         }
 249 
 250     /* Check for existence of the file */
 251     globus_mutex_init(&lock, GLOBUS_NULL);
 252     globus_cond_init(&cond, GLOBUS_NULL);
 253     file_exists=GLOBUS_FALSE;
 254     exists_done=GLOBUS_FALSE;
 255     if ( myrank==0 )
 256         {
 257             if ( (result=globus_ftp_client_exists(&(gridftp_fh[fd->fd_sys]),
 258                                                   fd->filename,
 259                                                   &(oattr[fd->fd_sys]),
 260                                                   exists_cb,
 261                                                   GLOBUS_NULL))!=GLOBUS_SUCCESS )
 262                 {
 263                     globus_err_handler("globus_ftp_client_exists",myname,result);
 264                     fd->fd_sys = -1; 
 265                     *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 266                                     myname, __LINE__, MPI_ERR_IO,
 267                                     "**io", "**io %s", 
 268                                     globus_object_printable_to_string(globus_error_get(result)));
 269                     return;
 270                 }
 271             /* wait till the callback completes */
 272             globus_mutex_lock(&lock);
 273             while ( exists_done!=GLOBUS_TRUE )
 274                 globus_cond_wait(&cond,&lock);
 275             globus_mutex_unlock(&lock);
 276         }
 277     MPI_Barrier(fd->comm);
 278     MPI_Bcast(&file_exists,1,MPI_INT,0,fd->comm);
 279 
 280     /* It turns out that this is handled by MPI_File_open() directly */
 281     if ( (file_exists!=GLOBUS_TRUE) && (fd->access_mode&ADIO_CREATE) &&
 282          !(fd->access_mode&ADIO_EXCL) && !(fd->access_mode&ADIO_RDONLY) )
 283         {
 284             if ( myrank==0 )
 285                 {
 286                     /* if the file doesn't exist, write a single NULL to it */
 287                     globus_byte_t touchbuf=(globus_byte_t)'\0';
 288                     touch_ctl_done=GLOBUS_FALSE;
 289                     if ( (result=globus_ftp_client_put(&(gridftp_fh[fd->fd_sys]),
 290                                                        fd->filename,
 291                                                        &(oattr[fd->fd_sys]),
 292                                                        GLOBUS_NULL,
 293                                                        touch_ctl_cb,
 294                                                        GLOBUS_NULL))!=GLOBUS_SUCCESS )
 295                         {
 296                             globus_err_handler("globus_ftp_client_put",myname,result);
 297                             fd->fd_sys = -1;
 298                             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 299                                 MPIR_ERR_RECOVERABLE,
 300                                 myname, __LINE__, MPI_ERR_IO,
 301                                 "**io", "**io %s", 
 302                                 globus_object_printable_to_string(globus_error_get(result)));
 303                             return;
 304                         }
 305                     result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
 306                                   (globus_byte_t *)&touchbuf, 0,
 307                                   (globus_off_t)0, GLOBUS_TRUE,
 308                                   touch_data_cb, GLOBUS_NULL);
 309 
 310                     if ( result != GLOBUS_SUCCESS )
 311                         {
 312                             globus_err_handler("globus_ftp_client_register_write",myname,result);
 313                             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 314                                 MPIR_ERR_RECOVERABLE,
 315                                 myname, __LINE__, MPI_ERR_IO,
 316                                 "**io", "**io %s", 
 317                                 globus_object_printable_to_string(globus_error_get(result)));
 318                             return;
 319                         }
 320                     globus_mutex_lock(&lock);
 321                     while ( touch_ctl_done!=GLOBUS_TRUE )
 322                         globus_cond_wait(&cond,&lock);
 323                     globus_mutex_unlock(&lock);
 324                 }
 325             MPI_Barrier(fd->comm);
 326         }
 327     else if ( (fd->access_mode&ADIO_EXCL) && (file_exists==GLOBUS_TRUE) )
 328         {
 329             fd->fd_sys = -1;
 330             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 331                             myname, __LINE__, MPI_ERR_IO, 
 332                             "**io", 0);
 333             return;
 334         }
 335     else if ( (fd->access_mode&ADIO_RDONLY) && (file_exists!=GLOBUS_TRUE) )
 336         {
 337             if ( myrank==0 )
 338                 {
 339                     FPRINTF(stderr,"WARNING:  read-only file %s does not exist!\n",fd->filename);
 340                 }
 341         }
 342     num_gridftp_handles++;
 343 }

/* [<][>][^][v][top][bottom][index][help] */