root/ompi/mca/io/romio321/romio/adio/common/ad_iread_coll.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ADIOI_GEN_IreadStridedColl
  2. ADIOI_GEN_IreadStridedColl_inter
  3. ADIOI_GEN_IreadStridedColl_indio
  4. ADIOI_GEN_IreadStridedColl_read
  5. ADIOI_GEN_IreadStridedColl_free
  6. ADIOI_GEN_IreadStridedColl_fini
  7. ADIOI_Iread_and_exch
  8. ADIOI_Iread_and_exch_l1_begin
  9. ADIOI_Iread_and_exch_l1_end
  10. ADIOI_Iread_and_exch_reset
  11. ADIOI_Iread_and_exch_l2_begin
  12. ADIOI_Iread_and_exch_l2_end
  13. ADIOI_Iread_and_exch_fini
  14. ADIOI_R_Iexchange_data
  15. ADIOI_R_Iexchange_data_recv
  16. ADIOI_R_Iexchange_data_fill
  17. ADIOI_R_Iexchange_data_fini
  18. ADIOI_GEN_irc_query_fn
  19. ADIOI_GEN_irc_free_fn
  20. ADIOI_GEN_irc_poll_fn
  21. ADIOI_GEN_irc_wait_fn

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /*
   3  *  (C) 2014 by Argonne National Laboratory.
   4  *      See COPYRIGHT in top-level directory.
   5  */
   6 
   7 #include "adio.h"
   8 #include "adio_extern.h"
   9 #include "mpiu_greq.h"
  10 #include "mpioimpl.h"
  11 
  12 #ifdef USE_DBG_LOGGING
  13   #define RDCOLL_DEBUG 1
  14 #endif
  15 #ifdef AGGREGATION_PROFILE
  16 #include "mpe.h"
  17 #endif
  18 
  19 #ifdef HAVE_MPI_GREQUEST_EXTENSIONS
  20 
  21 /* ADIOI_GEN_IreadStridedColl */
  22 struct ADIOI_GEN_IreadStridedColl_vars {
  23     /* requests */
  24     MPI_Request req_offset[2];  /* ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL */
  25     MPI_Request req_ind_io;     /* ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL_INDIO */
  26 
  27     /* parameters */
  28     ADIO_File fd;
  29     void *buf;
  30     int count;
  31     MPI_Datatype datatype;
  32     int file_ptr_type;
  33     ADIO_Offset offset;
  34 
  35     /* stack variables */
  36     ADIOI_Access *my_req;
  37     /* array of nprocs structures, one for each other process in
  38        whose file domain this process's request lies */
  39 
  40     ADIOI_Access *others_req;
  41     /* array of nprocs structures, one for each other process
  42        whose request lies in this process's file domain. */
  43 
  44     int nprocs;
  45     int nprocs_for_coll;
  46     int myrank;
  47     int contig_access_count;
  48     int interleave_count;
  49     int buftype_is_contig;
  50     int *count_my_req_per_proc;
  51     int count_my_req_procs;
  52     int count_others_req_procs;
  53     ADIO_Offset start_offset;
  54     ADIO_Offset end_offset;
  55     ADIO_Offset orig_fp;
  56     ADIO_Offset fd_size;
  57     ADIO_Offset min_st_offset;
  58     ADIO_Offset *offset_list;
  59     ADIO_Offset *st_offsets;
  60     ADIO_Offset *fd_start;
  61     ADIO_Offset *fd_end;
  62     ADIO_Offset *end_offsets;
  63     ADIO_Offset *len_list;
  64     int *buf_idx;
  65 };
  66 
  67 /* ADIOI_Iread_and_exch */
  68 struct ADIOI_Iread_and_exch_vars {
  69     /* requests */
  70     MPI_Request req1;   /* ADIOI_IRC_STATE_IREAD_AND_EXCH */
  71     MPI_Request req2;   /* ADIOI_IRC_STATE_IREAD_AND_EXCH_L1_BEGIN */
  72 
  73     /* parameters */
  74     ADIO_File fd;
  75     void *buf;
  76     MPI_Datatype datatype;
  77     int nprocs;
  78     int myrank;
  79     ADIOI_Access *others_req;
  80     ADIO_Offset *offset_list;
  81     ADIO_Offset *len_list;
  82     int contig_access_count;
  83     ADIO_Offset min_st_offset;
  84     ADIO_Offset fd_size;
  85     ADIO_Offset *fd_start;
  86     ADIO_Offset *fd_end;
  87     int *buf_idx;
  88 
  89     /* stack variables */
  90     int m;
  91     int ntimes;
  92     int max_ntimes;
  93     int buftype_is_contig;
  94     ADIO_Offset st_loc;
  95     ADIO_Offset end_loc;
  96     ADIO_Offset off;
  97     ADIO_Offset done;
  98     char *read_buf;
  99     int *curr_offlen_ptr;
 100     int *count;
 101     int *send_size;
 102     int *recv_size;
 103     int *partial_send;
 104     int *recd_from_proc;
 105     int *start_pos;
 106     /* Not convinced end_loc-st_loc couldn't be > int, so make these offsets*/
 107     ADIO_Offset size;
 108     ADIO_Offset real_size;
 109     ADIO_Offset for_curr_iter;
 110     ADIO_Offset for_next_iter;
 111     ADIOI_Flatlist_node *flat_buf;
 112     MPI_Aint buftype_extent;
 113     int coll_bufsize;
 114 
 115     /* next function to be called */
 116     void (*next_fn)(ADIOI_NBC_Request *, int *);
 117 };
 118 
 119 /* ADIOI_R_Iexchange_data */
 120 struct ADIOI_R_Iexchange_data_vars {
 121     /* requests */
 122     MPI_Request req1;   /* ADIOI_IRC_STATE_R_IEXCHANGE_DATA */
 123     MPI_Request *req2;  /* ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV & FILL */
 124 
 125     /* parameters */
 126     ADIO_File fd;
 127     void *buf;
 128     ADIOI_Flatlist_node *flat_buf;
 129     ADIO_Offset *offset_list;
 130     ADIO_Offset *len_list;
 131     int *send_size;
 132     int *recv_size;
 133     int *count;
 134     int *start_pos;
 135     int *partial_send;
 136     int *recd_from_proc;
 137     int nprocs;
 138     int myrank;
 139     int buftype_is_contig;
 140     int contig_access_count;
 141     ADIO_Offset min_st_offset;
 142     ADIO_Offset fd_size;
 143     ADIO_Offset *fd_start;
 144     ADIO_Offset *fd_end;
 145     ADIOI_Access *others_req;
 146     int iter;
 147     MPI_Aint buftype_extent;
 148     int *buf_idx;
 149 
 150     /* stack variables */
 151     int nprocs_recv;
 152     int nprocs_send;
 153     char **recv_buf;
 154 
 155     /* next function to be called */
 156     void (*next_fn)(ADIOI_NBC_Request *, int *);
 157 };
 158 
 159 
 160 void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
 161                    *flat_buf, char **recv_buf, ADIO_Offset
 162                    *offset_list, ADIO_Offset *len_list,
 163                    unsigned *recv_size,
 164                    MPI_Request *requests, MPI_Status *statuses,
 165                    int *recd_from_proc, int nprocs,
 166                    int contig_access_count,
 167                    ADIO_Offset min_st_offset,
 168                    ADIO_Offset fd_size, ADIO_Offset *fd_start,
 169                    ADIO_Offset *fd_end,
 170                    MPI_Aint buftype_extent);
 171 
 172 /* prototypes of functions used for nonblocking collective reads only. */
 173 static void ADIOI_GEN_IreadStridedColl_inter(ADIOI_NBC_Request *, int *);
 174 static void ADIOI_GEN_IreadStridedColl_indio(ADIOI_NBC_Request *, int *);
 175 static void ADIOI_GEN_IreadStridedColl_read(ADIOI_NBC_Request *, int *);
 176 static void ADIOI_GEN_IreadStridedColl_free(ADIOI_NBC_Request *, int *);
 177 static void ADIOI_GEN_IreadStridedColl_fini(ADIOI_NBC_Request *, int *);
 178 
 179 static void ADIOI_Iread_and_exch(ADIOI_NBC_Request *, int *);
 180 static void ADIOI_Iread_and_exch_l1_begin(ADIOI_NBC_Request *, int *);
 181 static void ADIOI_Iread_and_exch_l1_end(ADIOI_NBC_Request *, int *);
 182 static void ADIOI_Iread_and_exch_reset(ADIOI_NBC_Request *, int *);
 183 static void ADIOI_Iread_and_exch_l2_begin(ADIOI_NBC_Request *, int *);
 184 static void ADIOI_Iread_and_exch_l2_end(ADIOI_NBC_Request *, int *);
 185 static void ADIOI_Iread_and_exch_fini(ADIOI_NBC_Request *, int *);
 186 
 187 static void ADIOI_R_Iexchange_data(ADIOI_NBC_Request *, int *);
 188 static void ADIOI_R_Iexchange_data_recv(ADIOI_NBC_Request *, int *);
 189 static void ADIOI_R_Iexchange_data_fill(ADIOI_NBC_Request *, int *);
 190 static void ADIOI_R_Iexchange_data_fini(ADIOI_NBC_Request *, int *);
 191 
 192 static MPIX_Grequest_class ADIOI_GEN_greq_class = 0;
 193 static int ADIOI_GEN_irc_query_fn(void *extra_state, MPI_Status *status);
 194 static int ADIOI_GEN_irc_free_fn(void *extra_state);
 195 static int ADIOI_GEN_irc_poll_fn(void *extra_state, MPI_Status *status);
 196 static int ADIOI_GEN_irc_wait_fn(int count, void **array_of_states,
 197                                  double timeout, MPI_Status *status);
 198 
 199 
 200 /* Nonblocking version of ADIOI_GEN_ReadStridedColl() */
 201 void ADIOI_GEN_IreadStridedColl(ADIO_File fd, void *buf, int count,
 202                    MPI_Datatype datatype, int file_ptr_type,
 203                    ADIO_Offset offset, MPI_Request *request,
 204                    int *error_code)
 205 {
 206     /* Uses a generalized version of the extended two-phase method described
 207        in "An Extended Two-Phase Method for Accessing Sections of
 208        Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
 209        Scientific Programming, (5)4:301--317, Winter 1996.
 210        http://www.mcs.anl.gov/home/thakur/ext2ph.ps */
 211 
 212     ADIOI_NBC_Request *nbc_req = NULL;
 213     ADIOI_GEN_IreadStridedColl_vars *vars = NULL;
 214     int nprocs, myrank;
 215 #ifdef RDCOLL_DEBUG
 216     int i;
 217 #endif
 218 
 219     /* FIXME: need an implementation of ADIOI_IOIstridedColl
 220     if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
 221         ADIOI_IOIstridedColl(fd, buf, count, ADIOI_READ, datatype,
 222                              file_ptr_type, offset, request, error_code);
 223         return;
 224     }
 225     */
 226 
 227     /* top-level struct keeping the status of function progress */
 228     nbc_req = (ADIOI_NBC_Request *)ADIOI_Calloc(1, sizeof(ADIOI_NBC_Request));
 229     nbc_req->rdwr = ADIOI_READ;
 230 
 231     /* create a generalized request */
 232     if (ADIOI_GEN_greq_class == 0) {
 233         MPIX_Grequest_class_create(ADIOI_GEN_irc_query_fn,
 234                 ADIOI_GEN_irc_free_fn, MPIU_Greq_cancel_fn,
 235                 ADIOI_GEN_irc_poll_fn, ADIOI_GEN_irc_wait_fn,
 236                 &ADIOI_GEN_greq_class);
 237     }
 238     MPIX_Grequest_class_allocate(ADIOI_GEN_greq_class, nbc_req, request);
 239     memcpy(&nbc_req->req, request, sizeof(MPI_Request));
 240 
 241     /* create a struct for parameters and variables */
 242     vars = (ADIOI_GEN_IreadStridedColl_vars *)ADIOI_Calloc(
 243             1, sizeof(ADIOI_GEN_IreadStridedColl_vars));
 244     nbc_req->data.rd.rsc_vars = vars;
 245 
 246     /* save the parameters */
 247     vars->fd = fd;
 248     vars->buf = buf;
 249     vars->count = count;
 250     vars->datatype = datatype;
 251     vars->file_ptr_type = file_ptr_type;
 252     vars->offset = offset;
 253 
 254     MPI_Comm_size(fd->comm, &nprocs);
 255     MPI_Comm_rank(fd->comm, &myrank);
 256     vars->nprocs = nprocs;
 257     vars->myrank = myrank;
 258 
 259     /* number of aggregators, cb_nodes, is stored in the hints */
 260     vars->nprocs_for_coll = fd->hints->cb_nodes;
 261     vars->orig_fp = fd->fp_ind;
 262 
 263     /* only check for interleaving if cb_read isn't disabled */
 264     if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
 265         /* For this process's request, calculate the list of offsets and
 266            lengths in the file and determine the start and end offsets. */
 267 
 268         /* Note: end_offset points to the last byte-offset that will be accessed.
 269            e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/
 270 
 271         ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
 272                               &vars->offset_list, &vars->len_list,
 273                               &vars->start_offset, &vars->end_offset,
 274                               &vars->contig_access_count);
 275 
 276 #ifdef RDCOLL_DEBUG
 277         for (i = 0; i < vars->contig_access_count; i++) {
 278             DBG_FPRINTF(stderr, "rank %d  off %lld  len %lld\n",
 279                         myrank, vars->offset_list[i], vars->len_list[i]);
 280         }
 281 #endif
 282 
 283         /* each process communicates its start and end offsets to other
 284            processes. The result is an array each of start and end offsets
 285            stored in order of process rank. */
 286 
 287         vars->st_offsets = (ADIO_Offset *)ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 288         vars->end_offsets = (ADIO_Offset *)ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
 289 
 290         *error_code = MPI_Iallgather(&vars->start_offset, 1, ADIO_OFFSET,
 291                                      vars->st_offsets, 1, ADIO_OFFSET,
 292                                      fd->comm, &vars->req_offset[0]);
 293         if (*error_code != MPI_SUCCESS) return;
 294         *error_code = MPI_Iallgather(&vars->end_offset, 1, ADIO_OFFSET,
 295                                      vars->end_offsets, 1, ADIO_OFFSET,
 296                                      fd->comm, &vars->req_offset[1]);
 297 
 298         nbc_req->data.rd.state = ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL;
 299         return;
 300     }
 301 
 302     ADIOI_GEN_IreadStridedColl_indio(nbc_req, error_code);
 303 }
 304 
 305 static void ADIOI_GEN_IreadStridedColl_inter(ADIOI_NBC_Request *nbc_req,
 306                                              int *error_code)
 307 {
 308     ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
 309     int nprocs = vars->nprocs;
 310     ADIO_Offset *st_offsets = vars->st_offsets;
 311     ADIO_Offset *end_offsets = vars->end_offsets;
 312     int i, interleave_count = 0;
 313 
 314     /* are the accesses of different processes interleaved? */
 315     for (i = 1; i < nprocs; i++)
 316         if ((st_offsets[i] < end_offsets[i-1]) &&
 317             (st_offsets[i] <= end_offsets[i]))
 318             interleave_count++;
 319     /* This is a rudimentary check for interleaving, but should suffice
 320        for the moment. */
 321 
 322     vars->interleave_count = interleave_count;
 323 
 324     ADIOI_GEN_IreadStridedColl_indio(nbc_req, error_code);
 325 }
 326 
 327 static void ADIOI_GEN_IreadStridedColl_indio(ADIOI_NBC_Request *nbc_req,
 328                                              int *error_code)
 329 {
 330     ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
 331     ADIOI_Icalc_others_req_vars *cor_vars = NULL;
 332     ADIO_File fd = vars->fd;
 333     void *buf;
 334     int count, file_ptr_type;
 335     MPI_Datatype datatype = vars->datatype;
 336     ADIO_Offset offset;
 337     int filetype_is_contig;
 338     ADIO_Offset off;
 339     int nprocs;
 340 
 341     ADIOI_Datatype_iscontig(datatype, &vars->buftype_is_contig);
 342 
 343     if (fd->hints->cb_read == ADIOI_HINT_DISABLE
 344     || (!vars->interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO)))
 345     {
 346         buf = vars->buf;
 347         count = vars->count;
 348         file_ptr_type = vars->file_ptr_type;
 349         offset = vars->offset;
 350 
 351         /* don't do aggregation */
 352         if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
 353             ADIOI_Free(vars->offset_list);
 354             ADIOI_Free(vars->len_list);
 355             ADIOI_Free(vars->st_offsets);
 356             ADIOI_Free(vars->end_offsets);
 357         }
 358 
 359         fd->fp_ind = vars->orig_fp;
 360         ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
 361 
 362 #if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
 363         /* NOTE: This is currently a workaround to avoid weird errors, e.g.,
 364          * stack fault, occurred on Linux. When the host OS is Linux and
 365          * aio-lite is not used, a blocking ADIO function is used here.
 366          * See https://trac.mpich.org/projects/mpich/ticket/2201. */
 367         MPI_Status status;
 368         if (vars->buftype_is_contig && filetype_is_contig) {
 369             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
 370                 off = fd->disp + (fd->etype_size) * offset;
 371                 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
 372                                 off, &status, error_code);
 373             }
 374             else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 375                                  0, &status, error_code);
 376         }
 377         else {
 378             ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
 379                              offset, &status, error_code);
 380         }
 381         ADIOI_GEN_IreadStridedColl_fini(nbc_req, error_code);
 382 #else
 383         if (vars->buftype_is_contig && filetype_is_contig) {
 384             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
 385                 off = fd->disp + (fd->etype_size) * offset;
 386                 ADIO_IreadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
 387                                  off, &vars->req_ind_io, error_code);
 388             }
 389             else ADIO_IreadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
 390                                   0, &vars->req_ind_io, error_code);
 391         }
 392         else {
 393             ADIO_IreadStrided(fd, buf, count, datatype, file_ptr_type,
 394                               offset, &vars->req_ind_io, error_code);
 395         }
 396 
 397         nbc_req->data.rd.state = ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL_INDIO;
 398 #endif
 399         return;
 400     }
 401 
 402     nprocs = vars->nprocs;
 403 
 404     /* We're going to perform aggregation of I/O.  Here we call
 405      * ADIOI_Calc_file_domains() to determine what processes will handle I/O
 406      * to what regions.  We pass nprocs_for_coll into this function; it is
 407      * used to determine how many processes will perform I/O, which is also
 408      * the number of regions into which the range of bytes must be divided.
 409      * These regions are called "file domains", or FDs.
 410      *
 411      * When this function returns, fd_start, fd_end, fd_size, and
 412      * min_st_offset will be filled in.  fd_start holds the starting byte
 413      * location for each file domain.  fd_end holds the ending byte location.
 414      * min_st_offset holds the minimum byte location that will be accessed.
 415      *
 416      * Both fd_start[] and fd_end[] are indexed by an aggregator number; this
 417      * needs to be mapped to an actual rank in the communicator later.
 418      *
 419      */
 420     ADIOI_Calc_file_domains(vars->st_offsets, vars->end_offsets, nprocs,
 421                 vars->nprocs_for_coll, &vars->min_st_offset,
 422                 &vars->fd_start, &vars->fd_end,
 423                 fd->hints->min_fdomain_size, &vars->fd_size,
 424                 fd->hints->striping_unit);
 425 
 426     /* calculate where the portions of the access requests of this process
 427      * are located in terms of the file domains.  this could be on the same
 428      * process or on other processes.  this function fills in:
 429      * count_my_req_procs - number of processes (including this one) for which
 430      *     this process has requests in their file domain
 431      * count_my_req_per_proc - count of requests for each process, indexed
 432      *     by rank of the process
 433      * my_req[] - array of data structures describing the requests to be
 434      *     performed by each process (including self).  indexed by rank.
 435      * buf_idx[] - array of locations into which data can be directly moved;
 436      *     this is only valid for contiguous buffer case
 437      */
 438     ADIOI_Calc_my_req(fd, vars->offset_list, vars->len_list,
 439               vars->contig_access_count, vars->min_st_offset,
 440               vars->fd_start, vars->fd_end, vars->fd_size,
 441               nprocs, &vars->count_my_req_procs,
 442               &vars->count_my_req_per_proc, &vars->my_req,
 443               &vars->buf_idx);
 444 
 445     /* perform a collective communication in order to distribute the
 446      * data calculated above.  fills in the following:
 447      * count_others_req_procs - number of processes (including this
 448      *     one) which have requests in this process's file domain.
 449      * count_others_req_per_proc[] - number of separate contiguous
 450      *     requests from proc i lie in this process's file domain.
 451      */
 452 
 453     cor_vars = (ADIOI_Icalc_others_req_vars *)ADIOI_Calloc(
 454             1, sizeof(ADIOI_Icalc_others_req_vars));
 455     nbc_req->cor_vars = cor_vars;
 456     cor_vars->fd = vars->fd;
 457     cor_vars->count_my_req_procs = vars->count_my_req_procs;
 458     cor_vars->count_my_req_per_proc = vars->count_my_req_per_proc;
 459     cor_vars->my_req = vars->my_req;
 460     cor_vars->nprocs = vars->nprocs;
 461     cor_vars->myrank = vars->myrank;
 462     cor_vars->count_others_req_procs_ptr = &vars->count_others_req_procs;
 463     cor_vars->others_req_ptr = &vars->others_req;
 464     cor_vars->next_fn = ADIOI_GEN_IreadStridedColl_read;
 465 
 466     ADIOI_Icalc_others_req(nbc_req, error_code);
 467 }
 468 
 469 static void ADIOI_GEN_IreadStridedColl_read(ADIOI_NBC_Request *nbc_req,
 470                                             int *error_code)
 471 {
 472     ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
 473     ADIOI_Iread_and_exch_vars *rae_vars = NULL;
 474     ADIOI_Access *my_req = vars->my_req;
 475     int nprocs = vars->nprocs;
 476     int i;
 477 
 478     /* my_req[] and count_my_req_per_proc aren't needed at this point, so
 479      * let's free the memory
 480      */
 481     ADIOI_Free(vars->count_my_req_per_proc);
 482     for (i = 0; i < nprocs; i++) {
 483         if (my_req[i].count) {
 484             ADIOI_Free(my_req[i].offsets);
 485             ADIOI_Free(my_req[i].lens);
 486         }
 487     }
 488     ADIOI_Free(my_req);
 489 
 490     /* read data in sizes of no more than ADIOI_Coll_bufsize,
 491      * communicate, and fill user buf.
 492      */
 493     rae_vars = (ADIOI_Iread_and_exch_vars *)ADIOI_Calloc(
 494             1, sizeof(ADIOI_Iread_and_exch_vars));
 495     nbc_req->data.rd.rae_vars = rae_vars;
 496     rae_vars->fd = vars->fd;
 497     rae_vars->buf = vars->buf;
 498     rae_vars->datatype = vars->datatype;
 499     rae_vars->nprocs = vars->nprocs;
 500     rae_vars->myrank = vars->myrank;
 501     rae_vars->others_req = vars->others_req;
 502     rae_vars->offset_list = vars->offset_list;
 503     rae_vars->len_list = vars->len_list;
 504     rae_vars->contig_access_count = vars->contig_access_count;
 505     rae_vars->min_st_offset = vars->min_st_offset;
 506     rae_vars->fd_size = vars->fd_size;
 507     rae_vars->fd_start = vars->fd_start;
 508     rae_vars->fd_end = vars->fd_end;
 509     rae_vars->buf_idx = vars->buf_idx;
 510     rae_vars->next_fn = ADIOI_GEN_IreadStridedColl_free;
 511 
 512     ADIOI_Iread_and_exch(nbc_req, error_code);
 513 }
 514 
 515 static void ADIOI_GEN_IreadStridedColl_free(ADIOI_NBC_Request *nbc_req,
 516                                             int *error_code)
 517 {
 518     ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
 519     ADIO_File fd = vars->fd;
 520     MPI_Datatype datatype = vars->datatype;
 521     ADIOI_Access *others_req = vars->others_req;
 522     int nprocs = vars->nprocs;
 523     int i;
 524 
 525     if (!vars->buftype_is_contig) ADIOI_Delete_flattened(datatype);
 526 
 527     /* free all memory allocated for collective I/O */
 528     for (i = 0; i < nprocs; i++) {
 529         if (others_req[i].count) {
 530             ADIOI_Free(others_req[i].offsets);
 531             ADIOI_Free(others_req[i].lens);
 532             ADIOI_Free(others_req[i].mem_ptrs);
 533         }
 534     }
 535     ADIOI_Free(others_req);
 536 
 537     ADIOI_Free(vars->buf_idx);
 538     ADIOI_Free(vars->offset_list);
 539     ADIOI_Free(vars->len_list);
 540     ADIOI_Free(vars->st_offsets);
 541     ADIOI_Free(vars->end_offsets);
 542     ADIOI_Free(vars->fd_start);
 543     ADIOI_Free(vars->fd_end);
 544 
 545     fd->fp_sys_posn = -1;   /* set it to null. */
 546 
 547     ADIOI_GEN_IreadStridedColl_fini(nbc_req, error_code);
 548 }
 549 
 550 static void ADIOI_GEN_IreadStridedColl_fini(ADIOI_NBC_Request *nbc_req,
 551                                             int *error_code)
 552 {
 553     ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
 554     MPI_Count size;
 555 
 556     /* This is a temporary way of filling in status. The right way is to
 557        keep track of how much data was actually read and placed in buf
 558        during collective I/O. */
 559     MPI_Type_size_x(vars->datatype, &size);
 560     nbc_req->nbytes = size * vars->count;
 561 
 562     /* free the struct for parameters and variables */
 563     if (nbc_req->data.rd.rsc_vars) {
 564         ADIOI_Free(nbc_req->data.rd.rsc_vars);
 565         nbc_req->data.rd.rsc_vars = NULL;
 566     }
 567 
 568     /* make the request complete */
 569     *error_code = MPI_Grequest_complete(nbc_req->req);
 570     nbc_req->data.rd.state = ADIOI_IRC_STATE_COMPLETE;
 571 }
 572 
 573 
 574 static void ADIOI_Iread_and_exch(ADIOI_NBC_Request *nbc_req, int *error_code)
 575 {
 576     ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
 577     ADIO_File fd = vars->fd;
 578     MPI_Datatype datatype = vars->datatype;
 579     int nprocs = vars->nprocs;
 580     MPI_Aint lb;
 581     ADIOI_Access *others_req = vars->others_req;
 582 
 583     /* Read in sizes of no more than coll_bufsize, an info parameter.
 584        Send data to appropriate processes.
 585        Place recd. data in user buf.
 586        The idea is to reduce the amount of extra memory required for
 587        collective I/O. If all data were read all at once, which is much
 588        easier, it would require temp space more than the size of user_buf,
 589        which is often unacceptable. For example, to read a distributed
 590        array from a file, where each local array is 8Mbytes, requiring
 591        at least another 8Mbytes of temp space is unacceptable. */
 592 
 593     int i, j;
 594     ADIO_Offset st_loc = -1, end_loc = -1;
 595     int coll_bufsize;
 596 
 597     *error_code = MPI_SUCCESS;  /* changed below if error */
 598     /* only I/O errors are currently reported */
 599 
 600     /* calculate the number of reads of size coll_bufsize
 601        to be done by each process and the max among all processes.
 602        That gives the no. of communication phases as well.
 603        coll_bufsize is obtained from the hints object. */
 604 
 605     coll_bufsize = fd->hints->cb_buffer_size;
 606     vars->coll_bufsize = coll_bufsize;
 607 
 608     /* grab some initial values for st_loc and end_loc */
 609     for (i = 0; i < nprocs; i++) {
 610         if (others_req[i].count) {
 611             st_loc = others_req[i].offsets[0];
 612             end_loc = others_req[i].offsets[0];
 613             break;
 614         }
 615     }
 616 
 617     /* now find the real values */
 618     for (i = 0; i < nprocs; i++)
 619         for (j = 0; j < others_req[i].count; j++) {
 620             st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
 621             end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
 622                           + others_req[i].lens[j] - 1));
 623         }
 624 
 625     vars->st_loc = st_loc;
 626     vars->end_loc = end_loc;
 627 
 628     /* calculate ntimes, the number of times this process must perform I/O
 629      * operations in order to complete all the requests it has received.
 630      * the need for multiple I/O operations comes from the restriction that
 631      * we only use coll_bufsize bytes of memory for internal buffering.
 632      */
 633     if ((st_loc == -1) && (end_loc == -1)) {
 634         /* this process does no I/O. */
 635         vars->ntimes = 0;
 636     }
 637     else {
 638         /* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/
 639         vars->ntimes = (int)((end_loc - st_loc + coll_bufsize) / coll_bufsize);
 640     }
 641 
 642     *error_code = MPI_Iallreduce(&vars->ntimes, &vars->max_ntimes, 1, MPI_INT,
 643                                  MPI_MAX, fd->comm, &vars->req1);
 644 
 645     vars->read_buf = fd->io_buf;  /* Allocated at open time */
 646 
 647     vars->curr_offlen_ptr = (int *)ADIOI_Calloc(nprocs, sizeof(int));
 648     /* its use is explained below. calloc initializes to 0. */
 649 
 650     vars->count = (int *)ADIOI_Malloc(nprocs * sizeof(int));
 651     /* to store count of how many off-len pairs per proc are satisfied
 652        in an iteration. */
 653 
 654     vars->partial_send = (int *)ADIOI_Calloc(nprocs, sizeof(int));
 655     /* if only a portion of the last off-len pair is sent to a process
 656        in a particular iteration, the length sent is stored here.
 657        calloc initializes to 0. */
 658 
 659     vars->send_size = (int *)ADIOI_Malloc(nprocs * sizeof(int));
 660     /* total size of data to be sent to each proc. in an iteration */
 661 
 662     vars->recv_size = (int *)ADIOI_Malloc(nprocs * sizeof(int));
 663     /* total size of data to be recd. from each proc. in an iteration.
 664        Of size nprocs so that I can use MPI_Alltoall later. */
 665 
 666     vars->recd_from_proc = (int *)ADIOI_Calloc(nprocs, sizeof(int));
 667     /* amount of data recd. so far from each proc. Used in
 668        ADIOI_Fill_user_buffer. initialized to 0 here. */
 669 
 670     vars->start_pos = (int *)ADIOI_Malloc(nprocs*sizeof(int));
 671     /* used to store the starting value of curr_offlen_ptr[i] in
 672        this iteration */
 673 
 674     ADIOI_Datatype_iscontig(datatype, &vars->buftype_is_contig);
 675     if (!vars->buftype_is_contig) {
 676         vars->flat_buf = ADIOI_Flatten_and_find(datatype);
 677     }
 678     MPI_Type_get_extent(datatype, &lb, &vars->buftype_extent);
 679 
 680     vars->done = 0;
 681     vars->off = st_loc;
 682     vars->for_curr_iter = vars->for_next_iter = 0;
 683 
 684     /* set the state to wait until MPI_Ialltoall finishes. */
 685     nbc_req->data.rd.state = ADIOI_IRC_STATE_IREAD_AND_EXCH;
 686 }
 687 
 688 static void ADIOI_Iread_and_exch_l1_begin(ADIOI_NBC_Request *nbc_req,
 689                                           int *error_code)
 690 {
 691     ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
 692     ADIO_File fd;
 693     int nprocs;
 694     ADIOI_Access *others_req;
 695 
 696     int i, j;
 697     ADIO_Offset real_off, req_off;
 698     char *read_buf;
 699     int *curr_offlen_ptr, *count, *send_size;
 700     int *partial_send, *start_pos;
 701     ADIO_Offset size, real_size, for_next_iter;
 702     int req_len, flag;
 703 
 704     ADIOI_R_Iexchange_data_vars *red_vars = NULL;
 705 
 706     /* loop exit condition */
 707     if (vars->m >= vars->ntimes) {
 708         ADIOI_Iread_and_exch_reset(nbc_req, error_code);
 709         return;
 710     }
 711 
 712     fd = vars->fd;
 713     nprocs = vars->nprocs;
 714     others_req = vars->others_req;
 715 
 716     read_buf = vars->read_buf;
 717     curr_offlen_ptr = vars->curr_offlen_ptr;
 718     count = vars->count;
 719     send_size = vars->send_size;
 720     partial_send = vars->partial_send;
 721     start_pos = vars->start_pos;
 722 
 723     /* read buf of size coll_bufsize (or less) */
 724     /* go through all others_req and check if any are satisfied
 725        by the current read */
 726 
 727     /* since MPI guarantees that displacements in filetypes are in
 728        monotonically nondecreasing order, I can maintain a pointer
 729        (curr_offlen_ptr) to
 730        current off-len pair for each process in others_req and scan
 731        further only from there. There is still a problem of filetypes
 732        such as:  (1, 2, 3 are not process nos. They are just numbers for
 733        three chunks of data, specified by a filetype.)
 734 
 735        1  -------!--
 736        2    -----!----
 737        3       --!-----
 738 
 739        where ! indicates where the current read_size limitation cuts
 740        through the filetype.  I resolve this by reading up to !, but
 741        filling the communication buffer only for 1. I copy the portion
 742        left over for 2 into a tmp_buf for use in the next
 743        iteration. i.e., 2 and 3 will be satisfied in the next
 744        iteration. This simplifies filling in the user's buf at the
 745        other end, as only one off-len pair with incomplete data
 746        will be sent. I also don't need to send the individual
 747        offsets and lens along with the data, as the data is being
 748        sent in a particular order. */
 749 
 750     /* off = start offset in the file for the data actually read in
 751              this iteration
 752        size = size of data read corresponding to off
 753        real_off = off minus whatever data was retained in memory from
 754              previous iteration for cases like 2, 3 illustrated above
 755        real_size = size plus the extra corresponding to real_off
 756        req_off = off in file for a particular contiguous request
 757                  minus what was satisfied in previous iteration
 758        req_size = size corresponding to req_off */
 759 
 760     size = ADIOI_MIN((unsigned)vars->coll_bufsize,
 761                      vars->end_loc - vars->st_loc + 1 - vars->done);
 762     real_off = vars->off - vars->for_curr_iter;
 763     real_size = size + vars->for_curr_iter;
 764 
 765     vars->size = size;
 766     vars->real_size = real_size;
 767 
 768     for (i = 0; i < nprocs; i++) count[i] = send_size[i] = 0;
 769     for_next_iter = 0;
 770 
 771     for (i = 0; i < nprocs; i++) {
 772 #ifdef RDCOLL_DEBUG
 773         DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n",
 774                     vars->myrank, i, others_req[i].count);
 775 #endif
 776         if (others_req[i].count) {
 777             start_pos[i] = curr_offlen_ptr[i];
 778             for (j = curr_offlen_ptr[i]; j < others_req[i].count; j++) {
 779                 if (partial_send[i]) {
 780                     /* this request may have been partially
 781                        satisfied in the previous iteration. */
 782                     req_off = others_req[i].offsets[j] + partial_send[i];
 783                     req_len = others_req[i].lens[j] - partial_send[i];
 784                     partial_send[i] = 0;
 785                     /* modify the off-len pair to reflect this change */
 786                     others_req[i].offsets[j] = req_off;
 787                     others_req[i].lens[j] = req_len;
 788                 }
 789                 else {
 790                     req_off = others_req[i].offsets[j];
 791                     req_len = others_req[i].lens[j];
 792                 }
 793                 if (req_off < real_off + real_size) {
 794                     count[i]++;
 795                     ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf) + req_off - real_off) == (ADIO_Offset)(MPIU_Upint)(read_buf + req_off - real_off));
 796                     MPI_Get_address(read_buf + req_off - real_off,
 797                                 &(others_req[i].mem_ptrs[j]));
 798                     ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
 799                     send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off,
 800                                                     (ADIO_Offset)(unsigned)req_len));
 801 
 802                     if (real_off + real_size - req_off < (ADIO_Offset)(unsigned)req_len) {
 803                         partial_send[i] = (int)(real_off + real_size - req_off);
 804                         if ((j+1 < others_req[i].count) &&
 805                             (others_req[i].offsets[j+1] < real_off + real_size)) {
 806                             /* this is the case illustrated in the
 807                                figure above. */
 808                             for_next_iter = ADIOI_MAX(for_next_iter,
 809                                     real_off + real_size - others_req[i].offsets[j+1]);
 810                             /* max because it must cover requests
 811                                from different processes */
 812                         }
 813                         break;
 814                     }
 815                 }
 816                 else break;
 817             }
 818             curr_offlen_ptr[i] = j;
 819         }
 820     }
 821     vars->for_next_iter = for_next_iter;
 822 
 823     flag = 0;
 824     for (i = 0; i < nprocs; i++)
 825         if (count[i]) flag = 1;
 826 
 827     /* create a struct for ADIOI_R_Iexchange_data() */
 828     red_vars = (ADIOI_R_Iexchange_data_vars *)ADIOI_Calloc(
 829             1, sizeof(ADIOI_R_Iexchange_data_vars));
 830     nbc_req->data.rd.red_vars = red_vars;
 831     red_vars->fd = vars->fd;
 832     red_vars->buf = vars->buf;
 833     red_vars->flat_buf = vars->flat_buf;
 834     red_vars->offset_list = vars->offset_list;
 835     red_vars->len_list = vars->len_list;
 836     red_vars->send_size = vars->send_size;
 837     red_vars->recv_size = vars->recv_size;
 838     red_vars->count = vars->count;
 839     red_vars->start_pos = vars->start_pos;
 840     red_vars->partial_send = vars->partial_send;
 841     red_vars->recd_from_proc = vars->recd_from_proc;
 842     red_vars->nprocs = vars->nprocs;
 843     red_vars->myrank = vars->myrank;
 844     red_vars->buftype_is_contig = vars->buftype_is_contig;
 845     red_vars->contig_access_count = vars->contig_access_count;
 846     red_vars->min_st_offset = vars->min_st_offset;
 847     red_vars->fd_size = vars->fd_size;
 848     red_vars->fd_start = vars->fd_start;
 849     red_vars->fd_end = vars->fd_end;
 850     red_vars->others_req = vars->others_req;
 851     red_vars->iter = vars->m;
 852     red_vars->buftype_extent = vars->buftype_extent;
 853     red_vars->buf_idx = vars->buf_idx;
 854     red_vars->next_fn = ADIOI_Iread_and_exch_l1_end;
 855 
 856     if (flag) {
 857         ADIOI_Assert(size == (int)size);
 858 #if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
 859         MPI_Status status;
 860         ADIO_ReadContig(fd, read_buf+vars->for_curr_iter, (int)size,
 861                         MPI_BYTE, ADIO_EXPLICIT_OFFSET, vars->off,
 862                         &status, error_code);
 863 #else
 864         ADIO_IreadContig(fd, read_buf+vars->for_curr_iter, (int)size,
 865                          MPI_BYTE, ADIO_EXPLICIT_OFFSET, vars->off,
 866                          &vars->req2, error_code);
 867 
 868         nbc_req->data.rd.state = ADIOI_IRC_STATE_IREAD_AND_EXCH_L1_BEGIN;
 869         return;
 870 #endif
 871     }
 872 
 873     ADIOI_R_Iexchange_data(nbc_req, error_code);
 874 }
 875 
 876 static void ADIOI_Iread_and_exch_l1_end(ADIOI_NBC_Request *nbc_req,
 877                                         int *error_code)
 878 {
 879     ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
 880     ADIO_File fd = vars->fd;
 881     ADIO_Offset size = vars->size;
 882     ADIO_Offset real_size = vars->real_size;
 883     ADIO_Offset for_next_iter = vars->for_next_iter;
 884     char *read_buf = vars->read_buf;
 885     char *tmp_buf;
 886 
 887     vars->for_curr_iter = for_next_iter;
 888 
 889     if (for_next_iter) {
 890         tmp_buf = (char *)ADIOI_Malloc(for_next_iter);
 891         ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIU_Upint)(read_buf+real_size-for_next_iter));
 892         ADIOI_Assert((for_next_iter+vars->coll_bufsize) == (size_t)(for_next_iter+vars->coll_bufsize));
 893         memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
 894         ADIOI_Free(fd->io_buf);
 895         fd->io_buf = (char *)ADIOI_Malloc(for_next_iter+vars->coll_bufsize);
 896         memcpy(fd->io_buf, tmp_buf, for_next_iter);
 897         vars->read_buf = fd->io_buf;
 898         ADIOI_Free(tmp_buf);
 899     }
 900 
 901     vars->off += size;
 902     vars->done += size;
 903 
 904     /* increment m and go back to the beginning of m loop */
 905     vars->m++;
 906     ADIOI_Iread_and_exch_l1_begin(nbc_req, error_code);
 907 }
 908 
 909 static void ADIOI_Iread_and_exch_reset(ADIOI_NBC_Request *nbc_req,
 910                                        int *error_code)
 911 {
 912     ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
 913     int nprocs = vars->nprocs;
 914     int *count = vars->count;
 915     int *send_size = vars->send_size;
 916     int i;
 917 
 918     for (i = 0; i < nprocs; i++) count[i] = send_size[i] = 0;
 919 
 920     vars->m = vars->ntimes;
 921     ADIOI_Iread_and_exch_l2_begin(nbc_req, error_code);
 922 }
 923 
 924 static void ADIOI_Iread_and_exch_l2_begin(ADIOI_NBC_Request *nbc_req,
 925                                           int *error_code)
 926 {
 927     ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
 928     ADIOI_R_Iexchange_data_vars *red_vars = NULL;
 929 
 930     /* loop exit condition */
 931     if (vars->m >= vars->max_ntimes) {
 932         ADIOI_Iread_and_exch_fini(nbc_req, error_code);
 933         return;
 934     }
 935 
 936     /* create a struct for ADIOI_R_Iexchange_data() */
 937     red_vars = (ADIOI_R_Iexchange_data_vars *)ADIOI_Calloc(
 938             1, sizeof(ADIOI_R_Iexchange_data_vars));
 939     nbc_req->data.rd.red_vars = red_vars;
 940     red_vars->fd = vars->fd;
 941     red_vars->buf = vars->buf;
 942     red_vars->flat_buf = vars->flat_buf;
 943     red_vars->offset_list = vars->offset_list;
 944     red_vars->len_list = vars->len_list;
 945     red_vars->send_size = vars->send_size;
 946     red_vars->recv_size = vars->recv_size;
 947     red_vars->count = vars->count;
 948     red_vars->start_pos = vars->start_pos;
 949     red_vars->partial_send = vars->partial_send;
 950     red_vars->recd_from_proc = vars->recd_from_proc;
 951     red_vars->nprocs = vars->nprocs;
 952     red_vars->myrank = vars->myrank;
 953     red_vars->buftype_is_contig = vars->buftype_is_contig;
 954     red_vars->contig_access_count = vars->contig_access_count;
 955     red_vars->min_st_offset = vars->min_st_offset;
 956     red_vars->fd_size = vars->fd_size;
 957     red_vars->fd_start = vars->fd_start;
 958     red_vars->fd_end = vars->fd_end;
 959     red_vars->others_req = vars->others_req;
 960     red_vars->iter = vars->m;
 961     red_vars->buftype_extent = vars->buftype_extent;
 962     red_vars->buf_idx = vars->buf_idx;
 963     red_vars->next_fn = ADIOI_Iread_and_exch_l2_end;
 964 
 965     ADIOI_R_Iexchange_data(nbc_req, error_code);
 966 }
 967 
 968 static void ADIOI_Iread_and_exch_l2_end(ADIOI_NBC_Request *nbc_req,
 969                                         int *error_code)
 970 {
 971     ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
 972 
 973     vars->m++;
 974     ADIOI_Iread_and_exch_l2_begin(nbc_req, error_code);
 975 }
 976 
 977 static void ADIOI_Iread_and_exch_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
 978 {
 979     ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
 980     void (*next_fn)(ADIOI_NBC_Request *, int *);
 981 
 982     ADIOI_Free(vars->curr_offlen_ptr);
 983     ADIOI_Free(vars->count);
 984     ADIOI_Free(vars->partial_send);
 985     ADIOI_Free(vars->send_size);
 986     ADIOI_Free(vars->recv_size);
 987     ADIOI_Free(vars->recd_from_proc);
 988     ADIOI_Free(vars->start_pos);
 989 
 990     next_fn = vars->next_fn;
 991 
 992     /* free the struct for parameters and variables */
 993     ADIOI_Free(nbc_req->data.rd.rae_vars);
 994     nbc_req->data.rd.rae_vars = NULL;
 995 
 996     /* move to the next function */
 997     next_fn(nbc_req, error_code);
 998 }
 999 
