This source file includes following definitions.
- Usage
- fatal_error
- print_hints
- fill_buffer
- get_offset
- write_file
- reduce_corruptions
- read_file
- set_hints
- main
1
2
3
4
5
6
7
8
9
10 #define _XOPEN_SOURCE 500
11 #include <unistd.h>
12 #include <stdlib.h>
13 #include <mpi.h>
14 #include <stdio.h>
15 #include <string.h>
16
17 #define NUM_OBJS 4
18 #define OBJ_SIZE 1048576
19
20 extern char *optarg;
21 extern int optind, opterr, optopt;
22
23
24 char *prog = NULL;
25 int debug = 0;
26
27 static void
28 Usage( int line ) {
29 int rank;
30 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
31 if ( rank == 0 ) {
32 fprintf( stderr,
33 "Usage (line %d): %s [-d] [-h] -f filename\n"
34 "\t-d for debugging\n"
35 "\t-h to turn on the hints to force collective aggregation\n",
36 line, prog );
37 }
38 exit( 0 );
39 }
40
41 static void
42 fatal_error( int mpi_ret, MPI_Status *mpi_stat, const char *msg ) {
43 fprintf( stderr, "Fatal error %s: %d\n", msg, mpi_ret );
44 MPI_Abort( MPI_COMM_WORLD, -1 );
45 }
46
47 static void
48 print_hints( int rank, MPI_File *mfh ) {
49 MPI_Info info;
50 int nkeys;
51 int i, dummy_int;
52 char key[1024];
53 char value[1024];
54
55 MPI_Barrier( MPI_COMM_WORLD );
56 if ( rank == 0 ) {
57 MPI_File_get_info( *mfh, &info );
58 MPI_Info_get_nkeys( info, &nkeys );
59
60 printf( "HINTS:\n" );
61 for( i = 0; i < nkeys; i++ ) {
62 MPI_Info_get_nthkey( info, i, key );
63 printf( "%35s -> ", key );
64 MPI_Info_get( info, key, 1024, value, &dummy_int );
65 printf( "%s\n", value );
66 }
67 MPI_Info_free(&info);
68 }
69 MPI_Barrier( MPI_COMM_WORLD );
70 }
71
72 static void
73 fill_buffer( char *buffer, int bufsize, int rank, MPI_Offset offset ) {
74 memset( (void*)buffer, 0, bufsize );
75 snprintf( buffer, bufsize, "Hello from %d at %lld\n", rank, offset );
76 }
77
78 static MPI_Offset
79 get_offset( int rank, int num_objs, int obj_size, int which_obj ) {
80 MPI_Offset offset;
81 offset = (MPI_Offset)rank * num_objs * obj_size + which_obj * obj_size;
82 return offset;
83 }
84
85 static void
86 write_file( char *target, int rank, MPI_Info *info ) {
87 MPI_File wfh;
88 MPI_Status mpi_stat;
89 int mpi_ret;
90 int i;
91 char *buffer;
92
93 buffer = malloc(OBJ_SIZE);
94
95 if ( debug ) printf( "%d writing file %s\n", rank, target );
96
97 if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target,
98 MPI_MODE_WRONLY | MPI_MODE_CREATE, *info, &wfh ) )
99 != MPI_SUCCESS )
100 {
101 fatal_error( mpi_ret, NULL, "open for write" );
102 }
103
104 for( i = 0; i < NUM_OBJS; i++ ) {
105 MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i );
106 fill_buffer( buffer, OBJ_SIZE, rank, offset );
107 if ( debug ) printf( "%s", buffer );
108 if ( (mpi_ret = MPI_File_write_at_all( wfh, offset, buffer, OBJ_SIZE,
109 MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS )
110 {
111 fatal_error( mpi_ret, &mpi_stat, "write" );
112 }
113 }
114
115 if ( debug ) print_hints( rank, &wfh );
116
117 if( (mpi_ret = MPI_File_close( &wfh ) ) != MPI_SUCCESS ) {
118 fatal_error( mpi_ret, NULL, "close for write" );
119 }
120 if ( debug ) printf( "%d wrote file %s\n", rank, target );
121 free(buffer);
122 }
123
124 static int
125 reduce_corruptions( int corrupt_blocks ) {
126 int mpi_ret;
127 int sum;
128 if ( ( mpi_ret = MPI_Reduce( &corrupt_blocks, &sum, 1,
129 MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD ) ) != MPI_SUCCESS )
130 {
131 fatal_error( mpi_ret, NULL, "MPI_Reduce" );
132 }
133 return sum;
134 }
135
136 static void
137 read_file( char *target, int rank, MPI_Info *info, int *corrupt_blocks ) {
138 MPI_File rfh;
139 MPI_Status mpi_stat;
140 int mpi_ret;
141 int i;
142 char *buffer;
143 char *verify_buf = NULL;
144 buffer = malloc(OBJ_SIZE);
145 verify_buf = (char *)malloc(OBJ_SIZE);
146
147 if ( debug ) printf( "%d reading file %s\n", rank, target );
148
149 if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target,
150 MPI_MODE_RDONLY, *info, &rfh ) ) != MPI_SUCCESS )
151 {
152 fatal_error( mpi_ret, NULL, "open for read" );
153 }
154
155 for( i = 0; i < NUM_OBJS; i++ ) {
156 MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i );
157 fill_buffer( verify_buf, OBJ_SIZE, rank, offset );
158 if ( debug ) printf( "Expecting %s", buffer );
159 if ( (mpi_ret = MPI_File_read_at_all( rfh, offset, buffer, OBJ_SIZE,
160 MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS )
161 {
162 fatal_error( mpi_ret, &mpi_stat, "read" );
163 }
164 if ( memcmp( verify_buf, buffer, OBJ_SIZE ) != 0 ) {
165 (*corrupt_blocks)++;
166 printf( "Corruption at %lld\n", offset );
167 if ( debug ) {
168 printf( "\tExpecting %s\n"
169 "\tRecieved %s\n",
170 verify_buf, buffer );
171 }
172 }
173 }
174
175 if( (mpi_ret = MPI_File_close( &rfh ) ) != MPI_SUCCESS ) {
176 fatal_error( mpi_ret, NULL, "close for read" );
177 }
178 free (buffer);
179 free(verify_buf);
180
181 }
182
183 static void
184 set_hints( MPI_Info *info ) {
185 MPI_Info_set( *info, "romio_cb_write", "enable" );
186 MPI_Info_set( *info, "romio_no_indep_rw", "1" );
187 MPI_Info_set( *info, "cb_nodes", "1" );
188 MPI_Info_set( *info, "cb_buffer_size", "4194304" );
189 }
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 int
212 main( int argc, char *argv[] ) {
213 int nproc = 1, rank = 0;
214 char *target = NULL;
215 int c;
216 MPI_Info info;
217 int mpi_ret;
218 int corrupt_blocks = 0;
219
220 MPI_Init( &argc, &argv );
221 MPI_Comm_size(MPI_COMM_WORLD, &nproc);
222 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
223
224 if( (mpi_ret = MPI_Info_create(&info)) != MPI_SUCCESS) {
225 if(rank == 0) fatal_error( mpi_ret, NULL, "MPI_info_create.\n");
226 }
227
228 prog = strdup( argv[0] );
229
230 while( ( c = getopt( argc, argv, "df:h" ) ) != EOF ) {
231 switch( c ) {
232 case 'd':
233 debug = 1;
234 break;
235 case 'f':
236 target = strdup( optarg );
237 break;
238 case 'h':
239 set_hints( &info );
240 break;
241 default:
242 Usage( __LINE__ );
243 }
244 }
245 if ( ! target ) {
246 Usage( __LINE__ );
247 }
248
249 write_file( target, rank, &info );
250 read_file( target, rank, &info, &corrupt_blocks );
251
252 corrupt_blocks = reduce_corruptions( corrupt_blocks );
253 if ( rank == 0 ) {
254 if (corrupt_blocks == 0) {
255 fprintf(stdout, " No Errors\n");
256 } else {
257 fprintf(stdout, "%d/%d blocks corrupt\n",
258 corrupt_blocks, nproc * NUM_OBJS );
259 }
260 }
261 MPI_Info_free(&info);
262
263 MPI_Finalize();
264 free(prog);
265 exit( 0 );
266 }