root/ompi/mca/io/romio321/romio/adio/ad_gridftp/ad_gridftp_write.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. writecontig_ctl_cb
  2. writecontig_data_cb
  3. writediscontig_ctl_cb
  4. writediscontig_data_cb
  5. ADIOI_GRIDFTP_WriteContig
  6. ADIOI_GRIDFTP_WriteDiscontig
  7. ADIOI_GRIDFTP_WriteStrided

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /* 
   3  *
   4  *   Copyright (C) 2003 University of Chicago, Ohio Supercomputer Center. 
   5  *   See COPYRIGHT notice in top-level directory.
   6  */
   7 
   8 #include "ad_gridftp.h"
   9 #include "adioi.h"
  10 #include "adio_extern.h"
  11 
  12 static globus_mutex_t writecontig_ctl_lock;
  13 static globus_cond_t writecontig_ctl_cond;
  14 static globus_bool_t writecontig_ctl_done;
  15 static void writecontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  16 {
  17     if (error)
  18         {
  19             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  20         }
  21     globus_mutex_lock(&writecontig_ctl_lock);
  22     if ( writecontig_ctl_done!=GLOBUS_TRUE )
  23         writecontig_ctl_done=GLOBUS_TRUE;
  24     globus_cond_signal(&writecontig_ctl_cond);
  25     globus_mutex_unlock(&writecontig_ctl_lock);
  26 #ifdef PRINT_ERR_MSG
  27     FPRINTF(stderr,"finished with contig write transaction\n");
  28 #endif /* PRINT_ERR_MSG */
  29     return;
  30 }
  31 
  32 static void writecontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
  33                                globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
  34                                globus_bool_t eof)
  35 {
  36    globus_size_t *bytes_written;
  37 
  38     bytes_written=(globus_size_t *)myargs;
  39     if (error)
  40         {
  41             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  42         }
  43     *bytes_written+=length;
  44     /* I don't understand why the data callback has to keep recalling register_write,
  45        but everything I've done and all the examples I've seen seem to require
  46        that behavior to work... */
  47     if ( !eof )
  48         {
  49             globus_ftp_client_register_write(handle,
  50                                              buffer,
  51                                              length,
  52                                              offset,
  53                                              GLOBUS_TRUE,
  54                                              writecontig_data_cb,
  55                                              (void *)(bytes_written));
  56         }
  57 #ifdef PRINT_ERR_MSG
  58     FPRINTF(stderr,"wrote %Ld bytes...",(long long)length);
  59 #endif /* PRINT_ERR_MSG */
  60     return;
  61 }
  62 
  63 
  64 static globus_mutex_t writediscontig_ctl_lock;
  65 static globus_cond_t writediscontig_ctl_cond;
  66 static globus_bool_t writediscontig_ctl_done;
  67 static void writediscontig_ctl_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error)
  68 {
  69     if (error)
  70         {
  71             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  72         }
  73     globus_mutex_lock(&writediscontig_ctl_lock);
  74     if ( writediscontig_ctl_done!=GLOBUS_TRUE )
  75         writediscontig_ctl_done=GLOBUS_TRUE;
  76     globus_cond_signal(&writediscontig_ctl_cond);
  77     globus_mutex_unlock(&writediscontig_ctl_lock);
  78     return;
  79 }
  80 
  81 static void writediscontig_data_cb(void *myargs, globus_ftp_client_handle_t *handle, globus_object_t *error,
  82                                globus_byte_t *buffer, globus_size_t length, globus_off_t offset,
  83                                globus_bool_t eof)
  84 {
  85    globus_size_t *bytes_written;
  86 
  87     bytes_written=(globus_size_t *)myargs;
  88     if (error)
  89         {
  90             FPRINTF(stderr, "%s\n", globus_object_printable_to_string(error));
  91         }
  92     *bytes_written+=length;
  93     /* I don't understand why the data callback has to keep recalling register_read,
  94        but everything I've done and all the examples I've seen seem to require
  95        that behavior to work... */
  96     if ( !eof )
  97         globus_ftp_client_register_write(handle,
  98                                          buffer,
  99                                          length,
 100                                          offset,
 101                                          eof,
 102                                          writediscontig_data_cb,
 103                                          (void *)(bytes_written));
 104     FPRINTF(stderr,"wrote %Ld bytes...",(long long)length); 
 105     return;
 106 }
 107 
 108 
 109 void ADIOI_GRIDFTP_WriteContig(ADIO_File fd, void *buf, int count, 
 110                              MPI_Datatype datatype, int file_ptr_type,
 111                              ADIO_Offset offset, ADIO_Status *status, int
 112                              *error_code)
 113 {
 114     char myname[]="ADIOI_GRIDFTP_WriteContig";
 115     int myrank, nprocs;
 116     MPI_Count datatype_size;
 117     globus_size_t len,bytes_written=0;
 118     globus_off_t goff;
 119     globus_result_t result;
 120 
 121     if ( fd->access_mode&ADIO_RDONLY )
 122         {
 123             *error_code=MPI_ERR_AMODE;
 124             return;
 125         }
 126 
 127     *error_code = MPI_SUCCESS;
 128 
 129     MPI_Comm_size(fd->comm, &nprocs);
 130     MPI_Comm_rank(fd->comm, &myrank);
 131     MPI_Type_size_x(datatype, &datatype_size);
 132 
 133     if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
 134     {
 135         offset = fd->fp_ind;
 136     }
 137 
 138     /* Do the gridftp I/O transfer */
 139     goff = (globus_off_t)offset;
 140     len = ((globus_size_t)datatype_size)*((globus_size_t)count);
 141 
 142     globus_mutex_init(&writecontig_ctl_lock, GLOBUS_NULL);
 143     globus_cond_init(&writecontig_ctl_cond, GLOBUS_NULL);
 144     writecontig_ctl_done=GLOBUS_FALSE;
 145     if ( (result=globus_ftp_client_partial_put(&(gridftp_fh[fd->fd_sys]),
 146                                                fd->filename,
 147                                                &(oattr[fd->fd_sys]),
 148                                                GLOBUS_NULL,
 149                                                goff,
 150                                                goff+(globus_off_t)len,
 151                                                writecontig_ctl_cb,
 152                                                GLOBUS_NULL))!=GLOBUS_SUCCESS )
 153         {
 154             globus_err_handler("globus_ftp_client_partial_put",myname,result);
 155             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 156                     myname, __LINE__, MPI_ERR_IO,
 157                     "**io",
 158                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 159             return;
 160         }
 161     if ( (result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
 162                                                   (globus_byte_t *)buf,
 163                                                   len,
 164                                                   goff,
 165                                                   GLOBUS_TRUE,
 166                                                   writecontig_data_cb,
 167                                                   (void *)(&bytes_written)))!=GLOBUS_SUCCESS )
 168         {
 169             globus_err_handler("globus_ftp_client_register_write",myname,result);
 170             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 171                     myname, __LINE__, MPI_ERR_IO,
 172                     "**io",
 173                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 174             return;
 175         }
 176 
 177 
 178     /* The ctl callback won't start till the data callbacks complete, so it's
 179        safe to wait on just the ctl callback */
 180     globus_mutex_lock(&writecontig_ctl_lock);
 181     while ( writecontig_ctl_done!=GLOBUS_TRUE )
 182         globus_cond_wait(&writecontig_ctl_cond,&writecontig_ctl_lock);
 183     globus_mutex_unlock(&writecontig_ctl_lock);
 184 
 185     globus_mutex_destroy(&writecontig_ctl_lock);
 186     globus_cond_destroy(&writecontig_ctl_cond);
 187 
 188 #ifdef HAVE_STATUS_SET_BYTES
 189     MPIR_Status_set_bytes(status, datatype, bytes_written);
 190 #endif
 191     if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
 192     {
 193         offset = fd->fp_ind;
 194         fd->fp_ind += bytes_written;
 195         fd->fp_sys_posn = fd->fp_ind;
 196     }
 197     else {
 198         fd->fp_sys_posn = offset + bytes_written;
 199     }
 200 }
 201 
 202 
 203 void ADIOI_GRIDFTP_WriteDiscontig(ADIO_File fd, void *buf, int count,
 204                                  MPI_Datatype datatype, int file_ptr_type,
 205                                  ADIO_Offset offset, ADIO_Status *status, int
 206                                  *error_code)
 207 {
 208     char myname[]="ADIOI_GRIDFTP_WriteDiscontig";
 209     int myrank,nprocs;
 210     MPI_Aint btype_size,btype_extent,btype_lb;
 211     MPI_Aint ftype_size,ftype_extent,ftype_lb;
 212     MPI_Aint etype_size;
 213     MPI_Aint extent;
 214     ADIOI_Flatlist_node *flat_file;
 215     int buf_contig,boff,i,nblks;
 216     globus_off_t start,end,goff;
 217     globus_size_t bytes_written;
 218     globus_result_t result;
 219 
 220     MPI_Comm_rank(fd->comm,&myrank);
 221     MPI_Comm_size(fd->comm,&nprocs);
 222     etype_size=fd->etype_size;
 223     MPI_Type_size_x(fd->filetype,&ftype_size);
 224     MPI_Type_get_extent(fd->filetype,&ftype_lb,&ftype_extent);
 225     /* This is arguably unnecessary, as this routine assumes that the
 226        buffer in memory is contiguous */
 227     MPI_Type_size_x(datatype,&btype_size);
 228     MPI_Type_get_extent(datatype,&btype_lb,&btype_extent);
 229     ADIOI_Datatype_iscontig(datatype,&buf_contig);
 230     
 231     if ( ( btype_extent!=btype_size ) || ( ! buf_contig ) )
 232         {
 233             FPRINTF(stderr,"[%d/%d] %s called with discontigous memory buffer\n",
 234                     myrank,nprocs,myname);
 235             fflush(stderr);
 236             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 237                     myname, __LINE__, MPI_ERR_IO,
 238                     "**io",
 239                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 240             return;
 241         }
 242     /* from here we can assume btype_extent==btype_size */
 243 
 244     /* Flatten out fd->filetype so we know which blocks to skip */
 245     flat_file = ADIOI_Flatten_and_find(fd->filetype);
 246 
 247     /* Figure out how big the area to write is */
 248     /* ASSUMPTION: ftype_size is an integer multiple of btype_size or vice versa. */
 249     start=(globus_off_t)(offset*etype_size);
 250     goff=start;
 251     boff=0;
 252     extent=0;
 253     nblks=0;
 254     while ( boff < (count*btype_size) )
 255         {
 256             int blklen;
 257 
 258             for (i=0;i<flat_file->count;i++)
 259                 {
 260                     if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
 261                         blklen=flat_file->blocklens[i];
 262                     else
 263                         blklen=(count*btype_size)-boff;
 264                     boff+=blklen;
 265                     extent=MAX(extent,nblks*ftype_extent+flat_file->indices[i]+blklen);
 266                     if ( boff>=(count*btype_size) )
 267                         break;
 268                 }
 269             nblks++;
 270         }
 271     if ( extent < count*btype_size )
 272         {
 273             FPRINTF(stderr,"[%d/%d] %s error in computing extent -- extent %d is smaller than total bytes requested %d!\n",
 274                     myrank,nprocs,myname,extent,count*btype_size);
 275             fflush(stderr);
 276             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 277                     myname, __LINE__, MPI_ERR_IO,
 278                     "**io",
 279                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 280             return;
 281         }
 282     end=start+(globus_off_t)extent;
 283     FPRINTF(stderr,"[%d/%d] %s writing %d bytes into extent of %d bytes starting at offset %Ld\n",
 284             myrank,nprocs,myname,count*btype_size,extent,(long long)start);
 285     fflush(stderr);
 286 
 287     /* start up the globus partial write */
 288     globus_mutex_init(&writediscontig_ctl_lock, GLOBUS_NULL);
 289     globus_cond_init(&writediscontig_ctl_cond, GLOBUS_NULL);
 290     writediscontig_ctl_done=GLOBUS_FALSE;
 291     if ( (result=globus_ftp_client_partial_put(&(gridftp_fh[fd->fd_sys]),
 292                                                fd->filename,
 293                                                &(oattr[fd->fd_sys]),
 294                                                GLOBUS_NULL,
 295                                                start,
 296                                                end,
 297                                                writediscontig_ctl_cb,
 298                                                GLOBUS_NULL))!=GLOBUS_SUCCESS )
 299         {
 300             globus_err_handler("globus_ftp_client_partial_get",myname,result);
 301             *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
 302                     myname, __LINE__, MPI_ERR_IO,
 303                     "**io",
 304                     "**io %s", globus_object_printable_to_string(globus_error_get(result)));
 305             return;
 306         }
 307 
 308     /* Do all the actual I/Os */
 309     boff=0;
 310     nblks=0;
 311     while ( boff < (count*btype_size) )
 312         {
 313             int i,blklen;
 314 
 315             for (i=0;i<flat_file->count;i++)
 316                 {
 317                     if ( (boff+flat_file->blocklens[i]) < (count*btype_size) )
 318                         blklen=flat_file->blocklens[i];
 319                     else
 320                         blklen=(count*btype_size)-boff;
 321                     if ( blklen > 0 )
 322                         {
 323                             goff=start+nblks*ftype_extent+((globus_off_t)flat_file->indices[i]);
 324                             /*
 325                             FPRINTF(stderr,"[%d/%d] %s writing %d bytes from boff=%d at goff=%Ld\n",myrank,nprocs,myname,blklen,boff,goff);
 326                             */
 327                             if ( (result=globus_ftp_client_register_write(&(gridftp_fh[fd->fd_sys]),
 328                                                                           ((globus_byte_t *)buf)+boff,
 329                                                                           (globus_size_t)blklen,
 330                                                                           goff,
 331                                                                           GLOBUS_TRUE,
 332                                                                           writediscontig_data_cb,
 333                                                                           (void *)(&bytes_written)))!=GLOBUS_SUCCESS )
 334                                 {
 335                                     globus_err_handler("globus_ftp_client_register_write",myname,result);
 336                                     *error_code=MPI_ERR_IO;
 337                                     ADIOI_Error(fd,*error_code,myname);
 338                                     return;
 339                                 }
 340                             boff+=blklen;
 341                             if ( boff>=(count*btype_size) )
 342                                 break;
 343                         }
 344                 }
 345             nblks++;
 346         }
 347 
 348     
 349     /* The ctl callback won't start till the data callbacks complete, so it's
 350        safe to wait on just the ctl callback */
 351     globus_mutex_lock(&writediscontig_ctl_lock);
 352     while ( writediscontig_ctl_done!=GLOBUS_TRUE )
 353         globus_cond_wait(&writediscontig_ctl_cond,&writediscontig_ctl_lock);
 354     globus_mutex_unlock(&writediscontig_ctl_lock);
 355     globus_mutex_destroy(&writediscontig_ctl_lock);
 356     globus_cond_destroy(&writediscontig_ctl_cond);
 357 
 358 #ifdef HAVE_STATUS_SET_BYTES
 359     MPIR_Status_set_bytes(status, datatype, bytes_written);
 360 #endif
 361     if (file_ptr_type != ADIO_EXPLICIT_OFFSET)
 362     {
 363         fd->fp_ind += extent;
 364         fd->fp_sys_posn = fd->fp_ind;
 365     }
 366     else {
 367         fd->fp_sys_posn = offset + extent;
 368     }
 369 }
 370 
 371 
 372 #define GRIDFTP_USE_GENERIC_STRIDED
 373 void ADIOI_GRIDFTP_WriteStrided(ADIO_File fd, void *buf, int count,
 374                                MPI_Datatype datatype, int file_ptr_type,
 375                                ADIO_Offset offset, ADIO_Status *status,
 376                                int *error_code)
 377 {
 378 #ifdef GRIDFTP_USE_GENERIC_STRIDED
 379     int myrank, nprocs;
 380 
 381     if ( fd->access_mode&ADIO_RDONLY )
 382         {
 383             *error_code=MPI_ERR_AMODE;
 384             return;
 385         }
 386 
 387     *error_code = MPI_SUCCESS;
 388 
 389     MPI_Comm_size(fd->comm, &nprocs);
 390     MPI_Comm_rank(fd->comm, &myrank);
 391 
 392     ADIOI_GEN_WriteStrided(fd, buf, count, datatype, file_ptr_type, offset, 
 393                            status, error_code);
 394     return;
 395 #else
 396     char myname[]="ADIOI_GRIDFTP_WriteStrided";
 397     int myrank, nprocs;
 398     int buf_contig,file_contig;
 399     MPI_Aint btype_size,bufsize;
 400     globus_byte_t *intermediate;
 401 
 402     *error_code = MPI_SUCCESS;
 403 
 404     MPI_Comm_size(fd->comm, &nprocs);
 405     MPI_Comm_rank(fd->comm, &myrank);
 406 
 407     MPI_Type_size_x(datatype,&btype_size);
 408     bufsize=count*btype_size;
 409     ADIOI_Datatype_iscontig(fd->filetype,&file_contig);
 410     ADIOI_Datatype_iscontig(datatype,&buf_contig);
 411     if ( buf_contig && !file_contig )
 412         {
 413             /* Contiguous in memory, discontig in file */
 414             FPRINTF(stderr,"[%d/%d] %s called w/ contig mem, discontig file\n",
 415                     myrank,nprocs,myname);
 416             fflush(stderr);
 417 
 418             ADIOI_GRIDFTP_WriteDiscontig(fd, buf, count, datatype,
 419                                         file_ptr_type, offset, status, error_code);
 420         }
 421     else if ( !buf_contig && file_contig )
 422         {
 423             /* Discontiguous in mem, contig in file -- comparatively easy */
 424             int posn=0;
 425 
 426             FPRINTF(stderr,"[%d/%d] %s called w/ discontig mem, contig file\n",
 427                     myrank,nprocs,myname);
 428             fflush(stderr);
 429 
 430 
 431             /* squeeze contents of main buffer into intermediate buffer*/
 432             intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
 433             MPI_Pack(buf,count,datatype,intermediate,bufsize,&posn,fd->comm);
 434 
 435             /* write contiguous data from intermediate buffer */
 436             ADIOI_GRIDFTP_WriteContig(fd, intermediate, bufsize, MPI_BYTE,
 437                                      file_ptr_type, offset, status, error_code);
 438 
 439             ADIOI_Free(intermediate);
 440         }
 441     else if ( !buf_contig && !file_contig )
 442         {
 443             /* Discontig in both mem and file -- the hardest case */
 444             int posn=0;
 445 
 446             FPRINTF(stderr,"[%d/%d] %s called w/ discontig mem, discontig file\n",
 447                     myrank,nprocs,myname);
 448             fflush(stderr);
 449 
 450             /* squeeze contents of main buffer into intermediate buffer*/
 451             intermediate=(globus_byte_t *)ADIOI_Malloc((size_t)bufsize);
 452             MPI_Pack(buf,count,datatype,intermediate,bufsize,&posn,fd->comm);
 453 
 454             /* write contiguous data from intermediate buffer */
 455             ADIOI_GRIDFTP_WriteDiscontig(fd, intermediate, bufsize, MPI_BYTE,
 456                                      file_ptr_type, offset, status, error_code);
 457 
 458             ADIOI_Free(intermediate);
 459         }
 460     else 
 461         {
 462             /* Why did you bother calling WriteStrided?!?!?! */
 463             FPRINTF(stderr,"[%d/%d] Why the heck did you call %s with contiguous buffer *and* file types?\n",
 464                     myrank,nprocs,myname);
 465             ADIOI_GRIDFTP_WriteContig(fd, buf, count, datatype,
 466                                       file_ptr_type, offset, status, error_code);
 467         }
 468 #endif /* ! GRIDFTP_USE_GENERIC_STRIDED */
 469 }
 470 

/* [<][>][^][v][top][bottom][index][help] */