This source file includes following definitions.
- mca_fcoll_vulcan_file_write_all
- write_init
- shuffle_init
- mca_fcoll_vulcan_minmax
- mca_fcoll_vulcan_break_file_view
- mca_fcoll_vulcan_get_configuration
- mca_fcoll_vulcan_split_iov_array
- local_heap_sort
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #include "ompi_config.h"
23 #include "fcoll_vulcan.h"
24
25 #include "mpi.h"
26 #include "ompi/constants.h"
27 #include "ompi/mca/fcoll/fcoll.h"
28 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
29 #include "ompi/mca/common/ompio/common_ompio.h"
30 #include "ompi/mca/io/io.h"
31 #include "ompi/mca/common/ompio/common_ompio_request.h"
32 #include "math.h"
33 #include "ompi/mca/pml/pml.h"
34 #include <unistd.h>
35
36 #define DEBUG_ON 0
37 #define FCOLL_VULCAN_SHUFFLE_TAG 123
38 #define INIT_LEN 10
39 #define NOT_AGGR_INDEX -1
40
41
/* One (offset, length) file-access entry plus the rank (relative index
 * within the group) that contributed it.  An aggregator collects one of
 * these per non-empty block it received and sorts them by file offset
 * via local_heap_sort() before building its I/O array. */
typedef struct mca_io_ompio_local_io_array{
    OMPI_MPI_OFFSET_TYPE offset;    /* absolute file offset of the block */
    MPI_Aint length;                /* number of bytes in the block */
    int process_id;                 /* index of contributing process in the group */
}mca_io_ompio_local_io_array;
47
/* Per-aggregator bookkeeping for one collective write.  Each process keeps
 * one of these per aggregator; the aggregator-only fields (disp_index,
 * global_buf, recvtype, ...) are allocated only on ranks that act as the
 * corresponding aggregator.  The prev_* fields hold the previous cycle's
 * state so that communication of cycle i+1 can overlap the file-system
 * write of cycle i (see SWAP_AGGR_POINTERS). */
typedef struct mca_io_ompio_aggregator_data {
    /* per-process receive bookkeeping (aggregator only):
     * disp_index[p]  = next free slot in blocklen/displs for process p,
     * sorted         = offset-sorted permutation of global_iov_array,
     * fview_count[p] = number of iovec entries contributed by process p,
     * n              = index of the process owning the current iovec entry */
    int *disp_index, *sorted, *fview_count, n;
    int *max_disp_index;                 /* current capacity of the per-process arrays */
    int **blocklen_per_process;          /* [proc][slot] block lengths for recv datatype */
    MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
    MPI_Comm comm;
    /* double-buffered shuffle target: global_buf receives the current
     * cycle, prev_global_buf is being written to the file system */
    char *buf, *global_buf, *prev_global_buf;
    ompi_datatype_t **recvtype, **prev_recvtype;
    struct iovec *global_iov_array;      /* concatenated file views of the whole group */
    int current_index, current_position; /* walk position in global_iov_array / user buffer */
    int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;
    int *procs_in_group, iov_index;
    int bytes_sent, prev_bytes_sent;
    struct iovec *decoded_iov;           /* this process's share of the user buffer */
    int bytes_to_write, prev_bytes_to_write;
    /* current/previous cycle I/O arrays handed to the fbtl layer */
    mca_common_ompio_io_array_t *io_array, *prev_io_array;
    int num_io_entries, prev_num_io_entries;
} mca_io_ompio_aggregator_data;
66
67
/*
 * Swap two request-array pointers (current vs. previous cycle).
 * Wrapped in do { } while (0) so the macro expands to exactly one
 * statement and is safe inside an unbraced if/else body; the original
 * bare { } block would break "if (c) SWAP_REQUESTS(a,b); else ...".
 */
#define SWAP_REQUESTS(_r1,_r2)           \
    do {                                 \
        ompi_request_t **_t = (_r1);     \
        (_r1) = (_r2);                   \
        (_r2) = _t;                      \
    } while (0)
72
/*
 * Roll the per-aggregator double-buffer state over to the next cycle:
 * the just-built io_array/global_buf/recvtype become the "prev_*"
 * versions (to be written out / freed), and the buffers of the previous
 * cycle are recycled for the next shuffle round.
 *
 * Wrapped in do { } while (0) so the macro is a single statement and is
 * safe in unbraced if/else bodies; _num is parenthesized to survive
 * expression arguments.  _t is a char* scratch pointer reused for both
 * the buffer swap and the (cast) recvtype-array swap, as before.
 */
#define SWAP_AGGR_POINTERS(_aggr,_num)                                  \
    do {                                                                \
        int _i;                                                         \
        char *_t;                                                       \
        for (_i=0; _i<(_num); _i++ ) {                                  \
            _aggr[_i]->prev_io_array       = _aggr[_i]->io_array;       \
            _aggr[_i]->prev_num_io_entries = _aggr[_i]->num_io_entries; \
            _aggr[_i]->prev_bytes_sent     = _aggr[_i]->bytes_sent;     \
            _aggr[_i]->prev_bytes_to_write = _aggr[_i]->bytes_to_write; \
            _t = _aggr[_i]->prev_global_buf;                            \
            _aggr[_i]->prev_global_buf = _aggr[_i]->global_buf;         \
            _aggr[_i]->global_buf = _t;                                 \
            _t = (char *)_aggr[_i]->recvtype;                           \
            _aggr[_i]->recvtype = _aggr[_i]->prev_recvtype;             \
            _aggr[_i]->prev_recvtype = (ompi_datatype_t **)_t;          \
        }                                                               \
    } while (0)
88
89
90
/* Build the communication schedule of one cycle and post the irecv/isend
 * pairs for shuffling data to the aggregator (comments on the definition). */
static int shuffle_init ( int index, int cycles, int aggregator, int rank,
                          mca_io_ompio_aggregator_data *data,
                          ompi_request_t **reqs );
/* Start (write_synchType==1, via fbtl_ipwritev) or perform synchronously
 * (via fbtl_pwritev) the file-system write of the previous cycle's data. */
static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data,
                       int write_chunksize, int write_synchType, ompi_request_t **request);
/* Split the decoded user buffer and the local file view into per-aggregator
 * pieces of (at most) stripe_size bytes per stripe. */
int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count,
                                       struct iovec *local_iov_array, int local_count,
                                       struct iovec ***broken_decoded_iovs, int **broken_iov_counts,
                                       struct iovec ***broken_iov_arrays, int **broken_counts,
                                       MPI_Aint **broken_total_lengths,
                                       int stripe_count, size_t stripe_size);

/* Select the aggregator processes / groups for this collective operation. */
int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs,
                                        int num_groups, size_t max_data);

/* Heap-sort the aggregator's (offset,length,rank) entries by file offset;
 * the permutation is returned in 'sorted'. */
static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
                            int num_entries,
                            int *sorted);

/* Populate fh->f_io_array with the next chunk_size bytes worth of entries
 * from work_array, resuming at (*last_array_pos, *last_pos_in_field). */
int mca_fcoll_vulcan_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *work_array,
                                       int num_entries, int *last_array_pos, int *last_pos_in_field,
                                       int chunk_size );

/* Determine the file-offset domain (stripe) size from the global min/max
 * offsets of the group's file view. */
static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators,
                                     long *new_stripe_size);
