root/ompi/mca/io/romio321/romio/adio/ad_gpfs/ad_gpfs_aggrs.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ADIOI_GPFS_Calc_aggregator
  2. ADIOI_GPFS_Calc_file_domains
  3. ADIOI_GPFS_Calc_my_req
  4. ADIOI_GPFS_Calc_others_req

   1 /* ---------------------------------------------------------------- */
   2 /* (C)Copyright IBM Corp.  2007, 2008, 2019                         */
   3 /* ---------------------------------------------------------------- */
   4 /**
   5  * \file ad_gpfs_aggrs.c
   6  * \brief The externally used function from this file is declared in ad_gpfs_aggrs.h
   7  */
   8 
   9 /* -*- Mode: C; c-basic-offset:4 ; -*- */
  10 /*
  11  *   Copyright (C) 1997-2001 University of Chicago.
  12  *   See COPYRIGHT notice in top-level directory.
  13  */
  14 
  15 
  16 #include "adio.h"
  17 #include "adio_cb_config_list.h"
  18 #include "ad_gpfs.h"
  19 #include "ad_gpfs_aggrs.h"
  20 
  21 #ifdef AGGREGATION_PROFILE
  22 #include "mpe.h"
  23 #endif
  24 
  25 
  26 #ifdef USE_DBG_LOGGING
  27   #define AGG_DEBUG 1
  28 #endif
  29 
  30 #ifndef TRACE_ERR
  31 #  define TRACE_ERR(format...)
  32 #endif
  33 
  34 /* Comments copied from common:
  35  * This file contains four functions:
  36  *
  37  * ADIOI_Calc_aggregator()
  38  * ADIOI_Calc_file_domains()
  39  * ADIOI_Calc_my_req()
  40  * ADIOI_Calc_others_req()
  41  *
  42  * The last three of these were originally in ad_read_coll.c, but they are
  43  * also shared with ad_write_coll.c.  I felt that they were better kept with
  44  * the rest of the shared aggregation code.
  45  */
  46 
  47 /* Discussion of values available from above:
  48  *
  49  * ADIO_Offset st_offsets[0..nprocs-1]
  50  * ADIO_Offset end_offsets[0..nprocs-1]
  51  *    These contain a list of start and end offsets for each process in
  52  *    the communicator.  For example, an access at loc 10, size 10 would
  53  *    have a start offset of 10 and end offset of 19.
  54  * int nprocs
  55  *    number of processors in the collective I/O communicator
  56  * ADIO_Offset min_st_offset
  57  * ADIO_Offset fd_start[0..nprocs_for_coll-1]
  58  *    starting location of "file domain"; region that a given process will
  59  *    perform aggregation for (i.e. actually do I/O)
  60  * ADIO_Offset fd_end[0..nprocs_for_coll-1]
  61  *    start + size - 1 roughly, but it can be less, or 0, in the case of
  62  *    uneven distributions
  63  */
  64 
  65 /* Description from common/ad_aggregate.c.  (Does it completely apply to bg?)
  66  * ADIOI_Calc_aggregator()
  67  *
  68  * The intention here is to implement a function which provides basically
  69  * the same functionality as in Rajeev's original version of
  70  * ADIOI_Calc_my_req().  He used a ceiling division approach to assign the
  71  * file domains, and we use the same approach here when calculating the
  72  * location of an offset/len in a specific file domain.  Further we assume
  73  * this same distribution when calculating the rank_index, which is later
  74  *  used to map to a specific process rank in charge of the file domain.
  75  *
  76  * A better (i.e. more general) approach would be to use the list of file
  77  * domains only.  This would be slower in the case where the
  78  * original ceiling division was used, but it would allow for arbitrary
  79  * distributions of regions to aggregators.  We'd need to know the
  80  * nprocs_for_coll in that case though, which we don't have now.
  81  *
  82  * Note a significant difference between this function and Rajeev's old code:
  83  * this code doesn't necessarily return a rank in the range
  84  * 0..nprocs_for_coll; instead you get something in 0..nprocs.  This is a
  85  * result of the rank mapping; any set of ranks in the communicator could be
  86  * used now.
  87  *
  88  * Returns an integer representing a rank in the collective I/O communicator.
  89  *
  90  * The "len" parameter is also modified to indicate the amount of data
  91  * actually available in this file domain.
  92  */
  93 /*
  94  * This is a more general aggregator search function: it does not rely on the
  95  * assumption that every aggregator hosts a file domain of the same size.
  96  */
  97 int ADIOI_GPFS_Calc_aggregator(ADIO_File fd,
  98                               ADIO_Offset off,
  99                               ADIO_Offset min_off,
 100                               ADIO_Offset *len,
 101                               ADIO_Offset fd_size,
 102                               ADIO_Offset *fd_start,
 103                               ADIO_Offset *fd_end)
 104 {
 105     int rank_index, rank;
 106     ADIO_Offset avail_bytes;
 107     TRACE_ERR("Entering ADIOI_GPFS_Calc_aggregator\n");
 108 
 109     ADIOI_Assert ( (off <= fd_end[fd->hints->cb_nodes-1] && off >= min_off && fd_start[0] >= min_off ) );
 110 
 111     /* binary search --> rank_index is returned */
 112     int ub = fd->hints->cb_nodes;
 113     int lb = 0;
 114     /* get an index into our array of aggregators */
 115     /* Common code for striping - bg doesn't use it but it's
 116        here to make diff'ing easier.
 117     rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1);
 118 
 119     if (fd->hints->striping_unit > 0) {
 120         * wkliao: implementation for file domain alignment
 121            fd_start[] and fd_end[] have been aligned with file lock
 122            boundaries when returned from ADIOI_Calc_file_domains() so cannot
 123            just use simple arithmatic as above *
 124         rank_index = 0;
 125         while (off > fd_end[rank_index]) rank_index++;
 126     }
 127     bg does it's own striping below
 128     */
 129     rank_index = fd->hints->cb_nodes / 2;
 130     while ( off < fd_start[rank_index] || off > fd_end[rank_index] ) {
 131         if ( off > fd_end  [rank_index] ) {
 132             lb = rank_index;
 133             rank_index = (rank_index + ub) / 2;
 134         }
 135         else
 136         if ( off < fd_start[rank_index] ) {
 137             ub = rank_index;
 138             rank_index = (rank_index + lb) / 2;
 139         }
 140     }
 141     /* we index into fd_end with rank_index, and fd_end was allocated to be no
 142      * bigger than fd->hins->cb_nodes.   If we ever violate that, we're
 143      * overrunning arrays.  Obviously, we should never ever hit this abort */
 144     if (rank_index >= fd->hints->cb_nodes || rank_index < 0) {
 145         FPRINTF(stderr, "Error in ADIOI_Calc_aggregator(): rank_index(%d) >= fd->hints->cb_nodes (%d) fd_size=%lld off=%lld\n",
 146                         rank_index,fd->hints->cb_nodes,fd_size,off);
 147         MPI_Abort(MPI_COMM_WORLD, 1);
 148     }
 149     /* DBG_FPRINTF ("ADIOI_GPFS_Calc_aggregator: rank_index = %d\n",
 150        rank_index ); */
 151 
 152     /*
 153      * remember here that even in Rajeev's original code it was the case that
 154      * different aggregators could end up with different amounts of data to
 155      * aggregate.  here we use fd_end[] to make sure that we know how much
 156      * data this aggregator is working with.
 157      *
 158      * the +1 is to take into account the end vs. length issue.
 159      */
 160     avail_bytes = fd_end[rank_index] + 1 - off;
 161     if (avail_bytes < *len && avail_bytes > 0) {
 162         /* this file domain only has part of the requested contig. region */
 163 
 164         *len = avail_bytes;
 165     }
 166 
 167     /* map our index to a rank */
 168     /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
 169     rank = fd->hints->ranklist[rank_index];
 170     TRACE_ERR("Leaving ADIOI_GPFS_Calc_aggregator\n");
 171 
 172     return rank;
 173 }
 174 
 175 /*
 176  * Compute a dynamic access range based file domain partition among I/O aggregators,
 177  * which align to the GPFS block size
 178  * Divide the I/O workload among "nprocs_for_coll" processes. This is
 179  * done by (logically) dividing the file into file domains (FDs); each
 180  * process may directly access only its own file domain.
 181  * Additional effort is to make sure that each I/O aggregator get
 182  * a file domain that aligns to the GPFS block size.  So, there will
 183  * not be any false sharing of GPFS file blocks among multiple I/O nodes.
 184  *
 185  * The common version of this now accepts a min_fd_size and striping_unit.
 186  * It doesn't seem necessary here (using GPFS block sizes) but keep it in mind
 187  * (e.g. we could pass striping unit instead of using fs_ptr->blksize).
 188  */
 189 void ADIOI_GPFS_Calc_file_domains(ADIO_File fd,
 190                                       ADIO_Offset *st_offsets,
 191                                       ADIO_Offset *end_offsets,
 192                                       int          nprocs,
 193                                       int          nprocs_for_coll,
 194                                       ADIO_Offset *min_st_offset_ptr,
 195                                       ADIO_Offset **fd_start_ptr,
 196                                       ADIO_Offset **fd_end_ptr,
 197                                       ADIO_Offset *fd_size_ptr,
 198                                       void        *fs_ptr)
 199 {
 200     ADIO_Offset min_st_offset, max_end_offset, *fd_start, *fd_end, *fd_size;
 201     int i, aggr;
 202     TRACE_ERR("Entering ADIOI_GPFS_Calc_file_domains\n");
 203     blksize_t blksize;
 204 
 205 #ifdef AGGREGATION_PROFILE
 206     MPE_Log_event (5004, 0, NULL);
 207 #endif
 208 
 209 #   if AGG_DEBUG
 210     static char myname[] = "ADIOI_GPFS_Calc_file_domains";
 211     DBG_FPRINTF(stderr, "%s(%d): %d aggregator(s)\n",
 212             myname,__LINE__,nprocs_for_coll);
 213 #   endif
 214     if (fd->blksize <= 0)
 215         /* default to 1M if blksize unset */
 216         fd->blksize = 1048576;
 217     blksize = fd->blksize;
 218 
 219 #   if AGG_DEBUG
 220     DBG_FPRINTF(stderr,"%s(%d): Blocksize=%ld\n",myname,__LINE__,blksize);
 221 #   endif
 222 /* find min of start offsets and max of end offsets of all processes */
 223     min_st_offset  = st_offsets [0];
 224     max_end_offset = end_offsets[0];
 225     for (i=1; i<nprocs; i++) {
 226         min_st_offset = ADIOI_MIN(min_st_offset, st_offsets[i]);
 227         max_end_offset = ADIOI_MAX(max_end_offset, end_offsets[i]);
 228     }
 229 
 230     /* DBG_FPRINTF(stderr, "_calc_file_domains, min_st_offset, max_
 231        = %qd, %qd\n", min_st_offset, max_end_offset );*/
 232 
 233     /* determine the "file domain (FD)" of each process, i.e., the portion of
 234        the file that will be "owned" by each process */
 235 
 236     ADIO_Offset gpfs_ub       = (max_end_offset +blksize-1) / blksize * blksize - 1;
 237     ADIO_Offset gpfs_lb       = min_st_offset / blksize * blksize;
 238     ADIO_Offset gpfs_ub_rdoff = (max_end_offset +blksize-1) / blksize * blksize - 1 - max_end_offset;
 239     ADIO_Offset gpfs_lb_rdoff = min_st_offset - min_st_offset / blksize * blksize;
 240     ADIO_Offset fd_gpfs_range = gpfs_ub - gpfs_lb + 1;
 241 
 242     int         naggs    = nprocs_for_coll;
 243 
 244     /* Tweak the file domains so that no fd is smaller than a threshold.  We
 245      * have to strike a balance between efficency and parallelism: somewhere
 246      * between 10k processes sending 32-byte requests and one process sending a
 247      * 320k request is a (system-dependent) sweet spot
 248 
 249     This is from the common code - the new min_fd_size parm that we didn't implement.
 250     (And common code uses a different declaration of fd_size so beware)
 251 
 252     if (fd_size < min_fd_size)
 253         fd_size = min_fd_size;
 254     */
 255     fd_size              = (ADIO_Offset *) ADIOI_Malloc(nprocs_for_coll * sizeof(ADIO_Offset));
 256     *fd_start_ptr        = (ADIO_Offset *) ADIOI_Malloc(nprocs_for_coll * sizeof(ADIO_Offset));
 257     *fd_end_ptr          = (ADIO_Offset *) ADIOI_Malloc(nprocs_for_coll * sizeof(ADIO_Offset));
 258     fd_start             = *fd_start_ptr;
 259     fd_end               = *fd_end_ptr;
 260 
 261     /* each process will have a file domain of some number of gpfs blocks, but
 262      * the division of blocks is not likely to be even.  Some file domains will
 263      * be "large" and others "small"
 264      *
 265      * Example: consider  17 blocks distributed over 3 aggregators.
 266      * nb_cn_small = 17/3 = 5
 267      * naggs_large = 17 - 3*(17/3) = 17 - 15  = 2
 268      * naggs_small = 3 - 2 = 1
 269      *
 270      * and you end up with file domains of {5-blocks, 6-blocks, 6-blocks}
 271      *
 272      * what about (relatively) small files?  say, a file of 1000 blocks
 273      * distributed over 2064 aggregators:
 274      * nb_cn_small = 1000/2064 = 0
 275      * naggs_large = 1000 - 2064*(1000/2064) = 1000
 276      * naggs_small = 2064 - 1000 = 1064
 277      * and you end up with domains of {0, 0, 0, ... 1, 1, 1 ...}
 278      *
 279      * it might be a good idea instead of having all the zeros up front, to
 280      * "mix" those zeros into the fd_size array.  that way, no pset/bridge-set
 281      * is left with zero work.  In fact, even if the small file domains aren't
 282      * zero, it's probably still a good idea to mix the "small" file domains
 283      * across the fd_size array to keep the io nodes in balance */
 284 
 285 
 286     ADIO_Offset n_gpfs_blk    = fd_gpfs_range / blksize;
 287     ADIO_Offset nb_cn_small   = n_gpfs_blk/naggs;
 288     ADIO_Offset naggs_large   = n_gpfs_blk - naggs * (n_gpfs_blk/naggs);
 289     ADIO_Offset naggs_small   = naggs - naggs_large;
 290 
 291 #ifdef BGQPLATFORM
 292     if (gpfsmpio_balancecontig == 1) {
 293         /* File domains blocks are assigned to aggregators in a breadth-first
 294          * fashion relative to the ions - additionally, file domains on the
 295          * aggregators sharing the same bridgeset and ion have contiguous
 296          * offsets. */
 297 
 298         // initialize everything to small
 299         for (i=0; i<naggs; i++)
 300             fd_size[i] = nb_cn_small     * blksize;
 301 
 302         // go thru and distribute the large across the bridges
 303 
 304         /* bridelistoffset: agg rank list offsets using the bridgelist - each
 305          * entry is created by adding up the indexes for the aggs from all
 306          * previous bridges */
 307         int *bridgelistoffset =
 308             (int *) ADIOI_Malloc(fd->hints->fs_hints.bg.numbridges*sizeof(int));
 309         /* tmpbridgelistnum: copy of the bridgelistnum whose entries can be
 310          * decremented to keep track of bridge assignments during the actual
 311          * large block assignments to the agg rank list*/
 312         int *tmpbridgelistnum =
 313             (int *) ADIOI_Malloc(fd->hints->fs_hints.bg.numbridges*sizeof(int));
 314 
 315         int j;
 316         for (j=0;j<fd->hints->fs_hints.bg.numbridges;j++) {
 317             int k, bridgerankoffset = 0;
 318             for (k=0;k<j;k++) {
 319                 bridgerankoffset += fd->hints->fs_hints.bg.bridgelistnum[k];
 320             }
 321             bridgelistoffset[j] = bridgerankoffset;
 322         }
 323 
 324         for (j=0;j<fd->hints->fs_hints.bg.numbridges;j++)
 325             tmpbridgelistnum[j] = fd->hints->fs_hints.bg.bridgelistnum[j];
 326         int bridgeiter = 0;
 327 
 328         /* distribute the large blocks across the aggs going breadth-first
 329          * across the bridgelist - this distributes the fd sizes across the
 330          * ions, so later in the file domain assignment when it iterates thru
 331          * the ranklist the offsets will be contiguous within the bridge and
 332          * ion as well */
 333         for (j=0;j<naggs_large;j++) {
 334             int foundbridge = 0;
 335             int numbridgelistpasses = 0;
 336             while (!foundbridge) {
 337                 if (tmpbridgelistnum[bridgeiter] > 0) {
 338                     foundbridge = 1;
 339                     /*
 340                        printf("bridgeiter is %d tmpbridgelistnum[bridgeiter] is %d bridgelistoffset[bridgeiter] is %d\n",bridgeiter,tmpbridgelistnum[bridgeiter],bridgelistoffset[bridgeiter]);
 341                        printf("naggs is %d bridgeiter is %d bridgelistoffset[bridgeiter] is %d tmpbridgelistnum[bridgeiter] is %d\n",naggs, bridgeiter,bridgelistoffset[bridgeiter],tmpbridgelistnum[bridgeiter]);
 342                        printf("naggs is %d bridgeiter is %d setting fd_size[%d]\n",naggs, bridgeiter,bridgelistoffset[bridgeiter]+(fd->hints->bridgelistnum[bridgeiter]-tmpbridgelistnum[bridgeiter]));
 343                      */
 344                     int currentbridgelistnum =
 345                         (fd->hints->fs_hints.bg.bridgelistnum[bridgeiter]-
 346                          tmpbridgelistnum[bridgeiter]);
 347                     int currentfdsizeindex = bridgelistoffset[bridgeiter] +
 348                         currentbridgelistnum;
 349                     fd_size[currentfdsizeindex] = (nb_cn_small+1) * blksize;
 350                     tmpbridgelistnum[bridgeiter]--;
 351                 }
 352                 if (bridgeiter == (fd->hints->fs_hints.bg.numbridges-1)) {
 353                     /* guard against infinite loop - should only ever make 1 pass
 354                      * thru bridgelist */
 355                     ADIOI_Assert(numbridgelistpasses == 0);
 356                     numbridgelistpasses++;
 357                     bridgeiter = 0;
 358                 }
 359                 else
 360                     bridgeiter++;
 361             }
 362         }
 363         ADIOI_Free(tmpbridgelistnum);
 364         ADIOI_Free(bridgelistoffset);
 365 
 366     } else {
 367         /* BG/L- and BG/P-style distribution of file domains: simple allocation of
 368          * file domins to each aggregator */
 369         for (i=0; i<naggs; i++) {
 370             if (i < naggs_large) {
 371                 fd_size[i] = (nb_cn_small+1) * blksize;
 372             } else {
 373                 fd_size[i] = nb_cn_small     * blksize;
 374             }
 375         }
 376     }
 377 #ifdef balancecontigtrace
 378     int myrank;
 379     MPI_Comm_rank(fd->comm,&myrank);
 380     if (myrank == 0) {
 381       fprintf(stderr,"naggs_small is %d nb_cn_small is %d\n",naggs_small,nb_cn_small);
 382         for (i=0; i<naggs; i++) {
 383             fprintf(stderr,"fd_size[%d] set to %d agg rank is %d\n",i,fd_size[i],fd->hints->ranklist[i]);
 384         }
 385     }
 386 #endif
 387 
 388 #else // not BGQ platform
 389         for (i=0; i<naggs; i++) {
 390             if (i < naggs_large) {
 391                 fd_size[i] = (nb_cn_small+1) * blksize;
 392             } else {
 393                 fd_size[i] = nb_cn_small     * blksize;
 394             }
 395     }
 396 
 397 #endif
 398 
 399 
 400 #   if AGG_DEBUG
 401      DBG_FPRINTF(stderr,"%s(%d): "
 402                    "gpfs_ub       %llu, "
 403                    "gpfs_lb       %llu, "
 404                    "gpfs_ub_rdoff %llu, "
 405                    "gpfs_lb_rdoff %llu, "
 406                    "fd_gpfs_range %llu, "
 407                    "n_gpfs_blk    %llu, "
 408                    "nb_cn_small   %llu, "
 409                    "naggs_large   %llu, "
 410                    "naggs_small   %llu, "
 411                    "\n",
 412                    myname,__LINE__,
 413                    gpfs_ub      ,
 414                    gpfs_lb      ,
 415                    gpfs_ub_rdoff,
 416                    gpfs_lb_rdoff,
 417                    fd_gpfs_range,
 418                    n_gpfs_blk   ,
 419                    nb_cn_small  ,
 420                    naggs_large  ,
 421                    naggs_small
 422                    );
 423 #   endif
 424 
 425     fd_size[0]       -= gpfs_lb_rdoff;
 426     fd_size[naggs-1] -= gpfs_ub_rdoff;
 427 
 428     /* compute the file domain for each aggr */
 429     ADIO_Offset offset = min_st_offset;
 430     for (aggr=0; aggr<naggs; aggr++) {
 431         fd_start[aggr] = offset;
 432         fd_end  [aggr] = offset + fd_size[aggr] - 1;
 433         offset += fd_size[aggr];
 434     }
 435 
 436     *fd_size_ptr = fd_size[0];
 437     *min_st_offset_ptr = min_st_offset;
 438 
 439 #ifdef AGGREGATION_PROFILE
 440     MPE_Log_event (5005, 0, NULL);
 441 #endif
 442     ADIOI_Free (fd_size);
 443     TRACE_ERR("Leaving ADIOI_GPFS_Calc_file_domains\n");
 444 }
 445 
 446 /*
 447  * ADIOI_GPFS_Calc_my_req() overrides ADIOI_Calc_my_req() because the default
 448  * implementation is specific to static file domain partitioning.
 449  *
 450  * ADIOI_Calc_my_req() - calculate what portions of the access requests
 451  * of this process are located in the file domains of various processes
 452  * (including this one)
 453  */
 454 void ADIOI_GPFS_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list, ADIO_Offset *len_list,
 455                            int contig_access_count, ADIO_Offset
 456                            min_st_offset, ADIO_Offset *fd_start,
 457                            ADIO_Offset *fd_end, ADIO_Offset fd_size,
 458                            int nprocs,
 459                            int *count_my_req_procs_ptr,
 460                            int **count_my_req_per_proc_ptr,
 461                            ADIOI_Access **my_req_ptr,
 462                            int **buf_idx_ptr)
 463 /* Possibly reconsider if buf_idx's are ok as int's, or should they be aints/offsets?
 464    They are used as memory buffer indices so it seems like the 2G limit is in effect */
 465 {
 466     int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
 467     int i, l, proc;
 468     ADIO_Offset fd_len, rem_len, curr_idx, off;
 469     ADIOI_Access *my_req;
 470     TRACE_ERR("Entering ADIOI_GPFS_Calc_my_req\n");
 471 
 472 #ifdef AGGREGATION_PROFILE
 473     MPE_Log_event (5024, 0, NULL);
 474 #endif
 475     *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs,sizeof(int));
 476     count_my_req_per_proc = *count_my_req_per_proc_ptr;
 477 /* count_my_req_per_proc[i] gives the no. of contig. requests of this
 478    process in process i's file domain. calloc initializes to zero.
 479    I'm allocating memory of size nprocs, so that I can do an
 480    MPI_Alltoall later on.*/
 481 
 482     buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 483 /* buf_idx is relevant only if buftype_is_contig.
 484    buf_idx[i] gives the index into user_buf where data received
 485    from proc. i should be placed. This allows receives to be done
 486    without extra buffer. This can't be done if buftype is not contig. */
 487 
 488     /* initialize buf_idx to -1 */
 489     for (i=0; i < nprocs; i++) buf_idx[i] = -1;
 490 
 491     /* one pass just to calculate how much space to allocate for my_req;
 492      * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
 493      */
 494     for (i=0; i < contig_access_count; i++) {
 495         /* short circuit offset/len processing if len == 0
 496          *      (zero-byte  read/write */
 497         if (len_list[i] == 0)
 498                 continue;
 499         off = offset_list[i];
 500         fd_len = len_list[i];
 501         /* note: we set fd_len to be the total size of the access.  then
 502          * ADIOI_Calc_aggregator() will modify the value to return the
 503          * amount that was available from the file domain that holds the
 504          * first part of the access.
 505          */
 506   /* BES */
 507         proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size,
 508                                      fd_start, fd_end);
 509         count_my_req_per_proc[proc]++;
 510 
 511         /* figure out how much data is remaining in the access (i.e. wasn't
 512          * part of the file domain that had the starting byte); we'll take
 513          * care of this data (if there is any) in the while loop below.
 514          */
 515         rem_len = len_list[i] - fd_len;
 516 
 517         while (rem_len > 0) {
 518             off += fd_len; /* point to first remaining byte */
 519             fd_len = rem_len; /* save remaining size, pass to calc */
 520             proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len,
 521                                          fd_size, fd_start, fd_end);
 522 
 523             count_my_req_per_proc[proc]++;
 524             rem_len -= fd_len; /* reduce remaining length by amount from fd */
 525         }
 526     }
 527 
 528 /* now allocate space for my_req, offset, and len */
 529 
 530     *my_req_ptr = (ADIOI_Access *)
 531         ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
 532     my_req = *my_req_ptr;
 533 
 534     count_my_req_procs = 0;
 535     for (i=0; i < nprocs; i++) {
 536         if (count_my_req_per_proc[i]) {
 537             my_req[i].offsets = (ADIO_Offset *)
 538                 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
 539             my_req[i].lens =
 540                 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
 541             count_my_req_procs++;
 542         }
 543         my_req[i].count = 0;  /* will be incremented where needed
 544                                       later */
 545     }
 546 
 547 /* now fill in my_req */
 548     curr_idx = 0;
 549     for (i=0; i<contig_access_count; i++) {
 550         /* short circuit offset/len processing if len == 0
 551          *      (zero-byte  read/write */
 552         if (len_list[i] == 0)
 553                 continue;
 554         off = offset_list[i];
 555         fd_len = len_list[i];
 556         proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size,
 557                                      fd_start, fd_end);
 558 
 559         /* for each separate contiguous access from this process */
 560         if (buf_idx[proc] == -1)
 561   {
 562     ADIOI_Assert(curr_idx == (int) curr_idx);
 563     buf_idx[proc] = (int) curr_idx;
 564   }
 565 
 566         l = my_req[proc].count;
 567         curr_idx += fd_len;
 568 
 569         rem_len = len_list[i] - fd_len;
 570 
 571         /* store the proc, offset, and len information in an array
 572          * of structures, my_req. Each structure contains the
 573          * offsets and lengths located in that process's FD,
 574          * and the associated count.
 575          */
 576         my_req[proc].offsets[l] = off;
 577         my_req[proc].lens[l] = fd_len;
 578         my_req[proc].count++;
 579 
 580         while (rem_len > 0) {
 581             off += fd_len;
 582             fd_len = rem_len;
 583             proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len,
 584                                          fd_size, fd_start, fd_end);
 585 
 586             if (buf_idx[proc] == -1)
 587       {
 588         ADIOI_Assert(curr_idx == (int) curr_idx);
 589         buf_idx[proc] = (int) curr_idx;
 590       }
 591 
 592             l = my_req[proc].count;
 593             curr_idx += fd_len;
 594             rem_len -= fd_len;
 595 
 596             my_req[proc].offsets[l] = off;
 597             my_req[proc].lens[l] = fd_len;
 598             my_req[proc].count++;
 599         }
 600     }
 601 
 602 
 603 
 604 #ifdef AGG_DEBUG
 605     for (i=0; i<nprocs; i++) {
 606         if (count_my_req_per_proc[i] > 0) {
 607             DBG_FPRINTF(stderr, "data needed from %d (count = %d):\n", i,
 608                     my_req[i].count);
 609             for (l=0; l < my_req[i].count; l++) {
 610                 DBG_FPRINTF(stderr, "   off[%d] = %lld, len[%d] = %lld\n", l,
 611                         my_req[i].offsets[l], l, my_req[i].lens[l]);
 612             }
 613         }
 614         DBG_FPRINTF(stderr, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
 615     }
 616 #endif
 617 
 618     *count_my_req_procs_ptr = count_my_req_procs;
 619     *buf_idx_ptr = buf_idx;
 620 #ifdef AGGREGATION_PROFILE
 621     MPE_Log_event (5025, 0, NULL);
 622 #endif
 623     TRACE_ERR("Leaving ADIOI_GPFS_Calc_my_req\n");
 624 }
 625 
 626 /*
 627  * ADIOI_Calc_others_req (copied to bg and switched to all to all for performance)
 628  *
 629  * param[in]  count_my_req_procs        Number of processes whose file domain my
 630  *                                        request touches.
 631  * param[in]  count_my_req_per_proc     count_my_req_per_proc[i] gives the no. of
 632  *                                        contig. requests of this process in
 633  *                                        process i's file domain.
 634  * param[in]  my_req                    A structure defining my request
 635  * param[in]  nprocs                    Number of nodes in the block
 636  * param[in]  myrank                    Rank of this node
 637  * param[out] count_others_req_proc_ptr Number of processes whose requests lie in
 638  *                                        my process's file domain (including my
 639  *                                        process itself)
 640  * param[out] others_req_ptr            Array of other process' requests that lie
 641  *                                        in my process's file domain
 642  */
 643 void ADIOI_GPFS_Calc_others_req(ADIO_File fd, int count_my_req_procs,
 644                                 int *count_my_req_per_proc,
 645                                 ADIOI_Access *my_req,
 646                                 int nprocs, int myrank,
 647                                 int *count_others_req_procs_ptr,
 648                                 ADIOI_Access **others_req_ptr)
 649 {
 650     TRACE_ERR("Entering ADIOI_GPFS_Calc_others_req\n");
 651 /* determine what requests of other processes lie in this process's
 652    file domain */
 653 
 654 /* count_others_req_procs = number of processes whose requests lie in
 655    this process's file domain (including this process itself)
 656    count_others_req_per_proc[i] indicates how many separate contiguous
 657    requests of proc. i lie in this process's file domain. */
 658 
 659     int *count_others_req_per_proc, count_others_req_procs;
 660     int i;
 661     ADIOI_Access *others_req;
 662 
 663     /* Parameters for MPI_Alltoallv */
 664     int *scounts, *sdispls, *rcounts, *rdispls;
 665 
 666 /* first find out how much to send/recv and from/to whom */
 667 #ifdef AGGREGATION_PROFILE
 668     MPE_Log_event (5026, 0, NULL);
 669 #endif
 670     /* Send 1 int to each process.  count_my_req_per_proc[i] is the number of
 671      * requests that my process will do to the file domain owned by process[i].
 672      * Receive 1 int from each process.  count_others_req_per_proc[i] is the number of
 673      * requests that process[i] will do to the file domain owned by my process.
 674      */
 675     count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 676 /*     cora2a1=timebase(); */
 677 /*for(i=0;i<nprocs;i++) ?*/
 678     MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
 679                  count_others_req_per_proc, 1, MPI_INT, fd->comm);
 680 
 681 /*     total_cora2a+=timebase()-cora2a1; */
 682 
 683     /* Allocate storage for an array of other nodes' accesses of our
 684      * node's file domain.  Also allocate storage for the alltoallv
 685      * parameters.
 686      */
 687     *others_req_ptr = (ADIOI_Access *)
 688         ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
 689     others_req = *others_req_ptr;
 690 
 691     scounts = ADIOI_Malloc(nprocs*sizeof(int));
 692     sdispls = ADIOI_Malloc(nprocs*sizeof(int));
 693     rcounts = ADIOI_Malloc(nprocs*sizeof(int));
 694     rdispls = ADIOI_Malloc(nprocs*sizeof(int));
 695 
 696     /* If process[i] has any requests in my file domain,
 697      *   initialize an ADIOI_Access structure that will describe each request
 698      *   from process[i].  The offsets, lengths, and buffer pointers still need
 699      *   to be obtained to complete the setting of this structure.
 700      */
 701     count_others_req_procs = 0;
 702     for (i=0; i<nprocs; i++) {
 703         if (count_others_req_per_proc[i])
 704   {
 705             others_req[i].count = count_others_req_per_proc[i];
 706 
 707             others_req[i].offsets = (ADIO_Offset *)
 708                 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
 709             others_req[i].lens =
 710                 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
 711 
 712             others_req[i].mem_ptrs = (MPI_Aint *)
 713                 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint));
 714 
 715             count_others_req_procs++;
 716         }
 717         else
 718         {
 719             others_req[i].count = 0;
 720             others_req[i].offsets = NULL;
 721             others_req[i].lens    = NULL;
 722         }
 723     }
 724 
 725     /* Now send the calculated offsets and lengths to respective processes */
 726 
 727     /************************/
 728     /* Exchange the offsets */
 729     /************************/
 730 
 731     // Figure out the layout for the sendbuf and recvbuf.
 732     // scounts[] and sdispls[]  /  rcounts[] and rdispls[] define the layout,
 733     // and the data for each section will come from my_req[i].offsets
 734     // or others_req[i].offsets.
 735 
 736     int scount_total = 0;
 737     int rcount_total = 0;
 738     for (i=0; i<nprocs; i++)
 739     {
 740         /* Send these offsets to process i.*/
 741         scounts[i] = count_my_req_per_proc[i];
 742         sdispls[i] = scount_total;
 743         scount_total += scounts[i];
 744 
 745         /* Receive these offsets from process i.*/
 746         rcounts[i] = count_others_req_per_proc[i];
 747         rdispls[i] = rcount_total;
 748         rcount_total += rcounts[i];
 749     }
 750 
 751     void *sbuf_copy_of_req_info;
 752     void *rbuf_copy_of_req_info;
 753 
 754     sbuf_copy_of_req_info = (ADIO_Offset *) ADIOI_Malloc(scount_total * sizeof(ADIO_Offset));
 755     rbuf_copy_of_req_info = (ADIO_Offset *) ADIOI_Malloc(rcount_total * sizeof(ADIO_Offset));
 756     for (i=0; i<nprocs; i++)
 757     {
 758         // I haven't timed it, I'm just assuming a memcpy(,,0) is fast for
 759         // the entries that don't have data to contribute so I didn't bother
 760         // with an 'if' statement
 761         memcpy(sbuf_copy_of_req_info + sdispls[i] * sizeof(ADIO_Offset),
 762             my_req[i].offsets,
 763             scounts[i] * sizeof(ADIO_Offset));
 764     }
 765 
 766     /* Exchange the offsets */
 767     MPI_Alltoallv(sbuf_copy_of_req_info,
 768                   scounts, sdispls, ADIO_OFFSET,
 769                   rbuf_copy_of_req_info,
 770                   rcounts, rdispls, ADIO_OFFSET,
 771                   fd->comm);
 772     for (i=0; i<nprocs; i++)
 773     {
 774         memcpy(others_req[i].offsets,
 775             rbuf_copy_of_req_info + rdispls[i] * sizeof(ADIO_Offset),
 776             rcounts[i] * sizeof(ADIO_Offset));
 777     }
 778 
 779     /************************/
 780     /* Exchange the lengths */
 781     /************************/
 782 
 783     for (i=0; i<nprocs; i++)
 784     {
 785         memcpy(sbuf_copy_of_req_info + sdispls[i] * sizeof(ADIO_Offset),
 786             my_req[i].lens,
 787             scounts[i] * sizeof(ADIO_Offset));
 788     }
 789 
 790     /* Exchange the lengths */
 791     MPI_Alltoallv(sbuf_copy_of_req_info,
 792                   scounts, sdispls, ADIO_OFFSET,
 793                   rbuf_copy_of_req_info,
 794                   rcounts, rdispls, ADIO_OFFSET,
 795                   fd->comm);
 796     for (i=0; i<nprocs; i++)
 797     {
 798         memcpy(others_req[i].lens,
 799             rbuf_copy_of_req_info + rdispls[i] * sizeof(ADIO_Offset),
 800             rcounts[i] * sizeof(ADIO_Offset));
 801     }
 802 
 803     /* Clean up */
 804     ADIOI_Free(sbuf_copy_of_req_info);
 805     ADIOI_Free(rbuf_copy_of_req_info);
 806     ADIOI_Free(count_others_req_per_proc);
 807     ADIOI_Free (scounts);
 808     ADIOI_Free (sdispls);
 809     ADIOI_Free (rcounts);
 810     ADIOI_Free (rdispls);
 811 
 812     *count_others_req_procs_ptr = count_others_req_procs;
 813 #ifdef AGGREGATION_PROFILE
 814     MPE_Log_event (5027, 0, NULL);
 815 #endif
 816     TRACE_ERR("Leaving ADIOI_GPFS_Calc_others_req\n");
 817 }

/* [<][>][^][v][top][bottom][index][help] */