This source file includes following definitions.
- ADIOI_LUSTRE_WriteStridedColl
- ADIOI_LUSTRE_Exch_and_write
- ADIOI_LUSTRE_W_Exchange_data
- ADIOI_LUSTRE_Fill_send_buffer
1
2
3
4
5
6
7
8
9
10
11 #include "ad_lustre.h"
12 #include "adio_extern.h"
13
14
15 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
16 MPI_Datatype datatype, int nprocs,
17 int myrank,
18 ADIOI_Access *others_req,
19 ADIOI_Access *my_req,
20 ADIO_Offset *offset_list,
21 ADIO_Offset *len_list,
22 int contig_access_count,
23 int *striping_info,
24 int **buf_idx, int *error_code);
25 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
26 ADIOI_Flatlist_node *flat_buf,
27 char **send_buf,
28 ADIO_Offset *offset_list,
29 ADIO_Offset *len_list, int *send_size,
30 MPI_Request *requests,
31 int *sent_to_proc, int nprocs,
32 int myrank, int contig_access_count,
33 int *striping_info,
34 int *send_buf_idx,
35 int *curr_to_proc,
36 int *done_to_proc, int iter,
37 MPI_Aint buftype_extent);
38 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
39 char *write_buf,
40 ADIOI_Flatlist_node *flat_buf,
41 ADIO_Offset *offset_list,
42 ADIO_Offset *len_list, int *send_size,
43 int *recv_size, ADIO_Offset off,
44 int size, int *count,
45 int *start_pos,
46 int *sent_to_proc, int nprocs,
47 int myrank, int buftype_is_contig,
48 int contig_access_count,
49 int *striping_info,
50 ADIOI_Access *others_req,
51 int *send_buf_idx,
52 int *curr_to_proc,
53 int *done_to_proc, int *hole,
54 int iter, MPI_Aint buftype_extent,
55 int *buf_idx,
56 ADIO_Offset **srt_off, int **srt_len, int *srt_num,
57 int *error_code);
58 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
59 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
60 int nprocs, int nprocs_recv, int total_elements);
61
62 void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, const void *buf, int count,
63 MPI_Datatype datatype,
64 int file_ptr_type, ADIO_Offset offset,
65 ADIO_Status *status, int *error_code)
66 {
67
68
69
70
71
72
73
74 ADIOI_Access *my_req;
75
76
77
78 ADIOI_Access *others_req;
79
80
81
82 int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
83 int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
84 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
85 ADIO_Offset orig_fp, start_offset, end_offset, off;
86 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
87 ADIO_Offset *len_list = NULL;
88 int **buf_idx = NULL, *striping_info = NULL;
89 int old_error, tmp_error;
90
91 MPI_Comm_size(fd->comm, &nprocs);
92 MPI_Comm_rank(fd->comm, &myrank);
93
94 orig_fp = fd->fp_ind;
95
96
97 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
98
99
100
101
102
103
104
105 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
106 &offset_list, &len_list, &start_offset,
107 &end_offset, &contig_access_count);
108
109
110
111
112
113 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
114 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
115 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
116 ADIO_OFFSET, fd->comm);
117 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
118 ADIO_OFFSET, fd->comm);
119
120 for (i = 1; i < nprocs; i++)
121 if ((st_offsets[i] < end_offsets[i-1]) &&
122 (st_offsets[i] <= end_offsets[i]))
123 interleave_count++;
124
125
126
127
128
129
130
131 if (interleave_count > 0) {
132 do_collect = 1;
133 } else {
134 do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
135 len_list, nprocs);
136 }
137 }
138 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
139
140
141 if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
142 fd->hints->cb_write == ADIOI_HINT_DISABLE) {
143
144
145 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
146 ADIOI_Free(offset_list);
147 ADIOI_Free(len_list);
148 ADIOI_Free(st_offsets);
149 ADIOI_Free(end_offsets);
150 }
151
152 fd->fp_ind = orig_fp;
153 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
154 if (buftype_is_contig && filetype_is_contig) {
155 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
156 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
157 ADIO_WriteContig(fd, buf, count, datatype,
158 ADIO_EXPLICIT_OFFSET,
159 off, status, error_code);
160 } else
161 ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
162 0, status, error_code);
163 } else {
164 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
165 offset, status, error_code);
166 }
167 return;
168 }
169
170
171 ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);
172
173
174
175
176 ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
177 striping_info, nprocs, &count_my_req_procs,
178 &count_my_req_per_proc, &my_req,
179 &buf_idx);
180
181
182
183
184
185
186
187
188
189 ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
190 my_req, nprocs, myrank, &count_others_req_procs,
191 &others_req);
192 ADIOI_Free(count_my_req_per_proc);
193
194
195 ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
196 others_req, my_req, offset_list, len_list,
197 contig_access_count, striping_info,
198 buf_idx, error_code);
199
200
201
202
203
204
205
206
207
208
209
210
211 old_error = *error_code;
212 if (*error_code != MPI_SUCCESS)
213 *error_code = MPI_ERR_IO;
214
215
216
217 #ifdef ADIOI_MPE_LOGGING
218 MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
219 #endif
220 if (fd->hints->cb_nodes == 1)
221 MPI_Bcast(error_code, 1, MPI_INT,
222 fd->hints->ranklist[0], fd->comm);
223 else {
224 tmp_error = *error_code;
225 MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
226 MPI_MAX, fd->comm);
227 }
228 #ifdef ADIOI_MPE_LOGGING
229 MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
230 #endif
231
232 if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
233 *error_code = old_error;
234
235
236 if (!buftype_is_contig)
237 ADIOI_Delete_flattened(datatype);
238
239
240
241 for (i = 0; i < nprocs; i++) {
242 if (others_req[i].count) {
243 ADIOI_Free(others_req[i].offsets);
244 ADIOI_Free(others_req[i].lens);
245 ADIOI_Free(others_req[i].mem_ptrs);
246 }
247 }
248 ADIOI_Free(others_req);
249
250 for (i = 0; i < nprocs; i++) {
251 if (my_req[i].count) {
252 ADIOI_Free(my_req[i].offsets);
253 ADIOI_Free(my_req[i].lens);
254 }
255 }
256 ADIOI_Free(my_req);
257 for (i = 0; i < nprocs; i++) {
258 ADIOI_Free(buf_idx[i]);
259 }
260 ADIOI_Free(buf_idx);
261 ADIOI_Free(offset_list);
262 ADIOI_Free(len_list);
263 ADIOI_Free(st_offsets);
264 ADIOI_Free(end_offsets);
265 ADIOI_Free(striping_info);
266
267 #ifdef HAVE_STATUS_SET_BYTES
268 if (status) {
269 MPI_Count bufsize, size;
270
271 MPI_Type_size_x(datatype, &size);
272 bufsize = size * count;
273 MPIR_Status_set_bytes(status, datatype, bufsize);
274 }
275
276
277
278 #endif
279
280 fd->fp_sys_posn = -1;
281 }
282
283
284
285
286 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
287 MPI_Datatype datatype, int nprocs,
288 int myrank, ADIOI_Access *others_req,
289 ADIOI_Access *my_req,
290 ADIO_Offset *offset_list,
291 ADIO_Offset *len_list,
292 int contig_access_count,
293 int *striping_info, int **buf_idx,
294 int *error_code)
295 {
296
297
298
299
300
301
302
303
304
305
306 int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig;
307 ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
308 ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
309 ADIO_Offset max_size, step_size = 0;
310 int real_size, req_len, send_len;
311 int *recv_curr_offlen_ptr, *recv_count, *recv_size;
312 int *send_curr_offlen_ptr, *send_size;
313 int *sent_to_proc, *recv_start_pos;
314 int *send_buf_idx, *curr_to_proc, *done_to_proc;
315 int *this_buf_idx;
316 char *write_buf = NULL;
317 MPI_Status status;
318 ADIOI_Flatlist_node *flat_buf = NULL;
319 MPI_Aint buftype_extent, buftype_lb;
320 int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
321 int data_sieving = 0;
322 ADIO_Offset *srt_off = NULL;
323 int *srt_len = NULL;
324 int srt_num = 0;
325 ADIO_Offset block_offset;
326 int block_len;
327
328 *error_code = MPI_SUCCESS;
329
330
331
332
333
334
335
336
337
338 for (i = 0; i < nprocs; i++) {
339 if (others_req[i].count) {
340 st_loc = others_req[i].offsets[0];
341 end_loc = others_req[i].offsets[0];
342 break;
343 }
344 }
345 for (i = 0; i < nprocs; i++) {
346 for (j = 0; j < others_req[i].count; j++) {
347 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
348 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
349 others_req[i].lens[j] - 1));
350 }
351 }
352
353 if ((st_loc == -1) && (end_loc == -1))
354 ntimes = 0;
355 MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
356
357 if (st_loc == -1)
358 st_loc = max_end_loc;
359 MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
360
361 min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
362
363
364
365
366
367 step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
368 max_ntimes = (max_end_loc - min_st_loc + 1) / step_size
369 + (((max_end_loc - min_st_loc + 1) % step_size) ? 1 : 0);
370
371 if (ntimes)
372 write_buf = (char *) ADIOI_Malloc(stripe_size);
373
374
375 off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
376 for (m = 0; m < max_ntimes; m ++)
377 off_list[m] = max_end_loc;
378 for (i = 0; i < nprocs; i++) {
379 for (j = 0; j < others_req[i].count; j ++) {
380 req_off = others_req[i].offsets[j];
381 m = (int)((req_off - min_st_loc) / step_size);
382 off_list[m] = ADIOI_MIN(off_list[m], req_off);
383 }
384 }
385
386 recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
387 send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
388
389
390 recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
391
392
393
394 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
395
396
397
398 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
399
400
401 sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
402
403
404
405 send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
406 curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
407 done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
408
409
410 this_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
411
412 recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
413
414
415
416 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
417 if (!buftype_is_contig) {
418 flat_buf = ADIOI_Flatten_and_find(datatype);
419 }
420 MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);
421
422
423
424
425
426
427
428
429
430
431
432 iter_st_off = min_st_loc;
433
434
435
436
437
438
439
440
441
442
443
444 data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
445
446 for (m = 0; m < max_ntimes; m++) {
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470 for (i = 0; i < nprocs; i++)
471 recv_count[i] = recv_size[i] = send_size[i] = 0;
472
473 off = off_list[m];
474 max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
475 real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size -
476 off,
477 end_loc - off + 1);
478
479 for (i = 0; i < nprocs; i++) {
480 if (my_req[i].count) {
481 this_buf_idx[i] = buf_idx[i][send_curr_offlen_ptr[i]];
482 for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
483 send_off = my_req[i].offsets[j];
484 send_len = my_req[i].lens[j];
485 if (send_off < iter_st_off + max_size) {
486 send_size[i] += send_len;
487 } else {
488 break;
489 }
490 }
491 send_curr_offlen_ptr[i] = j;
492 }
493 if (others_req[i].count) {
494 recv_start_pos[i] = recv_curr_offlen_ptr[i];
495 for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
496 req_off = others_req[i].offsets[j];
497 req_len = others_req[i].lens[j];
498 if (req_off < iter_st_off + max_size) {
499 recv_count[i]++;
500 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIU_Upint)(write_buf+req_off-off));
501 MPI_Get_address(write_buf + req_off - off,
502 &(others_req[i].mem_ptrs[j]));
503 recv_size[i] += req_len;
504 } else {
505 break;
506 }
507 }
508 recv_curr_offlen_ptr[i] = j;
509 }
510 }
511
512 hole = data_sieving;
513 ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
514 len_list, send_size, recv_size, off, real_size,
515 recv_count, recv_start_pos,
516 sent_to_proc, nprocs, myrank,
517 buftype_is_contig, contig_access_count,
518 striping_info, others_req, send_buf_idx,
519 curr_to_proc, done_to_proc, &hole, m,
520 buftype_extent, this_buf_idx,
521 &srt_off, &srt_len, &srt_num, error_code);
522
523 if (*error_code != MPI_SUCCESS)
524 goto over;
525
526 flag = 0;
527 for (i = 0; i < nprocs; i++)
528 if (recv_count[i]) {
529 flag = 1;
530 break;
531 }
532 if (flag) {
533
534 if(data_sieving == ADIOI_HINT_ENABLE) {
535 ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
536 ADIO_EXPLICIT_OFFSET, off, &status,
537 error_code);
538 } else {
539
540
541 if (!hole) {
542 ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
543 ADIO_EXPLICIT_OFFSET, off, &status,
544 error_code);
545 } else {
546 block_offset = -1;
547 block_len = 0;
548 for (i = 0; i < srt_num; ++i) {
549 if (srt_off[i] < off + real_size &&
550 srt_off[i] >= off) {
551 if (block_offset == -1) {
552 block_offset = srt_off[i];
553 block_len = srt_len[i];
554 } else {
555 if (srt_off[i] == block_offset + block_len) {
556 block_len += srt_len[i];
557 } else {
558 ADIO_WriteContig(fd,
559 write_buf + block_offset - off,
560 block_len,
561 MPI_BYTE, ADIO_EXPLICIT_OFFSET,
562 block_offset, &status,
563 error_code);
564 if (*error_code != MPI_SUCCESS)
565 goto over;
566 block_offset = srt_off[i];
567 block_len = srt_len[i];
568 }
569 }
570 }
571 }
572 if (block_offset != -1) {
573 ADIO_WriteContig(fd,
574 write_buf + block_offset - off,
575 block_len,
576 MPI_BYTE, ADIO_EXPLICIT_OFFSET,
577 block_offset, &status,
578 error_code);
579 if (*error_code != MPI_SUCCESS)
580 goto over;
581 }
582 }
583 }
584 if (*error_code != MPI_SUCCESS)
585 goto over;
586 }
587 iter_st_off += max_size;
588 }
589 over:
590 if (srt_off)
591 ADIOI_Free(srt_off);
592 if (srt_len)
593 ADIOI_Free(srt_len);
594 if (ntimes)
595 ADIOI_Free(write_buf);
596 ADIOI_Free(recv_curr_offlen_ptr);
597 ADIOI_Free(send_curr_offlen_ptr);
598 ADIOI_Free(recv_count);
599 ADIOI_Free(send_size);
600 ADIOI_Free(recv_size);
601 ADIOI_Free(sent_to_proc);
602 ADIOI_Free(recv_start_pos);
603 ADIOI_Free(send_buf_idx);
604 ADIOI_Free(curr_to_proc);
605 ADIOI_Free(done_to_proc);
606 ADIOI_Free(this_buf_idx);
607 ADIOI_Free(off_list);
608 }
609
610
611
612
613 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
614 char *write_buf,
615 ADIOI_Flatlist_node *flat_buf,
616 ADIO_Offset *offset_list,
617 ADIO_Offset *len_list, int *send_size,
618 int *recv_size, ADIO_Offset off,
619 int size, int *count,
620 int *start_pos,
621 int *sent_to_proc, int nprocs,
622 int myrank, int buftype_is_contig,
623 int contig_access_count,
624 int *striping_info,
625 ADIOI_Access *others_req,
626 int *send_buf_idx,
627 int *curr_to_proc, int *done_to_proc,
628 int *hole, int iter,
629 MPI_Aint buftype_extent,
630 int *buf_idx,
631 ADIO_Offset **srt_off, int **srt_len, int *srt_num,
632 int *error_code)
633 {
634 int i, j, nprocs_recv, nprocs_send, err;
635 char **send_buf = NULL;
636 MPI_Request *requests, *send_req;
637 MPI_Datatype *recv_types;
638 MPI_Status *statuses, status;
639 int sum_recv;
640 int data_sieving = *hole;
641 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
642
643
644 nprocs_recv = 0;
645 for (i = 0; i < nprocs; i++)
646 if (recv_size[i])
647 nprocs_recv++;
648
649 recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
650 sizeof(MPI_Datatype));
651
652
653 j = 0;
654 for (i = 0; i < nprocs; i++) {
655 if (recv_size[i]) {
656 ADIOI_Type_create_hindexed_x(count[i],
657 &(others_req[i].lens[start_pos[i]]),
658 &(others_req[i].mem_ptrs[start_pos[i]]),
659 MPI_BYTE, recv_types + j);
660
661 MPI_Type_commit(recv_types + j);
662 j++;
663 }
664 }
665
666
667
668
669
670
671 *srt_num = 0;
672 for (i = 0; i < nprocs; i++)
673 *srt_num += count[i];
674 if (*srt_off)
675 *srt_off = (ADIO_Offset *) ADIOI_Realloc(*srt_off, (*srt_num + 1) * sizeof(ADIO_Offset));
676 else
677 *srt_off = (ADIO_Offset *) ADIOI_Malloc((*srt_num + 1) * sizeof(ADIO_Offset));
678 if (*srt_len)
679 *srt_len = (int *) ADIOI_Realloc(*srt_len, (*srt_num + 1) * sizeof(int));
680 else
681 *srt_len = (int *) ADIOI_Malloc((*srt_num + 1) * sizeof(int));
682
683
684 ADIOI_Heap_merge(others_req, count, *srt_off, *srt_len, start_pos,
685 nprocs, nprocs_recv, *srt_num);
686
687
688 *hole = 0;
689 for (i = 0; i < *srt_num - 1; i++) {
690 if ((*srt_off)[i] + (*srt_len)[i] < (*srt_off)[i + 1]) {
691 *hole = 1;
692 break;
693 }
694 }
695
696
697
698
699
700
701 if (*hole == 0) {
702 sum_recv = 0;
703 for (i = 0; i < nprocs; i++)
704 sum_recv += recv_size[i];
705 if (size > sum_recv)
706 *hole = 1;
707 }
708
709 if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
710 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
711 ADIO_EXPLICIT_OFFSET, off, &status, &err);
712
713 if (err != MPI_SUCCESS) {
714 *error_code = MPIO_Err_create_code(err,
715 MPIR_ERR_RECOVERABLE,
716 myname, __LINE__,
717 MPI_ERR_IO,
718 "**ioRMWrdwr", 0);
719 ADIOI_Free(recv_types);
720 return;
721 }
722
723 }
724
725 nprocs_send = 0;
726 for (i = 0; i < nprocs; i++)
727 if (send_size[i])
728 nprocs_send++;
729
730 if (fd->atomicity) {
731
732 requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
733 sizeof(MPI_Request));
734 send_req = requests;
735 } else {
736 requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
737 sizeof(MPI_Request));
738
739
740
741 j = 0;
742 for (i = 0; i < nprocs; i++) {
743 if (recv_size[i]) {
744 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
745 myrank + i + 100 * iter, fd->comm, requests + j);
746 j++;
747 }
748 }
749 send_req = requests + nprocs_recv;
750 }
751
752
753
754
755
756 if (buftype_is_contig) {
757 j = 0;
758 for (i = 0; i < nprocs; i++)
759 if (send_size[i]) {
760 ADIOI_Assert(buf_idx[i] != -1);
761 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
762 MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
763 send_req + j);
764 j++;
765 }
766 } else
767 if (nprocs_send) {
768
769 send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
770 for (i = 0; i < nprocs; i++)
771 if (send_size[i])
772 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
773
774 ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
775 len_list, send_size, send_req,
776 sent_to_proc, nprocs, myrank,
777 contig_access_count, striping_info,
778 send_buf_idx, curr_to_proc, done_to_proc,
779 iter, buftype_extent);
780
781 }
782
783
784 if (fd->atomicity) {
785 j = 0;
786 for (i = 0; i < nprocs; i++) {
787 MPI_Status wkl_status;
788 if (recv_size[i]) {
789 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
790 myrank + i + 100 * iter, fd->comm, &wkl_status);
791 j++;
792 }
793 }
794 }
795
796 for (i = 0; i < nprocs_recv; i++)
797 MPI_Type_free(recv_types + i);
798 ADIOI_Free(recv_types);
799
800
801
802 if (fd->atomicity) {
803 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
804 sizeof(MPI_Status));
805 } else {
806 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
807 sizeof(MPI_Status));
808 }
809
810 #ifdef NEEDS_MPI_TEST
811 i = 0;
812 if (fd->atomicity) {
813
814 while (!i)
815 MPI_Testall(nprocs_send, send_req, &i, statuses);
816 } else {
817 while (!i)
818 MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
819 }
820 #else
821
822 if (fd->atomicity)
823 MPI_Waitall(nprocs_send, send_req, statuses);
824 else
825 MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
826 #endif
827 ADIOI_Free(statuses);
828 ADIOI_Free(requests);
829 if (!buftype_is_contig && nprocs_send) {
830 for (i = 0; i < nprocs; i++)
831 if (send_size[i])
832 ADIOI_Free(send_buf[i]);
833 ADIOI_Free(send_buf);
834 }
835 }
836
837 #define ADIOI_BUF_INCR \
838 { \
839 while (buf_incr) { \
840 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
841 user_buf_idx += size_in_buf; \
842 flat_buf_sz -= size_in_buf; \
843 if (!flat_buf_sz) { \
844 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
845 else { \
846 flat_buf_idx = 0; \
847 n_buftypes++; \
848 } \
849 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
850 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
851 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
852 } \
853 buf_incr -= size_in_buf; \
854 } \
855 }
856
857
858 #define ADIOI_BUF_COPY \
859 { \
860 while (size) { \
861 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
862 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)((MPIU_Upint)buf + user_buf_idx)); \
863 ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
864 memcpy(&(send_buf[p][send_buf_idx[p]]), \
865 ((char *) buf) + user_buf_idx, size_in_buf); \
866 send_buf_idx[p] += size_in_buf; \
867 user_buf_idx += size_in_buf; \
868 flat_buf_sz -= size_in_buf; \
869 if (!flat_buf_sz) { \
870 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
871 else { \
872 flat_buf_idx = 0; \
873 n_buftypes++; \
874 } \
875 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
876 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
877 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
878 } \
879 size -= size_in_buf; \
880 buf_incr -= size_in_buf; \
881 } \
882 ADIOI_BUF_INCR \
883 }
884
885 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
886 ADIOI_Flatlist_node *flat_buf,
887 char **send_buf,
888 ADIO_Offset *offset_list,
889 ADIO_Offset *len_list, int *send_size,
890 MPI_Request *requests,
891 int *sent_to_proc, int nprocs,
892 int myrank,
893 int contig_access_count,
894 int *striping_info,
895 int *send_buf_idx,
896 int *curr_to_proc,
897 int *done_to_proc, int iter,
898 MPI_Aint buftype_extent)
899 {
900
901 int i, p, flat_buf_idx, size;
902 int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
903 ADIO_Offset off, len, rem_len, user_buf_idx;
904
905
906
907
908
909
910
911
912
913 for (i = 0; i < nprocs; i++) {
914 send_buf_idx[i] = curr_to_proc[i] = 0;
915 done_to_proc[i] = sent_to_proc[i];
916 }
917 jj = 0;
918
919 user_buf_idx = flat_buf->indices[0];
920 flat_buf_idx = 0;
921 n_buftypes = 0;
922 flat_buf_sz = flat_buf->blocklens[0];
923
924
925
926
927 for (i = 0; i < contig_access_count; i++) {
928 off = offset_list[i];
929 rem_len = (ADIO_Offset) len_list[i];
930
931
932 while (rem_len != 0) {
933 len = rem_len;
934
935
936
937
938 p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
939
940 if (send_buf_idx[p] < send_size[p]) {
941 if (curr_to_proc[p] + len > done_to_proc[p]) {
942 if (done_to_proc[p] > curr_to_proc[p]) {
943 size = (int) ADIOI_MIN(curr_to_proc[p] + len -
944 done_to_proc[p],
945 send_size[p] -
946 send_buf_idx[p]);
947 buf_incr = done_to_proc[p] - curr_to_proc[p];
948 ADIOI_BUF_INCR
949 ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
950 buf_incr = (int) (curr_to_proc[p] + len -
951 done_to_proc[p]);
952 ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
953 curr_to_proc[p] = done_to_proc[p] + size;
954 ADIOI_BUF_COPY
955 } else {
956 size = (int) ADIOI_MIN(len, send_size[p] -
957 send_buf_idx[p]);
958 buf_incr = (int) len;
959 ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
960 curr_to_proc[p] += size;
961 ADIOI_BUF_COPY
962 }
963 if (send_buf_idx[p] == send_size[p]) {
964 MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
965 myrank + p + 100 * iter, fd->comm,
966 requests + jj);
967 jj++;
968 }
969 } else {
970 ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
971 curr_to_proc[p] += (int) len;
972 buf_incr = (int) len;
973 ADIOI_BUF_INCR
974 }
975 } else {
976 buf_incr = (int) len;
977 ADIOI_BUF_INCR
978 }
979 off += len;
980 rem_len -= len;
981 }
982 }
983 for (i = 0; i < nprocs; i++)
984 if (send_size[i])
985 sent_to_proc[i] = curr_to_proc[i];
986 }