root/ompi/mca/io/romio321/romio/adio/ad_gpfs/ad_gpfs_rdcoll.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions:
  1. ADIOI_GPFS_ReadStridedColl
  2. ADIOI_Read_and_exch
  3. ADIOI_R_Exchange_data
  4. ADIOI_Fill_user_buffer
  5. ADIOI_R_Exchange_data_alltoallv

   1 /* ---------------------------------------------------------------- */
   2 /* (C)Copyright IBM Corp.  2007, 2008                               */
   3 /* ---------------------------------------------------------------- */
   4 /**
   5  * \file ad_gpfs_rdcoll.c
   6  * \brief ???
   7  */
   8 
   9 /* -*- Mode: C; c-basic-offset:4 ; -*- */
  10 /* 
  11  *
  12  *   Copyright (C) 1997 University of Chicago. 
  13  *   See COPYRIGHT notice in top-level directory.
  14  */
  15 
  16 #include "adio.h"
  17 #include "adio_extern.h"
  18 #include "ad_gpfs.h"
  19 #include "ad_gpfs_aggrs.h"
  20 
  21 #ifdef PROFILE
  22 #include "mpe.h"
  23 #endif
  24 
  25 #ifdef USE_DBG_LOGGING
  26   #define RDCOLL_DEBUG 1
  27 #endif
  28 #ifdef AGGREGATION_PROFILE
  29 #include "mpe.h"
  30 #endif
  31 
  32 /* prototypes of functions used for collective reads only. */
  33 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
  34                                 datatype, int nprocs,
  35                                 int myrank, ADIOI_Access
  36                                 *others_req, ADIO_Offset *offset_list,
  37                                 ADIO_Offset *len_list, int contig_access_count, 
  38                                 ADIO_Offset
  39                                 min_st_offset, ADIO_Offset fd_size,
  40                                 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
  41                                 int *buf_idx, int *error_code);
  42 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
  43                                   *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
  44                                   *len_list, int *send_size, int *recv_size,
  45                                   int *count, int *start_pos, 
  46                                   int *partial_send, 
  47                                   int *recd_from_proc, int nprocs, 
  48                                   int myrank, int
  49                                   buftype_is_contig, int contig_access_count,
  50                                   ADIO_Offset min_st_offset, 
  51                                   ADIO_Offset fd_size,
  52                                   ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
  53                                   ADIOI_Access *others_req, 
  54                                   int iter, 
  55                                   MPI_Aint buftype_extent, int *buf_idx);
  56 static void ADIOI_R_Exchange_data_alltoallv(ADIO_File fd, void *buf, ADIOI_Flatlist_node
  57                                   *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
  58                                   *len_list, int *send_size, int *recv_size,
  59                                   int *count, int *start_pos,
  60                                   int *partial_send,
  61                                   int *recd_from_proc, int nprocs,
  62                                   int myrank, int
  63                                   buftype_is_contig, int contig_access_count,
  64                                   ADIO_Offset min_st_offset,
  65                                   ADIO_Offset fd_size,
  66                                   ADIO_Offset *fd_start, ADIO_Offset *fd_end,
  67                                   ADIOI_Access *others_req,
  68                                   int iter,
  69                                   MPI_Aint buftype_extent, int *buf_idx);
  70 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
  71                                    *flat_buf, char **recv_buf, ADIO_Offset 
  72                                    *offset_list, ADIO_Offset *len_list, 
  73                                    unsigned *recv_size, 
  74                                    MPI_Request *requests, MPI_Status *statuses,
  75                                    int *recd_from_proc, int nprocs,
  76                                    int contig_access_count, 
  77                                    ADIO_Offset min_st_offset, 
  78                                    ADIO_Offset fd_size, ADIO_Offset *fd_start, 
  79                                    ADIO_Offset *fd_end,
  80                                    MPI_Aint buftype_extent);
  81 
  82 extern void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
  83                             datatype, int file_ptr_type, ADIO_Offset
  84                             offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
  85                             **len_list_ptr, ADIO_Offset *start_offset_ptr,
  86                             ADIO_Offset *end_offset_ptr, int
  87                            *contig_access_count_ptr);
  88 
  89 
  90 
  91 void ADIOI_GPFS_ReadStridedColl(ADIO_File fd, void *buf, int count,
  92                                MPI_Datatype datatype, int file_ptr_type,
  93                                ADIO_Offset offset, ADIO_Status *status, int
  94                                *error_code)
  95 {
  96 /* Uses a generalized version of the extended two-phase method described
  97    in "An Extended Two-Phase Method for Accessing Sections of 
  98    Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
  99    Scientific Programming, (5)4:301--317, Winter 1996. 
 100    http://www.mcs.anl.gov/home/thakur/ext2ph.ps */
 101 
 102     ADIOI_Access *my_req; 
 103     /* array of nprocs structures, one for each other process in
 104        whose file domain this process's request lies */
 105     
 106     ADIOI_Access *others_req;
 107     /* array of nprocs structures, one for each other process
 108        whose request lies in this process's file domain. */
 109 
 110     int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
 111     int contig_access_count=0, interleave_count = 0, buftype_is_contig;
 112     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
 113     ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
 114     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
 115         *fd_end = NULL, *end_offsets = NULL;
 116     ADIO_Offset *gpfs_offsets0 = NULL, *gpfs_offsets = NULL;
 117     ADIO_Offset *count_sizes;
 118     int  ii;
 119     ADIO_Offset *len_list = NULL;
 120     int *buf_idx = NULL;
 121 
 122     GPFSMPIO_T_CIO_RESET( r);
 123 
 124 #ifdef HAVE_STATUS_SET_BYTES
 125     MPI_Count bufsize, size;
 126 #endif
 127 
 128 #if 0
 129 /*   From common code - not implemented for bg. */
 130     if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
 131         ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype, 
 132                         file_ptr_type, offset, status, error_code);
 133         return;
 134     } */
 135 #endif
 136 #ifdef PROFILE
 137         MPE_Log_event(13, 0, "start computation");
 138 #endif
 139 
 140     MPI_Comm_size(fd->comm, &nprocs);
 141     MPI_Comm_rank(fd->comm, &myrank);
 142 
 143     /* number of aggregators, cb_nodes, is stored in the hints */
 144     nprocs_for_coll = fd->hints->cb_nodes;
 145     orig_fp = fd->fp_ind;
 146 
 147     GPFSMPIO_T_CIO_SET_GET( r, 1, 0, GPFSMPIO_CIO_T_MPIO_CRW, GPFSMPIO_CIO_LAST)
 148     GPFSMPIO_T_CIO_SET_GET( r, 1, 0, GPFSMPIO_CIO_T_LCOMP, GPFSMPIO_CIO_LAST )
 149 
 150     /* only check for interleaving if cb_read isn't disabled */
 151     if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
 152         /* For this process's request, calculate the list of offsets and
 153            lengths in the file and determine the start and end offsets. */
 154 
 155         /* Note: end_offset points to the last byte-offset that will be accessed.
 156            e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/
 157 
 158         ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
 159                               &offset_list, &len_list, &start_offset,
 160                               &end_offset, &contig_access_count);
 161 
 162     GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_GATHER, GPFSMPIO_CIO_T_LCOMP )
 163 
 164 #ifdef RDCOLL_DEBUG
 165     for (i=0; i<contig_access_count; i++) {
 166               DBG_FPRINTF(stderr, "rank %d  off %lld  len %lld\n", 
 167                               myrank, offset_list[i], len_list[i]);
 168     }
 169 #endif
 170 
 171         /* each process communicates its start and end offsets to other 
 172            processes. The result is an array each of start and end offsets
 173            stored in order of process rank. */ 
 174     
 175         st_offsets   = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 176         end_offsets  = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 177 
 178     ADIO_Offset my_count_size=0;
 179     /* One-sided aggregation needs the amount of data per rank as well because the difference in
 180      * starting and ending offsets for 1 byte is 0 the same as 0 bytes so it cannot be distiguished.
 181      */
 182     if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
 183         count_sizes = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 184         MPI_Count buftype_size;
 185         MPI_Type_size_x(datatype, &buftype_size);
 186         my_count_size = (ADIO_Offset) count  * (ADIO_Offset)buftype_size;
 187     }
 188     if (gpfsmpio_tunegather) {
 189       if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
 190         gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
 191         gpfs_offsets  = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
 192         for (ii=0; ii<nprocs; ii++)  {
 193           gpfs_offsets0[ii*3]   = 0;
 194           gpfs_offsets0[ii*3+1] = 0;
 195           gpfs_offsets0[ii*3+2] = 0;
 196         }
 197         gpfs_offsets0[myrank*3]   = start_offset;
 198         gpfs_offsets0[myrank*3+1] =   end_offset;
 199         gpfs_offsets0[myrank*3+2] =   my_count_size;
 200         MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*3, ADIO_OFFSET, MPI_MAX, fd->comm );
 201         for (ii=0; ii<nprocs; ii++)  {
 202           st_offsets [ii] = gpfs_offsets[ii*3]  ;
 203           end_offsets[ii] = gpfs_offsets[ii*3+1];
 204           count_sizes[ii] = gpfs_offsets[ii*3+2];
 205         }
 206       }
 207       else {
 208             gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
 209             gpfs_offsets  = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
 210             for (ii=0; ii<nprocs; ii++)  {
 211                 gpfs_offsets0[ii*2]   = 0;
 212                 gpfs_offsets0[ii*2+1] = 0;
 213             }
 214             gpfs_offsets0[myrank*2]   = start_offset;
 215             gpfs_offsets0[myrank*2+1] =   end_offset;
 216 
 217         MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*2, ADIO_OFFSET, MPI_MAX, fd->comm );
 218 
 219             for (ii=0; ii<nprocs; ii++)  {
 220                 st_offsets [ii] = gpfs_offsets[ii*2]  ;
 221                 end_offsets[ii] = gpfs_offsets[ii*2+1];
 222             }
 223       }
 224             ADIOI_Free( gpfs_offsets0 );
 225             ADIOI_Free( gpfs_offsets  );
 226     } else {
 227         MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
 228                       ADIO_OFFSET, fd->comm);
 229         MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
 230                       ADIO_OFFSET, fd->comm);
 231         if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
 232               MPI_Allgather(&count_sizes, 1, ADIO_OFFSET, count_sizes, 1,
 233                         ADIO_OFFSET, fd->comm);
 234         }
 235     }
 236 
 237     GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_PATANA, GPFSMPIO_CIO_T_GATHER )
 238 
 239         /* are the accesses of different processes interleaved? */
 240         for (i=1; i<nprocs; i++)
 241             if ((st_offsets[i] < end_offsets[i-1]) && 
 242                 (st_offsets[i] <= end_offsets[i]))
 243                 interleave_count++;
 244         /* This is a rudimentary check for interleaving, but should suffice
 245            for the moment. */
 246     }
 247 
 248     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 249 
 250     if (fd->hints->cb_read == ADIOI_HINT_DISABLE
 251         || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO))) 
 252     {
 253         /* don't do aggregation */
 254         if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
 255             ADIOI_Free(offset_list);
 256             ADIOI_Free(len_list);
 257             ADIOI_Free(st_offsets);
 258             ADIOI_Free(end_offsets);
 259         }
 260 
 261         fd->fp_ind = orig_fp;
 262         ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
 263 
 264         if (buftype_is_contig && filetype_is_contig) {
 265             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
 266                 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
 267                 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
 268                        off, status, error_code);
 269             }
 270             else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 271                        0, status, error_code);
 272         }
 273         else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
 274                        offset, status, error_code);
 275 
 276         return;
 277     }
 278 
 279     GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_FD_PART, GPFSMPIO_CIO_T_PATANA )
 280 
 281     /* We're going to perform aggregation of I/O.  Here we call
 282      * ADIOI_Calc_file_domains() to determine what processes will handle I/O
 283      * to what regions.  We pass nprocs_for_coll into this function; it is
 284      * used to determine how many processes will perform I/O, which is also
 285      * the number of regions into which the range of bytes must be divided.
 286      * These regions are called "file domains", or FDs.
 287      *
 288      * When this function returns, fd_start, fd_end, fd_size, and
 289      * min_st_offset will be filled in.  fd_start holds the starting byte
 290      * location for each file domain.  fd_end holds the ending byte location.
 291      * min_st_offset holds the minimum byte location that will be accessed.
 292      *
 293      * Both fd_start[] and fd_end[] are indexed by an aggregator number; this
 294      * needs to be mapped to an actual rank in the communicator later.
 295      *
 296      */
 297     int currentNonZeroDataIndex = 0;
 298     if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
 299       /* Take out the 0-data offsets by shifting the indexes with data to the
 300        * front and keeping track of the non-zero data index for use as the
 301        * length.  By doing this we will optimally use all available aggs
 302        * and spread the actual data across them instead of having offsets
 303        * with empty data potentially dilute the file domains and create
 304        * problems for the one-sided aggregation.
 305        */
 306       for (i=0; i<nprocs; i++) {
 307         if (count_sizes[i] > 0) {
 308           st_offsets[currentNonZeroDataIndex] = st_offsets[i];
 309           end_offsets[currentNonZeroDataIndex] = end_offsets[i];
 310           currentNonZeroDataIndex++;
 311         }
 312       }
 313     }
 314     if (gpfsmpio_tuneblocking) {
 315     if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
 316     ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets, currentNonZeroDataIndex,
 317                             nprocs_for_coll, &min_st_offset,
 318                             &fd_start, &fd_end, &fd_size, fd->fs_ptr);
 319     }
 320     else {
 321     ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets, nprocs,
 322                             nprocs_for_coll, &min_st_offset,
 323                             &fd_start, &fd_end, &fd_size, fd->fs_ptr);
 324     }
 325     }
 326     else {
 327     if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
 328     ADIOI_Calc_file_domains(st_offsets, end_offsets, currentNonZeroDataIndex,
 329                             nprocs_for_coll, &min_st_offset,
 330                             &fd_start, &fd_end,
 331                             fd->hints->min_fdomain_size, &fd_size,
 332                             fd->hints->striping_unit);
 333         }
 334     else {
 335     ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
 336                             nprocs_for_coll, &min_st_offset,
 337                             &fd_start, &fd_end,
 338                             fd->hints->min_fdomain_size, &fd_size, 
 339                             fd->hints->striping_unit);
 340         }
 341     }
 342 
 343     GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_MYREQ, GPFSMPIO_CIO_T_FD_PART );
 344     if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
 345     /* If the user has specified to use a one-sided aggregation method then do that at
 346      * this point instead of the two-phase I/O.
 347      */
 348       ADIOI_OneSidedReadAggregation(fd, offset_list, len_list, contig_access_count, buf,
 349                     datatype,error_code, st_offsets, end_offsets, currentNonZeroDataIndex, fd_start, fd_end);
 350       GPFSMPIO_T_CIO_REPORT( 0, fd, myrank, nprocs)
 351       ADIOI_Free(offset_list);
 352       ADIOI_Free(len_list);
 353       ADIOI_Free(st_offsets);
 354       ADIOI_Free(end_offsets);
 355       ADIOI_Free(fd_start);
 356       ADIOI_Free(fd_end);
 357       ADIOI_Free(count_sizes);
 358           goto fn_exit;
 359     }
 360     if (gpfsmpio_p2pcontig==1) {
 361         /* For some simple yet common(?) workloads, full-on two-phase I/O is
 362          * overkill.  We can establish sub-groups of processes and their
 363          * aggregator, and then these sub-groups will carry out a simplified
 364          * two-phase over that sub-group.
 365          *
 366          * First verify that the filetype is contig and the offsets are
 367          * increasing in rank order*/
 368         int x, inOrderAndNoGaps = 1;
 369         for (x=0;x<(nprocs-1);x++) {
 370             if (end_offsets[x] != (st_offsets[x+1]-1))
 371                 inOrderAndNoGaps = 0;
 372         }
 373         if (inOrderAndNoGaps && buftype_is_contig) {
 374             /* if these conditions exist then execute the P2PContig code else
 375              * execute the original code */
 376             ADIOI_P2PContigReadAggregation(fd, buf,
 377                     error_code, st_offsets, end_offsets, fd_start, fd_end);
 378 
 379             /* NOTE: we are skipping the rest of two-phase in this path */
 380             GPFSMPIO_T_CIO_REPORT( 0, fd, myrank, nprocs)
 381 
 382             ADIOI_Free(offset_list);
 383             ADIOI_Free(len_list);
 384             ADIOI_Free(st_offsets);
 385             ADIOI_Free(end_offsets);
 386             ADIOI_Free(fd_start);
 387             ADIOI_Free(fd_end);
 388             goto fn_exit;
 389 
 390         }
 391     }
 392 
 393     /* calculate where the portions of the access requests of this process 
 394      * are located in terms of the file domains.  this could be on the same
 395      * process or on other processes.  this function fills in:
 396      * count_my_req_procs - number of processes (including this one) for which
 397      *     this process has requests in their file domain
 398      * count_my_req_per_proc - count of requests for each process, indexed
 399      *     by rank of the process
 400      * my_req[] - array of data structures describing the requests to be
 401      *     performed by each process (including self).  indexed by rank.
 402      * buf_idx[] - array of locations into which data can be directly moved;
 403      *     this is only valid for contiguous buffer case
 404      */
 405     if (gpfsmpio_tuneblocking)
 406     ADIOI_GPFS_Calc_my_req(fd, offset_list, len_list, contig_access_count,
 407                       min_st_offset, fd_start, fd_end, fd_size,
 408                       nprocs, &count_my_req_procs, 
 409                       &count_my_req_per_proc, &my_req,
 410                       &buf_idx);
 411     else
 412     ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
 413                       min_st_offset, fd_start, fd_end, fd_size,
 414                       nprocs, &count_my_req_procs, 
 415                       &count_my_req_per_proc, &my_req,
 416                       &buf_idx);
 417 
 418     GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_OTHREQ, GPFSMPIO_CIO_T_MYREQ )
 419 
 420     /* perform a collective communication in order to distribute the
 421      * data calculated above.  fills in the following:
 422      * count_others_req_procs - number of processes (including this
 423      *     one) which have requests in this process's file domain.
 424      * count_others_req_per_proc[] - number of separate contiguous
 425      *     requests from proc i lie in this process's file domain.
 426      */
 427     if (gpfsmpio_tuneblocking)
 428     ADIOI_GPFS_Calc_others_req(fd, count_my_req_procs,
 429                           count_my_req_per_proc, my_req,
 430                           nprocs, myrank, &count_others_req_procs,
 431                           &others_req);
 432 
 433     else
 434     ADIOI_Calc_others_req(fd, count_my_req_procs, 
 435                           count_my_req_per_proc, my_req, 
 436                           nprocs, myrank, &count_others_req_procs, 
 437                           &others_req); 
 438 
 439     GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_DEXCH, GPFSMPIO_CIO_T_OTHREQ )
 440 
 441     /* my_req[] and count_my_req_per_proc aren't needed at this point, so 
 442      * let's free the memory 
 443      */
 444     ADIOI_Free(count_my_req_per_proc);
 445     for (i=0; i<nprocs; i++) {
 446         if (my_req[i].count) {
 447             ADIOI_Free(my_req[i].offsets);
 448             ADIOI_Free(my_req[i].lens);
 449         }
 450     }
 451     ADIOI_Free(my_req);
 452 
 453 
 454     /* read data in sizes of no more than ADIOI_Coll_bufsize, 
 455      * communicate, and fill user buf. 
 456      */
 457     ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
 458                         others_req, offset_list,
 459                         len_list, contig_access_count, min_st_offset,
 460                         fd_size, fd_start, fd_end, buf_idx, error_code);
 461 
 462     GPFSMPIO_T_CIO_SET_GET( r, 0, 1, GPFSMPIO_CIO_LAST, GPFSMPIO_CIO_T_DEXCH )
 463     GPFSMPIO_T_CIO_SET_GET( r, 0, 1, GPFSMPIO_CIO_LAST, GPFSMPIO_CIO_T_MPIO_CRW )
 464 
 465     GPFSMPIO_T_CIO_REPORT( 0, fd, myrank, nprocs)
 466 
 467     if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
 468 
 469     /* free all memory allocated for collective I/O */
 470     for (i=0; i<nprocs; i++) {
 471         if (others_req[i].count) {
 472             ADIOI_Free(others_req[i].offsets);
 473             ADIOI_Free(others_req[i].lens);
 474             ADIOI_Free(others_req[i].mem_ptrs);
 475         }
 476     }
 477     ADIOI_Free(others_req);
 478 
 479     ADIOI_Free(buf_idx);
 480     ADIOI_Free(offset_list);
 481     ADIOI_Free(len_list);
 482     ADIOI_Free(st_offsets);
 483     ADIOI_Free(end_offsets);
 484     ADIOI_Free(fd_start);
 485     ADIOI_Free(fd_end);
 486 
 487 fn_exit:
 488 #ifdef HAVE_STATUS_SET_BYTES
 489     MPI_Type_size_x(datatype, &size);
 490     bufsize = size * count;
 491     MPIR_Status_set_bytes(status, datatype, bufsize);
 492 /* This is a temporary way of filling in status. The right way is to 
 493    keep track of how much data was actually read and placed in buf 
 494    during collective I/O. */
 495 #endif
 496 
 497     fd->fp_sys_posn = -1;   /* set it to null. */
 498 }
 499 
 500 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
 501                          datatype, int nprocs,
 502                          int myrank, ADIOI_Access
 503                          *others_req, ADIO_Offset *offset_list,
 504                          ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
 505                          min_st_offset, ADIO_Offset fd_size,
 506                          ADIO_Offset *fd_start, ADIO_Offset *fd_end,
 507                          int *buf_idx, int *error_code)
 508 {
 509 /* Read in sizes of no more than coll_bufsize, an info parameter.
 510    Send data to appropriate processes. 
 511    Place recd. data in user buf.
 512    The idea is to reduce the amount of extra memory required for
 513    collective I/O. If all data were read all at once, which is much
 514    easier, it would require temp space more than the size of user_buf,
 515    which is often unacceptable. For example, to read a distributed
 516    array from a file, where each local array is 8Mbytes, requiring
 517    at least another 8Mbytes of temp space is unacceptable. */
 518 
 519     int i, j, m, ntimes, max_ntimes, buftype_is_contig;
 520     ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
 521     char *read_buf = NULL, *tmp_buf;
 522     int *curr_offlen_ptr, *count, *send_size, *recv_size;
 523     int *partial_send, *recd_from_proc, *start_pos;
 524     /* Not convinced end_loc-st_loc couldn't be > int, so make these offsets*/
 525     ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
 526     int req_len, flag, rank;
 527     MPI_Status status;
 528     ADIOI_Flatlist_node *flat_buf=NULL;
 529     MPI_Aint buftype_extent, buftype_lb;
 530     int coll_bufsize;
 531 #ifdef RDCOLL_DEBUG
 532     int iii;
 533 #endif
 534     *error_code = MPI_SUCCESS;  /* changed below if error */
 535     /* only I/O errors are currently reported */
 536     
 537 /* calculate the number of reads of size coll_bufsize
 538    to be done by each process and the max among all processes.
 539    That gives the no. of communication phases as well.
 540    coll_bufsize is obtained from the hints object. */
 541 
 542     coll_bufsize = fd->hints->cb_buffer_size;
 543 
 544     /* grab some initial values for st_loc and end_loc */
 545     for (i=0; i < nprocs; i++) {
 546         if (others_req[i].count) {
 547             st_loc = others_req[i].offsets[0];
 548             end_loc = others_req[i].offsets[0];
 549             break;
 550         }
 551     }
 552 
 553     /* now find the real values */
 554     for (i=0; i < nprocs; i++)
 555         for (j=0; j<others_req[i].count; j++) {
 556             st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
 557             end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
 558                                           + others_req[i].lens[j] - 1));
 559         }
 560 
 561     /* calculate ntimes, the number of times this process must perform I/O
 562      * operations in order to complete all the requests it has received.
 563      * the need for multiple I/O operations comes from the restriction that
 564      * we only use coll_bufsize bytes of memory for internal buffering.
 565      */
 566     if ((st_loc==-1) && (end_loc==-1)) {
 567         /* this process does no I/O. */
 568         ntimes = 0;
 569     }
 570     else {
 571         /* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/
 572         ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
 573     }
 574 
 575     MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm); 
 576 
 577     read_buf = fd->io_buf;
 578 
 579     curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); 
 580     /* its use is explained below. calloc initializes to 0. */
 581 
 582     count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 583     /* to store count of how many off-len pairs per proc are satisfied
 584        in an iteration. */
 585 
 586     partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 587     /* if only a portion of the last off-len pair is sent to a process 
 588        in a particular iteration, the length sent is stored here.
 589        calloc initializes to 0. */
 590 
 591     send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 592     /* total size of data to be sent to each proc. in an iteration */
 593 
 594     recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 595     /* total size of data to be recd. from each proc. in an iteration.
 596        Of size nprocs so that I can use MPI_Alltoall later. */
 597 
 598     recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 599     /* amount of data recd. so far from each proc. Used in
 600        ADIOI_Fill_user_buffer. initialized to 0 here. */
 601 
 602     start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 603     /* used to store the starting value of curr_offlen_ptr[i] in 
 604        this iteration */
 605 
 606     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 607     if (!buftype_is_contig) {
 608         flat_buf = ADIOI_Flatten_and_find(datatype);
 609     }
 610     MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);
 611 
 612     done = 0;
 613     off = st_loc;
 614     for_curr_iter = for_next_iter = 0;
 615 
 616     MPI_Comm_rank(fd->comm, &rank);
 617 
 618 #ifdef PROFILE
 619         MPE_Log_event(14, 0, "end computation");
 620 #endif
 621 
 622     for (m=0; m<ntimes; m++) {
 623        /* read buf of size coll_bufsize (or less) */
 624        /* go through all others_req and check if any are satisfied
 625           by the current read */
 626 
 627        /* since MPI guarantees that displacements in filetypes are in 
 628           monotonically nondecreasing order, I can maintain a pointer
 629           (curr_offlen_ptr) to 
 630           current off-len pair for each process in others_req and scan
 631           further only from there. There is still a problem of filetypes
 632           such as:  (1, 2, 3 are not process nos. They are just numbers for
 633           three chunks of data, specified by a filetype.)
 634 
 635                    1  -------!--
 636                    2    -----!----
 637                    3       --!-----
 638 
 639           where ! indicates where the current read_size limitation cuts 
 640           through the filetype.  I resolve this by reading up to !, but
 641           filling the communication buffer only for 1. I copy the portion
 642           left over for 2 into a tmp_buf for use in the next
 643           iteration. i.e., 2 and 3 will be satisfied in the next
 644           iteration. This simplifies filling in the user's buf at the
 645           other end, as only one off-len pair with incomplete data
 646           will be sent. I also don't need to send the individual
 647           offsets and lens along with the data, as the data is being
 648           sent in a particular order. */ 
 649 
 650           /* off = start offset in the file for the data actually read in 
 651                    this iteration 
 652              size = size of data read corresponding to off
 653              real_off = off minus whatever data was retained in memory from
 654                   previous iteration for cases like 2, 3 illustrated above
 655              real_size = size plus the extra corresponding to real_off
 656              req_off = off in file for a particular contiguous request 
 657                        minus what was satisfied in previous iteration
 658              req_size = size corresponding to req_off */
 659 
 660 #ifdef PROFILE
 661         MPE_Log_event(13, 0, "start computation");
 662 #endif
 663         size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done); 
 664         real_off = off - for_curr_iter;
 665         real_size = size + for_curr_iter;
 666 
 667         for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
 668         for_next_iter = 0;
 669 
 670         for (i=0; i<nprocs; i++) {
 671 #ifdef RDCOLL_DEBUG
 672             DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count); 
 673 #endif
 674             if (others_req[i].count) {
 675                 start_pos[i] = curr_offlen_ptr[i];
 676                 for (j=curr_offlen_ptr[i]; j<others_req[i].count;
 677                      j++) {
 678                     if (partial_send[i]) {
 679                         /* this request may have been partially
 680                            satisfied in the previous iteration. */
 681                         req_off = others_req[i].offsets[j] +
 682                             partial_send[i]; 
 683                         req_len = others_req[i].lens[j] -
 684                             partial_send[i];
 685                         partial_send[i] = 0;
 686                         /* modify the off-len pair to reflect this change */
 687                         others_req[i].offsets[j] = req_off;
 688                         others_req[i].lens[j] = req_len;
 689                     }
 690                     else {
 691                         req_off = others_req[i].offsets[j];
 692                         req_len = others_req[i].lens[j];
 693                     }
 694                     if (req_off < real_off + real_size) {
 695                         count[i]++;
 696       ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIU_Upint)(read_buf+req_off-real_off));
 697                         MPI_Get_address(read_buf+req_off-real_off, 
 698                                &(others_req[i].mem_ptrs[j]));
 699       ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
 700                         send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off, 
 701                                       (ADIO_Offset)(unsigned)req_len)); 
 702 
 703                         if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
 704                             partial_send[i] = (int) (real_off + real_size - req_off);
 705                             if ((j+1 < others_req[i].count) && 
 706                                  (others_req[i].offsets[j+1] < 
 707                                      real_off+real_size)) { 
 708                                 /* this is the case illustrated in the
 709                                    figure above. */
 710                                 for_next_iter = ADIOI_MAX(for_next_iter,
 711                                           real_off + real_size - others_req[i].offsets[j+1]); 
 712                                 /* max because it must cover requests 
 713                                    from different processes */
 714                             }
 715                             break;
 716                         }
 717                     }
 718                     else break;
 719                 }
 720                 curr_offlen_ptr[i] = j;
 721             }
 722         }
 723 
 724         flag = 0;
 725         for (i=0; i<nprocs; i++)
 726             if (count[i]) flag = 1;
 727 
 728 #ifdef PROFILE
 729         MPE_Log_event(14, 0, "end computation");
 730 #endif
 731         if (flag) {
 732             char round[50];
 733             sprintf(round, "two-phase-round=%d", m);
 734             setenv("LIBIOLOG_EXTRA_INFO", round, 1);
 735       ADIOI_Assert(size == (int)size);
 736             ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
 737                             ADIO_EXPLICIT_OFFSET, off, &status, error_code);
 738 #ifdef RDCOLL_DEBUG
 739             DBG_FPRINTF(stderr, "\tread_coll: 700, data read [%lld] = ", size );
 740             for (iii=0; iii<size && iii<80; iii++) { DBGV_FPRINTF(stderr, "%3d,", *((unsigned char *)read_buf + for_curr_iter + iii) ); }
 741             DBG_FPRINTF(stderr, "\n" );
 742 #endif
 743 
 744             if (*error_code != MPI_SUCCESS) return;
 745         }
 746         
 747         for_curr_iter = for_next_iter;
 748         
 749 #ifdef PROFILE
 750         MPE_Log_event(7, 0, "start communication");
 751 #endif
 752         if (gpfsmpio_comm == 1)
 753         ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
 754                             send_size, recv_size, count, 
 755                             start_pos, partial_send, recd_from_proc, nprocs,
 756                             myrank, 
 757                             buftype_is_contig, contig_access_count,
 758                             min_st_offset, fd_size, fd_start, fd_end,
 759                             others_req,
 760                             m, buftype_extent, buf_idx);
 761         else
 762         if (gpfsmpio_comm == 0) {
 763         ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
 764                             send_size, recv_size, count,
 765                             start_pos, partial_send, recd_from_proc, nprocs,
 766                             myrank,
 767                             buftype_is_contig, contig_access_count,
 768                             min_st_offset, fd_size, fd_start, fd_end,
 769                             others_req,
 770                             m, buftype_extent, buf_idx);
 771         }
 772 
 773 
 774 #ifdef PROFILE
 775         MPE_Log_event(8, 0, "end communication");
 776 #endif
 777 
 778         if (for_next_iter) {
 779             tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
 780       ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIU_Upint)(read_buf+real_size-for_next_iter));
 781       ADIOI_Assert((for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
 782             memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
 783             ADIOI_Free(fd->io_buf);
 784             fd->io_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
 785             memcpy(fd->io_buf, tmp_buf, for_next_iter);
 786             read_buf = fd->io_buf;
 787             ADIOI_Free(tmp_buf);
 788         }
 789 
 790         off += size;
 791         done += size;
 792     }
 793 
 794     for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
 795 #ifdef PROFILE
 796         MPE_Log_event(7, 0, "start communication");
 797 #endif
 798     for (m=ntimes; m<max_ntimes; m++) 
 799 /* nothing to send, but check for recv. */
 800 
 801         if (gpfsmpio_comm == 1)
 802         ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
 803                             send_size, recv_size, count, 
 804                             start_pos, partial_send, recd_from_proc, nprocs,
 805                             myrank, 
 806                             buftype_is_contig, contig_access_count,
 807                             min_st_offset, fd_size, fd_start, fd_end,
 808                             others_req, m,
 809                             buftype_extent, buf_idx); 
 810         else    /* strncmp( env_switch, "alltoall", 8 ) == 0 */
 811         if (gpfsmpio_comm == 0)
 812         ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
 813                             send_size, recv_size, count, 
 814                             start_pos, partial_send, recd_from_proc, nprocs,
 815                             myrank, 
 816                             buftype_is_contig, contig_access_count,
 817                             min_st_offset, fd_size, fd_start, fd_end,
 818                             others_req, 
 819                             m, buftype_extent, buf_idx);
 820 
 821 #ifdef PROFILE
 822         MPE_Log_event(8, 0, "end communication");
 823 #endif
 824 
 825     ADIOI_Free(curr_offlen_ptr);
 826     ADIOI_Free(count);
 827     ADIOI_Free(partial_send);
 828     ADIOI_Free(send_size);
 829     ADIOI_Free(recv_size);
 830     ADIOI_Free(recd_from_proc);
 831     ADIOI_Free(start_pos);
 832 
 833     unsetenv("LIBIOLOG_EXTRA_INFO");
 834 }
 835 
