This source file includes the following definitions.
- ADIOI_LUSTRE_Get_striping_info
- ADIOI_LUSTRE_Calc_aggregator
- ADIOI_LUSTRE_Calc_my_req
- ADIOI_LUSTRE_Docollect
1
2
3
4
5
6
7
8
9
10
11 #include "ad_lustre.h"
12 #include "adio_extern.h"
13
14 #undef AGG_DEBUG
15
16 void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int **striping_info_ptr,
17 int mode)
18 {
19 int *striping_info = NULL;
20
21
22
23
24
25 int stripe_size, stripe_count, CO = 1;
26 int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;
27
28
29
30 stripe_size = fd->hints->striping_unit;
31
32
33 stripe_count = fd->hints->striping_factor;
34
35
36 if (!mode) {
37
38
39
40
41
42
43 CO = 1;
44
45 } else {
46
47 CO = fd->hints->fs_hints.lustre.co_ratio;
48 }
49
50
51
52
53
54
55
56 if (nprocs_for_coll >= stripe_count)
57
58
59
60
61
62
63 avail_cb_nodes =
64 stripe_count * ADIOI_MIN(nprocs_for_coll/stripe_count, CO);
65 else {
66
67
68
69
70
71
72
73
74 divisor = 2;
75 avail_cb_nodes = 1;
76
77 while (stripe_count >= divisor*divisor) {
78 if ((stripe_count % divisor) == 0) {
79 if (stripe_count/divisor <= nprocs_for_coll) {
80
81 avail_cb_nodes = stripe_count/divisor;
82 break;
83 }
84
85
86 else if (divisor <= nprocs_for_coll)
87 avail_cb_nodes = divisor;
88 }
89 divisor++;
90 }
91 }
92
93 *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
94 striping_info = *striping_info_ptr;
95 striping_info[0] = stripe_size;
96 striping_info[1] = stripe_count;
97 striping_info[2] = avail_cb_nodes;
98 }
99
100 int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
101 ADIO_Offset *len, int *striping_info)
102 {
103 int rank_index, rank;
104 ADIO_Offset avail_bytes;
105 int stripe_size = striping_info[0];
106 int avail_cb_nodes = striping_info[2];
107
108
109 rank_index = (int)((off / stripe_size) % avail_cb_nodes);
110
111
112
113
114
115 if (rank_index >= fd->hints->cb_nodes)
116 MPI_Abort(MPI_COMM_WORLD, 1);
117
118 avail_bytes = (off / (ADIO_Offset)stripe_size + 1) *
119 (ADIO_Offset)stripe_size - off;
120 if (avail_bytes < *len) {
121
122 *len = avail_bytes;
123 }
124
125
126 rank = fd->hints->ranklist[rank_index];
127
128 return rank;
129 }
130
131
132
133
134
135
136
137 void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
138 ADIO_Offset *len_list, int contig_access_count,
139 int *striping_info, int nprocs,
140 int *count_my_req_procs_ptr,
141 int **count_my_req_per_proc_ptr,
142 ADIOI_Access **my_req_ptr,
143 int ***buf_idx_ptr)
144 {
145
146
147 int *count_my_req_per_proc, count_my_req_procs, **buf_idx;
148 int i, l, proc;
149 ADIO_Offset avail_len, rem_len, curr_idx, off;
150 ADIOI_Access *my_req;
151
152 *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
153 count_my_req_per_proc = *count_my_req_per_proc_ptr;
154
155
156
157
158
159
160 buf_idx = (int **) ADIOI_Malloc(nprocs * sizeof(int*));
161
162
163
164
165 for (i = 0; i < contig_access_count; i++) {
166
167
168
169 if (len_list[i] == 0)
170 continue;
171 off = offset_list[i];
172 avail_len = len_list[i];
173
174
175
176
177 proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
178 count_my_req_per_proc[proc]++;
179
180
181
182
183
184 rem_len = len_list[i] - avail_len;
185
186 while (rem_len != 0) {
187 off += avail_len;
188 avail_len = rem_len;
189 proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
190 count_my_req_per_proc[proc]++;
191 rem_len -= avail_len;
192 }
193 }
194
195
196
197
198
199
200
201
202 for (i = 0; i < nprocs; i++) {
203
204 buf_idx[i] = (int *) ADIOI_Malloc((count_my_req_per_proc[i] + 1)
205 * sizeof(int));
206 }
207
208
209 *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
210 my_req = *my_req_ptr;
211
212 count_my_req_procs = 0;
213 for (i = 0; i < nprocs; i++) {
214 if (count_my_req_per_proc[i]) {
215 my_req[i].offsets = (ADIO_Offset *)
216 ADIOI_Malloc(count_my_req_per_proc[i] *
217 sizeof(ADIO_Offset));
218 my_req[i].lens = ADIOI_Malloc(count_my_req_per_proc[i] *
219 sizeof(ADIO_Offset));
220 count_my_req_procs++;
221 }
222 my_req[i].count = 0;
223 }
224
225
226 curr_idx = 0;
227 for (i = 0; i < contig_access_count; i++) {
228
229
230 if (len_list[i] == 0)
231 continue;
232 off = offset_list[i];
233 avail_len = len_list[i];
234 proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
235
236 l = my_req[proc].count;
237
238 ADIOI_Assert(curr_idx == (int) curr_idx);
239 ADIOI_Assert(l < count_my_req_per_proc[proc]);
240 buf_idx[proc][l] = (int) curr_idx;
241 curr_idx += avail_len;
242
243 rem_len = len_list[i] - avail_len;
244
245
246
247
248
249
250 my_req[proc].offsets[l] = off;
251 ADIOI_Assert(avail_len == (int) avail_len);
252 my_req[proc].lens[l] = (int) avail_len;
253 my_req[proc].count++;
254
255 while (rem_len != 0) {
256 off += avail_len;
257 avail_len = rem_len;
258 proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
259 striping_info);
260
261 l = my_req[proc].count;
262 ADIOI_Assert(curr_idx == (int) curr_idx);
263 ADIOI_Assert(l < count_my_req_per_proc[proc]);
264 buf_idx[proc][l] = (int) curr_idx;
265
266 curr_idx += avail_len;
267 rem_len -= avail_len;
268
269 my_req[proc].offsets[l] = off;
270 ADIOI_Assert(avail_len == (int) avail_len);
271 my_req[proc].lens[l] = (int) avail_len;
272 my_req[proc].count++;
273 }
274 }
275
276 #ifdef AGG_DEBUG
277 for (i = 0; i < nprocs; i++) {
278 if (count_my_req_per_proc[i] > 0) {
279 FPRINTF(stdout, "data needed from %d (count = %d):\n",
280 i, my_req[i].count);
281 for (l = 0; l < my_req[i].count; l++) {
282 FPRINTF(stdout, " off[%d] = %lld, len[%d] = %d\n",
283 l, my_req[i].offsets[l], l, my_req[i].lens[l]);
284 }
285 }
286 }
287 #endif
288
289 *count_my_req_procs_ptr = count_my_req_procs;
290 *buf_idx_ptr = buf_idx;
291 }
292
293 int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
294 ADIO_Offset *len_list, int nprocs)
295 {
296
297
298
299
300
301
302 int i, docollect = 1, big_req_size = 0;
303 ADIO_Offset req_size = 0, total_req_size;
304 int avg_req_size, total_access_count;
305
306
307 for (i = 0; i < contig_access_count; i++)
308 req_size += len_list[i];
309 MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM,
310 fd->comm);
311 MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM,
312 fd->comm);
313
314 if (total_access_count != 0) {
315
316 avg_req_size = (int)(total_req_size / total_access_count);
317 } else {
318 avg_req_size = 0;
319 }
320
321 big_req_size = fd->hints->fs_hints.lustre.coll_threshold;
322
323 if ((big_req_size > 0) && (avg_req_size > big_req_size))
324 docollect = 0;
325
326 return docollect;
327 }