This source file includes following definitions.
- ADIOI_LUSTRE_WriteStridedColl
- ADIOI_LUSTRE_Exch_and_write
- ADIOI_LUSTRE_W_Exchange_data
- ADIOI_LUSTRE_Fill_send_buffer
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 #include "ad_lustre.h"
  12 #include "adio_extern.h"
  13 
  14 
  15 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
  16                                         MPI_Datatype datatype, int nprocs,
  17                                         int myrank,
  18                                         ADIOI_Access *others_req,
  19                                         ADIOI_Access *my_req,
  20                                         ADIO_Offset *offset_list,
  21                                         ADIO_Offset *len_list,
  22                                         int contig_access_count,
  23                                         int *striping_info,
  24                                         int **buf_idx, int *error_code);
  25 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
  26                                           ADIOI_Flatlist_node *flat_buf,
  27                                           char **send_buf,
  28                                           ADIO_Offset *offset_list,
  29                                           ADIO_Offset *len_list, int *send_size,
  30                                           MPI_Request *requests,
  31                                           int *sent_to_proc, int nprocs,
  32                                           int myrank, int contig_access_count,
  33                                           int *striping_info,
  34                                           int *send_buf_idx,
  35                                           int *curr_to_proc,
  36                                           int *done_to_proc, int iter,
  37                                           MPI_Aint buftype_extent);
  38 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
  39                                          char *write_buf,
  40                                          ADIOI_Flatlist_node *flat_buf,
  41                                          ADIO_Offset *offset_list,
  42                                          ADIO_Offset *len_list, int *send_size,
  43                                          int *recv_size, ADIO_Offset off,
  44                                          int size, int *count,
  45                                          int *start_pos, 
  46                                          int *sent_to_proc, int nprocs,
  47                                          int myrank, int buftype_is_contig,
  48                                          int contig_access_count,
  49                                          int *striping_info,
  50                                          ADIOI_Access *others_req,
  51                                          int *send_buf_idx,
  52                                          int *curr_to_proc,
  53                                          int *done_to_proc, int *hole,
  54                                          int iter, MPI_Aint buftype_extent,
  55                                          int *buf_idx,
  56                                          ADIO_Offset **srt_off, int **srt_len, int *srt_num,
  57                                          int *error_code);
  58 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
  59                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
  60                       int nprocs, int nprocs_recv, int total_elements);
  61 
  62 void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, const void *buf, int count,
  63                                    MPI_Datatype datatype,
  64                                    int file_ptr_type, ADIO_Offset offset,
  65                                    ADIO_Status *status, int *error_code)
  66 {
  67     
  68 
  69 
  70 
  71 
  72 
  73 
  74     ADIOI_Access *my_req;
  75     
  76 
  77 
  78     ADIOI_Access *others_req;
  79     
  80 
  81 
  82     int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
  83     int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
  84     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
  85     ADIO_Offset orig_fp, start_offset, end_offset, off;
  86     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
  87     ADIO_Offset *len_list = NULL;
  88     int **buf_idx = NULL, *striping_info = NULL;
  89     int old_error, tmp_error;
  90 
  91     MPI_Comm_size(fd->comm, &nprocs);
  92     MPI_Comm_rank(fd->comm, &myrank);
  93 
  94     orig_fp = fd->fp_ind;
  95 
  96     
  97     if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
  98         
  99 
 100 
 101         
 102 
 103 
 104 
 105         ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
 106                               &offset_list, &len_list, &start_offset,
 107                               &end_offset, &contig_access_count);
 108 
 109         
 110 
 111 
 112 
 113         st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
 114         end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
 115         MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
 116                       ADIO_OFFSET, fd->comm);
 117         MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
 118                       ADIO_OFFSET, fd->comm);
 119         
 120         for (i = 1; i < nprocs; i++)
 121             if ((st_offsets[i] < end_offsets[i-1]) &&
 122                 (st_offsets[i] <= end_offsets[i]))
 123                 interleave_count++;
 124         
 125 
 126 
 127         
 128 
 129 
 130 
 131         if (interleave_count > 0) {
 132             do_collect = 1;
 133         } else {
 134             do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
 135                                                 len_list, nprocs);
 136         }
 137     }
 138     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 139 
 140     
 141     if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
 142         fd->hints->cb_write == ADIOI_HINT_DISABLE) {
 143 
 144         
 145         if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
 146             ADIOI_Free(offset_list);
 147             ADIOI_Free(len_list);
 148             ADIOI_Free(st_offsets);
 149             ADIOI_Free(end_offsets);
 150         }
 151 
 152         fd->fp_ind = orig_fp;
 153         ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
 154         if (buftype_is_contig && filetype_is_contig) {
 155             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
 156                 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
 157                 ADIO_WriteContig(fd, buf, count, datatype,
 158                                  ADIO_EXPLICIT_OFFSET,
 159                                  off, status, error_code);
 160             } else
 161                 ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 162                                  0, status, error_code);
 163         } else {
 164             ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
 165                               offset, status, error_code);
 166         }
 167         return;
 168     }
 169 
 170     
 171     ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);
 172 
 173     
 174 
 175 
 176     ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
 177                              striping_info, nprocs, &count_my_req_procs,
 178                              &count_my_req_per_proc, &my_req,
 179                              &buf_idx);
 180 
 181     
 182 
 183 
 184 
 185 
 186 
 187 
 188 
 189     ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
 190                           my_req, nprocs, myrank, &count_others_req_procs,
 191                           &others_req);
 192     ADIOI_Free(count_my_req_per_proc);
 193 
 194     
 195     ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
 196                                 others_req, my_req, offset_list, len_list,
 197                                 contig_access_count, striping_info,
 198                                 buf_idx, error_code);
 199 
 200     
 201 
 202 
 203 
 204 
 205 
 206 
 207 
 208 
 209 
 210 
 211     old_error = *error_code;
 212     if (*error_code != MPI_SUCCESS)
 213         *error_code = MPI_ERR_IO;
 214 
 215     
 216 
 217 #ifdef ADIOI_MPE_LOGGING
 218     MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
 219 #endif
 220     if (fd->hints->cb_nodes == 1)
 221         MPI_Bcast(error_code, 1, MPI_INT,
 222                   fd->hints->ranklist[0], fd->comm);
 223     else {
 224         tmp_error = *error_code;
 225         MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
 226                       MPI_MAX, fd->comm);
 227     }
 228 #ifdef ADIOI_MPE_LOGGING
 229     MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
 230 #endif
 231 
 232     if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
 233         *error_code = old_error;
 234 
 235 
 236     if (!buftype_is_contig)
 237         ADIOI_Delete_flattened(datatype);
 238 
 239     
 240     
 241     for (i = 0; i < nprocs; i++) {
 242         if (others_req[i].count) {
 243             ADIOI_Free(others_req[i].offsets);
 244             ADIOI_Free(others_req[i].lens);
 245             ADIOI_Free(others_req[i].mem_ptrs);
 246         }
 247     }
 248     ADIOI_Free(others_req);
 249     
 250     for (i = 0; i < nprocs; i++) {
 251         if (my_req[i].count) {
 252             ADIOI_Free(my_req[i].offsets);
 253             ADIOI_Free(my_req[i].lens);
 254         }
 255     }
 256     ADIOI_Free(my_req);
 257     for (i = 0; i < nprocs; i++) {
 258         ADIOI_Free(buf_idx[i]);
 259     }
 260     ADIOI_Free(buf_idx);
 261     ADIOI_Free(offset_list);
 262     ADIOI_Free(len_list);
 263     ADIOI_Free(st_offsets);
 264     ADIOI_Free(end_offsets);
 265     ADIOI_Free(striping_info);
 266 
 267 #ifdef HAVE_STATUS_SET_BYTES
 268     if (status) {
 269         MPI_Count bufsize, size;
 270         
 271         MPI_Type_size_x(datatype, &size);
 272         bufsize = size * count;
 273         MPIR_Status_set_bytes(status, datatype, bufsize);
 274     }
 275     
 276 
 277 
 278 #endif
 279 
 280     fd->fp_sys_posn = -1;       
 281 }
 282 
 283 
 284 
 285 
 286 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
 287                                         MPI_Datatype datatype, int nprocs,
 288                                         int myrank, ADIOI_Access *others_req,
 289                                         ADIOI_Access *my_req,
 290                                         ADIO_Offset *offset_list,
 291                                         ADIO_Offset *len_list, 
 292                                         int contig_access_count,
 293                                         int *striping_info, int **buf_idx,
 294                                         int *error_code)
 295 {
 296     
 297 
 298 
 299 
 300 
 301 
 302 
 303 
 304 
 305 
 306     int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig;
 307     ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
 308     ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
 309     ADIO_Offset max_size, step_size = 0;
 310     int real_size, req_len, send_len;
 311     int *recv_curr_offlen_ptr, *recv_count, *recv_size;
 312     int *send_curr_offlen_ptr, *send_size;
 313     int *sent_to_proc, *recv_start_pos;
 314     int *send_buf_idx, *curr_to_proc, *done_to_proc;
 315     int *this_buf_idx;
 316     char *write_buf = NULL;
 317     MPI_Status status;
 318     ADIOI_Flatlist_node *flat_buf = NULL;
 319     MPI_Aint buftype_extent, buftype_lb;
 320     int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
 321     int data_sieving = 0;
 322     ADIO_Offset *srt_off = NULL;
 323     int *srt_len = NULL;
 324     int srt_num = 0;
 325     ADIO_Offset block_offset;
 326     int block_len;
 327 
 328     *error_code = MPI_SUCCESS;  
 329     
 330 
 331     
 332 
 333 
 334 
 335 
 336 
 337 
 338     for (i = 0; i < nprocs; i++) {
 339         if (others_req[i].count) {
 340             st_loc = others_req[i].offsets[0];
 341             end_loc = others_req[i].offsets[0];
 342             break;
 343         }
 344     }
 345     for (i = 0; i < nprocs; i++) {
 346         for (j = 0; j < others_req[i].count; j++) {
 347             st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
 348             end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
 349                                           others_req[i].lens[j] - 1));
 350         }
 351     }
 352     
 353     if ((st_loc == -1) && (end_loc == -1))
 354         ntimes = 0;
 355     MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
 356     
 357     if (st_loc == -1)
 358         st_loc = max_end_loc;
 359     MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
 360     
 361     min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
 362 
 363     
 364 
 365 
 366 
 367     step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
 368     max_ntimes = (max_end_loc - min_st_loc + 1) / step_size
 369         + (((max_end_loc - min_st_loc + 1) % step_size) ? 1 : 0);
 370 
 371     if (ntimes)
 372         write_buf = (char *) ADIOI_Malloc(stripe_size);
 373 
 374     
 375     off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
 376     for (m = 0; m < max_ntimes; m ++)
 377         off_list[m] = max_end_loc;
 378     for (i = 0; i < nprocs; i++) {
 379         for (j = 0; j < others_req[i].count; j ++) {
 380             req_off = others_req[i].offsets[j];
 381             m = (int)((req_off - min_st_loc) / step_size);
 382             off_list[m] = ADIOI_MIN(off_list[m], req_off);
 383         }
 384     }
 385 
 386     recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 387     send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 388     
 389 
 390     recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 391     
 392 
 393 
 394     send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 395     
 396 
 397 
 398     recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 399     
 400 
 401     sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 402     
 403 
 404 
 405     send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 406     curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 407     done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 408     
 409 
 410     this_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 411 
 412     recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 413     
 414 
 415 
 416     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 417     if (!buftype_is_contig) {
 418         flat_buf = ADIOI_Flatten_and_find(datatype);
 419     }
 420     MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);
 421     
 422 
 423 
 424 
 425 
 426 
 427     
 428 
 429 
 430 
 431 
 432     iter_st_off = min_st_loc;
 433 
 434     
 435 
 436 
 437 
 438 
 439 
 440 
 441 
 442 
 443     
 444     data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
 445 
 446     for (m = 0; m < max_ntimes; m++) {
 447         
 448 
 449 
 450 
 451         
 452 
 453 
 454 
 455 
 456         
 457 
 458 
 459 
 460 
 461 
 462 
 463 
 464 
 465 
 466 
 467 
 468 
 469         
 470         for (i = 0; i < nprocs; i++)
 471             recv_count[i] = recv_size[i] = send_size[i] = 0;
 472 
 473         off = off_list[m];
 474         max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
 475         real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size -
 476                                     off,
 477                                     end_loc - off + 1);
 478 
 479         for (i = 0; i < nprocs; i++) {
 480             if (my_req[i].count) {
 481                 this_buf_idx[i] = buf_idx[i][send_curr_offlen_ptr[i]];
 482                 for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
 483                     send_off = my_req[i].offsets[j];
 484                     send_len = my_req[i].lens[j];
 485                     if (send_off < iter_st_off + max_size) {
 486                         send_size[i] += send_len;
 487                     } else {
 488                         break;
 489                     }
 490                 }
 491                 send_curr_offlen_ptr[i] = j;
 492             }
 493             if (others_req[i].count) {
 494                 recv_start_pos[i] = recv_curr_offlen_ptr[i];
 495                 for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
 496                     req_off = others_req[i].offsets[j];
 497                     req_len = others_req[i].lens[j];
 498                     if (req_off < iter_st_off + max_size) {
 499                         recv_count[i]++;
 500                         ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIU_Upint)(write_buf+req_off-off));
 501                         MPI_Get_address(write_buf + req_off - off,
 502                                         &(others_req[i].mem_ptrs[j]));
 503                         recv_size[i] += req_len;
 504                     } else {
 505                         break;
 506                     }
 507                 }
 508                 recv_curr_offlen_ptr[i] = j;
 509             }
 510         }
 511         
 512         hole = data_sieving;
 513         ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
 514                                      len_list, send_size, recv_size, off, real_size,
 515                                      recv_count, recv_start_pos,
 516                                      sent_to_proc, nprocs, myrank,
 517                                      buftype_is_contig, contig_access_count,
 518                                      striping_info, others_req, send_buf_idx,
 519                                      curr_to_proc, done_to_proc, &hole, m,
 520                                   buftype_extent, this_buf_idx,
 521                                   &srt_off, &srt_len, &srt_num, error_code);
 522 
 523         if (*error_code != MPI_SUCCESS)
 524             goto over;
 525 
 526         flag = 0;
 527         for (i = 0; i < nprocs; i++)
 528             if (recv_count[i]) {
 529                 flag = 1;
 530                 break;
 531             }
 532         if (flag) {
 533             
 534             if(data_sieving == ADIOI_HINT_ENABLE) {
 535                 ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
 536                                  ADIO_EXPLICIT_OFFSET, off, &status,
 537                                  error_code);
 538             } else {
 539                 
 540 
 541                 if (!hole) {
 542                     ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
 543                                      ADIO_EXPLICIT_OFFSET, off, &status,
 544                                      error_code);
 545                 } else {
 546                     block_offset = -1;
 547                     block_len = 0;
 548                     for (i = 0; i < srt_num; ++i) {
 549                         if (srt_off[i] < off + real_size &&
 550                             srt_off[i] >= off) {
 551                             if (block_offset == -1) {
 552                                 block_offset = srt_off[i];
 553                                 block_len = srt_len[i];
 554                             } else {
 555                                 if (srt_off[i] == block_offset + block_len) {
 556                                     block_len += srt_len[i];
 557                                 } else {
 558                                     ADIO_WriteContig(fd,
 559                                                      write_buf + block_offset - off,
 560                                                      block_len,
 561                                                      MPI_BYTE, ADIO_EXPLICIT_OFFSET,
 562                                                      block_offset, &status,
 563                                                      error_code);
 564                                     if (*error_code != MPI_SUCCESS)
 565                                         goto over;
 566                                     block_offset = srt_off[i];
 567                                     block_len = srt_len[i];
 568                                 }
 569                             }
 570                         }
 571                     }
 572                     if (block_offset != -1) {
 573                         ADIO_WriteContig(fd,
 574                                          write_buf + block_offset - off,
 575                                          block_len,
 576                                          MPI_BYTE, ADIO_EXPLICIT_OFFSET,
 577                                          block_offset, &status,
 578                                          error_code);
 579                         if (*error_code != MPI_SUCCESS)
 580                             goto over;
 581                     }
 582                 }
 583             }
 584             if (*error_code != MPI_SUCCESS)
 585                 goto over;
 586         }
 587         iter_st_off += max_size;
 588     }
 589 over:
 590     if (srt_off)
 591         ADIOI_Free(srt_off);
 592     if (srt_len)
 593         ADIOI_Free(srt_len);
 594     if (ntimes)
 595         ADIOI_Free(write_buf);
 596     ADIOI_Free(recv_curr_offlen_ptr);
 597     ADIOI_Free(send_curr_offlen_ptr);
 598     ADIOI_Free(recv_count);
 599     ADIOI_Free(send_size);
 600     ADIOI_Free(recv_size);
 601     ADIOI_Free(sent_to_proc);
 602     ADIOI_Free(recv_start_pos);
 603     ADIOI_Free(send_buf_idx);
 604     ADIOI_Free(curr_to_proc);
 605     ADIOI_Free(done_to_proc);
 606     ADIOI_Free(this_buf_idx);
 607     ADIOI_Free(off_list);
 608 }
 609 
 610 
 611 
 612 
 613 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
 614                                          char *write_buf,
 615                                          ADIOI_Flatlist_node *flat_buf,
 616                                          ADIO_Offset *offset_list,
 617                                          ADIO_Offset *len_list, int *send_size,
 618                                          int *recv_size, ADIO_Offset off,
 619                                          int size, int *count,
 620                                          int *start_pos, 
 621                                          int *sent_to_proc, int nprocs,
 622                                          int myrank, int buftype_is_contig,
 623                                          int contig_access_count,
 624                                          int *striping_info,
 625                                          ADIOI_Access *others_req,
 626                                          int *send_buf_idx,
 627                                          int *curr_to_proc, int *done_to_proc,
 628                                          int *hole, int iter,
 629                                          MPI_Aint buftype_extent,
 630                                          int *buf_idx,
 631                           ADIO_Offset **srt_off, int **srt_len, int *srt_num,
 632                           int *error_code)
 633 {
 634     int i, j, nprocs_recv, nprocs_send, err;
 635     char **send_buf = NULL;
 636     MPI_Request *requests, *send_req;
 637     MPI_Datatype *recv_types;
 638     MPI_Status *statuses, status;
 639     int sum_recv;
 640     int data_sieving = *hole;
 641     static char myname[] = "ADIOI_W_EXCHANGE_DATA";
 642 
 643     
 644     nprocs_recv = 0;
 645     for (i = 0; i < nprocs; i++)
 646         if (recv_size[i])
 647             nprocs_recv++;
 648 
 649     recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
 650                                                sizeof(MPI_Datatype));
 651     
 652 
 653     j = 0;
 654     for (i = 0; i < nprocs; i++) {
 655         if (recv_size[i]) {
 656             ADIOI_Type_create_hindexed_x(count[i],
 657                               &(others_req[i].lens[start_pos[i]]),
 658                               &(others_req[i].mem_ptrs[start_pos[i]]),
 659                               MPI_BYTE, recv_types + j);
 660             
 661             MPI_Type_commit(recv_types + j);
 662             j++;
 663         }
 664     }
 665 
 666     
 667 
 668 
 669 
 670 
 671     *srt_num = 0;
 672     for (i = 0; i < nprocs; i++)
 673         *srt_num += count[i];
 674     if (*srt_off)
 675         *srt_off = (ADIO_Offset *) ADIOI_Realloc(*srt_off, (*srt_num + 1) * sizeof(ADIO_Offset));
 676     else
 677         *srt_off = (ADIO_Offset *) ADIOI_Malloc((*srt_num + 1) * sizeof(ADIO_Offset));
 678     if (*srt_len)
 679         *srt_len = (int *) ADIOI_Realloc(*srt_len, (*srt_num + 1) * sizeof(int));
 680     else
 681         *srt_len = (int *) ADIOI_Malloc((*srt_num + 1) * sizeof(int));
 682     
 683 
 684     ADIOI_Heap_merge(others_req, count, *srt_off, *srt_len, start_pos,
 685                      nprocs, nprocs_recv, *srt_num);
 686 
 687     
 688     *hole = 0;
 689     for (i = 0; i < *srt_num - 1; i++) {
 690         if ((*srt_off)[i] + (*srt_len)[i] < (*srt_off)[i + 1]) {
 691             *hole = 1;
 692             break;
 693         }
 694     }
 695     
 696 
 697 
 698 
 699 
 700 
 701     if (*hole == 0) {
 702         sum_recv = 0;
 703         for (i = 0; i < nprocs; i++)
 704             sum_recv += recv_size[i];
 705         if (size > sum_recv)
 706             *hole = 1;
 707     }
 708     
 709     if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
 710         ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
 711                         ADIO_EXPLICIT_OFFSET, off, &status, &err);
 712         
 713         if (err != MPI_SUCCESS) {
 714             *error_code = MPIO_Err_create_code(err,
 715                                                MPIR_ERR_RECOVERABLE,
 716                                                myname, __LINE__,
 717                                                MPI_ERR_IO,
 718                                                "**ioRMWrdwr", 0);
 719             ADIOI_Free(recv_types);
 720             return;
 721         }
 722         
 723     }
 724 
 725     nprocs_send = 0;
 726     for (i = 0; i < nprocs; i++)
 727         if (send_size[i])
 728             nprocs_send++;
 729 
 730     if (fd->atomicity) {
 731         
 732         requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
 733                                                 sizeof(MPI_Request));
 734         send_req = requests;
 735     } else {
 736         requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
 737                                                 sizeof(MPI_Request));
 738         
 739 
 740         
 741         j = 0;
 742         for (i = 0; i < nprocs; i++) {
 743             if (recv_size[i]) {
 744                 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
 745                           myrank + i + 100 * iter, fd->comm, requests + j);
 746                 j++;
 747             }
 748         }
 749         send_req = requests + nprocs_recv;
 750     }
 751 
 752     
 753 
 754 
 755 
 756     if (buftype_is_contig) {
 757         j = 0;
 758         for (i = 0; i < nprocs; i++)
 759             if (send_size[i]) {
 760                 ADIOI_Assert(buf_idx[i] != -1);
 761                 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
 762                           MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
 763                           send_req + j);
 764                 j++;
 765             }
 766     } else
 767         if (nprocs_send) {
 768         
 769         send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
 770         for (i = 0; i < nprocs; i++)
 771             if (send_size[i])
 772                 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
 773 
 774         ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
 775                                       len_list, send_size, send_req,
 776                                       sent_to_proc, nprocs, myrank,
 777                                       contig_access_count, striping_info,
 778                                       send_buf_idx, curr_to_proc, done_to_proc,
 779                                       iter, buftype_extent);
 780         
 781     }
 782 
 783         
 784     if (fd->atomicity) {
 785         j = 0;
 786         for (i = 0; i < nprocs; i++) {
 787             MPI_Status wkl_status;
 788             if (recv_size[i]) {
 789                 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
 790                          myrank + i + 100 * iter, fd->comm, &wkl_status);
 791                 j++;
 792             }
 793         }
 794     }
 795 
 796     for (i = 0; i < nprocs_recv; i++)
 797         MPI_Type_free(recv_types + i);
 798     ADIOI_Free(recv_types);
 799 
 800         
 801         
 802     if (fd->atomicity) {
 803         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
 804                                                sizeof(MPI_Status));
 805     } else {
 806         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
 807                                                sizeof(MPI_Status));
 808     }
 809 
 810 #ifdef NEEDS_MPI_TEST
 811     i = 0;
 812     if (fd->atomicity) {
 813         
 814         while (!i)
 815             MPI_Testall(nprocs_send, send_req, &i, statuses);
 816     } else {
 817         while (!i)
 818             MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
 819     }
 820 #else
 821         
 822     if (fd->atomicity)
 823         MPI_Waitall(nprocs_send, send_req, statuses);
 824     else
 825         MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
 826 #endif
 827     ADIOI_Free(statuses);
 828     ADIOI_Free(requests);
 829     if (!buftype_is_contig && nprocs_send) {
 830         for (i = 0; i < nprocs; i++)
 831             if (send_size[i])
 832                 ADIOI_Free(send_buf[i]);
 833         ADIOI_Free(send_buf);
 834     }
 835 }
 836 
 837 #define ADIOI_BUF_INCR \
 838 { \
 839     while (buf_incr) { \
 840         size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
 841         user_buf_idx += size_in_buf; \
 842         flat_buf_sz -= size_in_buf; \
 843         if (!flat_buf_sz) { \
 844             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
 845             else { \
 846                 flat_buf_idx = 0; \
 847                 n_buftypes++; \
 848             } \
 849             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
 850                 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;  \
 851             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
 852         } \
 853         buf_incr -= size_in_buf; \
 854     } \
 855 }
 856 
 857 
 858 #define ADIOI_BUF_COPY \
 859 { \
 860     while (size) { \
 861         size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
 862         ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)((MPIU_Upint)buf + user_buf_idx)); \
 863         ADIOI_Assert(size_in_buf == (size_t)size_in_buf);               \
 864         memcpy(&(send_buf[p][send_buf_idx[p]]), \
 865                ((char *) buf) + user_buf_idx, size_in_buf); \
 866         send_buf_idx[p] += size_in_buf; \
 867         user_buf_idx += size_in_buf; \
 868         flat_buf_sz -= size_in_buf; \
 869         if (!flat_buf_sz) { \
 870             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
 871             else { \
 872                 flat_buf_idx = 0; \
 873                 n_buftypes++; \
 874             } \
 875             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
 876                 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;    \
 877             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
 878         } \
 879         size -= size_in_buf; \
 880         buf_incr -= size_in_buf; \
 881     } \
 882     ADIOI_BUF_INCR \
 883 }
 884 
 885 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
 886                                           ADIOI_Flatlist_node *flat_buf,
 887                                           char **send_buf,
 888                                           ADIO_Offset *offset_list,
 889                                           ADIO_Offset *len_list, int *send_size,
 890                                           MPI_Request *requests,
 891                                           int *sent_to_proc, int nprocs,
 892                                           int myrank,
 893                                           int contig_access_count,
 894                                           int *striping_info,
 895                                           int *send_buf_idx,
 896                                           int *curr_to_proc,
 897                                           int *done_to_proc, int iter,
 898                                           MPI_Aint buftype_extent)
 899 {
 900     
 901     int i, p, flat_buf_idx, size;
 902     int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
 903     ADIO_Offset off, len, rem_len, user_buf_idx;
 904 
 905     
 906 
 907 
 908 
 909 
 910 
 911 
 912 
 913     for (i = 0; i < nprocs; i++) {
 914         send_buf_idx[i] = curr_to_proc[i] = 0;
 915         done_to_proc[i] = sent_to_proc[i];
 916     }
 917     jj = 0;
 918 
 919     user_buf_idx = flat_buf->indices[0];
 920     flat_buf_idx = 0;
 921     n_buftypes = 0;
 922     flat_buf_sz = flat_buf->blocklens[0];
 923 
 924     
 925 
 926 
 927     for (i = 0; i < contig_access_count; i++) {
 928         off = offset_list[i];
 929         rem_len = (ADIO_Offset) len_list[i];
 930 
 931         
 932         while (rem_len != 0) {
 933             len = rem_len;
 934             
 935 
 936 
 937 
 938             p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
 939 
 940             if (send_buf_idx[p] < send_size[p]) {
 941                 if (curr_to_proc[p] + len > done_to_proc[p]) {
 942                     if (done_to_proc[p] > curr_to_proc[p]) {
 943                         size = (int) ADIOI_MIN(curr_to_proc[p] + len -
 944                                                done_to_proc[p],
 945                                                send_size[p] -
 946                                                send_buf_idx[p]);
 947                         buf_incr = done_to_proc[p] - curr_to_proc[p];
 948                         ADIOI_BUF_INCR
 949                         ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
 950                             buf_incr = (int) (curr_to_proc[p] + len -
 951                                               done_to_proc[p]);
 952                         ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
 953                         curr_to_proc[p] = done_to_proc[p] + size;
 954                         ADIOI_BUF_COPY
 955                     } else {
 956                         size = (int) ADIOI_MIN(len, send_size[p] -
 957                                                send_buf_idx[p]);
 958                         buf_incr = (int) len;
 959                         ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
 960                         curr_to_proc[p] += size;
 961                         ADIOI_BUF_COPY
 962                     }
 963                     if (send_buf_idx[p] == send_size[p]) {
 964                         MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
 965                                   myrank + p + 100 * iter, fd->comm,
 966                                   requests + jj);
 967                         jj++;
 968                     }
 969                 } else {
 970                     ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
 971                     curr_to_proc[p] += (int) len;
 972                     buf_incr = (int) len;
 973                     ADIOI_BUF_INCR
 974                 }
 975             } else {
 976                 buf_incr = (int) len;
 977                 ADIOI_BUF_INCR
 978             }
 979             off += len;
 980             rem_len -= len;
 981         }
 982     }
 983     for (i = 0; i < nprocs; i++)
 984         if (send_size[i])
 985             sent_to_proc[i] = curr_to_proc[i];
 986 }