/* Exchange, for one two-phase iteration, the data that the aggregators
 * read on behalf of other processes.
 *
 * Sender side (aggregators): for each destination i with send_size[i] > 0,
 * build a derived datatype over count[i] off-len pairs of others_req[i]
 * (starting at start_pos[i]; the last pair is temporarily truncated to
 * partial_send[i] when nonzero) and MPI_Isend it from MPI_BOTTOM, since
 * mem_ptrs holds absolute addresses into the read buffer.
 *
 * Receiver side: if the user buftype is contiguous, receive directly into
 * buf at buf_idx[i] (and advance buf_idx[i]); otherwise receive into
 * per-source staging buffers and let ADIOI_Fill_user_buffer scatter them
 * into the user buffer.
 *
 * Point-to-point tags are myrank+i+100*iter so that messages from
 * different iterations do not match each other.
 */
static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
                         *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
                         *len_list, int *send_size, int *recv_size,
                         int *count, int *start_pos, int *partial_send, 
                         int *recd_from_proc, int nprocs, 
                         int myrank, int
                         buftype_is_contig, int contig_access_count,
                         ADIO_Offset min_st_offset, ADIO_Offset fd_size,
                         ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
                         ADIOI_Access *others_req, 
                         int iter, MPI_Aint buftype_extent, int *buf_idx)
{
    int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
    char **recv_buf = NULL; 
    MPI_Request *requests;
    MPI_Datatype send_type;
    MPI_Status *statuses;

/* exchange send_size info so that each process knows how much to
   receive from whom and how much memory to allocate. */

    MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);

    /* count the peers we actually receive from and send to; these counts
       size the request/status arrays and index the send requests. */
    nprocs_recv = 0;
    for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;

    nprocs_send = 0;
    for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;

    requests = (MPI_Request *)
        ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
