/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *   Copyright (C) 1997 University of Chicago.
 *   See COPYRIGHT notice in top-level directory.
 *
 *   Copyright (C) 2007 Oak Ridge National Laboratory
 *
 *   Copyright (C) 2008 Sun Microsystems, Lustre group
 */

#include "ad_lustre.h"
#include "adio_extern.h"

#undef AGG_DEBUG

void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int **striping_info_ptr,
                                    int mode)
{
    int *striping_info = NULL;
    /* get striping information:
     * striping_info[0]: stripe_size
     * striping_info[1]: stripe_count
     * striping_info[2]: avail_cb_nodes
     */
    int stripe_size, stripe_count, CO = 1;
    int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;

    /* Get hints value */
    /* stripe size */
    stripe_size = fd->hints->striping_unit;
    /* stripe count */
    /* stripe_size and stripe_count have been validated in ADIOI_LUSTRE_Open() */
    stripe_count = fd->hints->striping_factor;

    /* Calculate the available number of I/O clients */
    if (!mode) {
        /* for collective read: if "CO" clients access the same OST
         * simultaneously, the OST disk seek time grows considerably.
         * To avoid this, it is better if one client accesses only one
         * OST, so we set CO = 1 here.
         */
        CO = 1;
        /* XXX: there may be a better strategy for collective read */
    } else {
        /* CO has also been validated in ADIOI_LUSTRE_Open(), >0 */
        CO = fd->hints->fs_hints.lustre.co_ratio;
    }
    /* Calculate how many I/O clients we need */
    /* Algorithm courtesy Pascal Deveze (pascal.deveze@bull.net) */
    /* To avoid extent lock conflicts, avail_cb_nodes should either
     *  - be a multiple of stripe_count,
     *  - or divide stripe_count exactly,
     * so that each OST is accessed by at most CO clients. */
    if (nprocs_for_coll >= stripe_count)
        /* avail_cb_nodes should be a multiple of stripe_count, and the
         * number of procs per OST should be limited to the minimum of
         * nprocs_for_coll/stripe_count and CO
         *
         * e.g. if stripe_count=20, nprocs_for_coll=42 and CO=3, then
         * avail_cb_nodes should be equal to 40 */
        avail_cb_nodes =
            stripe_count * ADIOI_MIN(nprocs_for_coll/stripe_count, CO);
    else {
        /* nprocs_for_coll is less than stripe_count, so avail_cb_nodes
         * should divide stripe_count exactly.
         * e.g. if stripe_count=60 and nprocs_for_coll=8, then
         * avail_cb_nodes should be equal to 6 */
        /* This could be done with:
         *     while (stripe_count % avail_cb_nodes != 0) avail_cb_nodes--;
         * but the search below is faster for large values of
         * nprocs_for_coll and stripe_count */
        divisor = 2;
        avail_cb_nodes = 1;
        /* try to divide */
        while (stripe_count >= divisor*divisor) {
            if ((stripe_count % divisor) == 0) {
                if (stripe_count/divisor <= nprocs_for_coll) {
                    /* The value is found! */
                    avail_cb_nodes = stripe_count/divisor;
                    break;
                }
                /* if divisor is less than nprocs_for_coll, divisor is a
                 * candidate, but it may not be the best one */
                else if (divisor <= nprocs_for_coll)
                    avail_cb_nodes = divisor;
            }
            divisor++;
        }
    }
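    /* Worked trace of the loop above for stripe_count=60, nprocs_for_coll=8:
     * divisors 2..6 all divide 60, but the cofactors 60/divisor
     * (30, 20, 15, 12, 10) all exceed 8, so avail_cb_nodes instead keeps
     * the largest small divisor seen so far (2, 3, 4, 5, then 6); 7 does
     * not divide 60, and the loop exits at divisor=8 since 8*8 > 60.
     * Result: avail_cb_nodes = 6, and every OST is accessed by exactly
     * one aggregator because 6 divides 60. */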

    *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
    striping_info = *striping_info_ptr;
    striping_info[0] = stripe_size;
    striping_info[1] = stripe_count;
    striping_info[2] = avail_cb_nodes;
}

int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
                                 ADIO_Offset *len, int *striping_info)
{
    int rank_index, rank;
    ADIO_Offset avail_bytes;
    int stripe_size = striping_info[0];
    int avail_cb_nodes = striping_info[2];

    /* Produce the stripe-contiguous pattern for Lustre */
    rank_index = (int)((off / stripe_size) % avail_cb_nodes);

    /* we index into fd_end with rank_index, and fd_end was allocated to be
     * no bigger than fd->hints->cb_nodes. If we ever violate that, we're
     * overrunning arrays. Obviously, we should never hit this abort.
     */
    if (rank_index >= fd->hints->cb_nodes)
        MPI_Abort(MPI_COMM_WORLD, 1);

    avail_bytes = (off / (ADIO_Offset)stripe_size + 1) *
        (ADIO_Offset)stripe_size - off;
    if (avail_bytes < *len) {
        /* this proc only has part of the requested contiguous region */
        *len = avail_bytes;
    }
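    /* Example: with stripe_size = 1 MiB and avail_cb_nodes = 4, an offset
     * of 5 MiB + 512 KiB lands in stripe 5, so rank_index = 5 % 4 = 1, and
     * avail_bytes = 6 MiB - (5 MiB + 512 KiB) = 512 KiB, i.e. the bytes
     * left before the next stripe boundary; a longer request is truncated
     * to that amount and the caller must ask again for the remainder. */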
    /* map our index to a rank */
    /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
    rank = fd->hints->ranklist[rank_index];

    return rank;
}

/* ADIOI_LUSTRE_Calc_my_req() - calculate what portions of the access requests
 * of this process are located in the file domains of various processes
 * (including this one)
 */
void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
                              ADIO_Offset *len_list, int contig_access_count,
                              int *striping_info, int nprocs,
                              int *count_my_req_procs_ptr,
                              int **count_my_req_per_proc_ptr,
                              ADIOI_Access **my_req_ptr,
                              int ***buf_idx_ptr)
{
    /* Nothing different from ADIOI_Calc_my_req(), except calling
     * ADIOI_LUSTRE_Calc_aggregator() instead of the old one */
    int *count_my_req_per_proc, count_my_req_procs, **buf_idx;
    int i, l, proc;
    ADIO_Offset avail_len, rem_len, curr_idx, off;
    ADIOI_Access *my_req;

    *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
    count_my_req_per_proc = *count_my_req_per_proc_ptr;
    /* count_my_req_per_proc[i] gives the no. of contiguous requests of this
     * process that fall in process i's file domain. calloc initializes it
     * to zero. The array is of size nprocs so that an MPI_Alltoall can be
     * done on it later.
     */

    buf_idx = (int **) ADIOI_Malloc(nprocs * sizeof(int*));

    /* one pass just to calculate how much space to allocate for my_req;
     * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
     */
    for (i = 0; i < contig_access_count; i++) {
        /* short circuit offset/len processing if len == 0
         * (zero-byte read/write)
         */
        if (len_list[i] == 0)
            continue;
        off = offset_list[i];
        avail_len = len_list[i];
        /* note: we set avail_len to the total size of the access; then
         * ADIOI_LUSTRE_Calc_aggregator() modifies the value to return the
         * amount that was actually available.
         */
        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
        count_my_req_per_proc[proc]++;

        /* figure out how much data remains in the access; we'll take care
         * of that data (if there is any) in the while loop below.
         */
        rem_len = len_list[i] - avail_len;

        while (rem_len != 0) {
            off += avail_len;    /* point to first remaining byte */
            avail_len = rem_len; /* save remaining size, pass to calc */
            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
            count_my_req_per_proc[proc]++;
            rem_len -= avail_len; /* reduce remaining length by amount from fd */
        }
    }
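    /* Example: a single contiguous access of 3*stripe_size bytes starting
     * on a stripe boundary is split by the loop above into three pieces of
     * stripe_size each; each piece lands on the aggregator owning that
     * stripe (round-robin modulo avail_cb_nodes), and the corresponding
     * count_my_req_per_proc entry is incremented once per piece. */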

    /* buf_idx is relevant only if buftype_is_contig.
     * buf_idx[i] gives the index into user_buf where data received
     * from proc 'i' should be placed. This allows receives to be done
     * without an extra buffer. This can't be done if buftype is not contig.
     */
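    /* Illustration, assuming a contiguous user buffer: if this process's
     * first piece (say 4096 bytes) belongs to aggregator p and its second
     * piece also belongs to p, then buf_idx[p][0] == 0 and
     * buf_idx[p][1] == 4096, so the data exchanged with p for piece l
     * lives at user_buf offset buf_idx[p][l], with no staging copy. */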

    /* initialize buf_idx vectors */
    for (i = 0; i < nprocs; i++) {
        /* add one to count_my_req_per_proc[i] to avoid a zero-size malloc */
        buf_idx[i] = (int *) ADIOI_Malloc((count_my_req_per_proc[i] + 1)
                                          * sizeof(int));
    }

    /* now allocate space for my_req, offset, and len */
    *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
    my_req = *my_req_ptr;

    count_my_req_procs = 0;
    for (i = 0; i < nprocs; i++) {
        if (count_my_req_per_proc[i]) {
            my_req[i].offsets = (ADIO_Offset *)
                ADIOI_Malloc(count_my_req_per_proc[i] *
                             sizeof(ADIO_Offset));
            my_req[i].lens = ADIOI_Malloc(count_my_req_per_proc[i] *
                                          sizeof(ADIO_Offset));
            count_my_req_procs++;
        }
        my_req[i].count = 0; /* will be incremented where needed later */
    }

    /* now fill in my_req */
    curr_idx = 0;
    for (i = 0; i < contig_access_count; i++) {
        /* short circuit offset/len processing if len == 0
         * (zero-byte read/write) */
        if (len_list[i] == 0)
            continue;
        off = offset_list[i];
        avail_len = len_list[i];
        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);

        l = my_req[proc].count;

        ADIOI_Assert(curr_idx == (int) curr_idx);
        ADIOI_Assert(l < count_my_req_per_proc[proc]);
        buf_idx[proc][l] = (int) curr_idx;
        curr_idx += avail_len;

        rem_len = len_list[i] - avail_len;

        /* store the proc, offset, and len information in an array
         * of structures, my_req. Each structure contains the
         * offsets and lengths located in that process's FD,
         * and the associated count.
         */
        my_req[proc].offsets[l] = off;
        ADIOI_Assert(avail_len == (int) avail_len);
        my_req[proc].lens[l] = (int) avail_len;
        my_req[proc].count++;

        while (rem_len != 0) {
            off += avail_len;
            avail_len = rem_len;
            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
                                                striping_info);

            l = my_req[proc].count;
            ADIOI_Assert(curr_idx == (int) curr_idx);
            ADIOI_Assert(l < count_my_req_per_proc[proc]);
            buf_idx[proc][l] = (int) curr_idx;

            curr_idx += avail_len;
            rem_len -= avail_len;

            my_req[proc].offsets[l] = off;
            ADIOI_Assert(avail_len == (int) avail_len);
            my_req[proc].lens[l] = (int) avail_len;
            my_req[proc].count++;
        }
    }

#ifdef AGG_DEBUG
    for (i = 0; i < nprocs; i++) {
        if (count_my_req_per_proc[i] > 0) {
            FPRINTF(stdout, "data needed from %d (count = %d):\n",
                    i, my_req[i].count);
            for (l = 0; l < my_req[i].count; l++) {
                FPRINTF(stdout, "   off[%d] = %lld, len[%d] = %d\n",
                        l, my_req[i].offsets[l], l, my_req[i].lens[l]);
            }
        }
    }
#endif

    *count_my_req_procs_ptr = count_my_req_procs;
    *buf_idx_ptr = buf_idx;
}


int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
                           ADIO_Offset *len_list, int nprocs)
{
    /* If the processes are non-interleaved, we will check the average
     * request size:
     *     if (avg_req_size > big_req_size)
     *         docollect = 0;
     */

    int i, docollect = 1, big_req_size = 0;
    ADIO_Offset req_size = 0, total_req_size;
    int avg_req_size, total_access_count;

    /* calculate total_req_size and total_access_count */
    for (i = 0; i < contig_access_count; i++)
        req_size += len_list[i];
    MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM,
                  fd->comm);
    MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM,
                  fd->comm);
    /* avoid a possible divide-by-zero */
    if (total_access_count != 0) {
        /* estimate the average request size */
        avg_req_size = (int)(total_req_size / total_access_count);
    } else {
        avg_req_size = 0;
    }
    /* get the big_req_size hint */
    big_req_size = fd->hints->fs_hints.lustre.coll_threshold;
    /* Don't perform collective I/O if there are big requests */
    if ((big_req_size > 0) && (avg_req_size > big_req_size))
        docollect = 0;
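    /* Example: with 4 processes each making one 8 MiB contiguous request
     * and the coll_threshold hint set to 1 MiB, total_req_size = 32 MiB
     * and total_access_count = 4, so avg_req_size = 8 MiB > big_req_size
     * and docollect becomes 0: requests that large are served better by
     * independent I/O than by paying the aggregation cost. */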

    return docollect;
}