root/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c


DEFINITIONS

This source file includes the following definitions:
  1. mca_fcoll_two_phase_file_read_all
  2. two_phase_read_and_exch
  3. two_phase_exchange_data
  4. two_phase_fill_user_buffer
  5. isread_aggregator

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2008-2014 University of Houston. All rights reserved.
 * Copyright (c) 2015      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2015      Los Alamos National Security, LLC. All rights
 *                         reserved.
 * Copyright (c) 2017-2018 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "fcoll_two_phase.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/common/ompio/common_ompio.h"
#include "ompi/mca/io/io.h"
#include "opal/mca/base/base.h"
#include "math.h"
#include "ompi/mca/pml/pml.h"
#include <unistd.h>

#define DEBUG 0

/* Two-phase collective I/O implementation ported from ROMIO to the OMPIO
 * infrastructure. The logic closely follows ROMIO's two_phase code base.
 */
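
/*
 * Read path in a nutshell: in phase one each aggregator reads one large,
 * contiguous chunk of its file domain into a temporary buffer
 * (two_phase_read_and_exch); in phase two the pieces of that chunk are
 * shipped to the processes that actually requested them
 * (two_phase_exchange_data / two_phase_fill_user_buffer). At most
 * fh->f_bytes_per_agg bytes are read per cycle, so a larger file domain is
 * handled in several read-then-exchange cycles.
 */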


/* Data structure to support specifying the flat-list. */
typedef struct flat_list_node {
    MPI_Datatype type;
    int count;
    OMPI_MPI_OFFSET_TYPE *blocklens;
    OMPI_MPI_OFFSET_TYPE *indices;
    struct flat_list_node *next;
} Flatlist_node;
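
/*
 * A quick sketch of what a flattened datatype looks like here: an
 * MPI_Type_vector(3, 2, 4, MPI_INT) buffer type would flatten to
 * (hypothetical values, assuming 4-byte ints)
 *
 *     count     = 3
 *     indices   = { 0, 16, 32 }    byte displacements into the user buffer
 *     blocklens = { 8,  8,  8 }    contiguous bytes per block
 *
 * i.e. one (offset, length) pair per contiguous region of the user buffer.
 */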

/* local function declarations  */
static int two_phase_read_and_exch(ompio_file_t *fh,
                                   void *buf,
                                   MPI_Datatype datatype,
                                   mca_common_ompio_access_array_t *others_req,
                                   struct iovec *offset_len,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   size_t *buf_idx, int striping_unit,
                                   int num_io_procs, int *aggregator_list);

static int two_phase_exchange_data(ompio_file_t *fh,
                                   void *buf,
                                   struct iovec *offset_length,
                                   int *send_size, int *start_pos,
                                   int *recv_size,
                                   int *count,
                                   int *partial_send, int *recd_from_proc,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   mca_common_ompio_access_array_t *others_req,
                                   int iter,
                                   size_t *buf_idx, MPI_Aint buftype_extent,
                                   int striping_unit, int num_io_procs,
                                   int *aggregator_list);


static void two_phase_fill_user_buffer(ompio_file_t *fh,
                                       void *buf,
                                       Flatlist_node *flat_buf,
                                       char **recv_buf,
                                       struct iovec *offset_length,
                                       unsigned *recv_size,
                                       MPI_Request *requests,
                                       int *recd_from_proc,
                                       int contig_access_count,
                                       OMPI_MPI_OFFSET_TYPE min_st_offset,
                                       OMPI_MPI_OFFSET_TYPE fd_size,
                                       OMPI_MPI_OFFSET_TYPE *fd_start,
                                       OMPI_MPI_OFFSET_TYPE *fd_end,
                                       MPI_Aint buftype_extent,
                                       int striping_unit,
                                       int num_io_procs, int *aggregator_list);
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
static int isread_aggregator(int rank,
                             int nprocs_for_coll,
                             int *aggregator_list);
#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
#endif


int
mca_fcoll_two_phase_file_read_all (ompio_file_t *fh,
                                   void *buf,
                                   int count,
                                   struct ompi_datatype_t *datatype,
                                   ompi_status_public_t *status)
{

    int ret = OMPI_SUCCESS, i = 0, j = 0, interleave_count = 0, striping_unit = 0;
    MPI_Aint recv_buf_addr = 0;
    uint32_t iov_count = 0, ti = 0;
    struct iovec *decoded_iov = NULL, *temp_iov = NULL, *iov = NULL;
    size_t max_data = 0;
    long long_max_data = 0, long_total_bytes = 0;
    int domain_size=0, *count_my_req_per_proc=NULL, count_my_req_procs = 0;
    int count_other_req_procs;
    size_t *buf_indices=NULL;
    int *aggregator_list = NULL, local_count = 0, local_size = 0;
    int two_phase_num_io_procs=1;
    OMPI_MPI_OFFSET_TYPE start_offset = 0, end_offset = 0, fd_size = 0;
    OMPI_MPI_OFFSET_TYPE *start_offsets=NULL, *end_offsets=NULL;
    OMPI_MPI_OFFSET_TYPE *fd_start=NULL, *fd_end=NULL, min_st_offset = 0;
    Flatlist_node *flat_buf=NULL;
    mca_common_ompio_access_array_t *my_req=NULL, *others_req=NULL;
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    mca_common_ompio_print_entry nentry;
#endif
//    if (opal_datatype_is_predefined(&datatype->super)) {
//      fh->f_flags = fh->f_flags |  OMPIO_CONTIGUOUS_MEMORY;
//    }

    if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
        ret =   mca_common_ompio_decode_datatype ((struct ompio_file_t *)fh,
                                                  datatype,
                                                  count,
                                                  buf,
                                                  &max_data,
                                                  fh->f_mem_convertor,
                                                  &temp_iov,
                                                  &iov_count);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        recv_buf_addr = (size_t)(buf);
        decoded_iov = (struct iovec *) calloc (iov_count, sizeof(struct iovec));
        if (NULL == decoded_iov) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        for (ti = 0; ti < iov_count; ti++) {

            decoded_iov[ti].iov_base = (IOVBASE_TYPE *)
                ((ptrdiff_t)temp_iov[ti].iov_base - recv_buf_addr);
            decoded_iov[ti].iov_len = temp_iov[ti].iov_len;
#if DEBUG
            printf("d_offset[%d]: %ld, d_len[%d]: %ld\n",
                   ti, (ptrdiff_t)decoded_iov[ti].iov_base,
                   ti, decoded_iov[ti].iov_len);
#endif
        }

    }
    else {
        max_data = count * datatype->super.size;
    }
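
    /* In the non-contiguous case, decoded_iov now holds displacements
     * relative to the start of the user buffer rather than absolute
     * addresses; the exchange phase later adds these displacements back
     * onto (char *)buf when placing the received data. */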

    if ( MPI_STATUS_IGNORE != status ) {
        status->_ucount = max_data;
    }

    two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
    if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
        ret = OMPI_ERROR;
        goto exit;
    }
    if (-1 == two_phase_num_io_procs) {
        ret = mca_common_ompio_set_aggregator_props ((struct ompio_file_t *)fh,
                                                     two_phase_num_io_procs,
                                                     max_data);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        two_phase_num_io_procs = fh->f_num_aggrs;

    }

    if (two_phase_num_io_procs > fh->f_size) {
        two_phase_num_io_procs = fh->f_size;
    }

    aggregator_list = (int *) calloc (two_phase_num_io_procs, sizeof(int));
    if (NULL == aggregator_list) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    if ( OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm) ) {
        for (i = 0; i < two_phase_num_io_procs; i++) {
            aggregator_list[i] = i;
        }
    }
    else {
        for (i = 0; i < two_phase_num_io_procs; i++) {
            aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
        }
    }
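
    /* Worked example of the placement above: with fh->f_size == 8 and
     * two_phase_num_io_procs == 2, the map-by-node branch picks ranks
     * {0, 1} (consecutive ranks already sit on different nodes), while the
     * default branch spreads the aggregators out to ranks {0, 4}. */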

    ret = fh->f_generate_current_file_view ((struct ompio_file_t *)fh,
                                            max_data,
                                            &iov,
                                            &local_count);

    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    long_max_data = (long) max_data;
    ret = fh->f_comm->c_coll->coll_allreduce (&long_max_data,
                                              &long_total_bytes,
                                              1,
                                              MPI_LONG,
                                              MPI_SUM,
                                              fh->f_comm,
                                              fh->f_comm->c_coll->coll_allreduce_module);

    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {

        /* This data structure translates between OMPIO and ROMIO; it is a
           little hacky, but it helps to reuse ROMIO's code for handling a
           non-contiguous file type. OMPIO's flattened datatype lives in
           decoded_iov and is translated here into flat_buf. */

        flat_buf = (Flatlist_node *)calloc(1, sizeof(Flatlist_node));
        if (NULL == flat_buf) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        flat_buf->type = datatype;
        flat_buf->next = NULL;
        flat_buf->count = 0;
        flat_buf->indices = NULL;
        flat_buf->blocklens = NULL;

        if ( 0 < count ) {
            local_size = OMPIO_MAX(1, iov_count/count);
        }
        else {
            local_size = 0;
        }


        if ( 0 < local_size ) {
            flat_buf->indices =
                (OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if (NULL == flat_buf->indices) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            flat_buf->blocklens =
                (OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if (NULL == flat_buf->blocklens) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
        }
        flat_buf->count = local_size;
        for (j = 0 ; j < local_size ; ++j) {
            flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)(intptr_t)decoded_iov[j].iov_base;
            flat_buf->blocklens[j] = decoded_iov[j].iov_len;
        }

#if DEBUG
        printf("flat_buf count: %d\n",
               flat_buf->count);
        for (i = 0; i < flat_buf->count; i++) {
            printf("%d: blocklen[%d] : %lld, indices[%d]: %lld\n",
                   fh->f_rank, i, flat_buf->blocklens[i], i, flat_buf->indices[i]);
        }
#endif
    }

#if DEBUG
    printf("%d: total_bytes:%ld, local_count: %d\n",
           fh->f_rank, long_total_bytes, local_count);
    for (i = 0; i < local_count; i++) {
        printf("%d: fcoll:two_phase:read_all:OFFSET:%ld,LENGTH:%ld\n",
               fh->f_rank,
               (size_t)iov[i].iov_base,
               (size_t)iov[i].iov_len);
    }
#endif

    if ( 0 < local_count ) {
        start_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[0].iov_base;
        end_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_base +
            (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_len - 1;
    }
    else {
        start_offset = 0;
        end_offset = 0;
    }
#if DEBUG
    printf("%d: START OFFSET:%ld, END OFFSET:%ld\n",
           fh->f_rank,
           (size_t)start_offset,
           (size_t)end_offset);
#endif

    start_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
        (fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));

    if (NULL == start_offsets) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    end_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
        (fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));

    if (NULL == end_offsets) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    ret = fh->f_comm->c_coll->coll_allgather(&start_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             start_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);

    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    ret = fh->f_comm->c_coll->coll_allgather(&end_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             end_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);


    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

#if DEBUG
    for (i = 0; i < fh->f_size; i++) {
        printf("%d: start[%d]:%ld,end[%d]:%ld\n",
               fh->f_rank, i,
               (size_t)start_offsets[i], i,
               (size_t)end_offsets[i]);
    }
#endif

    for (i = 1; i < fh->f_size; i++) {
        if ((start_offsets[i] < end_offsets[i-1]) &&
            (start_offsets[i] <= end_offsets[i])) {
            interleave_count++;
        }
    }
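
    /* interleave_count counts the ranks whose first file offset lies
     * before the previous rank's last offset, i.e. whose accesses overlap
     * in file order. For example, if rank 0 touches bytes [0,99] and
     * rank 1 touches [50,149], rank 1 is counted as interleaved. */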

#if DEBUG
    printf("%d: interleave_count:%d\n",
           fh->f_rank, interleave_count);
#endif

    ret = mca_fcoll_two_phase_domain_partition(fh,
                                               start_offsets,
                                               end_offsets,
                                               &min_st_offset,
                                               &fd_start,
                                               &fd_end,
                                               domain_size,
                                               &fd_size,
                                               striping_unit,
                                               two_phase_num_io_procs);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

#if DEBUG
    for (i = 0; i < two_phase_num_io_procs; i++) {
        printf("fd_start[%d] : %lld, fd_end[%d] : %lld, local_count: %d\n",
               i, fd_start[i], i, fd_end[i], local_count);
    }
#endif

    ret = mca_fcoll_two_phase_calc_my_requests (fh,
                                                iov,
                                                local_count,
                                                min_st_offset,
                                                fd_start,
                                                fd_end,
                                                fd_size,
                                                &count_my_req_procs,
                                                &count_my_req_per_proc,
                                                &my_req,
                                                &buf_indices,
                                                striping_unit,
                                                two_phase_num_io_procs,
                                                aggregator_list);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    ret = mca_fcoll_two_phase_calc_others_requests(fh,
                                                   count_my_req_procs,
                                                   count_my_req_per_proc,
                                                   my_req,
                                                   &count_other_req_procs,
                                                   &others_req);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

#if DEBUG
    printf("%d count_other_req_procs : %d\n",
           fh->f_rank,
           count_other_req_procs);
#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rexch = MPI_Wtime();
#endif


    ret = two_phase_read_and_exch(fh,
                                  buf,
                                  datatype,
                                  others_req,
                                  iov,
                                  local_count,
                                  min_st_offset,
                                  fd_size,
                                  fd_start,
                                  fd_end,
                                  flat_buf,
                                  buf_indices,
                                  striping_unit,
                                  two_phase_num_io_procs,
                                  aggregator_list);


    if (OMPI_SUCCESS != ret) {
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rexch = MPI_Wtime();
    read_exch += (end_rexch - start_rexch);
    nentry.time[0] = read_time;
    nentry.time[1] = rcomm_time;
    nentry.time[2] = read_exch;
    if (isread_aggregator(fh->f_rank,
                          two_phase_num_io_procs,
                          aggregator_list)) {
        nentry.aggregator = 1;
    }
    else {
        nentry.aggregator = 0;
    }
    nentry.nprocs_for_coll = two_phase_num_io_procs;


    if (!mca_common_ompio_full_print_queue(fh->f_coll_read_time)) {
        mca_common_ompio_register_print_entry(fh->f_coll_read_time,
                                              nentry);
    }
#endif


exit:
    if (flat_buf != NULL) {
        if (flat_buf->blocklens != NULL) {
            free (flat_buf->blocklens);
        }
        if (flat_buf->indices != NULL) {
            free (flat_buf->indices);
        }
        free (flat_buf);
    }

    free (start_offsets);
    free (end_offsets);
    free (aggregator_list);
    free (fd_start);
    free (decoded_iov);
    free (buf_indices);
    free (count_my_req_per_proc);
    free (my_req);
    free (others_req);
    free (fd_end);

    return ret;
}



static int two_phase_read_and_exch(ompio_file_t *fh,
                                   void *buf,
                                   MPI_Datatype datatype,
                                   mca_common_ompio_access_array_t *others_req,
                                   struct iovec *offset_len,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   size_t *buf_idx, int striping_unit,
                                   int two_phase_num_io_procs,
                                   int *aggregator_list) {


    int ret = OMPI_SUCCESS, i = 0, j = 0, ntimes = 0, max_ntimes = 0;
    int m = 0;
    int *curr_offlen_ptr=NULL, *count=NULL, *send_size=NULL, *recv_size=NULL;
    int *partial_send=NULL, *start_pos=NULL, req_len=0, flag=0;
    int *recd_from_proc=NULL;
    MPI_Aint buftype_extent=0;
    size_t byte_size = 0;
    OMPI_MPI_OFFSET_TYPE st_loc=-1, end_loc=-1, off=0, done=0, for_next_iter=0;
    OMPI_MPI_OFFSET_TYPE size=0, req_off=0, real_size=0, real_off=0, len=0;
    OMPI_MPI_OFFSET_TYPE for_curr_iter=0;
    char *read_buf=NULL, *tmp_buf=NULL;
    MPI_Datatype byte = MPI_BYTE;
    int two_phase_cycle_buffer_size=0;

    opal_datatype_type_size(&byte->super,
                            &byte_size);

    for (i = 0; i < fh->f_size; i++) {
        if (others_req[i].count) {
            st_loc = others_req[i].offsets[0];
            end_loc = others_req[i].offsets[0];
            break;
        }
    }

    for (i = 0; i < fh->f_size; i++) {
        for (j = 0; j < others_req[i].count; j++) {
            st_loc =
                OMPIO_MIN(st_loc, others_req[i].offsets[j]);
            end_loc =
                OMPIO_MAX(end_loc, (others_req[i].offsets[j] +
                                    others_req[i].lens[j] - 1));
        }
    }

    two_phase_cycle_buffer_size = fh->f_bytes_per_agg;
    ntimes = (int)((end_loc - st_loc + two_phase_cycle_buffer_size)/
                   two_phase_cycle_buffer_size);

    if ((st_loc == -1) && (end_loc == -1)) {
        ntimes = 0;
    }
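
    /* ntimes is ceil(extent / cycle_buffer_size), where the extent is the
     * end_loc - st_loc + 1 bytes of this aggregator's file domain that
     * were actually requested: with hypothetical numbers, a 10 MB extent
     * and a 32 MB f_bytes_per_agg need one cycle, a 100 MB extent needs
     * four. */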

    fh->f_comm->c_coll->coll_allreduce (&ntimes,
                                        &max_ntimes,
                                        1,
                                        MPI_INT,
                                        MPI_MAX,
                                        fh->f_comm,
                                        fh->f_comm->c_coll->coll_allreduce_module);

    if (ntimes) {
        read_buf = (char *) calloc (two_phase_cycle_buffer_size, sizeof(char));
        if (NULL == read_buf) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
    }

    curr_offlen_ptr = (int *)calloc (fh->f_size,
                                     sizeof(int));
    if (NULL == curr_offlen_ptr) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    count = (int *)calloc (fh->f_size,
                           sizeof(int));
    if (NULL == count) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    partial_send = (int *)calloc(fh->f_size, sizeof(int));
    if (NULL == partial_send) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    send_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == send_size) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    recv_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == recv_size) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    recd_from_proc = (int *)calloc(fh->f_size, sizeof(int));
    if (NULL == recd_from_proc) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    start_pos = (int *) calloc(fh->f_size, sizeof(int));
    if (NULL == start_pos) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    done = 0;
    off = st_loc;
    for_curr_iter = for_next_iter = 0;

    ompi_datatype_type_extent(datatype, &buftype_extent);

    for (m = 0; m < ntimes; m++) {

        size = OMPIO_MIN((unsigned)two_phase_cycle_buffer_size, end_loc-st_loc+1-done);
        real_off = off - for_curr_iter;
        real_size = size + for_curr_iter;

        for (i = 0; i < fh->f_size; i++) count[i] = send_size[i] = 0;
        for_next_iter = 0;

        for (i = 0; i < fh->f_size; i++) {
            if (others_req[i].count) {
                start_pos[i] = curr_offlen_ptr[i];
                for (j = curr_offlen_ptr[i]; j < others_req[i].count;
                     j++) {
                    if (partial_send[i]) {
                        /* this request may have been partially
                           satisfied in the previous iteration. */
                        req_off = others_req[i].offsets[j] +
                            partial_send[i];
                        req_len = others_req[i].lens[j] -
                            partial_send[i];
                        partial_send[i] = 0;
                        /* modify the off-len pair to reflect this change */
                        others_req[i].offsets[j] = req_off;
                        others_req[i].lens[j] = req_len;
                    }
                    else {
                        req_off = others_req[i].offsets[j];
                        req_len = others_req[i].lens[j];
                    }
                    if (req_off < real_off + real_size) {
                        count[i]++;
                        PMPI_Get_address(read_buf+req_off-real_off,
                                         &(others_req[i].mem_ptrs[j]));

                        send_size[i] += (int)(OMPIO_MIN(real_off + real_size - req_off,
                                                        (OMPI_MPI_OFFSET_TYPE)req_len));

                        if (real_off+real_size-req_off < (OMPI_MPI_OFFSET_TYPE)req_len) {
                            partial_send[i] = (int) (real_off + real_size - req_off);
                            if ((j+1 < others_req[i].count) &&
                                (others_req[i].offsets[j+1] <
                                 real_off+real_size)) {
                                /* the next request from process i also
                                   starts inside the region read in this
                                   cycle, so part of the buffer must be
                                   carried over into the next iteration */
                                for_next_iter = OMPIO_MAX(for_next_iter,
                                                          real_off + real_size - others_req[i].offsets[j+1]);
                                /* max because the carry-over must cover
                                   such requests from all processes */
                            }
                            break;
                        }
                    }
                    else break;
                }
                curr_offlen_ptr[i] = j;
            }
        }
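
        /* At this point, for each process i: count[i] and send_size[i]
         * describe how much of this cycle's buffer belongs to i,
         * partial_send[i] marks a request only partially covered by this
         * cycle, and for_next_iter is the number of trailing bytes of the
         * buffer that must be replayed in the next cycle. */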
        flag = 0;
        for (i = 0; i < fh->f_size; i++)
            if (count[i]) flag = 1;

        if (flag) {

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            start_read_time = MPI_Wtime();
#endif

            len = size * byte_size;
            fh->f_io_array = (mca_common_ompio_io_array_t *)calloc
                (1, sizeof(mca_common_ompio_io_array_t));
            if (NULL == fh->f_io_array) {
                opal_output(1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            fh->f_io_array[0].offset = (IOVBASE_TYPE *)(intptr_t)off;
            fh->f_io_array[0].length = len;
            fh->f_io_array[0].memory_address =
                read_buf+for_curr_iter;
            fh->f_num_of_io_entries = 1;

            if (fh->f_num_of_io_entries) {
                if ( 0 > fh->f_fbtl->fbtl_preadv (fh)) {
                    opal_output(1, "READ FAILED\n");
                    ret = OMPI_ERROR;
                    goto exit;
                }
            }

#if 0
            int ii;
            printf("%d: len/4 : %lld\n",
                   fh->f_rank,
                   len/4);
            for (ii = 0; ii < len/4; ii++) {
                printf("%d: read_buf[%d]: %d\n",
                       fh->f_rank,
                       ii,
                       ((int *)read_buf)[ii]);
            }
#endif
            fh->f_num_of_io_entries = 0;
            if (NULL != fh->f_io_array) {
                free (fh->f_io_array);
                fh->f_io_array = NULL;
            }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            end_read_time = MPI_Wtime();
            read_time += (end_read_time - start_read_time);
#endif


        }

        for_curr_iter = for_next_iter;

        for (i = 0; i < fh->f_size; i++) {
            recv_size[i] = 0;
        }
        ret = two_phase_exchange_data(fh, buf, offset_len,
                                      send_size, start_pos, recv_size, count,
                                      partial_send, recd_from_proc,
                                      contig_access_count,
                                      min_st_offset, fd_size, fd_start, fd_end,
                                      flat_buf, others_req, m, buf_idx,
                                      buftype_extent, striping_unit, two_phase_num_io_procs,
                                      aggregator_list);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        if (for_next_iter) {
            tmp_buf = (char *) calloc (for_next_iter, sizeof(char));
            if (NULL == tmp_buf) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memcpy(tmp_buf,
                   read_buf+real_size-for_next_iter,
                   for_next_iter);
            free(read_buf);
            read_buf = (char *)malloc(for_next_iter+two_phase_cycle_buffer_size);
            if (NULL == read_buf) {
                free(tmp_buf);
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memcpy(read_buf, tmp_buf, for_next_iter);
            free(tmp_buf);
        }
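
        /* The block above carries the tail of the current buffer (the
         * bytes that also satisfy next cycle's requests) to the front of a
         * freshly allocated buffer, so that the next preadv can append
         * after it at read_buf + for_curr_iter. */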

        off += size;
        done += size;
    }

    for (i = 0; i < fh->f_size; i++) count[i] = send_size[i] = 0;
    for (m = ntimes; m < max_ntimes; m++) {
        ret = two_phase_exchange_data(fh, buf, offset_len, send_size,
                                      start_pos, recv_size, count,
                                      partial_send, recd_from_proc,
                                      contig_access_count,
                                      min_st_offset, fd_size, fd_start, fd_end,
                                      flat_buf, others_req, m, buf_idx,
                                      buftype_extent, striping_unit, two_phase_num_io_procs,
                                      aggregator_list);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }
    }

exit:
    free (read_buf);
    free (curr_offlen_ptr);
    free (count);
    free (partial_send);
    free (send_size);
    free (recv_size);
    free (recd_from_proc);
    free (start_pos);

    return ret;

}

static int two_phase_exchange_data(ompio_file_t *fh,
                                   void *buf, struct iovec *offset_len,
                                   int *send_size, int *start_pos, int *recv_size,
                                   int *count, int *partial_send,
                                   int *recd_from_proc, int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   mca_common_ompio_access_array_t *others_req,
                                   int iter, size_t *buf_idx,
                                   MPI_Aint buftype_extent, int striping_unit,
                                   int two_phase_num_io_procs, int *aggregator_list)
{

    int i=0, j=0, k=0, tmp=0, nprocs_recv=0, nprocs_send=0;
    int ret = OMPI_SUCCESS;
    char **recv_buf = NULL;
    MPI_Request *requests=NULL;
    MPI_Datatype send_type;


#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rcomm_time = MPI_Wtime();
#endif

    ret = fh->f_comm->c_coll->coll_alltoall (send_size,
                                             1,
                                             MPI_INT,
                                             recv_size,
                                             1,
                                             MPI_INT,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_alltoall_module);

    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }


#if DEBUG
    for (i = 0; i < fh->f_size; i++) {
        printf("%d: RS[%d]: %d\n", fh->f_rank,
               i,
               recv_size[i]);
    }
#endif


    nprocs_recv = 0;
    for (i = 0; i < fh->f_size; i++)
        if (recv_size[i]) nprocs_recv++;

    nprocs_send = 0;
    for (i = 0; i < fh->f_size; i++)
        if (send_size[i]) nprocs_send++;

    requests = (MPI_Request *)
        malloc((nprocs_send+nprocs_recv+1) * sizeof(MPI_Request));
    if (NULL == requests) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
        j = 0;
        for (i = 0; i < fh->f_size; i++) {
            if (recv_size[i]) {
                ret = MCA_PML_CALL(irecv(((char *) buf)+ buf_idx[i],
                                         recv_size[i],
                                         MPI_BYTE,
                                         i,
                                         fh->f_rank+i+100*iter,
                                         fh->f_comm,
                                         requests+j));

                if (OMPI_SUCCESS != ret) {
                    goto exit;
                }
                j++;
                buf_idx[i] += recv_size[i];
            }
        }
    }
    else {

        recv_buf = (char **) calloc (fh->f_size, sizeof(char *));
        if (NULL == recv_buf) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        for (i = 0; i < fh->f_size; i++) {
            if (recv_size[i]) {
                recv_buf[i] = (char *) malloc (recv_size[i] * sizeof(char));
                if (NULL == recv_buf[i]) {
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }
            }
        }
        j = 0;
        for (i = 0; i < fh->f_size; i++) {
            if (recv_size[i]) {
                ret = MCA_PML_CALL(irecv(recv_buf[i],
                                         recv_size[i],
                                         MPI_BYTE,
                                         i,
                                         fh->f_rank+i+100*iter,
                                         fh->f_comm,
                                         requests+j));
                if (OMPI_SUCCESS != ret) {
                    goto exit;
                }
                j++;
            }
        }
    }
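
    /* A note on the tag arithmetic above and below: for a message from
     * rank s to rank r in cycle `iter`, both sides compute the same tag
     * s + r + 100*iter (the receiver adds the sender's rank to its own,
     * the sender adds the receiver's), which keeps messages from different
     * cycles from matching each other. */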


    j = 0;
    for (i = 0; i < fh->f_size; i++) {
        if (send_size[i]) {
            if (partial_send[i]) {
                k = start_pos[i] + count[i] - 1;
                tmp = others_req[i].lens[k];
                others_req[i].lens[k] = partial_send[i];
            }

            ompi_datatype_create_hindexed(count[i],
                                          &(others_req[i].lens[start_pos[i]]),
                                          &(others_req[i].mem_ptrs[start_pos[i]]),
                                          MPI_BYTE,
                                          &send_type);

            ompi_datatype_commit(&send_type);

            ret = MCA_PML_CALL(isend(MPI_BOTTOM,
                                     1,
                                     send_type,
                                     i,
                                     fh->f_rank+i+100*iter,
                                     MCA_PML_BASE_SEND_STANDARD,
                                     fh->f_comm,
                                     requests+nprocs_recv+j));
            ompi_datatype_destroy(&send_type);

            if (partial_send[i]) others_req[i].lens[k] = tmp;
            j++;
        }
    }


    if (nprocs_recv) {

        ret = ompi_request_wait_all(nprocs_recv,
                                    requests,
                                    MPI_STATUSES_IGNORE);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {

            two_phase_fill_user_buffer(fh, buf, flat_buf,
                                       recv_buf, offset_len,
                                       (unsigned *)recv_size, requests,
                                       recd_from_proc, contig_access_count,
                                       min_st_offset, fd_size, fd_start, fd_end,
                                       buftype_extent, striping_unit, two_phase_num_io_procs,
                                       aggregator_list);
        }
    }

    ret = ompi_request_wait_all(nprocs_send,
                                requests+nprocs_recv,
                                MPI_STATUSES_IGNORE);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rcomm_time = MPI_Wtime();
    rcomm_time += (end_rcomm_time - start_rcomm_time);
#endif

exit:

    if (recv_buf) {
        for (i = 0; i < fh->f_size; i++) {
            free(recv_buf[i]);
        }

        free(recv_buf);
    }

    free(requests);

    return ret;

}

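/*
 * The two macros below walk the flattened user buffer
 * (flat_buf->indices / flat_buf->blocklens, repeating every
 * buftype_extent bytes): TWO_PHASE_BUF_INCR advances the current position
 * by buf_incr bytes without copying anything, while TWO_PHASE_BUF_COPY
 * copies `size` bytes from recv_buf[p] into the user buffer at the current
 * position, crossing flat-list block boundaries as needed.
 */
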
#define TWO_PHASE_BUF_INCR                                              \
{                                                                       \
    while (buf_incr) {                                                  \
        size_in_buf = OMPIO_MIN(buf_incr, flat_buf_sz);                 \
        user_buf_idx += size_in_buf;                                    \
        flat_buf_sz -= size_in_buf;                                     \
        if (!flat_buf_sz) {                                             \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++;   \
            else {                                                      \
                flat_buf_idx = 0;                                       \
                n_buftypes++;                                           \
            }                                                           \
            user_buf_idx = flat_buf->indices[flat_buf_idx] +            \
                (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx];            \
        }                                                               \
        buf_incr -= size_in_buf;                                        \
    }                                                                   \
}


#define TWO_PHASE_BUF_COPY                                              \
{                                                                       \
    while (size) {                                                      \
        size_in_buf = OMPIO_MIN(size, flat_buf_sz);                     \
        memcpy(((char *) buf) + user_buf_idx,                           \
               &(recv_buf[p][recv_buf_idx[p]]), size_in_buf);           \
        recv_buf_idx[p] += size_in_buf;                                 \
        user_buf_idx += size_in_buf;                                    \
        flat_buf_sz -= size_in_buf;                                     \
        if (!flat_buf_sz) {                                             \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++;   \
            else {                                                      \
                flat_buf_idx = 0;                                       \
                n_buftypes++;                                           \
            }                                                           \
            user_buf_idx = flat_buf->indices[flat_buf_idx] +            \
                (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx];            \
        }                                                               \
        size -= size_in_buf;                                            \
        buf_incr -= size_in_buf;                                        \
    }                                                                   \
    TWO_PHASE_BUF_INCR                                                  \
}


static void two_phase_fill_user_buffer(ompio_file_t *fh,
                                       void *buf,
                                       Flatlist_node *flat_buf,
                                       char **recv_buf,
                                       struct iovec *offset_length,
                                       unsigned *recv_size,
                                       MPI_Request *requests,
                                       int *recd_from_proc,
                                       int contig_access_count,
                                       OMPI_MPI_OFFSET_TYPE min_st_offset,
                                       OMPI_MPI_OFFSET_TYPE fd_size,
                                       OMPI_MPI_OFFSET_TYPE *fd_start,
                                       OMPI_MPI_OFFSET_TYPE *fd_end,
                                       MPI_Aint buftype_extent,
                                       int striping_unit, int two_phase_num_io_procs,
                                       int *aggregator_list) {

    int i = 0, p = 0, flat_buf_idx = 0;
    OMPI_MPI_OFFSET_TYPE flat_buf_sz = 0, size_in_buf = 0, buf_incr = 0, size = 0;
    int n_buftypes = 0;
    OMPI_MPI_OFFSET_TYPE off=0, len=0, rem_len=0, user_buf_idx=0;
    unsigned *curr_from_proc=NULL, *done_from_proc=NULL, *recv_buf_idx=NULL;

    curr_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    done_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    recv_buf_idx = (unsigned *) malloc (fh->f_size * sizeof(unsigned));

    for (i = 0; i < fh->f_size; i++) {
        recv_buf_idx[i] = curr_from_proc[i] = 0;
        done_from_proc[i] = recd_from_proc[i];
    }

    flat_buf_idx = 0;
    n_buftypes = 0;

    if ( flat_buf->count > 0 ) {
        user_buf_idx = flat_buf->indices[0];
        flat_buf_sz = flat_buf->blocklens[0];
    }

    /* flat_buf_idx = current index into flattened buftype
       flat_buf_sz = size of current contiguous component in
       flattened buf */

    for (i = 0; i < contig_access_count; i++) {

        off     = (OMPI_MPI_OFFSET_TYPE)(intptr_t)offset_length[i].iov_base;
        rem_len = (OMPI_MPI_OFFSET_TYPE)offset_length[i].iov_len;

        /* this request may span the file domains of more than one process */
        while (rem_len != 0) {
            len = rem_len;
            /* NOTE: the len value is modified by
             * mca_fcoll_two_phase_calc_aggregator() to be no longer than
             * the single file-domain region that process "p" is
             * responsible for.
             */
            p = mca_fcoll_two_phase_calc_aggregator(fh,
                                                    off,
                                                    min_st_offset,
                                                    &len,
                                                    fd_size,
                                                    fd_start,
                                                    fd_end,
                                                    striping_unit,
                                                    two_phase_num_io_procs,
                                                    aggregator_list);

            if (recv_buf_idx[p] < recv_size[p]) {
                if (curr_from_proc[p]+len > done_from_proc[p]) {
                    if (done_from_proc[p] > curr_from_proc[p]) {
                        size = OMPIO_MIN(curr_from_proc[p] + len -
                                         done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
                        buf_incr = done_from_proc[p] - curr_from_proc[p];
                        TWO_PHASE_BUF_INCR
                        buf_incr = curr_from_proc[p]+len-done_from_proc[p];
                        curr_from_proc[p] = done_from_proc[p] + size;
                        TWO_PHASE_BUF_COPY
                    }
                    else {
                        size = OMPIO_MIN(len, recv_size[p]-recv_buf_idx[p]);
                        buf_incr = len;
                        curr_from_proc[p] += (unsigned) size;
                        TWO_PHASE_BUF_COPY
                    }
                }
                else {
                    curr_from_proc[p] += (unsigned) len;
                    buf_incr = len;
                    TWO_PHASE_BUF_INCR
                }
            }
            else {
                buf_incr = len;
                TWO_PHASE_BUF_INCR
            }
            off += len;
            rem_len -= len;
        }
    }
    for (i = 0; i < fh->f_size; i++) {
        if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
    }

    free(curr_from_proc);
    free(done_from_proc);
    free(recv_buf_idx);

}

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
static int isread_aggregator(int rank,
                             int nprocs_for_coll,
                             int *aggregator_list) {

    int i = 0;
    for (i = 0; i < nprocs_for_coll; i++) {
        if (aggregator_list[i] == rank)
            return 1;
    }
    return 0;
}
#endif