/* +1 to avoid a 0-size malloc */

/* post recvs. if buftype_is_contig, data can be directly recd. into
   user buf at location given by buf_idx. else use recv_buf. */

#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5032, 0, NULL);
#endif

    if (buftype_is_contig) {
        j = 0;
        for (i=0; i < nprocs; i++) 
            if (recv_size[i]) { 
                MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i], 
                  MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
                j++;
                /* advance the per-source cursor so the next iteration's
                   receive lands just after this one */
                buf_idx[i] += recv_size[i];
            }
    }
    else {
/* allocate memory for recv_buf and post receives */
        recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
        for (i=0; i < nprocs; i++) 
            if (recv_size[i]) recv_buf[i] = 
                                  (char *) ADIOI_Malloc(recv_size[i]);

            j = 0;
            for (i=0; i < nprocs; i++) 
                if (recv_size[i]) {
                    MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i, 
                              myrank+i+100*iter, fd->comm, requests+j);
                    j++;
#ifdef RDCOLL_DEBUG
                    DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n", 
                       myrank, recv_size[i], myrank+i+100*iter); 
#endif
                }
    }

/* create derived datatypes and send data */

    j = 0;
    for (i=0; i<nprocs; i++) {
        if (send_size[i]) {
/* take care if the last off-len pair is a partial send: temporarily
   shorten that pair to partial_send[i] bytes, send, then restore it
   so the next iteration still sees the full length. */
            if (partial_send[i]) {
                k = start_pos[i] + count[i] - 1;
                tmp = others_req[i].lens[k];
                others_req[i].lens[k] = partial_send[i];
            }
            ADIOI_Type_create_hindexed_x(count[i],
                  &(others_req[i].lens[start_pos[i]]),
                    &(others_req[i].mem_ptrs[start_pos[i]]), 
                         MPI_BYTE, &send_type);
            /* absolute displacement; use MPI_BOTTOM in send */
            MPI_Type_commit(&send_type);
            MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
                      fd->comm, requests+nprocs_recv+j);
            /* freeing now is safe: MPI keeps the datatype alive until the
               pending Isend that uses it completes */
            MPI_Type_free(&send_type);
            if (partial_send[i]) others_req[i].lens[k] = tmp;
            j++;
        }
    }

    statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
                                     sizeof(MPI_Status)); 
     /* +1 to avoid a 0-size malloc */

    /* wait on the receives */
    if (nprocs_recv) {
#ifdef NEEDS_MPI_TEST
        /* some MPI implementations need periodic MPI_Test calls to make
           progress; poll with Testall instead of blocking in Waitall */
        j = 0;
        while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
#else
        MPI_Waitall(nprocs_recv, requests, statuses);
#endif

        /* if noncontiguous, do the copies from the recv buffers */
        if (!buftype_is_contig) 
            ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
                                   offset_list, len_list, (unsigned*)recv_size, 
                                   requests, statuses, recd_from_proc, 
                                   nprocs, contig_access_count,
                                   min_st_offset, fd_size, fd_start, fd_end,
                                   buftype_extent);
    }

    /* wait on the sends*/
    MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);

    ADIOI_Free(statuses);
    ADIOI_Free(requests);

    if (!buftype_is_contig) {
        for (i=0; i < nprocs; i++) 
            if (recv_size[i]) ADIOI_Free(recv_buf[i]);
        ADIOI_Free(recv_buf);
    }
