root/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c


DEFINITIONS

This source file includes the following definitions:
  1. mca_fcoll_two_phase_file_write_all
  2. two_phase_exch_and_write
  3. two_phase_exchage_data
  4. two_phase_fill_send_buffer
  5. two_phase_heap_merge
  6. is_aggregator

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2008-2014 University of Houston. All rights reserved.
 * Copyright (c) 2015-2018 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2015-2016 Los Alamos National Security, LLC. All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "fcoll_two_phase.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/common/ompio/common_ompio.h"
#include "ompi/mca/io/io.h"
#include "opal/mca/base/base.h"
#include "math.h"
#include "ompi/mca/pml/pml.h"
#include <unistd.h>

#define DEBUG_ON 0
/* Two-phase I/O implementation ported from ROMIO to the OMPIO
 * infrastructure. The algorithm is essentially the same as ROMIO's
 * two-phase collective write and is based on ROMIO's code base.
 */
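
/* High-level flow of the collective write as implemented below:
 *  1. each process flattens its memory layout and current file view into
 *     (offset, length) pairs;
 *  2. the aggregate file range touched by all processes is partitioned
 *     into contiguous file domains, one per aggregator
 *     (mca_fcoll_two_phase_domain_partition);
 *  3. each process matches its accesses to the owning aggregators
 *     (calc_my_requests) and the aggregators collect what they will
 *     receive from everybody (calc_others_requests);
 *  4. two_phase_exch_and_write cycles over each file domain in chunks of
 *     f_bytes_per_agg bytes, exchanging data with the aggregators, which
 *     assemble and write each chunk (pre-reading it if it has holes).
 */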


/* Data structure to support specifying the flat-list. */
typedef struct flat_list_node {
    MPI_Datatype type;
    int count;
    OMPI_MPI_OFFSET_TYPE *blocklens;
    OMPI_MPI_OFFSET_TYPE *indices;
    struct flat_list_node *next;
} Flatlist_node;


/* local function declarations */
static int two_phase_exch_and_write(ompio_file_t *fh,
                                    const void *buf,
                                    MPI_Datatype datatype,
                                    mca_common_ompio_access_array_t *others_req,
                                    struct iovec *offset_len,
                                    int contig_access_count,
                                    OMPI_MPI_OFFSET_TYPE min_st_offset,
                                    OMPI_MPI_OFFSET_TYPE fd_size,
                                    OMPI_MPI_OFFSET_TYPE *fd_start,
                                    OMPI_MPI_OFFSET_TYPE *fd_end,
                                    Flatlist_node *flat_buf,
                                    size_t *buf_idx, int striping_unit,
                                    int num_io_procs, int *aggregator_list);

static int two_phase_exchage_data(ompio_file_t *fh,
                                  const void *buf,
                                  char *write_buf,
                                  struct iovec *offset_length,
                                  int *send_size, int *start_pos,
                                  int *recv_size,
                                  OMPI_MPI_OFFSET_TYPE off,
                                  OMPI_MPI_OFFSET_TYPE size, int *count,
                                  int *partial_recv, int *sent_to_proc,
                                  int contig_access_count,
                                  OMPI_MPI_OFFSET_TYPE min_st_offset,
                                  OMPI_MPI_OFFSET_TYPE fd_size,
                                  OMPI_MPI_OFFSET_TYPE *fd_start,
                                  OMPI_MPI_OFFSET_TYPE *fd_end,
                                  Flatlist_node *flat_buf,
                                  mca_common_ompio_access_array_t *others_req,
                                  int *send_buf_idx, int *curr_to_proc,
                                  int *done_to_proc, int iter,
                                  size_t *buf_idx, MPI_Aint buftype_extent,
                                  int striping_unit, int num_io_procs,
                                  int *aggregator_list, int *hole);

static int two_phase_fill_send_buffer(ompio_file_t *fh,
                                      const void *buf,
                                      Flatlist_node *flat_buf,
                                      char **send_buf,
                                      struct iovec *offset_length,
                                      int *send_size,
                                      MPI_Request *send_req,
                                      int *sent_to_proc,
                                      int contig_access_count,
                                      OMPI_MPI_OFFSET_TYPE min_st_offset,
                                      OMPI_MPI_OFFSET_TYPE fd_size,
                                      OMPI_MPI_OFFSET_TYPE *fd_start,
                                      OMPI_MPI_OFFSET_TYPE *fd_end,
                                      int *send_buf_idx,
                                      int *curr_to_proc,
                                      int *done_to_proc,
                                      int iter, MPI_Aint buftype_extent,
                                      int striping_unit,
                                      int num_io_procs, int *aggregator_list);
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
static int is_aggregator(int rank,
                         int nprocs_for_coll,
                         int *aggregator_list);
#endif

void two_phase_heap_merge(mca_common_ompio_access_array_t *others_req,
                          int *count,
                          OMPI_MPI_OFFSET_TYPE *srt_off,
                          int *srt_len,
                          int *start_pos,
                          int nprocs,
                          int myrank,
                          int nprocs_recv,
                          int total_elements);


/* end of local function declarations */


#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
#endif

int
mca_fcoll_two_phase_file_write_all (ompio_file_t *fh,
                                    const void *buf,
                                    int count,
                                    struct ompi_datatype_t *datatype,
                                    ompi_status_public_t *status)
{

    int i, j, interleave_count=0, striping_unit=0;
    uint32_t iov_count=0, ti;
    struct iovec *decoded_iov=NULL, *temp_iov=NULL;
    size_t max_data = 0, total_bytes = 0;
    long long_max_data = 0, long_total_bytes = 0;
    int domain_size=0, *count_my_req_per_proc=NULL, count_my_req_procs;
    int count_other_req_procs, ret=OMPI_SUCCESS;
    int two_phase_num_io_procs=1;
    size_t *buf_indices=NULL;
    int local_count = 0, local_size=0, *aggregator_list = NULL;
    struct iovec *iov = NULL;

    OMPI_MPI_OFFSET_TYPE start_offset, end_offset, fd_size;
    OMPI_MPI_OFFSET_TYPE *start_offsets=NULL, *end_offsets=NULL;
    OMPI_MPI_OFFSET_TYPE *fd_start=NULL, *fd_end=NULL, min_st_offset;
    Flatlist_node *flat_buf=NULL;
    mca_common_ompio_access_array_t *my_req=NULL, *others_req=NULL;
    MPI_Aint send_buf_addr;
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    mca_common_ompio_print_entry nentry;
#endif


//    if (opal_datatype_is_predefined(&datatype->super)) {
//      fh->f_flags = fh->f_flags |  OMPIO_CONTIGUOUS_MEMORY;
//    }


    if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {

        ret =   mca_common_ompio_decode_datatype ((struct ompio_file_t *)fh,
                                                  datatype,
                                                  count,
                                                  buf,
                                                  &max_data,
                                                  fh->f_mem_convertor,
                                                  &temp_iov,
                                                  &iov_count);
        if (OMPI_SUCCESS != ret ){
            goto exit;
        }

        send_buf_addr = (ptrdiff_t)buf;
        if ( 0 < iov_count ) {
            decoded_iov = (struct iovec *)malloc
                (iov_count * sizeof(struct iovec));
            if (NULL == decoded_iov) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
        }
        for (ti = 0; ti < iov_count; ti++){
            decoded_iov[ti].iov_base = (IOVBASE_TYPE *)(
                (ptrdiff_t)temp_iov[ti].iov_base -
                send_buf_addr);
            decoded_iov[ti].iov_len =
                temp_iov[ti].iov_len;
#if DEBUG_ON
            printf("d_offset[%d]: %ld, d_len[%d]: %ld\n",
                   ti, (ptrdiff_t)decoded_iov[ti].iov_base,
                   ti, decoded_iov[ti].iov_len);
#endif
        }

    }
    else{
        max_data = count * datatype->super.size;
    }

    if ( MPI_STATUS_IGNORE != status ) {
        status->_ucount = max_data;
    }

    two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
    if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
        ret = OMPI_ERROR;
        goto exit;
    }
    if(-1 == two_phase_num_io_procs){
        ret = mca_common_ompio_set_aggregator_props ((struct ompio_file_t *)fh,
                                                     two_phase_num_io_procs,
                                                     max_data);
        if ( OMPI_SUCCESS != ret){
            goto exit;
        }

        two_phase_num_io_procs = fh->f_num_aggrs;

    }

    if (two_phase_num_io_procs > fh->f_size){
        two_phase_num_io_procs = fh->f_size;
    }

#if DEBUG_ON
    printf("Number of aggregators : %d\n", two_phase_num_io_procs);
#endif

    aggregator_list = (int *) malloc (two_phase_num_io_procs * sizeof(int));
    if ( NULL == aggregator_list ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

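    /* Choose the aggregator ranks: if MPI_COMM_WORLD was mapped by node,
       the first two_phase_num_io_procs ranks presumably already fall on
       distinct nodes and are used directly; otherwise the aggregators
       are spread evenly across all ranks of the communicator. */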
    if ( OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm) ) {
        for (i=0; i < two_phase_num_io_procs; i++){
            aggregator_list[i] = i;
        }
    }
    else {
        for (i=0; i < two_phase_num_io_procs; i++){
            aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
        }
    }

    ret = fh->f_generate_current_file_view ((struct ompio_file_t*)fh,
                                            max_data,
                                            &iov,
                                            &local_count);

    if ( OMPI_SUCCESS != ret ){
        goto exit;
    }

    long_max_data = (long) max_data;
    ret = fh->f_comm->c_coll->coll_allreduce (&long_max_data,
                                              &long_total_bytes,
                                              1,
                                              MPI_LONG,
                                              MPI_SUM,
                                              fh->f_comm,
                                              fh->f_comm->c_coll->coll_allreduce_module);

    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }
    total_bytes = (size_t) long_total_bytes;

    if ( 0 == total_bytes ) {
        ret = OMPI_SUCCESS;
        goto exit;
    }

    if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {

        /* This data structure translates between OMPIO and ROMIO; it is a
           little hacky, but it helps to reuse ROMIO's code for handling
           non-contiguous file types. */
        flat_buf = (Flatlist_node *)malloc(sizeof(Flatlist_node));
        if ( NULL == flat_buf ){
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        flat_buf->type = datatype;
        flat_buf->next = NULL;
        flat_buf->count = 0;
        flat_buf->indices = NULL;
        flat_buf->blocklens = NULL;

        if ( 0 < count ) {
            local_size = OMPIO_MAX(1, iov_count/count);
        }
        else {
            local_size = 0;
        }

        if ( 0 < local_size ) {
            flat_buf->indices =
                (OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if ( NULL == flat_buf->indices ){
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            flat_buf->blocklens =
                (OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if ( NULL == flat_buf->blocklens ){
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
        }
        flat_buf->count = local_size;
        for (j = 0 ; j < local_size ; ++j) {
            if ( 0 < max_data ) {
                flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)(intptr_t)decoded_iov[j].iov_base;
                flat_buf->blocklens[j] = decoded_iov[j].iov_len;
            }
            else {
                flat_buf->indices[j] = 0;
                flat_buf->blocklens[j] = 0;
            }
        }

#if DEBUG_ON
        printf("flat_buf_count : %d\n", flat_buf->count);
        for(i=0;i<flat_buf->count;i++){
            printf("%d: blocklen[%d] : %lld, indices[%d]: %lld \n",
                   fh->f_rank, i, flat_buf->blocklens[i], i, flat_buf->indices[i]);
        }
#endif
    }

#if DEBUG_ON
    printf("%d: fcoll:two_phase:write_all->total_bytes:%ld, local_count: %d\n",
           fh->f_rank, total_bytes, local_count);
    for (i=0 ; i<local_count ; i++) {
        printf("%d: fcoll:two_phase:write_all:OFFSET:%ld,LENGTH:%ld\n",
               fh->f_rank,
               (size_t)iov[i].iov_base,
               (size_t)iov[i].iov_len);
    }
#endif

    start_offset = (OMPI_MPI_OFFSET_TYPE)(uintptr_t)iov[0].iov_base;
    if ( 0 < local_count ) {
        end_offset = (OMPI_MPI_OFFSET_TYPE)(uintptr_t)iov[local_count-1].iov_base +
            (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_len - 1;
    }
    else {
        end_offset = 0;
    }

#if DEBUG_ON
    printf("%d: fcoll:two_phase:write_all:START OFFSET:%ld,END OFFSET:%ld\n",
           fh->f_rank,
           (size_t)start_offset,
           (size_t)end_offset);
#endif

    start_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
        (fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));

    if ( NULL == start_offsets ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    end_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
        (fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));

    if ( NULL == end_offsets ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    ret = fh->f_comm->c_coll->coll_allgather(&start_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             start_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);

    if ( OMPI_SUCCESS != ret ){
        goto exit;
    }

    ret = fh->f_comm->c_coll->coll_allgather(&end_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             end_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);

    if ( OMPI_SUCCESS != ret ){
        goto exit;
    }

#if DEBUG_ON
    for (i=0;i<fh->f_size;i++){
        printf("%d: fcoll:two_phase:write_all:start[%d]:%ld,end[%d]:%ld\n",
               fh->f_rank, i,
               (size_t)start_offsets[i], i,
               (size_t)end_offsets[i]);
    }
#endif

    for (i=1; i<fh->f_size; i++){
        if ((start_offsets[i] < end_offsets[i-1]) &&
            (start_offsets[i] <= end_offsets[i])){
            interleave_count++;
        }
    }

#if DEBUG_ON
    printf("%d: fcoll:two_phase:write_all:interleave_count:%d\n",
           fh->f_rank, interleave_count);
#endif

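    /* Split the aggregate file range starting at min_st_offset into
       contiguous file domains, one per aggregator; fd_start[i] and
       fd_end[i] delimit the domain owned by aggregator i. */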
    ret = mca_fcoll_two_phase_domain_partition(fh,
                                               start_offsets,
                                               end_offsets,
                                               &min_st_offset,
                                               &fd_start,
                                               &fd_end,
                                               domain_size,
                                               &fd_size,
                                               striping_unit,
                                               two_phase_num_io_procs);
    if ( OMPI_SUCCESS != ret ){
        goto exit;
    }

#if DEBUG_ON
    for (i=0;i<two_phase_num_io_procs;i++){
        printf("fd_start[%d] : %lld, fd_end[%d] : %lld, local_count: %d\n",
               i, fd_start[i], i, fd_end[i], local_count);
    }
#endif

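    /* Determine, for each of this rank's contiguous accesses, which
       aggregator's file domain it falls into, and build the
       per-aggregator request lists. */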
    ret = mca_fcoll_two_phase_calc_my_requests (fh,
                                                iov,
                                                local_count,
                                                min_st_offset,
                                                fd_start,
                                                fd_end,
                                                fd_size,
                                                &count_my_req_procs,
                                                &count_my_req_per_proc,
                                                &my_req,
                                                &buf_indices,
                                                striping_unit,
                                                two_phase_num_io_procs,
                                                aggregator_list);
    if ( OMPI_SUCCESS != ret ){
        goto exit;
    }

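    /* The inverse exchange: every aggregator learns which
       (offset, length) pairs it will receive from each other process. */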
    ret = mca_fcoll_two_phase_calc_others_requests(fh,
                                                   count_my_req_procs,
                                                   count_my_req_per_proc,
                                                   my_req,
                                                   &count_other_req_procs,
                                                   &others_req);
    if (OMPI_SUCCESS != ret ){
        goto exit;
    }

#if DEBUG_ON
    printf("count_other_req_procs : %d\n", count_other_req_procs);
#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_exch = MPI_Wtime();
#endif

    ret = two_phase_exch_and_write(fh,
                                   buf,
                                   datatype,
                                   others_req,
                                   iov,
                                   local_count,
                                   min_st_offset,
                                   fd_size,
                                   fd_start,
                                   fd_end,
                                   flat_buf,
                                   buf_indices,
                                   striping_unit,
                                   two_phase_num_io_procs,
                                   aggregator_list);

    if (OMPI_SUCCESS != ret){
        goto exit;
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_exch = MPI_Wtime();
    exch_write += (end_exch - start_exch);

    nentry.time[0] = write_time;
    nentry.time[1] = comm_time;
    nentry.time[2] = exch_write;
    if (is_aggregator(fh->f_rank,
                      two_phase_num_io_procs,
                      aggregator_list)){
        nentry.aggregator = 1;
    }
    else{
        nentry.aggregator = 0;
    }
    nentry.nprocs_for_coll = two_phase_num_io_procs;
    if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)){
        mca_common_ompio_register_print_entry(fh->f_coll_write_time,
                                              nentry);
    }
#endif

exit:
    if (flat_buf != NULL) {

        if (flat_buf->blocklens != NULL) {
            free (flat_buf->blocklens);
        }

        if (flat_buf->indices != NULL) {
            free (flat_buf->indices);
        }
        free (flat_buf);
    }

    free (start_offsets);
    free (end_offsets);
    free (aggregator_list);
    free (decoded_iov);
    free (fd_start);
    free (fd_end);
    free (others_req);
    free (my_req);
    free (buf_indices);
    free (count_my_req_per_proc);

    return ret;
}


static int two_phase_exch_and_write(ompio_file_t *fh,
                                    const void *buf,
                                    MPI_Datatype datatype,
                                    mca_common_ompio_access_array_t *others_req,
                                    struct iovec *offset_len,
                                    int contig_access_count,
                                    OMPI_MPI_OFFSET_TYPE min_st_offset,
                                    OMPI_MPI_OFFSET_TYPE fd_size,
                                    OMPI_MPI_OFFSET_TYPE *fd_start,
                                    OMPI_MPI_OFFSET_TYPE *fd_end,
                                    Flatlist_node *flat_buf,
                                    size_t *buf_idx, int striping_unit,
                                    int two_phase_num_io_procs,
                                    int *aggregator_list)
{

    int i, j, ntimes, max_ntimes, m;
    int *curr_offlen_ptr=NULL, *count=NULL, *send_size=NULL, *recv_size=NULL;
    int *partial_recv=NULL, *start_pos=NULL, req_len, flag;
    int *sent_to_proc=NULL, ret = OMPI_SUCCESS;
    int *send_buf_idx=NULL, *curr_to_proc=NULL, *done_to_proc=NULL;
    OMPI_MPI_OFFSET_TYPE st_loc=-1, end_loc=-1, off, done;
    OMPI_MPI_OFFSET_TYPE size=0, req_off, len;
    MPI_Aint buftype_extent;
    int hole;
    int two_phase_cycle_buffer_size;
    size_t byte_size;
    MPI_Datatype byte = MPI_BYTE;
#if DEBUG_ON
    int ii,jj;
#endif

    char *write_buf=NULL;

    opal_datatype_type_size(&byte->super,
                            &byte_size);

    for (i = 0; i < fh->f_size; i++){
        if (others_req[i].count) {
            st_loc = others_req[i].offsets[0];
            end_loc = others_req[i].offsets[0];
            break;
        }
    }

    for (i=0;i<fh->f_size;i++){
        for(j=0; j<others_req[i].count; j++){
            st_loc = OMPIO_MIN(st_loc, others_req[i].offsets[j]);
            end_loc = OMPIO_MAX(end_loc, (others_req[i].offsets[j] + others_req[i].lens[j] - 1));
        }
    }

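    /* Each aggregator processes its file domain in cycles of
       f_bytes_per_agg bytes: ntimes is the number of cycles needed to
       cover [st_loc, end_loc]. Ranks with nothing to aggregate use 0,
       and the allreduce below ensures every rank keeps participating in
       max_ntimes rounds of the collective exchange. */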
    two_phase_cycle_buffer_size = fh->f_bytes_per_agg;
    ntimes = (int) ((end_loc - st_loc + two_phase_cycle_buffer_size)/two_phase_cycle_buffer_size);

    if ((st_loc == -1) && (end_loc == -1)) {
        ntimes = 0;
    }

    fh->f_comm->c_coll->coll_allreduce (&ntimes,
                                        &max_ntimes,
                                        1,
                                        MPI_INT,
                                        MPI_MAX,
                                        fh->f_comm,
                                        fh->f_comm->c_coll->coll_allreduce_module);

    if (ntimes){
        write_buf = (char *) malloc (two_phase_cycle_buffer_size);
        if ( NULL == write_buf ){
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
    }

    curr_offlen_ptr = (int *) calloc(fh->f_size, sizeof(int));

    if ( NULL == curr_offlen_ptr ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    count = (int *) malloc(fh->f_size*sizeof(int));

    if ( NULL == count ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    partial_recv = (int *)calloc(fh->f_size, sizeof(int));

    if ( NULL == partial_recv ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    send_size = (int *) calloc(fh->f_size, sizeof(int));

    if ( NULL == send_size ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    recv_size = (int *) calloc(fh->f_size, sizeof(int));

    if ( NULL == recv_size ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    send_buf_idx = (int *) malloc(fh->f_size*sizeof(int));

    if ( NULL == send_buf_idx ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    sent_to_proc = (int *) calloc(fh->f_size, sizeof(int));

    if ( NULL == sent_to_proc){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    curr_to_proc = (int *) malloc(fh->f_size*sizeof(int));

    if ( NULL == curr_to_proc ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    done_to_proc = (int *) malloc(fh->f_size*sizeof(int));

    if ( NULL == done_to_proc ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    start_pos = (int *) malloc(fh->f_size*sizeof(int));

    if ( NULL == start_pos ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

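    /* Main pipeline: in each iteration, determine which of the remote
       requests fall into the current window [off, off+size), exchange the
       corresponding data, and have the aggregator write the assembled
       chunk to the file. */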
    done = 0;
    off = st_loc;

    ompi_datatype_type_extent(datatype, &buftype_extent);
    for (m=0; m<ntimes; m++){
        for (i=0; i<fh->f_size; i++) count[i] = recv_size[i] = 0;

        size = OMPIO_MIN((unsigned)two_phase_cycle_buffer_size,
                         end_loc-st_loc+1-done);
        for (i=0;i<fh->f_size;i++){
            if(others_req[i].count){
                start_pos[i] = curr_offlen_ptr[i];
                for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
                    if (partial_recv[i]) {
                        /* this request may have been partially
                           satisfied in the previous iteration. */
                        req_off = others_req[i].offsets[j] +
                            partial_recv[i];
                        req_len = others_req[i].lens[j] -
                            partial_recv[i];
                        partial_recv[i] = 0;
                        /* modify the off-len pair to reflect this change */
                        others_req[i].offsets[j] = req_off;
                        others_req[i].lens[j] = req_len;
                    }
                    else {
                        req_off = others_req[i].offsets[j];
                        req_len = others_req[i].lens[j];
                    }
                    if (req_off < off + size) {
                        count[i]++;
#if DEBUG_ON
                        printf("%d: req_off : %lld, off : %lld, size : %lld, count[%d]: %d\n",
                               fh->f_rank,
                               req_off,
                               off,
                               size, i,
                               count[i]);
#endif
                        PMPI_Get_address(write_buf+req_off-off,
                                         &(others_req[i].mem_ptrs[j]));
#if DEBUG_ON
                        printf("%d : mem_ptrs : %ld\n", fh->f_rank,
                               others_req[i].mem_ptrs[j]);
#endif
                        recv_size[i] += (int) (OMPIO_MIN(off + size - req_off,
                                                         (unsigned)req_len));

                        if (off+size-req_off < (unsigned)req_len){
                            partial_recv[i] = (int)(off + size - req_off);
                            break;
                        }
                    }
                    else break;
                }
                curr_offlen_ptr[i] = j;
            }
        }

        ret = two_phase_exchage_data(fh, buf, write_buf,
                                     offset_len, send_size,
                                     start_pos, recv_size, off, size,
                                     count, partial_recv, sent_to_proc,
                                     contig_access_count,
                                     min_st_offset,
                                     fd_size, fd_start,
                                     fd_end, flat_buf, others_req,
                                     send_buf_idx, curr_to_proc,
                                     done_to_proc, m, buf_idx,
                                     buftype_extent, striping_unit,
                                     two_phase_num_io_procs,
                                     aggregator_list, &hole);

        if ( OMPI_SUCCESS != ret ){
            goto exit;
        }

        flag = 0;
        for (i=0; i<fh->f_size; i++)
            if (count[i]) flag = 1;

        if (flag){

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            start_write_time = MPI_Wtime();
#endif

#if DEBUG_ON
            printf("rank : %d enters writing\n", fh->f_rank);
            printf("size : %ld, off : %ld\n", size, off);
            for (ii=0, jj=0; jj<size; jj+=4, ii++){
                printf("%d : write_buf[%d]: %d\n", fh->f_rank, ii, ((int *)write_buf)[ii]);
            }
#endif
            len = size * byte_size;
            fh->f_io_array = (mca_common_ompio_io_array_t *)malloc
                (sizeof(mca_common_ompio_io_array_t));
            if (NULL == fh->f_io_array) {
                opal_output(1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            fh->f_io_array[0].offset  = (IOVBASE_TYPE *)(intptr_t) off;
            fh->f_io_array[0].length = len;
            fh->f_io_array[0].memory_address = write_buf;
            fh->f_num_of_io_entries = 1;

#if DEBUG_ON
            for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
                printf("%d: ADDRESS: %p  OFFSET: %ld   LENGTH: %d\n",
                       fh->f_rank,
                       fh->f_io_array[i].memory_address,
                       fh->f_io_array[i].offset,
                       fh->f_io_array[i].length);
            }
#endif

            if (fh->f_num_of_io_entries){
                if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
                    opal_output(1, "WRITE FAILED\n");
                    ret = OMPI_ERROR;
                    goto exit;
                }
            }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            end_write_time = MPI_Wtime();
            write_time += (end_write_time - start_write_time);
#endif

        }
        /***************** DONE WRITING *****************************************/
        /****RESET **********************/
        fh->f_num_of_io_entries = 0;
        if (NULL != fh->f_io_array) {
            free (fh->f_io_array);
            fh->f_io_array = NULL;
        }

        off += size;
        done += size;

    }
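
    /* Ranks that finished their own cycles early must still participate
       in the remaining collective exchange rounds of the other
       aggregators. */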
    for (i=0; i<fh->f_size; i++) count[i] = recv_size[i] = 0;
    for (m=ntimes; m<max_ntimes; m++) {
        ret = two_phase_exchage_data(fh, buf, write_buf,
                                     offset_len, send_size,
                                     start_pos, recv_size, off, size,
                                     count, partial_recv, sent_to_proc,
                                     contig_access_count,
                                     min_st_offset,
                                     fd_size, fd_start,
                                     fd_end, flat_buf, others_req,
                                     send_buf_idx, curr_to_proc,
                                     done_to_proc, m, buf_idx,
                                     buftype_extent, striping_unit,
                                     two_phase_num_io_procs,
                                     aggregator_list, &hole);
        if ( OMPI_SUCCESS != ret ){
            goto exit;
        }
    }

exit:

    free (write_buf);
    free (curr_offlen_ptr);
    free (count);
    free (partial_recv);
    free (send_size);
    free (recv_size);
    free (sent_to_proc);
    free (start_pos);
    free (send_buf_idx);
    free (curr_to_proc);
    free (done_to_proc);

    return ret;
}

static int two_phase_exchage_data(ompio_file_t *fh,
                                  const void *buf,
                                  char *write_buf,
                                  struct iovec *offset_length,
                                  int *send_size, int *start_pos,
                                  int *recv_size,
                                  OMPI_MPI_OFFSET_TYPE off,
                                  OMPI_MPI_OFFSET_TYPE size, int *count,
                                  int *partial_recv, int *sent_to_proc,
                                  int contig_access_count,
                                  OMPI_MPI_OFFSET_TYPE min_st_offset,
                                  OMPI_MPI_OFFSET_TYPE fd_size,
                                  OMPI_MPI_OFFSET_TYPE *fd_start,
                                  OMPI_MPI_OFFSET_TYPE *fd_end,
                                  Flatlist_node *flat_buf,
                                  mca_common_ompio_access_array_t *others_req,
                                  int *send_buf_idx, int *curr_to_proc,
                                  int *done_to_proc, int iter,
                                  size_t *buf_idx, MPI_Aint buftype_extent,
                                  int striping_unit, int two_phase_num_io_procs,
                                  int *aggregator_list, int *hole){

    int *tmp_len=NULL, sum, *srt_len=NULL, nprocs_recv, nprocs_send, k, i, j;
    int ret=OMPI_SUCCESS;
    MPI_Request *requests=NULL, *send_req=NULL;
    ompi_datatype_t **recv_types=NULL;
    OMPI_MPI_OFFSET_TYPE *srt_off=NULL;
    char **send_buf = NULL;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_comm_time = MPI_Wtime();
#endif
    ret = fh->f_comm->c_coll->coll_alltoall (recv_size,
                                             1,
                                             MPI_INT,
                                             send_size,
                                             1,
                                             MPI_INT,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_alltoall_module);

    if ( OMPI_SUCCESS != ret ){
        return ret;
    }

    nprocs_recv = 0;
    for (i=0;i<fh->f_size;i++){
        if (recv_size[i]){
            nprocs_recv++;
        }
    }

    recv_types = (ompi_datatype_t **)
        calloc (( nprocs_recv + 1 ), sizeof(ompi_datatype_t *));

    if ( NULL == recv_types ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    tmp_len = (int *) calloc(fh->f_size, sizeof(int));

    if ( NULL == tmp_len ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

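    /* For each sender, build an hindexed datatype whose displacements are
       the absolute mem_ptrs computed earlier, so the incoming data is
       received directly into the right spots of write_buf (via the
       irecv from MPI_BOTTOM below). */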
    j = 0;
    for (i=0;i<fh->f_size;i++){
        if (recv_size[i]) {
            if (partial_recv[i]) {
                k = start_pos[i] + count[i] - 1;
                tmp_len[i] = others_req[i].lens[k];
                others_req[i].lens[k] = partial_recv[i];
            }
            ompi_datatype_create_hindexed(count[i],
                                          &(others_req[i].lens[start_pos[i]]),
                                          &(others_req[i].mem_ptrs[start_pos[i]]),
                                          MPI_BYTE, recv_types+j);
            ompi_datatype_commit(recv_types+j);
            j++;
        }
    }

    sum = 0;
    for (i=0;i<fh->f_size;i++) sum += count[i];
    srt_off = (OMPI_MPI_OFFSET_TYPE *)
        malloc((sum+1)*sizeof(OMPI_MPI_OFFSET_TYPE));

    if ( NULL == srt_off ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    srt_len = (int *) malloc((sum+1)*sizeof(int));

    if ( NULL == srt_len ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        free(srt_off);
        goto exit;
    }

    two_phase_heap_merge(others_req, count, srt_off, srt_len, start_pos,
                         fh->f_size, fh->f_rank, nprocs_recv, sum);

    for (i=0; i<fh->f_size; i++)
        if (partial_recv[i]) {
            k = start_pos[i] + count[i] - 1;
            others_req[i].lens[k] = tmp_len[i];
        }

    free(tmp_len);
    tmp_len = NULL;

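    /* Walk the merged, sorted (offset, length) list and coalesce
       overlapping or adjacent extents into the first one. If anything is
       left over, or the coalesced extent does not cover the whole window
       [off, off+size), there is a hole and the aggregator must pre-read
       the window (read-modify-write). */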
    *hole = 0;
    if (off != srt_off[0]){
        *hole = 1;
    }
    else{
        for (i=1;i<sum;i++){
            if (srt_off[i] <= srt_off[0] + srt_len[0]){
                int new_len = srt_off[i] + srt_len[i] - srt_off[0];
                if (new_len > srt_len[0])
                    srt_len[0] = new_len;
            }
            else
                break;
        }
        if (i < sum || size != srt_len[0])
            *hole = 1;
    }

    free(srt_off);
    free(srt_len);

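    /* ROMIO-style data sieving: if there is a hole, read the whole window
       from the file into write_buf first, so that bytes no process writes
       are preserved. This requires read access to the file, hence the
       check against MPI_MODE_WRONLY below. */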
    if (nprocs_recv){
        if (*hole){
            if (off >= 0){
                fh->f_io_array = (mca_common_ompio_io_array_t *)malloc
                    (sizeof(mca_common_ompio_io_array_t));
                if (NULL == fh->f_io_array) {
                    opal_output(1, "OUT OF MEMORY\n");
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }
                fh->f_io_array[0].offset  = (IOVBASE_TYPE *)(intptr_t) off;
                fh->f_num_of_io_entries = 1;
                fh->f_io_array[0].length = size;
                fh->f_io_array[0].memory_address = write_buf;
                if (fh->f_num_of_io_entries){
                    int amode_overwrite;
                    amode_overwrite = fh->f_get_mca_parameter_value ("overwrite_amode", strlen("overwrite_amode"));
                    if ( OMPI_ERR_MAX == amode_overwrite ) {
                        ret = OMPI_ERROR;
                        goto exit;
                    }
                    if ( (fh->f_amode & MPI_MODE_WRONLY) && !amode_overwrite ){
                        if ( 0 == fh->f_rank ) {
                            printf("\n File not opened in RDWR mode, can not continue."
                                   "\n To resolve this problem, you can either \n"
                                   "  a. open the file with MPI_MODE_RDWR instead of MPI_MODE_WRONLY\n"
                                   "  b. ensure that the mca parameter mca_io_ompio_overwrite_amode is set to 1\n"
                                   "  c. use an fcoll component that does not use data sieving (e.g. dynamic)\n");
                        }
                        ret = MPI_ERR_FILE;
                        goto exit;
                    }
                    if ( 0 > fh->f_fbtl->fbtl_preadv (fh)) {
                        opal_output(1, "READ FAILED\n");
                        ret = OMPI_ERROR;
                        goto exit;
                    }
                }

            }
            fh->f_num_of_io_entries = 0;
            if (NULL != fh->f_io_array) {
                free (fh->f_io_array);
                fh->f_io_array = NULL;
            }
        }
    }

    nprocs_send = 0;
    for (i=0; i<fh->f_size; i++) if (send_size[i]) nprocs_send++;

#if DEBUG_ON
    printf("%d : nprocs_send : %d\n", fh->f_rank, nprocs_send);
#endif

    requests = (MPI_Request *)
        malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));

    if ( NULL == requests ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    j = 0;
    for (i=0; i<fh->f_size; i++) {
        if (recv_size[i]) {
            ret = MCA_PML_CALL(irecv(MPI_BOTTOM,
                                     1,
                                     recv_types[j],
                                     i,
                                     fh->f_rank+i+100*iter,
                                     fh->f_comm,
                                     requests+j));

            if ( OMPI_SUCCESS != ret ){
                goto exit;
            }
            j++;
        }
    }
    send_req = requests + nprocs_recv;

    if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
        j = 0;
        for (i=0; i<fh->f_size; i++)
            if (send_size[i]) {
                ret = MCA_PML_CALL(isend(((char *) buf) + buf_idx[i],
                                         send_size[i],
                                         MPI_BYTE,
                                         i,
                                         fh->f_rank+i+100*iter,
                                         MCA_PML_BASE_SEND_STANDARD,
                                         fh->f_comm,
                                         send_req+j));

                if ( OMPI_SUCCESS != ret ){
                    goto exit;
                }

                j++;
                buf_idx[i] += send_size[i];
            }
    }
    else if (nprocs_send && (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY))){
        send_buf = (char **) calloc (fh->f_size, sizeof(char*));
        if ( NULL == send_buf ){
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
        for (i=0; i<fh->f_size; i++){
            if (send_size[i]) {
                send_buf[i] = (char *) malloc(send_size[i]);

                if ( NULL == send_buf[i] ){
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }
            }
        }

        ret = two_phase_fill_send_buffer(fh, buf, flat_buf, send_buf,
                                         offset_length, send_size,
                                         send_req, sent_to_proc,
                                         contig_access_count,
                                         min_st_offset, fd_size,
                                         fd_start, fd_end, send_buf_idx,
                                         curr_to_proc, done_to_proc,
                                         iter, buftype_extent, striping_unit,
                                         two_phase_num_io_procs, aggregator_list);

        if ( OMPI_SUCCESS != ret ){
            goto exit;
        }
    }

    ret = ompi_request_wait_all (nprocs_send+nprocs_recv,
                                 requests,
                                 MPI_STATUSES_IGNORE);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_comm_time = MPI_Wtime();
    comm_time += (end_comm_time - start_comm_time);
#endif

exit:
    if (recv_types) {
        for (i=0; i<nprocs_recv; i++) {
            if (recv_types[i]) {
                ompi_datatype_destroy(recv_types+i);
            }
        }
    }
    free (recv_types);

    free (requests);
    if (send_buf) {
        for (i=0; i<fh->f_size; i++){
            free (send_buf[i]);
        }

        free (send_buf);
    }
    free (tmp_len);

    return ret;
}

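/* The two helper macros below walk the flattened user buffer
   (flat_buf->indices / blocklens, repeated every buftype_extent bytes):
   TWO_PHASE_BUF_INCR merely advances user_buf_idx by buf_incr bytes,
   while TWO_PHASE_BUF_COPY also copies `size` bytes into the send buffer
   of the current aggregator p. */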
#define TWO_PHASE_BUF_INCR                      \
    {                                           \
        while (buf_incr) {                              \
            size_in_buf = OMPIO_MIN(buf_incr, flat_buf_sz);     \
            user_buf_idx += size_in_buf;                        \
            flat_buf_sz -= size_in_buf;                         \
            if (!flat_buf_sz) {                                       \
                if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
                else {                                                  \
                    flat_buf_idx = 0;                                   \
                    n_buftypes++;                                       \
                }                                                       \
                user_buf_idx = flat_buf->indices[flat_buf_idx] +        \
                    (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
                flat_buf_sz = flat_buf->blocklens[flat_buf_idx];        \
            }                                                           \
            buf_incr -= size_in_buf;                                    \
        }                                                               \
    }


#define TWO_PHASE_BUF_COPY                      \
    {                                           \
        while (size) {                              \
            size_in_buf = OMPIO_MIN(size, flat_buf_sz); \
            memcpy(&(send_buf[p][send_buf_idx[p]]),         \
                   ((char *) buf) + user_buf_idx, size_in_buf); \
            send_buf_idx[p] += size_in_buf;                     \
            user_buf_idx += size_in_buf;                        \
            flat_buf_sz -= size_in_buf;                         \
            if (!flat_buf_sz) {                                       \
                if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
                else {                                                  \
                    flat_buf_idx = 0;                                   \
                    n_buftypes++;                                       \
                }                                                       \
                user_buf_idx = flat_buf->indices[flat_buf_idx] +        \
                    (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
                flat_buf_sz = flat_buf->blocklens[flat_buf_idx];        \
            }                                                           \
            size -= size_in_buf;                                        \
            buf_incr -= size_in_buf;                                    \
        }                                                               \
        TWO_PHASE_BUF_INCR                                              \
    }


static int two_phase_fill_send_buffer(ompio_file_t *fh,
                                      const void *buf,
                                      Flatlist_node *flat_buf,
                                      char **send_buf,
                                      struct iovec *offset_length,
                                      int *send_size,
                                      MPI_Request *requests,
                                      int *sent_to_proc,
                                      int contig_access_count,
                                      OMPI_MPI_OFFSET_TYPE min_st_offset,
                                      OMPI_MPI_OFFSET_TYPE fd_size,
                                      OMPI_MPI_OFFSET_TYPE *fd_start,
                                      OMPI_MPI_OFFSET_TYPE *fd_end,
                                      int *send_buf_idx,
                                      int *curr_to_proc,
                                      int *done_to_proc,
                                      int iter, MPI_Aint buftype_extent,
                                      int striping_unit, int two_phase_num_io_procs,
                                      int *aggregator_list)
{

    int i, p, flat_buf_idx=0;
    OMPI_MPI_OFFSET_TYPE flat_buf_sz=0, size_in_buf=0, buf_incr=0, size=0;
    int jj, n_buftypes, ret=OMPI_SUCCESS;
    OMPI_MPI_OFFSET_TYPE off=0, len=0, rem_len=0, user_buf_idx=0;

    for (i=0; i<fh->f_size; i++) {
        send_buf_idx[i] = curr_to_proc[i] = 0;
        done_to_proc[i] = sent_to_proc[i];
    }
    jj = 0;

    flat_buf_idx = 0;
    n_buftypes = 0;
    if ( flat_buf->count > 0 ) {
        user_buf_idx = flat_buf->indices[0];
        flat_buf_sz = flat_buf->blocklens[0];
    }

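    /* curr_to_proc[p] counts the bytes destined for aggregator p in this
       pass; done_to_proc[p] holds what was already sent in previous
       iterations, so regions handled earlier are skipped over
       (TWO_PHASE_BUF_INCR) rather than copied and sent again. */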
    for (i=0; i<contig_access_count; i++) {

        off     = (OMPI_MPI_OFFSET_TYPE) (intptr_t)offset_length[i].iov_base;
        rem_len = (OMPI_MPI_OFFSET_TYPE)offset_length[i].iov_len;

        while (rem_len != 0) {
            len = rem_len;
            p = mca_fcoll_two_phase_calc_aggregator(fh,
                                                    off,
                                                    min_st_offset,
                                                    &len,
                                                    fd_size,
                                                    fd_start,
                                                    fd_end,
                                                    striping_unit,
                                                    two_phase_num_io_procs,
                                                    aggregator_list);

            if (send_buf_idx[p] < send_size[p]) {
                if (curr_to_proc[p]+len > done_to_proc[p]) {
                    if (done_to_proc[p] > curr_to_proc[p]) {
                        size = OMPIO_MIN(curr_to_proc[p] + len -
                                         done_to_proc[p], send_size[p]-send_buf_idx[p]);
                        buf_incr = done_to_proc[p] - curr_to_proc[p];
                        TWO_PHASE_BUF_INCR
                        buf_incr = curr_to_proc[p] + len - done_to_proc[p];
                        curr_to_proc[p] = done_to_proc[p] + size;
                        TWO_PHASE_BUF_COPY
                    }
                    else {
                        size = OMPIO_MIN(len, send_size[p]-send_buf_idx[p]);
                        buf_incr = len;
                        curr_to_proc[p] += size;
                        TWO_PHASE_BUF_COPY
                    }
                    if (send_buf_idx[p] == send_size[p]) {

                        ret = MCA_PML_CALL(isend(send_buf[p],
                                                 send_size[p],
                                                 MPI_BYTE,
                                                 p,
                                                 fh->f_rank+p+100*iter,
                                                 MCA_PML_BASE_SEND_STANDARD,
                                                 fh->f_comm,
                                                 requests+jj));

                        if ( OMPI_SUCCESS != ret ){
                            return ret;
                        }
                        jj++;
                    }
                }
                else {
                    curr_to_proc[p] += len;
                    buf_incr = len;
                    TWO_PHASE_BUF_INCR
                }
            }
            else {
                buf_incr = len;
                TWO_PHASE_BUF_INCR
            }
            off     += len;
            rem_len -= len;
        }
    }
    for (i=0; i<fh->f_size; i++) {
        if (send_size[i]){
            sent_to_proc[i] = curr_to_proc[i];
        }
    }

    return ret;
}


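/* k-way merge of the per-process sorted (offset, length) request lists
   into a single sorted list, using a min-heap keyed on the current head
   offset of each list. */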
void two_phase_heap_merge( mca_common_ompio_access_array_t *others_req,
                           int *count,
                           OMPI_MPI_OFFSET_TYPE *srt_off,
                           int *srt_len,
                           int *start_pos,
                           int nprocs,
                           int myrank,
                           int nprocs_recv,
                           int total_elements)
{

    typedef struct {
        OMPI_MPI_OFFSET_TYPE *off_list;
        int *len_list;
        int nelem;
    } heap_struct;

    heap_struct *a, tmp;
    int i, j, heapsize, l, r, k, smallest;

    a = (heap_struct *) malloc((nprocs_recv+1)*sizeof(heap_struct));

    j = 0;
    for (i=0; i<nprocs; i++)
        if (count[i]) {
            a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
            a[j].len_list = &(others_req[i].lens[start_pos[i]]);
            a[j].nelem = count[i];
            j++;
        }

    heapsize = nprocs_recv;

    for (i=heapsize/2 - 1; i>=0; i--) {
        k = i;
        for(;;) {
            l = 2*(k+1) - 1;
            r = 2*(k+1);
            if ((l < heapsize) &&
                (*(a[l].off_list) < *(a[k].off_list)))
                smallest = l;
            else smallest = k;

            if ((r < heapsize) &&
                (*(a[r].off_list) < *(a[smallest].off_list)))
                smallest = r;

            if (smallest != k) {
                tmp.off_list = a[k].off_list;
                tmp.len_list = a[k].len_list;
                tmp.nelem = a[k].nelem;

                a[k].off_list = a[smallest].off_list;
                a[k].len_list = a[smallest].len_list;
                a[k].nelem = a[smallest].nelem;

                a[smallest].off_list = tmp.off_list;
                a[smallest].len_list = tmp.len_list;
                a[smallest].nelem = tmp.nelem;

                k = smallest;
            }
            else break;
        }
    }

    for (i=0; i<total_elements; i++) {
        /* extract smallest element from heap, i.e. the root */
        srt_off[i] = *(a[0].off_list);
        srt_len[i] = *(a[0].len_list);
        (a[0].nelem)--;

        if (!a[0].nelem) {
            a[0].off_list = a[heapsize-1].off_list;
            a[0].len_list = a[heapsize-1].len_list;
            a[0].nelem = a[heapsize-1].nelem;
            heapsize--;
        }
        else {
            (a[0].off_list)++;
            (a[0].len_list)++;
        }

        /* Heapify(a, 0, heapsize); */
        k = 0;
        for (;;) {
            l = 2*(k+1) - 1;
            r = 2*(k+1);

            if ((l < heapsize) &&
                (*(a[l].off_list) < *(a[k].off_list)))
                smallest = l;
            else smallest = k;

            if ((r < heapsize) &&
                (*(a[r].off_list) < *(a[smallest].off_list)))
                smallest = r;

            if (smallest != k) {
                tmp.off_list = a[k].off_list;
                tmp.len_list = a[k].len_list;
                tmp.nelem = a[k].nelem;

                a[k].off_list = a[smallest].off_list;
                a[k].len_list = a[smallest].len_list;
                a[k].nelem = a[smallest].nelem;

                a[smallest].off_list = tmp.off_list;
                a[smallest].len_list = tmp.len_list;
                a[smallest].nelem = tmp.nelem;

                k = smallest;
            }
            else break;
        }
    }
    free(a);
}

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
int is_aggregator(int rank,
                  int nprocs_for_coll,
                  int *aggregator_list){

    int i=0;
    for (i=0; i<nprocs_for_coll; i++){
        if (aggregator_list[i] == rank)
            return 1;
    }
    return 0;
}
#endif
