This source file includes the following definitions:
- mca_fcoll_two_phase_file_write_all
- two_phase_exch_and_write
- two_phase_exchage_data
- two_phase_fill_send_buffer
- two_phase_heap_merge
- is_aggregator
25 #include "ompi_config.h"
26 #include "fcoll_two_phase.h"
27
28 #include "mpi.h"
29 #include "ompi/constants.h"
30 #include "ompi/communicator/communicator.h"
31 #include "ompi/mca/fcoll/fcoll.h"
32 #include "ompi/mca/common/ompio/common_ompio.h"
33 #include "ompi/mca/io/io.h"
34 #include "opal/mca/base/base.h"
35 #include "math.h"
36 #include "ompi/mca/pml/pml.h"
37 #include <unistd.h>
38
39 #define DEBUG_ON 0
40
41
42
43
44
45
46
47
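/*
 * Flattened view of the user buffer datatype: one entry per contiguous
 * block of the (possibly non-contiguous) memory layout, holding the
 * block's byte displacement relative to the start of the buffer
 * (indices) and its length in bytes (blocklens).
 */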
48 typedef struct flat_list_node {
49 MPI_Datatype type;
50 int count;
51 OMPI_MPI_OFFSET_TYPE *blocklens;
52 OMPI_MPI_OFFSET_TYPE *indices;
53 struct flat_list_node *next;
54 } Flatlist_node;
55
56
57
58
59 static int two_phase_exch_and_write(ompio_file_t *fh,
60 const void *buf,
61 MPI_Datatype datatype,
62 mca_common_ompio_access_array_t *others_req,
63 struct iovec *offset_len,
64 int contig_access_count,
65 OMPI_MPI_OFFSET_TYPE min_st_offset,
66 OMPI_MPI_OFFSET_TYPE fd_size,
67 OMPI_MPI_OFFSET_TYPE *fd_start,
68 OMPI_MPI_OFFSET_TYPE *fd_end,
69 Flatlist_node *flat_buf,
70 size_t *buf_idx, int striping_unit,
71 int num_io_procs, int *aggregator_list);
72
73
74
75 static int two_phase_exchage_data(ompio_file_t *fh,
76 const void *buf,
77 char *write_buf,
78 struct iovec *offset_length,
79 int *send_size, int *start_pos,
80 int *recv_size,
81 OMPI_MPI_OFFSET_TYPE off,
82 OMPI_MPI_OFFSET_TYPE size, int *count,
83 int *partial_recv, int *sent_to_proc,
84 int contig_access_count,
85 OMPI_MPI_OFFSET_TYPE min_st_offset,
86 OMPI_MPI_OFFSET_TYPE fd_size,
87 OMPI_MPI_OFFSET_TYPE *fd_start,
88 OMPI_MPI_OFFSET_TYPE *fd_end,
89 Flatlist_node *flat_buf,
90 mca_common_ompio_access_array_t *others_req,
91 int *send_buf_idx, int *curr_to_proc,
92 int *done_to_proc, int iter,
93 size_t *buf_idx, MPI_Aint buftype_extent,
94 int striping_unit, int num_io_procs,
95 int *aggregator_list, int *hole);
96
97
98 static int two_phase_fill_send_buffer(ompio_file_t *fh,
99 const void *buf,
100 Flatlist_node *flat_buf,
101 char **send_buf,
102 struct iovec *offset_length,
103 int *send_size,
104 MPI_Request *send_req,
105 int *sent_to_proc,
106 int contig_access_count,
107 OMPI_MPI_OFFSET_TYPE min_st_offset,
108 OMPI_MPI_OFFSET_TYPE fd_size,
109 OMPI_MPI_OFFSET_TYPE *fd_start,
110 OMPI_MPI_OFFSET_TYPE *fd_end,
111 int *send_buf_idx,
112 int *curr_to_proc,
113 int *done_to_proc,
114 int iter, MPI_Aint buftype_extent,
115 int striping_unit,
116 int num_io_procs, int *aggregator_list);
117 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
118 static int is_aggregator(int rank,
119 int nprocs_for_coll,
120 int *aggregator_list);
121 #endif
122
123 void two_phase_heap_merge(mca_common_ompio_access_array_t *others_req,
124 int *count,
125 OMPI_MPI_OFFSET_TYPE *srt_off,
126 int *srt_len,
127 int *start_pos,
128 int nprocs,
129 int myrank,
130 int nprocs_recv,
131 int total_elements);
132
133
134
135
136
137 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
138 double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
139 double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
140 double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
141 #endif
142
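/*
 * Collective write using the classic two-phase algorithm: all processes
 * first exchange the file offsets/lengths they intend to write, the
 * accessed byte range is partitioned into one contiguous file domain per
 * aggregator, data is shipped to the aggregator owning each domain, and
 * the aggregators perform the actual (large, contiguous) file writes.
 */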
143 int
144 mca_fcoll_two_phase_file_write_all (ompio_file_t *fh,
145 const void *buf,
146 int count,
147 struct ompi_datatype_t *datatype,
148 ompi_status_public_t *status)
149 {
150
151
152
153 int i, j,interleave_count=0, striping_unit=0;
154 uint32_t iov_count=0,ti;
155 struct iovec *decoded_iov=NULL, *temp_iov=NULL;
156 size_t max_data = 0, total_bytes = 0;
157 long long_max_data = 0, long_total_bytes = 0;
158 int domain_size=0, *count_my_req_per_proc=NULL, count_my_req_procs;
159 int count_other_req_procs, ret=OMPI_SUCCESS;
160 int two_phase_num_io_procs=1;
161 size_t *buf_indices=NULL;
162 int local_count = 0, local_size=0,*aggregator_list = NULL;
163 struct iovec *iov = NULL;
164
165 OMPI_MPI_OFFSET_TYPE start_offset, end_offset, fd_size;
166 OMPI_MPI_OFFSET_TYPE *start_offsets=NULL, *end_offsets=NULL;
167 OMPI_MPI_OFFSET_TYPE *fd_start=NULL, *fd_end=NULL, min_st_offset;
168 Flatlist_node *flat_buf=NULL;
169 mca_common_ompio_access_array_t *my_req=NULL, *others_req=NULL;
170 MPI_Aint send_buf_addr;
171 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
172 mca_common_ompio_print_entry nentry;
173 #endif
174
175
176
177
178
179
180
181 if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
182
183 ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *)fh,
184 datatype,
185 count,
186 buf,
187 &max_data,
188 fh->f_mem_convertor,
189 &temp_iov,
190 &iov_count);
191 if (OMPI_SUCCESS != ret ){
192 goto exit;
193 }
194
195 send_buf_addr = (ptrdiff_t)buf;
196 if ( 0 < iov_count ) {
197 decoded_iov = (struct iovec *)malloc
198 (iov_count * sizeof(struct iovec));
199 if (NULL == decoded_iov) {
200 ret = OMPI_ERR_OUT_OF_RESOURCE;
201 goto exit;
202 }
203 }
204 for (ti = 0; ti < iov_count; ti ++){
205 decoded_iov[ti].iov_base = (IOVBASE_TYPE *)(
206 (ptrdiff_t)temp_iov[ti].iov_base -
207 send_buf_addr);
208 decoded_iov[ti].iov_len =
209 temp_iov[ti].iov_len ;
210 #if DEBUG_ON
211 printf("d_offset[%d]: %ld, d_len[%d]: %ld\n",
212 ti, (ptrdiff_t)decoded_iov[ti].iov_base,
213 ti, decoded_iov[ti].iov_len);
214 #endif
215 }
216
217 }
218 else{
219 max_data = count * datatype->super.size;
220 }
221
222 if ( MPI_STATUS_IGNORE != status ) {
223 status->_ucount = max_data;
224 }
225
226 two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
227 if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
228 ret = OMPI_ERROR;
229 goto exit;
230 }
231 if(-1 == two_phase_num_io_procs){
232 ret = mca_common_ompio_set_aggregator_props ((struct ompio_file_t *)fh,
233 two_phase_num_io_procs,
234 max_data);
235 if ( OMPI_SUCCESS != ret){
236 goto exit;
237 }
238
239 two_phase_num_io_procs = fh->f_num_aggrs;
240
241 }
242
243 if (two_phase_num_io_procs > fh->f_size){
244 two_phase_num_io_procs = fh->f_size;
245 }
246
247 #if DEBUG_ON
248 printf("Number of aggregators : %ld\n", two_phase_num_io_procs);
249 #endif
250
251 aggregator_list = (int *) malloc (two_phase_num_io_procs *sizeof(int));
252 if ( NULL == aggregator_list ) {
253 ret = OMPI_ERR_OUT_OF_RESOURCE;
254 goto exit;
255 }
256
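/*
 * Select the aggregator ranks.  If MPI_COMM_WORLD was mapped by node,
 * the first two_phase_num_io_procs ranks are used directly (consecutive
 * ranks then tend to sit on different nodes); otherwise the aggregators
 * are spread evenly across the communicator.
 */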
257 if ( OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm) ) {
258 for (i =0; i< two_phase_num_io_procs; i++){
259 aggregator_list[i] = i;
260 }
261 }
262 else {
263 for (i =0; i< two_phase_num_io_procs; i++){
264 aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
265 }
266 }
267
268 ret = fh->f_generate_current_file_view ((struct ompio_file_t*)fh,
269 max_data,
270 &iov,
271 &local_count);
272
273
274 if ( OMPI_SUCCESS != ret ){
275 goto exit;
276 }
277
278 long_max_data = (long) max_data;
279 ret = fh->f_comm->c_coll->coll_allreduce (&long_max_data,
280 &long_total_bytes,
281 1,
282 MPI_LONG,
283 MPI_SUM,
284 fh->f_comm,
285 fh->f_comm->c_coll->coll_allreduce_module);
286
287 if ( OMPI_SUCCESS != ret ) {
288 goto exit;
289 }
290 total_bytes = (size_t) long_total_bytes;
291
292 if ( 0 == total_bytes ) {
293 ret = OMPI_SUCCESS;
294 goto exit;
295 }
296
297 if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
298
299
300
301 flat_buf = (Flatlist_node *)malloc(sizeof(Flatlist_node));
302 if ( NULL == flat_buf ){
303 ret = OMPI_ERR_OUT_OF_RESOURCE;
304 goto exit;
305 }
306
307 flat_buf->type = datatype;
308 flat_buf->next = NULL;
309 flat_buf->count = 0;
310 flat_buf->indices = NULL;
311 flat_buf->blocklens = NULL;
312
313 if ( 0 < count ) {
314 local_size = OMPIO_MAX(1,iov_count/count);
315 }
316 else {
317 local_size = 0;
318 }
319
320 if ( 0 < local_size ) {
321 flat_buf->indices =
322 (OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
323 sizeof(OMPI_MPI_OFFSET_TYPE));
324 if ( NULL == flat_buf->indices ){
325 ret = OMPI_ERR_OUT_OF_RESOURCE;
326 goto exit;
327
328 }
329
330 flat_buf->blocklens =
331 (OMPI_MPI_OFFSET_TYPE *)malloc(local_size *
332 sizeof(OMPI_MPI_OFFSET_TYPE));
333 if ( NULL == flat_buf->blocklens ){
334 ret = OMPI_ERR_OUT_OF_RESOURCE;
335 goto exit;
336 }
337 }
338 flat_buf->count = local_size;
339 for (j = 0 ; j < local_size ; ++j) {
340 if ( 0 < max_data ) {
341 flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)(intptr_t)decoded_iov[j].iov_base;
342 flat_buf->blocklens[j] = decoded_iov[j].iov_len;
343 }
344 else {
345 flat_buf->indices[j] = 0;
346 flat_buf->blocklens[j] = 0;
347 }
348 }
349
350 #if DEBUG_ON
351 printf("flat_buf_count : %d\n", flat_buf->count);
352 for(i=0;i<flat_buf->count;i++){
353 printf("%d: blocklen[%d] : %lld, indices[%d]: %lld \n",
354 fh->f_rank, i, flat_buf->blocklens[i], i ,flat_buf->indices[i]);
355
356 }
357 #endif
358 }
359
360 #if DEBUG_ON
361 printf("%d: fcoll:two_phase:write_all->total_bytes:%ld, local_count: %d\n",
362 fh->f_rank,total_bytes, local_count);
363 for (i=0 ; i<local_count ; i++) {
364 printf("%d: fcoll:two_phase:write_all:OFFSET:%ld,LENGTH:%ld\n",
365 fh->f_rank,
366 (size_t)iov[i].iov_base,
367 (size_t)iov[i].iov_len);
368 }
369
370
371 #endif
372
373 start_offset = (OMPI_MPI_OFFSET_TYPE)(uintptr_t)iov[0].iov_base;
374 if ( 0 < local_count ) {
375 end_offset = (OMPI_MPI_OFFSET_TYPE)(uintptr_t)iov[local_count-1].iov_base +
376 (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_len - 1;
377 }
378 else {
379 end_offset = 0;
380 }
381
382 #if DEBUG_ON
383 printf("%d: fcoll:two_phase:write_all:START OFFSET:%ld,END OFFSET:%ld\n",
384 fh->f_rank,
385 (size_t)start_offset,
386 (size_t)end_offset);
387
388 #endif
389
390 start_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
391 (fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));
392
393 if ( NULL == start_offsets ){
394 ret = OMPI_ERR_OUT_OF_RESOURCE;
395 goto exit;
396 }
397
398 end_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
399 (fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));
400
401 if ( NULL == end_offsets ){
402 ret = OMPI_ERR_OUT_OF_RESOURCE;
403 goto exit;
404 }
405
406
407 ret = fh->f_comm->c_coll->coll_allgather(&start_offset,
408 1,
409 OMPI_OFFSET_DATATYPE,
410 start_offsets,
411 1,
412 OMPI_OFFSET_DATATYPE,
413 fh->f_comm,
414 fh->f_comm->c_coll->coll_allgather_module);
415
416 if ( OMPI_SUCCESS != ret ){
417 goto exit;
418 }
419
420
421 ret = fh->f_comm->c_coll->coll_allgather(&end_offset,
422 1,
423 OMPI_OFFSET_DATATYPE,
424 end_offsets,
425 1,
426 OMPI_OFFSET_DATATYPE,
427 fh->f_comm,
428 fh->f_comm->c_coll->coll_allgather_module);
429
430
431 if ( OMPI_SUCCESS != ret ){
432 goto exit;
433 }
434
435 #if DEBUG_ON
436 for (i=0;i<fh->f_size;i++){
437 printf("%d: fcoll:two_phase:write_all:start[%d]:%ld,end[%d]:%ld\n",
438 fh->f_rank,i,
439 (size_t)start_offsets[i],i,
440 (size_t)end_offsets[i]);
441 }
442 #endif
443
444
445
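/*
 * Count how many processes start writing before their predecessor has
 * finished, i.e. how strongly the per-process access ranges interleave
 * in the file.
 */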
446 for (i=1; i<fh->f_size; i++){
447 if ((start_offsets[i] < end_offsets[i-1]) &&
448 (start_offsets[i] <= end_offsets[i])){
449 interleave_count++;
450 }
451 }
452
453 #if DEBUG_ON
454 printf("%d: fcoll:two_phase:write_all:interleave_count:%d\n",
455 fh->f_rank,interleave_count);
456 #endif
457
458
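/*
 * Partition the overall byte range touched by this collective call into
 * one contiguous file domain per aggregator (fd_start[i] .. fd_end[i]).
 */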
459 ret = mca_fcoll_two_phase_domain_partition(fh,
460 start_offsets,
461 end_offsets,
462 &min_st_offset,
463 &fd_start,
464 &fd_end,
465 domain_size,
466 &fd_size,
467 striping_unit,
468 two_phase_num_io_procs);
469 if ( OMPI_SUCCESS != ret ){
470 goto exit;
471 }
472
473
474 #if DEBUG_ON
475 for (i=0;i<two_phase_num_io_procs;i++){
476 printf("fd_start[%d] : %lld, fd_end[%d] : %lld, local_count: %d\n",
477 i, fd_start[i], i, fd_end[i], local_count);
478 }
479 #endif
480
481
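/*
 * Determine which pieces of this process' accesses fall into which
 * aggregator's file domain (my_req), then exchange that information so
 * every aggregator knows what it will receive from each rank
 * (others_req).
 */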
482 ret = mca_fcoll_two_phase_calc_my_requests (fh,
483 iov,
484 local_count,
485 min_st_offset,
486 fd_start,
487 fd_end,
488 fd_size,
489 &count_my_req_procs,
490 &count_my_req_per_proc,
491 &my_req,
492 &buf_indices,
493 striping_unit,
494 two_phase_num_io_procs,
495 aggregator_list);
496 if ( OMPI_SUCCESS != ret ){
497 goto exit;
498 }
499
500
501
502 ret = mca_fcoll_two_phase_calc_others_requests(fh,
503 count_my_req_procs,
504 count_my_req_per_proc,
505 my_req,
506 &count_other_req_procs,
507 &others_req);
508 if (OMPI_SUCCESS != ret ){
509 goto exit;
510 }
511
512
513 #if DEBUG_ON
514 printf("count_other_req_procs : %d\n", count_other_req_procs);
515 #endif
516
517 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
518 start_exch = MPI_Wtime();
519 #endif
520
521 ret = two_phase_exch_and_write(fh,
522 buf,
523 datatype,
524 others_req,
525 iov,
526 local_count,
527 min_st_offset,
528 fd_size,
529 fd_start,
530 fd_end,
531 flat_buf,
532 buf_indices,
533 striping_unit,
534 two_phase_num_io_procs,
535 aggregator_list);
536
537 if (OMPI_SUCCESS != ret){
538 goto exit;
539 }
540
541
542 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
543 end_exch = MPI_Wtime();
544 exch_write += (end_exch - start_exch);
545
546 nentry.time[0] = write_time;
547 nentry.time[1] = comm_time;
548 nentry.time[2] = exch_write;
549 if (is_aggregator(fh->f_rank,
550 two_phase_num_io_procs,
551 aggregator_list)){
552 nentry.aggregator = 1;
553 }
554 else{
555 nentry.aggregator = 0;
556 }
557 nentry.nprocs_for_coll = two_phase_num_io_procs;
558 if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)){
559 mca_common_ompio_register_print_entry(fh->f_coll_write_time,
560 nentry);
561 }
562 #endif
563
564 exit :
565 if (flat_buf != NULL) {
566
567 if (flat_buf->blocklens != NULL) {
568 free (flat_buf->blocklens);
569 }
570
571 if (flat_buf->indices != NULL) {
572 free (flat_buf->indices);
573 }
574 free (flat_buf);
575
576 }
577
578
579 free (start_offsets);
580 free (end_offsets);
581 free (aggregator_list);
582 free (decoded_iov);
583 free (fd_start);
584 free (fd_end);
585 free (others_req);
586 free (my_req);
587 free (buf_indices);
588 free (count_my_req_per_proc);
589
590 return ret;
591 }
592
593
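/*
 * Exchange-and-write driver: each aggregator determines the byte range
 * it is responsible for, splits it into cycles of at most
 * f_bytes_per_agg bytes, and per cycle exchanges the contributing data
 * with all ranks and writes the assembled buffer to the file.
 * Non-aggregators only take part in the data exchange.
 */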
594 static int two_phase_exch_and_write(ompio_file_t *fh,
595 const void *buf,
596 MPI_Datatype datatype,
597 mca_common_ompio_access_array_t *others_req,
598 struct iovec *offset_len,
599 int contig_access_count,
600 OMPI_MPI_OFFSET_TYPE min_st_offset,
601 OMPI_MPI_OFFSET_TYPE fd_size,
602 OMPI_MPI_OFFSET_TYPE *fd_start,
603 OMPI_MPI_OFFSET_TYPE *fd_end,
604 Flatlist_node *flat_buf,
605 size_t *buf_idx, int striping_unit,
606 int two_phase_num_io_procs,
607 int *aggregator_list)
608
609 {
610
611
612 int i, j, ntimes, max_ntimes, m;
613 int *curr_offlen_ptr=NULL, *count=NULL, *send_size=NULL, *recv_size=NULL;
614 int *partial_recv=NULL, *start_pos=NULL, req_len, flag;
615 int *sent_to_proc=NULL, ret = OMPI_SUCCESS;
616 int *send_buf_idx=NULL, *curr_to_proc=NULL, *done_to_proc=NULL;
617 OMPI_MPI_OFFSET_TYPE st_loc=-1, end_loc=-1, off, done;
618 OMPI_MPI_OFFSET_TYPE size=0, req_off, len;
619 MPI_Aint buftype_extent;
620 int hole;
621 int two_phase_cycle_buffer_size;
622 size_t byte_size;
623 MPI_Datatype byte = MPI_BYTE;
624 #if DEBUG_ON
625 int ii,jj;
626 #endif
627
628 char *write_buf=NULL;
629
630
631 opal_datatype_type_size(&byte->super,
632 &byte_size);
633
634 for (i = 0; i < fh->f_size; i++){
635 if (others_req[i].count) {
636 st_loc = others_req[i].offsets[0];
637 end_loc = others_req[i].offsets[0];
638 break;
639 }
640 }
641
642 for (i=0;i<fh->f_size;i++){
643 for(j=0;j< others_req[i].count; j++){
644 st_loc = OMPIO_MIN(st_loc, others_req[i].offsets[j]);
645 end_loc = OMPIO_MAX(end_loc, (others_req[i].offsets[j] + others_req[i].lens[j] - 1));
646
647 }
648 }
649
650 two_phase_cycle_buffer_size = fh->f_bytes_per_agg;
651 ntimes = (int) ((end_loc - st_loc + two_phase_cycle_buffer_size)/two_phase_cycle_buffer_size);
652
653 if ((st_loc == -1) && (end_loc == -1)) {
654 ntimes = 0;
655 }
656
657 fh->f_comm->c_coll->coll_allreduce (&ntimes,
658 &max_ntimes,
659 1,
660 MPI_INT,
661 MPI_MAX,
662 fh->f_comm,
663 fh->f_comm->c_coll->coll_allreduce_module);
664
665 if (ntimes){
666 write_buf = (char *) malloc (two_phase_cycle_buffer_size);
667 if ( NULL == write_buf ){
668 return OMPI_ERR_OUT_OF_RESOURCE;
669 }
670 }
671
672 curr_offlen_ptr = (int *) calloc(fh->f_size, sizeof(int));
673
674 if ( NULL == curr_offlen_ptr ){
675 ret = OMPI_ERR_OUT_OF_RESOURCE;
676 goto exit;
677 }
678
679 count = (int *) malloc(fh->f_size*sizeof(int));
680
681 if ( NULL == count ){
682 ret = OMPI_ERR_OUT_OF_RESOURCE;
683 goto exit;
684 }
685
686 partial_recv = (int *)calloc(fh->f_size, sizeof(int));
687
688 if ( NULL == partial_recv ){
689 ret = OMPI_ERR_OUT_OF_RESOURCE;
690 goto exit;
691 }
692
693 send_size = (int *) calloc(fh->f_size,sizeof(int));
694
695 if ( NULL == send_size ){
696 ret = OMPI_ERR_OUT_OF_RESOURCE;
697 goto exit;
698 }
699
700 recv_size = (int *) calloc(fh->f_size,sizeof(int));
701
702 if ( NULL == recv_size ){
703 ret = OMPI_ERR_OUT_OF_RESOURCE;
704 goto exit;
705 }
706
707 send_buf_idx = (int *) malloc(fh->f_size*sizeof(int));
708
709 if ( NULL == send_buf_idx ){
710 ret = OMPI_ERR_OUT_OF_RESOURCE;
711 goto exit;
712 }
713
714 sent_to_proc = (int *) calloc(fh->f_size, sizeof(int));
715
716 if ( NULL == sent_to_proc){
717 ret = OMPI_ERR_OUT_OF_RESOURCE;
718 goto exit;
719 }
720
721 curr_to_proc = (int *) malloc(fh->f_size*sizeof(int));
722
723 if ( NULL == curr_to_proc ){
724 ret = OMPI_ERR_OUT_OF_RESOURCE;
725 goto exit;
726 }
727
728 done_to_proc = (int *) malloc(fh->f_size*sizeof(int));
729
730 if ( NULL == done_to_proc ){
731 ret = OMPI_ERR_OUT_OF_RESOURCE;
732 goto exit;
733 }
734
735 start_pos = (int *) malloc(fh->f_size*sizeof(int));
736
737 if ( NULL == start_pos ){
738 ret = OMPI_ERR_OUT_OF_RESOURCE;
739 goto exit;
740 }
741
742
743 done = 0;
744 off = st_loc;
745
746 ompi_datatype_type_extent(datatype, &buftype_extent);
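    /*
     * One iteration per cycle: scan the others_req lists for the pieces
     * that fall into the current window [off, off+size), record how much
     * will be received from every rank, exchange the data, and finally
     * write the assembled write_buf to the file.
     */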
747 for (m=0;m <ntimes; m++){
748 for (i=0; i< fh->f_size; i++) count[i] = recv_size[i] = 0;
749
750 size = OMPIO_MIN((unsigned)two_phase_cycle_buffer_size,
751 end_loc-st_loc+1-done);
752 for (i=0;i<fh->f_size;i++){
753 if(others_req[i].count){
754 start_pos[i] = curr_offlen_ptr[i];
755 for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
756 if (partial_recv[i]) {
757
758
759 req_off = others_req[i].offsets[j] +
760 partial_recv[i];
761 req_len = others_req[i].lens[j] -
762 partial_recv[i];
763 partial_recv[i] = 0;
764
765 others_req[i].offsets[j] = req_off;
766 others_req[i].lens[j] = req_len;
767 }
768 else {
769 req_off = others_req[i].offsets[j];
770 req_len = others_req[i].lens[j];
771 }
772 if (req_off < off + size) {
773 count[i]++;
774 #if DEBUG_ON
775 printf("%d: req_off : %lld, off : %lld, size : %lld, count[%d]: %d\n", fh->f_rank,
776 req_off,
777 off,
778 size,i,
779 count[i]);
780 #endif
781 PMPI_Get_address(write_buf+req_off-off,
782 &(others_req[i].mem_ptrs[j]));
783 #if DEBUG_ON
784 printf("%d : mem_ptrs : %ld\n", fh->f_rank,
785 others_req[i].mem_ptrs[j]);
786 #endif
787 recv_size[i] += (int) (OMPIO_MIN(off + size - req_off,
788 (unsigned)req_len));
789
790 if (off+size-req_off < (unsigned)req_len){
791
792 partial_recv[i] = (int)(off + size - req_off);
793 break;
794 }
795 }
796 else break;
797 }
798 curr_offlen_ptr[i] = j;
799 }
800 }
801
802 ret = two_phase_exchage_data(fh, buf, write_buf,
803 offset_len,send_size,
804 start_pos,recv_size,off,size,
805 count, partial_recv, sent_to_proc,
806 contig_access_count,
807 min_st_offset,
808 fd_size, fd_start,
809 fd_end, flat_buf, others_req,
810 send_buf_idx, curr_to_proc,
811 done_to_proc, m, buf_idx,
812 buftype_extent, striping_unit,
813 two_phase_num_io_procs,
814 aggregator_list, &hole);
815
816 if ( OMPI_SUCCESS != ret ){
817 goto exit;
818 }
819
820
821
822 flag = 0;
823 for (i=0; i<fh->f_size; i++)
824 if (count[i]) flag = 1;
825
826
827
828 if (flag){
829
830 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
831 start_write_time = MPI_Wtime();
832 #endif
833
834 #if DEBUG_ON
835 printf("rank : %d enters writing\n", fh->f_rank);
836 printf("size : %ld, off : %ld\n",size, off);
837 for (ii=0, jj=0;jj<size;jj+=4, ii++){
838 printf("%d : write_buf[%d]: %d\n", fh->f_rank, ii,((int *)write_buf[jj]));
839 }
840 #endif
841 len = size * byte_size;
842 fh->f_io_array = (mca_common_ompio_io_array_t *)malloc
843 (sizeof(mca_common_ompio_io_array_t));
844 if (NULL == fh->f_io_array) {
845 opal_output(1, "OUT OF MEMORY\n");
846 ret = OMPI_ERR_OUT_OF_RESOURCE;
847 goto exit;
848 }
849
850 fh->f_io_array[0].offset =(IOVBASE_TYPE *)(intptr_t) off;
851 fh->f_io_array[0].length = len;
852 fh->f_io_array[0].memory_address = write_buf;
853 fh->f_num_of_io_entries = 1;
854
855 #if DEBUG_ON
856 for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
857 printf("%d: ADDRESS: %p OFFSET: %ld LENGTH: %d\n",
858 fh->f_rank,
859 fh->f_io_array[i].memory_address,
860 fh->f_io_array[i].offset,
861 fh->f_io_array[i].length);
862 }
863 #endif
864
865 if (fh->f_num_of_io_entries){
866 if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
867 opal_output(1, "WRITE FAILED\n");
868 ret = OMPI_ERROR;
869 goto exit;
870 }
871 }
872 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
873 end_write_time = MPI_Wtime();
874 write_time += (end_write_time - start_write_time);
875 #endif
876
877
878 }
879
880
881 fh->f_num_of_io_entries = 0;
882 if (NULL != fh->f_io_array) {
883 free (fh->f_io_array);
884 fh->f_io_array = NULL;
885 }
886
887 off += size;
888 done += size;
889
890 }
891 for (i=0; i<fh->f_size; i++) count[i] = recv_size[i] = 0;
892 for (m=ntimes; m<max_ntimes; m++) {
893 ret = two_phase_exchage_data(fh, buf, write_buf,
894 offset_len,send_size,
895 start_pos,recv_size,off,size,
896 count, partial_recv, sent_to_proc,
897 contig_access_count,
898 min_st_offset,
899 fd_size, fd_start,
900 fd_end, flat_buf,others_req,
901 send_buf_idx, curr_to_proc,
902 done_to_proc, m, buf_idx,
903 buftype_extent, striping_unit,
904 two_phase_num_io_procs,
905 aggregator_list, &hole);
906 if ( OMPI_SUCCESS != ret ){
907 goto exit;
908 }
909 }
910
911 exit:
912
913 free (write_buf);
914 free (curr_offlen_ptr);
915 free (count);
916 free (partial_recv);
917 free (send_size);
918 free (recv_size);
919 free (sent_to_proc);
920 free (start_pos);
921 free (send_buf_idx);
922 free (curr_to_proc);
923 free (done_to_proc);
924
925 return ret;
926 }
927
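/*
 * One communication step of a cycle: trade the recv/send byte counts via
 * an alltoall, post receives into write_buf using hindexed datatypes
 * built from others_req, read the existing file contents first if the
 * incoming pieces leave holes in the window (data sieving), and send
 * this process' contributions to the aggregators.
 */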
928 static int two_phase_exchage_data(ompio_file_t *fh,
929 const void *buf,
930 char *write_buf,
931 struct iovec *offset_length,
932 int *send_size,int *start_pos,
933 int *recv_size,
934 OMPI_MPI_OFFSET_TYPE off,
935 OMPI_MPI_OFFSET_TYPE size, int *count,
936 int *partial_recv, int *sent_to_proc,
937 int contig_access_count,
938 OMPI_MPI_OFFSET_TYPE min_st_offset,
939 OMPI_MPI_OFFSET_TYPE fd_size,
940 OMPI_MPI_OFFSET_TYPE *fd_start,
941 OMPI_MPI_OFFSET_TYPE *fd_end,
942 Flatlist_node *flat_buf,
943 mca_common_ompio_access_array_t *others_req,
944 int *send_buf_idx, int *curr_to_proc,
945 int *done_to_proc, int iter,
946 size_t *buf_idx,MPI_Aint buftype_extent,
947 int striping_unit, int two_phase_num_io_procs,
948 int *aggregator_list, int *hole){
949
950 int *tmp_len=NULL, sum, *srt_len=NULL, nprocs_recv, nprocs_send, k,i,j;
951 int ret=OMPI_SUCCESS;
952 MPI_Request *requests=NULL, *send_req=NULL;
953 ompi_datatype_t **recv_types=NULL;
954 OMPI_MPI_OFFSET_TYPE *srt_off=NULL;
955 char **send_buf = NULL;
956
957 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
958 start_comm_time = MPI_Wtime();
959 #endif
960 ret = fh->f_comm->c_coll->coll_alltoall (recv_size,
961 1,
962 MPI_INT,
963 send_size,
964 1,
965 MPI_INT,
966 fh->f_comm,
967 fh->f_comm->c_coll->coll_alltoall_module);
968
969 if ( OMPI_SUCCESS != ret ){
970 return ret;
971 }
972
973 nprocs_recv = 0;
974 for (i=0;i<fh->f_size;i++){
975 if (recv_size[i]){
976 nprocs_recv++;
977 }
978 }
979
980
981 recv_types = (ompi_datatype_t **)
982 calloc (( nprocs_recv + 1 ), sizeof(ompi_datatype_t *));
983
984 if ( NULL == recv_types ){
985 ret = OMPI_ERR_OUT_OF_RESOURCE;
986 goto exit;
987 }
988
989 tmp_len = (int *) calloc(fh->f_size, sizeof(int));
990
991 if ( NULL == tmp_len ) {
992 ret = OMPI_ERR_OUT_OF_RESOURCE;
993 goto exit;
994 }
995
996 j = 0;
997 for (i=0;i<fh->f_size;i++){
998 if (recv_size[i]) {
999 if (partial_recv[i]) {
1000 k = start_pos[i] + count[i] - 1;
1001 tmp_len[i] = others_req[i].lens[k];
1002 others_req[i].lens[k] = partial_recv[i];
1003 }
1004 ompi_datatype_create_hindexed(count[i],
1005 &(others_req[i].lens[start_pos[i]]),
1006 &(others_req[i].mem_ptrs[start_pos[i]]),
1007 MPI_BYTE, recv_types+j);
1008 ompi_datatype_commit(recv_types+j);
1009 j++;
1010 }
1011 }
1012
1013 sum = 0;
1014 for (i=0;i<fh->f_size;i++) sum += count[i];
1015 srt_off = (OMPI_MPI_OFFSET_TYPE *)
1016 malloc((sum+1)*sizeof(OMPI_MPI_OFFSET_TYPE));
1017
1018 if ( NULL == srt_off ){
1019 ret = OMPI_ERR_OUT_OF_RESOURCE;
1020 goto exit;
1021 }
1022
1023 srt_len = (int *) malloc((sum+1)*sizeof(int));
1024
1025 if ( NULL == srt_len ) {
1026 ret = OMPI_ERR_OUT_OF_RESOURCE;
1027 free(srt_off);
1028 goto exit;
1029 }
1030
1031
1032     two_phase_heap_merge(others_req, count, srt_off, srt_len, start_pos, fh->f_size, fh->f_rank, nprocs_recv, sum);
1033
1034
1035 for (i=0; i<fh->f_size; i++)
1036 if (partial_recv[i]) {
1037 k = start_pos[i] + count[i] - 1;
1038 others_req[i].lens[k] = tmp_len[i];
1039 }
1040
1041 free(tmp_len);
1042 tmp_len = NULL;
1043
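    /*
     * Hole detection: the merged, offset-sorted extents must cover
     * [off, off+size) without gaps; if they do not, the existing file
     * contents have to be read into write_buf before the received data
     * is scattered into it.
     */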
1044 *hole = 0;
1045 if (off != srt_off[0]){
1046 *hole = 1;
1047 }
1048 else{
1049 for (i=1;i<sum;i++){
1050 if (srt_off[i] <= srt_off[0] + srt_len[0]){
1051 int new_len = srt_off[i] + srt_len[i] - srt_off[0];
1052 if(new_len > srt_len[0])
1053 srt_len[0] = new_len;
1054 }
1055 else
1056 break;
1057 }
1058 if (i < sum || size != srt_len[0])
1059 *hole = 1;
1060 }
1061
1062
1063 free(srt_off);
1064 free(srt_len);
1065
1066 if (nprocs_recv){
1067 if (*hole){
1068 if (off >= 0){
1069 fh->f_io_array = (mca_common_ompio_io_array_t *)malloc
1070 (sizeof(mca_common_ompio_io_array_t));
1071 if (NULL == fh->f_io_array) {
1072 opal_output(1, "OUT OF MEMORY\n");
1073 ret = OMPI_ERR_OUT_OF_RESOURCE;
1074 goto exit;
1075 }
1076 fh->f_io_array[0].offset =(IOVBASE_TYPE *)(intptr_t) off;
1077 fh->f_num_of_io_entries = 1;
1078 fh->f_io_array[0].length = size;
1079 fh->f_io_array[0].memory_address = write_buf;
1080 if (fh->f_num_of_io_entries){
1081 int amode_overwrite;
1082 amode_overwrite = fh->f_get_mca_parameter_value ("overwrite_amode", strlen("overwrite_amode"));
1083 if ( OMPI_ERR_MAX == amode_overwrite ) {
1084 ret = OMPI_ERROR;
1085 goto exit;
1086 }
1087 if ( fh->f_amode & MPI_MODE_WRONLY && !amode_overwrite ){
1088 if ( 0 == fh->f_rank ) {
1089 printf("\n File not opened in RDWR mode, can not continue."
1090 "\n To resolve this problem, you can either \n"
1091 " a. open the file with MPI_MODE_RDWR instead of MPI_MODE_WRONLY\n"
1092 " b. ensure that the mca parameter mca_io_ompio_overwrite_amode is set to 1\n"
1093 " c. use an fcoll component that does not use data sieving (e.g. dynamic)\n");
1094 }
1095 ret = MPI_ERR_FILE;
1096 goto exit;
1097 }
1098 if ( 0 > fh->f_fbtl->fbtl_preadv (fh)) {
1099 opal_output(1, "READ FAILED\n");
1100 ret = OMPI_ERROR;
1101 goto exit;
1102 }
1103 }
1104
1105 }
1106 fh->f_num_of_io_entries = 0;
1107 if (NULL != fh->f_io_array) {
1108 free (fh->f_io_array);
1109 fh->f_io_array = NULL;
1110 }
1111 }
1112 }
1113
1114 nprocs_send = 0;
1115 for (i=0; i <fh->f_size; i++) if (send_size[i]) nprocs_send++;
1116
1117 #if DEBUG_ON
1118 printf("%d : nprocs_send : %d\n", fh->f_rank,nprocs_send);
1119 #endif
1120
1121 requests = (MPI_Request *)
1122 malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
1123
1124 if ( NULL == requests ){
1125 ret = OMPI_ERR_OUT_OF_RESOURCE;
1126 goto exit;
1127 }
1128
1129 j = 0;
1130 for (i=0; i<fh->f_size; i++) {
1131 if (recv_size[i]) {
1132 ret = MCA_PML_CALL(irecv(MPI_BOTTOM,
1133 1,
1134 recv_types[j],
1135 i,
1136 fh->f_rank+i+100*iter,
1137 fh->f_comm,
1138 requests+j));
1139
1140 if ( OMPI_SUCCESS != ret ){
1141 goto exit;
1142 }
1143 j++;
1144 }
1145 }
1146 send_req = requests + nprocs_recv;
1147
1148
1149 if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
1150 j = 0;
1151 for (i=0; i <fh->f_size; i++)
1152 if (send_size[i]) {
1153 ret = MCA_PML_CALL(isend(((char *) buf) + buf_idx[i],
1154 send_size[i],
1155 MPI_BYTE,
1156 i,
1157 fh->f_rank+i+100*iter,
1158 MCA_PML_BASE_SEND_STANDARD,
1159 fh->f_comm,
1160 send_req+j));
1161
1162 if ( OMPI_SUCCESS != ret ){
1163 goto exit;
1164 }
1165
1166 j++;
1167 buf_idx[i] += send_size[i];
1168 }
1169 }
1170 else if(nprocs_send && (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY))){
1171 send_buf = (char **) calloc (fh->f_size, sizeof(char*));
1172 if ( NULL == send_buf ){
1173 ret = OMPI_ERR_OUT_OF_RESOURCE;
1174 goto exit;
1175 }
1176 for (i=0; i < fh->f_size; i++){
1177 if (send_size[i]) {
1178 send_buf[i] = (char *) malloc(send_size[i]);
1179
1180 if ( NULL == send_buf[i] ){
1181 ret = OMPI_ERR_OUT_OF_RESOURCE;
1182 goto exit;
1183 }
1184 }
1185 }
1186
1187 ret = two_phase_fill_send_buffer(fh, buf,flat_buf, send_buf,
1188 offset_length, send_size,
1189 send_req,sent_to_proc,
1190 contig_access_count,
1191 min_st_offset, fd_size,
1192 fd_start, fd_end, send_buf_idx,
1193 curr_to_proc, done_to_proc,
1194 iter, buftype_extent, striping_unit,
1195 two_phase_num_io_procs, aggregator_list);
1196
1197 if ( OMPI_SUCCESS != ret ){
1198 goto exit;
1199 }
1200 }
1201
1202
1203 ret = ompi_request_wait_all (nprocs_send+nprocs_recv,
1204 requests,
1205                                  MPI_STATUSES_IGNORE);
1206
1207
1208 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
1209 end_comm_time = MPI_Wtime();
1210 comm_time += (end_comm_time - start_comm_time);
1211 #endif
1212
1213 exit:
1214 if (recv_types) {
1215 for (i=0; i<nprocs_recv; i++) {
1216 if (recv_types[i]) {
1217 ompi_datatype_destroy(recv_types+i);
1218 }
1219 }
1220 }
1221 free (recv_types);
1222
1223 free (requests);
1224 if (send_buf) {
1225 for (i=0; i < fh->f_size; i++){
1226 free (send_buf[i]);
1227 }
1228
1229 free (send_buf);
1230 }
1231 free (tmp_len);
1232
1233 return ret;
1234 }
1235
1236
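/*
 * Helpers for walking the flattened user buffer: TWO_PHASE_BUF_INCR
 * advances the (flat_buf_idx, user_buf_idx) cursor by buf_incr bytes
 * without copying anything; TWO_PHASE_BUF_COPY copies `size' bytes from
 * the user buffer into send_buf[p] while advancing the same cursor.
 */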
1237 #define TWO_PHASE_BUF_INCR \
1238 { \
1239 while (buf_incr) { \
1240 size_in_buf = OMPIO_MIN(buf_incr, flat_buf_sz); \
1241 user_buf_idx += size_in_buf; \
1242 flat_buf_sz -= size_in_buf; \
1243 if (!flat_buf_sz) { \
1244 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
1245 else { \
1246 flat_buf_idx = 0; \
1247 n_buftypes++; \
1248 } \
1249 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
1250 (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
1251 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
1252 } \
1253 buf_incr -= size_in_buf; \
1254 } \
1255 }
1256
1257
1258 #define TWO_PHASE_BUF_COPY \
1259 { \
1260 while (size) { \
1261 size_in_buf = OMPIO_MIN(size, flat_buf_sz); \
1262 memcpy(&(send_buf[p][send_buf_idx[p]]), \
1263 ((char *) buf) + user_buf_idx, size_in_buf); \
1264 send_buf_idx[p] += size_in_buf; \
1265 user_buf_idx += size_in_buf; \
1266 flat_buf_sz -= size_in_buf; \
1267 if (!flat_buf_sz) { \
1268 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
1269 else { \
1270 flat_buf_idx = 0; \
1271 n_buftypes++; \
1272 } \
1273 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
1274 (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
1275 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
1276 } \
1277 size -= size_in_buf; \
1278 buf_incr -= size_in_buf; \
1279 } \
1280 TWO_PHASE_BUF_INCR \
1281 }
1282
1283
1284
1285
1286
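/*
 * Pack the non-contiguous user buffer into per-aggregator send buffers.
 * For every contiguous file access the owning aggregator is computed,
 * the corresponding bytes are copied via TWO_PHASE_BUF_COPY, and the
 * isend is posted as soon as an aggregator's buffer is full.  The
 * sent_to_proc/done_to_proc/curr_to_proc counters ensure that data
 * already shipped in earlier cycles is not copied again.
 */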
1287 static int two_phase_fill_send_buffer(ompio_file_t *fh,
1288 const void *buf,
1289 Flatlist_node *flat_buf,
1290 char **send_buf,
1291 struct iovec *offset_length,
1292 int *send_size,
1293 MPI_Request *requests,
1294 int *sent_to_proc,
1295 int contig_access_count,
1296 OMPI_MPI_OFFSET_TYPE min_st_offset,
1297 OMPI_MPI_OFFSET_TYPE fd_size,
1298 OMPI_MPI_OFFSET_TYPE *fd_start,
1299 OMPI_MPI_OFFSET_TYPE *fd_end,
1300 int *send_buf_idx,
1301 int *curr_to_proc,
1302 int *done_to_proc,
1303 int iter, MPI_Aint buftype_extent,
1304 int striping_unit, int two_phase_num_io_procs,
1305 int *aggregator_list)
1306 {
1307
1308 int i, p, flat_buf_idx=0;
1309 OMPI_MPI_OFFSET_TYPE flat_buf_sz=0, size_in_buf=0, buf_incr=0, size=0;
1310 int jj, n_buftypes, ret=OMPI_SUCCESS;
1311 OMPI_MPI_OFFSET_TYPE off=0, len=0, rem_len=0, user_buf_idx=0;
1312
1313 for (i=0; i < fh->f_size; i++) {
1314 send_buf_idx[i] = curr_to_proc[i] = 0;
1315 done_to_proc[i] = sent_to_proc[i];
1316 }
1317 jj = 0;
1318
1319 flat_buf_idx = 0;
1320 n_buftypes = 0;
1321 if ( flat_buf->count > 0 ) {
1322 user_buf_idx = flat_buf->indices[0];
1323 flat_buf_sz = flat_buf->blocklens[0];
1324 }
1325
1326 for (i=0; i<contig_access_count; i++) {
1327
1328 off = (OMPI_MPI_OFFSET_TYPE) (intptr_t)offset_length[i].iov_base;
1329 rem_len = (OMPI_MPI_OFFSET_TYPE)offset_length[i].iov_len;
1330
1331
1332 while (rem_len != 0) {
1333 len = rem_len;
1334 p = mca_fcoll_two_phase_calc_aggregator(fh,
1335 off,
1336 min_st_offset,
1337 &len,
1338 fd_size,
1339 fd_start,
1340 fd_end,
1341 striping_unit,
1342 two_phase_num_io_procs,
1343 aggregator_list);
1344
1345 if (send_buf_idx[p] < send_size[p]) {
1346 if (curr_to_proc[p]+len > done_to_proc[p]) {
1347 if (done_to_proc[p] > curr_to_proc[p]) {
1348 size = OMPIO_MIN(curr_to_proc[p] + len -
1349 done_to_proc[p], send_size[p]-send_buf_idx[p]);
1350 buf_incr = done_to_proc[p] - curr_to_proc[p];
1351 TWO_PHASE_BUF_INCR
1352 buf_incr = curr_to_proc[p] + len - done_to_proc[p];
1353 curr_to_proc[p] = done_to_proc[p] + size;
1354 TWO_PHASE_BUF_COPY
1355 }
1356 else {
1357 size = OMPIO_MIN(len,send_size[p]-send_buf_idx[p]);
1358 buf_incr = len;
1359 curr_to_proc[p] += size;
1360 TWO_PHASE_BUF_COPY
1361 }
1362 if (send_buf_idx[p] == send_size[p]) {
1363
1364 ret = MCA_PML_CALL(isend(send_buf[p],
1365 send_size[p],
1366 MPI_BYTE,
1367 p,
1368 fh->f_rank+p+100*iter,
1369 MCA_PML_BASE_SEND_STANDARD,
1370 fh->f_comm,
1371 requests+jj));
1372
1373 if ( OMPI_SUCCESS != ret ){
1374 return ret;
1375 }
1376 jj++;
1377 }
1378 }
1379 else {
1380 curr_to_proc[p] += len;
1381 buf_incr = len;
1382 TWO_PHASE_BUF_INCR
1383 }
1384 }
1385 else {
1386 buf_incr = len;
1387 TWO_PHASE_BUF_INCR
1388 }
1389 off += len;
1390 rem_len -= len;
1391 }
1392 }
1393 for (i=0; i < fh->f_size; i++) {
1394 if (send_size[i]){
1395 sent_to_proc[i] = curr_to_proc[i];
1396 }
1397 }
1398
1399 return ret;
1400 }
1401
1402
1403
1404
1405
1406
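/*
 * k-way merge of the per-rank (offset, length) lists selected for this
 * cycle, using a min-heap keyed on each list's current offset.  The
 * result is a single list (srt_off, srt_len) of total_elements entries
 * sorted by file offset.
 */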
1407 void two_phase_heap_merge( mca_common_ompio_access_array_t *others_req,
1408 int *count,
1409 OMPI_MPI_OFFSET_TYPE *srt_off,
1410 int *srt_len,
1411 int *start_pos,
1412 int nprocs,
1413 int myrank,
1414 int nprocs_recv,
1415 int total_elements)
1416 {
1417
1418
1419
1420 typedef struct {
1421 OMPI_MPI_OFFSET_TYPE *off_list;
1422 int *len_list;
1423 int nelem;
1424 } heap_struct;
1425
1426 heap_struct *a, tmp;
1427 int i, j, heapsize, l, r, k, smallest;
1428
1429 a = (heap_struct *) malloc((nprocs_recv+1)*sizeof(heap_struct));
1430
1431 j = 0;
1432 for (i=0; i<nprocs; i++)
1433 if (count[i]) {
1434 a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
1435 a[j].len_list = &(others_req[i].lens[start_pos[i]]);
1436 a[j].nelem = count[i];
1437 j++;
1438 }
1439
1440 heapsize = nprocs_recv;
1441
1442 for (i=heapsize/2 - 1; i>=0; i--) {
1443 k = i;
1444 for(;;) {
1445 l = 2*(k+1) - 1;
1446 r = 2*(k+1);
1447 if ((l < heapsize) &&
1448 (*(a[l].off_list) < *(a[k].off_list)))
1449 smallest = l;
1450 else smallest = k;
1451
1452 if ((r < heapsize) &&
1453 (*(a[r].off_list) < *(a[smallest].off_list)))
1454 smallest = r;
1455
1456 if (smallest != k) {
1457 tmp.off_list = a[k].off_list;
1458 tmp.len_list = a[k].len_list;
1459 tmp.nelem = a[k].nelem;
1460
1461 a[k].off_list = a[smallest].off_list;
1462 a[k].len_list = a[smallest].len_list;
1463 a[k].nelem = a[smallest].nelem;
1464
1465 a[smallest].off_list = tmp.off_list;
1466 a[smallest].len_list = tmp.len_list;
1467 a[smallest].nelem = tmp.nelem;
1468
1469 k = smallest;
1470 }
1471 else break;
1472 }
1473 }
1474
1475
1476 for (i=0; i<total_elements; i++) {
1477
1478 srt_off[i] = *(a[0].off_list);
1479 srt_len[i] = *(a[0].len_list);
1480 (a[0].nelem)--;
1481
1482 if (!a[0].nelem) {
1483 a[0].off_list = a[heapsize-1].off_list;
1484 a[0].len_list = a[heapsize-1].len_list;
1485 a[0].nelem = a[heapsize-1].nelem;
1486 heapsize--;
1487 }
1488 else {
1489 (a[0].off_list)++;
1490 (a[0].len_list)++;
1491 }
1492
1493
1494 k = 0;
1495 for (;;) {
1496 l = 2*(k+1) - 1;
1497 r = 2*(k+1);
1498
1499 if ((l < heapsize) &&
1500 (*(a[l].off_list) < *(a[k].off_list)))
1501 smallest = l;
1502 else smallest = k;
1503
1504 if ((r < heapsize) &&
1505 (*(a[r].off_list) < *(a[smallest].off_list)))
1506 smallest = r;
1507
1508 if (smallest != k) {
1509 tmp.off_list = a[k].off_list;
1510 tmp.len_list = a[k].len_list;
1511 tmp.nelem = a[k].nelem;
1512
1513 a[k].off_list = a[smallest].off_list;
1514 a[k].len_list = a[smallest].len_list;
1515 a[k].nelem = a[smallest].nelem;
1516
1517 a[smallest].off_list = tmp.off_list;
1518 a[smallest].len_list = tmp.len_list;
1519 a[smallest].nelem = tmp.nelem;
1520
1521 k = smallest;
1522 }
1523 else break;
1524 }
1525 }
1526 free(a);
1527 }
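/*
 * Only used for the time-breakdown statistics: returns 1 if `rank'
 * appears in the aggregator list, 0 otherwise.
 */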
1528 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
1529 int is_aggregator(int rank,
1530 int nprocs_for_coll,
1531 int *aggregator_list){
1532
1533 int i=0;
1534 for (i=0; i<nprocs_for_coll; i++){
1535 if (aggregator_list[i] == rank)
1536 return 1;
1537 }
1538 return 0;
1539 }
1540 #endif