root/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_write_all.c


DEFINITIONS

This source file includes the following definitions.
  1. mca_fcoll_dynamic_file_write_all
  2. local_heap_sort
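
For context, a minimal sketch of how this collective-write path is typically
exercised from application code follows. It assumes an Open MPI build in which
the OMPIO io component and the dynamic fcoll component are selected (for
example via "--mca io ompio --mca fcoll dynamic"); the file name and buffer
sizes are illustrative only.

    /* Illustrative sketch: a collective write that, with OMPIO and the dynamic
     * fcoll component selected, is serviced by mca_fcoll_dynamic_file_write_all(). */
    #include <mpi.h>
    #include <stdlib.h>

    int main(int argc, char **argv)
    {
        MPI_File fh;
        MPI_Status status;
        int rank, i, n = 1024;
        double *buf;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        buf = (double *) malloc(n * sizeof(double));
        for (i = 0; i < n; i++) buf[i] = rank + i;

        MPI_File_open(MPI_COMM_WORLD, "out.dat",
                      MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
        /* Each rank writes its own contiguous block of the shared file. */
        MPI_File_set_view(fh, (MPI_Offset)rank * n * sizeof(double),
                          MPI_DOUBLE, MPI_DOUBLE, "native", MPI_INFO_NULL);
        MPI_File_write_all(fh, buf, n, MPI_DOUBLE, &status);

        MPI_File_close(&fh);
        free(buf);
        MPI_Finalize();
        return 0;
    }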

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2005 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2008-2015 University of Houston. All rights reserved.
  13  * Copyright (c) 2015-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * Copyright (c) 2017      IBM Corporation. All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  */
  22 
  23 #include "ompi_config.h"
  24 #include "fcoll_dynamic.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/fcoll/fcoll.h"
  29 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
  30 #include "ompi/mca/common/ompio/common_ompio.h"
  31 #include "ompi/mca/io/io.h"
  32 #include "math.h"
  33 #include "ompi/mca/pml/pml.h"
  34 #include <unistd.h>
  35 
  36 
  37 #define DEBUG_ON 0
  38 
  39 /*Used for loading file-offsets per aggregator*/
  40 typedef struct mca_io_ompio_local_io_array{
  41     OMPI_MPI_OFFSET_TYPE offset;
  42     MPI_Aint             length;
  43     int                  process_id;
  44 }mca_io_ompio_local_io_array;
  45 
  46 
  47 
  48 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
  49                             int num_entries,
  50                             int *sorted);
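      /* Illustrative example (not in the original source): for io_array entries
         with offsets {30, 10, 20}, local_heap_sort() fills sorted = {1, 2, 0},
         i.e. sorted[k] holds the index of the k-th smallest file offset. */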
  51 
  52 
  53 int
  54 mca_fcoll_dynamic_file_write_all (ompio_file_t *fh,
  55                                   const void *buf,
  56                                   int count,
  57                                   struct ompi_datatype_t *datatype,
  58                                   ompi_status_public_t *status)
  59 {
  60     MPI_Aint total_bytes_written = 0;  /* total bytes that have been written*/
  61     MPI_Aint total_bytes = 0;          /* total bytes to be written */
  62     MPI_Aint bytes_to_write_in_cycle = 0; /* left to be written in a cycle*/
  63     MPI_Aint bytes_per_cycle = 0;      /* total written in each cycle by each process*/
  64     int index = 0;
  65     int cycles = 0;
  66     int i=0, j=0, l=0;
  67     int n=0; /* current position in total_bytes_per_process array */
  68     MPI_Aint bytes_remaining = 0; /* how many bytes have been written from the current
  69                                      value from total_bytes_per_process */
  70     int bytes_sent = 0, ret =0;
  71     int blocks=0, entries_per_aggregator=0;
  72 
  73     /* iovec structure and count of the buffer passed in */
  74     uint32_t iov_count = 0;
  75     struct iovec *decoded_iov = NULL;
  76     int iov_index = 0;
  77     char *send_buf = NULL;
  78     size_t current_position = 0;
  79     struct iovec *local_iov_array=NULL, *global_iov_array=NULL;
  80     mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
  81     /* global iovec at the writers that contain the iovecs created from
  82        file_set_view */
  83     uint32_t total_fview_count = 0;
  84     int local_count = 0, temp_pindex;
  85     int *fview_count = NULL, *disp_index=NULL, *temp_disp_index=NULL;
  86     int current_index = 0, temp_index=0;
  87 
  88     char *global_buf = NULL;
  89     MPI_Aint global_count = 0;
  90 
  91 
  92     /* array that contains the sorted indices of the global_iov */
  93     int *sorted = NULL, *sorted_file_offsets=NULL;
  94     int *displs = NULL;
  95     int dynamic_num_io_procs;
  96     size_t max_data = 0, datatype_size = 0;
  97     int **blocklen_per_process=NULL;
  98     MPI_Aint **displs_per_process=NULL, *memory_displacements=NULL;
  99     ompi_datatype_t **recvtype = NULL;
 100     MPI_Aint *total_bytes_per_process = NULL;
 101     MPI_Request send_req=NULL, *recv_req=NULL;
 102     int my_aggregator=-1;
 103     bool sendbuf_is_contiguous = false;
 104     size_t ftype_size;
 105     ptrdiff_t ftype_extent, lb;
 106 
 107 
 108 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 109     double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
 110     double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
 111     double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
 112     mca_common_ompio_print_entry nentry;
 113 #endif
 114 
 115     opal_datatype_type_size ( &datatype->super, &ftype_size );
 116     opal_datatype_get_extent ( &datatype->super, &lb, &ftype_extent );
 117 
 118     /**************************************************************************
  119      ** 1.  In case the data is not contiguous in memory, decode it into an iovec
 120      **************************************************************************/
 121     if ( ( ftype_extent == (ptrdiff_t) ftype_size)             &&
 122          opal_datatype_is_contiguous_memory_layout(&datatype->super,1) &&
 123          0 == lb ) {
 124         sendbuf_is_contiguous = true;
 125     }
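          /* Illustrative note (not in the original source): a buffer described by a
             plain MPI_DOUBLE satisfies all three conditions above (extent == size,
             contiguous layout, lb == 0), whereas e.g. a resized or vector datatype
             with gaps does not, and is decoded into an iovec below. */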
 126 
 127 
 128 
 129     if (! sendbuf_is_contiguous ) {
 130         ret =   mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh,
 131                                                   datatype,
 132                                                   count,
 133                                                   buf,
 134                                                   &max_data,
 135                                                   fh->f_mem_convertor, 
 136                                                   &decoded_iov,
 137                                                   &iov_count);
 138         if (OMPI_SUCCESS != ret ){
 139             goto exit;
 140         }
 141     }
 142     else {
 143         max_data = count * datatype->super.size;
 144     }
 145 
 146     if ( MPI_STATUS_IGNORE != status ) {
 147         status->_ucount = max_data;
 148     }
 149 
 150     dynamic_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
 151     if ( OMPI_ERR_MAX == dynamic_num_io_procs ) {
 152         ret = OMPI_ERROR;
 153         goto exit;
 154     }
 155     ret = mca_common_ompio_set_aggregator_props ((struct ompio_file_t *) fh,
 156                                                  dynamic_num_io_procs,
 157                                                  max_data);
 158 
 159     if (OMPI_SUCCESS != ret){
 160         goto exit;
 161     }
 162     my_aggregator = fh->f_procs_in_group[0];
 163     /**************************************************************************
 164      ** 2. Determine the total amount of data to be written
 165      **************************************************************************/
 166     total_bytes_per_process = (MPI_Aint*)malloc
 167         (fh->f_procs_per_group*sizeof(MPI_Aint));
 168     if (NULL == total_bytes_per_process) {
 169         opal_output (1, "OUT OF MEMORY\n");
 170         ret = OMPI_ERR_OUT_OF_RESOURCE;
 171         goto exit;
 172     }
 173 
 174 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 175     start_comm_time = MPI_Wtime();
 176 #endif
 177     ret = ompi_fcoll_base_coll_allgather_array (&max_data,
 178                                            1,
 179                                            MPI_LONG,
 180                                            total_bytes_per_process,
 181                                            1,
 182                                            MPI_LONG,
 183                                            0,
 184                                            fh->f_procs_in_group,
 185                                            fh->f_procs_per_group,
 186                                            fh->f_comm);
 187     
 188     if( OMPI_SUCCESS != ret){
 189         goto exit;
 190     }
 191 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 192     end_comm_time = MPI_Wtime();
 193     comm_time += (end_comm_time - start_comm_time);
 194 #endif
 195 
 196     for (i=0 ; i<fh->f_procs_per_group ; i++) {
 197         total_bytes += total_bytes_per_process[i];
 198     }
 199 
 200     if (NULL != total_bytes_per_process) {
 201         free (total_bytes_per_process);
 202         total_bytes_per_process = NULL;
 203     }
 204 
 205     /*********************************************************************
 206      *** 3. Generate the local offsets/lengths array corresponding to
 207      ***    this write operation
 208      ********************************************************************/
 209     ret = fh->f_generate_current_file_view( (struct ompio_file_t *) fh,
 210                                             max_data,
 211                                             &local_iov_array,
 212                                             &local_count);
 213     if (ret != OMPI_SUCCESS){
 214         goto exit;
 215     }
 216 
 217 #if DEBUG_ON
 218     for (i=0 ; i<local_count ; i++) {
 219 
  220         printf("%d: OFFSET: %p   LENGTH: %ld\n",
 221                fh->f_rank,
 222                local_iov_array[i].iov_base,
 223                local_iov_array[i].iov_len);
 224 
 225     }
 226 #endif
 227 
 228     /*************************************************************
 229      *** 4. Allgather the offset/lengths array from all processes
 230      *************************************************************/
 231     fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
 232     if (NULL == fview_count) {
 233         opal_output (1, "OUT OF MEMORY\n");
 234         ret = OMPI_ERR_OUT_OF_RESOURCE;
 235         goto exit;
 236     }
 237 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 238     start_comm_time = MPI_Wtime();
 239 #endif
 240     ret = ompi_fcoll_base_coll_allgather_array (&local_count,
 241                                            1,
 242                                            MPI_INT,
 243                                            fview_count,
 244                                            1,
 245                                            MPI_INT,
 246                                            0,
 247                                            fh->f_procs_in_group,
 248                                            fh->f_procs_per_group,
 249                                            fh->f_comm);
 250     
 251     if( OMPI_SUCCESS != ret){
 252         goto exit;
 253     }
 254 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 255     end_comm_time = MPI_Wtime();
 256     comm_time += (end_comm_time - start_comm_time);
 257 #endif
 258 
 259     displs = (int*) malloc (fh->f_procs_per_group * sizeof (int));
 260     if (NULL == displs) {
 261         opal_output (1, "OUT OF MEMORY\n");
 262         ret = OMPI_ERR_OUT_OF_RESOURCE;
 263         goto exit;
 264     }
 265 
 266     displs[0] = 0;
 267     total_fview_count = fview_count[0];
 268     for (i=1 ; i<fh->f_procs_per_group ; i++) {
 269         total_fview_count += fview_count[i];
 270         displs[i] = displs[i-1] + fview_count[i-1];
 271     }
 272 
 273 #if DEBUG_ON
 274     printf("total_fview_count : %d\n", total_fview_count);
 275     if (my_aggregator == fh->f_rank) {
 276         for (i=0 ; i<fh->f_procs_per_group ; i++) {
 277             printf ("%d: PROCESS: %d  ELEMENTS: %d  DISPLS: %d\n",
 278                     fh->f_rank,
 279                     i,
 280                     fview_count[i],
 281                     displs[i]);
 282         }
 283     }
 284 #endif
 285 
 286     /* allocate the global iovec  */
 287 
 288     if (0 != total_fview_count) {
 289         global_iov_array = (struct iovec*) malloc (total_fview_count *
 290                                                    sizeof(struct iovec));
 291         if (NULL == global_iov_array){
 292             opal_output(1, "OUT OF MEMORY\n");
 293             ret = OMPI_ERR_OUT_OF_RESOURCE;
 294             goto exit;
 295         }
 296 
 297     }
 298 
 299 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 300     start_comm_time = MPI_Wtime();
 301 #endif
 302     ret = ompi_fcoll_base_coll_allgatherv_array (local_iov_array,
 303                                             local_count,
 304                                             fh->f_iov_type,
 305                                             global_iov_array,
 306                                             fview_count,
 307                                             displs,
 308                                             fh->f_iov_type,
 309                                             0,
 310                                             fh->f_procs_in_group,
 311                                             fh->f_procs_per_group,
 312                                             fh->f_comm);
 313     if (OMPI_SUCCESS != ret){
 314         goto exit;
 315     }
 316 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 317     end_comm_time = MPI_Wtime();
 318     comm_time += (end_comm_time - start_comm_time);
 319 #endif
 320 
 321     /****************************************************************************************
 322     *** 5. Sort the global offset/lengths list based on the offsets.
  323     *** The result of the sort operation is 'sorted', an integer array
  324     *** which contains the indices of the global_iov_array ordered by offset.
  325     *** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset
  326     *** in the file, and that one is followed by global_iov_array[z].offset, then
  327     *** sorted[0] = x, sorted[1] = y and sorted[2] = z.
 328     ******************************************************************************************/
 329     if (0 != total_fview_count) {
 330         sorted = (int *)malloc (total_fview_count * sizeof(int));
 331         if (NULL == sorted) {
 332             opal_output (1, "OUT OF MEMORY\n");
 333             ret = OMPI_ERR_OUT_OF_RESOURCE;
 334             goto exit;
 335         }
 336         ompi_fcoll_base_sort_iovec (global_iov_array, total_fview_count, sorted);
 337     }
 338 
 339     if (NULL != local_iov_array){
 340         free(local_iov_array);
 341         local_iov_array = NULL;
 342     }
 343 
 344     if (NULL != displs){
 345         free(displs);
 346         displs=NULL;
 347     }
 348 
 349 
 350 #if DEBUG_ON
 351     if (my_aggregator == fh->f_rank) {
 352         uint32_t tv=0;
 353         for (tv=0 ; tv<total_fview_count ; tv++) {
  354             printf("%d: OFFSET: %p   LENGTH: %ld\n",
 355                    fh->f_rank,
 356                    global_iov_array[sorted[tv]].iov_base,
 357                    global_iov_array[sorted[tv]].iov_len);
 358         }
 359     }
 360 #endif
 361     /*************************************************************
 362      *** 6. Determine the number of cycles required to execute this
 363      ***    operation
 364      *************************************************************/
 365     bytes_per_cycle = fh->f_bytes_per_agg;
 366     cycles = ceil((double)total_bytes/bytes_per_cycle);
 367 
 368     if (my_aggregator == fh->f_rank) {
 369         disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
 370         if (NULL == disp_index) {
 371             opal_output (1, "OUT OF MEMORY\n");
 372             ret = OMPI_ERR_OUT_OF_RESOURCE;
 373             goto exit;
 374         }
 375 
 376         blocklen_per_process = (int **)calloc (fh->f_procs_per_group, sizeof (int*));
 377         if (NULL == blocklen_per_process) {
 378             opal_output (1, "OUT OF MEMORY\n");
 379             ret = OMPI_ERR_OUT_OF_RESOURCE;
 380             goto exit;
 381         }
 382 
 383         displs_per_process = (MPI_Aint **)calloc (fh->f_procs_per_group, sizeof (MPI_Aint*));
 384         if (NULL == displs_per_process) {
 385             opal_output (1, "OUT OF MEMORY\n");
 386             ret = OMPI_ERR_OUT_OF_RESOURCE;
 387             goto exit;
 388         }
 389 
 390         recv_req = (MPI_Request *)malloc ((fh->f_procs_per_group)*sizeof(MPI_Request));
 391         if ( NULL == recv_req ) {
 392             opal_output (1, "OUT OF MEMORY\n");
 393             ret = OMPI_ERR_OUT_OF_RESOURCE;
 394             goto exit;
 395         }
 396 
 397         global_buf  = (char *) malloc (bytes_per_cycle);
 398         if (NULL == global_buf){
 399             opal_output(1, "OUT OF MEMORY");
 400             ret = OMPI_ERR_OUT_OF_RESOURCE;
 401             goto exit;
 402         }
 403 
 404         recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group  * sizeof(ompi_datatype_t *));
 405         if (NULL == recvtype) {
 406             opal_output (1, "OUT OF MEMORY\n");
 407             ret = OMPI_ERR_OUT_OF_RESOURCE;
 408             goto exit;
 409         }
 410         for(l=0;l<fh->f_procs_per_group;l++){
 411             recvtype[l] = MPI_DATATYPE_NULL;
 412         }
 413     }
 414 
 415 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 416     start_exch = MPI_Wtime();
 417 #endif
 418     n = 0;
 419     bytes_remaining = 0;
 420     current_index = 0;
 421 
 422     for (index = 0; index < cycles; index++) {
 423         /**********************************************************************
 424          ***  7a. Getting ready for next cycle: initializing and freeing buffers
 425          **********************************************************************/
 426         if (my_aggregator == fh->f_rank) {
 427             if (NULL != fh->f_io_array) {
 428                 free (fh->f_io_array);
 429                 fh->f_io_array = NULL;
 430             }
 431             fh->f_num_of_io_entries = 0;
 432 
 433             if (NULL != recvtype){
 434                 for (i =0; i< fh->f_procs_per_group; i++) {
 435                     if ( MPI_DATATYPE_NULL != recvtype[i] ) {
 436                         ompi_datatype_destroy(&recvtype[i]);
 437                         recvtype[i] = MPI_DATATYPE_NULL;
 438                     }
 439                 }
 440             }
 441 
 442             for(l=0;l<fh->f_procs_per_group;l++){
 443                 disp_index[l] =  1;
 444 
 445                 free(blocklen_per_process[l]);
 446                 free(displs_per_process[l]);
 447 
 448                 blocklen_per_process[l] = (int *) calloc (1, sizeof(int));
 449                 displs_per_process[l] = (MPI_Aint *) calloc (1, sizeof(MPI_Aint));
 450                 if (NULL == displs_per_process[l] || NULL == blocklen_per_process[l]){
 451                     opal_output (1, "OUT OF MEMORY for displs\n");
 452                     ret = OMPI_ERR_OUT_OF_RESOURCE;
 453                     goto exit;
 454                 }
 455             }
 456 
 457             if (NULL != sorted_file_offsets){
 458                 free(sorted_file_offsets);
 459                 sorted_file_offsets = NULL;
 460             }
 461 
 462             if(NULL != file_offsets_for_agg){
 463                 free(file_offsets_for_agg);
 464                 file_offsets_for_agg = NULL;
 465             }
 466 
 467             if (NULL != memory_displacements){
 468                 free(memory_displacements);
 469                 memory_displacements = NULL;
 470             }
 471 
 472         } /* (my_aggregator == fh->f_rank */
 473 
 474         /**************************************************************************
 475          ***  7b. Determine the number of bytes to be actually written in this cycle
 476          **************************************************************************/
 477         if (cycles-1 == index) {
 478             bytes_to_write_in_cycle = total_bytes - bytes_per_cycle*index;
 479         }
 480         else {
 481             bytes_to_write_in_cycle = bytes_per_cycle;
 482         }
 483 
 484 #if DEBUG_ON
 485         if (my_aggregator == fh->f_rank) {
 486             printf ("****%d: CYCLE %d   Bytes %lld**********\n",
 487                     fh->f_rank,
 488                     index,
 489                     bytes_to_write_in_cycle);
 490         }
 491 #endif
 492         /**********************************************************
 493          **Gather the Data from all the processes at the writers **
 494          *********************************************************/
 495 
 496 #if DEBUG_ON
 497         printf("bytes_to_write_in_cycle: %ld, cycle : %d\n", bytes_to_write_in_cycle,
 498                index);
 499 #endif
 500 
 501         /*****************************************************************
 502          *** 7c. Calculate how much data will be contributed in this cycle
 503          ***     by each process
 504          *****************************************************************/
 505         bytes_sent = 0;
 506 
  507         /* The blocklen and displs calculations are only done at the aggregators! */
 508         while (bytes_to_write_in_cycle) {
 509 
 510             /* This next block identifies which process is the holder
 511             ** of the sorted[current_index] element;
 512             */
 513             blocks = fview_count[0];
 514             for (j=0 ; j<fh->f_procs_per_group ; j++) {
 515                 if (sorted[current_index] < blocks) {
 516                     n = j;
 517                     break;
 518                 }
 519                 else {
 520                     blocks += fview_count[j+1];
 521                 }
 522             }
 523 
 524             if (bytes_remaining) {
 525                 /* Finish up a partially used buffer from the previous  cycle */
 526 
 527                 if (bytes_remaining <= bytes_to_write_in_cycle) {
 528                     /* The data fits completely into the block */
 529                     if (my_aggregator == fh->f_rank) {
 530                         blocklen_per_process[n][disp_index[n] - 1] = bytes_remaining;
 531                         displs_per_process[n][disp_index[n] - 1] =
 532                             (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base +
 533                             (global_iov_array[sorted[current_index]].iov_len
 534                              - bytes_remaining);
 535 
  536                         /* In this case the length is consumed, so allocate the
  537                            next displacement and blocklength slots */
 538                         blocklen_per_process[n] = (int *) realloc
 539                             ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
 540                         displs_per_process[n] = (MPI_Aint *) realloc
 541                             ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint));
 542                         blocklen_per_process[n][disp_index[n]] = 0;
 543                         displs_per_process[n][disp_index[n]] = 0;
 544                         disp_index[n] += 1;
 545                     }
 546                     if (fh->f_procs_in_group[n] == fh->f_rank) {
 547                         bytes_sent += bytes_remaining;
 548                     }
 549                     current_index ++;
 550                     bytes_to_write_in_cycle -= bytes_remaining;
 551                     bytes_remaining = 0;
 552                     continue;
 553                 }
 554                 else {
 555                     /* the remaining data from the previous cycle is larger than the
 556                        bytes_to_write_in_cycle, so we have to segment again */
 557                     if (my_aggregator == fh->f_rank) {
 558                         blocklen_per_process[n][disp_index[n] - 1] = bytes_to_write_in_cycle;
 559                         displs_per_process[n][disp_index[n] - 1] =
 560                             (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base +
 561                             (global_iov_array[sorted[current_index]].iov_len
 562                              - bytes_remaining);
 563                     }
 564 
 565                     if (fh->f_procs_in_group[n] == fh->f_rank) {
 566                         bytes_sent += bytes_to_write_in_cycle;
 567                     }
 568                     bytes_remaining -= bytes_to_write_in_cycle;
 569                     bytes_to_write_in_cycle = 0;
 570                     break;
 571                 }
 572             }
 573             else {
 574                  /* No partially used entry available, have to start a new one */
 575                 if (bytes_to_write_in_cycle <
 576                     (MPI_Aint) global_iov_array[sorted[current_index]].iov_len) {
  577                      /* This entry has more data than we can send in one cycle */
 578                     if (my_aggregator == fh->f_rank) {
 579                         blocklen_per_process[n][disp_index[n] - 1] = bytes_to_write_in_cycle;
 580                         displs_per_process[n][disp_index[n] - 1] =
 581                             (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base ;
 582                     }
 583                     if (fh->f_procs_in_group[n] == fh->f_rank) {
 584                         bytes_sent += bytes_to_write_in_cycle;
 585 
 586                     }
 587                     bytes_remaining = global_iov_array[sorted[current_index]].iov_len -
 588                         bytes_to_write_in_cycle;
 589                     bytes_to_write_in_cycle = 0;
 590                     break;
 591                 }
 592                 else {
 593                     /* Next data entry is less than bytes_to_write_in_cycle */
 594                     if (my_aggregator == fh->f_rank) {
 595                         blocklen_per_process[n][disp_index[n] - 1] =
 596                             global_iov_array[sorted[current_index]].iov_len;
 597                         displs_per_process[n][disp_index[n] - 1] = (ptrdiff_t)
 598                             global_iov_array[sorted[current_index]].iov_base;
 599 
  600                         /* The total length of this entry has been consumed,
  601                            so realloc and zero out the next blocklength and
  602                            displacement slots */
 603                         blocklen_per_process[n] =
 604                             (int *) realloc ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
 605                         displs_per_process[n] = (MPI_Aint *)realloc
 606                             ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint));
 607                         blocklen_per_process[n][disp_index[n]] = 0;
 608                         displs_per_process[n][disp_index[n]] = 0;
 609                         disp_index[n] += 1;
 610                     }
 611                     if (fh->f_procs_in_group[n] == fh->f_rank) {
 612                         bytes_sent += global_iov_array[sorted[current_index]].iov_len;
 613                     }
 614                     bytes_to_write_in_cycle -=
 615                         global_iov_array[sorted[current_index]].iov_len;
 616                     current_index ++;
 617                     continue;
 618                 }
 619             }
 620         }
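        /* Illustrative walk-through (not in the original source): if
           bytes_to_write_in_cycle is 100 and the next sorted entry has
           iov_len = 150, the aggregator records a 100-byte block, bytes_remaining
           becomes 50, and those 50 bytes are handled by the bytes_remaining
           branch at the top of the next cycle. */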
 621 
 622 
 623         /*************************************************************************
  624          *** 7d. Calculate the displacements indicating where to put the data and
  625          ***     allocate the receive buffer (global_buf)
 626          *************************************************************************/
 627         if (my_aggregator == fh->f_rank) {
 628             entries_per_aggregator=0;
 629             for (i=0;i<fh->f_procs_per_group; i++){
 630                 for (j=0;j<disp_index[i];j++){
 631                     if (blocklen_per_process[i][j] > 0)
 632                         entries_per_aggregator++ ;
 633                 }
 634             }
 635 
 636 #if DEBUG_ON
 637             printf("%d: cycle: %d, bytes_sent: %d\n ",fh->f_rank,index,
 638                    bytes_sent);
 639             printf("%d : Entries per aggregator : %d\n",fh->f_rank,entries_per_aggregator);
 640 #endif
 641 
 642             if (entries_per_aggregator > 0){
 643                 file_offsets_for_agg = (mca_io_ompio_local_io_array *)
 644                     malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array));
 645                 if (NULL == file_offsets_for_agg) {
 646                     opal_output (1, "OUT OF MEMORY\n");
 647                     ret = OMPI_ERR_OUT_OF_RESOURCE;
 648                     goto exit;
 649                 }
 650 
 651                 sorted_file_offsets = (int *)
 652                     malloc (entries_per_aggregator*sizeof(int));
 653                 if (NULL == sorted_file_offsets){
 654                     opal_output (1, "OUT OF MEMORY\n");
 655                     ret =  OMPI_ERR_OUT_OF_RESOURCE;
 656                     goto exit;
 657                 }
 658 
 659                 /*Moving file offsets to an IO array!*/
 660                 temp_index = 0;
 661 
 662                 for (i=0;i<fh->f_procs_per_group; i++){
 663                     for(j=0;j<disp_index[i];j++){
 664                         if (blocklen_per_process[i][j] > 0){
 665                             file_offsets_for_agg[temp_index].length =
 666                                 blocklen_per_process[i][j];
 667                             file_offsets_for_agg[temp_index].process_id = i;
 668                             file_offsets_for_agg[temp_index].offset =
 669                                 displs_per_process[i][j];
 670                             temp_index++;
 671 
 672 #if DEBUG_ON
 673                             printf("************Cycle: %d,  Aggregator: %d ***************\n",
 674                                    index+1,fh->f_rank);
 675 
 676                             printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
 677                                    fh->f_procs_in_group[i],j,
 678                                    blocklen_per_process[i][j],j,
 679                                    displs_per_process[i][j],
 680                                    fh->f_rank);
 681 #endif
 682                         }
 683                     }
 684                 }
 685             }
 686             else{
 687                 continue;
 688             }
 689             /* Sort the displacements for each aggregator*/
 690             local_heap_sort (file_offsets_for_agg,
 691                              entries_per_aggregator,
 692                              sorted_file_offsets);
 693 
  694             /* Create contiguous memory displacements based on the
  695                block lengths, and map them to this aggregator's actual
  696                file displacements (stored in the io array created
  697                above) */
  698             memory_displacements = (MPI_Aint *) malloc
  699                 (entries_per_aggregator * sizeof(MPI_Aint));
                  if (NULL == memory_displacements) {
                      opal_output (1, "OUT OF MEMORY\n");
                      ret = OMPI_ERR_OUT_OF_RESOURCE;
                      goto exit;
                  }
 700 
 701             memory_displacements[sorted_file_offsets[0]] = 0;
 702             for (i=1; i<entries_per_aggregator; i++){
 703                 memory_displacements[sorted_file_offsets[i]] =
 704                     memory_displacements[sorted_file_offsets[i-1]] +
 705                     file_offsets_for_agg[sorted_file_offsets[i-1]].length;
 706             }
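              /* Illustrative example (not in the original source): if the entries in
                 sorted file-offset order have lengths 100, 50 and 200 bytes, their
                 memory displacements into global_buf become 0, 100 and 150, i.e. the
                 aggregated data is packed back-to-back in file-offset order. */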
 707 
 708             temp_disp_index = (int *)calloc (1, fh->f_procs_per_group * sizeof (int));
 709             if (NULL == temp_disp_index) {
 710                 opal_output (1, "OUT OF MEMORY\n");
 711                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 712                 goto exit;
 713             }
 714 
 715             /*Now update the displacements array  with memory offsets*/
 716             global_count = 0;
 717             for (i=0;i<entries_per_aggregator;i++){
 718                 temp_pindex =
 719                     file_offsets_for_agg[sorted_file_offsets[i]].process_id;
 720                 displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
 721                     memory_displacements[sorted_file_offsets[i]];
 722                 if (temp_disp_index[temp_pindex] < disp_index[temp_pindex])
 723                     temp_disp_index[temp_pindex] += 1;
 724                 else{
  725                     printf("temp_disp_index[%d]: %d is not smaller than disp_index[%d]: %d\n",
 726                            temp_pindex, temp_disp_index[temp_pindex],
 727                            temp_pindex, disp_index[temp_pindex]);
 728                 }
 729                 global_count +=
 730                     file_offsets_for_agg[sorted_file_offsets[i]].length;
 731             }
 732 
 733             if (NULL != temp_disp_index){
 734                 free(temp_disp_index);
 735                 temp_disp_index = NULL;
 736             }
 737 
 738 #if DEBUG_ON
 739 
 740             printf("************Cycle: %d,  Aggregator: %d ***************\n",
 741                    index+1,fh->f_rank);
 742             for (i=0;i<fh->f_procs_per_group; i++){
 743                 for(j=0;j<disp_index[i];j++){
 744                     if (blocklen_per_process[i][j] > 0){
 745                         printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
 746                                fh->f_procs_in_group[i],j,
 747                                blocklen_per_process[i][j],j,
 748                                displs_per_process[i][j],
 749                                fh->f_rank);
 750 
 751                     }
 752                 }
 753             }
 754             printf("************Cycle: %d,  Aggregator: %d ***************\n",
 755                    index+1,fh->f_rank);
 756             for (i=0; i<entries_per_aggregator;i++){
 757                 printf("%d: OFFSET: %lld   LENGTH: %ld, Mem-offset: %ld\n",
 758                        file_offsets_for_agg[sorted_file_offsets[i]].process_id,
 759                        file_offsets_for_agg[sorted_file_offsets[i]].offset,
 760                        file_offsets_for_agg[sorted_file_offsets[i]].length,
 761                        memory_displacements[sorted_file_offsets[i]]);
 762             }
 763             printf("%d : global_count : %ld, bytes_sent : %d\n",
 764                    fh->f_rank,global_count, bytes_sent);
 765 #endif
 766 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 767             start_comm_time = MPI_Wtime();
 768 #endif
 769         /*************************************************************************
 770          *** 7e. Perform the actual communication
 771          *************************************************************************/
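              /* Note: each recvtype[i] is an hindexed datatype whose displacements
                 are the memory offsets computed in step 7d, so the irecv below
                 places process i's contribution directly at its final position(s)
                 inside global_buf. */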
 772             for (i=0;i<fh->f_procs_per_group; i++) {
 773                 recv_req[i] = MPI_REQUEST_NULL;
 774                 if ( 0 < disp_index[i] ) {
 775                     ompi_datatype_create_hindexed(disp_index[i],
 776                                                   blocklen_per_process[i],
 777                                                   displs_per_process[i],
 778                                                   MPI_BYTE,
 779                                                   &recvtype[i]);
 780                     ompi_datatype_commit(&recvtype[i]);
 781                     opal_datatype_type_size(&recvtype[i]->super, &datatype_size);
 782 
 783                     if (datatype_size){
 784                         ret = MCA_PML_CALL(irecv(global_buf,
 785                                                  1,
 786                                                  recvtype[i],
 787                                                  fh->f_procs_in_group[i],
 788                                                  123,
 789                                                  fh->f_comm,
 790                                                  &recv_req[i]));
 791                         if (OMPI_SUCCESS != ret){
 792                             goto exit;
 793                         }
 794                     }
 795                 }
 796             }
 797         } /* end if (my_aggregator == fh->f_rank ) */
 798 
 799 
 800         if ( sendbuf_is_contiguous ) {
 801             send_buf = &((char*)buf)[total_bytes_written];
 802         }
 803         else if (bytes_sent) {
 804             /* allocate a send buffer and copy the data that needs
  805                to be sent into it in case the data is non-contiguous
 806                in memory */
 807             ptrdiff_t mem_address;
 808             size_t remaining = 0;
 809             size_t temp_position = 0;
 810 
 811             send_buf = malloc (bytes_sent);
 812             if (NULL == send_buf) {
 813                 opal_output (1, "OUT OF MEMORY\n");
 814                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 815                 goto exit;
 816             }
 817 
 818             remaining = bytes_sent;
 819 
 820             while (remaining) {
 821                 mem_address = (ptrdiff_t)
 822                     (decoded_iov[iov_index].iov_base) + current_position;
 823 
 824                 if (remaining >=
 825                     (decoded_iov[iov_index].iov_len - current_position)) {
 826                     memcpy (send_buf+temp_position,
 827                             (IOVBASE_TYPE *)mem_address,
 828                             decoded_iov[iov_index].iov_len - current_position);
 829                     remaining = remaining -
 830                         (decoded_iov[iov_index].iov_len - current_position);
 831                     temp_position = temp_position +
 832                         (decoded_iov[iov_index].iov_len - current_position);
 833                     iov_index = iov_index + 1;
 834                     current_position = 0;
 835                 }
 836                 else {
 837                     memcpy (send_buf+temp_position,
 838                             (IOVBASE_TYPE *) mem_address,
 839                             remaining);
 840                     current_position = current_position + remaining;
 841                     remaining = 0;
 842                 }
 843             }
 844         }
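          /* Note: the pack loop above copies exactly bytes_sent bytes out of the
             (possibly non-contiguous) user buffer; iov_index and current_position
             persist across cycles, so packing resumes where the previous cycle
             stopped. */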
 845         total_bytes_written += bytes_sent;
 846 
  847         /* Gather the sendbuf from each process into the appropriate locations
  848            at the aggregators */
 849 
 850         if (bytes_sent){
 851             ret = MCA_PML_CALL(isend(send_buf,
 852                                      bytes_sent,
 853                                      MPI_BYTE,
 854                                      my_aggregator,
 855                                      123,
 856                                      MCA_PML_BASE_SEND_STANDARD,
 857                                      fh->f_comm,
 858                                      &send_req));
 859 
 860 
 861             if ( OMPI_SUCCESS != ret ){
 862                 goto exit;
 863             }
 864 
 865             ret = ompi_request_wait(&send_req, MPI_STATUS_IGNORE);
 866             if (OMPI_SUCCESS != ret){
 867                 goto exit;
 868             }
 869         }
 870 
 871         if (my_aggregator == fh->f_rank) {
 872             ret = ompi_request_wait_all (fh->f_procs_per_group,
 873                                          recv_req,
  874                                          MPI_STATUSES_IGNORE);
 875 
 876             if (OMPI_SUCCESS != ret){
 877                 goto exit;
 878             }
 879         }
 880 
 881 #if DEBUG_ON
 882         if (my_aggregator == fh->f_rank){
 883             printf("************Cycle: %d,  Aggregator: %d ***************\n",
 884                    index+1,fh->f_rank);
 885             for (i=0 ; i<global_count/4 ; i++)
 886                 printf (" RECV %d \n",((int *)global_buf)[i]);
 887         }
 888 #endif
 889 
 890         if (! sendbuf_is_contiguous) {
 891             if (NULL != send_buf) {
 892                 free (send_buf);
 893                 send_buf = NULL;
 894             }
 895         }
 896 
 897 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 898         end_comm_time = MPI_Wtime();
 899         comm_time += (end_comm_time - start_comm_time);
 900 #endif
 901         /**********************************************************
 902          *** 7f. Create the io array, and pass it to fbtl
 903          *********************************************************/
 904 
 905         if (my_aggregator == fh->f_rank) {
 906 
 907 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 908             start_write_time = MPI_Wtime();
 909 #endif
 910 
 911             fh->f_io_array = (mca_common_ompio_io_array_t *) malloc
 912                 (entries_per_aggregator * sizeof (mca_common_ompio_io_array_t));
 913             if (NULL == fh->f_io_array) {
 914                 opal_output(1, "OUT OF MEMORY\n");
 915                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 916                 goto exit;
 917             }
 918 
 919             fh->f_num_of_io_entries = 0;
 920             /*First entry for every aggregator*/
 921             fh->f_io_array[0].offset =
 922                 (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
 923             fh->f_io_array[0].length =
 924                 file_offsets_for_agg[sorted_file_offsets[0]].length;
 925             fh->f_io_array[0].memory_address =
 926                 global_buf+memory_displacements[sorted_file_offsets[0]];
 927             fh->f_num_of_io_entries++;
 928 
 929             for (i=1;i<entries_per_aggregator;i++){
  930                 /* If the entries are contiguous merge them,
 931                    else make a new entry */
 932                 if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
 933                     file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
 934                     file_offsets_for_agg[sorted_file_offsets[i]].offset){
 935                     fh->f_io_array[fh->f_num_of_io_entries - 1].length +=
 936                         file_offsets_for_agg[sorted_file_offsets[i]].length;
 937                 }
 938                 else {
 939                     fh->f_io_array[fh->f_num_of_io_entries].offset =
 940                         (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
 941                     fh->f_io_array[fh->f_num_of_io_entries].length =
 942                         file_offsets_for_agg[sorted_file_offsets[i]].length;
 943                     fh->f_io_array[fh->f_num_of_io_entries].memory_address =
 944                         global_buf+memory_displacements[sorted_file_offsets[i]];
 945                     fh->f_num_of_io_entries++;
 946                 }
 947 
 948             }
 949 
 950 #if DEBUG_ON
 951             printf("*************************** %d\n", fh->f_num_of_io_entries);
 952             for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
 953                 printf(" ADDRESS: %p  OFFSET: %ld   LENGTH: %ld\n",
 954                        fh->f_io_array[i].memory_address,
 955                        (ptrdiff_t)fh->f_io_array[i].offset,
 956                        fh->f_io_array[i].length);
 957             }
 958 
 959 #endif
 960 
 961             if (fh->f_num_of_io_entries) {
 962                 if ( 0 >  fh->f_fbtl->fbtl_pwritev (fh)) {
 963                     opal_output (1, "WRITE FAILED\n");
 964                     ret = OMPI_ERROR;
 965                     goto exit;
 966                 }
 967             }
 968 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 969             end_write_time = MPI_Wtime();
 970             write_time += end_write_time - start_write_time;
 971 #endif
 972 
 973 
 974         } /* end if (my_aggregator == fh->f_rank) */
 975     } /* end  for (index = 0; index < cycles; index++) */
 976 
 977 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 978     end_exch = MPI_Wtime();
 979     exch_write += end_exch - start_exch;
 980     nentry.time[0] = write_time;
 981     nentry.time[1] = comm_time;
 982     nentry.time[2] = exch_write;
 983     if (my_aggregator == fh->f_rank)
 984         nentry.aggregator = 1;
 985     else
 986         nentry.aggregator = 0;
 987     nentry.nprocs_for_coll = dynamic_num_io_procs;
 988     if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)){
 989         mca_common_ompio_register_print_entry(fh->f_coll_write_time,
 990                                               nentry);
 991     }
 992 #endif
 993 
 994 
 995 exit :
 996     if (my_aggregator == fh->f_rank) {
 997         if (NULL != sorted_file_offsets){
 998             free(sorted_file_offsets);
 999             sorted_file_offsets = NULL;
1000         }
1001         if(NULL != file_offsets_for_agg){
1002             free(file_offsets_for_agg);
1003             file_offsets_for_agg = NULL;
1004         }
1005         if (NULL != memory_displacements){
1006             free(memory_displacements);
1007             memory_displacements = NULL;
1008         }
1009         if (NULL != recvtype){
1010             for (i =0; i< fh->f_procs_per_group; i++) {
1011                 if ( MPI_DATATYPE_NULL != recvtype[i] ) {
1012                     ompi_datatype_destroy(&recvtype[i]);
1013                 }
1014             }
1015             free(recvtype);
1016             recvtype=NULL;
1017         }
1018 
1019         if (NULL != fh->f_io_array) {
1020             free (fh->f_io_array);
1021             fh->f_io_array = NULL;
1022         }
1023         if (NULL != disp_index){
1024             free(disp_index);
1025             disp_index = NULL;
1026         }
1027         if (NULL != recvtype){
1028             free(recvtype);
1029             recvtype=NULL;
1030         }
1031         if (NULL != recv_req){
1032             free(recv_req);
1033             recv_req = NULL;
1034         }
1035         if (NULL != global_buf) {
1036             free (global_buf);
1037             global_buf = NULL;
1038         }
1039         for(l=0;l<fh->f_procs_per_group;l++){
1040             if (NULL != blocklen_per_process){
1041                 free(blocklen_per_process[l]);
1042             }
1043             if (NULL != displs_per_process){
1044                 free(displs_per_process[l]);
1045             }
1046         }
1047 
1048         free(blocklen_per_process);
1049         free(displs_per_process);
1050     }
1051 
1052     if (NULL != displs){
1053         free(displs);
1054         displs=NULL;
1055     }
1056 
1057     if (! sendbuf_is_contiguous) {
1058         if (NULL != send_buf) {
1059             free (send_buf);
1060             send_buf = NULL;
1061         }
1062     }
1063     if (NULL != global_buf) {
1064         free (global_buf);
1065         global_buf = NULL;
1066     }
1067     if (NULL != sorted) {
1068         free (sorted);
1069         sorted = NULL;
1070     }
1071     if (NULL != global_iov_array) {
1072         free (global_iov_array);
1073         global_iov_array = NULL;
1074     }
1075     if (NULL != fview_count) {
1076         free (fview_count);
1077         fview_count = NULL;
1078     }
1079     if (NULL != decoded_iov) {
1080         free (decoded_iov);
1081         decoded_iov = NULL;
1082     }
1083 
1084 
 1085     return ret;
1086 }
1087 
1088 
1089 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
1090                             int num_entries,
1091                             int *sorted)
1092 {
1093     int i = 0;
1094     int j = 0;
1095     int left = 0;
1096     int right = 0;
1097     int largest = 0;
1098     int heap_size = num_entries - 1;
1099     int temp = 0;
1100     unsigned char done = 0;
1101     int* temp_arr = NULL;
1102 
1103     temp_arr = (int*)malloc(num_entries*sizeof(int));
1104     if (NULL == temp_arr) {
1105         opal_output (1, "OUT OF MEMORY\n");
1106         return OMPI_ERR_OUT_OF_RESOURCE;
1107     }
1108     temp_arr[0] = 0;
1109     for (i = 1; i < num_entries; ++i) {
1110         temp_arr[i] = i;
1111     }
 1112     /* num_entries can be large, so no recursion is used */
1113     for (i = num_entries/2-1 ; i>=0 ; i--) {
1114         done = 0;
1115         j = i;
1116         largest = j;
1117 
1118         while (!done) {
1119             left = j*2+1;
1120             right = j*2+2;
1121             if ((left <= heap_size) &&
1122                 (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
1123                 largest = left;
1124             }
1125             else {
1126                 largest = j;
1127             }
1128             if ((right <= heap_size) &&
1129                 (io_array[temp_arr[right]].offset >
1130                  io_array[temp_arr[largest]].offset)) {
1131                 largest = right;
1132             }
1133             if (largest != j) {
1134                 temp = temp_arr[largest];
1135                 temp_arr[largest] = temp_arr[j];
1136                 temp_arr[j] = temp;
1137                 j = largest;
1138             }
1139             else {
1140                 done = 1;
1141             }
1142         }
1143     }
1144 
1145     for (i = num_entries-1; i >=1; --i) {
1146         temp = temp_arr[0];
1147         temp_arr[0] = temp_arr[i];
1148         temp_arr[i] = temp;
1149         heap_size--;
1150         done = 0;
1151         j = 0;
1152         largest = j;
1153 
1154         while (!done) {
1155             left =  j*2+1;
1156             right = j*2+2;
1157 
1158             if ((left <= heap_size) &&
1159                 (io_array[temp_arr[left]].offset >
1160                  io_array[temp_arr[j]].offset)) {
1161                 largest = left;
1162             }
1163             else {
1164                 largest = j;
1165             }
1166             if ((right <= heap_size) &&
1167                 (io_array[temp_arr[right]].offset >
1168                  io_array[temp_arr[largest]].offset)) {
1169                 largest = right;
1170             }
1171             if (largest != j) {
1172                 temp = temp_arr[largest];
1173                 temp_arr[largest] = temp_arr[j];
1174                 temp_arr[j] = temp;
1175                 j = largest;
1176             }
1177             else {
1178                 done = 1;
1179             }
1180         }
1181         sorted[i] = temp_arr[i];
1182     }
1183     sorted[0] = temp_arr[0];
1184 
1185     if (NULL != temp_arr) {
1186         free(temp_arr);
1187         temp_arr = NULL;
1188     }
1189     return OMPI_SUCCESS;
1190 }
1191 
1192 
