This source file includes following definitions.
- ADIOI_GEN_ReadStridedColl
- ADIOI_Calc_my_off_len
- ADIOI_Read_and_exch
- ADIOI_R_Exchange_data
- ADIOI_Fill_user_buffer
1
2
3
4
5
6
7
8 #include "adio.h"
9 #include "adio_extern.h"
10
11 #ifdef USE_DBG_LOGGING
12 #define RDCOLL_DEBUG 1
13 #endif
14 #ifdef AGGREGATION_PROFILE
15 #include "mpe.h"
16 #endif
17
18
19 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
20 datatype, int nprocs,
21 int myrank, ADIOI_Access
22 *others_req, ADIO_Offset *offset_list,
23 ADIO_Offset *len_list, int contig_access_count,
24 ADIO_Offset
25 min_st_offset, ADIO_Offset fd_size,
26 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
27 int *buf_idx, int *error_code);
28 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
29 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
30 *len_list, int *send_size, int *recv_size,
31 int *count, int *start_pos,
32 int *partial_send,
33 int *recd_from_proc, int nprocs,
34 int myrank, int
35 buftype_is_contig, int contig_access_count,
36 ADIO_Offset min_st_offset,
37 ADIO_Offset fd_size,
38 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
39 ADIOI_Access *others_req,
40 int iter,
41 MPI_Aint buftype_extent, int *buf_idx);
42 void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
43 *flat_buf, char **recv_buf, ADIO_Offset
44 *offset_list, ADIO_Offset *len_list,
45 unsigned *recv_size,
46 MPI_Request *requests, MPI_Status *statuses,
47 int *recd_from_proc, int nprocs,
48 int contig_access_count,
49 ADIO_Offset min_st_offset,
50 ADIO_Offset fd_size, ADIO_Offset *fd_start,
51 ADIO_Offset *fd_end,
52 MPI_Aint buftype_extent);
53
54
55 void ADIOI_GEN_ReadStridedColl(ADIO_File fd, void *buf, int count,
56 MPI_Datatype datatype, int file_ptr_type,
57 ADIO_Offset offset, ADIO_Status *status, int
58 *error_code)
59 {
60
61
62
63
64
65
66 ADIOI_Access *my_req;
67
68
69
70 ADIOI_Access *others_req;
71
72
73
74 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
75 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
76 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
77 ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
78 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
79 *fd_end = NULL, *end_offsets = NULL;
80 ADIO_Offset *len_list = NULL;
81 int *buf_idx = NULL;
82
83 #ifdef HAVE_STATUS_SET_BYTES
84 MPI_Count bufsize, size;
85 #endif
86
87 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
88 ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype,
89 file_ptr_type, offset, status, error_code);
90 return;
91 }
92
93
94 MPI_Comm_size(fd->comm, &nprocs);
95 MPI_Comm_rank(fd->comm, &myrank);
96
97
98 nprocs_for_coll = fd->hints->cb_nodes;
99 orig_fp = fd->fp_ind;
100
101
102 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
103
104
105
106
107
108
109 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
110 &offset_list, &len_list, &start_offset,
111 &end_offset, &contig_access_count);
112
113 #ifdef RDCOLL_DEBUG
114 for (i=0; i<contig_access_count; i++) {
115 DBG_FPRINTF(stderr, "rank %d off %lld len %lld\n",
116 myrank, offset_list[i], len_list[i]);
117 }
118 #endif
119
120
121
122
123
124 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
125 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
126
127 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
128 ADIO_OFFSET, fd->comm);
129 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
130 ADIO_OFFSET, fd->comm);
131
132
133 for (i=1; i<nprocs; i++)
134 if ((st_offsets[i] < end_offsets[i-1]) &&
135 (st_offsets[i] <= end_offsets[i]))
136 interleave_count++;
137
138
139 }
140
141 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
142
143 if (fd->hints->cb_read == ADIOI_HINT_DISABLE
144 || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO)))
145 {
146
147 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
148 ADIOI_Free(offset_list);
149 ADIOI_Free(len_list);
150 ADIOI_Free(st_offsets);
151 ADIOI_Free(end_offsets);
152 }
153
154 fd->fp_ind = orig_fp;
155 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
156
157 if (buftype_is_contig && filetype_is_contig) {
158 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
159 off = fd->disp + (fd->etype_size) * offset;
160 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
161 off, status, error_code);
162 }
163 else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
164 0, status, error_code);
165 }
166 else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
167 offset, status, error_code);
168
169 return;
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
189 nprocs_for_coll, &min_st_offset,
190 &fd_start, &fd_end,
191 fd->hints->min_fdomain_size, &fd_size,
192 fd->hints->striping_unit);
193
194
195
196
197
198
199
200
201
202
203
204
205
206 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
207 min_st_offset, fd_start, fd_end, fd_size,
208 nprocs, &count_my_req_procs,
209 &count_my_req_per_proc, &my_req,
210 &buf_idx);
211
212
213
214
215
216
217
218
219 ADIOI_Calc_others_req(fd, count_my_req_procs,
220 count_my_req_per_proc, my_req,
221 nprocs, myrank, &count_others_req_procs,
222 &others_req);
223
224
225
226
227 ADIOI_Free(count_my_req_per_proc);
228 for (i=0; i<nprocs; i++) {
229 if (my_req[i].count) {
230 ADIOI_Free(my_req[i].offsets);
231 ADIOI_Free(my_req[i].lens);
232 }
233 }
234 ADIOI_Free(my_req);
235
236
237
238
239
240 ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
241 others_req, offset_list,
242 len_list, contig_access_count, min_st_offset,
243 fd_size, fd_start, fd_end, buf_idx, error_code);
244
245 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
246
247
248 for (i=0; i<nprocs; i++) {
249 if (others_req[i].count) {
250 ADIOI_Free(others_req[i].offsets);
251 ADIOI_Free(others_req[i].lens);
252 ADIOI_Free(others_req[i].mem_ptrs);
253 }
254 }
255 ADIOI_Free(others_req);
256
257 ADIOI_Free(buf_idx);
258 ADIOI_Free(offset_list);
259 ADIOI_Free(len_list);
260 ADIOI_Free(st_offsets);
261 ADIOI_Free(end_offsets);
262 ADIOI_Free(fd_start);
263 ADIOI_Free(fd_end);
264
265 #ifdef HAVE_STATUS_SET_BYTES
266 MPI_Type_size_x(datatype, &size);
267 bufsize = size * count;
268 MPIR_Status_set_bytes(status, datatype, bufsize);
269
270
271
272 #endif
273
274 fd->fp_sys_posn = -1;
275 }
276
277 void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
278 datatype, int file_ptr_type, ADIO_Offset
279 offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
280 **len_list_ptr, ADIO_Offset *start_offset_ptr,
281 ADIO_Offset *end_offset_ptr, int
282 *contig_access_count_ptr)
283 {
284 MPI_Count filetype_size, etype_size;
285 MPI_Count buftype_size;
286 int i, j, k;
287 ADIO_Offset i_offset;
288 ADIO_Offset frd_size=0, old_frd_size=0;
289 int st_index=0;
290 ADIO_Offset n_filetypes, etype_in_filetype;
291 ADIO_Offset abs_off_in_filetype=0;
292 ADIO_Offset bufsize;
293 ADIO_Offset sum, n_etypes_in_filetype, size_in_filetype;
294 int contig_access_count, filetype_is_contig;
295 ADIO_Offset *len_list;
296 MPI_Aint filetype_extent, filetype_lb;
297 ADIOI_Flatlist_node *flat_file;
298 ADIO_Offset *offset_list, off, end_offset=0, disp;
299
300 #ifdef AGGREGATION_PROFILE
301 MPE_Log_event (5028, 0, NULL);
302 #endif
303
304
305
306
307 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
308
309 MPI_Type_size_x(fd->filetype, &filetype_size);
310 MPI_Type_get_extent(fd->filetype, &filetype_lb, &filetype_extent);
311 MPI_Type_size_x(datatype, &buftype_size);
312 etype_size = fd->etype_size;
313
314 if ( ! filetype_size ) {
315 *contig_access_count_ptr = 0;
316 *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
317 *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
318
319
320 offset_list = *offset_list_ptr;
321 len_list = *len_list_ptr;
322 offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
323 fd->disp + (ADIO_Offset)etype_size * offset;
324 len_list[0] = 0;
325 *start_offset_ptr = offset_list[0];
326 *end_offset_ptr = offset_list[0] + len_list[0] - 1;
327
328 return;
329 }
330
331 if (filetype_is_contig) {
332 *contig_access_count_ptr = 1;
333 *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
334 *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
335
336
337 offset_list = *offset_list_ptr;
338 len_list = *len_list_ptr;
339 offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
340 fd->disp + (ADIO_Offset)etype_size * offset;
341 len_list[0] = (ADIO_Offset)bufcount * (ADIO_Offset)buftype_size;
342 *start_offset_ptr = offset_list[0];
343 *end_offset_ptr = offset_list[0] + len_list[0] - 1;
344
345
346 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = *end_offset_ptr + 1;
347 }
348
349 else {
350
351
352
353
354 flat_file = ADIOI_Flatlist;
355 while (flat_file->type != fd->filetype) flat_file = flat_file->next;
356 disp = fd->disp;
357
358 #ifdef RDCOLL_DEBUG
359 {
360 int ii;
361 DBG_FPRINTF(stderr, "flattened %3lld : ", flat_file->count );
362 for (ii=0; ii<flat_file->count; ii++) {
363 DBG_FPRINTF(stderr, "%16lld:%-16lld", flat_file->indices[ii], flat_file->blocklens[ii] );
364 }
365 DBG_FPRINTF(stderr, "\n" );
366 }
367 #endif
368 if (file_ptr_type == ADIO_INDIVIDUAL) {
369
370 offset = fd->fp_ind - disp;
371 n_filetypes = (offset - flat_file->indices[0]) / filetype_extent;
372 offset -= (ADIO_Offset)n_filetypes * filetype_extent;
373
374
375
376 for (i=0; i<flat_file->count; i++) {
377 ADIO_Offset dist;
378 if (flat_file->blocklens[i] == 0) continue;
379 dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
380
381 if (dist == 0) {
382 i++;
383 offset = flat_file->indices[i];
384 frd_size = flat_file->blocklens[i];
385 break;
386 }
387 if (dist > 0) {
388 frd_size = dist;
389 break;
390 }
391 }
392 st_index = i;
393 offset += disp + (ADIO_Offset)n_filetypes*filetype_extent;
394 }
395 else {
396 n_etypes_in_filetype = filetype_size/etype_size;
397 n_filetypes = offset / n_etypes_in_filetype;
398 etype_in_filetype = offset % n_etypes_in_filetype;
399 size_in_filetype = etype_in_filetype * etype_size;
400
401 sum = 0;
402 for (i=0; i<flat_file->count; i++) {
403 sum += flat_file->blocklens[i];
404 if (sum > size_in_filetype) {
405 st_index = i;
406 frd_size = sum - size_in_filetype;
407 abs_off_in_filetype = flat_file->indices[i] +
408 size_in_filetype - (sum - flat_file->blocklens[i]);
409 break;
410 }
411 }
412
413
414 offset = disp + n_filetypes* (ADIO_Offset)filetype_extent +
415 abs_off_in_filetype;
416 }
417
418
419
420 old_frd_size = frd_size;
421 contig_access_count = i_offset = 0;
422 j = st_index;
423 bufsize = (ADIO_Offset)buftype_size * (ADIO_Offset)bufcount;
424 frd_size = ADIOI_MIN(frd_size, bufsize);
425 while (i_offset < bufsize) {
426 if (frd_size) contig_access_count++;
427 i_offset += frd_size;
428 j = (j + 1) % flat_file->count;
429 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
430 }
431
432
433
434 *offset_list_ptr = (ADIO_Offset *)
435 ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));
436 *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));
437
438
439 offset_list = *offset_list_ptr;
440 len_list = *len_list_ptr;
441
442
443
444 *start_offset_ptr = offset;
445
446 i_offset = k = 0;
447 j = st_index;
448 off = offset;
449 frd_size = ADIOI_MIN(old_frd_size, bufsize);
450 while (i_offset < bufsize) {
451 if (frd_size) {
452 offset_list[k] = off;
453 len_list[k] = frd_size;
454 k++;
455 }
456 i_offset += frd_size;
457 end_offset = off + frd_size - 1;
458
459
460
461
462 if (off + frd_size < disp + flat_file->indices[j] +
463 flat_file->blocklens[j] +
464 n_filetypes* (ADIO_Offset)filetype_extent)
465 {
466 off += frd_size;
467
468
469
470 }
471 else {
472 j = (j+1) % flat_file->count;
473 n_filetypes += (j == 0) ? 1 : 0;
474 while (flat_file->blocklens[j]==0) {
475 j = (j+1) % flat_file->count;
476 n_filetypes += (j == 0) ? 1 : 0;
477
478
479 }
480 off = disp + flat_file->indices[j] +
481 n_filetypes* (ADIO_Offset)filetype_extent;
482 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
483 }
484 }
485
486
487 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
488
489 *contig_access_count_ptr = contig_access_count;
490 *end_offset_ptr = end_offset;
491 }
492 #ifdef AGGREGATION_PROFILE
493 MPE_Log_event (5029, 0, NULL);
494 #endif
495 }
496
497 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
498 datatype, int nprocs,
499 int myrank, ADIOI_Access
500 *others_req, ADIO_Offset *offset_list,
501 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
502 min_st_offset, ADIO_Offset fd_size,
503 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
504 int *buf_idx, int *error_code)
505 {
506
507
508
509
510
511
512
513
514
515
516 int i, j, m, ntimes, max_ntimes, buftype_is_contig;
517 ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
518 char *read_buf = NULL, *tmp_buf;
519 int *curr_offlen_ptr, *count, *send_size, *recv_size;
520 int *partial_send, *recd_from_proc, *start_pos;
521
522 ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
523 int req_len, flag, rank;
524 MPI_Status status;
525 ADIOI_Flatlist_node *flat_buf=NULL;
526 MPI_Aint buftype_extent, lb;
527 int coll_bufsize;
528
529 *error_code = MPI_SUCCESS;
530
531
532
533
534
535
536
537 coll_bufsize = fd->hints->cb_buffer_size;
538
539
540 for (i=0; i < nprocs; i++) {
541 if (others_req[i].count) {
542 st_loc = others_req[i].offsets[0];
543 end_loc = others_req[i].offsets[0];
544 break;
545 }
546 }
547
548
549 for (i=0; i < nprocs; i++)
550 for (j=0; j<others_req[i].count; j++) {
551 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
552 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
553 + others_req[i].lens[j] - 1));
554 }
555
556
557
558
559
560
561 if ((st_loc==-1) && (end_loc==-1)) {
562
563 ntimes = 0;
564 }
565 else {
566
567 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
568 }
569
570 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);
571
572 read_buf = fd->io_buf;
573
574 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
575
576
577 count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
578
579
580
581 partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
582
583
584
585
586 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
587
588
589 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
590
591
592
593 recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
594
595
596
597 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
598
599
600
601 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
602 if (!buftype_is_contig) {
603 flat_buf = ADIOI_Flatten_and_find(datatype);
604 }
605 MPI_Type_get_extent(datatype, &lb, &buftype_extent);
606
607 done = 0;
608 off = st_loc;
609 for_curr_iter = for_next_iter = 0;
610
611 MPI_Comm_rank(fd->comm, &rank);
612
613 for (m=0; m<ntimes; m++) {
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
652 real_off = off - for_curr_iter;
653 real_size = size + for_curr_iter;
654
655 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
656 for_next_iter = 0;
657
658 for (i=0; i<nprocs; i++) {
659 #ifdef RDCOLL_DEBUG
660 DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count);
661 #endif
662 if (others_req[i].count) {
663 start_pos[i] = curr_offlen_ptr[i];
664 for (j=curr_offlen_ptr[i]; j<others_req[i].count;
665 j++) {
666 if (partial_send[i]) {
667
668
669 req_off = others_req[i].offsets[j] +
670 partial_send[i];
671 req_len = others_req[i].lens[j] -
672 partial_send[i];
673 partial_send[i] = 0;
674
675 others_req[i].offsets[j] = req_off;
676 others_req[i].lens[j] = req_len;
677 }
678 else {
679 req_off = others_req[i].offsets[j];
680 req_len = others_req[i].lens[j];
681 }
682 if (req_off < real_off + real_size) {
683 count[i]++;
684 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIU_Upint)(read_buf+req_off-real_off));
685 MPI_Get_address(read_buf+req_off-real_off,
686 &(others_req[i].mem_ptrs[j]));
687 ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
688 send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off,
689 (ADIO_Offset)(unsigned)req_len));
690
691 if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
692 partial_send[i] = (int) (real_off + real_size - req_off);
693 if ((j+1 < others_req[i].count) &&
694 (others_req[i].offsets[j+1] <
695 real_off+real_size)) {
696
697
698 for_next_iter = ADIOI_MAX(for_next_iter,
699 real_off + real_size - others_req[i].offsets[j+1]);
700
701
702 }
703 break;
704 }
705 }
706 else break;
707 }
708 curr_offlen_ptr[i] = j;
709 }
710 }
711
712 flag = 0;
713 for (i=0; i<nprocs; i++)
714 if (count[i]) flag = 1;
715
716 if (flag) {
717 ADIOI_Assert(size == (int)size);
718 ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
719 ADIO_EXPLICIT_OFFSET, off, &status, error_code);
720 if (*error_code != MPI_SUCCESS) return;
721 }
722
723 for_curr_iter = for_next_iter;
724
725 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
726 send_size, recv_size, count,
727 start_pos, partial_send, recd_from_proc, nprocs,
728 myrank,
729 buftype_is_contig, contig_access_count,
730 min_st_offset, fd_size, fd_start, fd_end,
731 others_req,
732 m, buftype_extent, buf_idx);
733
734
735 if (for_next_iter) {
736 tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
737 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIU_Upint)(read_buf+real_size-for_next_iter));
738 ADIOI_Assert((ADIO_Size) (for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
739 memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
740 ADIOI_Free(fd->io_buf);
741 fd->io_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
742 memcpy(fd->io_buf, tmp_buf, for_next_iter);
743 read_buf = fd->io_buf;
744 ADIOI_Free(tmp_buf);
745 }
746
747 off += size;
748 done += size;
749 }
750
751 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
752 for (m=ntimes; m<max_ntimes; m++)
753
754 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
755 send_size, recv_size, count,
756 start_pos, partial_send, recd_from_proc, nprocs,
757 myrank,
758 buftype_is_contig, contig_access_count,
759 min_st_offset, fd_size, fd_start, fd_end,
760 others_req, m,
761 buftype_extent, buf_idx);
762
763 ADIOI_Free(curr_offlen_ptr);
764 ADIOI_Free(count);
765 ADIOI_Free(partial_send);
766 ADIOI_Free(send_size);
767 ADIOI_Free(recv_size);
768 ADIOI_Free(recd_from_proc);
769 ADIOI_Free(start_pos);
770 }
771
772 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
773 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
774 *len_list, int *send_size, int *recv_size,
775 int *count, int *start_pos, int *partial_send,
776 int *recd_from_proc, int nprocs,
777 int myrank, int
778 buftype_is_contig, int contig_access_count,
779 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
780 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
781 ADIOI_Access *others_req,
782 int iter, MPI_Aint buftype_extent, int *buf_idx)
783 {
784 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
785 char **recv_buf = NULL;
786 MPI_Request *requests;
787 MPI_Datatype send_type;
788 MPI_Status *statuses;
789
790
791
792
793 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
794
795 nprocs_recv = 0;
796 for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
797
798 nprocs_send = 0;
799 for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
800
801 requests = (MPI_Request *)
802 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
803
804
805
806
807
808 #ifdef AGGREGATION_PROFILE
809 MPE_Log_event (5032, 0, NULL);
810 #endif
811
812 if (buftype_is_contig) {
813 j = 0;
814 for (i=0; i < nprocs; i++)
815 if (recv_size[i]) {
816 MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i],
817 MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
818 j++;
819 buf_idx[i] += recv_size[i];
820 }
821 }
822 else {
823
824 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
825 for (i=0; i < nprocs; i++)
826 if (recv_size[i]) recv_buf[i] =
827 (char *) ADIOI_Malloc(recv_size[i]);
828
829 j = 0;
830 for (i=0; i < nprocs; i++)
831 if (recv_size[i]) {
832 MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i,
833 myrank+i+100*iter, fd->comm, requests+j);
834 j++;
835 #ifdef RDCOLL_DEBUG
836 DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
837 myrank, recv_size[i], myrank+i+100*iter);
838 #endif
839 }
840 }
841
842
843
844 j = 0;
845 for (i=0; i<nprocs; i++) {
846 if (send_size[i]) {
847
848 if (partial_send[i]) {
849 k = start_pos[i] + count[i] - 1;
850 tmp = others_req[i].lens[k];
851 others_req[i].lens[k] = partial_send[i];
852 }
853 ADIOI_Type_create_hindexed_x(count[i],
854 &(others_req[i].lens[start_pos[i]]),
855 &(others_req[i].mem_ptrs[start_pos[i]]),
856 MPI_BYTE, &send_type);
857
858 MPI_Type_commit(&send_type);
859 MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
860 fd->comm, requests+nprocs_recv+j);
861 MPI_Type_free(&send_type);
862 if (partial_send[i]) others_req[i].lens[k] = tmp;
863 j++;
864 }
865 }
866
867 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
868 sizeof(MPI_Status));
869
870
871
872 if (nprocs_recv) {
873 #ifdef NEEDS_MPI_TEST
874 j = 0;
875 while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
876 #else
877 MPI_Waitall(nprocs_recv, requests, statuses);
878 #endif
879
880
881 if (!buftype_is_contig)
882 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
883 offset_list, len_list, (unsigned*)recv_size,
884 requests, statuses, recd_from_proc,
885 nprocs, contig_access_count,
886 min_st_offset, fd_size, fd_start, fd_end,
887 buftype_extent);
888 }
889
890
891 MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
892
893 ADIOI_Free(statuses);
894 ADIOI_Free(requests);
895
896 if (!buftype_is_contig) {
897 for (i=0; i < nprocs; i++)
898 if (recv_size[i]) ADIOI_Free(recv_buf[i]);
899 ADIOI_Free(recv_buf);
900 }
901 #ifdef AGGREGATION_PROFILE
902 MPE_Log_event (5033, 0, NULL);
903 #endif
904 }
905
906 #define ADIOI_BUF_INCR \
907 { \
908 while (buf_incr) { \
909 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
910 user_buf_idx += size_in_buf; \
911 flat_buf_sz -= size_in_buf; \
912 if (!flat_buf_sz) { \
913 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
914 else { \
915 flat_buf_idx = 0; \
916 n_buftypes++; \
917 } \
918 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
919 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
920 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
921 } \
922 buf_incr -= size_in_buf; \
923 } \
924 }
925
926
927 #define ADIOI_BUF_COPY \
928 { \
929 while (size) { \
930 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
931 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)((MPIU_Upint)buf + user_buf_idx)); \
932 ADIOI_Assert((ADIO_Size) size_in_buf == (size_t)size_in_buf); \
933 memcpy(((char *) buf) + user_buf_idx, \
934 &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
935 recv_buf_idx[p] += size_in_buf; \
936 user_buf_idx += size_in_buf; \
937 flat_buf_sz -= size_in_buf; \
938 if (!flat_buf_sz) { \
939 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
940 else { \
941 flat_buf_idx = 0; \
942 n_buftypes++; \
943 } \
944 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
945 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
946 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
947 } \
948 size -= size_in_buf; \
949 buf_incr -= size_in_buf; \
950 } \
951 ADIOI_BUF_INCR \
952 }
953
954 void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
955 *flat_buf, char **recv_buf, ADIO_Offset
956 *offset_list, ADIO_Offset *len_list,
957 unsigned *recv_size,
958 MPI_Request *requests, MPI_Status *statuses,
959 int *recd_from_proc, int nprocs,
960 int contig_access_count,
961 ADIO_Offset min_st_offset,
962 ADIO_Offset fd_size, ADIO_Offset *fd_start,
963 ADIO_Offset *fd_end,
964 MPI_Aint buftype_extent)
965 {
966
967
968
969 int i, p, flat_buf_idx;
970 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
971 int n_buftypes;
972 ADIO_Offset off, len, rem_len, user_buf_idx;
973
974 unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;
975
976 ADIOI_UNREFERENCED_ARG(requests);
977 ADIOI_UNREFERENCED_ARG(statuses);
978
979
980
981
982
983
984
985 curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
986 done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
987 recv_buf_idx = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
988
989 for (i=0; i < nprocs; i++) {
990 recv_buf_idx[i] = curr_from_proc[i] = 0;
991 done_from_proc[i] = recd_from_proc[i];
992 }
993
994 user_buf_idx = flat_buf->indices[0];
995 flat_buf_idx = 0;
996 n_buftypes = 0;
997 flat_buf_sz = flat_buf->blocklens[0];
998
999
1000
1001
1002
1003 for (i=0; i<contig_access_count; i++) {
1004 off = offset_list[i];
1005 rem_len = len_list[i];
1006
1007
1008 while (rem_len != 0) {
1009 len = rem_len;
1010
1011
1012
1013
1014 p = ADIOI_Calc_aggregator(fd,
1015 off,
1016 min_st_offset,
1017 &len,
1018 fd_size,
1019 fd_start,
1020 fd_end);
1021
1022 if (recv_buf_idx[p] < recv_size[p]) {
1023 if (curr_from_proc[p]+len > done_from_proc[p]) {
1024 if (done_from_proc[p] > curr_from_proc[p]) {
1025 size = ADIOI_MIN(curr_from_proc[p] + len -
1026 done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
1027 buf_incr = done_from_proc[p] - curr_from_proc[p];
1028 ADIOI_BUF_INCR
1029 buf_incr = curr_from_proc[p]+len-done_from_proc[p];
1030 ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
1031 curr_from_proc[p] = done_from_proc[p] + size;
1032 ADIOI_BUF_COPY
1033 }
1034 else {
1035 size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
1036 buf_incr = len;
1037 ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
1038 curr_from_proc[p] += (unsigned) size;
1039 ADIOI_BUF_COPY
1040 }
1041 }
1042 else {
1043 ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
1044 curr_from_proc[p] += (unsigned) len;
1045 buf_incr = len;
1046 ADIOI_BUF_INCR
1047 }
1048 }
1049 else {
1050 buf_incr = len;
1051 ADIOI_BUF_INCR
1052 }
1053 off += len;
1054 rem_len -= len;
1055 }
1056 }
1057 for (i=0; i < nprocs; i++)
1058 if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
1059
1060 ADIOI_Free(curr_from_proc);
1061 ADIOI_Free(done_from_proc);
1062 ADIOI_Free(recv_buf_idx);
1063 }