root/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c


DEFINITIONS

This source file includes the following definitions:
  1. mca_fcoll_dynamic_gen2_file_write_all
  2. write_init
  3. shuffle_init
  4. mca_fcoll_dynamic_gen2_break_file_view
  5. mca_fcoll_dynamic_gen2_get_configuration
  6. mca_fcoll_dynamic_gen2_split_iov_array
  7. local_heap_sort

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2008-2016 University of Houston. All rights reserved.
  13  * Copyright (c) 2015-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * Copyright (c) 2017      IBM Corporation. All rights reserved.
  16  * Copyright (c) 2018      Cisco Systems, Inc.  All rights reserved
  17  * $COPYRIGHT$
  18  *
  19  * Additional copyrights may follow
  20  *
  21  * $HEADER$
  22  */
  23 
  24 #include "ompi_config.h"
  25 #include "fcoll_dynamic_gen2.h"
  26 
  27 #include "mpi.h"
  28 #include "ompi/constants.h"
  29 #include "ompi/mca/fcoll/fcoll.h"
  30 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
  31 #include "ompi/mca/common/ompio/common_ompio.h"
  32 #include "ompi/mca/io/io.h"
  33 #include "math.h"
  34 #include "ompi/mca/pml/pml.h"
  35 #include <unistd.h>
  36 
  37 
  38 #define DEBUG_ON 0
  39 #define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG   123
  40 #define INIT_LEN 10
  41 
  42 /*Used for loading file-offsets per aggregator*/
  43 typedef struct mca_io_ompio_local_io_array{
  44     OMPI_MPI_OFFSET_TYPE offset;
  45     MPI_Aint             length;
  46     int                  process_id;
  47 }mca_io_ompio_local_io_array;
  48 
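     /* Per-aggregator state for the collective write. Roughly: fview_count,
      * global_iov_array and sorted describe the merged, offset-sorted file
      * view of the group; blocklen_per_process and displs_per_process are
      * rebuilt by the aggregator in every cycle; the prev_* members keep the
      * previous cycle's receive buffer, datatypes and io_array alive so that
      * writing cycle i-1 can overlap with the shuffle of cycle i. */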
  49 typedef struct mca_io_ompio_aggregator_data {
  50     int *disp_index, *sorted, *fview_count, n;
  51     int *max_disp_index;
  52     int **blocklen_per_process;
  53     MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
  54     MPI_Comm comm;
  55     char *buf, *global_buf, *prev_global_buf;
  56     ompi_datatype_t **recvtype, **prev_recvtype;
  57     struct iovec *global_iov_array;
  58     int current_index, current_position;
  59     int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;    
  60     int *procs_in_group, iov_index;
  61     int bytes_sent, prev_bytes_sent;
  62     struct iovec *decoded_iov;
  63     int bytes_to_write, prev_bytes_to_write;
  64     mca_common_ompio_io_array_t *io_array, *prev_io_array;
  65     int num_io_entries, prev_num_io_entries;
  66 } mca_io_ompio_aggregator_data;
  67 
  68 
  69 #define SWAP_REQUESTS(_r1,_r2) { \
  70     ompi_request_t **_t=_r1;     \
  71     _r1=_r2;                     \
  72     _r2=_t;}
  73 
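     /* Rotate the per-aggregator state between the current and the previous
      * cycle: the io_array, entry count and byte counters of the cycle that
      * was just set up become the prev_* values, and the global_buf and
      * recvtype pointers are swapped with their prev_ counterparts, so the
      * next shuffle_init can fill one buffer while write_init drains the
      * other. */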
  74 #define SWAP_AGGR_POINTERS(_aggr,_num) {                        \
  75     int _i;                                                     \
  76     char *_t;                                                   \
  77     for (_i=0; _i<_num; _i++ ) {                                \
  78         _aggr[_i]->prev_io_array=_aggr[_i]->io_array;             \
  79         _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
  80         _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent;         \
  81         _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
  82         _t=_aggr[_i]->prev_global_buf;                            \
  83         _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf;         \
  84         _aggr[_i]->global_buf=_t;                                 \
  85         _t=(char *)_aggr[_i]->recvtype;                           \
  86         _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype;             \
   87         _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; }      \
   88 }
  89 
  90 
  91 
  92 static int shuffle_init ( int index, int cycles, int aggregator, int rank, 
  93                           mca_io_ompio_aggregator_data *data, 
  94                           ompi_request_t **reqs );
  95 static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize );
  96 
  97 int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count, 
  98                                         struct iovec *local_iov_array, int local_count, 
  99                                         struct iovec ***broken_decoded_iovs, int **broken_iov_counts,
 100                                         struct iovec ***broken_iov_arrays, int **broken_counts, 
 101                                         MPI_Aint **broken_total_lengths,
 102                                         int stripe_count, int stripe_size); 
 103 
 104 
 105 int mca_fcoll_dynamic_gen2_get_configuration (ompio_file_t *fh, int *dynamic_gen2_num_io_procs, 
 106                                               int **ret_aggregators);
 107 
 108 
 109 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
 110                             int num_entries,
 111                             int *sorted);
 112 
 113 int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *work_array,
 114                                              int num_entries, int *last_array_pos, int *last_pos_in_field,
 115                                              int chunk_size );
 116 
 117 
 118 int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
 119                                       const void *buf,
 120                                       int count,
 121                                       struct ompi_datatype_t *datatype,
 122                                       ompi_status_public_t *status)
 123 {
 124     int index = 0;
 125     int cycles = 0;
 126     int ret =0, l, i, j, bytes_per_cycle;
 127     uint32_t iov_count = 0;
 128     struct iovec *decoded_iov = NULL;
 129     struct iovec *local_iov_array=NULL;
 130     uint32_t total_fview_count = 0;
 131     int local_count = 0;
 132     ompi_request_t **reqs1=NULL,**reqs2=NULL;
 133     ompi_request_t **curr_reqs=NULL,**prev_reqs=NULL;
 134     mca_io_ompio_aggregator_data **aggr_data=NULL;
 135     
 136     int *displs = NULL;
 137     int dynamic_gen2_num_io_procs;
 138     size_t max_data = 0;
 139     MPI_Aint *total_bytes_per_process = NULL;
 140     
 141     struct iovec **broken_iov_arrays=NULL;
 142     struct iovec **broken_decoded_iovs=NULL;
 143     int *broken_counts=NULL;
 144     int *broken_iov_counts=NULL;
 145     MPI_Aint *broken_total_lengths=NULL;
 146 
 147     int *aggregators=NULL;
 148     int write_chunksize, *result_counts=NULL;
 149     
 150     
 151 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 152     double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
 153     double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
 154     double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
 155     mca_common_ompio_print_entry nentry;
 156 #endif
 157     
 158     
 159     /**************************************************************************
  160      ** 1.  In case the data is not contiguous in memory, decode it into an iovec
 161      **************************************************************************/
 162     bytes_per_cycle = fh->f_bytes_per_agg;
 163 
 164     /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what
 165        the user requested */
  166     bytes_per_cycle = bytes_per_cycle/2;
 167         
 168     ret =   mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh,
 169                                               datatype,
 170                                               count,
 171                                               buf,
 172                                               &max_data,
 173                                               fh->f_mem_convertor,
 174                                               &decoded_iov,
 175                                               &iov_count);
 176     if (OMPI_SUCCESS != ret ){
 177         goto exit;
 178     }
 179 
 180     if ( MPI_STATUS_IGNORE != status ) {
 181         status->_ucount = max_data;
 182     }
 183     
  184     /* Difference to the first generation of this function:
  185     ** dynamic_gen2_num_io_procs is the number of io_procs per group.
  186     ** Initially, we will have only 1 group.
  187     */
 188     if ( fh->f_stripe_count > 1 ) {
 189         dynamic_gen2_num_io_procs =  fh->f_stripe_count;
 190     }
 191     else {
 192         dynamic_gen2_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
 193         if ( OMPI_ERR_MAX == dynamic_gen2_num_io_procs ) {
 194             ret = OMPI_ERROR;
 195             goto exit;
 196         }
 197     }
 198 
 199 
 200     if ( fh->f_stripe_size == 0 ) {
  201         // EDGAR: just a quick hack for testing 
 202         fh->f_stripe_size = 65536;
 203     }
 204     if ( -1 == mca_fcoll_dynamic_gen2_write_chunksize  ) {
 205         write_chunksize = fh->f_stripe_size;
 206     }
 207     else {
 208         write_chunksize = mca_fcoll_dynamic_gen2_write_chunksize;
 209     }
 210 
 211 
 212     ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
 213     if (OMPI_SUCCESS != ret){
 214         goto exit;
 215     }
 216     
 217     aggr_data = (mca_io_ompio_aggregator_data **) malloc ( dynamic_gen2_num_io_procs * 
 218                                                            sizeof(mca_io_ompio_aggregator_data*));
 219 
 220     for ( i=0; i< dynamic_gen2_num_io_procs; i++ ) {
 221         // At this point we know the number of aggregators. If there is a correlation between
 222         // number of aggregators and number of IO nodes, we know how many aggr_data arrays we need
 223         // to allocate.
 224         aggr_data[i] = (mca_io_ompio_aggregator_data *) calloc ( 1, sizeof(mca_io_ompio_aggregator_data));
 225         aggr_data[i]->procs_per_group = fh->f_procs_per_group;
 226         aggr_data[i]->procs_in_group  = fh->f_procs_in_group;
 227         aggr_data[i]->comm = fh->f_comm;
 228         aggr_data[i]->buf  = (char *)buf;             // should not be used in the new version.
 229     }
 230 
 231     /*********************************************************************
 232      *** 2. Generate the local offsets/lengths array corresponding to
 233      ***    this write operation
 234      ********************************************************************/
 235     ret = fh->f_generate_current_file_view( (struct ompio_file_t *) fh,
 236                                             max_data,
 237                                             &local_iov_array,
 238                                             &local_count);
 239     if (ret != OMPI_SUCCESS){
 240         goto exit;
 241     }
 242 
 243     /*************************************************************************
 244      ** 2b. Separate the local_iov_array entries based on the number of aggregators
 245      *************************************************************************/
 246     // broken_iov_arrays[0] contains broken_counts[0] entries to aggregator 0,
 247     // broken_iov_arrays[1] contains broken_counts[1] entries to aggregator 1, etc.
 248     ret = mca_fcoll_dynamic_gen2_break_file_view ( decoded_iov, iov_count, 
 249                                               local_iov_array, local_count, 
 250                                               &broken_decoded_iovs, &broken_iov_counts,
 251                                               &broken_iov_arrays, &broken_counts, 
 252                                               &broken_total_lengths,
 253                                               dynamic_gen2_num_io_procs,  fh->f_stripe_size); 
     if (OMPI_SUCCESS != ret) {
         goto exit;
     }
  254 
 255 
 256     /**************************************************************************
 257      ** 3. Determine the total amount of data to be written and no. of cycles
 258      **************************************************************************/
 259 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 260     start_comm_time = MPI_Wtime();
 261 #endif
 262     if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) {
 263         ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE,
 264                                                   broken_total_lengths,
 265                                                   dynamic_gen2_num_io_procs,
 266                                                   MPI_LONG,
 267                                                   MPI_SUM,
 268                                                   fh->f_comm,
 269                                                   fh->f_comm->c_coll->coll_allreduce_module);
 270         if( OMPI_SUCCESS != ret){
 271             goto exit;
 272         }
 273     }
 274     else {
 275         total_bytes_per_process = (MPI_Aint*)malloc
 276             (dynamic_gen2_num_io_procs * fh->f_procs_per_group*sizeof(MPI_Aint));
 277         if (NULL == total_bytes_per_process) {
 278             opal_output (1, "OUT OF MEMORY\n");
 279             ret = OMPI_ERR_OUT_OF_RESOURCE;
 280             goto exit;
 281         }
 282     
 283         ret = ompi_fcoll_base_coll_allgather_array (broken_total_lengths,
 284                                                     dynamic_gen2_num_io_procs,
 285                                                     MPI_LONG,
 286                                                     total_bytes_per_process,
 287                                                     dynamic_gen2_num_io_procs,
 288                                                     MPI_LONG,
 289                                                     0,
 290                                                     fh->f_procs_in_group,
 291                                                     fh->f_procs_per_group,
 292                                                     fh->f_comm);
 293         if( OMPI_SUCCESS != ret){
 294             goto exit;
 295         }
 296         for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
 297             broken_total_lengths[i] = 0;
 298             for (j=0 ; j<fh->f_procs_per_group ; j++) {
 299                 broken_total_lengths[i] += total_bytes_per_process[j*dynamic_gen2_num_io_procs + i];
 300             }
 301         }
 302         if (NULL != total_bytes_per_process) {
 303             free (total_bytes_per_process);
 304             total_bytes_per_process = NULL;
 305         }
 306     }
 307 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 308     end_comm_time = MPI_Wtime();
 309     comm_time += (end_comm_time - start_comm_time);
 310 #endif
 311 
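         /* The number of cycles is dictated by the most loaded aggregator:
          * cycles = max_i ceil(broken_total_lengths[i] / bytes_per_cycle).
          * For example, aggregators holding 10 MB and 6 MB with a 4 MB
          * half-buffer yield cycles = max(3, 2) = 3. */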
 312     cycles=0;
 313     for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
 314 #if DEBUG_ON
 315         printf("%d: Overall broken_total_lengths[%d] = %ld\n", fh->f_rank, i, broken_total_lengths[i]);
 316 #endif
 317         if ( ceil((double)broken_total_lengths[i]/bytes_per_cycle) > cycles ) {
 318             cycles = ceil((double)broken_total_lengths[i]/bytes_per_cycle);
 319         }
 320     }
 321     
 322 
 323     result_counts = (int *) malloc ( dynamic_gen2_num_io_procs * fh->f_procs_per_group * sizeof(int) );
 324     if ( NULL == result_counts ) {
 325         ret = OMPI_ERR_OUT_OF_RESOURCE;
 326         goto exit;
 327     }
 328 
 329 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 330     start_comm_time = MPI_Wtime();
 331 #endif
 332     if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) {
 333         ret = fh->f_comm->c_coll->coll_allgather(broken_counts,
 334                                                 dynamic_gen2_num_io_procs,
 335                                                 MPI_INT,
 336                                                 result_counts,
 337                                                 dynamic_gen2_num_io_procs,
 338                                                 MPI_INT,
 339                                                 fh->f_comm,
 340                                                 fh->f_comm->c_coll->coll_allgather_module);            
 341     }
 342     else {
 343         ret = ompi_fcoll_base_coll_allgather_array (broken_counts,
 344                                                dynamic_gen2_num_io_procs,
 345                                                MPI_INT,
 346                                                result_counts,
 347                                                dynamic_gen2_num_io_procs,
 348                                                MPI_INT,
 349                                                0,
 350                                                fh->f_procs_in_group,
 351                                                fh->f_procs_per_group,
 352                                                fh->f_comm);
 353     }
 354     if( OMPI_SUCCESS != ret){
 355         goto exit;
 356     }
 357 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 358     end_comm_time = MPI_Wtime();
 359     comm_time += (end_comm_time - start_comm_time);
 360 #endif
 361 
 362     /*************************************************************
 363      *** 4. Allgather the offset/lengths array from all processes
 364      *************************************************************/
 365     for ( i=0; i< dynamic_gen2_num_io_procs; i++ ) {
 366         aggr_data[i]->total_bytes = broken_total_lengths[i];
 367         aggr_data[i]->decoded_iov = broken_decoded_iovs[i];
 368         aggr_data[i]->fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
 369         if (NULL == aggr_data[i]->fview_count) {
 370             opal_output (1, "OUT OF MEMORY\n");
 371             ret = OMPI_ERR_OUT_OF_RESOURCE;
 372             goto exit;
 373         }
 374         for ( j=0; j <fh->f_procs_per_group; j++ ) {
 375             aggr_data[i]->fview_count[j] = result_counts[dynamic_gen2_num_io_procs*j+i];
 376         }
 377         displs = (int*) malloc (fh->f_procs_per_group * sizeof (int));
 378         if (NULL == displs) {
 379             opal_output (1, "OUT OF MEMORY\n");
 380             ret = OMPI_ERR_OUT_OF_RESOURCE;
 381             goto exit;
 382         }
 383         
 384         displs[0] = 0;
 385         total_fview_count = aggr_data[i]->fview_count[0];
 386         for (j=1 ; j<fh->f_procs_per_group ; j++) {
 387             total_fview_count += aggr_data[i]->fview_count[j];
 388             displs[j] = displs[j-1] + aggr_data[i]->fview_count[j-1];
 389         }
 390         
 391 #if DEBUG_ON
 392         printf("total_fview_count : %d\n", total_fview_count);
 393         if (aggregators[i] == fh->f_rank) {
  394             for (j=0 ; j<fh->f_procs_per_group ; j++) {
 395                 printf ("%d: PROCESS: %d  ELEMENTS: %d  DISPLS: %d\n",
 396                         fh->f_rank,
 397                         j,
 398                         aggr_data[i]->fview_count[j],
 399                         displs[j]);
 400             }
 401         }
 402 #endif
 403     
 404         /* allocate the global iovec  */
 405         if (0 != total_fview_count) {
 406             aggr_data[i]->global_iov_array = (struct iovec*) malloc (total_fview_count *
 407                                                                      sizeof(struct iovec));
 408             if (NULL == aggr_data[i]->global_iov_array){
 409                 opal_output(1, "OUT OF MEMORY\n");
 410                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 411                 goto exit;
 412             }            
 413         }
 414     
 415 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 416         start_comm_time = MPI_Wtime();
 417 #endif
 418         if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) {
 419             ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i],
 420                                                       broken_counts[i],
 421                                                       fh->f_iov_type,
 422                                                       aggr_data[i]->global_iov_array,
 423                                                       aggr_data[i]->fview_count,
 424                                                       displs,
 425                                                       fh->f_iov_type,
 426                                                       fh->f_comm,
 427                                                       fh->f_comm->c_coll->coll_allgatherv_module );
 428         }
 429         else {
 430             ret = ompi_fcoll_base_coll_allgatherv_array (broken_iov_arrays[i],
 431                                                     broken_counts[i],
 432                                                     fh->f_iov_type,
 433                                                     aggr_data[i]->global_iov_array,
 434                                                     aggr_data[i]->fview_count,
 435                                                     displs,
 436                                                     fh->f_iov_type,
 437                                                     aggregators[i],
 438                                                     fh->f_procs_in_group,
 439                                                     fh->f_procs_per_group,
 440                                                     fh->f_comm);
 441         }
 442         if (OMPI_SUCCESS != ret){
 443             goto exit;
 444         }
 445 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 446         end_comm_time = MPI_Wtime();
 447         comm_time += (end_comm_time - start_comm_time);
 448 #endif
 449         
 450         /****************************************************************************************
 451          *** 5. Sort the global offset/lengths list based on the offsets.
  452          *** The result of the sort operation is 'sorted', an integer array
  453          *** which contains the indexes of the global_iov_array ordered by offset.
  454          *** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset
  455          *** in the file, and that one is followed by global_iov_array[z].offset, then
  456          *** sorted[0] = x, sorted[1] = y and sorted[2] = z;
 457          ******************************************************************************************/
 458         if (0 != total_fview_count) {
 459             aggr_data[i]->sorted = (int *)malloc (total_fview_count * sizeof(int));
 460             if (NULL == aggr_data[i]->sorted) {
 461                 opal_output (1, "OUT OF MEMORY\n");
 462                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 463                 goto exit;
 464             }
 465             ompi_fcoll_base_sort_iovec (aggr_data[i]->global_iov_array, total_fview_count, aggr_data[i]->sorted);
 466         }
 467         
 468         if (NULL != local_iov_array){
 469             free(local_iov_array);
 470             local_iov_array = NULL;
 471         }
 472         
 473         if (NULL != displs){
 474             free(displs);
 475             displs=NULL;
 476         }
 477     
 478     
 479 #if DEBUG_ON
 480         if (aggregators[i] == fh->f_rank) {
 481             uint32_t tv=0;
 482             for (tv=0 ; tv<total_fview_count ; tv++) {
 483                 printf("%d: OFFSET: %lld   LENGTH: %ld\n",
 484                        fh->f_rank,
 485                        aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_base,
 486                        aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_len);
 487             }
 488         }
 489 #endif
 490         /*************************************************************
 491          *** 6. Determine the number of cycles required to execute this
 492          ***    operation
 493          *************************************************************/
 494         
 495         aggr_data[i]->bytes_per_cycle = bytes_per_cycle;
 496     
 497         if (aggregators[i] == fh->f_rank) {
 498             aggr_data[i]->disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
 499             if (NULL == aggr_data[i]->disp_index) {
 500                 opal_output (1, "OUT OF MEMORY\n");
 501                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 502                 goto exit;
 503             }
 504         
 505             aggr_data[i]->max_disp_index = (int *)calloc (fh->f_procs_per_group,  sizeof (int));
 506             if (NULL == aggr_data[i]->max_disp_index) {
 507                 opal_output (1, "OUT OF MEMORY\n");
 508                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 509                 goto exit;
 510             }
 511 
 512             aggr_data[i]->blocklen_per_process = (int **)calloc (fh->f_procs_per_group, sizeof (int*));
 513             if (NULL == aggr_data[i]->blocklen_per_process) {
 514                 opal_output (1, "OUT OF MEMORY\n");
 515                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 516                 goto exit;
 517             }
 518         
 519             aggr_data[i]->displs_per_process = (MPI_Aint **)calloc (fh->f_procs_per_group, sizeof (MPI_Aint*));
 520             if (NULL == aggr_data[i]->displs_per_process) {
 521                 opal_output (1, "OUT OF MEMORY\n");
 522                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 523                 goto exit;
 524             }
 525         
 526             
 527             aggr_data[i]->global_buf       = (char *) malloc (bytes_per_cycle);
 528             aggr_data[i]->prev_global_buf  = (char *) malloc (bytes_per_cycle);
 529             if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){
 530                 opal_output(1, "OUT OF MEMORY");
 531                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 532                 goto exit;
 533             }
 534         
 535             aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group  * 
 536                                                                   sizeof(ompi_datatype_t *));
 537             aggr_data[i]->prev_recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group  * 
 538                                                                        sizeof(ompi_datatype_t *));
 539             if (NULL == aggr_data[i]->recvtype || NULL == aggr_data[i]->prev_recvtype) {
 540                 opal_output (1, "OUT OF MEMORY\n");
 541                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 542                 goto exit;
 543             }
 544             for(l=0;l<fh->f_procs_per_group;l++){
 545                 aggr_data[i]->recvtype[l]      = MPI_DATATYPE_NULL;
 546                 aggr_data[i]->prev_recvtype[l] = MPI_DATATYPE_NULL;
 547             }
 548         }
 549     
 550 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 551         start_exch = MPI_Wtime();
 552 #endif
 553     }    
 554 
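         /* Two request arrays are kept so the requests of the cycle being
          * posted and of the cycle being completed can be pending at the same
          * time. Each aggregator gets (f_procs_per_group + 1) slots: one per
          * group member for the aggregator's irecvs, plus one for this
          * process's isend to that aggregator. */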
 555     reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
 556     reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
 557     if ( NULL == reqs1 || NULL == reqs2 ) {
 558         opal_output (1, "OUT OF MEMORY\n");
 559         ret = OMPI_ERR_OUT_OF_RESOURCE;
 560         goto exit;
 561     }
 562     for (l=0,i=0; i < dynamic_gen2_num_io_procs; i++ ) {
 563         for ( j=0; j< (fh->f_procs_per_group+1); j++ ) {
 564             reqs1[l] = MPI_REQUEST_NULL;
 565             reqs2[l] = MPI_REQUEST_NULL;
 566             l++;
 567         }
 568     }
 569 
 570     curr_reqs = reqs1;
 571     prev_reqs = reqs2;
 572     
 573     /* Initialize communication for iteration 0 */
 574     if ( cycles > 0 ) {
 575         for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
 576             ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i], 
 577                                  &curr_reqs[i*(fh->f_procs_per_group + 1)] );
 578             if ( OMPI_SUCCESS != ret ) {
 579                 goto exit;
 580             }
 581         }
 582     }
 583 
 584 
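         /* Pipelined main loop: post the shuffle for cycle 'index', then
          * complete the communication of cycle index-1 and write its data.
          * Cycle 0 was posted above; the final cycle is drained after the
          * loop. */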
 585     for (index = 1; index < cycles; index++) {
 586         SWAP_REQUESTS(curr_reqs,prev_reqs);
 587         SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs); 
 588 
 589         /* Initialize communication for iteration i */
 590         for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
 591             ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i], 
 592                                  &curr_reqs[i*(fh->f_procs_per_group + 1)] );
 593             if ( OMPI_SUCCESS != ret ) {
 594                 goto exit;
 595             }
 596         }
 597 
 598         /* Finish communication for iteration i-1 */
 599         ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs, 
  600                                       prev_reqs, MPI_STATUSES_IGNORE);
 601         if (OMPI_SUCCESS != ret){
 602             goto exit;
 603         }
 604 
 605         
 606         /* Write data for iteration i-1 */
 607         for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
 608 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 609             start_write_time = MPI_Wtime();
 610 #endif
 611             ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
 612             if (OMPI_SUCCESS != ret){
 613                 goto exit;
 614             }            
 615 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 616             end_write_time = MPI_Wtime();
 617             write_time += end_write_time - start_write_time;
 618 #endif
 619         }
 620         
 621     } /* end  for (index = 0; index < cycles; index++) */
 622 
 623 
 624     /* Finish communication for iteration i = cycles-1 */
 625     if ( cycles > 0 ) {
 626         SWAP_REQUESTS(curr_reqs,prev_reqs);
 627         SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs); 
 628         
 629         ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs, 
  630                                       prev_reqs, MPI_STATUSES_IGNORE);
 631         if (OMPI_SUCCESS != ret){
 632             goto exit;
 633         }
 634         
 635         /* Write data for iteration i=cycles-1 */
 636         for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
 637 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 638             start_write_time = MPI_Wtime();
 639 #endif
 640             ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
 641             if (OMPI_SUCCESS != ret){
 642                 goto exit;
 643             }                    
 644 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 645             end_write_time = MPI_Wtime();
 646             write_time += end_write_time - start_write_time;
 647 #endif
 648         }
 649     }
 650 
 651         
 652 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 653     end_exch = MPI_Wtime();
 654     exch_write += end_exch - start_exch;
 655     nentry.time[0] = write_time;
 656     nentry.time[1] = comm_time;
 657     nentry.time[2] = exch_write;
 658     nentry.aggregator = 0;
 659     for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
 660         if (aggregators[i] == fh->f_rank)
 661         nentry.aggregator = 1;
 662     }
 663     nentry.nprocs_for_coll = dynamic_gen2_num_io_procs;
 664     if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)){
 665         mca_common_ompio_register_print_entry(fh->f_coll_write_time,
 666                                                nentry);
 667     }
 668 #endif
 669     
 670     
 671 exit :
 672     
 673     if ( NULL != aggr_data ) {
 674         
 675         for ( i=0; i< dynamic_gen2_num_io_procs; i++ ) {            
 676             if (aggregators[i] == fh->f_rank) {
 677                 if (NULL != aggr_data[i]->recvtype){
 678                     for (j =0; j< aggr_data[i]->procs_per_group; j++) {
 679                         if ( MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j] ) {
 680                             ompi_datatype_destroy(&aggr_data[i]->recvtype[j]);
 681                         }
 682                         if ( MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j] ) {
 683                             ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]);
 684                         }
 685 
 686                     }
 687                     free(aggr_data[i]->recvtype);
 688                     free(aggr_data[i]->prev_recvtype);
 689                 }
 690                 
 691                 free (aggr_data[i]->disp_index);
 692                 free (aggr_data[i]->max_disp_index);
 693                 free (aggr_data[i]->global_buf);
 694                 free (aggr_data[i]->prev_global_buf);
 695                 for(l=0;l<aggr_data[i]->procs_per_group;l++){
 696                     free (aggr_data[i]->blocklen_per_process[l]);
 697                     free (aggr_data[i]->displs_per_process[l]);
 698                 }
 699                 
 700                 free (aggr_data[i]->blocklen_per_process);
 701                 free (aggr_data[i]->displs_per_process);
 702             }
 703             free (aggr_data[i]->sorted);
 704             free (aggr_data[i]->global_iov_array);
 705             free (aggr_data[i]->fview_count);
 706             free (aggr_data[i]->decoded_iov);
 707             
 708             free (aggr_data[i]);
 709         }
 710         free (aggr_data);
 711     }
 712     free(local_iov_array);
 713     free(displs);
 714     free(decoded_iov);
 715     free(broken_counts);
 716     free(broken_total_lengths);
 717     free(broken_iov_counts);
 718     free(broken_decoded_iovs); // decoded_iov arrays[i] were freed as aggr_data[i]->decoded_iov;
 719     if ( NULL != broken_iov_arrays ) {
 720         for (i=0; i<dynamic_gen2_num_io_procs; i++ ) {
 721             free(broken_iov_arrays[i]);
 722         }
 723     }
 724     free(broken_iov_arrays);
 725     free(aggregators);
 726     free(fh->f_procs_in_group);
 727     fh->f_procs_in_group=NULL;
 728     fh->f_procs_per_group=0;
 729     free(reqs1);
 730     free(reqs2);
 731     free(result_counts);
 732 
 733      
  734     return ret;
 735 }
 736 
 737 
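     /* Drain the io_array assembled in the previous cycle at the aggregator:
      * mca_fcoll_dynamic_gen2_split_iov_array stages at most write_chunksize
      * bytes at a time into fh->f_io_array, which is then written out through
      * the fbtl's pwritev until prev_bytes_to_write reaches zero. */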
 738 static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize )
 739 {
 740     int ret=OMPI_SUCCESS;
 741     int last_array_pos=0;
 742     int last_pos=0;
 743         
 744 
 745     if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) {
 746         while ( aggr_data->prev_bytes_to_write > 0 ) {                    
 747             aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array, 
 748                                                                                       aggr_data->prev_num_io_entries, 
 749                                                                                       &last_array_pos, &last_pos,
 750                                                                                       write_chunksize );
 751             if ( 0 >  fh->f_fbtl->fbtl_pwritev (fh)) {
 752                 free ( aggr_data->prev_io_array);
 753                 opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n");
 754                 ret = OMPI_ERROR;
 755                 goto exit;
 756             }
 757         }
 758         free ( fh->f_io_array );
 759         free ( aggr_data->prev_io_array);
 760     } 
 761 
 762 exit:
 763 
 764     fh->f_io_array=NULL;
 765     fh->f_num_of_io_entries=0;
 766     
 767     return ret;
 768 }
 769 
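     /* Prepare the data exchange for one cycle: every process works out which
      * portion of the sorted global file view falls into this cycle; the
      * aggregator builds one hindexed receive datatype per group member and
      * posts irecvs into global_buf, while every contributing process posts a
      * single isend described by an hindexed datatype over its decoded_iov
      * pieces. */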
 770 static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, 
 771                           ompi_request_t **reqs )
 772 {
 773     int bytes_sent = 0;
 774     int blocks=0, temp_pindex;
 775     int i, j, l, ret;
 776     int  entries_per_aggregator=0;
 777     mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
 778     int *sorted_file_offsets=NULL;
 779     int temp_index=0;
 780     MPI_Aint *memory_displacements=NULL;
 781     int *temp_disp_index=NULL;
 782     MPI_Aint global_count = 0;
 783     int* blocklength_proc=NULL;
 784     ptrdiff_t* displs_proc=NULL;
 785 
 786     data->num_io_entries = 0;
 787     data->bytes_sent = 0;
 788     data->io_array=NULL;
 789     /**********************************************************************
 790      ***  7a. Getting ready for next cycle: initializing and freeing buffers
 791      **********************************************************************/
 792     if (aggregator == rank) {
 793         
 794         if (NULL != data->recvtype){
 795             for (i =0; i< data->procs_per_group; i++) {
 796                 if ( MPI_DATATYPE_NULL != data->recvtype[i] ) {
 797                     ompi_datatype_destroy(&data->recvtype[i]);
 798                     data->recvtype[i] = MPI_DATATYPE_NULL;
 799                 }
 800             }
 801         }
 802         
 803         for(l=0;l<data->procs_per_group;l++){
 804             data->disp_index[l] =  1;
 805 
 806             if(data->max_disp_index[l] == 0) {
 807                 data->blocklen_per_process[l]   = (int *)       calloc (INIT_LEN, sizeof(int));
 808                 data->displs_per_process[l]     = (MPI_Aint *)  calloc (INIT_LEN, sizeof(MPI_Aint));
 809                 if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){
 810                     opal_output (1, "OUT OF MEMORY for displs\n");
 811                     ret = OMPI_ERR_OUT_OF_RESOURCE;
 812                     goto exit;
 813                 }
 814                 data->max_disp_index[l] = INIT_LEN;
 815             }
 816             else {
 817                 memset ( data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int) );
 818                 memset ( data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint) );
 819             }
 820         }
 821     } /* (aggregator == rank */
 822     
 823     /**************************************************************************
 824      ***  7b. Determine the number of bytes to be actually written in this cycle
 825      **************************************************************************/
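         /* Example: total_bytes = 10 MB with bytes_per_cycle = 4 MB gives
          * local_cycles = 3; cycles 0 and 1 move 4 MB each, cycle 2 moves the
          * remaining 2 MB, and any further global cycle moves 0 bytes. */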
 826     int local_cycles= ceil((double)data->total_bytes / data->bytes_per_cycle);
 827     if ( index  < (local_cycles -1) ) {
 828         data->bytes_to_write_in_cycle = data->bytes_per_cycle;
 829     }
 830     else if ( index == (local_cycles -1)) {
 831         data->bytes_to_write_in_cycle = data->total_bytes - data->bytes_per_cycle*index ;
 832     }
 833     else {
 834         data->bytes_to_write_in_cycle = 0;
 835     }
 836     data->bytes_to_write = data->bytes_to_write_in_cycle;
 837 
 838 #if DEBUG_ON
 839     if (aggregator == rank) {
  840         printf ("****%d: CYCLE %d   Bytes %d**********\n",
 841                 rank,
 842                 index,
 843                 data->bytes_to_write_in_cycle);
 844     }
 845 #endif
 846     /**********************************************************
 847      **Gather the Data from all the processes at the writers **
 848      *********************************************************/
 849     
 850 #if DEBUG_ON
  851     printf("bytes_to_write_in_cycle: %d, cycle : %d\n", data->bytes_to_write_in_cycle,
 852            index);
 853 #endif
 854     
 855     /*****************************************************************
 856      *** 7c. Calculate how much data will be contributed in this cycle
 857      ***     by each process
 858      *****************************************************************/
 859     
 860     /* The blocklen and displs calculation only done at aggregators!*/
 861     while (data->bytes_to_write_in_cycle) {
 862         
 863         /* This next block identifies which process is the holder
 864         ** of the sorted[current_index] element;
 865         */
 866         blocks = data->fview_count[0];
 867         for (j=0 ; j<data->procs_per_group ; j++) {
 868             if (data->sorted[data->current_index] < blocks) {
 869                 data->n = j;
 870                 break;
 871             }
 872             else {
 873                 blocks += data->fview_count[j+1];
 874             }
 875         }
 876         
 877         if (data->bytes_remaining) {
 878             /* Finish up a partially used buffer from the previous  cycle */
 879             
 880             if (data->bytes_remaining <= data->bytes_to_write_in_cycle) {
 881                 /* The data fits completely into the block */
 882                 if (aggregator == rank) {
 883                     data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_remaining;
 884                     data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
 885                         (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
 886                         (data->global_iov_array[data->sorted[data->current_index]].iov_len
 887                          - data->bytes_remaining);
 888                     
 889                     data->disp_index[data->n] += 1;
 890 
  891                     /* In this case the length is consumed, so allocate the
  892                        next displacement and blocklength */
 893                     if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
 894                         data->max_disp_index[data->n] *= 2;
 895                         data->blocklen_per_process[data->n] = (int *) realloc(
 896                             (void *)data->blocklen_per_process[data->n], 
 897                             (data->max_disp_index[data->n])*sizeof(int));
 898                         data->displs_per_process[data->n] = (MPI_Aint *) realloc(
 899                             (void *)data->displs_per_process[data->n], 
 900                             (data->max_disp_index[data->n])*sizeof(MPI_Aint));
 901                     }
 902 
 903                     data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
 904                     data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
 905                 }
 906                 if (data->procs_in_group[data->n] == rank) {
 907                     bytes_sent += data->bytes_remaining;
 908                 }
 909                 data->current_index ++;
 910                 data->bytes_to_write_in_cycle -= data->bytes_remaining;
 911                 data->bytes_remaining = 0;
 912             }
 913             else {
 914                 /* the remaining data from the previous cycle is larger than the
 915                    data->bytes_to_write_in_cycle, so we have to segment again */
 916                 if (aggregator == rank) {
 917                     data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
 918                     data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
 919                         (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
 920                         (data->global_iov_array[data->sorted[data->current_index]].iov_len
 921                          - data->bytes_remaining);
 922                 }
 923                 
 924                 if (data->procs_in_group[data->n] == rank) {
 925                     bytes_sent += data->bytes_to_write_in_cycle;
 926                 }
 927                 data->bytes_remaining -= data->bytes_to_write_in_cycle;
 928                 data->bytes_to_write_in_cycle = 0;
 929                 break;
 930             }
 931         }
 932         else {
 933             /* No partially used entry available, have to start a new one */
 934             if (data->bytes_to_write_in_cycle <
 935                 (MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) {
  936                 /* This entry has more data than we can send in one cycle */
 937                 if (aggregator == rank) {
 938                     data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
 939                     data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
 940                         (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ;
 941                 }
 942                 if (data->procs_in_group[data->n] == rank) {
 943                     bytes_sent += data->bytes_to_write_in_cycle;
 944                     
 945                 }
 946                 data->bytes_remaining = data->global_iov_array[data->sorted[data->current_index]].iov_len -
 947                     data->bytes_to_write_in_cycle;
 948                 data->bytes_to_write_in_cycle = 0;
 949                 break;
 950             }
 951             else {
 952                 /* Next data entry is less than data->bytes_to_write_in_cycle */
 953                 if (aggregator == rank) {
 954                     data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] =
 955                         data->global_iov_array[data->sorted[data->current_index]].iov_len;
 956                     data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (ptrdiff_t)
 957                         data->global_iov_array[data->sorted[data->current_index]].iov_base;
 958                     
 959                     data->disp_index[data->n] += 1;
 960 
  961                     /* realloc for the next blocklength, assign this
  962                        displacement, and prepare the next displs slot, as the
  963                        total length of this entry has been consumed! */
 964                     if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
 965                         data->max_disp_index[data->n] *= 2;
 966                         data->blocklen_per_process[data->n] = (int *) realloc(
 967                             (void *)data->blocklen_per_process[data->n], 
 968                             (data->max_disp_index[data->n]*sizeof(int)));
 969                         data->displs_per_process[data->n] = (MPI_Aint *)realloc(
 970                             (void *)data->displs_per_process[data->n], 
 971                             (data->max_disp_index[data->n]*sizeof(MPI_Aint)));
 972                     }
 973                     data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
 974                     data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
 975                 }
 976                 if (data->procs_in_group[data->n] == rank) {
 977                     bytes_sent += data->global_iov_array[data->sorted[data->current_index]].iov_len;
 978                 }
 979                 data->bytes_to_write_in_cycle -=
 980                     data->global_iov_array[data->sorted[data->current_index]].iov_len;
 981                 data->current_index ++;
 982             }
 983         }
 984     }
 985     
 986     
 987     /*************************************************************************
  988      *** 7d. Calculate the displacements of where to put the data and allocate
  989      ***     the receive buffer (global_buf)
 990      *************************************************************************/
 991     if (aggregator == rank) {
 992         entries_per_aggregator=0;
 993         for (i=0;i<data->procs_per_group; i++){
 994             for (j=0;j<data->disp_index[i];j++){
 995                 if (data->blocklen_per_process[i][j] > 0)
 996                     entries_per_aggregator++ ;
 997             }
 998         }
 999         
1000 #if DEBUG_ON
1001         printf("%d: cycle: %d, bytes_sent: %d\n ",rank,index,
1002                bytes_sent);
1003         printf("%d : Entries per aggregator : %d\n",rank,entries_per_aggregator);
1004 #endif
1005         
1006         if (entries_per_aggregator > 0){
1007             file_offsets_for_agg = (mca_io_ompio_local_io_array *)
1008                 malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array));
1009             if (NULL == file_offsets_for_agg) {
1010                 opal_output (1, "OUT OF MEMORY\n");
1011                 ret = OMPI_ERR_OUT_OF_RESOURCE;
1012                 goto exit;
1013             }
1014             
1015             sorted_file_offsets = (int *)
1016                 malloc (entries_per_aggregator*sizeof(int));
1017             if (NULL == sorted_file_offsets){
1018                 opal_output (1, "OUT OF MEMORY\n");
1019                 ret =  OMPI_ERR_OUT_OF_RESOURCE;
1020                 goto exit;
1021             }
1022             
1023             /*Moving file offsets to an IO array!*/
1024             temp_index = 0;
1025             
1026             for (i=0;i<data->procs_per_group; i++){
1027                 for(j=0;j<data->disp_index[i];j++){
1028                     if (data->blocklen_per_process[i][j] > 0){
1029                         file_offsets_for_agg[temp_index].length =
1030                             data->blocklen_per_process[i][j];
1031                         file_offsets_for_agg[temp_index].process_id = i;
1032                         file_offsets_for_agg[temp_index].offset =
1033                             data->displs_per_process[i][j];
1034                         temp_index++;
1035                         
1036 #if DEBUG_ON
1037                         printf("************Cycle: %d,  Aggregator: %d ***************\n",
1038                                index+1,rank);
1039                         
1040                         printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
1041                                data->procs_in_group[i],j,
1042                                data->blocklen_per_process[i][j],j,
1043                                data->displs_per_process[i][j],
1044                                rank);
1045 #endif
1046                     }
1047                 }
1048             }
1049                 
1050             /* Sort the displacements for each aggregator*/
1051             local_heap_sort (file_offsets_for_agg,
1052                              entries_per_aggregator,
1053                              sorted_file_offsets);
1054             
1055             /*create contiguous memory displacements
1056               based on blocklens on the same displs array
1057               and map it to this aggregator's actual
1058               file-displacements (this is in the io-array created above)*/
1059             memory_displacements = (MPI_Aint *) malloc
1060                 (entries_per_aggregator * sizeof(MPI_Aint));
1061             
1062             memory_displacements[sorted_file_offsets[0]] = 0;
1063             for (i=1; i<entries_per_aggregator; i++){
1064                 memory_displacements[sorted_file_offsets[i]] =
1065                     memory_displacements[sorted_file_offsets[i-1]] +
1066                     file_offsets_for_agg[sorted_file_offsets[i-1]].length;
1067             }
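                 /* e.g. sorted chunk lengths 4, 2 and 6 (in file-offset order)
                  * are packed back to back at memory offsets 0, 4 and 6 of
                  * global_buf, so the receive buffer holds the data in file
                  * order. */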
1068             
1069             temp_disp_index = (int *)calloc (1, data->procs_per_group * sizeof (int));
1070             if (NULL == temp_disp_index) {
1071                 opal_output (1, "OUT OF MEMORY\n");
1072                 ret = OMPI_ERR_OUT_OF_RESOURCE;
1073                 goto exit;
1074             }
1075             
1076             /*Now update the displacements array  with memory offsets*/
1077             global_count = 0;
1078             for (i=0;i<entries_per_aggregator;i++){
1079                 temp_pindex =
1080                     file_offsets_for_agg[sorted_file_offsets[i]].process_id;
1081                 data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
1082                     memory_displacements[sorted_file_offsets[i]];
1083                 if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex])
1084                     temp_disp_index[temp_pindex] += 1;
1085                 else{
 1086                     printf("temp_disp_index[%d]: %d is not smaller than disp_index[%d]: %d\n",
1087                            temp_pindex, temp_disp_index[temp_pindex],
1088                            temp_pindex, data->disp_index[temp_pindex]);
1089                 }
1090                 global_count +=
1091                     file_offsets_for_agg[sorted_file_offsets[i]].length;
1092             }
1093             
1094             if (NULL != temp_disp_index){
1095                 free(temp_disp_index);
1096                 temp_disp_index = NULL;
1097             }
1098             
1099 #if DEBUG_ON
1100             
1101             printf("************Cycle: %d,  Aggregator: %d ***************\n",
1102                    index+1,rank);
1103             for (i=0;i<data->procs_per_group; i++){
1104                 for(j=0;j<data->disp_index[i];j++){
1105                     if (data->blocklen_per_process[i][j] > 0){
1106                         printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
1107                                data->procs_in_group[i],j,
1108                                data->blocklen_per_process[i][j],j,
1109                                data->displs_per_process[i][j],
1110                                rank);
1111                         
1112                     }
1113                 }
1114             }
1115             printf("************Cycle: %d,  Aggregator: %d ***************\n",
1116                    index+1,rank);
1117             for (i=0; i<entries_per_aggregator;i++){
1118                 printf("%d: OFFSET: %lld   LENGTH: %ld, Mem-offset: %ld\n",
1119                        file_offsets_for_agg[sorted_file_offsets[i]].process_id,
1120                        file_offsets_for_agg[sorted_file_offsets[i]].offset,
1121                        file_offsets_for_agg[sorted_file_offsets[i]].length,
1122                        memory_displacements[sorted_file_offsets[i]]);
1123             }
1124             printf("%d : global_count : %ld, bytes_sent : %d\n",
1125                    rank,global_count, bytes_sent);
1126 #endif
1127 //#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
1128 //            start_comm_time = MPI_Wtime();
1129 //#endif
1130             /*************************************************************************
1131              *** 7e. Perform the actual communication
1132              *************************************************************************/
1133             for (i=0;i<data->procs_per_group; i++) {
1134                 size_t datatype_size;
1135                 reqs[i] = MPI_REQUEST_NULL;
1136                 if ( 0 < data->disp_index[i] ) {
1137                     ompi_datatype_create_hindexed(data->disp_index[i],
1138                                                   data->blocklen_per_process[i],
1139                                                   data->displs_per_process[i],
1140                                                   MPI_BYTE,
1141                                                   &data->recvtype[i]);
1142                     ompi_datatype_commit(&data->recvtype[i]);
1143                     opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size);
1144                     
1145                     if (datatype_size){
1146                         ret = MCA_PML_CALL(irecv(data->global_buf,
1147                                                  1,
1148                                                  data->recvtype[i],
1149                                                  data->procs_in_group[i],
1150                                                  FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
1151                                                  data->comm,
1152                                                  &reqs[i]));
1153                         if (OMPI_SUCCESS != ret){
1154                             goto exit;
1155                         }
1156                     }
1157                 }
1158             }
1159         }  /* end if (entries_per_aggr > 0 ) */
1160     }/* end if (aggregator == rank ) */
1161 
1162     if (bytes_sent) {
1163         size_t remaining      = bytes_sent;
1164         int block_index       = -1;
1165         int blocklength_size  = INIT_LEN;
1166 
1167         ptrdiff_t send_mem_address  = (ptrdiff_t) NULL;
1168         ompi_datatype_t *newType    = MPI_DATATYPE_NULL;
1169         blocklength_proc            = (int *)       calloc (blocklength_size, sizeof (int));
1170         displs_proc                 = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t));
1171 
1172         if (NULL == blocklength_proc || NULL == displs_proc ) {
1173             opal_output (1, "OUT OF MEMORY\n");
1174             ret = OMPI_ERR_OUT_OF_RESOURCE;
1175             goto exit;
1176         }
1177 
1178         while (remaining) {
1179             block_index++;
1180 
1181             if(0 == block_index) {
1182                 send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1183                                                 data->current_position;
1184             }
1185             else {
1186                 // Reallocate more memory if blocklength_size is not enough
1187                 if(0 == block_index % INIT_LEN) {
1188                     blocklength_size += INIT_LEN;
1189                     blocklength_proc = (int *)       realloc(blocklength_proc, blocklength_size * sizeof(int));
1190                     displs_proc      = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t));
1191                 }
1192                 displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1193                                                         data->current_position - send_mem_address;
1194             }
1195 
1196             if (remaining >=
1197                 (data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
1198 
1199                 blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len -
1200                                                 data->current_position;
1201                 remaining = remaining -
1202                             (data->decoded_iov[data->iov_index].iov_len - data->current_position);
1203                 data->iov_index = data->iov_index + 1;
1204                 data->current_position = 0;
1205             }
1206             else {
1207                 blocklength_proc[block_index] = remaining;
1208                 data->current_position += remaining;
1209                 remaining = 0;
1210             }
1211         }
1212 
1213         data->total_bytes_written += bytes_sent;
1214         data->bytes_sent = bytes_sent;
1215 
1216         if ( 0 <= block_index ) {
1217             ompi_datatype_create_hindexed(block_index+1,
1218                                           blocklength_proc,
1219                                           displs_proc,
1220                                           MPI_BYTE,
1221                                           &newType);
1222             ompi_datatype_commit(&newType);
1223 
1224             ret = MCA_PML_CALL(isend((char *)send_mem_address,
1225                                      1,
1226                                      newType,
1227                                      aggregator,
1228                                      FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
1229                                      MCA_PML_BASE_SEND_STANDARD,
1230                                      data->comm,
1231                                      &reqs[data->procs_per_group]));
1232             if ( MPI_DATATYPE_NULL != newType ) {
1233                 ompi_datatype_destroy(&newType);
1234             }
1235             if (OMPI_SUCCESS != ret){
1236                 goto exit;
1237             }
1238         }
1239     }
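         /*
          * Note on the send side above: the bytes_sent bytes are gathered from the
          * user buffer by walking decoded_iov, producing one (blocklength,
          * displacement) pair per iovec fragment touched.  Displacements are taken
          * relative to send_mem_address, the start of the first fragment, so a
          * single hindexed datatype describes the possibly non-contiguous region
          * and one isend ships it to the aggregator.  Illustrative numbers only:
          * with bytes_sent = 1500 and 1000 bytes left in the current iovec entry,
          * the loop emits blocklength_proc = {1000, 500}, with displs_proc[1] set
          * to the gap between the two fragments.
          */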
1240 
1241     
1242 #if DEBUG_ON
1243     if (aggregator == rank){
1244         printf("************Cycle: %d,  Aggregator: %d ***************\n",
1245                index+1,rank);
1246         for (i=0 ; i<global_count/4 ; i++)
1247             printf (" RECV %d \n",((int *)data->global_buf)[i]);
1248     }
1249 #endif
1250     
1251 //#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
1252 //    end_comm_time = MPI_Wtime();
1253 //    comm_time += (end_comm_time - start_comm_time);
1254 //#endif
1255     /**********************************************************
1256      *** 7f. Create the io array, and pass it to fbtl
1257      *********************************************************/
1258     
1259     if (aggregator == rank && entries_per_aggregator>0) {
1260         
1261         
1262         data->io_array = (mca_common_ompio_io_array_t *) malloc
1263             (entries_per_aggregator * sizeof (mca_common_ompio_io_array_t));
1264         if (NULL == data->io_array) {
1265             opal_output(1, "OUT OF MEMORY\n");
1266             ret = OMPI_ERR_OUT_OF_RESOURCE;
1267             goto exit;
1268         }
1269         
1270         data->num_io_entries = 0;
1271         /*First entry for every aggregator*/
1272         data->io_array[0].offset =
1273             (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
1274         data->io_array[0].length =
1275             file_offsets_for_agg[sorted_file_offsets[0]].length;
1276         data->io_array[0].memory_address =
1277             data->global_buf+memory_displacements[sorted_file_offsets[0]];
1278         data->num_io_entries++;
1279         
1280         for (i=1;i<entries_per_aggregator;i++){
1281             /* If the entries are contiguous, merge them;
1282                otherwise create a new entry */
1283             if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
1284                 file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
1285                 file_offsets_for_agg[sorted_file_offsets[i]].offset){
1286                 data->io_array[data->num_io_entries - 1].length +=
1287                     file_offsets_for_agg[sorted_file_offsets[i]].length;
1288             }
1289             else {
1290                 data->io_array[data->num_io_entries].offset =
1291                     (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
1292                 data->io_array[data->num_io_entries].length =
1293                     file_offsets_for_agg[sorted_file_offsets[i]].length;
1294                 data->io_array[data->num_io_entries].memory_address =
1295                     data->global_buf+memory_displacements[sorted_file_offsets[i]];
1296                 data->num_io_entries++;
1297             }
1298             
1299         }
1300         
1301 #if DEBUG_ON
1302         printf("*************************** %d\n", data->num_io_entries);
1303         for (i=0 ; i<data->num_io_entries ; i++) {
1304             printf(" ADDRESS: %p  OFFSET: %ld   LENGTH: %ld\n",
1305                    data->io_array[i].memory_address,
1306                    (ptrdiff_t)data->io_array[i].offset,
1307                    data->io_array[i].length);
1308         }
1309         
1310 #endif
1311     }
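         /*
          * Worked example for the merge rule above (illustrative offsets only):
          * sorted fragments (offset=0,len=100), (offset=100,len=50) and
          * (offset=400,len=20) collapse into two io_array entries, (0,150) and
          * (400,20), because the first two are contiguous in the file.
          */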
1312         
1313 exit:
1314     free(sorted_file_offsets);
1315     free(file_offsets_for_agg);
1316     free(memory_displacements);
1317     free(blocklength_proc);
1318     free(displs_proc);
1319     
1320     return ret;
1321 }
1322     
1323     
1324 
1325 int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *mem_iov, int mem_count, 
1326                                         struct iovec *file_iov, int file_count, 
1327                                         struct iovec ***ret_broken_mem_iovs, int **ret_broken_mem_counts,
1328                                         struct iovec ***ret_broken_file_iovs, int **ret_broken_file_counts, 
1329                                         MPI_Aint **ret_broken_total_lengths,
1330                                         int stripe_count, int stripe_size)
1331 {
1332     int i, j, ret=OMPI_SUCCESS;
1333     struct iovec **broken_mem_iovs=NULL; 
1334     int *broken_mem_counts=NULL;
1335     struct iovec **broken_file_iovs=NULL; 
1336     int *broken_file_counts=NULL;
1337     MPI_Aint *broken_total_lengths=NULL;
1338     int **block=NULL, **max_lengths=NULL;
1339     
1340     broken_mem_iovs  = (struct iovec **) malloc ( stripe_count * sizeof(struct iovec *)); 
1341     broken_file_iovs = (struct iovec **) malloc ( stripe_count * sizeof(struct iovec *)); 
1342     if ( NULL == broken_mem_iovs || NULL == broken_file_iovs ) {
1343         ret = OMPI_ERR_OUT_OF_RESOURCE;
1344         goto exit;
1345     }
1346     for ( i=0; i<stripe_count; i++ ) {
1347         broken_mem_iovs[i]  = (struct iovec*) calloc (1, sizeof(struct iovec ));
1348         broken_file_iovs[i] = (struct iovec*) calloc (1, sizeof(struct iovec ));
1349     }
1350     
1351     broken_mem_counts    = (int *) calloc ( stripe_count, sizeof(int));
1352     broken_file_counts   = (int *) calloc ( stripe_count, sizeof(int));
1353     broken_total_lengths = (MPI_Aint *) calloc ( stripe_count, sizeof(MPI_Aint));
1354     if ( NULL == broken_mem_counts || NULL == broken_file_counts ||
1355          NULL == broken_total_lengths ) {
1356         ret = OMPI_ERR_OUT_OF_RESOURCE;
1357         goto exit;
1358     }
1359 
1360     block       = (int **) calloc ( stripe_count, sizeof(int *));
1361     max_lengths = (int **) calloc ( stripe_count,  sizeof(int *));
1362     if ( NULL == block || NULL == max_lengths ) {
1363         ret = OMPI_ERR_OUT_OF_RESOURCE;
1364         goto exit;
1365     }
1366     
1367     for ( i=0; i<stripe_count; i++ ){
1368         block[i]       = (int *) malloc ( 5 * sizeof(int));
1369         max_lengths[i] = (int *) malloc ( 2 * sizeof(int));
1370         if ( NULL == block[i] || NULL == max_lengths[i]) {
1371             ret = OMPI_ERR_OUT_OF_RESOURCE;
1372             goto exit;
1373         }
1374         max_lengths[i][0] = 1;
1375         max_lengths[i][1] = 1;
1376         
1377         for ( j=0; j<5; j++ ) {
1378             block[i][j]=2;
1379         }
1380     }
1381     
1382     /* Step 1: separate the local_iov_array per aggregator */
1383     int owner;
1384     size_t rest, len, temp_len, blocklen, memlen=0;
1385     off_t offset, temp_offset, start_offset, memoffset=0;
1386 
1387     i=j=0;
1388 
1389     if ( 0 < mem_count ) {
1390         memoffset = (off_t ) mem_iov[j].iov_base;
1391         memlen    = mem_iov[j].iov_len;
1392     }
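         /*
          * The loop below assigns every file fragment to the aggregator that owns
          * its stripe: owner = (offset / stripe_size) % stripe_count, and rest is
          * the number of bytes left in that stripe.  Illustrative values: with
          * stripe_size = 1 MB and stripe_count = 4, a 3 MB fragment starting at
          * offset 5 MB lies in stripe 5, so its first 1 MB goes to owner 5 % 4 = 1
          * and the remaining 2 MB is split in 1 MB pieces between owners 2 and 3.
          */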
1393     while ( i < file_count) {
1394         offset = (off_t) file_iov[i].iov_base;
1395         len    = file_iov[i].iov_len;
1396 
1397 
1398 #if DEBUG_ON
1399         printf("file_iov[%d].base=%p .len=%zu\n", i,
1400                file_iov[i].iov_base, file_iov[i].iov_len);
1401 #endif
1402         do {
1403             owner        = (offset / stripe_size ) % stripe_count;
1404             start_offset = (offset / stripe_size );
1405             rest         = (start_offset + 1) * stripe_size - offset;
1406 
1407             if ( len >= rest ) {
1408                 blocklen    = rest;
1409                 temp_offset = offset+rest;
1410                 temp_len    = len - rest;
1411             }
1412             else {
1413                 blocklen    = len;
1414                 temp_offset = 0;
1415                 temp_len    = 0;
1416             }
1417             
1418             broken_file_iovs[owner][broken_file_counts[owner]].iov_base = (void *)offset;
1419             broken_file_iovs[owner][broken_file_counts[owner]].iov_len  = blocklen;
1420 #if DEBUG_ON
1421             printf("owner=%d b_file_iovs[%d].base=%p .len=%zu \n", owner,
1422                    broken_file_counts[owner], 
1423                    broken_file_iovs[owner][broken_file_counts[owner]].iov_base, 
1424                    broken_file_iovs[owner][broken_file_counts[owner]].iov_len );
1425 #endif
1426             do {
1427                 if ( memlen >=  blocklen ) {
1428                     broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset;
1429                     broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len  = blocklen;
1430                     memoffset += blocklen;
1431                     memlen    -= blocklen;
1432                     blocklen   = 0;
1433 
1434                     if ( 0 == memlen ) {
1435                         j++;
1436                         if ( j < mem_count ) {
1437                             memoffset = (off_t) mem_iov[j].iov_base;
1438                             memlen    = mem_iov[j].iov_len;
1439                         }
1440                         else
1441                             break;
1442                     }
1443                 }                
1444                 else {
1445                     broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset;
1446                     broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len  = memlen;
1447                     blocklen -= memlen;
1448                     
1449                     j++;
1450                     if ( j < mem_count ) {
1451                         memoffset = (off_t) mem_iov[j].iov_base;
1452                         memlen    = mem_iov[j].iov_len;
1453                     }
1454                     else 
1455                         break;
1456                 }
1457 #if DEBUG_ON
1458                 printf("owner=%d b_mem_iovs[%d].base=%p .len=%zu\n", owner,
1459                        broken_mem_counts[owner],
1460                        broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base,
1461                        broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len);
1462 #endif
1463 
1464                 broken_mem_counts[owner]++;
1465                 if ( broken_mem_counts[owner] >= max_lengths[owner][0] ) {
1466                     broken_mem_iovs[owner] = (struct iovec*) realloc ( broken_mem_iovs[owner],
1467                                                                        mem_count * block[owner][0] * 
1468                                                                        sizeof(struct iovec ));
1469                     max_lengths[owner][0] = mem_count * block[owner][0];
1470                     block[owner][0]++;
1471                 }
1472 
1473             } while ( blocklen > 0 );
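                 /*
                  * The do/while above consumes memory iovec entries until the current
                  * file block (blocklen bytes) is fully described: each pass emits one
                  * broken_mem_iovs[owner] entry and advances memoffset/memlen, moving
                  * on to the next mem_iov entry whenever the current one is exhausted.
                  */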
1474 
1475             broken_file_counts[owner]++;
1476             if ( broken_file_counts[owner] >= max_lengths[owner][1] ) {
1477                 broken_file_iovs[owner] = (struct iovec*) realloc ( broken_file_iovs[owner],
1478                                                                     file_count * block[owner][1] * 
1479                                                                     sizeof(struct iovec ));
1480                 max_lengths[owner][1] = file_count * block[owner][1];
1481                 block[owner][1]++;
1482             }
1483 
1484             offset = temp_offset;
1485             len    = temp_len;
1486         } while( temp_len > 0 );
1487 
1488         i++;
1489     } 
1490 
1491     
1492     /* Step 2: recompute the total length per aggregator */
1493     for ( i=0; i< stripe_count; i++ ) {
1494         for ( j=0; j<broken_file_counts[i]; j++ ) {
1495             broken_total_lengths[i] += broken_file_iovs[i][j].iov_len;
1496         }
1497 #if DEBUG_ON
1498         printf("broken_total_lengths[%d] = %ld\n", i, (long) broken_total_lengths[i]);
1499 #endif
1500     }
1501 
1502     *ret_broken_mem_iovs      = broken_mem_iovs;
1503     *ret_broken_mem_counts    = broken_mem_counts;
1504     *ret_broken_file_iovs     = broken_file_iovs;
1505     *ret_broken_file_counts   = broken_file_counts;
1506     *ret_broken_total_lengths = broken_total_lengths;    
1507 
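     /*
      * Ownership of the broken_* arrays passes to the caller through the
      * ret_* output parameters; block[] and max_lengths[] were only growth
      * bookkeeping for the realloc calls above and are released below.
      */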
1508     if ( NULL != block) {
1509         for ( i=0; i<stripe_count; i++ ){
1510             free (block[i] );
1511         }
1512         free ( block);
1513     }
1514     if ( NULL != max_lengths) {
1515         for ( i=0; i<stripe_count; i++ ){
1516             free (max_lengths[i] );
1517         }
1518         free ( max_lengths);
1519     }
1520 
1521     return ret;
1522 
1523 exit:
1524     free ( broken_mem_iovs);    
1525     free ( broken_mem_counts);
1526     free ( broken_file_iovs );
1527     free ( broken_file_counts);
1528     free ( broken_total_lengths);
1529 
1530     if ( NULL != block) {
1531         for ( i=0; i<stripe_count; i++ ){
1532             free (block[i] );
1533         }
1534         free ( block);
1535     }
1536     if ( NULL != max_lengths) {
1537         for ( i=0; i<stripe_count; i++ ){
1538             free (max_lengths[i] );
1539         }
1540         free ( max_lengths);
1541     }
1542 
1543     *ret_broken_mem_iovs      = NULL;
1544     *ret_broken_mem_counts    = NULL;
1545     *ret_broken_file_iovs     = NULL;
1546     *ret_broken_file_counts   = NULL;
1547     *ret_broken_total_lengths = NULL;
1548 
1549     return ret;
1550 }
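     /*
      * Hedged usage sketch (kept out of the build with #if 0): one plausible way
      * a caller could break a 2 MB contiguous request across two stripes with
      * mca_fcoll_dynamic_gen2_break_file_view and release the per-stripe results
      * afterwards.  The helper name, the buffer, the stripe geometry and the
      * iovec values below are made-up example inputs, not taken from this file.
      */
     #if 0
     static void example_break_file_view_usage (void)
     {
         struct iovec mem_iov[1], file_iov[1];
         struct iovec **b_mem_iovs = NULL, **b_file_iovs = NULL;
         int *b_mem_counts = NULL, *b_file_counts = NULL;
         MPI_Aint *b_total_lengths = NULL;
         int stripe_count = 2, stripe_size = 1048576, i;
         static char buf[2097152];                   /* 2 MB user buffer   */

         mem_iov[0].iov_base  = buf;
         mem_iov[0].iov_len   = sizeof(buf);
         file_iov[0].iov_base = (void *) 0;          /* file offset 0      */
         file_iov[0].iov_len  = sizeof(buf);         /* spans both stripes */

         if (OMPI_SUCCESS == mca_fcoll_dynamic_gen2_break_file_view (
                 mem_iov, 1, file_iov, 1,
                 &b_mem_iovs, &b_mem_counts,
                 &b_file_iovs, &b_file_counts,
                 &b_total_lengths, stripe_count, stripe_size)) {
             /* each stripe now describes 1 MB; free everything the call allocated */
             for (i = 0; i < stripe_count; i++) {
                 free (b_mem_iovs[i]);
                 free (b_file_iovs[i]);
             }
             free (b_mem_iovs);   free (b_mem_counts);
             free (b_file_iovs);  free (b_file_counts);
             free (b_total_lengths);
         }
     }
     #endif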
1551 
1552 
1553 int mca_fcoll_dynamic_gen2_get_configuration (ompio_file_t *fh, int *dynamic_gen2_num_io_procs, int **ret_aggregators)
1554 {
1555     int *aggregators=NULL;
1556     int num_io_procs = *dynamic_gen2_num_io_procs;
1557     int i;
1558 
1559     if ( num_io_procs < 1 ) {
1560         num_io_procs = fh->f_stripe_count;
1561         if ( num_io_procs < 1 ) {
1562             num_io_procs = 1;
1563         }
1564     }
1565     if ( num_io_procs > fh->f_size ) {
1566         num_io_procs = fh->f_size;
1567     }
1568 
1569     fh->f_procs_per_group = fh->f_size;
1570     fh->f_procs_in_group = (int *) malloc ( sizeof(int) * fh->f_size );
1571     if ( NULL == fh->f_procs_in_group) {
1572         return OMPI_ERR_OUT_OF_RESOURCE;
1573     }
1574     for (i=0; i<fh->f_size; i++ ) {
1575         fh->f_procs_in_group[i]=i;
1576     }
1577 
1578 
1579     aggregators = (int *) malloc ( num_io_procs * sizeof(int));
1580     if ( NULL == aggregators ) {
1581         // fh->procs_in_group will be freed with the fh structure. No need to do it here.
1582         return OMPI_ERR_OUT_OF_RESOURCE;
1583     }
1584     for ( i=0; i<num_io_procs; i++ ) {
1585         aggregators[i] = i * fh->f_size / num_io_procs;
1586     }
1587 
1588     *dynamic_gen2_num_io_procs = num_io_procs;
1589     *ret_aggregators = aggregators;
1590 
1591     return OMPI_SUCCESS;
1592 }    
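     /*
      * Worked example for the aggregator selection above (illustrative numbers):
      * with f_size = 16 ranks and a file stripe count of 4, num_io_procs becomes 4
      * and aggregators[] = { 0, 4, 8, 12 } (i * f_size / num_io_procs), spreading
      * the aggregators evenly across the communicator.
      */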
1593     
1594 
1595 int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *io_array, int num_entries,
1596                                              int *ret_array_pos, int *ret_pos,  int chunk_size )
1597 {
1598 
1599     int array_pos = *ret_array_pos;
1600     int pos       = *ret_pos;
1601     size_t bytes_written = 0;
1602     size_t bytes_to_write = chunk_size;
1603 
1604     if ( 0 == array_pos && 0 == pos ) {
1605         fh->f_io_array = (mca_common_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_common_ompio_io_array_t));
1606         if ( NULL == fh->f_io_array ){
1607             opal_output (1,"Could not allocate memory\n");
1608             return -1;
1609         }
1610     }
1611         
1612     int i=0;
1613     while (bytes_to_write > 0 ) {
1614         fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]);
1615         fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]);
1616 
1617         if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) {
1618             fh->f_io_array[i].length = bytes_to_write;
1619         }
1620         else {
1621             fh->f_io_array[i].length = io_array[array_pos].length - pos;
1622         }
1623 
1624         pos           += fh->f_io_array[i].length;
1625         bytes_written += fh->f_io_array[i].length;
1626         bytes_to_write-= fh->f_io_array[i].length;
1627         i++;
1628 
1629         if ( pos == (int)io_array[array_pos].length ) {
1630             pos = 0;
1631             if ((array_pos + 1) < num_entries) {
1632                 array_pos++;
1633             }
1634             else {
1635                 break;
1636             }
1637         }
1638     }
1639     
1640     fh->f_num_of_io_entries = i;
1641     *ret_array_pos   = array_pos;
1642     *ret_pos         = pos;
1643     return bytes_written;
1644 }
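     /*
      * Worked example for the chunking above (illustrative sizes): a single
      * io_array entry of length 10 MB written with chunk_size = 4 MB takes three
      * calls, which return 4 MB, 4 MB and 2 MB respectively; *ret_pos keeps the
      * intra-entry position between calls and *ret_array_pos only advances once
      * an entry has been consumed completely.
      */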
1645 
1646     
1647 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
1648                             int num_entries,
1649                             int *sorted)
1650 {
1651     int i = 0;
1652     int j = 0;
1653     int left = 0;
1654     int right = 0;
1655     int largest = 0;
1656     int heap_size = num_entries - 1;
1657     int temp = 0;
1658     unsigned char done = 0;
1659     int* temp_arr = NULL;
1660 
1661     temp_arr = (int*)malloc(num_entries*sizeof(int));
1662     if (NULL == temp_arr) {
1663         opal_output (1, "OUT OF MEMORY\n");
1664         return OMPI_ERR_OUT_OF_RESOURCE;
1665     }
1666     temp_arr[0] = 0;
1667     for (i = 1; i < num_entries; ++i) {
1668         temp_arr[i] = i;
1669     }
1670     /* num_entries can be large, so avoid recursion (iterative heapsort) */
1671     for (i = num_entries/2-1 ; i>=0 ; i--) {
1672         done = 0;
1673         j = i;
1674         largest = j;
1675 
1676         while (!done) {
1677             left = j*2+1;
1678             right = j*2+2;
1679             if ((left <= heap_size) &&
1680                 (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
1681                 largest = left;
1682             }
1683             else {
1684                 largest = j;
1685             }
1686             if ((right <= heap_size) &&
1687                 (io_array[temp_arr[right]].offset >
1688                  io_array[temp_arr[largest]].offset)) {
1689                 largest = right;
1690             }
1691             if (largest != j) {
1692                 temp = temp_arr[largest];
1693                 temp_arr[largest] = temp_arr[j];
1694                 temp_arr[j] = temp;
1695                 j = largest;
1696             }
1697             else {
1698                 done = 1;
1699             }
1700         }
1701     }
1702 
1703     for (i = num_entries-1; i >=1; --i) {
1704         temp = temp_arr[0];
1705         temp_arr[0] = temp_arr[i];
1706         temp_arr[i] = temp;
1707         heap_size--;
1708         done = 0;
1709         j = 0;
1710         largest = j;
1711 
1712         while (!done) {
1713             left =  j*2+1;
1714             right = j*2+2;
1715 
1716             if ((left <= heap_size) &&
1717                 (io_array[temp_arr[left]].offset >
1718                  io_array[temp_arr[j]].offset)) {
1719                 largest = left;
1720             }
1721             else {
1722                 largest = j;
1723             }
1724             if ((right <= heap_size) &&
1725                 (io_array[temp_arr[right]].offset >
1726                  io_array[temp_arr[largest]].offset)) {
1727                 largest = right;
1728             }
1729             if (largest != j) {
1730                 temp = temp_arr[largest];
1731                 temp_arr[largest] = temp_arr[j];
1732                 temp_arr[j] = temp;
1733                 j = largest;
1734             }
1735             else {
1736                 done = 1;
1737             }
1738         }
1739         sorted[i] = temp_arr[i];
1740     }
1741     sorted[0] = temp_arr[0];
1742 
1743     if (NULL != temp_arr) {
1744         free(temp_arr);
1745         temp_arr = NULL;
1746     }
1747     return OMPI_SUCCESS;
1748 }
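     /*
      * Note: local_heap_sort is an iterative (non-recursive) heapsort over index
      * values; it leaves io_array untouched and fills sorted[] with indices into
      * io_array ordered by ascending file offset.  Illustrative example: offsets
      * {30, 10, 20} at indices {0, 1, 2} yield sorted[] = {1, 2, 0}.
      */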
1749 