#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5033, 0, NULL);
#endif
}
 969 
/* ADIOI_BUF_INCR: advance the logical position in the (noncontiguous)
 * user buffer by buf_incr bytes WITHOUT copying any data.
 *
 * Walks the flattened buftype: consumes the remainder of the current
 * contiguous piece (flat_buf_sz), then steps to the next off-len pair,
 * wrapping to a fresh copy of the buftype (n_buftypes * buftype_extent)
 * after the last pair.  Used to skip over regions whose data is not in
 * this round's receive buffers.
 *
 * NOTE: this is a statement macro, not a function; it reads and mutates
 * these caller locals, which must all be in scope: buf_incr, size_in_buf,
 * flat_buf_sz, flat_buf_idx, user_buf_idx, n_buftypes, flat_buf,
 * buftype_extent. */
#define ADIOI_BUF_INCR \
{ \
    while (buf_incr) { \
        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                              (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        buf_incr -= size_in_buf; \
    } \
}
 989 
 990 
/* ADIOI_BUF_COPY: copy `size` bytes from the current receive buffer of
 * process p (recv_buf[p] at recv_buf_idx[p]) into the noncontiguous user
 * buffer, advancing through the flattened buftype exactly as
 * ADIOI_BUF_INCR does, piece by piece.  After the copy loop, any
 * remaining buf_incr (the caller may set buf_incr >= size to account for
 * data not present in this round) is skipped via ADIOI_BUF_INCR.
 *
 * NOTE: statement macro; reads and mutates caller locals: size,
 * size_in_buf, flat_buf_sz, flat_buf_idx, user_buf_idx, n_buftypes,
 * buf_incr, recv_buf, recv_buf_idx, p, buf, flat_buf, buftype_extent. */
#define ADIOI_BUF_COPY \
{ \
    while (size) { \
        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
  ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)(buf + user_buf_idx)); \
  ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
        memcpy(((char *) buf) + user_buf_idx, \
               &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
        recv_buf_idx[p] += size_in_buf; /* already tested (size_t)size_in_buf*/ \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                              (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        size -= size_in_buf; \
        buf_incr -= size_in_buf; \
    } \
    ADIOI_BUF_INCR \
}
1017 
/* Scatter the data received this round (recv_buf, one staging buffer per
 * source process, recv_size[p] bytes each) into the noncontiguous user
 * buffer described by flat_buf.
 *
 * Replays the process's own file requests (offset_list/len_list) in order,
 * maps each piece to its aggregator p via ADIOI_GPFS_Calc_aggregator, and
 * either copies from recv_buf[p] (ADIOI_BUF_COPY) or skips the region
 * (ADIOI_BUF_INCR) when its data was filled in a previous round or has
 * not arrived yet.  recd_from_proc[] carries the per-source byte count
 * already placed across rounds; it is read at entry and updated at exit.
 *
 * requests/statuses are unused here (see ADIOI_UNREFERENCED_ARG below);
 * callers pass them only to keep a uniform signature. */
static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
                                   *flat_buf, char **recv_buf, ADIO_Offset 
                                   *offset_list, ADIO_Offset *len_list, 
                                   unsigned *recv_size, 
                                   MPI_Request *requests, MPI_Status *statuses,
                                   int *recd_from_proc, int nprocs,
                                   int contig_access_count, 
                                   ADIO_Offset min_st_offset, 
                                   ADIO_Offset fd_size, ADIO_Offset *fd_start, 
                                   ADIO_Offset *fd_end,
                                   MPI_Aint buftype_extent)
{

/* this function is only called if buftype is not contig */

    int i, p, flat_buf_idx;
    ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
    int n_buftypes;
    ADIO_Offset off, len, rem_len, user_buf_idx;
    /* Not sure unsigned is necessary, but it makes the math safer */
    unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;

    ADIOI_UNREFERENCED_ARG(requests);
    ADIOI_UNREFERENCED_ARG(statuses);

/*  curr_from_proc[p] = amount of data recd from proc. p that has already
                        been accounted for so far
    done_from_proc[p] = amount of data already recd from proc. p and 
                        filled into user buffer in previous iterations
    user_buf_idx = current location in user buffer 
    recv_buf_idx[p] = current location in recv_buf of proc. p  */
    curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
    done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
    recv_buf_idx   = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));

    for (i=0; i < nprocs; i++) {
        recv_buf_idx[i] = curr_from_proc[i] = 0;
        done_from_proc[i] = recd_from_proc[i];
    }

    /* start at the beginning of the flattened user buffer */
    user_buf_idx = flat_buf->indices[0];
    flat_buf_idx = 0;
    n_buftypes = 0;
    flat_buf_sz = flat_buf->blocklens[0];

    /* flat_buf_idx = current index into flattened buftype
       flat_buf_sz = size of current contiguous component in 
                flattened buf */

    for (i=0; i<contig_access_count; i++) { 
        off     = offset_list[i];
        rem_len = len_list[i];

        /* this request may span the file domains of more than one process */
        while (rem_len > 0) {
            len = rem_len;
            /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
             * longer than the single region that processor "p" is responsible
             * for.
             */
            p = ADIOI_GPFS_Calc_aggregator(fd,
                                      off,
                                      min_st_offset,
                                      &len,
                                      fd_size,
                                      fd_start,
                                      fd_end);

            /* only p's staging buffer that still has unconsumed bytes can
               supply data for this piece */
            if (recv_buf_idx[p] < recv_size[p]) {
                if (curr_from_proc[p]+len > done_from_proc[p]) {
                    if (done_from_proc[p] > curr_from_proc[p]) {
                        /* part of this piece was filled in an earlier round:
                           skip that prefix, copy only the new bytes */
                        size = ADIOI_MIN(curr_from_proc[p] + len - 
                              done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
                        buf_incr = done_from_proc[p] - curr_from_proc[p];
                        ADIOI_BUF_INCR
                        buf_incr = curr_from_proc[p]+len-done_from_proc[p];
      ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
                        curr_from_proc[p] = done_from_proc[p] + size;
                        ADIOI_BUF_COPY
                    }
                    else {
                        /* entire piece is new data: copy what this round's
                           buffer can provide */
                        size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
                        buf_incr = len;
      ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
                        curr_from_proc[p] += (unsigned) size;
                        ADIOI_BUF_COPY
                    }
                }
                else {
                    /* piece already filled in a previous round: just skip */
        ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
                    curr_from_proc[p] += (unsigned) len;
                    buf_incr = len;
                    ADIOI_BUF_INCR
                }
            }
            else {
                /* nothing (left) from p this round: advance past the region */
                buf_incr = len;
                ADIOI_BUF_INCR
            }
            off     += len;
            rem_len -= len;
        }
    }
    /* persist per-source progress for the next round */
    for (i=0; i < nprocs; i++) 
        if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];

    ADIOI_Free(curr_from_proc);
    ADIOI_Free(done_from_proc);
    ADIOI_Free(recv_buf_idx);
}
1128 
/* Alternative to ADIOI_R_Exchange_data (selected when gpfsmpio_comm == 0)
 * that performs the per-iteration exchange with a single MPI_Alltoallv
 * instead of point-to-point messages.
 *
 * Aggregators pack their outgoing pieces (others_req off-len pairs, with
 * the usual partial_send truncation of the last pair) into one contiguous
 * send buffer laid out by sdispls[]; the alltoallv delivers everything into
 * one contiguous receive buffer laid out by rdispls[].  Receivers then
 * unpack either directly into the contiguous user buffer (at buf_idx[i])
 * or via ADIOI_Fill_user_buffer, to which recv_buf[] provides per-source
 * views into all_recv_buf.
 *
 * Same interface as ADIOI_R_Exchange_data so the two are interchangeable
 * at the call sites. */
