root/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_read_all.c


DEFINITIONS

This source file includes the following definitions:
  1. mca_fcoll_dynamic_file_read_all
  2. read_heap_sort

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2005 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2008-2015 University of Houston. All rights reserved.
  13  * Copyright (c) 2017-2018 Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * Copyright (c) 2017      IBM Corporation. All rights reserved.
  16  * $COPYRIGHT$
  17  *
  18  * Additional copyrights may follow
  19  *
  20  * $HEADER$
  21  */
  22 
  23 #include "ompi_config.h"
  24 #include "fcoll_dynamic.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/fcoll/fcoll.h"
  29 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
  30 #include "ompi/mca/common/ompio/common_ompio.h"
  31 #include "ompi/mca/io/io.h"
  32 #include "math.h"
  33 #include "ompi/mca/pml/pml.h"
  34 #include <unistd.h>
  35 
  36 #define DEBUG_ON 0
  37 
  38 /*Used for loading file-offsets per aggregator*/
  39 typedef struct mca_io_ompio_local_io_array{
  40     OMPI_MPI_OFFSET_TYPE offset;
  41     MPI_Aint             length;
  42     int                  process_id;
  43 }mca_io_ompio_local_io_array;
  44 
  45 
  46 static int read_heap_sort (mca_io_ompio_local_io_array *io_array,
  47                            int num_entries,
  48                            int *sorted);
  49 
  50 
  51 
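     /*
      * Collective read of the dynamic fcoll component.  The processes of the
      * group are split into aggregators and non-aggregators: each aggregator
      * collects the file-view information of its group, reads the file data
      * in chunks of fh->f_bytes_per_agg bytes through the fbtl preadv
      * interface, and scatters the pieces back to the processes that own
      * them, while every process posts a receive for its share of each cycle.
      */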
  52 int
  53 mca_fcoll_dynamic_file_read_all (ompio_file_t *fh,
  54                                  void *buf,
  55                                  int count,
  56                                  struct ompi_datatype_t *datatype,
  57                                  ompi_status_public_t *status)
  58 {
  59     MPI_Aint position = 0;
  60     MPI_Aint total_bytes = 0;          /* total bytes to be read */
  61     MPI_Aint bytes_to_read_in_cycle = 0; /* left to be read in a cycle*/
  62     MPI_Aint bytes_per_cycle = 0;      /* total bytes read per cycle by each aggregator */
  63     int index = 0, ret=OMPI_SUCCESS;
  64     int cycles = 0;
  65     int i=0, j=0, l=0;
  66     int n=0; /* current position in total_bytes_per_process array */
  67     MPI_Aint bytes_remaining = 0; /* how many bytes have been read from the current
  68                                      value from total_bytes_per_process */
  69     int *sorted_file_offsets=NULL, entries_per_aggregator=0;
  70     int bytes_received = 0;
  71     int blocks = 0;
  72     /* iovec structure and count of the buffer passed in */
  73     uint32_t iov_count = 0;
  74     struct iovec *decoded_iov = NULL;
  75     int iov_index = 0;
  76     size_t current_position = 0;
  77     struct iovec *local_iov_array=NULL, *global_iov_array=NULL;
  78     char *receive_buf = NULL;
  79     MPI_Aint *memory_displacements=NULL;
  80     /* global iovec at the readers, containing the iovecs created from
  81        the file_set_view of all processes in the group */
  82     uint32_t total_fview_count = 0;
  83     int local_count = 0;
  84     int *fview_count = NULL, *disp_index=NULL, *temp_disp_index=NULL;
  85     int current_index=0, temp_index=0;
  86     int **blocklen_per_process=NULL;
  87     MPI_Aint **displs_per_process=NULL;
  88     char *global_buf = NULL;
  89     MPI_Aint global_count = 0;
  90     mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
  91 
  92     /* array that contains the sorted indices of the global_iov */
  93     int *sorted = NULL;
  94     int *displs = NULL;
  95     int dynamic_num_io_procs;
  96     size_t max_data = 0;
  97     MPI_Aint *total_bytes_per_process = NULL;
  98     ompi_datatype_t **sendtype = NULL;
  99     MPI_Request *send_req=NULL, recv_req=NULL;
 100     int my_aggregator =-1;
 101     bool recvbuf_is_contiguous=false;
 102     size_t ftype_size;
 103     ptrdiff_t ftype_extent, lb;
 104 
 105 
 106 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 107     double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
 108     double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
 109     double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
 110     mca_common_ompio_print_entry nentry;
 111 #endif
 112 
 113     /**************************************************************************
 114      ** 1. In case the data is not contiguous in memory, decode it into an iovec
 115      **************************************************************************/
 116 
 117     opal_datatype_type_size ( &datatype->super, &ftype_size );
 118     opal_datatype_get_extent ( &datatype->super, &lb, &ftype_extent );
 119 
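         /* The user buffer can only be used directly as the receive buffer if
            the datatype is contiguous, has a zero lower bound, and its extent
            equals its size; otherwise the data is staged through a temporary
            buffer and unpacked using the decoded iovec below. */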
 120     if ( (ftype_extent == (ptrdiff_t) ftype_size)             &&
 121         opal_datatype_is_contiguous_memory_layout(&datatype->super,1) &&
 122         0 == lb ) {
 123         recvbuf_is_contiguous = true;
 124     }
 125 
 126 
 127     if (! recvbuf_is_contiguous ) {
 128         ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *)fh,
 129                                                 datatype,
 130                                                 count,
 131                                                 buf,
 132                                                 &max_data,
 133                                                 fh->f_mem_convertor,
 134                                                 &decoded_iov,
 135                                                 &iov_count);
 136         if (OMPI_SUCCESS != ret){
 137             goto exit;
 138         }
 139     }
 140     else {
 141         max_data = count * datatype->super.size;
 142     }
 143 
 144     if ( MPI_STATUS_IGNORE != status ) {
 145         status->_ucount = max_data;
 146     }
 147 
 148     dynamic_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
 149     if ( OMPI_ERR_MAX == dynamic_num_io_procs ) {
 150         ret = OMPI_ERROR;
 151         goto exit;
 152     }
 153     ret = mca_common_ompio_set_aggregator_props ((struct ompio_file_t *) fh,
 154                                                  dynamic_num_io_procs,
 155                                                  max_data);
 156     if (OMPI_SUCCESS != ret){
 157         goto exit;
 158     }
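         /* By construction of the aggregator selection, the first entry of
            f_procs_in_group is the aggregator serving this group. */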
 159     my_aggregator = fh->f_procs_in_group[0];
 160 
 161     /**************************************************************************
 162      ** 2. Determine the total amount of data to be read
 163      **************************************************************************/
 164     total_bytes_per_process = (MPI_Aint*)malloc(fh->f_procs_per_group*sizeof(MPI_Aint));
 165     if (NULL == total_bytes_per_process) {
 166         opal_output (1, "OUT OF MEMORY\n");
 167         ret = OMPI_ERR_OUT_OF_RESOURCE;
 168         goto exit;
 169     }
 170 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 171     start_rcomm_time = MPI_Wtime();
 172 #endif
 173     ret = ompi_fcoll_base_coll_allgather_array (&max_data,
 174                                            1,
 175                                            MPI_LONG,
 176                                            total_bytes_per_process,
 177                                            1,
 178                                            MPI_LONG,
 179                                            0,
 180                                            fh->f_procs_in_group,
 181                                            fh->f_procs_per_group,
 182                                            fh->f_comm);
 183     if (OMPI_SUCCESS != ret){
 184         goto exit;
 185     }
 186 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 187     end_rcomm_time = MPI_Wtime();
 188     rcomm_time += end_rcomm_time - start_rcomm_time;
 189 #endif
 190 
 191     for (i=0 ; i<fh->f_procs_per_group ; i++) {
 192         total_bytes += total_bytes_per_process[i];
 193     }
 194 
 195     if (NULL != total_bytes_per_process) {
 196         free (total_bytes_per_process);
 197         total_bytes_per_process = NULL;
 198     }
 199 
 200     /*********************************************************************
 201      *** 3. Generate the file offsets/lengths corresponding to this read
 202      ********************************************************************/
 203     ret = fh->f_generate_current_file_view ((struct ompio_file_t *) fh,
 204                                             max_data,
 205                                             &local_iov_array,
 206                                             &local_count);
 207 
 208     if (ret != OMPI_SUCCESS){
 209         goto exit;
 210     }
 211 
 212     /*************************************************************
 213      *** 4. Allgather the File View information at all processes
 214      *************************************************************/
 215 
 216     fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
 217     if (NULL == fview_count) {
 218         opal_output (1, "OUT OF MEMORY\n");
 219         ret = OMPI_ERR_OUT_OF_RESOURCE;
 220         goto exit;
 221     }
 222 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 223     start_rcomm_time = MPI_Wtime();
 224 #endif
 225     ret = ompi_fcoll_base_coll_allgather_array (&local_count,
 226                                            1,
 227                                            MPI_INT,
 228                                            fview_count,
 229                                            1,
 230                                            MPI_INT,
 231                                            0,
 232                                            fh->f_procs_in_group,
 233                                            fh->f_procs_per_group,
 234                                            fh->f_comm);
 235     
 236     if (OMPI_SUCCESS != ret){
 237         goto exit;
 238     }
 239 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 240     end_rcomm_time = MPI_Wtime();
 241     rcomm_time += end_rcomm_time - start_rcomm_time;
 242 #endif
 243 
 244     displs = (int*)malloc (fh->f_procs_per_group*sizeof(int));
 245     if (NULL == displs) {
 246         opal_output (1, "OUT OF MEMORY\n");
 247         ret = OMPI_ERR_OUT_OF_RESOURCE;
 248         goto exit;
 249     }
 250 
 251     displs[0] = 0;
 252     total_fview_count = fview_count[0];
 253     for (i=1 ; i<fh->f_procs_per_group ; i++) {
 254         total_fview_count += fview_count[i];
 255         displs[i] = displs[i-1] + fview_count[i-1];
 256     }
 257 
 258 #if DEBUG_ON
 259     if (my_aggregator == fh->f_rank) {
 260         for (i=0 ; i<fh->f_procs_per_group ; i++) {
 261             printf ("%d: PROCESS: %d  ELEMENTS: %d  DISPLS: %d\n",
 262                     fh->f_rank,
 263                     i,
 264                     fview_count[i],
 265                     displs[i]);
 266         }
 267     }
 268 #endif
 269 
 270     /* allocate the global iovec  */
 271     if (0 != total_fview_count) {
 272         global_iov_array = (struct iovec*)malloc (total_fview_count *
 273                                                   sizeof(struct iovec));
 274         if (NULL == global_iov_array) {
 275             opal_output (1, "OUT OF MEMORY\n");
 276             ret = OMPI_ERR_OUT_OF_RESOURCE;
 277             goto exit;
 278         }
 279     }
 280 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 281     start_rcomm_time = MPI_Wtime();
 282 #endif
 283     ret =  ompi_fcoll_base_coll_allgatherv_array (local_iov_array,
 284                                              local_count,
 285                                              fh->f_iov_type,
 286                                              global_iov_array,
 287                                              fview_count,
 288                                              displs,
 289                                              fh->f_iov_type,
 290                                              0,
 291                                              fh->f_procs_in_group,
 292                                              fh->f_procs_per_group,
 293                                              fh->f_comm);
 294     
 295     if (OMPI_SUCCESS != ret){
 296         goto exit;
 297     }
 298 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 299     end_rcomm_time = MPI_Wtime();
 300     rcomm_time += end_rcomm_time - start_rcomm_time;
 301 #endif
 302 
 303     /****************************************************************************************
 304      *** 5. Sort the global offset/length list based on the offsets.
 305      *** The result of the sort operation is 'sorted', an integer array
 306      *** which contains the indices into global_iov_array ordered by offset.
 307      *** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset
 308      *** in the file, and that one is followed by global_iov_array[z].offset, then
 309      *** sorted[0] = x, sorted[1] = y and sorted[2] = z.
 310      ******************************************************************************************/
 311     if (0 != total_fview_count) {
 312         sorted = (int *)malloc (total_fview_count * sizeof(int));
 313         if (NULL == sorted) {
 314             opal_output (1, "OUT OF MEMORY\n");
 315             ret = OMPI_ERR_OUT_OF_RESOURCE;
 316             goto exit;
 317         }
 318         ompi_fcoll_base_sort_iovec (global_iov_array, total_fview_count, sorted);
 319     }
 320 
 321     if (NULL != local_iov_array) {
 322         free (local_iov_array);
 323         local_iov_array = NULL;
 324     }
 325 
 326 #if DEBUG_ON
 327     if (my_aggregator == fh->f_rank) {
 328         for (i=0 ; i<total_fview_count ; i++) {
 329             printf("%d: OFFSET: %p   LENGTH: %zu\n",
 330                    fh->f_rank,
 331                    global_iov_array[sorted[i]].iov_base,
 332                    global_iov_array[sorted[i]].iov_len);
 333         }
 334     }
 335 #endif
 336 
 337     /*************************************************************
 338      *** 6. Determine the number of cycles required to execute this
 339      ***    operation
 340      *************************************************************/
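         /* The data is staged through the aggregator in chunks of
            f_bytes_per_agg bytes, so ceil(total_bytes / bytes_per_cycle)
            cycles are needed to complete the operation. */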
 341     bytes_per_cycle = fh->f_bytes_per_agg;
 342     cycles = ceil((double)total_bytes/bytes_per_cycle);
 343 
 344     if ( my_aggregator == fh->f_rank) {
 345         disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
 346         if (NULL == disp_index) {
 347             opal_output (1, "OUT OF MEMORY\n");
 348             ret = OMPI_ERR_OUT_OF_RESOURCE;
 349             goto exit;
 350         }
 351 
 352         blocklen_per_process = (int **)malloc (fh->f_procs_per_group * sizeof (int*));
 353         if (NULL == blocklen_per_process) {
 354             opal_output (1, "OUT OF MEMORY\n");
 355             ret = OMPI_ERR_OUT_OF_RESOURCE;
 356             goto exit;
 357         }
 358 
 359         displs_per_process = (MPI_Aint **)malloc (fh->f_procs_per_group * sizeof (MPI_Aint*));
 360         if (NULL == displs_per_process){
 361             opal_output (1, "OUT OF MEMORY\n");
 362             ret = OMPI_ERR_OUT_OF_RESOURCE;
 363             goto exit;
 364         }
 365 
 366         for (i=0;i<fh->f_procs_per_group;i++){
 367             blocklen_per_process[i] = NULL;
 368             displs_per_process[i] = NULL;
 369         }
 370 
 371         send_req = (MPI_Request *) malloc (fh->f_procs_per_group * sizeof(MPI_Request));
 372         if (NULL == send_req){
 373             opal_output ( 1, "OUT OF MEMORY\n");
 374             ret = OMPI_ERR_OUT_OF_RESOURCE;
 375             goto exit;
 376         }
 377 
 378         global_buf = (char *) malloc (bytes_per_cycle);
 379         if (NULL == global_buf){
 380             opal_output(1, "OUT OF MEMORY\n");
 381             ret = OMPI_ERR_OUT_OF_RESOURCE;
 382             goto exit;
 383         }
 384 
 385         sendtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
 386         if (NULL == sendtype) {
 387             opal_output (1, "OUT OF MEMORY\n");
 388             ret = OMPI_ERR_OUT_OF_RESOURCE;
 389             goto exit;
 390         }
 391 
 392         for(l=0;l<fh->f_procs_per_group;l++){
 393             sendtype[l] = MPI_DATATYPE_NULL;
 394         }
 395     }
 396 
 397 
 398 
 399 
 400 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 401     start_rexch = MPI_Wtime();
 402 #endif
 403     n = 0;
 404     bytes_remaining = 0;
 405     current_index = 0;
 406 
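         /* Per cycle: the aggregator rebuilds the per-process block lists
            (7a-7d), reads the corresponding file regions into global_buf (7e),
            and sends each process its pieces, while every process receives the
            bytes it contributed to this cycle (7f). */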
 407     for (index = 0; index < cycles; index++) {
 408         /**********************************************************************
 409          ***  7a. Getting ready for the next cycle: initializing and freeing buffers
 410          **********************************************************************/
 411         if (my_aggregator == fh->f_rank) {
 412              if (NULL != fh->f_io_array) {
 413                 free (fh->f_io_array);
 414                 fh->f_io_array = NULL;
 415             }
 416             fh->f_num_of_io_entries = 0;
 417 
 418             if (NULL != sendtype){
 419                 for (i =0; i< fh->f_procs_per_group; i++) {
 420                     if ( MPI_DATATYPE_NULL != sendtype[i] ) {
 421                         ompi_datatype_destroy(&sendtype[i]);
 422                         sendtype[i] = MPI_DATATYPE_NULL;
 423                     }
 424                 }
 425             }
 426 
 427             for(l=0;l<fh->f_procs_per_group;l++){
 428                 disp_index[l] =  1;
 429 
 430                 if (NULL != blocklen_per_process[l]){
 431                     free(blocklen_per_process[l]);
 432                     blocklen_per_process[l] = NULL;
 433                 }
 434                 if (NULL != displs_per_process[l]){
 435                     free(displs_per_process[l]);
 436                     displs_per_process[l] = NULL;
 437                 }
 438                 blocklen_per_process[l] = (int *) calloc (1, sizeof(int));
 439                 if (NULL == blocklen_per_process[l]) {
 440                     opal_output (1, "OUT OF MEMORY for blocklen\n");
 441                     ret = OMPI_ERR_OUT_OF_RESOURCE;
 442                     goto exit;
 443                 }
 444                 displs_per_process[l] = (MPI_Aint *) calloc (1, sizeof(MPI_Aint));
 445                 if (NULL == displs_per_process[l]){
 446                     opal_output (1, "OUT OF MEMORY for displs\n");
 447                     ret = OMPI_ERR_OUT_OF_RESOURCE;
 448                     goto exit;
 449                 }
 450             }
 451 
 452             if (NULL != sorted_file_offsets){
 453                 free(sorted_file_offsets);
 454                 sorted_file_offsets = NULL;
 455             }
 456 
 457             if(NULL != file_offsets_for_agg){
 458                 free(file_offsets_for_agg);
 459                 file_offsets_for_agg = NULL;
 460             }
 461             if (NULL != memory_displacements){
 462                 free(memory_displacements);
 463                 memory_displacements = NULL;
 464             }
 465         }  /* my_aggregator == fh->f_rank */
 466 
 467         /**************************************************************************
 468          ***  7b. Determine the number of bytes to be actually read in this cycle
 469          **************************************************************************/
 470         if (cycles-1 == index) {
 471             bytes_to_read_in_cycle = total_bytes - bytes_per_cycle*index;
 472         }
 473         else {
 474             bytes_to_read_in_cycle = bytes_per_cycle;
 475         }
 476 
 477 #if DEBUG_ON
 478         if (my_aggregator == fh->f_rank) {
 479             printf ("****%d: CYCLE %d   Bytes %ld**********\n",
 480                     fh->f_rank,
 481                     index,
 482                     (long)bytes_to_read_in_cycle);
 483         }
 484 #endif
 485 
 486         /*****************************************************************
 487          *** 7c. Calculate how much data will be contributed in this cycle
 488          ***     by each process
 489          *****************************************************************/
 490         bytes_received = 0;
 491 
 492         while (bytes_to_read_in_cycle) {
 493             /* This next block identifies which process is the holder
 494             ** of the sorted[current_index] element;
 495             */
 496             blocks = fview_count[0];
 497             for (j=0 ; j<fh->f_procs_per_group ; j++) {
 498                 if (sorted[current_index] < blocks) {
 499                     n = j;
 500                     break;
 501                 }
 502                 else {
 503                     blocks += fview_count[j+1];
 504                 }
 505             }
 506 
 507             if (bytes_remaining) {
 508                 /* Finish up a partially used buffer from the previous  cycle */
 509                 if (bytes_remaining <= bytes_to_read_in_cycle) {
 510                     /* Data fits completely into the block */
 511                     if (my_aggregator == fh->f_rank) {
 512                         blocklen_per_process[n][disp_index[n] - 1] = bytes_remaining;
 513                         displs_per_process[n][disp_index[n] - 1] =
 514                             (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base +
 515                             (global_iov_array[sorted[current_index]].iov_len - bytes_remaining);
 516 
 517                         blocklen_per_process[n] = (int *) realloc
 518                             ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
 519                         displs_per_process[n] = (MPI_Aint *) realloc
 520                             ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint));
 521                         blocklen_per_process[n][disp_index[n]] = 0;
 522                         displs_per_process[n][disp_index[n]] = 0;
 523                         disp_index[n] += 1;
 524                     }
 525                     if (fh->f_procs_in_group[n] == fh->f_rank) {
 526                         bytes_received += bytes_remaining;
 527                     }
 528                     current_index ++;
 529                     bytes_to_read_in_cycle -= bytes_remaining;
 530                     bytes_remaining = 0;
 531                     continue;
 532                 }
 533                 else {
 534                      /* the remaining data from the previous cycle is larger than
 535                         bytes_to_read_in_cycle, so we have to segment it again */
 536                     if (my_aggregator == fh->f_rank) {
 537                         blocklen_per_process[n][disp_index[n] - 1] = bytes_to_read_in_cycle;
 538                         displs_per_process[n][disp_index[n] - 1] =
 539                             (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base +
 540                             (global_iov_array[sorted[current_index]].iov_len
 541                              - bytes_remaining);
 542                     }
 543                     if (fh->f_procs_in_group[n] == fh->f_rank) {
 544                         bytes_received += bytes_to_read_in_cycle;
 545                     }
 546                     bytes_remaining -= bytes_to_read_in_cycle;
 547                     bytes_to_read_in_cycle = 0;
 548                     break;
 549                 }
 550             }
 551             else {
 552                 /* No partially used entry available, have to start a new one */
 553                 if (bytes_to_read_in_cycle <
 554                     (MPI_Aint) global_iov_array[sorted[current_index]].iov_len) {
 555                     /* This entry has more data than we can send in one cycle */
 556                     if (my_aggregator == fh->f_rank) {
 557                         blocklen_per_process[n][disp_index[n] - 1] = bytes_to_read_in_cycle;
 558                         displs_per_process[n][disp_index[n] - 1] =
 559                             (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base ;
 560                     }
 561 
 562                     if (fh->f_procs_in_group[n] == fh->f_rank) {
 563                         bytes_received += bytes_to_read_in_cycle;
 564                     }
 565                     bytes_remaining = global_iov_array[sorted[current_index]].iov_len -
 566                         bytes_to_read_in_cycle;
 567                     bytes_to_read_in_cycle = 0;
 568                     break;
 569                 }
 570                 else {
 571                     /* Next data entry is less than bytes_to_read_in_cycle */
 572                     if (my_aggregator ==  fh->f_rank) {
 573                         blocklen_per_process[n][disp_index[n] - 1] =
 574                             global_iov_array[sorted[current_index]].iov_len;
 575                         displs_per_process[n][disp_index[n] - 1] = (ptrdiff_t)
 576                             global_iov_array[sorted[current_index]].iov_base;
 577                         blocklen_per_process[n] =
 578                             (int *) realloc ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
 579                         displs_per_process[n] = (MPI_Aint *)realloc
 580                             ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint));
 581                         blocklen_per_process[n][disp_index[n]] = 0;
 582                         displs_per_process[n][disp_index[n]] = 0;
 583                         disp_index[n] += 1;
 584                     }
 585                     if (fh->f_procs_in_group[n] == fh->f_rank) {
 586                         bytes_received +=
 587                             global_iov_array[sorted[current_index]].iov_len;
 588                     }
 589                     bytes_to_read_in_cycle -=
 590                         global_iov_array[sorted[current_index]].iov_len;
 591                     current_index ++;
 592                     continue;
 593                 }
 594             }
 595         } /* end while (bytes_to_read_in_cycle) */
 596 
 597         /*************************************************************************
 598          *** 7d. Calculate the displacements describing where to put the data and allocate
 599          ***     the receive buffer (global_buf)
 600          *************************************************************************/
 601         if (my_aggregator == fh->f_rank) {
 602             entries_per_aggregator=0;
 603             for (i=0;i<fh->f_procs_per_group; i++){
 604                 for (j=0;j<disp_index[i];j++){
 605                     if (blocklen_per_process[i][j] > 0)
 606                         entries_per_aggregator++ ;
 607                 }
 608             }
 609             if (entries_per_aggregator > 0){
 610                 file_offsets_for_agg = (mca_io_ompio_local_io_array *)
 611                     malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array));
 612                 if (NULL == file_offsets_for_agg) {
 613                     opal_output (1, "OUT OF MEMORY\n");
 614                     ret = OMPI_ERR_OUT_OF_RESOURCE;
 615                     goto exit;
 616                 }
 617                 sorted_file_offsets = (int *)
 618                     malloc (entries_per_aggregator*sizeof(int));
 619                 if (NULL == sorted_file_offsets){
 620                     opal_output (1, "OUT OF MEMORY\n");
 621                     ret =  OMPI_ERR_OUT_OF_RESOURCE;
 622                     goto exit;
 623                 }
 624                 /*Moving file offsets to an IO array!*/
 625                 temp_index = 0;
 626                 global_count = 0;
 627                 for (i=0;i<fh->f_procs_per_group; i++){
 628                     for(j=0;j<disp_index[i];j++){
 629                         if (blocklen_per_process[i][j] > 0){
 630                             file_offsets_for_agg[temp_index].length =
 631                                 blocklen_per_process[i][j];
 632                             global_count += blocklen_per_process[i][j];
 633                             file_offsets_for_agg[temp_index].process_id = i;
 634                             file_offsets_for_agg[temp_index].offset =
 635                                 displs_per_process[i][j];
 636                             temp_index++;
 637                         }
 638                     }
 639                 }
 640             }
 641             else{
 642                 continue;
 643             }
 644 
 645              /* Sort the displacements for each aggregator */
 646             read_heap_sort (file_offsets_for_agg,
 647                             entries_per_aggregator,
 648                             sorted_file_offsets);
 649 
 650             memory_displacements = (MPI_Aint *) malloc
 651                 (entries_per_aggregator * sizeof(MPI_Aint));
 652             memory_displacements[sorted_file_offsets[0]] = 0;
 653             for (i=1; i<entries_per_aggregator; i++){
 654                 memory_displacements[sorted_file_offsets[i]] =
 655                     memory_displacements[sorted_file_offsets[i-1]] +
 656                     file_offsets_for_agg[sorted_file_offsets[i-1]].length;
 657             }
 658 
 659              /**********************************************************
 660              *** 7e. Create the io array, and pass it to fbtl
 661              *********************************************************/
 662             fh->f_io_array = (mca_common_ompio_io_array_t *) malloc
 663                 (entries_per_aggregator * sizeof (mca_common_ompio_io_array_t));
 664             if (NULL == fh->f_io_array) {
 665                 opal_output(1, "OUT OF MEMORY\n");
 666                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 667                 goto exit;
 668             }
 669 
 670             fh->f_num_of_io_entries = 0;
 671             fh->f_io_array[0].offset =
 672                 (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
 673             fh->f_io_array[0].length =
 674                 file_offsets_for_agg[sorted_file_offsets[0]].length;
 675             fh->f_io_array[0].memory_address =
 676                 global_buf+memory_displacements[sorted_file_offsets[0]];
 677             fh->f_num_of_io_entries++;
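                 /* Merge io_array entries whose file regions are contiguous,
                    so the fbtl gets as few (and as large) read requests as
                    possible. */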
 678             for (i=1;i<entries_per_aggregator;i++){
 679                 if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
 680                     file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
 681                     file_offsets_for_agg[sorted_file_offsets[i]].offset){
 682                     fh->f_io_array[fh->f_num_of_io_entries - 1].length +=
 683                         file_offsets_for_agg[sorted_file_offsets[i]].length;
 684                 }
 685                 else{
 686                     fh->f_io_array[fh->f_num_of_io_entries].offset =
 687                         (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
 688                     fh->f_io_array[fh->f_num_of_io_entries].length =
 689                         file_offsets_for_agg[sorted_file_offsets[i]].length;
 690                     fh->f_io_array[fh->f_num_of_io_entries].memory_address =
 691                         global_buf+memory_displacements[sorted_file_offsets[i]];
 692                     fh->f_num_of_io_entries++;
 693                 }
 694             }
 695 
 696 
 697 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 698             start_read_time = MPI_Wtime();
 699 #endif
 700 
 701             if (fh->f_num_of_io_entries) {
 702                 if ( 0 >  fh->f_fbtl->fbtl_preadv (fh)) {
 703                     opal_output (1, "READ FAILED\n");
 704                     ret = OMPI_ERROR;
 705                     goto exit;
 706                 }
 707             }
 708 
 709 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 710             end_read_time = MPI_Wtime();
 711             read_time += end_read_time - start_read_time;
 712 #endif
 713             /**********************************************************
 714              ******************** DONE READING ************************
 715              *********************************************************/
 716 
 717             temp_disp_index = (int *)calloc (1, fh->f_procs_per_group * sizeof (int));
 718             if (NULL == temp_disp_index) {
 719                 opal_output (1, "OUT OF MEMORY\n");
 720                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 721                 goto exit;
 722             }
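                 /* Reuse displs_per_process to hold the displacement of each
                    block inside global_buf instead of its file offset, so that
                    the hindexed send datatypes below address the aggregated
                    read buffer. */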
 723             for (i=0; i<entries_per_aggregator; i++){
 724                 temp_index =
 725                     file_offsets_for_agg[sorted_file_offsets[i]].process_id;
 726                 displs_per_process[temp_index][temp_disp_index[temp_index]] =
 727                     memory_displacements[sorted_file_offsets[i]];
 728                 if (temp_disp_index[temp_index] < disp_index[temp_index]){
 729                     temp_disp_index[temp_index] += 1;
 730                 }
 731                 else{
 732                     printf("temp_disp_index[%d]: %d is not smaller than disp_index[%d]: %d\n",
 733                            temp_index, temp_disp_index[temp_index],
 734                            temp_index, disp_index[temp_index]);
 735                 }
 736             }
 737             if (NULL != temp_disp_index){
 738                 free(temp_disp_index);
 739                 temp_disp_index = NULL;
 740             }
 741 
 742 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 743             start_rcomm_time = MPI_Wtime();
 744 #endif
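                 /* Describe each process' pieces of global_buf with an
                    hindexed datatype and hand them back with a single isend
                    per process. */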
 745             for (i=0;i<fh->f_procs_per_group;i++){
 746                 send_req[i] = MPI_REQUEST_NULL;
 747                 if ( 0 < disp_index[i] ) {
 748                     ompi_datatype_create_hindexed(disp_index[i],
 749                                                   blocklen_per_process[i],
 750                                                   displs_per_process[i],
 751                                                   MPI_BYTE,
 752                                                   &sendtype[i]);
 753                     ompi_datatype_commit(&sendtype[i]);
 754                     ret = MCA_PML_CALL (isend(global_buf,
 755                                               1,
 756                                               sendtype[i],
 757                                               fh->f_procs_in_group[i],
 758                                               123,
 759                                               MCA_PML_BASE_SEND_STANDARD,
 760                                               fh->f_comm,
 761                                               &send_req[i]));
 762                     if(OMPI_SUCCESS != ret){
 763                         goto exit;
 764                     }
 765                 }
 766             }
 767 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 768             end_rcomm_time = MPI_Wtime();
 769             rcomm_time += end_rcomm_time - start_rcomm_time;
 770 #endif
 771         }
 772 
 773         /**********************************************************
 774          *** 7f.  Scatter the Data from the readers
 775          *********************************************************/
 776         if ( recvbuf_is_contiguous ) {
 777             receive_buf = &((char*)buf)[position];
 778         }
 779         else if (bytes_received) {
 780             /* allocate an intermediate receive buffer, since the user
 781                buffer is non-contiguous in memory; the data is copied
 782                into the user buffer after the receive completes */
 783             receive_buf = malloc (bytes_received);
 784             if (NULL == receive_buf) {
 785                 opal_output (1, "OUT OF MEMORY\n");
 786                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 787                 goto exit;
 788             }
 789         }
 790 
 791 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 792         start_rcomm_time = MPI_Wtime();
 793 #endif
 794         ret = MCA_PML_CALL(irecv(receive_buf,
 795                                  bytes_received,
 796                                  MPI_BYTE,
 797                                  my_aggregator,
 798                                  123,
 799                                  fh->f_comm,
 800                                  &recv_req));
 801         if (OMPI_SUCCESS != ret){
 802             goto exit;
 803         }
 804 
 805 
 806         if (my_aggregator == fh->f_rank){
 807             ret = ompi_request_wait_all (fh->f_procs_per_group,
 808                                          send_req,
 809                                          MPI_STATUSES_IGNORE);
 810             if (OMPI_SUCCESS != ret){
 811                 goto exit;
 812             }
 813         }
 814 
 815         ret = ompi_request_wait (&recv_req, MPI_STATUS_IGNORE);
 816         if (OMPI_SUCCESS != ret){
 817             goto exit;
 818         }
 819         position += bytes_received;
 820 
 821         /* If the data is not contiguous in memory, copy it from the
 822            receive buffer into the buffer passed in */
 823         if (!recvbuf_is_contiguous ) {
 824             ptrdiff_t mem_address;
 825             size_t remaining = 0;
 826             size_t temp_position = 0;
 827 
 828             remaining = bytes_received;
 829 
 830             while (remaining) {
 831                 mem_address = (ptrdiff_t)
 832                     (decoded_iov[iov_index].iov_base) + current_position;
 833 
 834                 if (remaining >=
 835                     (decoded_iov[iov_index].iov_len - current_position)) {
 836                     memcpy ((IOVBASE_TYPE *) mem_address,
 837                             receive_buf+temp_position,
 838                             decoded_iov[iov_index].iov_len - current_position);
 839                     remaining = remaining -
 840                         (decoded_iov[iov_index].iov_len - current_position);
 841                     temp_position = temp_position +
 842                         (decoded_iov[iov_index].iov_len - current_position);
 843                     iov_index = iov_index + 1;
 844                     current_position = 0;
 845                 }
 846                 else {
 847                     memcpy ((IOVBASE_TYPE *) mem_address,
 848                             receive_buf+temp_position,
 849                             remaining);
 850                     current_position = current_position + remaining;
 851                     remaining = 0;
 852                 }
 853             }
 854 
 855             if (NULL != receive_buf) {
 856                 free (receive_buf);
 857                 receive_buf = NULL;
 858             }
 859         }
 860 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 861         end_rcomm_time = MPI_Wtime();
 862         rcomm_time += end_rcomm_time - start_rcomm_time;
 863 #endif
 864     } /* end for (index=0; index < cycles; index ++) */
 865 
 866 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
 867     end_rexch = MPI_Wtime();
 868     read_exch += end_rexch - start_rexch;
 869     nentry.time[0] = read_time;
 870     nentry.time[1] = rcomm_time;
 871     nentry.time[2] = read_exch;
 872     if (my_aggregator == fh->f_rank)
 873         nentry.aggregator = 1;
 874     else
 875         nentry.aggregator = 0;
 876     nentry.nprocs_for_coll = dynamic_num_io_procs;
 877     if (!mca_common_ompio_full_print_queue(fh->f_coll_read_time)){
 878         mca_common_ompio_register_print_entry(fh->f_coll_read_time,
 879                                               nentry);
 880     }
 881 #endif
 882 
 883 exit:
 884     if (!recvbuf_is_contiguous) {
 885         if (NULL != receive_buf) {
 886             free (receive_buf);
 887             receive_buf = NULL;
 888         }
 889     }
 890     if (NULL != global_buf) {
 891         free (global_buf);
 892         global_buf = NULL;
 893     }
 894     if (NULL != sorted) {
 895         free (sorted);
 896         sorted = NULL;
 897     }
 898     if (NULL != global_iov_array) {
 899         free (global_iov_array);
 900         global_iov_array = NULL;
 901     }
 902     if (NULL != fview_count) {
 903         free (fview_count);
 904         fview_count = NULL;
 905     }
 906     if (NULL != decoded_iov) {
 907         free (decoded_iov);
 908         decoded_iov = NULL;
 909     }
 910     if (NULL != local_iov_array){
 911         free(local_iov_array);
 912         local_iov_array=NULL;
 913     }
 914 
 915     if (NULL != displs) {
 916         free (displs);
 917         displs = NULL;
 918     }
 919     if (my_aggregator == fh->f_rank) {
 920 
 921         if (NULL != sorted_file_offsets){
 922             free(sorted_file_offsets);
 923             sorted_file_offsets = NULL;
 924         }
 925         if (NULL != file_offsets_for_agg){
 926             free(file_offsets_for_agg);
 927             file_offsets_for_agg = NULL;
 928         }
 929         if (NULL != memory_displacements){
 930             free(memory_displacements);
 931             memory_displacements= NULL;
 932         }
 933         if (NULL != sendtype){
 934             for (i = 0; i < fh->f_procs_per_group; i++) {
 935                 if ( MPI_DATATYPE_NULL != sendtype[i] ) {
 936                     ompi_datatype_destroy(&sendtype[i]);
 937                 }
 938             }
 939             free(sendtype);
 940             sendtype=NULL;
 941         }
 942 
 943         if (NULL != disp_index){
 944             free(disp_index);
 945             disp_index = NULL;
 946         }
 947 
 948         if ( NULL != blocklen_per_process){
 949             for(l=0;l<fh->f_procs_per_group;l++){
 950                 if (NULL != blocklen_per_process[l]){
 951                     free(blocklen_per_process[l]);
 952                     blocklen_per_process[l] = NULL;
 953                 }
 954             }
 955 
 956             free(blocklen_per_process);
 957             blocklen_per_process = NULL;
 958         }
 959 
 960         if (NULL != displs_per_process){
 961             for (l=0; l<fh->f_procs_per_group; l++){
 962                 if (NULL != displs_per_process[l]){
 963                     free(displs_per_process[l]);
 964                     displs_per_process[l] = NULL;
 965                 }
 966             }
 967             free(displs_per_process);
 968             displs_per_process = NULL;
 969         }
 970         if ( NULL != send_req ) {
 971             free ( send_req );
 972             send_req = NULL;
 973         }
 974     }
 975     return ret;
 976 }
 977 
 978 
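     /*
      * Heap sort over the offsets of io_array: on return, 'sorted' contains
      * the indices of io_array ordered by increasing file offset.  Only the
      * index array is permuted; io_array itself is not modified.
      */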
 979 static int read_heap_sort (mca_io_ompio_local_io_array *io_array,
 980                            int num_entries,
 981                            int *sorted)
 982 {
 983     int i = 0;
 984     int j = 0;
 985     int left = 0;
 986     int right = 0;
 987     int largest = 0;
 988     int heap_size = num_entries - 1;
 989     int temp = 0;
 990     unsigned char done = 0;
 991     int* temp_arr = NULL;
 992 
 993     temp_arr = (int*)malloc(num_entries*sizeof(int));
 994     if (NULL == temp_arr) {
 995         opal_output (1, "OUT OF MEMORY\n");
 996         return OMPI_ERR_OUT_OF_RESOURCE;
 997     }
 998     temp_arr[0] = 0;
 999     for (i = 1; i < num_entries; ++i) {
1000         temp_arr[i] = i;
1001     }
1002     /* num_entries can be large, so no recursion is used */
1003     for (i = num_entries/2-1 ; i>=0 ; i--) {
1004         done = 0;
1005         j = i;
1006         largest = j;
1007 
1008         while (!done) {
1009             left = j*2+1;
1010             right = j*2+2;
1011             if ((left <= heap_size) &&
1012                 (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
1013                 largest = left;
1014             }
1015             else {
1016                 largest = j;
1017             }
1018             if ((right <= heap_size) &&
1019                 (io_array[temp_arr[right]].offset >
1020                  io_array[temp_arr[largest]].offset)) {
1021                 largest = right;
1022             }
1023             if (largest != j) {
1024                 temp = temp_arr[largest];
1025                 temp_arr[largest] = temp_arr[j];
1026                 temp_arr[j] = temp;
1027                 j = largest;
1028             }
1029             else {
1030                 done = 1;
1031             }
1032         }
1033     }
1034 
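         /* Extraction phase: repeatedly move the root (largest remaining
            offset) to the end of the active heap region and restore the heap
            property, leaving temp_arr ordered by increasing offset. */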
1035     for (i = num_entries-1; i >=1; --i) {
1036         temp = temp_arr[0];
1037         temp_arr[0] = temp_arr[i];
1038         temp_arr[i] = temp;
1039         heap_size--;
1040         done = 0;
1041         j = 0;
1042         largest = j;
1043 
1044         while (!done) {
1045             left =  j*2+1;
1046             right = j*2+2;
1047 
1048             if ((left <= heap_size) &&
1049                 (io_array[temp_arr[left]].offset >
1050                  io_array[temp_arr[j]].offset)) {
1051                 largest = left;
1052             }
1053             else {
1054                 largest = j;
1055             }
1056             if ((right <= heap_size) &&
1057                 (io_array[temp_arr[right]].offset >
1058                  io_array[temp_arr[largest]].offset)) {
1059                 largest = right;
1060             }
1061             if (largest != j) {
1062                 temp = temp_arr[largest];
1063                 temp_arr[largest] = temp_arr[j];
1064                 temp_arr[j] = temp;
1065                 j = largest;
1066             }
1067             else {
1068                 done = 1;
1069             }
1070         }
1071         sorted[i] = temp_arr[i];
1072     }
1073     sorted[0] = temp_arr[0];
1074 
1075     if (NULL != temp_arr) {
1076         free(temp_arr);
1077         temp_arr = NULL;
1078     }
1079     return OMPI_SUCCESS;
1080 }
1081 
1082 
1083 
