1 /* ---------------------------------------------------------------- */
2 /* (C)Copyright IBM Corp. 2007, 2008, 2019 */
3 /* ---------------------------------------------------------------- */
4 /**
5 * \file ad_gpfs_aggrs.c
6 * \brief The externally used function from this file is is declared in ad_gpfs_aggrs.h
7 */
8
9 /* -*- Mode: C; c-basic-offset:4 ; -*- */
10 /*
11 * Copyright (C) 1997-2001 University of Chicago.
12 * See COPYRIGHT notice in top-level directory.
13 */
14
15
16 #include "adio.h"
17 #include "adio_cb_config_list.h"
18 #include "ad_gpfs.h"
19 #include "ad_gpfs_aggrs.h"
20
21 #ifdef AGGREGATION_PROFILE
22 #include "mpe.h"
23 #endif
24
25
26 #ifdef USE_DBG_LOGGING
27 #define AGG_DEBUG 1
28 #endif
29
30 #ifndef TRACE_ERR
31 # define TRACE_ERR(format...)
32 #endif
33
34 /* Comments copied from common:
35 * This file contains four functions:
36 *
37 * ADIOI_Calc_aggregator()
38 * ADIOI_Calc_file_domains()
39 * ADIOI_Calc_my_req()
40 * ADIOI_Calc_others_req()
41 *
42 * The last three of these were originally in ad_read_coll.c, but they are
43 * also shared with ad_write_coll.c. I felt that they were better kept with
44 * the rest of the shared aggregation code.
45 */
46
47 /* Discussion of values available from above:
48 *
49 * ADIO_Offset st_offsets[0..nprocs-1]
50 * ADIO_Offset end_offsets[0..nprocs-1]
51 * These contain a list of start and end offsets for each process in
52 * the communicator. For example, an access at loc 10, size 10 would
53 * have a start offset of 10 and end offset of 19.
54 * int nprocs
55 * number of processors in the collective I/O communicator
56 * ADIO_Offset min_st_offset
57 * ADIO_Offset fd_start[0..nprocs_for_coll-1]
58 * starting location of "file domain"; region that a given process will
59 * perform aggregation for (i.e. actually do I/O)
60 * ADIO_Offset fd_end[0..nprocs_for_coll-1]
61 * start + size - 1 roughly, but it can be less, or 0, in the case of
62 * uneven distributions
63 */
64
65 /* Description from common/ad_aggregate.c. (Does it completely apply to bg?)
66 * ADIOI_Calc_aggregator()
67 *
68 * The intention here is to implement a function which provides basically
69 * the same functionality as in Rajeev's original version of
70 * ADIOI_Calc_my_req(). He used a ceiling division approach to assign the
71 * file domains, and we use the same approach here when calculating the
72 * location of an offset/len in a specific file domain. Further we assume
73 * this same distribution when calculating the rank_index, which is later
74 * used to map to a specific process rank in charge of the file domain.
75 *
76 * A better (i.e. more general) approach would be to use the list of file
77 * domains only. This would be slower in the case where the
78 * original ceiling division was used, but it would allow for arbitrary
79 * distributions of regions to aggregators. We'd need to know the
80 * nprocs_for_coll in that case though, which we don't have now.
81 *
82 * Note a significant difference between this function and Rajeev's old code:
83 * this code doesn't necessarily return a rank in the range
84 * 0..nprocs_for_coll; instead you get something in 0..nprocs. This is a
85 * result of the rank mapping; any set of ranks in the communicator could be
86 * used now.
87 *
88 * Returns an integer representing a rank in the collective I/O communicator.
89 *
90 * The "len" parameter is also modified to indicate the amount of data
91 * actually available in this file domain.
92 */
93 /*
94 * This is more general aggregator search function which does not base on the assumption
95 * that each aggregator hosts the file domain with the same size
96 */
97 int ADIOI_GPFS_Calc_aggregator(ADIO_File fd,
98 ADIO_Offset off,
99 ADIO_Offset min_off,
100 ADIO_Offset *len,
101 ADIO_Offset fd_size,
102 ADIO_Offset *fd_start,
103 ADIO_Offset *fd_end)
104 {
105 int rank_index, rank;
106 ADIO_Offset avail_bytes;
107 TRACE_ERR("Entering ADIOI_GPFS_Calc_aggregator\n");
108
109 ADIOI_Assert ( (off <= fd_end[fd->hints->cb_nodes-1] && off >= min_off && fd_start[0] >= min_off ) );
110
111 /* binary search --> rank_index is returned */
112 int ub = fd->hints->cb_nodes;
113 int lb = 0;
114 /* get an index into our array of aggregators */
115 /* Common code for striping - bg doesn't use it but it's
116 here to make diff'ing easier.
117 rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1);
118
119 if (fd->hints->striping_unit > 0) {
120 * wkliao: implementation for file domain alignment
121 fd_start[] and fd_end[] have been aligned with file lock
122 boundaries when returned from ADIOI_Calc_file_domains() so cannot
123 just use simple arithmatic as above *
124 rank_index = 0;
125 while (off > fd_end[rank_index]) rank_index++;
126 }
127 bg does it's own striping below
128 */
129 rank_index = fd->hints->cb_nodes / 2;
130 while ( off < fd_start[rank_index] || off > fd_end[rank_index] ) {
131 if ( off > fd_end [rank_index] ) {
132 lb = rank_index;
133 rank_index = (rank_index + ub) / 2;
134 }
135 else
136 if ( off < fd_start[rank_index] ) {
137 ub = rank_index;
138 rank_index = (rank_index + lb) / 2;
139 }
140 }
141 /* we index into fd_end with rank_index, and fd_end was allocated to be no
142 * bigger than fd->hins->cb_nodes. If we ever violate that, we're
143 * overrunning arrays. Obviously, we should never ever hit this abort */
144 if (rank_index >= fd->hints->cb_nodes || rank_index < 0) {
145 FPRINTF(stderr, "Error in ADIOI_Calc_aggregator(): rank_index(%d) >= fd->hints->cb_nodes (%d) fd_size=%lld off=%lld\n",
146 rank_index,fd->hints->cb_nodes,fd_size,off);
147 MPI_Abort(MPI_COMM_WORLD, 1);
148 }
149 /* DBG_FPRINTF ("ADIOI_GPFS_Calc_aggregator: rank_index = %d\n",
150 rank_index ); */
151
152 /*
153 * remember here that even in Rajeev's original code it was the case that
154 * different aggregators could end up with different amounts of data to
155 * aggregate. here we use fd_end[] to make sure that we know how much
156 * data this aggregator is working with.
157 *
158 * the +1 is to take into account the end vs. length issue.
159 */
160 avail_bytes = fd_end[rank_index] + 1 - off;
161 if (avail_bytes < *len && avail_bytes > 0) {
162 /* this file domain only has part of the requested contig. region */
163
164 *len = avail_bytes;
165 }
166
167 /* map our index to a rank */
168 /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
169 rank = fd->hints->ranklist[rank_index];
170 TRACE_ERR("Leaving ADIOI_GPFS_Calc_aggregator\n");
171
172 return rank;
173 }
174
175 /*
176 * Compute a dynamic access range based file domain partition among I/O aggregators,
177 * which align to the GPFS block size
178 * Divide the I/O workload among "nprocs_for_coll" processes. This is
179 * done by (logically) dividing the file into file domains (FDs); each
180 * process may directly access only its own file domain.
181 * Additional effort is to make sure that each I/O aggregator get
182 * a file domain that aligns to the GPFS block size. So, there will
183 * not be any false sharing of GPFS file blocks among multiple I/O nodes.
184 *
185 * The common version of this now accepts a min_fd_size and striping_unit.
186 * It doesn't seem necessary here (using GPFS block sizes) but keep it in mind
187 * (e.g. we could pass striping unit instead of using fs_ptr->blksize).
188 */
189 void ADIOI_GPFS_Calc_file_domains(ADIO_File fd,
190 ADIO_Offset *st_offsets,
191 ADIO_Offset *end_offsets,
192 int nprocs,
193 int nprocs_for_coll,
194 ADIO_Offset *min_st_offset_ptr,
195 ADIO_Offset **fd_start_ptr,
196 ADIO_Offset **fd_end_ptr,
197 ADIO_Offset *fd_size_ptr,
198 void *fs_ptr)
199 {
200 ADIO_Offset min_st_offset, max_end_offset, *fd_start, *fd_end, *fd_size;
201 int i, aggr;
202 TRACE_ERR("Entering ADIOI_GPFS_Calc_file_domains\n");
203 blksize_t blksize;
204
205 #ifdef AGGREGATION_PROFILE
206 MPE_Log_event (5004, 0, NULL);
207 #endif
208
209 # if AGG_DEBUG
210 static char myname[] = "ADIOI_GPFS_Calc_file_domains";
211 DBG_FPRINTF(stderr, "%s(%d): %d aggregator(s)\n",
212 myname,__LINE__,nprocs_for_coll);
213 # endif
214 if (fd->blksize <= 0)
215 /* default to 1M if blksize unset */
216 fd->blksize = 1048576;
217 blksize = fd->blksize;
218
219 # if AGG_DEBUG
220 DBG_FPRINTF(stderr,"%s(%d): Blocksize=%ld\n",myname,__LINE__,blksize);
221 # endif
222 /* find min of start offsets and max of end offsets of all processes */
223 min_st_offset = st_offsets [0];
224 max_end_offset = end_offsets[0];
225 for (i=1; i<nprocs; i++) {
226 min_st_offset = ADIOI_MIN(min_st_offset, st_offsets[i]);
227 max_end_offset = ADIOI_MAX(max_end_offset, end_offsets[i]);
228 }
229
230 /* DBG_FPRINTF(stderr, "_calc_file_domains, min_st_offset, max_
231 = %qd, %qd\n", min_st_offset, max_end_offset );*/
232
233 /* determine the "file domain (FD)" of each process, i.e., the portion of
234 the file that will be "owned" by each process */
235
236 ADIO_Offset gpfs_ub = (max_end_offset +blksize-1) / blksize * blksize - 1;
237 ADIO_Offset gpfs_lb = min_st_offset / blksize * blksize;
238 ADIO_Offset gpfs_ub_rdoff = (max_end_offset +blksize-1) / blksize * blksize - 1 - max_end_offset;
239 ADIO_Offset gpfs_lb_rdoff = min_st_offset - min_st_offset / blksize * blksize;
240 ADIO_Offset fd_gpfs_range = gpfs_ub - gpfs_lb + 1;
241
242 int naggs = nprocs_for_coll;
243
244 /* Tweak the file domains so that no fd is smaller than a threshold. We
245 * have to strike a balance between efficency and parallelism: somewhere
246 * between 10k processes sending 32-byte requests and one process sending a
247 * 320k request is a (system-dependent) sweet spot
248
249 This is from the common code - the new min_fd_size parm that we didn't implement.
250 (And common code uses a different declaration of fd_size so beware)
251
252 if (fd_size < min_fd_size)
253 fd_size = min_fd_size;
254 */
255 fd_size = (ADIO_Offset *) ADIOI_Malloc(nprocs_for_coll * sizeof(ADIO_Offset));
256 *fd_start_ptr = (ADIO_Offset *) ADIOI_Malloc(nprocs_for_coll * sizeof(ADIO_Offset));
257 *fd_end_ptr = (ADIO_Offset *) ADIOI_Malloc(nprocs_for_coll * sizeof(ADIO_Offset));
258 fd_start = *fd_start_ptr;
259 fd_end = *fd_end_ptr;
260
261 /* each process will have a file domain of some number of gpfs blocks, but
262 * the division of blocks is not likely to be even. Some file domains will
263 * be "large" and others "small"
264 *
265 * Example: consider 17 blocks distributed over 3 aggregators.
266 * nb_cn_small = 17/3 = 5
267 * naggs_large = 17 - 3*(17/3) = 17 - 15 = 2
268 * naggs_small = 3 - 2 = 1
269 *
270 * and you end up with file domains of {5-blocks, 6-blocks, 6-blocks}
271 *
272 * what about (relatively) small files? say, a file of 1000 blocks
273 * distributed over 2064 aggregators:
274 * nb_cn_small = 1000/2064 = 0
275 * naggs_large = 1000 - 2064*(1000/2064) = 1000
276 * naggs_small = 2064 - 1000 = 1064
277 * and you end up with domains of {0, 0, 0, ... 1, 1, 1 ...}
278 *
279 * it might be a good idea instead of having all the zeros up front, to
280 * "mix" those zeros into the fd_size array. that way, no pset/bridge-set
281 * is left with zero work. In fact, even if the small file domains aren't
282 * zero, it's probably still a good idea to mix the "small" file domains
283 * across the fd_size array to keep the io nodes in balance */
284
285
286 ADIO_Offset n_gpfs_blk = fd_gpfs_range / blksize;
287 ADIO_Offset nb_cn_small = n_gpfs_blk/naggs;
288 ADIO_Offset naggs_large = n_gpfs_blk - naggs * (n_gpfs_blk/naggs);
289 ADIO_Offset naggs_small = naggs - naggs_large;
290
291 #ifdef BGQPLATFORM
292 if (gpfsmpio_balancecontig == 1) {
293 /* File domains blocks are assigned to aggregators in a breadth-first
294 * fashion relative to the ions - additionally, file domains on the
295 * aggregators sharing the same bridgeset and ion have contiguous
296 * offsets. */
297
298 // initialize everything to small
299 for (i=0; i<naggs; i++)
300 fd_size[i] = nb_cn_small * blksize;
301
302 // go thru and distribute the large across the bridges
303
304 /* bridelistoffset: agg rank list offsets using the bridgelist - each
305 * entry is created by adding up the indexes for the aggs from all
306 * previous bridges */
307 int *bridgelistoffset =
308 (int *) ADIOI_Malloc(fd->hints->fs_hints.bg.numbridges*sizeof(int));
309 /* tmpbridgelistnum: copy of the bridgelistnum whose entries can be
310 * decremented to keep track of bridge assignments during the actual
311 * large block assignments to the agg rank list*/
312 int *tmpbridgelistnum =
313 (int *) ADIOI_Malloc(fd->hints->fs_hints.bg.numbridges*sizeof(int));
314
315 int j;
316 for (j=0;j<fd->hints->fs_hints.bg.numbridges;j++) {
317 int k, bridgerankoffset = 0;
318 for (k=0;k<j;k++) {
319 bridgerankoffset += fd->hints->fs_hints.bg.bridgelistnum[k];
320 }
321 bridgelistoffset[j] = bridgerankoffset;
322 }
323
324 for (j=0;j<fd->hints->fs_hints.bg.numbridges;j++)
325 tmpbridgelistnum[j] = fd->hints->fs_hints.bg.bridgelistnum[j];
326 int bridgeiter = 0;
327
328 /* distribute the large blocks across the aggs going breadth-first
329 * across the bridgelist - this distributes the fd sizes across the
330 * ions, so later in the file domain assignment when it iterates thru
331 * the ranklist the offsets will be contiguous within the bridge and
332 * ion as well */
333 for (j=0;j<naggs_large;j++) {
334 int foundbridge = 0;
335 int numbridgelistpasses = 0;
336 while (!foundbridge) {
337 if (tmpbridgelistnum[bridgeiter] > 0) {
338 foundbridge = 1;
339 /*
340 printf("bridgeiter is %d tmpbridgelistnum[bridgeiter] is %d bridgelistoffset[bridgeiter] is %d\n",bridgeiter,tmpbridgelistnum[bridgeiter],bridgelistoffset[bridgeiter]);
341 printf("naggs is %d bridgeiter is %d bridgelistoffset[bridgeiter] is %d tmpbridgelistnum[bridgeiter] is %d\n",naggs, bridgeiter,bridgelistoffset[bridgeiter],tmpbridgelistnum[bridgeiter]);
342 printf("naggs is %d bridgeiter is %d setting fd_size[%d]\n",naggs, bridgeiter,bridgelistoffset[bridgeiter]+(fd->hints->bridgelistnum[bridgeiter]-tmpbridgelistnum[bridgeiter]));
343 */
344 int currentbridgelistnum =
345 (fd->hints->fs_hints.bg.bridgelistnum[bridgeiter]-
346 tmpbridgelistnum[bridgeiter]);
347 int currentfdsizeindex = bridgelistoffset[bridgeiter] +
348 currentbridgelistnum;
349 fd_size[currentfdsizeindex] = (nb_cn_small+1) * blksize;
350 tmpbridgelistnum[bridgeiter]--;
351 }
352 if (bridgeiter == (fd->hints->fs_hints.bg.numbridges-1)) {
353 /* guard against infinite loop - should only ever make 1 pass
354 * thru bridgelist */
355 ADIOI_Assert(numbridgelistpasses == 0);
356 numbridgelistpasses++;
357 bridgeiter = 0;
358 }
359 else
360 bridgeiter++;
361 }
362 }
363 ADIOI_Free(tmpbridgelistnum);
364 ADIOI_Free(bridgelistoffset);
365
366 } else {
367 /* BG/L- and BG/P-style distribution of file domains: simple allocation of
368 * file domins to each aggregator */
369 for (i=0; i<naggs; i++) {
370 if (i < naggs_large) {
371 fd_size[i] = (nb_cn_small+1) * blksize;
372 } else {
373 fd_size[i] = nb_cn_small * blksize;
374 }
375 }
376 }
377 #ifdef balancecontigtrace
378 int myrank;
379 MPI_Comm_rank(fd->comm,&myrank);
380 if (myrank == 0) {
381 fprintf(stderr,"naggs_small is %d nb_cn_small is %d\n",naggs_small,nb_cn_small);
382 for (i=0; i<naggs; i++) {
383 fprintf(stderr,"fd_size[%d] set to %d agg rank is %d\n",i,fd_size[i],fd->hints->ranklist[i]);
384 }
385 }
386 #endif
387
388 #else // not BGQ platform
389 for (i=0; i<naggs; i++) {
390 if (i < naggs_large) {
391 fd_size[i] = (nb_cn_small+1) * blksize;
392 } else {
393 fd_size[i] = nb_cn_small * blksize;
394 }
395 }
396
397 #endif
398
399
400 # if AGG_DEBUG
401 DBG_FPRINTF(stderr,"%s(%d): "
402 "gpfs_ub %llu, "
403 "gpfs_lb %llu, "
404 "gpfs_ub_rdoff %llu, "
405 "gpfs_lb_rdoff %llu, "
406 "fd_gpfs_range %llu, "
407 "n_gpfs_blk %llu, "
408 "nb_cn_small %llu, "
409 "naggs_large %llu, "
410 "naggs_small %llu, "
411 "\n",
412 myname,__LINE__,
413 gpfs_ub ,
414 gpfs_lb ,
415 gpfs_ub_rdoff,
416 gpfs_lb_rdoff,
417 fd_gpfs_range,
418 n_gpfs_blk ,
419 nb_cn_small ,
420 naggs_large ,
421 naggs_small
422 );
423 # endif
424
425 fd_size[0] -= gpfs_lb_rdoff;
426 fd_size[naggs-1] -= gpfs_ub_rdoff;
427
428 /* compute the file domain for each aggr */
429 ADIO_Offset offset = min_st_offset;
430 for (aggr=0; aggr<naggs; aggr++) {
431 fd_start[aggr] = offset;
432 fd_end [aggr] = offset + fd_size[aggr] - 1;
433 offset += fd_size[aggr];
434 }
435
436 *fd_size_ptr = fd_size[0];
437 *min_st_offset_ptr = min_st_offset;
438
439 #ifdef AGGREGATION_PROFILE
440 MPE_Log_event (5005, 0, NULL);
441 #endif
442 ADIOI_Free (fd_size);
443 TRACE_ERR("Leaving ADIOI_GPFS_Calc_file_domains\n");
444 }
445
446 /*
447 * ADIOI_GPFS_Calc_my_req() overrides ADIOI_Calc_my_req for the default implementation
448 * is specific for static file domain partitioning.
449 *
450 * ADIOI_Calc_my_req() - calculate what portions of the access requests
451 * of this process are located in the file domains of various processes
452 * (including this one)
453 */
454 void ADIOI_GPFS_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list, ADIO_Offset *len_list,
455 int contig_access_count, ADIO_Offset
456 min_st_offset, ADIO_Offset *fd_start,
457 ADIO_Offset *fd_end, ADIO_Offset fd_size,
458 int nprocs,
459 int *count_my_req_procs_ptr,
460 int **count_my_req_per_proc_ptr,
461 ADIOI_Access **my_req_ptr,
462 int **buf_idx_ptr)
463 /* Possibly reconsider if buf_idx's are ok as int's, or should they be aints/offsets?
464 They are used as memory buffer indices so it seems like the 2G limit is in effect */
465 {
466 int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
467 int i, l, proc;
468 ADIO_Offset fd_len, rem_len, curr_idx, off;
469 ADIOI_Access *my_req;
470 TRACE_ERR("Entering ADIOI_GPFS_Calc_my_req\n");
471
472 #ifdef AGGREGATION_PROFILE
473 MPE_Log_event (5024, 0, NULL);
474 #endif
475 *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs,sizeof(int));
476 count_my_req_per_proc = *count_my_req_per_proc_ptr;
477 /* count_my_req_per_proc[i] gives the no. of contig. requests of this
478 process in process i's file domain. calloc initializes to zero.
479 I'm allocating memory of size nprocs, so that I can do an
480 MPI_Alltoall later on.*/
481
482 buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
483 /* buf_idx is relevant only if buftype_is_contig.
484 buf_idx[i] gives the index into user_buf where data received
485 from proc. i should be placed. This allows receives to be done
486 without extra buffer. This can't be done if buftype is not contig. */
487
488 /* initialize buf_idx to -1 */
489 for (i=0; i < nprocs; i++) buf_idx[i] = -1;
490
491 /* one pass just to calculate how much space to allocate for my_req;
492 * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
493 */
494 for (i=0; i < contig_access_count; i++) {
495 /* short circuit offset/len processing if len == 0
496 * (zero-byte read/write */
497 if (len_list[i] == 0)
498 continue;
499 off = offset_list[i];
500 fd_len = len_list[i];
501 /* note: we set fd_len to be the total size of the access. then
502 * ADIOI_Calc_aggregator() will modify the value to return the
503 * amount that was available from the file domain that holds the
504 * first part of the access.
505 */
506 /* BES */
507 proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size,
508 fd_start, fd_end);
509 count_my_req_per_proc[proc]++;
510
511 /* figure out how much data is remaining in the access (i.e. wasn't
512 * part of the file domain that had the starting byte); we'll take
513 * care of this data (if there is any) in the while loop below.
514 */
515 rem_len = len_list[i] - fd_len;
516
517 while (rem_len > 0) {
518 off += fd_len; /* point to first remaining byte */
519 fd_len = rem_len; /* save remaining size, pass to calc */
520 proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len,
521 fd_size, fd_start, fd_end);
522
523 count_my_req_per_proc[proc]++;
524 rem_len -= fd_len; /* reduce remaining length by amount from fd */
525 }
526 }
527
528 /* now allocate space for my_req, offset, and len */
529
530 *my_req_ptr = (ADIOI_Access *)
531 ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
532 my_req = *my_req_ptr;
533
534 count_my_req_procs = 0;
535 for (i=0; i < nprocs; i++) {
536 if (count_my_req_per_proc[i]) {
537 my_req[i].offsets = (ADIO_Offset *)
538 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
539 my_req[i].lens =
540 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
541 count_my_req_procs++;
542 }
543 my_req[i].count = 0; /* will be incremented where needed
544 later */
545 }
546
547 /* now fill in my_req */
548 curr_idx = 0;
549 for (i=0; i<contig_access_count; i++) {
550 /* short circuit offset/len processing if len == 0
551 * (zero-byte read/write */
552 if (len_list[i] == 0)
553 continue;
554 off = offset_list[i];
555 fd_len = len_list[i];
556 proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size,
557 fd_start, fd_end);
558
559 /* for each separate contiguous access from this process */
560 if (buf_idx[proc] == -1)
561 {
562 ADIOI_Assert(curr_idx == (int) curr_idx);
563 buf_idx[proc] = (int) curr_idx;
564 }
565
566 l = my_req[proc].count;
567 curr_idx += fd_len;
568
569 rem_len = len_list[i] - fd_len;
570
571 /* store the proc, offset, and len information in an array
572 * of structures, my_req. Each structure contains the
573 * offsets and lengths located in that process's FD,
574 * and the associated count.
575 */
576 my_req[proc].offsets[l] = off;
577 my_req[proc].lens[l] = fd_len;
578 my_req[proc].count++;
579
580 while (rem_len > 0) {
581 off += fd_len;
582 fd_len = rem_len;
583 proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len,
584 fd_size, fd_start, fd_end);
585
586 if (buf_idx[proc] == -1)
587 {
588 ADIOI_Assert(curr_idx == (int) curr_idx);
589 buf_idx[proc] = (int) curr_idx;
590 }
591
592 l = my_req[proc].count;
593 curr_idx += fd_len;
594 rem_len -= fd_len;
595
596 my_req[proc].offsets[l] = off;
597 my_req[proc].lens[l] = fd_len;
598 my_req[proc].count++;
599 }
600 }
601
602
603
604 #ifdef AGG_DEBUG
605 for (i=0; i<nprocs; i++) {
606 if (count_my_req_per_proc[i] > 0) {
607 DBG_FPRINTF(stderr, "data needed from %d (count = %d):\n", i,
608 my_req[i].count);
609 for (l=0; l < my_req[i].count; l++) {
610 DBG_FPRINTF(stderr, " off[%d] = %lld, len[%d] = %lld\n", l,
611 my_req[i].offsets[l], l, my_req[i].lens[l]);
612 }
613 }
614 DBG_FPRINTF(stderr, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
615 }
616 #endif
617
618 *count_my_req_procs_ptr = count_my_req_procs;
619 *buf_idx_ptr = buf_idx;
620 #ifdef AGGREGATION_PROFILE
621 MPE_Log_event (5025, 0, NULL);
622 #endif
623 TRACE_ERR("Leaving ADIOI_GPFS_Calc_my_req\n");
624 }
625
626 /*
627 * ADIOI_Calc_others_req (copied to bg and switched to all to all for performance)
628 *
629 * param[in] count_my_req_procs Number of processes whose file domain my
630 * request touches.
631 * param[in] count_my_req_per_proc count_my_req_per_proc[i] gives the no. of
632 * contig. requests of this process in
633 * process i's file domain.
634 * param[in] my_req A structure defining my request
635 * param[in] nprocs Number of nodes in the block
636 * param[in] myrank Rank of this node
637 * param[out] count_others_req_proc_ptr Number of processes whose requests lie in
638 * my process's file domain (including my
639 * process itself)
640 * param[out] others_req_ptr Array of other process' requests that lie
641 * in my process's file domain
642 */
643 void ADIOI_GPFS_Calc_others_req(ADIO_File fd, int count_my_req_procs,
644 int *count_my_req_per_proc,
645 ADIOI_Access *my_req,
646 int nprocs, int myrank,
647 int *count_others_req_procs_ptr,
648 ADIOI_Access **others_req_ptr)
649 {
650 TRACE_ERR("Entering ADIOI_GPFS_Calc_others_req\n");
651 /* determine what requests of other processes lie in this process's
652 file domain */
653
654 /* count_others_req_procs = number of processes whose requests lie in
655 this process's file domain (including this process itself)
656 count_others_req_per_proc[i] indicates how many separate contiguous
657 requests of proc. i lie in this process's file domain. */
658
659 int *count_others_req_per_proc, count_others_req_procs;
660 int i;
661 ADIOI_Access *others_req;
662
663 /* Parameters for MPI_Alltoallv */
664 int *scounts, *sdispls, *rcounts, *rdispls;
665
666 /* first find out how much to send/recv and from/to whom */
667 #ifdef AGGREGATION_PROFILE
668 MPE_Log_event (5026, 0, NULL);
669 #endif
670 /* Send 1 int to each process. count_my_req_per_proc[i] is the number of
671 * requests that my process will do to the file domain owned by process[i].
672 * Receive 1 int from each process. count_others_req_per_proc[i] is the number of
673 * requests that process[i] will do to the file domain owned by my process.
674 */
675 count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
676 /* cora2a1=timebase(); */
677 /*for(i=0;i<nprocs;i++) ?*/
678 MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
679 count_others_req_per_proc, 1, MPI_INT, fd->comm);
680
681 /* total_cora2a+=timebase()-cora2a1; */
682
683 /* Allocate storage for an array of other nodes' accesses of our
684 * node's file domain. Also allocate storage for the alltoallv
685 * parameters.
686 */
687 *others_req_ptr = (ADIOI_Access *)
688 ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
689 others_req = *others_req_ptr;
690
691 scounts = ADIOI_Malloc(nprocs*sizeof(int));
692 sdispls = ADIOI_Malloc(nprocs*sizeof(int));
693 rcounts = ADIOI_Malloc(nprocs*sizeof(int));
694 rdispls = ADIOI_Malloc(nprocs*sizeof(int));
695
696 /* If process[i] has any requests in my file domain,
697 * initialize an ADIOI_Access structure that will describe each request
698 * from process[i]. The offsets, lengths, and buffer pointers still need
699 * to be obtained to complete the setting of this structure.
700 */
701 count_others_req_procs = 0;
702 for (i=0; i<nprocs; i++) {
703 if (count_others_req_per_proc[i])
704 {
705 others_req[i].count = count_others_req_per_proc[i];
706
707 others_req[i].offsets = (ADIO_Offset *)
708 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
709 others_req[i].lens =
710 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
711
712 others_req[i].mem_ptrs = (MPI_Aint *)
713 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint));
714
715 count_others_req_procs++;
716 }
717 else
718 {
719 others_req[i].count = 0;
720 others_req[i].offsets = NULL;
721 others_req[i].lens = NULL;
722 }
723 }
724
725 /* Now send the calculated offsets and lengths to respective processes */
726
727 /************************/
728 /* Exchange the offsets */
729 /************************/
730
731 // Figure out the layout for the sendbuf and recvbuf.
732 // scounts[] and sdisps[] / rcounts[] and rdisps[] define the layout,
733 // and the data for each section will come from from my_req[i].offsets
734 // or others_req[i].offsets.
735
736 int scount_total = 0;
737 int rcount_total = 0;
738 for (i=0; i<nprocs; i++)
739 {
740 /* Send these offsets to process i.*/
741 scounts[i] = count_my_req_per_proc[i];
742 sdispls[i] = scount_total;
743 scount_total += scounts[i];
744
745 /* Receive these offsets from process i.*/
746 rcounts[i] = count_others_req_per_proc[i];
747 rdispls[i] = rcount_total;
748 rcount_total += rcounts[i];
749 }
750
751 void *sbuf_copy_of_req_info;
752 void *rbuf_copy_of_req_info;
753
754 sbuf_copy_of_req_info = (ADIO_Offset *) ADIOI_Malloc(scount_total * sizeof(ADIO_Offset));
755 rbuf_copy_of_req_info = (ADIO_Offset *) ADIOI_Malloc(rcount_total * sizeof(ADIO_Offset));
756 for (i=0; i<nprocs; i++)
757 {
758 // I haven't timed it, I'm just assuming a memcpy(,,0) is fast for
759 // the entries that don't have data to contribute so I didn't bother
760 // with an 'if' statement
761 memcpy(sbuf_copy_of_req_info + sdispls[i] * sizeof(ADIO_Offset),
762 my_req[i].offsets,
763 scounts[i] * sizeof(ADIO_Offset));
764 }
765
766 /* Exchange the offsets */
767 MPI_Alltoallv(sbuf_copy_of_req_info,
768 scounts, sdispls, ADIO_OFFSET,
769 rbuf_copy_of_req_info,
770 rcounts, rdispls, ADIO_OFFSET,
771 fd->comm);
772 for (i=0; i<nprocs; i++)
773 {
774 memcpy(others_req[i].offsets,
775 rbuf_copy_of_req_info + rdispls[i] * sizeof(ADIO_Offset),
776 rcounts[i] * sizeof(ADIO_Offset));
777 }
778
779 /************************/
780 /* Exchange the lengths */
781 /************************/
782
783 for (i=0; i<nprocs; i++)
784 {
785 memcpy(sbuf_copy_of_req_info + sdispls[i] * sizeof(ADIO_Offset),
786 my_req[i].lens,
787 scounts[i] * sizeof(ADIO_Offset));
788 }
789
790 /* Exchange the lengths */
791 MPI_Alltoallv(sbuf_copy_of_req_info,
792 scounts, sdispls, ADIO_OFFSET,
793 rbuf_copy_of_req_info,
794 rcounts, rdispls, ADIO_OFFSET,
795 fd->comm);
796 for (i=0; i<nprocs; i++)
797 {
798 memcpy(others_req[i].lens,
799 rbuf_copy_of_req_info + rdispls[i] * sizeof(ADIO_Offset),
800 rcounts[i] * sizeof(ADIO_Offset));
801 }
802
803 /* Clean up */
804 ADIOI_Free(sbuf_copy_of_req_info);
805 ADIOI_Free(rbuf_copy_of_req_info);
806 ADIOI_Free(count_others_req_per_proc);
807 ADIOI_Free (scounts);
808 ADIOI_Free (sdispls);
809 ADIOI_Free (rcounts);
810 ADIOI_Free (rdispls);
811
812 *count_others_req_procs_ptr = count_others_req_procs;
813 #ifdef AGGREGATION_PROFILE
814 MPE_Log_event (5027, 0, NULL);
815 #endif
816 TRACE_ERR("Leaving ADIOI_GPFS_Calc_others_req\n");
817 }