This source file includes the following definitions:
- ADIOI_P2PContigWriteAggregation
- ADIOI_P2PContigReadAggregation
#include "adio.h"
#include "adio_extern.h"
#include "../ad_gpfs/ad_gpfs_tuning.h"

#include <pthread.h>

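/* Gather each rank's contiguous write data to the cb_nodes aggregators with
 * point-to-point MPI_Isend/MPI_Irecv pairs; each aggregator then writes its
 * collective buffer to its file domain, one coll_bufsize chunk per round.
 */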
void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
                                     const void *buf,
                                     int *error_code,
                                     ADIO_Offset *st_offsets,
                                     ADIO_Offset *end_offsets,
                                     ADIO_Offset *fd_start,
                                     ADIO_Offset *fd_end)
{
    *error_code = MPI_SUCCESS;

#ifdef ROMIO_GPFS
    double startTimeBase, endTimeBase;
#endif

    MPI_Status status;
    pthread_t io_thread;
    void *thread_ret;
    ADIOI_IO_ThreadFuncData io_thread_args;

    int nprocs, myrank;
    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

    ADIO_Offset myOffsetStart = st_offsets[myrank], myOffsetEnd = end_offsets[myrank];

    int myAggRank = -1;
    int iAmUsedAgg = 0;

#ifdef ROMIO_GPFS
    startTimeBase = MPI_Wtime();
#endif

    int naggs = fd->hints->cb_nodes;
    int coll_bufsize = fd->hints->cb_buffer_size;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio == 1) {
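        /* split the collective buffer in half so the pthread-based I/O can
         * write one half while the other is being filled */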
        coll_bufsize = fd->hints->cb_buffer_size / 2;
    }
#endif

    int j;
    for (j = 0; j < naggs; j++) {
        if (fd->hints->ranklist[j] == myrank) {
            myAggRank = j;
            if (fd_end[j] > fd_start[j]) {
                iAmUsedAgg = 1;
            }
        }
    }

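    /* Determine which aggregators' file domains my data falls into, along
     * with the file-domain boundaries of each of those aggregators.
     */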
    int *targetAggsForMyData = (int *) ADIOI_Malloc(naggs * sizeof(int));
    ADIO_Offset *targetAggsForMyDataFDStart = (ADIO_Offset *) ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    ADIO_Offset *targetAggsForMyDataFDEnd = (ADIO_Offset *) ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    int numTargetAggs = 0;
    int i;
    for (i = 0; i < naggs; i++) {
        if (((myOffsetStart >= fd_start[i]) && (myOffsetStart <= fd_end[i])) ||
            ((myOffsetEnd >= fd_start[i]) && (myOffsetEnd <= fd_end[i]))) {
            targetAggsForMyData[numTargetAggs] = fd->hints->ranklist[i];
            targetAggsForMyDataFDStart[numTargetAggs] = fd_start[i];
            targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[i];
            numTargetAggs++;
        }
    }

    int *sourceProcsForMyData = NULL;
    int *remainingDataAmountToGetPerProc = NULL;
    ADIO_Offset *remainingDataOffsetToGetPerProc = NULL;

    int numSourceProcs = 0;

    if (iAmUsedAgg) {
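        /* Count the ranks whose data falls within my file domain, then size
         * the tracking arrays accordingly.
         */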
        for (i = 0; i < nprocs; i++)
            if (((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) ||
                ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank])))
                numSourceProcs++;

        sourceProcsForMyData = (int *) ADIOI_Malloc(numSourceProcs * sizeof(int));
        remainingDataAmountToGetPerProc = (int *) ADIOI_Malloc(numSourceProcs * sizeof(int));
        remainingDataOffsetToGetPerProc = (ADIO_Offset *) ADIOI_Malloc(numSourceProcs * sizeof(ADIO_Offset));

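        /* Run the same test again, this time recording each source rank
         * along with the amount of data, and the offset at which it starts,
         * that still has to be received from it.
         */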
        numSourceProcs = 0;
        for (i = 0; i < nprocs; i++) {
            if (((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) ||
                ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank]))) {
                sourceProcsForMyData[numSourceProcs] = i;
                if (((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) &&
                    ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank]))) {
                    remainingDataAmountToGetPerProc[numSourceProcs] = (end_offsets[i] - st_offsets[i]) + 1;
                    remainingDataOffsetToGetPerProc[numSourceProcs] = st_offsets[i];
                }
                else if ((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) {
                    remainingDataAmountToGetPerProc[numSourceProcs] = (fd_end[myAggRank] - st_offsets[i]) + 1;
                    remainingDataOffsetToGetPerProc[numSourceProcs] = st_offsets[i];
                }
                else {
                    remainingDataAmountToGetPerProc[numSourceProcs] = (end_offsets[i] - fd_start[myAggRank]) + 1;
                    remainingDataOffsetToGetPerProc[numSourceProcs] = fd_start[myAggRank];
                }
#ifdef p2pcontigtrace
                printf("getting %d bytes from source proc %d in fd rank %d with borders %lld to %lld\n",
                       remainingDataAmountToGetPerProc[numSourceProcs], i, fd->hints->ranklist[myAggRank],
                       (long long) fd_start[myAggRank], (long long) fd_end[myAggRank]);
#endif
                numSourceProcs++;
            }
        }
    }

    int *amountOfDataRequestedByTargetAgg = (int *) ADIOI_Malloc(naggs * sizeof(int));
    for (i = 0; i < numTargetAggs; i++) {
        amountOfDataRequestedByTargetAgg[i] = 0;
    }

    int totalAmountDataReceived = 0;
    MPI_Request *mpiSizeToSendRequest = (MPI_Request *) ADIOI_Malloc(numTargetAggs * sizeof(MPI_Request));
    MPI_Request *mpiRecvDataRequest = (MPI_Request *) ADIOI_Malloc(numSourceProcs * sizeof(MPI_Request));
    MPI_Request *mpiSendDataSizeRequest = (MPI_Request *) ADIOI_Malloc(numSourceProcs * sizeof(MPI_Request));

    MPI_Request *mpiSendDataToTargetAggRequest = (MPI_Request *) ADIOI_Malloc(numTargetAggs * sizeof(MPI_Request));
    MPI_Status mpiWaitAnyStatusFromTargetAggs, mpiWaitAnyStatusFromSourceProcs;
    MPI_Status mpiIsendStatusForSize, mpiIsendStatusForData;

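    /* The aggregator's collective buffer; when the pthread I/O path is
     * enabled, io_buf holds two coll_bufsize halves so one half can be
     * written to the file while the other is filled.
     */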
    char *write_buf0 = fd->io_buf;
    char *write_buf1 = fd->io_buf + coll_bufsize;

    char *write_buf = write_buf0;

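    /* The aggregate access range divided by the combined collective buffer
     * space of all aggregators gives the number of exchange rounds.
     */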
    ADIO_Offset numberOfRounds =
        (end_offsets[nprocs - 1] - st_offsets[0]) / ((ADIO_Offset) coll_bufsize * naggs) + 1;

    int currentWriteBuf = 0;
    int useIOBuffer = 0;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio && (numberOfRounds > 1)) {
        useIOBuffer = 1;
        io_thread = pthread_self();
    }
#endif

    ADIO_Offset currentRoundFDStart = 0;
    ADIO_Offset currentRoundFDEnd = 0;

    if (iAmUsedAgg) {
        currentRoundFDStart = fd_start[myAggRank];
    }

    int *dataSizeGottenThisRoundPerProc = (int *) ADIOI_Malloc(numSourceProcs * sizeof(int));
    int *mpiRequestMapPerProc = (int *) ADIOI_Malloc(numSourceProcs * sizeof(int));
    int *targetAggIndexesForMyDataThisRound = (int *) ADIOI_Malloc(numTargetAggs * sizeof(int));
    int *sendBufferOffsetsThisRound = (int *) ADIOI_Malloc(numTargetAggs * sizeof(int));
    int *bufferAmountsToSendThisRound = (int *) ADIOI_Malloc(numTargetAggs * sizeof(int));

#ifdef ROMIO_GPFS
    endTimeBase = MPI_Wtime();
    gpfsmpio_prof_cw[GPFSMPIO_CIO_T_MYREQ] += (endTimeBase - startTimeBase);
    startTimeBase = MPI_Wtime();
#endif

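    /* Main loop: each iteration exchanges and writes one coll_bufsize-sized
     * chunk per aggregator.
     */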
    int roundIter;
    for (roundIter = 0; roundIter < numberOfRounds; roundIter++) {

        int numTargetAggsThisRound = 0;
        for (i = 0; i < numTargetAggs; i++) {
            if (((myOffsetStart >= targetAggsForMyDataFDStart[i]) && (myOffsetStart <= targetAggsForMyDataFDEnd[i])) ||
                ((myOffsetEnd >= targetAggsForMyDataFDStart[i]) && (myOffsetEnd <= targetAggsForMyDataFDEnd[i]))) {

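                /* The portion of this target aggregator's file domain that
                 * it is processing this round.
                 */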
                ADIO_Offset currentRoundFDStartForMyTargetAgg =
                    targetAggsForMyDataFDStart[i] + (ADIO_Offset) roundIter * coll_bufsize;
                ADIO_Offset currentRoundFDEndForMyTargetAgg =
                    targetAggsForMyDataFDStart[i] + (ADIO_Offset) (roundIter + 1) * coll_bufsize - 1;
                if (currentRoundFDEndForMyTargetAgg > targetAggsForMyDataFDEnd[i])
                    currentRoundFDEndForMyTargetAgg = targetAggsForMyDataFDEnd[i];

#ifdef p2pcontigtrace
                printf("roundIter %d target iter %d targetAggsForMyData is %d myOffsetStart is %lld myOffsetEnd is %lld targetAggsForMyDataFDStart is %lld targetAggsForMyDataFDEnd is %lld currentRoundFDStartForMyTargetAgg is %lld currentRoundFDEndForMyTargetAgg is %lld\n",
                       roundIter, i, targetAggsForMyData[i], (long long) myOffsetStart, (long long) myOffsetEnd,
                       (long long) targetAggsForMyDataFDStart[i], (long long) targetAggsForMyDataFDEnd[i],
                       (long long) currentRoundFDStartForMyTargetAgg, (long long) currentRoundFDEndForMyTargetAgg);
#endif
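                /* Compute the overlap of my data with this round's window:
                 * the offset into my send buffer and the number of bytes to
                 * send. Three cases: my data starts inside the window, ends
                 * inside it, or completely spans it.
                 */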
                int sendBufferOffset = 0;
                int bufferAmountToSend = 0;

                if ((myOffsetStart >= currentRoundFDStartForMyTargetAgg) && (myOffsetStart <= currentRoundFDEndForMyTargetAgg)) {
                    if (myOffsetEnd > currentRoundFDEndForMyTargetAgg)
                        bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - myOffsetStart) + 1;
                    else
                        bufferAmountToSend = (myOffsetEnd - myOffsetStart) + 1;
                }
                else if ((myOffsetEnd >= currentRoundFDStartForMyTargetAgg) && (myOffsetEnd <= currentRoundFDEndForMyTargetAgg)) {
                    sendBufferOffset = (int) (currentRoundFDStartForMyTargetAgg - myOffsetStart);
                    if (myOffsetEnd > currentRoundFDEndForMyTargetAgg)
                        bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) + 1;
                    else
                        bufferAmountToSend = (myOffsetEnd - currentRoundFDStartForMyTargetAgg) + 1;
                }
                else if ((myOffsetStart <= currentRoundFDStartForMyTargetAgg) && (myOffsetEnd >= currentRoundFDEndForMyTargetAgg)) {
                    sendBufferOffset = (int) (currentRoundFDStartForMyTargetAgg - myOffsetStart);
                    bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) + 1;
                }

                if (bufferAmountToSend > 0) {
                    targetAggIndexesForMyDataThisRound[numTargetAggsThisRound] = i;
                    sendBufferOffsetsThisRound[numTargetAggsThisRound] = sendBufferOffset;
                    bufferAmountsToSendThisRound[numTargetAggsThisRound] = bufferAmountToSend;
#ifdef p2pcontigtrace
                    printf("bufferAmountToSend is %d sendBufferOffset is %d\n", bufferAmountToSend, sendBufferOffset);
#endif
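                    /* After the first round the target aggregator sends over
                     * the amount of data it expects from me this round; it is
                     * checked against my own computation before the data is
                     * sent.
                     */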
                    if (roundIter > 0)
                        MPI_Irecv(&amountOfDataRequestedByTargetAgg[numTargetAggsThisRound], 1,
                                  MPI_INT, targetAggsForMyData[i], 0,
                                  fd->comm, &mpiSizeToSendRequest[numTargetAggsThisRound]);
                    numTargetAggsThisRound++;
                }
            }
        }

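        /* Determine this aggregator's file-domain range for this round. */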
        if (iAmUsedAgg) {
            if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
                currentRoundFDEnd = fd_end[myAggRank];
            }
            else
                currentRoundFDEnd = currentRoundFDStart + coll_bufsize - 1;
#ifdef p2pcontigtrace
            printf("currentRoundFDStart is %lld currentRoundFDEnd is %lld within file domain %lld to %lld\n",
                   (long long) currentRoundFDStart, (long long) currentRoundFDEnd,
                   (long long) fd_start[myAggRank], (long long) fd_end[myAggRank]);
#endif
        }

        int irecv, isend;
        int numSourceProcsSentData = 0;

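        /* Determine how much data this aggregator will receive from each
         * source rank this round, and after the first round tell each source
         * that size so it can verify its own computation.
         */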
        for (i = 0; i < numSourceProcs; i++) {
            if ((remainingDataOffsetToGetPerProc[i] >= currentRoundFDStart) && (remainingDataOffsetToGetPerProc[i] <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToGetPerProc[i] + remainingDataAmountToGetPerProc[i]) <= currentRoundFDEnd)
                    dataSizeGottenThisRoundPerProc[i] = remainingDataAmountToGetPerProc[i];
                else
                    dataSizeGottenThisRoundPerProc[i] = (currentRoundFDEnd - remainingDataOffsetToGetPerProc[i]) + 1;
            }
            else if (((remainingDataOffsetToGetPerProc[i] + remainingDataAmountToGetPerProc[i]) >= currentRoundFDStart) &&
                     ((remainingDataOffsetToGetPerProc[i] + remainingDataAmountToGetPerProc[i]) <= currentRoundFDEnd)) {
                if (remainingDataOffsetToGetPerProc[i] >= currentRoundFDStart)
                    dataSizeGottenThisRoundPerProc[i] = remainingDataAmountToGetPerProc[i];
                else
                    /* only the tail of this rank's remaining data falls in
                     * this round, from currentRoundFDStart to the end of its
                     * data */
                    dataSizeGottenThisRoundPerProc[i] =
                        (remainingDataOffsetToGetPerProc[i] + remainingDataAmountToGetPerProc[i]) - currentRoundFDStart;
            }
            else
                dataSizeGottenThisRoundPerProc[i] = 0;

#ifdef p2pcontigtrace
            printf("dataSizeGottenThisRoundPerProc[%d] set to %d - remainingDataOffsetToGetPerProc is %lld remainingDataAmountToGetPerProc is %d currentRoundFDStart is %lld currentRoundFDEnd is %lld\n",
                   i, dataSizeGottenThisRoundPerProc[i], (long long) remainingDataOffsetToGetPerProc[i],
                   remainingDataAmountToGetPerProc[i], (long long) currentRoundFDStart, (long long) currentRoundFDEnd);
#endif
            if (dataSizeGottenThisRoundPerProc[i] > 0) {
                if (roundIter > 0) {
                    MPI_Isend(&dataSizeGottenThisRoundPerProc[i], 1, MPI_INT,
                              sourceProcsForMyData[i], 0, fd->comm,
                              &mpiSendDataSizeRequest[numSourceProcsSentData]);
                    numSourceProcsSentData++;
                }
            }
        }

        int numDataSendToWaitFor = 0;

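        /* Send my data to each target aggregator; after the first round,
         * first wait for that aggregator's size message and check it against
         * what I computed.
         */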
        for (i = 0; i < numTargetAggsThisRound; i++) {

            if (roundIter > 0) {
                MPI_Waitany(numTargetAggsThisRound, mpiSizeToSendRequest,
                            &irecv, &mpiWaitAnyStatusFromTargetAggs);

#ifdef p2pcontigtrace
                printf("irecv is %d amountOfDataRequestedByTargetAgg is %d bufferAmountsToSendThisRound is %d sendBufferOffsetsThisRound is %d targetAggsForMyData is %d\n",
                       irecv, amountOfDataRequestedByTargetAgg[irecv], bufferAmountsToSendThisRound[irecv],
                       sendBufferOffsetsThisRound[irecv], targetAggsForMyData[targetAggIndexesForMyDataThisRound[irecv]]);
#endif
                ADIOI_Assert(amountOfDataRequestedByTargetAgg[irecv] == bufferAmountsToSendThisRound[irecv]);
                MPI_Isend(&((char *) buf)[sendBufferOffsetsThisRound[irecv]],
                          bufferAmountsToSendThisRound[irecv], MPI_BYTE,
                          targetAggsForMyData[targetAggIndexesForMyDataThisRound[irecv]],
                          0, fd->comm, &mpiSendDataToTargetAggRequest[irecv]);
            }
            else {
#ifdef p2pcontigtrace
                printf("i is %d bufferAmountsToSendThisRound is %d sendBufferOffsetsThisRound is %d targetAggsForMyData is %d\n",
                       i, bufferAmountsToSendThisRound[i], sendBufferOffsetsThisRound[i],
                       targetAggsForMyData[targetAggIndexesForMyDataThisRound[i]]);
#endif
                MPI_Isend(&((char *) buf)[sendBufferOffsetsThisRound[i]], bufferAmountsToSendThisRound[i], MPI_BYTE,
                          targetAggsForMyData[targetAggIndexesForMyDataThisRound[i]], 0, fd->comm,
                          &mpiSendDataToTargetAggRequest[i]);
            }
            numDataSendToWaitFor++;
        }

#ifdef ROMIO_GPFS
        endTimeBase = MPI_Wtime();
        gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SETUP] += (endTimeBase - startTimeBase);
        startTimeBase = MPI_Wtime();
#endif

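        /* Post the receives for this round's data into the collective
         * buffer, packed in source-rank order.
         */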
        int numDataRecvToWaitFor = 0;
        for (i = 0; i < numSourceProcs; i++) {

            int currentWBOffset = 0;
            for (j = 0; j < i; j++)
                currentWBOffset += dataSizeGottenThisRoundPerProc[j];

            if (dataSizeGottenThisRoundPerProc[i] > 0) {
#ifdef p2pcontigtrace
                printf("receiving data from rank %d dataSizeGottenThisRoundPerProc is %d currentWBOffset is %d\n",
                       sourceProcsForMyData[i], dataSizeGottenThisRoundPerProc[i], currentWBOffset);
#endif
                MPI_Irecv(&((char *) write_buf)[currentWBOffset], dataSizeGottenThisRoundPerProc[i],
                          MPI_BYTE, sourceProcsForMyData[i], 0,
                          fd->comm, &mpiRecvDataRequest[numDataRecvToWaitFor]);
                mpiRequestMapPerProc[numDataRecvToWaitFor] = i;
                numDataRecvToWaitFor++;
            }

#ifdef p2pcontigtrace
            printf("MPI_Irecv from rank %d\n", sourceProcsForMyData[i]);
#endif
        }

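        /* Complete the receives, tallying the bytes landed this round and
         * advancing each source rank's remaining amount and offset.
         */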
        int totalDataReceivedThisRound = 0;
        for (i = 0; i < numDataRecvToWaitFor; i++) {
            MPI_Waitany(numDataRecvToWaitFor, mpiRecvDataRequest,
                        &irecv, &mpiWaitAnyStatusFromSourceProcs);
            totalDataReceivedThisRound +=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];
            totalAmountDataReceived +=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];

#ifdef p2pcontigtrace
            printf("numDataRecvToWaitFor is %d was sent %d bytes data for %d remaining bytes from rank %d irecv index %d\n",
                   numDataRecvToWaitFor, dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]],
                   remainingDataAmountToGetPerProc[mpiRequestMapPerProc[irecv]],
                   sourceProcsForMyData[mpiRequestMapPerProc[irecv]], irecv);
#endif
            remainingDataAmountToGetPerProc[mpiRequestMapPerProc[irecv]] -=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];
            remainingDataOffsetToGetPerProc[mpiRequestMapPerProc[irecv]] +=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];
        }

        for (i = 0; i < numSourceProcsSentData; i++) {
            MPI_Waitany(numSourceProcsSentData, mpiSendDataSizeRequest,
                        &isend, &mpiIsendStatusForSize);
        }

#ifdef ROMIO_GPFS
        endTimeBase = MPI_Wtime();
        gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_NET] += (endTimeBase - startTimeBase);
#endif

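        /* Write this round's collective buffer to the file: synchronously
         * when no I/O thread is in use, otherwise by handing the filled half
         * of the buffer to the I/O thread and switching to the other half.
         */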
        if (numDataRecvToWaitFor > 0) {

#ifdef p2pcontigtrace
            printf("totalDataReceivedThisRound is %d\n", totalDataReceivedThisRound);
#endif
            if (!useIOBuffer) {
                ADIO_WriteContig(fd, write_buf, (int) totalDataReceivedThisRound,
                                 MPI_BYTE, ADIO_EXPLICIT_OFFSET,
                                 currentRoundFDStart, &status, error_code);
            } else {
                /* Wait for the previous round's write to complete. */
                if (!pthread_equal(io_thread, pthread_self())) {
                    pthread_join(io_thread, &thread_ret);
                    *error_code = *(int *) thread_ret;
                    if (*error_code != MPI_SUCCESS)
                        return;
                    io_thread = pthread_self();
                }
                io_thread_args.fd = fd;

                /* Swap the two halves of the collective buffer. */
                if (currentWriteBuf == 0) {
                    io_thread_args.buf = write_buf0;
                    currentWriteBuf = 1;
                    write_buf = write_buf1;
                }
                else {
                    io_thread_args.buf = write_buf1;
                    currentWriteBuf = 0;
                    write_buf = write_buf0;
                }
                io_thread_args.io_kind = ADIOI_WRITE;
                io_thread_args.size = totalDataReceivedThisRound;
                io_thread_args.offset = currentRoundFDStart;
                io_thread_args.status = &status;
                io_thread_args.error_code = *error_code;
                if ((pthread_create(&io_thread, NULL,
                                    ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
                    io_thread = pthread_self();
            }
        }

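        /* Advance the aggregator's window for the next round and make sure
         * this round's data sends have completed before the source buffers
         * are touched again.
         */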
        if (iAmUsedAgg)
            currentRoundFDStart += coll_bufsize;
        for (i = 0; i < numDataSendToWaitFor; i++) {
            MPI_Wait(&mpiSendDataToTargetAggRequest[i],
                     &mpiIsendStatusForData);
        }

    }

#ifdef ROMIO_GPFS
    endTimeBase = MPI_Wtime();
    gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH] += (endTimeBase - startTimeBase);
#endif

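    /* Wait for the last write issued by the I/O thread. */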
    if (useIOBuffer) {
        if (!pthread_equal(io_thread, pthread_self())) {
            pthread_join(io_thread, &thread_ret);
            *error_code = *(int *) thread_ret;
        }
    }

    if (iAmUsedAgg) {
        ADIOI_Free(sourceProcsForMyData);
        ADIOI_Free(remainingDataAmountToGetPerProc);
        ADIOI_Free(remainingDataOffsetToGetPerProc);
    }

    ADIOI_Free(targetAggsForMyData);
    ADIOI_Free(targetAggsForMyDataFDStart);
    ADIOI_Free(targetAggsForMyDataFDEnd);
    ADIOI_Free(targetAggIndexesForMyDataThisRound);
    ADIOI_Free(sendBufferOffsetsThisRound);
    ADIOI_Free(bufferAmountsToSendThisRound);
    ADIOI_Free(amountOfDataRequestedByTargetAgg);
    ADIOI_Free(mpiSizeToSendRequest);
    ADIOI_Free(mpiRecvDataRequest);
    ADIOI_Free(mpiSendDataSizeRequest);
    ADIOI_Free(mpiSendDataToTargetAggRequest);
    ADIOI_Free(dataSizeGottenThisRoundPerProc);
    ADIOI_Free(mpiRequestMapPerProc);

    MPI_Barrier(fd->comm);
    return;
}

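/* Aggregated read counterpart: each aggregator reads its file domain one
 * coll_bufsize chunk per round (optionally prefetching the next chunk on a
 * pthread) and sends each rank its pieces point-to-point.
 */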
void ADIOI_P2PContigReadAggregation(ADIO_File fd,
                                    const void *buf,
                                    int *error_code,
                                    ADIO_Offset *st_offsets,
                                    ADIO_Offset *end_offsets,
                                    ADIO_Offset *fd_start,
                                    ADIO_Offset *fd_end)
{
    *error_code = MPI_SUCCESS;

#ifdef ROMIO_GPFS
    double startTimeBase, endTimeBase;
#endif

    MPI_Status status;
    pthread_t io_thread;
    void *thread_ret;
    ADIOI_IO_ThreadFuncData io_thread_args;

#ifdef ROMIO_GPFS
    startTimeBase = MPI_Wtime();
#endif

    int nprocs, myrank;
    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

    ADIO_Offset myOffsetStart = st_offsets[myrank], myOffsetEnd = end_offsets[myrank];

    int myAggRank = -1;
    int iAmUsedAgg = 0;

    int naggs = fd->hints->cb_nodes;
    int coll_bufsize = fd->hints->cb_buffer_size;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio == 1) {
        /* split the buffer in half so the pthread I/O can double-buffer */
        coll_bufsize = coll_bufsize / 2;
    }
#endif

    int j;
    for (j = 0; j < naggs; j++) {
        if (fd->hints->ranklist[j] == myrank) {
            myAggRank = j;
            if (fd_end[j] > fd_start[j]) {
                iAmUsedAgg = 1;
            }
        }
    }

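    /* Determine which aggregators hold this rank's data and the file-domain
     * boundaries of each.
     */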
    int *sourceAggsForMyData = (int *) ADIOI_Malloc(naggs * sizeof(int));
    ADIO_Offset *sourceAggsForMyDataFDStart = (ADIO_Offset *) ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    ADIO_Offset *sourceAggsForMyDataFDEnd = (ADIO_Offset *) ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    int numSourceAggs = 0;
    int i;
    for (i = 0; i < naggs; i++) {
        if (((myOffsetStart >= fd_start[i]) && (myOffsetStart <= fd_end[i])) ||
            ((myOffsetEnd >= fd_start[i]) && (myOffsetEnd <= fd_end[i]))) {
            sourceAggsForMyData[numSourceAggs] = fd->hints->ranklist[i];
            sourceAggsForMyDataFDStart[numSourceAggs] = fd_start[i];
            sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[i];
            numSourceAggs++;
        }
    }

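    /* On an aggregator: identify the ranks whose data overlaps my file
     * domain, recording for each the amount and offset of data still to be
     * sent to it.
     */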
    int *targetProcsForMyData = NULL;
    int *remainingDataAmountToSendPerProc = NULL;
    ADIO_Offset *remainingDataOffsetToSendPerProc = NULL;

    int numTargetProcs = 0;

    if (iAmUsedAgg) {
        /* First count the overlapping ranks, then allocate the tracking
         * arrays and record the per-rank amounts and offsets. */
        for (i = 0; i < nprocs; i++)
            if (((st_offsets[i] >= fd_start[myAggRank]) &&
                 (st_offsets[i] <= fd_end[myAggRank])) ||
                ((end_offsets[i] >= fd_start[myAggRank]) &&
                 (end_offsets[i] <= fd_end[myAggRank])))
                numTargetProcs++;

        targetProcsForMyData =
            (int *) ADIOI_Malloc(numTargetProcs * sizeof(int));
        remainingDataAmountToSendPerProc =
            (int *) ADIOI_Malloc(numTargetProcs * sizeof(int));
        remainingDataOffsetToSendPerProc =
            (ADIO_Offset *) ADIOI_Malloc(numTargetProcs * sizeof(ADIO_Offset));

        numTargetProcs = 0;
        for (i = 0; i < nprocs; i++) {
            if (((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) ||
                ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank]))) {
                targetProcsForMyData[numTargetProcs] = i;
                if (((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) &&
                    ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank]))) {
                    remainingDataAmountToSendPerProc[numTargetProcs] = (end_offsets[i] - st_offsets[i]) + 1;
                    remainingDataOffsetToSendPerProc[numTargetProcs] = st_offsets[i];
                }
                else if ((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) {
                    remainingDataAmountToSendPerProc[numTargetProcs] = (fd_end[myAggRank] - st_offsets[i]) + 1;
                    remainingDataOffsetToSendPerProc[numTargetProcs] = st_offsets[i];
                }
                else {
                    remainingDataAmountToSendPerProc[numTargetProcs] = (end_offsets[i] - fd_start[myAggRank]) + 1;
                    remainingDataOffsetToSendPerProc[numTargetProcs] = fd_start[myAggRank];
                }
                numTargetProcs++;
            }
        }
    }

    MPI_Request *mpiRecvDataFromSourceAggsRequest = (MPI_Request *) ADIOI_Malloc(numSourceAggs * sizeof(MPI_Request));
    MPI_Request *mpiSendDataToTargetProcRequest = (MPI_Request *) ADIOI_Malloc(numTargetProcs * sizeof(MPI_Request));
    MPI_Status mpiWaitAnyStatusFromSourceProcs, mpiIsendStatusForData;

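    /* Split collective buffer: with the I/O thread enabled, one half is
     * scattered to the targets while the next chunk is read into the other.
     */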
    char *read_buf0 = fd->io_buf;
    char *read_buf1 = fd->io_buf + coll_bufsize;

    char *read_buf = read_buf0;

    ADIO_Offset numberOfRounds =
        (end_offsets[nprocs - 1] - st_offsets[0]) / ((ADIO_Offset) coll_bufsize * naggs) + 1;

    ADIO_Offset currentRoundFDStart = 0, nextRoundFDStart = 0;
    ADIO_Offset currentRoundFDEnd = 0, nextRoundFDEnd = 0;

    if (iAmUsedAgg) {
        currentRoundFDStart = fd_start[myAggRank];
        nextRoundFDStart = fd_start[myAggRank];
    }

    int *dataSizeSentThisRoundPerProc = (int *) ADIOI_Malloc(numTargetProcs * sizeof(int));
    int *sourceAggIndexesForMyDataThisRound = (int *) ADIOI_Malloc(numSourceAggs * sizeof(int));
    int *recvBufferOffsetsThisRound = (int *) ADIOI_Malloc(numSourceAggs * sizeof(int));
    int *bufferAmountsToGetThisRound = (int *) ADIOI_Malloc(numSourceAggs * sizeof(int));
    *error_code = MPI_SUCCESS;

    int currentReadBuf = 0;
    int useIOBuffer = 0;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio && (numberOfRounds > 1)) {
        useIOBuffer = 1;
        io_thread = pthread_self();
    }
#endif

#ifdef ROMIO_GPFS
    endTimeBase = MPI_Wtime();
    gpfsmpio_prof_cw[GPFSMPIO_CIO_T_MYREQ] += (endTimeBase - startTimeBase);
#endif

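    /* Main loop: each iteration reads (or prefetches) one coll_bufsize chunk
     * per aggregator and scatters it to the ranks whose data it holds.
     */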
    int roundIter;
    for (roundIter = 0; roundIter < numberOfRounds; roundIter++) {

        int irecv, isend;

        if (iAmUsedAgg) {

            currentRoundFDStart = nextRoundFDStart;

            /* In the first round (or always, when no I/O thread is used)
             * read the current chunk synchronously. */
            if (!useIOBuffer || (roundIter == 0)) {
                int amountDataToReadThisRound;
                if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
                    currentRoundFDEnd = fd_end[myAggRank];
                    amountDataToReadThisRound = ((currentRoundFDEnd - currentRoundFDStart) + 1);
                }
                else {
                    currentRoundFDEnd = currentRoundFDStart + coll_bufsize - 1;
                    amountDataToReadThisRound = coll_bufsize;
                }

                ADIO_ReadContig(fd, read_buf, amountDataToReadThisRound,
                                MPI_BYTE, ADIO_EXPLICIT_OFFSET, currentRoundFDStart,
                                &status, error_code);
                currentReadBuf = 1;

#ifdef ROMIO_GPFS
                endTimeBase = MPI_Wtime();
#endif
            }

            if (useIOBuffer) {

                if (roundIter > 0)
                    currentRoundFDEnd = nextRoundFDEnd;

                if (roundIter < (numberOfRounds - 1)) {
                    /* Prefetch the next round's chunk on the I/O thread while
                     * this round's chunk is scattered. */
                    nextRoundFDStart += coll_bufsize;
                    int amountDataToReadNextRound;
                    if ((fd_end[myAggRank] - nextRoundFDStart) < coll_bufsize) {
                        nextRoundFDEnd = fd_end[myAggRank];
                        amountDataToReadNextRound = ((nextRoundFDEnd - nextRoundFDStart) + 1);
                    }
                    else {
                        nextRoundFDEnd = nextRoundFDStart + coll_bufsize - 1;
                        amountDataToReadNextRound = coll_bufsize;
                    }

                    /* Wait for the previous round's read to complete. */
                    if (!pthread_equal(io_thread, pthread_self())) {
                        pthread_join(io_thread, &thread_ret);
                        *error_code = *(int *) thread_ret;
                        if (*error_code != MPI_SUCCESS)
                            return;
                        io_thread = pthread_self();
                    }
                    io_thread_args.fd = fd;

                    /* Swap the two halves of the collective buffer. */
                    if (currentReadBuf == 0) {
                        io_thread_args.buf = read_buf0;
                        currentReadBuf = 1;
                        read_buf = read_buf1;
                    }
                    else {
                        io_thread_args.buf = read_buf1;
                        currentReadBuf = 0;
                        read_buf = read_buf0;
                    }
                    io_thread_args.io_kind = ADIOI_READ;
                    io_thread_args.size = amountDataToReadNextRound;
                    io_thread_args.offset = nextRoundFDStart;
                    io_thread_args.status = &status;
                    io_thread_args.error_code = *error_code;
                    if ((pthread_create(&io_thread, NULL,
                                        ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
                        io_thread = pthread_self();
                }
                else {
                    /* Last round: wait for the prefetch of the final chunk
                     * and switch to the buffer it landed in. */
                    if (!pthread_equal(io_thread, pthread_self())) {
                        pthread_join(io_thread, &thread_ret);
                        *error_code = *(int *) thread_ret;
                        if (*error_code != MPI_SUCCESS)
                            return;
                        io_thread = pthread_self();
                    }
                    if (currentReadBuf == 0) {
                        read_buf = read_buf1;
                    }
                    else {
                        read_buf = read_buf0;
                    }
                }
            }
        }

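        /* Compute, for each source aggregator whose file domain holds part
         * of my data, where this round's piece lands in my receive buffer
         * and how many bytes it is.
         */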
        int numSourceAggsThisRound = 0;
        for (i = 0; i < numSourceAggs; i++) {
            if (((myOffsetStart >= sourceAggsForMyDataFDStart[i]) && (myOffsetStart <= sourceAggsForMyDataFDEnd[i])) ||
                ((myOffsetEnd >= sourceAggsForMyDataFDStart[i]) && (myOffsetEnd <= sourceAggsForMyDataFDEnd[i]))) {

                /* The portion of this source aggregator's file domain that
                 * it is processing this round. */
                ADIO_Offset currentRoundFDStartForMySourceAgg =
                    sourceAggsForMyDataFDStart[i] + (ADIO_Offset) roundIter * coll_bufsize;
                ADIO_Offset currentRoundFDEndForMySourceAgg =
                    sourceAggsForMyDataFDStart[i] + (ADIO_Offset) (roundIter + 1) * coll_bufsize - 1;
                if (currentRoundFDEndForMySourceAgg > sourceAggsForMyDataFDEnd[i])
                    currentRoundFDEndForMySourceAgg = sourceAggsForMyDataFDEnd[i];

#ifdef p2pcontigtrace
                printf("roundIter %d source iter %d sourceAggsForMyData is %d myOffsetStart is %lld myOffsetEnd is %lld sourceAggsForMyDataFDStart is %lld sourceAggsForMyDataFDEnd is %lld currentRoundFDStartForMySourceAgg is %lld currentRoundFDEndForMySourceAgg is %lld\n",
                       roundIter, i, sourceAggsForMyData[i], (long long) myOffsetStart, (long long) myOffsetEnd,
                       (long long) sourceAggsForMyDataFDStart[i], (long long) sourceAggsForMyDataFDEnd[i],
                       (long long) currentRoundFDStartForMySourceAgg, (long long) currentRoundFDEndForMySourceAgg);
#endif

                /* Compute the overlap of my data with this round's window:
                 * the offset into my receive buffer and the number of bytes
                 * to get. Three cases: my data starts inside the window,
                 * ends inside it, or completely spans it. */
                int recvBufferOffset = 0;
                int bufferAmountToGet = 0;

                if ((myOffsetStart >= currentRoundFDStartForMySourceAgg) && (myOffsetStart <= currentRoundFDEndForMySourceAgg)) {
                    if (myOffsetEnd > currentRoundFDEndForMySourceAgg)
                        bufferAmountToGet = (currentRoundFDEndForMySourceAgg - myOffsetStart) + 1;
                    else
                        bufferAmountToGet = (myOffsetEnd - myOffsetStart) + 1;
                }
                else if ((myOffsetEnd >= currentRoundFDStartForMySourceAgg) && (myOffsetEnd <= currentRoundFDEndForMySourceAgg)) {
                    recvBufferOffset = (int) (currentRoundFDStartForMySourceAgg - myOffsetStart);
                    if (myOffsetEnd > currentRoundFDEndForMySourceAgg)
                        bufferAmountToGet = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) + 1;
                    else
                        bufferAmountToGet = (myOffsetEnd - currentRoundFDStartForMySourceAgg) + 1;
                }
                else if ((myOffsetStart <= currentRoundFDStartForMySourceAgg) && (myOffsetEnd >= currentRoundFDEndForMySourceAgg)) {
                    recvBufferOffset = (int) (currentRoundFDStartForMySourceAgg - myOffsetStart);
                    bufferAmountToGet = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) + 1;
                }

                if (bufferAmountToGet > 0) {
                    sourceAggIndexesForMyDataThisRound[numSourceAggsThisRound] = i;
                    recvBufferOffsetsThisRound[numSourceAggsThisRound] = recvBufferOffset;
                    bufferAmountsToGetThisRound[numSourceAggsThisRound] = bufferAmountToGet;
#ifdef p2pcontigtrace
                    printf("bufferAmountToGet is %d recvBufferOffset is %d\n", bufferAmountToGet, recvBufferOffset);
#endif
                    numSourceAggsThisRound++;
                }
            }
        }

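        /* Determine how much data the aggregator sends each target rank this
         * round.
         */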
        for (i = 0; i < numTargetProcs; i++) {
            if ((remainingDataOffsetToSendPerProc[i] >= currentRoundFDStart) &&
                (remainingDataOffsetToSendPerProc[i] <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToSendPerProc[i] +
                     remainingDataAmountToSendPerProc[i]) <= currentRoundFDEnd)
                    dataSizeSentThisRoundPerProc[i] = remainingDataAmountToSendPerProc[i];
                else
                    dataSizeSentThisRoundPerProc[i] =
                        (currentRoundFDEnd - remainingDataOffsetToSendPerProc[i]) + 1;
            }
            else if (((remainingDataOffsetToSendPerProc[i] +
                       remainingDataAmountToSendPerProc[i]) >= currentRoundFDStart) &&
                     ((remainingDataOffsetToSendPerProc[i] +
                       remainingDataAmountToSendPerProc[i]) <= currentRoundFDEnd)) {
                if (remainingDataOffsetToSendPerProc[i] >= currentRoundFDStart)
                    dataSizeSentThisRoundPerProc[i] = remainingDataAmountToSendPerProc[i];
                else
                    /* only the tail of this rank's remaining data falls in
                     * this round */
                    dataSizeSentThisRoundPerProc[i] =
                        (remainingDataOffsetToSendPerProc[i] +
                         remainingDataAmountToSendPerProc[i]) - currentRoundFDStart;
            }
            else
                dataSizeSentThisRoundPerProc[i] = 0;
        }

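        /* Post the receives for my data from each source aggregator. */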
        for (i = 0; i < numSourceAggsThisRound; i++) {
            MPI_Irecv(&((char *) buf)[recvBufferOffsetsThisRound[i]],
                      bufferAmountsToGetThisRound[i], MPI_BYTE,
                      sourceAggsForMyData[sourceAggIndexesForMyDataThisRound[i]], 0, fd->comm,
                      &mpiRecvDataFromSourceAggsRequest[i]);
        }

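        /* The aggregator sends each target rank its piece of the read
         * buffer, which is packed in target-rank order.
         */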
        int numTargetProcsSentThisRound = 0;
        for (i = 0; i < numTargetProcs; i++) {

            int currentWBOffset = 0;
            for (j = 0; j < i; j++)
                currentWBOffset += dataSizeSentThisRoundPerProc[j];

            if (dataSizeSentThisRoundPerProc[i] > 0) {
                MPI_Isend(&((char *) read_buf)[currentWBOffset],
                          dataSizeSentThisRoundPerProc[i],
                          MPI_BYTE, targetProcsForMyData[i], 0,
                          fd->comm, &mpiSendDataToTargetProcRequest[numTargetProcsSentThisRound]);
                numTargetProcsSentThisRound++;
                remainingDataAmountToSendPerProc[i] -= dataSizeSentThisRoundPerProc[i];
                remainingDataOffsetToSendPerProc[i] += dataSizeSentThisRoundPerProc[i];
            }
        }

868
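        /* Complete this round's receives and sends; the barrier keeps the
         * rounds in step across all ranks.
         */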
        for (i = 0; i < numSourceAggsThisRound; i++) {
            MPI_Waitany(numSourceAggsThisRound, mpiRecvDataFromSourceAggsRequest,
                        &irecv, &mpiWaitAnyStatusFromSourceProcs);
        }

        nextRoundFDStart = currentRoundFDStart + coll_bufsize;

        for (i = 0; i < numTargetProcsSentThisRound; i++) {
            MPI_Waitany(numTargetProcsSentThisRound, mpiSendDataToTargetProcRequest,
                        &isend, &mpiIsendStatusForData);
        }

        MPI_Barrier(fd->comm);

    }

    /* Wait for the last read issued by the I/O thread. */
    if (useIOBuffer) {
        if (!pthread_equal(io_thread, pthread_self())) {
            pthread_join(io_thread, &thread_ret);
            *error_code = *(int *) thread_ret;
        }
    }

    if (iAmUsedAgg) {
        ADIOI_Free(targetProcsForMyData);
        ADIOI_Free(remainingDataAmountToSendPerProc);
        ADIOI_Free(remainingDataOffsetToSendPerProc);
    }

    ADIOI_Free(sourceAggsForMyData);
    ADIOI_Free(sourceAggsForMyDataFDStart);
    ADIOI_Free(sourceAggsForMyDataFDEnd);

    ADIOI_Free(mpiRecvDataFromSourceAggsRequest);
    ADIOI_Free(mpiSendDataToTargetProcRequest);
    ADIOI_Free(dataSizeSentThisRoundPerProc);
    ADIOI_Free(sourceAggIndexesForMyDataThisRound);
    ADIOI_Free(recvBufferOffsetsThisRound);
    ADIOI_Free(bufferAmountsToGetThisRound);

    MPI_Barrier(fd->comm);

    return;
}