This source file includes following definitions.
- ADIOI_GPFS_WriteStridedColl
- gpfs_wr_access_start
- gpfs_wr_access_end
- gpfs_find_access_for_ion
- ADIOI_Exch_and_write
- ADIOI_W_Exchange_data
- ADIOI_Fill_send_buffer
- ADIOI_Heap_merge
- ADIOI_W_Exchange_data_alltoallv
- ADIOI_Fill_send_buffer_nosend
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 #include "adio.h"
16 #include "adio_extern.h"
17 #include "ad_gpfs.h"
18 #include "ad_gpfs_aggrs.h"
19
20 #ifdef BGQPLATFORM
21 #include "bg/ad_bg_pset.h"
22 #endif
23
24 #ifdef AGGREGATION_PROFILE
25 #include "mpe.h"
26 #endif
27 #ifdef PROFILE
28 #include "mpe.h"
29 #endif
30
31 #include <pthread.h>
32
33 #ifdef HAVE_GPFS_H
34 #include <gpfs.h>
35 #endif
36 #ifdef HAVE_GPFS_FCNTL_H
37 #include <gpfs_fcntl.h>
38 #endif
39
40 #include <limits.h>
41
42 static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
43 datatype, int nprocs, int myrank, ADIOI_Access
44 *others_req, ADIO_Offset *offset_list,
45 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
46 min_st_offset, ADIO_Offset fd_size,
47 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
48 int *buf_idx, int *error_code);
49 static void ADIOI_W_Exchange_data(ADIO_File fd, const void *buf, char *write_buf,
50 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
51 *offset_list, ADIO_Offset *len_list, int *send_size,
52 int *recv_size, ADIO_Offset off, int size,
53 int *count, int *start_pos, int *partial_recv,
54 int *sent_to_proc, int nprocs,
55 int myrank, int
56 buftype_is_contig, int contig_access_count,
57 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
58 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
59 ADIOI_Access *others_req,
60 int *send_buf_idx, int *curr_to_proc,
61 int *done_to_proc, int *hole, int iter,
62 MPI_Aint buftype_extent, int *buf_idx, int *error_code);
63 static void ADIOI_W_Exchange_data_alltoallv(
64 ADIO_File fd, const void *buf,
65 char *write_buf,
66 ADIOI_Flatlist_node *flat_buf,
67 ADIO_Offset *offset_list,
68 ADIO_Offset *len_list, int *send_size, int *recv_size,
69 ADIO_Offset off, int size,
70 int *count, int *start_pos, int *partial_recv,
71 int *sent_to_proc, int nprocs, int myrank,
72 int buftype_is_contig, int contig_access_count,
73 ADIO_Offset min_st_offset,
74 ADIO_Offset fd_size,
75 ADIO_Offset *fd_start,
76 ADIO_Offset *fd_end,
77 ADIOI_Access *others_req,
78 int *send_buf_idx, int *curr_to_proc,
79 int *done_to_proc, int *hole,
80 int iter, MPI_Aint buftype_extent, int *buf_idx,
81 int *error_code);
82 static void ADIOI_Fill_send_buffer(ADIO_File fd, const void *buf, ADIOI_Flatlist_node
83 *flat_buf, char **send_buf, ADIO_Offset
84 *offset_list, ADIO_Offset *len_list, int *send_size,
85 MPI_Request *requests, int *sent_to_proc,
86 int nprocs, int myrank,
87 int contig_access_count, ADIO_Offset
88 min_st_offset, ADIO_Offset fd_size,
89 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
90 int *send_buf_idx, int *curr_to_proc,
91 int *done_to_proc, int iter,
92 MPI_Aint buftype_extent);
93 static void ADIOI_Fill_send_buffer_nosend(ADIO_File fd, const void *buf, ADIOI_Flatlist_node
94 *flat_buf, char **send_buf, ADIO_Offset
95 *offset_list, ADIO_Offset *len_list, int *send_size,
96 MPI_Request *requests, int *sent_to_proc,
97 int nprocs, int myrank,
98 int contig_access_count, ADIO_Offset
99 min_st_offset, ADIO_Offset fd_size,
100 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
101 int *send_buf_idx, int *curr_to_proc,
102 int *done_to_proc, int iter,
103 MPI_Aint buftype_extent);
104 static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
105 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
106 int nprocs, int nprocs_recv, int total_elements);
107
108
109 void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
110 MPI_Datatype datatype, int file_ptr_type,
111 ADIO_Offset offset, ADIO_Status *status, int
112 *error_code)
113 {
114
115
116
117
118
119
120 ADIOI_Access *my_req;
121
122
123
124 ADIOI_Access *others_req;
125
126
127
128 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
129 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
130 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
131 ADIO_Offset orig_fp, start_offset, end_offset, fd_size, min_st_offset, off;
132 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
133 *fd_end = NULL, *end_offsets = NULL;
134 ADIO_Offset *gpfs_offsets0 = NULL, *gpfs_offsets = NULL;
135 ADIO_Offset *count_sizes;
136 int ii;
137
138 int *buf_idx = NULL;
139 ADIO_Offset *len_list = NULL;
140 GPFSMPIO_T_CIO_RESET( w )
141 #ifdef PROFILE
142 MPE_Log_event(13, 0, "start computation");
143 #endif
144
145 MPI_Comm_size(fd->comm, &nprocs);
146 MPI_Comm_rank(fd->comm, &myrank);
147
148
149
150
151 nprocs_for_coll = fd->hints->cb_nodes;
152 orig_fp = fd->fp_ind;
153
154 GPFSMPIO_T_CIO_SET_GET( w, 1, 0, GPFSMPIO_CIO_T_MPIO_CRW, GPFSMPIO_CIO_LAST)
155 GPFSMPIO_T_CIO_SET_GET( w, 1, 0, GPFSMPIO_CIO_T_LCOMP, GPFSMPIO_CIO_LAST )
156
157
158
159 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
160
161
162
163
164
165
166 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
167 &offset_list, &len_list, &start_offset,
168 &end_offset, &contig_access_count);
169
170 GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_GATHER, GPFSMPIO_CIO_T_LCOMP )
171
172
173
174
175
176 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
177 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
178
179 ADIO_Offset my_count_size=0;
180
181
182
183
184 if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
185 count_sizes = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
186 MPI_Count buftype_size;
187 MPI_Type_size_x(datatype, &buftype_size);
188 my_count_size = (ADIO_Offset) count * (ADIO_Offset)buftype_size;
189 }
190 if (gpfsmpio_tunegather) {
191 if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
192 gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
193 gpfs_offsets = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
194 for (ii=0; ii<nprocs; ii++) {
195 gpfs_offsets0[ii*3] = 0;
196 gpfs_offsets0[ii*3+1] = 0;
197 gpfs_offsets0[ii*3+2] = 0;
198 }
199 gpfs_offsets0[myrank*3] = start_offset;
200 gpfs_offsets0[myrank*3+1] = end_offset;
201 gpfs_offsets0[myrank*3+2] = my_count_size;
202 MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*3, ADIO_OFFSET, MPI_MAX, fd->comm );
203 for (ii=0; ii<nprocs; ii++) {
204 st_offsets [ii] = gpfs_offsets[ii*3] ;
205 end_offsets[ii] = gpfs_offsets[ii*3+1];
206 count_sizes[ii] = gpfs_offsets[ii*3+2];
207 }
208 }
209 else {
210 gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
211 gpfs_offsets = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
212 for (ii=0; ii<nprocs; ii++) {
213 gpfs_offsets0[ii*2] = 0;
214 gpfs_offsets0[ii*2+1] = 0;
215 }
216 gpfs_offsets0[myrank*2] = start_offset;
217 gpfs_offsets0[myrank*2+1] = end_offset;
218
219 MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*2, ADIO_OFFSET, MPI_MAX, fd->comm );
220
221 for (ii=0; ii<nprocs; ii++) {
222 st_offsets [ii] = gpfs_offsets[ii*2] ;
223 end_offsets[ii] = gpfs_offsets[ii*2+1];
224 }
225 }
226 ADIOI_Free( gpfs_offsets0 );
227 ADIOI_Free( gpfs_offsets );
228 } else {
229 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
230 ADIO_OFFSET, fd->comm);
231 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
232 ADIO_OFFSET, fd->comm);
233 if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
234 MPI_Allgather(&count_sizes, 1, ADIO_OFFSET, count_sizes, 1,
235 ADIO_OFFSET, fd->comm);
236 }
237 }
238
239 GPFSMPIO_T_CIO_SET_GET(w, 1, 1, GPFSMPIO_CIO_T_PATANA, GPFSMPIO_CIO_T_GATHER )
240
241
242 for (i=1; i<nprocs; i++)
243 if ((st_offsets[i] < end_offsets[i-1]) &&
244 (st_offsets[i] <= end_offsets[i]))
245 interleave_count++;
246
247
248 }
249
250 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
251
252 if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
253 (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
254 {
255
256 if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
257 ADIOI_Free(offset_list);
258 ADIOI_Free(len_list);
259 ADIOI_Free(st_offsets);
260 ADIOI_Free(end_offsets);
261 }
262
263 fd->fp_ind = orig_fp;
264 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
265
266 if (buftype_is_contig && filetype_is_contig) {
267
268 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
269 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
270 ADIO_WriteContig(fd, buf, count, datatype,
271 ADIO_EXPLICIT_OFFSET,
272 off, status, error_code);
273 }
274 else ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
275 0, status, error_code);
276 }
277 else ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
278 offset, status, error_code);
279
280 return;
281 }
282
283 GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_FD_PART, GPFSMPIO_CIO_T_PATANA )
284
285
286
287
288
289 int currentValidDataIndex = 0;
290 if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
291
292
293
294 for (i=0; i<nprocs; i++) {
295 if (count_sizes[i] > 0) {
296 st_offsets[currentValidDataIndex] = st_offsets[i];
297 end_offsets[currentValidDataIndex] = end_offsets[i];
298 currentValidDataIndex++;
299 }
300 }
301 }
302
303 if (gpfsmpio_tuneblocking) {
304 if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
305 ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets,
306 currentValidDataIndex,
307 nprocs_for_coll, &min_st_offset,
308 &fd_start, &fd_end, &fd_size, fd->fs_ptr);
309 }
310 else {
311
312 ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets, nprocs,
313 nprocs_for_coll, &min_st_offset,
314 &fd_start, &fd_end, &fd_size, fd->fs_ptr);
315 }
316 }
317 else {
318 if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
319 ADIOI_Calc_file_domains(st_offsets, end_offsets, currentValidDataIndex,
320 nprocs_for_coll, &min_st_offset,
321 &fd_start, &fd_end,
322 fd->hints->min_fdomain_size, &fd_size,
323 fd->hints->striping_unit);
324 }
325 else {
326 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
327 nprocs_for_coll, &min_st_offset,
328 &fd_start, &fd_end,
329 fd->hints->min_fdomain_size, &fd_size,
330 fd->hints->striping_unit);
331 }
332 }
333
334 GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_MYREQ, GPFSMPIO_CIO_T_FD_PART );
335
336 if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
337
338
339
340 int holeFound = 0;
341 ADIOI_OneSidedWriteAggregation(fd, offset_list, len_list, contig_access_count,
342 buf, datatype, error_code, st_offsets, end_offsets,
343 currentValidDataIndex, fd_start, fd_end, &holeFound);
344 int anyHolesFound = 0;
345 if (!gpfsmpio_onesided_no_rmw)
346 MPI_Allreduce(&holeFound, &anyHolesFound, 1, MPI_INT, MPI_MAX, fd->comm);
347 if (anyHolesFound == 0) {
348 GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
349 ADIOI_Free(offset_list);
350 ADIOI_Free(len_list);
351 ADIOI_Free(st_offsets);
352 ADIOI_Free(end_offsets);
353 ADIOI_Free(fd_start);
354 ADIOI_Free(fd_end);
355 ADIOI_Free(count_sizes);
356 goto fn_exit;
357 }
358 else {
359
360
361
362
363
364
365
366 if (gpfsmpio_onesided_inform_rmw && (myrank ==0))
367 FPRINTF(stderr,"Information: Holes found during one-sided "
368 "write aggregation algorithm --- re-running one-sided "
369 "write aggregation with GPFSMPIO_ONESIDED_ALWAYS_RMW set to 1.\n");
370 gpfsmpio_onesided_always_rmw = 1;
371 int prev_gpfsmpio_onesided_no_rmw = gpfsmpio_onesided_no_rmw;
372 gpfsmpio_onesided_no_rmw = 1;
373 ADIOI_OneSidedWriteAggregation(fd, offset_list, len_list, contig_access_count, buf, datatype, error_code, st_offsets, end_offsets, currentValidDataIndex, fd_start, fd_end, &holeFound);
374 gpfsmpio_onesided_no_rmw = prev_gpfsmpio_onesided_no_rmw;
375 GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
376 ADIOI_Free(offset_list);
377 ADIOI_Free(len_list);
378 ADIOI_Free(st_offsets);
379 ADIOI_Free(end_offsets);
380 ADIOI_Free(fd_start);
381 ADIOI_Free(fd_end);
382 ADIOI_Free(count_sizes);
383 goto fn_exit;
384 }
385 }
386 if (gpfsmpio_p2pcontig==1) {
387
388
389
390
391 int inOrderAndNoGaps = 1;
392 for (i=0;i<(nprocs-1);i++) {
393 if (end_offsets[i] != (st_offsets[i+1]-1))
394 inOrderAndNoGaps = 0;
395 }
396 if (inOrderAndNoGaps && buftype_is_contig) {
397
398
399 ADIOI_P2PContigWriteAggregation(fd, buf,
400 error_code, st_offsets, end_offsets, fd_start, fd_end);
401
402 GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
403
404 ADIOI_Free(offset_list);
405 ADIOI_Free(len_list);
406 ADIOI_Free(st_offsets);
407 ADIOI_Free(end_offsets);
408 ADIOI_Free(fd_start);
409 ADIOI_Free(fd_end);
410
411 goto fn_exit;
412 }
413 }
414
415
416
417
418 if (gpfsmpio_tuneblocking)
419 ADIOI_GPFS_Calc_my_req(fd, offset_list, len_list, contig_access_count,
420 min_st_offset, fd_start, fd_end, fd_size,
421 nprocs, &count_my_req_procs,
422 &count_my_req_per_proc, &my_req,
423 &buf_idx);
424 else
425 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
426 min_st_offset, fd_start, fd_end, fd_size,
427 nprocs, &count_my_req_procs,
428 &count_my_req_per_proc, &my_req,
429 &buf_idx);
430
431 GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_OTHREQ, GPFSMPIO_CIO_T_MYREQ )
432
433
434
435
436
437
438
439
440 if (gpfsmpio_tuneblocking)
441 ADIOI_GPFS_Calc_others_req(fd, count_my_req_procs,
442 count_my_req_per_proc, my_req,
443 nprocs, myrank,
444 &count_others_req_procs, &others_req);
445 else
446 ADIOI_Calc_others_req(fd, count_my_req_procs,
447 count_my_req_per_proc, my_req,
448 nprocs, myrank,
449 &count_others_req_procs, &others_req);
450
451 GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_DEXCH, GPFSMPIO_CIO_T_OTHREQ )
452
453 ADIOI_Free(count_my_req_per_proc);
454 for (i=0; i < nprocs; i++) {
455 if (my_req[i].count) {
456 ADIOI_Free(my_req[i].offsets);
457 ADIOI_Free(my_req[i].lens);
458 }
459 }
460 ADIOI_Free(my_req);
461
462
463 ADIOI_Exch_and_write(fd, buf, datatype, nprocs, myrank,
464 others_req, offset_list,
465 len_list, contig_access_count, min_st_offset,
466 fd_size, fd_start, fd_end, buf_idx, error_code);
467
468 GPFSMPIO_T_CIO_SET_GET( w, 0, 1, GPFSMPIO_CIO_LAST, GPFSMPIO_CIO_T_DEXCH )
469 GPFSMPIO_T_CIO_SET_GET( w, 0, 1, GPFSMPIO_CIO_LAST, GPFSMPIO_CIO_T_MPIO_CRW )
470
471 GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
472
473
474 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
475
476 for (i=0; i<nprocs; i++) {
477 if (others_req[i].count) {
478 ADIOI_Free(others_req[i].offsets);
479 ADIOI_Free(others_req[i].lens);
480 ADIOI_Free(others_req[i].mem_ptrs);
481 }
482 }
483 ADIOI_Free(others_req);
484
485 ADIOI_Free(buf_idx);
486 ADIOI_Free(offset_list);
487 ADIOI_Free(len_list);
488 ADIOI_Free(st_offsets);
489 ADIOI_Free(end_offsets);
490 ADIOI_Free(fd_start);
491 ADIOI_Free(fd_end);
492
493 fn_exit:
494 #ifdef HAVE_STATUS_SET_BYTES
495 if (status) {
496 MPI_Count bufsize, size;
497
498 MPI_Type_size_x(datatype, &size);
499 bufsize = size * count;
500 MPIR_Status_set_bytes(status, datatype, bufsize);
501 }
502
503
504 #endif
505
506 fd->fp_sys_posn = -1;
507 #ifdef AGGREGATION_PROFILE
508 MPE_Log_event (5013, 0, NULL);
509 #endif
510 }
511
512 static void gpfs_wr_access_start(int fd, ADIO_Offset offset, ADIO_Offset length)
513 {
514 int rc=0;
515 #ifdef HAVE_GPFS_FCNTL_H
516 struct {
517 gpfsFcntlHeader_t header;
518 gpfsAccessRange_t access;
519 } take_locks;
520
521 take_locks.header.totalLength = sizeof(take_locks);
522 take_locks.header.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
523 take_locks.header.fcntlReserved = 0;
524
525 take_locks.access.structLen = sizeof(take_locks.access);
526 take_locks.access.structType = GPFS_ACCESS_RANGE;
527 take_locks.access.start = offset;
528 take_locks.access.length = length;
529 take_locks.access.isWrite = 1;
530
531 rc = gpfs_fcntl(fd, &take_locks);
532 #endif
533 ADIOI_Assert(rc == 0);
534 }
535
536 static void gpfs_wr_access_end(int fd, ADIO_Offset offset, ADIO_Offset length)
537 {
538 int rc=0;
539 #ifdef HAVE_GPFS_FCNTL_H
540 struct {
541 gpfsFcntlHeader_t header;
542 gpfsFreeRange_t free;
543 } free_locks;
544
545
546 free_locks.header.totalLength = sizeof(free_locks);
547 free_locks.header.fcntlVersion = GPFS_FCNTL_CURRENT_VERSION;
548 free_locks.header.fcntlReserved = 0;
549
550 free_locks.free.structLen = sizeof(free_locks.free);
551 free_locks.free.structType = GPFS_FREE_RANGE;
552 free_locks.free.start = offset;
553 free_locks.free.length = length;
554
555 rc = gpfs_fcntl(fd, &free_locks);
556 #endif
557 ADIOI_Assert(rc == 0);
558 }
559
560 #ifdef BGQPLATFORM
561
562
563 static int gpfs_find_access_for_ion(ADIO_File fd,
564 ADIO_Offset my_start, ADIO_Offset my_end,
565 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
566 ADIO_Offset *start, ADIO_Offset *end)
567 {
568 int my_ionode = BGQ_IO_node_id();
569 int *rank_to_ionode;
570 int i, nprocs, rank;
571 ADIO_Offset group_start=LLONG_MAX, group_end=0;
572
573 MPI_Comm_size(fd->comm, &nprocs);
574 MPI_Comm_rank(fd->comm, &rank);
575
576 rank_to_ionode = ADIOI_Calloc(nprocs, sizeof(int));
577 MPI_Allgather(&my_ionode, 1, MPI_INT, rank_to_ionode, 1, MPI_INT, fd->comm);
578
579
580
581
582
583
584
585
586
587
588
589 if (my_start == -1 || my_end == -1) {
590 ADIOI_Free(rank_to_ionode);
591 return 0;
592 }
593
594 for (i=0; i<fd->hints->cb_nodes; i++ ){
595 if (my_ionode == rank_to_ionode[fd->hints->ranklist[i]] ) {
596 group_start = ADIOI_MIN(fd_start[i], group_start);
597 group_end = ADIOI_MAX(fd_end[i], group_end);
598 }
599 }
600 *start = group_start;
601 *end = group_end;
602 ADIOI_Free(rank_to_ionode);
603 return 1;
604 }
605 #endif
606
607
608
609
610
611 static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
612 datatype, int nprocs,
613 int myrank,
614 ADIOI_Access
615 *others_req, ADIO_Offset *offset_list,
616 ADIO_Offset *len_list, int contig_access_count,
617 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
618 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
619 int *buf_idx, int *error_code)
620 {
621
622
623
624
625
626
627
628
629
630
631 ADIO_Offset size=0;
632 int hole, i, j, m, ntimes, max_ntimes, buftype_is_contig;
633 ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
634 char *write_buf=NULL, *write_buf2=NULL;
635 int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
636 int *partial_recv, *sent_to_proc, *start_pos, flag;
637 int *send_buf_idx, *curr_to_proc, *done_to_proc;
638 MPI_Status status;
639 ADIOI_Flatlist_node *flat_buf=NULL;
640 MPI_Aint buftype_extent, buftype_lb;
641 int info_flag, coll_bufsize;
642 char *value;
643 static char myname[] = "ADIOI_EXCH_AND_WRITE";
644 pthread_t io_thread;
645 void *thread_ret;
646 ADIOI_IO_ThreadFuncData io_thread_args;
647
648 *error_code = MPI_SUCCESS;
649
650
651
652
653
654
655 value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
656 ADIOI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value,
657 &info_flag);
658 coll_bufsize = atoi(value);
659 ADIOI_Free(value);
660
661 if (gpfsmpio_pthreadio == 1){
662
663
664 coll_bufsize = coll_bufsize/2;
665 }
666
667 for (i=0; i < nprocs; i++) {
668 if (others_req[i].count) {
669 st_loc = others_req[i].offsets[0];
670 end_loc = others_req[i].offsets[0];
671 break;
672 }
673 }
674
675 for (i=0; i < nprocs; i++)
676 for (j=0; j < others_req[i].count; j++) {
677 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
678 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
679 + others_req[i].lens[j] - 1));
680 }
681
682
683
684 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
685
686 if ((st_loc==-1) && (end_loc==-1)) {
687 ntimes = 0;
688 }
689 if (ntimes > 0) {
690
691 if (getenv("ROMIO_GPFS_DECLARE_ACCESS")!=NULL) {
692 gpfs_wr_access_start(fd->fd_sys, st_loc, end_loc - st_loc);
693 }
694 }
695
696 ADIO_Offset st_loc_ion=0, end_loc_ion=0, needs_gpfs_access_cleanup=0;
697 #ifdef BGQPLATFORM
698 if (ntimes > 0) {
699
700
701 if (getenv("ROMIO_GPFS_DECLARE_ION_ACCESS")!=NULL) {
702 if (gpfs_find_access_for_ion(fd, st_loc, end_loc, fd_start, fd_end,
703 &st_loc_ion, &end_loc_ion)) {
704 gpfs_wr_access_start(fd->fd_sys, st_loc_ion, end_loc_ion-st_loc_ion);
705 needs_gpfs_access_cleanup=1;
706 }
707 }
708 }
709 #endif
710
711 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX,
712 fd->comm);
713
714 write_buf = fd->io_buf;
715 if (gpfsmpio_pthreadio == 1) {
716 write_buf2 = fd->io_buf + coll_bufsize;
717 }
718
719 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
720
721
722 count = (int *) ADIOI_Malloc(nprocs*sizeof(int));
723
724
725
726 partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));
727
728
729
730
731 send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
732
733
734
735 recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
736
737
738 sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
739
740
741
742 send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
743 curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
744 done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
745
746
747 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
748
749
750
751 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
752 if (!buftype_is_contig) {
753 flat_buf = ADIOI_Flatten_and_find(datatype);
754 }
755 MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);
756
757
758
759
760
761
762
763
764
765
766
767
768
769 done = 0;
770 off = st_loc;
771
772 if(gpfsmpio_pthreadio == 1)
773 io_thread = pthread_self();
774
775 #ifdef PROFILE
776 MPE_Log_event(14, 0, "end computation");
777 #endif
778
779 for (m=0; m < ntimes; m++) {
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797 #ifdef PROFILE
798 MPE_Log_event(13, 0, "start computation");
799 #endif
800 for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;
801
802 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
803
804 for (i=0; i < nprocs; i++) {
805 if (others_req[i].count) {
806 start_pos[i] = curr_offlen_ptr[i];
807 for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
808 if (partial_recv[i]) {
809
810
811 req_off = others_req[i].offsets[j] +
812 partial_recv[i];
813 req_len = others_req[i].lens[j] -
814 partial_recv[i];
815 partial_recv[i] = 0;
816
817 others_req[i].offsets[j] = req_off;
818 others_req[i].lens[j] = req_len;
819 }
820 else {
821 req_off = others_req[i].offsets[j];
822 req_len = others_req[i].lens[j];
823 }
824 if (req_off < off + size) {
825 count[i]++;
826 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIU_Upint)(write_buf+req_off-off));
827 MPI_Get_address(write_buf+req_off-off,
828 &(others_req[i].mem_ptrs[j]));
829 ADIOI_Assert((off + size - req_off) == (int)(off + size - req_off));
830 recv_size[i] += (int)(ADIOI_MIN(off + size - req_off,
831 (unsigned)req_len));
832
833 if (off+size-req_off < (unsigned)req_len)
834 {
835 partial_recv[i] = (int) (off + size - req_off);
836
837
838 if ((j+1 < others_req[i].count) &&
839 (others_req[i].offsets[j+1] < off+size))
840 {
841 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
842 MPIR_ERR_RECOVERABLE,
843 myname,
844 __LINE__,
845 MPI_ERR_ARG,
846 "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)", 0);
847
848
849
850 }
851
852 break;
853 }
854 }
855 else break;
856 }
857 curr_offlen_ptr[i] = j;
858 }
859 }
860
861 #ifdef PROFILE
862 MPE_Log_event(14, 0, "end computation");
863 MPE_Log_event(7, 0, "start communication");
864 #endif
865 if (gpfsmpio_comm == 1)
866 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
867 len_list, send_size, recv_size, off, size, count,
868 start_pos, partial_recv,
869 sent_to_proc, nprocs, myrank,
870 buftype_is_contig, contig_access_count,
871 min_st_offset, fd_size, fd_start, fd_end,
872 others_req, send_buf_idx, curr_to_proc,
873 done_to_proc, &hole, m, buftype_extent, buf_idx,
874 error_code);
875 else
876 if (gpfsmpio_comm == 0)
877 ADIOI_W_Exchange_data_alltoallv(fd, buf, write_buf, flat_buf, offset_list,
878 len_list, send_size, recv_size, off, size, count,
879 start_pos, partial_recv,
880 sent_to_proc, nprocs, myrank,
881 buftype_is_contig, contig_access_count,
882 min_st_offset, fd_size, fd_start, fd_end,
883 others_req, send_buf_idx, curr_to_proc,
884 done_to_proc, &hole, m, buftype_extent, buf_idx,
885 error_code);
886 if (*error_code != MPI_SUCCESS) return;
887 #ifdef PROFILE
888 MPE_Log_event(8, 0, "end communication");
889 #endif
890
891 flag = 0;
892 for (i=0; i<nprocs; i++)
893 if (count[i]) flag = 1;
894
895 if (flag) {
896 char round[50];
897 sprintf(round, "two-phase-round=%d", m);
898 setenv("LIBIOLOG_EXTRA_INFO", round, 1);
899 ADIOI_Assert(size == (int)size);
900 if (gpfsmpio_pthreadio == 1) {
901
902
903
904
905
906 if(!pthread_equal(io_thread, pthread_self())) {
907 pthread_join(io_thread, &thread_ret);
908 *error_code = *(int *)thread_ret;
909 if (*error_code != MPI_SUCCESS) return;
910 io_thread = pthread_self();
911
912 }
913 io_thread_args.fd = fd;
914
915
916 io_thread_args.buf = write_buf;
917 ADIOI_SWAP(write_buf, write_buf2, char*);
918 io_thread_args.io_kind = ADIOI_WRITE;
919 io_thread_args.size = size;
920 io_thread_args.offset = off;
921 io_thread_args.status = &status;
922 io_thread_args.error_code = *error_code;
923 if ( (pthread_create(&io_thread, NULL,
924 ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
925 io_thread = pthread_self();
926 } else {
927 ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE,
928 ADIO_EXPLICIT_OFFSET, off, &status, error_code);
929 if (*error_code != MPI_SUCCESS) return;
930 }
931 }
932
933 off += size;
934 done += size;
935 }
936 if (gpfsmpio_pthreadio == 1) {
937 if ( !pthread_equal(io_thread, pthread_self()) ) {
938 pthread_join(io_thread, &thread_ret);
939 *error_code = *(int *)thread_ret;
940 }
941 }
942
943 for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
944 #ifdef PROFILE
945 MPE_Log_event(7, 0, "start communication");
946 #endif
947 for (m=ntimes; m<max_ntimes; m++)
948
949 if (gpfsmpio_comm == 1)
950 ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
951 len_list, send_size, recv_size, off, size, count,
952 start_pos, partial_recv,
953 sent_to_proc, nprocs, myrank,
954 buftype_is_contig, contig_access_count,
955 min_st_offset, fd_size, fd_start, fd_end,
956 others_req, send_buf_idx,
957 curr_to_proc, done_to_proc, &hole, m,
958 buftype_extent, buf_idx, error_code);
959 else
960 if (gpfsmpio_comm == 0)
961 ADIOI_W_Exchange_data_alltoallv(fd, buf, write_buf, flat_buf, offset_list,
962 len_list, send_size, recv_size, off, size, count,
963 start_pos, partial_recv,
964 sent_to_proc, nprocs, myrank,
965 buftype_is_contig, contig_access_count,
966 min_st_offset, fd_size, fd_start, fd_end,
967 others_req, send_buf_idx,
968 curr_to_proc, done_to_proc, &hole, m,
969 buftype_extent, buf_idx, error_code);
970 if (*error_code != MPI_SUCCESS) return;
971 #ifdef PROFILE
972 MPE_Log_event(8, 0, "end communication");
973 #endif
974
975 ADIOI_Free(curr_offlen_ptr);
976 ADIOI_Free(count);
977 ADIOI_Free(partial_recv);
978 ADIOI_Free(send_size);
979 ADIOI_Free(recv_size);
980 ADIOI_Free(sent_to_proc);
981 ADIOI_Free(start_pos);
982 ADIOI_Free(send_buf_idx);
983 ADIOI_Free(curr_to_proc);
984 ADIOI_Free(done_to_proc);
985
986 if (ntimes != 0 && getenv("ROMIO_GPFS_DECLARE_ACCESS")!=NULL) {
987 gpfs_wr_access_end(fd->fd_sys, st_loc, end_loc-st_loc);
988 }
989
990 if (needs_gpfs_access_cleanup) {
991 gpfs_wr_access_end(fd->fd_sys, st_loc_ion, end_loc_ion-st_loc_ion);
992 needs_gpfs_access_cleanup=0;
993 }
994
995 unsetenv("LIBIOLOG_EXTRA_INFO");
996 }
997
998
999
1000
1001
1002 static void ADIOI_W_Exchange_data(ADIO_File fd, const void *buf, char *write_buf,
1003 ADIOI_Flatlist_node *flat_buf, ADIO_Offset
1004 *offset_list, ADIO_Offset *len_list, int *send_size,
1005 int *recv_size, ADIO_Offset off, int size,
1006 int *count, int *start_pos,
1007 int *partial_recv,
1008 int *sent_to_proc, int nprocs,
1009 int myrank, int
1010 buftype_is_contig, int contig_access_count,
1011 ADIO_Offset min_st_offset,
1012 ADIO_Offset fd_size,
1013 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
1014 ADIOI_Access *others_req,
1015 int *send_buf_idx, int *curr_to_proc,
1016 int *done_to_proc, int *hole, int iter,
1017 MPI_Aint buftype_extent, int *buf_idx,
1018 int *error_code)
1019 {
1020 int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;
1021 char **send_buf = NULL;
1022 MPI_Request *requests, *send_req;
1023 MPI_Datatype *recv_types;
1024 MPI_Status *statuses, status;
1025 int *srt_len, sum;
1026 ADIO_Offset *srt_off;
1027 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
1028
1029
1030
1031
1032 MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
1033
1034
1035
1036 nprocs_recv = 0;
1037 for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;
1038
1039 recv_types = (MPI_Datatype *)
1040 ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype));
1041
1042
1043 tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
1044 j = 0;
1045 for (i=0; i<nprocs; i++) {
1046 if (recv_size[i]) {
1047
1048 if (partial_recv[i]) {
1049 k = start_pos[i] + count[i] - 1;
1050 tmp_len[i] = others_req[i].lens[k];
1051 others_req[i].lens[k] = partial_recv[i];
1052 }
1053 ADIOI_Type_create_hindexed_x(count[i],
1054 &(others_req[i].lens[start_pos[i]]),
1055 &(others_req[i].mem_ptrs[start_pos[i]]),
1056 MPI_BYTE, recv_types+j);
1057
1058 MPI_Type_commit(recv_types+j);
1059 j++;
1060 }
1061 }
1062
1063
1064
1065
1066
1067 sum = 0;
1068 for (i=0; i<nprocs; i++) sum += count[i];
1069 srt_off = (ADIO_Offset *) ADIOI_Malloc((sum+1)*sizeof(ADIO_Offset));
1070 srt_len = (int *) ADIOI_Malloc((sum+1)*sizeof(int));
1071
1072
1073 ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
1074 nprocs, nprocs_recv, sum);
1075
1076
1077 for (i=0; i<nprocs; i++)
1078 if (partial_recv[i]) {
1079 k = start_pos[i] + count[i] - 1;
1080 others_req[i].lens[k] = tmp_len[i];
1081 }
1082 ADIOI_Free(tmp_len);
1083
1084
1085
1086
1087
1088
1089
1090 *hole = 0;
1091 if (off != srt_off[0])
1092 *hole = 1;
1093 else {
1094 for (i=1; i<sum; i++) {
1095 if (srt_off[i] <= srt_off[0] + srt_len[0]) {
1096 int new_len = srt_off[i] + srt_len[i] - srt_off[0];
1097 if (new_len > srt_len[0]) srt_len[0] = new_len;
1098 }
1099 else
1100 break;
1101 }
1102 if (i < sum || size != srt_len[0])
1103 *hole = 1;
1104 }
1105
1106 ADIOI_Free(srt_off);
1107 ADIOI_Free(srt_len);
1108
1109 if (nprocs_recv) {
1110 if (*hole) {
1111 const char * stuff = "data-sieve-in-two-phase";
1112 setenv("LIBIOLOG_EXTRA_INFO", stuff, 1);
1113 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
1114 ADIO_EXPLICIT_OFFSET, off, &status, &err);
1115
1116 if (err != MPI_SUCCESS) {
1117 *error_code = MPIO_Err_create_code(err,
1118 MPIR_ERR_RECOVERABLE, myname,
1119 __LINE__, MPI_ERR_IO,
1120 "**ioRMWrdwr", 0);
1121 return;
1122 }
1123
1124 unsetenv("LIBIOLOG_EXTRA_INFO");
1125 }
1126 }
1127
1128 nprocs_send = 0;
1129 for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
1130
1131 if (fd->atomicity) {
1132
1133 requests = (MPI_Request *)
1134 ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request));
1135 send_req = requests;
1136 }
1137 else {
1138 requests = (MPI_Request *)
1139 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
1140
1141
1142
1143 j = 0;
1144 for (i=0; i<nprocs; i++) {
1145 if (recv_size[i]) {
1146 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
1147 fd->comm, requests+j);
1148 j++;
1149 }
1150 }
1151 send_req = requests + nprocs_recv;
1152 }
1153
1154
1155
1156
1157 #ifdef AGGREGATION_PROFILE
1158 MPE_Log_event (5032, 0, NULL);
1159 #endif
1160 if (buftype_is_contig) {
1161 j = 0;
1162 for (i=0; i < nprocs; i++)
1163 if (send_size[i]) {
1164 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
1165 MPI_BYTE, i, myrank+i+100*iter, fd->comm,
1166 send_req+j);
1167 j++;
1168 buf_idx[i] += send_size[i];
1169 }
1170 }
1171 else if (nprocs_send) {
1172
1173 send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*));
1174 for (i=0; i < nprocs; i++)
1175 if (send_size[i])
1176 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
1177
1178 ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf,
1179 offset_list, len_list, send_size,
1180 send_req,
1181 sent_to_proc, nprocs, myrank,
1182 contig_access_count,
1183 min_st_offset, fd_size, fd_start, fd_end,
1184 send_buf_idx, curr_to_proc, done_to_proc, iter,
1185 buftype_extent);
1186
1187 }
1188
1189 if (fd->atomicity) {
1190
1191 j = 0;
1192 for (i=0; i<nprocs; i++) {
1193 MPI_Status wkl_status;
1194 if (recv_size[i]) {
1195 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
1196 fd->comm, &wkl_status);
1197 j++;
1198 }
1199 }
1200 }
1201
1202 for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
1203 ADIOI_Free(recv_types);
1204
1205 if (fd->atomicity) {
1206
1207 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+1) * \
1208 sizeof(MPI_Status));
1209
1210 }
1211 else {
1212 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
1213 sizeof(MPI_Status));
1214
1215 }
1216
1217 #ifdef NEEDS_MPI_TEST
1218 i = 0;
1219 if (fd->atomicity) {
1220
1221 while (!i) MPI_Testall(nprocs_send, send_req, &i, statuses);
1222 }
1223 else {
1224 while (!i) MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);
1225 }
1226 #else
1227 if (fd->atomicity)
1228
1229 MPI_Waitall(nprocs_send, send_req, statuses);
1230 else
1231 MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
1232 #endif
1233
1234 #ifdef AGGREGATION_PROFILE
1235 MPE_Log_event (5033, 0, NULL);
1236 #endif
1237 ADIOI_Free(statuses);
1238 ADIOI_Free(requests);
1239 if (!buftype_is_contig && nprocs_send) {
1240 for (i=0; i < nprocs; i++)
1241 if (send_size[i]) ADIOI_Free(send_buf[i]);
1242 ADIOI_Free(send_buf);
1243 }
1244 }
1245
1246
1247 #define ADIOI_BUF_INCR \
1248 { \
1249 while (buf_incr) { \
1250 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
1251 user_buf_idx += size_in_buf; \
1252 flat_buf_sz -= size_in_buf; \
1253 if (!flat_buf_sz) { \
1254 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
1255 else { \
1256 flat_buf_idx = 0; \
1257 n_buftypes++; \
1258 } \
1259 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
1260 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
1261 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
1262 } \
1263 buf_incr -= size_in_buf; \
1264 } \
1265 }
1266
1267
1268 #define ADIOI_BUF_COPY \
1269 { \
1270 while (size) { \
1271 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
1272 ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIU_Upint)((MPIU_Upint)buf + user_buf_idx)); \
1273 ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
1274 memcpy(&(send_buf[p][send_buf_idx[p]]), \
1275 ((char *) buf) + user_buf_idx, size_in_buf); \
1276 send_buf_idx[p] += size_in_buf; \
1277 user_buf_idx += size_in_buf; \
1278 flat_buf_sz -= size_in_buf; \
1279 if (!flat_buf_sz) { \
1280 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
1281 else { \
1282 flat_buf_idx = 0; \
1283 n_buftypes++; \
1284 } \
1285 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
1286 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
1287 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
1288 } \
1289 size -= size_in_buf; \
1290 buf_incr -= size_in_buf; \
1291 } \
1292 ADIOI_BUF_INCR \
1293 }
1294
1295 static void ADIOI_Fill_send_buffer(ADIO_File fd, const void *buf, ADIOI_Flatlist_node
1296 *flat_buf, char **send_buf, ADIO_Offset
1297 *offset_list, ADIO_Offset *len_list, int *send_size,
1298 MPI_Request *requests, int *sent_to_proc,
1299 int nprocs, int myrank,
1300 int contig_access_count,
1301 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
1302 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
1303 int *send_buf_idx, int *curr_to_proc,
1304 int *done_to_proc, int iter,
1305 MPI_Aint buftype_extent)
1306 {
1307
1308
1309 int i, p, flat_buf_idx;
1310 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
1311 int jj, n_buftypes;
1312 ADIO_Offset off, len, rem_len, user_buf_idx;
1313
1314
1315
1316
1317
1318
1319
1320
1321 for (i=0; i < nprocs; i++) {
1322 send_buf_idx[i] = curr_to_proc[i] = 0;
1323 done_to_proc[i] = sent_to_proc[i];
1324 }
1325 jj = 0;
1326
1327 user_buf_idx = flat_buf->indices[0];
1328 flat_buf_idx = 0;
1329 n_buftypes = 0;
1330 flat_buf_sz = flat_buf->blocklens[0];
1331
1332
1333
1334
1335
1336 for (i=0; i<contig_access_count; i++) {
1337 off = offset_list[i];
1338 rem_len = len_list[i];
1339
1340
1341 while (rem_len != 0) {
1342 len = rem_len;
1343
1344
1345
1346
1347 p = ADIOI_GPFS_Calc_aggregator(fd,
1348 off,
1349 min_st_offset,
1350 &len,
1351 fd_size,
1352 fd_start,
1353 fd_end);
1354
1355 if (send_buf_idx[p] < send_size[p]) {
1356 if (curr_to_proc[p]+len > done_to_proc[p]) {
1357 if (done_to_proc[p] > curr_to_proc[p]) {
1358 size = ADIOI_MIN(curr_to_proc[p] + len -
1359 done_to_proc[p], send_size[p]-send_buf_idx[p]);
1360 buf_incr = done_to_proc[p] - curr_to_proc[p];
1361 ADIOI_BUF_INCR
1362 ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
1363 buf_incr = curr_to_proc[p] + len - done_to_proc[p];
1364 ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
1365 curr_to_proc[p] = done_to_proc[p] + size;
1366 ADIOI_BUF_COPY
1367 }
1368 else {
1369 size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
1370 buf_incr = len;
1371 ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
1372 curr_to_proc[p] += size;
1373 ADIOI_BUF_COPY
1374 }
1375 if (send_buf_idx[p] == send_size[p]) {
1376 MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
1377 myrank+p+100*iter, fd->comm, requests+jj);
1378 jj++;
1379 }
1380 }
1381 else {
1382 ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
1383 curr_to_proc[p] += len;
1384 buf_incr = len;
1385 ADIOI_BUF_INCR
1386 }
1387 }
1388 else {
1389 buf_incr = len;
1390 ADIOI_BUF_INCR
1391 }
1392 off += len;
1393 rem_len -= len;
1394 }
1395 }
1396 for (i=0; i < nprocs; i++)
1397 if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
1398 }
1399
1400
1401
1402 static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
1403 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
1404 int nprocs, int nprocs_recv, int total_elements)
1405 {
1406 typedef struct {
1407 ADIO_Offset *off_list;
1408 ADIO_Offset *len_list;
1409 int nelem;
1410 } heap_struct;
1411
1412 heap_struct *a, tmp;
1413 int i, j, heapsize, l, r, k, smallest;
1414
1415 a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct));
1416
1417 j = 0;
1418 for (i=0; i<nprocs; i++)
1419 if (count[i]) {
1420 a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
1421 a[j].len_list = &(others_req[i].lens[start_pos[i]]);
1422 a[j].nelem = count[i];
1423 j++;
1424 }
1425
1426
1427
1428
1429 heapsize = nprocs_recv;
1430 for (i=heapsize/2 - 1; i>=0; i--) {
1431
1432
1433
1434
1435 k = i;
1436 while (1) {
1437 l = 2*(k+1) - 1;
1438 r = 2*(k+1);
1439
1440 if ((l < heapsize) &&
1441 (*(a[l].off_list) < *(a[k].off_list)))
1442 smallest = l;
1443 else smallest = k;
1444
1445 if ((r < heapsize) &&
1446 (*(a[r].off_list) < *(a[smallest].off_list)))
1447 smallest = r;
1448
1449 if (smallest != k) {
1450 tmp.off_list = a[k].off_list;
1451 tmp.len_list = a[k].len_list;
1452 tmp.nelem = a[k].nelem;
1453
1454 a[k].off_list = a[smallest].off_list;
1455 a[k].len_list = a[smallest].len_list;
1456 a[k].nelem = a[smallest].nelem;
1457
1458 a[smallest].off_list = tmp.off_list;
1459 a[smallest].len_list = tmp.len_list;
1460 a[smallest].nelem = tmp.nelem;
1461
1462 k = smallest;
1463 }
1464 else break;
1465 }
1466 }
1467
1468 for (i=0; i<total_elements; i++) {
1469
1470 srt_off[i] = *(a[0].off_list);
1471 srt_len[i] = *(a[0].len_list);
1472 (a[0].nelem)--;
1473
1474 if (!a[0].nelem) {
1475 a[0].off_list = a[heapsize-1].off_list;
1476 a[0].len_list = a[heapsize-1].len_list;
1477 a[0].nelem = a[heapsize-1].nelem;
1478 heapsize--;
1479 }
1480 else {
1481 (a[0].off_list)++;
1482 (a[0].len_list)++;
1483 }
1484
1485
1486 k = 0;
1487 while (1) {
1488 l = 2*(k+1) - 1;
1489 r = 2*(k+1);
1490
1491 if ((l < heapsize) &&
1492 (*(a[l].off_list) < *(a[k].off_list)))
1493 smallest = l;
1494 else smallest = k;
1495
1496 if ((r < heapsize) &&
1497 (*(a[r].off_list) < *(a[smallest].off_list)))
1498 smallest = r;
1499
1500 if (smallest != k) {
1501 tmp.off_list = a[k].off_list;
1502 tmp.len_list = a[k].len_list;
1503 tmp.nelem = a[k].nelem;
1504
1505 a[k].off_list = a[smallest].off_list;
1506 a[k].len_list = a[smallest].len_list;
1507 a[k].nelem = a[smallest].nelem;
1508
1509 a[smallest].off_list = tmp.off_list;
1510 a[smallest].len_list = tmp.len_list;
1511 a[smallest].nelem = tmp.nelem;
1512
1513 k = smallest;
1514 }
1515 else break;
1516 }
1517 }
1518
1519 ADIOI_Free(a);
1520 }
1521
1522
1523 static void ADIOI_W_Exchange_data_alltoallv(
1524 ADIO_File fd, const void *buf,
1525 char *write_buf,
1526 ADIOI_Flatlist_node *flat_buf,
1527 ADIO_Offset *offset_list,
1528 ADIO_Offset *len_list, int *send_size, int *recv_size,
1529 ADIO_Offset off, int size,
1530 int *count, int *start_pos, int *partial_recv,
1531 int *sent_to_proc, int nprocs, int myrank,
1532 int buftype_is_contig, int contig_access_count,
1533 ADIO_Offset min_st_offset,
1534 ADIO_Offset fd_size,
1535 ADIO_Offset *fd_start,
1536 ADIO_Offset *fd_end,
1537 ADIOI_Access *others_req,
1538 int *send_buf_idx, int *curr_to_proc,
1539 int *done_to_proc, int *hole,
1540 int iter, MPI_Aint buftype_extent, int *buf_idx,
1541 int *error_code)
1542 {
1543 int i, j, k=0, nprocs_recv, nprocs_send, *tmp_len, err;
1544 char **send_buf = NULL;
1545 MPI_Request *send_req=NULL;
1546 MPI_Status status;
1547 int rtail, stail;
1548 char *sbuf_ptr, *to_ptr;
1549 int len;
1550 int *sdispls, *rdispls;
1551 char *all_recv_buf, *all_send_buf;
1552 int *srt_len, sum;
1553 ADIO_Offset *srt_off;
1554 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
1555 double io_time;
1556
1557 io_time = MPI_Wtime();
1558
1559
1560 MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
1561
1562 gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_RECV_EXCH] += MPI_Wtime() - io_time;
1563 io_time = MPI_Wtime();
1564
1565 nprocs_recv = 0;
1566 for (i=0; i<nprocs; i++) if (recv_size[i]) { nprocs_recv++; }
1567 nprocs_send = 0;
1568 for (i=0; i<nprocs; i++) if (send_size[i]) { nprocs_send++; }
1569
1570
1571 rdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1572 rtail = 0;
1573 for (i=0; i<nprocs; i++) { rdispls[i] = rtail; rtail += recv_size[i]; }
1574
1575
1576 all_recv_buf = (char *) ADIOI_Malloc( rtail );
1577
1578
1579 sdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1580 stail = 0;
1581 for (i=0; i<nprocs; i++) { sdispls[i] = stail; stail += send_size[i]; }
1582
1583
1584 all_send_buf = (char *) ADIOI_Malloc( stail );
1585 if (buftype_is_contig) {
1586 for (i=0; i<nprocs; i++)
1587 {
1588 if (send_size[i]) {
1589 sbuf_ptr = all_send_buf + sdispls[i];
1590 memcpy( sbuf_ptr, buf + buf_idx[i], send_size[i] );
1591 buf_idx[i] += send_size[i];
1592 }
1593 }
1594 } else {
1595 send_buf = (char **) ADIOI_Malloc( nprocs * sizeof(char *) );
1596 for (i=0; i<nprocs; i++)
1597 send_buf[i] = all_send_buf + sdispls[i];
1598 ADIOI_Fill_send_buffer_nosend(fd, buf, flat_buf, send_buf,
1599 offset_list, len_list, send_size,
1600 send_req,
1601 sent_to_proc, nprocs, myrank,
1602 contig_access_count,
1603 min_st_offset, fd_size, fd_start, fd_end,
1604 send_buf_idx, curr_to_proc, done_to_proc, iter,
1605 buftype_extent);
1606 ADIOI_Free(send_buf);
1607 }
1608
1609 gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SETUP] += MPI_Wtime() - io_time;
1610
1611 io_time = MPI_Wtime();
1612
1613 MPI_Alltoallv(
1614 all_send_buf, send_size, sdispls, MPI_BYTE,
1615 all_recv_buf, recv_size, rdispls, MPI_BYTE,
1616 fd->comm );
1617
1618 ADIOI_Free( all_send_buf );
1619 ADIOI_Free(sdispls);
1620
1621 gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_NET] += MPI_Wtime() - io_time;
1622 io_time = MPI_Wtime();
1623
1624
1625
1626
1627
1628 sum = 0;
1629 for (i=0; i<nprocs; i++) sum += count[i];
1630 srt_off = (ADIO_Offset *) ADIOI_Malloc((sum+1)*sizeof(ADIO_Offset));
1631 srt_len = (int *) ADIOI_Malloc((sum+1)*sizeof(int));
1632
1633 ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
1634 nprocs, nprocs_recv, sum);
1635
1636
1637 *hole = 0;
1638
1639 if((srt_off[0] > off) ||
1640 ((srt_off[sum-1] + srt_len[sum-1]) < (off + size)))
1641 {
1642 *hole = 1;
1643 }
1644 else
1645 for (i=0; i<sum-1; i++)
1646 if (srt_off[i]+srt_len[i] < srt_off[i+1]) {
1647 *hole = 1;
1648 break;
1649 }
1650
1651 ADIOI_Free(srt_off);
1652 ADIOI_Free(srt_len);
1653
1654 gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SORT] += MPI_Wtime() - io_time;
1655 io_time = MPI_Wtime();
1656 if (nprocs_recv) {
1657 if (*hole) {
1658 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
1659 ADIO_EXPLICIT_OFFSET, off, &status, &err);
1660
1661 if (err != MPI_SUCCESS) {
1662 *error_code = MPIO_Err_create_code(err,
1663 MPIR_ERR_RECOVERABLE, myname,
1664 __LINE__, MPI_ERR_IO,
1665 "**ioRMWrdwr", 0);
1666 return;
1667 }
1668
1669 }
1670 }
1671 gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SIEVE] += MPI_Wtime() - io_time;
1672
1673
1674 tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
1675 for (i=0; i<nprocs; i++)
1676 {
1677 if (recv_size[i]) {
1678 if (partial_recv[i]) {
1679 k = start_pos[i] + count[i] - 1;
1680 tmp_len[i] = others_req[i].lens[k];
1681 others_req[i].lens[k] = partial_recv[i];
1682 }
1683
1684 sbuf_ptr = all_recv_buf + rdispls[i];
1685 for (j=0; j<count[i]; j++) {
1686 ADIOI_ENSURE_AINT_FITS_IN_PTR(others_req[i].mem_ptrs[ start_pos[i]+j ]);
1687 to_ptr = (char *) ADIOI_AINT_CAST_TO_VOID_PTR ( others_req[i].mem_ptrs[ start_pos[i]+j ] );
1688 len = others_req[i].lens[ start_pos[i]+j ] ;
1689 memcpy( to_ptr, sbuf_ptr, len );
1690 sbuf_ptr += len;
1691 }
1692
1693
1694 if (partial_recv[i]) {
1695 k = start_pos[i] + count[i] - 1;
1696 others_req[i].lens[k] = tmp_len[i];
1697 }
1698
1699 }
1700 }
1701
1702 ADIOI_Free( tmp_len );
1703 ADIOI_Free( all_recv_buf );
1704 ADIOI_Free(rdispls);
1705 return;
1706 }
1707
1708 static void ADIOI_Fill_send_buffer_nosend(ADIO_File fd, const void *buf, ADIOI_Flatlist_node
1709 *flat_buf, char **send_buf, ADIO_Offset
1710 *offset_list, ADIO_Offset *len_list, int *send_size,
1711 MPI_Request *requests, int *sent_to_proc,
1712 int nprocs, int myrank,
1713 int contig_access_count,
1714 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
1715 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
1716 int *send_buf_idx, int *curr_to_proc,
1717 int *done_to_proc, int iter,
1718 MPI_Aint buftype_extent)
1719 {
1720
1721
1722 int i, p, flat_buf_idx;
1723 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
1724 int jj, n_buftypes;
1725 ADIO_Offset off, len, rem_len, user_buf_idx;
1726
1727
1728
1729
1730
1731
1732
1733
1734 for (i=0; i < nprocs; i++) {
1735 send_buf_idx[i] = curr_to_proc[i] = 0;
1736 done_to_proc[i] = sent_to_proc[i];
1737 }
1738 jj = 0;
1739
1740 user_buf_idx = flat_buf->indices[0];
1741 flat_buf_idx = 0;
1742 n_buftypes = 0;
1743 flat_buf_sz = flat_buf->blocklens[0];
1744
1745
1746
1747
1748
1749 for (i=0; i<contig_access_count; i++) {
1750 off = offset_list[i];
1751 rem_len = len_list[i];
1752
1753
1754 while (rem_len != 0) {
1755 len = rem_len;
1756
1757
1758
1759
1760 p = ADIOI_GPFS_Calc_aggregator(fd,
1761 off,
1762 min_st_offset,
1763 &len,
1764 fd_size,
1765 fd_start,
1766 fd_end);
1767
1768 if (send_buf_idx[p] < send_size[p]) {
1769 if (curr_to_proc[p]+len > done_to_proc[p]) {
1770 if (done_to_proc[p] > curr_to_proc[p]) {
1771 size = ADIOI_MIN(curr_to_proc[p] + len -
1772 done_to_proc[p], send_size[p]-send_buf_idx[p]);
1773 buf_incr = done_to_proc[p] - curr_to_proc[p];
1774 ADIOI_BUF_INCR
1775 ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
1776 buf_incr = curr_to_proc[p] + len - done_to_proc[p];
1777 ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
1778 curr_to_proc[p] = done_to_proc[p] + size;
1779 ADIOI_BUF_COPY
1780 }
1781 else {
1782 size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
1783 buf_incr = len;
1784 ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
1785 curr_to_proc[p] += size;
1786 ADIOI_BUF_COPY
1787 }
1788
1789
1790
1791
1792
1793
1794
1795
1796 }
1797 else {
1798 ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
1799 curr_to_proc[p] += (int)len;
1800 buf_incr = len;
1801 ADIOI_BUF_INCR
1802 }
1803 }
1804 else {
1805 buf_incr = len;
1806 ADIOI_BUF_INCR
1807 }
1808 off += len;
1809 rem_len -= len;
1810 }
1811 }
1812 for (i=0; i < nprocs; i++)
1813 if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
1814 }