root/ompi/mca/io/romio321/romio/adio/ad_lustre/ad_lustre_aggregate.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions:
  1. ADIOI_LUSTRE_Get_striping_info
  2. ADIOI_LUSTRE_Calc_aggregator
  3. ADIOI_LUSTRE_Calc_my_req
  4. ADIOI_LUSTRE_Docollect

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /*
   3  *   Copyright (C) 1997 University of Chicago.
   4  *   See COPYRIGHT notice in top-level directory.
   5  *
   6  *   Copyright (C) 2007 Oak Ridge National Laboratory
   7  *
   8  *   Copyright (C) 2008 Sun Microsystems, Lustre group
   9  */
  10 
  11 #include "ad_lustre.h"
  12 #include "adio_extern.h"
  13 
  14 #undef AGG_DEBUG
  15 
  16 void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int **striping_info_ptr,
  17                                     int mode)
  18 {
  19     int *striping_info = NULL;
  20     /* get striping information:
  21      *  striping_info[0]: stripe_size
  22      *  striping_info[1]: stripe_count
  23      *  striping_info[2]: avail_cb_nodes
  24      */
  25     int stripe_size, stripe_count, CO = 1;
  26     int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;
  27 
  28     /* Get hints value */
  29     /* stripe size */
  30     stripe_size = fd->hints->striping_unit;
  31     /* stripe count */
  32     /* stripe_size and stripe_count have been validated in ADIOI_LUSTRE_Open() */
  33     stripe_count = fd->hints->striping_factor;
  34 
  35     /* Calculate the available number of I/O clients */
  36     if (!mode) {
  37         /* for collective read,
  38          * if "CO" clients access the same OST simultaneously,
  39          * the OST disk seek time would be much. So, to avoid this,
  40          * it might be better if 1 client only accesses 1 OST.
  41          * So, we set CO = 1 to meet the above requirement.
  42          */
  43         CO = 1;
  44         /*XXX: maybe there are other better way for collective read */
  45     } else {
  46         /* CO also has been validated in ADIOI_LUSTRE_Open(), >0 */
  47         CO = fd->hints->fs_hints.lustre.co_ratio;
  48     }
  49     /* Calculate how many IO clients we need */
  50     /* Algorithm courtesy Pascal Deveze (pascal.deveze@bull.net) */
  51     /* To avoid extent lock conflicts,
  52      * avail_cb_nodes should either 
  53      *  - be a multiple of stripe_count,
  54      *  - or divide stripe_count exactly
  55      * so that each OST is accessed by a maximum of CO constant clients. */
  56     if (nprocs_for_coll >= stripe_count)
  57         /* avail_cb_nodes should be a multiple of stripe_count and the number
  58          * of procs per OST should be limited to the minimum between
  59          * nprocs_for_coll/stripe_count and CO 
  60          * 
  61          * e.g. if stripe_count=20, nprocs_for_coll=42 and CO=3 then 
  62          * avail_cb_nodes should be equal to 40 */
  63         avail_cb_nodes = 
  64                 stripe_count * ADIOI_MIN(nprocs_for_coll/stripe_count, CO);
  65     else {
  66         /* nprocs_for_coll is less than stripe_count */
  67         /* avail_cb_nodes should divide stripe_count */
  68         /* e.g. if stripe_count=60 and nprocs_for_coll=8 then 
  69          * avail_cb_nodes should be egal to 6 */
  70         /* This could be done with :
  71                 while (stripe_count % avail_cb_nodes != 0) avail_cb_nodes--;
  72            but this can be optimized for large values of nprocs_for_coll and
  73            stripe_count */
  74         divisor = 2;
  75         avail_cb_nodes = 1;
  76         /* try to divise */
  77         while (stripe_count >= divisor*divisor) {
  78             if ((stripe_count % divisor) == 0) {
  79                  if (stripe_count/divisor <= nprocs_for_coll) {
  80                      /* The value is found ! */
  81                      avail_cb_nodes = stripe_count/divisor;
  82                      break;
  83                 }
  84                 /* if divisor is less than nprocs_for_coll, divisor is a
  85                  * solution, but it is not sure that it is the best one */
  86                 else if (divisor <= nprocs_for_coll) 
  87                         avail_cb_nodes = divisor;
  88             }
  89             divisor++;
  90         }
  91     }
  92 
  93     *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
  94     striping_info = *striping_info_ptr;
  95     striping_info[0] = stripe_size;
  96     striping_info[1] = stripe_count;
  97     striping_info[2] = avail_cb_nodes;
  98 }
  99 
 100 int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
 101                                  ADIO_Offset *len, int *striping_info)
 102 {
 103     int rank_index, rank;
 104     ADIO_Offset avail_bytes;
 105     int stripe_size = striping_info[0];
 106     int avail_cb_nodes = striping_info[2];
 107 
 108     /* Produce the stripe-contiguous pattern for Lustre */
 109     rank_index = (int)((off / stripe_size) % avail_cb_nodes);
 110 
 111     /* we index into fd_end with rank_index, and fd_end was allocated to be no
 112      * bigger than fd->hins->cb_nodes.   If we ever violate that, we're
 113      * overrunning arrays.  Obviously, we should never ever hit this abort
 114      */
 115     if (rank_index >= fd->hints->cb_nodes)
 116             MPI_Abort(MPI_COMM_WORLD, 1);
 117 
 118     avail_bytes = (off / (ADIO_Offset)stripe_size + 1) *
 119                   (ADIO_Offset)stripe_size - off;
 120     if (avail_bytes < *len) {
 121         /* this proc only has part of the requested contig. region */
 122         *len = avail_bytes;
 123     }
 124     /* map our index to a rank */
 125     /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
 126     rank = fd->hints->ranklist[rank_index];
 127 
 128     return rank;
 129 }
 130 
