This source file includes the following definitions:
- mca_fcoll_dynamic_gen2_file_write_all
- write_init
- shuffle_init
- mca_fcoll_dynamic_gen2_break_file_view
- mca_fcoll_dynamic_gen2_get_configuration
- mca_fcoll_dynamic_gen2_split_iov_array
- local_heap_sort
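
For context, a minimal usage sketch from the application side (assumptions: Open MPI built with OMPIO and this component selected, e.g. via "--mca io ompio --mca fcoll dynamic_gen2"; the file name is illustrative). A collective write such as MPI_File_write_all is what eventually dispatches into mca_fcoll_dynamic_gen2_file_write_all:

    #include <mpi.h>

    int main (int argc, char **argv)
    {
        MPI_File fh;
        int buf[1024] = {0};

        MPI_Init (&argc, &argv);
        MPI_File_open (MPI_COMM_WORLD, "testfile",
                       MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
        /* Collective write: every process contributes its buffer according to
           the current file view; the selected fcoll component performs the
           aggregation and the actual file system writes. */
        MPI_File_write_all (fh, buf, 1024, MPI_INT, MPI_STATUS_IGNORE);
        MPI_File_close (&fh);
        MPI_Finalize ();
        return 0;
    }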
24 #include "ompi_config.h"
25 #include "fcoll_dynamic_gen2.h"
26
27 #include "mpi.h"
28 #include "ompi/constants.h"
29 #include "ompi/mca/fcoll/fcoll.h"
30 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
31 #include "ompi/mca/common/ompio/common_ompio.h"
32 #include "ompi/mca/io/io.h"
33 #include "math.h"
34 #include "ompi/mca/pml/pml.h"
35 #include <unistd.h>
36
37
38 #define DEBUG_ON 0
39 #define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG 123
40 #define INIT_LEN 10
41
42
43 typedef struct mca_io_ompio_local_io_array{
44 OMPI_MPI_OFFSET_TYPE offset;
45 MPI_Aint length;
46 int process_id;
47 }mca_io_ompio_local_io_array;
48
49 typedef struct mca_io_ompio_aggregator_data {
50 int *disp_index, *sorted, *fview_count, n;
51 int *max_disp_index;
52 int **blocklen_per_process;
53 MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
54 MPI_Comm comm;
55 char *buf, *global_buf, *prev_global_buf;
56 ompi_datatype_t **recvtype, **prev_recvtype;
57 struct iovec *global_iov_array;
58 int current_index, current_position;
59 int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;
60 int *procs_in_group, iov_index;
61 int bytes_sent, prev_bytes_sent;
62 struct iovec *decoded_iov;
63 int bytes_to_write, prev_bytes_to_write;
64 mca_common_ompio_io_array_t *io_array, *prev_io_array;
65 int num_io_entries, prev_num_io_entries;
66 } mca_io_ompio_aggregator_data;
67
68
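/*
 * Double-buffering helpers: the collective keeps two request arrays and, per
 * aggregator, two sets of I/O descriptors and receive buffers, so that the
 * communication of cycle i can overlap with writing the data gathered in
 * cycle i-1.  SWAP_REQUESTS flips the current/previous request arrays, and
 * SWAP_AGGR_POINTERS moves the just-completed cycle's state into the "prev_*"
 * fields of each aggregator before the next shuffle_init is posted.
 */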
69 #define SWAP_REQUESTS(_r1,_r2) { \
70 ompi_request_t **_t=_r1; \
71 _r1=_r2; \
72 _r2=_t;}
73
74 #define SWAP_AGGR_POINTERS(_aggr,_num) { \
75 int _i; \
76 char *_t; \
77 for (_i=0; _i<_num; _i++ ) { \
78 _aggr[_i]->prev_io_array=_aggr[_i]->io_array; \
79 _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
80 _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \
81 _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
82 _t=_aggr[_i]->prev_global_buf; \
83 _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \
84 _aggr[_i]->global_buf=_t; \
85 _t=(char *)_aggr[_i]->recvtype; \
86 _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype; \
87 _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \
88 }
89
90
91
92 static int shuffle_init ( int index, int cycles, int aggregator, int rank,
93 mca_io_ompio_aggregator_data *data,
94 ompi_request_t **reqs );
95 static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize );
96
97 int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count,
98 struct iovec *local_iov_array, int local_count,
99 struct iovec ***broken_decoded_iovs, int **broken_iov_counts,
100 struct iovec ***broken_iov_arrays, int **broken_counts,
101 MPI_Aint **broken_total_lengths,
102 int stripe_count, int stripe_size);
103
104
105 int mca_fcoll_dynamic_gen2_get_configuration (ompio_file_t *fh, int *dynamic_gen2_num_io_procs,
106 int **ret_aggregators);
107
108
109 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
110 int num_entries,
111 int *sorted);
112
113 int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *work_array,
114 int num_entries, int *last_array_pos, int *last_pos_in_field,
115 int chunk_size );
116
117
118 int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
119 const void *buf,
120 int count,
121 struct ompi_datatype_t *datatype,
122 ompi_status_public_t *status)
123 {
124 int index = 0;
125 int cycles = 0;
126 int ret =0, l, i, j, bytes_per_cycle;
127 uint32_t iov_count = 0;
128 struct iovec *decoded_iov = NULL;
129 struct iovec *local_iov_array=NULL;
130 uint32_t total_fview_count = 0;
131 int local_count = 0;
132 ompi_request_t **reqs1=NULL,**reqs2=NULL;
133 ompi_request_t **curr_reqs=NULL,**prev_reqs=NULL;
134 mca_io_ompio_aggregator_data **aggr_data=NULL;
135
136 int *displs = NULL;
137 int dynamic_gen2_num_io_procs;
138 size_t max_data = 0;
139 MPI_Aint *total_bytes_per_process = NULL;
140
141 struct iovec **broken_iov_arrays=NULL;
142 struct iovec **broken_decoded_iovs=NULL;
143 int *broken_counts=NULL;
144 int *broken_iov_counts=NULL;
145 MPI_Aint *broken_total_lengths=NULL;
146
147 int *aggregators=NULL;
148 int write_chunksize, *result_counts=NULL;
149
150
151 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
152 double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
153 double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
154 double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
155 mca_common_ompio_print_entry nentry;
156 #endif
157
158
159
160
161
162 bytes_per_cycle = fh->f_bytes_per_agg;
163
164
165
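    /* The aggregation buffer size (f_bytes_per_agg) is halved here because
       each aggregator allocates two cycle buffers below (global_buf and
       prev_global_buf), so the total buffer space per aggregator stays at
       roughly f_bytes_per_agg. */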
166 bytes_per_cycle =bytes_per_cycle/2;
167
168 ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh,
169 datatype,
170 count,
171 buf,
172 &max_data,
173 fh->f_mem_convertor,
174 &decoded_iov,
175 &iov_count);
176 if (OMPI_SUCCESS != ret ){
177 goto exit;
178 }
179
180 if ( MPI_STATUS_IGNORE != status ) {
181 status->_ucount = max_data;
182 }
183
184
185
186
187
188 if ( fh->f_stripe_count > 1 ) {
189 dynamic_gen2_num_io_procs = fh->f_stripe_count;
190 }
191 else {
192 dynamic_gen2_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
193 if ( OMPI_ERR_MAX == dynamic_gen2_num_io_procs ) {
194 ret = OMPI_ERROR;
195 goto exit;
196 }
197 }
198
199
200 if ( fh->f_stripe_size == 0 ) {
201
202 fh->f_stripe_size = 65536;
203 }
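    /* Data is handed to the fbtl in chunks of write_chunksize bytes per
       pwritev call: one stripe by default, or the value of
       mca_fcoll_dynamic_gen2_write_chunksize if that has been set. */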
204 if ( -1 == mca_fcoll_dynamic_gen2_write_chunksize ) {
205 write_chunksize = fh->f_stripe_size;
206 }
207 else {
208 write_chunksize = mca_fcoll_dynamic_gen2_write_chunksize;
209 }
210
211
212 ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
213 if (OMPI_SUCCESS != ret){
214 goto exit;
215 }
216
217 aggr_data = (mca_io_ompio_aggregator_data **) malloc ( dynamic_gen2_num_io_procs *
218 sizeof(mca_io_ompio_aggregator_data*));
219
220 for ( i=0; i< dynamic_gen2_num_io_procs; i++ ) {
221
222
223
224 aggr_data[i] = (mca_io_ompio_aggregator_data *) calloc ( 1, sizeof(mca_io_ompio_aggregator_data));
225 aggr_data[i]->procs_per_group = fh->f_procs_per_group;
226 aggr_data[i]->procs_in_group = fh->f_procs_in_group;
227 aggr_data[i]->comm = fh->f_comm;
228 aggr_data[i]->buf = (char *)buf;
229 }
230
231
232
233
234
235 ret = fh->f_generate_current_file_view( (struct ompio_file_t *) fh,
236 max_data,
237 &local_iov_array,
238 &local_count);
239 if (ret != OMPI_SUCCESS){
240 goto exit;
241 }
242
243
244
245
246
247
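    /* Split both the memory iovec list (decoded_iov) and the file-view iovec
       list (local_iov_array) into one sub-list per aggregator, according to
       which stripe -- and hence which aggregator -- each file offset belongs to. */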
248 ret = mca_fcoll_dynamic_gen2_break_file_view ( decoded_iov, iov_count,
249 local_iov_array, local_count,
250 &broken_decoded_iovs, &broken_iov_counts,
251 &broken_iov_arrays, &broken_counts,
252 &broken_total_lengths,
253 dynamic_gen2_num_io_procs, fh->f_stripe_size);
254
255
256
257
258
259 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
260 start_comm_time = MPI_Wtime();
261 #endif
262 if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) {
263 ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE,
264 broken_total_lengths,
265 dynamic_gen2_num_io_procs,
266 MPI_LONG,
267 MPI_SUM,
268 fh->f_comm,
269 fh->f_comm->c_coll->coll_allreduce_module);
270 if( OMPI_SUCCESS != ret){
271 goto exit;
272 }
273 }
274 else {
275 total_bytes_per_process = (MPI_Aint*)malloc
276 (dynamic_gen2_num_io_procs * fh->f_procs_per_group*sizeof(MPI_Aint));
277 if (NULL == total_bytes_per_process) {
278 opal_output (1, "OUT OF MEMORY\n");
279 ret = OMPI_ERR_OUT_OF_RESOURCE;
280 goto exit;
281 }
282
283 ret = ompi_fcoll_base_coll_allgather_array (broken_total_lengths,
284 dynamic_gen2_num_io_procs,
285 MPI_LONG,
286 total_bytes_per_process,
287 dynamic_gen2_num_io_procs,
288 MPI_LONG,
289 0,
290 fh->f_procs_in_group,
291 fh->f_procs_per_group,
292 fh->f_comm);
293 if( OMPI_SUCCESS != ret){
294 goto exit;
295 }
296 for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
297 broken_total_lengths[i] = 0;
298 for (j=0 ; j<fh->f_procs_per_group ; j++) {
299 broken_total_lengths[i] += total_bytes_per_process[j*dynamic_gen2_num_io_procs + i];
300 }
301 }
302 if (NULL != total_bytes_per_process) {
303 free (total_bytes_per_process);
304 total_bytes_per_process = NULL;
305 }
306 }
307 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
308 end_comm_time = MPI_Wtime();
309 comm_time += (end_comm_time - start_comm_time);
310 #endif
311
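    /* The number of cycles is driven by the most loaded aggregator:
       ceil(bytes assigned to that aggregator / bytes_per_cycle).  Every
       aggregator executes the same number of cycles so the collective stays
       synchronized. */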
312 cycles=0;
313 for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
314 #if DEBUG_ON
315 printf("%d: Overall broken_total_lengths[%d] = %ld\n", fh->f_rank, i, broken_total_lengths[i]);
316 #endif
317 if ( ceil((double)broken_total_lengths[i]/bytes_per_cycle) > cycles ) {
318 cycles = ceil((double)broken_total_lengths[i]/bytes_per_cycle);
319 }
320 }
321
322
323 result_counts = (int *) malloc ( dynamic_gen2_num_io_procs * fh->f_procs_per_group * sizeof(int) );
324 if ( NULL == result_counts ) {
325 ret = OMPI_ERR_OUT_OF_RESOURCE;
326 goto exit;
327 }
328
329 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
330 start_comm_time = MPI_Wtime();
331 #endif
332 if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) {
333 ret = fh->f_comm->c_coll->coll_allgather(broken_counts,
334 dynamic_gen2_num_io_procs,
335 MPI_INT,
336 result_counts,
337 dynamic_gen2_num_io_procs,
338 MPI_INT,
339 fh->f_comm,
340 fh->f_comm->c_coll->coll_allgather_module);
341 }
342 else {
343 ret = ompi_fcoll_base_coll_allgather_array (broken_counts,
344 dynamic_gen2_num_io_procs,
345 MPI_INT,
346 result_counts,
347 dynamic_gen2_num_io_procs,
348 MPI_INT,
349 0,
350 fh->f_procs_in_group,
351 fh->f_procs_per_group,
352 fh->f_comm);
353 }
354 if( OMPI_SUCCESS != ret){
355 goto exit;
356 }
357 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
358 end_comm_time = MPI_Wtime();
359 comm_time += (end_comm_time - start_comm_time);
360 #endif
361
362
363
364
365 for ( i=0; i< dynamic_gen2_num_io_procs; i++ ) {
366 aggr_data[i]->total_bytes = broken_total_lengths[i];
367 aggr_data[i]->decoded_iov = broken_decoded_iovs[i];
368 aggr_data[i]->fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
369 if (NULL == aggr_data[i]->fview_count) {
370 opal_output (1, "OUT OF MEMORY\n");
371 ret = OMPI_ERR_OUT_OF_RESOURCE;
372 goto exit;
373 }
374 for ( j=0; j <fh->f_procs_per_group; j++ ) {
375 aggr_data[i]->fview_count[j] = result_counts[dynamic_gen2_num_io_procs*j+i];
376 }
377 displs = (int*) malloc (fh->f_procs_per_group * sizeof (int));
378 if (NULL == displs) {
379 opal_output (1, "OUT OF MEMORY\n");
380 ret = OMPI_ERR_OUT_OF_RESOURCE;
381 goto exit;
382 }
383
384 displs[0] = 0;
385 total_fview_count = aggr_data[i]->fview_count[0];
386 for (j=1 ; j<fh->f_procs_per_group ; j++) {
387 total_fview_count += aggr_data[i]->fview_count[j];
388 displs[j] = displs[j-1] + aggr_data[i]->fview_count[j-1];
389 }
390
391 #if DEBUG_ON
392 printf("total_fview_count : %d\n", total_fview_count);
393 if (aggregators[i] == fh->f_rank) {
394             for (j=0 ; j<fh->f_procs_per_group ; j++) {
395 printf ("%d: PROCESS: %d ELEMENTS: %d DISPLS: %d\n",
396 fh->f_rank,
397 j,
398 aggr_data[i]->fview_count[j],
399 displs[j]);
400 }
401 }
402 #endif
403
404
405 if (0 != total_fview_count) {
406 aggr_data[i]->global_iov_array = (struct iovec*) malloc (total_fview_count *
407 sizeof(struct iovec));
408 if (NULL == aggr_data[i]->global_iov_array){
409 opal_output(1, "OUT OF MEMORY\n");
410 ret = OMPI_ERR_OUT_OF_RESOURCE;
411 goto exit;
412 }
413 }
414
415 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
416 start_comm_time = MPI_Wtime();
417 #endif
418 if ( 1 == mca_fcoll_dynamic_gen2_num_groups ) {
419 ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i],
420 broken_counts[i],
421 fh->f_iov_type,
422 aggr_data[i]->global_iov_array,
423 aggr_data[i]->fview_count,
424 displs,
425 fh->f_iov_type,
426 fh->f_comm,
427 fh->f_comm->c_coll->coll_allgatherv_module );
428 }
429 else {
430 ret = ompi_fcoll_base_coll_allgatherv_array (broken_iov_arrays[i],
431 broken_counts[i],
432 fh->f_iov_type,
433 aggr_data[i]->global_iov_array,
434 aggr_data[i]->fview_count,
435 displs,
436 fh->f_iov_type,
437 aggregators[i],
438 fh->f_procs_in_group,
439 fh->f_procs_per_group,
440 fh->f_comm);
441 }
442 if (OMPI_SUCCESS != ret){
443 goto exit;
444 }
445 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
446 end_comm_time = MPI_Wtime();
447 comm_time += (end_comm_time - start_comm_time);
448 #endif
449
450
451
452
453
454
455
456
457
458 if (0 != total_fview_count) {
459 aggr_data[i]->sorted = (int *)malloc (total_fview_count * sizeof(int));
460 if (NULL == aggr_data[i]->sorted) {
461 opal_output (1, "OUT OF MEMORY\n");
462 ret = OMPI_ERR_OUT_OF_RESOURCE;
463 goto exit;
464 }
465 ompi_fcoll_base_sort_iovec (aggr_data[i]->global_iov_array, total_fview_count, aggr_data[i]->sorted);
466 }
467
468 if (NULL != local_iov_array){
469 free(local_iov_array);
470 local_iov_array = NULL;
471 }
472
473 if (NULL != displs){
474 free(displs);
475 displs=NULL;
476 }
477
478
479 #if DEBUG_ON
480 if (aggregators[i] == fh->f_rank) {
481 uint32_t tv=0;
482 for (tv=0 ; tv<total_fview_count ; tv++) {
483 printf("%d: OFFSET: %lld LENGTH: %ld\n",
484 fh->f_rank,
485 aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_base,
486 aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_len);
487 }
488 }
489 #endif
490
491
492
493
494
495 aggr_data[i]->bytes_per_cycle = bytes_per_cycle;
496
497 if (aggregators[i] == fh->f_rank) {
498 aggr_data[i]->disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
499 if (NULL == aggr_data[i]->disp_index) {
500 opal_output (1, "OUT OF MEMORY\n");
501 ret = OMPI_ERR_OUT_OF_RESOURCE;
502 goto exit;
503 }
504
505 aggr_data[i]->max_disp_index = (int *)calloc (fh->f_procs_per_group, sizeof (int));
506 if (NULL == aggr_data[i]->max_disp_index) {
507 opal_output (1, "OUT OF MEMORY\n");
508 ret = OMPI_ERR_OUT_OF_RESOURCE;
509 goto exit;
510 }
511
512 aggr_data[i]->blocklen_per_process = (int **)calloc (fh->f_procs_per_group, sizeof (int*));
513 if (NULL == aggr_data[i]->blocklen_per_process) {
514 opal_output (1, "OUT OF MEMORY\n");
515 ret = OMPI_ERR_OUT_OF_RESOURCE;
516 goto exit;
517 }
518
519 aggr_data[i]->displs_per_process = (MPI_Aint **)calloc (fh->f_procs_per_group, sizeof (MPI_Aint*));
520 if (NULL == aggr_data[i]->displs_per_process) {
521 opal_output (1, "OUT OF MEMORY\n");
522 ret = OMPI_ERR_OUT_OF_RESOURCE;
523 goto exit;
524 }
525
526
527 aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
528 aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle);
529 if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){
530 opal_output(1, "OUT OF MEMORY");
531 ret = OMPI_ERR_OUT_OF_RESOURCE;
532 goto exit;
533 }
534
535 aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
536 sizeof(ompi_datatype_t *));
537 aggr_data[i]->prev_recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
538 sizeof(ompi_datatype_t *));
539 if (NULL == aggr_data[i]->recvtype || NULL == aggr_data[i]->prev_recvtype) {
540 opal_output (1, "OUT OF MEMORY\n");
541 ret = OMPI_ERR_OUT_OF_RESOURCE;
542 goto exit;
543 }
544 for(l=0;l<fh->f_procs_per_group;l++){
545 aggr_data[i]->recvtype[l] = MPI_DATATYPE_NULL;
546 aggr_data[i]->prev_recvtype[l] = MPI_DATATYPE_NULL;
547 }
548 }
549
550 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
551 start_exch = MPI_Wtime();
552 #endif
553 }
554
555 reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
556 reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs *sizeof(ompi_request_t *));
557 if ( NULL == reqs1 || NULL == reqs2 ) {
558 opal_output (1, "OUT OF MEMORY\n");
559 ret = OMPI_ERR_OUT_OF_RESOURCE;
560 goto exit;
561 }
562 for (l=0,i=0; i < dynamic_gen2_num_io_procs; i++ ) {
563 for ( j=0; j< (fh->f_procs_per_group+1); j++ ) {
564 reqs1[l] = MPI_REQUEST_NULL;
565 reqs2[l] = MPI_REQUEST_NULL;
566 l++;
567 }
568 }
569
570 curr_reqs = reqs1;
571 prev_reqs = reqs2;
572
573
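    /* Software pipeline over the cycles:
       - post the communication (shuffle_init) for cycle 0;
       - in iteration "index": post cycle "index", wait for cycle "index-1" to
         complete, and write its data to disk (write_init);
       - after the loop, drain and write the final cycle. */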
574 if ( cycles > 0 ) {
575 for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
576 ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i],
577 &curr_reqs[i*(fh->f_procs_per_group + 1)] );
578 if ( OMPI_SUCCESS != ret ) {
579 goto exit;
580 }
581 }
582 }
583
584
585 for (index = 1; index < cycles; index++) {
586 SWAP_REQUESTS(curr_reqs,prev_reqs);
587 SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs);
588
589
590 for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
591 ret = shuffle_init ( index, cycles, aggregators[i], fh->f_rank, aggr_data[i],
592 &curr_reqs[i*(fh->f_procs_per_group + 1)] );
593 if ( OMPI_SUCCESS != ret ) {
594 goto exit;
595 }
596 }
597
598
599 ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs,
600                                       prev_reqs, MPI_STATUSES_IGNORE);
601 if (OMPI_SUCCESS != ret){
602 goto exit;
603 }
604
605
606
607 for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
608 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
609 start_write_time = MPI_Wtime();
610 #endif
611 ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
612 if (OMPI_SUCCESS != ret){
613 goto exit;
614 }
615 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
616 end_write_time = MPI_Wtime();
617 write_time += end_write_time - start_write_time;
618 #endif
619 }
620
621 }
622
623
624
625 if ( cycles > 0 ) {
626 SWAP_REQUESTS(curr_reqs,prev_reqs);
627 SWAP_AGGR_POINTERS(aggr_data,dynamic_gen2_num_io_procs);
628
629 ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*dynamic_gen2_num_io_procs,
630                                       prev_reqs, MPI_STATUSES_IGNORE);
631 if (OMPI_SUCCESS != ret){
632 goto exit;
633 }
634
635
636 for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
637 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
638 start_write_time = MPI_Wtime();
639 #endif
640 ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
641 if (OMPI_SUCCESS != ret){
642 goto exit;
643 }
644 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
645 end_write_time = MPI_Wtime();
646 write_time += end_write_time - start_write_time;
647 #endif
648 }
649 }
650
651
652 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
653 end_exch = MPI_Wtime();
654 exch_write += end_exch - start_exch;
655 nentry.time[0] = write_time;
656 nentry.time[1] = comm_time;
657 nentry.time[2] = exch_write;
658 nentry.aggregator = 0;
659 for ( i=0; i<dynamic_gen2_num_io_procs; i++ ) {
660 if (aggregators[i] == fh->f_rank)
661 nentry.aggregator = 1;
662 }
663 nentry.nprocs_for_coll = dynamic_gen2_num_io_procs;
664 if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)){
665 mca_common_ompio_register_print_entry(fh->f_coll_write_time,
666 nentry);
667 }
668 #endif
669
670
671 exit :
672
673 if ( NULL != aggr_data ) {
674
675 for ( i=0; i< dynamic_gen2_num_io_procs; i++ ) {
676 if (aggregators[i] == fh->f_rank) {
677 if (NULL != aggr_data[i]->recvtype){
678 for (j =0; j< aggr_data[i]->procs_per_group; j++) {
679 if ( MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j] ) {
680 ompi_datatype_destroy(&aggr_data[i]->recvtype[j]);
681 }
682 if ( MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j] ) {
683 ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]);
684 }
685
686 }
687 free(aggr_data[i]->recvtype);
688 free(aggr_data[i]->prev_recvtype);
689 }
690
691 free (aggr_data[i]->disp_index);
692 free (aggr_data[i]->max_disp_index);
693 free (aggr_data[i]->global_buf);
694 free (aggr_data[i]->prev_global_buf);
695 for(l=0;l<aggr_data[i]->procs_per_group;l++){
696 free (aggr_data[i]->blocklen_per_process[l]);
697 free (aggr_data[i]->displs_per_process[l]);
698 }
699
700 free (aggr_data[i]->blocklen_per_process);
701 free (aggr_data[i]->displs_per_process);
702 }
703 free (aggr_data[i]->sorted);
704 free (aggr_data[i]->global_iov_array);
705 free (aggr_data[i]->fview_count);
706 free (aggr_data[i]->decoded_iov);
707
708 free (aggr_data[i]);
709 }
710 free (aggr_data);
711 }
712 free(local_iov_array);
713 free(displs);
714 free(decoded_iov);
715 free(broken_counts);
716 free(broken_total_lengths);
717 free(broken_iov_counts);
718 free(broken_decoded_iovs);
719 if ( NULL != broken_iov_arrays ) {
720 for (i=0; i<dynamic_gen2_num_io_procs; i++ ) {
721 free(broken_iov_arrays[i]);
722 }
723 }
724 free(broken_iov_arrays);
725 free(aggregators);
726 free(fh->f_procs_in_group);
727 fh->f_procs_in_group=NULL;
728 fh->f_procs_per_group=0;
729 free(reqs1);
730 free(reqs2);
731 free(result_counts);
732
733
734     return ret;
735 }
736
737
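/* write_init: executed by an aggregator once the data of the previous cycle
 * has been gathered.  It repeatedly carves a chunk of at most write_chunksize
 * bytes out of prev_io_array (mca_fcoll_dynamic_gen2_split_iov_array) and
 * hands it to the fbtl via fbtl_pwritev until prev_bytes_to_write is
 * exhausted. */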
738 static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize )
739 {
740 int ret=OMPI_SUCCESS;
741 int last_array_pos=0;
742 int last_pos=0;
743
744
745 if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) {
746 while ( aggr_data->prev_bytes_to_write > 0 ) {
747 aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array,
748 aggr_data->prev_num_io_entries,
749 &last_array_pos, &last_pos,
750 write_chunksize );
751 if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
752                 free ( fh->f_io_array ); free ( aggr_data->prev_io_array);
753 opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n");
754 ret = OMPI_ERROR;
755 goto exit;
756 }
757 }
758 free ( fh->f_io_array );
759 free ( aggr_data->prev_io_array);
760 }
761
762 exit:
763
764 fh->f_io_array=NULL;
765 fh->f_num_of_io_entries=0;
766
767 return ret;
768 }
769
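/* shuffle_init: sets up the data exchange for one cycle.  Every process walks
 * the globally sorted file view to determine which of its bytes fall into this
 * cycle and isends them to the owning aggregator; an aggregator additionally
 * builds per-sender hindexed receive datatypes, posts the matching irecvs into
 * global_buf, and prepares the io_array describing where the gathered bytes
 * belong in the file. */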
770 static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
771 ompi_request_t **reqs )
772 {
773 int bytes_sent = 0;
774 int blocks=0, temp_pindex;
775     int i, j, l, ret=OMPI_SUCCESS;
776 int entries_per_aggregator=0;
777 mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
778 int *sorted_file_offsets=NULL;
779 int temp_index=0;
780 MPI_Aint *memory_displacements=NULL;
781 int *temp_disp_index=NULL;
782 MPI_Aint global_count = 0;
783 int* blocklength_proc=NULL;
784 ptrdiff_t* displs_proc=NULL;
785
786 data->num_io_entries = 0;
787 data->bytes_sent = 0;
788 data->io_array=NULL;
789
790
791
792 if (aggregator == rank) {
793
794 if (NULL != data->recvtype){
795 for (i =0; i< data->procs_per_group; i++) {
796 if ( MPI_DATATYPE_NULL != data->recvtype[i] ) {
797 ompi_datatype_destroy(&data->recvtype[i]);
798 data->recvtype[i] = MPI_DATATYPE_NULL;
799 }
800 }
801 }
802
803 for(l=0;l<data->procs_per_group;l++){
804 data->disp_index[l] = 1;
805
806 if(data->max_disp_index[l] == 0) {
807 data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int));
808 data->displs_per_process[l] = (MPI_Aint *) calloc (INIT_LEN, sizeof(MPI_Aint));
809 if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){
810 opal_output (1, "OUT OF MEMORY for displs\n");
811 ret = OMPI_ERR_OUT_OF_RESOURCE;
812 goto exit;
813 }
814 data->max_disp_index[l] = INIT_LEN;
815 }
816 else {
817 memset ( data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int) );
818 memset ( data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint) );
819 }
820 }
821 }
822
823
824
825
826 int local_cycles= ceil((double)data->total_bytes / data->bytes_per_cycle);
827 if ( index < (local_cycles -1) ) {
828 data->bytes_to_write_in_cycle = data->bytes_per_cycle;
829 }
830 else if ( index == (local_cycles -1)) {
831 data->bytes_to_write_in_cycle = data->total_bytes - data->bytes_per_cycle*index ;
832 }
833 else {
834 data->bytes_to_write_in_cycle = 0;
835 }
836 data->bytes_to_write = data->bytes_to_write_in_cycle;
837
838 #if DEBUG_ON
839 if (aggregator == rank) {
840 printf ("****%d: CYCLE %d Bytes %lld**********\n",
841 rank,
842 index,
843 data->bytes_to_write_in_cycle);
844 }
845 #endif
846
847
848
849
850 #if DEBUG_ON
851     printf("bytes_to_write_in_cycle: %d, cycle : %d\n", data->bytes_to_write_in_cycle,
852 index);
853 #endif
854
855
856
857
858
859
860
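    /* Walk the sorted global iovec list and hand out at most
       bytes_to_write_in_cycle bytes: data->n is the rank owning the current
       entry; the aggregator records blocklen/displacement per sender, and
       every rank accumulates in bytes_sent how much of its own data belongs
       to this cycle.  A partially consumed entry is carried over to the next
       cycle via bytes_remaining. */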
861 while (data->bytes_to_write_in_cycle) {
862
863
864
865
866 blocks = data->fview_count[0];
867 for (j=0 ; j<data->procs_per_group ; j++) {
868 if (data->sorted[data->current_index] < blocks) {
869 data->n = j;
870 break;
871 }
872 else {
873 blocks += data->fview_count[j+1];
874 }
875 }
876
877 if (data->bytes_remaining) {
878
879
880 if (data->bytes_remaining <= data->bytes_to_write_in_cycle) {
881
882 if (aggregator == rank) {
883 data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_remaining;
884 data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
885 (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
886 (data->global_iov_array[data->sorted[data->current_index]].iov_len
887 - data->bytes_remaining);
888
889 data->disp_index[data->n] += 1;
890
891
892
893 if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
894 data->max_disp_index[data->n] *= 2;
895 data->blocklen_per_process[data->n] = (int *) realloc(
896 (void *)data->blocklen_per_process[data->n],
897 (data->max_disp_index[data->n])*sizeof(int));
898 data->displs_per_process[data->n] = (MPI_Aint *) realloc(
899 (void *)data->displs_per_process[data->n],
900 (data->max_disp_index[data->n])*sizeof(MPI_Aint));
901 }
902
903 data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
904 data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
905 }
906 if (data->procs_in_group[data->n] == rank) {
907 bytes_sent += data->bytes_remaining;
908 }
909 data->current_index ++;
910 data->bytes_to_write_in_cycle -= data->bytes_remaining;
911 data->bytes_remaining = 0;
912 }
913 else {
914
915
916 if (aggregator == rank) {
917 data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
918 data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
919 (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
920 (data->global_iov_array[data->sorted[data->current_index]].iov_len
921 - data->bytes_remaining);
922 }
923
924 if (data->procs_in_group[data->n] == rank) {
925 bytes_sent += data->bytes_to_write_in_cycle;
926 }
927 data->bytes_remaining -= data->bytes_to_write_in_cycle;
928 data->bytes_to_write_in_cycle = 0;
929 break;
930 }
931 }
932 else {
933
934 if (data->bytes_to_write_in_cycle <
935 (MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) {
936
937 if (aggregator == rank) {
938 data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
939 data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
940 (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ;
941 }
942 if (data->procs_in_group[data->n] == rank) {
943 bytes_sent += data->bytes_to_write_in_cycle;
944
945 }
946 data->bytes_remaining = data->global_iov_array[data->sorted[data->current_index]].iov_len -
947 data->bytes_to_write_in_cycle;
948 data->bytes_to_write_in_cycle = 0;
949 break;
950 }
951 else {
952
953 if (aggregator == rank) {
954 data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] =
955 data->global_iov_array[data->sorted[data->current_index]].iov_len;
956 data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (ptrdiff_t)
957 data->global_iov_array[data->sorted[data->current_index]].iov_base;
958
959 data->disp_index[data->n] += 1;
960
961
962
963
964 if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
965 data->max_disp_index[data->n] *= 2;
966 data->blocklen_per_process[data->n] = (int *) realloc(
967 (void *)data->blocklen_per_process[data->n],
968 (data->max_disp_index[data->n]*sizeof(int)));
969 data->displs_per_process[data->n] = (MPI_Aint *)realloc(
970 (void *)data->displs_per_process[data->n],
971 (data->max_disp_index[data->n]*sizeof(MPI_Aint)));
972 }
973 data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
974 data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
975 }
976 if (data->procs_in_group[data->n] == rank) {
977 bytes_sent += data->global_iov_array[data->sorted[data->current_index]].iov_len;
978 }
979 data->bytes_to_write_in_cycle -=
980 data->global_iov_array[data->sorted[data->current_index]].iov_len;
981 data->current_index ++;
982 }
983 }
984 }
985
986
987
988
989
990
991 if (aggregator == rank) {
992 entries_per_aggregator=0;
993 for (i=0;i<data->procs_per_group; i++){
994 for (j=0;j<data->disp_index[i];j++){
995 if (data->blocklen_per_process[i][j] > 0)
996 entries_per_aggregator++ ;
997 }
998 }
999
1000 #if DEBUG_ON
1001 printf("%d: cycle: %d, bytes_sent: %d\n ",rank,index,
1002 bytes_sent);
1003 printf("%d : Entries per aggregator : %d\n",rank,entries_per_aggregator);
1004 #endif
1005
1006 if (entries_per_aggregator > 0){
1007 file_offsets_for_agg = (mca_io_ompio_local_io_array *)
1008 malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array));
1009 if (NULL == file_offsets_for_agg) {
1010 opal_output (1, "OUT OF MEMORY\n");
1011 ret = OMPI_ERR_OUT_OF_RESOURCE;
1012 goto exit;
1013 }
1014
1015 sorted_file_offsets = (int *)
1016 malloc (entries_per_aggregator*sizeof(int));
1017 if (NULL == sorted_file_offsets){
1018 opal_output (1, "OUT OF MEMORY\n");
1019 ret = OMPI_ERR_OUT_OF_RESOURCE;
1020 goto exit;
1021 }
1022
1023
1024 temp_index = 0;
1025
1026 for (i=0;i<data->procs_per_group; i++){
1027 for(j=0;j<data->disp_index[i];j++){
1028 if (data->blocklen_per_process[i][j] > 0){
1029 file_offsets_for_agg[temp_index].length =
1030 data->blocklen_per_process[i][j];
1031 file_offsets_for_agg[temp_index].process_id = i;
1032 file_offsets_for_agg[temp_index].offset =
1033 data->displs_per_process[i][j];
1034 temp_index++;
1035
1036 #if DEBUG_ON
1037 printf("************Cycle: %d, Aggregator: %d ***************\n",
1038 index+1,rank);
1039
1040 printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
1041 data->procs_in_group[i],j,
1042 data->blocklen_per_process[i][j],j,
1043 data->displs_per_process[i][j],
1044 rank);
1045 #endif
1046 }
1047 }
1048 }
1049
1050
1051 local_heap_sort (file_offsets_for_agg,
1052 entries_per_aggregator,
1053 sorted_file_offsets);
1054
1055
1056
1057
1058
1059 memory_displacements = (MPI_Aint *) malloc
1060 (entries_per_aggregator * sizeof(MPI_Aint));
1061
1062 memory_displacements[sorted_file_offsets[0]] = 0;
1063 for (i=1; i<entries_per_aggregator; i++){
1064 memory_displacements[sorted_file_offsets[i]] =
1065 memory_displacements[sorted_file_offsets[i-1]] +
1066 file_offsets_for_agg[sorted_file_offsets[i-1]].length;
1067 }
1068
1069             temp_disp_index = (int *)calloc (data->procs_per_group, sizeof (int));
1070 if (NULL == temp_disp_index) {
1071 opal_output (1, "OUT OF MEMORY\n");
1072 ret = OMPI_ERR_OUT_OF_RESOURCE;
1073 goto exit;
1074 }
1075
1076
1077 global_count = 0;
1078 for (i=0;i<entries_per_aggregator;i++){
1079 temp_pindex =
1080 file_offsets_for_agg[sorted_file_offsets[i]].process_id;
1081 data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
1082 memory_displacements[sorted_file_offsets[i]];
1083 if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex])
1084 temp_disp_index[temp_pindex] += 1;
1085 else{
1086 printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n",
1087 temp_pindex, temp_disp_index[temp_pindex],
1088 temp_pindex, data->disp_index[temp_pindex]);
1089 }
1090 global_count +=
1091 file_offsets_for_agg[sorted_file_offsets[i]].length;
1092 }
1093
1094 if (NULL != temp_disp_index){
1095 free(temp_disp_index);
1096 temp_disp_index = NULL;
1097 }
1098
1099 #if DEBUG_ON
1100
1101 printf("************Cycle: %d, Aggregator: %d ***************\n",
1102 index+1,rank);
1103 for (i=0;i<data->procs_per_group; i++){
1104 for(j=0;j<data->disp_index[i];j++){
1105 if (data->blocklen_per_process[i][j] > 0){
1106 printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
1107 data->procs_in_group[i],j,
1108 data->blocklen_per_process[i][j],j,
1109 data->displs_per_process[i][j],
1110 rank);
1111
1112 }
1113 }
1114 }
1115 printf("************Cycle: %d, Aggregator: %d ***************\n",
1116 index+1,rank);
1117 for (i=0; i<entries_per_aggregator;i++){
1118 printf("%d: OFFSET: %lld LENGTH: %ld, Mem-offset: %ld\n",
1119 file_offsets_for_agg[sorted_file_offsets[i]].process_id,
1120 file_offsets_for_agg[sorted_file_offsets[i]].offset,
1121 file_offsets_for_agg[sorted_file_offsets[i]].length,
1122 memory_displacements[sorted_file_offsets[i]]);
1123 }
1124 printf("%d : global_count : %ld, bytes_sent : %d\n",
1125 rank,global_count, bytes_sent);
1126 #endif
1127
1128
1129
1130
1131
1132
1133 for (i=0;i<data->procs_per_group; i++) {
1134 size_t datatype_size;
1135 reqs[i] = MPI_REQUEST_NULL;
1136 if ( 0 < data->disp_index[i] ) {
1137 ompi_datatype_create_hindexed(data->disp_index[i],
1138 data->blocklen_per_process[i],
1139 data->displs_per_process[i],
1140 MPI_BYTE,
1141 &data->recvtype[i]);
1142 ompi_datatype_commit(&data->recvtype[i]);
1143 opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size);
1144
1145 if (datatype_size){
1146 ret = MCA_PML_CALL(irecv(data->global_buf,
1147 1,
1148 data->recvtype[i],
1149 data->procs_in_group[i],
1150 FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
1151 data->comm,
1152 &reqs[i]));
1153 if (OMPI_SUCCESS != ret){
1154 goto exit;
1155 }
1156 }
1157 }
1158 }
1159 }
1160 }
1161
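    /* Sender side: if this rank contributes bytes_sent bytes to the current
       cycle, describe them with an hindexed datatype over the pieces of its
       decoded_iov (resuming at iov_index/current_position) and isend them to
       the aggregator.  reqs[procs_per_group] holds this send request. */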
1162 if (bytes_sent) {
1163 size_t remaining = bytes_sent;
1164 int block_index = -1;
1165 int blocklength_size = INIT_LEN;
1166
1167 ptrdiff_t send_mem_address = (ptrdiff_t) NULL;
1168 ompi_datatype_t *newType = MPI_DATATYPE_NULL;
1169 blocklength_proc = (int *) calloc (blocklength_size, sizeof (int));
1170 displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t));
1171
1172 if (NULL == blocklength_proc || NULL == displs_proc ) {
1173 opal_output (1, "OUT OF MEMORY\n");
1174 ret = OMPI_ERR_OUT_OF_RESOURCE;
1175 goto exit;
1176 }
1177
1178 while (remaining) {
1179 block_index++;
1180
1181 if(0 == block_index) {
1182 send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1183 data->current_position;
1184 }
1185 else {
1186
1187 if(0 == block_index % INIT_LEN) {
1188 blocklength_size += INIT_LEN;
1189 blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int));
1190 displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t));
1191 }
1192 displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1193 data->current_position - send_mem_address;
1194 }
1195
1196 if (remaining >=
1197 (data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
1198
1199 blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len -
1200 data->current_position;
1201 remaining = remaining -
1202 (data->decoded_iov[data->iov_index].iov_len - data->current_position);
1203 data->iov_index = data->iov_index + 1;
1204 data->current_position = 0;
1205 }
1206 else {
1207 blocklength_proc[block_index] = remaining;
1208 data->current_position += remaining;
1209 remaining = 0;
1210 }
1211 }
1212
1213 data->total_bytes_written += bytes_sent;
1214 data->bytes_sent = bytes_sent;
1215
1216 if ( 0 <= block_index ) {
1217 ompi_datatype_create_hindexed(block_index+1,
1218 blocklength_proc,
1219 displs_proc,
1220 MPI_BYTE,
1221 &newType);
1222 ompi_datatype_commit(&newType);
1223
1224 ret = MCA_PML_CALL(isend((char *)send_mem_address,
1225 1,
1226 newType,
1227 aggregator,
1228 FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
1229 MCA_PML_BASE_SEND_STANDARD,
1230 data->comm,
1231 &reqs[data->procs_per_group]));
1232 if ( MPI_DATATYPE_NULL != newType ) {
1233 ompi_datatype_destroy(&newType);
1234 }
1235 if (OMPI_SUCCESS != ret){
1236 goto exit;
1237 }
1238 }
1239 }
1240
1241
1242 #if DEBUG_ON
1243 if (aggregator == rank){
1244 printf("************Cycle: %d, Aggregator: %d ***************\n",
1245 index+1,rank);
1246 for (i=0 ; i<global_count/4 ; i++)
1247 printf (" RECV %d \n",((int *)data->global_buf)[i]);
1248 }
1249 #endif
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259 if (aggregator == rank && entries_per_aggregator>0) {
1260
1261
1262 data->io_array = (mca_common_ompio_io_array_t *) malloc
1263 (entries_per_aggregator * sizeof (mca_common_ompio_io_array_t));
1264 if (NULL == data->io_array) {
1265 opal_output(1, "OUT OF MEMORY\n");
1266 ret = OMPI_ERR_OUT_OF_RESOURCE;
1267 goto exit;
1268 }
1269
1270 data->num_io_entries = 0;
1271
1272 data->io_array[0].offset =
1273 (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
1274 data->io_array[0].length =
1275 file_offsets_for_agg[sorted_file_offsets[0]].length;
1276 data->io_array[0].memory_address =
1277 data->global_buf+memory_displacements[sorted_file_offsets[0]];
1278 data->num_io_entries++;
1279
1280 for (i=1;i<entries_per_aggregator;i++){
1281
1282
1283 if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
1284 file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
1285 file_offsets_for_agg[sorted_file_offsets[i]].offset){
1286 data->io_array[data->num_io_entries - 1].length +=
1287 file_offsets_for_agg[sorted_file_offsets[i]].length;
1288 }
1289 else {
1290 data->io_array[data->num_io_entries].offset =
1291 (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
1292 data->io_array[data->num_io_entries].length =
1293 file_offsets_for_agg[sorted_file_offsets[i]].length;
1294 data->io_array[data->num_io_entries].memory_address =
1295 data->global_buf+memory_displacements[sorted_file_offsets[i]];
1296 data->num_io_entries++;
1297 }
1298
1299 }
1300
1301 #if DEBUG_ON
1302         printf("*************************** %d\n", data->num_io_entries);
1303         for (i=0 ; i<data->num_io_entries ; i++) {
1304             printf(" ADDRESS: %p  OFFSET: %ld   LENGTH: %ld\n",
1305                    data->io_array[i].memory_address,
1306                    (ptrdiff_t)data->io_array[i].offset,
1307                    data->io_array[i].length);
1308 }
1309
1310 #endif
1311 }
1312
1313 exit:
1314 free(sorted_file_offsets);
1315 free(file_offsets_for_agg);
1316 free(memory_displacements);
1317 free(blocklength_proc);
1318 free(displs_proc);
1319
1320     return ret;
1321 }
1322
1323
1324
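/* mca_fcoll_dynamic_gen2_break_file_view: splits the memory and file iovecs
 * of this process into stripe_count sub-lists, one per aggregator.  Ownership
 * of a file offset follows a round-robin striping scheme:
 *
 *     owner = (offset / stripe_size) % stripe_count;
 *
 * e.g. (illustrative numbers only) with stripe_size = 65536 and
 * stripe_count = 4, offset 200000 lies in stripe 3, so owner = 3 and only the
 * 62144 bytes up to the next stripe boundary stay with that owner; the rest
 * of the request is carried over to the following stripe. */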
1325 int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *mem_iov, int mem_count,
1326 struct iovec *file_iov, int file_count,
1327 struct iovec ***ret_broken_mem_iovs, int **ret_broken_mem_counts,
1328 struct iovec ***ret_broken_file_iovs, int **ret_broken_file_counts,
1329 MPI_Aint **ret_broken_total_lengths,
1330 int stripe_count, int stripe_size)
1331 {
1332 int i, j, ret=OMPI_SUCCESS;
1333 struct iovec **broken_mem_iovs=NULL;
1334 int *broken_mem_counts=NULL;
1335 struct iovec **broken_file_iovs=NULL;
1336 int *broken_file_counts=NULL;
1337 MPI_Aint *broken_total_lengths=NULL;
1338 int **block=NULL, **max_lengths=NULL;
1339
1340 broken_mem_iovs = (struct iovec **) malloc ( stripe_count * sizeof(struct iovec *));
1341 broken_file_iovs = (struct iovec **) malloc ( stripe_count * sizeof(struct iovec *));
1342 if ( NULL == broken_mem_iovs || NULL == broken_file_iovs ) {
1343 ret = OMPI_ERR_OUT_OF_RESOURCE;
1344 goto exit;
1345 }
1346 for ( i=0; i<stripe_count; i++ ) {
1347 broken_mem_iovs[i] = (struct iovec*) calloc (1, sizeof(struct iovec ));
1348 broken_file_iovs[i] = (struct iovec*) calloc (1, sizeof(struct iovec ));
1349 }
1350
1351 broken_mem_counts = (int *) calloc ( stripe_count, sizeof(int));
1352 broken_file_counts = (int *) calloc ( stripe_count, sizeof(int));
1353 broken_total_lengths = (MPI_Aint *) calloc ( stripe_count, sizeof(MPI_Aint));
1354 if ( NULL == broken_mem_counts || NULL == broken_file_counts ||
1355 NULL == broken_total_lengths ) {
1356 ret = OMPI_ERR_OUT_OF_RESOURCE;
1357 goto exit;
1358 }
1359
1360 block = (int **) calloc ( stripe_count, sizeof(int *));
1361 max_lengths = (int **) calloc ( stripe_count, sizeof(int *));
1362 if ( NULL == block || NULL == max_lengths ) {
1363 ret = OMPI_ERR_OUT_OF_RESOURCE;
1364 goto exit;
1365 }
1366
1367 for ( i=0; i<stripe_count; i++ ){
1368 block[i] = (int *) malloc ( 5 * sizeof(int));
1369 max_lengths[i] = (int *) malloc ( 2 * sizeof(int));
1370 if ( NULL == block[i] || NULL == max_lengths[i]) {
1371 ret = OMPI_ERR_OUT_OF_RESOURCE;
1372 goto exit;
1373 }
1374 max_lengths[i][0] = 1;
1375 max_lengths[i][1] = 1;
1376
1377 for ( j=0; j<5; j++ ) {
1378 block[i][j]=2;
1379 }
1380 }
1381
1382
1383 int owner;
1384 size_t rest, len, temp_len, blocklen, memlen=0;
1385 off_t offset, temp_offset, start_offset, memoffset=0;
1386
1387 i=j=0;
1388
1389 if ( 0 < mem_count ) {
1390 memoffset = (off_t ) mem_iov[j].iov_base;
1391 memlen = mem_iov[j].iov_len;
1392 }
1393 while ( i < file_count) {
1394 offset = (off_t) file_iov[i].iov_base;
1395 len = file_iov[i].iov_len;
1396
1397
1398 #if DEBUG_ON
1399 printf("%d:file_iov[%d].base=%ld .len=%d\n", rank, i,
1400 file_iov[i].iov_base, file_iov[i].iov_len);
1401 #endif
1402 do {
1403 owner = (offset / stripe_size ) % stripe_count;
1404 start_offset = (offset / stripe_size );
1405 rest = (start_offset + 1) * stripe_size - offset;
1406
1407 if ( len >= rest ) {
1408 blocklen = rest;
1409 temp_offset = offset+rest;
1410 temp_len = len - rest;
1411 }
1412 else {
1413 blocklen = len;
1414 temp_offset = 0;
1415 temp_len = 0;
1416 }
1417
1418 broken_file_iovs[owner][broken_file_counts[owner]].iov_base = (void *)offset;
1419 broken_file_iovs[owner][broken_file_counts[owner]].iov_len = blocklen;
1420 #if DEBUG_ON
1421 printf("%d: owner=%d b_file_iovs[%d].base=%ld .len=%d \n", rank, owner,
1422 broken_file_counts[owner],
1423 broken_file_iovs[owner][broken_file_counts[owner]].iov_base,
1424 broken_file_iovs[owner][broken_file_counts[owner]].iov_len );
1425 #endif
1426 do {
1427 if ( memlen >= blocklen ) {
1428 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset;
1429 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len = blocklen;
1430 memoffset += blocklen;
1431 memlen -= blocklen;
1432 blocklen = 0;
1433
1434 if ( 0 == memlen ) {
1435 j++;
1436 if ( j < mem_count ) {
1437 memoffset = (off_t) mem_iov[j].iov_base;
1438 memlen = mem_iov[j].iov_len;
1439 }
1440 else
1441 break;
1442 }
1443 }
1444 else {
1445 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset;
1446 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len = memlen;
1447 blocklen -= memlen;
1448
1449 j++;
1450 if ( j < mem_count ) {
1451 memoffset = (off_t) mem_iov[j].iov_base;
1452 memlen = mem_iov[j].iov_len;
1453 }
1454 else
1455 break;
1456 }
1457 #if DEBUG_ON
1458 printf("%d: owner=%d b_mem_iovs[%d].base=%ld .len=%d\n", rank, owner,
1459 broken_mem_counts[owner],
1460 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base,
1461 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len);
1462 #endif
1463
1464 broken_mem_counts[owner]++;
1465 if ( broken_mem_counts[owner] >= max_lengths[owner][0] ) {
1466 broken_mem_iovs[owner] = (struct iovec*) realloc ( broken_mem_iovs[owner],
1467 mem_count * block[owner][0] *
1468 sizeof(struct iovec ));
1469 max_lengths[owner][0] = mem_count * block[owner][0];
1470 block[owner][0]++;
1471 }
1472
1473 } while ( blocklen > 0 );
1474
1475 broken_file_counts[owner]++;
1476 if ( broken_file_counts[owner] >= max_lengths[owner][1] ) {
1477 broken_file_iovs[owner] = (struct iovec*) realloc ( broken_file_iovs[owner],
1478 file_count * block[owner][1] *
1479 sizeof(struct iovec ));
1480 max_lengths[owner][1] = file_count * block[owner][1];
1481 block[owner][1]++;
1482 }
1483
1484 offset = temp_offset;
1485 len = temp_len;
1486 } while( temp_len > 0 );
1487
1488 i++;
1489 }
1490
1491
1492
1493 for ( i=0; i< stripe_count; i++ ) {
1494 for ( j=0; j<broken_file_counts[i]; j++ ) {
1495 broken_total_lengths[i] += broken_file_iovs[i][j].iov_len;
1496 }
1497 #if DEBUG_ON
1498 printf("%d: broken_total_lengths[%d] = %d\n", rank, i, broken_total_lengths[i]);
1499 #endif
1500 }
1501
1502 *ret_broken_mem_iovs = broken_mem_iovs;
1503 *ret_broken_mem_counts = broken_mem_counts;
1504 *ret_broken_file_iovs = broken_file_iovs;
1505 *ret_broken_file_counts = broken_file_counts;
1506 *ret_broken_total_lengths = broken_total_lengths;
1507
1508 if ( NULL != block) {
1509 for ( i=0; i<stripe_count; i++ ){
1510 free (block[i] );
1511 }
1512 free ( block);
1513 }
1514 if ( NULL != max_lengths) {
1515 for ( i=0; i<stripe_count; i++ ){
1516 free (max_lengths[i] );
1517 }
1518 free ( max_lengths);
1519 }
1520
1521 return ret;
1522
1523 exit:
1524 free ( broken_mem_iovs);
1525 free ( broken_mem_counts);
1526 free ( broken_file_iovs );
1527 free ( broken_file_counts);
1528 free ( broken_total_lengths);
1529
1530 if ( NULL != block) {
1531 for ( i=0; i<stripe_count; i++ ){
1532 free (block[i] );
1533 }
1534 free ( block);
1535 }
1536 if ( NULL != max_lengths) {
1537 for ( i=0; i<stripe_count; i++ ){
1538 free (max_lengths[i] );
1539 }
1540 free ( max_lengths);
1541 }
1542
1543 *ret_broken_mem_iovs = NULL;
1544 *ret_broken_mem_counts = NULL;
1545 *ret_broken_file_iovs = NULL;
1546 *ret_broken_file_counts = NULL;
1547 *ret_broken_total_lengths = NULL;
1548
1549 return ret;
1550 }
1551
1552
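/* mca_fcoll_dynamic_gen2_get_configuration: decides how many I/O aggregators
 * to use (the requested number, falling back to the stripe count, at least 1
 * and at most the communicator size) and spreads them evenly across the ranks
 * of the file's communicator; all processes form a single group. */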
1553 int mca_fcoll_dynamic_gen2_get_configuration (ompio_file_t *fh, int *dynamic_gen2_num_io_procs, int **ret_aggregators)
1554 {
1555 int *aggregators=NULL;
1556 int num_io_procs = *dynamic_gen2_num_io_procs;
1557 int i;
1558
1559 if ( num_io_procs < 1 ) {
1560 num_io_procs = fh->f_stripe_count;
1561 if ( num_io_procs < 1 ) {
1562 num_io_procs = 1;
1563 }
1564 }
1565 if ( num_io_procs > fh->f_size ) {
1566 num_io_procs = fh->f_size;
1567 }
1568
1569 fh->f_procs_per_group = fh->f_size;
1570 fh->f_procs_in_group = (int *) malloc ( sizeof(int) * fh->f_size );
1571 if ( NULL == fh->f_procs_in_group) {
1572 return OMPI_ERR_OUT_OF_RESOURCE;
1573 }
1574 for (i=0; i<fh->f_size; i++ ) {
1575 fh->f_procs_in_group[i]=i;
1576 }
1577
1578
1579 aggregators = (int *) malloc ( num_io_procs * sizeof(int));
1580 if ( NULL == aggregators ) {
1581
1582 return OMPI_ERR_OUT_OF_RESOURCE;
1583 }
1584 for ( i=0; i<num_io_procs; i++ ) {
1585 aggregators[i] = i * fh->f_size / num_io_procs;
1586 }
1587
1588 *dynamic_gen2_num_io_procs = num_io_procs;
1589 *ret_aggregators = aggregators;
1590
1591 return OMPI_SUCCESS;
1592 }
1593
1594
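/* mca_fcoll_dynamic_gen2_split_iov_array: copies at most chunk_size bytes
 * worth of entries from the merged io_array into fh->f_io_array, resuming
 * from (*ret_array_pos, *ret_pos) and updating both on return.  The return
 * value is the number of bytes described, which the caller subtracts from
 * prev_bytes_to_write. */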
1595 int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *io_array, int num_entries,
1596 int *ret_array_pos, int *ret_pos, int chunk_size )
1597 {
1598
1599 int array_pos = *ret_array_pos;
1600 int pos = *ret_pos;
1601 size_t bytes_written = 0;
1602 size_t bytes_to_write = chunk_size;
1603
1604 if ( 0 == array_pos && 0 == pos ) {
1605 fh->f_io_array = (mca_common_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_common_ompio_io_array_t));
1606 if ( NULL == fh->f_io_array ){
1607 opal_output (1,"Could not allocate memory\n");
1608 return -1;
1609 }
1610 }
1611
1612 int i=0;
1613 while (bytes_to_write > 0 ) {
1614 fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]);
1615 fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]);
1616
1617 if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) {
1618 fh->f_io_array[i].length = bytes_to_write;
1619 }
1620 else {
1621 fh->f_io_array[i].length = io_array[array_pos].length - pos;
1622 }
1623
1624 pos += fh->f_io_array[i].length;
1625 bytes_written += fh->f_io_array[i].length;
1626 bytes_to_write-= fh->f_io_array[i].length;
1627 i++;
1628
1629 if ( pos == (int)io_array[array_pos].length ) {
1630 pos = 0;
1631 if ((array_pos + 1) < num_entries) {
1632 array_pos++;
1633 }
1634 else {
1635 break;
1636 }
1637 }
1638 }
1639
1640 fh->f_num_of_io_entries = i;
1641 *ret_array_pos = array_pos;
1642 *ret_pos = pos;
1643 return bytes_written;
1644 }
1645
1646
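/* local_heap_sort: heapsort over the offsets of io_array; the result is an
 * index permutation stored in "sorted" (sorted[0] refers to the smallest
 * offset), leaving io_array itself untouched. */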
1647 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
1648 int num_entries,
1649 int *sorted)
1650 {
1651 int i = 0;
1652 int j = 0;
1653 int left = 0;
1654 int right = 0;
1655 int largest = 0;
1656 int heap_size = num_entries - 1;
1657 int temp = 0;
1658 unsigned char done = 0;
1659 int* temp_arr = NULL;
1660
1661 temp_arr = (int*)malloc(num_entries*sizeof(int));
1662 if (NULL == temp_arr) {
1663 opal_output (1, "OUT OF MEMORY\n");
1664 return OMPI_ERR_OUT_OF_RESOURCE;
1665 }
1666 temp_arr[0] = 0;
1667 for (i = 1; i < num_entries; ++i) {
1668 temp_arr[i] = i;
1669 }
1670
1671 for (i = num_entries/2-1 ; i>=0 ; i--) {
1672 done = 0;
1673 j = i;
1674 largest = j;
1675
1676 while (!done) {
1677 left = j*2+1;
1678 right = j*2+2;
1679 if ((left <= heap_size) &&
1680 (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
1681 largest = left;
1682 }
1683 else {
1684 largest = j;
1685 }
1686 if ((right <= heap_size) &&
1687 (io_array[temp_arr[right]].offset >
1688 io_array[temp_arr[largest]].offset)) {
1689 largest = right;
1690 }
1691 if (largest != j) {
1692 temp = temp_arr[largest];
1693 temp_arr[largest] = temp_arr[j];
1694 temp_arr[j] = temp;
1695 j = largest;
1696 }
1697 else {
1698 done = 1;
1699 }
1700 }
1701 }
1702
1703 for (i = num_entries-1; i >=1; --i) {
1704 temp = temp_arr[0];
1705 temp_arr[0] = temp_arr[i];
1706 temp_arr[i] = temp;
1707 heap_size--;
1708 done = 0;
1709 j = 0;
1710 largest = j;
1711
1712 while (!done) {
1713 left = j*2+1;
1714 right = j*2+2;
1715
1716 if ((left <= heap_size) &&
1717 (io_array[temp_arr[left]].offset >
1718 io_array[temp_arr[j]].offset)) {
1719 largest = left;
1720 }
1721 else {
1722 largest = j;
1723 }
1724 if ((right <= heap_size) &&
1725 (io_array[temp_arr[right]].offset >
1726 io_array[temp_arr[largest]].offset)) {
1727 largest = right;
1728 }
1729 if (largest != j) {
1730 temp = temp_arr[largest];
1731 temp_arr[largest] = temp_arr[j];
1732 temp_arr[j] = temp;
1733 j = largest;
1734 }
1735 else {
1736 done = 1;
1737 }
1738 }
1739 sorted[i] = temp_arr[i];
1740 }
1741 sorted[0] = temp_arr[0];
1742
1743 if (NULL != temp_arr) {
1744 free(temp_arr);
1745 temp_arr = NULL;
1746 }
1747 return OMPI_SUCCESS;
1748 }
1749