1000 
1001 static void ADIOI_R_Iexchange_data(ADIOI_NBC_Request *nbc_req, int *error_code)
1002 {
1003     ADIOI_R_Iexchange_data_vars *vars = nbc_req->data.rd.red_vars;
1004 
1005     /* exchange send_size info so that each process knows how much to
1006        receive from whom and how much memory to allocate. */
1007     *error_code = MPI_Ialltoall(vars->send_size, 1, MPI_INT, vars->recv_size, 1,
1008                                 MPI_INT, vars->fd->comm, &vars->req1);
1009 
1010     nbc_req->data.rd.state = ADIOI_IRC_STATE_R_IEXCHANGE_DATA;
1011 }
1012 
1013 static void ADIOI_R_Iexchange_data_recv(ADIOI_NBC_Request *nbc_req,
1014                                         int *error_code)
1015 {
1016     ADIOI_R_Iexchange_data_vars *vars = nbc_req->data.rd.red_vars;
1017     ADIO_File fd = vars->fd;
1018     int *send_size = vars->send_size;
1019     int *recv_size = vars->recv_size;
1020     int *count = vars->count;
1021     int *start_pos = vars->start_pos;
1022     int *partial_send = vars->partial_send;
1023     int nprocs = vars->nprocs;
1024     int myrank = vars->myrank;
1025     ADIOI_Access *others_req = vars->others_req;
1026     int iter = vars->iter;
1027     int *buf_idx = vars->buf_idx;
1028 
1029     int i, j, k = 0, tmp = 0, nprocs_recv, nprocs_send;
1030     char **recv_buf = NULL;
1031     MPI_Datatype send_type;
1032 
1033     nprocs_recv = 0;
1034     for (i = 0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
1035     vars->nprocs_recv = nprocs_recv;
1036 
1037     nprocs_send = 0;
1038     for (i = 0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
1039     vars->nprocs_send = nprocs_send;
1040 
1041     vars->req2 = (MPI_Request *)
1042         ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
1043     /* +1 to avoid a 0-size malloc */
1044 
1045     /* post recvs. if buftype_is_contig, data can be directly recd. into
1046        user buf at location given by buf_idx. else use recv_buf. */
1047 
1048 #ifdef AGGREGATION_PROFILE
1049     MPE_Log_event (5032, 0, NULL);
1050 #endif
1051 
1052     if (vars->buftype_is_contig) {
1053         j = 0;
1054         for (i = 0; i < nprocs; i++)
1055             if (recv_size[i]) {
1056                 MPI_Irecv(((char *)vars->buf) + buf_idx[i], recv_size[i],
1057                           MPI_BYTE, i, myrank+i+100*iter, fd->comm,
1058                           vars->req2 + j);
1059                 j++;
1060                 buf_idx[i] += recv_size[i];
1061             }
1062     }
1063     else {
1064         /* allocate memory for recv_buf and post receives */
1065         recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
1066         vars->recv_buf = recv_buf;
1067         for (i = 0; i < nprocs; i++)
1068             if (recv_size[i]) recv_buf[i] = (char *)ADIOI_Malloc(recv_size[i]);
1069 
1070         j = 0;
1071         for (i = 0; i < nprocs; i++)
1072             if (recv_size[i]) {
1073                 MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i,
1074                           myrank+i+100*iter, fd->comm,
1075                           vars->req2 + j);
1076                 j++;
1077 #ifdef RDCOLL_DEBUG
1078                 DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
1079                             myrank, recv_size[i], myrank+i+100*iter);
1080 #endif
1081             }
1082     }
1083 
1084     /* create derived datatypes and send data */
1085 
1086     j = 0;
1087     for (i = 0; i < nprocs; i++) {
1088         if (send_size[i]) {
1089             /* take care if the last off-len pair is a partial send */
1090             if (partial_send[i]) {
1091                 k = start_pos[i] + count[i] - 1;
1092                 tmp = others_req[i].lens[k];
1093                 others_req[i].lens[k] = partial_send[i];
1094             }
1095             ADIOI_Type_create_hindexed_x(count[i],
1096                     &(others_req[i].lens[start_pos[i]]),
1097                     &(others_req[i].mem_ptrs[start_pos[i]]),
1098                     MPI_BYTE, &send_type);
1099             /* absolute displacement; use MPI_BOTTOM in send */
1100             MPI_Type_commit(&send_type);
1101             MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
1102                       fd->comm, vars->req2 + nprocs_recv + j);
1103             MPI_Type_free(&send_type);
1104             if (partial_send[i]) others_req[i].lens[k] = tmp;
1105             j++;
1106         }
1107     }
1108 
1109     /* wait on the receives */
1110     if (nprocs_recv) {
1111         nbc_req->data.rd.state = ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV;
1112         return;
1113     }
1114 
1115     ADIOI_R_Iexchange_data_fill(nbc_req, error_code);
1116 }
1117 
1118 static void ADIOI_R_Iexchange_data_fill(ADIOI_NBC_Request *nbc_req,
1119                                         int *error_code)
1120 {
1121     ADIOI_R_Iexchange_data_vars *vars = nbc_req->data.rd.red_vars;
1122 
1123     if (vars->nprocs_recv) {
1124         /* if noncontiguous, to the copies from the recv buffers */
1125         if (!vars->buftype_is_contig)
1126             ADIOI_Fill_user_buffer(vars->fd, vars->buf, vars->flat_buf,
1127                     vars->recv_buf, vars->offset_list, vars->len_list,
1128                     (unsigned*)vars->recv_size,
1129                     vars->req2, NULL, vars->recd_from_proc,
1130                     vars->nprocs, vars->contig_access_count,
1131                     vars->min_st_offset, vars->fd_size, vars->fd_start,
1132                     vars->fd_end, vars->buftype_extent);
1133     }
1134 
1135     nbc_req->data.rd.state = ADIOI_IRC_STATE_R_IEXCHANGE_DATA_FILL;
1136 }
1137 
1138 static void ADIOI_R_Iexchange_data_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
1139 {
1140     ADIOI_R_Iexchange_data_vars *vars = nbc_req->data.rd.red_vars;
1141     void (*next_fn)(ADIOI_NBC_Request *, int *);
1142     int i;
1143 
1144     ADIOI_Free(vars->req2);
1145 
1146     if (!vars->buftype_is_contig) {
1147         for (i = 0; i < vars->nprocs; i++)
1148             if (vars->recv_size[i]) ADIOI_Free(vars->recv_buf[i]);
1149         ADIOI_Free(vars->recv_buf);
1150     }
1151 #ifdef AGGREGATION_PROFILE
1152     MPE_Log_event (5033, 0, NULL);
1153 #endif
1154 
1155     next_fn = vars->next_fn;
1156 
1157     /* free the structure for parameters and variables */
1158     ADIOI_Free(vars);
1159     nbc_req->data.rd.red_vars = NULL;
1160 
1161     /* move to the next function */
1162     next_fn(nbc_req, error_code);
1163 }
1164 
1165 
1166 static int ADIOI_GEN_irc_query_fn(void *extra_state, MPI_Status *status)
1167 {
1168     ADIOI_NBC_Request *nbc_req;
1169 
1170     nbc_req = (ADIOI_NBC_Request *)extra_state;
1171 
1172     MPI_Status_set_elements_x(status, MPI_BYTE, nbc_req->nbytes);
1173 
1174     /* can never cancel so always true */
1175     MPI_Status_set_cancelled(status, 0);
1176 
1177     /* choose not to return a value for this */
1178     status->MPI_SOURCE = MPI_UNDEFINED;
1179     /* tag has no meaning for this generalized request */
1180     status->MPI_TAG = MPI_UNDEFINED;
1181 
1182     /* this generalized request never fails */
1183     return MPI_SUCCESS;
1184 }
1185 
1186 static int ADIOI_GEN_irc_free_fn(void *extra_state)
1187 {
1188     ADIOI_NBC_Request *nbc_req;
1189 
1190     nbc_req = (ADIOI_NBC_Request *)extra_state;
1191     ADIOI_Free(nbc_req);
1192 
1193     return MPI_SUCCESS;
1194 }
1195 
1196 static int ADIOI_GEN_irc_poll_fn(void *extra_state, MPI_Status *status)
1197 {
1198     ADIOI_NBC_Request *nbc_req;
1199     ADIOI_GEN_IreadStridedColl_vars *rsc_vars = NULL;
1200     ADIOI_Icalc_others_req_vars     *cor_vars = NULL;
1201     ADIOI_Iread_and_exch_vars       *rae_vars = NULL;
1202     ADIOI_R_Iexchange_data_vars     *red_vars = NULL;
1203     int errcode = MPI_SUCCESS;
1204     int flag;
1205 
1206     nbc_req = (ADIOI_NBC_Request *)extra_state;
1207 
1208     switch (nbc_req->data.rd.state) {
1209         case ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL:
1210             rsc_vars = nbc_req->data.rd.rsc_vars;
1211             errcode = MPI_Testall(2, rsc_vars->req_offset, &flag,
1212                                   MPI_STATUSES_IGNORE);
1213             if (errcode == MPI_SUCCESS && flag) {
1214                 ADIOI_GEN_IreadStridedColl_inter(nbc_req, &errcode);
1215             }
1216             break;
1217 
1218         case ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL_INDIO:
1219             rsc_vars = nbc_req->data.rd.rsc_vars;
1220             errcode = MPI_Test(&rsc_vars->req_ind_io, &flag, MPI_STATUS_IGNORE);
1221             if (errcode == MPI_SUCCESS && flag) {
1222                 /* call the last function */
1223                 ADIOI_GEN_IreadStridedColl_fini(nbc_req, &errcode);
1224             }
1225             break;
1226 
1227         case ADIOI_IRC_STATE_ICALC_OTHERS_REQ:
1228             cor_vars = nbc_req->cor_vars;
1229             errcode = MPI_Test(&cor_vars->req1, &flag, MPI_STATUS_IGNORE);
1230             if (errcode == MPI_SUCCESS && flag) {
1231                 ADIOI_Icalc_others_req_main(nbc_req, &errcode);
1232             }
1233             break;
1234 
1235         case ADIOI_IRC_STATE_ICALC_OTHERS_REQ_MAIN:
1236             cor_vars = nbc_req->cor_vars;
1237             if (cor_vars->num_req2) {
1238                 errcode = MPI_Testall(cor_vars->num_req2, cor_vars->req2,
1239                                       &flag, MPI_STATUSES_IGNORE);
1240                 if (errcode == MPI_SUCCESS && flag) {
1241                     ADIOI_Icalc_others_req_fini(nbc_req, &errcode);
1242                 }
1243             } else {
1244                 ADIOI_Icalc_others_req_fini(nbc_req, &errcode);
1245             }
1246             break;
1247 
1248         case ADIOI_IRC_STATE_IREAD_AND_EXCH:
1249             rae_vars = nbc_req->data.rd.rae_vars;
1250             errcode = MPI_Test(&rae_vars->req1, &flag, MPI_STATUS_IGNORE);
1251             if (errcode == MPI_SUCCESS && flag) {
1252                 rae_vars->m = 0;
1253                 ADIOI_Iread_and_exch_l1_begin(nbc_req, &errcode);
1254             }
1255             break;
1256 
1257         case ADIOI_IRC_STATE_IREAD_AND_EXCH_L1_BEGIN:
1258             rae_vars = nbc_req->data.rd.rae_vars;
1259             errcode = MPI_Test(&rae_vars->req2, &flag, MPI_STATUS_IGNORE);
1260             if (errcode == MPI_SUCCESS && flag) {
1261                 ADIOI_R_Iexchange_data(nbc_req, &errcode);
1262             }
1263             break;
1264 
1265         case ADIOI_IRC_STATE_R_IEXCHANGE_DATA:
1266             red_vars = nbc_req->data.rd.red_vars;
1267             errcode = MPI_Test(&red_vars->req1, &flag, MPI_STATUS_IGNORE);
1268             if (errcode == MPI_SUCCESS && flag) {
1269                 ADIOI_R_Iexchange_data_recv(nbc_req, &errcode);
1270             }
1271             break;
1272 
1273         case ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV:
1274             red_vars = nbc_req->data.rd.red_vars;
1275             errcode = MPI_Testall(red_vars->nprocs_recv, red_vars->req2, &flag,
1276                                   MPI_STATUSES_IGNORE);
1277             if (errcode == MPI_SUCCESS && flag) {
1278                 ADIOI_R_Iexchange_data_fill(nbc_req, &errcode);
1279             }
1280             break;
1281 
1282         case ADIOI_IRC_STATE_R_IEXCHANGE_DATA_FILL:
1283             red_vars = nbc_req->data.rd.red_vars;
1284             errcode = MPI_Testall(red_vars->nprocs_send,
1285                                   red_vars->req2 + red_vars->nprocs_recv,
1286                                   &flag, MPI_STATUSES_IGNORE);
1287             if (errcode == MPI_SUCCESS && flag) {
1288                 ADIOI_R_Iexchange_data_fini(nbc_req, &errcode);
1289             }
1290             break;
1291 
1292         default:
1293             break;
1294     }
1295 
1296     /* --BEGIN ERROR HANDLING-- */
1297     if (errcode != MPI_SUCCESS) {
1298         errcode = MPIO_Err_create_code(MPI_SUCCESS,
1299                 MPIR_ERR_RECOVERABLE,
1300                 "ADIOI_GEN_irc_poll_fn", __LINE__,
1301                 MPI_ERR_IO, "**mpi_grequest_complete",
1302                 0);
1303     }
1304     /* --END ERROR HANDLING-- */
1305 
1306     return errcode;
1307 }
1308 
1309 /* wait for multiple requests to complete */
1310 static int ADIOI_GEN_irc_wait_fn(int count, void **array_of_states,
1311                                  double timeout, MPI_Status *status)
1312 {
1313     int i, errcode = MPI_SUCCESS;
1314     double starttime;
1315     ADIOI_NBC_Request **nbc_reqlist;
1316 
1317     nbc_reqlist = (ADIOI_NBC_Request **)array_of_states;
1318 
1319     starttime = MPI_Wtime();
1320     for (i = 0; i < count ; i++) {
1321         while (nbc_reqlist[i]->data.rd.state != ADIOI_IRC_STATE_COMPLETE) {
1322             errcode = ADIOI_GEN_irc_poll_fn(nbc_reqlist[i], MPI_STATUS_IGNORE);
1323             /* --BEGIN ERROR HANDLING-- */
1324             if (errcode != MPI_SUCCESS) {
1325                 errcode = MPIO_Err_create_code(MPI_SUCCESS,
1326                         MPIR_ERR_RECOVERABLE,
1327                         "ADIOI_GEN_irc_wait_fn",
1328                         __LINE__, MPI_ERR_IO,
1329                         "**mpi_grequest_complete", 0);
1330             }
1331             /* --END ERROR HANDLING-- */
1332 
1333             if ((timeout > 0) && (timeout < (MPI_Wtime() - starttime)))
1334                 goto fn_exit;
1335 
1336             /* If the progress engine is blocked, we have to yield for another
1337              * thread to be able to unblock the progress engine. */
1338             MPIR_Ext_cs_yield();
1339         }
1340     }
1341 
1342   fn_exit:
1343     return errcode;
1344 }
1345 
1346 #endif /* HAVE_MPI_GREQUEST_EXTENSIONS */

/* [<][>][^][v][top][bottom][index][help] */