static void ADIOI_R_Exchange_data_alltoallv(
                ADIO_File fd, void *buf, ADIOI_Flatlist_node
                *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
                *len_list, int *send_size, int *recv_size, 
                int *count, int *start_pos, int *partial_send,
                int *recd_from_proc, int nprocs,
                int myrank, int
                buftype_is_contig, int contig_access_count,
                ADIO_Offset min_st_offset, ADIO_Offset fd_size,
                ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
                ADIOI_Access *others_req,
                int iter, MPI_Aint buftype_extent, int *buf_idx)
{   
    int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
    char **recv_buf = NULL;
    /* requests/statuses stay NULL; they are passed to
       ADIOI_Fill_user_buffer, which never uses them */
    MPI_Request *requests=NULL;
    MPI_Status *statuses=NULL;
    int rtail, stail;
    char *sbuf_ptr, *from_ptr;
    int  len;
    int  *sdispls, *rdispls;
    char *all_recv_buf, *all_send_buf;

  /* exchange send_size info so that each process knows how much to
     receive from whom and how much memory to allocate. */
    MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
    
    /* these loops break after the first hit: nprocs_recv/nprocs_send are
       used only as 0/1 flags here, not as true counts */
    nprocs_recv = 0;
    for (i=0; i<nprocs; i++) if (recv_size[i]) { nprocs_recv++; break; }
    
    nprocs_send = 0;
    for (i=0; i<nprocs; i++) if (send_size[i]) { nprocs_send++; break; }
    
  /* receiver side data structures: rdispls[i] = byte offset of proc i's
     data in all_recv_buf; rtail ends up as the total receive size */
    rdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
    rtail = 0;
    for (i=0; i<nprocs; i++) { rdispls[i] = rtail; rtail += recv_size[i]; }

        /* data buffer */
    all_recv_buf = (char *) ADIOI_Malloc( rtail );
    recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
    for (i=0; i<nprocs; i++) { recv_buf[i] = all_recv_buf + rdispls[i]; }

  /* sender side data structures: sdispls[i]/stail mirror the receive side */
    sdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
    stail = 0;
    for (i=0; i<nprocs; i++) { sdispls[i] = stail; stail += send_size[i]; }

        /* data buffer: pack each destination's pieces contiguously */
    all_send_buf = (char *) ADIOI_Malloc( stail );
    for (i=0; i<nprocs; i++)
    {
        if (send_size[i]) {
            /* temporarily truncate the last off-len pair to the partial
               length; restored after packing */
            if (partial_send[i]) {
                k = start_pos[i] + count[i] - 1;
                tmp = others_req[i].lens[k];
                others_req[i].lens[k] = partial_send[i];
            }
            sbuf_ptr = all_send_buf + sdispls[i];
            for (j=0; j<count[i]; j++) {
                /* mem_ptrs holds absolute addresses into the read buffer */
                ADIOI_ENSURE_AINT_FITS_IN_PTR( others_req[i].mem_ptrs[ start_pos[i]+j ]);
                from_ptr = (char *) ADIOI_AINT_CAST_TO_VOID_PTR ( others_req[i].mem_ptrs[ start_pos[i]+j ] );
                len      =           others_req[i].lens[     start_pos[i]+j ]  ;
                memcpy( sbuf_ptr, from_ptr, len );
                sbuf_ptr += len;
            }
            if (partial_send[i]) others_req[i].lens[k] = tmp;
        }
    }

#if RDCOLL_DEBUG
    DBG_FPRINTF(stderr, "\tsend_size = [%d]%2d,",0,send_size[0]);
    for (i=1; i<nprocs; i++) if(send_size[i-1]!=send_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,send_size[i] ); }
    DBG_FPRINTF(stderr, "\trecv_size =  [%d]%2d,",0,recv_size[0]);
    for (i=1; i<nprocs; i++) if(recv_size[i-1]!=recv_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,recv_size[i] ); }
    DBG_FPRINTF(stderr, "\tsdispls   =  [%d]%2d,",0,sdispls[0]);
    for (i=1; i<nprocs; i++) if(sdispls[i-1]!=sdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,sdispls  [i] ); }
    DBG_FPRINTF(stderr, "\trdispls   =  [%d]%2d,",0,rdispls[0]);
    for (i=1; i<nprocs; i++) if(rdispls[i-1]!=rdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,rdispls  [i] ); }
    DBG_FPRINTF(stderr, "\ttails = %4d, %4d\n", stail, rtail );
    if (nprocs_send) {
    DBG_FPRINTF(stderr, "\tall_send_buf =  [%d]%2d,",0,all_send_buf[0]);
    /* someone at some point found it useful to look at the 128th kilobyte of data from each processor, but this segfaults in many situations if "all debugging" enabled */
    //for (i=1; i<nprocs; i++) if(all_send_buf[(i-1)*131072]!=all_send_buf[i*131072]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i, all_send_buf  [i*131072] ); }
    }
