This source file includes the following definitions.
- ompi_io_ompio_generate_current_file_view
- ompi_io_ompio_sort_offlen
- mca_io_ompio_get_mca_parameter_value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 #include "ompi_config.h"
26
27 #include "ompi/runtime/params.h"
28 #include "ompi/communicator/communicator.h"
29 #include "ompi/mca/pml/pml.h"
30 #include "ompi/mca/topo/topo.h"
31 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
32 #include "opal/datatype/opal_convertor.h"
33 #include "opal/datatype/opal_datatype.h"
34 #include "ompi/datatype/ompi_datatype.h"
35 #include "ompi/info/info.h"
36 #include "ompi/request/request.h"
37
38 #include <math.h>
39 #include <unistd.h>
40
41 #include "io_ompio.h"
42
43
44 int ompi_io_ompio_generate_current_file_view (struct ompio_file_t *fh,
45 size_t max_data,
46 struct iovec **f_iov,
47 int *iov_count)
48 {
49
50 struct iovec *iov = NULL;
51 size_t bytes_to_write;
52 size_t sum_previous_counts = 0;
53 int j, k;
54 int block = 1;
55
56
57 iov = (struct iovec *) calloc
58 (OMPIO_IOVEC_INITIAL_SIZE, sizeof (struct iovec));
59 if (NULL == iov) {
60 opal_output(1, "OUT OF MEMORY\n");
61 return OMPI_ERR_OUT_OF_RESOURCE;
62 }
63
64 sum_previous_counts = fh->f_position_in_file_view;
65 j = fh->f_index_in_file_view;
66 bytes_to_write = max_data;
67 k = 0;
68
69 while (bytes_to_write) {
70 ptrdiff_t disp;
71
72 if (OMPIO_IOVEC_INITIAL_SIZE*block <= k) {
73 block ++;
74 iov = (struct iovec *)realloc
75 (iov, OMPIO_IOVEC_INITIAL_SIZE *block *sizeof(struct iovec));
76 if (NULL == iov) {
77 opal_output(1, "OUT OF MEMORY\n");
78 return OMPI_ERR_OUT_OF_RESOURCE;
79 }
80 }
81
82 if (fh->f_decoded_iov[j].iov_len -
83 (fh->f_total_bytes - sum_previous_counts) <= 0) {
84 sum_previous_counts += fh->f_decoded_iov[j].iov_len;
85 j = j + 1;
86 if (j == (int)fh->f_iov_count) {
87 j = 0;
88 sum_previous_counts = 0;
89 fh->f_offset += fh->f_view_extent;
90 fh->f_position_in_file_view = sum_previous_counts;
91 fh->f_index_in_file_view = j;
92 fh->f_total_bytes = 0;
93 }
94 }
95
96 disp = (ptrdiff_t)(fh->f_decoded_iov[j].iov_base) +
97 (fh->f_total_bytes - sum_previous_counts);
98 iov[k].iov_base = (IOVBASE_TYPE *)(intptr_t)(disp + fh->f_offset);
99
100 if ((fh->f_decoded_iov[j].iov_len -
101 (fh->f_total_bytes - sum_previous_counts))
102 >= bytes_to_write) {
103 iov[k].iov_len = bytes_to_write;
104 }
105 else {
106 iov[k].iov_len = fh->f_decoded_iov[j].iov_len -
107 (fh->f_total_bytes - sum_previous_counts);
108 }
109
110 fh->f_total_bytes += iov[k].iov_len;
111 bytes_to_write -= iov[k].iov_len;
112 k = k + 1;
113 }
114 fh->f_position_in_file_view = sum_previous_counts;
115 fh->f_index_in_file_view = j;
116 *iov_count = k;
117 *f_iov = iov;
118
119 if (mca_io_ompio_record_offset_info){
120
121 int tot_entries=0, *recvcounts=NULL, *displs=NULL;
122 mca_io_ompio_offlen_array_t *per_process=NULL;
123 mca_io_ompio_offlen_array_t *all_process=NULL;
124 int *sorted=NULL, *column_list=NULL, *values=NULL;
125 int *row_index=NULL, i=0, l=0, m=0;
126 int column_index=0, r_index=0;
127 int blocklen[3] = {1, 1, 1};
128 ptrdiff_t d[3], base;
129 ompi_datatype_t *types[3];
130 ompi_datatype_t *io_array_type=MPI_DATATYPE_NULL;
131 int **adj_matrix=NULL;
132 FILE *fp;
133
134
135 recvcounts = (int *) malloc (fh->f_size * sizeof(int));
136 if (NULL == recvcounts){
137 return OMPI_ERR_OUT_OF_RESOURCE;
138 }
139 displs = (int *) malloc (fh->f_size * sizeof(int));
140 if (NULL == displs){
141 free(recvcounts);
142 return OMPI_ERR_OUT_OF_RESOURCE;
143 }
144
145 fh->f_comm->c_coll->coll_gather (&k,
146 1,
147 MPI_INT,
148 recvcounts,
149 1,
150 MPI_INT,
151 OMPIO_ROOT,
152 fh->f_comm,
153 fh->f_comm->c_coll->coll_gather_module);
154
155 per_process = (mca_io_ompio_offlen_array_t *)
156 malloc (k * sizeof(mca_io_ompio_offlen_array_t));
157 if (NULL == per_process){
158 opal_output(1,"Error while allocating per process!\n");
159 free(recvcounts);
160 free(displs);
161 return OMPI_ERR_OUT_OF_RESOURCE;
162 }
163 for (i=0;i<k;i++){
164 per_process[i].offset =
165 (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[i].iov_base;
166 per_process[i].length =
167 (MPI_Aint)iov[i].iov_len;
168 per_process[i].process_id = fh->f_rank;
169 }
170
171 types[0] = &ompi_mpi_long.dt;
172 types[1] = &ompi_mpi_long.dt;
173 types[2] = &ompi_mpi_int.dt;
174
175 d[0] = (ptrdiff_t)&per_process[0];
176 d[1] = (ptrdiff_t)&per_process[0].length;
177 d[2] = (ptrdiff_t)&per_process[0].process_id;
178 base = d[0];
179 for (i=0;i<3;i++){
180 d[i] -= base;
181 }
182 ompi_datatype_create_struct (3,
183 blocklen,
184 d,
185 types,
186 &io_array_type);
187 ompi_datatype_commit (&io_array_type);
188
189 if (OMPIO_ROOT == fh->f_rank){
190 tot_entries = recvcounts[0];
191 displs[0] = 0;
192 for(i=1;i<fh->f_size;i++){
193 displs[i] = displs[i-1] + recvcounts[i-1];
194 tot_entries += recvcounts[i];
195 }
196 all_process = (mca_io_ompio_offlen_array_t *)
197 malloc (tot_entries * sizeof(mca_io_ompio_offlen_array_t));
198 if (NULL == all_process){
199 opal_output(1,"Error while allocating per process!\n");
200 free(per_process);
201 free(recvcounts);
202 free(displs);
203 return OMPI_ERR_OUT_OF_RESOURCE;
204 }
205
206 sorted = (int *) malloc
207 (tot_entries * sizeof(int));
208 if (NULL == sorted){
209 opal_output(1,"Error while allocating per process!\n");
210 free(all_process);
211 free(per_process);
212 free(recvcounts);
213 free(displs);
214 return OMPI_ERR_OUT_OF_RESOURCE;
215 }
216
217 adj_matrix = (int **) malloc (fh->f_size *
218 sizeof(int *));
219 if (NULL == adj_matrix) {
220 opal_output(1,"Error while allocating per process!\n");
221 free(sorted);
222 free(all_process);
223 free(per_process);
224 free(recvcounts);
225 free(displs);
226 return OMPI_ERR_OUT_OF_RESOURCE;
227 }
228 for (i=0;i<fh->f_size;i++){
229 adj_matrix[i] = (int *) malloc (fh->f_size *
230 sizeof (int ));
231 if (NULL == adj_matrix[i]) {
232 for (j=0; j<i; j++) {
233 free(adj_matrix[j]);
234 }
235 free(adj_matrix);
236 free(sorted);
237 free(all_process);
238 free(per_process);
239 free(recvcounts);
240 free(displs);
241 return OMPI_ERR_OUT_OF_RESOURCE;
242 }
243 }
244
245 for (i=0;i<fh->f_size;i++){
246 for (j=0;j<fh->f_size;j++){
247 adj_matrix[i][j] = 0;
248 }
249 }
250 }
251 fh->f_comm->c_coll->coll_gatherv (per_process,
252 k,
253 io_array_type,
254 all_process,
255 recvcounts,
256 displs,
257 io_array_type,
258 OMPIO_ROOT,
259 fh->f_comm,
260 fh->f_comm->c_coll->coll_gatherv_module);
261
262 ompi_datatype_destroy(&io_array_type);
263
264 if (OMPIO_ROOT == fh->f_rank){
265
266 ompi_io_ompio_sort_offlen(all_process,
267 tot_entries,
268 sorted);
269
270 for (i=0;i<tot_entries-1;i++){
271 j = all_process[sorted[i]].process_id;
272 l = all_process[sorted[i+1]].process_id;
273 adj_matrix[j][l] += 1;
274 adj_matrix[l][j] += 1;
275 }
276
277
278 m = 0;
279 for (i=0; i<fh->f_size; i++){
280 for (j=0; j<fh->f_size; j++){
281 if (adj_matrix[i][j] > 0){
282 m++;
283 }
284 }
285 }
286 fp = fopen("fileview_info.out", "w+");
287 if ( NULL == fp ) {
288 for (i=0; i<fh->f_size; i++) {
289 free(adj_matrix[i]);
290 }
291 free(adj_matrix);
292 free(sorted);
293 free(all_process);
294 free(per_process);
295 free(recvcounts);
296 free(displs);
297 return MPI_ERR_OTHER;
298 }
299 fprintf(fp,"FILEVIEW\n");
300 column_list = (int *) malloc ( m * sizeof(int));
301 if (NULL == column_list){
302 opal_output(1,"Error while allocating column list\n");
303 fclose(fp);
304 for (i=0; i<fh->f_size; i++) {
305 free(adj_matrix[i]);
306 }
307 free(adj_matrix);
308 free(sorted);
309 free(all_process);
310 free(per_process);
311 free(recvcounts);
312 free(displs);
313 return OMPI_ERR_OUT_OF_RESOURCE;
314 }
315 values = (int *) malloc ( m * sizeof(int));
316 if (NULL == values){
317 opal_output(1,"Error while allocating values list\n");
318 fclose(fp);
319 for (i=0; i<fh->f_size; i++) {
320 free(adj_matrix[i]);
321 }
322 free(adj_matrix);
323 free(column_list);
324 free(sorted);
325 free(all_process);
326 free(per_process);
327 free(recvcounts);
328 free(displs);
329 return OMPI_ERR_OUT_OF_RESOURCE;
330 }
331
332 row_index = (int *) malloc ((fh->f_size + 1) *
333 sizeof(int));
334 if (NULL == row_index){
335 opal_output(1,"Error while allocating row_index list\n");
336 fclose(fp);
337 for (i=0; i<fh->f_size; i++) {
338 free(adj_matrix[i]);
339 }
340 free(adj_matrix);
341 free(values);
342 free(column_list);
343 free(sorted);
344 free(all_process);
345 free(per_process);
346 free(recvcounts);
347 free(displs);
348 return OMPI_ERR_OUT_OF_RESOURCE;
349 }
350 fprintf(fp,"%d %d\n", m, fh->f_size+1);
351 column_index = 0;
352 r_index = 1;
353 row_index[0] = r_index;
354 for (i=0; i<fh->f_size; i++){
355 for (j=0; j<fh->f_size; j++){
356 if (adj_matrix[i][j] > 0){
357 values[column_index]= adj_matrix[i][j];
358 column_list[column_index]= j;
359 fprintf(fp,"%d ", column_list[column_index]);
360 column_index++;
361 r_index++;
362 }
363
364 }
365 row_index[i+1]= r_index;
366 }
367
368 fprintf(fp,"\n");
369 for (i=0; i<m;i++){
370 fprintf(fp, "%d ", values[i]);
371 }
372 fprintf(fp, "\n");
373 for (i=0; i< (fh->f_size + 1); i++){
374 fprintf(fp, "%d ", row_index[i]);
375 }
376 fprintf(fp, "\n");
377 fclose(fp);
378
379 if (NULL != recvcounts){
380 free(recvcounts);
381 recvcounts = NULL;
382 }
383 if (NULL != displs){
384 free(displs);
385 displs = NULL;
386 }
387 if (NULL != sorted){
388 free(sorted);
389 sorted = NULL;
390 }
391 if (NULL != per_process){
392 free(per_process);
393 per_process = NULL;
394 }
395 if (NULL != all_process){
396 free(all_process);
397 all_process = NULL;
398 }
399 free(column_list);
400 free(values);
401 if (NULL != row_index){
402 free(row_index);
403 row_index = NULL;
404 }
405 if (NULL != adj_matrix){
406 for (i=0;i<fh->f_size;i++){
407 free(adj_matrix[i]);
408 }
409 free(adj_matrix);
410 adj_matrix = NULL;
411 }
412 }
413 }
414 return OMPI_SUCCESS;
415 }
416
417
418 int ompi_io_ompio_sort_offlen (mca_io_ompio_offlen_array_t *io_array,
419 int num_entries,
420 int *sorted){
421
422 int i = 0;
423 int j = 0;
424 int left = 0;
425 int right = 0;
426 int largest = 0;
427 int heap_size = num_entries - 1;
428 int temp = 0;
429 unsigned char done = 0;
430 int* temp_arr = NULL;
431
432 temp_arr = (int*)malloc(num_entries*sizeof(int));
433 if (NULL == temp_arr) {
434 opal_output (1, "OUT OF MEMORY\n");
435 return OMPI_ERR_OUT_OF_RESOURCE;
436 }
437 temp_arr[0] = 0;
438 for (i = 1; i < num_entries; ++i) {
439 temp_arr[i] = i;
440 }
441
442 for (i = num_entries/2-1 ; i>=0 ; i--) {
443 done = 0;
444 j = i;
445 largest = j;
446
447 while (!done) {
448 left = j*2+1;
449 right = j*2+2;
450 if ((left <= heap_size) &&
451 (io_array[temp_arr[left]].offset > io_array[temp_arr[j]].offset)) {
452 largest = left;
453 }
454 else {
455 largest = j;
456 }
457 if ((right <= heap_size) &&
458 (io_array[temp_arr[right]].offset >
459 io_array[temp_arr[largest]].offset)) {
460 largest = right;
461 }
462 if (largest != j) {
463 temp = temp_arr[largest];
464 temp_arr[largest] = temp_arr[j];
465 temp_arr[j] = temp;
466 j = largest;
467 }
468 else {
469 done = 1;
470 }
471 }
472 }
473
474 for (i = num_entries-1; i >=1; --i) {
475 temp = temp_arr[0];
476 temp_arr[0] = temp_arr[i];
477 temp_arr[i] = temp;
478 heap_size--;
479 done = 0;
480 j = 0;
481 largest = j;
482
483 while (!done) {
484 left = j*2+1;
485 right = j*2+2;
486
487 if ((left <= heap_size) &&
488 (io_array[temp_arr[left]].offset >
489 io_array[temp_arr[j]].offset)) {
490 largest = left;
491 }
492 else {
493 largest = j;
494 }
495 if ((right <= heap_size) &&
496 (io_array[temp_arr[right]].offset >
497 io_array[temp_arr[largest]].offset)) {
498 largest = right;
499 }
500 if (largest != j) {
501 temp = temp_arr[largest];
502 temp_arr[largest] = temp_arr[j];
503 temp_arr[j] = temp;
504 j = largest;
505 }
506 else {
507 done = 1;
508 }
509 }
510 sorted[i] = temp_arr[i];
511 }
512 sorted[0] = temp_arr[0];
513
514 if (NULL != temp_arr) {
515 free(temp_arr);
516 temp_arr = NULL;
517 }
518 return OMPI_SUCCESS;
519 }
520
521
522 int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_length )
523 {
524 if ( !strncmp ( mca_parameter_name, "verbose_info_parsing", name_length )) {
525 return mca_io_ompio_verbose_info_parsing;
526 }
527 else if ( !strncmp ( mca_parameter_name, "num_aggregators", name_length )) {
528 return mca_io_ompio_num_aggregators;
529 }
530 else if ( !strncmp ( mca_parameter_name, "bytes_per_agg", name_length )) {
531 return mca_io_ompio_bytes_per_agg;
532 }
533 else if ( !strncmp ( mca_parameter_name, "overwrite_amode", name_length )) {
534 return mca_io_ompio_overwrite_amode;
535 }
536 else if ( !strncmp ( mca_parameter_name, "cycle_buffer_size", name_length )) {
537 return mca_io_ompio_cycle_buffer_size;
538 }
539 else if ( !strncmp ( mca_parameter_name, "max_aggregators_ratio", name_length )) {
540 return mca_io_ompio_max_aggregators_ratio;
541 }
542 else if ( !strncmp ( mca_parameter_name, "aggregators_cutoff_threshold", name_length )) {
543 return mca_io_ompio_aggregators_cutoff_threshold;
544 }
545 else if ( !strncmp ( mca_parameter_name, "grouping_option", name_length )) {
546 return mca_io_ompio_grouping_option;
547 }
548 else if ( !strncmp ( mca_parameter_name, "coll_timing_info", name_length )) {
549 return mca_io_ompio_coll_timing_info;
550 }
551 else {
552 opal_output (1, "Error in mca_io_ompio_get_mca_parameter_value: unknown parameter name");
553 }
554
555
556
557
558
559 return OMPI_ERR_MAX;
560 }
561
562
563
564
565