This source file includes following definitions.
- ADIOI_GEN_ReadStridedColl
- ADIOI_Calc_my_off_len
- ADIOI_Read_and_exch
- ADIOI_R_Exchange_data
- ADIOI_Fill_user_buffer
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 #include "adio.h"
   9 #include "adio_extern.h"
  10 
  11 #ifdef USE_DBG_LOGGING
  12   #define RDCOLL_DEBUG 1
  13 #endif
  14 #ifdef AGGREGATION_PROFILE
  15 #include "mpe.h"
  16 #endif
  17 
  18 
  19 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
  20                                 datatype, int nprocs,
  21                                 int myrank, ADIOI_Access
  22                                 *others_req, ADIO_Offset *offset_list,
  23                                 ADIO_Offset *len_list, int contig_access_count, 
  24                                 ADIO_Offset
  25                                 min_st_offset, ADIO_Offset fd_size,
  26                                 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
  27                                 int *buf_idx, int *error_code);
  28 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
  29                                   *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
  30                                   *len_list, int *send_size, int *recv_size,
  31                                   int *count, int *start_pos, 
  32                                   int *partial_send, 
  33                                   int *recd_from_proc, int nprocs, 
  34                                   int myrank, int
  35                                   buftype_is_contig, int contig_access_count,
  36                                   ADIO_Offset min_st_offset, 
  37                                   ADIO_Offset fd_size,
  38                                   ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
  39                                   ADIOI_Access *others_req, 
  40                                   int iter, 
  41                                   MPI_Aint buftype_extent, int *buf_idx);
  42 void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
  43                                    *flat_buf, char **recv_buf, ADIO_Offset 
  44                                    *offset_list, ADIO_Offset *len_list, 
  45                                    unsigned *recv_size, 
  46                                    MPI_Request *requests, MPI_Status *statuses,
  47                                    int *recd_from_proc, int nprocs,
  48                                    int contig_access_count, 
  49                                    ADIO_Offset min_st_offset, 
  50                                    ADIO_Offset fd_size, ADIO_Offset *fd_start, 
  51                                    ADIO_Offset *fd_end,
  52                                    MPI_Aint buftype_extent);
  53 
  54 
  55 void ADIOI_GEN_ReadStridedColl(ADIO_File fd, void *buf, int count,
  56                                MPI_Datatype datatype, int file_ptr_type,
  57                                ADIO_Offset offset, ADIO_Status *status, int
  58                                *error_code)
  59 {
  60 
  61 
  62 
  63 
  64 
  65 
  66     ADIOI_Access *my_req; 
  67     
  68 
  69     
  70     ADIOI_Access *others_req;
  71     
  72 
  73 
  74     int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
  75     int contig_access_count=0, interleave_count = 0, buftype_is_contig;
  76     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
  77     ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
  78     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
  79         *fd_end = NULL, *end_offsets = NULL;
  80     ADIO_Offset *len_list = NULL;
  81     int *buf_idx = NULL;
  82 
  83 #ifdef HAVE_STATUS_SET_BYTES
  84     MPI_Count bufsize, size;
  85 #endif
  86 
  87     if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
  88         ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype, 
  89                         file_ptr_type, offset, status, error_code);
  90         return;
  91     }
  92 
  93 
  94     MPI_Comm_size(fd->comm, &nprocs);
  95     MPI_Comm_rank(fd->comm, &myrank);
  96 
  97     
  98     nprocs_for_coll = fd->hints->cb_nodes;
  99     orig_fp = fd->fp_ind;
 100 
 101     
 102     if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
 103     
 104 
 105 
 106     
 107 
 108 
 109         ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
 110                               &offset_list, &len_list, &start_offset,
 111                               &end_offset, &contig_access_count); 
 112     
 113 #ifdef RDCOLL_DEBUG
 114     for (i=0; i<contig_access_count; i++) {
 115               DBG_FPRINTF(stderr, "rank %d  off %lld  len %lld\n", 
 116                               myrank, offset_list[i], len_list[i]);
 117               }
 118 #endif
 119 
 120         
 121 
 122  
 123     
 124         st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 125         end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 126 
 127         MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
 128                       ADIO_OFFSET, fd->comm);
 129         MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
 130                       ADIO_OFFSET, fd->comm);
 131 
 132         
 133         for (i=1; i<nprocs; i++)
 134             if ((st_offsets[i] < end_offsets[i-1]) && 
 135                 (st_offsets[i] <= end_offsets[i]))
 136                 interleave_count++;
 137         
 138 
 139     }
 140 
 141     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 142 
 143     if (fd->hints->cb_read == ADIOI_HINT_DISABLE
 144         || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO))) 
 145     {
 146         
 147         if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
 148             ADIOI_Free(offset_list);
 149             ADIOI_Free(len_list);
 150             ADIOI_Free(st_offsets);
 151             ADIOI_Free(end_offsets);
 152         }
 153 
 154         fd->fp_ind = orig_fp;
 155         ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
 156 
 157         if (buftype_is_contig && filetype_is_contig) {
 158             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
 159                 off = fd->disp + (fd->etype_size) * offset;
 160                 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
 161                        off, status, error_code);
 162             }
 163             else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 164                        0, status, error_code);
 165         }
 166         else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
 167                        offset, status, error_code);
 168 
 169         return;
 170     }
 171 
 172     
 173 
 174 
 175 
 176 
 177 
 178 
 179 
 180 
 181 
 182 
 183 
 184 
 185 
 186 
 187 
 188     ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
 189                             nprocs_for_coll, &min_st_offset,
 190                             &fd_start, &fd_end, 
 191                             fd->hints->min_fdomain_size, &fd_size,
 192                             fd->hints->striping_unit);
 193 
 194     
 195 
 196 
 197 
 198 
 199 
 200 
 201 
 202 
 203 
 204 
 205 
 206     ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
 207                       min_st_offset, fd_start, fd_end, fd_size,
 208                       nprocs, &count_my_req_procs, 
 209                       &count_my_req_per_proc, &my_req,
 210                       &buf_idx);
 211 
 212     
 213 
 214 
 215 
 216 
 217 
 218 
 219     ADIOI_Calc_others_req(fd, count_my_req_procs, 
 220                           count_my_req_per_proc, my_req, 
 221                           nprocs, myrank, &count_others_req_procs, 
 222                           &others_req); 
 223 
 224     
 225 
 226 
 227     ADIOI_Free(count_my_req_per_proc);
 228     for (i=0; i<nprocs; i++) {
 229         if (my_req[i].count) {
 230             ADIOI_Free(my_req[i].offsets);
 231             ADIOI_Free(my_req[i].lens);
 232         }
 233     }
 234     ADIOI_Free(my_req);
 235 
 236 
 237     
 238 
 239 
 240     ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
 241                         others_req, offset_list,
 242                         len_list, contig_access_count, min_st_offset,
 243                         fd_size, fd_start, fd_end, buf_idx, error_code);
 244 
 245     if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
 246 
 247     
 248     for (i=0; i<nprocs; i++) {
 249         if (others_req[i].count) {
 250             ADIOI_Free(others_req[i].offsets);
 251             ADIOI_Free(others_req[i].lens);
 252             ADIOI_Free(others_req[i].mem_ptrs);
 253         }
 254     }
 255     ADIOI_Free(others_req);
 256 
 257     ADIOI_Free(buf_idx);
 258     ADIOI_Free(offset_list);
 259     ADIOI_Free(len_list);
 260     ADIOI_Free(st_offsets);
 261     ADIOI_Free(end_offsets);
 262     ADIOI_Free(fd_start);
 263     ADIOI_Free(fd_end);
 264 
 265 #ifdef HAVE_STATUS_SET_BYTES
 266     MPI_Type_size_x(datatype, &size);
 267     bufsize = size * count;
 268     MPIR_Status_set_bytes(status, datatype, bufsize);
 269 
 270 
 271 
 272 #endif
 273 
 274     fd->fp_sys_posn = -1;   
 275 }
 276 
 277 void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
 278                             datatype, int file_ptr_type, ADIO_Offset
 279                             offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
 280                             **len_list_ptr, ADIO_Offset *start_offset_ptr,
 281                             ADIO_Offset *end_offset_ptr, int
 282                            *contig_access_count_ptr)
 283 {
 284     MPI_Count filetype_size, etype_size;
 285     MPI_Count buftype_size;
 286     int i, j, k;
 287     ADIO_Offset i_offset;
 288     ADIO_Offset frd_size=0, old_frd_size=0;
 289     int st_index=0;
 290     ADIO_Offset n_filetypes, etype_in_filetype;
 291     ADIO_Offset abs_off_in_filetype=0;
 292     ADIO_Offset bufsize;
 293     ADIO_Offset sum, n_etypes_in_filetype, size_in_filetype;
 294     int contig_access_count, filetype_is_contig;
 295     ADIO_Offset *len_list;
 296     MPI_Aint filetype_extent, filetype_lb;
 297     ADIOI_Flatlist_node *flat_file;
 298     ADIO_Offset *offset_list, off, end_offset=0, disp;
 299 
 300 #ifdef AGGREGATION_PROFILE
 301     MPE_Log_event (5028, 0, NULL);
 302 #endif
 303     
 304 
 305 
 306 
 307     ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
 308 
 309     MPI_Type_size_x(fd->filetype, &filetype_size);
 310     MPI_Type_get_extent(fd->filetype, &filetype_lb, &filetype_extent);
 311     MPI_Type_size_x(datatype, &buftype_size);
 312     etype_size = fd->etype_size;
 313 
 314     if ( ! filetype_size ) {
 315         *contig_access_count_ptr = 0;
 316         *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
 317         *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
 318         
 319 
 320         offset_list = *offset_list_ptr;
 321         len_list = *len_list_ptr;
 322         offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind : 
 323                  fd->disp + (ADIO_Offset)etype_size * offset;
 324         len_list[0] = 0;
 325         *start_offset_ptr = offset_list[0];
 326         *end_offset_ptr = offset_list[0] + len_list[0] - 1;
 327         
 328         return;
 329     }
 330 
 331     if (filetype_is_contig) {
 332         *contig_access_count_ptr = 1;        
 333         *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
 334         *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
 335         
 336 
 337         offset_list = *offset_list_ptr;
 338         len_list = *len_list_ptr;
 339         offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind : 
 340                  fd->disp + (ADIO_Offset)etype_size * offset;
 341         len_list[0] = (ADIO_Offset)bufcount * (ADIO_Offset)buftype_size;
 342         *start_offset_ptr = offset_list[0];
 343         *end_offset_ptr = offset_list[0] + len_list[0] - 1;
 344 
 345         
 346         if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = *end_offset_ptr + 1;
 347     }
 348 
 349     else {
 350 
 351        
 352    
 353        
 354         flat_file = ADIOI_Flatlist;
 355         while (flat_file->type != fd->filetype) flat_file = flat_file->next;
 356         disp = fd->disp;
 357 
 358 #ifdef RDCOLL_DEBUG 
 359         {
 360             int ii;
 361             DBG_FPRINTF(stderr, "flattened %3lld : ", flat_file->count );
 362             for (ii=0; ii<flat_file->count; ii++) {
 363                 DBG_FPRINTF(stderr, "%16lld:%-16lld", flat_file->indices[ii], flat_file->blocklens[ii] );
 364             }
 365             DBG_FPRINTF(stderr, "\n" );
 366         }
 367 #endif
 368         if (file_ptr_type == ADIO_INDIVIDUAL) {
 369            
 370             offset       = fd->fp_ind - disp;
 371             n_filetypes  = (offset - flat_file->indices[0]) / filetype_extent;
 372              offset     -= (ADIO_Offset)n_filetypes * filetype_extent;
 373                 
 374  
 375             
 376             for (i=0; i<flat_file->count; i++) {
 377                 ADIO_Offset dist;
 378                 if (flat_file->blocklens[i] == 0) continue;
 379                 dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
 380                 
 381                 if (dist == 0) {
 382                         i++;
 383                         offset   = flat_file->indices[i];
 384                         frd_size = flat_file->blocklens[i];
 385                         break;
 386                 }
 387                 if (dist > 0) {
 388                     frd_size = dist;
 389                     break;
 390                 }
 391             }
 392             st_index = i;  
 393             offset += disp + (ADIO_Offset)n_filetypes*filetype_extent;
 394         }
 395         else {
 396             n_etypes_in_filetype = filetype_size/etype_size;
 397             n_filetypes = offset / n_etypes_in_filetype;
 398             etype_in_filetype = offset % n_etypes_in_filetype;
 399             size_in_filetype = etype_in_filetype * etype_size;
 400  
 401             sum = 0;
 402             for (i=0; i<flat_file->count; i++) {
 403                 sum += flat_file->blocklens[i];
 404                 if (sum > size_in_filetype) {
 405                     st_index = i;
 406                     frd_size = sum - size_in_filetype;
 407                     abs_off_in_filetype = flat_file->indices[i] +
 408                         size_in_filetype - (sum - flat_file->blocklens[i]);
 409                     break;
 410                 }
 411             }
 412 
 413             
 414             offset = disp + n_filetypes* (ADIO_Offset)filetype_extent + 
 415                 abs_off_in_filetype;
 416         }
 417 
 418          
 419 
 420         old_frd_size = frd_size;
 421         contig_access_count = i_offset = 0;
 422         j = st_index;
 423         bufsize = (ADIO_Offset)buftype_size * (ADIO_Offset)bufcount;
 424         frd_size = ADIOI_MIN(frd_size, bufsize);
 425         while (i_offset < bufsize) {
 426             if (frd_size) contig_access_count++;
 427             i_offset += frd_size;
 428             j = (j + 1) % flat_file->count;
 429             frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
 430         }
 431 
 432         
 433 
 434         *offset_list_ptr = (ADIO_Offset *)
 435                  ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));  
 436         *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));
 437         
 438 
 439         offset_list = *offset_list_ptr;
 440         len_list = *len_list_ptr;
 441 
 442       
 443 
 444         *start_offset_ptr = offset; 
 445 
 446         i_offset = k = 0;
 447         j = st_index;
 448         off = offset;
 449         frd_size = ADIOI_MIN(old_frd_size, bufsize);
 450         while (i_offset < bufsize) {
 451             if (frd_size) {
 452                 offset_list[k] = off;
 453                 len_list[k] = frd_size;
 454                 k++;
 455             }
 456             i_offset += frd_size;
 457             end_offset = off + frd_size - 1;
 458 
 459      
 460 
 461 
 462             if (off + frd_size < disp + flat_file->indices[j] +
 463                 flat_file->blocklens[j] + 
 464                  n_filetypes* (ADIO_Offset)filetype_extent)
 465             {
 466                 off += frd_size;
 467                 
 468 
 469 
 470             }
 471             else {
 472                 j = (j+1) % flat_file->count;
 473                 n_filetypes += (j == 0) ? 1 : 0;
 474                 while (flat_file->blocklens[j]==0) {
 475                         j = (j+1) % flat_file->count;
 476                     n_filetypes += (j == 0) ? 1 : 0;
 477                     
 478 
 479                 }
 480                 off = disp + flat_file->indices[j] + 
 481                      n_filetypes* (ADIO_Offset)filetype_extent;
 482                 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
 483             }
 484         }
 485 
 486         
 487         if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
 488 
 489         *contig_access_count_ptr = contig_access_count;
 490          *end_offset_ptr = end_offset;
 491     }
 492 #ifdef AGGREGATION_PROFILE
 493     MPE_Log_event (5029, 0, NULL);
 494 #endif
 495 }
 496 
 497 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
 498                          datatype, int nprocs,
 499                          int myrank, ADIOI_Access
 500                          *others_req, ADIO_Offset *offset_list,
 501                          ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
 502                          min_st_offset, ADIO_Offset fd_size,
 503                          ADIO_Offset *fd_start, ADIO_Offset *fd_end,
 504                          int *buf_idx, int *error_code)
 505 {
 506 
 507 
 508 
 509 
 510 
 511 
 512 
 513 
 514 
 515 
 516     int i, j, m, ntimes, max_ntimes, buftype_is_contig;
 517     ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
 518     char *read_buf = NULL, *tmp_buf;
 519     int *curr_offlen_ptr, *count, *send_size, *recv_size;
 520     int *partial_send, *recd_from_proc, *start_pos;
 521     
 522     ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
 523     int req_len, flag, rank;
 524     MPI_Status status;
 525     ADIOI_Flatlist_node *flat_buf=NULL;
 526     MPI_Aint buftype_extent, lb;
 527     int coll_bufsize;
 528 
 529     *error_code = MPI_SUCCESS;  
 530     
 531     
 532 
 533 
 534 
 535 
 536 
 537     coll_bufsize = fd->hints->cb_buffer_size;
 538 
 539     
 540     for (i=0; i < nprocs; i++) {
 541         if (others_req[i].count) {
 542             st_loc = others_req[i].offsets[0];
 543             end_loc = others_req[i].offsets[0];
 544             break;
 545         }
 546     }
 547 
 548     
 549     for (i=0; i < nprocs; i++)
 550         for (j=0; j<others_req[i].count; j++) {
 551             st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
 552             end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
 553                                           + others_req[i].lens[j] - 1));
 554         }
 555 
 556     
 557 
 558 
 559 
 560 
 561     if ((st_loc==-1) && (end_loc==-1)) {
 562         
 563         ntimes = 0;
 564     }
 565     else {
 566         
 567         ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
 568     }
 569 
 570     MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm); 
 571 
 572     read_buf = fd->io_buf;  
 573 
 574     curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); 
 575     
 576 
 577     count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 578     
 579 
 580 
 581     partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 582     
 583 
 584 
 585 
 586     send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 587     
 588 
 589     recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 590     
 591 
 592 
 593     recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 594     
 595 
 596 
 597     start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
 598     
 599 
 600 
 601     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 602     if (!buftype_is_contig) {
 603         flat_buf = ADIOI_Flatten_and_find(datatype);
 604     }
 605     MPI_Type_get_extent(datatype, &lb, &buftype_extent);
 606 
 607     done = 0;
 608     off = st_loc;
 609     for_curr_iter = for_next_iter = 0;
 610 
 611     MPI_Comm_rank(fd->comm, &rank);
 612 
 613     for (m=0; m<ntimes; m++) {
 614        
 615        
 616 
 617 
 618        
 619 
 620 
 621 
 622 
 623 
 624 
 625 
 626 
 627 
 628 
 629 
 630 
 631 
 632 
 633 
 634 
 635 
 636 
 637 
 638 
 639  
 640 
 641           
 642 
 643 
 644 
 645 
 646 
 647 
 648 
 649 
 650 
 651         size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done); 
 652         real_off = off - for_curr_iter;
 653         real_size = size + for_curr_iter;
 654 
 655         for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
 656         for_next_iter = 0;
 657 
 658         for (i=0; i<nprocs; i++) {
 659 #ifdef RDCOLL_DEBUG
 660             DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count); 
 661 #endif
 662             if (others_req[i].count) {
 663                 start_pos[i] = curr_offlen_ptr[i];
 664                 for (j=curr_offlen_ptr[i]; j<others_req[i].count;
 665                      j++) {
 666                     if (partial_send[i]) {
 667                         
 668 
 669                         req_off = others_req[i].offsets[j] +
 670                             partial_send[i]; 
 671                         req_len = others_req[i].lens[j] -
 672                             partial_send[i];
 673                         partial_send[i] = 0;
 674                         
 675                         others_req[i].offsets[j] = req_off;
 676                         others_req[i].lens[j] = req_len;
 677                     }
 678                     else {
 679                         req_off = others_req[i].offsets[j];
 680                         req_len = others_req[i].lens[j];
 681                     }
 682                     if (req_off < real_off + real_size) {
 683                         count[i]++;
 684       ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIU_Upint)(read_buf+req_off-real_off));
 685                         MPI_Get_address(read_buf+req_off-real_off, 
 686                                &(others_req[i].mem_ptrs[j]));
 687       ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
 688                         send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off, 
 689                                       (ADIO_Offset)(unsigned)req_len)); 
 690 
 691                         if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
 692                             partial_send[i] = (int) (real_off + real_size - req_off);
 693                             if ((j+1 < others_req[i].count) && 
 694                                  (others_req[i].offsets[j+1] < 
 695                                      real_off+real_size)) { 
 696                                 
 697 
 698                                 for_next_iter = ADIOI_MAX(for_next_iter,
 699                                           real_off + real_size - others_req[i].offsets[j+1]); 
 700                                 
 701 
 702                             }
 703                             break;
 704                         }
 705                     }
 706                     else break;
 707                 }
 708                 curr_offlen_ptr[i] = j;
 709             }
 710         }
 711 
 712         flag = 0;
 713         for (i=0; i<nprocs; i++)
 714             if (count[i]) flag = 1;
 715 
 716         if (flag) {
 717       ADIOI_Assert(size == (int)size);
 718             ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
 719                             ADIO_EXPLICIT_OFFSET, off, &status, error_code);
 720             if (*error_code != MPI_SUCCESS) return;
 721         }
 722         
 723         for_curr_iter = for_next_iter;
 724         
 725         ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
 726                             send_size, recv_size, count, 
 727                             start_pos, partial_send, recd_from_proc, nprocs,
 728                             myrank, 
 729                             buftype_is_contig, contig_access_count,
 730                             min_st_offset, fd_size, fd_start, fd_end,
 731                             others_req, 
 732                             m, buftype_extent, buf_idx); 
 733 
 734 
 735         if (for_next_iter) {
 736             tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
 737       ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIU_Upint)(read_buf+real_size-for_next_iter));
 738       ADIOI_Assert((ADIO_Size) (for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
 739             memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
 740             ADIOI_Free(fd->io_buf);
 741             fd->io_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
 742             memcpy(fd->io_buf, tmp_buf, for_next_iter);
 743             read_buf = fd->io_buf;
 744             ADIOI_Free(tmp_buf);
 745         }
 746 
 747         off += size;
 748         done += size;
 749     }
 750 
 751     for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
 752     for (m=ntimes; m<max_ntimes; m++) 
 753 
 754         ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
 755                             send_size, recv_size, count, 
 756                             start_pos, partial_send, recd_from_proc, nprocs,
 757                             myrank, 
 758                             buftype_is_contig, contig_access_count,
 759                             min_st_offset, fd_size, fd_start, fd_end,
 760                             others_req, m,
 761                             buftype_extent, buf_idx); 
 762 
 763     ADIOI_Free(curr_offlen_ptr);
 764     ADIOI_Free(count);
 765     ADIOI_Free(partial_send);
 766     ADIOI_Free(send_size);
 767     ADIOI_Free(recv_size);
 768     ADIOI_Free(recd_from_proc);
 769     ADIOI_Free(start_pos);
 770 }
 771 
 772 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
 773                          *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
 774                          *len_list, int *send_size, int *recv_size,
 775                          int *count, int *start_pos, int *partial_send, 
 776                          int *recd_from_proc, int nprocs, 
 777                          int myrank, int
 778                          buftype_is_contig, int contig_access_count,
 779                          ADIO_Offset min_st_offset, ADIO_Offset fd_size,
 780                          ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
 781                          ADIOI_Access *others_req, 
 782                          int iter, MPI_Aint buftype_extent, int *buf_idx)
 783 {
 784     int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
 785     char **recv_buf = NULL; 
 786     MPI_Request *requests;
 787     MPI_Datatype send_type;
 788     MPI_Status *statuses;
 789 
 790 
 791 
 792 
 793     MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
 794 
 795     nprocs_recv = 0;
 796     for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
 797 
 798     nprocs_send = 0;
 799     for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
 800 
 801     requests = (MPI_Request *)
 802         ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
 803 
 804 
 805 
 806 
 807 
 808 #ifdef AGGREGATION_PROFILE
 809     MPE_Log_event (5032, 0, NULL);
 810 #endif
 811 
 812     if (buftype_is_contig) {
 813         j = 0;
 814         for (i=0; i < nprocs; i++) 
 815             if (recv_size[i]) {
 816                 MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i], 
 817                   MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
 818                 j++;
 819                 buf_idx[i] += recv_size[i];
 820             }
 821     }
 822     else {
 823 
 824         recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
 825         for (i=0; i < nprocs; i++) 
 826             if (recv_size[i]) recv_buf[i] = 
 827                                   (char *) ADIOI_Malloc(recv_size[i]);
 828 
 829             j = 0;
 830             for (i=0; i < nprocs; i++) 
 831                 if (recv_size[i]) {
 832                     MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i, 
 833                               myrank+i+100*iter, fd->comm, requests+j);
 834                     j++;
 835 #ifdef RDCOLL_DEBUG
 836                     DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n", 
 837                        myrank, recv_size[i], myrank+i+100*iter); 
 838 #endif
 839                 }
 840     }
 841 
 842 
 843 
 844     j = 0;
 845     for (i=0; i<nprocs; i++) {
 846         if (send_size[i]) {
 847 
 848             if (partial_send[i]) {
 849                 k = start_pos[i] + count[i] - 1;
 850                 tmp = others_req[i].lens[k];
 851                 others_req[i].lens[k] = partial_send[i];
 852             }
 853             ADIOI_Type_create_hindexed_x(count[i],
 854                   &(others_req[i].lens[start_pos[i]]),
 855                     &(others_req[i].mem_ptrs[start_pos[i]]), 
 856                          MPI_BYTE, &send_type);
 857             
 858             MPI_Type_commit(&send_type);
 859             MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
 860                       fd->comm, requests+nprocs_recv+j);
 861             MPI_Type_free(&send_type);
 862             if (partial_send[i]) others_req[i].lens[k] = tmp;
 863             j++;
 864         }
 865     }
 866 
 867     statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
 868                                      sizeof(MPI_Status)); 
 869      
 870 
 871     
 872     if (nprocs_recv) {
 873 #ifdef NEEDS_MPI_TEST
 874         j = 0;
 875         while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
 876 #else
 877         MPI_Waitall(nprocs_recv, requests, statuses);
 878 #endif
 879 
 880         
 881         if (!buftype_is_contig) 
 882             ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
 883                                    offset_list, len_list, (unsigned*)recv_size, 
 884                                    requests, statuses, recd_from_proc, 
 885                                    nprocs, contig_access_count,
 886                                    min_st_offset, fd_size, fd_start, fd_end,
 887                                    buftype_extent);
 888     }
 889 
 890     
 891     MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
 892 
 893     ADIOI_Free(statuses);
 894     ADIOI_Free(requests);
 895 
 896     if (!buftype_is_contig) {
 897         for (i=0; i < nprocs; i++) 
 898             if (recv_size[i]) ADIOI_Free(recv_buf[i]);
 899         ADIOI_Free(recv_buf);
 900     }
 901 #ifdef AGGREGATION_PROFILE
 902     MPE_Log_event (5033, 0, NULL);
 903 #endif
 904 }
 905 
 906 #define ADIOI_BUF_INCR \
 907 { \
 908     while (buf_incr) { \
 909         size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
 910         user_buf_idx += size_in_buf; \
 911         flat_buf_sz -= size_in_buf; \
 912         if (!flat_buf_sz) { \
 913             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
 914             else { \
 915                 flat_buf_idx = 0; \
 916                 n_buftypes++; \
 917             } \
 918             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
 919                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
 920             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
 921         } \
 922         buf_incr -= size_in_buf; \
 923     } \
 924 }
 925 
 926 
 927 #define ADIOI_BUF_COPY \
 928 { \
 929     while (size) { \
 930         size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
 931   ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)((MPIU_Upint)buf + user_buf_idx)); \
 932   ADIOI_Assert((ADIO_Size) size_in_buf == (size_t)size_in_buf);          \
 933         memcpy(((char *) buf) + user_buf_idx, \
 934                &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
 935         recv_buf_idx[p] += size_in_buf;  \
 936         user_buf_idx += size_in_buf; \
 937         flat_buf_sz -= size_in_buf; \
 938         if (!flat_buf_sz) { \
 939             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
 940             else { \
 941                 flat_buf_idx = 0; \
 942                 n_buftypes++; \
 943             } \
 944             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
 945                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
 946             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
 947         } \
 948         size -= size_in_buf; \
 949         buf_incr -= size_in_buf; \
 950     } \
 951     ADIOI_BUF_INCR \
 952 }
 953 
 954 void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
 955                                    *flat_buf, char **recv_buf, ADIO_Offset 
 956                                    *offset_list, ADIO_Offset *len_list, 
 957                                    unsigned *recv_size, 
 958                                    MPI_Request *requests, MPI_Status *statuses,
 959                                    int *recd_from_proc, int nprocs,
 960                                    int contig_access_count, 
 961                                    ADIO_Offset min_st_offset, 
 962                                    ADIO_Offset fd_size, ADIO_Offset *fd_start, 
 963                                    ADIO_Offset *fd_end,
 964                                    MPI_Aint buftype_extent)
 965 {
 966 
 967 
 968 
 969     int i, p, flat_buf_idx;
 970     ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
 971     int n_buftypes;
 972     ADIO_Offset off, len, rem_len, user_buf_idx;
 973     
 974     unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;
 975 
 976     ADIOI_UNREFERENCED_ARG(requests);
 977     ADIOI_UNREFERENCED_ARG(statuses);
 978 
 979 
 980 
 981 
 982 
 983 
 984 
 985     curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
 986     done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
 987     recv_buf_idx   = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
 988 
 989     for (i=0; i < nprocs; i++) {
 990         recv_buf_idx[i] = curr_from_proc[i] = 0;
 991         done_from_proc[i] = recd_from_proc[i];
 992     }
 993 
 994     user_buf_idx = flat_buf->indices[0];
 995     flat_buf_idx = 0;
 996     n_buftypes = 0;
 997     flat_buf_sz = flat_buf->blocklens[0];
 998 
 999     
1000 
1001 
1002 
1003     for (i=0; i<contig_access_count; i++) { 
1004         off     = offset_list[i];
1005         rem_len = len_list[i];
1006 
1007         
1008         while (rem_len != 0) {
1009             len = rem_len;
1010             
1011 
1012 
1013 
1014             p = ADIOI_Calc_aggregator(fd,
1015                                       off,
1016                                       min_st_offset,
1017                                       &len,
1018                                       fd_size,
1019                                       fd_start,
1020                                       fd_end);
1021 
1022             if (recv_buf_idx[p] < recv_size[p]) {
1023                 if (curr_from_proc[p]+len > done_from_proc[p]) {
1024                     if (done_from_proc[p] > curr_from_proc[p]) {
1025                         size = ADIOI_MIN(curr_from_proc[p] + len - 
1026                               done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
1027                         buf_incr = done_from_proc[p] - curr_from_proc[p];
1028                         ADIOI_BUF_INCR
1029                         buf_incr = curr_from_proc[p]+len-done_from_proc[p];
1030       ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
1031                         curr_from_proc[p] = done_from_proc[p] + size;
1032                         ADIOI_BUF_COPY
1033                     }
1034                     else {
1035                         size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
1036                         buf_incr = len;
1037       ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
1038                         curr_from_proc[p] += (unsigned) size;
1039                         ADIOI_BUF_COPY
1040                     }
1041                 }
1042                 else {
1043         ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
1044                     curr_from_proc[p] += (unsigned) len;
1045                     buf_incr = len;
1046                     ADIOI_BUF_INCR
1047                 }
1048             }
1049             else {
1050                 buf_incr = len;
1051                 ADIOI_BUF_INCR
1052             }
1053             off     += len;
1054             rem_len -= len;
1055         }
1056     }
1057     for (i=0; i < nprocs; i++) 
1058         if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
1059 
1060     ADIOI_Free(curr_from_proc);
1061     ADIOI_Free(done_from_proc);
1062     ADIOI_Free(recv_buf_idx);
1063 }