This source file includes the following definitions.
- print_vars
- main
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 
  30 
  31 
  32 
  33 
  34 
  35 
  36 
  37 
  38 
  39 
  40 
  41 
  42 
  43 
  44 #include <stdlib.h>
  45 #include <stdio.h>
  46 #include <mpi.h>
  47 
  48 static MPI_T_pvar_handle count_handle;
  49 static MPI_T_pvar_handle msize_handle;
  50 static const char count_pvar_name[] = "pml_monitoring_messages_count";
  51 static const char msize_pvar_name[] = "pml_monitoring_messages_size";
  52 static int count_pvar_idx, msize_pvar_idx;
  53 static int world_rank, world_size;
  54 
  55 static void print_vars(int rank, int size, size_t* msg_count, size_t*msg_size)
  56 {
  57     int i;
  58     for(i = 0; i < size; ++i) {
  59         if(0 != msg_size[i])
  60             printf("I\t%d\t%d\t%zu bytes\t%zu msgs sent\n", rank, i, msg_size[i], msg_count[i]);
  61     }
  62 }
  63 
  64 int main(int argc, char* argv[])
  65 {
  66     int rank, size, n, to, from, tagno, MPIT_result, provided, count;
  67     MPI_T_pvar_session session;
  68     MPI_Status status;
  69     MPI_Comm newcomm;
  70     MPI_Comm comm = MPI_COMM_WORLD;
  71     MPI_Request request;
  72     size_t*msg_count_p1, *msg_size_p1;
  73     size_t*msg_count_p2, *msg_size_p2;
  74 
  75     
  76     n = -1;
  77     MPI_Init(&argc, &argv);
  78     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  79     MPI_Comm_size(MPI_COMM_WORLD, &size);
  80     world_size = size;
  81     world_rank = rank;
  82     to = (rank + 1) % size;
  83     from = (rank - 1) % size;
  84     tagno = 201;
  85 
  86     MPIT_result = MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);
  87     if (MPIT_result != MPI_SUCCESS)
  88         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
  89     
  90     
  91     MPIT_result = MPI_T_pvar_get_index(count_pvar_name, MPI_T_PVAR_CLASS_SIZE, &count_pvar_idx);
  92     if (MPIT_result != MPI_SUCCESS) {
  93         printf("cannot find monitoring MPI_T \"%s\" pvar, check that you have monitoring pml\n",
  94                count_pvar_name);
  95         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
  96     }
  97     MPIT_result = MPI_T_pvar_get_index(msize_pvar_name, MPI_T_PVAR_CLASS_SIZE, &msize_pvar_idx);
  98     if (MPIT_result != MPI_SUCCESS) {
  99         printf("cannot find monitoring MPI_T \"%s\" pvar, check that you have monitoring pml\n",
 100                msize_pvar_name);
 101         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 102     }
 103 
 104     
 105     MPIT_result = MPI_T_pvar_session_create(&session);
 106     if (MPIT_result != MPI_SUCCESS) {
 107         printf("cannot create a session for \"%s\" and \"%s\" pvars\n",
 108                count_pvar_name, msize_pvar_name);
 109         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 110     }
 111 
 112     
 113     MPIT_result = MPI_T_pvar_handle_alloc(session, count_pvar_idx,
 114                                           &comm, &count_handle, &count);
 115     if (MPIT_result != MPI_SUCCESS) {
 116         printf("failed to allocate handle on \"%s\" pvar, check that you have monitoring pml\n",
 117                count_pvar_name);
 118         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 119     }
 120     MPIT_result = MPI_T_pvar_handle_alloc(session, msize_pvar_idx,
 121                                           &comm, &msize_handle, &count);
 122     if (MPIT_result != MPI_SUCCESS) {
 123         printf("failed to allocate handle on \"%s\" pvar, check that you have monitoring pml\n",
 124                msize_pvar_name);
 125         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 126     }
 127 
 128     
 129     msg_count_p1 = calloc(count * 4, sizeof(size_t));
 130     msg_size_p1 = &msg_count_p1[count];
 131     msg_count_p2 = &msg_count_p1[2*count];
 132     msg_size_p2 = &msg_count_p1[3*count];
 133 
 134     
 135     MPIT_result = MPI_T_pvar_start(session, count_handle);
 136     if (MPIT_result != MPI_SUCCESS) {
 137         printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
 138                count_pvar_name);
 139         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 140     }
 141     MPIT_result = MPI_T_pvar_start(session, msize_handle);
 142     if (MPIT_result != MPI_SUCCESS) {
 143         printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
 144                msize_pvar_name);
 145         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 146     }
 147     
 148     if (rank == 0) {
 149         n = 25;
 150         MPI_Isend(&n,1,MPI_INT,to,tagno,MPI_COMM_WORLD,&request);
 151     }
 152     while (1) {
 153         MPI_Irecv(&n, 1, MPI_INT, from, tagno, MPI_COMM_WORLD, &request);
 154         MPI_Wait(&request, &status);
 155         if (rank == 0) {n--;tagno++;}
 156         MPI_Isend(&n, 1, MPI_INT, to, tagno, MPI_COMM_WORLD, &request);
 157         if (rank != 0) {n--;tagno++;}
 158         if (n<0){
 159             break;
 160         }
 161     }
 162 
 163     
 164     MPIT_result = MPI_T_pvar_stop(session, count_handle);
 165     if (MPIT_result != MPI_SUCCESS) {
 166         printf("failed to stop handle on \"%s\" pvar, check that you have monitoring pml\n",
 167                count_pvar_name);
 168         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 169     }    
 170     MPIT_result = MPI_T_pvar_stop(session, msize_handle);
 171     if (MPIT_result != MPI_SUCCESS) {
 172         printf("failed to stop handle on \"%s\" pvar, check that you have monitoring pml\n",
 173                msize_pvar_name);
 174         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 175     }
 176 
 177     MPIT_result = MPI_T_pvar_read(session, count_handle, msg_count_p1);
 178     if (MPIT_result != MPI_SUCCESS) {
 179         printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
 180                count_pvar_name);
 181         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 182     }
 183     MPIT_result = MPI_T_pvar_read(session, msize_handle, msg_size_p1);
 184     if (MPIT_result != MPI_SUCCESS) {
 185         printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
 186                msize_pvar_name);
 187         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 188     }
 189 
 190     
 191     if(0 == world_rank) {
 192         printf("Flushing phase 1:\n");
 193         print_vars(world_rank, world_size, msg_count_p1, msg_size_p1);
 194         MPI_Send(NULL, 0, MPI_BYTE, (world_rank + 1) % world_size, 300, MPI_COMM_WORLD);
 195         MPI_Recv(NULL, 0, MPI_BYTE, (world_rank - 1) % world_size, 300, MPI_COMM_WORLD, &status);
 196     } else {
 197         MPI_Recv(NULL, 0, MPI_BYTE, (world_rank - 1) % world_size, 300, MPI_COMM_WORLD, &status);
 198         print_vars(world_rank, world_size, msg_count_p1, msg_size_p1);
 199         MPI_Send(NULL, 0, MPI_BYTE, (world_rank + 1) % world_size, 300, MPI_COMM_WORLD);
 200     }
 201 
 202     
 203     MPIT_result = MPI_T_pvar_read(session, count_handle, msg_count_p1);
 204     if (MPIT_result != MPI_SUCCESS) {
 205         printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
 206                count_pvar_name);
 207         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 208     }
 209     MPIT_result = MPI_T_pvar_read(session, msize_handle, msg_size_p1);
 210     if (MPIT_result != MPI_SUCCESS) {
 211         printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
 212                msize_pvar_name);
 213         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 214     }
 215     
 216     
 217 
 218 
 219 
 220 
 221     MPIT_result = MPI_T_pvar_start(session, count_handle);
 222     if (MPIT_result != MPI_SUCCESS) {
 223         printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
 224                count_pvar_name);
 225         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 226     }
 227     MPIT_result = MPI_T_pvar_start(session, msize_handle);
 228     if (MPIT_result != MPI_SUCCESS) {
 229         printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
 230                msize_pvar_name);
 231         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 232     }
 233     
 234     MPI_Comm_split(MPI_COMM_WORLD, rank%2, rank, &newcomm);
 235 
 236     if(rank%2){ 
 237         MPI_Comm_rank(newcomm, &rank);
 238         MPI_Comm_size(newcomm, &size);
 239         if( size > 1 ) {
 240             to = (rank + 1) % size;
 241             from = (rank - 1) % size;
 242             tagno = 201;
 243             if (rank == 0){
 244                 n = 50;
 245                 MPI_Send(&n, 1, MPI_INT, to, tagno, newcomm);
 246             }
 247             while (1){
 248                 MPI_Recv(&n, 1, MPI_INT, from, tagno, newcomm, &status);
 249                 if (rank == 0) {n--; tagno++;}
 250                 MPI_Send(&n, 1, MPI_INT, to, tagno, newcomm);
 251                 if (rank != 0) {n--; tagno++;}
 252                 if (n<0){
 253                     break;
 254                 }
 255             }
 256         }
 257     } else { 
 258         int send_buff[10240];
 259         int recv_buff[10240];
 260         MPI_Comm_rank(newcomm, &rank);
 261         MPI_Comm_size(newcomm, &size);
 262         MPI_Alltoall(send_buff, 10240/size, MPI_INT, recv_buff, 10240/size, MPI_INT, newcomm);
 263         MPI_Comm_split(newcomm, rank%2, rank, &newcomm);
 264         MPI_Barrier(newcomm);
 265     }
 266 
 267     MPIT_result = MPI_T_pvar_read(session, count_handle, msg_count_p2);
 268     if (MPIT_result != MPI_SUCCESS) {
 269         printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
 270                count_pvar_name);
 271         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 272     }
 273     MPIT_result = MPI_T_pvar_read(session, msize_handle, msg_size_p2);
 274     if (MPIT_result != MPI_SUCCESS) {
 275         printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
 276                msize_pvar_name);
 277         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 278     }
 279 
 280     
 281     for(int i = 0; i < size; ++i) {
 282         msg_count_p2[i] -= msg_count_p1[i];
 283         msg_size_p2[i] -= msg_size_p1[i];
 284     }
 285 
 286     
 287     if(0 == world_rank) {
 288         printf("Flushing phase 2:\n");
 289         print_vars(world_rank, world_size, msg_count_p2, msg_size_p2);
 290         MPI_Send(NULL, 0, MPI_BYTE, (world_rank + 1) % world_size, 300, MPI_COMM_WORLD);
 291         MPI_Recv(NULL, 0, MPI_BYTE, (world_rank - 1) % world_size, 300, MPI_COMM_WORLD, &status);
 292     } else {
 293         MPI_Recv(NULL, 0, MPI_BYTE, (world_rank - 1) % world_size, 300, MPI_COMM_WORLD, &status);
 294         print_vars(world_rank, world_size, msg_count_p2, msg_size_p2);
 295         MPI_Send(NULL, 0, MPI_BYTE, (world_rank + 1) % world_size, 300, MPI_COMM_WORLD);
 296     }
 297 
 298     MPIT_result = MPI_T_pvar_handle_free(session, &count_handle);
 299     if (MPIT_result != MPI_SUCCESS) {
 300         printf("failed to free handle on \"%s\" pvar, check that you have monitoring pml\n",
 301                count_pvar_name);
 302         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 303     }
 304     MPIT_result = MPI_T_pvar_handle_free(session, &msize_handle);
 305     if (MPIT_result != MPI_SUCCESS) {
 306         printf("failed to free handle on \"%s\" pvar, check that you have monitoring pml\n",
 307                msize_pvar_name);
 308         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 309     }
 310 
 311     MPIT_result = MPI_T_pvar_session_free(&session);
 312     if (MPIT_result != MPI_SUCCESS) {
 313         printf("cannot close a session for \"%s\" and \"%s\" pvars\n",
 314                count_pvar_name, msize_pvar_name);
 315         MPI_Abort(MPI_COMM_WORLD, MPIT_result);
 316     }
 317 
 318     (void)MPI_T_finalize();
 319 
 320     free(msg_count_p1);
 321     
 322     MPI_Finalize();
 323     return EXIT_SUCCESS;
 324 }