root/ompi/mca/io/romio321/romio/test/aggregation1.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. Usage
  2. fatal_error
  3. print_hints
  4. fill_buffer
  5. get_offset
  6. write_file
  7. reduce_corruptions
  8. read_file
  9. set_hints
  10. main

   1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
   2 /*  
   3  *  (C) 2007 by Argonne National Laboratory.
   4  *      See COPYRIGHT in top-level directory.
   5  */
   6 
   7 /* Test case from John Bent (ROMIO req #835)
   8  * Aggregation code was not handling certain access patterns when collective
   9  * buffering was forced */
  10 #define _XOPEN_SOURCE 500 /* strdup not in string.h otherwise */
  11 #include <unistd.h>
  12 #include <stdlib.h>
  13 #include <mpi.h>
  14 #include <stdio.h>
  15 #include <string.h>
  16 
  17 #define NUM_OBJS 4 /* number of objects each rank writes and then reads back */
  18 #define OBJ_SIZE 1048576 /* size in bytes of one object (and of each I/O buffer) */
  19 
     /* getopt() state; main() reads optarg for the -f argument. */
  20 extern char *optarg;
  21 extern int optind, opterr, optopt;
  22 
  23 
     /* Copy of argv[0] made in main(); printed by Usage(). */
  24 char *prog = NULL;
     /* Set to 1 by the -d option; enables the extra progress/diagnostic printf()s. */
  25 int  debug = 0;
  26 
  27 static void
  28 Usage( int line ) {
  29     int rank;
  30     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  31     if ( rank == 0 ) {
  32         fprintf( stderr, 
  33                 "Usage (line %d): %s [-d] [-h] -f filename\n"
  34                 "\t-d for debugging\n"
  35                 "\t-h to turn on the hints to force collective aggregation\n",
  36                 line, prog );
  37     }
  38     exit( 0 );
  39 }
  40 
  41 static void
  42 fatal_error( int mpi_ret, MPI_Status *mpi_stat, const char *msg ) {
  43     fprintf( stderr, "Fatal error %s: %d\n", msg, mpi_ret );
  44     MPI_Abort( MPI_COMM_WORLD, -1 );
  45 }
  46 
  47 static void
  48 print_hints( int rank, MPI_File *mfh ) {
  49     MPI_Info info;
  50     int nkeys;
  51     int i, dummy_int;
  52     char key[1024];
  53     char value[1024];
  54 
  55     MPI_Barrier( MPI_COMM_WORLD );
  56     if ( rank == 0 ) {
  57         MPI_File_get_info( *mfh, &info );
  58         MPI_Info_get_nkeys( info, &nkeys );
  59 
  60         printf( "HINTS:\n" );
  61         for( i = 0; i < nkeys; i++ ) {
  62             MPI_Info_get_nthkey( info, i, key );
  63             printf( "%35s -> ", key );
  64             MPI_Info_get( info, key, 1024, value, &dummy_int ); 
  65             printf( "%s\n", value );
  66         }
  67         MPI_Info_free(&info);
  68     }
  69     MPI_Barrier( MPI_COMM_WORLD );
  70 }
  71 
  72 static void
  73 fill_buffer( char *buffer, int bufsize, int rank, MPI_Offset offset ) {
  74     memset( (void*)buffer, 0, bufsize );
  75     snprintf( buffer, bufsize, "Hello from %d at %lld\n", rank, offset );
  76 }
  77 
  78 static MPI_Offset
  79 get_offset( int rank, int num_objs, int obj_size, int which_obj ) {
  80     MPI_Offset offset;
  81     offset = (MPI_Offset)rank * num_objs * obj_size + which_obj * obj_size;
  82     return offset;
  83 }
  84 
  85 static void
  86 write_file( char *target, int rank, MPI_Info *info ) {
  87     MPI_File wfh;
  88     MPI_Status mpi_stat;
  89     int mpi_ret;
  90     int i;
  91     char *buffer;
  92 
  93     buffer = malloc(OBJ_SIZE);
  94 
  95     if ( debug ) printf( "%d writing file %s\n", rank, target );
  96     
  97     if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target, 
  98                     MPI_MODE_WRONLY | MPI_MODE_CREATE, *info, &wfh ) )
  99                 != MPI_SUCCESS ) 
 100     {
 101         fatal_error( mpi_ret, NULL, "open for write" );
 102     }
 103 
 104     for( i = 0; i < NUM_OBJS; i++ ) {
 105         MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i );
 106         fill_buffer( buffer, OBJ_SIZE, rank, offset );
 107         if ( debug ) printf( "%s", buffer );
 108         if ( (mpi_ret = MPI_File_write_at_all( wfh, offset, buffer, OBJ_SIZE,
 109                         MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS ) 
 110         {
 111             fatal_error( mpi_ret, &mpi_stat, "write" );
 112         }
 113     }
 114 
 115     if ( debug ) print_hints( rank, &wfh );
 116 
 117     if( (mpi_ret = MPI_File_close( &wfh ) ) != MPI_SUCCESS ) {
 118         fatal_error( mpi_ret, NULL, "close for write" );
 119     }
 120     if ( debug ) printf( "%d wrote file %s\n", rank, target );
 121     free(buffer);
 122 }
 123 
 124 static int
 125 reduce_corruptions( int corrupt_blocks ) {
 126     int mpi_ret;
 127     int sum;
 128     if ( ( mpi_ret = MPI_Reduce( &corrupt_blocks, &sum, 1, 
 129                     MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD ) ) != MPI_SUCCESS )
 130     {
 131         fatal_error( mpi_ret, NULL, "MPI_Reduce" );
 132     }
 133     return sum;
 134 }
 135 
 136 static void
 137 read_file( char *target, int rank, MPI_Info *info, int *corrupt_blocks ) {
 138     MPI_File rfh;
 139     MPI_Status mpi_stat;
 140     int mpi_ret;
 141     int i;
 142     char *buffer;
 143     char *verify_buf = NULL;
 144     buffer = malloc(OBJ_SIZE);
 145     verify_buf = (char *)malloc(OBJ_SIZE);
 146 
 147     if ( debug ) printf( "%d reading file %s\n", rank, target );
 148     
 149     if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target, 
 150                     MPI_MODE_RDONLY, *info, &rfh ) ) != MPI_SUCCESS ) 
 151     {
 152         fatal_error( mpi_ret, NULL, "open for read" );
 153     }
 154 
 155     for( i = 0; i < NUM_OBJS; i++ ) {
 156         MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i );
 157         fill_buffer( verify_buf, OBJ_SIZE, rank, offset );
 158         if ( debug ) printf( "Expecting %s", buffer );
 159         if ( (mpi_ret = MPI_File_read_at_all( rfh, offset, buffer, OBJ_SIZE,
 160                         MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS ) 
 161         {
 162             fatal_error( mpi_ret, &mpi_stat, "read" );
 163         }
 164         if ( memcmp( verify_buf, buffer, OBJ_SIZE ) != 0 ) {
 165             (*corrupt_blocks)++;
 166             printf( "Corruption at %lld\n", offset );
 167             if ( debug ) {
 168                 printf( "\tExpecting %s\n"
 169                          "\tRecieved  %s\n",
 170                          verify_buf, buffer );
 171             }
 172         }
 173     }
 174 
 175     if( (mpi_ret = MPI_File_close( &rfh ) ) != MPI_SUCCESS ) {
 176         fatal_error( mpi_ret, NULL, "close for read" );
 177     }
 178     free (buffer);
 179     free(verify_buf);
 180 
 181 }
 182 
 183 static void
 184 set_hints( MPI_Info *info ) {
 185     MPI_Info_set( *info, "romio_cb_write", "enable" ); 
 186     MPI_Info_set( *info, "romio_no_indep_rw", "1" ); 
 187     MPI_Info_set( *info, "cb_nodes", "1" ); 
 188     MPI_Info_set( *info, "cb_buffer_size", "4194304" );
 189 }
 190 
 191 /*
 192 void
 193 set_hints( MPI_Info *info, char *hints ) {
 194     char *delimiter = " ";
 195     char *hints_cp  = strdup( hints );
 196     char *key = strtok( hints_cp, delimiter );
 197     char *val;
 198     while( key ) {
 199         val = strtok( NULL, delimiter );
 200         if ( debug ) printf( "HINT: %s = %s\n", key, val );
 201         if ( ! val ) {
 202             Usage( __LINE__ ); 
 203         }
 204         MPI_Info_set( *info, key, val );
 205         key = strtok( NULL, delimiter );
 206     }
 207     free( hints_cp );
 208 }
 209 */
 210 
 211 int 
 212 main( int argc, char *argv[] ) {
 213         int nproc = 1, rank = 0;
 214     char *target = NULL;
 215     int c;
 216     MPI_Info info;
 217     int mpi_ret;
 218     int corrupt_blocks = 0;
 219 
 220     MPI_Init( &argc, &argv );
 221     MPI_Comm_size(MPI_COMM_WORLD, &nproc);
 222     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 223 
 224     if( (mpi_ret = MPI_Info_create(&info)) != MPI_SUCCESS) {
 225         if(rank == 0) fatal_error( mpi_ret, NULL, "MPI_info_create.\n");
 226     }
 227 
 228     prog = strdup( argv[0] );
 229 
 230     while( ( c = getopt( argc, argv, "df:h" ) ) != EOF ) {
 231         switch( c ) {
 232             case 'd':
 233                 debug = 1;
 234                 break;
 235             case 'f':
 236                 target = strdup( optarg );
 237                 break;
 238             case 'h':
 239                 set_hints( &info );
 240                 break;
 241             default:
 242                 Usage( __LINE__ );
 243         }
 244     }
 245     if ( ! target ) {
 246         Usage( __LINE__ );
 247     }
 248 
 249     write_file( target, rank, &info );
 250     read_file(  target, rank, &info, &corrupt_blocks );
 251 
 252     corrupt_blocks = reduce_corruptions( corrupt_blocks );
 253     if ( rank == 0 ) {
 254         if (corrupt_blocks == 0) {
 255             fprintf(stdout, " No Errors\n");
 256         } else {
 257             fprintf(stdout, "%d/%d blocks corrupt\n",
 258                 corrupt_blocks, nproc * NUM_OBJS );
 259         }
 260     }
 261     MPI_Info_free(&info);
 262 
 263     MPI_Finalize();
 264     free(prog);
 265     exit( 0 );
 266 }

/* [<][>][^][v][top][bottom][index][help] */