root/ompi/mca/io/romio321/romio/adio/common/ad_io_coll.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ADIOI_IOStridedColl
  2. ADIOI_Calc_bounds
  3. ADIOI_IOFiletype
  4. Exch_data_amounts
  5. post_aggregator_comm
  6. post_client_comm

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /* 
   3  *   Copyright (C) 2008 University of Chicago. 
   4  *   See COPYRIGHT notice in top-level directory.
   5  */
   6 
   7 #include "assert.h"
   8 #include "adio.h"
   9 #include "adio_extern.h"
  10 #ifdef AGGREGATION_PROFILE
  11 #include "mpe.h"
  12 #endif
  13 
  14 /* #define ALLTOALL */
  15 
  16 /* #define DEBUG */
  17 /* #define DEBUG2 */  /* print buffers */
  18 
   19 #define USE_PRE_REQ /* when defined, non-aggregators call ADIOI_Build_client_pre_req for each aggregator */
   20 
      /* Forward declarations of helpers used by ADIOI_IOStridedColl.  The
       * summaries below are derived from the call sites in that function. */

      /* Exchange, between aggregators and clients, the amount of data each
       * aggregator will handle for each client in the next round
       * (client_comm_sz_arr is the aggregator's view, agg_comm_sz_arr the
       * client's view).  The caller loops until *aggregators_done equals
       * cb_nodes, so this routine presumably updates that counter and the
       * two alltoallw count arrays -- confirm against the definition. */
   21 static void Exch_data_amounts (ADIO_File fd, int nprocs,
   22                         ADIO_Offset *client_comm_sz_arr,
   23                         ADIO_Offset *agg_comm_sz_arr,
   24                         int *client_alltoallw_counts,
   25                         int *agg_alltoallw_counts,
   26                         int *aggregators_done);
      /* Aggregator side of the point-to-point exchange, using cb_buf and one
       * datatype/size per client.  The caller describes it as posting
       * Irecv's for writes and Isends for reads.  The caller later passes
       * *requests and *aggregators_client_count_p to MPI_Waitall and then
       * frees *requests with ADIOI_Free. */
   27 static void post_aggregator_comm (MPI_Comm comm, int rw_type, int nproc,
   28                            void *cb_buf,
   29                            MPI_Datatype *client_comm_dtype_arr,
   30                            ADIO_Offset *client_comm_sz_arr,
   31                            MPI_Request **requests,
   32                            int *aggregators_client_count_p);
   33 
      /* Client side of the point-to-point exchange with one aggregator
       * (agg_rank), using the user buffer buf and the datatype built for that
       * aggregator.  The request written to *request is later completed by
       * the caller with MPI_Waitall. */
   34 static void post_client_comm (ADIO_File fd, int rw_type, 
   35                        int agg_rank, void *buf,
   36                        MPI_Datatype agg_comm_dtype,
   37                        int agg_alltoallw_count,
   38                        MPI_Request *request);
  39 
   40 /* Avery Ching and Kenin Columa's reworked two-phase algorithm.  Key features
   41  * - persistent file domains
   42  * - an option to use alltoall instead of point-to-point
       *
       * ADIOI_IOStridedColl performs one collective strided read
       * (rdwr == ADIOI_READ) or write (any other rdwr value) of `count` items
       * of `datatype` between `buf` and the file `fd`.
       *
       * Outline:
       *  1. Unless the cb_read/cb_write hint is ADIOI_HINT_DISABLE, the
       *     first/last byte offsets of every process are allgathered and
       *     scanned for interleaving.
       *  2. If collective buffering is disabled, or the hint is
       *     ADIOI_HINT_AUTO and no interleaving was found, and cb_pfr is not
       *     enabled, the request is passed to the independent
       *     ADIO_ReadContig/WriteContig/ReadStrided/WriteStrided routines and
       *     the function returns.
       *  3. Otherwise file realms are calculated (skipped when cb_pfr is
       *     enabled and fd->file_realm_types already exists), file views are
       *     exchanged, and the main loop runs until aggregators_done equals
       *     cb_nodes.  Each pass moves data between clients and aggregators
       *     (point-to-point when cb_alltoall is ADIOI_HINT_DISABLE,
       *     MPI_Alltoallw otherwise) and aggregators access the file through
       *     ADIOI_IOFiletype.
       *  4. Datatypes, view state and scratch arrays are released and
       *     fd->fp_sys_posn is set to -1.
       *
       * status and error_code are handed to the underlying I/O routines.  With
       * HAVE_STATUS_SET_BYTES, status is finally set from bufsize (type size
       * times count), not from the amount actually transferred.
   43  */
   44 void ADIOI_IOStridedColl (ADIO_File fd, void *buf, int count, int rdwr,
   45                           MPI_Datatype datatype, int file_ptr_type,
   46                           ADIO_Offset offset, ADIO_Status *status,
   47                           int *error_code)
   48 {
   49     ADIO_Offset min_st_offset=0, max_end_offset=0;
   50     ADIO_Offset st_end_offset[2];
   51     ADIO_Offset *all_st_end_offsets = NULL;
   52     int filetype_is_contig, buftype_is_contig, is_contig;
   53     ADIO_Offset off;
   54     int interleave_count = 0, i, nprocs, myrank, nprocs_for_coll;
   55     int cb_enable;
   56     ADIO_Offset bufsize;
   57     MPI_Aint extent, lb;
   58 #ifdef DEBUG2
   59     MPI_Aint bufextent;
   60 #endif
   61     MPI_Count size;
   62     int agg_rank;
   63 
   64     ADIO_Offset agg_disp; /* aggregated file offset */
   65     MPI_Datatype agg_dtype; /* aggregated file datatype */
   66 
   67     int aggregators_done = 0;
   68     ADIO_Offset buffered_io_size = 0;
   69 
          /* displacement array for MPI_Alltoallw; zero-filled by ADIOI_Calloc
           * below and never written in this function */
   70     int *alltoallw_disps;
   71 
          /* one 2*nprocs allocation: client_alltoallw_counts aliases the
           * first half, agg_alltoallw_counts points at the second half */
   72     int *alltoallw_counts;
   73     int *client_alltoallw_counts;
   74     int *agg_alltoallw_counts;
   75 
          /* aggregator's collective buffer; allocated only when fd->is_agg */
   76     char *cb_buf = NULL;
   77 
   78     MPI_Datatype *client_comm_dtype_arr; /* aggregator perspective */
   79     MPI_Datatype *agg_comm_dtype_arr;    /* client perspective */
   80     ADIO_Offset *client_comm_sz_arr;     /* aggregator perspective */
   81     ADIO_Offset *agg_comm_sz_arr;        /* client perspective */
   82 
   83     /* file views for each client and aggregator */
   84     view_state *client_file_view_state_arr = NULL;
   85     view_state *agg_file_view_state_arr    = NULL;
   86     /* mem views for local process */
   87     view_state *my_mem_view_state_arr      = NULL;
   88 
   89     MPI_Status *agg_comm_statuses     = NULL;
   90     MPI_Request *agg_comm_requests    = NULL;
   91     MPI_Status *client_comm_statuses  = NULL;
   92     MPI_Request *client_comm_requests = NULL;
   93     int aggs_client_count = 0;
   94     int clients_agg_count = 0;
   95 
   96     MPI_Comm_size (fd->comm, &nprocs);
   97     MPI_Comm_rank (fd->comm, &myrank);
   98 #ifdef DEBUG
   99     fprintf (stderr, "p%d: entering ADIOI_IOStridedColl\n", myrank);
  100 #endif
  101 #ifdef AGGREGATION_PROFILE
  102     if (rdwr == ADIOI_READ)
  103         MPE_Log_event (5010, 0, NULL);
  104     else
  105         MPE_Log_event (5012, 0, NULL);
  106 #endif
  107 
  108     /* I need to check if there are any outstanding nonblocking writes
  109        to the file, which could potentially interfere with the writes
  110        taking place in this collective write call. Since this is not
  111        likely to be common, let me do the simplest thing possible here:
  112        Each process completes all pending nonblocking operations before
  113        completing. */
          /* NOTE(review): no statement in this function completes pending
           * nonblocking operations; the comment above looks stale -- confirm. */
  114 
          /* number of aggregators; the main loop below runs until this many
           * aggregators are reported done */
  115     nprocs_for_coll = fd->hints->cb_nodes;
  116 
          /* pick the collective-buffering hint that matches the direction */
  117     if (rdwr == ADIOI_READ)
  118         cb_enable = fd->hints->cb_read;
  119     else
  120         cb_enable = fd->hints->cb_write;
  121 
  122     /* only check for interleaving if collective buffering (cb_read or cb_write) isn't disabled */
  123     if (cb_enable != ADIOI_HINT_DISABLE) {
  124         /* find the starting and ending byte of my I/O access */
  125         ADIOI_Calc_bounds (fd, count, datatype, file_ptr_type, offset,
  126                            &st_end_offset[0], &st_end_offset[1]);
  127 
  128         /* allocate an array of start/end pairs */
  129         all_st_end_offsets = (ADIO_Offset *)
  130             ADIOI_Malloc (2*nprocs*sizeof(ADIO_Offset));
  131         MPI_Allgather (st_end_offset, 2, ADIO_OFFSET, all_st_end_offsets, 2,
  132                        ADIO_OFFSET, fd->comm);
  133 
  134         min_st_offset = all_st_end_offsets[0];
  135         max_end_offset = all_st_end_offsets[1];
  136 
  137         for (i=1; i<nprocs; i++) {
  138             /* are the accesses of different processes interleaved? */
                  /* index i*2-1 is the end offset of process i-1: count a
                   * process whose start precedes its predecessor's end and
                   * whose own start does not exceed its own end */
  139             if ((all_st_end_offsets[i*2] < all_st_end_offsets[i*2-1]) &&
  140                 (all_st_end_offsets[i*2] <= all_st_end_offsets[i*2+1]))
  141                 interleave_count++;
  142             /* This is a rudimentary check for interleaving, but should
  143              * suffice for the moment. */
  144             
  145             min_st_offset = ADIOI_MIN(all_st_end_offsets[i*2],
  146                                       min_st_offset);
  147             max_end_offset = ADIOI_MAX(all_st_end_offsets[i*2+1],
  148                                        max_end_offset);
  149         }
  150     }
          /* NOTE(review): when cb_enable is ADIOI_HINT_DISABLE the block
           * above is skipped, so min_st_offset and max_end_offset stay 0 and
           * all_st_end_offsets stays NULL; with cb_pfr enabled the collective
           * path below is still taken in that state -- confirm intended. */
  151 
  152     ADIOI_Datatype_iscontig (datatype, &buftype_is_contig);
  153     ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);
  154 
          /* Independent I/O: collective buffering is disabled, or it is
           * automatic and no interleaving was detected -- and cb_pfr is not
           * enabled. */
  155     if ((cb_enable == ADIOI_HINT_DISABLE
  156          || (!interleave_count && (cb_enable == ADIOI_HINT_AUTO)))
  157         && (fd->hints->cb_pfr != ADIOI_HINT_ENABLE)){
              /* the offsets array was only allocated when the hint was not
               * disabled */
  158         if (cb_enable != ADIOI_HINT_DISABLE) {
  159             ADIOI_Free (all_st_end_offsets);
  160         }
  161 
  162         if (buftype_is_contig && filetype_is_contig) {
  163             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
                      /* convert the etype-relative offset to a byte offset */
  164                 off = fd->disp + (fd->etype_size) * offset;
  165                 if (rdwr == ADIOI_READ)
  166                     ADIO_ReadContig(fd, buf, count, datatype,
  167                                     ADIO_EXPLICIT_OFFSET, off, status,
  168                                     error_code);
  169                 else
  170                     ADIO_WriteContig(fd, buf, count, datatype,
  171                                      ADIO_EXPLICIT_OFFSET, off, status,
  172                                      error_code);
  173             }
  174             else {
  175                 if (rdwr == ADIOI_READ)
  176                     ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
  177                                     0, status, error_code);
  178                 else
  179                     ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
  180                                      0, status, error_code);
  181             }
  182         }
  183         else {
  184             if (rdwr == ADIOI_READ)
  185                 ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
  186                                  offset, status, error_code);
  187             else
  188                 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
  189                                   offset, status, error_code);
  190         }
  191         return;
  192     }
  193 
          /* ---- collective path ---- */
  194     MPI_Type_get_extent(datatype, &lb, &extent);
  195 #ifdef DEBUG2
  196     bufextent = extent * count;
  197 #endif
  198     MPI_Type_size_x(datatype, &size);
          /* total bytes requested by this process; only consumed by the
           * HAVE_STATUS_SET_BYTES block at the end */
  199     bufsize = size * (MPI_Count)count;
  200 
  201     /* Calculate file realms */
          /* skipped when cb_pfr is enabled and realms already exist on fd */
  202     if ((fd->hints->cb_pfr != ADIOI_HINT_ENABLE) ||
  203         (fd->file_realm_types == NULL))
  204         ADIOI_Calc_file_realms (fd, min_st_offset, max_end_offset);
  205 
  206     my_mem_view_state_arr = (view_state *)
  207         ADIOI_Calloc (1, nprocs * sizeof(view_state));
  208     agg_file_view_state_arr = (view_state *)
  209         ADIOI_Calloc (1, nprocs * sizeof(view_state));
  210     client_comm_sz_arr = (ADIO_Offset *)
  211         ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
  212 
          /* only aggregators keep a file view per client */
  213     if (fd->is_agg) {
  214         client_file_view_state_arr = (view_state *)
  215             ADIOI_Calloc (1, nprocs * sizeof(view_state));
  216     }
  217     else {
  218         client_file_view_state_arr = NULL;
  219     }
  220 
  221     /* Alltoallw doesn't like a null array even if the counts are
  222      * zero.  If you do not include this code, it will fail. */
  223     client_comm_dtype_arr = (MPI_Datatype *)
  224         ADIOI_Calloc (1, nprocs * sizeof(MPI_Datatype));
          /* non-aggregators fill the array with a placeholder type; on
           * aggregators the array is handed to ADIOI_Build_agg_reqs below */
  225     if (!fd->is_agg)
  226         for (i = 0; i < nprocs; i++)
  227             client_comm_dtype_arr[i] = MPI_BYTE;
  228 
  229     ADIOI_Exch_file_views (myrank, nprocs, file_ptr_type, fd, count,
  230                            datatype, offset, my_mem_view_state_arr,
  231                            agg_file_view_state_arr,
  232                            client_file_view_state_arr);
  233 
  234     agg_comm_sz_arr = (ADIO_Offset *)
  235         ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
  236     agg_comm_dtype_arr = (MPI_Datatype *)
  237         ADIOI_Malloc (nprocs * sizeof(MPI_Datatype));
          /* aggregators build the first round of requests; buffered_io_size
           * is the sum of the positive per-client sizes for this round */
  238     if (fd->is_agg) {
  239         ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
  240                               client_file_view_state_arr,
  241                               client_comm_dtype_arr,
  242                               client_comm_sz_arr,
  243                               &agg_disp,
  244                               &agg_dtype);
  245         buffered_io_size = 0;
  246         for (i=0; i <nprocs; i++) {
  247             if (client_comm_sz_arr[i] > 0)
  248                 buffered_io_size += client_comm_sz_arr[i];
  249         }
  250     }
  251 #ifdef USE_PRE_REQ
  252     else 
  253     {
  254         /* Example use of ADIOI_Build_client_pre_req. to an
  255          * appropriate section */
  256         
              /* aggregators are visited in an order rotated by myrank */
  257         for (i = 0; i < fd->hints->cb_nodes; i++)
  258         {
  259             agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
  260 #ifdef AGGREGATION_PROFILE
  261             MPE_Log_event (5040, 0, NULL);
  262 #endif
                  /* NOTE(review): the last two arguments are hard-coded
                   * limits (2 MiB and 64 Ki); their meaning is defined by
                   * ADIOI_Build_client_pre_req, not visible here */
  263             ADIOI_Build_client_pre_req(
  264                 fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
  265                 &(my_mem_view_state_arr[agg_rank]),
  266                 &(agg_file_view_state_arr[agg_rank]),
  267                 2*1024*1024, 
  268                 64*1024);
  269 #ifdef AGGREGATION_PROFILE
  270             MPE_Log_event (5041, 0, NULL);
  271 #endif
  272         }
  273     }
  274 #endif
  275 
  276 
  277     if (fd->is_agg)
  278         cb_buf = (char *) ADIOI_Malloc (fd->hints->cb_buffer_size);
  279     alltoallw_disps  = (int *) ADIOI_Calloc (nprocs, sizeof(int));
  280     alltoallw_counts = client_alltoallw_counts = (int *)
  281         ADIOI_Calloc (2*nprocs, sizeof(int));
  282     agg_alltoallw_counts = &alltoallw_counts[nprocs];
  283 
  284     if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
  285         /* aggregators pre-post all Irecv's for incoming data from clients */
  286         if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
  287             post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
  288                              client_comm_dtype_arr,
  289                              client_comm_sz_arr,
  290                              &agg_comm_requests,
  291                              &aggs_client_count);
  292     }
  293     /* Aggregators send amounts for data requested to clients */
  294     Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
  295                        client_alltoallw_counts, agg_alltoallw_counts,
  296                        &aggregators_done);
  297 
  298 #ifdef DEBUG
  299     fprintf (stderr, "client_alltoallw_counts[ ");
  300     for (i=0; i<nprocs; i++) {
  301         fprintf (stderr, "%d ", client_alltoallw_counts[i]);
  302     }
  303     fprintf (stderr, "]\n");
  304     fprintf (stderr, "agg_alltoallw_counts[ ");
  305     for (i=0; i<nprocs; i++) {
  306         fprintf (stderr,"%d ", agg_alltoallw_counts[i]);
  307     }
  308     fprintf (stderr, "]\n");
  309 #endif
  310 
  311     /* keep looping while aggregators still have I/O to do */
  312     while (aggregators_done != nprocs_for_coll) {
  313         if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
  314         /* clients should build datatypes for local memory locations
  315            for data communication with aggregators and post
  316            communication as the datatypes are built */
  317 
              /* NOTE(review): this array is allocated on every pass of the
               * while loop but is only freed below when clients_agg_count is
               * non-zero, so a pass with no client traffic leaks it. */
  318         client_comm_requests = (MPI_Request *)
  319             ADIOI_Calloc (fd->hints->cb_nodes, sizeof(MPI_Request));
  320 
  321         for (i = 0; i < fd->hints->cb_nodes; i++)
  322         {
                  /* NOTE(review): clients_agg_count is reset on every
                   * iteration, so each request is stored in slot 0 of
                   * client_comm_requests and, after the loop, the count is at
                   * most 1 (0 if the last aggregator visited had no data).
                   * Earlier requests would then never be passed to
                   * MPI_Waitall.  The reset looks like it belongs before the
                   * for loop -- confirm and fix. */
  323             clients_agg_count = 0;
  324             agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
  325             if (agg_comm_sz_arr[agg_rank] > 0) {
  326                 ADIOI_Build_client_req(fd, agg_rank,
  327                                        (i+myrank)%fd->hints->cb_nodes,
  328                                        &(my_mem_view_state_arr[agg_rank]),
  329                                        &(agg_file_view_state_arr[agg_rank]),
  330                                        agg_comm_sz_arr[agg_rank], 
  331                                        &(agg_comm_dtype_arr[agg_rank]));
  332 
  333 #ifdef AGGREGATION_PROFILE
  334                 if (i == 0)
  335                     MPE_Log_event (5038, 0, NULL);
  336 #endif
  337                 post_client_comm (fd, rdwr, agg_rank, buf,
  338                                   agg_comm_dtype_arr[agg_rank],
  339                                   agg_alltoallw_counts[agg_rank],
  340                                   &client_comm_requests[clients_agg_count]);
  341                 clients_agg_count++;
  342             }
  343         }
  344 #ifdef AGGREGATION_PROFILE
  345         if (!clients_agg_count)
  346             MPE_Log_event(5039, 0, NULL);
  347 #endif
  348 
  349         if (rdwr == ADIOI_READ) {
                  /* aggregators read this round's data into cb_buf first */
  350             if (fd->is_agg && buffered_io_size) {
  351                 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
  352                                   ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
  353                                   ADIOI_READ, status, error_code);
                      /* NOTE(review): this early return skips every free
                       * below and leaves posted requests uncompleted. */
  354                 if (*error_code != MPI_SUCCESS) return;
  355                 MPI_Type_free (&agg_dtype);
  356             }
  357 
  358 #ifdef DEBUG
                  /* NOTE(review): size is an MPI_Count but is printed with %d
                   * in the debug output below. */
  359             fprintf (stderr, "expecting from [agg](disp,size,cnt)=");
  360             for (i=0; i < nprocs; i++) {
  361                 MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
  362                 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i], 
  363                          size, agg_alltoallw_counts[i]);
  364                 if (i != nprocs - 1)
  365                     fprintf(stderr, ",");
  366             }
  367             fprintf (stderr, "]\n");
  368             if (fd->is_agg) {
  369                 fprintf (stderr, "sending to [client](disp,size,cnt)=");
  370                 for (i=0; i < nprocs; i++) {
  371                     if (fd->is_agg)
  372                         MPI_Type_size_x (client_comm_dtype_arr[i], &size);
  373                     else
  374                         size = -1;
  375                     
  376                     fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i], 
  377                              size, client_alltoallw_counts[i]);
  378                     if (i != nprocs - 1)
  379                         fprintf(stderr, ",");
  380                 }
  381                 fprintf (stderr,"\n");
  382             }
  383             fflush (NULL);
  384 #endif
  385             /* aggregators post all Isends for outgoing data to clients */
  386             if (fd->is_agg)
  387                 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
  388                                      client_comm_dtype_arr,
  389                                      client_comm_sz_arr,
  390                                      &agg_comm_requests,
  391                                      &aggs_client_count);
  392 
                  /* complete the aggregator-side requests, then release the
                   * request and status arrays */
  393             if (fd->is_agg && aggs_client_count) {
  394                 agg_comm_statuses = ADIOI_Malloc(aggs_client_count *
  395                                                  sizeof(MPI_Status));
  396                 MPI_Waitall(aggs_client_count, agg_comm_requests,
  397                             agg_comm_statuses);
  398 #ifdef AGGREGATION_PROFILE
  399                 MPE_Log_event (5033, 0, NULL);
  400 #endif
  401                 ADIOI_Free (agg_comm_requests);
  402                 ADIOI_Free (agg_comm_statuses);
  403             }
  404 
                  /* complete the client-side requests posted above */
  405             if (clients_agg_count) {
  406                 client_comm_statuses = ADIOI_Malloc(clients_agg_count *
  407                                                     sizeof(MPI_Status));
  408                 MPI_Waitall(clients_agg_count, client_comm_requests,
  409                             client_comm_statuses);
  410 #ifdef AGGREGATION_PROFILE
  411                 MPE_Log_event (5039, 0, NULL);
  412 #endif
  413                 ADIOI_Free (client_comm_requests);
  414                 ADIOI_Free (client_comm_statuses);
  415             }
  416 
  417 #ifdef DEBUG2
  418             fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
  419             if (fd->is_agg && buffered_io_size) {
  420                 fprintf (stderr, "buf = [");
  421                 for (i=0; i<bufextent; i++)
  422                     fprintf (stderr, "%c", ((char *) buf)[i]);
  423                 fprintf (stderr, "]\n");
  424                 fprintf (stderr, "cb_buf = [");
  425                 for (i=0; i<buffered_io_size; i++)
  426                     fprintf (stderr, "%c", cb_buf[i]);
  427                 fprintf (stderr, "]\n");
  428                 fflush (NULL);
  429             }
  430 #endif
  431         }
  432         else { /* Write Case */
  433 #ifdef DEBUG
  434             fprintf (stderr, "sending to [agg](disp,size,cnt)=");
  435             for (i=0; i < nprocs; i++) {
  436                 MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
  437                 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i], 
  438                          size, agg_alltoallw_counts[i]);
  439                 if (i != nprocs - 1)
  440                     fprintf(stderr, ",");
  441             }
  442             fprintf (stderr, "]\n");
  443             fprintf (stderr, "expecting from [client](disp,size,cnt)=");
  444             for (i=0; i < nprocs; i++) {
  445                 if (fd->is_agg)
  446                     MPI_Type_size_x (client_comm_dtype_arr[i], &size);
  447                 else
  448                     size = -1;
  449                 
  450                 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i], 
  451                          size, client_alltoallw_counts[i]);
  452                 if (i != nprocs - 1)
  453                     fprintf(stderr, ",");
  454             }
  455             fprintf (stderr,"\n");
  456             fflush (NULL);
  457 #endif
  458 #ifdef DEBUG
  459             fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
  460 #endif
  461             
                  /* complete the client-side requests posted above */
  462             if (clients_agg_count) {
  463                 client_comm_statuses = ADIOI_Malloc(clients_agg_count *
  464                                                     sizeof(MPI_Status));
  465                 MPI_Waitall(clients_agg_count, client_comm_requests,
  466                             client_comm_statuses);
  467 #ifdef AGGREGATION_PROFILE
  468                 MPE_Log_event (5039, 0, NULL);
  469 #endif
  470                 ADIOI_Free(client_comm_requests);
  471                 ADIOI_Free(client_comm_statuses);
  472             }
  473 #ifdef DEBUG2
  474             if (bufextent) {
  475                 fprintf (stderr, "buf = [");
  476                 for (i=0; i<bufextent; i++)
  477                     fprintf (stderr, "%c", ((char *) buf)[i]);
  478                 fprintf (stderr, "]\n");
  479             }
  480 #endif
  481 
                  /* aggregators wait for the pre-posted requests to finish,
                   * then write cb_buf to the file */
  482             if (fd->is_agg && buffered_io_size) {
  483                 assert (aggs_client_count != 0);
  484                 /* make sure we actually have the data to write out */
  485                 agg_comm_statuses = (MPI_Status *)
  486                     ADIOI_Malloc (aggs_client_count*sizeof(MPI_Status));
  487                 
  488                 MPI_Waitall (aggs_client_count, agg_comm_requests,
  489                              agg_comm_statuses);
  490 #ifdef AGGREGATION_PROFILE
  491                 MPE_Log_event (5033, 0, NULL);
  492 #endif
  493                 ADIOI_Free (agg_comm_requests);
  494                 ADIOI_Free (agg_comm_statuses);
  495 #ifdef DEBUG2
  496                 fprintf (stderr, "cb_buf = [");
  497                 for (i=0; i<buffered_io_size; i++)
  498                     fprintf (stderr, "%c", cb_buf[i]);
  499                 fprintf (stderr, "]\n");
  500                 fflush (NULL);
  501 #endif
  502                 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
  503                                   ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
  504                                   ADIOI_WRITE, status, error_code);
                      /* NOTE(review): early return skips all cleanup below. */
  505                 if (*error_code != MPI_SUCCESS) return;
  506                 MPI_Type_free (&agg_dtype);
  507             }
  508 
  509         }
  510         } else {
  511         /* Alltoallw version of everything */
  512         ADIOI_Build_client_reqs(fd, nprocs, my_mem_view_state_arr,
  513                                 agg_file_view_state_arr,
  514                                 agg_comm_sz_arr, agg_comm_dtype_arr);
  515 
  516         if (rdwr == ADIOI_READ) {
                  /* read into cb_buf first, then scatter to the clients */
  517             if (fd->is_agg && buffered_io_size) {
  518                 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
  519                                   ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
  520                                   ADIOI_READ, status, error_code);
                      /* NOTE(review): early return skips all cleanup below and
                       * this process then never enters the MPI_Alltoallw that
                       * follows. */
  521                 if (*error_code != MPI_SUCCESS) return;
  522                 MPI_Type_free (&agg_dtype);
  523             }
  524 
  525 #ifdef AGGREGATION_PROFILE
  526             MPE_Log_event (5032, 0, NULL);
  527 #endif
  528             MPI_Alltoallw (cb_buf, client_alltoallw_counts, alltoallw_disps,
  529                            client_comm_dtype_arr,
  530                            buf, agg_alltoallw_counts , alltoallw_disps,
  531                            agg_comm_dtype_arr,
  532                            fd->comm);
  533 #ifdef AGGREGATION_PROFILE
  534             MPE_Log_event (5033, 0, NULL);
  535 #endif
  536         }
  537         else { /* Write Case */
  538 #ifdef AGGREGATION_PROFILE
  539             MPE_Log_event (5032, 0, NULL);
  540 #endif
                  /* gather client data into cb_buf, then write it out */
  541             MPI_Alltoallw (buf, agg_alltoallw_counts, alltoallw_disps,
  542                            agg_comm_dtype_arr,
  543                            cb_buf, client_alltoallw_counts, alltoallw_disps,
  544                            client_comm_dtype_arr,
  545                            fd->comm);
  546 #ifdef AGGREGATION_PROFILE
  547             MPE_Log_event (5033, 0, NULL);
  548 #endif
  549             if (fd->is_agg && buffered_io_size) {
  550                 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
  551                                   ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
  552                                   ADIOI_WRITE, status, error_code);
                      /* NOTE(review): early return skips all cleanup below. */
  553                 if (*error_code != MPI_SUCCESS) return;
  554                 MPI_Type_free (&agg_dtype);
  555             }
  556         }
  557         }
  558 
  559         /* Free (uncommit) datatypes for reuse */
  560         if (fd->is_agg) {
  561             if (buffered_io_size > 0) {
  562                 for (i=0; i<nprocs; i++) {
  563                     if (client_comm_sz_arr[i] > 0)
  564                         MPI_Type_free (&client_comm_dtype_arr[i]);
  565                 }
  566             }
  567         }
  568         for (i=0; i<nprocs; i++) {
  569             if (agg_comm_sz_arr[i] > 0)
  570                 MPI_Type_free (&agg_comm_dtype_arr[i]);
  571         }
  572 
  573         /* figure out next set up requests */
  574         if (fd->is_agg) {
  575             ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
  576                                   client_file_view_state_arr,
  577                                   client_comm_dtype_arr,
  578                                   client_comm_sz_arr,
  579                                   &agg_disp,
  580                                   &agg_dtype);
  581             buffered_io_size = 0;
  582             for (i=0; i <nprocs; i++) {
  583                 if (client_comm_sz_arr[i] > 0)
  584                     buffered_io_size += client_comm_sz_arr[i];
  585             }
  586         }
  587 #ifdef USE_PRE_REQ
  588         else {
  589             /* Example use of ADIOI_Build_client_pre_req. to an
  590              * appropriate section */
  591             for (i = 0; i < fd->hints->cb_nodes; i++)
  592             {
  593                 agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
  594 #ifdef AGGREGATION_PROFILE
  595                 MPE_Log_event (5040, 0, NULL);
  596 #endif
  597                 ADIOI_Build_client_pre_req(
  598                     fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
  599                     &(my_mem_view_state_arr[agg_rank]),
  600                     &(agg_file_view_state_arr[agg_rank]),
  601                     2*1024*1024, 
  602                     64*1024);
  603 #ifdef AGGREGATION_PROFILE
  604                 MPE_Log_event (5041, 0, NULL);
  605 #endif
  606             }
  607         }
  608 #endif
  609         
  610         /* aggregators pre-post all Irecv's for incoming data from
  611          * clients.  if nothing is needed, agg_comm_requests is not
  612          * allocated */
  613         if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
  614             if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
  615                 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
  616                                  client_comm_dtype_arr,
  617                                  client_comm_sz_arr,
  618                                  &agg_comm_requests,
  619                                  &aggs_client_count);
  620         }
  621 
  622         /* Aggregators send amounts for data requested to clients */
  623         Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
  624                            client_alltoallw_counts, agg_alltoallw_counts,
  625                            &aggregators_done);
  626 
  627     }
  628 
  629     /* Clean up */
  630         
          /* without persistent file realms (cb_pfr), the realm data on fd is
           * released after every call */
  631     if (fd->hints->cb_pfr != ADIOI_HINT_ENABLE) {
  632         /* AAR, FSIZE, and User provided uniform File realms */
              /* NOTE(review): the condition is the constant 1, so only
               * file_realm_types[0] is released and the else branch (the only
               * user of is_contig) is unreachable. */
  633         if (1) {
  634             ADIOI_Delete_flattened (fd->file_realm_types[0]);
  635             MPI_Type_free (&fd->file_realm_types[0]);
  636         }
  637         else {
  638             for (i=0; i<fd->hints->cb_nodes; i++) {
  639                 ADIOI_Datatype_iscontig(fd->file_realm_types[i], &is_contig);
  640                 if (!is_contig)
  641                     ADIOI_Delete_flattened(fd->file_realm_types[i]);
  642                 MPI_Type_free (&fd->file_realm_types[i]);
  643             }
  644         }
  645         ADIOI_Free (fd->file_realm_types);
  646         ADIOI_Free (fd->file_realm_st_offs);
  647     }
  648 
  649     /* This memtype must be deleted from the ADIOI_Flatlist or else it
  650      * will match incorrectly with other datatypes which use this
  651      * pointer. */
  652     ADIOI_Delete_flattened(datatype);
  653     ADIOI_Delete_flattened(fd->filetype);
  654 
  655     if (fd->is_agg) {
  656         if (buffered_io_size > 0)
  657             MPI_Type_free (&agg_dtype);
              /* release every per-client datatype and flattened file view */
  658         for (i=0; i<nprocs; i++) {
  659             MPI_Type_free (&client_comm_dtype_arr[i]);
  660             ADIOI_Free (client_file_view_state_arr[i].flat_type_p->indices);
  661             ADIOI_Free (client_file_view_state_arr[i].flat_type_p->blocklens);
  662             ADIOI_Free (client_file_view_state_arr[i].flat_type_p);
  663         }
  664         ADIOI_Free (client_file_view_state_arr);
  665         ADIOI_Free (cb_buf);
  666     } 
  667     for (i = 0; i<nprocs; i++)
  668         if (agg_comm_sz_arr[i] > 0)
  669             MPI_Type_free (&agg_comm_dtype_arr[i]);
  670     
  671     ADIOI_Free (client_comm_sz_arr);
  672     ADIOI_Free (client_comm_dtype_arr);
  673     ADIOI_Free (my_mem_view_state_arr);
  674     ADIOI_Free (agg_file_view_state_arr);
  675     ADIOI_Free (agg_comm_sz_arr);
  676     ADIOI_Free (agg_comm_dtype_arr);
  677     ADIOI_Free (alltoallw_disps);
  678     ADIOI_Free (alltoallw_counts);
          /* NOTE(review): all_st_end_offsets is still NULL here when
           * cb_enable is ADIOI_HINT_DISABLE and cb_pfr is enabled; the
           * independent-I/O path above guards the same free with a cb_enable
           * test -- confirm ADIOI_Free tolerates NULL or add the guard. */
  679     ADIOI_Free (all_st_end_offsets);
  680 
  681 #ifdef HAVE_STATUS_SET_BYTES
  682     MPIR_Status_set_bytes(status, datatype, bufsize);
  683     /* This is a temporary way of filling in status.  The right way is
  684      * to keep track of how much data was actually read and placed in
  685      * buf during collective I/O. */
  686 #endif
  687     fd->fp_sys_posn = -1; /* set it to null. */
  688 #ifdef AGGREGATION_PROFILE
  689     if (rdwr == ADIOI_READ)
  690         MPE_Log_event (5011, 0, NULL);
  691     else
  692         MPE_Log_event (5013, 0, NULL);
  693 #endif
  694 }
 695 
 696 
 697 /* Some of this code is from the old Calc_my_off_len() function.
 698  * It calculates the 1st and last byte accessed */
 699 void ADIOI_Calc_bounds (ADIO_File fd, int count, MPI_Datatype buftype,
 700                         int file_ptr_type, ADIO_Offset offset,
 701                         ADIO_Offset *st_offset, ADIO_Offset *end_offset)
 702 {
 703     MPI_Count filetype_size, buftype_size, etype_size;
 704     int sum;
 705     MPI_Aint filetype_extent, lb;
 706     ADIO_Offset total_io;
 707     int filetype_is_contig;
 708     ADIO_Offset i, remainder;
 709     ADIOI_Flatlist_node *flat_file;
 710     
 711     ADIO_Offset st_byte_off, end_byte_off;
 712     
 713 #ifdef AGGREGATION_PROFILE
 714     MPE_Log_event (5000, 0, NULL);
 715 #endif
 716 
 717     if (!count) {
 718         /* Max signed positive value for ADIO_Offset
 719          * (arch. dependent?).  is there a better way? */
 720         memset (st_offset, 8, sizeof(ADIO_Offset));
 721         *st_offset = *st_offset / 2;
 722         *end_offset = -1;
 723         return;
 724     }
 725 
 726     ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);
 727     
 728     MPI_Type_size_x (fd->filetype, &filetype_size);
 729     MPI_Type_get_extent (fd->filetype, &lb, &filetype_extent);
 730     MPI_Type_size_x (fd->etype, &etype_size);
 731     MPI_Type_size_x (buftype, &buftype_size);
 732     
 733     total_io = buftype_size * count;
 734 
 735     if (filetype_is_contig) {
 736         if (file_ptr_type == ADIO_INDIVIDUAL)
 737             st_byte_off = fd->fp_ind;
 738         else
 739             st_byte_off = fd->disp + etype_size * offset;
 740 
 741         end_byte_off = st_byte_off + total_io - 1;
 742     }
 743     else {
 744         flat_file = ADIOI_Flatlist;
 745         while (flat_file->type != fd->filetype) flat_file = flat_file->next;
 746 
 747         /* we need to take care of some weirdness since fd->fp_ind
 748            points at an accessible byte in file.  the first accessible
 749            byte in the file is not necessarily the first byte, nor is
 750            it necessarily the first off/len pair in the filetype. */
 751         if (file_ptr_type == ADIO_INDIVIDUAL) { 
 752             st_byte_off = fd->fp_ind;
 753             /* find end byte of I/O (may be in middle of an etype) */
 754 
 755             /* calculate byte starting point of first filetype */
 756             end_byte_off = (ADIO_Offset)
 757                 ((fd->fp_ind - fd->disp - flat_file->indices[0]) /
 758                  filetype_extent) * filetype_extent + fd->disp +
 759                 flat_file->indices[0];
 760             /* number of absolute bytes into first filetype */
 761             remainder = (fd->fp_ind - fd->disp - flat_file->indices[0]) %
 762                 filetype_extent;
 763             if (remainder) {
 764                 /* find how many file viewable bytes into first filetype */
 765                 sum = 0;
 766                 for (i=0; i<flat_file->count; i++) {
 767                     sum += flat_file->blocklens[i];
 768                     if ((flat_file->indices[i] - flat_file->indices[0] +
 769                          flat_file->blocklens[i]) >= remainder) {
 770                         sum -= (flat_file->blocklens[i] - (sum - remainder));
 771                         break;
 772                     }
 773                 }
 774                 total_io += sum;
 775             }
 776             /* byte starting point of last filetype */
 777             end_byte_off += (total_io - 1) / filetype_size * filetype_extent;
 778             /* number of bytes into last filetype */
 779             remainder = total_io % filetype_size;
 780             if (!remainder) {
 781                 for (i=flat_file->count - 1; i>=0; i--) {
 782                     if (flat_file->blocklens[i]) break;
 783                 }
 784                 assert (i > -1);
 785                 end_byte_off += flat_file->indices[i] +
 786                     flat_file->blocklens[i] - 1;
 787                 end_byte_off -= flat_file->indices[0];
 788             }
 789             else {
 790                 sum = 0;
 791                 for (i=0; i<flat_file->count; i++) {
 792                     sum += flat_file->blocklens[i];
 793                     if (sum >= remainder) {
 794                         end_byte_off += flat_file->indices[i] + 
 795                             flat_file->blocklens[i] - sum + remainder - 1;
 796                         break;
 797                     }
 798                 }
 799                 end_byte_off -= flat_file->indices[0];
 800             }
 801         }
 802         else {
 803             /* find starting byte of I/O (must be aligned with an etype) */
 804             /* byte starting point of starting filetype */                  
 805             st_byte_off = fd->disp + ((offset * etype_size) / filetype_size) *
 806                 filetype_extent;
 807             /* number of file viewable bytes into starting filetype */
 808             remainder = (etype_size * offset) % filetype_size;
 809             
 810             sum = 0;
 811             for (i=0; i<flat_file->count; i++) {
 812                 sum += flat_file->blocklens[i];
 813                 if (sum >= remainder) {
 814                     if (sum == remainder)
 815                         st_byte_off += flat_file->indices[i+1];
 816                     else
 817                         st_byte_off += flat_file->indices[i] +
 818                             flat_file->blocklens[i] - sum + remainder;
 819                     break;
 820                 }
 821             }
 822             
 823             /* find end byte of I/O (may be in middle of an etype) */
 824             /* byte starting point of last filetype */
 825             end_byte_off = fd->disp + (offset * etype_size + total_io) /
 826                 filetype_size * filetype_extent;
 827             /* number of bytes into last filetype */
 828             remainder = (offset * etype_size + total_io) % filetype_size;
 829             
 830             if (!remainder) {
 831                 /* the last non-zero off/len pair */
 832                 for (i=flat_file->count-1; i>=0; i--) {
 833                     if (flat_file->blocklens[i]) break;
 834                 }
 835                 assert (i >= 0);
 836                 /* back up a whole filetype, and put back up to the
 837                  * last byte of the last non-zero offlen pair */
 838                 /* end_byte_off = (end_byte_off - filetype_extent) +
 839                     flat_file->indices[i] +
 840                     flat_file->blocklens[i] - 1; */
 841                 /* equivalent of above commented out equation */
 842                 end_byte_off -= filetype_extent - flat_file->indices[i] -
 843                     flat_file->blocklens[i] + 1;
 844             }
 845             else {
 846                 sum = 0;
 847                 for (i=0; i<flat_file->count; i++) {
 848                     sum += flat_file->blocklens[i];
 849                     if (sum >= remainder) {
 850                         end_byte_off += flat_file->indices[i] + 
 851                             flat_file->blocklens[i] - sum + remainder - 1;
 852                         break;
 853                     }
 854                 }
 855             }
 856         }
 857     }
 858     
 859     *st_offset  = st_byte_off;
 860     *end_offset = end_byte_off;
 861 #ifdef DEBUG
 862     printf ("st_offset = %lld\nend_offset = %lld\n",
 863             st_byte_off, end_byte_off);
 864 #endif
 865 #ifdef AGGREGATION_PROFILE
 866     MPE_Log_event (5001, 0, NULL);
 867 #endif
 868 }
 869 
 870 /* wrapper function for ADIO_WriteStrided and ADIO_ReadStrided.  Used
 871  * by new 2 phase code to pass an arbitrary file type directly to
 872  * WriteStrided call without affecting existing code.  For the new 2
 873  * phase code, we really only need to set a custom_ftype, and we can
 874  * assume that this uses MPI_BYTE for the etype, and disp is 0 */
 875 void ADIOI_IOFiletype(ADIO_File fd, void *buf, int count,
 876                       MPI_Datatype datatype, int file_ptr_type,
 877                       ADIO_Offset offset, MPI_Datatype custom_ftype, 
 878                       int rdwr, ADIO_Status *status, int *error_code)
 879 {
 880     MPI_Datatype user_filetype;
 881     MPI_Datatype user_etype;
 882     ADIO_Offset user_disp;
 883     int user_ind_wr_buffer_size;
 884     int user_ind_rd_buffer_size;
 885     int f_is_contig, m_is_contig;
 886     int user_ds_read, user_ds_write;
 887     MPI_Aint f_extent, lb;
 888     MPI_Count f_size;
 889     int f_ds_percent; /* size/extent */
 890 
 891 #ifdef AGGREGATION_PROFILE
 892     if (rdwr == ADIOI_READ)
 893         MPE_Log_event(5006, 0, NULL);
 894     else
 895         MPE_Log_event(5008, 0, NULL);
 896 #endif
 897     MPI_Type_get_extent(custom_ftype, &lb, &f_extent);
 898     MPI_Type_size_x(custom_ftype, &f_size);
 899     f_ds_percent = 100 * f_size / f_extent;
 900 
 901     /* temporarily store file view information */
 902     user_filetype           = fd->filetype;
 903     user_etype              = fd->etype;
 904     user_disp               = fd->disp;
 905     user_ds_read            = fd->hints->ds_read;
 906     user_ds_write           = fd->hints->ds_write;
 907     /* temporarily override the independent I/O datasieve buffer size */
 908     user_ind_wr_buffer_size = fd->hints->ind_wr_buffer_size;
 909     user_ind_rd_buffer_size = fd->hints->ind_rd_buffer_size;
 910 
 911     /* set new values for temporary file view */
 912     fd->filetype = custom_ftype;
 913     fd->etype    = MPI_BYTE;
 914     /* set new values for independent I/O datasieve buffer size */
 915     fd->hints->ind_wr_buffer_size = fd->hints->cb_buffer_size;
 916     fd->hints->ind_rd_buffer_size = fd->hints->cb_buffer_size;
 917     /* decide whether or not to do datasieving */
 918 #ifdef DEBUG
 919     printf ("f_ds_percent = %d cb_ds_threshold = %d\n", f_ds_percent,
 920             fd->hints->cb_ds_threshold);
 921 #endif
 922     if (f_ds_percent >= fd->hints->cb_ds_threshold) {
 923         fd->hints->ds_read = ADIOI_HINT_ENABLE;
 924         fd->hints->ds_write = ADIOI_HINT_ENABLE;
 925     }
 926     else {
 927         fd->hints->ds_read = ADIOI_HINT_DISABLE;
 928         fd->hints->ds_write = ADIOI_HINT_DISABLE;
 929     }
 930 
 931     /* flatten the new filetype since the strided calls expect it to
 932      * have been flattened in set file view.  in the two phase code,
 933      * the datatype passed down should always be MPI_BYTE, and
 934      * therefore contiguous, but just for completeness sake, we'll
 935      * check the memory datatype anyway */
 936     ADIOI_Datatype_iscontig(custom_ftype, &f_is_contig);
 937     ADIOI_Datatype_iscontig(datatype, &m_is_contig);
 938     if (!f_is_contig)
 939         ADIOI_Flatten_datatype (custom_ftype);
 940 
 941     /* make appropriate Read/Write calls.  Let ROMIO figure out file
 942      * system specific stuff. */
 943     if (f_is_contig && m_is_contig) {
 944         fd->disp = 0;
 945         if (rdwr == ADIOI_READ)
 946             ADIO_ReadContig(fd, buf, count, datatype, file_ptr_type, offset,
 947                             status, error_code);
 948         else
 949             ADIO_WriteContig(fd, buf, count, datatype, file_ptr_type, offset,
 950                              status, error_code);
 951     }
 952     else {
 953         fd->disp = offset;
 954         if (rdwr == ADIOI_READ)
 955             ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type, 0,
 956                              status, error_code);
 957         else
 958             ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type, 0,
 959                               status, error_code);
 960     }
 961 
 962     /* Delete flattened temporary filetype */
 963     if (!f_is_contig)
 964         ADIOI_Delete_flattened (custom_ftype);
 965 
 966     /* restore the user specified file view to cover our tracks */
 967     fd->filetype                  = user_filetype;
 968     fd->etype                     = user_etype;
 969     fd->disp                      = user_disp;
 970     fd->hints->ds_read            = user_ds_read;
 971     fd->hints->ds_write           = user_ds_write;
 972     fd->hints->ind_wr_buffer_size = user_ind_wr_buffer_size;
 973     fd->hints->ind_rd_buffer_size = user_ind_rd_buffer_size;
 974 #ifdef AGGREGATION_PROFILE
 975     if (rdwr == ADIOI_READ)
 976         MPE_Log_event (5007, 0, NULL);
 977     else
 978         MPE_Log_event (5009, 0, NULL);
 979 #endif
 980 }
 981 
 982 static void Exch_data_amounts (ADIO_File fd, int nprocs,
 983                         ADIO_Offset *client_comm_sz_arr,
 984                         ADIO_Offset *agg_comm_sz_arr,
 985                         int *client_alltoallw_counts,
 986                         int *agg_alltoallw_counts,
 987                         int *aggregators_done)
 988 {
 989     int i;
 990     int recv_idx;
 991     MPI_Request *recv_requests;
 992     MPI_Request *send_requests;
 993     MPI_Status status;
 994     MPI_Status *send_statuses;
 995     /* Aggregators send amounts for data requested to clients */
 996     if (fd->hints->cb_alltoall != ADIOI_HINT_DISABLE) {
 997         MPI_Alltoall (client_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
 998                   agg_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
 999                   fd->comm);
1000 
1001         if (fd->is_agg) {
1002             for (i=0; i<nprocs; i++)
1003                 if (client_comm_sz_arr[i] > 0)
1004                     client_alltoallw_counts[i] = 1;
1005                 else
1006                     client_alltoallw_counts[i] = 0;
1007         }
1008         *aggregators_done = 0;
1009         for (i=0; i<nprocs; i++) {
1010             if (agg_comm_sz_arr[i] == -1)
1011                 *aggregators_done = *aggregators_done + 1;
1012             else if (agg_comm_sz_arr[i] > 0)
1013                 agg_alltoallw_counts[i] = 1;
1014             else
1015                 agg_alltoallw_counts[i] = 0;
1016         }
1017     } else {
1018         /* let's see if we can't reduce some communication as well as
1019          * overlap some communication and work */
1020 
1021         recv_requests = ADIOI_Malloc (fd->hints->cb_nodes * sizeof(MPI_Request));
1022         /* post all receives - only receive from aggregators */
1023         for (i = 0; i < fd->hints->cb_nodes; i++)
1024             MPI_Irecv (&agg_comm_sz_arr[fd->hints->ranklist[i]],
1025                    sizeof(ADIO_Offset), MPI_BYTE, fd->hints->ranklist[i],
1026                    AMT_TAG, fd->comm, &recv_requests[i]);
1027 
1028         /* Barrier is needed here if we're worried about unexpected
1029          * messages being dropped */
1030         /* MPI_Barrier (fd->comm); */
1031         send_requests = NULL;
1032         if (fd->is_agg) {
1033             /* only aggregators send data */
1034             send_requests = ADIOI_Malloc (nprocs * sizeof(MPI_Request)); 
1035 
1036             /* post all sends */
1037             for (i = 0; i < nprocs; i++) {
1038                 MPI_Isend (&client_comm_sz_arr[i], sizeof(ADIO_Offset),
1039                        MPI_BYTE, i, AMT_TAG, fd->comm, &send_requests[i]);
1040 
1041                 if (client_comm_sz_arr[i] > 0)
1042                     client_alltoallw_counts[i] = 1;
1043                 else
1044                     client_alltoallw_counts[i] = 0;
1045             }
1046         }
1047 
1048         *aggregators_done = 0;
1049         for (i=0; i < fd->hints->cb_nodes; i++) {
1050             MPI_Waitany (fd->hints->cb_nodes, recv_requests, &recv_idx, &status);
1051             if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] == -1)
1052                 *aggregators_done = *aggregators_done + 1;
1053             else if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] > 0)
1054                 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 1;
1055             else
1056                 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 0;
1057         }
1058 
1059         ADIOI_Free (recv_requests);
1060         if (fd->is_agg) {
1061             /* wait for all sends to complete */
1062             send_statuses = ADIOI_Malloc (nprocs * sizeof (MPI_Status));
1063             MPI_Waitall (nprocs, send_requests, send_statuses);
1064             ADIOI_Free (send_requests);
1065             ADIOI_Free (send_statuses);
1066         }
1067     }
1068 }
1069 
1070 static void post_aggregator_comm (MPI_Comm comm, int rw_type, 
1071                            int nproc, void *cb_buf,
1072                            MPI_Datatype *client_comm_dtype_arr,
1073                            ADIO_Offset *client_comm_sz_arr,
1074                            MPI_Request **requests_p,
1075                            int *aggs_client_count_p)
1076 {
1077     int aggs_client_count = 0;
1078     MPI_Request *requests;
1079     int i;
1080 
1081 #ifdef DEBUG
1082     printf ("posting aggregator communication\n");
1083 #endif
1084 
1085     for (i=0; i < nproc; i++)
1086         if (client_comm_sz_arr[i] > 0)
1087             aggs_client_count++;
1088 #ifdef DEBUG
1089     printf ("aggregator needs to talk to %d clients\n",
1090         aggs_client_count);
1091 #endif
1092     *aggs_client_count_p = aggs_client_count;
1093     if (aggs_client_count) {
1094         requests = (MPI_Request *)
1095             ADIOI_Malloc (aggs_client_count * sizeof(MPI_Request));
1096         aggs_client_count = 0;
1097 #ifdef AGGREGATION_PROFILE
1098         MPE_Log_event (5032, 0, NULL);
1099 #endif
1100         for (i=0; i < nproc; i++) {
1101             if (client_comm_sz_arr[i] > 0) {
1102                 if (rw_type == ADIOI_WRITE)
1103                     MPI_Irecv (cb_buf, 1, client_comm_dtype_arr[i], i,
1104                                DATA_TAG, comm,
1105                                &requests[aggs_client_count]);
1106                 else
1107                     MPI_Isend (cb_buf, 1, client_comm_dtype_arr[i], i,
1108                                DATA_TAG, comm,
1109                                &requests[aggs_client_count]);
1110 
1111                 aggs_client_count++;
1112             }
1113         }
1114         *requests_p = requests;
1115     }
1116 }
1117 
1118 static void post_client_comm (ADIO_File fd, int rw_type, 
1119                        int agg_rank, void *buf,
1120                        MPI_Datatype agg_comm_dtype,
1121                        int agg_alltoallw_count,
1122                        MPI_Request *request)
1123 {
1124     if (agg_alltoallw_count) {
1125         if (rw_type == ADIOI_READ)
1126             MPI_Irecv (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
1127                        request);
1128         else
1129             MPI_Isend (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
1130                        request);
1131     }
1132 }
1133 
1134 
1135 

/* [<][>][^][v][top][bottom][index][help] */