This source file includes following definitions.
- ADIOI_GEN_WriteStridedColl
- ADIOI_Exch_and_write
- ADIOI_W_Exchange_data
- ADIOI_Fill_send_buffer
- ADIOI_Heap_merge
1
2
3
4
5
6
7
8 #include "adio.h"
9 #include "adio_extern.h"
10
11 #ifdef AGGREGATION_PROFILE
12 #include "mpe.h"
13 #endif
14
15
16 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
17 datatype, int nprocs, int myrank,
18 ADIOI_Access
19 *others_req, ADIO_Offset *offset_list,
20 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
21 min_st_offset, ADIO_Offset fd_size,
22 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
23 int *buf_idx, int *error_code);
24 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
25 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
26 *offset_list, ADIO_Offset *len_list, int *send_size,
27 int *recv_size, ADIO_Offset off, int size,
28 int *count, int *start_pos, int *partial_recv,
29 int *sent_to_proc, int nprocs,
30 int myrank, int
31 buftype_is_contig, int contig_access_count,
32 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
33 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
34 ADIOI_Access *others_req,
35 int *send_buf_idx, int *curr_to_proc,
36 int *done_to_proc, int *hole, int iter,
37 MPI_Aint buftype_extent, int *buf_idx, int *error_code);
38 void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
39 *flat_buf, char **send_buf, ADIO_Offset
40 *offset_list, ADIO_Offset *len_list, int *send_size,
41 MPI_Request *requests, int *sent_to_proc,
42 int nprocs, int myrank,
43 int contig_access_count, ADIO_Offset
44 min_st_offset, ADIO_Offset fd_size,
45 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
46 int *send_buf_idx, int *curr_to_proc,
47 int *done_to_proc, int iter,
48 MPI_Aint buftype_extent);
49 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
50 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
51 int nprocs, int nprocs_recv, int total_elements);
52
53
54 void ADIOI_GEN_WriteStridedColl(ADIO_File fd, const void *buf, int count,
55 MPI_Datatype datatype, int file_ptr_type,
56 ADIO_Offset offset, ADIO_Status *status, int
57 *error_code)
58 {
59
60
61
62
63
64
65 ADIOI_Access *my_req;
66
67
68
69 ADIOI_Access *others_req;
70
71
72
73 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
74 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
75 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
76 ADIO_Offset orig_fp, start_offset, end_offset, fd_size, min_st_offset, off;
77 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
78 *fd_end = NULL, *end_offsets = NULL;
79 int *buf_idx = NULL;
80 ADIO_Offset *len_list = NULL;
81 int old_error, tmp_error;
82
83 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
84
85
86 ADIOI_IOStridedColl (fd, (char *) buf, count, ADIOI_WRITE, datatype,
87 file_ptr_type, offset, status, error_code);
88 return;
89 }
90
91 MPI_Comm_size(fd->comm, &nprocs);
92 MPI_Comm_rank(fd->comm, &myrank);
93
94
95
96
97 nprocs_for_coll = fd->hints->cb_nodes;
98 orig_fp = fd->fp_ind;
99
100
101 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
102
103
104
105
106
107
108 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
109 &offset_list, &len_list, &start_offset,
110 &end_offset, &contig_access_count);
111
112
113
114
115
116 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
117 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
118
119 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
120 ADIO_OFFSET, fd->comm);
121 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
122 ADIO_OFFSET, fd->comm);
123
124
125 for (i=1; i<nprocs; i++)
126 if ((st_offsets[i] < end_offsets[i-1]) &&
127 (st_offsets[i] <= end_offsets[i]))
128 interleave_count++;
129
130
131 }
132
133 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
134
135 if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
136 (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
137 {
138
139 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
140 ADIOI_Free(offset_list);
141 ADIOI_Free(len_list);
142 ADIOI_Free(st_offsets);
143 ADIOI_Free(end_offsets);
144 }
145
146 fd->fp_ind = orig_fp;
147 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
148
149 if (buftype_is_contig && filetype_is_contig) {
150 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
151 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
152 ADIO_WriteContig(fd, buf, count, datatype,
153 ADIO_EXPLICIT_OFFSET,
154 off, status, error_code);
155 }
156 else ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
157 0, status, error_code);
158 }
159 else ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
160 offset, status, error_code);
161
162 return;
163 }
164
165
166
167
168
169 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
170 nprocs_for_coll, &min_st_offset,
171 &fd_start, &fd_end,
172 fd->hints->min_fdomain_size, &fd_size,
173 fd->hints->striping_unit);
174
175
176
177
178
179 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
180 min_st_offset, fd_start, fd_end, fd_size,
181 nprocs, &count_my_req_procs,
182 &count_my_req_per_proc, &my_req,
183 &buf_idx);
184
185
186
187
188
189
190
191
192 ADIOI_Calc_others_req(fd, count_my_req_procs,
193 count_my_req_per_proc, my_req,
194 nprocs, myrank,
195 &count_others_req_procs, &others_req);
196
197 ADIOI_Free(count_my_req_per_proc);
198 for (i=0; i < nprocs; i++) {
199 if (my_req[i].count) {
200 ADIOI_Free(my_req[i].offsets);
201 ADIOI_Free(my_req[i].lens);
202 }
203 }
204 ADIOI_Free(my_req);
205
206
207
208 ADIOI_Exch_and_write(fd, (char *) buf, datatype, nprocs, myrank,
209 others_req, offset_list,
210 len_list, contig_access_count, min_st_offset,
211 fd_size, fd_start, fd_end, buf_idx, error_code);
212
213
214
215
216
217
218
219
220
221
222
223
224 old_error = *error_code;
225 if (*error_code != MPI_SUCCESS) *error_code = MPI_ERR_IO;
226
227
228
229 #ifdef ADIOI_MPE_LOGGING
230 MPE_Log_event( ADIOI_MPE_postwrite_a, 0, NULL );
231 #endif
232 if (fd->hints->cb_nodes == 1)
233 MPI_Bcast(error_code, 1, MPI_INT,
234 fd->hints->ranklist[0], fd->comm);
235 else {
236 tmp_error = *error_code;
237 MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
238 MPI_MAX, fd->comm);
239 }
240 #ifdef ADIOI_MPE_LOGGING
241 MPE_Log_event( ADIOI_MPE_postwrite_b, 0, NULL );
242 #endif
243 #ifdef AGGREGATION_PROFILE
244 MPE_Log_event (5012, 0, NULL);
245 #endif
246
247 if ( (old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO) )
248 *error_code = old_error;
249
250
251 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
252
253
254
255 for (i=0; i<nprocs; i++) {
256 if (others_req[i].count) {
257 ADIOI_Free(others_req[i].offsets);
258 ADIOI_Free(others_req[i].lens);
259 ADIOI_Free(others_req[i].mem_ptrs);
260 }
261 }
262 ADIOI_Free(others_req);
263
264 ADIOI_Free(buf_idx);
265 ADIOI_Free(offset_list);
266 ADIOI_Free(len_list);
267 ADIOI_Free(st_offsets);
268 ADIOI_Free(end_offsets);
269 ADIOI_Free(fd_start);
270 ADIOI_Free(fd_end);
271
272 #ifdef HAVE_STATUS_SET_BYTES
273 if (status) {
274 MPI_Count bufsize, size;
275
276 MPI_Type_size_x(datatype, &size);
277 bufsize = size * count;
278 MPIR_Status_set_bytes(status, datatype, bufsize);
279 }
280
281
282 #endif
283
284 fd->fp_sys_posn = -1;
285 #ifdef AGGREGATION_PROFILE
286 MPE_Log_event (5013, 0, NULL);
287 #endif
288 }
289
290
291
292
293
294
295 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
296 datatype, int nprocs,
297 int myrank,
298 ADIOI_Access
299 *others_req, ADIO_Offset *offset_list,
300 ADIO_Offset *len_list, int contig_access_count,
301 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
302 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
303 int *buf_idx, int *error_code)
304 {
305
306
307
308
309
310
311
312
313
314
315 ADIO_Offset size=0;
316 int hole, i, j, m, ntimes, max_ntimes, buftype_is_contig;
317 ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
318 char *write_buf=NULL;
319 int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
320 int *partial_recv, *sent_to_proc, *start_pos, flag;
321 int *send_buf_idx, *curr_to_proc, *done_to_proc;
322 MPI_Status status;
323 ADIOI_Flatlist_node *flat_buf=NULL;
324 MPI_Aint buftype_extent, lb;
325 int info_flag, coll_bufsize;
326 char *value;
327 static char myname[] = "ADIOI_EXCH_AND_WRITE";
328
329 *error_code = MPI_SUCCESS;
330
331
332
333
334
335
336 value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
337 ADIOI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value,
338 &info_flag);
339 coll_bufsize = atoi(value);
340 ADIOI_Free(value);
341
342
343 for (i=0; i < nprocs; i++) {
344 if (others_req[i].count) {
345 st_loc = others_req[i].offsets[0];
346 end_loc = others_req[i].offsets[0];
347 break;
348 }
349 }
350
351 for (i=0; i < nprocs; i++)
352 for (j=0; j < others_req[i].count; j++) {
353 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
354 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
355 + others_req[i].lens[j] - 1));
356 }
357
358
359
360 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
361
362 if ((st_loc==-1) && (end_loc==-1)) {
363 ntimes = 0;
364 }
365
366 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX,
367 fd->comm);
368
369 write_buf = fd->io_buf;
370
371 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
372
373
374 count = (int *) ADIOI_Malloc(nprocs*sizeof(int));
375
376
377
378 partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));
379
380
381
382
383 send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
384
385
386
387 recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
388
389
390 sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
391
392
393
394 send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
395 curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
396 done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
397
398
399 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
400
401
402
403 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
404 if (!buftype_is_contig) {
405 flat_buf = ADIOI_Flatten_and_find(datatype);
406 }
407 MPI_Type_get_extent(datatype, &lb, &buftype_extent);
408
409
410
411
412
413
414
415
416
417
418
419
420
421 done = 0;
422 off = st_loc;
423
424 for (m=0; m < ntimes; m++) {
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442 for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;
443
444 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
445
446 for (i=0; i < nprocs; i++) {
447 if (others_req[i].count) {
448 start_pos[i] = curr_offlen_ptr[i];
449 for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
450 if (partial_recv[i]) {
451
452
453 req_off = others_req[i].offsets[j] +
454 partial_recv[i];
455 req_len = others_req[i].lens[j] -
456 partial_recv[i];
457 partial_recv[i] = 0;
458
459 others_req[i].offsets[j] = req_off;
460 others_req[i].lens[j] = req_len;
461 }
462 else {
463 req_off = others_req[i].offsets[j];
464 req_len = others_req[i].lens[j];
465 }
466 if (req_off < off + size) {
467 count[i]++;
468 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIU_Upint)(write_buf+req_off-off));
469 MPI_Get_address(write_buf+req_off-off,
470 &(others_req[i].mem_ptrs[j]));
471 ADIOI_Assert((off + size - req_off) == (int)(off + size - req_off));
472 recv_size[i] += (int)(ADIOI_MIN(off + size - req_off,
473 (unsigned)req_len));
474
475 if (off+size-req_off < (unsigned)req_len)
476 {
477 partial_recv[i] = (int) (off + size - req_off);
478
479
480 if ((j+1 < others_req[i].count) &&
481 (others_req[i].offsets[j+1] < off+size))
482 {
483 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
484 MPIR_ERR_RECOVERABLE,
485 myname,
486 __LINE__,
487 MPI_ERR_ARG,
488 "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)", 0);
489
490
491
492 }
493
494 break;
495 }
496 }
497 else break;
498 }
499 curr_offlen_ptr[i] = j;
500 }
501 }
502
503 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
504 len_list, send_size, recv_size, off, size, count,
505 start_pos, partial_recv,
506 sent_to_proc, nprocs, myrank,
507 buftype_is_contig, contig_access_count,
508 min_st_offset, fd_size, fd_start, fd_end,
509 others_req, send_buf_idx, curr_to_proc,
510 done_to_proc, &hole, m, buftype_extent, buf_idx,
511 error_code);
512 if (*error_code != MPI_SUCCESS) return;
513
514 flag = 0;
515 for (i=0; i<nprocs; i++)
516 if (count[i]) flag = 1;
517
518 if (flag) {
519 ADIOI_Assert(size == (int)size);
520 ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE, ADIO_EXPLICIT_OFFSET,
521 off, &status, error_code);
522 if (*error_code != MPI_SUCCESS) return;
523 }
524
525 off += size;
526 done += size;
527 }
528
529 for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
530 for (m=ntimes; m<max_ntimes; m++) {
531 ADIOI_Assert(size == (int)size);
532
533 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
534 len_list, send_size, recv_size, off, (int)size, count,
535 start_pos, partial_recv,
536 sent_to_proc, nprocs, myrank,
537 buftype_is_contig, contig_access_count,
538 min_st_offset, fd_size, fd_start, fd_end,
539 others_req, send_buf_idx,
540 curr_to_proc, done_to_proc, &hole, m,
541 buftype_extent, buf_idx, error_code);
542 if (*error_code != MPI_SUCCESS) return;
543 }
544
545 ADIOI_Free(curr_offlen_ptr);
546 ADIOI_Free(count);
547 ADIOI_Free(partial_recv);
548 ADIOI_Free(send_size);
549 ADIOI_Free(recv_size);
550 ADIOI_Free(sent_to_proc);
551 ADIOI_Free(start_pos);
552 ADIOI_Free(send_buf_idx);
553 ADIOI_Free(curr_to_proc);
554 ADIOI_Free(done_to_proc);
555 }
556
557
558
559
560
561 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
562 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
563 *offset_list, ADIO_Offset *len_list, int *send_size,
564 int *recv_size, ADIO_Offset off, int size,
565 int *count, int *start_pos,
566 int *partial_recv,
567 int *sent_to_proc, int nprocs,
568 int myrank, int
569 buftype_is_contig, int contig_access_count,
570 ADIO_Offset min_st_offset,
571 ADIO_Offset fd_size,
572 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
573 ADIOI_Access *others_req,
574 int *send_buf_idx, int *curr_to_proc,
575 int *done_to_proc, int *hole, int iter,
576 MPI_Aint buftype_extent, int *buf_idx,
577 int *error_code)
578 {
579 int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;
580 char **send_buf = NULL;
581 MPI_Request *requests, *send_req;
582 MPI_Datatype *recv_types;
583 MPI_Status *statuses, status;
584 int *srt_len=NULL, sum;
585 ADIO_Offset *srt_off=NULL;
586 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
587
588
589
590
591 MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
592
593
594
595 nprocs_recv = 0;
596 for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;
597
598 recv_types = (MPI_Datatype *)
599 ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype));
600
601
602 tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
603 j = 0;
604 for (i=0; i<nprocs; i++) {
605 if (recv_size[i]) {
606
607 if (partial_recv[i]) {
608 k = start_pos[i] + count[i] - 1;
609 tmp_len[i] = others_req[i].lens[k];
610 others_req[i].lens[k] = partial_recv[i];
611 }
612 ADIOI_Type_create_hindexed_x(count[i],
613 &(others_req[i].lens[start_pos[i]]),
614 &(others_req[i].mem_ptrs[start_pos[i]]),
615 MPI_BYTE, recv_types+j);
616
617 MPI_Type_commit(recv_types+j);
618 j++;
619 }
620 }
621
622
623
624
625
626 sum = 0;
627 for (i=0; i<nprocs; i++) sum += count[i];
628
629
630 if (sum) {
631 srt_off = (ADIO_Offset *) ADIOI_Malloc(sum*sizeof(ADIO_Offset));
632 srt_len = (int *) ADIOI_Malloc(sum*sizeof(int));
633
634 ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
635 nprocs, nprocs_recv, sum);
636 }
637
638
639 for (i=0; i<nprocs; i++)
640 if (partial_recv[i]) {
641 k = start_pos[i] + count[i] - 1;
642 others_req[i].lens[k] = tmp_len[i];
643 }
644 ADIOI_Free(tmp_len);
645
646
647
648
649
650
651
652
653 *hole = 0;
654 if (sum) {
655 if (off != srt_off[0])
656 *hole = 1;
657 else {
658 for (i=1; i<sum; i++) {
659 if (srt_off[i] <= srt_off[0] + srt_len[0]) {
660
661 int new_len = (int)srt_off[i] + srt_len[i] - (int)srt_off[0];
662 if (new_len > srt_len[0]) srt_len[0] = new_len;
663 }
664 else
665 break;
666 }
667 if (i < sum || size != srt_len[0])
668 *hole = 1;
669 }
670
671 ADIOI_Free(srt_off);
672 ADIOI_Free(srt_len);
673 }
674
675 if (nprocs_recv) {
676 if (*hole) {
677 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
678 ADIO_EXPLICIT_OFFSET, off, &status, &err);
679
680 if (err != MPI_SUCCESS) {
681 *error_code = MPIO_Err_create_code(err,
682 MPIR_ERR_RECOVERABLE, myname,
683 __LINE__, MPI_ERR_IO,
684 "**ioRMWrdwr", 0);
685 return;
686 }
687
688 }
689 }
690
691 nprocs_send = 0;
692 for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
693
694 if (fd->atomicity) {
695
696 requests = (MPI_Request *)
697 ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request));
698 send_req = requests;
699 }
700 else {
701 requests = (MPI_Request *)
702 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
703
704
705
706 j = 0;
707 for (i=0; i<nprocs; i++) {
708 if (recv_size[i]) {
709 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
710 fd->comm, requests+j);
711 j++;
712 }
713 }
714 send_req = requests + nprocs_recv;
715 }
716
717
718
719
720 #ifdef AGGREGATION_PROFILE
721 MPE_Log_event (5032, 0, NULL);
722 #endif
723 if (buftype_is_contig) {
724 j = 0;
725 for (i=0; i < nprocs; i++)
726 if (send_size[i]) {
727 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
728 MPI_BYTE, i, myrank+i+100*iter, fd->comm,
729 send_req+j);
730 j++;
731 buf_idx[i] += send_size[i];
732 }
733 }
734 else if (nprocs_send) {
735
736 send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*));
737 for (i=0; i < nprocs; i++)
738 if (send_size[i])
739 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
740
741 ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf,
742 offset_list, len_list, send_size,
743 send_req,
744 sent_to_proc, nprocs, myrank,
745 contig_access_count,
746 min_st_offset, fd_size, fd_start, fd_end,
747 send_buf_idx, curr_to_proc, done_to_proc, iter,
748 buftype_extent);
749
750 }
751
752 if (fd->atomicity) {
753
754 j = 0;
755 for (i=0; i<nprocs; i++) {
756 MPI_Status wkl_status;
757 if (recv_size[i]) {
758 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
759 fd->comm, &wkl_status);
760 j++;
761 }
762 }
763 }
764
765 for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
766 ADIOI_Free(recv_types);
767
768 if (fd->atomicity) {
769
770 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+1) * \
771 sizeof(MPI_Status));
772
773 }
774 else {
775 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
776 sizeof(MPI_Status));
777
778 }
779
780 #ifdef NEEDS_MPI_TEST
781 i = 0;
782 if (fd->atomicity) {
783
784 while (!i) MPI_Testall(nprocs_send, send_req, &i, statuses);
785 }
786 else {
787 while (!i) MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);
788 }
789 #else
790 if (fd->atomicity)
791
792 MPI_Waitall(nprocs_send, send_req, statuses);
793 else
794 MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
795 #endif
796
797 #ifdef AGGREGATION_PROFILE
798 MPE_Log_event (5033, 0, NULL);
799 #endif
800 ADIOI_Free(statuses);
801 ADIOI_Free(requests);
802 if (!buftype_is_contig && nprocs_send) {
803 for (i=0; i < nprocs; i++)
804 if (send_size[i]) ADIOI_Free(send_buf[i]);
805 ADIOI_Free(send_buf);
806 }
807 }
808
809 #define ADIOI_BUF_INCR \
810 { \
811 while (buf_incr) { \
812 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
813 user_buf_idx += size_in_buf; \
814 flat_buf_sz -= size_in_buf; \
815 if (!flat_buf_sz) { \
816 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
817 else { \
818 flat_buf_idx = 0; \
819 n_buftypes++; \
820 } \
821 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
822 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
823 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
824 } \
825 buf_incr -= size_in_buf; \
826 } \
827 }
828
829
830 #define ADIOI_BUF_COPY \
831 { \
832 while (size) { \
833 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
834 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)((MPIU_Upint)buf + user_buf_idx)); \
835 ADIOI_Assert((ADIO_Size) size_in_buf == (size_t)size_in_buf); \
836 memcpy(&(send_buf[p][send_buf_idx[p]]), \
837 ((char *) buf) + user_buf_idx, size_in_buf); \
838 send_buf_idx[p] += size_in_buf; \
839 user_buf_idx += size_in_buf; \
840 flat_buf_sz -= size_in_buf; \
841 if (!flat_buf_sz) { \
842 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
843 else { \
844 flat_buf_idx = 0; \
845 n_buftypes++; \
846 } \
847 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
848 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
849 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
850 } \
851 size -= size_in_buf; \
852 buf_incr -= size_in_buf; \
853 } \
854 ADIOI_BUF_INCR \
855 }
856
857
858
859
860
861 void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
862 *flat_buf, char **send_buf, ADIO_Offset
863 *offset_list, ADIO_Offset *len_list, int *send_size,
864 MPI_Request *requests, int *sent_to_proc,
865 int nprocs, int myrank,
866 int contig_access_count,
867 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
868 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
869 int *send_buf_idx, int *curr_to_proc,
870 int *done_to_proc, int iter,
871 MPI_Aint buftype_extent)
872 {
873
874
875 int i, p, flat_buf_idx;
876 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
877 int jj, n_buftypes;
878 ADIO_Offset off, len, rem_len, user_buf_idx;
879
880
881
882
883
884
885
886
887 for (i=0; i < nprocs; i++) {
888 send_buf_idx[i] = curr_to_proc[i] = 0;
889 done_to_proc[i] = sent_to_proc[i];
890 }
891 jj = 0;
892
893 user_buf_idx = flat_buf->indices[0];
894 flat_buf_idx = 0;
895 n_buftypes = 0;
896 flat_buf_sz = flat_buf->blocklens[0];
897
898
899
900
901
902 for (i=0; i<contig_access_count; i++) {
903 off = offset_list[i];
904 rem_len = len_list[i];
905
906
907 while (rem_len != 0) {
908 len = rem_len;
909
910
911
912
913 p = ADIOI_Calc_aggregator(fd,
914 off,
915 min_st_offset,
916 &len,
917 fd_size,
918 fd_start,
919 fd_end);
920
921 if (send_buf_idx[p] < send_size[p]) {
922 if (curr_to_proc[p]+len > done_to_proc[p]) {
923 if (done_to_proc[p] > curr_to_proc[p]) {
924 size = ADIOI_MIN(curr_to_proc[p] + len -
925 done_to_proc[p], send_size[p]-send_buf_idx[p]);
926 buf_incr = done_to_proc[p] - curr_to_proc[p];
927 ADIOI_BUF_INCR
928 ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
929 buf_incr = curr_to_proc[p] + len - done_to_proc[p];
930 ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
931
932 curr_to_proc[p] = done_to_proc[p] + (int)size;
933 ADIOI_BUF_COPY
934 }
935 else {
936 size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
937 buf_incr = len;
938 ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
939 curr_to_proc[p] += size;
940 ADIOI_BUF_COPY
941 }
942 if (send_buf_idx[p] == send_size[p]) {
943 MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
944 myrank+p+100*iter, fd->comm, requests+jj);
945 jj++;
946 }
947 }
948 else {
949 ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
950 curr_to_proc[p] += len;
951 buf_incr = len;
952 ADIOI_BUF_INCR
953 }
954 }
955 else {
956 buf_incr = len;
957 ADIOI_BUF_INCR
958 }
959 off += len;
960 rem_len -= len;
961 }
962 }
963 for (i=0; i < nprocs; i++)
964 if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
965 }
966
967
968
969 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
970 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
971 int nprocs, int nprocs_recv, int total_elements)
972 {
973 typedef struct {
974 ADIO_Offset *off_list;
975 ADIO_Offset *len_list;
976 int nelem;
977 } heap_struct;
978
979 heap_struct *a, tmp;
980 int i, j, heapsize, l, r, k, smallest;
981
982 a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct));
983
984 j = 0;
985 for (i=0; i<nprocs; i++)
986 if (count[i]) {
987 a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
988 a[j].len_list = &(others_req[i].lens[start_pos[i]]);
989 a[j].nelem = count[i];
990 j++;
991 }
992
993
994
995
996 heapsize = nprocs_recv;
997 for (i=heapsize/2 - 1; i>=0; i--) {
998
999
1000
1001
1002 k = i;
1003 for(;;) {
1004 l = 2*(k+1) - 1;
1005 r = 2*(k+1);
1006
1007 if ((l < heapsize) &&
1008 (*(a[l].off_list) < *(a[k].off_list)))
1009 smallest = l;
1010 else smallest = k;
1011
1012 if ((r < heapsize) &&
1013 (*(a[r].off_list) < *(a[smallest].off_list)))
1014 smallest = r;
1015
1016 if (smallest != k) {
1017 tmp.off_list = a[k].off_list;
1018 tmp.len_list = a[k].len_list;
1019 tmp.nelem = a[k].nelem;
1020
1021 a[k].off_list = a[smallest].off_list;
1022 a[k].len_list = a[smallest].len_list;
1023 a[k].nelem = a[smallest].nelem;
1024
1025 a[smallest].off_list = tmp.off_list;
1026 a[smallest].len_list = tmp.len_list;
1027 a[smallest].nelem = tmp.nelem;
1028
1029 k = smallest;
1030 }
1031 else break;
1032 }
1033 }
1034
1035 for (i=0; i<total_elements; i++) {
1036
1037 srt_off[i] = *(a[0].off_list);
1038 srt_len[i] = *(a[0].len_list);
1039 (a[0].nelem)--;
1040
1041 if (!a[0].nelem) {
1042 a[0].off_list = a[heapsize-1].off_list;
1043 a[0].len_list = a[heapsize-1].len_list;
1044 a[0].nelem = a[heapsize-1].nelem;
1045 heapsize--;
1046 }
1047 else {
1048 (a[0].off_list)++;
1049 (a[0].len_list)++;
1050 }
1051
1052
1053 k = 0;
1054 for (;;) {
1055 l = 2*(k+1) - 1;
1056 r = 2*(k+1);
1057
1058 if ((l < heapsize) &&
1059 (*(a[l].off_list) < *(a[k].off_list)))
1060 smallest = l;
1061 else smallest = k;
1062
1063 if ((r < heapsize) &&
1064 (*(a[r].off_list) < *(a[smallest].off_list)))
1065 smallest = r;
1066
1067 if (smallest != k) {
1068 tmp.off_list = a[k].off_list;
1069 tmp.len_list = a[k].len_list;
1070 tmp.nelem = a[k].nelem;
1071
1072 a[k].off_list = a[smallest].off_list;
1073 a[k].len_list = a[smallest].len_list;
1074 a[k].nelem = a[smallest].nelem;
1075
1076 a[smallest].off_list = tmp.off_list;
1077 a[smallest].len_list = tmp.len_list;
1078 a[smallest].nelem = tmp.nelem;
1079
1080 k = smallest;
1081 }
1082 else break;
1083 }
1084 }
1085 ADIOI_Free(a);
1086 }