/* ADIOI_LUSTRE_Calc_my_req() - calculate what portions of the access requests
 * of this process are located in the file domains of various processes
 * (including this one).
 *
 * All four outputs are allocated here and owned by the caller:
 *   *count_my_req_procs_ptr    - number of processes this rank has
 *                                requests for
 *   *count_my_req_per_proc_ptr - [nprocs] per-process count of contiguous
 *                                requests
 *   *my_req_ptr                - [nprocs] per-process (offset, len) lists
 *   *buf_idx_ptr               - [nprocs] per-process indices into the user
 *                                buffer (only meaningful when the buffer
 *                                datatype is contiguous; see below)
 */


void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
                              ADIO_Offset *len_list, int contig_access_count,
                              int *striping_info, int nprocs,
                              int *count_my_req_procs_ptr,
                              int **count_my_req_per_proc_ptr,
                              ADIOI_Access **my_req_ptr,
                              int ***buf_idx_ptr)
{
    /* Nothing different from ADIOI_Calc_my_req(), except calling
     * ADIOI_LUSTRE_Calc_aggregator() instead of the generic one */
    int *count_my_req_per_proc, count_my_req_procs, **buf_idx;
    int i, l, proc;
    ADIO_Offset avail_len, rem_len, curr_idx, off;
    ADIOI_Access *my_req;

    *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
    count_my_req_per_proc = *count_my_req_per_proc_ptr;
    /* count_my_req_per_proc[i] gives the no. of contig. requests of this
     * process in process i's file domain. calloc initializes to zero.
     * I'm allocating memory of size nprocs, so that I can do an
     * MPI_Alltoall later on.
     */

    buf_idx = (int **) ADIOI_Malloc(nprocs * sizeof(int*));

    /* first pass: just count requests per aggregator so we know how much
     * space to allocate for my_req; contig_access_count was calculated way
     * back in ADIOI_Calc_my_off_len()
     */
    for (i = 0; i < contig_access_count; i++) {
        /* short circuit offset/len processing if len == 0
         * (zero-byte read/write)
         */
        if (len_list[i] == 0)
            continue;
        off = offset_list[i];
        avail_len = len_list[i];
        /* note: we set avail_len to be the total size of the access.
         * then ADIOI_LUSTRE_Calc_aggregator() will modify the value to return
         * the amount that was available (i.e. within one stripe).
         */
        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
        count_my_req_per_proc[proc]++;

        /* figure out how much data is remaining in the access;
         * we'll take care of this data (if there is any)
         * in the while loop below.
         */
        rem_len = len_list[i] - avail_len;

        while (rem_len != 0) {
            off += avail_len;   /* point to first remaining byte */
            avail_len = rem_len;        /* save remaining size, pass to calc */
            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
            count_my_req_per_proc[proc]++;
            rem_len -= avail_len;       /* reduce remaining length by amount from fd */
        }
    }

    /* buf_idx is relevant only if buftype_is_contig.
     * buf_idx[i] gives the index into user_buf where data received
     * from proc 'i' should be placed. This allows receives to be done
     * without extra buffer. This can't be done if buftype is not contig.
     */

    /* initialize buf_idx vectors */
    for (i = 0; i < nprocs; i++) {
        /* add one to count_my_req_per_proc[i] to avoid zero size malloc */
        buf_idx[i] = (int *) ADIOI_Malloc((count_my_req_per_proc[i] + 1)
                                           * sizeof(int)); 
    }

    /* now allocate space for my_req, offset, and len */
    *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
    my_req = *my_req_ptr;

    count_my_req_procs = 0;
    for (i = 0; i < nprocs; i++) {
        if (count_my_req_per_proc[i]) {
            my_req[i].offsets = (ADIO_Offset *)
                                ADIOI_Malloc(count_my_req_per_proc[i] *
                                             sizeof(ADIO_Offset));
            my_req[i].lens = ADIOI_Malloc(count_my_req_per_proc[i] *
                                                  sizeof(ADIO_Offset));
            count_my_req_procs++;
        }
        my_req[i].count = 0;    /* will be incremented where needed later */
    }

    /* second pass: fill in my_req, repeating the exact same traversal as
     * the counting pass so the per-proc slots line up */
    curr_idx = 0;
    for (i = 0; i < contig_access_count; i++) {
        /* short circuit offset/len processing if len == 0
         *      (zero-byte  read/write */
        if (len_list[i] == 0)
            continue;
        off = offset_list[i];
        avail_len = len_list[i];
        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);

        l = my_req[proc].count;

        /* curr_idx (an ADIO_Offset) is stored in an int buf_idx slot;
         * assert it fits before narrowing */
        ADIOI_Assert(curr_idx == (int) curr_idx);
        ADIOI_Assert(l < count_my_req_per_proc[proc]);
        buf_idx[proc][l] = (int) curr_idx;
        curr_idx += avail_len;

        rem_len = len_list[i] - avail_len;

        /* store the proc, offset, and len information in an array
         * of structures, my_req. Each structure contains the
         * offsets and lengths located in that process's FD,
         * and the associated count.
         */
        my_req[proc].offsets[l] = off;
        ADIOI_Assert(avail_len == (int) avail_len);
        my_req[proc].lens[l] = (int) avail_len;
        my_req[proc].count++;

        while (rem_len != 0) {
            off += avail_len;
            avail_len = rem_len;
            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
                                                striping_info);

            l = my_req[proc].count;
            ADIOI_Assert(curr_idx == (int) curr_idx);
            ADIOI_Assert(l < count_my_req_per_proc[proc]);
            buf_idx[proc][l] = (int) curr_idx;

            curr_idx += avail_len;
            rem_len -= avail_len;

            my_req[proc].offsets[l] = off;
            ADIOI_Assert(avail_len == (int) avail_len);
            my_req[proc].lens[l] = (int) avail_len;
            my_req[proc].count++;
        }
    }

