This source file includes the following definitions:
- ADIOI_IOStridedColl
- ADIOI_Calc_bounds
- ADIOI_IOFiletype
- Exch_data_amounts
- post_aggregator_comm
- post_client_comm
   1 
   2 
   3 
   4 
   5 
   6 
   7 #include "assert.h"
   8 #include "adio.h"
   9 #include "adio_extern.h"
  10 #ifdef AGGREGATION_PROFILE
  11 #include "mpe.h"
  12 #endif
  13 
  14 
  15 
  16 
  17   
  18 
  19 #define USE_PRE_REQ
  20 
  21 static void Exch_data_amounts (ADIO_File fd, int nprocs,
  22                         ADIO_Offset *client_comm_sz_arr,
  23                         ADIO_Offset *agg_comm_sz_arr,
  24                         int *client_alltoallw_counts,
  25                         int *agg_alltoallw_counts,
  26                         int *aggregators_done);
  27 static void post_aggregator_comm (MPI_Comm comm, int rw_type, int nproc,
  28                            void *cb_buf,
  29                            MPI_Datatype *client_comm_dtype_arr,
  30                            ADIO_Offset *client_comm_sz_arr,
  31                            MPI_Request **requests,
  32                            int *aggregators_client_count_p);
  33 
  34 static void post_client_comm (ADIO_File fd, int rw_type, 
  35                        int agg_rank, void *buf,
  36                        MPI_Datatype agg_comm_dtype,
  37                        int agg_alltoallw_count,
  38                        MPI_Request *request);
  39 
  40 
  41 
  42 
  43 
  44 void ADIOI_IOStridedColl (ADIO_File fd, void *buf, int count, int rdwr,
  45                           MPI_Datatype datatype, int file_ptr_type,
  46                           ADIO_Offset offset, ADIO_Status *status,
  47                           int *error_code)
  48 {
  49     ADIO_Offset min_st_offset=0, max_end_offset=0;
  50     ADIO_Offset st_end_offset[2];
  51     ADIO_Offset *all_st_end_offsets = NULL;
  52     int filetype_is_contig, buftype_is_contig, is_contig;
  53     ADIO_Offset off;
  54     int interleave_count = 0, i, nprocs, myrank, nprocs_for_coll;
  55     int cb_enable;
  56     ADIO_Offset bufsize;
  57     MPI_Aint extent, lb;
  58 #ifdef DEBUG2
  59     MPI_Aint bufextent;
  60 #endif
  61     MPI_Count size;
  62     int agg_rank;
  63 
  64     ADIO_Offset agg_disp; 
  65     MPI_Datatype agg_dtype; 
  66 
  67     int aggregators_done = 0;
  68     ADIO_Offset buffered_io_size = 0;
  69 
  70     int *alltoallw_disps;
  71 
  72     int *alltoallw_counts;
  73     int *client_alltoallw_counts;
  74     int *agg_alltoallw_counts;
  75 
  76     char *cb_buf = NULL;
  77 
  78     MPI_Datatype *client_comm_dtype_arr; 
  79     MPI_Datatype *agg_comm_dtype_arr;    
  80     ADIO_Offset *client_comm_sz_arr;     
  81     ADIO_Offset *agg_comm_sz_arr;        
  82 
  83     
  84     view_state *client_file_view_state_arr = NULL;
  85     view_state *agg_file_view_state_arr    = NULL;
  86     
  87     view_state *my_mem_view_state_arr      = NULL;
  88 
  89     MPI_Status *agg_comm_statuses     = NULL;
  90     MPI_Request *agg_comm_requests    = NULL;
  91     MPI_Status *client_comm_statuses  = NULL;
  92     MPI_Request *client_comm_requests = NULL;
  93     int aggs_client_count = 0;
  94     int clients_agg_count = 0;
  95 
  96     MPI_Comm_size (fd->comm, &nprocs);
  97     MPI_Comm_rank (fd->comm, &myrank);
  98 #ifdef DEBUG
  99     fprintf (stderr, "p%d: entering ADIOI_IOStridedColl\n", myrank);
 100 #endif
 101 #ifdef AGGREGATION_PROFILE
 102     if (rdwr == ADIOI_READ)
 103         MPE_Log_event (5010, 0, NULL);
 104     else
 105         MPE_Log_event (5012, 0, NULL);
 106 #endif
 107 
 108     
 109 
 110 
 111 
 112 
 113 
 114 
 115     nprocs_for_coll = fd->hints->cb_nodes;
 116 
 117     if (rdwr == ADIOI_READ)
 118         cb_enable = fd->hints->cb_read;
 119     else
 120         cb_enable = fd->hints->cb_write;
 121 
 122     
 123     if (cb_enable != ADIOI_HINT_DISABLE) {
 124         
 125         ADIOI_Calc_bounds (fd, count, datatype, file_ptr_type, offset,
 126                            &st_end_offset[0], &st_end_offset[1]);
 127 
 128         
 129         all_st_end_offsets = (ADIO_Offset *)
 130             ADIOI_Malloc (2*nprocs*sizeof(ADIO_Offset));
 131         MPI_Allgather (st_end_offset, 2, ADIO_OFFSET, all_st_end_offsets, 2,
 132                        ADIO_OFFSET, fd->comm);
 133 
 134         min_st_offset = all_st_end_offsets[0];
 135         max_end_offset = all_st_end_offsets[1];
 136 
 137         for (i=1; i<nprocs; i++) {
 138             
 139             if ((all_st_end_offsets[i*2] < all_st_end_offsets[i*2-1]) &&
 140                 (all_st_end_offsets[i*2] <= all_st_end_offsets[i*2+1]))
 141                 interleave_count++;
 142             
 143 
 144             
 145             min_st_offset = ADIOI_MIN(all_st_end_offsets[i*2],
 146                                       min_st_offset);
 147             max_end_offset = ADIOI_MAX(all_st_end_offsets[i*2+1],
 148                                        max_end_offset);
 149         }
 150     }
 151 
 152     ADIOI_Datatype_iscontig (datatype, &buftype_is_contig);
 153     ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);
 154 
 155     if ((cb_enable == ADIOI_HINT_DISABLE
 156          || (!interleave_count && (cb_enable == ADIOI_HINT_AUTO)))
 157         && (fd->hints->cb_pfr != ADIOI_HINT_ENABLE)){
 158         if (cb_enable != ADIOI_HINT_DISABLE) {
 159             ADIOI_Free (all_st_end_offsets);
 160         }
 161 
 162         if (buftype_is_contig && filetype_is_contig) {
 163             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
 164                 off = fd->disp + (fd->etype_size) * offset;
 165                 if (rdwr == ADIOI_READ)
 166                     ADIO_ReadContig(fd, buf, count, datatype,
 167                                     ADIO_EXPLICIT_OFFSET, off, status,
 168                                     error_code);
 169                 else
 170                     ADIO_WriteContig(fd, buf, count, datatype,
 171                                      ADIO_EXPLICIT_OFFSET, off, status,
 172                                      error_code);
 173             }
 174             else {
 175                 if (rdwr == ADIOI_READ)
 176                     ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 177                                     0, status, error_code);
 178                 else
 179                     ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 180                                      0, status, error_code);
 181             }
 182         }
 183         else {
 184             if (rdwr == ADIOI_READ)
 185                 ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
 186                                  offset, status, error_code);
 187             else
 188                 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
 189                                   offset, status, error_code);
 190         }
 191         return;
 192     }
 193 
 194     MPI_Type_get_extent(datatype, &lb, &extent);
 195 #ifdef DEBUG2
 196     bufextent = extent * count;
 197 #endif
 198     MPI_Type_size_x(datatype, &size);
 199     bufsize = size * (MPI_Count)count;
 200 
 201     
 202     if ((fd->hints->cb_pfr != ADIOI_HINT_ENABLE) ||
 203         (fd->file_realm_types == NULL))
 204         ADIOI_Calc_file_realms (fd, min_st_offset, max_end_offset);
 205 
 206     my_mem_view_state_arr = (view_state *)
 207         ADIOI_Calloc (1, nprocs * sizeof(view_state));
 208     agg_file_view_state_arr = (view_state *)
 209         ADIOI_Calloc (1, nprocs * sizeof(view_state));
 210     client_comm_sz_arr = (ADIO_Offset *)
 211         ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
 212 
 213     if (fd->is_agg) {
 214         client_file_view_state_arr = (view_state *)
 215             ADIOI_Calloc (1, nprocs * sizeof(view_state));
 216     }
 217     else {
 218         client_file_view_state_arr = NULL;
 219     }
 220 
 221     
 222 
 223     client_comm_dtype_arr = (MPI_Datatype *)
 224         ADIOI_Calloc (1, nprocs * sizeof(MPI_Datatype));
 225     if (!fd->is_agg)
 226         for (i = 0; i < nprocs; i++)
 227             client_comm_dtype_arr[i] = MPI_BYTE;
 228 
 229     ADIOI_Exch_file_views (myrank, nprocs, file_ptr_type, fd, count,
 230                            datatype, offset, my_mem_view_state_arr,
 231                            agg_file_view_state_arr,
 232                            client_file_view_state_arr);
 233 
 234     agg_comm_sz_arr = (ADIO_Offset *)
 235         ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
 236     agg_comm_dtype_arr = (MPI_Datatype *)
 237         ADIOI_Malloc (nprocs * sizeof(MPI_Datatype));
 238     if (fd->is_agg) {
 239         ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
 240                               client_file_view_state_arr,
 241                               client_comm_dtype_arr,
 242                               client_comm_sz_arr,
 243                               &agg_disp,
 244                               &agg_dtype);
 245         buffered_io_size = 0;
 246         for (i=0; i <nprocs; i++) {
 247             if (client_comm_sz_arr[i] > 0)
 248                 buffered_io_size += client_comm_sz_arr[i];
 249         }
 250     }
 251 #ifdef USE_PRE_REQ
 252     else 
 253     {
 254         
 255 
 256         
 257         for (i = 0; i < fd->hints->cb_nodes; i++)
 258         {
 259             agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
 260 #ifdef AGGREGATION_PROFILE
 261             MPE_Log_event (5040, 0, NULL);
 262 #endif
 263             ADIOI_Build_client_pre_req(
 264                 fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
 265                 &(my_mem_view_state_arr[agg_rank]),
 266                 &(agg_file_view_state_arr[agg_rank]),
 267                 2*1024*1024, 
 268                 64*1024);
 269 #ifdef AGGREGATION_PROFILE
 270             MPE_Log_event (5041, 0, NULL);
 271 #endif
 272         }
 273     }
 274 #endif
 275 
 276 
 277     if (fd->is_agg)
 278         cb_buf = (char *) ADIOI_Malloc (fd->hints->cb_buffer_size);
 279     alltoallw_disps  = (int *) ADIOI_Calloc (nprocs, sizeof(int));
 280     alltoallw_counts = client_alltoallw_counts = (int *)
 281         ADIOI_Calloc (2*nprocs, sizeof(int));
 282     agg_alltoallw_counts = &alltoallw_counts[nprocs];
 283 
 284     if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
 285         
 286         if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
 287             post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
 288                              client_comm_dtype_arr,
 289                              client_comm_sz_arr,
 290                              &agg_comm_requests,
 291                              &aggs_client_count);
 292     }
 293     
 294     Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
 295                        client_alltoallw_counts, agg_alltoallw_counts,
 296                        &aggregators_done);
 297 
 298 #ifdef DEBUG
 299     fprintf (stderr, "client_alltoallw_counts[ ");
 300     for (i=0; i<nprocs; i++) {
 301         fprintf (stderr, "%d ", client_alltoallw_counts[i]);
 302     }
 303     fprintf (stderr, "]\n");
 304     fprintf (stderr, "agg_alltoallw_counts[ ");
 305     for (i=0; i<nprocs; i++) {
 306         fprintf (stderr,"%d ", agg_alltoallw_counts[i]);
 307     }
 308     fprintf (stderr, "]\n");
 309 #endif
 310 
 311     
 312     while (aggregators_done != nprocs_for_coll) {
 313         if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
 314         
 315 
 316 
 317 
 318         client_comm_requests = (MPI_Request *)
 319             ADIOI_Calloc (fd->hints->cb_nodes, sizeof(MPI_Request));
 320 
 321         for (i = 0; i < fd->hints->cb_nodes; i++)
 322         {
 323             clients_agg_count = 0;
 324             agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
 325             if (agg_comm_sz_arr[agg_rank] > 0) {
 326                 ADIOI_Build_client_req(fd, agg_rank,
 327                                        (i+myrank)%fd->hints->cb_nodes,
 328                                        &(my_mem_view_state_arr[agg_rank]),
 329                                        &(agg_file_view_state_arr[agg_rank]),
 330                                        agg_comm_sz_arr[agg_rank], 
 331                                        &(agg_comm_dtype_arr[agg_rank]));
 332 
 333 #ifdef AGGREGATION_PROFILE
 334                 if (i == 0)
 335                     MPE_Log_event (5038, 0, NULL);
 336 #endif
 337                 post_client_comm (fd, rdwr, agg_rank, buf,
 338                                   agg_comm_dtype_arr[agg_rank],
 339                                   agg_alltoallw_counts[agg_rank],
 340                                   &client_comm_requests[clients_agg_count]);
 341                 clients_agg_count++;
 342             }
 343         }
 344 #ifdef AGGREGATION_PROFILE
 345         if (!clients_agg_count)
 346             MPE_Log_event(5039, 0, NULL);
 347 #endif
 348 
 349         if (rdwr == ADIOI_READ) {
 350             if (fd->is_agg && buffered_io_size) {
 351                 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
 352                                   ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
 353                                   ADIOI_READ, status, error_code);
 354                 if (*error_code != MPI_SUCCESS) return;
 355                 MPI_Type_free (&agg_dtype);
 356             }
 357 
 358 #ifdef DEBUG
 359             fprintf (stderr, "expecting from [agg](disp,size,cnt)=");
 360             for (i=0; i < nprocs; i++) {
 361                 MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
 362                 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i], 
 363                          size, agg_alltoallw_counts[i]);
 364                 if (i != nprocs - 1)
 365                     fprintf(stderr, ",");
 366             }
 367             fprintf (stderr, "]\n");
 368             if (fd->is_agg) {
 369                 fprintf (stderr, "sending to [client](disp,size,cnt)=");
 370                 for (i=0; i < nprocs; i++) {
 371                     if (fd->is_agg)
 372                         MPI_Type_size_x (client_comm_dtype_arr[i], &size);
 373                     else
 374                         size = -1;
 375                     
 376                     fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i], 
 377                              size, client_alltoallw_counts[i]);
 378                     if (i != nprocs - 1)
 379                         fprintf(stderr, ",");
 380                 }
 381                 fprintf (stderr,"\n");
 382             }
 383             fflush (NULL);
 384 #endif
 385             
 386             if (fd->is_agg)
 387                 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
 388                                      client_comm_dtype_arr,
 389                                      client_comm_sz_arr,
 390                                      &agg_comm_requests,
 391                                      &aggs_client_count);
 392 
 393             if (fd->is_agg && aggs_client_count) {
 394                 agg_comm_statuses = ADIOI_Malloc(aggs_client_count *
 395                                                  sizeof(MPI_Status));
 396                 MPI_Waitall(aggs_client_count, agg_comm_requests,
 397                             agg_comm_statuses);
 398 #ifdef AGGREGATION_PROFILE
 399                 MPE_Log_event (5033, 0, NULL);
 400 #endif
 401                 ADIOI_Free (agg_comm_requests);
 402                 ADIOI_Free (agg_comm_statuses);
 403             }
 404 
 405             if (clients_agg_count) {
 406                 client_comm_statuses = ADIOI_Malloc(clients_agg_count *
 407                                                     sizeof(MPI_Status));
 408                 MPI_Waitall(clients_agg_count, client_comm_requests,
 409                             client_comm_statuses);
 410 #ifdef AGGREGATION_PROFILE
 411                 MPE_Log_event (5039, 0, NULL);
 412 #endif
 413                 ADIOI_Free (client_comm_requests);
 414                 ADIOI_Free (client_comm_statuses);
 415             }
 416 
 417 #ifdef DEBUG2
 418             fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
 419             if (fd->is_agg && buffered_io_size) {
 420                 fprintf (stderr, "buf = [");
 421                 for (i=0; i<bufextent; i++)
 422                     fprintf (stderr, "%c", ((char *) buf)[i]);
 423                 fprintf (stderr, "]\n");
 424                 fprintf (stderr, "cb_buf = [");
 425                 for (i=0; i<buffered_io_size; i++)
 426                     fprintf (stderr, "%c", cb_buf[i]);
 427                 fprintf (stderr, "]\n");
 428                 fflush (NULL);
 429             }
 430 #endif
 431         }
 432         else { 
 433 #ifdef DEBUG
 434             fprintf (stderr, "sending to [agg](disp,size,cnt)=");
 435             for (i=0; i < nprocs; i++) {
 436                 MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
 437                 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i], 
 438                          size, agg_alltoallw_counts[i]);
 439                 if (i != nprocs - 1)
 440                     fprintf(stderr, ",");
 441             }
 442             fprintf (stderr, "]\n");
 443             fprintf (stderr, "expecting from [client](disp,size,cnt)=");
 444             for (i=0; i < nprocs; i++) {
 445                 if (fd->is_agg)
 446                     MPI_Type_size_x (client_comm_dtype_arr[i], &size);
 447                 else
 448                     size = -1;
 449                 
 450                 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i], 
 451                          size, client_alltoallw_counts[i]);
 452                 if (i != nprocs - 1)
 453                     fprintf(stderr, ",");
 454             }
 455             fprintf (stderr,"\n");
 456             fflush (NULL);
 457 #endif
 458 #ifdef DEBUG
 459             fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
 460 #endif
 461             
 462             if (clients_agg_count) {
 463                 client_comm_statuses = ADIOI_Malloc(clients_agg_count *
 464                                                     sizeof(MPI_Status));
 465                 MPI_Waitall(clients_agg_count, client_comm_requests,
 466                             client_comm_statuses);
 467 #ifdef AGGREGATION_PROFILE
 468                 MPE_Log_event (5039, 0, NULL);
 469 #endif
 470                 ADIOI_Free(client_comm_requests);
 471                 ADIOI_Free(client_comm_statuses);
 472             }
 473 #ifdef DEBUG2
 474             if (bufextent) {
 475                 fprintf (stderr, "buf = [");
 476                 for (i=0; i<bufextent; i++)
 477                     fprintf (stderr, "%c", ((char *) buf)[i]);
 478                 fprintf (stderr, "]\n");
 479             }
 480 #endif
 481 
 482             if (fd->is_agg && buffered_io_size) {
 483                 assert (aggs_client_count != 0);
 484                 
 485                 agg_comm_statuses = (MPI_Status *)
 486                     ADIOI_Malloc (aggs_client_count*sizeof(MPI_Status));
 487                 
 488                 MPI_Waitall (aggs_client_count, agg_comm_requests,
 489                              agg_comm_statuses);
 490 #ifdef AGGREGATION_PROFILE
 491                 MPE_Log_event (5033, 0, NULL);
 492 #endif
 493                 ADIOI_Free (agg_comm_requests);
 494                 ADIOI_Free (agg_comm_statuses);
 495 #ifdef DEBUG2
 496                 fprintf (stderr, "cb_buf = [");
 497                 for (i=0; i<buffered_io_size; i++)
 498                     fprintf (stderr, "%c", cb_buf[i]);
 499                 fprintf (stderr, "]\n");
 500                 fflush (NULL);
 501 #endif
 502                 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
 503                                   ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
 504                                   ADIOI_WRITE, status, error_code);
 505                 if (*error_code != MPI_SUCCESS) return;
 506                 MPI_Type_free (&agg_dtype);
 507             }
 508 
 509         }
 510         } else {
 511         
 512         ADIOI_Build_client_reqs(fd, nprocs, my_mem_view_state_arr,
 513                                 agg_file_view_state_arr,
 514                                 agg_comm_sz_arr, agg_comm_dtype_arr);
 515 
 516         if (rdwr == ADIOI_READ) {
 517             if (fd->is_agg && buffered_io_size) {
 518                 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
 519                                   ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
 520                                   ADIOI_READ, status, error_code);
 521                 if (*error_code != MPI_SUCCESS) return;
 522                 MPI_Type_free (&agg_dtype);
 523             }
 524 
 525 #ifdef AGGREGATION_PROFILE
 526             MPE_Log_event (5032, 0, NULL);
 527 #endif
 528             MPI_Alltoallw (cb_buf, client_alltoallw_counts, alltoallw_disps,
 529                            client_comm_dtype_arr,
 530                            buf, agg_alltoallw_counts , alltoallw_disps,
 531                            agg_comm_dtype_arr,
 532                            fd->comm);
 533 #ifdef AGGREGATION_PROFILE
 534             MPE_Log_event (5033, 0, NULL);
 535 #endif
 536         }
 537         else { 
 538 #ifdef AGGREGATION_PROFILE
 539             MPE_Log_event (5032, 0, NULL);
 540 #endif
 541             MPI_Alltoallw (buf, agg_alltoallw_counts, alltoallw_disps,
 542                            agg_comm_dtype_arr,
 543                            cb_buf, client_alltoallw_counts, alltoallw_disps,
 544                            client_comm_dtype_arr,
 545                            fd->comm);
 546 #ifdef AGGREGATION_PROFILE
 547             MPE_Log_event (5033, 0, NULL);
 548 #endif
 549             if (fd->is_agg && buffered_io_size) {
 550                 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
 551                                   ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
 552                                   ADIOI_WRITE, status, error_code);
 553                 if (*error_code != MPI_SUCCESS) return;
 554                 MPI_Type_free (&agg_dtype);
 555             }
 556         }
 557         }
 558 
 559         
 560         if (fd->is_agg) {
 561             if (buffered_io_size > 0) {
 562                 for (i=0; i<nprocs; i++) {
 563                     if (client_comm_sz_arr[i] > 0)
 564                         MPI_Type_free (&client_comm_dtype_arr[i]);
 565                 }
 566             }
 567         }
 568         for (i=0; i<nprocs; i++) {
 569             if (agg_comm_sz_arr[i] > 0)
 570                 MPI_Type_free (&agg_comm_dtype_arr[i]);
 571         }
 572 
 573         
 574         if (fd->is_agg) {
 575             ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
 576                                   client_file_view_state_arr,
 577                                   client_comm_dtype_arr,
 578                                   client_comm_sz_arr,
 579                                   &agg_disp,
 580                                   &agg_dtype);
 581             buffered_io_size = 0;
 582             for (i=0; i <nprocs; i++) {
 583                 if (client_comm_sz_arr[i] > 0)
 584                     buffered_io_size += client_comm_sz_arr[i];
 585             }
 586         }
 587 #ifdef USE_PRE_REQ
 588         else {
 589             
 590 
 591             for (i = 0; i < fd->hints->cb_nodes; i++)
 592             {
 593                 agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
 594 #ifdef AGGREGATION_PROFILE
 595                 MPE_Log_event (5040, 0, NULL);
 596 #endif
 597                 ADIOI_Build_client_pre_req(
 598                     fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
 599                     &(my_mem_view_state_arr[agg_rank]),
 600                     &(agg_file_view_state_arr[agg_rank]),
 601                     2*1024*1024, 
 602                     64*1024);
 603 #ifdef AGGREGATION_PROFILE
 604                 MPE_Log_event (5041, 0, NULL);
 605 #endif
 606             }
 607         }
 608 #endif
 609         
 610         
 611 
 612 
 613         if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
 614             if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
 615                 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
 616                                  client_comm_dtype_arr,
 617                                  client_comm_sz_arr,
 618                                  &agg_comm_requests,
 619                                  &aggs_client_count);
 620         }
 621 
 622         
 623         Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
 624                            client_alltoallw_counts, agg_alltoallw_counts,
 625                            &aggregators_done);
 626 
 627     }
 628 
 629     
 630         
 631     if (fd->hints->cb_pfr != ADIOI_HINT_ENABLE) {
 632         
 633         if (1) {
 634             ADIOI_Delete_flattened (fd->file_realm_types[0]);
 635             MPI_Type_free (&fd->file_realm_types[0]);
 636         }
 637         else {
 638             for (i=0; i<fd->hints->cb_nodes; i++) {
 639                 ADIOI_Datatype_iscontig(fd->file_realm_types[i], &is_contig);
 640                 if (!is_contig)
 641                     ADIOI_Delete_flattened(fd->file_realm_types[i]);
 642                 MPI_Type_free (&fd->file_realm_types[i]);
 643             }
 644         }
 645         ADIOI_Free (fd->file_realm_types);
 646         ADIOI_Free (fd->file_realm_st_offs);
 647     }
 648 
 649     
 650 
 651 
 652     ADIOI_Delete_flattened(datatype);
 653     ADIOI_Delete_flattened(fd->filetype);
 654 
 655     if (fd->is_agg) {
 656         if (buffered_io_size > 0)
 657             MPI_Type_free (&agg_dtype);
 658         for (i=0; i<nprocs; i++) {
 659             MPI_Type_free (&client_comm_dtype_arr[i]);
 660             ADIOI_Free (client_file_view_state_arr[i].flat_type_p->indices);
 661             ADIOI_Free (client_file_view_state_arr[i].flat_type_p->blocklens);
 662             ADIOI_Free (client_file_view_state_arr[i].flat_type_p);
 663         }
 664         ADIOI_Free (client_file_view_state_arr);
 665         ADIOI_Free (cb_buf);
 666     } 
 667     for (i = 0; i<nprocs; i++)
 668         if (agg_comm_sz_arr[i] > 0)
 669             MPI_Type_free (&agg_comm_dtype_arr[i]);
 670     
 671     ADIOI_Free (client_comm_sz_arr);
 672     ADIOI_Free (client_comm_dtype_arr);
 673     ADIOI_Free (my_mem_view_state_arr);
 674     ADIOI_Free (agg_file_view_state_arr);
 675     ADIOI_Free (agg_comm_sz_arr);
 676     ADIOI_Free (agg_comm_dtype_arr);
 677     ADIOI_Free (alltoallw_disps);
 678     ADIOI_Free (alltoallw_counts);
 679     ADIOI_Free (all_st_end_offsets);
 680 
 681 #ifdef HAVE_STATUS_SET_BYTES
 682     MPIR_Status_set_bytes(status, datatype, bufsize);
 683     
 684 
 685 
 686 #endif
 687     fd->fp_sys_posn = -1; 
 688 #ifdef AGGREGATION_PROFILE
 689     if (rdwr == ADIOI_READ)
 690         MPE_Log_event (5011, 0, NULL);
 691     else
 692         MPE_Log_event (5013, 0, NULL);
 693 #endif
 694 }
 695 
 696 
 697 
 698 
 699 void ADIOI_Calc_bounds (ADIO_File fd, int count, MPI_Datatype buftype,
 700                         int file_ptr_type, ADIO_Offset offset,
 701                         ADIO_Offset *st_offset, ADIO_Offset *end_offset)
 702 {
 703     MPI_Count filetype_size, buftype_size, etype_size;
 704     int sum;
 705     MPI_Aint filetype_extent, lb;
 706     ADIO_Offset total_io;
 707     int filetype_is_contig;
 708     ADIO_Offset i, remainder;
 709     ADIOI_Flatlist_node *flat_file;
 710     
 711     ADIO_Offset st_byte_off, end_byte_off;
 712     
 713 #ifdef AGGREGATION_PROFILE
 714     MPE_Log_event (5000, 0, NULL);
 715 #endif
 716 
 717     if (!count) {
 718         
 719 
 720         memset (st_offset, 8, sizeof(ADIO_Offset));
 721         *st_offset = *st_offset / 2;
 722         *end_offset = -1;
 723         return;
 724     }
 725 
 726     ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);
 727     
 728     MPI_Type_size_x (fd->filetype, &filetype_size);
 729     MPI_Type_get_extent (fd->filetype, &lb, &filetype_extent);
 730     MPI_Type_size_x (fd->etype, &etype_size);
 731     MPI_Type_size_x (buftype, &buftype_size);
 732     
 733     total_io = buftype_size * count;
 734 
 735     if (filetype_is_contig) {
 736         if (file_ptr_type == ADIO_INDIVIDUAL)
 737             st_byte_off = fd->fp_ind;
 738         else
 739             st_byte_off = fd->disp + etype_size * offset;
 740 
 741         end_byte_off = st_byte_off + total_io - 1;
 742     }
 743     else {
 744         flat_file = ADIOI_Flatlist;
 745         while (flat_file->type != fd->filetype) flat_file = flat_file->next;
 746 
 747         
 748 
 749 
 750 
 751         if (file_ptr_type == ADIO_INDIVIDUAL) { 
 752             st_byte_off = fd->fp_ind;
 753             
 754 
 755             
 756             end_byte_off = (ADIO_Offset)
 757                 ((fd->fp_ind - fd->disp - flat_file->indices[0]) /
 758                  filetype_extent) * filetype_extent + fd->disp +
 759                 flat_file->indices[0];
 760             
 761             remainder = (fd->fp_ind - fd->disp - flat_file->indices[0]) %
 762                 filetype_extent;
 763             if (remainder) {
 764                 
 765                 sum = 0;
 766                 for (i=0; i<flat_file->count; i++) {
 767                     sum += flat_file->blocklens[i];
 768                     if ((flat_file->indices[i] - flat_file->indices[0] +
 769                          flat_file->blocklens[i]) >= remainder) {
 770                         sum -= (flat_file->blocklens[i] - (sum - remainder));
 771                         break;
 772                     }
 773                 }
 774                 total_io += sum;
 775             }
 776             
 777             end_byte_off += (total_io - 1) / filetype_size * filetype_extent;
 778             
 779             remainder = total_io % filetype_size;
 780             if (!remainder) {
 781                 for (i=flat_file->count - 1; i>=0; i--) {
 782                     if (flat_file->blocklens[i]) break;
 783                 }
 784                 assert (i > -1);
 785                 end_byte_off += flat_file->indices[i] +
 786                     flat_file->blocklens[i] - 1;
 787                 end_byte_off -= flat_file->indices[0];
 788             }
 789             else {
 790                 sum = 0;
 791                 for (i=0; i<flat_file->count; i++) {
 792                     sum += flat_file->blocklens[i];
 793                     if (sum >= remainder) {
 794                         end_byte_off += flat_file->indices[i] + 
 795                             flat_file->blocklens[i] - sum + remainder - 1;
 796                         break;
 797                     }
 798                 }
 799                 end_byte_off -= flat_file->indices[0];
 800             }
 801         }
 802         else {
 803             
 804                               
 805             st_byte_off = fd->disp + ((offset * etype_size) / filetype_size) *
 806                 filetype_extent;
 807             
 808             remainder = (etype_size * offset) % filetype_size;
 809             
 810             sum = 0;
 811             for (i=0; i<flat_file->count; i++) {
 812                 sum += flat_file->blocklens[i];
 813                 if (sum >= remainder) {
 814                     if (sum == remainder)
 815                         st_byte_off += flat_file->indices[i+1];
 816                     else
 817                         st_byte_off += flat_file->indices[i] +
 818                             flat_file->blocklens[i] - sum + remainder;
 819                     break;
 820                 }
 821             }
 822             
 823             
 824             
 825             end_byte_off = fd->disp + (offset * etype_size + total_io) /
 826                 filetype_size * filetype_extent;
 827             
 828             remainder = (offset * etype_size + total_io) % filetype_size;
 829             
 830             if (!remainder) {
 831                 
 832                 for (i=flat_file->count-1; i>=0; i--) {
 833                     if (flat_file->blocklens[i]) break;
 834                 }
 835                 assert (i >= 0);
 836                 
 837 
 838                 
 839 
 840 
 841                 
 842                 end_byte_off -= filetype_extent - flat_file->indices[i] -
 843                     flat_file->blocklens[i] + 1;
 844             }
 845             else {
 846                 sum = 0;
 847                 for (i=0; i<flat_file->count; i++) {
 848                     sum += flat_file->blocklens[i];
 849                     if (sum >= remainder) {
 850                         end_byte_off += flat_file->indices[i] + 
 851                             flat_file->blocklens[i] - sum + remainder - 1;
 852                         break;
 853                     }
 854                 }
 855             }
 856         }
 857     }
 858     
 859     *st_offset  = st_byte_off;
 860     *end_offset = end_byte_off;
 861 #ifdef DEBUG
 862     printf ("st_offset = %lld\nend_offset = %lld\n",
 863             st_byte_off, end_byte_off);
 864 #endif
 865 #ifdef AGGREGATION_PROFILE
 866     MPE_Log_event (5001, 0, NULL);
 867 #endif
 868 }
 869 
 870 
 871 
 872 
 873 
 874 
 875 void ADIOI_IOFiletype(ADIO_File fd, void *buf, int count,
 876                       MPI_Datatype datatype, int file_ptr_type,
 877                       ADIO_Offset offset, MPI_Datatype custom_ftype, 
 878                       int rdwr, ADIO_Status *status, int *error_code)
 879 {
 880     MPI_Datatype user_filetype;
 881     MPI_Datatype user_etype;
 882     ADIO_Offset user_disp;
 883     int user_ind_wr_buffer_size;
 884     int user_ind_rd_buffer_size;
 885     int f_is_contig, m_is_contig;
 886     int user_ds_read, user_ds_write;
 887     MPI_Aint f_extent, lb;
 888     MPI_Count f_size;
 889     int f_ds_percent; 
 890 
 891 #ifdef AGGREGATION_PROFILE
 892     if (rdwr == ADIOI_READ)
 893         MPE_Log_event(5006, 0, NULL);
 894     else
 895         MPE_Log_event(5008, 0, NULL);
 896 #endif
 897     MPI_Type_get_extent(custom_ftype, &lb, &f_extent);
 898     MPI_Type_size_x(custom_ftype, &f_size);
 899     f_ds_percent = 100 * f_size / f_extent;
 900 
 901     
 902     user_filetype           = fd->filetype;
 903     user_etype              = fd->etype;
 904     user_disp               = fd->disp;
 905     user_ds_read            = fd->hints->ds_read;
 906     user_ds_write           = fd->hints->ds_write;
 907     
 908     user_ind_wr_buffer_size = fd->hints->ind_wr_buffer_size;
 909     user_ind_rd_buffer_size = fd->hints->ind_rd_buffer_size;
 910 
 911     
 912     fd->filetype = custom_ftype;
 913     fd->etype    = MPI_BYTE;
 914     
 915     fd->hints->ind_wr_buffer_size = fd->hints->cb_buffer_size;
 916     fd->hints->ind_rd_buffer_size = fd->hints->cb_buffer_size;
 917     
 918 #ifdef DEBUG
 919     printf ("f_ds_percent = %d cb_ds_threshold = %d\n", f_ds_percent,
 920             fd->hints->cb_ds_threshold);
 921 #endif
 922     if (f_ds_percent >= fd->hints->cb_ds_threshold) {
 923         fd->hints->ds_read = ADIOI_HINT_ENABLE;
 924         fd->hints->ds_write = ADIOI_HINT_ENABLE;
 925     }
 926     else {
 927         fd->hints->ds_read = ADIOI_HINT_DISABLE;
 928         fd->hints->ds_write = ADIOI_HINT_DISABLE;
 929     }
 930 
 931     
 932 
 933 
 934 
 935 
 936     ADIOI_Datatype_iscontig(custom_ftype, &f_is_contig);
 937     ADIOI_Datatype_iscontig(datatype, &m_is_contig);
 938     if (!f_is_contig)
 939         ADIOI_Flatten_datatype (custom_ftype);
 940 
 941     
 942 
 943     if (f_is_contig && m_is_contig) {
 944         fd->disp = 0;
 945         if (rdwr == ADIOI_READ)
 946             ADIO_ReadContig(fd, buf, count, datatype, file_ptr_type, offset,
 947                             status, error_code);
 948         else
 949             ADIO_WriteContig(fd, buf, count, datatype, file_ptr_type, offset,
 950                              status, error_code);
 951     }
 952     else {
 953         fd->disp = offset;
 954         if (rdwr == ADIOI_READ)
 955             ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type, 0,
 956                              status, error_code);
 957         else
 958             ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type, 0,
 959                               status, error_code);
 960     }
 961 
 962     
 963     if (!f_is_contig)
 964         ADIOI_Delete_flattened (custom_ftype);
 965 
 966     
 967     fd->filetype                  = user_filetype;
 968     fd->etype                     = user_etype;
 969     fd->disp                      = user_disp;
 970     fd->hints->ds_read            = user_ds_read;
 971     fd->hints->ds_write           = user_ds_write;
 972     fd->hints->ind_wr_buffer_size = user_ind_wr_buffer_size;
 973     fd->hints->ind_rd_buffer_size = user_ind_rd_buffer_size;
 974 #ifdef AGGREGATION_PROFILE
 975     if (rdwr == ADIOI_READ)
 976         MPE_Log_event (5007, 0, NULL);
 977     else
 978         MPE_Log_event (5009, 0, NULL);
 979 #endif
 980 }
 981 
 982 static void Exch_data_amounts (ADIO_File fd, int nprocs,
 983                         ADIO_Offset *client_comm_sz_arr,
 984                         ADIO_Offset *agg_comm_sz_arr,
 985                         int *client_alltoallw_counts,
 986                         int *agg_alltoallw_counts,
 987                         int *aggregators_done)
 988 {
 989     int i;
 990     int recv_idx;
 991     MPI_Request *recv_requests;
 992     MPI_Request *send_requests;
 993     MPI_Status status;
 994     MPI_Status *send_statuses;
 995     
 996     if (fd->hints->cb_alltoall != ADIOI_HINT_DISABLE) {
 997         MPI_Alltoall (client_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
 998                   agg_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
 999                   fd->comm);
1000 
1001         if (fd->is_agg) {
1002             for (i=0; i<nprocs; i++)
1003                 if (client_comm_sz_arr[i] > 0)
1004                     client_alltoallw_counts[i] = 1;
1005                 else
1006                     client_alltoallw_counts[i] = 0;
1007         }
1008         *aggregators_done = 0;
1009         for (i=0; i<nprocs; i++) {
1010             if (agg_comm_sz_arr[i] == -1)
1011                 *aggregators_done = *aggregators_done + 1;
1012             else if (agg_comm_sz_arr[i] > 0)
1013                 agg_alltoallw_counts[i] = 1;
1014             else
1015                 agg_alltoallw_counts[i] = 0;
1016         }
1017     } else {
1018         
1019 
1020 
1021         recv_requests = ADIOI_Malloc (fd->hints->cb_nodes * sizeof(MPI_Request));
1022         
1023         for (i = 0; i < fd->hints->cb_nodes; i++)
1024             MPI_Irecv (&agg_comm_sz_arr[fd->hints->ranklist[i]],
1025                    sizeof(ADIO_Offset), MPI_BYTE, fd->hints->ranklist[i],
1026                    AMT_TAG, fd->comm, &recv_requests[i]);
1027 
1028         
1029 
1030         
1031         send_requests = NULL;
1032         if (fd->is_agg) {
1033             
1034             send_requests = ADIOI_Malloc (nprocs * sizeof(MPI_Request)); 
1035 
1036             
1037             for (i = 0; i < nprocs; i++) {
1038                 MPI_Isend (&client_comm_sz_arr[i], sizeof(ADIO_Offset),
1039                        MPI_BYTE, i, AMT_TAG, fd->comm, &send_requests[i]);
1040 
1041                 if (client_comm_sz_arr[i] > 0)
1042                     client_alltoallw_counts[i] = 1;
1043                 else
1044                     client_alltoallw_counts[i] = 0;
1045             }
1046         }
1047 
1048         *aggregators_done = 0;
1049         for (i=0; i < fd->hints->cb_nodes; i++) {
1050             MPI_Waitany (fd->hints->cb_nodes, recv_requests, &recv_idx, &status);
1051             if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] == -1)
1052                 *aggregators_done = *aggregators_done + 1;
1053             else if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] > 0)
1054                 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 1;
1055             else
1056                 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 0;
1057         }
1058 
1059         ADIOI_Free (recv_requests);
1060         if (fd->is_agg) {
1061             
1062             send_statuses = ADIOI_Malloc (nprocs * sizeof (MPI_Status));
1063             MPI_Waitall (nprocs, send_requests, send_statuses);
1064             ADIOI_Free (send_requests);
1065             ADIOI_Free (send_statuses);
1066         }
1067     }
1068 }
1069 
1070 static void post_aggregator_comm (MPI_Comm comm, int rw_type, 
1071                            int nproc, void *cb_buf,
1072                            MPI_Datatype *client_comm_dtype_arr,
1073                            ADIO_Offset *client_comm_sz_arr,
1074                            MPI_Request **requests_p,
1075                            int *aggs_client_count_p)
1076 {
1077     int aggs_client_count = 0;
1078     MPI_Request *requests;
1079     int i;
1080 
1081 #ifdef DEBUG
1082     printf ("posting aggregator communication\n");
1083 #endif
1084 
1085     for (i=0; i < nproc; i++)
1086         if (client_comm_sz_arr[i] > 0)
1087             aggs_client_count++;
1088 #ifdef DEBUG
1089     printf ("aggregator needs to talk to %d clients\n",
1090         aggs_client_count);
1091 #endif
1092     *aggs_client_count_p = aggs_client_count;
1093     if (aggs_client_count) {
1094         requests = (MPI_Request *)
1095             ADIOI_Malloc (aggs_client_count * sizeof(MPI_Request));
1096         aggs_client_count = 0;
1097 #ifdef AGGREGATION_PROFILE
1098         MPE_Log_event (5032, 0, NULL);
1099 #endif
1100         for (i=0; i < nproc; i++) {
1101             if (client_comm_sz_arr[i] > 0) {
1102                 if (rw_type == ADIOI_WRITE)
1103                     MPI_Irecv (cb_buf, 1, client_comm_dtype_arr[i], i,
1104                                DATA_TAG, comm,
1105                                &requests[aggs_client_count]);
1106                 else
1107                     MPI_Isend (cb_buf, 1, client_comm_dtype_arr[i], i,
1108                                DATA_TAG, comm,
1109                                &requests[aggs_client_count]);
1110 
1111                 aggs_client_count++;
1112             }
1113         }
1114         *requests_p = requests;
1115     }
1116 }
1117 
1118 static void post_client_comm (ADIO_File fd, int rw_type, 
1119                        int agg_rank, void *buf,
1120                        MPI_Datatype agg_comm_dtype,
1121                        int agg_alltoallw_count,
1122                        MPI_Request *request)
1123 {
1124     if (agg_alltoallw_count) {
1125         if (rw_type == ADIOI_READ)
1126             MPI_Irecv (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
1127                        request);
1128         else
1129             MPI_Isend (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
1130                        request);
1131     }
1132 }
1133 
1134 
1135