119
120
121 int mca_fcoll_vulcan_file_write_all (ompio_file_t *fh,
122 const void *buf,
123 int count,
124 struct ompi_datatype_t *datatype,
125 ompi_status_public_t *status)
126 {
127 int index = 0;
128 int cycles = 0;
129 int ret =0, l, i, j, bytes_per_cycle;
130 uint32_t iov_count = 0;
131 struct iovec *decoded_iov = NULL;
132 struct iovec *local_iov_array=NULL;
133 uint32_t total_fview_count = 0;
134 int local_count = 0;
135 ompi_request_t **reqs = NULL;
136 ompi_request_t *req_iwrite = MPI_REQUEST_NULL;
137 mca_io_ompio_aggregator_data **aggr_data=NULL;
138
139 int *displs = NULL;
140 int vulcan_num_io_procs;
141 size_t max_data = 0;
142 MPI_Aint *total_bytes_per_process = NULL;
143
144 struct iovec **broken_iov_arrays=NULL;
145 struct iovec **broken_decoded_iovs=NULL;
146 int *broken_counts=NULL;
147 int *broken_iov_counts=NULL;
148 MPI_Aint *broken_total_lengths=NULL;
149
150 int aggr_index = NOT_AGGR_INDEX;
151 int write_synch_type = 2;
152 int write_chunksize, *result_counts=NULL;
153
154 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
155 double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
156 double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0;
157 double exch_write = 0.0, start_exch = 0.0, end_exch = 0.0;
158 mca_common_ompio_print_entry nentry;
159 #endif
160
161
162
163
164
165 vulcan_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
166 if ( OMPI_ERR_MAX == vulcan_num_io_procs ) {
167 ret = OMPI_ERROR;
168 goto exit;
169 }
170 bytes_per_cycle = fh->f_bytes_per_agg;
171
172 if( (1 == mca_fcoll_vulcan_async_io) && (NULL == fh->f_fbtl->fbtl_ipwritev) ) {
173 opal_output (1, "vulcan_write_all: fbtl Does NOT support ipwritev() (asynchrounous write) \n");
174 ret = MPI_ERR_UNSUPPORTED_OPERATION;
175 goto exit;
176 }
177
178
179
180 bytes_per_cycle =bytes_per_cycle/2;
181 write_chunksize = bytes_per_cycle;
182
183 ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh,
184 datatype,
185 count,
186 buf,
187 &max_data,
188 fh->f_mem_convertor,
189 &decoded_iov,
190 &iov_count);
191 if (OMPI_SUCCESS != ret ){
192 goto exit;
193 }
194
195 if ( MPI_STATUS_IGNORE != status ) {
196 status->_ucount = max_data;
197 }
198
199
200 ret = mca_fcoll_vulcan_get_configuration (fh, vulcan_num_io_procs, mca_fcoll_vulcan_num_groups, max_data);
201 if (OMPI_SUCCESS != ret){
202 goto exit;
203 }
204
205 aggr_data = (mca_io_ompio_aggregator_data **) malloc ( fh->f_num_aggrs *
206 sizeof(mca_io_ompio_aggregator_data*));
207
208 for ( i=0; i< fh->f_num_aggrs; i++ ) {
209
210
211
212 aggr_data[i] = (mca_io_ompio_aggregator_data *) calloc ( 1, sizeof(mca_io_ompio_aggregator_data));
213 aggr_data[i]->procs_per_group = fh->f_procs_per_group;
214 aggr_data[i]->procs_in_group = fh->f_procs_in_group;
215 aggr_data[i]->comm = fh->f_comm;
216 aggr_data[i]->buf = (char *)buf;
217
218
219 if(fh->f_aggr_list[i] == fh->f_rank) {
220 aggr_index = i;
221 }
222 }
223
224
225
226
227
228 ret = fh->f_generate_current_file_view( (struct ompio_file_t *) fh,
229 max_data,
230 &local_iov_array,
231 &local_count);
232 if (ret != OMPI_SUCCESS){
233 goto exit;
234 }
235
236
237
238
239
240 long domain_size;
241 ret = mca_fcoll_vulcan_minmax ( fh, local_iov_array, local_count, fh->f_num_aggrs, &domain_size);
242
243
244
245 ret = mca_fcoll_vulcan_break_file_view ( decoded_iov, iov_count,
246 local_iov_array, local_count,
247 &broken_decoded_iovs, &broken_iov_counts,
248 &broken_iov_arrays, &broken_counts,
249 &broken_total_lengths,
250 fh->f_num_aggrs, domain_size);
251
252
253
254
255
256 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
257 start_comm_time = MPI_Wtime();
258 #endif
259 if ( 1 == mca_fcoll_vulcan_num_groups ) {
260 ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE,
261 broken_total_lengths,
262 fh->f_num_aggrs,
263 MPI_LONG,
264 MPI_SUM,
265 fh->f_comm,
266 fh->f_comm->c_coll->coll_allreduce_module);
267 if( OMPI_SUCCESS != ret){
268 goto exit;
269 }
270
271 }
272 else {
273 total_bytes_per_process = (MPI_Aint*)malloc
274 (fh->f_num_aggrs * fh->f_procs_per_group*sizeof(MPI_Aint));
275 if (NULL == total_bytes_per_process) {
276 opal_output (1, "OUT OF MEMORY\n");
277 ret = OMPI_ERR_OUT_OF_RESOURCE;
278 goto exit;
279 }
280
281 ret = ompi_fcoll_base_coll_allgather_array (broken_total_lengths,
282 fh->f_num_aggrs,
283 MPI_LONG,
284 total_bytes_per_process,
285 fh->f_num_aggrs,
286 MPI_LONG,
287 0,
288 fh->f_procs_in_group,
289 fh->f_procs_per_group,
290 fh->f_comm);
291 if( OMPI_SUCCESS != ret){
292 goto exit;
293 }
294
295 for ( i=0; i<fh->f_num_aggrs; i++ ) {
296 broken_total_lengths[i] = 0;
297 for (j=0 ; j<fh->f_procs_per_group ; j++) {
298 broken_total_lengths[i] += total_bytes_per_process[j*fh->f_num_aggrs + i];
299 }
300 }
301 if (NULL != total_bytes_per_process) {
302 free (total_bytes_per_process);
303 total_bytes_per_process = NULL;
304 }
305 }
306
307 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
308 end_comm_time = MPI_Wtime();
309 comm_time += (end_comm_time - start_comm_time);
310 #endif
311
312 cycles=0;
313 for ( i=0; i<fh->f_num_aggrs; i++ ) {
314 #if DEBUG_ON
315 printf("%d: Overall broken_total_lengths[%d] = %ld\n", fh->f_rank, i, broken_total_lengths[i]);
316 #endif
317 if ( ceil((double)broken_total_lengths[i]/bytes_per_cycle) > cycles ) {
318 cycles = ceil((double)broken_total_lengths[i]/bytes_per_cycle);
319 }
320 }
321
322 result_counts = (int *) malloc ( fh->f_num_aggrs * fh->f_procs_per_group * sizeof(int) );
323 if ( NULL == result_counts ) {
324 ret = OMPI_ERR_OUT_OF_RESOURCE;
325 goto exit;
326 }
327
328 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
329 start_comm_time = MPI_Wtime();
330 #endif
331 if ( 1 == mca_fcoll_vulcan_num_groups ) {
332 ret = fh->f_comm->c_coll->coll_allgather(broken_counts,
333 fh->f_num_aggrs,
334 MPI_INT,
335 result_counts,
336 fh->f_num_aggrs,
337 MPI_INT,
338 fh->f_comm,
339 fh->f_comm->c_coll->coll_allgather_module);
340 }
341 else {
342 ret = ompi_fcoll_base_coll_allgather_array (broken_counts,
343 fh->f_num_aggrs,
344 MPI_INT,
345 result_counts,
346 fh->f_num_aggrs,
347 MPI_INT,
348 0,
349 fh->f_procs_in_group,
350 fh->f_procs_per_group,
351 fh->f_comm);
352 }
353 if( OMPI_SUCCESS != ret){
354 goto exit;
355 }
356 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
357 end_comm_time = MPI_Wtime();
358 comm_time += (end_comm_time - start_comm_time);
359 #endif
360
361
362
363
364 for ( i=0; i< fh->f_num_aggrs; i++ ) {
365 aggr_data[i]->total_bytes = broken_total_lengths[i];
366 aggr_data[i]->decoded_iov = broken_decoded_iovs[i];
367 aggr_data[i]->fview_count = (int *) malloc (fh->f_procs_per_group * sizeof (int));
368 if (NULL == aggr_data[i]->fview_count) {
369 opal_output (1, "OUT OF MEMORY\n");
370 ret = OMPI_ERR_OUT_OF_RESOURCE;
371 goto exit;
372 }
373 for ( j=0; j <fh->f_procs_per_group; j++ ) {
374 aggr_data[i]->fview_count[j] = result_counts[fh->f_num_aggrs*j+i];
375 }
376 displs = (int*) malloc (fh->f_procs_per_group * sizeof (int));
377 if (NULL == displs) {
378 opal_output (1, "OUT OF MEMORY\n");
379 ret = OMPI_ERR_OUT_OF_RESOURCE;
380 goto exit;
381 }
382
383 displs[0] = 0;
384 total_fview_count = aggr_data[i]->fview_count[0];
385 for (j=1 ; j<fh->f_procs_per_group ; j++) {
386 total_fview_count += aggr_data[i]->fview_count[j];
387 displs[j] = displs[j-1] + aggr_data[i]->fview_count[j-1];
388 }
389
390 #if DEBUG_ON
391 printf("total_fview_count : %d\n", total_fview_count);
392 if (fh->f_aggr_list[i] == fh->f_rank) {
393 for (j=0 ; j<fh->f_procs_per_group ; i++) {
394 printf ("%d: PROCESS: %d ELEMENTS: %d DISPLS: %d\n",
395 fh->f_rank,
396 j,
397 aggr_data[i]->fview_count[j],
398 displs[j]);
399 }
400 }
401 #endif
402
403
404 if (0 != total_fview_count) {
405 aggr_data[i]->global_iov_array = (struct iovec*) malloc (total_fview_count *
406 sizeof(struct iovec));
407 if (NULL == aggr_data[i]->global_iov_array){
408 opal_output(1, "OUT OF MEMORY\n");
409 ret = OMPI_ERR_OUT_OF_RESOURCE;
410 goto exit;
411 }
412 }
413
414 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
415 start_comm_time = MPI_Wtime();
416 #endif
417 if ( 1 == mca_fcoll_vulcan_num_groups ) {
418 ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i],
419 broken_counts[i],
420 fh->f_iov_type,
421 aggr_data[i]->global_iov_array,
422 aggr_data[i]->fview_count,
423 displs,
424 fh->f_iov_type,
425 fh->f_comm,
426 fh->f_comm->c_coll->coll_allgatherv_module );
427 }
428 else {
429 ret = ompi_fcoll_base_coll_allgatherv_array (broken_iov_arrays[i],
430 broken_counts[i],
431 fh->f_iov_type,
432 aggr_data[i]->global_iov_array,
433 aggr_data[i]->fview_count,
434 displs,
435 fh->f_iov_type,
436 fh->f_aggr_list[i],
437 fh->f_procs_in_group,
438 fh->f_procs_per_group,
439 fh->f_comm);
440 }
441 if (OMPI_SUCCESS != ret){
442 goto exit;
443 }
444 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
445 end_comm_time = MPI_Wtime();
446 comm_time += (end_comm_time - start_comm_time);
447 #endif
448
449
450
451
452
453
454
455
456
457 if (0 != total_fview_count) {
458 aggr_data[i]->sorted = (int *)malloc (total_fview_count * sizeof(int));
459 if (NULL == aggr_data[i]->sorted) {
460 opal_output (1, "OUT OF MEMORY\n");
461 ret = OMPI_ERR_OUT_OF_RESOURCE;
462 goto exit;
463 }
464 ompi_fcoll_base_sort_iovec (aggr_data[i]->global_iov_array, total_fview_count, aggr_data[i]->sorted);
465 }
466
467 if (NULL != local_iov_array){
468 free(local_iov_array);
469 local_iov_array = NULL;
470 }
471
472 if (NULL != displs){
473 free(displs);
474 displs=NULL;
475 }
476
477
478 #if DEBUG_ON
479 if (fh->f_aggr_list[i] == fh->f_rank) {
480 uint32_t tv=0;
481 for (tv=0 ; tv<total_fview_count ; tv++) {
482 printf("%d: OFFSET: %lld LENGTH: %ld\n",
483 fh->f_rank,
484 aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_base,
485 aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_len);
486 }
487 }
488 #endif
489
490
491
492
493
494 aggr_data[i]->bytes_per_cycle = bytes_per_cycle;
495
496 if (fh->f_aggr_list[i] == fh->f_rank) {
497 aggr_data[i]->disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int));
498 if (NULL == aggr_data[i]->disp_index) {
499 opal_output (1, "OUT OF MEMORY\n");
500 ret = OMPI_ERR_OUT_OF_RESOURCE;
501 goto exit;
502 }
503
504 aggr_data[i]->max_disp_index = (int *)calloc (fh->f_procs_per_group, sizeof (int));
505 if (NULL == aggr_data[i]->max_disp_index) {
506 opal_output (1, "OUT OF MEMORY\n");
507 ret = OMPI_ERR_OUT_OF_RESOURCE;
508 goto exit;
509 }
510
511 aggr_data[i]->blocklen_per_process = (int **)calloc (fh->f_procs_per_group, sizeof (int*));
512 if (NULL == aggr_data[i]->blocklen_per_process) {
513 opal_output (1, "OUT OF MEMORY\n");
514 ret = OMPI_ERR_OUT_OF_RESOURCE;
515 goto exit;
516 }
517
518 aggr_data[i]->displs_per_process = (MPI_Aint **)calloc (fh->f_procs_per_group, sizeof (MPI_Aint*));
519 if (NULL == aggr_data[i]->displs_per_process) {
520 opal_output (1, "OUT OF MEMORY\n");
521 ret = OMPI_ERR_OUT_OF_RESOURCE;
522 goto exit;
523 }
524
525
526 aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
527 aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle);
528 if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){
529 opal_output(1, "OUT OF MEMORY");
530 ret = OMPI_ERR_OUT_OF_RESOURCE;
531 goto exit;
532 }
533
534 aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
535 sizeof(ompi_datatype_t *));
536 aggr_data[i]->prev_recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
537 sizeof(ompi_datatype_t *));
538 if (NULL == aggr_data[i]->recvtype || NULL == aggr_data[i]->prev_recvtype) {
539 opal_output (1, "OUT OF MEMORY\n");
540 ret = OMPI_ERR_OUT_OF_RESOURCE;
541 goto exit;
542 }
543 for(l=0;l<fh->f_procs_per_group;l++){
544 aggr_data[i]->recvtype[l] = MPI_DATATYPE_NULL;
545 aggr_data[i]->prev_recvtype[l] = MPI_DATATYPE_NULL;
546 }
547 }
548
549 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
550 start_exch = MPI_Wtime();
551 #endif
552 }
553
554 reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs *sizeof(ompi_request_t *));
555
556 if ( NULL == reqs ) {
557 opal_output (1, "OUT OF MEMORY\n");
558 ret = OMPI_ERR_OUT_OF_RESOURCE;
559 goto exit;
560 }
561
562 for (l=0,i=0; i < fh->f_num_aggrs; i++ ) {
563 for ( j=0; j< (fh->f_procs_per_group+1); j++ ) {
564 reqs[l] = MPI_REQUEST_NULL;
565 l++;
566 }
567 }
568
569
570
571 if( (1 == mca_fcoll_vulcan_async_io) ||
572 ( (0 == mca_fcoll_vulcan_async_io) && (NULL != fh->f_fbtl->fbtl_ipwritev) && (2 < cycles) ) ) {
573 write_synch_type = 1;
574 }
575
576 if ( cycles > 0 ) {
577 for ( i=0; i<fh->f_num_aggrs; i++ ) {
578 ret = shuffle_init ( 0, cycles, fh->f_aggr_list[i], fh->f_rank, aggr_data[i],
579 &reqs[i*(fh->f_procs_per_group + 1)] );
580 if ( OMPI_SUCCESS != ret ) {
581 goto exit;
582 }
583 }
584
585 if (NOT_AGGR_INDEX != aggr_index) {
586 mca_common_ompio_register_progress ();
587 }
588 }
589
590 ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*fh->f_num_aggrs,
591 reqs, MPI_STATUS_IGNORE);
592
593 for (index = 1; index < cycles; index++) {
594 SWAP_AGGR_POINTERS(aggr_data, fh->f_num_aggrs);
595
596 if(NOT_AGGR_INDEX != aggr_index) {
597 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
598 start_write_time = MPI_Wtime();
599 #endif
600 ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index],
601 write_chunksize, write_synch_type, &req_iwrite);
602 if (OMPI_SUCCESS != ret){
603 goto exit;
604 }
605 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
606 end_write_time = MPI_Wtime();
607 write_time += end_write_time - start_write_time;
608 #endif
609 }
610
611 for ( i=0; i<fh->f_num_aggrs; i++ ) {
612 ret = shuffle_init ( index, cycles, fh->f_aggr_list[i], fh->f_rank, aggr_data[i],
613 &reqs[i*(fh->f_procs_per_group + 1)] );
614 if ( OMPI_SUCCESS != ret ) {
615 goto exit;
616 }
617 }
618
619 ret = ompi_request_wait_all ( (fh->f_procs_per_group + 1 )*fh->f_num_aggrs,
620 reqs, MPI_STATUS_IGNORE);
621 if (OMPI_SUCCESS != ret){
622 goto exit;
623 }
624
625 if(NOT_AGGR_INDEX != aggr_index) {
626 ret = ompi_request_wait(&req_iwrite, MPI_STATUS_IGNORE);
627 if (OMPI_SUCCESS != ret){
628 goto exit;
629 }
630 }
631 }
632
633 if ( cycles > 0 ) {
634 SWAP_AGGR_POINTERS(aggr_data,fh->f_num_aggrs);
635
636 if(NOT_AGGR_INDEX != aggr_index) {
637 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
638 start_write_time = MPI_Wtime();
639 #endif
640 ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index],
641 write_chunksize, write_synch_type, &req_iwrite);
642 if (OMPI_SUCCESS != ret){
643 goto exit;
644 }
645 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
646 end_write_time = MPI_Wtime();
647 write_time += end_write_time - start_write_time;
648 #endif
649 }
650
651 if(NOT_AGGR_INDEX != aggr_index) {
652 ret = ompi_request_wait(&req_iwrite, MPI_STATUS_IGNORE);
653 if (OMPI_SUCCESS != ret){
654 goto exit;
655 }
656 }
657 }
658
659 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
660 end_exch = MPI_Wtime();
661 exch_write += end_exch - start_exch;
662 nentry.time[0] = write_time;
663 nentry.time[1] = comm_time;
664 nentry.time[2] = exch_write;
665 nentry.aggregator = 0;
666 for ( i=0; i<fh->f_num_aggrs; i++ ) {
667 if (fh->f_aggr_list[i] == fh->f_rank)
668 nentry.aggregator = 1;
669 }
670 nentry.nprocs_for_coll = fh->f_num_aggrs;
671 if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)){
672 mca_common_ompio_register_print_entry(fh->f_coll_write_time,
673 nentry);
674 }
675 #endif
676
677
678 exit :
679
680 if ( NULL != aggr_data ) {
681
682 for ( i=0; i< fh->f_num_aggrs; i++ ) {
683 if (fh->f_aggr_list[i] == fh->f_rank) {
684 if (NULL != aggr_data[i]->recvtype){
685 for (j =0; j< aggr_data[i]->procs_per_group; j++) {
686 if ( MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j] ) {
687 ompi_datatype_destroy(&aggr_data[i]->recvtype[j]);
688 }
689 if ( MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j] ) {
690 ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]);
691 }
692
693 }
694 free(aggr_data[i]->recvtype);
695 free(aggr_data[i]->prev_recvtype);
696 }
697
698 free (aggr_data[i]->disp_index);
699 free (aggr_data[i]->max_disp_index);
700 free (aggr_data[i]->global_buf);
701 free (aggr_data[i]->prev_global_buf);
702 for(l=0;l<aggr_data[i]->procs_per_group;l++){
703 free (aggr_data[i]->blocklen_per_process[l]);
704 free (aggr_data[i]->displs_per_process[l]);
705 }
706
707 free (aggr_data[i]->blocklen_per_process);
708 free (aggr_data[i]->displs_per_process);
709 }
710 free (aggr_data[i]->sorted);
711 free (aggr_data[i]->global_iov_array);
712 free (aggr_data[i]->fview_count);
713 free (aggr_data[i]->decoded_iov);
714
715 free (aggr_data[i]);
716 }
717 free (aggr_data);
718 }
719 free(displs);
720 free(decoded_iov);
721 free(broken_counts);
722 free(broken_total_lengths);
723 free(broken_iov_counts);
724 free(broken_decoded_iovs);
725 if ( NULL != broken_iov_arrays ) {
726 for (i=0; i<fh->f_num_aggrs; i++ ) {
727 free(broken_iov_arrays[i]);
728 }
729 }
730 free(broken_iov_arrays);
731 free(fh->f_procs_in_group);
732 fh->f_procs_in_group=NULL;
733 fh->f_procs_per_group=0;
734 free(result_counts);
735 free(reqs);
736
737 return OMPI_SUCCESS;
738 }
739
/*
 * Issue the file-system write for the data an aggregator collected in the
 * PREVIOUS shuffle cycle (aggr_data->prev_io_array).
 *
 * write_synchType == 1: start an asynchronous fbtl_ipwritev and return a
 * pending request.  Otherwise: perform a blocking fbtl_pwritev and return
 * an already-completed request.  Either way *request can be waited on by
 * the caller with ompi_request_wait().
 *
 * Returns OMPI_SUCCESS, or the (negative) fbtl error code on failure; the
 * error is also recorded in the request status.
 */