#endif
    
  /* alltoallv */
    MPI_Alltoallv( 
            all_send_buf, send_size, sdispls, MPI_BYTE,
            all_recv_buf, recv_size, rdispls, MPI_BYTE,
            fd->comm ); 

#if 0
    DBG_FPRINTF(stderr, "\tall_recv_buf = " );
    for (i=131072; i<131073; i++) { DBG_FPRINTF(stderr, "%2d,", all_recv_buf  [i] ); }
    DBG_FPRINTF(stderr, "\n" );
#endif
    
  /* unpack at the receiver side */
    if (nprocs_recv) { 
        if (!buftype_is_contig)
            ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
                                   offset_list, len_list, (unsigned*)recv_size,
                                   requests, statuses,          /* never used inside */
                                   recd_from_proc,
                                   nprocs, contig_access_count,
                                   min_st_offset, fd_size, fd_start, fd_end,
                               buftype_extent);
        else {
            /* contiguous buftype: copy each source's block straight into the
               user buffer and advance the per-source buf_idx cursor */
            rtail = 0;
            for (i=0; i < nprocs; i++)
                if (recv_size[i]) {
                    memcpy( (char *)buf + buf_idx[i], all_recv_buf + rtail, recv_size[i] );
                    buf_idx[i] += recv_size[i];
                    rtail += recv_size[i];
                }
        }
    }
    
    ADIOI_Free( all_send_buf );
    ADIOI_Free( all_recv_buf );
    ADIOI_Free( recv_buf  );
    ADIOI_Free( sdispls );
    ADIOI_Free( rdispls );
    return; 
}   

/* [<][>][^][v][top][bottom][index][help] */