root/ompi/mca/fcoll/two_phase/fcoll_two_phase_support_fns.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_fcoll_two_phase_domain_partition
  2. mca_fcoll_two_phase_calc_aggregator
  3. mca_fcoll_two_phase_calc_others_requests
  4. mca_fcoll_two_phase_calc_my_requests

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
   2 /*
   3  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
   4  *                         University Research and Technology
   5  *                         Corporation.  All rights reserved.
   6  * Copyright (c) 2004-2017 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
  10  *                         University of Stuttgart.  All rights reserved.
  11  * Copyright (c) 2004-2005 The Regents of the University of California.
  12  *                         All rights reserved.
  13  * Copyright (c) 2008-2011 University of Houston. All rights reserved.
  14  * Copyright (c) 2014-2018 Research Organization for Information Science
  15  *                         and Technology (RIST). All rights reserved.
  16  * Copyright (c) 2015      Los Alamos National Security, LLC. All rights
  17  *                         reserved.
  18  * $COPYRIGHT$
  19  *
  20  * Additional copyrights may follow
  21  *
  22  * $HEADER$
  23  */
  24 
  25 #include "ompi_config.h"
  26 #include "fcoll_two_phase.h"
  27 
  28 #include "mpi.h"
  29 #include "ompi/constants.h"
  30 #include "ompi/mca/fcoll/fcoll.h"
  31 #include "ompi/mca/common/ompio/common_ompio.h"
  32 #include "ompi/mca/io/io.h"
  33 #include "opal/mca/base/base.h"
  34 #include "math.h"
  35 #include "ompi/mca/pml/pml.h"
  36 #include <unistd.h>
  37 
  38 /* Based on ROMIO's domain partitioning implementation.
  39 A series of support functions for the two-phase implementation:
  40 functions for domain partitioning and aggregator
  41 selection in two_phase.
  42 These are common to both two_phase read and write. */
  43 
  44 int mca_fcoll_two_phase_domain_partition (ompio_file_t *fh,
  45                                           OMPI_MPI_OFFSET_TYPE *start_offsets,
  46                                           OMPI_MPI_OFFSET_TYPE *end_offsets,
  47                                           OMPI_MPI_OFFSET_TYPE *min_st_offset_ptr,
  48                                           OMPI_MPI_OFFSET_TYPE **fd_st_ptr,
  49                                           OMPI_MPI_OFFSET_TYPE **fd_end_ptr,
  50                                           int min_fd_size,
  51                                           OMPI_MPI_OFFSET_TYPE *fd_size_ptr,
  52                                           int striping_unit,
  53                                           int nprocs_for_coll){
  54 
  55     OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start=NULL, *fd_end=NULL, fd_size;
  56     int i;
  57 
  58     min_st_offset = start_offsets[0];
  59     max_end_offset = end_offsets[0];
  60 
  61     for (i=0; i< fh->f_size; i++){
  62         min_st_offset = OMPIO_MIN(min_st_offset, start_offsets[i]);
  63         max_end_offset = OMPIO_MAX(max_end_offset, end_offsets[i]);
  64 
  65     }
  66 
  67     fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll - 1)/nprocs_for_coll;
  68 
  69     if (fd_size < min_fd_size)
  70         fd_size = min_fd_size;
  71 
  72     *fd_st_ptr = (OMPI_MPI_OFFSET_TYPE *)
  73         malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
  74 
  75     if ( NULL == *fd_st_ptr ) {
  76         return OMPI_ERR_OUT_OF_RESOURCE;
  77     }
  78 
  79     *fd_end_ptr = (OMPI_MPI_OFFSET_TYPE *)
  80         malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE));
  81 
  82     if ( NULL == *fd_end_ptr ) {
  83         return OMPI_ERR_OUT_OF_RESOURCE;
  84     }
  85 
  86 
  87     fd_start = *fd_st_ptr;
  88     fd_end = *fd_end_ptr;
  89 
  90 
  91     if (striping_unit > 0){
  92       /* Lock Boundary based domain partitioning */
  93         int rem_front, rem_back;
  94         OMPI_MPI_OFFSET_TYPE end_off;
  95 
  96         fd_start[0] = min_st_offset;
  97         end_off     = fd_start[0] + fd_size;
  98         rem_front   = end_off % striping_unit;
  99         rem_back    = striping_unit - rem_front;
 100         if (rem_front < rem_back)
 101                 end_off -= rem_front;
 102         else
 103                 end_off += rem_back;
 104         fd_end[0] = end_off - 1;
 105 
 106         /* align fd_end[i] to the nearest file lock boundary */
 107         for (i=1; i<nprocs_for_coll; i++) {
 108             fd_start[i] = fd_end[i-1] + 1;
 109             end_off     = min_st_offset + fd_size * (i+1);
 110             rem_front   = end_off % striping_unit;
 111             rem_back    = striping_unit - rem_front;
 112             if (rem_front < rem_back)
 113                     end_off -= rem_front;
 114             else
 115                     end_off += rem_back;
 116             fd_end[i] = end_off - 1;
 117         }
 118         fd_end[nprocs_for_coll-1] = max_end_offset;
 119     }
 120     else{
 121         fd_start[0] = min_st_offset;
 122         fd_end[0] = min_st_offset + fd_size - 1;
 123 
 124         for (i=1; i<nprocs_for_coll; i++) {
 125             fd_start[i] = fd_end[i-1] + 1;
 126             fd_end[i] = fd_start[i] + fd_size - 1;
 127         }
 128 
 129     }
 130 
 131     for (i=0; i<nprocs_for_coll; i++) {
 132         if (fd_start[i] > max_end_offset)
 133             fd_start[i] = fd_end[i] = -1;
 134         if (fd_end[i] > max_end_offset)
 135             fd_end[i] = max_end_offset;
 136     }
 137 
 138     *fd_size_ptr = fd_size;
 139     *min_st_offset_ptr = min_st_offset;
 140 
 141     return OMPI_SUCCESS;
 142 }
 143 
 144 
 145 
 146 int mca_fcoll_two_phase_calc_aggregator(ompio_file_t *fh,
 147                                         OMPI_MPI_OFFSET_TYPE off,
 148                                         OMPI_MPI_OFFSET_TYPE min_off,
 149                                         OMPI_MPI_OFFSET_TYPE *len,
 150                                         OMPI_MPI_OFFSET_TYPE fd_size,
 151                                         OMPI_MPI_OFFSET_TYPE *fd_start,
 152                                         OMPI_MPI_OFFSET_TYPE *fd_end,
 153                                         int striping_unit,
 154                                         int num_aggregators,
 155                                         int *aggregator_list)
 156 {
 157 
 158 
 159     int rank_index, rank;
 160     OMPI_MPI_OFFSET_TYPE avail_bytes;
 161 
 162     rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1);
 163 
 164     if (striping_unit > 0){
 165         rank_index = 0;
 166         while (off > fd_end[rank_index]) rank_index++;
 167     }
 168 
 169 
 170     if (rank_index >= num_aggregators || rank_index < 0) {
 171        fprintf(stderr,
 172                "Error in ompi_io_ompio_calcl_aggregator():");
 173        fprintf(stderr,
 174                "rank_index(%d) >= num_aggregators(%d)fd_size=%lld off=%lld\n",
 175                rank_index,num_aggregators,fd_size,off);
 176        ompi_mpi_abort(&ompi_mpi_comm_world.comm, 1);
 177     }
 178 
 179 
 180     avail_bytes = fd_end[rank_index] + 1 - off;
 181     if (avail_bytes < *len){
 182         *len = avail_bytes;
 183     }
 184 
 185     rank = aggregator_list[rank_index];
 186 
 187     #if 0
 188     printf("rank : %d, rank_index : %d\n",rank, rank_index);
 189     #endif
 190 
 191     return rank;
 192 }
 193 
 194 int mca_fcoll_two_phase_calc_others_requests(ompio_file_t *fh,
 195                                              int count_my_req_procs,
 196                                              int *count_my_req_per_proc,
 197                                              mca_common_ompio_access_array_t *my_req,
 198                                              int *count_others_req_procs_ptr,
 199                                              mca_common_ompio_access_array_t **others_req_ptr)
 200 {
 201 
 202 
 203     int *count_others_req_per_proc=NULL, count_others_req_procs;
 204     int i,j, ret=OMPI_SUCCESS;
 205     MPI_Request *requests=NULL;
 206     mca_common_ompio_access_array_t *others_req=NULL;
 207 
 208     count_others_req_per_proc = (int *)malloc(fh->f_size*sizeof(int));
 209 
 210     if ( NULL == count_others_req_per_proc ) {
 211         return OMPI_ERR_OUT_OF_RESOURCE;
 212     }
 213 
 214     /* Change it to the ompio specific alltoall in coll module : VVN*/
 215     ret =  fh->f_comm->c_coll->coll_alltoall (count_my_req_per_proc,
 216                                              1,
 217                                              MPI_INT,
 218                                              count_others_req_per_proc,
 219                                              1,
 220                                              MPI_INT,
 221                                              fh->f_comm,
 222                                              fh->f_comm->c_coll->coll_alltoall_module);
 223     if ( OMPI_SUCCESS != ret ) {
 224         return ret;
 225     }
 226 
 227 #if 0
 228     for( i = 0; i< fh->f_size; i++){
 229         printf("my: %d, others: %d\n",count_my_req_per_proc[i],
 230                count_others_req_per_proc[i]);
 231 
 232     }
 233 #endif
 234 
 235     *others_req_ptr = (mca_common_ompio_access_array_t  *) malloc
 236         (fh->f_size*sizeof(mca_common_ompio_access_array_t));
 237     others_req = *others_req_ptr;
 238 
 239     count_others_req_procs = 0;
 240     for (i=0; i<fh->f_size; i++) {
 241         if (count_others_req_per_proc[i]) {
 242             others_req[i].count = count_others_req_per_proc[i];
 243             others_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *)
 244                 malloc(count_others_req_per_proc[i]*sizeof(OMPI_MPI_OFFSET_TYPE));
 245             others_req[i].lens = (int *)
 246                 malloc(count_others_req_per_proc[i]*sizeof(int));
 247             others_req[i].mem_ptrs = (MPI_Aint *)
 248                 malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint));
 249             count_others_req_procs++;
 250         }
 251         else
 252             others_req[i].count = 0;
 253     }
 254 
 255 
 256     requests = (MPI_Request *)
 257         malloc(1+2*(count_my_req_procs+count_others_req_procs)*
 258                sizeof(MPI_Request));
 259 
 260     if ( NULL == requests ) {
 261         ret = OMPI_ERR_OUT_OF_RESOURCE;
 262         goto exit;
 263     }
 264 
 265     j = 0;
 266     for (i=0; i<fh->f_size; i++){
 267         if (others_req[i].count){
 268             ret = MCA_PML_CALL(irecv(others_req[i].offsets,
 269                                      others_req[i].count,
 270                                      OMPI_OFFSET_DATATYPE,
 271                                      i,
 272                                      i+fh->f_rank,
 273                                      fh->f_comm,
 274                                      &requests[j]));
 275             if ( OMPI_SUCCESS != ret  ) {
 276                 goto exit;
 277             }
 278 
 279             j++;
 280 
 281             ret = MCA_PML_CALL(irecv(others_req[i].lens,
 282                                      others_req[i].count,
 283                                      MPI_INT,
 284                                      i,
 285                                      i+fh->f_rank+1,
 286                                      fh->f_comm,
 287                                      &requests[j]));
 288             if ( OMPI_SUCCESS != ret  ) {
 289                 goto exit;
 290             }
 291 
 292             j++;
 293         }
 294     }
 295 
 296 
 297     for (i=0; i < fh->f_size; i++) {
 298         if (my_req[i].count) {
 299             ret = MCA_PML_CALL(isend(my_req[i].offsets,
 300                                      my_req[i].count,
 301                                      OMPI_OFFSET_DATATYPE,
 302                                      i,
 303                                      i+fh->f_rank,
 304                                      MCA_PML_BASE_SEND_STANDARD,
 305                                      fh->f_comm,
 306                                      &requests[j]));
 307             if ( OMPI_SUCCESS != ret  ) {
 308                 goto exit;
 309             }
 310 
 311             j++;
 312             ret = MCA_PML_CALL(isend(my_req[i].lens,
 313                                      my_req[i].count,
 314                                      MPI_INT,
 315                                      i,
 316                                      i+fh->f_rank+1,
 317                                      MCA_PML_BASE_SEND_STANDARD,
 318                                      fh->f_comm,
 319                                      &requests[j]));
 320             if ( OMPI_SUCCESS != ret  ) {
 321                 goto exit;
 322             }
 323 
 324             j++;
 325         }
 326     }
 327 
 328     if (j) {
 329         ret = ompi_request_wait_all ( j, requests, MPI_STATUSES_IGNORE );
 330         if ( OMPI_SUCCESS != ret  ) {
 331             return ret;
 332         }
 333     }
 334 
 335     *count_others_req_procs_ptr = count_others_req_procs;
 336 
 337 exit:
 338     if ( NULL != requests ) {
 339         free(requests);
 340     }
 341     if ( NULL != count_others_req_per_proc ) {
 342         free(count_others_req_per_proc);
 343     }
 344 
 345 
 346     return ret;
 347 }
 348 
 349 
 350 int mca_fcoll_two_phase_calc_my_requests (ompio_file_t *fh,
 351                                           struct iovec *offset_len,
 352                                           int contig_access_count,
 353                                           OMPI_MPI_OFFSET_TYPE min_st_offset,
 354                                           OMPI_MPI_OFFSET_TYPE *fd_start,
 355                                           OMPI_MPI_OFFSET_TYPE *fd_end,
 356                                           OMPI_MPI_OFFSET_TYPE fd_size,
 357                                           int *count_my_req_procs_ptr,
 358                                           int **count_my_req_per_proc_ptr,
 359                                           mca_common_ompio_access_array_t **my_req_ptr,
 360                                           size_t **buf_indices,
 361                                           int striping_unit,
 362                                           int num_aggregators,
 363                                           int *aggregator_list)
 364 {
 365     int ret = MPI_SUCCESS;
 366     int *count_my_req_per_proc, count_my_req_procs;
 367     size_t *buf_idx = NULL;
 368     int i, l, proc;
 369     OMPI_MPI_OFFSET_TYPE fd_len, rem_len, curr_idx, off;
 370     mca_common_ompio_access_array_t *my_req = NULL;
 371 
 372 
 373     *count_my_req_per_proc_ptr = (int*)malloc(fh->f_size*sizeof(int));
 374 
 375     if ( NULL == *count_my_req_per_proc_ptr ){
 376         return OMPI_ERR_OUT_OF_RESOURCE;
 377     }
 378 
 379     count_my_req_per_proc = *count_my_req_per_proc_ptr;
 380 
 381     for (i=0;i<fh->f_size;i++){
 382         count_my_req_per_proc[i] = 0;
 383     }
 384 
 385     buf_idx = (size_t *) malloc (fh->f_size * sizeof(size_t));
 386 
 387     if ( NULL == buf_idx ){
 388         return OMPI_ERR_OUT_OF_RESOURCE;
 389     }
 390 
 391     for (i=0; i < fh->f_size; i++) buf_idx[i] = -1;
 392 
 393     for (i=0;i<contig_access_count; i++){
 394 
 395         if (offset_len[i].iov_len==0)
 396             continue;
 397         off = (OMPI_MPI_OFFSET_TYPE)(intptr_t)offset_len[i].iov_base;
 398         fd_len = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_len;
 399         proc = mca_fcoll_two_phase_calc_aggregator(fh, off, min_st_offset, &fd_len, fd_size,
 400                                              fd_start, fd_end, striping_unit, num_aggregators,aggregator_list);
 401         count_my_req_per_proc[proc]++;
 402         rem_len = offset_len[i].iov_len - fd_len;
 403 
 404         while (rem_len != 0) {
 405             off += fd_len; /* point to first remaining byte */
 406             fd_len = rem_len; /* save remaining size, pass to calc */
 407             proc = mca_fcoll_two_phase_calc_aggregator(fh, off, min_st_offset, &fd_len,
 408                                                  fd_size, fd_start, fd_end, striping_unit,
 409                                                  num_aggregators, aggregator_list);
 410 
 411             count_my_req_per_proc[proc]++;
 412             rem_len -= fd_len; /* reduce remaining length by amount from fd */
 413         }
 414 
 415     }
 416 
 417 /*    printf("%d: fh->f_size : %d\n", fh->f_rank,fh->f_size);*/
 418     *my_req_ptr =  (mca_common_ompio_access_array_t *)
 419         malloc (fh->f_size * sizeof(mca_common_ompio_access_array_t));
 420     if ( NULL == *my_req_ptr ) {
 421         ret = OMPI_ERR_OUT_OF_RESOURCE;
 422         goto err_exit;
 423     }
 424     my_req = *my_req_ptr;
 425 
 426     count_my_req_procs = 0;
 427     for (i = 0; i < fh->f_size; i++){
 428         if(count_my_req_per_proc[i]) {
 429             my_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *)
 430                 malloc(count_my_req_per_proc[i] * sizeof(OMPI_MPI_OFFSET_TYPE));
 431 
 432             if ( NULL == my_req[i].offsets ) {
 433                 ret =  OMPI_ERR_OUT_OF_RESOURCE;
 434                 goto err_exit;
 435             }
 436 
 437             my_req[i].lens = (int *)
 438                 malloc(count_my_req_per_proc[i] * sizeof(int));
 439 
 440             if ( NULL == my_req[i].lens ) {
 441                 ret =  OMPI_ERR_OUT_OF_RESOURCE;
 442                 goto err_exit;
 443             }
 444             count_my_req_procs++;
 445         }
 446         my_req[i].count = 0;
 447     }
 448     curr_idx = 0;
 449     for (i=0; i<contig_access_count; i++) {
 450         if ((int)offset_len[i].iov_len == 0)
 451             continue;
 452         off = (OMPI_MPI_OFFSET_TYPE)(intptr_t)offset_len[i].iov_base;
 453         fd_len = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_len;
 454         proc = mca_fcoll_two_phase_calc_aggregator(fh, off, min_st_offset, &fd_len,
 455                                              fd_size, fd_start, fd_end,
 456                                              striping_unit, num_aggregators,
 457                                              aggregator_list);
 458         if (buf_idx[proc] == (size_t) -1){
 459             buf_idx[proc] = (int) curr_idx;
 460         }
 461         l = my_req[proc].count;
 462         curr_idx += fd_len;
 463         rem_len = offset_len[i].iov_len - fd_len;
 464         my_req[proc].offsets[l] = off;
 465         my_req[proc].lens[l] = (int)fd_len;
 466         my_req[proc].count++;
 467 
 468         while (rem_len != 0) {
 469             off += fd_len;
 470             fd_len = rem_len;
 471             proc = mca_fcoll_two_phase_calc_aggregator(fh, off, min_st_offset,
 472                                                        &fd_len, fd_size, fd_start,
 473                                                        fd_end, striping_unit,
 474                                                        num_aggregators,
 475                                                        aggregator_list);
 476 
 477             if (buf_idx[proc] == (size_t) -1){
 478                 buf_idx[proc] = (int) curr_idx;
 479             }
 480 
 481             l = my_req[proc].count;
 482             curr_idx += fd_len;
 483             rem_len -= fd_len;
 484 
 485             my_req[proc].offsets[l] = off;
 486             my_req[proc].lens[l] = (int) fd_len;
 487             my_req[proc].count++;
 488 
 489         }
 490 
 491     }
 492 
 493   #if 0
 494     for (i=0; i<fh->f_size; i++) {
 495         if (count_my_req_per_proc[i] > 0) {
 496             fprintf(stdout, "data needed from %d (count = %d):\n", i,
 497                     my_req[i].count);
 498             for (l=0; l < my_req[i].count; l++) {
 499                 fprintf(stdout, " %d: off[%d] = %lld, len[%d] = %d\n", fh->f_rank, l,
 500                 my_req[i].offsets[l], l, my_req[i].lens[l]);
 501             }
 502             fprintf(stdout, "%d: buf_idx[%d] = 0x%x\n", fh->f_rank, i, buf_idx[i]);
 503         }
 504     }
 505 #endif
 506 
 507 
 508     *count_my_req_procs_ptr = count_my_req_procs;
 509     *buf_indices = buf_idx;
 510 
 511     return ret;
 512 err_exit:
 513     if (NULL != my_req) {
 514         for (i = 0; i < fh->f_size; i++) {
 515             if (NULL != my_req[i].offsets) {
 516                 free(my_req[i].offsets);
 517             }
 518             if (NULL != my_req[i].lens) {
 519                 free(my_req[i].lens);
 520             }
 521         }
 522     }
 523     if (NULL != buf_idx) {
 524         free(buf_idx);
 525     }
 526     return ret;
 527 }
 528 /* Two-phase support functions end here. */

/* [<][>][^][v][top][bottom][index][help] */