This source file includes the following definitions:
- mca_fcoll_dynamic_file_write_all
- local_heap_sort
#include "ompi_config.h"
#include "fcoll_dynamic.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/common/ompio/common_ompio.h"
#include "ompi/mca/io/io.h"
#include "math.h"
#include "ompi/mca/pml/pml.h"
#include <unistd.h>

#define DEBUG_ON 0
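/* Descriptor for one contiguous chunk of file I/O: its file offset, its
 * length in bytes, and the group-local rank that contributes it. */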
typedef struct mca_io_ompio_local_io_array {
    OMPI_MPI_OFFSET_TYPE offset;
    MPI_Aint             length;
    int                  process_id;
} mca_io_ompio_local_io_array;
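/* Sort io_array by file offset; the permutation of entry indices is
 * returned in 'sorted'. */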
static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
                            int num_entries,
                            int *sorted);
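/*
 * Two-phase collective write: every process flattens its file view and
 * contributes it to the group; the aggregator (f_procs_in_group[0])
 * gathers the data in fixed-size cycles and issues the pwritev calls.
 */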
int
mca_fcoll_dynamic_file_write_all (ompio_file_t *fh,
                                  const void *buf,
                                  int count,
                                  struct ompi_datatype_t *datatype,
                                  ompi_status_public_t *status)
{
    MPI_Aint total_bytes_written = 0;
    MPI_Aint total_bytes = 0;
    MPI_Aint bytes_to_write_in_cycle = 0;
    MPI_Aint bytes_per_cycle = 0;
    int index = 0;
    int cycles = 0;
    int i = 0, j = 0, l = 0;
    int n = 0;
    MPI_Aint bytes_remaining = 0;

    int bytes_sent = 0, ret = 0;
    int blocks = 0, entries_per_aggregator = 0;

    uint32_t iov_count = 0;
    struct iovec *decoded_iov = NULL;
    int iov_index = 0;
    char *send_buf = NULL;
    size_t current_position = 0;
    struct iovec *local_iov_array = NULL, *global_iov_array = NULL;
    mca_io_ompio_local_io_array *file_offsets_for_agg = NULL;

    uint32_t total_fview_count = 0;
    int local_count = 0, temp_pindex;
    int *fview_count = NULL, *disp_index = NULL, *temp_disp_index = NULL;
    int current_index = 0, temp_index = 0;

    char *global_buf = NULL;
    MPI_Aint global_count = 0;

    int *sorted = NULL, *sorted_file_offsets = NULL;
    int *displs = NULL;
    int dynamic_num_io_procs;
    size_t max_data = 0, datatype_size = 0;
    int **blocklen_per_process = NULL;
    MPI_Aint **displs_per_process = NULL, *memory_displacements = NULL;
    ompi_datatype_t **recvtype = NULL;
    MPI_Aint *total_bytes_per_process = NULL;
    MPI_Request send_req = NULL, *recv_req = NULL;
    int my_aggregator = -1;
    bool sendbuf_is_contiguous = false;
    size_t ftype_size;
    ptrdiff_t ftype_extent, lb;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
    double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
    double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
    mca_common_ompio_print_entry nentry;
#endif
    opal_datatype_type_size (&datatype->super, &ftype_size);
    opal_datatype_get_extent (&datatype->super, &lb, &ftype_extent);
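    /* The user buffer can be sent in place only if the datatype maps to
     * one contiguous block with no lower-bound displacement. */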
    if ((ftype_extent == (ptrdiff_t) ftype_size) &&
        opal_datatype_is_contiguous_memory_layout (&datatype->super, 1) &&
        0 == lb) {
        sendbuf_is_contiguous = true;
    }
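    /* Otherwise flatten the user buffer into an iovec list; max_data is
     * the total payload in bytes in both cases. */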
    if (!sendbuf_is_contiguous) {
        ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh,
                                                datatype,
                                                count,
                                                buf,
                                                &max_data,
                                                fh->f_mem_convertor,
                                                &decoded_iov,
                                                &iov_count);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }
    }
    else {
        max_data = count * datatype->super.size;
    }
    if (MPI_STATUS_IGNORE != status) {
        status->_ucount = max_data;
    }
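    /* Select the aggregators for this operation based on the requested
     * number of I/O processes and the data volume. */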
    dynamic_num_io_procs = fh->f_get_mca_parameter_value ("num_aggregators",
                                                          strlen ("num_aggregators"));
    if (OMPI_ERR_MAX == dynamic_num_io_procs) {
        ret = OMPI_ERROR;
        goto exit;
    }
    ret = mca_common_ompio_set_aggregator_props ((struct ompio_file_t *) fh,
                                                 dynamic_num_io_procs,
                                                 max_data);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }
    my_aggregator = fh->f_procs_in_group[0];
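    /* Exchange the per-process data volumes so that the total number of
     * bytes (and hence the number of write cycles) is known. */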
    total_bytes_per_process = (MPI_Aint *) malloc
        (fh->f_procs_per_group * sizeof (MPI_Aint));
    if (NULL == total_bytes_per_process) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_comm_time = MPI_Wtime();
#endif
    ret = ompi_fcoll_base_coll_allgather_array (&max_data,
                                                1,
                                                MPI_LONG,
                                                total_bytes_per_process,
                                                1,
                                                MPI_LONG,
                                                0,
                                                fh->f_procs_in_group,
                                                fh->f_procs_per_group,
                                                fh->f_comm);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_comm_time = MPI_Wtime();
    comm_time += (end_comm_time - start_comm_time);
#endif

    for (i = 0; i < fh->f_procs_per_group; i++) {
        total_bytes += total_bytes_per_process[i];
    }

    if (NULL != total_bytes_per_process) {
        free (total_bytes_per_process);
        total_bytes_per_process = NULL;
    }
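    /* Flatten this process' file view into (offset, length) pairs. */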
    ret = fh->f_generate_current_file_view ((struct ompio_file_t *) fh,
                                            max_data,
                                            &local_iov_array,
                                            &local_count);
    if (ret != OMPI_SUCCESS) {
        goto exit;
    }

#if DEBUG_ON
    for (i = 0; i < local_count; i++) {
        printf ("%d: OFFSET: %p LENGTH: %ld\n",
                fh->f_rank,
                local_iov_array[i].iov_base,
                local_iov_array[i].iov_len);
    }
#endif
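    /* Gather the number of file-view entries of every process, followed
     * by the entries themselves. */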
    fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
    if (NULL == fview_count) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_comm_time = MPI_Wtime();
#endif
    ret = ompi_fcoll_base_coll_allgather_array (&local_count,
                                                1,
                                                MPI_INT,
                                                fview_count,
                                                1,
                                                MPI_INT,
                                                0,
                                                fh->f_procs_in_group,
                                                fh->f_procs_per_group,
                                                fh->f_comm);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_comm_time = MPI_Wtime();
    comm_time += (end_comm_time - start_comm_time);
#endif

    displs = (int *) malloc (fh->f_procs_per_group * sizeof (int));
    if (NULL == displs) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    displs[0] = 0;
    total_fview_count = fview_count[0];
    for (i = 1; i < fh->f_procs_per_group; i++) {
        total_fview_count += fview_count[i];
        displs[i] = displs[i-1] + fview_count[i-1];
    }
#if DEBUG_ON
    printf ("total_fview_count : %d\n", total_fview_count);
    if (my_aggregator == fh->f_rank) {
        for (i = 0; i < fh->f_procs_per_group; i++) {
            printf ("%d: PROCESS: %d ELEMENTS: %d DISPLS: %d\n",
                    fh->f_rank,
                    i,
                    fview_count[i],
                    displs[i]);
        }
    }
#endif
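    /* Collect every process' file-view entries into one global list. */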
    if (0 != total_fview_count) {
        global_iov_array = (struct iovec *) malloc (total_fview_count *
                                                    sizeof (struct iovec));
        if (NULL == global_iov_array) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_comm_time = MPI_Wtime();
#endif
    ret = ompi_fcoll_base_coll_allgatherv_array (local_iov_array,
                                                 local_count,
                                                 fh->f_iov_type,
                                                 global_iov_array,
                                                 fview_count,
                                                 displs,
                                                 fh->f_iov_type,
                                                 0,
                                                 fh->f_procs_in_group,
                                                 fh->f_procs_per_group,
                                                 fh->f_comm);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_comm_time = MPI_Wtime();
    comm_time += (end_comm_time - start_comm_time);
#endif
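    /* Sort the global iovec list by file offset; 'sorted' holds the
     * permutation indices into global_iov_array. */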
    if (0 != total_fview_count) {
        sorted = (int *) malloc (total_fview_count * sizeof (int));
        if (NULL == sorted) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
        ompi_fcoll_base_sort_iovec (global_iov_array, total_fview_count, sorted);
    }

    if (NULL != local_iov_array) {
        free (local_iov_array);
        local_iov_array = NULL;
    }

    if (NULL != displs) {
        free (displs);
        displs = NULL;
    }
#if DEBUG_ON
    if (my_aggregator == fh->f_rank) {
        uint32_t tv = 0;
        for (tv = 0; tv < total_fview_count; tv++) {
            printf ("%d: OFFSET: %p LENGTH: %ld\n",
                    fh->f_rank,
                    global_iov_array[sorted[tv]].iov_base,
                    global_iov_array[sorted[tv]].iov_len);
        }
    }
#endif
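    /* Split the write into cycles of at most f_bytes_per_agg bytes. */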
    bytes_per_cycle = fh->f_bytes_per_agg;
    cycles = ceil ((double)total_bytes / bytes_per_cycle);
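    /* The aggregator keeps per-process block lists, receive requests,
     * receive datatypes and a staging buffer for one cycle of data. */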
    if (my_aggregator == fh->f_rank) {
        disp_index = (int *) malloc (fh->f_procs_per_group * sizeof (int));
        if (NULL == disp_index) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        blocklen_per_process = (int **) calloc (fh->f_procs_per_group, sizeof (int *));
        if (NULL == blocklen_per_process) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        displs_per_process = (MPI_Aint **) calloc (fh->f_procs_per_group, sizeof (MPI_Aint *));
        if (NULL == displs_per_process) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        recv_req = (MPI_Request *) malloc (fh->f_procs_per_group * sizeof (MPI_Request));
        if (NULL == recv_req) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        global_buf = (char *) malloc (bytes_per_cycle);
        if (NULL == global_buf) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof (ompi_datatype_t *));
        if (NULL == recvtype) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
        for (l = 0; l < fh->f_procs_per_group; l++) {
            recvtype[l] = MPI_DATATYPE_NULL;
        }
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_exch = MPI_Wtime();
#endif
    n = 0;
    bytes_remaining = 0;
    current_index = 0;
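    /* Main loop: one iteration per write cycle. */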
    for (index = 0; index < cycles; index++) {
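        /* Release the aggregator's bookkeeping from the previous cycle. */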
        if (my_aggregator == fh->f_rank) {
            if (NULL != fh->f_io_array) {
                free (fh->f_io_array);
                fh->f_io_array = NULL;
            }
            fh->f_num_of_io_entries = 0;

            if (NULL != recvtype) {
                for (i = 0; i < fh->f_procs_per_group; i++) {
                    if (MPI_DATATYPE_NULL != recvtype[i]) {
                        ompi_datatype_destroy (&recvtype[i]);
                        recvtype[i] = MPI_DATATYPE_NULL;
                    }
                }
            }

            for (l = 0; l < fh->f_procs_per_group; l++) {
                disp_index[l] = 1;

                free (blocklen_per_process[l]);
                free (displs_per_process[l]);

                blocklen_per_process[l] = (int *) calloc (1, sizeof (int));
                displs_per_process[l] = (MPI_Aint *) calloc (1, sizeof (MPI_Aint));
                if (NULL == displs_per_process[l] || NULL == blocklen_per_process[l]) {
                    opal_output (1, "OUT OF MEMORY for displs\n");
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }
            }

            if (NULL != sorted_file_offsets) {
                free (sorted_file_offsets);
                sorted_file_offsets = NULL;
            }

            if (NULL != file_offsets_for_agg) {
                free (file_offsets_for_agg);
                file_offsets_for_agg = NULL;
            }

            if (NULL != memory_displacements) {
                free (memory_displacements);
                memory_displacements = NULL;
            }
        }
        if (cycles - 1 == index) {
            bytes_to_write_in_cycle = total_bytes - bytes_per_cycle * index;
        }
        else {
            bytes_to_write_in_cycle = bytes_per_cycle;
        }

#if DEBUG_ON
        if (my_aggregator == fh->f_rank) {
            printf ("****%d: CYCLE %d Bytes %ld**********\n",
                    fh->f_rank,
                    index,
                    bytes_to_write_in_cycle);
        }
#endif

#if DEBUG_ON
        printf ("bytes_to_write_in_cycle: %ld, cycle : %d\n",
                bytes_to_write_in_cycle, index);
#endif

        bytes_sent = 0;
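        /* Walk the sorted global iovec list and charge pieces of this
         * cycle's byte budget to the process that owns each piece. */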
        while (bytes_to_write_in_cycle) {

            blocks = fview_count[0];
            for (j = 0; j < fh->f_procs_per_group; j++) {
                if (sorted[current_index] < blocks) {
                    n = j;
                    break;
                }
                else {
                    blocks += fview_count[j+1];
                }
            }
            if (bytes_remaining) {

                if (bytes_remaining <= bytes_to_write_in_cycle) {

                    if (my_aggregator == fh->f_rank) {
                        blocklen_per_process[n][disp_index[n] - 1] = bytes_remaining;
                        displs_per_process[n][disp_index[n] - 1] =
                            (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base +
                            (global_iov_array[sorted[current_index]].iov_len
                             - bytes_remaining);

                        blocklen_per_process[n] = (int *) realloc
                            ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
                        displs_per_process[n] = (MPI_Aint *) realloc
                            ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint));
                        blocklen_per_process[n][disp_index[n]] = 0;
                        displs_per_process[n][disp_index[n]] = 0;
                        disp_index[n] += 1;
                    }
                    if (fh->f_procs_in_group[n] == fh->f_rank) {
                        bytes_sent += bytes_remaining;
                    }
                    current_index++;
                    bytes_to_write_in_cycle -= bytes_remaining;
                    bytes_remaining = 0;
                    continue;
                }
                else {

                    if (my_aggregator == fh->f_rank) {
                        blocklen_per_process[n][disp_index[n] - 1] = bytes_to_write_in_cycle;
                        displs_per_process[n][disp_index[n] - 1] =
                            (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base +
                            (global_iov_array[sorted[current_index]].iov_len
                             - bytes_remaining);
                    }

                    if (fh->f_procs_in_group[n] == fh->f_rank) {
                        bytes_sent += bytes_to_write_in_cycle;
                    }
                    bytes_remaining -= bytes_to_write_in_cycle;
                    bytes_to_write_in_cycle = 0;
                    break;
                }
            }
            else {

                if (bytes_to_write_in_cycle <
                    (MPI_Aint) global_iov_array[sorted[current_index]].iov_len) {

                    if (my_aggregator == fh->f_rank) {
                        blocklen_per_process[n][disp_index[n] - 1] = bytes_to_write_in_cycle;
                        displs_per_process[n][disp_index[n] - 1] =
                            (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base;
                    }
                    if (fh->f_procs_in_group[n] == fh->f_rank) {
                        bytes_sent += bytes_to_write_in_cycle;
                    }
                    bytes_remaining = global_iov_array[sorted[current_index]].iov_len -
                        bytes_to_write_in_cycle;
                    bytes_to_write_in_cycle = 0;
                    break;
                }
                else {

                    if (my_aggregator == fh->f_rank) {
                        blocklen_per_process[n][disp_index[n] - 1] =
                            global_iov_array[sorted[current_index]].iov_len;
                        displs_per_process[n][disp_index[n] - 1] = (ptrdiff_t)
                            global_iov_array[sorted[current_index]].iov_base;

                        blocklen_per_process[n] =
                            (int *) realloc ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
                        displs_per_process[n] = (MPI_Aint *) realloc
                            ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint));
                        blocklen_per_process[n][disp_index[n]] = 0;
                        displs_per_process[n][disp_index[n]] = 0;
                        disp_index[n] += 1;
                    }
                    if (fh->f_procs_in_group[n] == fh->f_rank) {
                        bytes_sent += global_iov_array[sorted[current_index]].iov_len;
                    }
                    bytes_to_write_in_cycle -=
                        global_iov_array[sorted[current_index]].iov_len;
                    current_index++;
                    continue;
                }
            }
        }
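        /* The aggregator collects the (offset, length, owner) entries of
         * this cycle, sorts them by file offset and derives the position
         * of each block inside the staging buffer. */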
        if (my_aggregator == fh->f_rank) {
            entries_per_aggregator = 0;
            for (i = 0; i < fh->f_procs_per_group; i++) {
                for (j = 0; j < disp_index[i]; j++) {
                    if (blocklen_per_process[i][j] > 0)
                        entries_per_aggregator++;
                }
            }

#if DEBUG_ON
            printf ("%d: cycle: %d, bytes_sent: %d\n", fh->f_rank, index,
                    bytes_sent);
            printf ("%d : Entries per aggregator : %d\n", fh->f_rank, entries_per_aggregator);
#endif

            if (entries_per_aggregator > 0) {
                file_offsets_for_agg = (mca_io_ompio_local_io_array *)
                    malloc (entries_per_aggregator * sizeof (mca_io_ompio_local_io_array));
                if (NULL == file_offsets_for_agg) {
                    opal_output (1, "OUT OF MEMORY\n");
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }

                sorted_file_offsets = (int *)
                    malloc (entries_per_aggregator * sizeof (int));
                if (NULL == sorted_file_offsets) {
                    opal_output (1, "OUT OF MEMORY\n");
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }

                temp_index = 0;

                for (i = 0; i < fh->f_procs_per_group; i++) {
                    for (j = 0; j < disp_index[i]; j++) {
                        if (blocklen_per_process[i][j] > 0) {
                            file_offsets_for_agg[temp_index].length =
                                blocklen_per_process[i][j];
                            file_offsets_for_agg[temp_index].process_id = i;
                            file_offsets_for_agg[temp_index].offset =
                                displs_per_process[i][j];
                            temp_index++;

#if DEBUG_ON
                            printf ("************Cycle: %d, Aggregator: %d ***************\n",
                                    index+1, fh->f_rank);
                            printf ("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
                                    fh->f_procs_in_group[i], j,
                                    blocklen_per_process[i][j], j,
                                    displs_per_process[i][j],
                                    fh->f_rank);
#endif
                        }
                    }
                }
            }
            else {
                continue;
            }
            local_heap_sort (file_offsets_for_agg,
                             entries_per_aggregator,
                             sorted_file_offsets);

            memory_displacements = (MPI_Aint *) malloc
                (entries_per_aggregator * sizeof (MPI_Aint));
            if (NULL == memory_displacements) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memory_displacements[sorted_file_offsets[0]] = 0;
            for (i = 1; i < entries_per_aggregator; i++) {
                memory_displacements[sorted_file_offsets[i]] =
                    memory_displacements[sorted_file_offsets[i-1]] +
                    file_offsets_for_agg[sorted_file_offsets[i-1]].length;
            }

            temp_disp_index = (int *) calloc (fh->f_procs_per_group, sizeof (int));
            if (NULL == temp_disp_index) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            global_count = 0;
            for (i = 0; i < entries_per_aggregator; i++) {
                temp_pindex =
                    file_offsets_for_agg[sorted_file_offsets[i]].process_id;
                displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
                    memory_displacements[sorted_file_offsets[i]];
                if (temp_disp_index[temp_pindex] < disp_index[temp_pindex])
                    temp_disp_index[temp_pindex] += 1;
                else {
                    printf ("temp_disp_index[%d]: %d is not smaller than disp_index[%d]: %d\n",
                            temp_pindex, temp_disp_index[temp_pindex],
                            temp_pindex, disp_index[temp_pindex]);
                }
                global_count +=
                    file_offsets_for_agg[sorted_file_offsets[i]].length;
            }

            if (NULL != temp_disp_index) {
                free (temp_disp_index);
                temp_disp_index = NULL;
            }
#if DEBUG_ON
            printf ("************Cycle: %d, Aggregator: %d ***************\n",
                    index+1, fh->f_rank);
            for (i = 0; i < fh->f_procs_per_group; i++) {
                for (j = 0; j < disp_index[i]; j++) {
                    if (blocklen_per_process[i][j] > 0) {
                        printf ("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
                                fh->f_procs_in_group[i], j,
                                blocklen_per_process[i][j], j,
                                displs_per_process[i][j],
                                fh->f_rank);
                    }
                }
            }
            printf ("************Cycle: %d, Aggregator: %d ***************\n",
                    index+1, fh->f_rank);
            for (i = 0; i < entries_per_aggregator; i++) {
                printf ("%d: OFFSET: %lld LENGTH: %ld, Mem-offset: %ld\n",
                        file_offsets_for_agg[sorted_file_offsets[i]].process_id,
                        file_offsets_for_agg[sorted_file_offsets[i]].offset,
                        file_offsets_for_agg[sorted_file_offsets[i]].length,
                        memory_displacements[sorted_file_offsets[i]]);
            }
            printf ("%d : global_count : %ld, bytes_sent : %d\n",
                    fh->f_rank, global_count, bytes_sent);
#endif
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            start_comm_time = MPI_Wtime();
#endif
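            /* Post one non-blocking receive per group member whose data
             * lands in this cycle, using a hindexed datatype that places
             * each block at its spot in the staging buffer. */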
            for (i = 0; i < fh->f_procs_per_group; i++) {
                recv_req[i] = MPI_REQUEST_NULL;
                if (0 < disp_index[i]) {
                    ompi_datatype_create_hindexed (disp_index[i],
                                                   blocklen_per_process[i],
                                                   displs_per_process[i],
                                                   MPI_BYTE,
                                                   &recvtype[i]);
                    ompi_datatype_commit (&recvtype[i]);
                    opal_datatype_type_size (&recvtype[i]->super, &datatype_size);

                    if (datatype_size) {
                        ret = MCA_PML_CALL(irecv(global_buf,
                                                 1,
                                                 recvtype[i],
                                                 fh->f_procs_in_group[i],
                                                 123,
                                                 fh->f_comm,
                                                 &recv_req[i]));
                        if (OMPI_SUCCESS != ret) {
                            goto exit;
                        }
                    }
                }
            }
        }
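        /* Every process packs its contribution for this cycle: either a
         * pointer into the contiguous user buffer, or a copy assembled
         * from the decoded iovec list. */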
        if (sendbuf_is_contiguous) {
            send_buf = &((char *)buf)[total_bytes_written];
        }
        else if (bytes_sent) {
            ptrdiff_t mem_address;
            size_t remaining = 0;
            size_t temp_position = 0;

            send_buf = malloc (bytes_sent);
            if (NULL == send_buf) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            remaining = bytes_sent;

            while (remaining) {
                mem_address = (ptrdiff_t)
                    (decoded_iov[iov_index].iov_base) + current_position;

                if (remaining >=
                    (decoded_iov[iov_index].iov_len - current_position)) {
                    memcpy (send_buf + temp_position,
                            (IOVBASE_TYPE *) mem_address,
                            decoded_iov[iov_index].iov_len - current_position);
                    remaining = remaining -
                        (decoded_iov[iov_index].iov_len - current_position);
                    temp_position = temp_position +
                        (decoded_iov[iov_index].iov_len - current_position);
                    iov_index = iov_index + 1;
                    current_position = 0;
                }
                else {
                    memcpy (send_buf + temp_position,
                            (IOVBASE_TYPE *) mem_address,
                            remaining);
                    current_position = current_position + remaining;
                    remaining = 0;
                }
            }
        }
        total_bytes_written += bytes_sent;
        if (bytes_sent) {
            ret = MCA_PML_CALL(isend(send_buf,
                                     bytes_sent,
                                     MPI_BYTE,
                                     my_aggregator,
                                     123,
                                     MCA_PML_BASE_SEND_STANDARD,
                                     fh->f_comm,
                                     &send_req));
            if (OMPI_SUCCESS != ret) {
                goto exit;
            }

            ret = ompi_request_wait (&send_req, MPI_STATUS_IGNORE);
            if (OMPI_SUCCESS != ret) {
                goto exit;
            }
        }
        if (my_aggregator == fh->f_rank) {
            ret = ompi_request_wait_all (fh->f_procs_per_group,
                                         recv_req,
                                         MPI_STATUSES_IGNORE);
            if (OMPI_SUCCESS != ret) {
                goto exit;
            }
        }

#if DEBUG_ON
        if (my_aggregator == fh->f_rank) {
            printf ("************Cycle: %d, Aggregator: %d ***************\n",
                    index+1, fh->f_rank);
            for (i = 0; i < global_count/4; i++)
                printf (" RECV %d \n", ((int *)global_buf)[i]);
        }
#endif

        if (!sendbuf_is_contiguous) {
            if (NULL != send_buf) {
                free (send_buf);
                send_buf = NULL;
            }
        }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
        end_comm_time = MPI_Wtime();
        comm_time += (end_comm_time - start_comm_time);
#endif
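        /* The aggregator merges adjacent file blocks into as few io-array
         * entries as possible and hands them to the fbtl for writing. */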
        if (my_aggregator == fh->f_rank) {

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            start_write_time = MPI_Wtime();
#endif

            fh->f_io_array = (mca_common_ompio_io_array_t *) malloc
                (entries_per_aggregator * sizeof (mca_common_ompio_io_array_t));
            if (NULL == fh->f_io_array) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            fh->f_num_of_io_entries = 0;

            fh->f_io_array[0].offset =
                (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
            fh->f_io_array[0].length =
                file_offsets_for_agg[sorted_file_offsets[0]].length;
            fh->f_io_array[0].memory_address =
                global_buf + memory_displacements[sorted_file_offsets[0]];
            fh->f_num_of_io_entries++;

            for (i = 1; i < entries_per_aggregator; i++) {
                if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
                    file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
                    file_offsets_for_agg[sorted_file_offsets[i]].offset) {
                    fh->f_io_array[fh->f_num_of_io_entries - 1].length +=
                        file_offsets_for_agg[sorted_file_offsets[i]].length;
                }
                else {
                    fh->f_io_array[fh->f_num_of_io_entries].offset =
                        (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
                    fh->f_io_array[fh->f_num_of_io_entries].length =
                        file_offsets_for_agg[sorted_file_offsets[i]].length;
                    fh->f_io_array[fh->f_num_of_io_entries].memory_address =
                        global_buf + memory_displacements[sorted_file_offsets[i]];
                    fh->f_num_of_io_entries++;
                }
            }

#if DEBUG_ON
            printf ("*************************** %d\n", fh->f_num_of_io_entries);
            for (i = 0; i < fh->f_num_of_io_entries; i++) {
                printf (" ADDRESS: %p OFFSET: %ld LENGTH: %ld\n",
                        fh->f_io_array[i].memory_address,
                        (ptrdiff_t)fh->f_io_array[i].offset,
                        fh->f_io_array[i].length);
            }
#endif

            if (fh->f_num_of_io_entries) {
                if (0 > fh->f_fbtl->fbtl_pwritev (fh)) {
                    opal_output (1, "WRITE FAILED\n");
                    ret = OMPI_ERROR;
                    goto exit;
                }
            }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            end_write_time = MPI_Wtime();
            write_time += end_write_time - start_write_time;
#endif
        }
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_exch = MPI_Wtime();
    exch_write += end_exch - start_exch;
    nentry.time[0] = write_time;
    nentry.time[1] = comm_time;
    nentry.time[2] = exch_write;
    if (my_aggregator == fh->f_rank)
        nentry.aggregator = 1;
    else
        nentry.aggregator = 0;
    nentry.nprocs_for_coll = dynamic_num_io_procs;
    if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)) {
        mca_common_ompio_register_print_entry(fh->f_coll_write_time,
                                              nentry);
    }
#endif
exit:
    if (my_aggregator == fh->f_rank) {
        if (NULL != sorted_file_offsets) {
            free (sorted_file_offsets);
            sorted_file_offsets = NULL;
        }
        if (NULL != file_offsets_for_agg) {
            free (file_offsets_for_agg);
            file_offsets_for_agg = NULL;
        }
        if (NULL != memory_displacements) {
            free (memory_displacements);
            memory_displacements = NULL;
        }
        if (NULL != recvtype) {
            for (i = 0; i < fh->f_procs_per_group; i++) {
                if (MPI_DATATYPE_NULL != recvtype[i]) {
                    ompi_datatype_destroy (&recvtype[i]);
                }
            }
            free (recvtype);
            recvtype = NULL;
        }
        if (NULL != fh->f_io_array) {
            free (fh->f_io_array);
            fh->f_io_array = NULL;
        }
        if (NULL != disp_index) {
            free (disp_index);
            disp_index = NULL;
        }
        if (NULL != recv_req) {
            free (recv_req);
            recv_req = NULL;
        }
        if (NULL != global_buf) {
            free (global_buf);
            global_buf = NULL;
        }
        for (l = 0; l < fh->f_procs_per_group; l++) {
            if (NULL != blocklen_per_process) {
                free (blocklen_per_process[l]);
            }
            if (NULL != displs_per_process) {
                free (displs_per_process[l]);
            }
        }

        free (blocklen_per_process);
        free (displs_per_process);
    }
    if (NULL != displs) {
        free (displs);
        displs = NULL;
    }

    if (!sendbuf_is_contiguous) {
        if (NULL != send_buf) {
            free (send_buf);
            send_buf = NULL;
        }
    }
    if (NULL != global_buf) {
        free (global_buf);
        global_buf = NULL;
    }
    if (NULL != sorted) {
        free (sorted);
        sorted = NULL;
    }
    if (NULL != global_iov_array) {
        free (global_iov_array);
        global_iov_array = NULL;
    }
    if (NULL != fview_count) {
        free (fview_count);
        fview_count = NULL;
    }
    if (NULL != decoded_iov) {
        free (decoded_iov);
        decoded_iov = NULL;
    }

    return ret;
}
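
/*
 * In-place heapsort over the entry indices: builds a max-heap keyed on
 * io_array[].offset and repeatedly extracts the largest element, leaving
 * 'sorted' with the indices of io_array in ascending offset order.
 */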
static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
                            int num_entries,
                            int *sorted)
{
    int i = 0;
    int j = 0;
    int left = 0;
    int right = 0;
    int largest = 0;
    int heap_size = num_entries - 1;
    int temp = 0;
    unsigned char done = 0;
    int *temp_arr = NULL;

    temp_arr = (int *) malloc (num_entries * sizeof (int));
    if (NULL == temp_arr) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    temp_arr[0] = 0;
    for (i = 1; i < num_entries; ++i) {
        temp_arr[i] = i;
    }
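    /* Build the initial max-heap over the index array. */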
    for (i = num_entries/2 - 1; i >= 0; i--) {
        done = 0;
        j = i;
        largest = j;

        while (!done) {
            left = j*2 + 1;
            right = j*2 + 2;
            if ((left <= heap_size) &&
                (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
                largest = left;
            }
            else {
                largest = j;
            }
            if ((right <= heap_size) &&
                (io_array[temp_arr[right]].offset >
                 io_array[temp_arr[largest]].offset)) {
                largest = right;
            }
            if (largest != j) {
                temp = temp_arr[largest];
                temp_arr[largest] = temp_arr[j];
                temp_arr[j] = temp;
                j = largest;
            }
            else {
                done = 1;
            }
        }
    }
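    /* Repeatedly move the heap maximum to the end and re-heapify. */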
    for (i = num_entries-1; i >= 1; --i) {
        temp = temp_arr[0];
        temp_arr[0] = temp_arr[i];
        temp_arr[i] = temp;
        heap_size--;
        done = 0;
        j = 0;
        largest = j;

        while (!done) {
            left = j*2 + 1;
            right = j*2 + 2;

            if ((left <= heap_size) &&
                (io_array[temp_arr[left]].offset >
                 io_array[temp_arr[j]].offset)) {
                largest = left;
            }
            else {
                largest = j;
            }
            if ((right <= heap_size) &&
                (io_array[temp_arr[right]].offset >
                 io_array[temp_arr[largest]].offset)) {
                largest = right;
            }
            if (largest != j) {
                temp = temp_arr[largest];
                temp_arr[largest] = temp_arr[j];
                temp_arr[j] = temp;
                j = largest;
            }
            else {
                done = 1;
            }
        }
        sorted[i] = temp_arr[i];
    }
    sorted[0] = temp_arr[0];

    if (NULL != temp_arr) {
        free (temp_arr);
        temp_arr = NULL;
    }
    return OMPI_SUCCESS;
}