This source file includes the following definitions.
- readcontig_ctl_cb
- readcontig_data_cb
- readdiscontig_ctl_cb
- readdiscontig_data_cb
- ADIOI_GRIDFTP_ReadContig
- ADIOI_GRIDFTP_ReadDiscontig
- ADIOI_GRIDFTP_ReadStrided
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 #include "ad_gridftp.h"
   9 #include "adioi.h"
  10 #include "adio_extern.h"
  11 
  12 static globus_mutex_t readcontig_ctl_lock;
  13 static globus_cond_t readcontig_ctl_cond;
  14 static globus_bool_t readcontig_ctl_done;
  15 static void readcontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  16 {
  17     if (error)
  18         {
  19             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  20         }
  21     globus_mutex_lock(&readcontig_ctl_lock);
  22     if ( readcontig_ctl_done!=GLOBUS_TRUE )
  23         readcontig_ctl_done=GLOBUS_TRUE;
  24     globus_cond_signal(&readcontig_ctl_cond);
  25     globus_mutex_unlock(&readcontig_ctl_lock);
  26     return;
  27 }
  28 
  29 static void readcontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
  30                                globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
  31                                globus_bool_t eof)
  32 {
  33    globus_size_t *bytes_read;
  34 
  35     bytes_read=(globus_size_t *)myargs;
  36     if (error)
  37         {
  38             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  39         }
  40     *bytes_read+=length;
  41     
  42 
  43 
  44     
  45 
  46 
  47 
  48 
  49 
  50 
  51 
  52 
  53     if ( !eof )
  54             globus_ftp_client_register_read(handle,
  55                                             buffer+length,
  56                                             length,
  57                                             readcontig_data_cb,
  58                                             (void *)(bytes_read));
  59     return;
  60 }
  61 
  62 static globus_mutex_t readdiscontig_ctl_lock;
  63 static globus_cond_t readdiscontig_ctl_cond;
  64 static globus_bool_t readdiscontig_ctl_done;
  65 static void readdiscontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  66 {
  67     if (error)
  68         {
  69             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  70         }
  71     globus_mutex_lock(&readdiscontig_ctl_lock);
  72     if ( readdiscontig_ctl_done!=GLOBUS_TRUE )
  73         readdiscontig_ctl_done=GLOBUS_TRUE;
  74     globus_cond_signal(&readdiscontig_ctl_cond);
  75     globus_mutex_unlock(&readdiscontig_ctl_lock);
  76     return;
  77 }
  78 
  79 static void readdiscontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
  80                                globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
  81                                globus_bool_t eof)
  82 {
  83    globus_size_t *bytes_read;
  84 
  85     bytes_read=(globus_size_t *)myargs;
  86     if (error)
  87         {
  88             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  89         }
  90     *bytes_read+=length;
  91     
  92 
  93 
  94     if ( !eof )
  95             globus_ftp_client_register_read(handle,
  96                                             buffer,
  97                                             length,
  98                                             readdiscontig_data_cb,
  99                                             (void *)(bytes_read));
 100     return;
 101 }
 102 
 103 void ADIOI_GRIDFTP_ReadContig(ADIO_File fd, void *buf, int count, 
 104                              MPI_Datatype datatype, int file_ptr_type,
 105                              ADIO_Offset offset, ADIO_Status *status, int
 106                              *error_code)
 107 {
 108     static char myname[]="ADIOI_GRIDFTP_ReadContig";
 109     int myrank, nprocs;
 110     MPI_Count datatype_size;
 111     globus_size_t len,bytes_read=0;
 112     globus_off_t goff;
 113     globus_result_t result;
 114 
 115     if ( fd->access_mode&ADIO_WRONLY )
 116         {
 117             *error_code=MPIR_ERR_MODE_WRONLY;
 118             return;
 119         }
 120 
 121     *error_code = MPI_SUCCESS;
 122 
 123     MPI_Comm_size(fd->comm, &nprocs);
 124     MPI_Comm_rank(fd->comm, &myrank);
 125     MPI_Type_size_x(datatype, &datatype_size);
 126 
 127     if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
 128     {
 129         offset = fd->fp_ind;
 130     }
 131 
 132     
 133     goff = (globus_off_t)offset;
 134     len = ((globus_size_t)datatype_size)*((globus_size_t)count);
 135 
 136     globus_mutex_init(&readcontig_ctl_lock, GLOBUS_NULL);
 137     globus_cond_init(&readcontig_ctl_cond, GLOBUS_NULL);
 138     readcontig_ctl_done=GLOBUS_FALSE;
 139     if ( (result=globus_ftp_client_partial_get(&(gridftp_fh[fd->fd_sys]),
 140                                                fd->filename,
 141                                                &(oattr[fd->fd_sys]),
 142                                                GLOBUS_NULL,
 143                                                goff,
 144                                                goff+(globus_off_t)len,
 145                                                readcontig_ctl_cb,
 146                                                GLOBUS_NULL))!=GLOBUS_SUCCESS )
 147         {
 148             globus_err_handler("globus_ftp_client_partial_get",myname,result);
 149             *error_code=MPI_ERR_IO;
 150             ADIOI_Error(fd,*error_code,myname);
 151             return;
 152         }
 153     result=globus_ftp_client_register_read(&(gridftp_fh[fd->fd_sys]),
 154                     (globus_byte_t *)buf, len, readcontig_data_cb,
 155                     (void *)(&bytes_read));
 156     if ( result != GLOBUS_SUCCESS )
 157         {
 158             globus_err_handler("globus_ftp_client_register_read",myname,result);
 159             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 160                             MPIR_ERR_RECOVERABLE, myname, __LINE__, 
 161                             MPI_ERR_IO, "**io", "**io %s", 
 162                             globus_object_printable_to_string(globus_error_get(result)));
 163             return;
 164         }  
 165 
 166 
 167     
 168 
 169     globus_mutex_lock(&readcontig_ctl_lock);
 170     while ( readcontig_ctl_done!=GLOBUS_TRUE )
 171         globus_cond_wait(&readcontig_ctl_cond,&readcontig_ctl_lock);
 172     globus_mutex_unlock(&readcontig_ctl_lock);
 173 
 174     globus_mutex_destroy(&readcontig_ctl_lock);
 175     globus_cond_destroy(&readcontig_ctl_cond);
 176 
 177 #ifdef HAVE_STATUS_SET_BYTES
 178     MPIR_Status_set_bytes(status, datatype, bytes_read);
 179 #endif
 180     if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
 181     {
 182         fd->fp_ind += bytes_read;
 183         fd->fp_sys_posn = fd->fp_ind;
 184     }
 185     else {
 186         fd->fp_sys_posn = offset + bytes_read;
 187     }
 188 }
 189 
 190 void ADIOI_GRIDFTP_ReadDiscontig(ADIO_File fd, void *buf, int count,
 191                                  MPI_Datatype datatype, int file_ptr_type,
 192                                  ADIO_Offset offset, ADIO_Status *status, int
 193                                  *error_code)
 194 {
 195     char myname[]="ADIOI_GRIDFTP_ReadDiscontig";
 196     int myrank,nprocs;
 197     
 198     MPI_Aint btype_size,btype_extent,btype_lb;
 199     
 200     MPI_Aint ftype_size,ftype_extent,ftype_lb;
 201     
 202     MPI_Aint etype_size;
 203     MPI_Aint extent;
 204     ADIOI_Flatlist_node *flat_file;
 205     int i,buf_contig,boff,nblks;
 206     globus_off_t start,end,goff;
 207     globus_size_t bytes_read;
 208     globus_result_t result;
 209     globus_byte_t *tmp;
 210 
 211     if ( fd->access_mode&ADIO_WRONLY )
 212         {
 213             *error_code=MPIR_ERR_MODE_WRONLY;
 214             return;
 215         }
 216 
 217     *error_code=MPI_SUCCESS;
 218 
 219     MPI_Comm_rank(fd->comm,&myrank);
 220     MPI_Comm_size(fd->comm,&nprocs);
 221 
 222     etype_size=fd->etype_size;
 223     MPI_Type_size_x(fd->filetype,&ftype_size);
 224     MPI_Type_get_extent(fd->filetype,&ftype_lb,&ftype_extent);
 225     
 226 
 227     MPI_Type_size_x(datatype,&btype_size);
 228     MPI_Type_get_extent(datatype,&btype_lb,&btype_extent);
 229     ADIOI_Datatype_iscontig(datatype,&buf_contig);
 230     
 231     if ( ( btype_extent!=btype_size ) || ( ! buf_contig ) )
 232         {
 233             FPRINTF(stderr,"[%d/%d] %s called with discontigous memory buffer\n",
 234                     myrank,nprocs,myname);
 235             fflush(stderr);
 236             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 237                             MPIR_ERR_RECOVERABLE, myname, __LINE__, 
 238                             MPI_ERR_IO, "**io", 0 );
 239             return;
 240         }
 241     
 242 
 243     
 244     flat_file = ADIOI_Flatten_and_find(fd->filetype);
 245 
 246     
 247     start=(globus_off_t)(offset*etype_size);
 248     goff=start;
 249     boff=0;
 250     extent=0;
 251     nblks=0;
 252     while ( boff < (count*btype_size) )
 253         {
 254             int blklen=0;
 255 
 256             for (i=0;i<flat_file->count;i++)
 257                 {
 258                     
 259                     if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
 260                         blklen=flat_file->blocklens[i];
 261                     else
 262                         blklen=(count*btype_size)-boff;
 263                     
 264                     boff+=blklen;
 265                     
 266 
 267 
 268                     extent=MAX(extent,nblks*ftype_extent+flat_file->indices[i]+blklen);
 269                     if ( boff>=(count*btype_size) )
 270                         break;
 271                 }
 272             nblks++;
 273         }
 274     if ( extent < count*btype_size )
 275         {
 276             FPRINTF(stderr,"[%d/%d] %s error in computing extent -- extent %d is smaller than total bytes requested %d!\n",
 277                     myrank,nprocs,myname,extent,count*btype_size);
 278             fflush(stderr);
 279             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 280                             MPIR_ERR_RECOVERABLE, myname, __LINE__, 
 281                             MPI_ERR_IO, "**io", 0);
 282             return;
 283         }
 284     end=start+(globus_off_t)extent;
 285     tmp=(globus_byte_t *)ADIOI_Malloc((size_t)extent*sizeof(globus_byte_t));
 286 
 287     
 288     globus_mutex_init(&readdiscontig_ctl_lock, GLOBUS_NULL);
 289     globus_cond_init(&readdiscontig_ctl_cond, GLOBUS_NULL);
 290     readdiscontig_ctl_done=GLOBUS_FALSE;
 291     if ( (result=globus_ftp_client_partial_get(&(gridftp_fh[fd->fd_sys]),
 292                                                fd->filename,
 293                                                &(oattr[fd->fd_sys]),
 294                                                GLOBUS_NULL,
 295                                                start,
 296                                                end,
 297                                                readdiscontig_ctl_cb,
 298                                                GLOBUS_NULL))!=GLOBUS_SUCCESS )
 299         {
 300             globus_err_handler("globus_ftp_client_partial_get",myname,result);
 301             *error_code = MPIO_Err_create_code(MPI_SUCCESS, 
 302                             MPIR_ERR_RECOVERABLE, myname, __LINE__, 
 303                             MPI_ERR_IO, "**io", "**io %s", 
 304                             globus_object_printable_to_string(globus_error_get(result)));
 305             return;
 306         }
 307 
 308     
 309     
 310 
 311 
 312 
 313 
 314 
 315 
 316     if ( (result=globus_ftp_client_register_read(&(gridftp_fh[fd->fd_sys]),
 317                                                  tmp,
 318                                                  (globus_size_t)extent,
 319                                                  readdiscontig_data_cb,
 320                                                  (void *)(&bytes_read)))!=GLOBUS_SUCCESS )
 321         {
 322             globus_err_handler("globus_ftp_client_register_read",myname,result);
 323             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 324                     myname, __LINE__, MPI_ERR_IO,
 325                     "**io",
 326                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 327             return;
 328         }
 329     
 330 
 331     globus_mutex_lock(&readdiscontig_ctl_lock);
 332     while ( readdiscontig_ctl_done!=GLOBUS_TRUE )
 333         globus_cond_wait(&readdiscontig_ctl_cond,&readdiscontig_ctl_lock);
 334     globus_mutex_unlock(&readdiscontig_ctl_lock);
 335 
 336     globus_mutex_destroy(&readdiscontig_ctl_lock);
 337     globus_cond_destroy(&readdiscontig_ctl_cond);
 338 
 339     boff=0;
 340     nblks=0;
 341     goff=0;
 342     while ( boff < (count*btype_size) )
 343         {
 344             int i,blklen;
 345 
 346             for (i=0;i<flat_file->count;i++)
 347                 {
 348                     if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
 349                         blklen=flat_file->blocklens[i];
 350                     else
 351                         blklen=(count*btype_size)-boff;
 352                     if ( blklen > 0 )
 353                         {
 354                             goff=nblks*ftype_extent+flat_file->indices[i];
 355                             memcpy((globus_byte_t *)buf+boff,tmp+goff,(size_t)blklen);
 356                             boff+=blklen;
 357                             if ( boff>=(count*btype_size) )
 358                                 break;
 359                         }
 360                 }
 361             nblks++;
 362         }
 363     ADIOI_Free(tmp);
 364 
 365 #ifdef HAVE_STATUS_SET_BYTES
 366     MPIR_Status_set_bytes(status, datatype, bytes_read);
 367 #endif
 368     if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
 369     {
 370         fd->fp_ind += extent;
 371         fd->fp_sys_posn = fd->fp_ind;
 372     }
 373     else {
 374         fd->fp_sys_posn = offset + extent;
 375     }
 376 }
 377 
 378 void ADIOI_GRIDFTP_ReadStrided(ADIO_File fd, void *buf, int count,
 379                               MPI_Datatype datatype, int file_ptr_type,
 380                               ADIO_Offset offset, ADIO_Status *status, int
 381                               *error_code)
 382 {
 383     
 384 
 385 
 386 
 387 
 388 
 389 
 390 
 391 
 392 
 393 
 394 
 395 
 396 
 397 
 398 
 399 
 400 
 401 
 402     char myname[]="ADIOI_GRIDFTP_ReadStrided";
 403     int myrank, nprocs;
 404     int i,j;
 405     int buf_contig,file_contig;
 406     MPI_Aint btype_size,bufsize;
 407     globus_off_t start,disp;
 408     globus_size_t bytes_read;
 409     globus_byte_t *intermediate;
 410 
 411     *error_code = MPI_SUCCESS;
 412 
 413     MPI_Comm_size(fd->comm, &nprocs);
 414     MPI_Comm_rank(fd->comm, &myrank);
 415 
 416     MPI_Type_size_x(datatype,&btype_size);
 417     bufsize=count*btype_size;
 418     ADIOI_Datatype_iscontig(fd->filetype,&file_contig);
 419     ADIOI_Datatype_iscontig(datatype,&buf_contig);
 420     if ( buf_contig && !file_contig )
 421         {
 422             
 423             ADIOI_GRIDFTP_ReadDiscontig(fd, buf, count, datatype,
 424                                         file_ptr_type, offset, status, error_code);
 425         }
 426     else if ( !buf_contig && file_contig )
 427         {
 428             
 429             int posn=0;
 430 
 431             
 432             intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
 433             ADIOI_GRIDFTP_ReadContig(fd, intermediate, bufsize, MPI_BYTE,
 434                                      file_ptr_type, offset, status, error_code);
 435 
 436             
 437             MPI_Unpack(intermediate,bufsize,&posn,buf,count,datatype,fd->comm);
 438 
 439             ADIOI_Free(intermediate);
 440         }
 441     else if ( !buf_contig && !file_contig )
 442         {
 443             
 444             int posn=0;
 445 
 446             
 447             intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
 448             ADIOI_GRIDFTP_ReadDiscontig(fd, intermediate, bufsize, MPI_BYTE,
 449                                         file_ptr_type, offset, status, error_code);
 450 
 451             
 452             posn=0;
 453             MPI_Unpack(intermediate,bufsize,&posn,buf,count,datatype,fd->comm);
 454 
 455             ADIOI_Free(intermediate);
 456         }
 457     else 
 458         {
 459             
 460             ADIOI_GRIDFTP_ReadContig(fd, buf, count, datatype,
 461                                      file_ptr_type, offset, status, error_code);
 462         }
 463 
 464 }
 465