static int write_init (ompio_file_t *fh,
                       int aggregator,
                       mca_io_ompio_aggregator_data *aggr_data,
                       int write_chunksize,
                       int write_synchType,
                       ompi_request_t **request )
{
    int ret = OMPI_SUCCESS;
    ssize_t ret_temp = 0;
    int last_array_pos = 0;   /* resume position within prev_io_array */
    int last_pos = 0;         /* resume position within the current entry */
    mca_ompio_request_t *ompio_req = NULL;

    mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_WRITE );

    if (aggr_data->prev_num_io_entries) {
        /* Populate fh->f_io_array with (at most) write_chunksize bytes
         * worth of entries taken from prev_io_array. */
        mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array,
                                          aggr_data->prev_num_io_entries,
                                          &last_array_pos, &last_pos,
                                          write_chunksize);

        if (1 == write_synchType) {
            /* Asynchronous path: the fbtl completes ompio_req later. */
            ret = fh->f_fbtl->fbtl_ipwritev(fh, (ompi_request_t *) ompio_req);
            if(0 > ret) {
                opal_output (1, "vulcan_write_all: fbtl_ipwritev failed\n");
                ompio_req->req_ompi.req_status.MPI_ERROR = ret;
                ompio_req->req_ompi.req_status._ucount = 0;
            }
        }
        else {
            /* Synchronous path: write now and complete the request here. */
            ret_temp = fh->f_fbtl->fbtl_pwritev(fh);
            if(0 > ret_temp) {
                opal_output (1, "vulcan_write_all: fbtl_pwritev failed\n");
                ret = ret_temp;
                ret_temp = 0;
            }

            ompio_req->req_ompi.req_status.MPI_ERROR = ret;
            ompio_req->req_ompi.req_status._ucount = ret_temp;
            ompi_request_complete (&ompio_req->req_ompi, false);
        }

        /* f_io_array was allocated by split_iov_array; prev_io_array was
         * handed over by SWAP_AGGR_POINTERS and is consumed here. */
        free(fh->f_io_array);
        free(aggr_data->prev_io_array);
    }
    else {
        /* Nothing to write this cycle: hand back a completed, empty request. */
        ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
        ompio_req->req_ompi.req_status._ucount = 0;
        ompi_request_complete (&ompio_req->req_ompi, false);
    }

    *request = (ompi_request_t *) ompio_req;

    fh->f_io_array=NULL;
    fh->f_num_of_io_entries=0;

    return ret;
}
802
803 static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data,
804 ompi_request_t **reqs )
805 {
806 int bytes_sent = 0;
807 int blocks=0, temp_pindex;
808 int i, j, l, ret;
809 int entries_per_aggregator=0;
810 mca_io_ompio_local_io_array *file_offsets_for_agg=NULL;
811 int *sorted_file_offsets=NULL;
812 int temp_index=0;
813 MPI_Aint *memory_displacements=NULL;
814 int *temp_disp_index=NULL;
815 MPI_Aint global_count = 0;
816 int* blocklength_proc=NULL;
817 ptrdiff_t* displs_proc=NULL;
818
819 data->num_io_entries = 0;
820 data->bytes_sent = 0;
821 data->io_array=NULL;
822
823
824
825
826 if (aggregator == rank) {
827
828 if (NULL != data->recvtype){
829 for (i =0; i< data->procs_per_group; i++) {
830 if ( MPI_DATATYPE_NULL != data->recvtype[i] ) {
831 ompi_datatype_destroy(&data->recvtype[i]);
832 data->recvtype[i] = MPI_DATATYPE_NULL;
833 }
834 }
835 }
836
837
838 for(l=0;l<data->procs_per_group;l++){
839 data->disp_index[l] = 1;
840
841 if ( data->max_disp_index[l] == 0 ) {
842 data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int));
843 data->displs_per_process[l] = (MPI_Aint *) calloc (INIT_LEN, sizeof(MPI_Aint));
844 if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){
845 opal_output (1, "OUT OF MEMORY for displs\n");
846 ret = OMPI_ERR_OUT_OF_RESOURCE;
847 goto exit;
848 }
849 data->max_disp_index[l] = INIT_LEN;
850 }
851 else {
852 memset ( data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int) );
853 memset ( data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint) );
854 }
855 }
856 }
857
858
859
860
861 int local_cycles= ceil((double)data->total_bytes / data->bytes_per_cycle);
862 if ( index < (local_cycles -1) ) {
863 data->bytes_to_write_in_cycle = data->bytes_per_cycle;
864 }
865 else if ( index == (local_cycles -1)) {
866 data->bytes_to_write_in_cycle = data->total_bytes - data->bytes_per_cycle*index ;
867 }
868 else {
869 data->bytes_to_write_in_cycle = 0;
870 }
871 data->bytes_to_write = data->bytes_to_write_in_cycle;
872
873 #if DEBUG_ON
874 if (aggregator == rank) {
875 printf ("****%d: CYCLE %d Bytes %lld**********\n",
876 rank,
877 index,
878 data->bytes_to_write_in_cycle);
879 }
880 #endif
881
882
883
884
885 #if DEBUG_ON
886 printf("bytes_to_write_in_cycle: %ld, cycle : %d\n", data->bytes_to_write_in_cycle,
887 index);
888 #endif
889
890
891
892
893
894
895
896 while (data->bytes_to_write_in_cycle) {
897
898
899
900
901 blocks = data->fview_count[0];
902 for (j=0 ; j<data->procs_per_group ; j++) {
903 if (data->sorted[data->current_index] < blocks) {
904 data->n = j;
905 break;
906 }
907 else {
908 blocks += data->fview_count[j+1];
909 }
910 }
911
912 if (data->bytes_remaining) {
913
914
915 if (data->bytes_remaining <= data->bytes_to_write_in_cycle) {
916
917 if (aggregator == rank) {
918 data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_remaining;
919 data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
920 (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
921 (data->global_iov_array[data->sorted[data->current_index]].iov_len
922 - data->bytes_remaining);
923
924 data->disp_index[data->n] += 1;
925
926
927
928 if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
929 data->max_disp_index[data->n] *= 2;
930
931 data->blocklen_per_process[data->n] = (int *) realloc
932 ((void *)data->blocklen_per_process[data->n],
933 (data->max_disp_index[data->n])*sizeof(int));
934 data->displs_per_process[data->n] = (MPI_Aint *) realloc
935 ((void *)data->displs_per_process[data->n],
936 (data->max_disp_index[data->n])*sizeof(MPI_Aint));
937 }
938 data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
939 data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
940
941 }
942 if (data->procs_in_group[data->n] == rank) {
943 bytes_sent += data->bytes_remaining;
944 }
945 data->current_index ++;
946 data->bytes_to_write_in_cycle -= data->bytes_remaining;
947 data->bytes_remaining = 0;
948 }
949 else {
950
951
952 if (aggregator == rank) {
953 data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
954 data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
955 (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
956 (data->global_iov_array[data->sorted[data->current_index]].iov_len
957 - data->bytes_remaining);
958 }
959
960 if (data->procs_in_group[data->n] == rank) {
961 bytes_sent += data->bytes_to_write_in_cycle;
962 }
963 data->bytes_remaining -= data->bytes_to_write_in_cycle;
964 data->bytes_to_write_in_cycle = 0;
965 break;
966 }
967 }
968 else {
969
970 if (data->bytes_to_write_in_cycle <
971 (MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) {
972
973 if (aggregator == rank) {
974 data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
975 data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
976 (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ;
977 }
978 if (data->procs_in_group[data->n] == rank) {
979 bytes_sent += data->bytes_to_write_in_cycle;
980
981 }
982 data->bytes_remaining = data->global_iov_array[data->sorted[data->current_index]].iov_len -
983 data->bytes_to_write_in_cycle;
984 data->bytes_to_write_in_cycle = 0;
985 break;
986 }
987 else {
988
989 if (aggregator == rank) {
990 data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] =
991 data->global_iov_array[data->sorted[data->current_index]].iov_len;
992 data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (ptrdiff_t)
993 data->global_iov_array[data->sorted[data->current_index]].iov_base;
994
995 data->disp_index[data->n] += 1;
996
997
998
999
1000 if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) {
1001 data->max_disp_index[data->n] *=2 ;
1002 data->blocklen_per_process[data->n] = (int *) realloc (
1003 (void *)data->blocklen_per_process[data->n],
1004 (data->max_disp_index[data->n]*sizeof(int)));
1005 data->displs_per_process[data->n] = (MPI_Aint *)realloc (
1006 (void *)data->displs_per_process[data->n],
1007 (data->max_disp_index[data->n]*sizeof(MPI_Aint)));
1008 }
1009 data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
1010 data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
1011 }
1012 if (data->procs_in_group[data->n] == rank) {
1013 bytes_sent += data->global_iov_array[data->sorted[data->current_index]].iov_len;
1014 }
1015 data->bytes_to_write_in_cycle -=
1016 data->global_iov_array[data->sorted[data->current_index]].iov_len;
1017 data->current_index ++;
1018 }
1019 }
1020 }
1021
1022
1023
1024
1025
1026
1027 if (aggregator == rank) {
1028 entries_per_aggregator=0;
1029 for (i=0;i<data->procs_per_group; i++){
1030 for (j=0;j<data->disp_index[i];j++){
1031 if (data->blocklen_per_process[i][j] > 0)
1032 entries_per_aggregator++ ;
1033 }
1034 }
1035
1036 #if DEBUG_ON
1037 printf("%d: cycle: %d, bytes_sent: %d\n ",rank,index,
1038 bytes_sent);
1039 printf("%d : Entries per aggregator : %d\n",rank,entries_per_aggregator);
1040 #endif
1041
1042 if (entries_per_aggregator > 0){
1043 file_offsets_for_agg = (mca_io_ompio_local_io_array *)
1044 malloc(entries_per_aggregator*sizeof(mca_io_ompio_local_io_array));
1045 if (NULL == file_offsets_for_agg) {
1046 opal_output (1, "OUT OF MEMORY\n");
1047 ret = OMPI_ERR_OUT_OF_RESOURCE;
1048 goto exit;
1049 }
1050
1051 sorted_file_offsets = (int *)
1052 malloc (entries_per_aggregator*sizeof(int));
1053 if (NULL == sorted_file_offsets){
1054 opal_output (1, "OUT OF MEMORY\n");
1055 ret = OMPI_ERR_OUT_OF_RESOURCE;
1056 goto exit;
1057 }
1058
1059
1060 temp_index = 0;
1061
1062 for (i=0;i<data->procs_per_group; i++){
1063 for(j=0;j<data->disp_index[i];j++){
1064 if (data->blocklen_per_process[i][j] > 0){
1065 file_offsets_for_agg[temp_index].length =
1066 data->blocklen_per_process[i][j];
1067 file_offsets_for_agg[temp_index].process_id = i;
1068 file_offsets_for_agg[temp_index].offset =
1069 data->displs_per_process[i][j];
1070 temp_index++;
1071
1072 #if DEBUG_ON
1073 printf("************Cycle: %d, Aggregator: %d ***************\n",
1074 index+1,rank);
1075
1076 printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
1077 data->procs_in_group[i],j,
1078 data->blocklen_per_process[i][j],j,
1079 data->displs_per_process[i][j],
1080 rank);
1081 #endif
1082 }
1083 }
1084 }
1085
1086
1087 local_heap_sort (file_offsets_for_agg,
1088 entries_per_aggregator,
1089 sorted_file_offsets);
1090
1091
1092
1093
1094
1095 memory_displacements = (MPI_Aint *) malloc
1096 (entries_per_aggregator * sizeof(MPI_Aint));
1097
1098 memory_displacements[sorted_file_offsets[0]] = 0;
1099 for (i=1; i<entries_per_aggregator; i++){
1100 memory_displacements[sorted_file_offsets[i]] =
1101 memory_displacements[sorted_file_offsets[i-1]] +
1102 file_offsets_for_agg[sorted_file_offsets[i-1]].length;
1103 }
1104
1105 temp_disp_index = (int *)calloc (1, data->procs_per_group * sizeof (int));
1106 if (NULL == temp_disp_index) {
1107 opal_output (1, "OUT OF MEMORY\n");
1108 ret = OMPI_ERR_OUT_OF_RESOURCE;
1109 goto exit;
1110 }
1111
1112
1113 global_count = 0;
1114 for (i=0;i<entries_per_aggregator;i++){
1115 temp_pindex =
1116 file_offsets_for_agg[sorted_file_offsets[i]].process_id;
1117 data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
1118 memory_displacements[sorted_file_offsets[i]];
1119 if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex])
1120 temp_disp_index[temp_pindex] += 1;
1121 else{
1122 printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n",
1123 temp_pindex, temp_disp_index[temp_pindex],
1124 temp_pindex, data->disp_index[temp_pindex]);
1125 }
1126 global_count +=
1127 file_offsets_for_agg[sorted_file_offsets[i]].length;
1128 }
1129
1130 if (NULL != temp_disp_index){
1131 free(temp_disp_index);
1132 temp_disp_index = NULL;
1133 }
1134
1135 #if DEBUG_ON
1136
1137 printf("************Cycle: %d, Aggregator: %d ***************\n",
1138 index+1,rank);
1139 for (i=0;i<data->procs_per_group; i++){
1140 for(j=0;j<data->disp_index[i];j++){
1141 if (data->blocklen_per_process[i][j] > 0){
1142 printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
1143 data->procs_in_group[i],j,
1144 data->blocklen_per_process[i][j],j,
1145 data->displs_per_process[i][j],
1146 rank);
1147
1148 }
1149 }
1150 }
1151 printf("************Cycle: %d, Aggregator: %d ***************\n",
1152 index+1,rank);
1153 for (i=0; i<entries_per_aggregator;i++){
1154 printf("%d: OFFSET: %lld LENGTH: %ld, Mem-offset: %ld\n",
1155 file_offsets_for_agg[sorted_file_offsets[i]].process_id,
1156 file_offsets_for_agg[sorted_file_offsets[i]].offset,
1157 file_offsets_for_agg[sorted_file_offsets[i]].length,
1158 memory_displacements[sorted_file_offsets[i]]);
1159 }
1160 printf("%d : global_count : %ld, bytes_sent : %d\n",
1161 rank,global_count, bytes_sent);
1162 #endif
1163
1164
1165
1166
1167
1168
1169 for (i=0;i<data->procs_per_group; i++) {
1170 size_t datatype_size;
1171 reqs[i] = MPI_REQUEST_NULL;
1172 if ( 0 < data->disp_index[i] ) {
1173 ompi_datatype_create_hindexed(data->disp_index[i],
1174 data->blocklen_per_process[i],
1175 data->displs_per_process[i],
1176 MPI_BYTE,
1177 &data->recvtype[i]);
1178 ompi_datatype_commit(&data->recvtype[i]);
1179 opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size);
1180
1181 if (datatype_size){
1182 ret = MCA_PML_CALL(irecv(data->global_buf,
1183 1,
1184 data->recvtype[i],
1185 data->procs_in_group[i],
1186 FCOLL_VULCAN_SHUFFLE_TAG+index,
1187 data->comm,
1188 &reqs[i]));
1189 if (OMPI_SUCCESS != ret){
1190 goto exit;
1191 }
1192 }
1193 }
1194 }
1195 }
1196 }
1197
1198 if (bytes_sent) {
1199 size_t remaining = bytes_sent;
1200 int block_index = -1;
1201 int blocklength_size = INIT_LEN;
1202
1203 ptrdiff_t send_mem_address = 0;
1204 ompi_datatype_t *newType = MPI_DATATYPE_NULL;
1205 blocklength_proc = (int *) calloc (blocklength_size, sizeof (int));
1206 displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t));
1207
1208 if (NULL == blocklength_proc || NULL == displs_proc ) {
1209 opal_output (1, "OUT OF MEMORY\n");
1210 ret = OMPI_ERR_OUT_OF_RESOURCE;
1211 goto exit;
1212 }
1213
1214 while (remaining) {
1215 block_index++;
1216
1217 if(0 == block_index) {
1218 send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1219 data->current_position;
1220 }
1221 else {
1222
1223 if(0 == block_index % INIT_LEN) {
1224 blocklength_size += INIT_LEN;
1225 blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int));
1226 displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t));
1227 }
1228 displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1229 data->current_position - send_mem_address;
1230 }
1231
1232 if (remaining >=
1233 (data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
1234
1235 blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len -
1236 data->current_position;
1237 remaining = remaining -
1238 (data->decoded_iov[data->iov_index].iov_len - data->current_position);
1239 data->iov_index = data->iov_index + 1;
1240 data->current_position = 0;
1241 }
1242 else {
1243 blocklength_proc[block_index] = remaining;
1244 data->current_position += remaining;
1245 remaining = 0;
1246 }
1247 }
1248
1249 data->total_bytes_written += bytes_sent;
1250 data->bytes_sent = bytes_sent;
1251
1252 if ( 0 <= block_index ) {
1253 ompi_datatype_create_hindexed(block_index+1,
1254 blocklength_proc,
1255 displs_proc,
1256 MPI_BYTE,
1257 &newType);
1258 ompi_datatype_commit(&newType);
1259
1260 ret = MCA_PML_CALL(isend((char *)send_mem_address,
1261 1,
1262 newType,
1263 aggregator,
1264 FCOLL_VULCAN_SHUFFLE_TAG+index,
1265 MCA_PML_BASE_SEND_STANDARD,
1266 data->comm,
1267 &reqs[data->procs_per_group]));
1268 if ( MPI_DATATYPE_NULL != newType ) {
1269 ompi_datatype_destroy(&newType);
1270 }
1271 if (OMPI_SUCCESS != ret){
1272 goto exit;
1273 }
1274 }
1275 }
1276
1277 #if DEBUG_ON
1278 if (aggregator == rank){
1279 printf("************Cycle: %d, Aggregator: %d ***************\n",
1280 index+1,rank);
1281 for (i=0 ; i<global_count/4 ; i++)
1282 printf (" RECV %d \n",((int *)data->global_buf)[i]);
1283 }
1284 #endif
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294 if (aggregator == rank && entries_per_aggregator>0) {
1295
1296
1297 data->io_array = (mca_common_ompio_io_array_t *) malloc
1298 (entries_per_aggregator * sizeof (mca_common_ompio_io_array_t));
1299 if (NULL == data->io_array) {
1300 opal_output(1, "OUT OF MEMORY\n");
1301 ret = OMPI_ERR_OUT_OF_RESOURCE;
1302 goto exit;
1303 }
1304
1305 data->num_io_entries = 0;
1306
1307 data->io_array[0].offset =
1308 (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[0]].offset;
1309 data->io_array[0].length =
1310 file_offsets_for_agg[sorted_file_offsets[0]].length;
1311 data->io_array[0].memory_address =
1312 data->global_buf+memory_displacements[sorted_file_offsets[0]];
1313 data->num_io_entries++;
1314
1315 for (i=1;i<entries_per_aggregator;i++){
1316
1317
1318 if (file_offsets_for_agg[sorted_file_offsets[i-1]].offset +
1319 file_offsets_for_agg[sorted_file_offsets[i-1]].length ==
1320 file_offsets_for_agg[sorted_file_offsets[i]].offset){
1321 data->io_array[data->num_io_entries - 1].length +=
1322 file_offsets_for_agg[sorted_file_offsets[i]].length;
1323 }
1324 else {
1325 data->io_array[data->num_io_entries].offset =
1326 (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_file_offsets[i]].offset;
1327 data->io_array[data->num_io_entries].length =
1328 file_offsets_for_agg[sorted_file_offsets[i]].length;
1329 data->io_array[data->num_io_entries].memory_address =
1330 data->global_buf+memory_displacements[sorted_file_offsets[i]];
1331 data->num_io_entries++;
1332 }
1333
1334 }
1335
1336 #if DEBUG_ON
1337 printf("*************************** %d\n", num_of_io_entries);
1338 for (i=0 ; i<num_of_io_entries ; i++) {
1339 printf(" ADDRESS: %p OFFSET: %ld LENGTH: %ld\n",
1340 io_array[i].memory_address,
1341 (ptrdiff_t)io_array[i].offset,
1342 io_array[i].length);
1343 }
1344
1345 #endif
1346 }
1347
1348 exit:
1349 free(sorted_file_offsets);
1350 free(file_offsets_for_agg);
1351 free(memory_displacements);
1352 free(blocklength_proc);
1353 free(displs_proc);
1354
1355 return OMPI_SUCCESS;
1356 }
1357
1358 static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators, long *new_stripe_size)
1359 {
1360 long min, max, globalmin, globalmax;
1361 long stripe_size;
1362
1363 if ( iov_count > 0 ) {
1364 min = (long) iov[0].iov_base;
1365 max = ((long) iov[iov_count-1].iov_base + (long) iov[iov_count-1].iov_len);
1366 }
1367 else {
1368 min = 0;
1369 max = 0;
1370 }
1371 fh->f_comm->c_coll->coll_allreduce ( &min, &globalmin, 1, MPI_LONG, MPI_MIN,
1372 fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module);
1373
1374 fh->f_comm->c_coll->coll_allreduce ( &max, &globalmax, 1, MPI_LONG, MPI_MAX,
1375 fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module);
1376
1377
1378
1379 stripe_size = (globalmax - globalmin)/num_aggregators;
1380 if ( (globalmax - globalmin) % num_aggregators ) {
1381 stripe_size++;
1382 }
1383
1384 *new_stripe_size = stripe_size;
1385
1386
1387
1388 return OMPI_SUCCESS;
1389 }
1390
1391
1392
1393 int mca_fcoll_vulcan_break_file_view ( struct iovec *mem_iov, int mem_count,
1394 struct iovec *file_iov, int file_count,
1395 struct iovec ***ret_broken_mem_iovs, int **ret_broken_mem_counts,
1396 struct iovec ***ret_broken_file_iovs, int **ret_broken_file_counts,
1397 MPI_Aint **ret_broken_total_lengths,
1398 int stripe_count, size_t stripe_size)
1399 {
1400 int i, j, ret=OMPI_SUCCESS;
1401 struct iovec **broken_mem_iovs=NULL;
1402 int *broken_mem_counts=NULL;
1403 struct iovec **broken_file_iovs=NULL;
1404 int *broken_file_counts=NULL;
1405 MPI_Aint *broken_total_lengths=NULL;
1406 int **block=NULL, **max_lengths=NULL;
1407
1408 broken_mem_iovs = (struct iovec **) malloc ( stripe_count * sizeof(struct iovec *));
1409 broken_file_iovs = (struct iovec **) malloc ( stripe_count * sizeof(struct iovec *));
1410 if ( NULL == broken_mem_iovs || NULL == broken_file_iovs ) {
1411 ret = OMPI_ERR_OUT_OF_RESOURCE;
1412 goto exit;
1413 }
1414 for ( i=0; i<stripe_count; i++ ) {
1415 broken_mem_iovs[i] = (struct iovec*) calloc (1, sizeof(struct iovec ));
1416 broken_file_iovs[i] = (struct iovec*) calloc (1, sizeof(struct iovec ));
1417 }
1418
1419 broken_mem_counts = (int *) calloc ( stripe_count, sizeof(int));
1420 broken_file_counts = (int *) calloc ( stripe_count, sizeof(int));
1421 broken_total_lengths = (MPI_Aint *) calloc ( stripe_count, sizeof(MPI_Aint));
1422 if ( NULL == broken_mem_counts || NULL == broken_file_counts ||
1423 NULL == broken_total_lengths ) {
1424 ret = OMPI_ERR_OUT_OF_RESOURCE;
1425 goto exit;
1426 }
1427
1428 block = (int **) calloc ( stripe_count, sizeof(int *));
1429 max_lengths = (int **) calloc ( stripe_count, sizeof(int *));
1430 if ( NULL == block || NULL == max_lengths ) {
1431 ret = OMPI_ERR_OUT_OF_RESOURCE;
1432 goto exit;
1433 }
1434
1435 for ( i=0; i<stripe_count; i++ ){
1436 block[i] = (int *) malloc ( 5 * sizeof(int));
1437 max_lengths[i] = (int *) malloc ( 2 * sizeof(int));
1438 if ( NULL == block[i] || NULL == max_lengths[i]) {
1439 ret = OMPI_ERR_OUT_OF_RESOURCE;
1440 goto exit;
1441 }
1442 max_lengths[i][0] = 1;
1443 max_lengths[i][1] = 1;
1444
1445 for ( j=0; j<5; j++ ) {
1446 block[i][j]=2;
1447 }
1448 }
1449
1450
1451 int owner;
1452 size_t rest, len, temp_len, blocklen, memlen=0;
1453 off_t offset, temp_offset, start_offset, memoffset=0;
1454
1455 i=j=0;
1456
1457 if ( 0 < mem_count ) {
1458 memoffset = (off_t ) mem_iov[j].iov_base;
1459 memlen = mem_iov[j].iov_len;
1460 }
1461 while ( i < file_count) {
1462 offset = (off_t) file_iov[i].iov_base;
1463 len = file_iov[i].iov_len;
1464
1465
1466 #if DEBUG_ON
1467 printf("%d:file_iov[%d].base=%ld .len=%d\n", rank, i,
1468 file_iov[i].iov_base, file_iov[i].iov_len);
1469 #endif
1470 do {
1471 owner = (offset / stripe_size ) % stripe_count;
1472 start_offset = (offset / stripe_size );
1473 rest = (start_offset + 1) * stripe_size - offset;
1474
1475 if ( len >= rest ) {
1476 blocklen = rest;
1477 temp_offset = offset+rest;
1478 temp_len = len - rest;
1479 }
1480 else {
1481 blocklen = len;
1482 temp_offset = 0;
1483 temp_len = 0;
1484 }
1485
1486 broken_file_iovs[owner][broken_file_counts[owner]].iov_base = (void *)offset;
1487 broken_file_iovs[owner][broken_file_counts[owner]].iov_len = blocklen;
1488 #if DEBUG_ON
1489 printf("%d: owner=%d b_file_iovs[%d].base=%ld .len=%d \n", rank, owner,
1490 broken_file_counts[owner],
1491 broken_file_iovs[owner][broken_file_counts[owner]].iov_base,
1492 broken_file_iovs[owner][broken_file_counts[owner]].iov_len );
1493 #endif
1494 do {
1495 if ( memlen >= blocklen ) {
1496 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset;
1497 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len = blocklen;
1498 memoffset += blocklen;
1499 memlen -= blocklen;
1500 blocklen = 0;
1501
1502 if ( 0 == memlen ) {
1503 j++;
1504 if ( j < mem_count ) {
1505 memoffset = (off_t) mem_iov[j].iov_base;
1506 memlen = mem_iov[j].iov_len;
1507 }
1508 else
1509 break;
1510 }
1511 }
1512 else {
1513 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base = (void *) memoffset;
1514 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len = memlen;
1515 blocklen -= memlen;
1516
1517 j++;
1518 if ( j < mem_count ) {
1519 memoffset = (off_t) mem_iov[j].iov_base;
1520 memlen = mem_iov[j].iov_len;
1521 }
1522 else
1523 break;
1524 }
1525 #if DEBUG_ON
1526 printf("%d: owner=%d b_mem_iovs[%d].base=%ld .len=%d\n", rank, owner,
1527 broken_mem_counts[owner],
1528 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_base,
1529 broken_mem_iovs[owner][broken_mem_counts[owner]].iov_len);
1530 #endif
1531
1532 broken_mem_counts[owner]++;
1533 if ( broken_mem_counts[owner] >= max_lengths[owner][0] ) {
1534 broken_mem_iovs[owner] = (struct iovec*) realloc ( broken_mem_iovs[owner],
1535 mem_count * block[owner][0] *
1536 sizeof(struct iovec ));
1537 max_lengths[owner][0] = mem_count * block[owner][0];
1538 block[owner][0]++;
1539 }
1540
1541 } while ( blocklen > 0 );
1542
1543 broken_file_counts[owner]++;
1544 if ( broken_file_counts[owner] >= max_lengths[owner][1] ) {
1545 broken_file_iovs[owner] = (struct iovec*) realloc ( broken_file_iovs[owner],
1546 file_count * block[owner][1] *
1547 sizeof(struct iovec ));
1548 max_lengths[owner][1] = file_count * block[owner][1];
1549 block[owner][1]++;
1550 }
1551
1552 offset = temp_offset;
1553 len = temp_len;
1554 } while( temp_len > 0 );
1555
1556 i++;
1557 }
1558
1559
1560
1561 for ( i=0; i< stripe_count; i++ ) {
1562 for ( j=0; j<broken_file_counts[i]; j++ ) {
1563 broken_total_lengths[i] += broken_file_iovs[i][j].iov_len;
1564 }
1565 #if DEBUG_ON
1566 printf("%d: broken_total_lengths[%d] = %d\n", rank, i, broken_total_lengths[i]);
1567 #endif
1568 }
1569
1570 *ret_broken_mem_iovs = broken_mem_iovs;
1571 *ret_broken_mem_counts = broken_mem_counts;
1572 *ret_broken_file_iovs = broken_file_iovs;
1573 *ret_broken_file_counts = broken_file_counts;
1574 *ret_broken_total_lengths = broken_total_lengths;
1575
1576 if ( NULL != block) {
1577 for ( i=0; i<stripe_count; i++ ){
1578 free (block[i] );
1579 }
1580 free ( block);
1581 }
1582 if ( NULL != max_lengths) {
1583 for ( i=0; i<stripe_count; i++ ){
1584 free (max_lengths[i] );
1585 }
1586 free ( max_lengths);
1587 }
1588
1589 return ret;
1590
1591 exit:
1592 free ( broken_mem_iovs);
1593 free ( broken_mem_counts);
1594 free ( broken_file_iovs );
1595 free ( broken_file_counts);
1596 free ( broken_total_lengths);
1597
1598 if ( NULL != block) {
1599 for ( i=0; i<stripe_count; i++ ){
1600 free (block[i] );
1601 }
1602 free ( block);
1603 }
1604 if ( NULL != max_lengths) {
1605 for ( i=0; i<stripe_count; i++ ){
1606 free (max_lengths[i] );
1607 }
1608 free ( max_lengths);
1609 }
1610
1611 *ret_broken_mem_iovs = NULL;
1612 *ret_broken_mem_counts = NULL;
1613 *ret_broken_file_iovs = NULL;
1614 *ret_broken_file_counts = NULL;
1615 *ret_broken_total_lengths = NULL;
1616
1617 return ret;
1618 }
1619
1620
1621 int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, int num_groups,
1622 size_t max_data)
1623 {
1624 int i, ret;
1625 ret = mca_common_ompio_set_aggregator_props (fh, num_io_procs, max_data);
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636 fh->f_procs_per_group = fh->f_size;
1637 if ( NULL != fh->f_procs_in_group ) {
1638 free ( fh->f_procs_in_group );
1639 }
1640 fh->f_procs_in_group = (int *) malloc ( sizeof(int) * fh->f_size );
1641 if ( NULL == fh->f_procs_in_group) {
1642 return OMPI_ERR_OUT_OF_RESOURCE;
1643 }
1644 for (i=0; i<fh->f_size; i++ ) {
1645 fh->f_procs_in_group[i]=i;
1646 }
1647
1648 return ret;
1649 }
1650
1651
1652 int mca_fcoll_vulcan_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *io_array, int num_entries,
1653 int *ret_array_pos, int *ret_pos, int chunk_size )
1654 {
1655
1656 int array_pos = *ret_array_pos;
1657 int pos = *ret_pos;
1658 size_t bytes_written = 0;
1659 size_t bytes_to_write = chunk_size;
1660
1661 if ( 0 == array_pos && 0 == pos ) {
1662 fh->f_io_array = (mca_common_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_common_ompio_io_array_t));
1663 if ( NULL == fh->f_io_array ){
1664 opal_output (1,"Could not allocate memory\n");
1665 return -1;
1666 }
1667 }
1668
1669 int i=0;
1670 while (bytes_to_write > 0 ) {
1671 fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]);
1672 fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]);
1673
1674 if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) {
1675 fh->f_io_array[i].length = bytes_to_write;
1676 }
1677 else {
1678 fh->f_io_array[i].length = io_array[array_pos].length - pos;
1679 }
1680
1681 pos += fh->f_io_array[i].length;
1682 bytes_written += fh->f_io_array[i].length;
1683 bytes_to_write-= fh->f_io_array[i].length;
1684 i++;
1685
1686 if ( pos == (int)io_array[array_pos].length ) {
1687 pos = 0;
1688 if ((array_pos + 1) < num_entries) {
1689 array_pos++;
1690 }
1691 else {
1692 break;
1693 }
1694 }
1695 }
1696
1697 fh->f_num_of_io_entries = i;
1698 *ret_array_pos = array_pos;
1699 *ret_pos = pos;
1700 return bytes_written;
1701 }
1702
1703
1704 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
1705 int num_entries,
1706 int *sorted)
1707 {
1708 int i = 0;
1709 int j = 0;
1710 int left = 0;
1711 int right = 0;
1712 int largest = 0;
1713 int heap_size = num_entries - 1;
1714 int temp = 0;
1715 unsigned char done = 0;
1716 int* temp_arr = NULL;
1717
1718 temp_arr = (int*)malloc(num_entries*sizeof(int));
1719 if (NULL == temp_arr) {
1720 opal_output (1, "OUT OF MEMORY\n");
1721 return OMPI_ERR_OUT_OF_RESOURCE;
1722 }
1723 temp_arr[0] = 0;
1724 for (i = 1; i < num_entries; ++i) {
1725 temp_arr[i] = i;
1726 }
1727
1728 for (i = num_entries/2-1 ; i>=0 ; i--) {
1729 done = 0;
1730 j = i;
1731 largest = j;
1732
1733 while (!done) {
1734 left = j*2+1;
1735 right = j*2+2;
1736 if ((left <= heap_size) &&
1737 (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
1738 largest = left;
1739 }
1740 else {
1741 largest = j;
1742 }
1743 if ((right <= heap_size) &&
1744 (io_array[temp_arr[right]].offset >
1745 io_array[temp_arr[largest]].offset)) {
1746 largest = right;
1747 }
1748 if (largest != j) {
1749 temp = temp_arr[largest];
1750 temp_arr[largest] = temp_arr[j];
1751 temp_arr[j] = temp;
1752 j = largest;
1753 }
1754 else {
1755 done = 1;
1756 }
1757 }
1758 }
1759
1760 for (i = num_entries-1; i >=1; --i) {
1761 temp = temp_arr[0];
1762 temp_arr[0] = temp_arr[i];
1763 temp_arr[i] = temp;
1764 heap_size--;
1765 done = 0;
1766 j = 0;
1767 largest = j;
1768
1769 while (!done) {
1770 left = j*2+1;
1771 right = j*2+2;
1772
1773 if ((left <= heap_size) &&
1774 (io_array[temp_arr[left]].offset >
1775 io_array[temp_arr[j]].offset)) {
1776 largest = left;
1777 }
1778 else {
1779 largest = j;
1780 }
1781 if ((right <= heap_size) &&
1782 (io_array[temp_arr[right]].offset >
1783 io_array[temp_arr[largest]].offset)) {
1784 largest = right;
1785 }
1786 if (largest != j) {
1787 temp = temp_arr[largest];
1788 temp_arr[largest] = temp_arr[j];
1789 temp_arr[j] = temp;
1790 j = largest;
1791 }
1792 else {
1793 done = 1;
1794 }
1795 }
1796 sorted[i] = temp_arr[i];
1797 }
1798 sorted[0] = temp_arr[0];
1799
1800 if (NULL != temp_arr) {
1801 free(temp_arr);
1802 temp_arr = NULL;
1803 }
1804 return OMPI_SUCCESS;
1805 }
1806
1807