#ifdef AGG_DEBUG
    for (i = 0; i < nprocs; i++) {
        if (count_my_req_per_proc[i] > 0) {
            FPRINTF(stdout, "data needed from %d (count = %d):\n",
                            i, my_req[i].count);
            for (l = 0; l < my_req[i].count; l++) {
                FPRINTF(stdout, "   off[%d] = %lld, len[%d] = %d\n",
                                l, my_req[i].offsets[l], l, my_req[i].lens[l]);
            }
        }
    }
#endif

    *count_my_req_procs_ptr = count_my_req_procs;
    *buf_idx_ptr = buf_idx;
}
 292 
 293 int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
 294                            ADIO_Offset *len_list, int nprocs)
 295 {
 296     /* If the processes are non-interleaved, we will check the req_size.
 297      *   if (avg_req_size > big_req_size) {
 298      *       docollect = 0;
 299      *   }
 300      */
 301 
 302     int i, docollect = 1, big_req_size = 0;
 303     ADIO_Offset req_size = 0, total_req_size;
 304     int avg_req_size, total_access_count;
 305 
 306     /* calculate total_req_size and total_access_count */
 307     for (i = 0; i < contig_access_count; i++)
 308         req_size += len_list[i];
 309     MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM,
 310                fd->comm);
 311     MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM,
 312                fd->comm);
 313     /* avoid possible divide-by-zero) */
 314     if (total_access_count != 0) {
 315         /* estimate average req_size */
 316         avg_req_size = (int)(total_req_size / total_access_count);
 317     } else {
 318         avg_req_size = 0;
 319     }
 320     /* get hint of big_req_size */
 321     big_req_size = fd->hints->fs_hints.lustre.coll_threshold;
 322     /* Don't perform collective I/O if there are big requests */
 323     if ((big_req_size > 0) && (avg_req_size > big_req_size))
 324         docollect = 0;
 325 
 326     return docollect;
 327 }

/* [<][>][^][v][top][bottom][index][help] */