This source file includes following definitions.
- ADIOI_Calc_aggregator
- ADIOI_Calc_file_domains
- ADIOI_Calc_my_req
- ADIOI_Calc_others_req
- ADIOI_Icalc_others_req
- ADIOI_Icalc_others_req_main
- ADIOI_Icalc_others_req_fini
1
2
3
4
5
6
7 #include "adio.h"
8 #include "adio_extern.h"
9
10 #ifdef AGGREGATION_PROFILE
11 #include "mpe.h"
12 #endif
13
14 #undef AGG_DEBUG
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 int ADIOI_Calc_aggregator(ADIO_File fd,
74 ADIO_Offset off,
75 ADIO_Offset min_off,
76 ADIO_Offset *len,
77 ADIO_Offset fd_size,
78 ADIO_Offset *fd_start,
79 ADIO_Offset *fd_end)
80 {
81 int rank_index, rank;
82 ADIO_Offset avail_bytes;
83
84 ADIOI_UNREFERENCED_ARG(fd_start);
85
86
87 rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1);
88
89 if (fd->hints->striping_unit > 0) {
90
91
92
93
94 rank_index = 0;
95 while (off > fd_end[rank_index]) rank_index++;
96 }
97
98
99
100
101 if (rank_index >= fd->hints->cb_nodes || rank_index < 0) {
102 FPRINTF(stderr, "Error in ADIOI_Calc_aggregator(): rank_index(%d) >= fd->hints->cb_nodes (%d) fd_size=%lld off=%lld\n",
103 rank_index,fd->hints->cb_nodes,fd_size,off);
104 MPI_Abort(MPI_COMM_WORLD, 1);
105 }
106
107
108
109
110
111
112
113
114 avail_bytes = fd_end[rank_index] + 1 - off;
115 if (avail_bytes < *len) {
116
117 *len = avail_bytes;
118 }
119
120
121
122 rank = fd->hints->ranklist[rank_index];
123
124 return rank;
125 }
126
127 void ADIOI_Calc_file_domains(ADIO_Offset *st_offsets, ADIO_Offset
128 *end_offsets, int nprocs, int nprocs_for_coll,
129 ADIO_Offset *min_st_offset_ptr,
130 ADIO_Offset **fd_start_ptr, ADIO_Offset
131 **fd_end_ptr, int min_fd_size,
132 ADIO_Offset *fd_size_ptr,
133 int striping_unit)
134 {
135
136
137
138
139 ADIO_Offset min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size;
140 int i;
141
142 #ifdef AGGREGATION_PROFILE
143 MPE_Log_event (5004, 0, NULL);
144 #endif
145
146 #ifdef AGG_DEBUG
147 FPRINTF(stderr, "ADIOI_Calc_file_domains: %d aggregator(s)\n",
148 nprocs_for_coll);
149 #endif
150
151
152
153 min_st_offset = st_offsets[0];
154 max_end_offset = end_offsets[0];
155
156 for (i=1; i<nprocs; i++) {
157 min_st_offset = ADIOI_MIN(min_st_offset, st_offsets[i]);
158 max_end_offset = ADIOI_MAX(max_end_offset, end_offsets[i]);
159 }
160
161
162
163
164
165
166 fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll -
167 1)/nprocs_for_coll;
168
169
170
171
172
173
174
175 if (fd_size < min_fd_size)
176 fd_size = min_fd_size;
177
178 *fd_start_ptr = (ADIO_Offset *)
179 ADIOI_Malloc(nprocs_for_coll*sizeof(ADIO_Offset));
180 *fd_end_ptr = (ADIO_Offset *)
181 ADIOI_Malloc(nprocs_for_coll*sizeof(ADIO_Offset));
182
183 fd_start = *fd_start_ptr;
184 fd_end = *fd_end_ptr;
185
186
187
188
189 if (striping_unit > 0) {
190 ADIO_Offset end_off;
191 int rem_front, rem_back;
192
193
194 fd_start[0] = min_st_offset;
195 end_off = fd_start[0] + fd_size;
196 rem_front = end_off % striping_unit;
197 rem_back = striping_unit - rem_front;
198 if (rem_front < rem_back)
199 end_off -= rem_front;
200 else
201 end_off += rem_back;
202 fd_end[0] = end_off - 1;
203
204
205 for (i=1; i<nprocs_for_coll; i++) {
206 fd_start[i] = fd_end[i-1] + 1;
207 end_off = min_st_offset + fd_size * (i+1);
208 rem_front = end_off % striping_unit;
209 rem_back = striping_unit - rem_front;
210 if (rem_front < rem_back)
211 end_off -= rem_front;
212 else
213 end_off += rem_back;
214 fd_end[i] = end_off - 1;
215 }
216 fd_end[nprocs_for_coll-1] = max_end_offset;
217 }
218 else {
219 fd_start[0] = min_st_offset;
220 fd_end[0] = min_st_offset + fd_size - 1;
221
222 for (i=1; i<nprocs_for_coll; i++) {
223 fd_start[i] = fd_end[i-1] + 1;
224 fd_end[i] = fd_start[i] + fd_size - 1;
225 }
226 }
227
228
229
230
231
232
233
234 for (i=0; i<nprocs_for_coll; i++) {
235 if (fd_start[i] > max_end_offset)
236 fd_start[i] = fd_end[i] = -1;
237 if (fd_end[i] > max_end_offset)
238 fd_end[i] = max_end_offset;
239 }
240
241 *fd_size_ptr = fd_size;
242 *min_st_offset_ptr = min_st_offset;
243
244 #ifdef AGGREGATION_PROFILE
245 MPE_Log_event (5005, 0, NULL);
246 #endif
247 }
248
249
250
251
252
253
254 void ADIOI_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list, ADIO_Offset *len_list,
255 int contig_access_count, ADIO_Offset
256 min_st_offset, ADIO_Offset *fd_start,
257 ADIO_Offset *fd_end, ADIO_Offset fd_size,
258 int nprocs,
259 int *count_my_req_procs_ptr,
260 int **count_my_req_per_proc_ptr,
261 ADIOI_Access **my_req_ptr,
262 int **buf_idx_ptr)
263
264
265 {
266 int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
267 int i, l, proc;
268 ADIO_Offset fd_len, rem_len, curr_idx, off;
269 ADIOI_Access *my_req;
270
271 #ifdef AGGREGATION_PROFILE
272 MPE_Log_event (5024, 0, NULL);
273 #endif
274
275 *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs,sizeof(int));
276 count_my_req_per_proc = *count_my_req_per_proc_ptr;
277
278
279
280
281
282 buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
283
284
285
286
287
288
289 for (i=0; i < nprocs; i++) buf_idx[i] = -1;
290
291
292
293
294 for (i=0; i < contig_access_count; i++) {
295
296
297 if (len_list[i] == 0)
298 continue;
299 off = offset_list[i];
300 fd_len = len_list[i];
301
302
303
304
305
306 proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size,
307 fd_start, fd_end);
308 count_my_req_per_proc[proc]++;
309
310
311
312
313
314 rem_len = len_list[i] - fd_len;
315
316 while (rem_len != 0) {
317 off += fd_len;
318 fd_len = rem_len;
319 proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len,
320 fd_size, fd_start, fd_end);
321
322 count_my_req_per_proc[proc]++;
323 rem_len -= fd_len;
324 }
325 }
326
327
328
329 *my_req_ptr = (ADIOI_Access *)
330 ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
331 my_req = *my_req_ptr;
332
333 count_my_req_procs = 0;
334 for (i=0; i < nprocs; i++) {
335 if (count_my_req_per_proc[i]) {
336 my_req[i].offsets = (ADIO_Offset *)
337 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
338 my_req[i].lens =
339 ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
340 count_my_req_procs++;
341 }
342 my_req[i].count = 0;
343
344 }
345
346
347 curr_idx = 0;
348 for (i=0; i<contig_access_count; i++) {
349
350
351 if (len_list[i] == 0)
352 continue;
353 off = offset_list[i];
354 fd_len = len_list[i];
355 proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size,
356 fd_start, fd_end);
357
358
359 if (buf_idx[proc] == -1)
360 {
361 ADIOI_Assert(curr_idx == (int) curr_idx);
362 buf_idx[proc] = (int) curr_idx;
363 }
364
365 l = my_req[proc].count;
366 curr_idx += fd_len;
367
368 rem_len = len_list[i] - fd_len;
369
370
371
372
373
374
375 my_req[proc].offsets[l] = off;
376 my_req[proc].lens[l] = fd_len;
377 my_req[proc].count++;
378
379 while (rem_len != 0) {
380 off += fd_len;
381 fd_len = rem_len;
382 proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len,
383 fd_size, fd_start, fd_end);
384
385 if (buf_idx[proc] == -1)
386 {
387 ADIOI_Assert(curr_idx == (int) curr_idx);
388 buf_idx[proc] = (int) curr_idx;
389 }
390
391 l = my_req[proc].count;
392 curr_idx += fd_len;
393 rem_len -= fd_len;
394
395 my_req[proc].offsets[l] = off;
396 my_req[proc].lens[l] = fd_len;
397 my_req[proc].count++;
398 }
399 }
400
401 #ifdef AGG_DEBUG
402 for (i=0; i<nprocs; i++) {
403 if (count_my_req_per_proc[i] > 0) {
404 FPRINTF(stdout, "data needed from %d (count = %d):\n", i,
405 my_req[i].count);
406 for (l=0; l < my_req[i].count; l++) {
407 FPRINTF(stdout, " off[%d] = %lld, len[%d] = %d\n", l,
408 my_req[i].offsets[l], l, my_req[i].lens[l]);
409 }
410 FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
411 }
412 }
413 #endif
414
415 *count_my_req_procs_ptr = count_my_req_procs;
416 *buf_idx_ptr = buf_idx;
417 #ifdef AGGREGATION_PROFILE
418 MPE_Log_event (5025, 0, NULL);
419 #endif
420 }
421
422
423
424 void ADIOI_Calc_others_req(ADIO_File fd, int count_my_req_procs,
425 int *count_my_req_per_proc,
426 ADIOI_Access *my_req,
427 int nprocs, int myrank,
428 int *count_others_req_procs_ptr,
429 ADIOI_Access **others_req_ptr)
430 {
431
432
433
434
435
436
437
438
439 int *count_others_req_per_proc, count_others_req_procs;
440 int i, j;
441 MPI_Request *requests;
442 MPI_Status *statuses;
443 ADIOI_Access *others_req;
444
445
446 #ifdef AGGREGATION_PROFILE
447 MPE_Log_event (5026, 0, NULL);
448 #endif
449 count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
450
451 MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
452 count_others_req_per_proc, 1, MPI_INT, fd->comm);
453
454 *others_req_ptr = (ADIOI_Access *)
455 ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
456 others_req = *others_req_ptr;
457
458 count_others_req_procs = 0;
459 for (i=0; i<nprocs; i++) {
460 if (count_others_req_per_proc[i]) {
461 others_req[i].count = count_others_req_per_proc[i];
462 others_req[i].offsets = (ADIO_Offset *)
463 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
464 others_req[i].lens =
465 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
466 others_req[i].mem_ptrs = (MPI_Aint *)
467 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint));
468 count_others_req_procs++;
469 }
470 else others_req[i].count = 0;
471 }
472
473
474
475 requests = (MPI_Request *)
476 ADIOI_Malloc(1+2*(count_my_req_procs+count_others_req_procs)*sizeof(MPI_Request));
477
478
479 j = 0;
480 for (i=0; i<nprocs; i++) {
481 if (others_req[i].count) {
482 MPI_Irecv(others_req[i].offsets, others_req[i].count,
483 ADIO_OFFSET, i, i+myrank, fd->comm, &requests[j]);
484 j++;
485 MPI_Irecv(others_req[i].lens, others_req[i].count,
486 ADIO_OFFSET, i, i+myrank+1, fd->comm, &requests[j]);
487 j++;
488 }
489 }
490
491 for (i=0; i < nprocs; i++) {
492 if (my_req[i].count) {
493 MPI_Isend(my_req[i].offsets, my_req[i].count,
494 ADIO_OFFSET, i, i+myrank, fd->comm, &requests[j]);
495 j++;
496 MPI_Isend(my_req[i].lens, my_req[i].count,
497 ADIO_OFFSET, i, i+myrank+1, fd->comm, &requests[j]);
498 j++;
499 }
500 }
501
502 if (j) {
503 statuses = (MPI_Status *) ADIOI_Malloc(j * sizeof(MPI_Status));
504 MPI_Waitall(j, requests, statuses);
505 ADIOI_Free(statuses);
506 }
507
508 ADIOI_Free(requests);
509 ADIOI_Free(count_others_req_per_proc);
510
511 *count_others_req_procs_ptr = count_others_req_procs;
512 #ifdef AGGREGATION_PROFILE
513 MPE_Log_event (5027, 0, NULL);
514 #endif
515 }
516
517
518
519
520
521 void ADIOI_Icalc_others_req(ADIOI_NBC_Request *nbc_req, int *error_code)
522 {
523 ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
524
525
526
527
528
529 #ifdef AGGREGATION_PROFILE
530 MPE_Log_event(5026, 0, NULL);
531 #endif
532 vars->count_others_req_per_proc =
533 (int *)ADIOI_Malloc(vars->nprocs * sizeof(int));
534
535 *error_code = MPI_Ialltoall(vars->count_my_req_per_proc, 1, MPI_INT,
536 vars->count_others_req_per_proc, 1, MPI_INT, vars->fd->comm,
537 &vars->req1);
538
539 if (nbc_req->rdwr == ADIOI_READ) {
540 nbc_req->data.rd.state = ADIOI_IRC_STATE_ICALC_OTHERS_REQ;
541 } else {
542 ADIOI_Assert(nbc_req->rdwr == ADIOI_WRITE);
543 nbc_req->data.wr.state = ADIOI_IWC_STATE_ICALC_OTHERS_REQ;
544 }
545 }
546
547 void ADIOI_Icalc_others_req_main(ADIOI_NBC_Request *nbc_req, int *error_code)
548 {
549 ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
550 ADIO_File fd = vars->fd;
551 int count_my_req_procs = vars->count_my_req_procs;
552 ADIOI_Access *my_req = vars->my_req;
553 int nprocs = vars->nprocs;
554 int myrank = vars->myrank;
555 ADIOI_Access **others_req_ptr = vars->others_req_ptr;
556
557
558
559
560
561
562
563
564
565 int *count_others_req_per_proc = vars->count_others_req_per_proc;
566 int count_others_req_procs;
567 int i, j;
568 ADIOI_Access *others_req;
569
570 *others_req_ptr = (ADIOI_Access *)ADIOI_Malloc(nprocs*sizeof(ADIOI_Access));
571 others_req = *others_req_ptr;
572
573 count_others_req_procs = 0;
574 for (i = 0; i < nprocs; i++) {
575 if (count_others_req_per_proc[i]) {
576 others_req[i].count = count_others_req_per_proc[i];
577 others_req[i].offsets = (ADIO_Offset *)
578 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
579 others_req[i].lens =
580 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
581 others_req[i].mem_ptrs = (MPI_Aint *)
582 ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint));
583 count_others_req_procs++;
584 }
585 else others_req[i].count = 0;
586 }
587 vars->count_others_req_procs = count_others_req_procs;
588
589
590
591 vars->req2 = (MPI_Request *)
592 ADIOI_Malloc(1+2*(count_my_req_procs+count_others_req_procs)
593 *sizeof(MPI_Request));
594
595
596 j = 0;
597 for (i = 0; i < nprocs; i++) {
598 if (others_req[i].count) {
599 MPI_Irecv(others_req[i].offsets, others_req[i].count,
600 ADIO_OFFSET, i, i+myrank, fd->comm, &vars->req2[j]);
601 j++;
602 MPI_Irecv(others_req[i].lens, others_req[i].count,
603 ADIO_OFFSET, i, i+myrank+1, fd->comm, &vars->req2[j]);
604 j++;
605 }
606 }
607
608 for (i=0; i < nprocs; i++) {
609 if (my_req[i].count) {
610 MPI_Isend(my_req[i].offsets, my_req[i].count,
611 ADIO_OFFSET, i, i+myrank, fd->comm, &vars->req2[j]);
612 j++;
613 MPI_Isend(my_req[i].lens, my_req[i].count,
614 ADIO_OFFSET, i, i+myrank+1, fd->comm, &vars->req2[j]);
615 j++;
616 }
617 }
618
619
620 vars->num_req2 = j;
621
622 if (nbc_req->rdwr == ADIOI_READ) {
623 nbc_req->data.rd.state = ADIOI_IRC_STATE_ICALC_OTHERS_REQ_MAIN;
624 } else {
625 ADIOI_Assert(nbc_req->rdwr == ADIOI_WRITE);
626 nbc_req->data.wr.state = ADIOI_IWC_STATE_ICALC_OTHERS_REQ_MAIN;
627 }
628 }
629
630 void ADIOI_Icalc_others_req_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
631 {
632 ADIOI_Icalc_others_req_vars *vars = nbc_req->cor_vars;
633 void (*next_fn)(ADIOI_NBC_Request *, int *);
634
635 ADIOI_Free(vars->req2);
636 ADIOI_Free(vars->count_others_req_per_proc);
637
638 *vars->count_others_req_procs_ptr = vars->count_others_req_procs;
639 #ifdef AGGREGATION_PROFILE
640 MPE_Log_event(5027, 0, NULL);
641 #endif
642
643
644 next_fn = vars->next_fn;
645
646
647 ADIOI_Free(vars);
648 nbc_req->cor_vars = NULL;
649
650
651 next_fn(nbc_req, error_code);
652 }
653