root/ompi/mca/io/romio321/romio/adio/ad_gpfs/ad_gpfs_wrcoll.c


DEFINITIONS

This source file includes the following definitions.
  1. ADIOI_GPFS_WriteStridedColl
  2. gpfs_wr_access_start
  3. gpfs_wr_access_end
  4. gpfs_find_access_for_ion
  5. ADIOI_Exch_and_write
  6. ADIOI_W_Exchange_data
  7. ADIOI_Fill_send_buffer
  8. ADIOI_Heap_merge
  9. ADIOI_W_Exchange_data_alltoallv
  10. ADIOI_Fill_send_buffer_nosend

   1 /* ---------------------------------------------------------------- */
   2 /* (C)Copyright IBM Corp.  2007, 2008                               */
   3 /* ---------------------------------------------------------------- */
   4 /**
   5  * \file ad_gpfs_wrcoll.c
   6  * \brief Collective write implementation for the GPFS ADIO driver.
   7  */
   8 
   9 /* -*- Mode: C; c-basic-offset:4 ; -*- */
  10 /* 
  11  *   Copyright (C) 1997 University of Chicago. 
  12  *   See COPYRIGHT notice in top-level directory.
  13  */
  14 
  15 #include "adio.h"
  16 #include "adio_extern.h"
  17 #include "ad_gpfs.h"
  18 #include "ad_gpfs_aggrs.h"
  19 
  20 #ifdef BGQPLATFORM
  21 #include "bg/ad_bg_pset.h"
  22 #endif
  23 
  24 #ifdef AGGREGATION_PROFILE
  25 #include "mpe.h"
  26 #endif
  27 #ifdef PROFILE
  28 #include "mpe.h"
  29 #endif
  30 
  31 #include <pthread.h>
  32 
  33 #ifdef HAVE_GPFS_H
  34 #include <gpfs.h>
  35 #endif
  36 #ifdef HAVE_GPFS_FCNTL_H
  37 #include <gpfs_fcntl.h>
  38 #endif
  39 
  40 #include <limits.h>
  41 /* prototypes of functions used for collective writes only. */
  42 static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
  43                          datatype, int nprocs, int myrank, ADIOI_Access
  44                          *others_req, ADIO_Offset *offset_list,
  45                          ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
  46                          min_st_offset, ADIO_Offset fd_size,
  47                          ADIO_Offset *fd_start, ADIO_Offset *fd_end,
  48                          int *buf_idx, int *error_code);
  49 static void ADIOI_W_Exchange_data(ADIO_File fd, const void *buf, char *write_buf,
  50                          ADIOI_Flatlist_node *flat_buf, ADIO_Offset 
  51                          *offset_list, ADIO_Offset *len_list, int *send_size, 
  52                          int *recv_size, ADIO_Offset off, int size,
  53                          int *count, int *start_pos, int *partial_recv, 
  54                          int *sent_to_proc, int nprocs, 
  55                          int myrank, int
  56                          buftype_is_contig, int contig_access_count,
  57                          ADIO_Offset min_st_offset, ADIO_Offset fd_size,
  58                          ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
  59                          ADIOI_Access *others_req, 
  60                          int *send_buf_idx, int *curr_to_proc,
  61                          int *done_to_proc, int *hole, int iter, 
  62                          MPI_Aint buftype_extent, int *buf_idx, int *error_code);
  63 static void ADIOI_W_Exchange_data_alltoallv(
  64                 ADIO_File fd, const void *buf, 
  65                 char *write_buf,                                        /* 1 */
  66                 ADIOI_Flatlist_node *flat_buf, 
  67                 ADIO_Offset *offset_list, 
  68                 ADIO_Offset *len_list, int *send_size, int *recv_size, 
  69                 ADIO_Offset off, int size,                              /* 2 */
  70                 int *count, int *start_pos, int *partial_recv,
  71                 int *sent_to_proc, int nprocs, int myrank, 
  72                 int buftype_is_contig, int contig_access_count,
  73                 ADIO_Offset min_st_offset,
  74                 ADIO_Offset fd_size,
  75                 ADIO_Offset *fd_start, 
  76                 ADIO_Offset *fd_end,
  77                 ADIOI_Access *others_req,
  78                 int *send_buf_idx, int *curr_to_proc,                   /* 3 */
  79                 int *done_to_proc, int *hole,                           /* 4 */
  80                 int iter, MPI_Aint buftype_extent, int *buf_idx,
  81                 int *error_code);
  82 static void ADIOI_Fill_send_buffer(ADIO_File fd, const void *buf, ADIOI_Flatlist_node
  83                            *flat_buf, char **send_buf, ADIO_Offset 
  84                            *offset_list, ADIO_Offset *len_list, int *send_size, 
  85                            MPI_Request *requests, int *sent_to_proc, 
  86                            int nprocs, int myrank, 
  87                            int contig_access_count, ADIO_Offset
  88                            min_st_offset, ADIO_Offset fd_size,
  89                            ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
  90                            int *send_buf_idx, int *curr_to_proc, 
  91                            int *done_to_proc, int iter, 
  92                            MPI_Aint buftype_extent);
  93 static void ADIOI_Fill_send_buffer_nosend(ADIO_File fd, const void *buf, ADIOI_Flatlist_node
  94                            *flat_buf, char **send_buf, ADIO_Offset 
  95                            *offset_list, ADIO_Offset *len_list, int *send_size, 
  96                            MPI_Request *requests, int *sent_to_proc, 
  97                            int nprocs, int myrank, 
  98                            int contig_access_count, ADIO_Offset
  99                            min_st_offset, ADIO_Offset fd_size,
 100                            ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
 101                            int *send_buf_idx, int *curr_to_proc, 
 102                            int *done_to_proc, int iter, 
 103                            MPI_Aint buftype_extent);
 104 static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
 105                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
 106                       int nprocs, int nprocs_recv, int total_elements);
 107 
 108 
 109 void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
 110                        MPI_Datatype datatype, int file_ptr_type,
 111                        ADIO_Offset offset, ADIO_Status *status, int
 112                        *error_code)
 113 {
 114 /* Uses a generalized version of the extended two-phase method described
 115    in "An Extended Two-Phase Method for Accessing Sections of 
 116    Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
 117    Scientific Programming, (5)4:301--317, Winter 1996. 
 118    http://www.mcs.anl.gov/home/thakur/ext2ph.ps */
 119 
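    /* Overview of this routine: each process flattens its own request into
     * offset/length lists and exchanges start/end offsets (plus per-rank data
     * sizes when one-sided aggregation is enabled); if accesses are not
     * interleaved (or cb_write is disabled) it falls back to independent I/O;
     * otherwise the file is split into file domains and either the one-sided
     * or P2P-contiguous shortcuts run, or my_req/others_req are computed and
     * the classic two-phase exchange-and-write (ADIOI_Exch_and_write) is
     * performed. */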
 120     ADIOI_Access *my_req; 
 121     /* array of nprocs access structures, one for each other process in
 122        whose file domain this process's request lies */
 123     
 124     ADIOI_Access *others_req;
 125     /* array of nprocs access structures, one for each other process
 126        whose request lies in this process's file domain. */
 127 
 128     int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
 129     int contig_access_count=0, interleave_count = 0, buftype_is_contig;
 130     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
 131     ADIO_Offset orig_fp, start_offset, end_offset, fd_size, min_st_offset, off;
 132     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
 133         *fd_end = NULL, *end_offsets = NULL;
 134     ADIO_Offset *gpfs_offsets0 = NULL, *gpfs_offsets = NULL;
 135     ADIO_Offset *count_sizes;
 136     int  ii;
 137 
 138     int *buf_idx = NULL;
 139     ADIO_Offset *len_list = NULL;
 140     GPFSMPIO_T_CIO_RESET( w )
 141 #ifdef PROFILE
 142         MPE_Log_event(13, 0, "start computation");
 143 #endif
 144 
 145     MPI_Comm_size(fd->comm, &nprocs);
 146     MPI_Comm_rank(fd->comm, &myrank);
 147 
 148 /* the number of processes that actually perform I/O, nprocs_for_coll,
  149  * is stored in the hints of the ADIO_File structure
 150  */
 151     nprocs_for_coll = fd->hints->cb_nodes;
 152     orig_fp = fd->fp_ind;
 153 
 154     GPFSMPIO_T_CIO_SET_GET( w, 1, 0, GPFSMPIO_CIO_T_MPIO_CRW, GPFSMPIO_CIO_LAST)
 155     GPFSMPIO_T_CIO_SET_GET( w, 1, 0, GPFSMPIO_CIO_T_LCOMP, GPFSMPIO_CIO_LAST )
 156 
 157 
 158     /* only check for interleaving if cb_write isn't disabled */
 159     if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
 160         /* For this process's request, calculate the list of offsets and
 161            lengths in the file and determine the start and end offsets. */
 162 
 163         /* Note: end_offset points to the last byte-offset that will be accessed.
  164            e.g., if start_offset=0 and 100 bytes are to be written, end_offset=99 */
 165 
 166         ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
 167                               &offset_list, &len_list, &start_offset,
 168                               &end_offset, &contig_access_count); 
 169 
 170     GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_GATHER, GPFSMPIO_CIO_T_LCOMP )
 171 
  172         /* each process communicates its start and end offsets to the other
  173            processes. The result is arrays of start and end offsets, one
  174            entry per process, stored in rank order. */
 175 
 176         st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 177         end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 178 
 179     ADIO_Offset my_count_size=0;
  180     /* One-sided aggregation also needs the amount of data per rank, because
  181      * a 1-byte access and a 0-byte access both have a start/end offset
  182      * difference of 0 and so cannot be distinguished otherwise.
  183      */
 184     if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
 185         count_sizes = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 186         MPI_Count buftype_size;
 187         MPI_Type_size_x(datatype, &buftype_size);
 188         my_count_size = (ADIO_Offset) count  * (ADIO_Offset)buftype_size;
 189     }
 190     if (gpfsmpio_tunegather) {
 191       if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
 192         gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
 193         gpfs_offsets  = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
 194         for (ii=0; ii<nprocs; ii++)  {
 195           gpfs_offsets0[ii*3]   = 0;
 196           gpfs_offsets0[ii*3+1] = 0;
 197           gpfs_offsets0[ii*3+2] = 0;
 198         }
 199         gpfs_offsets0[myrank*3]   = start_offset;
 200         gpfs_offsets0[myrank*3+1] =   end_offset;
 201         gpfs_offsets0[myrank*3+2] =   my_count_size;
 202         MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*3, ADIO_OFFSET, MPI_MAX, fd->comm );
 203         for (ii=0; ii<nprocs; ii++)  {
 204           st_offsets [ii] = gpfs_offsets[ii*3]  ;
 205           end_offsets[ii] = gpfs_offsets[ii*3+1];
 206           count_sizes[ii] = gpfs_offsets[ii*3+2];
 207         }
 208       }
 209       else {
 210             gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
 211             gpfs_offsets  = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
 212             for (ii=0; ii<nprocs; ii++)  {
 213                 gpfs_offsets0[ii*2]   = 0;
 214                 gpfs_offsets0[ii*2+1] = 0;
 215             }
 216             gpfs_offsets0[myrank*2]   = start_offset;
 217             gpfs_offsets0[myrank*2+1] =   end_offset;
 218 
 219         MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*2, ADIO_OFFSET, MPI_MAX, fd->comm );
 220 
 221             for (ii=0; ii<nprocs; ii++)  {
 222                 st_offsets [ii] = gpfs_offsets[ii*2]  ;
 223                 end_offsets[ii] = gpfs_offsets[ii*2+1];
 224             }
 225       }
 226             ADIOI_Free( gpfs_offsets0 );
 227             ADIOI_Free( gpfs_offsets  );
 228     } else {
 229         MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
 230                       ADIO_OFFSET, fd->comm);
 231         MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
 232                       ADIO_OFFSET, fd->comm);
 233         if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
  234             MPI_Allgather(&my_count_size, 1, ADIO_OFFSET, count_sizes, 1,
 235                      ADIO_OFFSET, fd->comm);
 236         }
 237     }
 238 
 239     GPFSMPIO_T_CIO_SET_GET(w, 1, 1, GPFSMPIO_CIO_T_PATANA, GPFSMPIO_CIO_T_GATHER )
 240 
 241         /* are the accesses of different processes interleaved? */
 242         for (i=1; i<nprocs; i++)
 243       if ((st_offsets[i] < end_offsets[i-1]) &&
 244                 (st_offsets[i] <= end_offsets[i]))
 245                 interleave_count++;
 246         /* This is a rudimentary check for interleaving, but should suffice
 247            for the moment. */
 248     }
 249 
 250     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 251 
 252     if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
 253         (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
 254     {
 255         /* use independent accesses */
 256         if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
 257             ADIOI_Free(offset_list);
 258             ADIOI_Free(len_list);
 259             ADIOI_Free(st_offsets);
 260             ADIOI_Free(end_offsets);
 261         }
 262 
 263         fd->fp_ind = orig_fp;
 264         ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
 265 
 266         if (buftype_is_contig && filetype_is_contig) {
 267 
 268             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
 269                 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
 270                 ADIO_WriteContig(fd, buf, count, datatype,
 271                                  ADIO_EXPLICIT_OFFSET,
 272                                  off, status, error_code);
 273             }
 274             else ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 275                                   0, status, error_code);
 276         }
 277         else ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
 278                                offset, status, error_code);
 279 
 280         return;
 281     }
 282 
 283     GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_FD_PART, GPFSMPIO_CIO_T_PATANA )
 284 
 285 /* Divide the I/O workload among "nprocs_for_coll" processes. This is
 286    done by (logically) dividing the file into file domains (FDs); each
 287    process may directly access only its own file domain. */
 288 
 289     int currentValidDataIndex = 0;
 290     if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
  291       /* Drop the zero-data entries by shifting entries that have data to the
  292        * front; currentValidDataIndex then gives the number of valid entries.
  293        */
 294       for (i=0; i<nprocs; i++) {
 295         if (count_sizes[i] > 0) {
 296           st_offsets[currentValidDataIndex] = st_offsets[i];
 297           end_offsets[currentValidDataIndex] = end_offsets[i];
 298           currentValidDataIndex++;
 299         }
 300       }
 301     }
 302 
 303     if (gpfsmpio_tuneblocking) {
 304         if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
 305             ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets,
 306                     currentValidDataIndex,
 307                     nprocs_for_coll, &min_st_offset,
 308                     &fd_start, &fd_end, &fd_size, fd->fs_ptr);
 309         }
 310         else {
 311 
 312     ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets, nprocs,
 313                             nprocs_for_coll, &min_st_offset,
 314                             &fd_start, &fd_end, &fd_size, fd->fs_ptr);   
 315         }
 316     }
 317     else {
 318         if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
 319             ADIOI_Calc_file_domains(st_offsets, end_offsets, currentValidDataIndex,
 320                     nprocs_for_coll, &min_st_offset,
 321                     &fd_start, &fd_end,
 322                     fd->hints->min_fdomain_size, &fd_size,
 323                     fd->hints->striping_unit);
 324         }
 325         else {
 326     ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
 327                             nprocs_for_coll, &min_st_offset,
 328                             &fd_start, &fd_end,
 329                             fd->hints->min_fdomain_size, &fd_size,
 330                             fd->hints->striping_unit);   
 331         }
 332     }
 333 
 334     GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_MYREQ, GPFSMPIO_CIO_T_FD_PART );
 335 
 336     if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
  337     /* If the user has requested a one-sided aggregation method, do it at
  338      * this point instead of the two-phase I/O.
  339      */
 340       int holeFound = 0;
 341       ADIOI_OneSidedWriteAggregation(fd, offset_list, len_list, contig_access_count,
 342               buf, datatype, error_code, st_offsets, end_offsets,
 343               currentValidDataIndex, fd_start, fd_end, &holeFound);
 344       int anyHolesFound = 0;
 345       if (!gpfsmpio_onesided_no_rmw)
 346         MPI_Allreduce(&holeFound, &anyHolesFound, 1, MPI_INT, MPI_MAX, fd->comm);
 347       if (anyHolesFound == 0) {
 348       GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
 349       ADIOI_Free(offset_list);
 350       ADIOI_Free(len_list);
 351       ADIOI_Free(st_offsets);
 352       ADIOI_Free(end_offsets);
 353       ADIOI_Free(fd_start);
 354       ADIOI_Free(fd_end);
 355       ADIOI_Free(count_sizes);
 356           goto fn_exit;
 357           }
 358       else {
  359         /* Holes were found in the data and the user has not set
  360          * gpfsmpio_onesided_no_rmw: set gpfsmpio_onesided_always_rmw to 1
  361          * and call ADIOI_OneSidedWriteAggregation again.  If the user has
  362          * gpfsmpio_onesided_inform_rmw set, inform them of this condition
  363          * and behavior.
  364          */
 365 
 366         if (gpfsmpio_onesided_inform_rmw && (myrank ==0))
 367           FPRINTF(stderr,"Information: Holes found during one-sided "
 368                   "write aggregation algorithm --- re-running one-sided "
 369                   "write aggregation with GPFSMPIO_ONESIDED_ALWAYS_RMW set to 1.\n");
 370           gpfsmpio_onesided_always_rmw = 1;
 371           int prev_gpfsmpio_onesided_no_rmw = gpfsmpio_onesided_no_rmw;
 372           gpfsmpio_onesided_no_rmw = 1;
 373           ADIOI_OneSidedWriteAggregation(fd, offset_list, len_list, contig_access_count, buf, datatype, error_code, st_offsets, end_offsets, currentValidDataIndex, fd_start, fd_end, &holeFound);
 374           gpfsmpio_onesided_no_rmw = prev_gpfsmpio_onesided_no_rmw;
 375           GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
 376           ADIOI_Free(offset_list);
 377           ADIOI_Free(len_list);
 378           ADIOI_Free(st_offsets);
 379           ADIOI_Free(end_offsets);
 380           ADIOI_Free(fd_start);
 381           ADIOI_Free(fd_end);
 382           ADIOI_Free(count_sizes);
 383           goto fn_exit;
 384       }
 385     }
 386     if (gpfsmpio_p2pcontig==1) {
 387         /* For some simple yet common(?) workloads, full-on two-phase I/O is overkill.  We can establish sub-groups of processes and their aggregator, and then these sub-groups will carry out a simplified two-phase over that sub-group.
 388          *
  389          * First verify that the buffer type is contiguous and the offsets are
  390          * increasing in rank order with no gaps. */
 391         int inOrderAndNoGaps = 1;
 392         for (i=0;i<(nprocs-1);i++) {
 393             if (end_offsets[i] != (st_offsets[i+1]-1))
 394                 inOrderAndNoGaps = 0;
 395         }
 396         if (inOrderAndNoGaps && buftype_is_contig) {
  397             /* if these conditions hold, execute the P2PContig code; otherwise
  398              * fall through to the original two-phase code */
 399             ADIOI_P2PContigWriteAggregation(fd, buf,
 400                     error_code, st_offsets, end_offsets, fd_start, fd_end);
 401             /* NOTE: we are skipping the rest of two-phase in this path */
 402             GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
 403 
 404             ADIOI_Free(offset_list);
 405             ADIOI_Free(len_list);
 406             ADIOI_Free(st_offsets);
 407             ADIOI_Free(end_offsets);
 408             ADIOI_Free(fd_start);
 409             ADIOI_Free(fd_end);
 410 
 411             goto fn_exit;
 412         }
 413     }
 414 
 415 /* calculate what portions of the access requests of this process are
 416    located in what file domains */
 417 
 418     if (gpfsmpio_tuneblocking)
 419     ADIOI_GPFS_Calc_my_req(fd, offset_list, len_list, contig_access_count,
 420                       min_st_offset, fd_start, fd_end, fd_size,
 421                       nprocs, &count_my_req_procs, 
 422                       &count_my_req_per_proc, &my_req,
 423                       &buf_idx); 
 424     else
 425     ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
 426                       min_st_offset, fd_start, fd_end, fd_size,
 427                       nprocs, &count_my_req_procs, 
 428                       &count_my_req_per_proc, &my_req,
 429                       &buf_idx);
 430 
 431     GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_OTHREQ, GPFSMPIO_CIO_T_MYREQ )
 432 
 433 /* based on everyone's my_req, calculate what requests of other
 434    processes lie in this process's file domain.
 435    count_others_req_procs = number of processes whose requests lie in
 436    this process's file domain (including this process itself) 
 437    count_others_req_per_proc[i] indicates how many separate contiguous
 438    requests of proc. i lie in this process's file domain. */
 439 
 440     if (gpfsmpio_tuneblocking)
 441         ADIOI_GPFS_Calc_others_req(fd, count_my_req_procs,
 442                               count_my_req_per_proc, my_req,
 443                               nprocs, myrank,
 444                               &count_others_req_procs, &others_req);
 445     else
 446     ADIOI_Calc_others_req(fd, count_my_req_procs, 
 447                           count_my_req_per_proc, my_req, 
 448                           nprocs, myrank,
 449                           &count_others_req_procs, &others_req);
 450 
 451     GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_DEXCH, GPFSMPIO_CIO_T_OTHREQ )
 452 
 453     ADIOI_Free(count_my_req_per_proc);
 454     for (i=0; i < nprocs; i++) {
 455         if (my_req[i].count) {
 456             ADIOI_Free(my_req[i].offsets);
 457             ADIOI_Free(my_req[i].lens);
 458         }
 459     }
 460     ADIOI_Free(my_req);
 461 
 462 /* exchange data and write in sizes of no more than coll_bufsize. */
 463     ADIOI_Exch_and_write(fd, buf, datatype, nprocs, myrank,
 464                         others_req, offset_list,
 465                         len_list, contig_access_count, min_st_offset,
 466                         fd_size, fd_start, fd_end, buf_idx, error_code);
 467 
 468     GPFSMPIO_T_CIO_SET_GET( w, 0, 1, GPFSMPIO_CIO_LAST, GPFSMPIO_CIO_T_DEXCH )
 469     GPFSMPIO_T_CIO_SET_GET( w, 0, 1, GPFSMPIO_CIO_LAST, GPFSMPIO_CIO_T_MPIO_CRW )
 470 
 471     GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
 472 
 473 /* free all memory allocated for collective I/O */
 474     if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
 475 
 476     for (i=0; i<nprocs; i++) {
 477         if (others_req[i].count) {
 478             ADIOI_Free(others_req[i].offsets);
 479             ADIOI_Free(others_req[i].lens);
 480             ADIOI_Free(others_req[i].mem_ptrs);
 481         }
 482     }
 483     ADIOI_Free(others_req);
 484 
 485     ADIOI_Free(buf_idx);
 486     ADIOI_Free(offset_list);
 487     ADIOI_Free(len_list);
 488     ADIOI_Free(st_offsets);
 489     ADIOI_Free(end_offsets);
 490     ADIOI_Free(fd_start);
 491     ADIOI_Free(fd_end);
 492 
 493 fn_exit:
 494 #ifdef HAVE_STATUS_SET_BYTES
 495     if (status) {
 496       MPI_Count bufsize, size;
 497       /* Don't set status if it isn't needed */
 498       MPI_Type_size_x(datatype, &size);
 499       bufsize = size * count;
 500       MPIR_Status_set_bytes(status, datatype, bufsize);
 501     }
 502 /* This is a temporary way of filling in status. The right way is to 
 503    keep track of how much data was actually written during collective I/O. */
 504 #endif
 505 
 506     fd->fp_sys_posn = -1;   /* set it to null. */
 507 #ifdef AGGREGATION_PROFILE
 508         MPE_Log_event (5013, 0, NULL);
 509 #endif
 510 }
 511 
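/* Declare an intended write range to GPFS via the gpfs_fcntl()
 * GPFS_ACCESS_RANGE hint, so the file system can prepare (e.g. acquire
 * byte-range tokens) before the writes arrive.  When gpfs_fcntl.h is not
 * available this compiles to a no-op and the assertion trivially passes. */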
 512 static void gpfs_wr_access_start(int fd, ADIO_Offset offset, ADIO_Offset length)
 513 {
 514         int rc=0;
 515 #ifdef HAVE_GPFS_FCNTL_H
 516         struct {
 517                 gpfsFcntlHeader_t header;
 518                 gpfsAccessRange_t access;
 519         } take_locks;
 520 
 521         take_locks.header.totalLength = sizeof(take_locks);
 522         take_locks.header.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
 523         take_locks.header.fcntlReserved = 0;
 524 
 525         take_locks.access.structLen = sizeof(take_locks.access);
 526         take_locks.access.structType = GPFS_ACCESS_RANGE;
 527         take_locks.access.start = offset;
 528         take_locks.access.length = length;
 529         take_locks.access.isWrite = 1;
 530 
 531         rc = gpfs_fcntl(fd, &take_locks);
 532 #endif
 533         ADIOI_Assert(rc == 0);
 534 }
 535 
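/* Release a previously declared access range via the GPFS_FREE_RANGE hint;
 * like gpfs_wr_access_start(), this is a no-op without gpfs_fcntl.h. */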
 536 static void gpfs_wr_access_end(int fd, ADIO_Offset offset, ADIO_Offset length)
 537 {
 538         int rc=0;
 539 #ifdef HAVE_GPFS_FCNTL_H
 540         struct {
 541                 gpfsFcntlHeader_t header;
 542                 gpfsFreeRange_t free;
 543         } free_locks;
 544 
 545 
 546         free_locks.header.totalLength = sizeof(free_locks);
 547         free_locks.header.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
 548         free_locks.header.fcntlReserved = 0;
 549 
 550         free_locks.free.structLen = sizeof(free_locks.free);
 551         free_locks.free.structType = GPFS_FREE_RANGE;
 552         free_locks.free.start = offset;
 553         free_locks.free.length = length;
 554 
 555         rc = gpfs_fcntl(fd, &free_locks);
 556 #endif
 557         ADIOI_Assert(rc == 0);
 558 }
 559 
 560 #ifdef BGQPLATFORM
  561 /* my_start, my_end: this process's file domain.  Could be -1,-1 for "no I/O".
 562  * fd_start, fd_end: arrays of length fd->hints->cb_nodes specifying all file domains */
 563 static int gpfs_find_access_for_ion(ADIO_File fd,
 564         ADIO_Offset my_start, ADIO_Offset my_end,
 565         ADIO_Offset *fd_start, ADIO_Offset *fd_end,
 566         ADIO_Offset *start, ADIO_Offset *end)
 567 {
 568     int my_ionode = BGQ_IO_node_id();
 569     int *rank_to_ionode;
 570     int i, nprocs, rank;
 571     ADIO_Offset group_start=LLONG_MAX, group_end=0;
 572 
 573     MPI_Comm_size(fd->comm, &nprocs);
 574     MPI_Comm_rank(fd->comm, &rank);
 575 
 576     rank_to_ionode = ADIOI_Calloc(nprocs, sizeof(int));
 577     MPI_Allgather(&my_ionode, 1, MPI_INT,  rank_to_ionode, 1, MPI_INT, fd->comm);
 578 
 579     /* rank_to_ionode now contains a mapping from MPI rank to IO node */
 580     /* fd->hints->ranklist[] contains a list of MPI ranks that are aggregators */
 581     /* fd_start[] and fd_end[] contain a list of file domains. */
 582 
 583     /* what we really want to do is take all the file domains associated
 584      * with a given i/o node and find the begin/end of that range.
 585      *
 586      * Because gpfs_fcntl hints are expected to be released, we'll pass this
 587      * start/end back to the caller, who will both declare and free this range
 588      */
 589     if (my_start == -1 || my_end == -1) {
 590         ADIOI_Free(rank_to_ionode);
 591         return 0; /* no work to do */
 592     }
 593 
 594     for (i=0; i<fd->hints->cb_nodes; i++ ){
 595         if (my_ionode == rank_to_ionode[fd->hints->ranklist[i]] ) {
 596             group_start = ADIOI_MIN(fd_start[i], group_start);
 597             group_end = ADIOI_MAX(fd_end[i], group_end);
 598         }
 599     }
 600     *start = group_start;
 601     *end = group_end;
 602     ADIOI_Free(rank_to_ionode);
 603     return 1;
 604 }
 605 #endif // BGQPLATFORM
 606 
 607 
 608 /* If successful, error_code is set to MPI_SUCCESS.  Otherwise an error
 609  * code is created and returned in error_code.
 610  */
 611 static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
 612                                  datatype, int nprocs, 
 613                                  int myrank,
 614                                  ADIOI_Access
 615                                  *others_req, ADIO_Offset *offset_list,
 616                                  ADIO_Offset *len_list, int contig_access_count,
 617                                  ADIO_Offset min_st_offset, ADIO_Offset fd_size,
 618                                  ADIO_Offset *fd_start, ADIO_Offset *fd_end,
 619                                  int *buf_idx, int *error_code)
 620 {
 621 /* Send data to appropriate processes and write in sizes of no more
 622    than coll_bufsize.    
 623    The idea is to reduce the amount of extra memory required for
  624    collective I/O. If all data were written at once, which would be much
  625    easier, it would require temp space larger than user_buf,
  626    which is often unacceptable. For example, to write a distributed
  627    array to a file where each local array is 8 Mbytes, requiring
  628    at least another 8 Mbytes of temp space is unacceptable. */
 629 
 630     /* Not convinced end_loc-st_loc couldn't be > int, so make these offsets*/
 631     ADIO_Offset size=0;
 632     int hole, i, j, m, ntimes, max_ntimes, buftype_is_contig;
 633     ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
 634     char *write_buf=NULL, *write_buf2=NULL;
 635     int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
 636     int *partial_recv, *sent_to_proc, *start_pos, flag;
 637     int *send_buf_idx, *curr_to_proc, *done_to_proc;
 638     MPI_Status status;
 639     ADIOI_Flatlist_node *flat_buf=NULL;
 640     MPI_Aint buftype_extent, buftype_lb;
 641     int info_flag, coll_bufsize;
 642     char *value;
 643     static char myname[] = "ADIOI_EXCH_AND_WRITE";
 644     pthread_t io_thread;
 645     void *thread_ret;
 646     ADIOI_IO_ThreadFuncData io_thread_args;
 647 
 648     *error_code = MPI_SUCCESS;  /* changed below if error */
 649     /* only I/O errors are currently reported */
 650 
 651 /* calculate the number of writes of size coll_bufsize
 652    to be done by each process and the max among all processes.
 653    That gives the no. of communication phases as well. */
 654 
 655     value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
 656     ADIOI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value, 
 657                  &info_flag);
 658     coll_bufsize = atoi(value);
 659     ADIOI_Free(value);
 660 
 661     if (gpfsmpio_pthreadio == 1){
 662         /* ROMIO will spawn an additional thread. both threads use separate
 663          * halves of the collective buffer*/
 664         coll_bufsize = coll_bufsize/2;
 665     }
 666 
 667     for (i=0; i < nprocs; i++) {
 668         if (others_req[i].count) {
 669             st_loc = others_req[i].offsets[0];
 670             end_loc = others_req[i].offsets[0];
 671             break;
 672         }
 673     }
 674 
 675     for (i=0; i < nprocs; i++)
 676         for (j=0; j < others_req[i].count; j++) {
 677             st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
 678             end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
 679                                        + others_req[i].lens[j] - 1));
 680         }
 681 
 682 /* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/
 683 
 684     ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
 685 
 686     if ((st_loc==-1) && (end_loc==-1)) {
 687         ntimes = 0; /* this process does no writing. */
 688     }
  689     if (ntimes > 0) { /* only set the gpfs hint if we have I/O - i.e. this rank is
  690                          an aggregator -- otherwise it will fail for deferred open */
 691       if (getenv("ROMIO_GPFS_DECLARE_ACCESS")!=NULL) {
 692         gpfs_wr_access_start(fd->fd_sys, st_loc, end_loc - st_loc);
 693       }
 694     }
 695 
 696     ADIO_Offset st_loc_ion=0, end_loc_ion=0, needs_gpfs_access_cleanup=0;
 697 #ifdef BGQPLATFORM
  698     if (ntimes > 0) { /* only set the gpfs hint if we have I/O - i.e. this rank is
  699                          an aggregator -- otherwise it will fail for deferred open */
 700 
 701       if (getenv("ROMIO_GPFS_DECLARE_ION_ACCESS")!=NULL) {
 702         if (gpfs_find_access_for_ion(fd, st_loc, end_loc, fd_start, fd_end,
 703                     &st_loc_ion, &end_loc_ion)) {
 704             gpfs_wr_access_start(fd->fd_sys, st_loc_ion, end_loc_ion-st_loc_ion);
 705             needs_gpfs_access_cleanup=1;
 706         }
 707       }
 708     }
 709 #endif
 710 
 711     MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX,
 712                   fd->comm); 
 713 
 714     write_buf = fd->io_buf;
 715     if (gpfsmpio_pthreadio == 1) {
 716         write_buf2 = fd->io_buf + coll_bufsize;
 717     }
 718 
 719     curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); 
 720     /* its use is explained below. calloc initializes to 0. */
 721 
 722     count = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 723     /* to store count of how many off-len pairs per proc are satisfied
 724        in an iteration. */
 725 
 726     partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 727     /* if only a portion of the last off-len pair is recd. from a process
 728        in a particular iteration, the length recd. is stored here.
 729        calloc initializes to 0. */
 730 
 731     send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 732     /* total size of data to be sent to each proc. in an iteration.
 733        Of size nprocs so that I can use MPI_Alltoall later. */
 734 
 735     recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 736     /* total size of data to be recd. from each proc. in an iteration.*/
 737 
 738     sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 739     /* amount of data sent to each proc so far. Used in
 740        ADIOI_Fill_send_buffer. initialized to 0 here. */
 741 
 742     send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 743     curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 744     done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 745     /* Above three are used in ADIOI_Fill_send_buffer*/
 746 
 747     start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 748     /* used to store the starting value of curr_offlen_ptr[i] in 
 749        this iteration */
 750 
 751     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 752     if (!buftype_is_contig) {
 753         flat_buf = ADIOI_Flatten_and_find(datatype);
 754     }
 755     MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);
 756 
 757 
 758 /* I need to check if there are any outstanding nonblocking writes to
 759    the file, which could potentially interfere with the writes taking
 760    place in this collective write call. Since this is not likely to be
 761    common, let me do the simplest thing possible here: Each process
  762    completes all pending nonblocking operations before proceeding. */
 763 
 764     /*ADIOI_Complete_async(error_code);
 765     if (*error_code != MPI_SUCCESS) return;
 766     MPI_Barrier(fd->comm);
 767     */
 768 
 769     done = 0;
 770     off = st_loc;
 771 
 772     if(gpfsmpio_pthreadio == 1)
 773         io_thread = pthread_self();
 774 
 775 #ifdef PROFILE
 776         MPE_Log_event(14, 0, "end computation");
 777 #endif
 778 
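    /* Each of the ntimes rounds below: (1) scan others_req for the
     * offset/length pairs that fall inside the current coll_bufsize window,
     * (2) exchange that data into write_buf, and (3) write write_buf to the
     * file, either inline or on a background pthread when
     * gpfsmpio_pthreadio is enabled (double-buffered via write_buf2). */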
 779     for (m=0; m < ntimes; m++) {
 780        /* go through all others_req and check which will be satisfied
 781           by the current write */
 782 
 783        /* Note that MPI guarantees that displacements in filetypes are in 
 784           monotonically nondecreasing order and that, for writes, the
 785           filetypes cannot specify overlapping regions in the file. This
 786           simplifies implementation a bit compared to reads. */
 787 
 788           /* off = start offset in the file for the data to be written in 
 789                    this iteration 
 790              size = size of data written (bytes) corresponding to off
 791              req_off = off in file for a particular contiguous request 
 792                        minus what was satisfied in previous iteration
 793              req_size = size corresponding to req_off */
 794 
 795         /* first calculate what should be communicated */
 796 
 797 #ifdef PROFILE
 798         MPE_Log_event(13, 0, "start computation");
 799 #endif
 800         for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;
 801 
 802         size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done); 
 803 
 804         for (i=0; i < nprocs; i++) {
 805             if (others_req[i].count) {
 806                 start_pos[i] = curr_offlen_ptr[i];
 807                 for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
 808                     if (partial_recv[i]) {
 809                         /* this request may have been partially
 810                            satisfied in the previous iteration. */
 811                         req_off = others_req[i].offsets[j] +
 812                             partial_recv[i]; 
 813                         req_len = others_req[i].lens[j] -
 814                             partial_recv[i];
 815                         partial_recv[i] = 0;
 816                         /* modify the off-len pair to reflect this change */
 817                         others_req[i].offsets[j] = req_off;
 818                         others_req[i].lens[j] = req_len;
 819                     }
 820                     else {
 821                         req_off = others_req[i].offsets[j];
 822                         req_len = others_req[i].lens[j];
 823                     }
 824                     if (req_off < off + size) {
 825                         count[i]++;
 826       ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIU_Upint)(write_buf+req_off-off));
 827                         MPI_Get_address(write_buf+req_off-off, 
 828                                &(others_req[i].mem_ptrs[j]));
 829       ADIOI_Assert((off + size - req_off) == (int)(off + size - req_off));
 830                         recv_size[i] += (int)(ADIOI_MIN(off + size - req_off, 
 831                                       (unsigned)req_len));
 832 
 833                         if (off+size-req_off < (unsigned)req_len)
 834                         {
 835                             partial_recv[i] = (int) (off + size - req_off);
 836 
 837                             /* --BEGIN ERROR HANDLING-- */
 838                             if ((j+1 < others_req[i].count) && 
 839                                  (others_req[i].offsets[j+1] < off+size))
 840                             { 
 841                                 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
 842                                                                    MPIR_ERR_RECOVERABLE,
 843                                                                    myname,
 844                                                                    __LINE__,
 845                                                                    MPI_ERR_ARG,
 846                                                                    "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)", 0);
 847                                 /* allow to continue since additional
 848                                  * communication might have to occur
 849                                  */
 850                             }
 851                             /* --END ERROR HANDLING-- */
 852                             break;
 853                         }
 854                     }
 855                     else break;
 856                 }
 857                 curr_offlen_ptr[i] = j;
 858             }
 859         }
 860         
 861 #ifdef PROFILE
 862         MPE_Log_event(14, 0, "end computation");
 863         MPE_Log_event(7, 0, "start communication");
 864 #endif
 865         if (gpfsmpio_comm == 1)
 866         ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
 867                             len_list, send_size, recv_size, off, size, count,
 868                             start_pos, partial_recv,
 869                             sent_to_proc, nprocs, myrank,
 870                             buftype_is_contig, contig_access_count,
 871                             min_st_offset, fd_size, fd_start, fd_end,
 872                             others_req, send_buf_idx, curr_to_proc,
 873                             done_to_proc, &hole, m, buftype_extent, buf_idx,
 874                             error_code); 
 875         else
 876         if (gpfsmpio_comm == 0)
 877         ADIOI_W_Exchange_data_alltoallv(fd, buf, write_buf, flat_buf, offset_list,
 878                             len_list, send_size, recv_size, off, size, count,
 879                             start_pos, partial_recv,
 880                             sent_to_proc, nprocs, myrank,
 881                             buftype_is_contig, contig_access_count,
 882                             min_st_offset, fd_size, fd_start, fd_end,
 883                             others_req, send_buf_idx, curr_to_proc,
 884                             done_to_proc, &hole, m, buftype_extent, buf_idx,
 885                             error_code); 
 886         if (*error_code != MPI_SUCCESS) return;
 887 #ifdef PROFILE
 888         MPE_Log_event(8, 0, "end communication");
 889 #endif
 890 
 891         flag = 0;
 892         for (i=0; i<nprocs; i++)
 893             if (count[i]) flag = 1;
 894 
 895         if (flag) {
 896             char round[50];
 897             sprintf(round, "two-phase-round=%d", m);
 898             setenv("LIBIOLOG_EXTRA_INFO", round, 1);
 899       ADIOI_Assert(size == (int)size);
 900             if (gpfsmpio_pthreadio == 1) {
 901                 /* there is no such thing as "invalid pthread identifier", so
 902                  * we'll use pthread_self() instead.  Before we do I/O we want
 903                  * to complete I/O from any previous iteration -- but only a
 904                  * previous iteration that had I/O work to do (i.e. set 'flag')
 905                  */
 906                 if(!pthread_equal(io_thread, pthread_self())) {
 907                     pthread_join(io_thread, &thread_ret);
 908                     *error_code = *(int *)thread_ret;
 909                     if (*error_code != MPI_SUCCESS) return;
 910                     io_thread = pthread_self();
 911 
 912                 }
 913                 io_thread_args.fd = fd;
 914                 /* do a little pointer shuffling: background I/O works from one
 915                  * buffer while two-phase machinery fills up another */
 916                 io_thread_args.buf = write_buf;
 917                 ADIOI_SWAP(write_buf, write_buf2, char*);
 918                 io_thread_args.io_kind = ADIOI_WRITE;
 919                 io_thread_args.size = size;
 920                 io_thread_args.offset = off;
 921                 io_thread_args.status = &status;
 922                 io_thread_args.error_code = *error_code;
 923                 if ( (pthread_create(&io_thread, NULL,
 924                         ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
 925                     io_thread = pthread_self();
 926             } else {
 927                 ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE,
 928                         ADIO_EXPLICIT_OFFSET, off, &status, error_code);
 929                 if (*error_code != MPI_SUCCESS) return;
 930             }
 931         }
 932 
 933         off += size;
 934         done += size;
 935     }
 936     if (gpfsmpio_pthreadio == 1) {
 937         if ( !pthread_equal(io_thread, pthread_self()) ) {
 938             pthread_join(io_thread, &thread_ret);
 939             *error_code = *(int *)thread_ret;
 940         }
 941     }
 942 
 943     for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
 944 #ifdef PROFILE
 945         MPE_Log_event(7, 0, "start communication");
 946 #endif
 947     for (m=ntimes; m<max_ntimes; m++) 
 948         /* nothing to recv, but check for send. */
 949         if (gpfsmpio_comm == 1)
 950         ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
 951                             len_list, send_size, recv_size, off, size, count,
 952                             start_pos, partial_recv,
 953                             sent_to_proc, nprocs, myrank,
 954                             buftype_is_contig, contig_access_count,
 955                             min_st_offset, fd_size, fd_start, fd_end,
 956                             others_req, send_buf_idx, 
 957                             curr_to_proc, done_to_proc, &hole, m, 
 958                             buftype_extent, buf_idx, error_code); 
 959         else
 960         if (gpfsmpio_comm == 0)
 961         ADIOI_W_Exchange_data_alltoallv(fd, buf, write_buf, flat_buf, offset_list,
 962                             len_list, send_size, recv_size, off, size, count,
 963                             start_pos, partial_recv,
 964                             sent_to_proc, nprocs, myrank,
 965                             buftype_is_contig, contig_access_count,
 966                             min_st_offset, fd_size, fd_start, fd_end,
 967                             others_req, send_buf_idx, 
 968                             curr_to_proc, done_to_proc, &hole, m, 
 969                             buftype_extent, buf_idx, error_code); 
 970         if (*error_code != MPI_SUCCESS) return;
 971 #ifdef PROFILE
 972         MPE_Log_event(8, 0, "end communication");
 973 #endif
 974 
 975     ADIOI_Free(curr_offlen_ptr);
 976     ADIOI_Free(count);
 977     ADIOI_Free(partial_recv);
 978     ADIOI_Free(send_size);
 979     ADIOI_Free(recv_size);
 980     ADIOI_Free(sent_to_proc);
 981     ADIOI_Free(start_pos);
 982     ADIOI_Free(send_buf_idx);
 983     ADIOI_Free(curr_to_proc);
 984     ADIOI_Free(done_to_proc);
 985 
 986     if (ntimes != 0 && getenv("ROMIO_GPFS_DECLARE_ACCESS")!=NULL) {
 987         gpfs_wr_access_end(fd->fd_sys, st_loc, end_loc-st_loc);
 988     }
 989 
 990     if (needs_gpfs_access_cleanup) {
 991         gpfs_wr_access_end(fd->fd_sys, st_loc_ion, end_loc_ion-st_loc_ion);
 992         needs_gpfs_access_cleanup=0;
 993     }
 994 
 995     unsetenv("LIBIOLOG_EXTRA_INFO");
 996 }
 997 
 998 
 999 /* Sets error_code to MPI_SUCCESS if successful, or creates an error code
1000  * in the case of error.
1001  */
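/* Exchange scheme used below: recv_size is swapped with MPI_Alltoall so every
 * process learns how much it must send; receives use hindexed datatypes built
 * from absolute memory addresses (hence MPI_BOTTOM) so data lands directly in
 * write_buf; a heap-merge of others_req detects holes, and if any exist the
 * collective buffer is first read back from the file (read-modify-write). */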
1002 static void ADIOI_W_Exchange_data(ADIO_File fd, const void *buf, char *write_buf,
1003                                   ADIOI_Flatlist_node *flat_buf, ADIO_Offset 
1004                                   *offset_list, ADIO_Offset *len_list, int *send_size, 
1005                                   int *recv_size, ADIO_Offset off, int size,
1006                                   int *count, int *start_pos,
1007                                   int *partial_recv,
1008                                   int *sent_to_proc, int nprocs, 
1009                                   int myrank, int
1010                                   buftype_is_contig, int contig_access_count,
1011                                   ADIO_Offset min_st_offset,
1012                                   ADIO_Offset fd_size,
1013                                   ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
1014                                   ADIOI_Access *others_req, 
1015                                   int *send_buf_idx, int *curr_to_proc,
1016                                   int *done_to_proc, int *hole, int iter, 
1017                                   MPI_Aint buftype_extent, int *buf_idx,
1018                                   int *error_code)
1019 {
1020     int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;
1021     char **send_buf = NULL; 
1022     MPI_Request *requests, *send_req;
1023     MPI_Datatype *recv_types;
1024     MPI_Status *statuses, status;
1025     int *srt_len, sum;
1026     ADIO_Offset *srt_off;
1027     static char myname[] = "ADIOI_W_EXCHANGE_DATA";
1028 
1029 /* exchange recv_size info so that each process knows how much to
1030    send to whom. */
1031 
1032     MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
1033 
1034     /* create derived datatypes for recv */
1035 
1036     nprocs_recv = 0;
1037     for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;
1038 
1039     recv_types = (MPI_Datatype *)
1040         ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype)); 
1041 /* +1 to avoid a 0-size malloc */
1042 
1043     tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
1044     j = 0;
1045     for (i=0; i<nprocs; i++) {
1046         if (recv_size[i]) {
1047 /* take care if the last off-len pair is a partial recv */
1048             if (partial_recv[i]) {
1049                 k = start_pos[i] + count[i] - 1;
1050                 tmp_len[i] = others_req[i].lens[k];
1051                 others_req[i].lens[k] = partial_recv[i];
1052             }
1053             ADIOI_Type_create_hindexed_x(count[i],
1054                   &(others_req[i].lens[start_pos[i]]),
1055                      &(others_req[i].mem_ptrs[start_pos[i]]), 
1056                          MPI_BYTE, recv_types+j);
1057             /* absolute displacements; use MPI_BOTTOM in recv */
1058             MPI_Type_commit(recv_types+j);
1059             j++;
1060         }
1061     }
1062 
1063     /* To avoid a read-modify-write, check if there are holes in the 
1064        data to be written. For this, merge the (sorted) offset lists
1065        others_req using a heap-merge. */
1066 
1067     sum = 0;
1068     for (i=0; i<nprocs; i++) sum += count[i];
1069     srt_off = (ADIO_Offset *) ADIOI_Malloc((sum+1)*sizeof(ADIO_Offset));
1070     srt_len = (int *) ADIOI_Malloc((sum+1)*sizeof(int));
1071     /* +1 to avoid a 0-size malloc */
1072 
1073     ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
1074                      nprocs, nprocs_recv, sum);
1075 
1076 /* for partial recvs, restore original lengths */
1077     for (i=0; i<nprocs; i++) 
1078         if (partial_recv[i]) {
1079             k = start_pos[i] + count[i] - 1;
1080             others_req[i].lens[k] = tmp_len[i];
1081         }
1082     ADIOI_Free(tmp_len);
1083 
1084     /* check if there are any holes. If yes, we must do a read-modify-write.
1085      * Holes can be in three places.  'Middle' is what you'd expect: the
1086      * processes are operating on noncontiguous data.  But holes can also show
1087      * up at the beginning or end of the file domain (see John Bent ROMIO REQ
1088      * #835). Missing these holes would result in us writing more data than
1089      * was received from everyone else. */
1090     *hole = 0;
1091     if (off != srt_off[0]) /* hole at the front */
1092         *hole = 1;
1093     else { /* coalesce the sorted offset-length pairs */
1094         for (i=1; i<sum; i++) {
1095             if (srt_off[i] <= srt_off[0] + srt_len[0]) {
1096                 int new_len = srt_off[i] + srt_len[i] - srt_off[0];
1097                 if (new_len > srt_len[0]) srt_len[0] = new_len;
1098             }
1099             else
1100                 break;
1101         }
1102         if (i < sum || size != srt_len[0]) /* hole in middle or end */
1103             *hole = 1;
1104         }
1105 
1106     ADIOI_Free(srt_off);
1107     ADIOI_Free(srt_len);
1108 
1109     if (nprocs_recv) {
1110         if (*hole) {
1111             const char * stuff = "data-sieve-in-two-phase";
1112             setenv("LIBIOLOG_EXTRA_INFO", stuff, 1);
1113             ADIO_ReadContig(fd, write_buf, size, MPI_BYTE, 
1114                             ADIO_EXPLICIT_OFFSET, off, &status, &err);
1115             /* --BEGIN ERROR HANDLING-- */
1116             if (err != MPI_SUCCESS) {
1117                 *error_code = MPIO_Err_create_code(err,
1118                                                    MPIR_ERR_RECOVERABLE, myname,
1119                                                    __LINE__, MPI_ERR_IO,
1120                                                    "**ioRMWrdwr", 0);
1121                 return;
1122             } 
1123             /* --END ERROR HANDLING-- */
1124             unsetenv("LIBIOLOG_EXTRA_INFO");
1125         }
1126     }
1127 
1128     nprocs_send = 0;
1129     for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
1130 
1131     if (fd->atomicity) {
1132         /* bug fix from Wei-keng Liao and Kenin Coloma */
1133         requests = (MPI_Request *)
1134             ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request)); 
1135         send_req = requests;
1136     }
1137     else {
1138         requests = (MPI_Request *)      
1139             ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request)); 
1140         /* +1 to avoid a 0-size malloc */
1141 
1142         /* post receives */
1143         j = 0;
1144         for (i=0; i<nprocs; i++) {
1145             if (recv_size[i]) {
1146                 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
1147                           fd->comm, requests+j);
1148                 j++;
1149             }
1150         }
1151         send_req = requests + nprocs_recv;
1152     }
1153 
1154 /* post sends. if buftype_is_contig, data can be directly sent from
1155    user buf at location given by buf_idx. else use send_buf. */
1156 
1157 #ifdef AGGREGATION_PROFILE
1158     MPE_Log_event (5032, 0, NULL);
1159 #endif
1160     if (buftype_is_contig) {
1161         j = 0;
1162         for (i=0; i < nprocs; i++) 
1163             if (send_size[i]) {
1164                 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i], 
1165                             MPI_BYTE, i,  myrank+i+100*iter, fd->comm, 
1166                                   send_req+j);
1167                 j++;
1168                 buf_idx[i] += send_size[i];
1169             }
1170     }
1171     else if (nprocs_send) {
1172         /* buftype is not contig */
1173         send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*));
1174         for (i=0; i < nprocs; i++) 
1175             if (send_size[i]) 
1176                 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
1177 
1178         ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf,
1179                            offset_list, len_list, send_size, 
1180                            send_req,
1181                            sent_to_proc, nprocs, myrank, 
1182                            contig_access_count,
1183                            min_st_offset, fd_size, fd_start, fd_end, 
1184                            send_buf_idx, curr_to_proc, done_to_proc, iter,
1185                            buftype_extent);
1186         /* the send is done in ADIOI_Fill_send_buffer */
1187     }
1188 
1189     if (fd->atomicity) {
1190         /* bug fix from Wei-keng Liao and Kenin Coloma */
1191         j = 0;
1192         for (i=0; i<nprocs; i++) {
1193             MPI_Status wkl_status;
1194             if (recv_size[i]) {
1195                 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
1196                           fd->comm, &wkl_status);
1197                 j++;
1198             }
1199         }
1200     }
1201 
1202     for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
1203     ADIOI_Free(recv_types);
1204     
1205     if (fd->atomicity) {
1206         /* bug fix from Wei-keng Liao and Kenin Coloma */
1207         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+1) * \
1208                                          sizeof(MPI_Status)); 
1209          /* +1 to avoid a 0-size malloc */
1210     }
1211     else {
1212         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
1213                                      sizeof(MPI_Status)); 
1214         /* +1 to avoid a 0-size malloc */
1215     }
1216 
1217 #ifdef NEEDS_MPI_TEST
1218     i = 0;
1219     if (fd->atomicity) {
1220         /* bug fix from Wei-keng Liao and Kenin Coloma */
1221         while (!i) MPI_Testall(nprocs_send, send_req, &i, statuses);
1222     }
1223     else {
1224         while (!i) MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);
1225     }
1226 #else
1227     if (fd->atomicity)
1228         /* bug fix from Wei-keng Liao and Kenin Coloma */
1229         MPI_Waitall(nprocs_send, send_req, statuses);
1230     else
1231         MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
1232 #endif
1233 
1234 #ifdef AGGREGATION_PROFILE
1235     MPE_Log_event (5033, 0, NULL);
1236 #endif
1237     ADIOI_Free(statuses);
1238     ADIOI_Free(requests);
1239     if (!buftype_is_contig && nprocs_send) {
1240         for (i=0; i < nprocs; i++) 
1241             if (send_size[i]) ADIOI_Free(send_buf[i]);
1242         ADIOI_Free(send_buf);
1243     }
1244 }
1245 
1246 
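     /* Added commentary: ADIOI_BUF_INCR advances the cursor over the
      * noncontiguous user buffer by buf_incr bytes without copying anything.
      * It walks the flattened buftype: user_buf_idx moves within the current
      * contiguous block (flat_buf_sz bytes left in it); when a block is
      * exhausted the cursor jumps to the next flattened entry, wrapping to
      * entry 0 and bumping n_buftypes after the last one.  Illustrative
      * example (not from the code): with blocklens {4,4} and indices {0,8},
      * skipping 6 bytes from the start leaves the cursor 2 bytes into the
      * second block, i.e. user_buf_idx == 10. */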
1247 #define ADIOI_BUF_INCR \
1248 { \
1249     while (buf_incr) { \
1250         size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
1251         user_buf_idx += size_in_buf; \
1252         flat_buf_sz -= size_in_buf; \
1253         if (!flat_buf_sz) { \
1254             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
1255             else { \
1256                 flat_buf_idx = 0; \
1257                 n_buftypes++; \
1258             } \
1259             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
1260                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
1261             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
1262         } \
1263         buf_incr -= size_in_buf; \
1264     } \
1265 }
1266 
1267 
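     /* Added commentary: ADIOI_BUF_COPY copies the next `size` bytes of the
      * flattened user buffer into send_buf[p] at send_buf_idx[p], advancing
      * the same cursor as ADIOI_BUF_INCR.  Both `size` and `buf_incr` are
      * decremented as bytes are copied; the trailing ADIOI_BUF_INCR then
      * skips whatever part of buf_incr exceeds size without copying it. */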
1268 #define ADIOI_BUF_COPY \
1269 { \
1270     while (size) { \
1271         size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
1272         ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)((MPIU_Upint)buf + user_buf_idx)); \
1273         ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
1274         memcpy(&(send_buf[p][send_buf_idx[p]]), \
1275                ((char *) buf) + user_buf_idx, size_in_buf); \
1276         send_buf_idx[p] += size_in_buf; \
1277         user_buf_idx += size_in_buf; \
1278         flat_buf_sz -= size_in_buf; \
1279         if (!flat_buf_sz) { \
1280             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
1281             else { \
1282                 flat_buf_idx = 0; \
1283                 n_buftypes++; \
1284             } \
1285             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
1286                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
1287             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
1288         } \
1289         size -= size_in_buf; \
1290         buf_incr -= size_in_buf; \
1291     } \
1292     ADIOI_BUF_INCR \
1293 }
1294 
1295 static void ADIOI_Fill_send_buffer(ADIO_File fd, const void *buf, ADIOI_Flatlist_node
1296                            *flat_buf, char **send_buf, ADIO_Offset 
1297                            *offset_list, ADIO_Offset *len_list, int *send_size, 
1298                            MPI_Request *requests, int *sent_to_proc, 
1299                            int nprocs, int myrank, 
1300                            int contig_access_count, 
1301                            ADIO_Offset min_st_offset, ADIO_Offset fd_size,
1302                            ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
1303                            int *send_buf_idx, int *curr_to_proc, 
1304                            int *done_to_proc, int iter,
1305                            MPI_Aint buftype_extent)
1306 {
1307 /* this function is only called if buftype is not contig */
1308 
1309     int i, p, flat_buf_idx;
1310     ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
1311     int jj, n_buftypes;
1312     ADIO_Offset off, len, rem_len, user_buf_idx;
1313 
1314 /*  curr_to_proc[p] = amount of data sent to proc. p that has already
1315     been accounted for so far
1316     done_to_proc[p] = amount of data already sent to proc. p in 
1317     previous iterations
1318     user_buf_idx = current location in user buffer 
1319     send_buf_idx[p] = current location in send_buf of proc. p  */
1320 
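     /* Added commentary: each call re-walks the entire flattened request
      * list; done_to_proc (what was shipped to each aggregator in earlier
      * iterations) lets the walk skip over data that has already been sent,
      * so only the send_size[p] bytes belonging to the current iteration are
      * copied into send_buf[p] and sent this time. */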
1321     for (i=0; i < nprocs; i++) {
1322         send_buf_idx[i] = curr_to_proc[i] = 0;
1323         done_to_proc[i] = sent_to_proc[i];
1324     }
1325     jj = 0;
1326 
1327     user_buf_idx = flat_buf->indices[0];
1328     flat_buf_idx = 0;
1329     n_buftypes = 0;
1330     flat_buf_sz = flat_buf->blocklens[0];
1331 
1332     /* flat_buf_idx = current index into flattened buftype
1333        flat_buf_sz = size of current contiguous component in 
1334                          flattened buf */
1335 
1336     for (i=0; i<contig_access_count; i++) { 
1337         off     = offset_list[i];
1338         rem_len = len_list[i];
1339 
1340         /* this request may span the file domains of more than one process */
1341         while (rem_len != 0) {
1342             len = rem_len;
1343             /* NOTE: len value is modified by ADIOI_GPFS_Calc_aggregator() to be no
1344              * longer than the single region that processor "p" is responsible
1345              * for.
1346              */
1347             p = ADIOI_GPFS_Calc_aggregator(fd,
1348                                       off,
1349                                       min_st_offset,
1350                                       &len,
1351                                       fd_size,
1352                                       fd_start,
1353                                       fd_end);
1354 
1355             if (send_buf_idx[p] < send_size[p]) {
1356                 if (curr_to_proc[p]+len > done_to_proc[p]) {
1357                     if (done_to_proc[p] > curr_to_proc[p]) {
1358                         size = ADIOI_MIN(curr_to_proc[p] + len - 
1359                                 done_to_proc[p], send_size[p]-send_buf_idx[p]);
1360                         buf_incr = done_to_proc[p] - curr_to_proc[p];
1361                         ADIOI_BUF_INCR
1362                         ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
1363                         buf_incr = curr_to_proc[p] + len - done_to_proc[p];
1364                         ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
1365                         curr_to_proc[p] = done_to_proc[p] + size;
1366                         ADIOI_BUF_COPY
1367                     }
1368                     else {
1369                         size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
1370                         buf_incr = len;
1371                         ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
1372                         curr_to_proc[p] += size;
1373                         ADIOI_BUF_COPY
1374                     }
1375                     if (send_buf_idx[p] == send_size[p]) {
1376                         MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, 
1377                                 myrank+p+100*iter, fd->comm, requests+jj);
1378                         jj++;
1379                     }
1380                 }
1381                 else {
1382                     ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
1383                     curr_to_proc[p] += len;
1384                     buf_incr = len;
1385                     ADIOI_BUF_INCR
1386                 }
1387             }
1388             else {
1389                 buf_incr = len;
1390                 ADIOI_BUF_INCR
1391             }
1392             off     += len;
1393             rem_len -= len;
1394         }
1395     }
1396     for (i=0; i < nprocs; i++) 
1397         if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
1398 }
1399 
1400 
1401 
1402 static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
1403                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
1404                       int nprocs, int nprocs_recv, int total_elements)
1405 {
1406     typedef struct {
1407         ADIO_Offset *off_list;
1408         ADIO_Offset *len_list;
1409         int nelem;
1410     } heap_struct;
1411 
1412     heap_struct *a, tmp;
1413     int i, j, heapsize, l, r, k, smallest;
1414 
1415     a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct));
1416 
1417     j = 0;
1418     for (i=0; i<nprocs; i++)
1419         if (count[i]) {
1420             a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
1421             a[j].len_list = &(others_req[i].lens[start_pos[i]]);
1422             a[j].nelem = count[i];
1423             j++;
1424         }
1425 
1426     /* build a heap out of the first element from each list, with
1427        the smallest element of the heap at the root */
1428 
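         /* Added commentary: each of the nprocs_recv source lists
          * (others_req[i], restricted to this iteration via start_pos[i] and
          * count[i]) is already sorted by file offset, so a k-way min-heap
          * merge produces srt_off/srt_len as one globally sorted sequence.
          * total_elements is the sum of the per-source counts. */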
1429     heapsize = nprocs_recv;
1430     for (i=heapsize/2 - 1; i>=0; i--) {
1431         /* Heapify(a, i, heapsize); Algorithm from Cormen et al. pg. 143
1432            modified for a heap with smallest element at root. I have 
1433            removed the recursion so that there are no function calls.
1434            Function calls are too expensive. */
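             /* Added commentary: with 0-based heap indexing, l = 2*(k+1)-1
              * and r = 2*(k+1) are just the usual children 2k+1 and 2k+2. */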
1435         k = i;
1436         while (1) {
1437             l = 2*(k+1) - 1;
1438             r = 2*(k+1);
1439 
1440             if ((l < heapsize) && 
1441                 (*(a[l].off_list) < *(a[k].off_list)))
1442                 smallest = l;
1443             else smallest = k;
1444 
1445             if ((r < heapsize) && 
1446                 (*(a[r].off_list) < *(a[smallest].off_list)))
1447                 smallest = r;
1448 
1449             if (smallest != k) {
1450                 tmp.off_list = a[k].off_list;
1451                 tmp.len_list = a[k].len_list;
1452                 tmp.nelem = a[k].nelem;
1453 
1454                 a[k].off_list = a[smallest].off_list;
1455                 a[k].len_list = a[smallest].len_list;
1456                 a[k].nelem = a[smallest].nelem;
1457                 
1458                 a[smallest].off_list = tmp.off_list;
1459                 a[smallest].len_list = tmp.len_list;
1460                 a[smallest].nelem = tmp.nelem;
1461             
1462                 k = smallest;
1463             }
1464             else break;
1465         }
1466     }
1467 
1468     for (i=0; i<total_elements; i++) {
1469         /* extract smallest element from heap, i.e. the root */
1470         srt_off[i] = *(a[0].off_list);
1471         srt_len[i] = *(a[0].len_list);
1472         (a[0].nelem)--;
1473 
1474         if (!a[0].nelem) {
1475             a[0].off_list = a[heapsize-1].off_list;
1476             a[0].len_list = a[heapsize-1].len_list;
1477             a[0].nelem = a[heapsize-1].nelem;
1478             heapsize--;
1479         }
1480         else {
1481             (a[0].off_list)++;
1482             (a[0].len_list)++;
1483         }
1484 
1485         /* Heapify(a, 0, heapsize); */
1486         k = 0;
1487         while (1) {
1488             l = 2*(k+1) - 1;
1489             r = 2*(k+1);
1490 
1491             if ((l < heapsize) && 
1492                 (*(a[l].off_list) < *(a[k].off_list)))
1493                 smallest = l;
1494             else smallest = k;
1495 
1496             if ((r < heapsize) && 
1497                 (*(a[r].off_list) < *(a[smallest].off_list)))
1498                 smallest = r;
1499 
1500             if (smallest != k) {
1501                 tmp.off_list = a[k].off_list;
1502                 tmp.len_list = a[k].len_list;
1503                 tmp.nelem = a[k].nelem;
1504 
1505                 a[k].off_list = a[smallest].off_list;
1506                 a[k].len_list = a[smallest].len_list;
1507                 a[k].nelem = a[smallest].nelem;
1508                 
1509                 a[smallest].off_list = tmp.off_list;
1510                 a[smallest].len_list = tmp.len_list;
1511                 a[smallest].nelem = tmp.nelem;
1512             
1513                 k = smallest;
1514             }
1515             else break;
1516         }
1517     }
1518 
1519     ADIOI_Free(a);
1520 }
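     /* Illustrative example (not part of the code): merging two sorted lists
      * with offsets {10, 40} and {20, 30} and lengths {5, 5} and {5, 5}
      * yields srt_off = {10, 20, 30, 40} and srt_len = {5, 5, 5, 5}; the
      * extraction loop above pops the heap root total_elements (= 4) times,
      * re-heapifying after each pop. */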
1521 
1522 
1523 static void ADIOI_W_Exchange_data_alltoallv(
1524                 ADIO_File fd, const void *buf, 
1525                 char *write_buf,                                        /* 1 */
1526                 ADIOI_Flatlist_node *flat_buf, 
1527                 ADIO_Offset *offset_list, 
1528                 ADIO_Offset *len_list, int *send_size, int *recv_size, 
1529                 ADIO_Offset off, int size,                              /* 2 */
1530                 int *count, int *start_pos, int *partial_recv,
1531                 int *sent_to_proc, int nprocs, int myrank, 
1532                 int buftype_is_contig, int contig_access_count,
1533                 ADIO_Offset min_st_offset,
1534                 ADIO_Offset fd_size,
1535                 ADIO_Offset *fd_start, 
1536                 ADIO_Offset *fd_end,
1537                 ADIOI_Access *others_req,
1538                 int *send_buf_idx, int *curr_to_proc,                   /* 3 */
1539                 int *done_to_proc, int *hole,                           /* 4 */
1540                 int iter, MPI_Aint buftype_extent, int *buf_idx,
1541                 int *error_code)
1542 {   
1543     int i, j, k=0, nprocs_recv, nprocs_send, *tmp_len, err;
1544     char **send_buf = NULL;
1545     MPI_Request *send_req=NULL;
1546     MPI_Status status;
1547     int rtail, stail;
1548     char *sbuf_ptr, *to_ptr;
1549     int  len;
1550     int  *sdispls, *rdispls;
1551     char *all_recv_buf, *all_send_buf;
1552     int *srt_len, sum;
1553     ADIO_Offset *srt_off;
1554     static char myname[] = "ADIOI_W_EXCHANGE_DATA_ALLTOALLV";
1555     double io_time;
1556 
1557     io_time = MPI_Wtime();
1558   /* exchange recv_size info so that each process knows how much to
1559      send to whom. */
1560     MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
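     /* Added commentary: recv_size[i] (an input here) is how much this
      * aggregator will pull from rank i in this iteration; the all-to-all
      * transposes that matrix, so after the call send_size[i] is how much
      * rank i expects to receive from us. */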
1561 
1562     gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_RECV_EXCH] += MPI_Wtime() - io_time;
1563     io_time = MPI_Wtime();
1564     
1565     nprocs_recv = 0;
1566     for (i=0; i<nprocs; i++) if (recv_size[i]) { nprocs_recv++; }
1567     nprocs_send = 0;
1568     for (i=0; i<nprocs; i++) if (send_size[i]) { nprocs_send++; }
1569     
1570   /* receiver side data structures */
1571     rdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1572     rtail = 0;
1573     for (i=0; i<nprocs; i++) { rdispls[i] = rtail; rtail += recv_size[i]; }
1574 
1575         /* data buffer */
1576     all_recv_buf = (char *) ADIOI_Malloc( rtail );
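     /* Added commentary: rdispls is the exclusive prefix sum of recv_size,
      * i.e. rdispls[i] is the byte offset of rank i's segment inside the
      * packed all_recv_buf, and rtail ends up as the total number of bytes
      * this aggregator receives.  sdispls/all_send_buf below mirror this on
      * the send side. */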
1577 
1578   /* sender side data structures */
1579     sdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1580     stail = 0;
1581     for (i=0; i<nprocs; i++) { sdispls[i] = stail; stail += send_size[i]; }
1582 
1583         /* data buffer */
1584     all_send_buf = (char *) ADIOI_Malloc( stail );
1585     if (buftype_is_contig) {
1586         for (i=0; i<nprocs; i++)
1587         {
1588             if (send_size[i]) {
1589                 sbuf_ptr = all_send_buf + sdispls[i];
1590                 memcpy( sbuf_ptr, (char *) buf + buf_idx[i], send_size[i] );
1591                 buf_idx[i] += send_size[i];
1592             }
1593         } 
1594     } else {
1595         send_buf = (char **) ADIOI_Malloc( nprocs * sizeof(char *) );
1596         for (i=0; i<nprocs; i++)
1597             send_buf[i] = all_send_buf + sdispls[i];
1598         ADIOI_Fill_send_buffer_nosend(fd, buf, flat_buf, send_buf,
1599                            offset_list, len_list, send_size, 
1600                            send_req,
1601                            sent_to_proc, nprocs, myrank, 
1602                            contig_access_count,
1603                            min_st_offset, fd_size, fd_start, fd_end, 
1604                            send_buf_idx, curr_to_proc, done_to_proc, iter,
1605                            buftype_extent);
1606         ADIOI_Free(send_buf);
1607     }
1608 
1609     gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SETUP] += MPI_Wtime() - io_time;
1610 
1611     io_time = MPI_Wtime();
1612   /* alltoallv */
1613     MPI_Alltoallv( 
1614             all_send_buf, send_size, sdispls, MPI_BYTE,
1615             all_recv_buf, recv_size, rdispls, MPI_BYTE,
1616             fd->comm ); 
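     /* Added commentary: this single MPI_Alltoallv replaces the per-pair
      * Isend/Irecv exchange done in ADIOI_W_Exchange_data; ranks for which
      * send_size[i] == 0 or recv_size[i] == 0 simply contribute empty
      * segments. */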
1617 
1618     ADIOI_Free( all_send_buf );
1619     ADIOI_Free(sdispls);
1620 
1621     gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_NET] += MPI_Wtime() - io_time;
1622     io_time = MPI_Wtime();
1623   /* data sieving pre-read */
1624   /* To avoid a read-modify-write, check if there are holes in the 
1625      data to be written. For this, merge the (sorted) offset lists
1626      others_req using a heap-merge. */
1627 
1628     sum = 0;
1629     for (i=0; i<nprocs; i++) sum += count[i];
1630     srt_off = (ADIO_Offset *) ADIOI_Malloc((sum+1)*sizeof(ADIO_Offset));
1631     srt_len = (int *) ADIOI_Malloc((sum+1)*sizeof(int));
1632 
1633     ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
1634                      nprocs, nprocs_recv, sum);
1635 
1636     /* check if there are any holes */
1637     *hole = 0;
1638     /* See if there are holes before the first request or after the last request */
1639     if((srt_off[0] > off) || 
1640        ((srt_off[sum-1] + srt_len[sum-1]) < (off + size)))
1641     {
1642        *hole = 1;
1643     }
1644     else /* See if there are holes between the requests, if there is more than one */
1645       for (i=0; i<sum-1; i++)
1646         if (srt_off[i]+srt_len[i] < srt_off[i+1]) {
1647             *hole = 1;
1648             break;
1649         }
1650 
1651     ADIOI_Free(srt_off);
1652     ADIOI_Free(srt_len);
1653 
1654     gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SORT] += MPI_Wtime() - io_time;
1655     io_time = MPI_Wtime();
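     /* Added commentary: if the merged request ranges do not completely tile
      * [off, off+size), *hole was set above and the write buffer is pre-read
      * here so that unwritten gaps keep their old file contents
      * (read-modify-write). */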
1656     if (nprocs_recv) {
1657         if (*hole) {
1658             ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
1659                             ADIO_EXPLICIT_OFFSET, off, &status, &err);
1660             /* --BEGIN ERROR HANDLING-- */
1661             if (err != MPI_SUCCESS) {
1662                 *error_code = MPIO_Err_create_code(err,
1663                                                    MPIR_ERR_RECOVERABLE, myname,
1664                                                    __LINE__, MPI_ERR_IO,
1665                                                    "**ioRMWrdwr", 0);
1666                 return;
1667             }
1668             /* --END ERROR HANDLING-- */
1669         }
1670     }
1671     gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SIEVE] += MPI_Wtime() - io_time;
1672 
1673   /* scatter all_recv_buf into the 4M cb_buffer */
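     /* Added commentary: if only part of rank i's last request falls inside
      * this iteration's window, partial_recv[i] holds the truncated length;
      * the last entry of others_req[i].lens is temporarily shortened to that
      * value while the received bytes are scattered to mem_ptrs, and then
      * restored afterwards. */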
1674     tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
1675     for (i=0; i<nprocs; i++)
1676     {
1677         if (recv_size[i]) {
1678             if (partial_recv[i]) {
1679                 k = start_pos[i] + count[i] - 1;
1680                 tmp_len[i] = others_req[i].lens[k];
1681                 others_req[i].lens[k] = partial_recv[i];
1682             }
1683 
1684             sbuf_ptr = all_recv_buf + rdispls[i];
1685             for (j=0; j<count[i]; j++) {
1686                 ADIOI_ENSURE_AINT_FITS_IN_PTR(others_req[i].mem_ptrs[ start_pos[i]+j ]);
1687                 to_ptr = (char *) ADIOI_AINT_CAST_TO_VOID_PTR ( others_req[i].mem_ptrs[ start_pos[i]+j ] );
1688                 len    = others_req[i].lens[ start_pos[i]+j ];
1689                 memcpy( to_ptr, sbuf_ptr, len );
1690                 sbuf_ptr += len;
1691             }
1692 
1693             /* restore */
1694             if (partial_recv[i]) {
1695                 k = start_pos[i] + count[i] - 1;
1696                 others_req[i].lens[k] = tmp_len[i];
1697             }
1698             
1699         }
1700     }
1701     
1702     ADIOI_Free( tmp_len );
1703     ADIOI_Free( all_recv_buf );
1704     ADIOI_Free(rdispls);
1705     return; 
1706 }   
1707 
1708 static void ADIOI_Fill_send_buffer_nosend(ADIO_File fd, const void *buf, ADIOI_Flatlist_node
1709                            *flat_buf, char **send_buf, ADIO_Offset 
1710                            *offset_list, ADIO_Offset *len_list, int *send_size, 
1711                            MPI_Request *requests, int *sent_to_proc, 
1712                            int nprocs, int myrank, 
1713                            int contig_access_count, 
1714                            ADIO_Offset min_st_offset, ADIO_Offset fd_size,
1715                            ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
1716                            int *send_buf_idx, int *curr_to_proc, 
1717                            int *done_to_proc, int iter,
1718                            MPI_Aint buftype_extent)
1719 {
1720 /* this function is only called if buftype is not contig */
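     /* Added commentary: identical to ADIOI_Fill_send_buffer except that no
      * MPI_Isend is issued once a destination buffer fills up (see the block
      * commented out below, "moved to alltoallv"): here each send_buf[p]
      * points into all_send_buf, and the packed data is shipped collectively
      * by the MPI_Alltoallv in ADIOI_W_Exchange_data_alltoallv. */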
1721 
1722     int i, p, flat_buf_idx;
1723     ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
1724     int jj, n_buftypes;
1725     ADIO_Offset off, len, rem_len, user_buf_idx;
1726 
1727 /*  curr_to_proc[p] = amount of data sent to proc. p that has already
1728     been accounted for so far
1729     done_to_proc[p] = amount of data already sent to proc. p in 
1730     previous iterations
1731     user_buf_idx = current location in user buffer 
1732     send_buf_idx[p] = current location in send_buf of proc. p  */
1733 
1734     for (i=0; i < nprocs; i++) {
1735         send_buf_idx[i] = curr_to_proc[i] = 0;
1736         done_to_proc[i] = sent_to_proc[i];
1737     }
1738     jj = 0;
1739 
1740     user_buf_idx = flat_buf->indices[0];
1741     flat_buf_idx = 0;
1742     n_buftypes = 0;
1743     flat_buf_sz = flat_buf->blocklens[0];
1744 
1745     /* flat_buf_idx = current index into flattened buftype
1746        flat_buf_sz = size of current contiguous component in 
1747                          flattened buf */
1748 
1749     for (i=0; i<contig_access_count; i++) { 
1750         off     = offset_list[i];
1751         rem_len = len_list[i];
1752 
1753         /* this request may span the file domains of more than one process */
1754         while (rem_len != 0) {
1755             len = rem_len;
1756             /* NOTE: len value is modified by ADIOI_GPFS_Calc_aggregator() to be no
1757              * longer than the single region that processor "p" is responsible
1758              * for.
1759              */
1760             p = ADIOI_GPFS_Calc_aggregator(fd,
1761                                       off,
1762                                       min_st_offset,
1763                                       &len,
1764                                       fd_size,
1765                                       fd_start,
1766                                       fd_end);
1767 
1768             if (send_buf_idx[p] < send_size[p]) {
1769                 if (curr_to_proc[p]+len > done_to_proc[p]) {
1770                     if (done_to_proc[p] > curr_to_proc[p]) {
1771                         size = ADIOI_MIN(curr_to_proc[p] + len - 
1772                                 done_to_proc[p], send_size[p]-send_buf_idx[p]);
1773                         buf_incr = done_to_proc[p] - curr_to_proc[p];
1774                         ADIOI_BUF_INCR
1775                         ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
1776                         buf_incr = curr_to_proc[p] + len - done_to_proc[p];
1777                         ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
1778                         curr_to_proc[p] = done_to_proc[p] + size;
1779                         ADIOI_BUF_COPY
1780                     }
1781                     else {
1782                         size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
1783                         buf_incr = len;
1784                         ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
1785                         curr_to_proc[p] += size;
1786                         ADIOI_BUF_COPY
1787                     }
1788                     /* moved to alltoallv */
1789                     /*
1790                     if (send_buf_idx[p] == send_size[p]) {
1791                         MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, 
1792                                 myrank+p+100*iter, fd->comm, requests+jj);
1793                         jj++;
1794                     }
1795                     */
1796                 }
1797                 else {
1798                     ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
1799                     curr_to_proc[p] += (int)len;
1800                     buf_incr = len;
1801                     ADIOI_BUF_INCR
1802                 }
1803             }
1804             else {
1805                 buf_incr = len;
1806                 ADIOI_BUF_INCR
1807             }
1808             off     += len;
1809             rem_len -= len;
1810         }
1811     }
1812     for (i=0; i < nprocs; i++) 
1813         if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
1814 }
