This source file includes the following definitions.
- ADIOI_IOStridedColl
- ADIOI_Calc_bounds
- ADIOI_IOFiletype
- Exch_data_amounts
- post_aggregator_comm
- post_client_comm
1
2
3
4
5
6
7 #include "assert.h"
8 #include "adio.h"
9 #include "adio_extern.h"
10 #ifdef AGGREGATION_PROFILE
11 #include "mpe.h"
12 #endif
13
14
15
16
17
18
19 #define USE_PRE_REQ
20
21 static void Exch_data_amounts (ADIO_File fd, int nprocs,
22 ADIO_Offset *client_comm_sz_arr,
23 ADIO_Offset *agg_comm_sz_arr,
24 int *client_alltoallw_counts,
25 int *agg_alltoallw_counts,
26 int *aggregators_done);
27 static void post_aggregator_comm (MPI_Comm comm, int rw_type, int nproc,
28 void *cb_buf,
29 MPI_Datatype *client_comm_dtype_arr,
30 ADIO_Offset *client_comm_sz_arr,
31 MPI_Request **requests,
32 int *aggregators_client_count_p);
33
34 static void post_client_comm (ADIO_File fd, int rw_type,
35 int agg_rank, void *buf,
36 MPI_Datatype agg_comm_dtype,
37 int agg_alltoallw_count,
38 MPI_Request *request);
39
40
41
42
43
44 void ADIOI_IOStridedColl (ADIO_File fd, void *buf, int count, int rdwr,
45 MPI_Datatype datatype, int file_ptr_type,
46 ADIO_Offset offset, ADIO_Status *status,
47 int *error_code)
48 {
49 ADIO_Offset min_st_offset=0, max_end_offset=0;
50 ADIO_Offset st_end_offset[2];
51 ADIO_Offset *all_st_end_offsets = NULL;
52 int filetype_is_contig, buftype_is_contig, is_contig;
53 ADIO_Offset off;
54 int interleave_count = 0, i, nprocs, myrank, nprocs_for_coll;
55 int cb_enable;
56 ADIO_Offset bufsize;
57 MPI_Aint extent, lb;
58 #ifdef DEBUG2
59 MPI_Aint bufextent;
60 #endif
61 MPI_Count size;
62 int agg_rank;
63
64 ADIO_Offset agg_disp;
65 MPI_Datatype agg_dtype;
66
67 int aggregators_done = 0;
68 ADIO_Offset buffered_io_size = 0;
69
70 int *alltoallw_disps;
71
72 int *alltoallw_counts;
73 int *client_alltoallw_counts;
74 int *agg_alltoallw_counts;
75
76 char *cb_buf = NULL;
77
78 MPI_Datatype *client_comm_dtype_arr;
79 MPI_Datatype *agg_comm_dtype_arr;
80 ADIO_Offset *client_comm_sz_arr;
81 ADIO_Offset *agg_comm_sz_arr;
82
83
84 view_state *client_file_view_state_arr = NULL;
85 view_state *agg_file_view_state_arr = NULL;
86
87 view_state *my_mem_view_state_arr = NULL;
88
89 MPI_Status *agg_comm_statuses = NULL;
90 MPI_Request *agg_comm_requests = NULL;
91 MPI_Status *client_comm_statuses = NULL;
92 MPI_Request *client_comm_requests = NULL;
93 int aggs_client_count = 0;
94 int clients_agg_count = 0;
95
96 MPI_Comm_size (fd->comm, &nprocs);
97 MPI_Comm_rank (fd->comm, &myrank);
98 #ifdef DEBUG
99 fprintf (stderr, "p%d: entering ADIOI_IOStridedColl\n", myrank);
100 #endif
101 #ifdef AGGREGATION_PROFILE
102 if (rdwr == ADIOI_READ)
103 MPE_Log_event (5010, 0, NULL);
104 else
105 MPE_Log_event (5012, 0, NULL);
106 #endif
107
108
109
110
111
112
113
114
115 nprocs_for_coll = fd->hints->cb_nodes;
116
117 if (rdwr == ADIOI_READ)
118 cb_enable = fd->hints->cb_read;
119 else
120 cb_enable = fd->hints->cb_write;
121
122
123 if (cb_enable != ADIOI_HINT_DISABLE) {
124
125 ADIOI_Calc_bounds (fd, count, datatype, file_ptr_type, offset,
126 &st_end_offset[0], &st_end_offset[1]);
127
128
129 all_st_end_offsets = (ADIO_Offset *)
130 ADIOI_Malloc (2*nprocs*sizeof(ADIO_Offset));
131 MPI_Allgather (st_end_offset, 2, ADIO_OFFSET, all_st_end_offsets, 2,
132 ADIO_OFFSET, fd->comm);
133
134 min_st_offset = all_st_end_offsets[0];
135 max_end_offset = all_st_end_offsets[1];
136
137 for (i=1; i<nprocs; i++) {
138
139 if ((all_st_end_offsets[i*2] < all_st_end_offsets[i*2-1]) &&
140 (all_st_end_offsets[i*2] <= all_st_end_offsets[i*2+1]))
141 interleave_count++;
142
143
144
145 min_st_offset = ADIOI_MIN(all_st_end_offsets[i*2],
146 min_st_offset);
147 max_end_offset = ADIOI_MAX(all_st_end_offsets[i*2+1],
148 max_end_offset);
149 }
150 }
151
152 ADIOI_Datatype_iscontig (datatype, &buftype_is_contig);
153 ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);
154
155 if ((cb_enable == ADIOI_HINT_DISABLE
156 || (!interleave_count && (cb_enable == ADIOI_HINT_AUTO)))
157 && (fd->hints->cb_pfr != ADIOI_HINT_ENABLE)){
158 if (cb_enable != ADIOI_HINT_DISABLE) {
159 ADIOI_Free (all_st_end_offsets);
160 }
161
162 if (buftype_is_contig && filetype_is_contig) {
163 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
164 off = fd->disp + (fd->etype_size) * offset;
165 if (rdwr == ADIOI_READ)
166 ADIO_ReadContig(fd, buf, count, datatype,
167 ADIO_EXPLICIT_OFFSET, off, status,
168 error_code);
169 else
170 ADIO_WriteContig(fd, buf, count, datatype,
171 ADIO_EXPLICIT_OFFSET, off, status,
172 error_code);
173 }
174 else {
175 if (rdwr == ADIOI_READ)
176 ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
177 0, status, error_code);
178 else
179 ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
180 0, status, error_code);
181 }
182 }
183 else {
184 if (rdwr == ADIOI_READ)
185 ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
186 offset, status, error_code);
187 else
188 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
189 offset, status, error_code);
190 }
191 return;
192 }
193
194 MPI_Type_get_extent(datatype, &lb, &extent);
195 #ifdef DEBUG2
196 bufextent = extent * count;
197 #endif
198 MPI_Type_size_x(datatype, &size);
199 bufsize = size * (MPI_Count)count;
200
201
202 if ((fd->hints->cb_pfr != ADIOI_HINT_ENABLE) ||
203 (fd->file_realm_types == NULL))
204 ADIOI_Calc_file_realms (fd, min_st_offset, max_end_offset);
205
206 my_mem_view_state_arr = (view_state *)
207 ADIOI_Calloc (1, nprocs * sizeof(view_state));
208 agg_file_view_state_arr = (view_state *)
209 ADIOI_Calloc (1, nprocs * sizeof(view_state));
210 client_comm_sz_arr = (ADIO_Offset *)
211 ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
212
213 if (fd->is_agg) {
214 client_file_view_state_arr = (view_state *)
215 ADIOI_Calloc (1, nprocs * sizeof(view_state));
216 }
217 else {
218 client_file_view_state_arr = NULL;
219 }
220
221
222
223 client_comm_dtype_arr = (MPI_Datatype *)
224 ADIOI_Calloc (1, nprocs * sizeof(MPI_Datatype));
225 if (!fd->is_agg)
226 for (i = 0; i < nprocs; i++)
227 client_comm_dtype_arr[i] = MPI_BYTE;
228
229 ADIOI_Exch_file_views (myrank, nprocs, file_ptr_type, fd, count,
230 datatype, offset, my_mem_view_state_arr,
231 agg_file_view_state_arr,
232 client_file_view_state_arr);
233
234 agg_comm_sz_arr = (ADIO_Offset *)
235 ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
236 agg_comm_dtype_arr = (MPI_Datatype *)
237 ADIOI_Malloc (nprocs * sizeof(MPI_Datatype));
238 if (fd->is_agg) {
239 ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
240 client_file_view_state_arr,
241 client_comm_dtype_arr,
242 client_comm_sz_arr,
243 &agg_disp,
244 &agg_dtype);
245 buffered_io_size = 0;
246 for (i=0; i <nprocs; i++) {
247 if (client_comm_sz_arr[i] > 0)
248 buffered_io_size += client_comm_sz_arr[i];
249 }
250 }
251 #ifdef USE_PRE_REQ
252 else
253 {
254
255
256
257 for (i = 0; i < fd->hints->cb_nodes; i++)
258 {
259 agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
260 #ifdef AGGREGATION_PROFILE
261 MPE_Log_event (5040, 0, NULL);
262 #endif
263 ADIOI_Build_client_pre_req(
264 fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
265 &(my_mem_view_state_arr[agg_rank]),
266 &(agg_file_view_state_arr[agg_rank]),
267 2*1024*1024,
268 64*1024);
269 #ifdef AGGREGATION_PROFILE
270 MPE_Log_event (5041, 0, NULL);
271 #endif
272 }
273 }
274 #endif
275
276
277 if (fd->is_agg)
278 cb_buf = (char *) ADIOI_Malloc (fd->hints->cb_buffer_size);
279 alltoallw_disps = (int *) ADIOI_Calloc (nprocs, sizeof(int));
280 alltoallw_counts = client_alltoallw_counts = (int *)
281 ADIOI_Calloc (2*nprocs, sizeof(int));
282 agg_alltoallw_counts = &alltoallw_counts[nprocs];
283
284 if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
285
286 if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
287 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
288 client_comm_dtype_arr,
289 client_comm_sz_arr,
290 &agg_comm_requests,
291 &aggs_client_count);
292 }
293
294 Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
295 client_alltoallw_counts, agg_alltoallw_counts,
296 &aggregators_done);
297
298 #ifdef DEBUG
299 fprintf (stderr, "client_alltoallw_counts[ ");
300 for (i=0; i<nprocs; i++) {
301 fprintf (stderr, "%d ", client_alltoallw_counts[i]);
302 }
303 fprintf (stderr, "]\n");
304 fprintf (stderr, "agg_alltoallw_counts[ ");
305 for (i=0; i<nprocs; i++) {
306 fprintf (stderr,"%d ", agg_alltoallw_counts[i]);
307 }
308 fprintf (stderr, "]\n");
309 #endif
310
311
312 while (aggregators_done != nprocs_for_coll) {
313 if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
314
315
316
317
318 client_comm_requests = (MPI_Request *)
319 ADIOI_Calloc (fd->hints->cb_nodes, sizeof(MPI_Request));
320
321 for (i = 0; i < fd->hints->cb_nodes; i++)
322 {
323 clients_agg_count = 0;
324 agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
325 if (agg_comm_sz_arr[agg_rank] > 0) {
326 ADIOI_Build_client_req(fd, agg_rank,
327 (i+myrank)%fd->hints->cb_nodes,
328 &(my_mem_view_state_arr[agg_rank]),
329 &(agg_file_view_state_arr[agg_rank]),
330 agg_comm_sz_arr[agg_rank],
331 &(agg_comm_dtype_arr[agg_rank]));
332
333 #ifdef AGGREGATION_PROFILE
334 if (i == 0)
335 MPE_Log_event (5038, 0, NULL);
336 #endif
337 post_client_comm (fd, rdwr, agg_rank, buf,
338 agg_comm_dtype_arr[agg_rank],
339 agg_alltoallw_counts[agg_rank],
340 &client_comm_requests[clients_agg_count]);
341 clients_agg_count++;
342 }
343 }
344 #ifdef AGGREGATION_PROFILE
345 if (!clients_agg_count)
346 MPE_Log_event(5039, 0, NULL);
347 #endif
348
349 if (rdwr == ADIOI_READ) {
350 if (fd->is_agg && buffered_io_size) {
351 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
352 ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
353 ADIOI_READ, status, error_code);
354 if (*error_code != MPI_SUCCESS) return;
355 MPI_Type_free (&agg_dtype);
356 }
357
358 #ifdef DEBUG
359 fprintf (stderr, "expecting from [agg](disp,size,cnt)=");
360 for (i=0; i < nprocs; i++) {
361 MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
362 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
363 size, agg_alltoallw_counts[i]);
364 if (i != nprocs - 1)
365 fprintf(stderr, ",");
366 }
367 fprintf (stderr, "]\n");
368 if (fd->is_agg) {
369 fprintf (stderr, "sending to [client](disp,size,cnt)=");
370 for (i=0; i < nprocs; i++) {
371 if (fd->is_agg)
372 MPI_Type_size_x (client_comm_dtype_arr[i], &size);
373 else
374 size = -1;
375
376 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
377 size, client_alltoallw_counts[i]);
378 if (i != nprocs - 1)
379 fprintf(stderr, ",");
380 }
381 fprintf (stderr,"\n");
382 }
383 fflush (NULL);
384 #endif
385
386 if (fd->is_agg)
387 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
388 client_comm_dtype_arr,
389 client_comm_sz_arr,
390 &agg_comm_requests,
391 &aggs_client_count);
392
393 if (fd->is_agg && aggs_client_count) {
394 agg_comm_statuses = ADIOI_Malloc(aggs_client_count *
395 sizeof(MPI_Status));
396 MPI_Waitall(aggs_client_count, agg_comm_requests,
397 agg_comm_statuses);
398 #ifdef AGGREGATION_PROFILE
399 MPE_Log_event (5033, 0, NULL);
400 #endif
401 ADIOI_Free (agg_comm_requests);
402 ADIOI_Free (agg_comm_statuses);
403 }
404
405 if (clients_agg_count) {
406 client_comm_statuses = ADIOI_Malloc(clients_agg_count *
407 sizeof(MPI_Status));
408 MPI_Waitall(clients_agg_count, client_comm_requests,
409 client_comm_statuses);
410 #ifdef AGGREGATION_PROFILE
411 MPE_Log_event (5039, 0, NULL);
412 #endif
413 ADIOI_Free (client_comm_requests);
414 ADIOI_Free (client_comm_statuses);
415 }
416
417 #ifdef DEBUG2
418 fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
419 if (fd->is_agg && buffered_io_size) {
420 fprintf (stderr, "buf = [");
421 for (i=0; i<bufextent; i++)
422 fprintf (stderr, "%c", ((char *) buf)[i]);
423 fprintf (stderr, "]\n");
424 fprintf (stderr, "cb_buf = [");
425 for (i=0; i<buffered_io_size; i++)
426 fprintf (stderr, "%c", cb_buf[i]);
427 fprintf (stderr, "]\n");
428 fflush (NULL);
429 }
430 #endif
431 }
432 else {
433 #ifdef DEBUG
434 fprintf (stderr, "sending to [agg](disp,size,cnt)=");
435 for (i=0; i < nprocs; i++) {
436 MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
437 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
438 size, agg_alltoallw_counts[i]);
439 if (i != nprocs - 1)
440 fprintf(stderr, ",");
441 }
442 fprintf (stderr, "]\n");
443 fprintf (stderr, "expecting from [client](disp,size,cnt)=");
444 for (i=0; i < nprocs; i++) {
445 if (fd->is_agg)
446 MPI_Type_size_x (client_comm_dtype_arr[i], &size);
447 else
448 size = -1;
449
450 fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
451 size, client_alltoallw_counts[i]);
452 if (i != nprocs - 1)
453 fprintf(stderr, ",");
454 }
455 fprintf (stderr,"\n");
456 fflush (NULL);
457 #endif
458 #ifdef DEBUG
459 fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
460 #endif
461
462 if (clients_agg_count) {
463 client_comm_statuses = ADIOI_Malloc(clients_agg_count *
464 sizeof(MPI_Status));
465 MPI_Waitall(clients_agg_count, client_comm_requests,
466 client_comm_statuses);
467 #ifdef AGGREGATION_PROFILE
468 MPE_Log_event (5039, 0, NULL);
469 #endif
470 ADIOI_Free(client_comm_requests);
471 ADIOI_Free(client_comm_statuses);
472 }
473 #ifdef DEBUG2
474 if (bufextent) {
475 fprintf (stderr, "buf = [");
476 for (i=0; i<bufextent; i++)
477 fprintf (stderr, "%c", ((char *) buf)[i]);
478 fprintf (stderr, "]\n");
479 }
480 #endif
481
482 if (fd->is_agg && buffered_io_size) {
483 assert (aggs_client_count != 0);
484
485 agg_comm_statuses = (MPI_Status *)
486 ADIOI_Malloc (aggs_client_count*sizeof(MPI_Status));
487
488 MPI_Waitall (aggs_client_count, agg_comm_requests,
489 agg_comm_statuses);
490 #ifdef AGGREGATION_PROFILE
491 MPE_Log_event (5033, 0, NULL);
492 #endif
493 ADIOI_Free (agg_comm_requests);
494 ADIOI_Free (agg_comm_statuses);
495 #ifdef DEBUG2
496 fprintf (stderr, "cb_buf = [");
497 for (i=0; i<buffered_io_size; i++)
498 fprintf (stderr, "%c", cb_buf[i]);
499 fprintf (stderr, "]\n");
500 fflush (NULL);
501 #endif
502 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
503 ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
504 ADIOI_WRITE, status, error_code);
505 if (*error_code != MPI_SUCCESS) return;
506 MPI_Type_free (&agg_dtype);
507 }
508
509 }
510 } else {
511
512 ADIOI_Build_client_reqs(fd, nprocs, my_mem_view_state_arr,
513 agg_file_view_state_arr,
514 agg_comm_sz_arr, agg_comm_dtype_arr);
515
516 if (rdwr == ADIOI_READ) {
517 if (fd->is_agg && buffered_io_size) {
518 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
519 ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
520 ADIOI_READ, status, error_code);
521 if (*error_code != MPI_SUCCESS) return;
522 MPI_Type_free (&agg_dtype);
523 }
524
525 #ifdef AGGREGATION_PROFILE
526 MPE_Log_event (5032, 0, NULL);
527 #endif
528 MPI_Alltoallw (cb_buf, client_alltoallw_counts, alltoallw_disps,
529 client_comm_dtype_arr,
530 buf, agg_alltoallw_counts , alltoallw_disps,
531 agg_comm_dtype_arr,
532 fd->comm);
533 #ifdef AGGREGATION_PROFILE
534 MPE_Log_event (5033, 0, NULL);
535 #endif
536 }
537 else {
538 #ifdef AGGREGATION_PROFILE
539 MPE_Log_event (5032, 0, NULL);
540 #endif
541 MPI_Alltoallw (buf, agg_alltoallw_counts, alltoallw_disps,
542 agg_comm_dtype_arr,
543 cb_buf, client_alltoallw_counts, alltoallw_disps,
544 client_comm_dtype_arr,
545 fd->comm);
546 #ifdef AGGREGATION_PROFILE
547 MPE_Log_event (5033, 0, NULL);
548 #endif
549 if (fd->is_agg && buffered_io_size) {
550 ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
551 ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
552 ADIOI_WRITE, status, error_code);
553 if (*error_code != MPI_SUCCESS) return;
554 MPI_Type_free (&agg_dtype);
555 }
556 }
557 }
558
559
560 if (fd->is_agg) {
561 if (buffered_io_size > 0) {
562 for (i=0; i<nprocs; i++) {
563 if (client_comm_sz_arr[i] > 0)
564 MPI_Type_free (&client_comm_dtype_arr[i]);
565 }
566 }
567 }
568 for (i=0; i<nprocs; i++) {
569 if (agg_comm_sz_arr[i] > 0)
570 MPI_Type_free (&agg_comm_dtype_arr[i]);
571 }
572
573
574 if (fd->is_agg) {
575 ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
576 client_file_view_state_arr,
577 client_comm_dtype_arr,
578 client_comm_sz_arr,
579 &agg_disp,
580 &agg_dtype);
581 buffered_io_size = 0;
582 for (i=0; i <nprocs; i++) {
583 if (client_comm_sz_arr[i] > 0)
584 buffered_io_size += client_comm_sz_arr[i];
585 }
586 }
587 #ifdef USE_PRE_REQ
588 else {
589
590
591 for (i = 0; i < fd->hints->cb_nodes; i++)
592 {
593 agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
594 #ifdef AGGREGATION_PROFILE
595 MPE_Log_event (5040, 0, NULL);
596 #endif
597 ADIOI_Build_client_pre_req(
598 fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
599 &(my_mem_view_state_arr[agg_rank]),
600 &(agg_file_view_state_arr[agg_rank]),
601 2*1024*1024,
602 64*1024);
603 #ifdef AGGREGATION_PROFILE
604 MPE_Log_event (5041, 0, NULL);
605 #endif
606 }
607 }
608 #endif
609
610
611
612
613 if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
614 if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
615 post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
616 client_comm_dtype_arr,
617 client_comm_sz_arr,
618 &agg_comm_requests,
619 &aggs_client_count);
620 }
621
622
623 Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
624 client_alltoallw_counts, agg_alltoallw_counts,
625 &aggregators_done);
626
627 }
628
629
630
631 if (fd->hints->cb_pfr != ADIOI_HINT_ENABLE) {
632
633 if (1) {
634 ADIOI_Delete_flattened (fd->file_realm_types[0]);
635 MPI_Type_free (&fd->file_realm_types[0]);
636 }
637 else {
638 for (i=0; i<fd->hints->cb_nodes; i++) {
639 ADIOI_Datatype_iscontig(fd->file_realm_types[i], &is_contig);
640 if (!is_contig)
641 ADIOI_Delete_flattened(fd->file_realm_types[i]);
642 MPI_Type_free (&fd->file_realm_types[i]);
643 }
644 }
645 ADIOI_Free (fd->file_realm_types);
646 ADIOI_Free (fd->file_realm_st_offs);
647 }
648
649
650
651
652 ADIOI_Delete_flattened(datatype);
653 ADIOI_Delete_flattened(fd->filetype);
654
655 if (fd->is_agg) {
656 if (buffered_io_size > 0)
657 MPI_Type_free (&agg_dtype);
658 for (i=0; i<nprocs; i++) {
659 MPI_Type_free (&client_comm_dtype_arr[i]);
660 ADIOI_Free (client_file_view_state_arr[i].flat_type_p->indices);
661 ADIOI_Free (client_file_view_state_arr[i].flat_type_p->blocklens);
662 ADIOI_Free (client_file_view_state_arr[i].flat_type_p);
663 }
664 ADIOI_Free (client_file_view_state_arr);
665 ADIOI_Free (cb_buf);
666 }
667 for (i = 0; i<nprocs; i++)
668 if (agg_comm_sz_arr[i] > 0)
669 MPI_Type_free (&agg_comm_dtype_arr[i]);
670
671 ADIOI_Free (client_comm_sz_arr);
672 ADIOI_Free (client_comm_dtype_arr);
673 ADIOI_Free (my_mem_view_state_arr);
674 ADIOI_Free (agg_file_view_state_arr);
675 ADIOI_Free (agg_comm_sz_arr);
676 ADIOI_Free (agg_comm_dtype_arr);
677 ADIOI_Free (alltoallw_disps);
678 ADIOI_Free (alltoallw_counts);
679 ADIOI_Free (all_st_end_offsets);
680
681 #ifdef HAVE_STATUS_SET_BYTES
682 MPIR_Status_set_bytes(status, datatype, bufsize);
683
684
685
686 #endif
687 fd->fp_sys_posn = -1;
688 #ifdef AGGREGATION_PROFILE
689 if (rdwr == ADIOI_READ)
690 MPE_Log_event (5011, 0, NULL);
691 else
692 MPE_Log_event (5013, 0, NULL);
693 #endif
694 }
695
696
697
698
699 void ADIOI_Calc_bounds (ADIO_File fd, int count, MPI_Datatype buftype,
700 int file_ptr_type, ADIO_Offset offset,
701 ADIO_Offset *st_offset, ADIO_Offset *end_offset)
702 {
703 MPI_Count filetype_size, buftype_size, etype_size;
704 int sum;
705 MPI_Aint filetype_extent, lb;
706 ADIO_Offset total_io;
707 int filetype_is_contig;
708 ADIO_Offset i, remainder;
709 ADIOI_Flatlist_node *flat_file;
710
711 ADIO_Offset st_byte_off, end_byte_off;
712
713 #ifdef AGGREGATION_PROFILE
714 MPE_Log_event (5000, 0, NULL);
715 #endif
716
717 if (!count) {
718
719
720 memset (st_offset, 8, sizeof(ADIO_Offset));
721 *st_offset = *st_offset / 2;
722 *end_offset = -1;
723 return;
724 }
725
726 ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);
727
728 MPI_Type_size_x (fd->filetype, &filetype_size);
729 MPI_Type_get_extent (fd->filetype, &lb, &filetype_extent);
730 MPI_Type_size_x (fd->etype, &etype_size);
731 MPI_Type_size_x (buftype, &buftype_size);
732
733 total_io = buftype_size * count;
734
735 if (filetype_is_contig) {
736 if (file_ptr_type == ADIO_INDIVIDUAL)
737 st_byte_off = fd->fp_ind;
738 else
739 st_byte_off = fd->disp + etype_size * offset;
740
741 end_byte_off = st_byte_off + total_io - 1;
742 }
743 else {
744 flat_file = ADIOI_Flatlist;
745 while (flat_file->type != fd->filetype) flat_file = flat_file->next;
746
747
748
749
750
751 if (file_ptr_type == ADIO_INDIVIDUAL) {
752 st_byte_off = fd->fp_ind;
753
754
755
756 end_byte_off = (ADIO_Offset)
757 ((fd->fp_ind - fd->disp - flat_file->indices[0]) /
758 filetype_extent) * filetype_extent + fd->disp +
759 flat_file->indices[0];
760
761 remainder = (fd->fp_ind - fd->disp - flat_file->indices[0]) %
762 filetype_extent;
763 if (remainder) {
764
765 sum = 0;
766 for (i=0; i<flat_file->count; i++) {
767 sum += flat_file->blocklens[i];
768 if ((flat_file->indices[i] - flat_file->indices[0] +
769 flat_file->blocklens[i]) >= remainder) {
770 sum -= (flat_file->blocklens[i] - (sum - remainder));
771 break;
772 }
773 }
774 total_io += sum;
775 }
776
777 end_byte_off += (total_io - 1) / filetype_size * filetype_extent;
778
779 remainder = total_io % filetype_size;
780 if (!remainder) {
781 for (i=flat_file->count - 1; i>=0; i--) {
782 if (flat_file->blocklens[i]) break;
783 }
784 assert (i > -1);
785 end_byte_off += flat_file->indices[i] +
786 flat_file->blocklens[i] - 1;
787 end_byte_off -= flat_file->indices[0];
788 }
789 else {
790 sum = 0;
791 for (i=0; i<flat_file->count; i++) {
792 sum += flat_file->blocklens[i];
793 if (sum >= remainder) {
794 end_byte_off += flat_file->indices[i] +
795 flat_file->blocklens[i] - sum + remainder - 1;
796 break;
797 }
798 }
799 end_byte_off -= flat_file->indices[0];
800 }
801 }
802 else {
803
804
805 st_byte_off = fd->disp + ((offset * etype_size) / filetype_size) *
806 filetype_extent;
807
808 remainder = (etype_size * offset) % filetype_size;
809
810 sum = 0;
811 for (i=0; i<flat_file->count; i++) {
812 sum += flat_file->blocklens[i];
813 if (sum >= remainder) {
814 if (sum == remainder)
815 st_byte_off += flat_file->indices[i+1];
816 else
817 st_byte_off += flat_file->indices[i] +
818 flat_file->blocklens[i] - sum + remainder;
819 break;
820 }
821 }
822
823
824
825 end_byte_off = fd->disp + (offset * etype_size + total_io) /
826 filetype_size * filetype_extent;
827
828 remainder = (offset * etype_size + total_io) % filetype_size;
829
830 if (!remainder) {
831
832 for (i=flat_file->count-1; i>=0; i--) {
833 if (flat_file->blocklens[i]) break;
834 }
835 assert (i >= 0);
836
837
838
839
840
841
842 end_byte_off -= filetype_extent - flat_file->indices[i] -
843 flat_file->blocklens[i] + 1;
844 }
845 else {
846 sum = 0;
847 for (i=0; i<flat_file->count; i++) {
848 sum += flat_file->blocklens[i];
849 if (sum >= remainder) {
850 end_byte_off += flat_file->indices[i] +
851 flat_file->blocklens[i] - sum + remainder - 1;
852 break;
853 }
854 }
855 }
856 }
857 }
858
859 *st_offset = st_byte_off;
860 *end_offset = end_byte_off;
861 #ifdef DEBUG
862 printf ("st_offset = %lld\nend_offset = %lld\n",
863 st_byte_off, end_byte_off);
864 #endif
865 #ifdef AGGREGATION_PROFILE
866 MPE_Log_event (5001, 0, NULL);
867 #endif
868 }
869
870
871
872
873
874
875 void ADIOI_IOFiletype(ADIO_File fd, void *buf, int count,
876 MPI_Datatype datatype, int file_ptr_type,
877 ADIO_Offset offset, MPI_Datatype custom_ftype,
878 int rdwr, ADIO_Status *status, int *error_code)
879 {
880 MPI_Datatype user_filetype;
881 MPI_Datatype user_etype;
882 ADIO_Offset user_disp;
883 int user_ind_wr_buffer_size;
884 int user_ind_rd_buffer_size;
885 int f_is_contig, m_is_contig;
886 int user_ds_read, user_ds_write;
887 MPI_Aint f_extent, lb;
888 MPI_Count f_size;
889 int f_ds_percent;
890
891 #ifdef AGGREGATION_PROFILE
892 if (rdwr == ADIOI_READ)
893 MPE_Log_event(5006, 0, NULL);
894 else
895 MPE_Log_event(5008, 0, NULL);
896 #endif
897 MPI_Type_get_extent(custom_ftype, &lb, &f_extent);
898 MPI_Type_size_x(custom_ftype, &f_size);
899 f_ds_percent = 100 * f_size / f_extent;
900
901
902 user_filetype = fd->filetype;
903 user_etype = fd->etype;
904 user_disp = fd->disp;
905 user_ds_read = fd->hints->ds_read;
906 user_ds_write = fd->hints->ds_write;
907
908 user_ind_wr_buffer_size = fd->hints->ind_wr_buffer_size;
909 user_ind_rd_buffer_size = fd->hints->ind_rd_buffer_size;
910
911
912 fd->filetype = custom_ftype;
913 fd->etype = MPI_BYTE;
914
915 fd->hints->ind_wr_buffer_size = fd->hints->cb_buffer_size;
916 fd->hints->ind_rd_buffer_size = fd->hints->cb_buffer_size;
917
918 #ifdef DEBUG
919 printf ("f_ds_percent = %d cb_ds_threshold = %d\n", f_ds_percent,
920 fd->hints->cb_ds_threshold);
921 #endif
922 if (f_ds_percent >= fd->hints->cb_ds_threshold) {
923 fd->hints->ds_read = ADIOI_HINT_ENABLE;
924 fd->hints->ds_write = ADIOI_HINT_ENABLE;
925 }
926 else {
927 fd->hints->ds_read = ADIOI_HINT_DISABLE;
928 fd->hints->ds_write = ADIOI_HINT_DISABLE;
929 }
930
931
932
933
934
935
936 ADIOI_Datatype_iscontig(custom_ftype, &f_is_contig);
937 ADIOI_Datatype_iscontig(datatype, &m_is_contig);
938 if (!f_is_contig)
939 ADIOI_Flatten_datatype (custom_ftype);
940
941
942
943 if (f_is_contig && m_is_contig) {
944 fd->disp = 0;
945 if (rdwr == ADIOI_READ)
946 ADIO_ReadContig(fd, buf, count, datatype, file_ptr_type, offset,
947 status, error_code);
948 else
949 ADIO_WriteContig(fd, buf, count, datatype, file_ptr_type, offset,
950 status, error_code);
951 }
952 else {
953 fd->disp = offset;
954 if (rdwr == ADIOI_READ)
955 ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type, 0,
956 status, error_code);
957 else
958 ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type, 0,
959 status, error_code);
960 }
961
962
963 if (!f_is_contig)
964 ADIOI_Delete_flattened (custom_ftype);
965
966
967 fd->filetype = user_filetype;
968 fd->etype = user_etype;
969 fd->disp = user_disp;
970 fd->hints->ds_read = user_ds_read;
971 fd->hints->ds_write = user_ds_write;
972 fd->hints->ind_wr_buffer_size = user_ind_wr_buffer_size;
973 fd->hints->ind_rd_buffer_size = user_ind_rd_buffer_size;
974 #ifdef AGGREGATION_PROFILE
975 if (rdwr == ADIOI_READ)
976 MPE_Log_event (5007, 0, NULL);
977 else
978 MPE_Log_event (5009, 0, NULL);
979 #endif
980 }
981
982 static void Exch_data_amounts (ADIO_File fd, int nprocs,
983 ADIO_Offset *client_comm_sz_arr,
984 ADIO_Offset *agg_comm_sz_arr,
985 int *client_alltoallw_counts,
986 int *agg_alltoallw_counts,
987 int *aggregators_done)
988 {
989 int i;
990 int recv_idx;
991 MPI_Request *recv_requests;
992 MPI_Request *send_requests;
993 MPI_Status status;
994 MPI_Status *send_statuses;
995
996 if (fd->hints->cb_alltoall != ADIOI_HINT_DISABLE) {
997 MPI_Alltoall (client_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
998 agg_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
999 fd->comm);
1000
1001 if (fd->is_agg) {
1002 for (i=0; i<nprocs; i++)
1003 if (client_comm_sz_arr[i] > 0)
1004 client_alltoallw_counts[i] = 1;
1005 else
1006 client_alltoallw_counts[i] = 0;
1007 }
1008 *aggregators_done = 0;
1009 for (i=0; i<nprocs; i++) {
1010 if (agg_comm_sz_arr[i] == -1)
1011 *aggregators_done = *aggregators_done + 1;
1012 else if (agg_comm_sz_arr[i] > 0)
1013 agg_alltoallw_counts[i] = 1;
1014 else
1015 agg_alltoallw_counts[i] = 0;
1016 }
1017 } else {
1018
1019
1020
1021 recv_requests = ADIOI_Malloc (fd->hints->cb_nodes * sizeof(MPI_Request));
1022
1023 for (i = 0; i < fd->hints->cb_nodes; i++)
1024 MPI_Irecv (&agg_comm_sz_arr[fd->hints->ranklist[i]],
1025 sizeof(ADIO_Offset), MPI_BYTE, fd->hints->ranklist[i],
1026 AMT_TAG, fd->comm, &recv_requests[i]);
1027
1028
1029
1030
1031 send_requests = NULL;
1032 if (fd->is_agg) {
1033
1034 send_requests = ADIOI_Malloc (nprocs * sizeof(MPI_Request));
1035
1036
1037 for (i = 0; i < nprocs; i++) {
1038 MPI_Isend (&client_comm_sz_arr[i], sizeof(ADIO_Offset),
1039 MPI_BYTE, i, AMT_TAG, fd->comm, &send_requests[i]);
1040
1041 if (client_comm_sz_arr[i] > 0)
1042 client_alltoallw_counts[i] = 1;
1043 else
1044 client_alltoallw_counts[i] = 0;
1045 }
1046 }
1047
1048 *aggregators_done = 0;
1049 for (i=0; i < fd->hints->cb_nodes; i++) {
1050 MPI_Waitany (fd->hints->cb_nodes, recv_requests, &recv_idx, &status);
1051 if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] == -1)
1052 *aggregators_done = *aggregators_done + 1;
1053 else if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] > 0)
1054 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 1;
1055 else
1056 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 0;
1057 }
1058
1059 ADIOI_Free (recv_requests);
1060 if (fd->is_agg) {
1061
1062 send_statuses = ADIOI_Malloc (nprocs * sizeof (MPI_Status));
1063 MPI_Waitall (nprocs, send_requests, send_statuses);
1064 ADIOI_Free (send_requests);
1065 ADIOI_Free (send_statuses);
1066 }
1067 }
1068 }
1069
1070 static void post_aggregator_comm (MPI_Comm comm, int rw_type,
1071 int nproc, void *cb_buf,
1072 MPI_Datatype *client_comm_dtype_arr,
1073 ADIO_Offset *client_comm_sz_arr,
1074 MPI_Request **requests_p,
1075 int *aggs_client_count_p)
1076 {
1077 int aggs_client_count = 0;
1078 MPI_Request *requests;
1079 int i;
1080
1081 #ifdef DEBUG
1082 printf ("posting aggregator communication\n");
1083 #endif
1084
1085 for (i=0; i < nproc; i++)
1086 if (client_comm_sz_arr[i] > 0)
1087 aggs_client_count++;
1088 #ifdef DEBUG
1089 printf ("aggregator needs to talk to %d clients\n",
1090 aggs_client_count);
1091 #endif
1092 *aggs_client_count_p = aggs_client_count;
1093 if (aggs_client_count) {
1094 requests = (MPI_Request *)
1095 ADIOI_Malloc (aggs_client_count * sizeof(MPI_Request));
1096 aggs_client_count = 0;
1097 #ifdef AGGREGATION_PROFILE
1098 MPE_Log_event (5032, 0, NULL);
1099 #endif
1100 for (i=0; i < nproc; i++) {
1101 if (client_comm_sz_arr[i] > 0) {
1102 if (rw_type == ADIOI_WRITE)
1103 MPI_Irecv (cb_buf, 1, client_comm_dtype_arr[i], i,
1104 DATA_TAG, comm,
1105 &requests[aggs_client_count]);
1106 else
1107 MPI_Isend (cb_buf, 1, client_comm_dtype_arr[i], i,
1108 DATA_TAG, comm,
1109 &requests[aggs_client_count]);
1110
1111 aggs_client_count++;
1112 }
1113 }
1114 *requests_p = requests;
1115 }
1116 }
1117
1118 static void post_client_comm (ADIO_File fd, int rw_type,
1119 int agg_rank, void *buf,
1120 MPI_Datatype agg_comm_dtype,
1121 int agg_alltoallw_count,
1122 MPI_Request *request)
1123 {
1124 if (agg_alltoallw_count) {
1125 if (rw_type == ADIOI_READ)
1126 MPI_Irecv (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
1127 request);
1128 else
1129 MPI_Isend (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
1130 request);
1131 }
1132 }
1133
1134
1135