This source file includes the following definitions.
- print_vars
- main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 #include <stdlib.h>
45 #include <stdio.h>
46 #include <mpi.h>
47
48 static MPI_T_pvar_handle count_handle;
49 static MPI_T_pvar_handle msize_handle;
50 static const char count_pvar_name[] = "pml_monitoring_messages_count";
51 static const char msize_pvar_name[] = "pml_monitoring_messages_size";
52 static int count_pvar_idx, msize_pvar_idx;
53 static int world_rank, world_size;
54
/*
 * Print one line per peer for which a non-zero byte count was recorded.
 *
 * rank      - value printed in the second column (the reporting rank).
 * size      - number of entries to inspect in both arrays.
 * msg_count - per-peer message counts, indexed by peer.
 * msg_size  - per-peer byte counts, indexed by peer.
 *
 * Peers whose byte count is zero are skipped, even if their message
 * count is non-zero.
 */
static void print_vars(int rank, int size, size_t* msg_count, size_t*msg_size)
{
    for (int peer = 0; peer < size; peer++) {
        if (0 == msg_size[peer]) {
            continue;
        }
        printf("I\t%d\t%d\t%zu bytes\t%zu msgs sent\n",
               rank, peer, msg_size[peer], msg_count[peer]);
    }
}
63
64 int main(int argc, char* argv[])
65 {
66 int rank, size, n, to, from, tagno, MPIT_result, provided, count;
67 MPI_T_pvar_session session;
68 MPI_Status status;
69 MPI_Comm newcomm;
70 MPI_Comm comm = MPI_COMM_WORLD;
71 MPI_Request request;
72 size_t*msg_count_p1, *msg_size_p1;
73 size_t*msg_count_p2, *msg_size_p2;
74
75
76 n = -1;
77 MPI_Init(&argc, &argv);
78 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
79 MPI_Comm_size(MPI_COMM_WORLD, &size);
80 world_size = size;
81 world_rank = rank;
82 to = (rank + 1) % size;
83 from = (rank - 1) % size;
84 tagno = 201;
85
86 MPIT_result = MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);
87 if (MPIT_result != MPI_SUCCESS)
88 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
89
90
91 MPIT_result = MPI_T_pvar_get_index(count_pvar_name, MPI_T_PVAR_CLASS_SIZE, &count_pvar_idx);
92 if (MPIT_result != MPI_SUCCESS) {
93 printf("cannot find monitoring MPI_T \"%s\" pvar, check that you have monitoring pml\n",
94 count_pvar_name);
95 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
96 }
97 MPIT_result = MPI_T_pvar_get_index(msize_pvar_name, MPI_T_PVAR_CLASS_SIZE, &msize_pvar_idx);
98 if (MPIT_result != MPI_SUCCESS) {
99 printf("cannot find monitoring MPI_T \"%s\" pvar, check that you have monitoring pml\n",
100 msize_pvar_name);
101 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
102 }
103
104
105 MPIT_result = MPI_T_pvar_session_create(&session);
106 if (MPIT_result != MPI_SUCCESS) {
107 printf("cannot create a session for \"%s\" and \"%s\" pvars\n",
108 count_pvar_name, msize_pvar_name);
109 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
110 }
111
112
113 MPIT_result = MPI_T_pvar_handle_alloc(session, count_pvar_idx,
114 &comm, &count_handle, &count);
115 if (MPIT_result != MPI_SUCCESS) {
116 printf("failed to allocate handle on \"%s\" pvar, check that you have monitoring pml\n",
117 count_pvar_name);
118 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
119 }
120 MPIT_result = MPI_T_pvar_handle_alloc(session, msize_pvar_idx,
121 &comm, &msize_handle, &count);
122 if (MPIT_result != MPI_SUCCESS) {
123 printf("failed to allocate handle on \"%s\" pvar, check that you have monitoring pml\n",
124 msize_pvar_name);
125 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
126 }
127
128
129 msg_count_p1 = calloc(count * 4, sizeof(size_t));
130 msg_size_p1 = &msg_count_p1[count];
131 msg_count_p2 = &msg_count_p1[2*count];
132 msg_size_p2 = &msg_count_p1[3*count];
133
134
135 MPIT_result = MPI_T_pvar_start(session, count_handle);
136 if (MPIT_result != MPI_SUCCESS) {
137 printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
138 count_pvar_name);
139 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
140 }
141 MPIT_result = MPI_T_pvar_start(session, msize_handle);
142 if (MPIT_result != MPI_SUCCESS) {
143 printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
144 msize_pvar_name);
145 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
146 }
147
148 if (rank == 0) {
149 n = 25;
150 MPI_Isend(&n,1,MPI_INT,to,tagno,MPI_COMM_WORLD,&request);
151 }
152 while (1) {
153 MPI_Irecv(&n, 1, MPI_INT, from, tagno, MPI_COMM_WORLD, &request);
154 MPI_Wait(&request, &status);
155 if (rank == 0) {n--;tagno++;}
156 MPI_Isend(&n, 1, MPI_INT, to, tagno, MPI_COMM_WORLD, &request);
157 if (rank != 0) {n--;tagno++;}
158 if (n<0){
159 break;
160 }
161 }
162
163
164 MPIT_result = MPI_T_pvar_stop(session, count_handle);
165 if (MPIT_result != MPI_SUCCESS) {
166 printf("failed to stop handle on \"%s\" pvar, check that you have monitoring pml\n",
167 count_pvar_name);
168 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
169 }
170 MPIT_result = MPI_T_pvar_stop(session, msize_handle);
171 if (MPIT_result != MPI_SUCCESS) {
172 printf("failed to stop handle on \"%s\" pvar, check that you have monitoring pml\n",
173 msize_pvar_name);
174 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
175 }
176
177 MPIT_result = MPI_T_pvar_read(session, count_handle, msg_count_p1);
178 if (MPIT_result != MPI_SUCCESS) {
179 printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
180 count_pvar_name);
181 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
182 }
183 MPIT_result = MPI_T_pvar_read(session, msize_handle, msg_size_p1);
184 if (MPIT_result != MPI_SUCCESS) {
185 printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
186 msize_pvar_name);
187 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
188 }
189
190
191 if(0 == world_rank) {
192 printf("Flushing phase 1:\n");
193 print_vars(world_rank, world_size, msg_count_p1, msg_size_p1);
194 MPI_Send(NULL, 0, MPI_BYTE, (world_rank + 1) % world_size, 300, MPI_COMM_WORLD);
195 MPI_Recv(NULL, 0, MPI_BYTE, (world_rank - 1) % world_size, 300, MPI_COMM_WORLD, &status);
196 } else {
197 MPI_Recv(NULL, 0, MPI_BYTE, (world_rank - 1) % world_size, 300, MPI_COMM_WORLD, &status);
198 print_vars(world_rank, world_size, msg_count_p1, msg_size_p1);
199 MPI_Send(NULL, 0, MPI_BYTE, (world_rank + 1) % world_size, 300, MPI_COMM_WORLD);
200 }
201
202
203 MPIT_result = MPI_T_pvar_read(session, count_handle, msg_count_p1);
204 if (MPIT_result != MPI_SUCCESS) {
205 printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
206 count_pvar_name);
207 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
208 }
209 MPIT_result = MPI_T_pvar_read(session, msize_handle, msg_size_p1);
210 if (MPIT_result != MPI_SUCCESS) {
211 printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
212 msize_pvar_name);
213 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
214 }
215
216
217
218
219
220
221 MPIT_result = MPI_T_pvar_start(session, count_handle);
222 if (MPIT_result != MPI_SUCCESS) {
223 printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
224 count_pvar_name);
225 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
226 }
227 MPIT_result = MPI_T_pvar_start(session, msize_handle);
228 if (MPIT_result != MPI_SUCCESS) {
229 printf("failed to start handle on \"%s\" pvar, check that you have monitoring pml\n",
230 msize_pvar_name);
231 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
232 }
233
234 MPI_Comm_split(MPI_COMM_WORLD, rank%2, rank, &newcomm);
235
236 if(rank%2){
237 MPI_Comm_rank(newcomm, &rank);
238 MPI_Comm_size(newcomm, &size);
239 if( size > 1 ) {
240 to = (rank + 1) % size;
241 from = (rank - 1) % size;
242 tagno = 201;
243 if (rank == 0){
244 n = 50;
245 MPI_Send(&n, 1, MPI_INT, to, tagno, newcomm);
246 }
247 while (1){
248 MPI_Recv(&n, 1, MPI_INT, from, tagno, newcomm, &status);
249 if (rank == 0) {n--; tagno++;}
250 MPI_Send(&n, 1, MPI_INT, to, tagno, newcomm);
251 if (rank != 0) {n--; tagno++;}
252 if (n<0){
253 break;
254 }
255 }
256 }
257 } else {
258 int send_buff[10240];
259 int recv_buff[10240];
260 MPI_Comm_rank(newcomm, &rank);
261 MPI_Comm_size(newcomm, &size);
262 MPI_Alltoall(send_buff, 10240/size, MPI_INT, recv_buff, 10240/size, MPI_INT, newcomm);
263 MPI_Comm_split(newcomm, rank%2, rank, &newcomm);
264 MPI_Barrier(newcomm);
265 }
266
267 MPIT_result = MPI_T_pvar_read(session, count_handle, msg_count_p2);
268 if (MPIT_result != MPI_SUCCESS) {
269 printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
270 count_pvar_name);
271 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
272 }
273 MPIT_result = MPI_T_pvar_read(session, msize_handle, msg_size_p2);
274 if (MPIT_result != MPI_SUCCESS) {
275 printf("failed to fetch handle on \"%s\" pvar, check that you have monitoring pml\n",
276 msize_pvar_name);
277 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
278 }
279
280
281 for(int i = 0; i < size; ++i) {
282 msg_count_p2[i] -= msg_count_p1[i];
283 msg_size_p2[i] -= msg_size_p1[i];
284 }
285
286
287 if(0 == world_rank) {
288 printf("Flushing phase 2:\n");
289 print_vars(world_rank, world_size, msg_count_p2, msg_size_p2);
290 MPI_Send(NULL, 0, MPI_BYTE, (world_rank + 1) % world_size, 300, MPI_COMM_WORLD);
291 MPI_Recv(NULL, 0, MPI_BYTE, (world_rank - 1) % world_size, 300, MPI_COMM_WORLD, &status);
292 } else {
293 MPI_Recv(NULL, 0, MPI_BYTE, (world_rank - 1) % world_size, 300, MPI_COMM_WORLD, &status);
294 print_vars(world_rank, world_size, msg_count_p2, msg_size_p2);
295 MPI_Send(NULL, 0, MPI_BYTE, (world_rank + 1) % world_size, 300, MPI_COMM_WORLD);
296 }
297
298 MPIT_result = MPI_T_pvar_handle_free(session, &count_handle);
299 if (MPIT_result != MPI_SUCCESS) {
300 printf("failed to free handle on \"%s\" pvar, check that you have monitoring pml\n",
301 count_pvar_name);
302 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
303 }
304 MPIT_result = MPI_T_pvar_handle_free(session, &msize_handle);
305 if (MPIT_result != MPI_SUCCESS) {
306 printf("failed to free handle on \"%s\" pvar, check that you have monitoring pml\n",
307 msize_pvar_name);
308 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
309 }
310
311 MPIT_result = MPI_T_pvar_session_free(&session);
312 if (MPIT_result != MPI_SUCCESS) {
313 printf("cannot close a session for \"%s\" and \"%s\" pvars\n",
314 count_pvar_name, msize_pvar_name);
315 MPI_Abort(MPI_COMM_WORLD, MPIT_result);
316 }
317
318 (void)MPI_T_finalize();
319
320 free(msg_count_p1);
321
322 MPI_Finalize();
323 return EXIT_SUCCESS;
324 }