root/ompi/mca/common/ompio/common_ompio_aggregators.c


DEFINITIONS

This source file includes the following definitions:
  1. mca_common_ompio_simple_grouping
  2. mca_common_ompio_forced_grouping
  3. mca_common_ompio_fview_based_grouping
  4. mca_common_ompio_cart_based_grouping
  5. mca_common_ompio_finalize_initial_grouping
  6. mca_common_ompio_set_aggregator_props
  7. mca_common_ompio_create_groups
  8. mca_common_ompio_merge_initial_groups
  9. mca_common_ompio_split_initial_groups
  10. mca_common_ompio_retain_initial_groups
  11. mca_common_ompio_merge_groups
  12. mca_common_ompio_split_a_group
  13. mca_common_ompio_finalize_split
  14. mca_common_ompio_prepare_to_group
  15. cost_calc

   1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2017 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2008-2017 University of Houston. All rights reserved.
  14  * Copyright (c) 2011-2018 Cisco Systems, Inc.  All rights reserved
  15  * Copyright (c) 2012-2013 Inria.  All rights reserved.
  16  * Copyright (c) 2015-2018 Research Organization for Information Science
  17  *                         and Technology (RIST). All rights reserved.
  18  * Copyright (c) 2017      IBM Corporation. All rights reserved.
  19  * $COPYRIGHT$
  20  *
  21  * Additional copyrights may follow
  22  *
  23  * $HEADER$
  24  */
  25 
  26 #include "ompi_config.h"
  27 
  28 #include "ompi/runtime/params.h"
  29 #include "ompi/communicator/communicator.h"
  30 #include "ompi/mca/pml/pml.h"
  31 #include "ompi/mca/topo/topo.h"
  32 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
  33 #include "opal/datatype/opal_convertor.h"
  34 #include "opal/datatype/opal_datatype.h"
  35 #include "ompi/datatype/ompi_datatype.h"
  36 #include "ompi/info/info.h"
  37 #include "ompi/request/request.h"
  38 
  39 #include <math.h>
  40 #include <unistd.h>
  41 
  42 #include "common_ompio.h"
  43 
  44 /*
  45 ** This file contains all the functionality related to determining the number of aggregators
  46 ** and the list of aggregators.
  47 **
  48 ** The first group of functions determines the number of aggregators based on various characteristics:
  49 **
  50 ** 1. simple_grouping: A heuristic based on a cost model
  51 ** 2. fview_based_grouping: analyzes the fileview to detect regular patterns
  52 ** 3. cart_based_grouping: uses a cartesian communicator to derive certain (probable) properties
  53 **    of the access pattern
  54 */
  55 
  56 static double cost_calc (int P, int P_agg, size_t Data_proc, size_t coll_buffer, int dim );
  57 #define DIM1 1
  58 #define DIM2 2
  59 
  60 int mca_common_ompio_simple_grouping(ompio_file_t *fh,
  61                                      int *num_groups_out,
  62                                      mca_common_ompio_contg *contg_groups)
  63 {
  64     int num_groups=1;
  65 
  66     double time=0.0, time_prev=0.0, dtime=0.0, dtime_abs=0.0, dtime_diff=0.0, dtime_prev=0.0;
  67     double dtime_threshold=0.0;
  68 
  69     /* This is the threshold for absolute improvement. It is not 
  70     ** exposed as an MCA parameter to avoid overwhelming users. It is 
  71     ** mostly relevant for smaller process counts and data volumes. 
  72     */
  73     double time_threshold=0.001; 
  74 
  75     int incr=1, mode=1;
  76     int P_a, P_a_prev;
  77 
  78     /* The aggregator selection algorithm is based on the formulas described
  79     ** in: Shweta Jha, Edgar Gabriel, 'Performance Models for Communication in
  80     ** Collective I/O operations', Proceedings of the 17th IEEE/ACM Symposium
  81     ** on Cluster, Cloud and Grid Computing, Workshop on Theoretical
  82     ** Approaches to Performance Evaluation, Modeling and Simulation, 2017.
  83     **
  84     ** The current implementation is based on the 1-D and 2-D models derived for the even
  85     ** file partitioning strategy in the paper. Note that the formulas currently only model
  86     ** the communication aspect of collective I/O operations. There are two extensions in this
  87     ** implementation:
  88     **
  89     ** 1. Since the resulting formula has an asymptotic behavior w.r.t. the
  90     ** no. of aggregators, this version determines the no. of aggregators to
  91     ** be used iteratively and stops increasing the no. of aggregators once the
  92     ** benefit of adding more aggregators falls below a certain threshold
  93     ** value relative to the last number tested. The aggressiveness of cutting off
  94     ** the increase in the number of aggregators is controlled by the mca
  95     ** parameter mca_io_ompio_aggregators_cutoff_threshold.  Lower values for
  96     ** this parameter lead to a higher number of aggregators (useful e.g.
  97     ** for PVFS2 and GPFS file systems), while higher values lead to a
  98     ** lower no. of aggregators (useful for regular UNIX or NFS file systems).
  99     **
 100     ** 2. The algorithm further caps the maximum no. of aggregators used to not exceed
 101     ** (no. of processes / mca_io_ompio_max_aggregators_ratio), i.e. a higher value
 102     ** for mca_io_ompio_max_aggregators_ratio decreases the maximum number of aggregators
 103     ** allowed for the given no. of processes.
 104     */
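    /* Worked illustration of the search below (added for clarity; the numbers
    ** are made up and are not MCA defaults): with fh->f_size = 256 processes
    ** the search increment is 16, so P_a is evaluated at 1, 16, 32, 48, ...
    ** If aggregators_cutoff_threshold is set to 3, dtime_threshold becomes
    ** 0.03 and the loop stops as soon as the relative gain of adding 16 more
    ** aggregators drops below 3% (or the absolute gain drops below
    ** time_threshold = 1 ms).  With max_aggregators_ratio = 8 the result is
    ** additionally capped at 256 / 8 = 32 aggregators.
    */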
 105     dtime_threshold = (double) OMPIO_MCA_GET(fh, aggregators_cutoff_threshold) / 100.0;
 106 
 107     /* Determine whether to use the formula for 1-D or 2-D data decomposition. Anything
 108     ** that is not 1-D is assumed to be 2-D in this version
 109     */ 
 110     mode = ( fh->f_cc_size == fh->f_view_size ) ? 1 : 2;
 111 
 112     /* Determine the increment size when searching the optimal
 113     ** no. of aggregators 
 114     */
 115     if ( fh->f_size < 16 ) {
 116         incr = 2;
 117     }
 118     else if (fh->f_size < 128 ) {
 119         incr = 4;
 120     }
 121     else if ( fh->f_size < 4096 ) {
 122         incr = 16;
 123     }
 124     else {
 125         incr = 32;
 126     }
 127 
 128     P_a = 1;
 129     time_prev = cost_calc ( fh->f_size, P_a, fh->f_view_size, (size_t) fh->f_bytes_per_agg, mode );
 130     P_a_prev = P_a;
 131     for ( P_a = incr; P_a <= fh->f_size; P_a += incr ) {
 132         time = cost_calc ( fh->f_size, P_a, fh->f_view_size, (size_t) fh->f_bytes_per_agg, mode );
 133         dtime_abs = (time_prev - time);
 134         dtime = dtime_abs / time_prev;
 135         dtime_diff = ( P_a == incr ) ? dtime : (dtime_prev - dtime);
 136 #ifdef OMPIO_DEBUG
 137         if ( 0 == fh->f_rank  ){
 138             printf(" d_p = %ld P_a = %d time = %lf dtime = %lf dtime_abs =%lf dtime_diff=%lf\n", 
 139                    fh->f_view_size, P_a, time, dtime, dtime_abs, dtime_diff );
 140         }
 141 #endif
 142         if ( dtime_diff < dtime_threshold ) {
 143             /* The relative improvement compared to the last number
 144             ** of aggregators was below a certain threshold. This is typically
 145             ** the dominating factor for large data volumes and larger process
 146             ** counts 
 147             */
 148 #ifdef OMPIO_DEBUG
 149             if ( 0 == fh->f_rank ) {
 150                 printf("dtime_diff below threshold\n");
 151             }
 152 #endif
 153             break;
 154         }
 155         if ( dtime_abs < time_threshold ) {
 156             /* The absolute improvement compared to the last number 
 157             ** of aggregators was below a given threshold. This is typically
 158             ** important for small data volumes and smaller process counts
 159             */
 160 #ifdef OMPIO_DEBUG
 161             if ( 0 == fh->f_rank ) {
 162                 printf("dtime_abs below threshold\n");
 163             }
 164 #endif
 165             break;
 166         }
 167         time_prev = time;
 168         dtime_prev = dtime;
 169         P_a_prev = P_a; 
 170     }
 171     num_groups = P_a_prev;
 172 #ifdef OMPIO_DEBUG
 173     printf(" For P=%d d_p=%ld b_c=%d threshold=%f chosen P_a = %d \n", 
 174            fh->f_size, fh->f_view_size, fh->f_bytes_per_agg, dtime_threshold, P_a_prev);
 175 #endif
 176     
 177     /* Cap the maximum number of aggregators.*/
 178     if ( num_groups > (fh->f_size/OMPIO_MCA_GET(fh, max_aggregators_ratio))) {
 179         num_groups = (fh->f_size/OMPIO_MCA_GET(fh, max_aggregators_ratio));
 180     }
 181     if ( 1 >= num_groups ) {
 182         num_groups = 1;
 183     }
 184     
 185     *num_groups_out = num_groups;
 186     return mca_common_ompio_forced_grouping ( fh, num_groups, contg_groups);
 187 }
 188 
 189 int  mca_common_ompio_forced_grouping ( ompio_file_t *fh,
 190                                         int num_groups,
 191                                         mca_common_ompio_contg *contg_groups)
 192 {
 193     int group_size = fh->f_size / num_groups;
 194     int rest = fh->f_size % num_groups;
 195     int flag = OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm);
 196     int k=0, p=0, g=0;
 197     int total_procs = 0; 
 198 
 199     for ( k=0, p=0; p<num_groups; p++ ) {
 200         if ( p < rest ) {
 201             contg_groups[p].procs_per_contg_group = group_size+1;
 202             total_procs +=(group_size+1);
 203         }
 204         else {
 205             contg_groups[p].procs_per_contg_group = group_size;
 206             total_procs +=group_size;
 207         }
 208 
 209         if ( flag ) {
 210             /* Map by node used for MPI_COMM_WORLD */
 211             for ( g=0; g<contg_groups[p].procs_per_contg_group; g++ ) {
 212                 k = g*num_groups+p;
 213                 contg_groups[p].procs_in_contg_group[g] = k;
 214             }
 215         }
 216         else {
 217             for ( g=0; g<contg_groups[p].procs_per_contg_group; g++ ) {
 218                 contg_groups[p].procs_in_contg_group[g] = k;
 219                 k++;
 220             }
 221         }
 222     }    
 223 
 224     return OMPI_SUCCESS;
 225 }
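/*
** Worked illustration of the mapping above (added; the values are made up):
** with fh->f_size = 8 and num_groups = 3, group_size = 2 and rest = 2, so the
** group sizes are 3, 3 and 2.  In the default (block) branch the groups are
** {0,1,2}, {3,4,5}, {6,7}; if MPI_COMM_WORLD was mapped by node, the strided
** branch (k = g*num_groups + p) is taken instead and the groups become
** {0,3,6}, {1,4,7}, {2,5}.
*/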
 226 
 227 int mca_common_ompio_fview_based_grouping(ompio_file_t *fh,
 228                                           int *num_groups,
 229                                           mca_common_ompio_contg *contg_groups)
 230 {
 231 
 232     int k = 0;
 233     int p = 0;
 234     int g = 0;
 235     int ret = OMPI_SUCCESS;
 236     OMPI_MPI_OFFSET_TYPE start_offset_len[3] = {0};
 237     OMPI_MPI_OFFSET_TYPE *end_offsets = NULL;
 238     OMPI_MPI_OFFSET_TYPE *start_offsets_lens = NULL;
 239 
 240     //Store start offset,length and corresponding rank in an array
 241     if(NULL == fh->f_decoded_iov){
 242       start_offset_len[0] = 0;
 243       start_offset_len[1] = 0;
 244     }
 245     else{
 246        start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
 247        start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
 248     }
 249     start_offset_len[2] = fh->f_rank;
 250 
 251     start_offsets_lens = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
 252     if (NULL == start_offsets_lens) {
 253         opal_output (1, "OUT OF MEMORY\n");
 254         ret = OMPI_ERR_OUT_OF_RESOURCE;
 255         goto exit;
 256     }
 257     end_offsets = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
 258     if (NULL == end_offsets) {
 259         opal_output (1, "OUT OF MEMORY\n");
 260         ret = OMPI_ERR_OUT_OF_RESOURCE;
 261         goto exit;
 262     }
 263     
 264     //Allgather start offsets across processes in a group on aggregator
 265     ret = fh->f_comm->c_coll->coll_allgather (start_offset_len,
 266                                              3,
 267                                              OMPI_OFFSET_DATATYPE,
 268                                              start_offsets_lens,
 269                                              3,
 270                                              OMPI_OFFSET_DATATYPE,
 271                                              fh->f_comm,
 272                                              fh->f_comm->c_coll->coll_allgather_module);
 273     if ( OMPI_SUCCESS != ret ) {
 274         goto exit;
 275     }
 276 
 277 
 278     //Calculate contg chunk size and contg subgroups
 279     for( k = 0 ; k < fh->f_size; k++){
 280         end_offsets[k] = start_offsets_lens[3*k] + start_offsets_lens[3*k+1];
 281         contg_groups[k].contg_chunk_size = 0;
 282     }
 283     k = 0;
 284     while( k < fh->f_size){
 285         if( k == 0){
 286             contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
 287             contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
 288             g++;
 289             contg_groups[p].procs_per_contg_group = g;
 290             k++;
 291         }
 292         else if( start_offsets_lens[3*k] == end_offsets[k - 1] ){
 293             contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
 294             contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
 295             g++;
 296             contg_groups[p].procs_per_contg_group = g;
 297             k++;
 298         }
 299         else{
 300             p++;
 301             g = 0;
 302             contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
 303             contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
 304             g++;
 305             contg_groups[p].procs_per_contg_group = g;
 306             k++;
 307         }
 308     }
 309     
 310     *num_groups = p+1;
 311     ret = OMPI_SUCCESS;
 312 
 313 exit:
 314     if (NULL != start_offsets_lens) {
 315         free (start_offsets_lens);
 316     }
 317     if (NULL != end_offsets) {
 318         free(end_offsets);
 319     }
 320  
 321     return ret;
 322 }
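/*
** Worked illustration of the grouping rule above (added; the offsets are made
** up): rank k joins the current group as long as its start offset equals the
** end offset of rank k-1.  With (start,len) pairs of (0,100), (100,100) and
** (300,100) for ranks 0, 1 and 2, ranks 0 and 1 form group 0 with
** contg_chunk_size = 200, while rank 2 (start 300 != end 200) opens group 1
** with contg_chunk_size = 100, so *num_groups is returned as 2.
*/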
 323 
 324 int mca_common_ompio_cart_based_grouping(ompio_file_t *ompio_fh, 
 325                                          int *num_groups,
 326                                          mca_common_ompio_contg *contg_groups)
 327 {
 328     int k = 0;
 329     int g=0;
 330     int ret = OMPI_SUCCESS, tmp_rank = 0;
 331     int *coords_tmp = NULL;
 332 
 333     mca_io_ompio_cart_topo_components cart_topo;
 334     memset (&cart_topo, 0, sizeof(mca_io_ompio_cart_topo_components)); 
 335 
 336     ret = ompio_fh->f_comm->c_topo->topo.cart.cartdim_get(ompio_fh->f_comm, &cart_topo.ndims);
 337     if (OMPI_SUCCESS != ret  ) {
 338         goto exit;
 339     }
 340 
 341     if (cart_topo.ndims < 2 ) {
 342         /* We shouldn't be here, this routine only works for more than 1 dimension */
 343         ret = MPI_ERR_INTERN;
 344         goto exit;
 345     }
 346 
 347     cart_topo.dims = (int*)malloc (cart_topo.ndims * sizeof(int));
 348     if (NULL == cart_topo.dims) {
 349         opal_output (1, "OUT OF MEMORY\n");
 350         ret = OMPI_ERR_OUT_OF_RESOURCE;
 351         goto exit;
 352     }
 353     cart_topo.periods = (int*)malloc (cart_topo.ndims * sizeof(int));
 354     if (NULL == cart_topo.periods) {
 355         opal_output (1, "OUT OF MEMORY\n");
 356         ret = OMPI_ERR_OUT_OF_RESOURCE;
 357         goto exit;
 358     }
 359     cart_topo.coords = (int*)malloc (cart_topo.ndims * sizeof(int));
 360     if (NULL == cart_topo.coords) {
 361         opal_output (1, "OUT OF MEMORY\n");
 362         ret = OMPI_ERR_OUT_OF_RESOURCE;
 363         goto exit;
 364     }
 365 
 366     coords_tmp  = (int*)malloc (cart_topo.ndims * sizeof(int));
 367     if (NULL == coords_tmp) {
 368         opal_output (1, "OUT OF MEMORY\n");
 369         ret = OMPI_ERR_OUT_OF_RESOURCE;
 370         goto exit;
 371     }
 372 
 373     ret = ompio_fh->f_comm->c_topo->topo.cart.cart_get(ompio_fh->f_comm,
 374                                                        cart_topo.ndims,
 375                                                        cart_topo.dims,
 376                                                        cart_topo.periods,
 377                                                        cart_topo.coords);
 378     if ( OMPI_SUCCESS != ret ) {
 379         opal_output (1, "mca_io_ompio_cart_based_grouping: Error in cart_get \n");
 380         goto exit;
 381     }
 382 
 383     *num_groups = cart_topo.dims[0];  //number of rows    
 384 
 385     for(k = 0; k < cart_topo.dims[0]; k++){
 386         int done = 0;
 387         int index = cart_topo.ndims-1;
 388 
 389         memset ( coords_tmp, 0, cart_topo.ndims * sizeof(int));
 390         contg_groups[k].procs_per_contg_group = (ompio_fh->f_size / cart_topo.dims[0]);
 391         coords_tmp[0] = k;
 392 
 393         ret = ompio_fh->f_comm->c_topo->topo.cart.cart_rank (ompio_fh->f_comm,coords_tmp,&tmp_rank);
 394         if ( OMPI_SUCCESS != ret ) {
 395             opal_output (1, "mca_io_ompio_cart_based_grouping: Error in cart_rank\n");
 396             goto exit;
 397         }
 398         contg_groups[k].procs_in_contg_group[0] = tmp_rank;
 399 
 400         for ( g=1; g< contg_groups[k].procs_per_contg_group; g++ ) {
 401             done = 0;
 402             index = cart_topo.ndims-1;
 403   
 404             while ( ! done ) { 
 405                 coords_tmp[index]++;
 406                 if ( coords_tmp[index] ==cart_topo.dims[index] ) {
 407                     coords_tmp[index]=0;
 408                     index--;
 409                 }
 410                 else {
 411                     done = 1;
 412                 }
 413                 if ( index == 0 ) {
 414                     done = 1;
 415                 }
 416             }
 417 
 418            ret = ompio_fh->f_comm->c_topo->topo.cart.cart_rank (ompio_fh->f_comm,coords_tmp,&tmp_rank);
 419            if ( OMPI_SUCCESS != ret ) {
 420              opal_output (1, "mca_io_ompio_cart_based_grouping: Error in cart_rank\n");
 421              goto exit;
 422            }
 423            contg_groups[k].procs_in_contg_group[g] = tmp_rank;
 424         }
 425     }
 426 
 427 
 428 exit:
 429     if (NULL != cart_topo.dims) {
 430        free (cart_topo.dims);
 431        cart_topo.dims = NULL;
 432     }
 433     if (NULL != cart_topo.periods) {
 434        free (cart_topo.periods);
 435        cart_topo.periods = NULL;
 436     }
 437     if (NULL != cart_topo.coords) {
 438        free (cart_topo.coords);
 439        cart_topo.coords = NULL;
 440     }
 441     if (NULL != coords_tmp) {
 442        free (coords_tmp);
 443        coords_tmp = NULL;
 444     }
 445 
 446     return ret;
 447 }
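/*
** Worked illustration of the row-wise grouping above (added; the topology is
** made up): for a 2-D cartesian communicator with dims = {4,3}, *num_groups
** is 4 and each group holds f_size / dims[0] = 3 processes.  Group k starts
** at coordinates (k,0) and the inner loop enumerates (k,1) and (k,2) by
** incrementing the trailing coordinate, so every group corresponds to one
** row of the cartesian grid.
*/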
 448 
 449 
 450 
 451 int mca_common_ompio_finalize_initial_grouping(ompio_file_t *fh,
 452                                                int num_groups,
 453                                                mca_common_ompio_contg *contg_groups)
 454 {
 455 
 456     int z = 0;
 457     int y = 0;
 458 
 459     fh->f_init_num_aggrs = num_groups;
 460     if (NULL != fh->f_init_aggr_list) {
 461         free(fh->f_init_aggr_list);
 462     }
 463     fh->f_init_aggr_list = (int*)malloc (fh->f_init_num_aggrs * sizeof(int));
 464     if (NULL == fh->f_init_aggr_list) {
 465         opal_output (1, "OUT OF MEMORY\n");
 466         return OMPI_ERR_OUT_OF_RESOURCE;
 467     }
 468 
 469     for( z = 0 ;z < num_groups; z++){
 470         for( y = 0; y < contg_groups[z].procs_per_contg_group; y++){
 471             if ( fh->f_rank == contg_groups[z].procs_in_contg_group[y] ) {
 472                 fh->f_init_procs_per_group = contg_groups[z].procs_per_contg_group;
 473                 if (NULL != fh->f_init_procs_in_group) {
 474                     free(fh->f_init_procs_in_group);
 475                 }
 476                 fh->f_init_procs_in_group = (int*)malloc (fh->f_init_procs_per_group * sizeof(int));
 477                 if (NULL == fh->f_init_procs_in_group) {
 478                     opal_output (1, "OUT OF MEMORY\n");
 479                     return OMPI_ERR_OUT_OF_RESOURCE;
 480                 }
 481                 memcpy ( fh->f_init_procs_in_group, contg_groups[z].procs_in_contg_group, 
 482                          contg_groups[z].procs_per_contg_group * sizeof (int));
 483                 
 484             }
 485         }
 486     }
 487 
 488     for( z = 0 ;z < num_groups; z++){
 489         fh->f_init_aggr_list[z] = contg_groups[z].procs_in_contg_group[0];
 490     }
 491 
 492 
 493    return OMPI_SUCCESS;
 494 }
 495 
 496 /*****************************************************************************************************/
 497 /*****************************************************************************************************/
 498 /*****************************************************************************************************/
 499 /* 
 500 ** This function is called by the collective I/O operations to determine the final number
 501 ** of aggregators.
 502 */
 503 
 504 int mca_common_ompio_set_aggregator_props (struct ompio_file_t *fh,
 505                                            int num_aggregators,
 506                                            size_t bytes_per_proc)
 507 {
 508     int j;
 509     int ret=OMPI_SUCCESS;
 510 
 511     fh->f_flags |= OMPIO_AGGREGATOR_IS_SET;
 512 
 513     if ( (-1 == num_aggregators) && 
 514          ((SIMPLE        != OMPIO_MCA_GET(fh, grouping_option) &&
 515            NO_REFINEMENT != OMPIO_MCA_GET(fh, grouping_option) &&
 516            SIMPLE_PLUS   != OMPIO_MCA_GET(fh, grouping_option) ))) {
 517         ret = mca_common_ompio_create_groups(fh,bytes_per_proc);
 518     }
 519     else {
 520         fh->f_procs_per_group  = fh->f_init_procs_per_group;
 521         fh->f_procs_in_group   = (int*)malloc (fh->f_procs_per_group * sizeof(int));
 522         if (NULL == fh->f_procs_in_group) {
 523             opal_output (1, "OUT OF MEMORY\n");
 524             return OMPI_ERR_OUT_OF_RESOURCE;
 525         }
 526         for (j=0 ; j<fh->f_procs_per_group ; j++) {
 527             fh->f_procs_in_group[j] = fh->f_init_procs_in_group[j];
 528         }
 529         
 530         fh->f_num_aggrs = fh->f_init_num_aggrs;
 531         fh->f_aggr_list = (int*) malloc ( fh->f_num_aggrs * sizeof(int));
 532         if (NULL == fh->f_aggr_list ) {
 533             opal_output (1, "OUT OF MEMORY\n");
 534             return OMPI_ERR_OUT_OF_RESOURCE;
 535         }
 536         for (j=0 ; j<fh->f_num_aggrs; j++) {
 537             fh->f_aggr_list[j] = fh->f_init_aggr_list[j];
 538         }            
 539     }
 540 
 541     return ret;
 542 }
 543 
 544 
 545 
 546 /*****************************************************************************************************/
 547 /*****************************************************************************************************/
 548 /*****************************************************************************************************/
 549 int mca_common_ompio_create_groups(ompio_file_t *fh,
 550                                    size_t bytes_per_proc)
 551 {
 552 
 553     int is_aggregator = 0;
 554     int final_aggr = 0;
 555     int final_num_aggrs = 0;
 556     int ret = OMPI_SUCCESS, ompio_grouping_flag = 0;
 557     int *tmp_final_aggrs=NULL;
 558     int *decision_list = NULL;
 559     int i,j;
 560 
 561     OMPI_MPI_OFFSET_TYPE *start_offsets_lens = NULL;
 562     OMPI_MPI_OFFSET_TYPE *end_offsets = NULL;
 563     OMPI_MPI_OFFSET_TYPE bytes_per_group = 0;
 564     OMPI_MPI_OFFSET_TYPE *aggr_bytes_per_group = NULL;
 565 
 566     ret = mca_common_ompio_prepare_to_group(fh,
 567                                             &start_offsets_lens,
 568                                             &end_offsets,
 569                                             &aggr_bytes_per_group,
 570                                             &bytes_per_group,
 571                                             &decision_list,
 572                                             bytes_per_proc,
 573                                             &is_aggregator,
 574                                             &ompio_grouping_flag);
 575     if ( OMPI_SUCCESS != ret ) {
 576         opal_output (1, "mca_common_ompio_create_groups: error in mca_common_ompio_prepare_to_group\n");
 577         goto exit;
 578     }
 579 
 580     switch(ompio_grouping_flag){
 581 
 582         case OMPIO_SPLIT:
 583             ret = mca_common_ompio_split_initial_groups(fh,
 584                                                         start_offsets_lens,
 585                                                         end_offsets,
 586                                                         bytes_per_group);
 587         break;
 588 
 589         case OMPIO_MERGE:
 590             ret = mca_common_ompio_merge_initial_groups(fh,
 591                                                         aggr_bytes_per_group,
 592                                                         decision_list,
 593                                                         is_aggregator);
 594             break;
 595             
 596         case  OMPIO_RETAIN:
 597 
 598             ret = mca_common_ompio_retain_initial_groups(fh);
 599 
 600         break;
 601 
 602 
 603     }
 604     if ( OMPI_SUCCESS != ret ) {
 605         opal_output (1, "mca_common_ompio_create_groups: error in subroutine called within switch statement\n");
 606         goto exit;
 607     }
 608     
 609     //Set aggregator index
 610 
 611     //Calculate final number of aggregators
 612     if(fh->f_rank == fh->f_procs_in_group[0]){
 613            final_aggr = 1;
 614     }
 615     ret = fh->f_comm->c_coll->coll_allreduce (&final_aggr,
 616                                              &final_num_aggrs,
 617                                              1,
 618                                              MPI_INT,
 619                                              MPI_SUM,
 620                                              fh->f_comm,
 621                                              fh->f_comm->c_coll->coll_allreduce_module);
 622     if ( OMPI_SUCCESS != ret ) {
 623         opal_output (1, "mca_common_ompio_create_groups: error in allreduce\n");
 624         goto exit;
 625     }
 626 
 627     tmp_final_aggrs =(int*) malloc ( fh->f_size *sizeof(int));
 628     if ( NULL == tmp_final_aggrs ) {
 629         opal_output(1,"mca_common_ompio_create_groups: could not allocate memory\n");
             ret = OMPI_ERR_OUT_OF_RESOURCE;
 630         goto exit;
 631     }
 632     ret = fh->f_comm->c_coll->coll_allgather (&final_aggr,
 633                                               1, 
 634                                               MPI_INT,
 635                                               tmp_final_aggrs,
 636                                               1,
 637                                               MPI_INT,
 638                                               fh->f_comm,
 639                                               fh->f_comm->c_coll->coll_allgather_module);
 640     if ( OMPI_SUCCESS != ret ) {
 641         opal_output (1, "mca_common_ompio_create_groups: error in allgather\n");
 642         goto exit;
 643     }
 644     
 645 
 646     //Set final number of aggregators in file handle
 647     fh->f_num_aggrs = final_num_aggrs;
 648     fh->f_aggr_list = (int*) malloc (fh->f_num_aggrs * sizeof(int));
 649     if ( NULL == fh->f_aggr_list ) {
 650         opal_output(1,"mca_common_ompio_create_groups: could not allocate memory\n");
             ret = OMPI_ERR_OUT_OF_RESOURCE;
 651         goto exit;
 652     }
 653     
 654     int found;
 655     for ( i=0, j=0; i<fh->f_num_aggrs; i++ ) {
 656         found = 0; 
 657         do {
 658             if ( 1 == tmp_final_aggrs[j] ) {
 659                 fh->f_aggr_list[i] = j;
 660                 found=1;
 661             }
 662             j++;
 663         } while ( !found && j < fh->f_size);       
 664     }
 665 
 666 exit:
 667 
 668     if (NULL != start_offsets_lens) {
 669         free (start_offsets_lens);
 670     }
 671     if (NULL != end_offsets) {
 672         free (end_offsets);
 673     }
 674     if(NULL != aggr_bytes_per_group){
 675         free(aggr_bytes_per_group);
 676     }
 677     if( NULL != decision_list){
 678         free(decision_list);
 679     }
 680     if ( NULL != tmp_final_aggrs){
 681         free(tmp_final_aggrs);
 682     }
 683 
 684    return ret;
 685 }
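/*
** Worked illustration of the aggregator-list construction above (added; the
** values are made up): with 6 processes where ranks 0 and 3 are the first
** members of their (possibly re-shaped) groups, the allgather yields
** tmp_final_aggrs = {1,0,0,1,0,0}, the allreduce yields final_num_aggrs = 2,
** and the scan loop sets fh->f_aggr_list = {0, 3}.
*/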
 686 
 687 int mca_common_ompio_merge_initial_groups(ompio_file_t *fh,
 688                                           OMPI_MPI_OFFSET_TYPE *aggr_bytes_per_group,
 689                                           int *decision_list,
 690                                           int is_aggregator){
 691 
 692     OMPI_MPI_OFFSET_TYPE sum_bytes = 0;
 693     MPI_Request *sendreqs = NULL;
 694 
 695     int start = 0;
 696     int end = 0;
 697     int i = 0;
 698     int j = 0;
 699     int r  = 0;
 700 
 701     int merge_pair_flag = 4;
 702     int first_merge_flag = 4;
 703     int *merge_aggrs = NULL;
 704     int is_new_aggregator= 0;
 705     int ret = OMPI_SUCCESS;
 706 
 707     if(is_aggregator){
 708         i = 0;
 709         sum_bytes = 0;
 710         //go through the decision list
 711         //Find the aggregators that could merge
 712 
 713         while(i < fh->f_init_num_aggrs){
 714             while(1){
 715                 if( i >= fh->f_init_num_aggrs){
 716                     break;
 717                 }
 718                 else if((decision_list[i] == OMPIO_MERGE) &&
 719                         (sum_bytes <= OMPIO_MCA_GET(fh, bytes_per_agg))){
 720                     sum_bytes = sum_bytes + aggr_bytes_per_group[i];
 721                     decision_list[i] = merge_pair_flag;
 722                     i++;
 723                 }
 724                 else if((decision_list[i] == OMPIO_MERGE) &&
 725                         (sum_bytes >= OMPIO_MCA_GET(fh, bytes_per_agg))){
 726                    if(decision_list[i+1] == OMPIO_MERGE){
 727                        merge_pair_flag++;
 728                        decision_list[i] = merge_pair_flag;
 729                        sum_bytes = aggr_bytes_per_group[i];
 730                        i++;
 731                    }
 732                    else{
 733                        decision_list[i] = merge_pair_flag;
 734                        i++;
 735                    }
 736                 }
 737                 else{
 738                     i++;
 739                     if(decision_list[i] == OMPIO_MERGE)
 740                        merge_pair_flag++;
 741                     sum_bytes = 0;
 742                     break;
 743                 }
 744             }
 745         }
 746 
 747         //Now go through the new edited decision list and
 748         //make lists of aggregators to merge and number
 749                //of groups to be merged.
 750         i = 0;
 751         j = 0;
 752 
 753         while(i < fh->f_init_num_aggrs){
 754            if(decision_list[i] >= first_merge_flag){
 755                start = i;
 756                while((decision_list[i] >= first_merge_flag) &&
 757                       (i < fh->f_init_num_aggrs-1)){
 758                    if(decision_list[i+1] == decision_list[i]){
 759                        i++;
 760                    }
 761                    else{
 762                        break;
 763                    }
 764                    end = i;
 765                }
 766                merge_aggrs = (int *)malloc((end - start + 1) * sizeof(int));
 767                if (NULL == merge_aggrs) {
 768                   opal_output (1, "OUT OF MEMORY\n");
 769                   return OMPI_ERR_OUT_OF_RESOURCE;
 770                }
 771                j = 0;
 772                for( j = 0 ; j < end - start + 1; j++){
 773                    merge_aggrs[j] = fh->f_init_aggr_list[start+j];
 774                }
 775                if(fh->f_rank == merge_aggrs[0])
 776                   is_new_aggregator = 1;
 777 
 778                for( j = 0 ; j < end-start+1 ;j++){
 779                   if(fh->f_rank == merge_aggrs[j]){
 780                       ret = mca_common_ompio_merge_groups(fh, merge_aggrs,
 781                                                           end-start+1);
 782                       if ( OMPI_SUCCESS != ret ) {
 783                           opal_output (1, "mca_common_ompio_merge_initial_groups: error in mca_common_ompio_merge_groups\n");
 784                           free ( merge_aggrs );                          
 785                           return ret;
 786                       }
 787                   }
 788                }
 789                if(NULL != merge_aggrs){
 790                    free(merge_aggrs);
 791                    merge_aggrs = NULL;
 792                }
 793 
 794            }
 795            i++;
 796         }
 797 
 798     }//end old aggregators
 799 
 800     //New aggregators communicate new grouping info to the groups
 801     if(is_new_aggregator){
 802        sendreqs = (MPI_Request *)malloc ( 2 *fh->f_procs_per_group * sizeof(MPI_Request));
 803        if (NULL == sendreqs) {
 804           return OMPI_ERR_OUT_OF_RESOURCE;
 805        }
 806        //Communicate grouping info
 807        for( j = 0 ; j < fh->f_procs_per_group; j++){
 808            if (fh->f_procs_in_group[j] == fh->f_rank ) {
 809                continue;
 810            }
 811            //new aggregator sends new procs_per_group to all its members
 812            ret = MCA_PML_CALL(isend(&fh->f_procs_per_group,
 813                                     1,
 814                                     MPI_INT,
 815                                     fh->f_procs_in_group[j],
 816                                     OMPIO_PROCS_PER_GROUP_TAG,
 817                                     MCA_PML_BASE_SEND_STANDARD,
 818                                     fh->f_comm,
 819                                     sendreqs + r++));
 820            if ( OMPI_SUCCESS != ret ) {
 821                opal_output (1, "mca_common_ompio_merge_initial_groups: error in Isend\n");
 822                goto exit;
 823            }
 824            //new aggregator sends distribution of processes to all its new members
 825            ret = MCA_PML_CALL(isend(fh->f_procs_in_group,
 826                                     fh->f_procs_per_group,
 827                                     MPI_INT,
 828                                     fh->f_procs_in_group[j],
 829                                     OMPIO_PROCS_IN_GROUP_TAG,
 830                                     MCA_PML_BASE_SEND_STANDARD,
 831                                     fh->f_comm,
 832                                     sendreqs + r++));
 833            if ( OMPI_SUCCESS != ret ) {
 834                opal_output (1, "mca_common_ompio_merge_initial_groups: error in Isend 2\n");
 835                goto exit;
 836            }
 837            
 838        }
 839     }
 840     else {
 841         //All non aggregators
 842         //All processes receive initial process distribution from aggregators
 843         ret = MCA_PML_CALL(recv(&fh->f_procs_per_group,
 844                                 1,
 845                                 MPI_INT,
 846                                 MPI_ANY_SOURCE,
 847                                 OMPIO_PROCS_PER_GROUP_TAG,
 848                                 fh->f_comm,
 849                                 MPI_STATUS_IGNORE));
 850         if ( OMPI_SUCCESS != ret ) {
 851             opal_output (1, "mca_common_ompio_merge_initial_groups: error in Recv\n");
 852             return ret;
 853         }
 854         
 855         fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
 856         if (NULL == fh->f_procs_in_group) {
 857             opal_output (1, "OUT OF MEMORY\n");
 858             return OMPI_ERR_OUT_OF_RESOURCE;
 859         }
 860 
 861         ret = MCA_PML_CALL(recv(fh->f_procs_in_group,
 862                                 fh->f_procs_per_group,
 863                                 MPI_INT,
 864                                 MPI_ANY_SOURCE,
 865                                 OMPIO_PROCS_IN_GROUP_TAG,
 866                                 fh->f_comm,
 867                                 MPI_STATUS_IGNORE));
 868         if ( OMPI_SUCCESS != ret ) {
 869             opal_output (1, "mca_common_ompio_merge_initial_groups: error in Recv 2\n");
 870             return ret;
 871         }
 872 
 873     }
 874     
 875     if(is_new_aggregator) {
 876         ret = ompi_request_wait_all (r, sendreqs, MPI_STATUSES_IGNORE);
 877     }
 878 
 879 exit:
 880     if (NULL != sendreqs) {
 881         free(sendreqs);
 882     }
 883 
 884     return ret;
 885 }
 886 
 887 int mca_common_ompio_split_initial_groups(ompio_file_t *fh,
 888                                           OMPI_MPI_OFFSET_TYPE *start_offsets_lens,
 889                                           OMPI_MPI_OFFSET_TYPE *end_offsets,
 890                                           OMPI_MPI_OFFSET_TYPE bytes_per_group){
 891 
 892 
 893     int size_new_group = 0;
 894     int size_old_group = 0;
 895     int size_last_group = 0;
 896     int size_smallest_group = 0;
 897     int num_groups = 0;
 898     int ret = OMPI_SUCCESS;
 899     OMPI_MPI_COUNT_TYPE bytes_per_agg_group = 0;
 900 
 901     OMPI_MPI_OFFSET_TYPE max_cci = 0;
 902     OMPI_MPI_OFFSET_TYPE min_cci = 0;
 903 
 904     bytes_per_agg_group = (OMPI_MPI_COUNT_TYPE)OMPIO_MCA_GET(fh, bytes_per_agg);
 905     // integer round up
 906     size_new_group = (int)(bytes_per_agg_group / bytes_per_group + (bytes_per_agg_group % bytes_per_group ? 1u : 0u));
 907     size_old_group = fh->f_init_procs_per_group;
 908 
 909     ret = mca_common_ompio_split_a_group(fh,
 910                                          start_offsets_lens,
 911                                          end_offsets,
 912                                          size_new_group,
 913                                          &max_cci,
 914                                          &min_cci,
 915                                          &num_groups,
 916                                          &size_smallest_group);
 917     if (OMPI_SUCCESS != ret ) {
 918         opal_output (1, "mca_common_ompio_split_initial_groups: error in mca_common_ompio_split_a_group\n");
 919         return ret;
 920     }
 921 
 922 
 923     switch(OMPIO_MCA_GET(fh, grouping_option)){
 924         case DATA_VOLUME:
 925             //Just use size as returned by split group
 926             size_last_group = size_smallest_group;
 927         break;
 928 
 929         case UNIFORM_DISTRIBUTION:
 930             if(size_smallest_group <= OMPIO_UNIFORM_DIST_THRESHOLD * size_new_group){
 931                 //uneven split need to call split again
 932                 if( size_old_group % num_groups == 0 ){
 933                    //most even distribution possible
 934                    size_new_group = size_old_group / num_groups;
 935                    size_last_group = size_new_group;
 936                 }
 937                 else{
 938                     //merge the last small group with the previous group
 939                     size_last_group = size_new_group + size_smallest_group;
 940                 }
 941             }
 942             else{
 943                  //Considered uniform
 944                  size_last_group = size_smallest_group;
 945             }
 946         break;
 947 
 948         case CONTIGUITY:
 949 
 950             while(1){
 951                  if((max_cci < OMPIO_CONTG_THRESHOLD) &&
 952                     (size_new_group < size_old_group)){
 953 
 954                     size_new_group = (size_new_group + size_old_group ) / 2;
 955                     ret = mca_common_ompio_split_a_group(fh,
 956                                                          start_offsets_lens,
 957                                                          end_offsets,
 958                                                          size_new_group,
 959                                                          &max_cci,
 960                                                          &min_cci,
 961                                                          &num_groups,
 962                                                          &size_smallest_group);
 963                     if (OMPI_SUCCESS != ret ) {
 964                         opal_output (1, "mca_common_ompio_split_initial_groups: error in mca_common_ompio_split_a_group 2\n");
 965                         return ret;
 966                     }
 967                  }
 968                  else{
 969                      break;
 970                  }
 971             }
 972             size_last_group = size_smallest_group;
 973         break;
 974 
 975         case OPTIMIZE_GROUPING:
 976             //This case is a combination of Data volume, contiguity and uniform distribution
 977             while(1){
 978                  if((max_cci < OMPIO_CONTG_THRESHOLD) &&
 979                     (size_new_group < size_old_group)){  //can be a better condition
 980                  //monitor the previous iteration
 981                  //break if it has not changed.
 982                      size_new_group = size_new_group + size_old_group;
 983                      // integer round up
 984                      size_new_group = size_new_group / 2 + (size_new_group % 2 ? 1 : 0);
 985                      ret = mca_common_ompio_split_a_group(fh,
 986                                                           start_offsets_lens,
 987                                                           end_offsets,
 988                                                           size_new_group,
 989                                                           &max_cci,
 990                                                           &min_cci,
 991                                                           &num_groups,
 992                                                           &size_smallest_group);
 993                     if (OMPI_SUCCESS != ret ) {
 994                         opal_output (1, "mca_common_ompio_split_initial_groups: error in mca_common_ompio_split_a_group 3\n");
 995                         return ret;
 996                     }
 997                  }
 998                  else{
 999                      break;
1000                  }
1001             }
1002 
1003            if(size_smallest_group <= OMPIO_UNIFORM_DIST_THRESHOLD * size_new_group){
1004                //uneven split need to call split again
1005                if( size_old_group % num_groups == 0 ){
1006                    //most even distribution possible
1007                    size_new_group = size_old_group / num_groups;
1008                    size_last_group = size_new_group;
1009                }
1010                else{
1011                     //merge the last small group with the previous group
1012                     size_last_group = size_new_group + size_smallest_group;
1013                }
1014            }
1015            else{
1016                //Considered uniform
1017                size_last_group = size_smallest_group;
1018            }
1019 
1020         break;
1021     }
1022 
1023     ret = mca_common_ompio_finalize_split(fh, size_new_group, size_last_group);
1024 
1025     return ret;
1026 }
1027 
1028 
1029 int mca_common_ompio_retain_initial_groups(ompio_file_t *fh){
1030 
1031     int i = 0;
1032 
1033     fh->f_procs_per_group = fh->f_init_procs_per_group;
1034     fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
1035     if (NULL == fh->f_procs_in_group) {
1036         opal_output (1, "OUT OF MEMORY\n");
1037         return OMPI_ERR_OUT_OF_RESOURCE;
1038     }
1039     for( i = 0 ; i < fh->f_procs_per_group; i++){
1040         fh->f_procs_in_group[i] = fh->f_init_procs_in_group[i];
1041     }
1042 
1043 
1044     return OMPI_SUCCESS;
1045 }
1046 
1047 int mca_common_ompio_merge_groups(ompio_file_t *fh,
1048                                   int *merge_aggrs,
1049                                   int num_merge_aggrs)
1050 {
1051     int i = 0;
1052     int *sizes_old_group;
1053     int ret;
1054     int *displs = NULL;
1055 
1056     sizes_old_group = (int*)malloc(num_merge_aggrs * sizeof(int));
1057     if (NULL == sizes_old_group) {
1058         opal_output (1, "OUT OF MEMORY\n");
1059         ret = OMPI_ERR_OUT_OF_RESOURCE;
1060         goto exit;
1061     }
1062 
1063 
1064     displs = (int*)malloc(num_merge_aggrs * sizeof(int));
1065     if (NULL == displs) {
1066         opal_output (1, "OUT OF MEMORY\n");
1067         ret = OMPI_ERR_OUT_OF_RESOURCE;
1068         goto exit;
1069     }
1070 
1071 
1072     //merge_aggrs[0] is considered the new aggregator
1073     //New aggregator collects group sizes of the groups to be merged
1074     ret = ompi_fcoll_base_coll_allgather_array (&fh->f_init_procs_per_group,
1075                                            1,
1076                                            MPI_INT,
1077                                            sizes_old_group,
1078                                            1,
1079                                            MPI_INT,
1080                                            0,
1081                                            merge_aggrs,
1082                                            num_merge_aggrs,
1083                                            fh->f_comm);
1084     
1085     if ( OMPI_SUCCESS != ret ) {
1086         goto exit;
1087     }
1088     fh->f_procs_per_group = 0;
1089 
1090 
1091     for( i = 0; i < num_merge_aggrs; i++){
1092         fh->f_procs_per_group = fh->f_procs_per_group + sizes_old_group[i];
1093     }
1094 
1095     displs[0] = 0;
1096     for(i = 1; i < num_merge_aggrs; i++){
1097           displs[i] = displs[i-1] + sizes_old_group[i-1];
1098     }
1099 
1100     fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
1101     if (NULL == fh->f_procs_in_group) {
1102         opal_output (1, "OUT OF MEMORY\n");
1103         ret = OMPI_ERR_OUT_OF_RESOURCE;
1104         goto exit;
1105     }
1106 
1107     //New aggregator also collects the grouping distribution
1108     //This is the actual merge
1109     //use allgatherv array
1110     ret = ompi_fcoll_base_coll_allgatherv_array (fh->f_init_procs_in_group,
1111                                             fh->f_init_procs_per_group,
1112                                             MPI_INT,
1113                                             fh->f_procs_in_group,
1114                                             sizes_old_group,
1115                                             displs,
1116                                             MPI_INT,
1117                                             0,
1118                                             merge_aggrs,
1119                                             num_merge_aggrs,
1120                                             fh->f_comm);
1121     
1122 exit:
1123     if (NULL != displs) {
1124         free (displs);
1125     }
1126     if (NULL != sizes_old_group) {
1127         free (sizes_old_group);
1128     }
1129 
1130     return ret;
1131 
1132 }
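/*
** Worked illustration of the merge above (added; the values are made up):
** merging the two initial groups led by aggregators {4, 8}, each of size 4,
** gives sizes_old_group = {4,4}, displs = {0,4} and f_procs_per_group = 8;
** the allgatherv then concatenates the two old member lists into
** f_procs_in_group, and merge_aggrs[0] = 4 acts as the new aggregator.
*/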
1133 
1134 
1135 
1136 int mca_common_ompio_split_a_group(ompio_file_t *fh,
1137                                    OMPI_MPI_OFFSET_TYPE *start_offsets_lens,
1138                                    OMPI_MPI_OFFSET_TYPE *end_offsets,
1139                                    int size_new_group,
1140                                    OMPI_MPI_OFFSET_TYPE *max_cci,
1141                                    OMPI_MPI_OFFSET_TYPE *min_cci,
1142                                    int *num_groups,
1143                                    int *size_smallest_group)
1144 {
1145 
1146     OMPI_MPI_OFFSET_TYPE *cci = NULL;
1147     *num_groups = fh->f_init_procs_per_group / size_new_group;
1148     *size_smallest_group = size_new_group;
1149     int i = 0;
1150     int k = 0;
1151     int flag = 0; //all groups same size
1152     int size = 0;
1153 
1154     if( fh->f_init_procs_per_group % size_new_group != 0 ){
1155         *num_groups = *num_groups + 1;
1156         *size_smallest_group = fh->f_init_procs_per_group % size_new_group;
1157         flag = 1;
1158     }
1159 
1160     cci = (OMPI_MPI_OFFSET_TYPE*)malloc(*num_groups * sizeof( OMPI_MPI_OFFSET_TYPE ));
1161     if (NULL == cci) {
1162         opal_output(1, "OUT OF MEMORY\n");
1163         return OMPI_ERR_OUT_OF_RESOURCE;
1164     }
1165 
1166     //check contiguity within new groups
1167     size = size_new_group;
1168     for( i = 0; i < *num_groups; i++){
1169          cci[i] = start_offsets_lens[3*size_new_group*i  + 1];
1170          //if it is the last group check if it is the smallest group
1171          if( (i == *num_groups-1) && flag == 1){
1172              size = *size_smallest_group;
1173          }
1174          for( k = 0; k < size-1; k++){
1175              if( end_offsets[size_new_group* i + k] == start_offsets_lens[3*size_new_group*i + 3*(k+1)] ){
1176                  cci[i] += start_offsets_lens[3*size_new_group*i + 3*(k + 1) + 1];
1177              }
1178          }
1179      }
1180 
1181      //get min and max cci
1182      *min_cci = cci[0];
1183      *max_cci = cci[0];
1184      for( i = 1 ; i < *num_groups; i++){
1185          if(cci[i] > *max_cci){
1186              *max_cci = cci[i];
1187          }
1188          else if(cci[i] < *min_cci){
1189              *min_cci = cci[i];
1190          }
1191      }
1192 
1193      free (cci);
1194      return OMPI_SUCCESS;
1195 }
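/*
** Worked illustration of the contiguity index (cci) above (added; the offsets
** are made up): for a new group of size 3 whose members have (start,len)
** pairs (0,100), (100,100) and (300,100), cci starts at 100 (length of the
** first member), the second member is contiguous (end 100 == start 100) and
** adds another 100, while the third is not (end 200 != start 300), so the
** group's cci is 200.
*/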
1196 
1197 int mca_common_ompio_finalize_split(ompio_file_t *fh,
1198                                     int size_new_group,
1199                                     int size_last_group)
1200 {
1201    //based on new group and last group finalize f_procs_per_group and f_procs_in_group
1202 
1203     int i = 0;
1204     int j = 0;
1205     int k = 0;
1206 
1207     for( i = 0; i < fh->f_init_procs_per_group ; i++){
1208 
1209         if( fh->f_rank == fh->f_init_procs_in_group[i]){
1210              if( i >= fh->f_init_procs_per_group - size_last_group ){
1211                  fh->f_procs_per_group = size_last_group;
1212              }
1213              else{
1214                  fh->f_procs_per_group = size_new_group;
1215              }
1216         }
1217     }
1218 
1219 
1220     fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
1221     if (NULL == fh->f_procs_in_group) {
1222         opal_output (1, "OUT OF MEMORY\n");
1223         return OMPI_ERR_OUT_OF_RESOURCE;
1224     }
1225 
1226     for( i = 0; i < fh->f_init_procs_per_group ; i++){
1227         if( fh->f_rank == fh->f_init_procs_in_group[i]){
1228             if( i >= fh->f_init_procs_per_group - size_last_group ){
1229                //distribution of last group
1230                for( j = 0; j < fh->f_procs_per_group; j++){
1231                    fh->f_procs_in_group[j] = fh->f_init_procs_in_group[fh->f_init_procs_per_group - size_last_group + j];
1232                }
1233             }
1234             else{
1235                  //distribute all other groups
1236                  for( j = 0 ; j < fh->f_init_procs_per_group; j = j + size_new_group){
1237                      if(i >= j && i < j+size_new_group  ){
1238                          for( k = 0; k < fh->f_procs_per_group ; k++){
1239                             fh->f_procs_in_group[k] = fh->f_init_procs_in_group[j+k];
1240                          }
1241                      }
1242                  }
1243             }
1244 
1245         }
1246     }
1247 
1248     return OMPI_SUCCESS;
1249 }
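/*
** Worked illustration of the split finalization above (added; the sizes are
** made up): for an initial group of 10 processes with size_new_group = 4 and
** size_last_group = 2, the members at indices 0-3 and 4-7 form two groups of
** 4, while the members at indices 8-9 (i >= 10 - 2) form the trailing group
** of 2; each process copies only the member list of its own group into
** f_procs_in_group.
*/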
1250 
1251 int mca_common_ompio_prepare_to_group(ompio_file_t *fh,
1252                                       OMPI_MPI_OFFSET_TYPE **start_offsets_lens,
1253                                       OMPI_MPI_OFFSET_TYPE **end_offsets, // need it?
1254                                       OMPI_MPI_OFFSET_TYPE **aggr_bytes_per_group,
1255                                       OMPI_MPI_OFFSET_TYPE *bytes_per_group,
1256                                       int **decision_list,
1257                                       size_t bytes_per_proc,
1258                                       int *is_aggregator,
1259                                       int *ompio_grouping_flag)
1260 {
1261 
1262     OMPI_MPI_OFFSET_TYPE start_offset_len[3] = {0};
1263     OMPI_MPI_OFFSET_TYPE *aggr_bytes_per_group_tmp = NULL;
1264     OMPI_MPI_OFFSET_TYPE *start_offsets_lens_tmp = NULL;
1265     OMPI_MPI_OFFSET_TYPE *end_offsets_tmp = NULL;
1266     int *decision_list_tmp = NULL;
1267 
1268     int i = 0;
1269     int j = 0;
1270     int k = 0;
1271     int merge_count = 0;
1272     int split_count = 0; //not req?
1273     int retain_as_is_count = 0; //not req?
1274     int ret=OMPI_SUCCESS;
1275 
1276     //Store start offset and length in an array //also add bytes per process
1277     if(NULL == fh->f_decoded_iov){
1278          start_offset_len[0] = 0;
1279          start_offset_len[1] = 0;
1280     }
1281     else{
1282          start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
1283          start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
1284     }
1285     start_offset_len[2] = bytes_per_proc;
1286     start_offsets_lens_tmp = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));
1287     if (NULL == start_offsets_lens_tmp) {
1288         opal_output (1, "OUT OF MEMORY\n");
1289         return OMPI_ERR_OUT_OF_RESOURCE;
1290     }
1291 
1292     //Gather start offsets across processes in a group on aggregator
1293     ret = ompi_fcoll_base_coll_allgather_array (start_offset_len,
1294                                            3,
1295                                            OMPI_OFFSET_DATATYPE,
1296                                            start_offsets_lens_tmp,
1297                                            3,
1298                                            OMPI_OFFSET_DATATYPE,
1299                                            0,
1300                                            fh->f_init_procs_in_group,
1301                                            fh->f_init_procs_per_group,
1302                                            fh->f_comm);
1303     if ( OMPI_SUCCESS != ret ) {
1304         opal_output (1, "mca_common_ompio_prepare_to_group: error in ompi_fcoll_base_coll_allgather_array\n");
1305         goto exit;
1306     }
1307     end_offsets_tmp = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));
1308     if (NULL == end_offsets_tmp) {
1309         opal_output (1, "OUT OF MEMORY\n");
             ret = OMPI_ERR_OUT_OF_RESOURCE;
1310         goto exit;
1311     }
1312     for( k = 0 ; k < fh->f_init_procs_per_group; k++){
1313         end_offsets_tmp[k] = start_offsets_lens_tmp[3*k] + start_offsets_lens_tmp[3*k+1];
1314     }
1315     //Every process has the total bytes written in its group
1316     for(j = 0; j < fh->f_init_procs_per_group; j++){
1317         *bytes_per_group = *bytes_per_group + start_offsets_lens_tmp[3*j+2];
1318     }
1319 
1320     *start_offsets_lens = &start_offsets_lens_tmp[0];
1321     *end_offsets = &end_offsets_tmp[0];
1322 
1323 
1324     for( j = 0 ; j < fh->f_init_num_aggrs ; j++){
1325         if(fh->f_rank == fh->f_init_aggr_list[j])
1326            *is_aggregator = 1;
1327     }
1328     //Decide groups going in for a merge or a split
1329     //Merge only if the groups are consecutive
1330     if(*is_aggregator == 1){
1331        aggr_bytes_per_group_tmp = (OMPI_MPI_OFFSET_TYPE*)malloc (fh->f_init_num_aggrs * sizeof(OMPI_MPI_OFFSET_TYPE));
1332        if (NULL == aggr_bytes_per_group_tmp) {
1333           opal_output (1, "OUT OF MEMORY\n");
1334           ret = OMPI_ERR_OUT_OF_RESOURCE;
1335           free(end_offsets_tmp);
1336           goto exit;
1337        }
1338     decision_list_tmp = (int* )malloc (fh->f_init_num_aggrs * sizeof(int));
1339     if (NULL == decision_list_tmp) {
1340         opal_output (1, "OUT OF MEMORY\n");
1341         ret = OMPI_ERR_OUT_OF_RESOURCE;
1342         free(end_offsets_tmp);
1343         if (NULL != aggr_bytes_per_group_tmp) {
1344             free(aggr_bytes_per_group_tmp);
1345         }
1346         goto exit;
1347     }
1348     //Communicate bytes per group between all aggregators
1349     ret = ompi_fcoll_base_coll_allgather_array (bytes_per_group,
1350                                            1,
1351                                            OMPI_OFFSET_DATATYPE,
1352                                            aggr_bytes_per_group_tmp,
1353                                            1,
1354                                            OMPI_OFFSET_DATATYPE,
1355                                            0,
1356                                            fh->f_init_aggr_list,
1357                                            fh->f_init_num_aggrs,
1358                                            fh->f_comm);
1359     if ( OMPI_SUCCESS != ret ) {
1360         opal_output (1, "mca_common_ompio_prepare_to_group: error in ompi_fcoll_base_coll_allgather_array 2\n");
1361         free(decision_list_tmp);
1362         goto exit;
1363     }
1364     
1365     for( i = 0; i < fh->f_init_num_aggrs; i++){
1366        if((size_t)(aggr_bytes_per_group_tmp[i])>
1367           (size_t)OMPIO_MCA_GET(fh, bytes_per_agg)){
1368           decision_list_tmp[i] = OMPIO_SPLIT;
1369           split_count++;
1370        }
1371        else if((size_t)(aggr_bytes_per_group_tmp[i])<
1372                (size_t)OMPIO_MCA_GET(fh, bytes_per_agg)){
1373             decision_list_tmp[i] = OMPIO_MERGE;
1374             merge_count++;
1375        }
1376        else{
1377            decision_list_tmp[i] = OMPIO_RETAIN;
1378            retain_as_is_count++;
1379            }
1380     }
1381 
1382     *aggr_bytes_per_group = &aggr_bytes_per_group_tmp[0];
 1383     //Go through the decision list to see if non-consecutive
 1384     //processes intend to merge; if yes, retain the original grouping
1385     for( i = 0; i < fh->f_init_num_aggrs ; i++){
1386         if(decision_list_tmp[i] == OMPIO_MERGE){
1387             if( (i == 0) &&
1388                 (decision_list_tmp[i+1] != OMPIO_MERGE)){ //first group
1389                     decision_list_tmp[i] = OMPIO_RETAIN;
1390             }
1391             else if( (i == fh->f_init_num_aggrs-1) &&
1392                      (decision_list_tmp[i-1] != OMPIO_MERGE)){
1393 
1394                 decision_list_tmp[i] = OMPIO_RETAIN;
1395             }
1396             else if(!((decision_list_tmp[i-1] == OMPIO_MERGE) ||
1397                       (decision_list_tmp[i+1] == OMPIO_MERGE))){
1398 
1399                  decision_list_tmp[i] = OMPIO_RETAIN;
1400             }
1401         }
1402     }
1403 
1404     //Set the flag as per the decision list
1405     for( i = 0 ; i < fh->f_init_num_aggrs; i++){
1406         if((decision_list_tmp[i] == OMPIO_MERGE)&&
1407            (fh->f_rank == fh->f_init_aggr_list[i]))
1408            *ompio_grouping_flag = OMPIO_MERGE;
1409 
1410         if((decision_list_tmp[i] == OMPIO_SPLIT)&&
1411            (fh->f_rank == fh->f_init_aggr_list[i]))
1412            *ompio_grouping_flag = OMPIO_SPLIT;
1413 
1414         if((decision_list_tmp[i] == OMPIO_RETAIN)&&
1415            (fh->f_rank == fh->f_init_aggr_list[i]))
1416            *ompio_grouping_flag = OMPIO_RETAIN;
1417     }
1418 
1419     //print decision list of aggregators
 1420     /*printf("RANK%d  : Printing decision list   : \n",fh->f_rank);
1421     for( i = 0; i < fh->f_init_num_aggrs; i++){
1422         if(decision_list_tmp[i] == OMPIO_MERGE)
1423             printf("MERGE,");
1424         else if(decision_list_tmp[i] == OMPIO_SPLIT)
1425             printf("SPLIT, ");
1426         else if(decision_list_tmp[i] == OMPIO_RETAIN)
1427             printf("RETAIN, " );
1428     }
1429     printf("\n\n");
1430     */
1431     *decision_list = &decision_list_tmp[0];
1432     }
1433     //Communicate flag to all group members
1434     ret = ompi_fcoll_base_coll_bcast_array (ompio_grouping_flag,
1435                                        1,
1436                                        MPI_INT,
1437                                        0,
1438                                        fh->f_init_procs_in_group,
1439                                        fh->f_init_procs_per_group,
1440                                        fh->f_comm);   
1441 
1442 exit:
1443     /* Do not free aggr_bytes_per_group_tmp, 
1444     ** start_offsets_lens_tmp, and end_offsets_tmp
1445     ** here. The memory is released in the layer above.
1446     */
1447 
1448 
1449     return ret;
1450 }
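/*
** Worked illustration of the decision list above (added; the byte counts and
** the bytes_per_agg value are made up): with four initial groups contributing
** {48MB, 8MB, 80MB, 8MB} and bytes_per_agg = 32MB, the first pass yields
** {SPLIT, MERGE, SPLIT, MERGE}.  The second pass turns the two isolated
** MERGE entries into RETAIN (their neighbours do not want to merge), so the
** aggregators end up with {SPLIT, RETAIN, SPLIT, RETAIN} and broadcast the
** corresponding flag to their group members.
*/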
1451 
1452 /*
1453 ** This is the actual formula of the cost function from the paper.
1454 ** One change made here is to use floating point values for
 1455 ** all parameters, since the ceil() function sometimes leads to
 1456 ** unexpected jumps in the execution time. Using float leads to
1457 ** more consistent predictions for the no. of aggregators.
1458 */
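/*
** Sketch of the model evaluated below (added; transcribed from the code, not
** from the paper itself): per process the predicted time is
**
**   t_send = n_s * ( L + 2*o + (n_as - 1)*g + (m_s - 1)*n_as*G )
**   t_recv = n_r * ( L + 2*o + (n_ar - 1)*g + (m_s - 1)*n_ar*G )
**   t_tot  = t_send + t_recv
**
** For the 1-D case with d_p > b_c this means each process sends
** n_s = d_p / b_c segments of m_s = b_c bytes to a single aggregator
** (n_as = 1), while the aggregator side receives n_r = file_domain / b_c
** segments, with file_domain = P * d_p / P_a.
*/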
1459 static double cost_calc (int P, int P_a, size_t d_p, size_t b_c, int dim )
1460 {
1461     float  n_as=1.0, m_s=1.0, n_s=1.0;
1462     float  n_ar=1.0;
1463     double t_send, t_recv, t_tot;
1464 
1465     /* LogGP parameters based on DDR InfiniBand values */
1466     double L=.00000184;
1467     double o=.00000149;
1468     double g=.0000119;
1469     double G=.00000000067;
1470     
1471     long file_domain = (P * d_p) / P_a;
1472     float n_r = (float)file_domain/(float) b_c;
1473     
1474     switch (dim) {
1475         case DIM1:
1476         {
1477             if( d_p > b_c ){
1478                 //printf("case 1\n");
1479                 n_ar = 1;
1480                 n_as = 1;
1481                 m_s = b_c;
1482                 n_s = (float)d_p/(float)b_c;
1483             }
1484             else {
1485                 n_ar = (float)b_c/(float)d_p;
1486                 n_as = 1;
1487                 m_s = d_p;
1488                 n_s = 1;
1489             }
1490             break;
1491         }         
1492         case DIM2:
1493         {
1494             int P_x, P_y, c;
1495             
1496             P_x = P_y = (int) sqrt(P);
1497             c = (float) P_a / (float)P_x;
1498             
1499             n_ar = (float) P_y;
1500             n_as = (float) c;
1501             if ( d_p > (P_a*b_c/P )) {
1502                 m_s = fmin(b_c / P_y, d_p);
1503             }
1504             else {
1505                 m_s = fmin(d_p * P_x / P_a, d_p);
1506             }
1507             break;        
1508         }
1509         default :
1510             printf("stop putting random values\n");
1511             break;
1512     } 
1513     
1514     n_s = (float) d_p / (float)(n_as * m_s);
1515     
1516     if( m_s < 33554432) {
1517         g = .00000108;
1518     }   
1519     t_send = n_s * (L + 2 * o + (n_as -1) * g + (m_s - 1) * n_as * G);
1520     t_recv = n_r * (L + 2 * o + (n_ar -1) * g + (m_s - 1) * n_ar * G);
1521     t_tot = t_send + t_recv;
1522     
1523     return t_tot;
1524 }
1525     
