root/ompi/mca/io/romio321/romio/adio/ad_lustre/ad_lustre_wrcoll.c


DEFINITIONS

This source file includes the following definitions.
  1. ADIOI_LUSTRE_WriteStridedColl
  2. ADIOI_LUSTRE_Exch_and_write
  3. ADIOI_LUSTRE_W_Exchange_data
  4. ADIOI_LUSTRE_Fill_send_buffer

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /*
   3  *   Copyright (C) 1997 University of Chicago.
   4  *   See COPYRIGHT notice in top-level directory.
   5  *
   6  *   Copyright (C) 2007 Oak Ridge National Laboratory
   7  *
   8  *   Copyright (C) 2008 Sun Microsystems, Lustre group
   9  */
  10 
  11 #include "ad_lustre.h"
  12 #include "adio_extern.h"
  13 
  14 /* prototypes of functions used for collective writes only. */
  15 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
  16                                         MPI_Datatype datatype, int nprocs,
  17                                         int myrank,
  18                                         ADIOI_Access *others_req,
  19                                         ADIOI_Access *my_req,
  20                                         ADIO_Offset *offset_list,
  21                                         ADIO_Offset *len_list,
  22                                         int contig_access_count,
  23                                         int *striping_info,
  24                                         int **buf_idx, int *error_code);
  25 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
  26                                           ADIOI_Flatlist_node *flat_buf,
  27                                           char **send_buf,
  28                                           ADIO_Offset *offset_list,
  29                                           ADIO_Offset *len_list, int *send_size,
  30                                           MPI_Request *requests,
  31                                           int *sent_to_proc, int nprocs,
  32                                           int myrank, int contig_access_count,
  33                                           int *striping_info,
  34                                           int *send_buf_idx,
  35                                           int *curr_to_proc,
  36                                           int *done_to_proc, int iter,
  37                                           MPI_Aint buftype_extent);
  38 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
  39                                          char *write_buf,
  40                                          ADIOI_Flatlist_node *flat_buf,
  41                                          ADIO_Offset *offset_list,
  42                                          ADIO_Offset *len_list, int *send_size,
  43                                          int *recv_size, ADIO_Offset off,
  44                                          int size, int *count,
  45                                          int *start_pos, 
  46                                          int *sent_to_proc, int nprocs,
  47                                          int myrank, int buftype_is_contig,
  48                                          int contig_access_count,
  49                                          int *striping_info,
  50                                          ADIOI_Access *others_req,
  51                                          int *send_buf_idx,
  52                                          int *curr_to_proc,
  53                                          int *done_to_proc, int *hole,
  54                                          int iter, MPI_Aint buftype_extent,
  55                                          int *buf_idx,
  56                                          ADIO_Offset **srt_off, int **srt_len, int *srt_num,
  57                                          int *error_code);
  58 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
  59                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
  60                       int nprocs, int nprocs_recv, int total_elements);
  61 
  62 void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, const void *buf, int count,
  63                                    MPI_Datatype datatype,
  64                                    int file_ptr_type, ADIO_Offset offset,
  65                                    ADIO_Status *status, int *error_code)
  66 {
  67     /* Uses a generalized version of the extended two-phase method described
  68      * in "An Extended Two-Phase Method for Accessing Sections of
  69      * Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
  70      * Scientific Programming, (5)4:301--317, Winter 1996.
  71      * http://www.mcs.anl.gov/home/thakur/ext2ph.ps
  72      */
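         /* As realized below: in the communication phase, processes exchange
          * access information and redistribute the write data so that each
          * I/O aggregator ends up holding only data that falls in its
          * stripe-aligned file domain; in the I/O phase the aggregators issue
          * the actual contiguous writes, one stripe at a time, inside
          * ADIOI_LUSTRE_Exch_and_write(). */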
  73 
  74     ADIOI_Access *my_req;
  75     /* array of nprocs access structures, one for each other process
  76        whose file domain holds part of this process's request */
  77 
  78     ADIOI_Access *others_req;
  79     /* array of nprocs access structures, one for each other process
  80        whose request is written by this process. */
  81 
  82     int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
  83     int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
  84     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
  85     ADIO_Offset orig_fp, start_offset, end_offset, off;
  86     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
  87     ADIO_Offset *len_list = NULL;
  88     int **buf_idx = NULL, *striping_info = NULL;
  89     int old_error, tmp_error;
  90 
  91     MPI_Comm_size(fd->comm, &nprocs);
  92     MPI_Comm_rank(fd->comm, &myrank);
  93 
  94     orig_fp = fd->fp_ind;
  95 
  96     /* IO pattern identification if cb_write isn't disabled */
  97     if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
  98         /* For this process's request, calculate the list of offsets and
  99            lengths in the file and determine the start and end offsets. */
 100 
 101         /* Note: end_offset points to the last byte-offset that will be accessed.
 102          * e.g., if start_offset=0 and 100 bytes to be read, end_offset=99
 103          */
 104 
 105         ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
 106                               &offset_list, &len_list, &start_offset,
 107                               &end_offset, &contig_access_count);
 108 
 109         /* each process communicates its start and end offsets to the other
 110          * processes. The result is one array of start offsets and one of end
 111          * offsets, stored in order of process rank.
 112          */
 113         st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
 114         end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
 115         MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
 116                       ADIO_OFFSET, fd->comm);
 117         MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
 118                       ADIO_OFFSET, fd->comm);
 119         /* are the accesses of different processes interleaved? */
 120         for (i = 1; i < nprocs; i++)
 121             if ((st_offsets[i] < end_offsets[i-1]) &&
 122                 (st_offsets[i] <= end_offsets[i]))
 123                 interleave_count++;
 124         /* This is a rudimentary check for interleaving, but should suffice
 125            for the moment. */
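             /* Illustration with hypothetical offsets: for nprocs = 2,
              * st_offsets = {0, 50} and end_offsets = {99, 149} pass the test
              * at i = 1 (50 < 99 and 50 <= 149), so interleave_count becomes 1;
              * with st_offsets = {0, 100} and end_offsets = {99, 199} the
              * ranges do not overlap and interleave_count stays 0. */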
 126 
 127         /* Two typical access patterns can benefit from collective write.
 128          *   1) the processes are interleaved, and
 129          *   2) the req size is small.
 130          */
 131         if (interleave_count > 0) {
 132             do_collect = 1;
 133         } else {
 134             do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
 135                                                 len_list, nprocs);
 136         }
 137     }
 138     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 139 
 140     /* Decide if collective I/O should be done */
 141     if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
 142         fd->hints->cb_write == ADIOI_HINT_DISABLE) {
 143 
 144         /* use independent accesses */
 145         if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
 146             ADIOI_Free(offset_list);
 147             ADIOI_Free(len_list);
 148             ADIOI_Free(st_offsets);
 149             ADIOI_Free(end_offsets);
 150         }
 151 
 152         fd->fp_ind = orig_fp;
 153         ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
 154         if (buftype_is_contig && filetype_is_contig) {
 155             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
 156                 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
 157                 ADIO_WriteContig(fd, buf, count, datatype,
 158                                  ADIO_EXPLICIT_OFFSET,
 159                                  off, status, error_code);
 160             } else
 161                 ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 162                                  0, status, error_code);
 163         } else {
 164             ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
 165                               offset, status, error_code);
 166         }
 167         return;
 168     }
 169 
 170     /* Get Lustre hints information */
 171     ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);
 172 
 173     /* calculate what portions of the access requests of this process are
 174      * located in which process
 175      */
 176     ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
 177                              striping_info, nprocs, &count_my_req_procs,
 178                              &count_my_req_per_proc, &my_req,
 179                              &buf_idx);
 180 
 181     /* based on everyone's my_req, calculate what requests of other processes
 182      * will be accessed by this process.
 183      * count_others_req_procs = number of processes whose requests (including
 184      * this process itself) will be accessed by this process
 185      * count_others_req_per_proc[i] indicates how many separate contiguous
 186      * requests of proc. i will be accessed by this process.
 187      */
 188 
 189     ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
 190                           my_req, nprocs, myrank, &count_others_req_procs,
 191                           &others_req);
 192     ADIOI_Free(count_my_req_per_proc);
 193 
 194     /* exchange data and write in sizes of no more than stripe_size. */
 195     ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
 196                                 others_req, my_req, offset_list, len_list,
 197                                 contig_access_count, striping_info,
 198                                 buf_idx, error_code);
 199 
 200     /* If this collective write is followed by an independent write,
 201      * it's possible to have those subsequent writes on other processes
 202      * race ahead and sneak in before the read-modify-write completes.
 203      * We carry out a collective communication at the end here so no one
 204      * can start independent i/o before collective I/O completes.
 205      *
 206      * need to do some gymnastics with the error codes so that if something
 207      * went wrong, all processes report error, but if a process has a more
 208      * specific error code, we can still have that process report the
 209      * additional information */
 210 
 211     old_error = *error_code;
 212     if (*error_code != MPI_SUCCESS)
 213         *error_code = MPI_ERR_IO;
 214 
 215     /* optimization: if only one process is performing I/O, we can perform
 216      * a less-expensive Bcast */
 217 #ifdef ADIOI_MPE_LOGGING
 218     MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
 219 #endif
 220     if (fd->hints->cb_nodes == 1)
 221         MPI_Bcast(error_code, 1, MPI_INT,
 222                   fd->hints->ranklist[0], fd->comm);
 223     else {
 224         tmp_error = *error_code;
 225         MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
 226                       MPI_MAX, fd->comm);
 227     }
 228 #ifdef ADIOI_MPE_LOGGING
 229     MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
 230 #endif
 231 
 232     if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
 233         *error_code = old_error;
 234 
 235 
 236     if (!buftype_is_contig)
 237         ADIOI_Delete_flattened(datatype);
 238 
 239     /* free all memory allocated for collective I/O */
 240     /* free others_req */
 241     for (i = 0; i < nprocs; i++) {
 242         if (others_req[i].count) {
 243             ADIOI_Free(others_req[i].offsets);
 244             ADIOI_Free(others_req[i].lens);
 245             ADIOI_Free(others_req[i].mem_ptrs);
 246         }
 247     }
 248     ADIOI_Free(others_req);
 249     /* free my_req here */
 250     for (i = 0; i < nprocs; i++) {
 251         if (my_req[i].count) {
 252             ADIOI_Free(my_req[i].offsets);
 253             ADIOI_Free(my_req[i].lens);
 254         }
 255     }
 256     ADIOI_Free(my_req);
 257     for (i = 0; i < nprocs; i++) {
 258         ADIOI_Free(buf_idx[i]);
 259     }
 260     ADIOI_Free(buf_idx);
 261     ADIOI_Free(offset_list);
 262     ADIOI_Free(len_list);
 263     ADIOI_Free(st_offsets);
 264     ADIOI_Free(end_offsets);
 265     ADIOI_Free(striping_info);
 266 
 267 #ifdef HAVE_STATUS_SET_BYTES
 268     if (status) {
 269         MPI_Count bufsize, size;
 270         /* Don't set status if it isn't needed */
 271         MPI_Type_size_x(datatype, &size);
 272         bufsize = size * count;
 273         MPIR_Status_set_bytes(status, datatype, bufsize);
 274     }
 275     /* This is a temporary way of filling in status. The right way is to
 276      * keep track of how much data was actually written during collective I/O.
 277      */
 278 #endif
 279 
 280     fd->fp_sys_posn = -1;       /* mark it as unknown. */
 281 }
 282 
 283 /* If successful, error_code is set to MPI_SUCCESS.  Otherwise an error
 284  * code is created and returned in error_code.
 285  */
 286 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
 287                                         MPI_Datatype datatype, int nprocs,
 288                                         int myrank, ADIOI_Access *others_req,
 289                                         ADIOI_Access *my_req,
 290                                         ADIO_Offset *offset_list,
 291                                         ADIO_Offset *len_list, 
 292                                         int contig_access_count,
 293                                         int *striping_info, int **buf_idx,
 294                                         int *error_code)
 295 {
 296     /* Send data to appropriate processes and write in sizes of no more
 297      * than lustre stripe_size.
 298      * The idea is to reduce the amount of extra memory required for
 299      * collective I/O. If all data were written all at once, which is much
 300      * easier, it would require temp space more than the size of user_buf,
 301      * which is often unacceptable. For example, to write a distributed
 302      * array to a file, where each local array is 8Mbytes, requiring
 303      * at least another 8Mbytes of temp space is unacceptable.
 304      */
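         /* Consequently the collective buffer (write_buf below) is only
          * stripe_size bytes per aggregator, and the exchange/write cycle is
          * repeated max_ntimes times to cover the whole aggregate region. */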
 305 
 306     int hole, i, j, m, flag, ntimes = 1, max_ntimes, buftype_is_contig;
 307     ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
 308     ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
 309     ADIO_Offset max_size, step_size = 0;
 310     int real_size, req_len, send_len;
 311     int *recv_curr_offlen_ptr, *recv_count, *recv_size;
 312     int *send_curr_offlen_ptr, *send_size;
 313     int *sent_to_proc, *recv_start_pos;
 314     int *send_buf_idx, *curr_to_proc, *done_to_proc;
 315     int *this_buf_idx;
 316     char *write_buf = NULL;
 317     MPI_Status status;
 318     ADIOI_Flatlist_node *flat_buf = NULL;
 319     MPI_Aint buftype_extent, buftype_lb;
 320     int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
 321     int data_sieving = 0;
 322     ADIO_Offset *srt_off = NULL;
 323     int *srt_len = NULL;
 324     int srt_num = 0;
 325     ADIO_Offset block_offset;
 326     int block_len;
 327 
 328     *error_code = MPI_SUCCESS;  /* changed below if error */
 329     /* only I/O errors are currently reported */
 330 
 331     /* calculate the number of writes of stripe size to be done.
 332      * That gives the no. of communication phases as well.
 333      * Note:
 334      *   Because we redistribute data in stripe-contiguous pattern for Lustre,
 335      *   each process has the same no. of communication phases.
 336      */
 337 
 338     for (i = 0; i < nprocs; i++) {
 339         if (others_req[i].count) {
 340             st_loc = others_req[i].offsets[0];
 341             end_loc = others_req[i].offsets[0];
 342             break;
 343         }
 344     }
 345     for (i = 0; i < nprocs; i++) {
 346         for (j = 0; j < others_req[i].count; j++) {
 347             st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
 348             end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
 349                                           others_req[i].lens[j] - 1));
 350         }
 351     }
 352     /* this process does no writing. */
 353     if ((st_loc == -1) && (end_loc == -1))
 354         ntimes = 0;
 355     MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
 356     /* avoid min_st_loc being -1 */
 357     if (st_loc == -1)
 358         st_loc = max_end_loc;
 359     MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
 360     /* align downward */
 361     min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
 362 
 363     /* In each iteration, only avail_cb_nodes I/O clients perform I/O, so
 364      * at most step_size = avail_cb_nodes * stripe_size bytes are written per
 365      * iteration, and max_ntimes = whole_file_portion / step_size (rounded up).
 366      */
 367     step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
 368     max_ntimes = (max_end_loc - min_st_loc + 1) / step_size
 369         + (((max_end_loc - min_st_loc + 1) % step_size) ? 1 : 0);
 370 /*     max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1); */
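         /* Illustration with hypothetical values: stripe_size = 1 MiB and
          * avail_cb_nodes = 4 give step_size = 4 MiB; if the aggregate region
          * spans min_st_loc = 0 through max_end_loc = 10 MiB - 1, then
          * max_ntimes = ceil(10 MiB / 4 MiB) = 3 write/communication phases. */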
 371     if (ntimes)
 372         write_buf = (char *) ADIOI_Malloc(stripe_size);
 373 
 374     /* calculate the start offset for each iteration */
 375     off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
 376     for (m = 0; m < max_ntimes; m ++)
 377         off_list[m] = max_end_loc;
 378     for (i = 0; i < nprocs; i++) {
 379         for (j = 0; j < others_req[i].count; j ++) {
 380             req_off = others_req[i].offsets[j];
 381             m = (int)((req_off - min_st_loc) / step_size);
 382             off_list[m] = ADIOI_MIN(off_list[m], req_off);
 383         }
 384     }
 385 
 386     recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 387     send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 388     /* their use is explained below. calloc initializes to 0. */
 389 
 390     recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 391     /* to store count of how many off-len pairs per proc are satisfied
 392        in an iteration. */
 393 
 394     send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 395     /* total size of data to be sent to each proc. in an iteration.
 396        Of size nprocs so that I can use MPI_Alltoall later. */
 397 
 398     recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 399     /* total size of data to be recd. from each proc. in an iteration. */
 400 
 401     sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
 402     /* amount of data sent to each proc so far. Used in
 403        ADIOI_Fill_send_buffer. initialized to 0 here. */
 404 
 405     send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 406     curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 407     done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 408     /* Above three are used in ADIOI_Fill_send_buffer */
 409 
 410     this_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 411 
 412     recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
 413     /* used to store the starting value of recv_curr_offlen_ptr[i] in
 414        this iteration */
 415 
 416     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
 417     if (!buftype_is_contig) {
 418         flat_buf = ADIOI_Flatten_and_find(datatype);
 419     }
 420     MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);
 421     /* I need to check if there are any outstanding nonblocking writes to
 422      * the file, which could potentially interfere with the writes taking
 423      * place in this collective write call. Since this is not likely to be
 424      * common, let me do the simplest thing possible here: Each process
 425      * completes all pending nonblocking operations before proceeding.
 426      */
 427     /*ADIOI_Complete_async(error_code);
 428     if (*error_code != MPI_SUCCESS) return;
 429     MPI_Barrier(fd->comm);
 430     */
 431 
 432     iter_st_off = min_st_loc;
 433 
 434     /* Although we have reorganized the data according to OST index,
 435      * a read-modify-write will be done if there is a hole in the data.
 436      * For example: if blocksize=60, xfersize=30 and stripe_size=100,
 437      * then rank 0 collects data [0, 30] and [60, 90] and then writes. The
 438      * hole in [30, 60] causes a read-modify-write over [0, 90].
 439      *
 440      * To reduce its impact on the performance, we can disable data sieving
 441      * by hint "ds_in_coll".
 442      */
 443     /* check the hint for data sieving */
 444     data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
 445 
 446     for (m = 0; m < max_ntimes; m++) {
 447         /* go through all others_req and my_req to check which will be received
 448          * and sent in this iteration.
 449          */
 450 
 451         /* Note that MPI guarantees that displacements in filetypes are in
 452            monotonically nondecreasing order and that, for writes, the
 453            filetypes cannot specify overlapping regions in the file. This
 454            simplifies implementation a bit compared to reads. */
 455 
 456         /*
 457            off         = start offset in the file for the data to be written in
 458                          this iteration
 459            iter_st_off = start offset of this iteration
 460            real_size   = size of data written (bytes) corresponding to off
 461            max_size    = possible maximum size of data written in this iteration
 462            req_off     = offset in the file for a particular contiguous request minus
 463                          what was satisfied in previous iteration
 464            send_off    = offset of this process's data needed by another process in this iteration
 465            req_len     = size corresponding to req_off
 466            send_len    = size corresponding to send_off
 467          */
 468 
 469         /* first calculate what should be communicated */
 470         for (i = 0; i < nprocs; i++)
 471             recv_count[i] = recv_size[i] = send_size[i] = 0;
 472 
 473         off = off_list[m];
 474         max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
 475         real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size -
 476                                     off,
 477                                     end_loc - off + 1);
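             /* real_size is limited both by the end of the stripe containing
              * "off" and by end_loc; e.g., hypothetically, with stripe_size =
              * 100, off = 250 and end_loc = 999 it is
              * ADIOI_MIN(300 - 250, 999 - 250 + 1) = 50. */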
 478 
 479         for (i = 0; i < nprocs; i++) {
 480             if (my_req[i].count) {
 481                 this_buf_idx[i] = buf_idx[i][send_curr_offlen_ptr[i]];
 482                 for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
 483                     send_off = my_req[i].offsets[j];
 484                     send_len = my_req[i].lens[j];
 485                     if (send_off < iter_st_off + max_size) {
 486                         send_size[i] += send_len;
 487                     } else {
 488                         break;
 489                     }
 490                 }
 491                 send_curr_offlen_ptr[i] = j;
 492             }
 493             if (others_req[i].count) {
 494                 recv_start_pos[i] = recv_curr_offlen_ptr[i];
 495                 for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
 496                     req_off = others_req[i].offsets[j];
 497                     req_len = others_req[i].lens[j];
 498                     if (req_off < iter_st_off + max_size) {
 499                         recv_count[i]++;
 500                         ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIU_Upint)(write_buf+req_off-off));
 501                         MPI_Get_address(write_buf + req_off - off,
 502                                         &(others_req[i].mem_ptrs[j]));
 503                         recv_size[i] += req_len;
 504                     } else {
 505                         break;
 506                     }
 507                 }
 508                 recv_curr_offlen_ptr[i] = j;
 509             }
 510         }
 511         /* use variable "hole" to pass data_sieving flag into W_Exchange_data */
 512         hole = data_sieving;
 513         ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
 514                                      len_list, send_size, recv_size, off, real_size,
 515                                      recv_count, recv_start_pos,
 516                                      sent_to_proc, nprocs, myrank,
 517                                      buftype_is_contig, contig_access_count,
 518                                      striping_info, others_req, send_buf_idx,
 519                                      curr_to_proc, done_to_proc, &hole, m,
 520                                      buftype_extent, this_buf_idx,
 521                                      &srt_off, &srt_len, &srt_num, error_code);
 522 
 523         if (*error_code != MPI_SUCCESS)
 524             goto over;
 525 
 526         flag = 0;
 527         for (i = 0; i < nprocs; i++)
 528             if (recv_count[i]) {
 529                 flag = 1;
 530                 break;
 531             }
 532         if (flag) {
 533             /* check whether to do data sieving */
 534             if(data_sieving == ADIOI_HINT_ENABLE) {
 535                 ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
 536                                  ADIO_EXPLICIT_OFFSET, off, &status,
 537                                  error_code);
 538             } else {
 539                 /* if there is no hole, write the data in one shot;
 540                  * otherwise, write it in several pieces */
 541                 if (!hole) {
 542                     ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
 543                                      ADIO_EXPLICIT_OFFSET, off, &status,
 544                                      error_code);
 545                 } else {
 546                     block_offset = -1;
 547                     block_len = 0;
 548                     for (i = 0; i < srt_num; ++i) {
 549                         if (srt_off[i] < off + real_size &&
 550                             srt_off[i] >= off) {
 551                             if (block_offset == -1) {
 552                                 block_offset = srt_off[i];
 553                                 block_len = srt_len[i];
 554                             } else {
 555                                 if (srt_off[i] == block_offset + block_len) {
 556                                     block_len += srt_len[i];
 557                                 } else {
 558                                     ADIO_WriteContig(fd,
 559                                                      write_buf + block_offset - off,
 560                                                      block_len,
 561                                                      MPI_BYTE, ADIO_EXPLICIT_OFFSET,
 562                                                      block_offset, &status,
 563                                                      error_code);
 564                                     if (*error_code != MPI_SUCCESS)
 565                                         goto over;
 566                                     block_offset = srt_off[i];
 567                                     block_len = srt_len[i];
 568                                 }
 569                             }
 570                         }
 571                     }
 572                     if (block_offset != -1) {
 573                         ADIO_WriteContig(fd,
 574                                          write_buf + block_offset - off,
 575                                          block_len,
 576                                          MPI_BYTE, ADIO_EXPLICIT_OFFSET,
 577                                          block_offset, &status,
 578                                          error_code);
 579                         if (*error_code != MPI_SUCCESS)
 580                             goto over;
 581                     }
 582                 }
 583             }
 584             if (*error_code != MPI_SUCCESS)
 585                 goto over;
 586         }
 587         iter_st_off += max_size;
 588     }
 589 over:
 590     if (srt_off)
 591         ADIOI_Free(srt_off);
 592     if (srt_len)
 593         ADIOI_Free(srt_len);
 594     if (ntimes)
 595         ADIOI_Free(write_buf);
 596     ADIOI_Free(recv_curr_offlen_ptr);
 597     ADIOI_Free(send_curr_offlen_ptr);
 598     ADIOI_Free(recv_count);
 599     ADIOI_Free(send_size);
 600     ADIOI_Free(recv_size);
 601     ADIOI_Free(sent_to_proc);
 602     ADIOI_Free(recv_start_pos);
 603     ADIOI_Free(send_buf_idx);
 604     ADIOI_Free(curr_to_proc);
 605     ADIOI_Free(done_to_proc);
 606     ADIOI_Free(this_buf_idx);
 607     ADIOI_Free(off_list);
 608 }
 609 
 610 /* Sets error_code to MPI_SUCCESS if successful, or creates an error code
 611  * in the case of error.
 612  */
 613 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
 614                                          char *write_buf,
 615                                          ADIOI_Flatlist_node *flat_buf,
 616                                          ADIO_Offset *offset_list,
 617                                          ADIO_Offset *len_list, int *send_size,
 618                                          int *recv_size, ADIO_Offset off,
 619                                          int size, int *count,
 620                                          int *start_pos, 
 621                                          int *sent_to_proc, int nprocs,
 622                                          int myrank, int buftype_is_contig,
 623                                          int contig_access_count,
 624                                          int *striping_info,
 625                                          ADIOI_Access *others_req,
 626                                          int *send_buf_idx,
 627                                          int *curr_to_proc, int *done_to_proc,
 628                                          int *hole, int iter,
 629                                          MPI_Aint buftype_extent,
 630                                          int *buf_idx,
 631                                          ADIO_Offset **srt_off, int **srt_len, int *srt_num,
 632                                          int *error_code)
 633 {
 634     int i, j, nprocs_recv, nprocs_send, err;
 635     char **send_buf = NULL;
 636     MPI_Request *requests, *send_req;
 637     MPI_Datatype *recv_types;
 638     MPI_Status *statuses, status;
 639     int sum_recv;
 640     int data_sieving = *hole;
 641     static char myname[] = "ADIOI_W_EXCHANGE_DATA";
 642 
 643     /* create derived datatypes for recv */
 644     nprocs_recv = 0;
 645     for (i = 0; i < nprocs; i++)
 646         if (recv_size[i])
 647             nprocs_recv++;
 648 
 649     recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
 650                                                sizeof(MPI_Datatype));
 651     /* +1 to avoid a 0-size malloc */
 652 
 653     j = 0;
 654     for (i = 0; i < nprocs; i++) {
 655         if (recv_size[i]) {
 656             ADIOI_Type_create_hindexed_x(count[i],
 657                               &(others_req[i].lens[start_pos[i]]),
 658                               &(others_req[i].mem_ptrs[start_pos[i]]),
 659                               MPI_BYTE, recv_types + j);
 660             /* absolute displacements; use MPI_BOTTOM in recv */
 661             MPI_Type_commit(recv_types + j);
 662             j++;
 663         }
 664     }
 665 
 666     /* To avoid a read-modify-write,
 667      * check if there are holes in the data to be written.
 668      * For this, merge the (sorted) offset lists others_req using a heap-merge.
 669      */
 670 
 671     *srt_num = 0;
 672     for (i = 0; i < nprocs; i++)
 673         *srt_num += count[i];
 674     if (*srt_off)
 675         *srt_off = (ADIO_Offset *) ADIOI_Realloc(*srt_off, (*srt_num + 1) * sizeof(ADIO_Offset));
 676     else
 677         *srt_off = (ADIO_Offset *) ADIOI_Malloc((*srt_num + 1) * sizeof(ADIO_Offset));
 678     if (*srt_len)
 679         *srt_len = (int *) ADIOI_Realloc(*srt_len, (*srt_num + 1) * sizeof(int));
 680     else
 681         *srt_len = (int *) ADIOI_Malloc((*srt_num + 1) * sizeof(int));
 682     /* +1 to avoid a 0-size malloc */
 683 
 684     ADIOI_Heap_merge(others_req, count, *srt_off, *srt_len, start_pos,
 685                      nprocs, nprocs_recv, *srt_num);
 686 
 687     /* check if there are any holes */
 688     *hole = 0;
 689     for (i = 0; i < *srt_num - 1; i++) {
 690         if ((*srt_off)[i] + (*srt_len)[i] < (*srt_off)[i + 1]) {
 691             *hole = 1;
 692             break;
 693         }
 694     }
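         /* e.g., hypothetically, srt_off = {0, 30, 60} with srt_len =
          * {30, 30, 30} is gap-free, while srt_off = {0, 60} with srt_len =
          * {30, 30} leaves [30, 60) uncovered and sets *hole. */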
 695     /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction
 696      * between aggregation, nominally contiguous regions, and cb_buffer_size
 697      * must be handled with a read-modify-write; otherwise we would write out
 698      * more data than we receive from everyone else (inclusive). So override
 699      * the hole detection.
 700      */
 701     if (*hole == 0) {
 702         sum_recv = 0;
 703         for (i = 0; i < nprocs; i++)
 704             sum_recv += recv_size[i];
 705         if (size > sum_recv)
 706             *hole = 1;
 707     }
 708     /* check the hint for data sieving */
 709     if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
 710         ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
 711                         ADIO_EXPLICIT_OFFSET, off, &status, &err);
 712         // --BEGIN ERROR HANDLING--
 713         if (err != MPI_SUCCESS) {
 714             *error_code = MPIO_Err_create_code(err,
 715                                                MPIR_ERR_RECOVERABLE,
 716                                                myname, __LINE__,
 717                                                MPI_ERR_IO,
 718                                                "**ioRMWrdwr", 0);
 719             ADIOI_Free(recv_types);
 720             return;
 721         }
 722         // --END ERROR HANDLING--
 723     }
 724 
 725     nprocs_send = 0;
 726     for (i = 0; i < nprocs; i++)
 727         if (send_size[i])
 728             nprocs_send++;
 729 
 730     if (fd->atomicity) {
 731         /* bug fix from Wei-keng Liao and Kenin Coloma */
 732         requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
 733                                                 sizeof(MPI_Request));
 734         send_req = requests;
 735     } else {
 736         requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
 737                                                 sizeof(MPI_Request));
 738         /* +1 to avoid a 0-size malloc */
 739 
 740         /* post receives */
 741         j = 0;
 742         for (i = 0; i < nprocs; i++) {
 743             if (recv_size[i]) {
 744                 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
 745                           myrank + i + 100 * iter, fd->comm, requests + j);
 746                 j++;
 747             }
 748         }
 749         send_req = requests + nprocs_recv;
 750     }
 751 
 752     /* post sends.
 753      * if buftype_is_contig, data can be directly sent from
 754      * user buf at location given by buf_idx. else use send_buf.
 755      */
 756     if (buftype_is_contig) {
 757         j = 0;
 758         for (i = 0; i < nprocs; i++)
 759             if (send_size[i]) {
 760                 ADIOI_Assert(buf_idx[i] != -1);
 761                 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
 762                           MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
 763                           send_req + j);
 764                 j++;
 765             }
 766     } else
 767         if (nprocs_send) {
 768         /* buftype is not contig */
 769         send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
 770         for (i = 0; i < nprocs; i++)
 771             if (send_size[i])
 772                 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
 773 
 774         ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
 775                                       len_list, send_size, send_req,
 776                                       sent_to_proc, nprocs, myrank,
 777                                       contig_access_count, striping_info,
 778                                       send_buf_idx, curr_to_proc, done_to_proc,
 779                                       iter, buftype_extent);
 780         /* the send is done in ADIOI_Fill_send_buffer */
 781     }
 782 
 783         /* bug fix from Wei-keng Liao and Kenin Coloma */
 784     if (fd->atomicity) {
 785         j = 0;
 786         for (i = 0; i < nprocs; i++) {
 787             MPI_Status wkl_status;
 788             if (recv_size[i]) {
 789                 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
 790                          myrank + i + 100 * iter, fd->comm, &wkl_status);
 791                 j++;
 792             }
 793         }
 794     }
 795 
 796     for (i = 0; i < nprocs_recv; i++)
 797         MPI_Type_free(recv_types + i);
 798     ADIOI_Free(recv_types);
 799 
 800         /* bug fix from Wei-keng Liao and Kenin Coloma */
 801         /* +1 to avoid a 0-size malloc */
 802     if (fd->atomicity) {
 803         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
 804                                                sizeof(MPI_Status));
 805     } else {
 806         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
 807                                                sizeof(MPI_Status));
 808     }
 809 
 810 #ifdef NEEDS_MPI_TEST
 811     i = 0;
 812     if (fd->atomicity) {
 813         /* bug fix from Wei-keng Liao and Kenin Coloma */
 814         while (!i)
 815             MPI_Testall(nprocs_send, send_req, &i, statuses);
 816     } else {
 817         while (!i)
 818             MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
 819     }
 820 #else
 821         /* bug fix from Wei-keng Liao and Kenin Coloma */
 822     if (fd->atomicity)
 823         MPI_Waitall(nprocs_send, send_req, statuses);
 824     else
 825         MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
 826 #endif
 827     ADIOI_Free(statuses);
 828     ADIOI_Free(requests);
 829     if (!buftype_is_contig && nprocs_send) {
 830         for (i = 0; i < nprocs; i++)
 831             if (send_size[i])
 832                 ADIOI_Free(send_buf[i]);
 833         ADIOI_Free(send_buf);
 834     }
 835 }
 836 
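     /* ADIOI_BUF_INCR advances the current position in the (non-contiguous,
      * flattened) user buffer by buf_incr bytes without copying anything: it
      * walks the flattened blocklens/indices and wraps to the next copy of the
      * datatype (n_buftypes) whenever the current flattened block is
      * exhausted. */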
 837 #define ADIOI_BUF_INCR \
 838 { \
 839     while (buf_incr) { \
 840         size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
 841         user_buf_idx += size_in_buf; \
 842         flat_buf_sz -= size_in_buf; \
 843         if (!flat_buf_sz) { \
 844             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
 845             else { \
 846                 flat_buf_idx = 0; \
 847                 n_buftypes++; \
 848             } \
 849             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
 850                 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;  \
 851             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
 852         } \
 853         buf_incr -= size_in_buf; \
 854     } \
 855 }
 856 
 857 
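     /* ADIOI_BUF_COPY copies "size" bytes from the user buffer into
      * send_buf[p], advancing send_buf_idx[p] and the flattened-buffer cursor
      * as it goes; the trailing ADIOI_BUF_INCR then skips whatever portion of
      * buf_incr was not consumed by the copy. */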
 858 #define ADIOI_BUF_COPY \
 859 { \
 860     while (size) { \
 861         size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
 862         ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)((MPIU_Upint)buf + user_buf_idx)); \
 863         ADIOI_Assert(size_in_buf == (size_t)size_in_buf);               \
 864         memcpy(&(send_buf[p][send_buf_idx[p]]), \
 865                ((char *) buf) + user_buf_idx, size_in_buf); \
 866         send_buf_idx[p] += size_in_buf; \
 867         user_buf_idx += size_in_buf; \
 868         flat_buf_sz -= size_in_buf; \
 869         if (!flat_buf_sz) { \
 870             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
 871             else { \
 872                 flat_buf_idx = 0; \
 873                 n_buftypes++; \
 874             } \
 875             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
 876                 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;    \
 877             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
 878         } \
 879         size -= size_in_buf; \
 880         buf_incr -= size_in_buf; \
 881     } \
 882     ADIOI_BUF_INCR \
 883 }
 884 
 885 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
 886                                           ADIOI_Flatlist_node *flat_buf,
 887                                           char **send_buf,
 888                                           ADIO_Offset *offset_list,
 889                                           ADIO_Offset *len_list, int *send_size,
 890                                           MPI_Request *requests,
 891                                           int *sent_to_proc, int nprocs,
 892                                           int myrank,
 893                                           int contig_access_count,
 894                                           int *striping_info,
 895                                           int *send_buf_idx,
 896                                           int *curr_to_proc,
 897                                           int *done_to_proc, int iter,
 898                                           MPI_Aint buftype_extent)
 899 {
 900     /* this function is only called if buftype is not contig */
 901     int i, p, flat_buf_idx, size;
 902     int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
 903     ADIO_Offset off, len, rem_len, user_buf_idx;
 904 
 905     /* curr_to_proc[p] = amount of data sent to proc. p that has already
 906      * been accounted for so far
 907      * done_to_proc[p] = amount of data already sent to proc. p in
 908      * previous iterations
 909      * user_buf_idx = current location in user buffer
 910      * send_buf_idx[p] = current location in send_buf of proc. p
 911      */
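         /* Because a single contiguous file request can straddle two
          * iterations, done_to_proc lets this routine skip the bytes already
          * shipped to proc. p in earlier iterations and copy only the new
          * part of the request into send_buf[p]. */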
 912 
 913     for (i = 0; i < nprocs; i++) {
 914         send_buf_idx[i] = curr_to_proc[i] = 0;
 915         done_to_proc[i] = sent_to_proc[i];
 916     }
 917     jj = 0;
 918 
 919     user_buf_idx = flat_buf->indices[0];
 920     flat_buf_idx = 0;
 921     n_buftypes = 0;
 922     flat_buf_sz = flat_buf->blocklens[0];
 923 
 924     /* flat_buf_idx = current index into flattened buftype
 925      * flat_buf_sz = size of current contiguous component in flattened buf
 926      */
 927     for (i = 0; i < contig_access_count; i++) {
 928         off = offset_list[i];
 929         rem_len = (ADIO_Offset) len_list[i];
 930 
 931         /* this request may span more than one process */
 932         while (rem_len != 0) {
 933             len = rem_len;
 934             /* NOTE: len is modified by ADIOI_LUSTRE_Calc_aggregator() to be no
 935              * longer than the single region that process "p" is responsible
 936              * for.
 937              */
 938             p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
 939 
 940             if (send_buf_idx[p] < send_size[p]) {
 941                 if (curr_to_proc[p] + len > done_to_proc[p]) {
 942                     if (done_to_proc[p] > curr_to_proc[p]) {
 943                         size = (int) ADIOI_MIN(curr_to_proc[p] + len -
 944                                                done_to_proc[p],
 945                                                send_size[p] -
 946                                                send_buf_idx[p]);
 947                         buf_incr = done_to_proc[p] - curr_to_proc[p];
 948                         ADIOI_BUF_INCR
 949                         ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
 950                             buf_incr = (int) (curr_to_proc[p] + len -
 951                                               done_to_proc[p]);
 952                         ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
 953                         curr_to_proc[p] = done_to_proc[p] + size;
 954                         ADIOI_BUF_COPY
 955                     } else {
 956                         size = (int) ADIOI_MIN(len, send_size[p] -
 957                                                send_buf_idx[p]);
 958                         buf_incr = (int) len;
 959                         ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
 960                         curr_to_proc[p] += size;
 961                         ADIOI_BUF_COPY
 962                     }
 963                     if (send_buf_idx[p] == send_size[p]) {
 964                         MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
 965                                   myrank + p + 100 * iter, fd->comm,
 966                                   requests + jj);
 967                         jj++;
 968                     }
 969                 } else {
 970                     ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
 971                     curr_to_proc[p] += (int) len;
 972                     buf_incr = (int) len;
 973                     ADIOI_BUF_INCR
 974                 }
 975             } else {
 976                 buf_incr = (int) len;
 977                 ADIOI_BUF_INCR
 978             }
 979             off += len;
 980             rem_len -= len;
 981         }
 982     }
 983     for (i = 0; i < nprocs; i++)
 984         if (send_size[i])
 985             sent_to_proc[i] = curr_to_proc[i];
 986 }
