root/ompi/mca/io/ompio/io_ompio.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. ompi_io_ompio_generate_current_file_view
  2. ompi_io_ompio_sort_offlen
  3. mca_io_ompio_get_mca_parameter_value

   1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
   2 /*
   3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2017 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2008-2018 University of Houston. All rights reserved.
  14  * Copyright (c) 2011-2015 Cisco Systems, Inc.  All rights reserved.
  15  * Copyright (c) 2012-2013 Inria.  All rights reserved.
  16  * Copyright (c) 2015-2018 Research Organization for Information Science
  17  *                         and Technology (RIST). All rights reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  */
  24 
  25 #include "ompi_config.h"
  26 
  27 #include "ompi/runtime/params.h"
  28 #include "ompi/communicator/communicator.h"
  29 #include "ompi/mca/pml/pml.h"
  30 #include "ompi/mca/topo/topo.h"
  31 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
  32 #include "opal/datatype/opal_convertor.h"
  33 #include "opal/datatype/opal_datatype.h"
  34 #include "ompi/datatype/ompi_datatype.h"
  35 #include "ompi/info/info.h"
  36 #include "ompi/request/request.h"
  37 
  38 #include <math.h>
  39 #include <unistd.h>
  40 
  41 #include "io_ompio.h"
  42 
  43 
  44 int ompi_io_ompio_generate_current_file_view (struct ompio_file_t *fh,
  45                                               size_t max_data,
  46                                               struct iovec **f_iov,
  47                                               int *iov_count)
  48 {
  49 
  50     struct iovec *iov = NULL;
  51     size_t bytes_to_write;
  52     size_t sum_previous_counts = 0;
  53     int j, k;
  54     int block = 1;
  55 
  56    /* allocate an initial iovec, will grow if needed */
  57     iov = (struct iovec *) calloc
  58         (OMPIO_IOVEC_INITIAL_SIZE, sizeof (struct iovec));
  59     if (NULL == iov) {
  60         opal_output(1, "OUT OF MEMORY\n");
  61         return OMPI_ERR_OUT_OF_RESOURCE;
  62     }
  63 
  64     sum_previous_counts = fh->f_position_in_file_view;
  65     j = fh->f_index_in_file_view;
  66     bytes_to_write = max_data;
  67     k = 0;
  68 
  69     while (bytes_to_write) {
  70         ptrdiff_t disp;
  71         /* reallocate if needed */
  72         if (OMPIO_IOVEC_INITIAL_SIZE*block <= k) {
  73             block ++;
  74             iov = (struct iovec *)realloc
  75                 (iov, OMPIO_IOVEC_INITIAL_SIZE *block *sizeof(struct iovec));
  76             if (NULL == iov) {
  77                 opal_output(1, "OUT OF MEMORY\n");
  78                 return OMPI_ERR_OUT_OF_RESOURCE;
  79             }
  80         }
  81 
  82         if (fh->f_decoded_iov[j].iov_len -
  83             (fh->f_total_bytes - sum_previous_counts) <= 0) {
  84             sum_previous_counts += fh->f_decoded_iov[j].iov_len;
  85             j = j + 1;
  86             if (j == (int)fh->f_iov_count) {
  87                 j = 0;
  88                 sum_previous_counts = 0;
  89                 fh->f_offset += fh->f_view_extent;
  90                 fh->f_position_in_file_view = sum_previous_counts;
  91                 fh->f_index_in_file_view = j;
  92                 fh->f_total_bytes = 0;
  93             }
  94         }
  95 
  96         disp = (ptrdiff_t)(fh->f_decoded_iov[j].iov_base) +
  97             (fh->f_total_bytes - sum_previous_counts);
  98         iov[k].iov_base = (IOVBASE_TYPE *)(intptr_t)(disp + fh->f_offset);
  99 
 100         if ((fh->f_decoded_iov[j].iov_len -
 101              (fh->f_total_bytes - sum_previous_counts))
 102             >= bytes_to_write) {
 103             iov[k].iov_len = bytes_to_write;
 104         }
 105         else {
 106             iov[k].iov_len =  fh->f_decoded_iov[j].iov_len -
 107                 (fh->f_total_bytes - sum_previous_counts);
 108         }
 109 
 110         fh->f_total_bytes += iov[k].iov_len;
 111         bytes_to_write -= iov[k].iov_len;
 112         k = k + 1;
 113     }
 114     fh->f_position_in_file_view = sum_previous_counts;
 115     fh->f_index_in_file_view = j;
 116     *iov_count = k;
 117     *f_iov = iov;
 118 
 119     if (mca_io_ompio_record_offset_info){
 120 
 121         int tot_entries=0, *recvcounts=NULL, *displs=NULL;
 122         mca_io_ompio_offlen_array_t *per_process=NULL;
 123         mca_io_ompio_offlen_array_t  *all_process=NULL;
 124         int *sorted=NULL, *column_list=NULL, *values=NULL;
 125         int *row_index=NULL, i=0, l=0, m=0;
 126         int column_index=0, r_index=0;
 127         int blocklen[3] = {1, 1, 1};
 128         ptrdiff_t d[3], base;
 129         ompi_datatype_t *types[3];
 130         ompi_datatype_t *io_array_type=MPI_DATATYPE_NULL;
 131         int **adj_matrix=NULL;
 132         FILE *fp;
 133 
 134 
 135         recvcounts = (int *) malloc (fh->f_size * sizeof(int));
 136         if (NULL == recvcounts){
 137             return OMPI_ERR_OUT_OF_RESOURCE;
 138         }
 139         displs = (int *) malloc (fh->f_size * sizeof(int));
 140         if (NULL == displs){
 141             free(recvcounts);
 142             return OMPI_ERR_OUT_OF_RESOURCE;
 143         }
 144 
 145         fh->f_comm->c_coll->coll_gather (&k,
 146                                         1,
 147                                         MPI_INT,
 148                                         recvcounts,
 149                                         1,
 150                                         MPI_INT,
 151                                         OMPIO_ROOT,
 152                                         fh->f_comm,
 153                                         fh->f_comm->c_coll->coll_gather_module);
 154 
 155         per_process = (mca_io_ompio_offlen_array_t *)
 156             malloc (k * sizeof(mca_io_ompio_offlen_array_t));
 157         if (NULL == per_process){
 158             opal_output(1,"Error while allocating per process!\n");
 159             free(recvcounts);
 160             free(displs);
 161             return  OMPI_ERR_OUT_OF_RESOURCE;
 162         }
 163         for (i=0;i<k;i++){
 164             per_process[i].offset =
 165                 (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[i].iov_base;
 166             per_process[i].length =
 167                 (MPI_Aint)iov[i].iov_len;
 168             per_process[i].process_id = fh->f_rank;
 169         }
 170 
 171         types[0] = &ompi_mpi_long.dt;
 172         types[1] = &ompi_mpi_long.dt;
 173         types[2] = &ompi_mpi_int.dt;
 174 
 175         d[0] = (ptrdiff_t)&per_process[0];
 176         d[1] = (ptrdiff_t)&per_process[0].length;
 177         d[2] = (ptrdiff_t)&per_process[0].process_id;
 178         base = d[0];
 179         for (i=0;i<3;i++){
 180             d[i] -= base;
 181         }
 182         ompi_datatype_create_struct (3,
 183                                      blocklen,
 184                                      d,
 185                                      types,
 186                                      &io_array_type);
 187         ompi_datatype_commit (&io_array_type);
 188 
 189         if (OMPIO_ROOT == fh->f_rank){
 190             tot_entries = recvcounts[0];
 191             displs[0] = 0;
 192             for(i=1;i<fh->f_size;i++){
 193                 displs[i] = displs[i-1] + recvcounts[i-1];
 194                 tot_entries += recvcounts[i];
 195             }
 196             all_process = (mca_io_ompio_offlen_array_t *)
 197                 malloc (tot_entries * sizeof(mca_io_ompio_offlen_array_t));
 198             if (NULL == all_process){
 199                 opal_output(1,"Error while allocating per process!\n");
 200                 free(per_process);
 201                 free(recvcounts);
 202                 free(displs);
 203                 return  OMPI_ERR_OUT_OF_RESOURCE;
 204             }
 205 
 206             sorted = (int *) malloc
 207                 (tot_entries * sizeof(int));
 208             if (NULL == sorted){
 209                 opal_output(1,"Error while allocating per process!\n");
 210                 free(all_process);
 211                 free(per_process);
 212                 free(recvcounts);
 213                 free(displs);
 214                 return  OMPI_ERR_OUT_OF_RESOURCE;
 215             }
 216 
 217             adj_matrix = (int **) malloc (fh->f_size *
 218                                           sizeof(int *));
 219             if (NULL == adj_matrix) {
 220                 opal_output(1,"Error while allocating per process!\n");
 221                 free(sorted);
 222                 free(all_process);
 223                 free(per_process);
 224                 free(recvcounts);
 225                 free(displs);
 226                 return  OMPI_ERR_OUT_OF_RESOURCE;
 227             }
 228             for (i=0;i<fh->f_size;i++){
 229                 adj_matrix[i] = (int *) malloc (fh->f_size *
 230                                                 sizeof (int ));
 231                 if (NULL == adj_matrix[i]) {
 232                     for (j=0; j<i; j++) {
 233                         free(adj_matrix[j]);
 234                     }
 235                     free(adj_matrix);
 236                     free(sorted);
 237                     free(all_process);
 238                     free(per_process);
 239                     free(recvcounts);
 240                     free(displs);
 241                     return  OMPI_ERR_OUT_OF_RESOURCE;
 242                  }
 243             }
 244 
 245             for (i=0;i<fh->f_size;i++){
 246                 for (j=0;j<fh->f_size;j++){
 247                     adj_matrix[i][j] = 0;
 248                 }
 249             }
 250         }
 251         fh->f_comm->c_coll->coll_gatherv (per_process,
 252                                          k,
 253                                          io_array_type,
 254                                          all_process,
 255                                          recvcounts,
 256                                          displs,
 257                                          io_array_type,
 258                                          OMPIO_ROOT,
 259                                          fh->f_comm,
 260                                          fh->f_comm->c_coll->coll_gatherv_module);
 261 
 262         ompi_datatype_destroy(&io_array_type);
 263 
 264         if (OMPIO_ROOT == fh->f_rank){
 265 
 266             ompi_io_ompio_sort_offlen(all_process,
 267                                       tot_entries,
 268                                       sorted);
 269 
 270             for (i=0;i<tot_entries-1;i++){
 271                 j = all_process[sorted[i]].process_id;
 272                 l = all_process[sorted[i+1]].process_id;
 273                 adj_matrix[j][l] += 1;
 274                 adj_matrix[l][j] += 1;
 275             }
 276 
 277             /*Compress sparse matrix based on CRS to write to file */
 278             m = 0;
 279             for (i=0; i<fh->f_size; i++){
 280                 for (j=0; j<fh->f_size; j++){
 281                     if (adj_matrix[i][j] > 0){
 282                         m++;
 283                     }
 284                 }
 285             }
 286             fp = fopen("fileview_info.out", "w+");
 287             if ( NULL == fp ) {
 288                 for (i=0; i<fh->f_size; i++) {
 289                     free(adj_matrix[i]);
 290                 }
 291                 free(adj_matrix);
 292                 free(sorted);
 293                 free(all_process);
 294                 free(per_process);
 295                 free(recvcounts);
 296                 free(displs);
 297                 return MPI_ERR_OTHER;
 298             }
 299             fprintf(fp,"FILEVIEW\n");
 300             column_list = (int *) malloc ( m * sizeof(int));
 301             if (NULL == column_list){
 302                 opal_output(1,"Error while allocating column list\n");
 303                 fclose(fp);
 304                 for (i=0; i<fh->f_size; i++) {
 305                     free(adj_matrix[i]);
 306                 }
 307                 free(adj_matrix);
 308                 free(sorted);
 309                 free(all_process);
 310                 free(per_process);
 311                 free(recvcounts);
 312                 free(displs);
 313                 return OMPI_ERR_OUT_OF_RESOURCE;
 314             }
 315             values = (int *) malloc ( m * sizeof(int));
 316             if (NULL == values){
 317                 opal_output(1,"Error while allocating values list\n");
 318                 fclose(fp);
 319                 for (i=0; i<fh->f_size; i++) {
 320                     free(adj_matrix[i]);
 321                 }
 322                 free(adj_matrix);
 323                 free(column_list);
 324                 free(sorted);
 325                 free(all_process);
 326                 free(per_process);
 327                 free(recvcounts);
 328                 free(displs);
 329                 return OMPI_ERR_OUT_OF_RESOURCE;
 330             }
 331 
 332             row_index = (int *) malloc ((fh->f_size + 1) *
 333                                         sizeof(int));
 334             if (NULL == row_index){
 335                 opal_output(1,"Error while allocating row_index list\n");
 336                 fclose(fp);
 337                 for (i=0; i<fh->f_size; i++) {
 338                     free(adj_matrix[i]);
 339                 }
 340                 free(adj_matrix);
 341                 free(values);
 342                 free(column_list);
 343                 free(sorted);
 344                 free(all_process);
 345                 free(per_process);
 346                 free(recvcounts);
 347                 free(displs);
 348                 return OMPI_ERR_OUT_OF_RESOURCE;
 349             }
 350             fprintf(fp,"%d %d\n", m, fh->f_size+1);
 351             column_index = 0;
 352             r_index = 1;
 353             row_index[0] = r_index;
 354             for (i=0; i<fh->f_size; i++){
 355                 for (j=0; j<fh->f_size; j++){
 356                     if (adj_matrix[i][j] > 0){
 357                         values[column_index]= adj_matrix[i][j];
 358                         column_list[column_index]= j;
 359                         fprintf(fp,"%d ", column_list[column_index]);
 360                         column_index++;
 361                         r_index++;
 362                     }
 363 
 364                 }
 365                 row_index[i+1]= r_index;
 366             }
 367 
 368             fprintf(fp,"\n");
 369             for (i=0; i<m;i++){
 370                 fprintf(fp, "%d ", values[i]);
 371             }
 372             fprintf(fp, "\n");
 373             for (i=0; i< (fh->f_size + 1); i++){
 374                 fprintf(fp, "%d ", row_index[i]);
 375             }
 376             fprintf(fp, "\n");
 377             fclose(fp);
 378 
 379             if (NULL != recvcounts){
 380                 free(recvcounts);
 381                 recvcounts = NULL;
 382             }
 383             if (NULL != displs){
 384                 free(displs);
 385                 displs = NULL;
 386             }
 387             if (NULL != sorted){
 388                 free(sorted);
 389                 sorted = NULL;
 390             }
 391             if (NULL != per_process){
 392                 free(per_process);
 393                 per_process = NULL;
 394             }
 395             if (NULL != all_process){
 396                 free(all_process);
 397                 all_process = NULL;
 398             }
 399             free(column_list);
 400             free(values);
 401             if (NULL != row_index){
 402                 free(row_index);
 403                 row_index = NULL;
 404             }
 405             if (NULL != adj_matrix){
 406                 for (i=0;i<fh->f_size;i++){
 407                     free(adj_matrix[i]);
 408                 }
 409                 free(adj_matrix);
 410                 adj_matrix = NULL;
 411             }
 412         }
 413     }
 414     return OMPI_SUCCESS;
 415 }
 416 
 417 
 418 int ompi_io_ompio_sort_offlen (mca_io_ompio_offlen_array_t *io_array,
 419                                int num_entries,
 420                                int *sorted){
 421 
 422     int i = 0;
 423     int j = 0;
 424     int left = 0;
 425     int right = 0;
 426     int largest = 0;
 427     int heap_size = num_entries - 1;
 428     int temp = 0;
 429     unsigned char done = 0;
 430     int* temp_arr = NULL;
 431 
 432     temp_arr = (int*)malloc(num_entries*sizeof(int));
 433     if (NULL == temp_arr) {
 434         opal_output (1, "OUT OF MEMORY\n");
 435         return OMPI_ERR_OUT_OF_RESOURCE;
 436     }
 437     temp_arr[0] = 0;
 438     for (i = 1; i < num_entries; ++i) {
 439         temp_arr[i] = i;
 440     }
 441     /* num_entries can be a large no. so NO RECURSION */
 442     for (i = num_entries/2-1 ; i>=0 ; i--) {
 443         done = 0;
 444         j = i;
 445         largest = j;
 446 
 447         while (!done) {
 448             left = j*2+1;
 449             right = j*2+2;
 450             if ((left <= heap_size) &&
 451                 (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
 452                 largest = left;
 453             }
 454             else {
 455                 largest = j;
 456             }
 457             if ((right <= heap_size) &&
 458                 (io_array[temp_arr[right]].offset >
 459                  io_array[temp_arr[largest]].offset)) {
 460                 largest = right;
 461             }
 462             if (largest != j) {
 463                 temp = temp_arr[largest];
 464                 temp_arr[largest] = temp_arr[j];
 465                 temp_arr[j] = temp;
 466                 j = largest;
 467             }
 468             else {
 469                 done = 1;
 470             }
 471         }
 472     }
 473 
 474     for (i = num_entries-1; i >=1; --i) {
 475         temp = temp_arr[0];
 476         temp_arr[0] = temp_arr[i];
 477         temp_arr[i] = temp;
 478         heap_size--;
 479         done = 0;
 480         j = 0;
 481         largest = j;
 482 
 483         while (!done) {
 484             left =  j*2+1;
 485             right = j*2+2;
 486 
 487             if ((left <= heap_size) &&
 488                 (io_array[temp_arr[left]].offset >
 489                  io_array[temp_arr[j]].offset)) {
 490                 largest = left;
 491             }
 492             else {
 493                 largest = j;
 494             }
 495             if ((right <= heap_size) &&
 496                 (io_array[temp_arr[right]].offset >
 497                  io_array[temp_arr[largest]].offset)) {
 498                 largest = right;
 499             }
 500             if (largest != j) {
 501                 temp = temp_arr[largest];
 502                 temp_arr[largest] = temp_arr[j];
 503                 temp_arr[j] = temp;
 504                 j = largest;
 505             }
 506             else {
 507                 done = 1;
 508             }
 509         }
 510         sorted[i] = temp_arr[i];
 511     }
 512     sorted[0] = temp_arr[0];
 513 
 514     if (NULL != temp_arr) {
 515         free(temp_arr);
 516         temp_arr = NULL;
 517     }
 518     return OMPI_SUCCESS;
 519 }
 520 
 521 
 522 int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_length )
 523 {
 524     if ( !strncmp ( mca_parameter_name, "verbose_info_parsing", name_length )) {
 525         return mca_io_ompio_verbose_info_parsing;
 526     }
 527     else if ( !strncmp ( mca_parameter_name, "num_aggregators", name_length )) {
 528         return mca_io_ompio_num_aggregators;
 529     }
 530     else if ( !strncmp ( mca_parameter_name, "bytes_per_agg", name_length )) {
 531         return mca_io_ompio_bytes_per_agg;
 532     }
 533     else if ( !strncmp ( mca_parameter_name, "overwrite_amode", name_length )) {
 534         return mca_io_ompio_overwrite_amode;
 535     }
 536     else if ( !strncmp ( mca_parameter_name, "cycle_buffer_size", name_length )) {
 537         return mca_io_ompio_cycle_buffer_size;
 538     }
 539     else if ( !strncmp ( mca_parameter_name, "max_aggregators_ratio", name_length )) {
 540         return mca_io_ompio_max_aggregators_ratio;
 541     }
 542     else if ( !strncmp ( mca_parameter_name, "aggregators_cutoff_threshold", name_length )) {
 543         return mca_io_ompio_aggregators_cutoff_threshold;
 544     }
 545     else if ( !strncmp ( mca_parameter_name, "grouping_option", name_length )) {
 546         return mca_io_ompio_grouping_option;
 547     }
 548     else if ( !strncmp ( mca_parameter_name, "coll_timing_info", name_length )) {
 549         return mca_io_ompio_coll_timing_info;
 550     }
 551     else {
 552         opal_output (1, "Error in mca_io_ompio_get_mca_parameter_value: unknown parameter name");
 553     }
 554 
 555     /* Using here OMPI_ERROR_MAX instead of OMPI_ERROR, since -1 (which is OMPI_ERROR) 
 556     ** is a valid value for some mca parameters, indicating that the user did not set 
 557     ** that parameter value 
 558     */
 559     return OMPI_ERR_MAX;
 560 }
 561 
 562 
 563 
 564 
 565 

/* [<][>][^][v][top][bottom][index][help] */