This source file includes following definitions.
- ADIOI_GPFS_ReadStridedColl
- ADIOI_Read_and_exch
- ADIOI_R_Exchange_data
- ADIOI_Fill_user_buffer
- ADIOI_R_Exchange_data_alltoallv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include "adio.h"
17 #include "adio_extern.h"
18 #include "ad_gpfs.h"
19 #include "ad_gpfs_aggrs.h"
20
21 #ifdef PROFILE
22 #include "mpe.h"
23 #endif
24
25 #ifdef USE_DBG_LOGGING
26 #define RDCOLL_DEBUG 1
27 #endif
28 #ifdef AGGREGATION_PROFILE
29 #include "mpe.h"
30 #endif
31
32
33 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
34 datatype, int nprocs,
35 int myrank, ADIOI_Access
36 *others_req, ADIO_Offset *offset_list,
37 ADIO_Offset *len_list, int contig_access_count,
38 ADIO_Offset
39 min_st_offset, ADIO_Offset fd_size,
40 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
41 int *buf_idx, int *error_code);
42 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
43 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
44 *len_list, int *send_size, int *recv_size,
45 int *count, int *start_pos,
46 int *partial_send,
47 int *recd_from_proc, int nprocs,
48 int myrank, int
49 buftype_is_contig, int contig_access_count,
50 ADIO_Offset min_st_offset,
51 ADIO_Offset fd_size,
52 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
53 ADIOI_Access *others_req,
54 int iter,
55 MPI_Aint buftype_extent, int *buf_idx);
56 static void ADIOI_R_Exchange_data_alltoallv(ADIO_File fd, void *buf, ADIOI_Flatlist_node
57 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
58 *len_list, int *send_size, int *recv_size,
59 int *count, int *start_pos,
60 int *partial_send,
61 int *recd_from_proc, int nprocs,
62 int myrank, int
63 buftype_is_contig, int contig_access_count,
64 ADIO_Offset min_st_offset,
65 ADIO_Offset fd_size,
66 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
67 ADIOI_Access *others_req,
68 int iter,
69 MPI_Aint buftype_extent, int *buf_idx);
70 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
71 *flat_buf, char **recv_buf, ADIO_Offset
72 *offset_list, ADIO_Offset *len_list,
73 unsigned *recv_size,
74 MPI_Request *requests, MPI_Status *statuses,
75 int *recd_from_proc, int nprocs,
76 int contig_access_count,
77 ADIO_Offset min_st_offset,
78 ADIO_Offset fd_size, ADIO_Offset *fd_start,
79 ADIO_Offset *fd_end,
80 MPI_Aint buftype_extent);
81
82 extern void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
83 datatype, int file_ptr_type, ADIO_Offset
84 offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
85 **len_list_ptr, ADIO_Offset *start_offset_ptr,
86 ADIO_Offset *end_offset_ptr, int
87 *contig_access_count_ptr);
88
89
90
91 void ADIOI_GPFS_ReadStridedColl(ADIO_File fd, void *buf, int count,
92 MPI_Datatype datatype, int file_ptr_type,
93 ADIO_Offset offset, ADIO_Status *status, int
94 *error_code)
95 {
96
97
98
99
100
101
102 ADIOI_Access *my_req;
103
104
105
106 ADIOI_Access *others_req;
107
108
109
110 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
111 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
112 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
113 ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
114 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
115 *fd_end = NULL, *end_offsets = NULL;
116 ADIO_Offset *gpfs_offsets0 = NULL, *gpfs_offsets = NULL;
117 ADIO_Offset *count_sizes;
118 int ii;
119 ADIO_Offset *len_list = NULL;
120 int *buf_idx = NULL;
121
122 GPFSMPIO_T_CIO_RESET( r);
123
124 #ifdef HAVE_STATUS_SET_BYTES
125 MPI_Count bufsize, size;
126 #endif
127
128 #if 0
129
130 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
131 ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype,
132 file_ptr_type, offset, status, error_code);
133 return;
134 } */
135 #endif
136 #ifdef PROFILE
137 MPE_Log_event(13, 0, "start computation");
138 #endif
139
140 MPI_Comm_size(fd->comm, &nprocs);
141 MPI_Comm_rank(fd->comm, &myrank);
142
143
144 nprocs_for_coll = fd->hints->cb_nodes;
145 orig_fp = fd->fp_ind;
146
147 GPFSMPIO_T_CIO_SET_GET( r, 1, 0, GPFSMPIO_CIO_T_MPIO_CRW, GPFSMPIO_CIO_LAST)
148 GPFSMPIO_T_CIO_SET_GET( r, 1, 0, GPFSMPIO_CIO_T_LCOMP, GPFSMPIO_CIO_LAST )
149
150
151 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
152
153
154
155
156
157
158 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
159 &offset_list, &len_list, &start_offset,
160 &end_offset, &contig_access_count);
161
162 GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_GATHER, GPFSMPIO_CIO_T_LCOMP )
163
164 #ifdef RDCOLL_DEBUG
165 for (i=0; i<contig_access_count; i++) {
166 DBG_FPRINTF(stderr, "rank %d off %lld len %lld\n",
167 myrank, offset_list[i], len_list[i]);
168 }
169 #endif
170
171
172
173
174
175 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
176 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
177
178 ADIO_Offset my_count_size=0;
179
180
181
182 if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
183 count_sizes = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
184 MPI_Count buftype_size;
185 MPI_Type_size_x(datatype, &buftype_size);
186 my_count_size = (ADIO_Offset) count * (ADIO_Offset)buftype_size;
187 }
188 if (gpfsmpio_tunegather) {
189 if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
190 gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
191 gpfs_offsets = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
192 for (ii=0; ii<nprocs; ii++) {
193 gpfs_offsets0[ii*3] = 0;
194 gpfs_offsets0[ii*3+1] = 0;
195 gpfs_offsets0[ii*3+2] = 0;
196 }
197 gpfs_offsets0[myrank*3] = start_offset;
198 gpfs_offsets0[myrank*3+1] = end_offset;
199 gpfs_offsets0[myrank*3+2] = my_count_size;
200 MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*3, ADIO_OFFSET, MPI_MAX, fd->comm );
201 for (ii=0; ii<nprocs; ii++) {
202 st_offsets [ii] = gpfs_offsets[ii*3] ;
203 end_offsets[ii] = gpfs_offsets[ii*3+1];
204 count_sizes[ii] = gpfs_offsets[ii*3+2];
205 }
206 }
207 else {
208 gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
209 gpfs_offsets = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
210 for (ii=0; ii<nprocs; ii++) {
211 gpfs_offsets0[ii*2] = 0;
212 gpfs_offsets0[ii*2+1] = 0;
213 }
214 gpfs_offsets0[myrank*2] = start_offset;
215 gpfs_offsets0[myrank*2+1] = end_offset;
216
217 MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*2, ADIO_OFFSET, MPI_MAX, fd->comm );
218
219 for (ii=0; ii<nprocs; ii++) {
220 st_offsets [ii] = gpfs_offsets[ii*2] ;
221 end_offsets[ii] = gpfs_offsets[ii*2+1];
222 }
223 }
224 ADIOI_Free( gpfs_offsets0 );
225 ADIOI_Free( gpfs_offsets );
226 } else {
227 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
228 ADIO_OFFSET, fd->comm);
229 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
230 ADIO_OFFSET, fd->comm);
231 if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
232 MPI_Allgather(&count_sizes, 1, ADIO_OFFSET, count_sizes, 1,
233 ADIO_OFFSET, fd->comm);
234 }
235 }
236
237 GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_PATANA, GPFSMPIO_CIO_T_GATHER )
238
239
240 for (i=1; i<nprocs; i++)
241 if ((st_offsets[i] < end_offsets[i-1]) &&
242 (st_offsets[i] <= end_offsets[i]))
243 interleave_count++;
244
245
246 }
247
248 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
249
250 if (fd->hints->cb_read == ADIOI_HINT_DISABLE
251 || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO)))
252 {
253
254 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
255 ADIOI_Free(offset_list);
256 ADIOI_Free(len_list);
257 ADIOI_Free(st_offsets);
258 ADIOI_Free(end_offsets);
259 }
260
261 fd->fp_ind = orig_fp;
262 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
263
264 if (buftype_is_contig && filetype_is_contig) {
265 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
266 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
267 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
268 off, status, error_code);
269 }
270 else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
271 0, status, error_code);
272 }
273 else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
274 offset, status, error_code);
275
276 return;
277 }
278
279 GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_FD_PART, GPFSMPIO_CIO_T_PATANA )
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297 int currentNonZeroDataIndex = 0;
298 if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
299
300
301
302
303
304
305
306 for (i=0; i<nprocs; i++) {
307 if (count_sizes[i] > 0) {
308 st_offsets[currentNonZeroDataIndex] = st_offsets[i];
309 end_offsets[currentNonZeroDataIndex] = end_offsets[i];
310 currentNonZeroDataIndex++;
311 }
312 }
313 }
314 if (gpfsmpio_tuneblocking) {
315 if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
316 ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets, currentNonZeroDataIndex,
317 nprocs_for_coll, &min_st_offset,
318 &fd_start, &fd_end, &fd_size, fd->fs_ptr);
319 }
320 else {
321 ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets, nprocs,
322 nprocs_for_coll, &min_st_offset,
323 &fd_start, &fd_end, &fd_size, fd->fs_ptr);
324 }
325 }
326 else {
327 if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
328 ADIOI_Calc_file_domains(st_offsets, end_offsets, currentNonZeroDataIndex,
329 nprocs_for_coll, &min_st_offset,
330 &fd_start, &fd_end,
331 fd->hints->min_fdomain_size, &fd_size,
332 fd->hints->striping_unit);
333 }
334 else {
335 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
336 nprocs_for_coll, &min_st_offset,
337 &fd_start, &fd_end,
338 fd->hints->min_fdomain_size, &fd_size,
339 fd->hints->striping_unit);
340 }
341 }
342
343 GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_MYREQ, GPFSMPIO_CIO_T_FD_PART );
344 if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
345
346
347
348 ADIOI_OneSidedReadAggregation(fd, offset_list, len_list, contig_access_count, buf,
349 datatype,error_code, st_offsets, end_offsets, currentNonZeroDataIndex, fd_start, fd_end);
350 GPFSMPIO_T_CIO_REPORT( 0, fd, myrank, nprocs)
351 ADIOI_Free(offset_list);
352 ADIOI_Free(len_list);
353 ADIOI_Free(st_offsets);
354 ADIOI_Free(end_offsets);
355 ADIOI_Free(fd_start);
356 ADIOI_Free(fd_end);
357 ADIOI_Free(count_sizes);
358 goto fn_exit;
359 }
360 if (gpfsmpio_p2pcontig==1) {
361
362
363
364
365
366
367
368 int x, inOrderAndNoGaps = 1;
369 for (x=0;x<(nprocs-1);x++) {
370 if (end_offsets[x] != (st_offsets[x+1]-1))
371 inOrderAndNoGaps = 0;
372 }
373 if (inOrderAndNoGaps && buftype_is_contig) {
374
375
376 ADIOI_P2PContigReadAggregation(fd, buf,
377 error_code, st_offsets, end_offsets, fd_start, fd_end);
378
379
380 GPFSMPIO_T_CIO_REPORT( 0, fd, myrank, nprocs)
381
382 ADIOI_Free(offset_list);
383 ADIOI_Free(len_list);
384 ADIOI_Free(st_offsets);
385 ADIOI_Free(end_offsets);
386 ADIOI_Free(fd_start);
387 ADIOI_Free(fd_end);
388 goto fn_exit;
389
390 }
391 }
392
393
394
395
396
397
398
399
400
401
402
403
404
405 if (gpfsmpio_tuneblocking)
406 ADIOI_GPFS_Calc_my_req(fd, offset_list, len_list, contig_access_count,
407 min_st_offset, fd_start, fd_end, fd_size,
408 nprocs, &count_my_req_procs,
409 &count_my_req_per_proc, &my_req,
410 &buf_idx);
411 else
412 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
413 min_st_offset, fd_start, fd_end, fd_size,
414 nprocs, &count_my_req_procs,
415 &count_my_req_per_proc, &my_req,
416 &buf_idx);
417
418 GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_OTHREQ, GPFSMPIO_CIO_T_MYREQ )
419
420
421
422
423
424
425
426
427 if (gpfsmpio_tuneblocking)
428 ADIOI_GPFS_Calc_others_req(fd, count_my_req_procs,
429 count_my_req_per_proc, my_req,
430 nprocs, myrank, &count_others_req_procs,
431 &others_req);
432
433 else
434 ADIOI_Calc_others_req(fd, count_my_req_procs,
435 count_my_req_per_proc, my_req,
436 nprocs, myrank, &count_others_req_procs,
437 &others_req);
438
439 GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_DEXCH, GPFSMPIO_CIO_T_OTHREQ )
440
441
442
443
444 ADIOI_Free(count_my_req_per_proc);
445 for (i=0; i<nprocs; i++) {
446 if (my_req[i].count) {
447 ADIOI_Free(my_req[i].offsets);
448 ADIOI_Free(my_req[i].lens);
449 }
450 }
451 ADIOI_Free(my_req);
452
453
454
455
456
457 ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
458 others_req, offset_list,
459 len_list, contig_access_count, min_st_offset,
460 fd_size, fd_start, fd_end, buf_idx, error_code);
461
462 GPFSMPIO_T_CIO_SET_GET( r, 0, 1, GPFSMPIO_CIO_LAST, GPFSMPIO_CIO_T_DEXCH )
463 GPFSMPIO_T_CIO_SET_GET( r, 0, 1, GPFSMPIO_CIO_LAST, GPFSMPIO_CIO_T_MPIO_CRW )
464
465 GPFSMPIO_T_CIO_REPORT( 0, fd, myrank, nprocs)
466
467 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
468
469
470 for (i=0; i<nprocs; i++) {
471 if (others_req[i].count) {
472 ADIOI_Free(others_req[i].offsets);
473 ADIOI_Free(others_req[i].lens);
474 ADIOI_Free(others_req[i].mem_ptrs);
475 }
476 }
477 ADIOI_Free(others_req);
478
479 ADIOI_Free(buf_idx);
480 ADIOI_Free(offset_list);
481 ADIOI_Free(len_list);
482 ADIOI_Free(st_offsets);
483 ADIOI_Free(end_offsets);
484 ADIOI_Free(fd_start);
485 ADIOI_Free(fd_end);
486
487 fn_exit:
488 #ifdef HAVE_STATUS_SET_BYTES
489 MPI_Type_size_x(datatype, &size);
490 bufsize = size * count;
491 MPIR_Status_set_bytes(status, datatype, bufsize);
492
493
494
495 #endif
496
497 fd->fp_sys_posn = -1;
498 }
499
500 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
501 datatype, int nprocs,
502 int myrank, ADIOI_Access
503 *others_req, ADIO_Offset *offset_list,
504 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
505 min_st_offset, ADIO_Offset fd_size,
506 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
507 int *buf_idx, int *error_code)
508 {
509
510
511
512
513
514
515
516
517
518
519 int i, j, m, ntimes, max_ntimes, buftype_is_contig;
520 ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
521 char *read_buf = NULL, *tmp_buf;
522 int *curr_offlen_ptr, *count, *send_size, *recv_size;
523 int *partial_send, *recd_from_proc, *start_pos;
524
525 ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
526 int req_len, flag, rank;
527 MPI_Status status;
528 ADIOI_Flatlist_node *flat_buf=NULL;
529 MPI_Aint buftype_extent, buftype_lb;
530 int coll_bufsize;
531 #ifdef RDCOLL_DEBUG
532 int iii;
533 #endif
534 *error_code = MPI_SUCCESS;
535
536
537
538
539
540
541
542 coll_bufsize = fd->hints->cb_buffer_size;
543
544
545 for (i=0; i < nprocs; i++) {
546 if (others_req[i].count) {
547 st_loc = others_req[i].offsets[0];
548 end_loc = others_req[i].offsets[0];
549 break;
550 }
551 }
552
553
554 for (i=0; i < nprocs; i++)
555 for (j=0; j<others_req[i].count; j++) {
556 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
557 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
558 + others_req[i].lens[j] - 1));
559 }
560
561
562
563
564
565
566 if ((st_loc==-1) && (end_loc==-1)) {
567
568 ntimes = 0;
569 }
570 else {
571
572 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
573 }
574
575 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);
576
577 read_buf = fd->io_buf;
578
579 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
580
581
582 count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
583
584
585
586 partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
587
588
589
590
591 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
592
593
594 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
595
596
597
598 recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
599
600
601
602 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
603
604
605
606 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
607 if (!buftype_is_contig) {
608 flat_buf = ADIOI_Flatten_and_find(datatype);
609 }
610 MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);
611
612 done = 0;
613 off = st_loc;
614 for_curr_iter = for_next_iter = 0;
615
616 MPI_Comm_rank(fd->comm, &rank);
617
618 #ifdef PROFILE
619 MPE_Log_event(14, 0, "end computation");
620 #endif
621
622 for (m=0; m<ntimes; m++) {
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660 #ifdef PROFILE
661 MPE_Log_event(13, 0, "start computation");
662 #endif
663 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
664 real_off = off - for_curr_iter;
665 real_size = size + for_curr_iter;
666
667 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
668 for_next_iter = 0;
669
670 for (i=0; i<nprocs; i++) {
671 #ifdef RDCOLL_DEBUG
672 DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count);
673 #endif
674 if (others_req[i].count) {
675 start_pos[i] = curr_offlen_ptr[i];
676 for (j=curr_offlen_ptr[i]; j<others_req[i].count;
677 j++) {
678 if (partial_send[i]) {
679
680
681 req_off = others_req[i].offsets[j] +
682 partial_send[i];
683 req_len = others_req[i].lens[j] -
684 partial_send[i];
685 partial_send[i] = 0;
686
687 others_req[i].offsets[j] = req_off;
688 others_req[i].lens[j] = req_len;
689 }
690 else {
691 req_off = others_req[i].offsets[j];
692 req_len = others_req[i].lens[j];
693 }
694 if (req_off < real_off + real_size) {
695 count[i]++;
696 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIU_Upint)(read_buf+req_off-real_off));
697 MPI_Get_address(read_buf+req_off-real_off,
698 &(others_req[i].mem_ptrs[j]));
699 ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
700 send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off,
701 (ADIO_Offset)(unsigned)req_len));
702
703 if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
704 partial_send[i] = (int) (real_off + real_size - req_off);
705 if ((j+1 < others_req[i].count) &&
706 (others_req[i].offsets[j+1] <
707 real_off+real_size)) {
708
709
710 for_next_iter = ADIOI_MAX(for_next_iter,
711 real_off + real_size - others_req[i].offsets[j+1]);
712
713
714 }
715 break;
716 }
717 }
718 else break;
719 }
720 curr_offlen_ptr[i] = j;
721 }
722 }
723
724 flag = 0;
725 for (i=0; i<nprocs; i++)
726 if (count[i]) flag = 1;
727
728 #ifdef PROFILE
729 MPE_Log_event(14, 0, "end computation");
730 #endif
731 if (flag) {
732 char round[50];
733 sprintf(round, "two-phase-round=%d", m);
734 setenv("LIBIOLOG_EXTRA_INFO", round, 1);
735 ADIOI_Assert(size == (int)size);
736 ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
737 ADIO_EXPLICIT_OFFSET, off, &status, error_code);
738 #ifdef RDCOLL_DEBUG
739 DBG_FPRINTF(stderr, "\tread_coll: 700, data read [%lld] = ", size );
740 for (iii=0; iii<size && iii<80; iii++) { DBGV_FPRINTF(stderr, "%3d,", *((unsigned char *)read_buf + for_curr_iter + iii) ); }
741 DBG_FPRINTF(stderr, "\n" );
742 #endif
743
744 if (*error_code != MPI_SUCCESS) return;
745 }
746
747 for_curr_iter = for_next_iter;
748
749 #ifdef PROFILE
750 MPE_Log_event(7, 0, "start communication");
751 #endif
752 if (gpfsmpio_comm == 1)
753 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
754 send_size, recv_size, count,
755 start_pos, partial_send, recd_from_proc, nprocs,
756 myrank,
757 buftype_is_contig, contig_access_count,
758 min_st_offset, fd_size, fd_start, fd_end,
759 others_req,
760 m, buftype_extent, buf_idx);
761 else
762 if (gpfsmpio_comm == 0) {
763 ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
764 send_size, recv_size, count,
765 start_pos, partial_send, recd_from_proc, nprocs,
766 myrank,
767 buftype_is_contig, contig_access_count,
768 min_st_offset, fd_size, fd_start, fd_end,
769 others_req,
770 m, buftype_extent, buf_idx);
771 }
772
773
774 #ifdef PROFILE
775 MPE_Log_event(8, 0, "end communication");
776 #endif
777
778 if (for_next_iter) {
779 tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
780 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIU_Upint)(read_buf+real_size-for_next_iter));
781 ADIOI_Assert((for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
782 memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
783 ADIOI_Free(fd->io_buf);
784 fd->io_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
785 memcpy(fd->io_buf, tmp_buf, for_next_iter);
786 read_buf = fd->io_buf;
787 ADIOI_Free(tmp_buf);
788 }
789
790 off += size;
791 done += size;
792 }
793
794 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
795 #ifdef PROFILE
796 MPE_Log_event(7, 0, "start communication");
797 #endif
798 for (m=ntimes; m<max_ntimes; m++)
799
800
801 if (gpfsmpio_comm == 1)
802 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
803 send_size, recv_size, count,
804 start_pos, partial_send, recd_from_proc, nprocs,
805 myrank,
806 buftype_is_contig, contig_access_count,
807 min_st_offset, fd_size, fd_start, fd_end,
808 others_req, m,
809 buftype_extent, buf_idx);
810 else
811 if (gpfsmpio_comm == 0)
812 ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
813 send_size, recv_size, count,
814 start_pos, partial_send, recd_from_proc, nprocs,
815 myrank,
816 buftype_is_contig, contig_access_count,
817 min_st_offset, fd_size, fd_start, fd_end,
818 others_req,
819 m, buftype_extent, buf_idx);
820
821 #ifdef PROFILE
822 MPE_Log_event(8, 0, "end communication");
823 #endif
824
825 ADIOI_Free(curr_offlen_ptr);
826 ADIOI_Free(count);
827 ADIOI_Free(partial_send);
828 ADIOI_Free(send_size);
829 ADIOI_Free(recv_size);
830 ADIOI_Free(recd_from_proc);
831 ADIOI_Free(start_pos);
832
833 unsetenv("LIBIOLOG_EXTRA_INFO");
834 }
835
836 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
837 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
838 *len_list, int *send_size, int *recv_size,
839 int *count, int *start_pos, int *partial_send,
840 int *recd_from_proc, int nprocs,
841 int myrank, int
842 buftype_is_contig, int contig_access_count,
843 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
844 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
845 ADIOI_Access *others_req,
846 int iter, MPI_Aint buftype_extent, int *buf_idx)
847 {
848 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
849 char **recv_buf = NULL;
850 MPI_Request *requests;
851 MPI_Datatype send_type;
852 MPI_Status *statuses;
853
854
855
856
857 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
858
859 nprocs_recv = 0;
860 for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
861
862 nprocs_send = 0;
863 for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
864
865 requests = (MPI_Request *)
866 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
867
868
869
870
871
872 #ifdef AGGREGATION_PROFILE
873 MPE_Log_event (5032, 0, NULL);
874 #endif
875
876 if (buftype_is_contig) {
877 j = 0;
878 for (i=0; i < nprocs; i++)
879 if (recv_size[i]) {
880 MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i],
881 MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
882 j++;
883 buf_idx[i] += recv_size[i];
884 }
885 }
886 else {
887
888 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
889 for (i=0; i < nprocs; i++)
890 if (recv_size[i]) recv_buf[i] =
891 (char *) ADIOI_Malloc(recv_size[i]);
892
893 j = 0;
894 for (i=0; i < nprocs; i++)
895 if (recv_size[i]) {
896 MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i,
897 myrank+i+100*iter, fd->comm, requests+j);
898 j++;
899 #ifdef RDCOLL_DEBUG
900 DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
901 myrank, recv_size[i], myrank+i+100*iter);
902 #endif
903 }
904 }
905
906
907
908 j = 0;
909 for (i=0; i<nprocs; i++) {
910 if (send_size[i]) {
911
912 if (partial_send[i]) {
913 k = start_pos[i] + count[i] - 1;
914 tmp = others_req[i].lens[k];
915 others_req[i].lens[k] = partial_send[i];
916 }
917 ADIOI_Type_create_hindexed_x(count[i],
918 &(others_req[i].lens[start_pos[i]]),
919 &(others_req[i].mem_ptrs[start_pos[i]]),
920 MPI_BYTE, &send_type);
921
922 MPI_Type_commit(&send_type);
923 MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
924 fd->comm, requests+nprocs_recv+j);
925 MPI_Type_free(&send_type);
926 if (partial_send[i]) others_req[i].lens[k] = tmp;
927 j++;
928 }
929 }
930
931 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
932 sizeof(MPI_Status));
933
934
935
936 if (nprocs_recv) {
937 #ifdef NEEDS_MPI_TEST
938 j = 0;
939 while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
940 #else
941 MPI_Waitall(nprocs_recv, requests, statuses);
942 #endif
943
944
945 if (!buftype_is_contig)
946 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
947 offset_list, len_list, (unsigned*)recv_size,
948 requests, statuses, recd_from_proc,
949 nprocs, contig_access_count,
950 min_st_offset, fd_size, fd_start, fd_end,
951 buftype_extent);
952 }
953
954
955 MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
956
957 ADIOI_Free(statuses);
958 ADIOI_Free(requests);
959
960 if (!buftype_is_contig) {
961 for (i=0; i < nprocs; i++)
962 if (recv_size[i]) ADIOI_Free(recv_buf[i]);
963 ADIOI_Free(recv_buf);
964 }
965 #ifdef AGGREGATION_PROFILE
966 MPE_Log_event (5033, 0, NULL);
967 #endif
968 }
969
970 #define ADIOI_BUF_INCR \
971 { \
972 while (buf_incr) { \
973 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
974 user_buf_idx += size_in_buf; \
975 flat_buf_sz -= size_in_buf; \
976 if (!flat_buf_sz) { \
977 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
978 else { \
979 flat_buf_idx = 0; \
980 n_buftypes++; \
981 } \
982 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
983 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
984 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
985 } \
986 buf_incr -= size_in_buf; \
987 } \
988 }
989
990
991 #define ADIOI_BUF_COPY \
992 { \
993 while (size) { \
994 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
995 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)(buf + user_buf_idx)); \
996 ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
997 memcpy(((char *) buf) + user_buf_idx, \
998 &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
999 recv_buf_idx[p] += size_in_buf; \
1000 user_buf_idx += size_in_buf; \
1001 flat_buf_sz -= size_in_buf; \
1002 if (!flat_buf_sz) { \
1003 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
1004 else { \
1005 flat_buf_idx = 0; \
1006 n_buftypes++; \
1007 } \
1008 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
1009 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
1010 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
1011 } \
1012 size -= size_in_buf; \
1013 buf_incr -= size_in_buf; \
1014 } \
1015 ADIOI_BUF_INCR \
1016 }
1017
1018 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
1019 *flat_buf, char **recv_buf, ADIO_Offset
1020 *offset_list, ADIO_Offset *len_list,
1021 unsigned *recv_size,
1022 MPI_Request *requests, MPI_Status *statuses,
1023 int *recd_from_proc, int nprocs,
1024 int contig_access_count,
1025 ADIO_Offset min_st_offset,
1026 ADIO_Offset fd_size, ADIO_Offset *fd_start,
1027 ADIO_Offset *fd_end,
1028 MPI_Aint buftype_extent)
1029 {
1030
1031
1032
1033 int i, p, flat_buf_idx;
1034 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
1035 int n_buftypes;
1036 ADIO_Offset off, len, rem_len, user_buf_idx;
1037
1038 unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;
1039
1040 ADIOI_UNREFERENCED_ARG(requests);
1041 ADIOI_UNREFERENCED_ARG(statuses);
1042
1043
1044
1045
1046
1047
1048
1049 curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
1050 done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
1051 recv_buf_idx = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
1052
1053 for (i=0; i < nprocs; i++) {
1054 recv_buf_idx[i] = curr_from_proc[i] = 0;
1055 done_from_proc[i] = recd_from_proc[i];
1056 }
1057
1058 user_buf_idx = flat_buf->indices[0];
1059 flat_buf_idx = 0;
1060 n_buftypes = 0;
1061 flat_buf_sz = flat_buf->blocklens[0];
1062
1063
1064
1065
1066
1067 for (i=0; i<contig_access_count; i++) {
1068 off = offset_list[i];
1069 rem_len = len_list[i];
1070
1071
1072 while (rem_len > 0) {
1073 len = rem_len;
1074
1075
1076
1077
1078 p = ADIOI_GPFS_Calc_aggregator(fd,
1079 off,
1080 min_st_offset,
1081 &len,
1082 fd_size,
1083 fd_start,
1084 fd_end);
1085
1086 if (recv_buf_idx[p] < recv_size[p]) {
1087 if (curr_from_proc[p]+len > done_from_proc[p]) {
1088 if (done_from_proc[p] > curr_from_proc[p]) {
1089 size = ADIOI_MIN(curr_from_proc[p] + len -
1090 done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
1091 buf_incr = done_from_proc[p] - curr_from_proc[p];
1092 ADIOI_BUF_INCR
1093 buf_incr = curr_from_proc[p]+len-done_from_proc[p];
1094 ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
1095 curr_from_proc[p] = done_from_proc[p] + size;
1096 ADIOI_BUF_COPY
1097 }
1098 else {
1099 size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
1100 buf_incr = len;
1101 ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
1102 curr_from_proc[p] += (unsigned) size;
1103 ADIOI_BUF_COPY
1104 }
1105 }
1106 else {
1107 ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
1108 curr_from_proc[p] += (unsigned) len;
1109 buf_incr = len;
1110 ADIOI_BUF_INCR
1111 }
1112 }
1113 else {
1114 buf_incr = len;
1115 ADIOI_BUF_INCR
1116 }
1117 off += len;
1118 rem_len -= len;
1119 }
1120 }
1121 for (i=0; i < nprocs; i++)
1122 if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
1123
1124 ADIOI_Free(curr_from_proc);
1125 ADIOI_Free(done_from_proc);
1126 ADIOI_Free(recv_buf_idx);
1127 }
1128
1129 static void ADIOI_R_Exchange_data_alltoallv(
1130 ADIO_File fd, void *buf, ADIOI_Flatlist_node
1131 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
1132 *len_list, int *send_size, int *recv_size,
1133 int *count, int *start_pos, int *partial_send,
1134 int *recd_from_proc, int nprocs,
1135 int myrank, int
1136 buftype_is_contig, int contig_access_count,
1137 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
1138 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
1139 ADIOI_Access *others_req,
1140 int iter, MPI_Aint buftype_extent, int *buf_idx)
1141 {
1142 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
1143 char **recv_buf = NULL;
1144 MPI_Request *requests=NULL;
1145 MPI_Status *statuses=NULL;
1146 int rtail, stail;
1147 char *sbuf_ptr, *from_ptr;
1148 int len;
1149 int *sdispls, *rdispls;
1150 char *all_recv_buf, *all_send_buf;
1151
1152
1153
1154 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
1155
1156 nprocs_recv = 0;
1157 for (i=0; i<nprocs; i++) if (recv_size[i]) { nprocs_recv++; break; }
1158
1159 nprocs_send = 0;
1160 for (i=0; i<nprocs; i++) if (send_size[i]) { nprocs_send++; break; }
1161
1162
1163 rdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1164 rtail = 0;
1165 for (i=0; i<nprocs; i++) { rdispls[i] = rtail; rtail += recv_size[i]; }
1166
1167
1168 all_recv_buf = (char *) ADIOI_Malloc( rtail );
1169 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
1170 for (i=0; i<nprocs; i++) { recv_buf[i] = all_recv_buf + rdispls[i]; }
1171
1172
1173 sdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1174 stail = 0;
1175 for (i=0; i<nprocs; i++) { sdispls[i] = stail; stail += send_size[i]; }
1176
1177
1178 all_send_buf = (char *) ADIOI_Malloc( stail );
1179 for (i=0; i<nprocs; i++)
1180 {
1181 if (send_size[i]) {
1182 if (partial_send[i]) {
1183 k = start_pos[i] + count[i] - 1;
1184 tmp = others_req[i].lens[k];
1185 others_req[i].lens[k] = partial_send[i];
1186 }
1187 sbuf_ptr = all_send_buf + sdispls[i];
1188 for (j=0; j<count[i]; j++) {
1189 ADIOI_ENSURE_AINT_FITS_IN_PTR( others_req[i].mem_ptrs[ start_pos[i]+j ]);
1190 from_ptr = (char *) ADIOI_AINT_CAST_TO_VOID_PTR ( others_req[i].mem_ptrs[ start_pos[i]+j ] );
1191 len = others_req[i].lens[ start_pos[i]+j ] ;
1192 memcpy( sbuf_ptr, from_ptr, len );
1193 sbuf_ptr += len;
1194 }
1195 if (partial_send[i]) others_req[i].lens[k] = tmp;
1196 }
1197 }
1198
1199 #if RDCOLL_DEBUG
1200 DBG_FPRINTF(stderr, "\tsend_size = [%d]%2d,",0,send_size[0]);
1201 for (i=1; i<nprocs; i++) if(send_size[i-1]!=send_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,send_size[i] ); }
1202 DBG_FPRINTF(stderr, "\trecv_size = [%d]%2d,",0,recv_size[0]);
1203 for (i=1; i<nprocs; i++) if(recv_size[i-1]!=recv_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,recv_size[i] ); }
1204 DBG_FPRINTF(stderr, "\tsdispls = [%d]%2d,",0,sdispls[0]);
1205 for (i=1; i<nprocs; i++) if(sdispls[i-1]!=sdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,sdispls [i] ); }
1206 DBG_FPRINTF(stderr, "\trdispls = [%d]%2d,",0,rdispls[0]);
1207 for (i=1; i<nprocs; i++) if(rdispls[i-1]!=rdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,rdispls [i] ); }
1208 DBG_FPRINTF(stderr, "\ttails = %4d, %4d\n", stail, rtail );
1209 if (nprocs_send) {
1210 DBG_FPRINTF(stderr, "\tall_send_buf = [%d]%2d,",0,all_send_buf[0]);
1211
1212
1213 }
1214 #endif
1215
1216
1217 MPI_Alltoallv(
1218 all_send_buf, send_size, sdispls, MPI_BYTE,
1219 all_recv_buf, recv_size, rdispls, MPI_BYTE,
1220 fd->comm );
1221
1222 #if 0
1223 DBG_FPRINTF(stderr, "\tall_recv_buf = " );
1224 for (i=131072; i<131073; i++) { DBG_FPRINTF(stderr, "%2d,", all_recv_buf [i] ); }
1225 DBG_FPRINTF(stderr, "\n" );
1226 #endif
1227
1228
1229 if (nprocs_recv) {
1230 if (!buftype_is_contig)
1231 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
1232 offset_list, len_list, (unsigned*)recv_size,
1233 requests, statuses,
1234 recd_from_proc,
1235 nprocs, contig_access_count,
1236 min_st_offset, fd_size, fd_start, fd_end,
1237 buftype_extent);
1238 else {
1239 rtail = 0;
1240 for (i=0; i < nprocs; i++)
1241 if (recv_size[i]) {
1242 memcpy( (char *)buf + buf_idx[i], all_recv_buf + rtail, recv_size[i] );
1243 buf_idx[i] += recv_size[i];
1244 rtail += recv_size[i];
1245 }
1246 }
1247 }
1248
1249 ADIOI_Free( all_send_buf );
1250 ADIOI_Free( all_recv_buf );
1251 ADIOI_Free( recv_buf );
1252 ADIOI_Free( sdispls );
1253 ADIOI_Free( rdispls );
1254 return;
1255 }