This source file includes the following definitions:
- mca_fcoll_vulcan_file_read_all
- read_heap_sort
#include "ompi_config.h"
#include "fcoll_vulcan.h"

#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/common/ompio/common_ompio.h"
#include "ompi/mca/io/io.h"
#include "math.h"
#include "ompi/mca/pml/pml.h"
#include <unistd.h>

#define DEBUG_ON 0

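/* Bookkeeping entry used by the aggregator: one contiguous chunk of the
   file, described by its offset, its length, and the rank (within the
   group) of the process that owns the data. */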
typedef struct mca_io_ompio_local_io_array {
    OMPI_MPI_OFFSET_TYPE offset;
    MPI_Aint             length;
    int                  process_id;
} mca_io_ompio_local_io_array;


static int read_heap_sort (mca_io_ompio_local_io_array *io_array,
                           int num_entries,
                           int *sorted);


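/* Collective read implemented as two-phase I/O: rank 0 of the group acts
   as the aggregator, reads large contiguous portions of the file into an
   intermediate buffer in cycles of f_bytes_per_agg bytes, and scatters
   each piece to the process that owns it according to the file view. */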
int
mca_fcoll_vulcan_file_read_all (ompio_file_t *fh,
                                void *buf,
                                int count,
                                struct ompi_datatype_t *datatype,
                                ompi_status_public_t *status)
{
    MPI_Aint position = 0;
    MPI_Aint total_bytes = 0;
    MPI_Aint bytes_to_read_in_cycle = 0;
    MPI_Aint bytes_per_cycle = 0;
    int index = 0, ret = OMPI_SUCCESS;
    int cycles = 0;
    int i = 0, j = 0, l = 0;
    int n = 0;
    MPI_Aint bytes_remaining = 0;

    int *sorted_file_offsets = NULL, entries_per_aggregator = 0;
    int bytes_received = 0;
    int blocks = 0;

    uint32_t iov_count = 0;
    struct iovec *decoded_iov = NULL;
    int iov_index = 0;
    size_t current_position = 0;
    struct iovec *local_iov_array = NULL, *global_iov_array = NULL;
    char *receive_buf = NULL;
    MPI_Aint *memory_displacements = NULL;

    uint32_t total_fview_count = 0;
    int local_count = 0;
    int *fview_count = NULL, *disp_index = NULL, *temp_disp_index = NULL;
    int current_index = 0, temp_index = 0;
    int **blocklen_per_process = NULL;
    MPI_Aint **displs_per_process = NULL;
    char *global_buf = NULL;
    MPI_Aint global_count = 0;
    mca_io_ompio_local_io_array *file_offsets_for_agg = NULL;

    int *sorted = NULL;
    int *displs = NULL;
    int vulcan_num_io_procs;
    size_t max_data = 0;
    MPI_Aint *total_bytes_per_process = NULL;
    ompi_datatype_t **sendtype = NULL;
    MPI_Request *send_req = NULL, recv_req = NULL;
    int my_aggregator = -1;
    bool recvbuf_is_contiguous = false;
    size_t ftype_size;
    ptrdiff_t ftype_extent, lb;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
    double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
    double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
    mca_common_ompio_print_entry nentry;
#endif

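    /* Check whether the receive buffer is contiguous in memory; if it is
       not, decode the datatype into an iovec so the received bytes can be
       unpacked piece by piece later on. */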
    opal_datatype_type_size ( &datatype->super, &ftype_size );
    opal_datatype_get_extent ( &datatype->super, &lb, &ftype_extent );

    if ( (ftype_extent == (ptrdiff_t) ftype_size) &&
         opal_datatype_is_contiguous_memory_layout(&datatype->super,1) &&
         0 == lb ) {
        recvbuf_is_contiguous = true;
    }

    if (! recvbuf_is_contiguous ) {
        ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *)fh,
                                                datatype,
                                                count,
                                                buf,
                                                &max_data,
                                                fh->f_mem_convertor,
                                                &decoded_iov,
                                                &iov_count);
        if (OMPI_SUCCESS != ret){
            goto exit;
        }
    }
    else {
        max_data = count * datatype->super.size;
    }

    if ( MPI_STATUS_IGNORE != status ) {
        status->_ucount = max_data;
    }

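    /* Determine the number of aggregators and select the aggregator of
       this group (rank 0 within the group). */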
    vulcan_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
    if ( OMPI_ERR_MAX == vulcan_num_io_procs ) {
        ret = OMPI_ERROR;
        goto exit;
    }

    ret = mca_common_ompio_set_aggregator_props ((struct ompio_file_t *) fh,
                                                 vulcan_num_io_procs,
                                                 max_data);
    if (OMPI_SUCCESS != ret){
        goto exit;
    }
    my_aggregator = fh->f_procs_in_group[0];

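    /* Gather the amount of data each process will read and compute the
       total number of bytes handled by this group. */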
    total_bytes_per_process = (MPI_Aint*)malloc(fh->f_procs_per_group*sizeof(MPI_Aint));
    if (NULL == total_bytes_per_process) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rcomm_time = MPI_Wtime();
#endif
    ret = ompi_fcoll_base_coll_allgather_array (&max_data,
                                                1,
                                                MPI_LONG,
                                                total_bytes_per_process,
                                                1,
                                                MPI_LONG,
                                                0,
                                                fh->f_procs_in_group,
                                                fh->f_procs_per_group,
                                                fh->f_comm);
    if (OMPI_SUCCESS != ret){
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rcomm_time = MPI_Wtime();
    rcomm_time += end_rcomm_time - start_rcomm_time;
#endif

    for (i=0 ; i<fh->f_procs_per_group ; i++) {
        total_bytes += total_bytes_per_process[i];
    }

    if (NULL != total_bytes_per_process) {
        free (total_bytes_per_process);
        total_bytes_per_process = NULL;
    }

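    /* Generate the local iovec describing this process's view of the
       file for max_data bytes. */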
    ret = fh->f_generate_current_file_view ((struct ompio_file_t *) fh,
                                            max_data,
                                            &local_iov_array,
                                            &local_count);
    if (ret != OMPI_SUCCESS){
        goto exit;
    }

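    /* Exchange the number of file-view entries each process contributes,
       so that the combined iovec can be sized and indexed. */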
    fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
    if (NULL == fview_count) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rcomm_time = MPI_Wtime();
#endif
    ret = ompi_fcoll_base_coll_allgather_array (&local_count,
                                                1,
                                                MPI_INT,
                                                fview_count,
                                                1,
                                                MPI_INT,
                                                0,
                                                fh->f_procs_in_group,
                                                fh->f_procs_per_group,
                                                fh->f_comm);
    if (OMPI_SUCCESS != ret){
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rcomm_time = MPI_Wtime();
    rcomm_time += end_rcomm_time - start_rcomm_time;
#endif

    displs = (int*)malloc (fh->f_procs_per_group*sizeof(int));
    if (NULL == displs) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    displs[0] = 0;
    total_fview_count = fview_count[0];
    for (i=1 ; i<fh->f_procs_per_group ; i++) {
        total_fview_count += fview_count[i];
        displs[i] = displs[i-1] + fview_count[i-1];
    }

#if DEBUG_ON
    if (my_aggregator == fh->f_rank) {
        for (i=0 ; i<fh->f_procs_per_group ; i++) {
            printf ("%d: PROCESS: %d ELEMENTS: %d DISPLS: %d\n",
                    fh->f_rank,
                    i,
                    fview_count[i],
                    displs[i]);
        }
    }
#endif

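    /* Gather the file-view iovecs of all processes in the group into one
       global iovec. */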
    if (0 != total_fview_count) {
        global_iov_array = (struct iovec*)malloc (total_fview_count *
                                                  sizeof(struct iovec));
        if (NULL == global_iov_array) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rcomm_time = MPI_Wtime();
#endif
    ret = ompi_fcoll_base_coll_allgatherv_array (local_iov_array,
                                                 local_count,
                                                 fh->f_iov_type,
                                                 global_iov_array,
                                                 fview_count,
                                                 displs,
                                                 fh->f_iov_type,
                                                 0,
                                                 fh->f_procs_in_group,
                                                 fh->f_procs_per_group,
                                                 fh->f_comm);
    if (OMPI_SUCCESS != ret){
        goto exit;
    }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rcomm_time = MPI_Wtime();
    rcomm_time += end_rcomm_time - start_rcomm_time;
#endif

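    /* Sort the combined file-view entries by file offset; "sorted" holds
       the indices of global_iov_array in increasing offset order. */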
    if (0 != total_fview_count) {
        sorted = (int *)malloc (total_fview_count * sizeof(int));
        if (NULL == sorted) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
        ompi_fcoll_base_sort_iovec (global_iov_array, total_fview_count, sorted);
    }

    if (NULL != local_iov_array) {
        free (local_iov_array);
        local_iov_array = NULL;
    }

#if DEBUG_ON
    if (my_aggregator == fh->f_rank) {
        for (i=0 ; i<total_fview_count ; i++) {
            printf("%d: OFFSET: %p LENGTH: %d\n",
                   fh->f_rank,
                   global_iov_array[sorted[i]].iov_base,
                   global_iov_array[sorted[i]].iov_len);
        }
    }
#endif

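    /* The data is read in cycles of f_bytes_per_agg bytes; compute how
       many cycles are needed to cover total_bytes. */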
    bytes_per_cycle = fh->f_bytes_per_agg;
    cycles = ceil((double)total_bytes/bytes_per_cycle);

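    /* The aggregator allocates the per-process bookkeeping arrays, the
       send requests and datatypes, and the intermediate buffer that each
       cycle's file data is read into before it is scattered. */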
    if ( my_aggregator == fh->f_rank) {
        disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
        if (NULL == disp_index) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        blocklen_per_process = (int **)malloc (fh->f_procs_per_group * sizeof (int*));
        if (NULL == blocklen_per_process) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        displs_per_process = (MPI_Aint **)malloc (fh->f_procs_per_group * sizeof (MPI_Aint*));
        if (NULL == displs_per_process){
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        for (i=0;i<fh->f_procs_per_group;i++){
            blocklen_per_process[i] = NULL;
            displs_per_process[i] = NULL;
        }

        send_req = (MPI_Request *) malloc (fh->f_procs_per_group * sizeof(MPI_Request));
        if (NULL == send_req){
            opal_output ( 1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        global_buf = (char *) malloc (bytes_per_cycle);
        if (NULL == global_buf){
            opal_output(1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        sendtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * sizeof(ompi_datatype_t *));
        if (NULL == sendtype) {
            opal_output (1, "OUT OF MEMORY\n");
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        for(l=0;l<fh->f_procs_per_group;l++){
            sendtype[l] = MPI_DATATYPE_NULL;
        }
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rexch = MPI_Wtime();
#endif
    n = 0;
    bytes_remaining = 0;
    current_index = 0;

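    /* Main loop: in each cycle the aggregator determines the layout of
       the next bytes_per_cycle bytes, reads them from the file into
       global_buf, and scatters each piece to its owner. */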
    for (index = 0; index < cycles; index++) {

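        /* The aggregator clears the bookkeeping left over from the
           previous cycle. */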
        if (my_aggregator == fh->f_rank) {
            if (NULL != fh->f_io_array) {
                free (fh->f_io_array);
                fh->f_io_array = NULL;
            }
            fh->f_num_of_io_entries = 0;

            if (NULL != sendtype){
                for (i =0; i< fh->f_procs_per_group; i++) {
                    if ( MPI_DATATYPE_NULL != sendtype[i] ) {
                        ompi_datatype_destroy(&sendtype[i]);
                        sendtype[i] = MPI_DATATYPE_NULL;
                    }
                }
            }

            for(l=0;l<fh->f_procs_per_group;l++){
                disp_index[l] = 1;

                if (NULL != blocklen_per_process[l]){
                    free(blocklen_per_process[l]);
                    blocklen_per_process[l] = NULL;
                }
                if (NULL != displs_per_process[l]){
                    free(displs_per_process[l]);
                    displs_per_process[l] = NULL;
                }
                blocklen_per_process[l] = (int *) calloc (1, sizeof(int));
                if (NULL == blocklen_per_process[l]) {
                    opal_output (1, "OUT OF MEMORY for blocklen\n");
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }
                displs_per_process[l] = (MPI_Aint *) calloc (1, sizeof(MPI_Aint));
                if (NULL == displs_per_process[l]){
                    opal_output (1, "OUT OF MEMORY for displs\n");
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }
            }

            if (NULL != sorted_file_offsets){
                free(sorted_file_offsets);
                sorted_file_offsets = NULL;
            }

            if(NULL != file_offsets_for_agg){
                free(file_offsets_for_agg);
                file_offsets_for_agg = NULL;
            }
            if (NULL != memory_displacements){
                free(memory_displacements);
                memory_displacements = NULL;
            }
        }

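        /* Determine the number of bytes handled in this cycle; the last
           cycle covers whatever remains. */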
        if (cycles-1 == index) {
            bytes_to_read_in_cycle = total_bytes - bytes_per_cycle*index;
        }
        else {
            bytes_to_read_in_cycle = bytes_per_cycle;
        }

#if DEBUG_ON
        if (my_aggregator == fh->f_rank) {
            printf ("****%d: CYCLE %d Bytes %d**********\n",
                    fh->f_rank,
                    index,
                    bytes_to_read_in_cycle);
        }
#endif

        bytes_received = 0;

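        /* Walk the sorted global iovec and assign each entry (or the
           fraction of it that still fits) to this cycle, recording for
           every process the block lengths and file displacements that
           the aggregator will serve on its behalf. */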
        while (bytes_to_read_in_cycle) {

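            /* Identify the process that owns the current iovec entry. */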
            blocks = fview_count[0];
            for (j=0 ; j<fh->f_procs_per_group ; j++) {
                if (sorted[current_index] < blocks) {
                    n = j;
                    break;
                }
                else {
                    blocks += fview_count[j+1];
                }
            }

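            /* bytes_remaining is the tail of an entry that did not fit
               into the previous cycle. */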
            if (bytes_remaining) {
                if (bytes_remaining <= bytes_to_read_in_cycle) {
                    if (my_aggregator == fh->f_rank) {
                        blocklen_per_process[n][disp_index[n] - 1] = bytes_remaining;
                        displs_per_process[n][disp_index[n] - 1] =
                            (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base +
                            (global_iov_array[sorted[current_index]].iov_len - bytes_remaining);

                        blocklen_per_process[n] = (int *) realloc
                            ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
                        displs_per_process[n] = (MPI_Aint *) realloc
                            ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint));
                        blocklen_per_process[n][disp_index[n]] = 0;
                        displs_per_process[n][disp_index[n]] = 0;
                        disp_index[n] += 1;
                    }
                    if (fh->f_procs_in_group[n] == fh->f_rank) {
                        bytes_received += bytes_remaining;
                    }
                    current_index ++;
                    bytes_to_read_in_cycle -= bytes_remaining;
                    bytes_remaining = 0;
                    continue;
                }
                else {
                    if (my_aggregator == fh->f_rank) {
                        blocklen_per_process[n][disp_index[n] - 1] = bytes_to_read_in_cycle;
                        displs_per_process[n][disp_index[n] - 1] =
                            (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base +
                            (global_iov_array[sorted[current_index]].iov_len
                             - bytes_remaining);
                    }
                    if (fh->f_procs_in_group[n] == fh->f_rank) {
                        bytes_received += bytes_to_read_in_cycle;
                    }
                    bytes_remaining -= bytes_to_read_in_cycle;
                    bytes_to_read_in_cycle = 0;
                    break;
                }
            }
            else {
                if (bytes_to_read_in_cycle <
                    (MPI_Aint) global_iov_array[sorted[current_index]].iov_len) {
                    if (my_aggregator == fh->f_rank) {
                        blocklen_per_process[n][disp_index[n] - 1] = bytes_to_read_in_cycle;
                        displs_per_process[n][disp_index[n] - 1] =
                            (ptrdiff_t)global_iov_array[sorted[current_index]].iov_base ;
                    }

                    if (fh->f_procs_in_group[n] == fh->f_rank) {
                        bytes_received += bytes_to_read_in_cycle;
                    }
                    bytes_remaining = global_iov_array[sorted[current_index]].iov_len -
                        bytes_to_read_in_cycle;
                    bytes_to_read_in_cycle = 0;
                    break;
                }
                else {
                    if (my_aggregator == fh->f_rank) {
                        blocklen_per_process[n][disp_index[n] - 1] =
                            global_iov_array[sorted[current_index]].iov_len;
                        displs_per_process[n][disp_index[n] - 1] = (ptrdiff_t)
                            global_iov_array[sorted[current_index]].iov_base;
                        blocklen_per_process[n] =
                            (int *) realloc ((void *)blocklen_per_process[n], (disp_index[n]+1)*sizeof(int));
                        displs_per_process[n] = (MPI_Aint *)realloc
                            ((void *)displs_per_process[n], (disp_index[n]+1)*sizeof(MPI_Aint));
                        blocklen_per_process[n][disp_index[n]] = 0;
                        displs_per_process[n][disp_index[n]] = 0;
                        disp_index[n] += 1;
                    }
                    if (fh->f_procs_in_group[n] == fh->f_rank) {
                        bytes_received +=
                            global_iov_array[sorted[current_index]].iov_len;
                    }
                    bytes_to_read_in_cycle -=
                        global_iov_array[sorted[current_index]].iov_len;
                    current_index ++;
                    continue;
                }
            }
        }

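        /* The aggregator builds the list of (offset, length, owner)
           entries it will serve in this cycle. */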
        if (my_aggregator == fh->f_rank) {
            entries_per_aggregator=0;
            for (i=0;i<fh->f_procs_per_group; i++){
                for (j=0;j<disp_index[i];j++){
                    if (blocklen_per_process[i][j] > 0)
                        entries_per_aggregator++ ;
                }
            }
            if (entries_per_aggregator > 0){
                file_offsets_for_agg = (mca_io_ompio_local_io_array *)
                    malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array));
                if (NULL == file_offsets_for_agg) {
                    opal_output (1, "OUT OF MEMORY\n");
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }
                sorted_file_offsets = (int *)
                    malloc (entries_per_aggregator*sizeof(int));
                if (NULL == sorted_file_offsets){
                    opal_output (1, "OUT OF MEMORY\n");
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }

                temp_index = 0;
                global_count = 0;
                for (i=0;i<fh->f_procs_per_group; i++){
                    for(j=0;j<disp_index[i];j++){
                        if (blocklen_per_process[i][j] > 0){
                            file_offsets_for_agg[temp_index].length =
                                blocklen_per_process[i][j];
                            global_count += blocklen_per_process[i][j];
                            file_offsets_for_agg[temp_index].process_id = i;
                            file_offsets_for_agg[temp_index].offset =
                                displs_per_process[i][j];
                            temp_index++;
                        }
                    }
                }
            }
            else{
                continue;
            }

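            /* Sort the entries by file offset and compute, for each
               entry, its displacement within the intermediate buffer. */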
            read_heap_sort (file_offsets_for_agg,
                            entries_per_aggregator,
                            sorted_file_offsets);

            memory_displacements = (MPI_Aint *) malloc
                (entries_per_aggregator * sizeof(MPI_Aint));
            if (NULL == memory_displacements) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memory_displacements[sorted_file_offsets[0]] = 0;
            for (i=1; i<entries_per_aggregator; i++){
                memory_displacements[sorted_file_offsets[i]] =
                    memory_displacements[sorted_file_offsets[i-1]] +
                    file_offsets_for_agg[sorted_file_offsets[i-1]].length;
            }

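            /* Build the I/O array for the fbtl layer, merging entries
               that are contiguous in the file into a single operation. */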
            fh->f_io_array = (mca_common_ompio_io_array_t *) malloc
                (entries_per_aggregator * sizeof (mca_common_ompio_io_array_t));
            if (NULL == fh->f_io_array) {
                opal_output(1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            fh->f_num_of_io_entries = 0;
            fh->f_io_array[0].offset =
                (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
            fh->f_io_array[0].length =
                file_offsets_for_agg[sorted_file_offsets[0]].length;
            fh->f_io_array[0].memory_address =
                global_buf+memory_displacements[sorted_file_offsets[0]];
            fh->f_num_of_io_entries++;
            for (i=1;i<entries_per_aggregator;i++){
                if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
                    file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
                    file_offsets_for_agg[sorted_file_offsets[i]].offset){
                    fh->f_io_array[fh->f_num_of_io_entries - 1].length +=
                        file_offsets_for_agg[sorted_file_offsets[i]].length;
                }
                else{
                    fh->f_io_array[fh->f_num_of_io_entries].offset =
                        (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
                    fh->f_io_array[fh->f_num_of_io_entries].length =
                        file_offsets_for_agg[sorted_file_offsets[i]].length;
                    fh->f_io_array[fh->f_num_of_io_entries].memory_address =
                        global_buf+memory_displacements[sorted_file_offsets[i]];
                    fh->f_num_of_io_entries++;
                }
            }

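            /* Read this cycle's data from the file into global_buf. */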
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            start_read_time = MPI_Wtime();
#endif

            if (fh->f_num_of_io_entries) {
                if ( 0 > fh->f_fbtl->fbtl_preadv (fh)) {
                    opal_output (1, "READ FAILED\n");
                    ret = OMPI_ERROR;
                    goto exit;
                }
            }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            end_read_time = MPI_Wtime();
            read_time += end_read_time - start_read_time;
#endif

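            /* Translate the sorted buffer displacements back into the
               per-process displacement lists used to build the send
               datatypes. */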
            temp_disp_index = (int *)calloc (1, fh->f_procs_per_group * sizeof (int));
            if (NULL == temp_disp_index) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            for (i=0; i<entries_per_aggregator; i++){
                temp_index =
                    file_offsets_for_agg[sorted_file_offsets[i]].process_id;
                displs_per_process[temp_index][temp_disp_index[temp_index]] =
                    memory_displacements[sorted_file_offsets[i]];
                if (temp_disp_index[temp_index] < disp_index[temp_index]){
                    temp_disp_index[temp_index] += 1;
                }
                else{
                    printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n",
                           temp_index, temp_disp_index[temp_index],
                           temp_index, disp_index[temp_index]);
                }
            }
            if (NULL != temp_disp_index){
                free(temp_disp_index);
                temp_disp_index = NULL;
            }

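            /* For every process that receives data in this cycle, create
               a hindexed datatype over global_buf and post the isend. */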
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            start_rcomm_time = MPI_Wtime();
#endif
            for (i=0;i<fh->f_procs_per_group;i++){
                send_req[i] = MPI_REQUEST_NULL;
                if ( 0 < disp_index[i] ) {
                    ompi_datatype_create_hindexed(disp_index[i],
                                                  blocklen_per_process[i],
                                                  displs_per_process[i],
                                                  MPI_BYTE,
                                                  &sendtype[i]);
                    ompi_datatype_commit(&sendtype[i]);
                    ret = MCA_PML_CALL (isend(global_buf,
                                              1,
                                              sendtype[i],
                                              fh->f_procs_in_group[i],
                                              123,
                                              MCA_PML_BASE_SEND_STANDARD,
                                              fh->f_comm,
                                              &send_req[i]));
                    if(OMPI_SUCCESS != ret){
                        goto exit;
                    }
                }
            }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            end_rcomm_time = MPI_Wtime();
            rcomm_time += end_rcomm_time - start_rcomm_time;
#endif
        }

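        /* Every process receives its portion of this cycle: directly
           into the user buffer if it is contiguous, otherwise into a
           temporary buffer that is unpacked below. */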
        if ( recvbuf_is_contiguous ) {
            receive_buf = &((char*)buf)[position];
        }
        else if (bytes_received) {
            receive_buf = malloc (bytes_received);
            if (NULL == receive_buf) {
                opal_output (1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
        }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
        start_rcomm_time = MPI_Wtime();
#endif
        ret = MCA_PML_CALL(irecv(receive_buf,
                                 bytes_received,
                                 MPI_BYTE,
                                 my_aggregator,
                                 123,
                                 fh->f_comm,
                                 &recv_req));
        if (OMPI_SUCCESS != ret){
            goto exit;
        }

        if (my_aggregator == fh->f_rank){
            ret = ompi_request_wait_all (fh->f_procs_per_group,
                                         send_req,
                                         MPI_STATUSES_IGNORE);
            if (OMPI_SUCCESS != ret){
                goto exit;
            }
        }

        ret = ompi_request_wait (&recv_req, MPI_STATUS_IGNORE);
        if (OMPI_SUCCESS != ret){
            goto exit;
        }
        position += bytes_received;

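        /* Unpack the received bytes into the non-contiguous user buffer,
           following the iovec produced when the datatype was decoded. */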
        if (!recvbuf_is_contiguous ) {
            ptrdiff_t mem_address;
            size_t remaining = 0;
            size_t temp_position = 0;

            remaining = bytes_received;

            while (remaining) {
                mem_address = (ptrdiff_t)
                    (decoded_iov[iov_index].iov_base) + current_position;

                if (remaining >=
                    (decoded_iov[iov_index].iov_len - current_position)) {
                    memcpy ((IOVBASE_TYPE *) mem_address,
                            receive_buf+temp_position,
                            decoded_iov[iov_index].iov_len - current_position);
                    remaining = remaining -
                        (decoded_iov[iov_index].iov_len - current_position);
                    temp_position = temp_position +
                        (decoded_iov[iov_index].iov_len - current_position);
                    iov_index = iov_index + 1;
                    current_position = 0;
                }
                else {
                    memcpy ((IOVBASE_TYPE *) mem_address,
                            receive_buf+temp_position,
                            remaining);
                    current_position = current_position + remaining;
                    remaining = 0;
                }
            }

            if (NULL != receive_buf) {
                free (receive_buf);
                receive_buf = NULL;
            }
        }
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
        end_rcomm_time = MPI_Wtime();
        rcomm_time += end_rcomm_time - start_rcomm_time;
#endif
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rexch = MPI_Wtime();
    read_exch += end_rexch - start_rexch;
    nentry.time[0] = read_time;
    nentry.time[1] = rcomm_time;
    nentry.time[2] = read_exch;
    if (my_aggregator == fh->f_rank)
        nentry.aggregator = 1;
    else
        nentry.aggregator = 0;
    nentry.nprocs_for_coll = vulcan_num_io_procs;
    if (!mca_common_ompio_full_print_queue(fh->f_coll_read_time)){
        mca_common_ompio_register_print_entry(fh->f_coll_read_time,
                                              nentry);
    }
#endif

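/* Clean up: every process releases the receive and iovec buffers; the
   aggregator additionally releases its bookkeeping arrays, datatypes,
   and requests. */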
exit:
    if (!recvbuf_is_contiguous) {
        if (NULL != receive_buf) {
            free (receive_buf);
            receive_buf = NULL;
        }
    }
    if (NULL != global_buf) {
        free (global_buf);
        global_buf = NULL;
    }
    if (NULL != sorted) {
        free (sorted);
        sorted = NULL;
    }
    if (NULL != global_iov_array) {
        free (global_iov_array);
        global_iov_array = NULL;
    }
    if (NULL != fview_count) {
        free (fview_count);
        fview_count = NULL;
    }
    if (NULL != decoded_iov) {
        free (decoded_iov);
        decoded_iov = NULL;
    }
    if (NULL != local_iov_array){
        free(local_iov_array);
        local_iov_array=NULL;
    }

    if (NULL != displs) {
        free (displs);
        displs = NULL;
    }
    if (my_aggregator == fh->f_rank) {

        if (NULL != sorted_file_offsets){
            free(sorted_file_offsets);
            sorted_file_offsets = NULL;
        }
        if (NULL != file_offsets_for_agg){
            free(file_offsets_for_agg);
            file_offsets_for_agg = NULL;
        }
        if (NULL != memory_displacements){
            free(memory_displacements);
            memory_displacements= NULL;
        }
        if (NULL != sendtype){
            for (i = 0; i < fh->f_procs_per_group; i++) {
                if ( MPI_DATATYPE_NULL != sendtype[i] ) {
                    ompi_datatype_destroy(&sendtype[i]);
                }
            }
            free(sendtype);
            sendtype=NULL;
        }

        if (NULL != disp_index){
            free(disp_index);
            disp_index = NULL;
        }

        if ( NULL != blocklen_per_process){
            for(l=0;l<fh->f_procs_per_group;l++){
                if (NULL != blocklen_per_process[l]){
                    free(blocklen_per_process[l]);
                    blocklen_per_process[l] = NULL;
                }
            }

            free(blocklen_per_process);
            blocklen_per_process = NULL;
        }

        if (NULL != displs_per_process){
            for (l=0; l<fh->f_procs_per_group; l++){
                if (NULL != displs_per_process[l]){
                    free(displs_per_process[l]);
                    displs_per_process[l] = NULL;
                }
            }
            free(displs_per_process);
            displs_per_process = NULL;
        }
        if ( NULL != send_req ) {
            free ( send_req );
            send_req = NULL;
        }
    }
    return ret;
}


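/* Heapsort over io_array by file offset: builds a max-heap of entry
   indices, then repeatedly swaps the largest remaining offset to the end
   of the heap, filling "sorted" with the indices of io_array in
   increasing offset order. */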
static int read_heap_sort (mca_io_ompio_local_io_array *io_array,
                           int num_entries,
                           int *sorted)
{
    int i = 0;
    int j = 0;
    int left = 0;
    int right = 0;
    int largest = 0;
    int heap_size = num_entries - 1;
    int temp = 0;
    unsigned char done = 0;
    int* temp_arr = NULL;

    temp_arr = (int*)malloc(num_entries*sizeof(int));
    if (NULL == temp_arr) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    temp_arr[0] = 0;
    for (i = 1; i < num_entries; ++i) {
        temp_arr[i] = i;
    }

    for (i = num_entries/2-1 ; i>=0 ; i--) {
        done = 0;
        j = i;
        largest = j;

        while (!done) {
            left = j*2+1;
            right = j*2+2;
            if ((left <= heap_size) &&
                (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
                largest = left;
            }
            else {
                largest = j;
            }
            if ((right <= heap_size) &&
                (io_array[temp_arr[right]].offset >
                 io_array[temp_arr[largest]].offset)) {
                largest = right;
            }
            if (largest != j) {
                temp = temp_arr[largest];
                temp_arr[largest] = temp_arr[j];
                temp_arr[j] = temp;
                j = largest;
            }
            else {
                done = 1;
            }
        }
    }

    for (i = num_entries-1; i >=1; --i) {
        temp = temp_arr[0];
        temp_arr[0] = temp_arr[i];
        temp_arr[i] = temp;
        heap_size--;
        done = 0;
        j = 0;
        largest = j;

        while (!done) {
            left = j*2+1;
            right = j*2+2;

            if ((left <= heap_size) &&
                (io_array[temp_arr[left]].offset >
                 io_array[temp_arr[j]].offset)) {
                largest = left;
            }
            else {
                largest = j;
            }
            if ((right <= heap_size) &&
                (io_array[temp_arr[right]].offset >
                 io_array[temp_arr[largest]].offset)) {
                largest = right;
            }
            if (largest != j) {
                temp = temp_arr[largest];
                temp_arr[largest] = temp_arr[j];
                temp_arr[j] = temp;
                j = largest;
            }
            else {
                done = 1;
            }
        }
        sorted[i] = temp_arr[i];
    }
    sorted[0] = temp_arr[0];

    if (NULL != temp_arr) {
        free(temp_arr);
        temp_arr = NULL;
    }
    return OMPI_SUCCESS;
}