root/ompi/mca/io/romio321/romio/adio/common/ad_write_coll.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ADIOI_GEN_WriteStridedColl
  2. ADIOI_Exch_and_write
  3. ADIOI_W_Exchange_data
  4. ADIOI_Fill_send_buffer
  5. ADIOI_Heap_merge

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /* 
   3  *
   4  *   Copyright (C) 1997 University of Chicago. 
   5  *   See COPYRIGHT notice in top-level directory.
   6  */
   7 
   8 #include "adio.h"
   9 #include "adio_extern.h"
  10 
  11 #ifdef AGGREGATION_PROFILE
  12 #include "mpe.h"
  13 #endif
  14 
  15 /* prototypes of functions used for collective writes only. */
  16 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
  17                          datatype, int nprocs, int myrank,
  18                          ADIOI_Access
  19                          *others_req, ADIO_Offset *offset_list,
  20                          ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
  21                          min_st_offset, ADIO_Offset fd_size,
  22                          ADIO_Offset *fd_start, ADIO_Offset *fd_end,
  23                          int *buf_idx, int *error_code);
  24 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
  25                          ADIOI_Flatlist_node *flat_buf, ADIO_Offset 
  26                          *offset_list, ADIO_Offset *len_list, int *send_size, 
  27                          int *recv_size, ADIO_Offset off, int size,
  28                          int *count, int *start_pos, int *partial_recv, 
  29                          int *sent_to_proc, int nprocs, 
  30                          int myrank, int
  31                          buftype_is_contig, int contig_access_count,
  32                          ADIO_Offset min_st_offset, ADIO_Offset fd_size,
  33                          ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
  34                          ADIOI_Access *others_req, 
  35                          int *send_buf_idx, int *curr_to_proc,
  36                          int *done_to_proc, int *hole, int iter, 
  37                          MPI_Aint buftype_extent, int *buf_idx, int *error_code);
  38 void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
  39                            *flat_buf, char **send_buf, ADIO_Offset 
  40                            *offset_list, ADIO_Offset *len_list, int *send_size, 
  41                            MPI_Request *requests, int *sent_to_proc, 
  42                            int nprocs, int myrank, 
  43                            int contig_access_count, ADIO_Offset
  44                            min_st_offset, ADIO_Offset fd_size,
  45                            ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
  46                            int *send_buf_idx, int *curr_to_proc, 
  47                            int *done_to_proc, int iter, 
  48                            MPI_Aint buftype_extent);
  49 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
  50                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
  51                       int nprocs, int nprocs_recv, int total_elements);
  52 
  53 
  54 void ADIOI_GEN_WriteStridedColl(ADIO_File fd, const void *buf, int count,
  55                        MPI_Datatype datatype, int file_ptr_type,
  56                        ADIO_Offset offset, ADIO_Status *status, int
  57                        *error_code)
  58 {
  59 /* Uses a generalized version of the extended two-phase method described
  60    in "An Extended Two-Phase Method for Accessing Sections of 
  61    Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
  62    Scientific Programming, (5)4:301--317, Winter 1996. 
  63    http://www.mcs.anl.gov/home/thakur/ext2ph.ps */
  64 
  65     ADIOI_Access *my_req; 
  66     /* array of nprocs access structures, one for each other process in
  67        whose file domain this process's request lies */
  68     
  69     ADIOI_Access *others_req;
  70     /* array of nprocs access structures, one for each other process
  71        whose request lies in this process's file domain. */
  72 
  73     int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
  74     int contig_access_count=0, interleave_count = 0, buftype_is_contig;
  75     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
  76     ADIO_Offset orig_fp, start_offset, end_offset, fd_size, min_st_offset, off;
  77     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
  78         *fd_end = NULL, *end_offsets = NULL;
  79     int *buf_idx = NULL;
  80     ADIO_Offset *len_list = NULL;
  81     int old_error, tmp_error;
  82 
  83     if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) { 
  84         /* Cast away const'ness as the below function is used for read
  85          * and write */
  86         ADIOI_IOStridedColl (fd, (char *) buf, count, ADIOI_WRITE, datatype,
  87                         file_ptr_type, offset, status, error_code);
  88         return;
  89     }
  90 
  91     MPI_Comm_size(fd->comm, &nprocs);
  92     MPI_Comm_rank(fd->comm, &myrank);
  93 
  94 /* the number of processes that actually perform I/O, nprocs_for_coll,
  95  * is stored in the hints off the ADIO_File structure
  96  */
  97     nprocs_for_coll = fd->hints->cb_nodes;
  98     orig_fp = fd->fp_ind;
  99 
 100     /* only check for interleaving if cb_write isn't disabled */
 101     if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
 102         /* For this process's request, calculate the list of offsets and
 103            lengths in the file and determine the start and end offsets. */
 104 
 105         /* Note: end_offset points to the last byte-offset that will be accessed.
 106            e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/
 107 
 108         ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
 109                               &offset_list, &len_list, &start_offset,
 110                               &end_offset, &contig_access_count); 
 111 
 112         /* each process communicates its start and end offsets to other 
 113            processes. The result is an array each of start and end offsets stored
 114            in order of process rank. */ 
 115     
 116         st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 117         end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 118 
 119         MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
 120                       ADIO_OFFSET, fd->comm);
 121         MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
 122                       ADIO_OFFSET, fd->comm);
 123 
 124         /* are the accesses of different processes interleaved? */
 125         for (i=1; i<nprocs; i++)
 126             if ((st_offsets[i] < end_offsets[i-1]) && 
 127                 (st_offsets[i] <= end_offsets[i]))
 128                 interleave_count++;
 129         /* This is a rudimentary check for interleaving, but should suffice
 130            for the moment. */
 131     }
 132 
 133     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 134 
 135     if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
 136         (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
 137     {
 138         /* use independent accesses */
 139         if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
 140             ADIOI_Free(offset_list);
 141             ADIOI_Free(len_list);
 142             ADIOI_Free(st_offsets);
 143             ADIOI_Free(end_offsets);
 144         }
 145 
 146         fd->fp_ind = orig_fp;
 147         ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
 148 
 149         if (buftype_is_contig && filetype_is_contig) {
 150             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
 151                 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
 152                 ADIO_WriteContig(fd, buf, count, datatype,
 153                                  ADIO_EXPLICIT_OFFSET,
 154                                  off, status, error_code);
 155             }
 156             else ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 157                                   0, status, error_code);
 158         }
 159         else ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
 160                                offset, status, error_code);
 161 
 162         return;
 163     }
 164 
 165 /* Divide the I/O workload among "nprocs_for_coll" processes. This is
 166    done by (logically) dividing the file into file domains (FDs); each
 167    process may directly access only its own file domain. */
 168 
 169     ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
 170                             nprocs_for_coll, &min_st_offset,
 171                             &fd_start, &fd_end, 
 172                             fd->hints->min_fdomain_size, &fd_size,
 173                             fd->hints->striping_unit);   
 174 
 175 
 176 /* calculate what portions of the access requests of this process are
 177    located in what file domains */
 178 
 179     ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
 180                       min_st_offset, fd_start, fd_end, fd_size,
 181                       nprocs, &count_my_req_procs, 
 182                       &count_my_req_per_proc, &my_req,
 183                       &buf_idx); 
 184 
 185 /* based on everyone's my_req, calculate what requests of other
 186    processes lie in this process's file domain.
 187    count_others_req_procs = number of processes whose requests lie in
 188    this process's file domain (including this process itself) 
 189    count_others_req_per_proc[i] indicates how many separate contiguous
 190    requests of proc. i lie in this process's file domain. */
 191 
 192     ADIOI_Calc_others_req(fd, count_my_req_procs, 
 193                           count_my_req_per_proc, my_req, 
 194                           nprocs, myrank,
 195                           &count_others_req_procs, &others_req); 
 196     
 197     ADIOI_Free(count_my_req_per_proc);
 198     for (i=0; i < nprocs; i++) {
 199         if (my_req[i].count) {
 200             ADIOI_Free(my_req[i].offsets);
 201             ADIOI_Free(my_req[i].lens);
 202         }
 203     }
 204     ADIOI_Free(my_req);
 205 
 206 /* exchange data and write in sizes of no more than coll_bufsize. */
 207     /* Cast away const'ness for the below function */
 208     ADIOI_Exch_and_write(fd, (char *) buf, datatype, nprocs, myrank,
 209                         others_req, offset_list,
 210                         len_list, contig_access_count, min_st_offset,
 211                         fd_size, fd_start, fd_end, buf_idx, error_code);
 212 
 213     /* If this collective write is followed by an independent write,
 214      * it's possible to have those subsequent writes on other processes
 215      * race ahead and sneak in before the read-modify-write completes.
 216      * We carry out a collective communication at the end here so no one
 217      * can start independent i/o before collective I/O completes. 
 218      *
 219      * need to do some gymnastics with the error codes so that if something
 220      * went wrong, all processes report error, but if a process has a more
 221      * specific error code, we can still have that process report the
 222      * additional information */
 223 
 224     old_error = *error_code;
 225     if (*error_code != MPI_SUCCESS) *error_code = MPI_ERR_IO;
 226 
 227      /* optimization: if only one process performing i/o, we can perform
 228      * a less-expensive Bcast  */
 229 #ifdef ADIOI_MPE_LOGGING
 230     MPE_Log_event( ADIOI_MPE_postwrite_a, 0, NULL );
 231 #endif
 232     if (fd->hints->cb_nodes == 1) 
 233             MPI_Bcast(error_code, 1, MPI_INT, 
 234                             fd->hints->ranklist[0], fd->comm);
 235     else {
 236             tmp_error = *error_code;
 237             MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT, 
 238                             MPI_MAX, fd->comm);
 239     }
 240 #ifdef ADIOI_MPE_LOGGING
 241     MPE_Log_event( ADIOI_MPE_postwrite_b, 0, NULL );
 242 #endif
 243 #ifdef AGGREGATION_PROFILE
 244         MPE_Log_event (5012, 0, NULL);
 245 #endif
 246 
 247     if ( (old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO) )
 248             *error_code = old_error;
 249 
 250 
 251     if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
 252 
 253 /* free all memory allocated for collective I/O */
 254 
 255     for (i=0; i<nprocs; i++) {
 256         if (others_req[i].count) {
 257             ADIOI_Free(others_req[i].offsets);
 258             ADIOI_Free(others_req[i].lens);
 259             ADIOI_Free(others_req[i].mem_ptrs);
 260         }
 261     }
 262     ADIOI_Free(others_req);
 263 
 264     ADIOI_Free(buf_idx);
 265     ADIOI_Free(offset_list);
 266     ADIOI_Free(len_list);
 267     ADIOI_Free(st_offsets);
 268     ADIOI_Free(end_offsets);
 269     ADIOI_Free(fd_start);
 270     ADIOI_Free(fd_end);
 271 
 272 #ifdef HAVE_STATUS_SET_BYTES
 273     if (status) {
 274       MPI_Count bufsize, size;
 275       /* Don't set status if it isn't needed */
 276       MPI_Type_size_x(datatype, &size);
 277       bufsize = size * count;
 278       MPIR_Status_set_bytes(status, datatype, bufsize);
 279     }
 280 /* This is a temporary way of filling in status. The right way is to 
 281    keep track of how much data was actually written during collective I/O. */
 282 #endif
 283 
 284     fd->fp_sys_posn = -1;   /* set it to null. */
 285 #ifdef AGGREGATION_PROFILE
 286         MPE_Log_event (5013, 0, NULL);
 287 #endif
 288 }
 289 
 290 
 291 
 292 /* If successful, error_code is set to MPI_SUCCESS.  Otherwise an error
 293  * code is created and returned in error_code.
 294  */
 295 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
 296                                  datatype, int nprocs, 
 297                                  int myrank,
 298                                  ADIOI_Access
 299                                  *others_req, ADIO_Offset *offset_list,
 300                                  ADIO_Offset *len_list, int contig_access_count,
 301                                  ADIO_Offset min_st_offset, ADIO_Offset fd_size,
 302                                  ADIO_Offset *fd_start, ADIO_Offset *fd_end,
 303                                  int *buf_idx, int *error_code)
 304 {
 305 /* Send data to appropriate processes and write in sizes of no more
 306    than coll_bufsize.    
 307    The idea is to reduce the amount of extra memory required for
 308    collective I/O. If all data were written all at once, which is much
 309    easier, it would require temp space more than the size of user_buf,
 310    which is often unacceptable. For example, to write a distributed
 311    array to a file, where each local array is 8Mbytes, requiring
 312    at least another 8Mbytes of temp space is unacceptable. */
 313 
 314     /* Not convinced end_loc-st_loc couldn't be > int, so make these offsets*/
 315     ADIO_Offset size=0;
 316     int hole, i, j, m, ntimes, max_ntimes, buftype_is_contig;
 317     ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
 318     char *write_buf=NULL;
 319     int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
 320     int *partial_recv, *sent_to_proc, *start_pos, flag;
 321     int *send_buf_idx, *curr_to_proc, *done_to_proc;
 322     MPI_Status status;
 323     ADIOI_Flatlist_node *flat_buf=NULL;
 324     MPI_Aint buftype_extent, lb;
 325     int info_flag, coll_bufsize;
 326     char *value;
 327     static char myname[] = "ADIOI_EXCH_AND_WRITE";
 328 
 329     *error_code = MPI_SUCCESS;  /* changed below if error */
 330     /* only I/O errors are currently reported */
 331 
 332 /* calculate the number of writes of size coll_bufsize
 333    to be done by each process and the max among all processes.
 334    That gives the no. of communication phases as well. */
 335 
 336     value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
 337     ADIOI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value, 
 338                  &info_flag);
 339     coll_bufsize = atoi(value);
 340     ADIOI_Free(value);
 341 
 342 
 343     for (i=0; i < nprocs; i++) {
 344         if (others_req[i].count) {
 345             st_loc = others_req[i].offsets[0];
 346             end_loc = others_req[i].offsets[0];
 347             break;
 348         }
 349     }
 350 
 351     for (i=0; i < nprocs; i++)
 352         for (j=0; j < others_req[i].count; j++) {
 353             st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
 354             end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
 355                                        + others_req[i].lens[j] - 1));
 356         }
 357 
 358 /* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/
 359 
 360     ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
 361 
 362     if ((st_loc==-1) && (end_loc==-1)) {
 363         ntimes = 0; /* this process does no writing. */
 364     }
 365 
 366     MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX,
 367                   fd->comm); 
 368 
 369     write_buf = fd->io_buf;
 370 
 371     curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); 
 372     /* its use is explained below. calloc initializes to 0. */
 373 
 374     count = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 375     /* to store count of how many off-len pairs per proc are satisfied
 376        in an iteration. */
 377 
 378     partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 379     /* if only a portion of the last off-len pair is recd. from a process
 380        in a particular iteration, the length recd. is stored here.
 381        calloc initializes to 0. */
 382 
 383     send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 384     /* total size of data to be sent to each proc. in an iteration.
 385        Of size nprocs so that I can use MPI_Alltoall later. */
 386 
 387     recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 388     /* total size of data to be recd. from each proc. in an iteration.*/
 389 
 390     sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 391     /* amount of data sent to each proc so far. Used in
 392        ADIOI_Fill_send_buffer. initialized to 0 here. */
 393 
 394     send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 395     curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 396     done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 397     /* Above three are used in ADIOI_Fill_send_buffer*/
 398 
 399     start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 400     /* used to store the starting value of curr_offlen_ptr[i] in 
 401        this iteration */
 402 
 403     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 404     if (!buftype_is_contig) {
 405         flat_buf = ADIOI_Flatten_and_find(datatype);
 406     }
 407     MPI_Type_get_extent(datatype, &lb, &buftype_extent);
 408 
 409 
 410 /* I need to check if there are any outstanding nonblocking writes to
 411    the file, which could potentially interfere with the writes taking
 412    place in this collective write call. Since this is not likely to be
 413    common, let me do the simplest thing possible here: Each process
 414    completes all pending nonblocking operations before completing. */
 415 
 416     /*ADIOI_Complete_async(error_code);
 417     if (*error_code != MPI_SUCCESS) return;
 418     MPI_Barrier(fd->comm);
 419     */
 420 
 421     done = 0;
 422     off = st_loc;
 423 
 424     for (m=0; m < ntimes; m++) {
 425        /* go through all others_req and check which will be satisfied
 426           by the current write */
 427 
 428        /* Note that MPI guarantees that displacements in filetypes are in 
 429           monotonically nondecreasing order and that, for writes, the
 430           filetypes cannot specify overlapping regions in the file. This
 431           simplifies implementation a bit compared to reads. */
 432 
 433           /* off = start offset in the file for the data to be written in 
 434                    this iteration 
 435              size = size of data written (bytes) corresponding to off
 436              req_off = off in file for a particular contiguous request 
 437                        minus what was satisfied in previous iteration
 438              req_size = size corresponding to req_off */
 439 
 440         /* first calculate what should be communicated */
 441 
 442         for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;
 443 
 444         size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done); 
 445 
 446         for (i=0; i < nprocs; i++) {
 447             if (others_req[i].count) {
 448                 start_pos[i] = curr_offlen_ptr[i];
 449                 for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
 450                     if (partial_recv[i]) {
 451                         /* this request may have been partially
 452                            satisfied in the previous iteration. */
 453                         req_off = others_req[i].offsets[j] +
 454                             partial_recv[i]; 
 455                         req_len = others_req[i].lens[j] -
 456                             partial_recv[i];
 457                         partial_recv[i] = 0;
 458                         /* modify the off-len pair to reflect this change */
 459                         others_req[i].offsets[j] = req_off;
 460                         others_req[i].lens[j] = req_len;
 461                     }
 462                     else {
 463                         req_off = others_req[i].offsets[j];
 464                         req_len = others_req[i].lens[j];
 465                     }
 466                     if (req_off < off + size) {
 467                         count[i]++;
 468       ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIU_Upint)(write_buf+req_off-off));
 469                         MPI_Get_address(write_buf+req_off-off, 
 470                                         &(others_req[i].mem_ptrs[j]));
 471       ADIOI_Assert((off + size - req_off) == (int)(off + size - req_off));
 472                         recv_size[i] += (int)(ADIOI_MIN(off + size - req_off, 
 473                                       (unsigned)req_len));
 474 
 475                         if (off+size-req_off < (unsigned)req_len)
 476                         {
 477                             partial_recv[i] = (int) (off + size - req_off);
 478 
 479                             /* --BEGIN ERROR HANDLING-- */
 480                             if ((j+1 < others_req[i].count) && 
 481                                  (others_req[i].offsets[j+1] < off+size))
 482                             { 
 483                                 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
 484                                                                    MPIR_ERR_RECOVERABLE,
 485                                                                    myname,
 486                                                                    __LINE__,
 487                                                                    MPI_ERR_ARG,
 488                                                                    "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)", 0);
 489                                 /* allow to continue since additional
 490                                  * communication might have to occur
 491                                  */
 492                             }
 493                             /* --END ERROR HANDLING-- */
 494                             break;
 495                         }
 496                     }
 497                     else break;
 498                 }
 499                 curr_offlen_ptr[i] = j;
 500             }
 501         }
 502         
 503         ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list, 
 504                             len_list, send_size, recv_size, off, size, count, 
 505                             start_pos, partial_recv, 
 506                             sent_to_proc, nprocs, myrank, 
 507                             buftype_is_contig, contig_access_count,
 508                             min_st_offset, fd_size, fd_start, fd_end,
 509                             others_req, send_buf_idx, curr_to_proc,
 510                             done_to_proc, &hole, m, buftype_extent, buf_idx,
 511                             error_code); 
 512         if (*error_code != MPI_SUCCESS) return;
 513 
 514         flag = 0;
 515         for (i=0; i<nprocs; i++)
 516             if (count[i]) flag = 1;
 517 
 518         if (flag) {
 519       ADIOI_Assert(size == (int)size);
 520             ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE, ADIO_EXPLICIT_OFFSET, 
 521                         off, &status, error_code);
 522             if (*error_code != MPI_SUCCESS) return;
 523         }
 524 
 525         off += size;
 526         done += size;
 527     }
 528 
 529     for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
 530     for (m=ntimes; m<max_ntimes; m++) {
 531         ADIOI_Assert(size == (int)size);
 532         /* nothing to recv, but check for send. */
 533         ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list, 
 534                             len_list, send_size, recv_size, off, (int)size, count,
 535                             start_pos, partial_recv, 
 536                             sent_to_proc, nprocs, myrank, 
 537                             buftype_is_contig, contig_access_count,
 538                             min_st_offset, fd_size, fd_start, fd_end,
 539                             others_req, send_buf_idx, 
 540                             curr_to_proc, done_to_proc, &hole, m, 
 541                             buftype_extent, buf_idx, error_code); 
 542         if (*error_code != MPI_SUCCESS) return;
 543     }
 544 
 545     ADIOI_Free(curr_offlen_ptr);
 546     ADIOI_Free(count);
 547     ADIOI_Free(partial_recv);
 548     ADIOI_Free(send_size);
 549     ADIOI_Free(recv_size);
 550     ADIOI_Free(sent_to_proc);
 551     ADIOI_Free(start_pos);
 552     ADIOI_Free(send_buf_idx);
 553     ADIOI_Free(curr_to_proc);
 554     ADIOI_Free(done_to_proc);
 555 }
 556 
 557 
 558 /* Sets error_code to MPI_SUCCESS if successful, or creates an error code
 559  * in the case of error.
 560  */
 561 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
 562                                   ADIOI_Flatlist_node *flat_buf, ADIO_Offset 
 563                                   *offset_list, ADIO_Offset *len_list, int *send_size, 
 564                                   int *recv_size, ADIO_Offset off, int size,
 565                                   int *count, int *start_pos,
 566                                   int *partial_recv,
 567                                   int *sent_to_proc, int nprocs, 
 568                                   int myrank, int
 569                                   buftype_is_contig, int contig_access_count,
 570                                   ADIO_Offset min_st_offset,
 571                                   ADIO_Offset fd_size,
 572                                   ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
 573                                   ADIOI_Access *others_req, 
 574                                   int *send_buf_idx, int *curr_to_proc,
 575                                   int *done_to_proc, int *hole, int iter, 
 576                                   MPI_Aint buftype_extent, int *buf_idx,
 577                                   int *error_code)
 578 {
 579     int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;
 580     char **send_buf = NULL; 
 581     MPI_Request *requests, *send_req;
 582     MPI_Datatype *recv_types;
 583     MPI_Status *statuses, status;
 584     int *srt_len=NULL, sum;
 585     ADIO_Offset *srt_off=NULL;
 586     static char myname[] = "ADIOI_W_EXCHANGE_DATA";
 587 
 588 /* exchange recv_size info so that each process knows how much to
 589    send to whom. */
 590 
 591     MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
 592 
 593     /* create derived datatypes for recv */
 594 
 595     nprocs_recv = 0;
 596     for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;
 597 
 598     recv_types = (MPI_Datatype *)
 599         ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype)); 
 600 /* +1 to avoid a 0-size malloc */
 601 
 602     tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 603     j = 0;
 604     for (i=0; i<nprocs; i++) {
 605         if (recv_size[i]) {
 606 /* take care if the last off-len pair is a partial recv */
 607             if (partial_recv[i]) {
 608                 k = start_pos[i] + count[i] - 1;
 609                 tmp_len[i] = others_req[i].lens[k];
 610                 others_req[i].lens[k] = partial_recv[i];
 611             }
 612             ADIOI_Type_create_hindexed_x(count[i],
 613                      &(others_req[i].lens[start_pos[i]]),
 614                      &(others_req[i].mem_ptrs[start_pos[i]]), 
 615                          MPI_BYTE, recv_types+j);
 616             /* absolute displacements; use MPI_BOTTOM in recv */
 617             MPI_Type_commit(recv_types+j);
 618             j++;
 619         }
 620     }
 621 
 622     /* To avoid a read-modify-write, check if there are holes in the 
 623        data to be written. For this, merge the (sorted) offset lists
 624        others_req using a heap-merge. */
 625 
 626     sum = 0;
 627     for (i=0; i<nprocs; i++) sum += count[i];
 628     /* valgrind-detcted optimization: if there is no work on this process we do
 629      * not need to search for holes */
 630     if (sum) {
 631         srt_off = (ADIO_Offset *) ADIOI_Malloc(sum*sizeof(ADIO_Offset));
 632         srt_len = (int *) ADIOI_Malloc(sum*sizeof(int));
 633 
 634         ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
 635                          nprocs, nprocs_recv, sum);
 636     }
 637 
 638 /* for partial recvs, restore original lengths */
 639     for (i=0; i<nprocs; i++) 
 640         if (partial_recv[i]) {
 641             k = start_pos[i] + count[i] - 1;
 642             others_req[i].lens[k] = tmp_len[i];
 643         }
 644     ADIOI_Free(tmp_len);
 645 
 646     /* check if there are any holes. If yes, must do read-modify-write.
 647      * holes can be in three places.  'middle' is what you'd expect: the
 648      * processes are operating on noncontigous data.  But holes can also show
 649      * up at the beginning or end of the file domain (see John Bent ROMIO REQ
 650      * #835). Missing these holes would result in us writing more data than
 651      * recieved by everyone else. */
 652 
 653     *hole = 0;
 654     if (sum) {
 655         if (off != srt_off[0]) /* hole at the front */
 656             *hole = 1;
 657         else { /* coalesce the sorted offset-length pairs */
 658             for (i=1; i<sum; i++) {
 659                 if (srt_off[i] <= srt_off[0] + srt_len[0]) {
 660                     /* ok to cast: operating on cb_buffer_size chunks */
 661                     int new_len = (int)srt_off[i] + srt_len[i] - (int)srt_off[0];
 662                     if (new_len > srt_len[0]) srt_len[0] = new_len;
 663                 }
 664                 else
 665                         break;
 666             }
 667             if (i < sum || size != srt_len[0]) /* hole in middle or end */
 668                 *hole = 1;
 669         }
 670 
 671         ADIOI_Free(srt_off);
 672         ADIOI_Free(srt_len);
 673     }
 674 
 675     if (nprocs_recv) {
 676         if (*hole) {
 677             ADIO_ReadContig(fd, write_buf, size, MPI_BYTE, 
 678                             ADIO_EXPLICIT_OFFSET, off, &status, &err);
 679             /* --BEGIN ERROR HANDLING-- */
 680             if (err != MPI_SUCCESS) {
 681                 *error_code = MPIO_Err_create_code(err,
 682                                                    MPIR_ERR_RECOVERABLE, myname,
 683                                                    __LINE__, MPI_ERR_IO,
 684                                                    "**ioRMWrdwr", 0);
 685                 return;
 686             } 
 687             /* --END ERROR HANDLING-- */
 688         }
 689     }
 690 
 691     nprocs_send = 0;
 692     for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
 693 
 694     if (fd->atomicity) {
 695         /* bug fix from Wei-keng Liao and Kenin Coloma */
 696         requests = (MPI_Request *)
 697             ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request)); 
 698         send_req = requests;
 699     }
 700     else {
 701         requests = (MPI_Request *)      
 702             ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request)); 
 703         /* +1 to avoid a 0-size malloc */
 704 
 705         /* post receives */
 706         j = 0;
 707         for (i=0; i<nprocs; i++) {
 708             if (recv_size[i]) {
 709                 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
 710                           fd->comm, requests+j);
 711                 j++;
 712             }
 713         }
 714         send_req = requests + nprocs_recv;
 715     }
 716 
 717 /* post sends. if buftype_is_contig, data can be directly sent from
 718    user buf at location given by buf_idx. else use send_buf. */
 719 
 720 #ifdef AGGREGATION_PROFILE
 721     MPE_Log_event (5032, 0, NULL);
 722 #endif
 723     if (buftype_is_contig) {
 724         j = 0;
 725         for (i=0; i < nprocs; i++) 
 726             if (send_size[i]) {
 727                 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i], 
 728                             MPI_BYTE, i,  myrank+i+100*iter, fd->comm, 
 729                                   send_req+j);
 730                 j++;
 731                 buf_idx[i] += send_size[i];
 732             }
 733     }
 734     else if (nprocs_send) {
 735         /* buftype is not contig */
 736         send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*));
 737         for (i=0; i < nprocs; i++) 
 738             if (send_size[i]) 
 739                 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
 740 
 741         ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf,
 742                            offset_list, len_list, send_size, 
 743                            send_req,
 744                            sent_to_proc, nprocs, myrank, 
 745                            contig_access_count,
 746                            min_st_offset, fd_size, fd_start, fd_end, 
 747                            send_buf_idx, curr_to_proc, done_to_proc, iter,
 748                            buftype_extent);
 749         /* the send is done in ADIOI_Fill_send_buffer */
 750     }
 751 
 752     if (fd->atomicity) {
 753         /* bug fix from Wei-keng Liao and Kenin Coloma */
 754         j = 0;
 755         for (i=0; i<nprocs; i++) {
 756             MPI_Status wkl_status;
 757             if (recv_size[i]) {
 758                 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
 759                           fd->comm, &wkl_status);
 760                 j++;
 761             }
 762         }
 763     }
 764 
 765     for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
 766     ADIOI_Free(recv_types);
 767     
 768     if (fd->atomicity) {
 769         /* bug fix from Wei-keng Liao and Kenin Coloma */
 770         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+1) * \
 771                                          sizeof(MPI_Status)); 
 772          /* +1 to avoid a 0-size malloc */
 773     }
 774     else {
 775         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
 776                                      sizeof(MPI_Status)); 
 777         /* +1 to avoid a 0-size malloc */
 778     }
 779 
 780 #ifdef NEEDS_MPI_TEST
 781     i = 0;
 782     if (fd->atomicity) {
 783         /* bug fix from Wei-keng Liao and Kenin Coloma */
 784         while (!i) MPI_Testall(nprocs_send, send_req, &i, statuses);
 785     }
 786     else {
 787         while (!i) MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);
 788     }
 789 #else
 790     if (fd->atomicity)
 791         /* bug fix from Wei-keng Liao and Kenin Coloma */
 792         MPI_Waitall(nprocs_send, send_req, statuses);
 793     else
 794         MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
 795 #endif
 796 
 797 #ifdef AGGREGATION_PROFILE
 798     MPE_Log_event (5033, 0, NULL);
 799 #endif
 800     ADIOI_Free(statuses);
 801     ADIOI_Free(requests);
 802     if (!buftype_is_contig && nprocs_send) {
 803         for (i=0; i < nprocs; i++) 
 804             if (send_size[i]) ADIOI_Free(send_buf[i]);
 805         ADIOI_Free(send_buf);
 806     }
 807 }
 808 
 809 #define ADIOI_BUF_INCR \
 810 { \
 811     while (buf_incr) { \
 812         size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
 813         user_buf_idx += size_in_buf; \
 814         flat_buf_sz -= size_in_buf; \
 815         if (!flat_buf_sz) { \
 816             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
 817             else { \
 818                 flat_buf_idx = 0; \
 819                 n_buftypes++; \
 820             } \
 821             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
 822                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
 823             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
 824         } \
 825         buf_incr -= size_in_buf; \
 826     } \
 827 }
 828 
 829 
 830 #define ADIOI_BUF_COPY \
 831 { \
 832     while (size) { \
 833         size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
 834   ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)((MPIU_Upint)buf + user_buf_idx)); \
 835   ADIOI_Assert((ADIO_Size) size_in_buf == (size_t)size_in_buf);          \
 836         memcpy(&(send_buf[p][send_buf_idx[p]]), \
 837                ((char *) buf) + user_buf_idx, size_in_buf); \
 838         send_buf_idx[p] += size_in_buf; \
 839         user_buf_idx += size_in_buf; \
 840         flat_buf_sz -= size_in_buf; \
 841         if (!flat_buf_sz) { \
 842             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
 843             else { \
 844                 flat_buf_idx = 0; \
 845                 n_buftypes++; \
 846             } \
 847             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
 848                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
 849             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
 850         } \
 851         size -= size_in_buf; \
 852         buf_incr -= size_in_buf; \
 853     } \
 854     ADIOI_BUF_INCR \
 855 }
 856 
 857 
 858 
 859 
 860 
 861 void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
 862                            *flat_buf, char **send_buf, ADIO_Offset 
 863                            *offset_list, ADIO_Offset *len_list, int *send_size, 
 864                            MPI_Request *requests, int *sent_to_proc, 
 865                            int nprocs, int myrank, 
 866                            int contig_access_count, 
 867                            ADIO_Offset min_st_offset, ADIO_Offset fd_size,
 868                            ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
 869                            int *send_buf_idx, int *curr_to_proc, 
 870                            int *done_to_proc, int iter,
 871                            MPI_Aint buftype_extent)
 872 {
 873 /* this function is only called if buftype is not contig */
 874 
 875     int i, p, flat_buf_idx;
 876     ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
 877     int jj, n_buftypes;
 878     ADIO_Offset off, len, rem_len, user_buf_idx;
 879 
 880 /*  curr_to_proc[p] = amount of data sent to proc. p that has already
 881     been accounted for so far
 882     done_to_proc[p] = amount of data already sent to proc. p in 
 883     previous iterations
 884     user_buf_idx = current location in user buffer 
 885     send_buf_idx[p] = current location in send_buf of proc. p  */
 886 
 887     for (i=0; i < nprocs; i++) {
 888         send_buf_idx[i] = curr_to_proc[i] = 0;
 889         done_to_proc[i] = sent_to_proc[i];
 890     }
 891     jj = 0;
 892 
 893     user_buf_idx = flat_buf->indices[0];
 894     flat_buf_idx = 0;
 895     n_buftypes = 0;
 896     flat_buf_sz = flat_buf->blocklens[0];
 897 
 898     /* flat_buf_idx = current index into flattened buftype
 899        flat_buf_sz = size of current contiguous component in 
 900                          flattened buf */
 901 
 902     for (i=0; i<contig_access_count; i++) { 
 903         off     = offset_list[i];
 904         rem_len = len_list[i];
 905 
 906         /*this request may span the file domains of more than one process*/
 907         while (rem_len != 0) {
 908             len = rem_len;
 909             /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
 910              * longer than the single region that processor "p" is responsible
 911              * for.
 912              */
 913             p = ADIOI_Calc_aggregator(fd,
 914                                       off,
 915                                       min_st_offset,
 916                                       &len,
 917                                       fd_size,
 918                                       fd_start,
 919                                       fd_end);
 920 
 921             if (send_buf_idx[p] < send_size[p]) {
 922                 if (curr_to_proc[p]+len > done_to_proc[p]) {
 923                     if (done_to_proc[p] > curr_to_proc[p]) {
 924                         size = ADIOI_MIN(curr_to_proc[p] + len - 
 925                                 done_to_proc[p], send_size[p]-send_buf_idx[p]);
 926                         buf_incr = done_to_proc[p] - curr_to_proc[p];
 927                         ADIOI_BUF_INCR
 928       ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
 929                         buf_incr = curr_to_proc[p] + len - done_to_proc[p];
 930       ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
 931                         /* ok to cast: bounded by cb buffer size */
 932                         curr_to_proc[p] = done_to_proc[p] + (int)size;
 933                         ADIOI_BUF_COPY
 934                     }
 935                     else {
 936                         size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
 937                         buf_incr = len;
 938       ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
 939                         curr_to_proc[p] += size;
 940                         ADIOI_BUF_COPY
 941                     }
 942                     if (send_buf_idx[p] == send_size[p]) {
 943                         MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, 
 944                                 myrank+p+100*iter, fd->comm, requests+jj);
 945                         jj++;
 946                     }
 947                 }
 948                 else {
 949         ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
 950                     curr_to_proc[p] += len;
 951                     buf_incr = len;
 952                     ADIOI_BUF_INCR
 953                 }
 954             }
 955             else {
 956                 buf_incr = len;
 957                 ADIOI_BUF_INCR
 958             }
 959             off     += len;
 960             rem_len -= len;
 961         }
 962     }
 963     for (i=0; i < nprocs; i++) 
 964         if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
 965 }
 966 
 967 
 968 
 969 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
 970                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
 971                       int nprocs, int nprocs_recv, int total_elements)
 972 {
 973     typedef struct {
 974         ADIO_Offset *off_list;
 975         ADIO_Offset *len_list;
 976         int nelem;
 977     } heap_struct;
 978 
 979     heap_struct *a, tmp;
 980     int i, j, heapsize, l, r, k, smallest;
 981 
 982     a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct));
 983 
 984     j = 0;
 985     for (i=0; i<nprocs; i++)
 986         if (count[i]) {
 987             a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
 988             a[j].len_list = &(others_req[i].lens[start_pos[i]]);
 989             a[j].nelem = count[i];
 990             j++;
 991         }
 992 
 993     /* build a heap out of the first element from each list, with
 994        the smallest element of the heap at the root */
 995 
 996     heapsize = nprocs_recv;
 997     for (i=heapsize/2 - 1; i>=0; i--) {
 998         /* Heapify(a, i, heapsize); Algorithm from Cormen et al. pg. 143
 999            modified for a heap with smallest element at root. I have 
1000            removed the recursion so that there are no function calls.
1001            Function calls are too expensive. */
1002         k = i;
1003         for(;;) {
1004             l = 2*(k+1) - 1;
1005             r = 2*(k+1);
1006 
1007             if ((l < heapsize) && 
1008                 (*(a[l].off_list) < *(a[k].off_list)))
1009                 smallest = l;
1010             else smallest = k;
1011 
1012             if ((r < heapsize) && 
1013                 (*(a[r].off_list) < *(a[smallest].off_list)))
1014                 smallest = r;
1015 
1016             if (smallest != k) {
1017                 tmp.off_list = a[k].off_list;
1018                 tmp.len_list = a[k].len_list;
1019                 tmp.nelem = a[k].nelem;
1020 
1021                 a[k].off_list = a[smallest].off_list;
1022                 a[k].len_list = a[smallest].len_list;
1023                 a[k].nelem = a[smallest].nelem;
1024                 
1025                 a[smallest].off_list = tmp.off_list;
1026                 a[smallest].len_list = tmp.len_list;
1027                 a[smallest].nelem = tmp.nelem;
1028             
1029                 k = smallest;
1030             }
1031             else break;
1032         }
1033     }
1034 
1035     for (i=0; i<total_elements; i++) {
1036         /* extract smallest element from heap, i.e. the root */
1037         srt_off[i] = *(a[0].off_list);
1038         srt_len[i] = *(a[0].len_list);
1039         (a[0].nelem)--;
1040 
1041         if (!a[0].nelem) {
1042             a[0].off_list = a[heapsize-1].off_list;
1043             a[0].len_list = a[heapsize-1].len_list;
1044             a[0].nelem = a[heapsize-1].nelem;
1045             heapsize--;
1046         }
1047         else {
1048             (a[0].off_list)++;
1049             (a[0].len_list)++;
1050         }
1051 
1052         /* Heapify(a, 0, heapsize); */
1053         k = 0;
1054         for (;;) {
1055             l = 2*(k+1) - 1;
1056             r = 2*(k+1);
1057 
1058             if ((l < heapsize) && 
1059                 (*(a[l].off_list) < *(a[k].off_list)))
1060                 smallest = l;
1061             else smallest = k;
1062 
1063             if ((r < heapsize) && 
1064                 (*(a[r].off_list) < *(a[smallest].off_list)))
1065                 smallest = r;
1066 
1067             if (smallest != k) {
1068                 tmp.off_list = a[k].off_list;
1069                 tmp.len_list = a[k].len_list;
1070                 tmp.nelem = a[k].nelem;
1071 
1072                 a[k].off_list = a[smallest].off_list;
1073                 a[k].len_list = a[smallest].len_list;
1074                 a[k].nelem = a[smallest].nelem;
1075                 
1076                 a[smallest].off_list = tmp.off_list;
1077                 a[smallest].len_list = tmp.len_list;
1078                 a[smallest].nelem = tmp.nelem;
1079             
1080                 k = smallest;
1081             }
1082             else break;
1083         }
1084     }
1085     ADIOI_Free(a);
1086 }

/* [<][>][^][v][top][bottom][index][help] */