root/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_fcoll_vulcan_file_write_all
  2. write_init
  3. shuffle_init
  4. mca_fcoll_vulcan_minmax
  5. mca_fcoll_vulcan_break_file_view
  6. mca_fcoll_vulcan_get_configuration
  7. mca_fcoll_vulcan_split_iov_array
  8. local_heap_sort

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2008-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2015-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 #include "ompi_config.h"
  23 #include "fcoll_vulcan.h"
  24 
  25 #include "mpi.h"
  26 #include "ompi/constants.h"
  27 #include "ompi/mca/fcoll/fcoll.h"
  28 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
  29 #include "ompi/mca/common/ompio/common_ompio.h"
  30 #include "ompi/mca/io/io.h"
  31 #include "ompi/mca/common/ompio/common_ompio_request.h"
  32 #include "math.h"
  33 #include "ompi/mca/pml/pml.h"
  34 #include <unistd.h>
  35 
  36 #define DEBUG_ON 0
  37 #define FCOLL_VULCAN_SHUFFLE_TAG   123
  38 #define INIT_LEN 10
  39 #define NOT_AGGR_INDEX -1
  40 
  41 /*Used for loading file-offsets per aggregator*/
  42 typedef struct mca_io_ompio_local_io_array{
  43     OMPI_MPI_OFFSET_TYPE offset;
  44     MPI_Aint             length;
  45     int                  process_id;
  46 }mca_io_ompio_local_io_array;
  47 
  48 typedef struct mca_io_ompio_aggregator_data {
  49     int *disp_index, *sorted, *fview_count, n;
  50     int *max_disp_index;
  51     int **blocklen_per_process;
  52     MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
  53     MPI_Comm comm;
  54     char *buf, *global_buf, *prev_global_buf;
  55     ompi_datatype_t **recvtype, **prev_recvtype;
  56     struct iovec *global_iov_array;
  57     int current_index, current_position;
  58     int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;    
  59     int *procs_in_group, iov_index;
  60     int bytes_sent, prev_bytes_sent;
  61     struct iovec *decoded_iov;
  62     int bytes_to_write, prev_bytes_to_write;
  63     mca_common_ompio_io_array_t *io_array, *prev_io_array;
  64     int num_io_entries, prev_num_io_entries;
  65 } mca_io_ompio_aggregator_data;
  66 
  67 
  68 #define SWAP_REQUESTS(_r1,_r2) { \
  69     ompi_request_t **_t=_r1;     \
  70     _r1=_r2;                     \
  71     _r2=_t;}
  72 
  73 #define SWAP_AGGR_POINTERS(_aggr,_num) {                        \
  74     int _i;                                                     \
  75     char *_t;                                                   \
  76     for (_i=0; _i<_num; _i++ ) {                                \
  77         _aggr[_i]->prev_io_array=_aggr[_i]->io_array;             \
  78         _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
  79         _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent;         \
  80         _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
  81         _t=_aggr[_i]->prev_global_buf;                            \
  82         _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf;         \
  83         _aggr[_i]->global_buf=_t;                                 \
  84         _t=(char *)_aggr[_i]->recvtype;                           \
  85         _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype;             \
  86         _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t;          }                                                             \
  87 }
  88 
  89 
  90 
  91 static int shuffle_init ( int index, int cycles, int aggregator, int rank, 
  92                           mca_io_ompio_aggregator_data *data, 
  93                           ompi_request_t **reqs );
  94 static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data,
  95                         int write_chunksize, int write_synchType, ompi_request_t **request);
  96 int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count, 
  97                                         struct iovec *local_iov_array, int local_count, 
  98                                         struct iovec ***broken_decoded_iovs, int **broken_iov_counts,
  99                                         struct iovec ***broken_iov_arrays, int **broken_counts, 
 100                                         MPI_Aint **broken_total_lengths,
 101                                         int stripe_count, size_t stripe_size); 
 102 
 103 
 104 int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, 
 105                                         int num_groups, size_t max_data);
 106 
 107 
 108 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
 109                             int num_entries,
 110                             int *sorted);
 111 
 112 int mca_fcoll_vulcan_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *work_array,
 113                                              int num_entries, int *last_array_pos, int *last_pos_in_field,
 114                                              int chunk_size );
 115 
 116 
 117 static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int iov_count,  int num_aggregators, 
 118                                      long *new_stripe_size);
 119 
 120 
 121 int mca_fcoll_vulcan_file_write_all (ompio_file_t *fh,
 122                                       const void *buf,
 123                                       int count,
 124                                       struct ompi_datatype_t *datatype,
 125                                       ompi_status_public_t *status)
 126 {
 127     int index = 0;
 128     int cycles = 0;
 129     int ret =0, l, i, j, bytes_per_cycle;
 130     uint32_t iov_count = 0;
 131     struct iovec *decoded_iov = NULL;
 132     struct iovec *local_iov_array=NULL;
 133     uint32_t total_fview_count = 0;
 134     int local_count = 0;
 135     ompi_request_t **reqs = NULL;
 136     ompi_request_t *req_iwrite = MPI_REQUEST_NULL;
 137     mca_io_ompio_aggregator_data **aggr_data=NULL;
 138     
 139     int *displs = NULL;
 140     int vulcan_num_io_procs;
 141     size_t max_data = 0;
 142     MPI_Aint *total_bytes_per_process = NULL;
 143     
 144     struct iovec **broken_iov_arrays=NULL;
 145     struct iovec **broken_decoded_iovs=NULL;
 146     int *broken_counts=NULL;
 147     int *broken_iov_counts=NULL;
 148     MPI_Aint *broken_total_lengths=NULL;
 149 
 150     int aggr_index = NOT_AGGR_INDEX;
 151     int write_synch_type = 2;
 152     int write_chunksize, *result_counts=NULL;
 153     
 154 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 155     double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
 156     double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
 157     double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
 158     mca_common_ompio_print_entry nentry;
 159 #endif
 160     
 161     
 162     /**************************************************************************
 163      ** 1.  In case the data is not contigous in memory, decode it into an iovec
 164      **************************************************************************/
 165     vulcan_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
 166     if ( OMPI_ERR_MAX == vulcan_num_io_procs ) {
 167         ret = OMPI_ERROR;
 168         goto exit;
 169     }
 170     bytes_per_cycle = fh->f_bytes_per_agg;
 171 
 172     if( (1 == mca_fcoll_vulcan_async_io) && (NULL == fh->f_fbtl->fbtl_ipwritev) ) {
 173         opal_output (1, "vulcan_write_all: fbtl Does NOT support ipwritev() (asynchrounous write) \n");
 174         ret = MPI_ERR_UNSUPPORTED_OPERATION;
 175         goto exit;
 176     }
 177 
 178     /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what
 179        the user requested */
 180     bytes_per_cycle =bytes_per_cycle/2;
 181     write_chunksize = bytes_per_cycle;
 182     
 183     ret =   mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh,
 184                                               datatype,
 185                                               count,
 186                                               buf,
 187                                               &max_data,
 188                                               fh->f_mem_convertor,
 189                                               &decoded_iov,
 190                                               &iov_count);
 191     if (OMPI_SUCCESS != ret ){
 192         goto exit;
 193     }
 194     
 195     if ( MPI_STATUS_IGNORE != status ) {
 196         status->_ucount = max_data;
 197     }
 198     
 199     
 200     ret = mca_fcoll_vulcan_get_configuration (fh, vulcan_num_io_procs, mca_fcoll_vulcan_num_groups, max_data);
 201     if (OMPI_SUCCESS != ret){
 202         goto exit;
 203     }
 204 
 205     aggr_data = (mca_io_ompio_aggregator_data **) malloc ( fh->f_num_aggrs * 
 206                                                            sizeof(mca_io_ompio_aggregator_data*));
 207     
 208     for ( i=0; i< fh->f_num_aggrs; i++ ) {
 209         // At this point we know the number of aggregators. If there is a correlation between
 210         // number of aggregators and number of IO nodes, we know how many aggr_data arrays we need
 211         // to allocate.
 212         aggr_data[i] = (mca_io_ompio_aggregator_data *) calloc ( 1, sizeof(mca_io_ompio_aggregator_data));
 213         aggr_data[i]->procs_per_group = fh->f_procs_per_group;
 214         aggr_data[i]->procs_in_group  = fh->f_procs_in_group;
 215         aggr_data[i]->comm = fh->f_comm;
 216         aggr_data[i]->buf  = (char *)buf;             // should not be used in the new version.
 217         // Identify if the process is an aggregator.
 218         // If so, aggr_index would be its index in "aggr_data" and "aggregators" arrays.
 219         if(fh->f_aggr_list[i] == fh->f_rank) {
 220             aggr_index = i;
 221         }
 222     }
 223     
 224     /*********************************************************************
 225      *** 2. Generate the local offsets/lengths array corresponding to
 226      ***    this write operation
 227      ********************************************************************/
 228     ret = fh->f_generate_current_file_view( (struct ompio_file_t *) fh,
 229                                             max_data,
 230                                             &local_iov_array,
 231                                             &local_count);
 232     if (ret != OMPI_SUCCESS){
 233         goto exit;
 234     }
 235     
 236     /*************************************************************************
 237      ** 2b. Separate the local_iov_array entries based on the number of aggregators
 238      *************************************************************************/
 239     // Modifications for the even distribution:
 240     long domain_size;
 241     ret = mca_fcoll_vulcan_minmax ( fh, local_iov_array, local_count,  fh->f_num_aggrs, &domain_size);
 242     
 243     // broken_iov_arrays[0] contains broken_counts[0] entries to aggregator 0,
 244     // broken_iov_arrays[1] contains broken_counts[1] entries to aggregator 1, etc.
 245     ret = mca_fcoll_vulcan_break_file_view ( decoded_iov, iov_count, 
 246                                               local_iov_array, local_count, 
 247                                               &broken_decoded_iovs, &broken_iov_counts,
 248                                               &broken_iov_arrays, &broken_counts, 
 249                                               &broken_total_lengths,
 250                                               fh->f_num_aggrs, domain_size); 
 251 
 252 
 253     /**************************************************************************
 254      ** 3. Determine the total amount of data to be written and no. of cycles
 255      **************************************************************************/
 256 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 257     start_comm_time = MPI_Wtime();
 258 #endif
 259     if ( 1 == mca_fcoll_vulcan_num_groups ) {
 260         ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE,
 261                                                   broken_total_lengths,
 262                                                   fh->f_num_aggrs,
 263                                                   MPI_LONG,
 264                                                   MPI_SUM,
 265                                                   fh->f_comm,
 266                                                   fh->f_comm->c_coll->coll_allreduce_module);
 267         if( OMPI_SUCCESS != ret){
 268             goto exit;
 269         }
 270 
 271     }
 272     else {
 273         total_bytes_per_process = (MPI_Aint*)malloc
 274             (fh->f_num_aggrs * fh->f_procs_per_group*sizeof(MPI_Aint));
 275         if (NULL == total_bytes_per_process) {
 276             opal_output (1, "OUT OF MEMORY\n");
 277             ret = OMPI_ERR_OUT_OF_RESOURCE;
 278             goto exit;
 279         }
 280     
 281         ret = ompi_fcoll_base_coll_allgather_array (broken_total_lengths,
 282                                                     fh->f_num_aggrs,
 283                                                     MPI_LONG,
 284                                                     total_bytes_per_process,
 285                                                     fh->f_num_aggrs,
 286                                                     MPI_LONG,
 287                                                     0,
 288                                                     fh->f_procs_in_group,
 289                                                     fh->f_procs_per_group,
 290                                                     fh->f_comm);
 291         if( OMPI_SUCCESS != ret){
 292             goto exit;
 293         }
 294 
 295         for ( i=0; i<fh->f_num_aggrs; i++ ) {
 296             broken_total_lengths[i] = 0;
 297             for (j=0 ; j<fh->f_procs_per_group ; j++) {
 298                 broken_total_lengths[i] += total_bytes_per_process[j*fh->f_num_aggrs + i];
 299             }
 300         }
 301         if (NULL != total_bytes_per_process) {
 302             free (total_bytes_per_process);
 303             total_bytes_per_process = NULL;
 304         }    
 305     }
 306     
 307 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 308     end_comm_time = MPI_Wtime();
 309     comm_time += (end_comm_time - start_comm_time);
 310 #endif
 311     
 312     cycles=0;
 313     for ( i=0; i<fh->f_num_aggrs; i++ ) {
 314 #if DEBUG_ON
 315         printf("%d: Overall broken_total_lengths[%d] = %ld\n", fh->f_rank, i, broken_total_lengths[i]);
 316 #endif
 317         if ( ceil((double)broken_total_lengths[i]/bytes_per_cycle) > cycles ) {
 318             cycles = ceil((double)broken_total_lengths[i]/bytes_per_cycle);
 319         }
 320     }
 321     
 322     result_counts = (int *) malloc ( fh->f_num_aggrs * fh->f_procs_per_group * sizeof(int) );
 323     if ( NULL == result_counts ) {
 324         ret = OMPI_ERR_OUT_OF_RESOURCE;
 325         goto exit;
 326     }
 327     
 328 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 329     start_comm_time = MPI_Wtime();
 330 #endif
 331     if ( 1 == mca_fcoll_vulcan_num_groups ) {
 332         ret = fh->f_comm->c_coll->coll_allgather(broken_counts,
 333                                                  fh->f_num_aggrs,
 334                                                  MPI_INT,
 335                                                  result_counts,
 336                                                  fh->f_num_aggrs,
 337                                                  MPI_INT,
 338                                                  fh->f_comm,
 339                                                  fh->f_comm->c_coll->coll_allgather_module);            
 340     }
 341     else {
 342         ret = ompi_fcoll_base_coll_allgather_array (broken_counts,
 343                                                     fh->f_num_aggrs,
 344                                                     MPI_INT,
 345                                                     result_counts,
 346                                                     fh->f_num_aggrs,
 347                                                     MPI_INT,
 348                                                     0,
 349                                                     fh->f_procs_in_group,
 350                                                     fh->f_procs_per_group,
 351                                                     fh->f_comm);
 352     }
 353     if( OMPI_SUCCESS != ret){
 354         goto exit;
 355     }
 356 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 357     end_comm_time = MPI_Wtime();
 358     comm_time += (end_comm_time - start_comm_time);
 359 #endif
 360     
 361     /*************************************************************
 362      *** 4. Allgather the offset/lengths array from all processes
 363      *************************************************************/
 364     for ( i=0; i< fh->f_num_aggrs; i++ ) {
 365         aggr_data[i]->total_bytes = broken_total_lengths[i];
 366         aggr_data[i]->decoded_iov = broken_decoded_iovs[i];
 367         aggr_data[i]->fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
 368         if (NULL == aggr_data[i]->fview_count) {
 369             opal_output (1, "OUT OF MEMORY\n");
 370             ret = OMPI_ERR_OUT_OF_RESOURCE;
 371             goto exit;
 372         }
 373         for ( j=0; j <fh->f_procs_per_group; j++ ) {
 374             aggr_data[i]->fview_count[j] = result_counts[fh->f_num_aggrs*j+i];
 375         }
 376         displs = (int*) malloc (fh->f_procs_per_group * sizeof (int));
 377         if (NULL == displs) {
 378             opal_output (1, "OUT OF MEMORY\n");
 379             ret = OMPI_ERR_OUT_OF_RESOURCE;
 380             goto exit;
 381         }
 382         
 383         displs[0] = 0;
 384         total_fview_count = aggr_data[i]->fview_count[0];
 385         for (j=1 ; j<fh->f_procs_per_group ; j++) {
 386             total_fview_count += aggr_data[i]->fview_count[j];
 387             displs[j] = displs[j-1] + aggr_data[i]->fview_count[j-1];
 388         }
 389         
 390 #if DEBUG_ON
 391         printf("total_fview_count : %d\n", total_fview_count);
 392         if (fh->f_aggr_list[i] == fh->f_rank) {
 393             for (j=0 ; j<fh->f_procs_per_group ; i++) {
 394                 printf ("%d: PROCESS: %d  ELEMENTS: %d  DISPLS: %d\n",
 395                         fh->f_rank,
 396                         j,
 397                         aggr_data[i]->fview_count[j],
 398                         displs[j]);
 399             }
 400         }
 401 #endif
 402     
 403         /* allocate the global iovec  */
 404         if (0 != total_fview_count) {
 405             aggr_data[i]->global_iov_array = (struct iovec*) malloc (total_fview_count *
 406                                                                      sizeof(struct iovec));
 407             if (NULL == aggr_data[i]->global_iov_array){
 408                 opal_output(1, "OUT OF MEMORY\n");
 409                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 410                 goto exit;
 411             }            
 412         }
 413     
 414 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 415         start_comm_time = MPI_Wtime();
 416 #endif
 417         if ( 1 == mca_fcoll_vulcan_num_groups ) {
 418             ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i],
 419                                                       broken_counts[i],
 420                                                       fh->f_iov_type,
 421                                                       aggr_data[i]->global_iov_array,
 422                                                       aggr_data[i]->fview_count,
 423                                                       displs,
 424                                                       fh->f_iov_type,
 425                                                       fh->f_comm,
 426                                                       fh->f_comm->c_coll->coll_allgatherv_module );
 427         }
 428         else {
 429             ret = ompi_fcoll_base_coll_allgatherv_array (broken_iov_arrays[i],
 430                                                          broken_counts[i],
 431                                                          fh->f_iov_type,
 432                                                          aggr_data[i]->global_iov_array,
 433                                                          aggr_data[i]->fview_count,
 434                                                          displs,
 435                                                          fh->f_iov_type,
 436                                                          fh->f_aggr_list[i],
 437                                                          fh->f_procs_in_group,
 438                                                          fh->f_procs_per_group,
 439                                                          fh->f_comm);
 440         }
 441         if (OMPI_SUCCESS != ret){
 442             goto exit;
 443         }
 444 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 445         end_comm_time = MPI_Wtime();
 446         comm_time += (end_comm_time - start_comm_time);
 447 #endif
 448         
 449         /****************************************************************************************
 450          *** 5. Sort the global offset/lengths list based on the offsets.
 451          *** The result of the sort operation is the 'sorted', an integer array,
 452          *** which contains the indexes of the global_iov_array based on the offset.
 453          *** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset
 454          *** in the file, and that one is followed by global_iov_array[z].offset, than
 455          *** sorted[0] = x, sorted[1]=y and sorted[2]=z;
 456          ******************************************************************************************/
 457         if (0 != total_fview_count) {
 458             aggr_data[i]->sorted = (int *)malloc (total_fview_count * sizeof(int));
 459             if (NULL == aggr_data[i]->sorted) {
 460                 opal_output (1, "OUT OF MEMORY\n");
 461                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 462                 goto exit;
 463             }
 464             ompi_fcoll_base_sort_iovec (aggr_data[i]->global_iov_array, total_fview_count, aggr_data[i]->sorted);
 465         }
 466         
 467         if (NULL != local_iov_array){
 468             free(local_iov_array);
 469             local_iov_array = NULL;
 470         }
 471         
 472         if (NULL != displs){
 473             free(displs);
 474             displs=NULL;
 475         }
 476     
 477     
 478 #if DEBUG_ON
 479         if (fh->f_aggr_list[i] == fh->f_rank) {
 480             uint32_t tv=0;
 481             for (tv=0 ; tv<total_fview_count ; tv++) {
 482                 printf("%d: OFFSET: %lld   LENGTH: %ld\n",
 483                        fh->f_rank,
 484                        aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_base,
 485                        aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_len);
 486             }
 487         }
 488 #endif
 489         /*************************************************************
 490          *** 6. Determine the number of cycles required to execute this
 491          ***    operation
 492          *************************************************************/
 493         
 494         aggr_data[i]->bytes_per_cycle = bytes_per_cycle;
 495     
 496         if (fh->f_aggr_list[i] == fh->f_rank) {
 497             aggr_data[i]->disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
 498             if (NULL == aggr_data[i]->disp_index) {
 499                 opal_output (1, "OUT OF MEMORY\n");
 500                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 501                 goto exit;
 502             }
 503 
 504             aggr_data[i]->max_disp_index = (int *)calloc (fh->f_procs_per_group,  sizeof (int));
 505             if (NULL == aggr_data[i]->max_disp_index) {
 506                 opal_output (1, "OUT OF MEMORY\n");
 507                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 508                 goto exit;
 509             }
 510         
 511             aggr_data[i]->blocklen_per_process = (int **)calloc (fh->f_procs_per_group, sizeof (int*));
 512             if (NULL == aggr_data[i]->blocklen_per_process) {
 513                 opal_output (1, "OUT OF MEMORY\n");
 514                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 515                 goto exit;
 516             }
 517         
 518             aggr_data[i]->displs_per_process = (MPI_Aint **)calloc (fh->f_procs_per_group, sizeof (MPI_Aint*));
 519             if (NULL == aggr_data[i]->displs_per_process) {
 520                 opal_output (1, "OUT OF MEMORY\n");
 521                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 522                 goto exit;
 523             }
 524         
 525             
 526             aggr_data[i]->global_buf       = (char *) malloc (bytes_per_cycle);
 527             aggr_data[i]->prev_global_buf  = (char *) malloc (bytes_per_cycle);
 528             if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){
 529                 opal_output(1, "OUT OF MEMORY");
 530                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 531                 goto exit;
 532             }
 533         
 534             aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group  * 
 535                                                                   sizeof(ompi_datatype_t *));
 536             aggr_data[i]->prev_recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group  * 
 537                                                                        sizeof(ompi_datatype_t *));
 538             if (NULL == aggr_data[i]->recvtype || NULL == aggr_data[i]->prev_recvtype) {
 539                 opal_output (1, "OUT OF MEMORY\n");
 540                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 541                 goto exit;
 542             }
 543             for(l=0;l<fh->f_procs_per_group;l++){
 544                 aggr_data[i]->recvtype[l]      = MPI_DATATYPE_NULL;
 545                 aggr_data[i]->prev_recvtype[l] = MPI_DATATYPE_NULL;
 546             }
 547         }
 548     
 549 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 550         start_exch = MPI_Wtime();
 551 #endif
 552     }    
 553 
 554     reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs *sizeof(ompi_request_t *));
 555 
 556     if ( NULL == reqs ) {
 557         opal_output (1, "OUT OF MEMORY\n");
 558         ret = OMPI_ERR_OUT_OF_RESOURCE;
 559         goto exit;
 560     }
 561 
 562     for (l=0,i=0; i < fh->f_num_aggrs; i++ ) {
 563         for ( j=0; j< (fh->f_procs_per_group+1); j++ ) {
 564             reqs[l] = MPI_REQUEST_NULL;
 565             l++;
 566         }
 567     }
 568 
 569     // In fact it should be: if ((1 == mca_fcoll_vulcan_async_io) && (NULL != fh->f_fbtl->fbtl_ipwritev))
 570     // But we've already tested that.
 571     if( (1 == mca_fcoll_vulcan_async_io) ||
 572         ( (0 == mca_fcoll_vulcan_async_io) && (NULL != fh->f_fbtl->fbtl_ipwritev) && (2 < cycles) ) ) {
 573         write_synch_type = 1;
 574     }
 575 
 576     if ( cycles > 0 ) {
 577         for ( i=0; i<fh->f_num_aggrs; i++ ) {
 578             ret = shuffle_init ( 0, cycles, fh->f_aggr_list[i], fh->f_rank, aggr_data[i],
 579                                  &reqs[i*(fh->f_procs_per_group + 1)] );
 580             if ( OMPI_SUCCESS != ret ) {
 581                 goto exit;
 582             }
 583         }
 584         // Register progress function that should be used by ompi_request_wait
 585         if (NOT_AGGR_INDEX != aggr_index)  {
 586             mca_common_ompio_register_progress ();
 587         }
 588     }
 589 
 590     ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*fh->f_num_aggrs,
 591                                   reqs, MPI_STATUS_IGNORE);
 592 
 593     for (index = 1; index < cycles; index++) {
 594         SWAP_AGGR_POINTERS(aggr_data, fh->f_num_aggrs);
 595 
 596         if(NOT_AGGR_INDEX != aggr_index) {
 597 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 598             start_write_time = MPI_Wtime();
 599 #endif
 600             ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index],
 601                               write_chunksize, write_synch_type, &req_iwrite);
 602             if (OMPI_SUCCESS != ret){
 603                 goto exit;
 604             }
 605 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 606             end_write_time = MPI_Wtime();
 607             write_time += end_write_time - start_write_time;
 608 #endif
 609         }
 610 
 611         for ( i=0; i<fh->f_num_aggrs; i++ ) {
 612             ret = shuffle_init ( index, cycles, fh->f_aggr_list[i], fh->f_rank, aggr_data[i],
 613                                  &reqs[i*(fh->f_procs_per_group + 1)] );
 614             if ( OMPI_SUCCESS != ret ) {
 615                 goto exit;
 616             }
 617         }
 618 
 619         ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*fh->f_num_aggrs,
 620                                       reqs, MPI_STATUS_IGNORE);
 621         if (OMPI_SUCCESS != ret){
 622             goto exit;
 623         }
 624 
 625         if(NOT_AGGR_INDEX != aggr_index) {
 626             ret = ompi_request_wait(&req_iwrite, MPI_STATUS_IGNORE);
 627             if (OMPI_SUCCESS != ret){
 628                 goto exit;
 629             }
 630         }
 631     } /* end  for (index = 0; index < cycles; index++) */
 632 
 633     if ( cycles > 0 ) {
 634         SWAP_AGGR_POINTERS(aggr_data,fh->f_num_aggrs);
 635 
 636         if(NOT_AGGR_INDEX != aggr_index) {
 637 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 638             start_write_time = MPI_Wtime();
 639 #endif
 640             ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index],
 641                               write_chunksize, write_synch_type, &req_iwrite);
 642             if (OMPI_SUCCESS != ret){
 643                 goto exit;
 644             }
 645 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 646             end_write_time = MPI_Wtime();
 647             write_time += end_write_time - start_write_time;
 648 #endif
 649         }
 650 
 651         if(NOT_AGGR_INDEX != aggr_index) {
 652             ret = ompi_request_wait(&req_iwrite, MPI_STATUS_IGNORE);
 653             if (OMPI_SUCCESS != ret){
 654                 goto exit;
 655             }
 656         }
 657     }
 658         
 659 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 660     end_exch = MPI_Wtime();
 661     exch_write += end_exch - start_exch;
 662     nentry.time[0] = write_time;
 663     nentry.time[1] = comm_time;
 664     nentry.time[2] = exch_write;
 665     nentry.aggregator = 0;
 666     for ( i=0; i<fh->f_num_aggrs; i++ ) {
 667         if (fh->f_aggr_list[i] == fh->f_rank)
 668         nentry.aggregator = 1;
 669     }
 670     nentry.nprocs_for_coll = fh->f_num_aggrs;
 671     if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)){
 672         mca_common_ompio_register_print_entry(fh->f_coll_write_time,
 673                                                nentry);
 674     }
 675 #endif
 676     
 677     
 678 exit :
 679     
 680     if ( NULL != aggr_data ) {
 681         
 682         for ( i=0; i< fh->f_num_aggrs; i++ ) {            
 683             if (fh->f_aggr_list[i] == fh->f_rank) {
 684                 if (NULL != aggr_data[i]->recvtype){
 685                     for (j =0; j< aggr_data[i]->procs_per_group; j++) {
 686                         if ( MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j] ) {
 687                             ompi_datatype_destroy(&aggr_data[i]->recvtype[j]);
 688                         }
 689                         if ( MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j] ) {
 690                             ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]);
 691                         }
 692                         
 693                     }
 694                     free(aggr_data[i]->recvtype);
 695                     free(aggr_data[i]->prev_recvtype);
 696                 }
 697                 
 698                 free (aggr_data[i]->disp_index);
 699                 free (aggr_data[i]->max_disp_index);
 700                 free (aggr_data[i]->global_buf);
 701                 free (aggr_data[i]->prev_global_buf);
 702                 for(l=0;l<aggr_data[i]->procs_per_group;l++){
 703                     free (aggr_data[i]->blocklen_per_process[l]);
 704                     free (aggr_data[i]->displs_per_process[l]);
 705                 }
 706                 
 707                 free (aggr_data[i]->blocklen_per_process);
 708                 free (aggr_data[i]->displs_per_process);
 709             }
 710             free (aggr_data[i]->sorted);
 711             free (aggr_data[i]->global_iov_array);
 712             free (aggr_data[i]->fview_count);
 713             free (aggr_data[i]->decoded_iov);
 714             
 715             free (aggr_data[i]);
 716         }
 717         free (aggr_data);
 718     }
 719     free(displs);
 720     free(decoded_iov);
 721     free(broken_counts);
 722     free(broken_total_lengths);
 723     free(broken_iov_counts);
 724     free(broken_decoded_iovs); // decoded_iov arrays[i] were freed as aggr_data[i]->decoded_iov;
 725     if ( NULL != broken_iov_arrays ) {
 726         for (i=0; i<fh->f_num_aggrs; i++ ) {
 727             free(broken_iov_arrays[i]);
 728         }
 729     }
 730     free(broken_iov_arrays);
 731     free(fh->f_procs_in_group);
 732     fh->f_procs_in_group=NULL;
 733     fh->f_procs_per_group=0;
 734     free(result_counts);
 735     free(reqs);
 736      
 737     return OMPI_SUCCESS;
 738 }
 739 
 740 static int write_init (ompio_file_t *fh,
 741                        int aggregator,
 742                        mca_io_ompio_aggregator_data *aggr_data,
 743                        int write_chunksize,
 744                        int write_synchType,
 745                        ompi_request_t **request )
 746 {
 747     int ret = OMPI_SUCCESS;
 748     ssize_t ret_temp = 0;
 749     int last_array_pos = 0;
 750     int last_pos = 0;
 751     mca_ompio_request_t *ompio_req = NULL;
 752 
 753     mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_WRITE );
 754 
 755     if (aggr_data->prev_num_io_entries) {
 756         /*  In this case, aggr_data->prev_num_io_entries is always == 1.
 757             Therefore we can write the data of size aggr_data->prev_bytes_to_write in one iteration.
 758             In fact, aggr_data->prev_bytes_to_write <= write_chunksize.
 759         */
 760         mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array,
 761                                           aggr_data->prev_num_io_entries,
 762                                           &last_array_pos, &last_pos,
 763                                           write_chunksize);
 764 
 765         if (1 == write_synchType) {
 766             ret = fh->f_fbtl->fbtl_ipwritev(fh, (ompi_request_t *) ompio_req);
 767             if(0 > ret) {
 768                 opal_output (1, "vulcan_write_all: fbtl_ipwritev failed\n");
 769                 ompio_req->req_ompi.req_status.MPI_ERROR = ret;
 770                 ompio_req->req_ompi.req_status._ucount = 0;
 771             }
 772         }
 773         else {
 774             ret_temp = fh->f_fbtl->fbtl_pwritev(fh);
 775             if(0 > ret_temp) {
 776                 opal_output (1, "vulcan_write_all: fbtl_pwritev failed\n");
 777                 ret = ret_temp;
 778                 ret_temp = 0;
 779             }
 780 
 781             ompio_req->req_ompi.req_status.MPI_ERROR = ret;
 782             ompio_req->req_ompi.req_status._ucount = ret_temp;
 783             ompi_request_complete (&ompio_req->req_ompi, false);
 784         }
 785 
 786         free(fh->f_io_array);
 787         free(aggr_data->prev_io_array);
 788     }
 789     else {
 790         ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
 791         ompio_req->req_ompi.req_status._ucount = 0;
 792         ompi_request_complete (&ompio_req->req_ompi, false);
 793     }
 794 
 795     *request = (ompi_request_t *) ompio_req;
 796 
 797     fh->f_io_array=NULL;
 798     fh->f_num_of_io_entries=0;
 799 
 800     return ret;
 801 }
 802 
 803 static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, 
 804                           ompi_request_t **reqs )
 805 {
 806     int bytes_sent = 0;
 807     int blocks=0, temp_pindex;
 808     int i, j, l, ret;
 809     int  entries_per_aggregator=0;
 810     mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
 811     int *sorted_file_offsets=NULL;
 812     int temp_index=0;
 813     MPI_Aint *memory_displacements=NULL;
 814     int *temp_disp_index=NULL;
 815     MPI_Aint global_count = 0;
 816     int* blocklength_proc=NULL;
 817     ptrdiff_t* displs_proc=NULL;
 818 
 819     data->num_io_entries = 0;
 820     data->bytes_sent = 0;
 821     data->io_array=NULL;
 822 
 823     /**********************************************************************
 824      ***  7a. Getting ready for next cycle: initializing and freeing buffers
 825      **********************************************************************/
 826     if (aggregator == rank) {
 827         
 828         if (NULL != data->recvtype){
 829             for (i =0; i< data->procs_per_group; i++) {
 830                 if ( MPI_DATATYPE_NULL != data->recvtype[i] ) {
 831                     ompi_datatype_destroy(&data->recvtype[i]);
 832                     data->recvtype[i] = MPI_DATATYPE_NULL;
 833                 }
 834             }
 835         }
 836 
 837         
 838         for(l=0;l<data->procs_per_group;l++){
 839             data->disp_index[l] =  1;
 840             
 841             if ( data->max_disp_index[l] == 0 ) {
 842                 data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int));
 843                 data->displs_per_process[l] = (MPI_Aint *) calloc (INIT_LEN, sizeof(MPI_Aint));
 844                 if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){
 845                     opal_output (1, "OUT OF MEMORY for displs\n");
 846                     ret = OMPI_ERR_OUT_OF_RESOURCE;
 847                     goto exit;
 848                 }
 849                 data->max_disp_index[l] = INIT_LEN;
 850             }
 851             else {
 852                 memset ( data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int) );
 853                 memset ( data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint) );
 854             }
 855         }
 856     } /* (aggregator == rank */
 857     
 858     /**************************************************************************
 859      ***  7b. Determine the number of bytes to be actually written in this cycle
 860      **************************************************************************/
 861     int local_cycles= ceil((double)data->total_bytes / data->bytes_per_cycle);
 862     if ( index  < (local_cycles -1) ) {
 863         data->bytes_to_write_in_cycle = data->bytes_per_cycle;
 864     }
 865     else if ( index == (local_cycles -1)) {
 866         data->bytes_to_write_in_cycle = data->total_bytes - data->bytes_per_cycle*index ;
 867     }
 868     else {
 869         data->bytes_to_write_in_cycle = 0;
 870     }
 871     data->bytes_to_write = data->bytes_to_write_in_cycle;
 872 
 873 #if DEBUG_ON
 874     if (aggregator == rank) {
 875         printf ("****%d: CYCLE %d   Bytes %lld**********\n",
 876                 rank,
 877                 index,
 878                 data->bytes_to_write_in_cycle);
 879     }
 880 #endif
 881     /**********************************************************
 882      **Gather the Data from all the processes at the writers **
 883      *********************************************************/
 884     
 885 #if DEBUG_ON
 886     printf("bytes_to_write_in_cycle: %ld, cycle : %d\n", data->bytes_to_write_in_cycle,
 887            index);
 888 #endif
 889     
 890     /*****************************************************************
 891      *** 7c. Calculate how much data will be contributed in this cycle
 892      ***     by each process
 893      *****************************************************************/
 894     
 895     /* The blocklen and displs calculation only done at aggregators!*/
 896     while (data->bytes_to_write_in_cycle) {
 897         
 898         /* This next block identifies which process is the holder
 899         ** of the sorted[current_index] element;
 900         */
 901         blocks = data->fview_count[0];
 902         for (j=0 ; j<data->procs_per_group ; j++) {
 903             if (data->sorted[data->current_index] < blocks) {
 904                 data->n = j;
 905                 break;
 906             }
 907             else {
 908                 blocks += data->fview_count[j+1];
 909             }
 910         }
 911         
 912         if (data->bytes_remaining) {
 913             /* Finish up a partially used buffer from the previous  cycle */
 914             
 915             if (data->bytes_remaining <= data->bytes_to_write_in_cycle) {
 916                 /* The data fits completely into the block */
 917                 if (aggregator == rank) {
 918                     data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_remaining;
 919                     data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
 920                         (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
 921                         (data->global_iov_array[data->sorted[data->current_index]].iov_len
 922                          - data->bytes_remaining);
 923 
 924                     data->disp_index[data->n] += 1;
 925                     
 926                     /* In this cases the length is consumed so allocating for
 927                        next displacement and blocklength*/
 928                     if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
 929                         data->max_disp_index[data->n] *= 2;
 930 
 931                         data->blocklen_per_process[data->n] = (int *) realloc
 932                             ((void *)data->blocklen_per_process[data->n], 
 933                              (data->max_disp_index[data->n])*sizeof(int));
 934                         data->displs_per_process[data->n] = (MPI_Aint *) realloc
 935                             ((void *)data->displs_per_process[data->n], 
 936                              (data->max_disp_index[data->n])*sizeof(MPI_Aint));
 937                     }
 938                     data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
 939                     data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
 940 
 941                 }
 942                 if (data->procs_in_group[data->n] == rank) {
 943                     bytes_sent += data->bytes_remaining;
 944                 }
 945                 data->current_index ++;
 946                 data->bytes_to_write_in_cycle -= data->bytes_remaining;
 947                 data->bytes_remaining = 0;
 948             }
 949             else {
 950                 /* the remaining data from the previous cycle is larger than the
 951                    data->bytes_to_write_in_cycle, so we have to segment again */
 952                 if (aggregator == rank) {
 953                     data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
 954                     data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
 955                         (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
 956                         (data->global_iov_array[data->sorted[data->current_index]].iov_len
 957                          - data->bytes_remaining);
 958                 }
 959                 
 960                 if (data->procs_in_group[data->n] == rank) {
 961                     bytes_sent += data->bytes_to_write_in_cycle;
 962                 }
 963                 data->bytes_remaining -= data->bytes_to_write_in_cycle;
 964                 data->bytes_to_write_in_cycle = 0;
 965                 break;
 966             }
 967         }
 968         else {
 969             /* No partially used entry available, have to start a new one */
 970             if (data->bytes_to_write_in_cycle <
 971                 (MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) {
 972                 /* This entry has more data than we can sendin one cycle */
 973                 if (aggregator == rank) {
 974                     data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
 975                     data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
 976                         (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ;
 977                 }
 978                 if (data->procs_in_group[data->n] == rank) {
 979                     bytes_sent += data->bytes_to_write_in_cycle;
 980                     
 981                 }
 982                 data->bytes_remaining = data->global_iov_array[data->sorted[data->current_index]].iov_len -
 983                     data->bytes_to_write_in_cycle;
 984                 data->bytes_to_write_in_cycle = 0;
 985                 break;
 986             }
 987             else {
 988                 /* Next data entry is less than data->bytes_to_write_in_cycle */
 989                 if (aggregator == rank) {
 990                     data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] =
 991                         data->global_iov_array[data->sorted[data->current_index]].iov_len;
 992                     data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (ptrdiff_t)
 993                         data->global_iov_array[data->sorted[data->current_index]].iov_base;
 994 
 995                     data->disp_index[data->n] += 1;
 996                     
 997                     /*realloc for next blocklength
 998                       and assign this displacement and check for next displs as
 999                       the total length of this entry has been consumed!*/
1000                     if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
1001                         data->max_disp_index[data->n] *=2 ;
1002                         data->blocklen_per_process[data->n] = (int *) realloc (
1003                             (void *)data->blocklen_per_process[data->n], 
1004                             (data->max_disp_index[data->n]*sizeof(int)));
1005                         data->displs_per_process[data->n] = (MPI_Aint *)realloc (
1006                             (void *)data->displs_per_process[data->n], 
1007                             (data->max_disp_index[data->n]*sizeof(MPI_Aint)));
1008                     }
1009                     data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
1010                     data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
1011                 }
1012                 if (data->procs_in_group[data->n] == rank) {
1013                     bytes_sent += data->global_iov_array[data->sorted[data->current_index]].iov_len;
1014                 }
1015                 data->bytes_to_write_in_cycle -=
1016                     data->global_iov_array[data->sorted[data->current_index]].iov_len;
1017                 data->current_index ++;
1018             }
1019         }
1020     }
1021     
1022     
1023     /*************************************************************************
1024      *** 7d. Calculate the displacement on where to put the data and allocate
1025      ***     the recieve buffer (global_buf)
1026      *************************************************************************/
1027     if (aggregator == rank) {
1028         entries_per_aggregator=0;
1029         for (i=0;i<data->procs_per_group; i++){
1030             for (j=0;j<data->disp_index[i];j++){
1031                 if (data->blocklen_per_process[i][j] > 0)
1032                     entries_per_aggregator++ ;
1033             }
1034         }
1035         
1036 #if DEBUG_ON
1037         printf("%d: cycle: %d, bytes_sent: %d\n ",rank,index,
1038                bytes_sent);
1039         printf("%d : Entries per aggregator : %d\n",rank,entries_per_aggregator);
1040 #endif
1041         
1042         if (entries_per_aggregator > 0){
1043             file_offsets_for_agg = (mca_io_ompio_local_io_array *)
1044                 malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array));
1045             if (NULL == file_offsets_for_agg) {
1046                 opal_output (1, "OUT OF MEMORY\n");
1047                 ret = OMPI_ERR_OUT_OF_RESOURCE;
1048                 goto exit;
1049             }
1050             
1051             sorted_file_offsets = (int *)
1052                 malloc (entries_per_aggregator*sizeof(int));
1053             if (NULL == sorted_file_offsets){
1054                 opal_output (1, "OUT OF MEMORY\n");
1055                 ret =  OMPI_ERR_OUT_OF_RESOURCE;
1056                 goto exit;
1057             }
1058             
1059             /*Moving file offsets to an IO array!*/
1060             temp_index = 0;
1061             
1062             for (i=0;i<data->procs_per_group; i++){
1063                 for(j=0;j<data->disp_index[i];j++){
1064                     if (data->blocklen_per_process[i][j] > 0){
1065                         file_offsets_for_agg[temp_index].length =
1066                             data->blocklen_per_process[i][j];
1067                         file_offsets_for_agg[temp_index].process_id = i;
1068                         file_offsets_for_agg[temp_index].offset =
1069                             data->displs_per_process[i][j];
1070                         temp_index++;
1071                         
1072 #if DEBUG_ON
1073                         printf("************Cycle: %d,  Aggregator: %d ***************\n",
1074                                index+1,rank);
1075                         
1076                         printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
1077                                data->procs_in_group[i],j,
1078                                data->blocklen_per_process[i][j],j,
1079                                data->displs_per_process[i][j],
1080                                rank);
1081 #endif
1082                     }
1083                 }
1084             }
1085                 
1086             /* Sort the displacements for each aggregator*/
1087             local_heap_sort (file_offsets_for_agg,
1088                              entries_per_aggregator,
1089                              sorted_file_offsets);
1090             
1091             /*create contiguous memory displacements
1092               based on blocklens on the same displs array
1093               and map it to this aggregator's actual
1094               file-displacements (this is in the io-array created above)*/
1095             memory_displacements = (MPI_Aint *) malloc
1096                 (entries_per_aggregator * sizeof(MPI_Aint));
1097             
1098             memory_displacements[sorted_file_offsets[0]] = 0;
1099             for (i=1; i<entries_per_aggregator; i++){
1100                 memory_displacements[sorted_file_offsets[i]] =
1101                     memory_displacements[sorted_file_offsets[i-1]] +
1102                     file_offsets_for_agg[sorted_file_offsets[i-1]].length;
1103             }
1104             
1105             temp_disp_index = (int *)calloc (1, data->procs_per_group * sizeof (int));
1106             if (NULL == temp_disp_index) {
1107                 opal_output (1, "OUT OF MEMORY\n");
1108                 ret = OMPI_ERR_OUT_OF_RESOURCE;
1109                 goto exit;
1110             }
1111             
1112             /*Now update the displacements array  with memory offsets*/
1113             global_count = 0;
1114             for (i=0;i<entries_per_aggregator;i++){
1115                 temp_pindex =
1116                     file_offsets_for_agg[sorted_file_offsets[i]].process_id;
1117                 data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
1118                     memory_displacements[sorted_file_offsets[i]];
1119                 if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex])
1120                     temp_disp_index[temp_pindex] += 1;
1121                 else{
1122                     printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n",
1123                            temp_pindex, temp_disp_index[temp_pindex],
1124                            temp_pindex, data->disp_index[temp_pindex]);
1125                 }
1126                 global_count +=
1127                     file_offsets_for_agg[sorted_file_offsets[i]].length;
1128             }
1129             
1130             if (NULL != temp_disp_index){
1131                 free(temp_disp_index);
1132                 temp_disp_index = NULL;
1133             }
1134             
1135 #if DEBUG_ON
1136             
1137             printf("************Cycle: %d,  Aggregator: %d ***************\n",
1138                    index+1,rank);
1139             for (i=0;i<data->procs_per_group; i++){
1140                 for(j=0;j<data->disp_index[i];j++){
1141                     if (data->blocklen_per_process[i][j] > 0){
1142                         printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
1143                                data->procs_in_group[i],j,
1144                                data->blocklen_per_process[i][j],j,
1145                                data->displs_per_process[i][j],
1146                                rank);
1147                         
1148                     }
1149                 }
1150             }
1151             printf("************Cycle: %d,  Aggregator: %d ***************\n",
1152                    index+1,rank);
1153             for (i=0; i<entries_per_aggregator;i++){
1154                 printf("%d: OFFSET: %lld   LENGTH: %ld, Mem-offset: %ld\n",
1155                        file_offsets_for_agg[sorted_file_offsets[i]].process_id,
1156                        file_offsets_for_agg[sorted_file_offsets[i]].offset,
1157                        file_offsets_for_agg[sorted_file_offsets[i]].length,
1158                        memory_displacements[sorted_file_offsets[i]]);
1159             }
1160             printf("%d : global_count : %ld, bytes_sent : %d\n",
1161                    rank,global_count, bytes_sent);
1162 #endif
1163 //#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
1164 //            start_comm_time = MPI_Wtime();
1165 //#endif
1166             /*************************************************************************
1167              *** 7e. Perform the actual communication
1168              *************************************************************************/
1169             for (i=0;i<data->procs_per_group; i++) {
1170                 size_t datatype_size;
1171                 reqs[i] = MPI_REQUEST_NULL;
1172                 if ( 0 < data->disp_index[i] ) {
1173                     ompi_datatype_create_hindexed(data->disp_index[i],
1174                                                   data->blocklen_per_process[i],
1175                                                   data->displs_per_process[i],
1176                                                   MPI_BYTE,
1177                                                   &data->recvtype[i]);
1178                     ompi_datatype_commit(&data->recvtype[i]);
1179                     opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size);
1180                     
1181                     if (datatype_size){
1182                         ret = MCA_PML_CALL(irecv(data->global_buf,
1183                                                  1,
1184                                                  data->recvtype[i],
1185                                                  data->procs_in_group[i],
1186                                                  FCOLL_VULCAN_SHUFFLE_TAG+index,
1187                                                  data->comm,
1188                                                  &reqs[i]));
1189                         if (OMPI_SUCCESS != ret){
1190                             goto exit;
1191                         }
1192                     }
1193                 }
1194             }
1195         }  /* end if (entries_per_aggr > 0 ) */
1196     }/* end if (aggregator == rank ) */
1197 
1198     if (bytes_sent) {
1199         size_t remaining      = bytes_sent;
1200         int block_index       = -1;
1201         int blocklength_size  = INIT_LEN;
1202 
1203         ptrdiff_t send_mem_address  = 0;
1204         ompi_datatype_t *newType    = MPI_DATATYPE_NULL;
1205         blocklength_proc            = (int *)       calloc (blocklength_size, sizeof (int));
1206         displs_proc                 = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t));
1207 
1208         if (NULL == blocklength_proc || NULL == displs_proc ) {
1209             opal_output (1, "OUT OF MEMORY\n");
1210             ret = OMPI_ERR_OUT_OF_RESOURCE;
1211             goto exit;
1212         }
1213 
1214         while (remaining) {
1215             block_index++;
1216 
1217             if(0 == block_index) {
1218                 send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1219                                                 data->current_position;
1220             }
1221             else {
1222                 // Reallocate more memory if blocklength_size is not enough
1223                 if(0 == block_index % INIT_LEN) {
1224                     blocklength_size += INIT_LEN;
1225                     blocklength_proc = (int *)       realloc(blocklength_proc, blocklength_size * sizeof(int));
1226                     displs_proc      = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t));
1227                 }
1228                 displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1229                                                         data->current_position - send_mem_address;
1230             }
1231 
1232             if (remaining >=
1233                 (data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
1234 
1235                 blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len -
1236                                                 data->current_position;
1237                 remaining = remaining -
1238                             (data->decoded_iov[data->iov_index].iov_len - data->current_position);
1239                 data->iov_index = data->iov_index + 1;
1240                 data->current_position = 0;
1241             }
1242             else {
1243                 blocklength_proc[block_index] = remaining;
1244                 data->current_position += remaining;
1245                 remaining = 0;
1246             }
1247         }
1248 
1249         data->total_bytes_written += bytes_sent;
1250         data->bytes_sent = bytes_sent;
1251 
1252         if ( 0 <= block_index ) {
1253             ompi_datatype_create_hindexed(block_index+1,
1254                                           blocklength_proc,
1255                                           displs_proc,
1256                                           MPI_BYTE,
1257                                           &newType);
1258             ompi_datatype_commit(&newType);
1259 
1260             ret = MCA_PML_CALL(isend((char *)send_mem_address,
1261                                      1,
1262                                      newType,
1263                                      aggregator,
1264                                      FCOLL_VULCAN_SHUFFLE_TAG+index,
1265                                      MCA_PML_BASE_SEND_STANDARD,
1266                                      data->comm,
1267                                      &reqs[data->procs_per_group]));
1268             if ( MPI_DATATYPE_NULL != newType ) {
1269                 ompi_datatype_destroy(&newType);
1270             }
1271             if (OMPI_SUCCESS != ret){
1272                 goto exit;
1273             }
1274         }
1275     }
1276 
1277 #if DEBUG_ON
1278     if (aggregator == rank){
1279         printf("************Cycle: %d,  Aggregator: %d ***************\n",
1280                index+1,rank);
1281         for (i=0 ; i<global_count/4 ; i++)
1282             printf (" RECV %d \n",((int *)data->global_buf)[i]);
1283     }
1284 #endif
1285     
1286 //#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
1287 //    end_comm_time = MPI_Wtime();
1288 //    comm_time += (end_comm_time - start_comm_time);
1289 //#endif
1290     /**********************************************************
1291      *** 7f. Create the io array, and pass it to fbtl
1292      *********************************************************/
1293     
1294     if (aggregator == rank && entries_per_aggregator>0) {
1295         
1296         
1297         data->io_array = (mca_common_ompio_io_array_t *) malloc
1298             (entries_per_aggregator * sizeof (mca_common_ompio_io_array_t));
1299         if (NULL == data->io_array) {
1300             opal_output(1, "OUT OF MEMORY\n");
1301             ret = OMPI_ERR_OUT_OF_RESOURCE;
1302             goto exit;
1303         }
1304         
1305         data->num_io_entries = 0;
1306         /*First entry for every aggregator*/
1307         data->io_array[0].offset =
1308             (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
1309         data->io_array[0].length =
1310             file_offsets_for_agg[sorted_file_offsets[0]].length;
1311         data->io_array[0].memory_address =
1312             data->global_buf+memory_displacements[sorted_file_offsets[0]];
1313         data->num_io_entries++;
1314         
1315         for (i=1;i<entries_per_aggregator;i++){
1316             /* If the enrties are contiguous merge them,
1317                else make a new entry */
1318             if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
1319                 file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
1320                 file_offsets_for_agg[sorted_file_offsets[i]].offset){
1321                 data->io_array[data->num_io_entries - 1].length +=
1322                     file_offsets_for_agg[sorted_file_offsets[i]].length;
1323             }
1324             else {
1325                 data->io_array[data->num_io_entries].offset =
1326                     (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
1327                 data->io_array[data->num_io_entries].length =
1328                     file_offsets_for_agg[sorted_file_offsets[i]].length;
1329                 data->io_array[data->num_io_entries].memory_address =
1330                     data->global_buf+memory_displacements[sorted_file_offsets[i]];
1331                 data->num_io_entries++;
1332             }
1333             
1334         }
1335         
1336 #if DEBUG_ON
1337         printf("*************************** %d\n", num_of_io_entries);
1338         for (i=0 ; i<num_of_io_entries ; i++) {
1339             printf(" ADDRESS: %p  OFFSET: %ld   LENGTH: %ld\n",
1340                    io_array[i].memory_address,
1341                    (ptrdiff_t)io_array[i].offset,
1342                    io_array[i].length);
1343         }
1344         
1345 #endif
1346     }
1347         
1348 exit:
1349     free(sorted_file_offsets);
1350     free(file_offsets_for_agg);
1351     free(memory_displacements);
1352     free(blocklength_proc);
1353     free(displs_proc);
1354 
1355     return OMPI_SUCCESS;
1356 }
1357     
1358 static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int iov_count,  int num_aggregators, long *new_stripe_size)
1359 {
1360     long min, max, globalmin, globalmax;
1361     long stripe_size;
1362 
1363     if ( iov_count > 0 ) {
1364         min = (long) iov[0].iov_base;
1365         max = ((long) iov[iov_count-1].iov_base + (long) iov[iov_count-1].iov_len);
1366     }
1367     else {
1368         min = 0;
1369         max = 0;
1370     }
1371     fh->f_comm->c_coll->coll_allreduce ( &min, &globalmin, 1, MPI_LONG, MPI_MIN,
1372                                          fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module);
1373     
1374     fh->f_comm->c_coll->coll_allreduce ( &max, &globalmax, 1, MPI_LONG, MPI_MAX,
1375                                          fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module);
1376 
1377     //    if ( fh->f_rank < 10 ) printf("[%d]: min=%ld max=%ld globalmin=%ld, globalmax=%ld num_aggregators=%d\n", fh->f_rank, min, max, globalmin, globalmax, num_aggregators);
1378 
1379     stripe_size = (globalmax - globalmin)/num_aggregators;
1380     if ( (globalmax - globalmin) % num_aggregators ) {
1381       stripe_size++;
1382     }
1383 
1384     *new_stripe_size  = stripe_size;
1385     //    if ( fh->f_rank == 0 ) 
1386     //    printf(" partition size is %ld\n", stripe_size);
1387 
1388     return OMPI_SUCCESS;
1389 }
1390     
1391     
1392 
1393 int mca_fcoll_vulcan_break_file_view ( struct iovec *mem_iov, int mem_count, 
1394                                         struct iovec *file_iov, int file_count, 
1395                                         struct iovec ***ret_broken_mem_iovs, int **ret_broken_mem_counts,
1396                                         struct iovec ***ret_broken_file_iovs, int **ret_broken_file_counts, 
1397                                         MPI_Aint **ret_broken_total_lengths,
1398                                         int stripe_count, size_t stripe_size)
1399 {
1400     int i, j, ret=OMPI_SUCCESS;
1401     struct iovec **broken_mem_iovs=NULL; 
1402     int *broken_mem_counts=NULL;
1403     struct iovec **broken_file_iovs=NULL; 
1404     int *broken_file_counts=NULL;
1405     MPI_Aint *broken_total_lengths=NULL;
1406     int **block=NULL, **max_lengths=NULL;
1407     
1408     broken_mem_iovs  = (struct iovec **) malloc ( stripe_count * sizeof(struct iovec *)); 
1409     broken_file_iovs = (struct iovec **) malloc ( stripe_count * sizeof(struct iovec *)); 
1410     if ( NULL == broken_mem_iovs || NULL == broken_file_iovs ) {
1411         ret = OMPI_ERR_OUT_OF_RESOURCE;
1412         goto exit;
1413     }
1414     for ( i=0; i<stripe_count; i++ ) {
1415         broken_mem_iovs[i]  = (struct iovec*) calloc (1, sizeof(struct iovec ));
1416         broken_file_iovs[i] = (struct iovec*) calloc (1, sizeof(struct iovec ));
1417     }
1418     
1419     broken_mem_counts    = (int *) calloc ( stripe_count, sizeof(int));
1420     broken_file_counts   = (int *) calloc ( stripe_count, sizeof(int));
1421     broken_total_lengths = (MPI_Aint *) calloc ( stripe_count, sizeof(MPI_Aint));
1422     if ( NULL == broken_mem_counts || NULL == broken_file_counts ||
1423          NULL == broken_total_lengths ) {
1424         ret = OMPI_ERR_OUT_OF_RESOURCE;
1425         goto exit;
1426     }
1427 
1428     block       = (int **) calloc ( stripe_count, sizeof(int *));
1429     max_lengths = (int **) calloc ( stripe_count,  sizeof(int *));
1430     if ( NULL == block || NULL == max_lengths ) {
1431         ret = OMPI_ERR_OUT_OF_RESOURCE;
1432         goto exit;
1433     }
1434     
1435     for ( i=0; i<stripe_count; i++ ){
1436         block[i]       = (int *) malloc ( 5 * sizeof(int));
1437         max_lengths[i] = (int *) malloc ( 2 * sizeof(int));
1438         if ( NULL == block[i] || NULL == max_lengths[i]) {
1439             ret = OMPI_ERR_OUT_OF_RESOURCE;
1440             goto exit;
1441         }
1442         max_lengths[i][0] = 1;
1443         max_lengths[i][1] = 1;
1444         
1445         for ( j=0; j<5; j++ ) {
1446             block[i][j]=2;
1447         }
1448     }
1449     
1450     /* Step 1: separate the local_iov_array per aggregator */
1451     int owner;
1452     size_t rest, len, temp_len, blocklen, memlen=0;
1453     off_t offset, temp_offset, start_offset, memoffset=0;
1454 
1455     i=j=0;
1456 
1457     if ( 0 < mem_count ) {
1458         memoffset = (off_t ) mem_iov[j].iov_base;
1459         memlen    = mem_iov[j].iov_len;
1460     }
1461     while ( i < file_count) {
1462         offset = (off_t) file_iov[i].iov_base;
1463         len    = file_iov[i].iov_len;
1464 
1465 
1466 #if DEBUG_ON
1467         printf("%d:file_iov[%d].base=%ld .len=%d\n", rank, i, 
1468                file_iov[i].iov_base, file_iov[i].iov_len);
1469 #endif
1470         do {
1471             owner        = (offset / stripe_size ) % stripe_count;
1472             start_offset = (offset / stripe_size );
1473             rest         = (start_offset + 1) * stripe_size - offset;
1474 
1475             if ( len >= rest ) {
1476                 blocklen    = rest;
1477                 temp_offset = offset+rest;
1478                 temp_len    = len - rest;
1479             }
1480             else {
1481                 blocklen    = len;
1482                 temp_offset = 0;
1483                 temp_len    = 0;
1484             }
1485             
1486             broken_file_iovs[owner][broken_file_counts[owner]].iov_base = (void *)offset;
1487             broken_file_iovs[owner][broken_file_counts[owner]].iov_len  = blocklen;
1488 #if DEBUG_ON
1489             printf("%d: owner=%d b_file_iovs[%d].base=%ld .len=%d \n", rank, owner, 
1490                    broken_file_counts[owner], 
1491                    broken_file_iovs[owner][broken_file_counts[owner]].iov_base, 
1492                    broken_file_iovs[owner][broken_file_counts[owner]].iov_len );
1493 #endif
1494             do {
1495                 if ( memlen >=  blocklen ) {
1496                     broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset;
1497                     broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len  = blocklen;
1498                     memoffset += blocklen;
1499                     memlen    -= blocklen;
1500                     blocklen   = 0;
1501 
1502                     if ( 0 == memlen ) {
1503                         j++;
1504                         if ( j < mem_count ) {
1505                             memoffset = (off_t) mem_iov[j].iov_base;
1506                             memlen    = mem_iov[j].iov_len;
1507                         }
1508                         else
1509                             break;
1510                     }
1511                 }                
1512                 else {
1513                     broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset;
1514                     broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len  = memlen;
1515                     blocklen -= memlen;
1516                     
1517                     j++;
1518                     if ( j < mem_count ) {
1519                         memoffset = (off_t) mem_iov[j].iov_base;
1520                         memlen    = mem_iov[j].iov_len;
1521                     }
1522                     else 
1523                         break;
1524                 }
1525 #if DEBUG_ON
1526                 printf("%d: owner=%d b_mem_iovs[%d].base=%ld .len=%d\n", rank, owner,
1527                        broken_mem_counts[owner],
1528                        broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base,
1529                        broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len);
1530 #endif
1531 
1532                 broken_mem_counts[owner]++;
1533                 if ( broken_mem_counts[owner] >= max_lengths[owner][0] ) {
1534                     broken_mem_iovs[owner] = (struct iovec*) realloc ( broken_mem_iovs[owner],
1535                                                                        mem_count * block[owner][0] * 
1536                                                                        sizeof(struct iovec ));
1537                     max_lengths[owner][0] = mem_count * block[owner][0];
1538                     block[owner][0]++;
1539                 }
1540 
1541             } while ( blocklen > 0 );
1542 
1543             broken_file_counts[owner]++;
1544             if ( broken_file_counts[owner] >= max_lengths[owner][1] ) {
1545                 broken_file_iovs[owner] = (struct iovec*) realloc ( broken_file_iovs[owner],
1546                                                                     file_count * block[owner][1] * 
1547                                                                     sizeof(struct iovec ));
1548                 max_lengths[owner][1] = file_count * block[owner][1];
1549                 block[owner][1]++;
1550             }
1551 
1552             offset = temp_offset;
1553             len    = temp_len;
1554         } while( temp_len > 0 );
1555 
1556         i++;
1557     } 
1558 
1559     
1560     /* Step 2: recalculating the total lengths per aggregator */
1561     for ( i=0; i< stripe_count; i++ ) {
1562         for ( j=0; j<broken_file_counts[i]; j++ ) {
1563             broken_total_lengths[i] += broken_file_iovs[i][j].iov_len;
1564         }
1565 #if DEBUG_ON
1566         printf("%d: broken_total_lengths[%d] = %d\n", rank, i, broken_total_lengths[i]);
1567 #endif
1568     }
1569 
1570     *ret_broken_mem_iovs      = broken_mem_iovs;
1571     *ret_broken_mem_counts    = broken_mem_counts;
1572     *ret_broken_file_iovs     = broken_file_iovs;
1573     *ret_broken_file_counts   = broken_file_counts;
1574     *ret_broken_total_lengths = broken_total_lengths;    
1575 
1576     if ( NULL != block) {
1577         for ( i=0; i<stripe_count; i++ ){
1578             free (block[i] );
1579         }
1580         free ( block);
1581     }
1582     if ( NULL != max_lengths) {
1583         for ( i=0; i<stripe_count; i++ ){
1584             free (max_lengths[i] );
1585         }
1586         free ( max_lengths);
1587     }
1588 
1589     return ret;
1590 
1591 exit:
1592     free ( broken_mem_iovs);    
1593     free ( broken_mem_counts);
1594     free ( broken_file_iovs );
1595     free ( broken_file_counts);
1596     free ( broken_total_lengths);
1597 
1598     if ( NULL != block) {
1599         for ( i=0; i<stripe_count; i++ ){
1600             free (block[i] );
1601         }
1602         free ( block);
1603     }
1604     if ( NULL != max_lengths) {
1605         for ( i=0; i<stripe_count; i++ ){
1606             free (max_lengths[i] );
1607         }
1608         free ( max_lengths);
1609     }
1610 
1611     *ret_broken_mem_iovs      = NULL;
1612     *ret_broken_mem_counts    = NULL;
1613     *ret_broken_file_iovs     = NULL;
1614     *ret_broken_file_counts   = NULL;
1615     *ret_broken_total_lengths = NULL;
1616 
1617     return ret;
1618 }
1619 
1620 
1621 int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, int num_groups, 
1622                                         size_t max_data)
1623 {
1624     int i, ret;
1625     ret = mca_common_ompio_set_aggregator_props (fh, num_io_procs, max_data);
1626 
1627     /* Note: as of this version of the vulcan component, we are not using yet
1628        the num_groups parameter to split the aggregators (and processes) into
1629        distinct subgroups. This will however hopefullty be done in a second step
1630        as well, allowing to keep communication just to individual subgroups of processes,
1631        each subgroup using however the classic two-phase collective I/O algorithm
1632        with multiple aggregators and even partitioning internally. 
1633     
1634        For now, logically all processes are in a single group. */
1635 
1636     fh->f_procs_per_group = fh->f_size;
1637     if ( NULL != fh->f_procs_in_group ) {
1638         free ( fh->f_procs_in_group );
1639     }
1640     fh->f_procs_in_group = (int *) malloc ( sizeof(int) * fh->f_size );
1641     if ( NULL == fh->f_procs_in_group) {
1642         return OMPI_ERR_OUT_OF_RESOURCE;
1643     }
1644     for (i=0; i<fh->f_size; i++ ) {
1645         fh->f_procs_in_group[i]=i;
1646     }
1647     
1648     return ret;
1649 }    
1650     
1651 
1652 int mca_fcoll_vulcan_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *io_array, int num_entries,
1653                                              int *ret_array_pos, int *ret_pos,  int chunk_size )
1654 {
1655 
1656     int array_pos = *ret_array_pos;
1657     int pos       = *ret_pos;
1658     size_t bytes_written = 0;
1659     size_t bytes_to_write = chunk_size;
1660 
1661     if ( 0 == array_pos && 0 == pos ) {
1662         fh->f_io_array = (mca_common_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_common_ompio_io_array_t));
1663         if ( NULL == fh->f_io_array ){
1664             opal_output (1,"Could not allocate memory\n");
1665             return -1;
1666         }
1667     }
1668         
1669     int i=0;
1670     while (bytes_to_write > 0 ) {
1671         fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]);
1672         fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]);
1673 
1674         if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) {
1675             fh->f_io_array[i].length = bytes_to_write;
1676         }
1677         else {
1678             fh->f_io_array[i].length = io_array[array_pos].length - pos;
1679         }
1680 
1681         pos           += fh->f_io_array[i].length;
1682         bytes_written += fh->f_io_array[i].length;
1683         bytes_to_write-= fh->f_io_array[i].length;
1684         i++;
1685 
1686         if ( pos == (int)io_array[array_pos].length ) {
1687             pos = 0;
1688             if ((array_pos + 1) < num_entries) {
1689                 array_pos++;
1690             }
1691             else {
1692                 break;
1693             }
1694         }
1695     }
1696     
1697     fh->f_num_of_io_entries = i;
1698     *ret_array_pos   = array_pos;
1699     *ret_pos         = pos;
1700     return bytes_written;
1701 }
1702 
1703     
1704 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
1705                             int num_entries,
1706                             int *sorted)
1707 {
1708     int i = 0;
1709     int j = 0;
1710     int left = 0;
1711     int right = 0;
1712     int largest = 0;
1713     int heap_size = num_entries - 1;
1714     int temp = 0;
1715     unsigned char done = 0;
1716     int* temp_arr = NULL;
1717 
1718     temp_arr = (int*)malloc(num_entries*sizeof(int));
1719     if (NULL == temp_arr) {
1720         opal_output (1, "OUT OF MEMORY\n");
1721         return OMPI_ERR_OUT_OF_RESOURCE;
1722     }
1723     temp_arr[0] = 0;
1724     for (i = 1; i < num_entries; ++i) {
1725         temp_arr[i] = i;
1726     }
1727     /* num_entries can be a large no. so NO RECURSION */
1728     for (i = num_entries/2-1 ; i>=0 ; i--) {
1729         done = 0;
1730         j = i;
1731         largest = j;
1732 
1733         while (!done) {
1734             left = j*2+1;
1735             right = j*2+2;
1736             if ((left <= heap_size) &&
1737                 (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
1738                 largest = left;
1739             }
1740             else {
1741                 largest = j;
1742             }
1743             if ((right <= heap_size) &&
1744                 (io_array[temp_arr[right]].offset >
1745                  io_array[temp_arr[largest]].offset)) {
1746                 largest = right;
1747             }
1748             if (largest != j) {
1749                 temp = temp_arr[largest];
1750                 temp_arr[largest] = temp_arr[j];
1751                 temp_arr[j] = temp;
1752                 j = largest;
1753             }
1754             else {
1755                 done = 1;
1756             }
1757         }
1758     }
1759 
1760     for (i = num_entries-1; i >=1; --i) {
1761         temp = temp_arr[0];
1762         temp_arr[0] = temp_arr[i];
1763         temp_arr[i] = temp;
1764         heap_size--;
1765         done = 0;
1766         j = 0;
1767         largest = j;
1768 
1769         while (!done) {
1770             left =  j*2+1;
1771             right = j*2+2;
1772 
1773             if ((left <= heap_size) &&
1774                 (io_array[temp_arr[left]].offset >
1775                  io_array[temp_arr[j]].offset)) {
1776                 largest = left;
1777             }
1778             else {
1779                 largest = j;
1780             }
1781             if ((right <= heap_size) &&
1782                 (io_array[temp_arr[right]].offset >
1783                  io_array[temp_arr[largest]].offset)) {
1784                 largest = right;
1785             }
1786             if (largest != j) {
1787                 temp = temp_arr[largest];
1788                 temp_arr[largest] = temp_arr[j];
1789                 temp_arr[j] = temp;
1790                 j = largest;
1791             }
1792             else {
1793                 done = 1;
1794             }
1795         }
1796         sorted[i] = temp_arr[i];
1797     }
1798     sorted[0] = temp_arr[0];
1799 
1800     if (NULL != temp_arr) {
1801         free(temp_arr);
1802         temp_arr = NULL;
1803     }
1804     return OMPI_SUCCESS;
1805 }
1806 
1807 

/* [<][>][^][v][top][bottom][index][help] */