root/ompi/mca/io/romio321/romio/adio/ad_gridftp/ad_gridftp_read.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. readcontig_ctl_cb
  2. readcontig_data_cb
  3. readdiscontig_ctl_cb
  4. readdiscontig_data_cb
  5. ADIOI_GRIDFTP_ReadContig
  6. ADIOI_GRIDFTP_ReadDiscontig
  7. ADIOI_GRIDFTP_ReadStrided

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /* 
   3  *
   4  *   Copyright (C) 2003 University of Chicago, Ohio Supercomputer Center. 
   5  *   See COPYRIGHT notice in top-level directory.
   6  */
   7 
   8 #include "ad_gridftp.h"
   9 #include "adioi.h"
  10 #include "adio_extern.h"
  11 
  12 static globus_mutex_t readcontig_ctl_lock;
  13 static globus_cond_t readcontig_ctl_cond;
  14 static globus_bool_t readcontig_ctl_done;
  15 static void readcontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  16 {
  17     if (error)
  18         {
  19             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  20         }
  21     globus_mutex_lock(&readcontig_ctl_lock);
  22     if ( readcontig_ctl_done!=GLOBUS_TRUE )
  23         readcontig_ctl_done=GLOBUS_TRUE;
  24     globus_cond_signal(&readcontig_ctl_cond);
  25     globus_mutex_unlock(&readcontig_ctl_lock);
  26     return;
  27 }
  28 
  29 static void readcontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
  30                                globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
  31                                globus_bool_t eof)
  32 {
  33    globus_size_t *bytes_read;
  34 
  35     bytes_read=(globus_size_t *)myargs;
  36     if (error)
  37         {
  38             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  39         }
  40     *bytes_read+=length;
  41     /* I don't understand why the data callback has to keep recalling register_read,
  42        but everything I've done and all the examples I've seen seem to require
  43        that behavior to work... */
  44     /*
  45      * Using buffer+length seems to work, but is probably not the correct
  46      * solution.  A big read of 256kB chunks will have lines like this:
  47         readcontig_data_cb: buffer 0x404e0008 length 0 offset 31719424 eof 1
  48         readcontig_data_cb: buffer 0x404a0008 length 65536 offset 31981568 eof 0
  49         readcontig_data_cb: buffer 0x404b0008 length 65536 offset 32047104 eof 0
  50         readcontig_data_cb: buffer 0x404c0008 length 65536 offset 32112640 eof 0
  51         readcontig_data_cb: buffer 0x404d0008 length 65536 offset 32178176 eof 0
  52      */
  53     if ( !eof )
  54             globus_ftp_client_register_read(handle,
  55                                             buffer+length,
  56                                             length,
  57                                             readcontig_data_cb,
  58                                             (void *)(bytes_read));
  59     return;
  60 }
  61 
  62 static globus_mutex_t readdiscontig_ctl_lock;
  63 static globus_cond_t readdiscontig_ctl_cond;
  64 static globus_bool_t readdiscontig_ctl_done;
  65 static void readdiscontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  66 {
  67     if (error)
  68         {
  69             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  70         }
  71     globus_mutex_lock(&readdiscontig_ctl_lock);
  72     if ( readdiscontig_ctl_done!=GLOBUS_TRUE )
  73         readdiscontig_ctl_done=GLOBUS_TRUE;
  74     globus_cond_signal(&readdiscontig_ctl_cond);
  75     globus_mutex_unlock(&readdiscontig_ctl_lock);
  76     return;
  77 }
  78 
  79 static void readdiscontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
  80                                globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
  81                                globus_bool_t eof)
  82 {
  83    globus_size_t *bytes_read;
  84 
  85     bytes_read=(globus_size_t *)myargs;
  86     if (error)
  87         {
  88             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  89         }
  90     *bytes_read+=length;
  91     /* I don't understand why the data callback has to keep recalling register_read,
  92        but everything I've done and all the examples I've seen seem to require
  93        that behavior to work... */
  94     if ( !eof )
  95             globus_ftp_client_register_read(handle,
  96                                             buffer,
  97                                             length,
  98                                             readdiscontig_data_cb,
  99                                             (void *)(bytes_read));
 100     return;
 101 }
 102 
 103 void ADIOI_GRIDFTP_ReadContig(ADIO_File fd, void *buf, int count, 
 104                              MPI_Datatype datatype, int file_ptr_type,
 105                              ADIO_Offset offset, ADIO_Status *status, int
 106                              *error_code)
 107 {
 108     static char myname[]="ADIOI_GRIDFTP_ReadContig";
 109     int myrank, nprocs;
 110     MPI_Count datatype_size;
 111     globus_size_t len,bytes_read=0;
 112     globus_off_t goff;
 113     globus_result_t result;
 114 
 115     if ( fd->access_mode&ADIO_WRONLY )
 116         {
 117             *error_code=MPIR_ERR_MODE_WRONLY;
 118             return;
 119         }
 120 
 121     *error_code = MPI_SUCCESS;
 122 
 123     MPI_Comm_size(fd->comm, &nprocs);
 124     MPI_Comm_rank(fd->comm, &myrank);
 125     MPI_Type_size_x(datatype, &datatype_size);
 126 
 127     if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
 128     {
 129         offset = fd->fp_ind;
 130     }
 131 
 132     /* Do the gridftp I/O transfer */
 133     goff = (globus_off_t)offset;
 134     len = ((globus_size_t)datatype_size)*((globus_size_t)count);
 135 
 136     globus_mutex_init(&readcontig_ctl_lock, GLOBUS_NULL);
 137     globus_cond_init(&readcontig_ctl_cond, GLOBUS_NULL);
 138     readcontig_ctl_done=GLOBUS_FALSE;
 139     if ( (result=globus_ftp_client_partial_get(&(gridftp_fh[fd->fd_sys]),
 140                                                fd->filename,
 141                                                &(oattr[fd->fd_sys]),
 142                                                GLOBUS_NULL,
 143                                                goff,
 144                                                goff+(globus_off_t)len,
 145                                                readcontig_ctl_cb,
 146                                                GLOBUS_NULL))!=GLOBUS_SUCCESS )
 147         {
 148             globus_err_handler("globus_ftp_client_partial_get",myname,result);
 149             *error_code=MPI_ERR_IO;
 150             ADIOI_Error(fd,*error_code,myname);
 151             return;
 152         }
 153     result=globus_ftp_client_register_read(&(gridftp_fh[fd->fd_sys]),
 154                     (globus_byte_t *)buf, len, readcontig_data_cb,
 155                     (void *)(&bytes_read));
 156     if ( result != GLOBUS_SUCCESS )
 157         {
 158             globus_err_handler("globus_ftp_client_register_read",myname,result);
 159             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 160                             MPIR_ERR_RECOVERABLE, myname, __LINE__, 
 161                             MPI_ERR_IO, "**io", "**io %s", 
 162                             globus_object_printable_to_string(globus_error_get(result)));
 163             return;
 164         }  
 165 
 166 
 167     /* The ctl callback won't start till the data callbacks complete, so it's
 168        safe to wait on just the ctl callback */
 169     globus_mutex_lock(&readcontig_ctl_lock);
 170     while ( readcontig_ctl_done!=GLOBUS_TRUE )
 171         globus_cond_wait(&readcontig_ctl_cond,&readcontig_ctl_lock);
 172     globus_mutex_unlock(&readcontig_ctl_lock);
 173 
 174     globus_mutex_destroy(&readcontig_ctl_lock);
 175     globus_cond_destroy(&readcontig_ctl_cond);
 176 
 177 #ifdef HAVE_STATUS_SET_BYTES
 178     MPIR_Status_set_bytes(status, datatype, bytes_read);
 179 #endif
 180     if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
 181     {
 182         fd->fp_ind += bytes_read;
 183         fd->fp_sys_posn = fd->fp_ind;
 184     }
 185     else {
 186         fd->fp_sys_posn = offset + bytes_read;
 187     }
 188 }
 189 
 190 void ADIOI_GRIDFTP_ReadDiscontig(ADIO_File fd, void *buf, int count,
 191                                  MPI_Datatype datatype, int file_ptr_type,
 192                                  ADIO_Offset offset, ADIO_Status *status, int
 193                                  *error_code)
 194 {
 195     char myname[]="ADIOI_GRIDFTP_ReadDiscontig";
 196     int myrank,nprocs;
 197     /* size and extent of buffer in memory */
 198     MPI_Aint btype_size,btype_extent,btype_lb;
 199     /* size and extent of file record layout */
 200     MPI_Aint ftype_size,ftype_extent,ftype_lb;
 201     /* size of file elemental type; seeks are done in units of this */
 202     MPI_Aint etype_size;
 203     MPI_Aint extent;
 204     ADIOI_Flatlist_node *flat_file;
 205     int i,buf_contig,boff,nblks;
 206     globus_off_t start,end,goff;
 207     globus_size_t bytes_read;
 208     globus_result_t result;
 209     globus_byte_t *tmp;
 210 
 211     if ( fd->access_mode&ADIO_WRONLY )
 212         {
 213             *error_code=MPIR_ERR_MODE_WRONLY;
 214             return;
 215         }
 216 
 217     *error_code=MPI_SUCCESS;
 218 
 219     MPI_Comm_rank(fd->comm,&myrank);
 220     MPI_Comm_size(fd->comm,&nprocs);
 221 
 222     etype_size=fd->etype_size;
 223     MPI_Type_size_x(fd->filetype,&ftype_size);
 224     MPI_Type_get_extent(fd->filetype,&ftype_lb,&ftype_extent);
 225     /* This is arguably unnecessary, as this routine assumes that the
 226        buffer in memory is contiguous */
 227     MPI_Type_size_x(datatype,&btype_size);
 228     MPI_Type_get_extent(datatype,&btype_lb,&btype_extent);
 229     ADIOI_Datatype_iscontig(datatype,&buf_contig);
 230     
 231     if ( ( btype_extent!=btype_size ) || ( ! buf_contig ) )
 232         {
 233             FPRINTF(stderr,"[%d/%d] %s called with discontigous memory buffer\n",
 234                     myrank,nprocs,myname);
 235             fflush(stderr);
 236             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 237                             MPIR_ERR_RECOVERABLE, myname, __LINE__, 
 238                             MPI_ERR_IO, "**io", 0 );
 239             return;
 240         }
 241     /* from here we can assume btype_extent==btype_size */
 242 
 243     /* Flatten out fd->filetype so we know which blocks to skip */
 244     flat_file = ADIOI_Flatten_and_find(fd->filetype);
 245 
 246     /* Figure out how big the area to read is */
 247     start=(globus_off_t)(offset*etype_size);
 248     goff=start;
 249     boff=0;
 250     extent=0;
 251     nblks=0;
 252     while ( boff < (count*btype_size) )
 253         {
 254             int blklen=0;
 255 
 256             for (i=0;i<flat_file->count;i++)
 257                 {
 258                     /* find the length of the next block */
 259                     if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
 260                         blklen=flat_file->blocklens[i];
 261                     else
 262                         blklen=(count*btype_size)-boff;
 263                     /* increment buffer size to be used */
 264                     boff+=blklen;
 265                     /* compute extent -- the nblks*ftype_extent bit is
 266                        there so we remember how many ftypes we've already
 267                        been through */
 268                     extent=MAX(extent,nblks*ftype_extent+flat_file->indices[i]+blklen);
 269                     if ( boff>=(count*btype_size) )
 270                         break;
 271                 }
 272             nblks++;
 273         }
 274     if ( extent < count*btype_size )
 275         {
 276             FPRINTF(stderr,"[%d/%d] %s error in computing extent -- extent %d is smaller than total bytes requested %d!\n",
 277                     myrank,nprocs,myname,extent,count*btype_size);
 278             fflush(stderr);
 279             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 280                             MPIR_ERR_RECOVERABLE, myname, __LINE__, 
 281                             MPI_ERR_IO, "**io", 0);
 282             return;
 283         }
 284     end=start+(globus_off_t)extent;
 285     tmp=(globus_byte_t *)ADIOI_Malloc((size_t)extent*sizeof(globus_byte_t));
 286 
 287     /* start up the globus partial read */
 288     globus_mutex_init(&readdiscontig_ctl_lock, GLOBUS_NULL);
 289     globus_cond_init(&readdiscontig_ctl_cond, GLOBUS_NULL);
 290     readdiscontig_ctl_done=GLOBUS_FALSE;
 291     if ( (result=globus_ftp_client_partial_get(&(gridftp_fh[fd->fd_sys]),
 292                                                fd->filename,
 293                                                &(oattr[fd->fd_sys]),
 294                                                GLOBUS_NULL,
 295                                                start,
 296                                                end,
 297                                                readdiscontig_ctl_cb,
 298                                                GLOBUS_NULL))!=GLOBUS_SUCCESS )
 299         {
 300             globus_err_handler("globus_ftp_client_partial_get",myname,result);
 301             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 302                             MPIR_ERR_RECOVERABLE, myname, __LINE__, 
 303                             MPI_ERR_IO, "**io", "**io %s", 
 304                             globus_object_printable_to_string(globus_error_get(result)));
 305             return;
 306         }
 307 
 308     /* Do all the actual I/Os */
 309     /* Since globus_ftp_client_register_read() is brain-dead and doesn't
 310        let you specify an offset, we have to slurp the entire extent into
 311        memory and then parse out the pieces we want...  Sucks, doesn't it?
 312 
 313        This should probably be done in chunks (preferably of a size
 314        set using a file hint), but that'll have to come later.
 315        --TB */
 316     if ( (result=globus_ftp_client_register_read(&(gridftp_fh[fd->fd_sys]),
 317                                                  tmp,
 318                                                  (globus_size_t)extent,
 319                                                  readdiscontig_data_cb,
 320                                                  (void *)(&bytes_read)))!=GLOBUS_SUCCESS )
 321         {
 322             globus_err_handler("globus_ftp_client_register_read",myname,result);
 323             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 324                     myname, __LINE__, MPI_ERR_IO,
 325                     "**io",
 326                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 327             return;
 328         }
 329     /* The ctl callback won't start till the data callbacks complete, so it's
 330        safe to wait on just the ctl callback */
 331     globus_mutex_lock(&readdiscontig_ctl_lock);
 332     while ( readdiscontig_ctl_done!=GLOBUS_TRUE )
 333         globus_cond_wait(&readdiscontig_ctl_cond,&readdiscontig_ctl_lock);
 334     globus_mutex_unlock(&readdiscontig_ctl_lock);
 335 
 336     globus_mutex_destroy(&readdiscontig_ctl_lock);
 337     globus_cond_destroy(&readdiscontig_ctl_cond);
 338 
 339     boff=0;
 340     nblks=0;
 341     goff=0;
 342     while ( boff < (count*btype_size) )
 343         {
 344             int i,blklen;
 345 
 346             for (i=0;i<flat_file->count;i++)
 347                 {
 348                     if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
 349                         blklen=flat_file->blocklens[i];
 350                     else
 351                         blklen=(count*btype_size)-boff;
 352                     if ( blklen > 0 )
 353                         {
 354                             goff=nblks*ftype_extent+flat_file->indices[i];
 355                             memcpy((globus_byte_t *)buf+boff,tmp+goff,(size_t)blklen);
 356                             boff+=blklen;
 357                             if ( boff>=(count*btype_size) )
 358                                 break;
 359                         }
 360                 }
 361             nblks++;
 362         }
 363     ADIOI_Free(tmp);
 364 
 365 #ifdef HAVE_STATUS_SET_BYTES
 366     MPIR_Status_set_bytes(status, datatype, bytes_read);
 367 #endif
 368     if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
 369     {
 370         fd->fp_ind += extent;
 371         fd->fp_sys_posn = fd->fp_ind;
 372     }
 373     else {
 374         fd->fp_sys_posn = offset + extent;
 375     }
 376 }
 377 
 378 void ADIOI_GRIDFTP_ReadStrided(ADIO_File fd, void *buf, int count,
 379                               MPI_Datatype datatype, int file_ptr_type,
 380                               ADIO_Offset offset, ADIO_Status *status, int
 381                               *error_code)
 382 {
 383     /*
 384     int myrank, nprocs;
 385 
 386     *error_code = MPI_SUCCESS;
 387 
 388     MPI_Comm_size(fd->comm, &nprocs);
 389     MPI_Comm_rank(fd->comm, &myrank);
 390 #ifdef PRINT_ERR_MSG
 391     FPRINTF(stdout, "[%d/%d] ADIOI_GRIDFTP_ReadStrided called on %s\n", myrank, 
 392             nprocs, fd->filename);
 393     FPRINTF(stdout, "[%d/%d]    calling ADIOI_GEN_ReadStrided\n", myrank, 
 394             nprocs);
 395 #endif
 396 
 397     ADIOI_GEN_ReadStrided(fd, buf, count, datatype, file_ptr_type, offset,
 398                           status, error_code);
 399     
 400     */
 401 
 402     char myname[]="ADIOI_GRIDFTP_ReadStrided";
 403     int myrank, nprocs;
 404     int i,j;
 405     int buf_contig,file_contig;
 406     MPI_Aint btype_size,bufsize;
 407     globus_off_t start,disp;
 408     globus_size_t bytes_read;
 409     globus_byte_t *intermediate;
 410 
 411     *error_code = MPI_SUCCESS;
 412 
 413     MPI_Comm_size(fd->comm, &nprocs);
 414     MPI_Comm_rank(fd->comm, &myrank);
 415 
 416     MPI_Type_size_x(datatype,&btype_size);
 417     bufsize=count*btype_size;
 418     ADIOI_Datatype_iscontig(fd->filetype,&file_contig);
 419     ADIOI_Datatype_iscontig(datatype,&buf_contig);
 420     if ( buf_contig && !file_contig )
 421         {
 422             /* Contiguous in memory, discontig in file */
 423             ADIOI_GRIDFTP_ReadDiscontig(fd, buf, count, datatype,
 424                                         file_ptr_type, offset, status, error_code);
 425         }
 426     else if ( !buf_contig && file_contig )
 427         {
 428             /* Discontiguous in mem, contig in file -- comparatively easy */
 429             int posn=0;
 430 
 431             /* read contiguous data into intermediate buffer */
 432             intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
 433             ADIOI_GRIDFTP_ReadContig(fd, intermediate, bufsize, MPI_BYTE,
 434                                      file_ptr_type, offset, status, error_code);
 435 
 436             /* explode contents of intermediate buffer into main buffer */
 437             MPI_Unpack(intermediate,bufsize,&posn,buf,count,datatype,fd->comm);
 438 
 439             ADIOI_Free(intermediate);
 440         }
 441     else if ( !buf_contig && !file_contig )
 442         {
 443             /* Discontig in both mem and file -- the hardest case */
 444             int posn=0;
 445 
 446             /* Read discontiguous data into intermediate buffer */
 447             intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
 448             ADIOI_GRIDFTP_ReadDiscontig(fd, intermediate, bufsize, MPI_BYTE,
 449                                         file_ptr_type, offset, status, error_code);
 450 
 451             /* explode contents of intermediate buffer into main buffer */
 452             posn=0;
 453             MPI_Unpack(intermediate,bufsize,&posn,buf,count,datatype,fd->comm);
 454 
 455             ADIOI_Free(intermediate);
 456         }
 457     else 
 458         {
 459             /* Why did you bother calling ReadStrided?!?!?! */
 460             ADIOI_GRIDFTP_ReadContig(fd, buf, count, datatype,
 461                                      file_ptr_type, offset, status, error_code);
 462         }
 463 
 464 }
 465 

/* [<][>][^][v][top][bottom][index][help] */