This source file includes following definitions.
- ADIOI_OneSidedSetup
- ADIOI_OneSidedCleanup
- nonContigSourceDataBufferAdvance
- ADIOI_OneSidedWriteAggregation
- ADIOI_OneSidedReadAggregation
1
2
3
4
5
6
7 #include "adio.h"
8 #include "adio_extern.h"
9 #ifdef ROMIO_GPFS
10
11 #include "../ad_gpfs/ad_gpfs_tuning.h"
12 #else
13 int gpfsmpio_onesided_no_rmw = 0;
14 int gpfsmpio_write_aggmethod = 0;
15 int gpfsmpio_read_aggmethod = 0;
16 int gpfsmpio_onesided_always_rmw = 0;
17 #endif
18
19 #include <pthread.h>
20
21
22
23
24
25
26
27
28 typedef struct NonContigSourceBufOffset {
29 int dataTypeExtent;
30 int flatBufIndice;
31 ADIO_Offset indiceOffset;
32 } NonContigSourceBufOffset;
33
34
35
36
37
38
39
40
41
42
43
44
45 typedef struct FDSourceBufferState {
46
47 ADIO_Offset indiceOffset;
48 MPI_Aint bufTypeExtent;
49 int dataTypeExtent;
50 int flatBufIndice;
51
52 ADIO_Offset sourceBufferOffset;
53
54 } FDSourceBufferState;
55
56
57 static int ADIOI_OneSidedSetup(ADIO_File fd, int procs) {
58 int ret = MPI_SUCCESS;
59
60 ret = MPI_Win_create(fd->io_buf,fd->hints->cb_buffer_size,1,
61 MPI_INFO_NULL,fd->comm, &fd->io_buf_window);
62 if (ret != MPI_SUCCESS) goto fn_exit;
63 fd->io_buf_put_amounts = (int *) ADIOI_Malloc(procs*sizeof(int));
64 ret =MPI_Win_create(fd->io_buf_put_amounts,procs*sizeof(int),sizeof(int),
65 MPI_INFO_NULL,fd->comm, &fd->io_buf_put_amounts_window);
66 fn_exit:
67 return ret;
68 }
69
70 int ADIOI_OneSidedCleanup(ADIO_File fd)
71 {
72 int ret = MPI_SUCCESS;
73 if (fd->io_buf_window != MPI_WIN_NULL)
74 ret = MPI_Win_free(&fd->io_buf_window);
75 if (fd->io_buf_put_amounts_window != MPI_WIN_NULL)
76 ret = MPI_Win_free(&fd->io_buf_put_amounts_window);
77 if (fd->io_buf_put_amounts != NULL)
78 ADIOI_Free(fd->io_buf_put_amounts);
79
80 return ret;
81 }
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 inline static void nonContigSourceDataBufferAdvance(char *sourceDataBuffer,
102 ADIOI_Flatlist_node *flatBuf, int targetNumBytes, int packing,
103 FDSourceBufferState *currentFDSourceBufferState, char *packedDataBufer)
104 {
105
106
107 ADIO_Offset currentIndiceOffset = currentFDSourceBufferState->indiceOffset;
108 ADIO_Offset bufTypeExtent = (ADIO_Offset)currentFDSourceBufferState->bufTypeExtent;
109 ADIO_Offset currentDataTypeExtent =
110 (ADIO_Offset)currentFDSourceBufferState->dataTypeExtent;
111 int currentFlatBufIndice = currentFDSourceBufferState->flatBufIndice;
112
113 int targetSendDataIndex = 0;
114
115 #ifdef onesidedtrace
116 printf("nonContigSourceDataBufferAdvance: currentFlatBufIndice is %d currentDataTypeExtent is %ld currentIndiceOffset is %ld\n",currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset);
117 #endif
118
119 int remainingBytesToLoad = targetNumBytes;
120 while (remainingBytesToLoad > 0) {
121 if ((flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset) >= remainingBytesToLoad) {
122 ADIO_Offset physicalSourceBufferOffset = (currentDataTypeExtent * bufTypeExtent) + flatBuf->indices[currentFlatBufIndice] + currentIndiceOffset;
123
124 #ifdef onesidedtrace
125 printf("loading remainingBytesToLoad %d from src buffer offset %ld to targetSendDataIndex %d\n",remainingBytesToLoad,physicalSourceBufferOffset,targetSendDataIndex);
126 #endif
127
128 if (packedDataBufer != NULL) {
129 if (packing)
130 memcpy(&(packedDataBufer[targetSendDataIndex]),&(sourceDataBuffer[physicalSourceBufferOffset]),remainingBytesToLoad);
131 else
132 memcpy(&(sourceDataBuffer[physicalSourceBufferOffset]),&(packedDataBufer[targetSendDataIndex]),remainingBytesToLoad);
133 }
134
135 targetSendDataIndex += remainingBytesToLoad;
136 currentIndiceOffset += (ADIO_Offset)remainingBytesToLoad;
137 if (currentIndiceOffset >= flatBuf->blocklens[currentFlatBufIndice]) {
138 currentIndiceOffset = (ADIO_Offset)0;
139 currentFlatBufIndice++;
140 if (currentFlatBufIndice == flatBuf->count) {
141 currentFlatBufIndice = 0;
142 currentDataTypeExtent++;
143 }
144 }
145 remainingBytesToLoad = 0;
146
147 }
148 else {
149 int amountDataToLoad = (flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset);
150 ADIO_Offset physicalSourceBufferOffset = (currentDataTypeExtent * bufTypeExtent) + flatBuf->indices[currentFlatBufIndice] + currentIndiceOffset;
151
152 #ifdef onesidedtrace
153 printf("loading amountDataToLoad %d from src buffer offset %ld to targetSendDataIndex %d\n",amountDataToLoad,physicalSourceBufferOffset,targetSendDataIndex);
154 #endif
155 if (packedDataBufer != NULL) {
156 if (packing)
157 memcpy(&(packedDataBufer[targetSendDataIndex]),&(sourceDataBuffer[physicalSourceBufferOffset]),amountDataToLoad);
158 else
159 memcpy(&(sourceDataBuffer[physicalSourceBufferOffset]),&(packedDataBufer[targetSendDataIndex]),amountDataToLoad);
160 }
161
162 targetSendDataIndex += amountDataToLoad;
163 currentIndiceOffset = (ADIO_Offset)0;
164 currentFlatBufIndice++;
165 if (currentFlatBufIndice == flatBuf->count) {
166 currentFlatBufIndice = 0;
167 currentDataTypeExtent++;
168 }
169 remainingBytesToLoad -= amountDataToLoad;
170 }
171 }
172
173
174
175 currentFDSourceBufferState->indiceOffset = currentIndiceOffset;
176 currentFDSourceBufferState->dataTypeExtent = (int) currentDataTypeExtent;
177 currentFDSourceBufferState->flatBufIndice = currentFlatBufIndice;
178 #ifdef onesidedtrace
179 printf("source buf advanced to currentFlatBufIndice %d currentDataTypeExtent %ld currentIndiceOffset %ld\n",currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset);
180 #endif
181 }
182
183
184 void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
185 ADIO_Offset *offset_list,
186 ADIO_Offset *len_list,
187 int contig_access_count,
188 const void *buf,
189 MPI_Datatype datatype,
190 int *error_code,
191 ADIO_Offset *st_offsets,
192 ADIO_Offset *end_offsets,
193 int numNonZeroDataOffsets,
194 ADIO_Offset *fd_start,
195 ADIO_Offset* fd_end,
196 int *hole_found)
197
198 {
199 int i,j;
200
201 #ifdef onesidedtrace
202 if (buf == NULL) {
203 printf("ADIOI_OneSidedWriteAggregation - buf is NULL contig_access_count is %d\n",contig_access_count);
204 for (i=0;i<contig_access_count;i++)
205 printf("offset_list[%d] is %ld len_list[%d] is %ld\n",
206 i,offset_list[i],i,len_list[i]);
207 }
208 if (contig_access_count < 0)
209 printf("ADIOI_OneSidedWriteAggregation - contig_access_count "
210 "of %d is less than 0\n",contig_access_count);
211 #endif
212
213 int lenListOverZero = 0;
214 for (i=0;((i<contig_access_count) && (!lenListOverZero));i++)
215 if (len_list[i] > 0)
216 lenListOverZero = 1;
217
218
219 *error_code = MPI_SUCCESS;
220
221 #ifdef ROMIO_GPFS
222 double startTimeBase,endTimeBase;
223 startTimeBase = MPI_Wtime();
224 #endif
225
226 MPI_Status status;
227 pthread_t io_thread;
228 void *thread_ret;
229 ADIOI_IO_ThreadFuncData io_thread_args;
230
231 int nprocs,myrank;
232 MPI_Comm_size(fd->comm, &nprocs);
233 MPI_Comm_rank(fd->comm, &myrank);
234 #ifdef onesidedtrace
235 printf("ADIOI_OneSidedWriteAggregation started on rank %d\n",myrank);
236 #endif
237
238
239 if (fd->io_buf_window == MPI_WIN_NULL ||
240 fd->io_buf_put_amounts_window == MPI_WIN_NULL)
241 {
242 ADIOI_OneSidedSetup(fd, nprocs);
243 }
244
245
246
247
248
249
250
251 int bufTypeIsContig;
252
253 MPI_Aint bufTypeExtent, lb;
254 ADIOI_Flatlist_node *flatBuf=NULL;
255 ADIOI_Datatype_iscontig(datatype, &bufTypeIsContig);
256
257 if (!bufTypeIsContig) {
258
259 flatBuf = ADIOI_Flatten_and_find(datatype);
260 MPI_Type_get_extent(datatype, &lb, &bufTypeExtent);
261 #ifdef onesidedtrace
262 printf("flatBuf->count is %d bufTypeExtent is %d\n", flatBuf->count,bufTypeExtent);
263 for (i=0;i<flatBuf->count;i++)
264 printf("flatBuf->blocklens[%d] is %d flatBuf->indices[%d] is %ld\n",i,flatBuf->blocklens[i],i,flatBuf->indices[i]);
265 #endif
266 }
267
268 int naggs = fd->hints->cb_nodes;
269
270
271
272
273
274
275
276
277
278 #ifdef onesidedtrace
279 printf("sizeof(FDSourceBufferState) is %d - make sure is 32 for 32-byte memalign optimal\n",sizeof(FDSourceBufferState));
280 #endif
281 FDSourceBufferState *currentFDSourceBufferState;
282
283 currentFDSourceBufferState = (FDSourceBufferState *) ADIOI_Malloc(naggs * sizeof(FDSourceBufferState));
284 for (i=0;i<naggs;i++) {
285
286
287 if (bufTypeIsContig) {
288 currentFDSourceBufferState[i].sourceBufferOffset = -1;
289 }
290 else {
291 currentFDSourceBufferState[i].indiceOffset = -1;
292 }
293 }
294
295 #ifdef onesidedtrace
296 printf(" ADIOI_OneSidedWriteAggregation bufTypeIsContig is %d contig_access_count is %d\n",bufTypeIsContig,contig_access_count);
297 #endif
298
299
300
301
302 int maxNumContigOperations = contig_access_count;
303
304 ADIO_Offset lastFileOffset = 0, firstFileOffset = -1;
305
306
307
308 for (j=0;j<numNonZeroDataOffsets;j++) {
309 #ifdef onesidedtrace
310 printf("end_offsets[%d] is %ld st_offsets[%d] is %ld\n",j,end_offsets[j],j,st_offsets[j]);
311 #endif
312 lastFileOffset = ADIOI_MAX(lastFileOffset,end_offsets[j]);
313 if (firstFileOffset == -1)
314 firstFileOffset = st_offsets[j];
315 else
316 firstFileOffset = ADIOI_MIN(firstFileOffset,st_offsets[j]);
317 }
318
319 int myAggRank = -1;
320 int iAmUsedAgg = 0;
321
322
323
324 ADIO_Offset coll_bufsize = (ADIO_Offset)(fd->hints->cb_buffer_size);
325 #ifdef ROMIO_GPFS
326 if (gpfsmpio_pthreadio == 1) {
327
328 coll_bufsize = (ADIO_Offset)(fd->hints->cb_buffer_size/2);
329 }
330 #endif
331
332
333
334
335 int greatestFileDomainAggRank = -1,smallestFileDomainAggRank = -1;
336 ADIO_Offset greatestFileDomainOffset = 0;
337 ADIO_Offset smallestFileDomainOffset = lastFileOffset;
338 for (j=0;j<naggs;j++) {
339 if (fd_end[j] > greatestFileDomainOffset) {
340 greatestFileDomainOffset = fd_end[j];
341 greatestFileDomainAggRank = j;
342 }
343 if (fd_start[j] < smallestFileDomainOffset) {
344 smallestFileDomainOffset = fd_start[j];
345 smallestFileDomainAggRank = j;
346 }
347 if (fd->hints->ranklist[j] == myrank) {
348 myAggRank = j;
349 if (fd_end[j] > fd_start[j]) {
350 iAmUsedAgg = 1;
351 }
352 }
353 }
354
355 #ifdef onesidedtrace
356 printf("contig_access_count is %d lastFileOffset is %ld firstFileOffset is %ld\n",contig_access_count,lastFileOffset,firstFileOffset);
357 for (j=0;j<contig_access_count;j++) {
358 printf("offset_list[%d]: %ld , len_list[%d]: %ld\n",j,offset_list[j],j,len_list[j]);
359
360 }
361 #endif
362
363
364
365 int numberOfRounds = 0;
366 for (j=0;j<naggs;j++) {
367 int currentNumberOfRounds = (int)(((fd_end[j] - fd_start[j])+(ADIO_Offset)1)/coll_bufsize);
368 if (((ADIO_Offset)currentNumberOfRounds*coll_bufsize) < ((fd_end[j] - fd_start[j])+(ADIO_Offset)1))
369 currentNumberOfRounds++;
370 if (currentNumberOfRounds > numberOfRounds)
371 numberOfRounds = currentNumberOfRounds;
372 }
373
374
375
376
377 int *targetAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
378 ADIO_Offset *targetAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
379 ADIO_Offset *targetAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
380 int numTargetAggs = 0;
381
382
383
384
385 int **targetAggsForMyDataFirstOffLenIndex = (int **)ADIOI_Malloc(numberOfRounds * sizeof(int *));
386 for (i=0;i<numberOfRounds;i++) {
387 targetAggsForMyDataFirstOffLenIndex[i] = (int *)ADIOI_Malloc(naggs * sizeof(int));
388 for (j=0;j<naggs;j++)
389 targetAggsForMyDataFirstOffLenIndex[i][j] = -1;
390 }
391
392
393
394
395 int **targetAggsForMyDataLastOffLenIndex = (int **)ADIOI_Malloc(numberOfRounds * sizeof(int *));
396 for (i=0;i<numberOfRounds;i++)
397 targetAggsForMyDataLastOffLenIndex[i] = (int *)ADIOI_Malloc(naggs * sizeof(int));
398
399 #ifdef onesidedtrace
400 printf("NumberOfRounds is %d\n",numberOfRounds);
401 for (i=0;i<naggs;i++)
402 printf("fd->hints->ranklist[%d]is %d fd_start is %ld fd_end is %ld\n",i,fd->hints->ranklist[i],fd_start[i],fd_end[i]);
403 for (j=0;j<contig_access_count;j++)
404 printf("offset_list[%d] is %ld len_list is %ld\n",j,offset_list[j],len_list[j]);
405 #endif
406
407 int currentAggRankListIndex = 0;
408 int maxNumNonContigSourceChunks = 0;
409
410 ADIO_Offset currentSourceBufferOffset = 0;
411 int currentDataTypeExtent = 0;
412 int currentFlatBufIndice=0;
413 ADIO_Offset currentIndiceOffset = 0;
414
415
416
417 ADIO_Offset intraRoundCollBufsizeOffset = 0;
418
419
420
421 int *targetAggsForMyDataCurrentRoundIter = (int *)ADIOI_Malloc(naggs * sizeof(int));
422 for (i=0;i<naggs;i++)
423 targetAggsForMyDataCurrentRoundIter[i] = 0;
424
425
426
427
428
429
430 if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) {
431 int blockIter;
432 for (blockIter=0;blockIter<contig_access_count;blockIter++) {
433
434
435
436 if (blockIter>0) {
437 if (bufTypeIsContig) {
438 currentSourceBufferOffset += len_list[blockIter-1];
439 }
440 else {
441
442
443
444
445
446 ADIO_Offset sourceBlockTotal = 0;
447 int lastIndiceUsed = currentFlatBufIndice;
448 int numNonContigSourceChunks = 0;
449
450 while (sourceBlockTotal < len_list[blockIter-1]) {
451 numNonContigSourceChunks++;
452 sourceBlockTotal += (flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset);
453 lastIndiceUsed = currentFlatBufIndice;
454 currentFlatBufIndice++;
455 if (currentFlatBufIndice == flatBuf->count) {
456 currentFlatBufIndice = 0;
457 currentDataTypeExtent++;
458 }
459 currentIndiceOffset = (ADIO_Offset)0;
460 }
461 if (sourceBlockTotal > len_list[blockIter-1]) {
462 currentFlatBufIndice--;
463 if (currentFlatBufIndice < 0 ) {
464 currentDataTypeExtent--;
465 currentFlatBufIndice = flatBuf->count-1;
466 }
467 currentIndiceOffset = len_list[blockIter-1] - (sourceBlockTotal - flatBuf->blocklens[lastIndiceUsed]);
468
469 }
470 else
471 currentIndiceOffset = (ADIO_Offset)0;
472 maxNumContigOperations += (numNonContigSourceChunks+2);
473 if (numNonContigSourceChunks > maxNumNonContigSourceChunks)
474 maxNumNonContigSourceChunks = numNonContigSourceChunks;
475
476 #ifdef onesidedtrace
477 printf("blockiter %d currentFlatBufIndice is now %d currentDataTypeExtent is now %d currentIndiceOffset is now %ld maxNumContigOperations is now %d\n",blockIter,currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset,maxNumContigOperations);
478 #endif
479 }
480 }
481
482
483
484
485 if ((blockIter == (contig_access_count-1)) && (!bufTypeIsContig)) {
486 ADIO_Offset sourceBlockTotal = 0;
487 int tmpCurrentFlatBufIndice = currentFlatBufIndice;
488 int lastNumNonContigSourceChunks = 0;
489 while (sourceBlockTotal < len_list[blockIter]) {
490 lastNumNonContigSourceChunks++;
491 sourceBlockTotal += flatBuf->blocklens[tmpCurrentFlatBufIndice];
492 tmpCurrentFlatBufIndice++;
493 if (tmpCurrentFlatBufIndice == flatBuf->count) {
494 tmpCurrentFlatBufIndice = 0;
495 }
496 }
497 maxNumContigOperations += (lastNumNonContigSourceChunks+2);
498 if (lastNumNonContigSourceChunks > maxNumNonContigSourceChunks)
499 maxNumNonContigSourceChunks = lastNumNonContigSourceChunks;
500
501 }
502
503 ADIO_Offset blockStart = offset_list[blockIter], blockEnd = offset_list[blockIter]+len_list[blockIter]-(ADIO_Offset)1;
504
505
506
507
508 if (!((blockStart >= fd_start[currentAggRankListIndex]) && (blockStart <= fd_end[currentAggRankListIndex]))) {
509 while (!((blockStart >= fd_start[currentAggRankListIndex]) && (blockStart <= fd_end[currentAggRankListIndex])))
510 currentAggRankListIndex++;
511 };
512
513 #ifdef onesidedtrace
514 printf("currentAggRankListIndex is %d blockStart %ld blockEnd %ld fd_start[currentAggRankListIndex] %ld fd_end[currentAggRankListIndex] %ld\n",currentAggRankListIndex,blockStart,blockEnd,fd_start[currentAggRankListIndex],fd_end[currentAggRankListIndex]);
515 #endif
516
517
518
519 if (blockIter>0) {
520 if ((offset_list[blockIter-1]+len_list[blockIter-1]-(ADIO_Offset)1) < fd_start[currentAggRankListIndex]) {
521 numTargetAggs++;
522 }
523 }
524
525
526
527
528 if ((blockStart - fd_start[currentAggRankListIndex]) >= coll_bufsize) {
529 ADIO_Offset currentRoundBlockStart = fd_start[currentAggRankListIndex];
530 int startingRound = 0;
531 while (blockStart > (currentRoundBlockStart + coll_bufsize - (ADIO_Offset)1)) {
532 currentRoundBlockStart+=coll_bufsize;
533 startingRound++;
534 }
535 targetAggsForMyDataCurrentRoundIter[numTargetAggs] = startingRound;
536 }
537
538
539
540 if (targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] == -1) {
541 targetAggsForMyData[numTargetAggs] = fd->hints->ranklist[currentAggRankListIndex];
542 targetAggsForMyDataFDStart[numTargetAggs] = fd_start[currentAggRankListIndex];
543
544
545 if (currentAggRankListIndex == smallestFileDomainAggRank) {
546 if (targetAggsForMyDataFDStart[numTargetAggs] < firstFileOffset)
547 targetAggsForMyDataFDStart[numTargetAggs] = firstFileOffset;
548 }
549 targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[currentAggRankListIndex];
550
551
552 if (currentAggRankListIndex == greatestFileDomainAggRank) {
553 if (targetAggsForMyDataFDEnd[numTargetAggs] > lastFileOffset)
554 targetAggsForMyDataFDEnd[numTargetAggs] = lastFileOffset;
555 }
556 targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
557
558
559
560 if (bufTypeIsContig) {
561 if (currentFDSourceBufferState[numTargetAggs].sourceBufferOffset == -1) {
562
563 currentFDSourceBufferState[numTargetAggs].sourceBufferOffset = currentSourceBufferOffset;
564 #ifdef onesidedtrace
565 printf("For agg %d sourceBufferOffset initialized to %ld\n",currentAggRankListIndex,currentSourceBufferOffset);
566 #endif
567 }
568 }
569 else {
570 if (currentFDSourceBufferState[numTargetAggs].indiceOffset == -1) {
571 currentFDSourceBufferState[numTargetAggs].indiceOffset = currentIndiceOffset;
572 currentFDSourceBufferState[numTargetAggs].bufTypeExtent = bufTypeExtent;
573 currentFDSourceBufferState[numTargetAggs].dataTypeExtent = currentDataTypeExtent;
574 currentFDSourceBufferState[numTargetAggs].flatBufIndice = currentFlatBufIndice;
575 #ifdef onesidedtrace
576 printf("For agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld\n",numTargetAggs,currentDataTypeExtent,currentFlatBufIndice,currentIndiceOffset);
577 #endif
578 }
579 }
580
581 intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + ((ADIO_Offset)(targetAggsForMyDataCurrentRoundIter[numTargetAggs]+1) * coll_bufsize);
582
583 #ifdef onesidedtrace
584 printf("Initial settings numTargetAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld currentSourceBufferOffset set to %ld intraRoundCollBufsizeOffset set to %ld\n",numTargetAggs,blockIter,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter],currentSourceBufferOffset,intraRoundCollBufsizeOffset);
585 #endif
586 }
587
588
589
590 targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
591
592
593
594 if (blockEnd > fd_end[currentAggRankListIndex]) {
595 #ifdef onesidedtrace
596 printf("block extends past current fd, blockEnd %ld >= fd_end[currentAggRankListIndex] %ld total block size is %ld blockStart was %ld\n",blockEnd,fd_end[currentAggRankListIndex], len_list[blockIter],blockStart);
597 #endif
598 ADIO_Offset amountToAdvanceSBOffsetForFD = 0;
599 int additionalFDCounter = 0;
600
601 while (blockEnd >= fd_end[currentAggRankListIndex]) {
602 ADIO_Offset thisAggBlockEnd = fd_end[currentAggRankListIndex];
603 if (thisAggBlockEnd >= intraRoundCollBufsizeOffset) {
604 while (thisAggBlockEnd >= intraRoundCollBufsizeOffset) {
605 targetAggsForMyDataCurrentRoundIter[numTargetAggs]++;
606 intraRoundCollBufsizeOffset += coll_bufsize;
607 targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
608 targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
609 #ifdef onesidedtrace
610 printf("targetAggsForMyDataCurrentRoundI%d] is now %d intraRoundCollBufsizeOffset is now %ld\n",numTargetAggs,targetAggsForMyDataCurrentRoundIter[numTargetAggs],intraRoundCollBufsizeOffset);
611 #endif
612 }
613 }
614
615 int prevAggRankListIndex = currentAggRankListIndex;
616 currentAggRankListIndex++;
617
618
619
620 if (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex]) {
621 while (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex])
622 currentAggRankListIndex++;
623
624 }
625
626
627
628 if (blockEnd >= fd_start[currentAggRankListIndex]) {
629 numTargetAggs++;
630 targetAggsForMyData[numTargetAggs] = fd->hints->ranklist[currentAggRankListIndex];
631 targetAggsForMyDataFDStart[numTargetAggs] = fd_start[currentAggRankListIndex];
632
633
634 if (currentAggRankListIndex == smallestFileDomainAggRank) {
635 if (targetAggsForMyDataFDStart[numTargetAggs] < firstFileOffset)
636 targetAggsForMyDataFDStart[numTargetAggs] = firstFileOffset;
637 }
638 targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[currentAggRankListIndex];
639
640
641 if (currentAggRankListIndex == greatestFileDomainAggRank) {
642 if (targetAggsForMyDataFDEnd[numTargetAggs] > lastFileOffset)
643 targetAggsForMyDataFDEnd[numTargetAggs] = lastFileOffset;
644 }
645 targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
646
647
648
649
650
651
652 if (additionalFDCounter == 0)
653 amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex]
654 - blockStart) + (ADIO_Offset)1;
655 else
656 amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex]
657 -fd_start[prevAggRankListIndex]) +(ADIO_Offset)1;
658
659 if (bufTypeIsContig) {
660 ADIOI_Assert(numTargetAggs > 0);
661 if (currentFDSourceBufferState[numTargetAggs].sourceBufferOffset == -1) {
662 if (additionalFDCounter == 0) {
663 currentFDSourceBufferState[numTargetAggs].sourceBufferOffset =
664 currentSourceBufferOffset+amountToAdvanceSBOffsetForFD;
665 }
666 else {
667 currentFDSourceBufferState[numTargetAggs].sourceBufferOffset =
668 currentFDSourceBufferState[numTargetAggs-1].sourceBufferOffset+amountToAdvanceSBOffsetForFD;
669 }
670
671 #ifdef onesidedtrace
672 printf("Crossed into new FD - for agg %d sourceBufferOffset initialized to %ld amountToAdvanceSBOffsetForFD is %ld\n",numTargetAggs,currentFDSourceBufferState[numTargetAggs].sourceBufferOffset,amountToAdvanceSBOffsetForFD);
673 #endif
674 }
675 }
676 else if (currentFDSourceBufferState[numTargetAggs].indiceOffset == -1) {
677
678 ADIOI_Assert(numTargetAggs > 0);
679
680
681
682
683
684 if (additionalFDCounter == 0) {
685
686 currentFDSourceBufferState[numTargetAggs].indiceOffset =
687 currentIndiceOffset;
688 currentFDSourceBufferState[numTargetAggs].bufTypeExtent = bufTypeExtent;
689 currentFDSourceBufferState[numTargetAggs].dataTypeExtent =
690 currentDataTypeExtent;
691 currentFDSourceBufferState[numTargetAggs].flatBufIndice =
692 currentFlatBufIndice;
693 }
694 else {
695
696 currentFDSourceBufferState[numTargetAggs].indiceOffset =
697 currentFDSourceBufferState[numTargetAggs-1].indiceOffset;
698 currentFDSourceBufferState[numTargetAggs].bufTypeExtent =
699 currentFDSourceBufferState[numTargetAggs-1].bufTypeExtent;
700 currentFDSourceBufferState[numTargetAggs].dataTypeExtent =
701 currentFDSourceBufferState[numTargetAggs-1].dataTypeExtent;
702 currentFDSourceBufferState[numTargetAggs].flatBufIndice =
703 currentFDSourceBufferState[numTargetAggs-1].flatBufIndice;
704 }
705 nonContigSourceDataBufferAdvance(((char*)buf), flatBuf,
706 (int)amountToAdvanceSBOffsetForFD, 1,
707 ¤tFDSourceBufferState[numTargetAggs], NULL);
708 #ifdef onesidedtrace
709 printf("Crossed into new FD - for agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld amountToAdvanceSBOffsetForFD is %d\n",numTargetAggs,currentFDSourceBufferState[numTargetAggs].dataTypeExtent,currentFDSourceBufferState[numTargetAggs].flatBufIndice,currentFDSourceBufferState[numTargetAggs].indiceOffset,amountToAdvanceSBOffsetForFD);
710 #endif
711 }
712 additionalFDCounter++;
713
714 #ifdef onesidedtrace
715 printf("block extended beyond fd init settings numTargetAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld\n",numTargetAggs,i,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter]);
716 #endif
717 intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + coll_bufsize;
718 targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
719 }
720 }
721 }
722
723
724
725
726
727 if (blockEnd >= intraRoundCollBufsizeOffset) {
728 ADIO_Offset currentBlockEnd = blockEnd;
729 while (currentBlockEnd >= intraRoundCollBufsizeOffset) {
730 targetAggsForMyDataCurrentRoundIter[numTargetAggs]++;
731 intraRoundCollBufsizeOffset += coll_bufsize;
732 targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
733 targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
734 #ifdef onesidedtrace
735 printf("smaller than fd currentBlockEnd is now %ld intraRoundCollBufsizeOffset is now %ld targetAggsForMyDataCurrentRoundIter[%d] is now %d\n",currentBlockEnd, intraRoundCollBufsizeOffset, numTargetAggs,targetAggsForMyDataCurrentRoundIter[numTargetAggs]);
736 #endif
737 }
738 }
739
740
741
742
743 if (blockIter == (contig_access_count-1))
744 numTargetAggs++;
745 }
746
747 #ifdef onesidedtrace
748 printf("numTargetAggs is %d\n",numTargetAggs);
749 for (i=0;i<numTargetAggs;i++) {
750 for (j=0;j<=targetAggsForMyDataCurrentRoundIter[i];j++)
751 printf("targetAggsForMyData[%d] is %d targetAggsForMyDataFDStart[%d] is %ld targetAggsForMyDataFDEnd is %ld targetAggsForMyDataFirstOffLenIndex is %d with value %ld targetAggsForMyDataLastOffLenIndex is %d with value %ld\n",i,targetAggsForMyData[i],i,targetAggsForMyDataFDStart[i],targetAggsForMyDataFDEnd[i],targetAggsForMyDataFirstOffLenIndex[j][i],offset_list[targetAggsForMyDataFirstOffLenIndex[j][i]],targetAggsForMyDataLastOffLenIndex[j][i],offset_list[targetAggsForMyDataLastOffLenIndex[j][i]]);
752
753 }
754 #endif
755
756 }
757
758 ADIOI_Free(targetAggsForMyDataCurrentRoundIter);
759
760 int currentWriteBuf = 0;
761 int useIOBuffer = 0;
762 #ifdef ROMIO_GPFS
763 if (gpfsmpio_pthreadio && (numberOfRounds>1)) {
764 useIOBuffer = 1;
765 io_thread = pthread_self();
766 }
767 #endif
768
769
770 char *write_buf0 = fd->io_buf;
771 char *write_buf1 = fd->io_buf + coll_bufsize;
772
773
774
775 char *write_buf = write_buf0;
776 MPI_Win write_buf_window = fd->io_buf_window;
777
778 int *write_buf_put_amounts = fd->io_buf_put_amounts;
779 if(!gpfsmpio_onesided_no_rmw) {
780 *hole_found = 0;
781 for (i=0;i<nprocs;i++)
782 write_buf_put_amounts[i] = 0;
783 }
784
785
786
787 ADIO_Offset currentRoundFDStart = 0;
788 ADIO_Offset currentRoundFDEnd = 0;
789
790 if (iAmUsedAgg) {
791 currentRoundFDStart = fd_start[myAggRank];
792 if (myAggRank == smallestFileDomainAggRank) {
793 if (currentRoundFDStart < firstFileOffset)
794 currentRoundFDStart = firstFileOffset;
795 }
796 else if (myAggRank == greatestFileDomainAggRank) {
797 if (currentRoundFDEnd > lastFileOffset)
798 currentRoundFDEnd = lastFileOffset;
799 }
800 #ifdef onesidedtrace
801 printf("iAmUsedAgg - currentRoundFDStart initialized to %ld currentRoundFDEnd to %ld\n",currentRoundFDStart,currentRoundFDEnd);
802 #endif
803 if (gpfsmpio_onesided_always_rmw) {
804 ADIO_Offset tmpCurrentRoundFDEnd = 0;
805 if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
806 if (myAggRank == greatestFileDomainAggRank) {
807 if (fd_end[myAggRank] > lastFileOffset)
808 tmpCurrentRoundFDEnd = lastFileOffset;
809 else
810 tmpCurrentRoundFDEnd = fd_end[myAggRank];
811 }
812 else
813 tmpCurrentRoundFDEnd = fd_end[myAggRank];
814 }
815 else
816 tmpCurrentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset)1;
817 #ifdef onesidedtrace
818 printf("gpfsmpio_onesided_always_rmw - first buffer pre-read for file offsets %ld to %ld total is %d\n",currentRoundFDStart,tmpCurrentRoundFDEnd,(int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1);
819 #endif
820 ADIO_ReadContig(fd, write_buf, (int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1,
821 MPI_BYTE, ADIO_EXPLICIT_OFFSET,currentRoundFDStart, &status, error_code);
822
823 }
824 }
825 if (gpfsmpio_onesided_always_rmw)
826 MPI_Barrier(fd->comm);
827
828 #ifdef ROMIO_GPFS
829 endTimeBase = MPI_Wtime();
830 gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SETUP] += (endTimeBase-startTimeBase);
831 startTimeBase = MPI_Wtime();
832 #endif
833
834
835
836
837
838
839
840
841
842
843 int roundIter;
844
845 for (roundIter=0;roundIter<numberOfRounds;roundIter++) {
846 if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) {
847
848
849 int aggIter;
850 for (aggIter=0;aggIter<numTargetAggs;aggIter++) {
851
852 int numBytesPutThisAggRound = 0;
853
854
855 if (targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter] != -1) {
856 ADIO_Offset currentRoundFDStartForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)roundIter*coll_bufsize));
857 ADIO_Offset currentRoundFDEndForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)(roundIter+1)*coll_bufsize) - (ADIO_Offset)1);
858
859 int targetAggContigAccessCount = 0;
860
861
862
863
864 int *targetAggBlockLengths=NULL;
865 MPI_Aint *targetAggDisplacements=NULL, *sourceBufferDisplacements=NULL;
866 MPI_Datatype *targetAggDataTypes=NULL;
867
868 char *derivedTypePackedSourceBuffer=NULL;
869 int derivedTypePackedSourceBufferOffset = 0;
870 int allocatedDerivedTypeArrays = 0;
871 ADIO_Offset amountOfDataWrittenThisRoundAgg = 0;
872
873 #ifdef onesidedtrace
874 printf("roundIter %d processing targetAggsForMyData %d \n",roundIter,targetAggsForMyData[aggIter]);
875 #endif
876
877
878
879 int offsetIter;
880 int startingOffLenIndex = targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter], endingOffLenIndex = targetAggsForMyDataLastOffLenIndex[roundIter][aggIter];
881 for (offsetIter=startingOffLenIndex;offsetIter<=endingOffLenIndex;offsetIter++) {
882 if (currentRoundFDEndForMyTargetAgg > targetAggsForMyDataFDEnd[aggIter])
883 currentRoundFDEndForMyTargetAgg = targetAggsForMyDataFDEnd[aggIter];
884
885 ADIO_Offset offsetStart = offset_list[offsetIter], offsetEnd = (offset_list[offsetIter]+len_list[offsetIter]-(ADIO_Offset)1);
886
887 #ifdef onesidedtrace
888 printf("roundIter %d target iter %d targetAggsForMyData is %d offset_list[%d] is %ld len_list[%d] is %ld targetAggsForMyDataFDStart is %ld targetAggsForMyDataFDEnd is %ld currentRoundFDStartForMyTargetAgg is %ld currentRoundFDEndForMyTargetAgg is %ld targetAggsForMyDataFirstOffLenIndex is %ld\n",
889 roundIter,aggIter,targetAggsForMyData[aggIter],offsetIter,offset_list[offsetIter],offsetIter,len_list[offsetIter],
890 targetAggsForMyDataFDStart[aggIter],targetAggsForMyDataFDEnd[aggIter],
891 currentRoundFDStartForMyTargetAgg,currentRoundFDEndForMyTargetAgg, targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter]);
892 #endif
893
894
895
896 int bufferAmountToSend = 0;
897
898 if ((offsetStart >= currentRoundFDStartForMyTargetAgg) && (offsetStart <= currentRoundFDEndForMyTargetAgg)) {
899 if (offsetEnd > currentRoundFDEndForMyTargetAgg)
900 bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - offsetStart) +1;
901 else
902 bufferAmountToSend = (offsetEnd - offsetStart) +1;
903 }
904 else if ((offsetEnd >= currentRoundFDStartForMyTargetAgg) && (offsetEnd <= currentRoundFDEndForMyTargetAgg)) {
905 if (offsetEnd > currentRoundFDEndForMyTargetAgg)
906 bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1;
907 else
908 bufferAmountToSend = (offsetEnd - currentRoundFDStartForMyTargetAgg) +1;
909 if (offsetStart < currentRoundFDStartForMyTargetAgg) {
910 offsetStart = currentRoundFDStartForMyTargetAgg;
911 }
912 }
913 else if ((offsetStart <= currentRoundFDStartForMyTargetAgg) && (offsetEnd >= currentRoundFDEndForMyTargetAgg)) {
914 bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1;
915 offsetStart = currentRoundFDStartForMyTargetAgg;
916 }
917
918 numBytesPutThisAggRound += bufferAmountToSend;
919 #ifdef onesidedtrace
920 printf("bufferAmountToSend is %d\n",bufferAmountToSend);
921 #endif
922 if (bufferAmountToSend > 0) {
923 if (gpfsmpio_write_aggmethod == 2) {
924
925
926 if (!allocatedDerivedTypeArrays) {
927 targetAggBlockLengths = (int *)ADIOI_Malloc(maxNumContigOperations * sizeof(int));
928 targetAggDisplacements = (MPI_Aint *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Aint));
929 sourceBufferDisplacements = (MPI_Aint *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Aint));
930 targetAggDataTypes = (MPI_Datatype *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Datatype));
931 if (!bufTypeIsContig) {
932 int k;
933 for (k=targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter];k<=targetAggsForMyDataLastOffLenIndex[roundIter][aggIter];k++)
934 amountOfDataWrittenThisRoundAgg += len_list[k];
935
936 #ifdef onesidedtrace
937 printf("derivedTypePackedSourceBuffer mallocing %ld\n",amountOfDataWrittenThisRoundAgg);
938 #endif
939 if (amountOfDataWrittenThisRoundAgg > 0)
940 derivedTypePackedSourceBuffer = (char *)ADIOI_Malloc(amountOfDataWrittenThisRoundAgg * sizeof(char));
941 else
942 derivedTypePackedSourceBuffer = NULL;
943 }
944 allocatedDerivedTypeArrays = 1;
945 }
946 }
947
948
949
950 MPI_Aint targetDisplacementToUseThisRound = (MPI_Aint) (offsetStart - currentRoundFDStartForMyTargetAgg);
951
952
953
954 if (useIOBuffer && (write_buf == write_buf1)) {
955 targetDisplacementToUseThisRound += (MPI_Aint) coll_bufsize;
956 }
957
958
959
960
961
962 if (gpfsmpio_write_aggmethod == 1) {
963 MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], 0, write_buf_window);
964 if (bufTypeIsContig) {
965 MPI_Put(((char*)buf) + currentFDSourceBufferState[aggIter].sourceBufferOffset,bufferAmountToSend, MPI_BYTE,targetAggsForMyData[aggIter],targetDisplacementToUseThisRound, bufferAmountToSend,MPI_BYTE,write_buf_window);
966 currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset)bufferAmountToSend;
967 }
968 else {
969 char *putSourceData = (char *) ADIOI_Malloc(bufferAmountToSend*sizeof(char));
970 nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, bufferAmountToSend, 1, ¤tFDSourceBufferState[aggIter], putSourceData);
971 MPI_Put(putSourceData,bufferAmountToSend, MPI_BYTE,targetAggsForMyData[aggIter],targetDisplacementToUseThisRound, bufferAmountToSend,MPI_BYTE,write_buf_window);
972 ADIOI_Free(putSourceData);
973 }
974 MPI_Win_unlock(targetAggsForMyData[aggIter], write_buf_window);
975 }
976
977
978
979
980
981 else if (gpfsmpio_write_aggmethod == 2) {
982
983 if (bufTypeIsContig) {
984 targetAggBlockLengths[targetAggContigAccessCount]= bufferAmountToSend;
985 targetAggDataTypes[targetAggContigAccessCount] = MPI_BYTE;
986 targetAggDisplacements[targetAggContigAccessCount] = targetDisplacementToUseThisRound;
987 sourceBufferDisplacements[targetAggContigAccessCount] = (MPI_Aint)currentFDSourceBufferState[aggIter].sourceBufferOffset;
988 currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset)bufferAmountToSend;
989 targetAggContigAccessCount++;
990 }
991 else {
992 nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, bufferAmountToSend, 1, ¤tFDSourceBufferState[aggIter], &derivedTypePackedSourceBuffer[derivedTypePackedSourceBufferOffset]);
993 targetAggBlockLengths[targetAggContigAccessCount]= bufferAmountToSend;
994 targetAggDataTypes[targetAggContigAccessCount] = MPI_BYTE;
995 targetAggDisplacements[targetAggContigAccessCount] = targetDisplacementToUseThisRound;
996 sourceBufferDisplacements[targetAggContigAccessCount] = (MPI_Aint)derivedTypePackedSourceBufferOffset;
997 targetAggContigAccessCount++;
998 derivedTypePackedSourceBufferOffset += (ADIO_Offset)bufferAmountToSend;
999 }
1000 }
1001 #ifdef onesidedtrace
1002 printf("roundIter %d bufferAmountToSend is %d offsetStart is %ld currentRoundFDStartForMyTargetAgg is %ld targetDisplacementToUseThisRound is %ld targetAggsForMyDataFDStart[aggIter] is %ld\n",roundIter, bufferAmountToSend, offsetStart,currentRoundFDStartForMyTargetAgg,targetDisplacementToUseThisRound,targetAggsForMyDataFDStart[aggIter]);
1003 #endif
1004
1005 }
1006 }
1007
1008
1009
1010 if (gpfsmpio_write_aggmethod == 2) {
1011
1012 MPI_Datatype sourceBufferDerivedDataType, targetBufferDerivedDataType;
1013 MPI_Type_create_struct(targetAggContigAccessCount, targetAggBlockLengths, sourceBufferDisplacements, targetAggDataTypes, &sourceBufferDerivedDataType);
1014 MPI_Type_commit(&sourceBufferDerivedDataType);
1015 MPI_Type_create_struct(targetAggContigAccessCount, targetAggBlockLengths, targetAggDisplacements, targetAggDataTypes, &targetBufferDerivedDataType);
1016 MPI_Type_commit(&targetBufferDerivedDataType);
1017
1018 #ifdef onesidedtrace
1019 printf("mpi_put of derived type to agg %d targetAggContigAccessCount is %d\n",targetAggsForMyData[aggIter],targetAggContigAccessCount);
1020 #endif
1021 if (targetAggContigAccessCount > 0) {
1022 MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], 0, write_buf_window);
1023 if (bufTypeIsContig) {
1024 MPI_Put(((char*)buf),1, sourceBufferDerivedDataType,targetAggsForMyData[aggIter],0, 1,targetBufferDerivedDataType,write_buf_window);
1025 }
1026 else {
1027 MPI_Put(derivedTypePackedSourceBuffer,1, sourceBufferDerivedDataType,targetAggsForMyData[aggIter],0, 1,targetBufferDerivedDataType,write_buf_window);
1028 }
1029
1030
1031 MPI_Win_unlock(targetAggsForMyData[aggIter], write_buf_window);
1032 }
1033
1034 if (allocatedDerivedTypeArrays) {
1035 ADIOI_Free(targetAggBlockLengths);
1036 ADIOI_Free(targetAggDisplacements);
1037 ADIOI_Free(targetAggDataTypes);
1038 ADIOI_Free(sourceBufferDisplacements);
1039 if (!bufTypeIsContig)
1040 if (derivedTypePackedSourceBuffer != NULL)
1041 ADIOI_Free(derivedTypePackedSourceBuffer);
1042 }
1043 if (targetAggContigAccessCount > 0) {
1044 MPI_Type_free(&sourceBufferDerivedDataType);
1045 MPI_Type_free(&targetBufferDerivedDataType);
1046 }
1047 }
1048 if (!gpfsmpio_onesided_no_rmw) {
1049 MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], 0, fd->io_buf_put_amounts_window);
1050 MPI_Put(&numBytesPutThisAggRound,1, MPI_INT,targetAggsForMyData[aggIter],myrank, 1,MPI_INT,fd->io_buf_put_amounts_window);
1051 MPI_Win_unlock(targetAggsForMyData[aggIter], fd->io_buf_put_amounts_window);
1052 }
1053 }
1054 }
1055 }
1056
1057 #ifdef onesidedtrace
1058 printf("first barrier roundIter %d\n",roundIter);
1059 #endif
1060 MPI_Barrier(fd->comm);
1061
1062 if (iAmUsedAgg) {
1063
1064
1065 if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
1066 if (myAggRank == greatestFileDomainAggRank) {
1067 if (fd_end[myAggRank] > lastFileOffset)
1068 currentRoundFDEnd = lastFileOffset;
1069 else
1070 currentRoundFDEnd = fd_end[myAggRank];
1071 }
1072 else
1073 currentRoundFDEnd = fd_end[myAggRank];
1074 }
1075 else
1076 currentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset)1;
1077
1078 #ifdef onesidedtrace
1079 printf("used agg about to writecontig - currentRoundFDStart is %ld currentRoundFDEnd is %ld within file domeain %ld to %ld\n",currentRoundFDStart,currentRoundFDEnd,fd_start[myAggRank],fd_end[myAggRank]);
1080 #endif
1081
1082 int doWriteContig = 1;
1083 if (!gpfsmpio_onesided_no_rmw) {
1084 int numBytesPutIntoBuf = 0;
1085 for (i=0;i<nprocs;i++) {
1086 numBytesPutIntoBuf += write_buf_put_amounts[i];
1087 write_buf_put_amounts[i] = 0;
1088 }
1089 if (numBytesPutIntoBuf != ((int)(currentRoundFDEnd - currentRoundFDStart)+1)) {
1090 doWriteContig = 0;
1091 *hole_found = 1;
1092 }
1093 }
1094
1095 if (!useIOBuffer) {
1096 if (doWriteContig)
1097 ADIO_WriteContig(fd, write_buf, (int)(currentRoundFDEnd - currentRoundFDStart)+1,
1098 MPI_BYTE, ADIO_EXPLICIT_OFFSET,currentRoundFDStart, &status, error_code);
1099
1100 } else {
1101
1102 if(!pthread_equal(io_thread, pthread_self())) {
1103 pthread_join(io_thread, &thread_ret);
1104 *error_code = *(int *)thread_ret;
1105 if (*error_code != MPI_SUCCESS) return;
1106 io_thread = pthread_self();
1107
1108 }
1109 io_thread_args.fd = fd;
1110
1111
1112
1113 if (currentWriteBuf == 0) {
1114 io_thread_args.buf = write_buf0;
1115 currentWriteBuf = 1;
1116 write_buf = write_buf1;
1117 }
1118 else {
1119 io_thread_args.buf = write_buf1;
1120 currentWriteBuf = 0;
1121 write_buf = write_buf0;
1122 }
1123 if (doWriteContig) {
1124 io_thread_args.io_kind = ADIOI_WRITE;
1125 io_thread_args.size = (currentRoundFDEnd-currentRoundFDStart) + 1;
1126 io_thread_args.offset = currentRoundFDStart;
1127 io_thread_args.status = &status;
1128 io_thread_args.error_code = *error_code;
1129
1130 if ( (pthread_create(&io_thread, NULL,
1131 ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
1132 io_thread = pthread_self();
1133 }
1134 }
1135
1136 }
1137
1138 if (!iAmUsedAgg && useIOBuffer) {
1139 if (currentWriteBuf == 0) {
1140 currentWriteBuf = 1;
1141 write_buf = write_buf1;
1142 }
1143 else {
1144 currentWriteBuf = 0;
1145 write_buf = write_buf0;
1146 }
1147 }
1148
1149 if (iAmUsedAgg) {
1150 currentRoundFDStart += coll_bufsize;
1151
1152 if (gpfsmpio_onesided_always_rmw && (roundIter<(numberOfRounds-1))) {
1153 ADIO_Offset tmpCurrentRoundFDEnd = 0;
1154 if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
1155 if (myAggRank == greatestFileDomainAggRank) {
1156 if (fd_end[myAggRank] > lastFileOffset)
1157 tmpCurrentRoundFDEnd = lastFileOffset;
1158 else
1159 tmpCurrentRoundFDEnd = fd_end[myAggRank];
1160 }
1161 else
1162 tmpCurrentRoundFDEnd = fd_end[myAggRank];
1163 }
1164 else
1165 tmpCurrentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset)1;
1166 #ifdef onesidedtrace
1167 printf("gpfsmpio_onesided_always_rmw - round %d buffer pre-read for file offsets %ld to %ld total is %d\n",roundIter, currentRoundFDStart,tmpCurrentRoundFDEnd,(int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1);
1168 #endif
1169 ADIO_ReadContig(fd, write_buf, (int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1,
1170 MPI_BYTE, ADIO_EXPLICIT_OFFSET,currentRoundFDStart, &status, error_code);
1171
1172 }
1173 }
1174
1175 if (roundIter<(numberOfRounds-1)) {
1176 #ifdef onesidedtrace
1177 printf("second barrier roundIter %d\n",roundIter);
1178 #endif
1179 MPI_Barrier(fd->comm);
1180 }
1181
1182 }
1183
1184
1185 #ifdef ROMIO_GPFS
1186 endTimeBase = MPI_Wtime();
1187 gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH] += (endTimeBase-startTimeBase);
1188 #endif
1189
1190 if (useIOBuffer) {
1191
1192 if ( !pthread_equal(io_thread, pthread_self()) ) {
1193 pthread_join(io_thread, &thread_ret);
1194 *error_code = *(int *)thread_ret;
1195 }
1196
1197 }
1198
1199 #ifdef onesidedtrace
1200 printf("freeing datastructures\n");
1201 #endif
1202 ADIOI_Free(targetAggsForMyData);
1203 ADIOI_Free(targetAggsForMyDataFDStart);
1204 ADIOI_Free(targetAggsForMyDataFDEnd);
1205
1206 for (i=0;i<numberOfRounds;i++) {
1207 ADIOI_Free(targetAggsForMyDataFirstOffLenIndex[i]);
1208 ADIOI_Free(targetAggsForMyDataLastOffLenIndex[i]);
1209 }
1210 ADIOI_Free(targetAggsForMyDataFirstOffLenIndex);
1211 ADIOI_Free(targetAggsForMyDataLastOffLenIndex);
1212
1213 ADIOI_Free(currentFDSourceBufferState);
1214
1215 if (!bufTypeIsContig)
1216 ADIOI_Delete_flattened(datatype);
1217 return;
1218 }
1219
1220
1221 void ADIOI_OneSidedReadAggregation(ADIO_File fd,
1222 ADIO_Offset *offset_list,
1223 ADIO_Offset *len_list,
1224 int contig_access_count,
1225 const void *buf,
1226 MPI_Datatype datatype,
1227 int *error_code,
1228 ADIO_Offset *st_offsets,
1229 ADIO_Offset *end_offsets,
1230 int numNonZeroDataOffsets,
1231 ADIO_Offset *fd_start,
1232 ADIO_Offset* fd_end)
1233 {
1234 int i,j;
1235
1236 #ifdef onesidedtrace
1237 if (buf == NULL) {
1238 printf("ADIOI_OneSidedWriteAggregation - buf is NULL contig_access_count is %d\n",contig_access_count);
1239 for (i=0;i<contig_access_count;i++)
1240 printf("offset_list[%d] is %ld len_list[%d] is %ld\n",i,offset_list[i],i,len_list[i]);
1241 }
1242 if (contig_access_count < 0)
1243 printf("ADIOI_OneSidedWriteAggregation - contig_access_count of %d is less than 0\n",contig_access_count);
1244 #endif
1245
1246 int lenListOverZero = 0;
1247 for (i=0;((i<contig_access_count) && (!lenListOverZero));i++)
1248 if (len_list[i] > 0)
1249 lenListOverZero = 1;
1250
1251 *error_code = MPI_SUCCESS;
1252
1253 #ifdef ROMIO_GPFS
1254 double startTimeBase,endTimeBase;
1255 startTimeBase = MPI_Wtime();
1256 #endif
1257
1258 MPI_Status status;
1259 pthread_t io_thread;
1260 void *thread_ret;
1261 ADIOI_IO_ThreadFuncData io_thread_args;
1262
1263 int nprocs,myrank;
1264 MPI_Comm_size(fd->comm, &nprocs);
1265 MPI_Comm_rank(fd->comm, &myrank);
1266
1267 #ifdef onesidedtrace
1268 printf("ADIOI_OneSidedReadAggregation started on rank %d\n",myrank);
1269 #endif
1270
1271 if (fd->io_buf_window == MPI_WIN_NULL ||
1272 fd->io_buf_put_amounts_window == MPI_WIN_NULL)
1273 {
1274 ADIOI_OneSidedSetup(fd, nprocs);
1275 }
1276
1277
1278
1279
1280
1281
1282 int bufTypeIsContig;
1283
1284 MPI_Aint bufTypeExtent, lb;
1285 ADIOI_Flatlist_node *flatBuf=NULL;
1286 ADIOI_Datatype_iscontig(datatype, &bufTypeIsContig);
1287
1288 if (!bufTypeIsContig) {
1289
1290
1291 flatBuf = ADIOI_Flatten_and_find(datatype);
1292 MPI_Type_get_extent(datatype, &lb, &bufTypeExtent);
1293 #ifdef onesidedtrace
1294 printf("flatBuf->count is %d bufTypeExtent is %d\n", flatBuf->count,bufTypeExtent);
1295 for (i=0;i<flatBuf->count;i++)
1296 printf("flatBuf->blocklens[%d] is %d flatBuf->indices[%d] is %ld\n",i,flatBuf->blocklens[i],i,flatBuf->indices[i]);
1297 #endif
1298 }
1299 #ifdef onesidedtrace
1300 printf("ADIOI_OneSidedReadAggregation bufTypeIsContig is %d contig_access_count is %d\n",bufTypeIsContig,contig_access_count);
1301 #endif
1302
1303 int naggs = fd->hints->cb_nodes;
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314 #ifdef onesidedtrace
1315 printf("sizeof(FDSourceBufferState) is %d - make sure is 32 for 32-byte memalign optimal\n",sizeof(FDSourceBufferState));
1316 #endif
1317 FDSourceBufferState *currentFDSourceBufferState;
1318
1319 currentFDSourceBufferState = (FDSourceBufferState *) ADIOI_Malloc(naggs * sizeof(FDSourceBufferState));
1320 for (i=0;i<naggs;i++) {
1321
1322
1323 if (bufTypeIsContig) {
1324 currentFDSourceBufferState[i].sourceBufferOffset = -1;
1325 }
1326 else {
1327 currentFDSourceBufferState[i].indiceOffset = -1;
1328 }
1329 }
1330
1331 #ifdef onesidedtrace
1332 printf(" ADIOI_OneSidedReadAggregation bufTypeIsContig is %d contig_access_count is %d\n",bufTypeIsContig,contig_access_count);
1333 #endif
1334
1335
1336
1337
1338
1339 int maxNumContigOperations = contig_access_count;
1340
1341
1342 ADIO_Offset lastFileOffset = 0, firstFileOffset = -1;
1343
1344
1345
1346 for (j=0;j<numNonZeroDataOffsets;j++) {
1347 #ifdef onesidedtrace
1348 printf("end_offsets[%d] is %ld st_offsets[%d] is %ld\n",j,end_offsets[j],j,st_offsets[j]);
1349 #endif
1350 lastFileOffset = ADIOI_MAX(lastFileOffset,end_offsets[j]);
1351 if (firstFileOffset == -1)
1352 firstFileOffset = st_offsets[j];
1353 else
1354 firstFileOffset = ADIOI_MIN(firstFileOffset,st_offsets[j]);
1355 }
1356
1357 int myAggRank = -1;
1358 int iAmUsedAgg = 0;
1359
1360 int coll_bufsize = fd->hints->cb_buffer_size;
1361 #ifdef ROMIO_GPFS
1362 if (gpfsmpio_pthreadio == 1) {
1363
1364 coll_bufsize = fd->hints->cb_buffer_size/2;
1365 }
1366 #endif
1367
1368
1369
1370
1371 int greatestFileDomainAggRank = -1,smallestFileDomainAggRank = -1;
1372 ADIO_Offset greatestFileDomainOffset = 0;
1373 ADIO_Offset smallestFileDomainOffset = lastFileOffset;
1374 for (j=0;j<naggs;j++) {
1375 if (fd_end[j] > greatestFileDomainOffset) {
1376 greatestFileDomainOffset = fd_end[j];
1377 greatestFileDomainAggRank = j;
1378 }
1379 if (fd_start[j] < smallestFileDomainOffset) {
1380 smallestFileDomainOffset = fd_start[j];
1381 smallestFileDomainAggRank = j;
1382 }
1383 if (fd->hints->ranklist[j] == myrank) {
1384 myAggRank = j;
1385 if (fd_end[j] > fd_start[j]) {
1386 iAmUsedAgg = 1;
1387 }
1388 }
1389 }
1390
1391 #ifdef onesidedtrace
1392 printf("contig_access_count is %d lastFileOffset is %ld firstFileOffset is %ld\n",contig_access_count,lastFileOffset,firstFileOffset);
1393 for (j=0;j<contig_access_count;j++) {
1394 printf("offset_list[%d]: %ld , len_list[%d]: %ld\n",j,offset_list[j],j,len_list[j]);
1395 }
1396 #endif
1397
1398
1399
1400 int numberOfRounds = 0;
1401 for (j=0;j<naggs;j++) {
1402 int currentNumberOfRounds = (int)(((fd_end[j] - fd_start[j])+(ADIO_Offset)1)/coll_bufsize);
1403 if (((ADIO_Offset)currentNumberOfRounds*coll_bufsize) < ((fd_end[j] - fd_start[j])+(ADIO_Offset)1))
1404 currentNumberOfRounds++;
1405 if (currentNumberOfRounds > numberOfRounds)
1406 numberOfRounds = currentNumberOfRounds;
1407 }
1408
1409
1410
1411 int *sourceAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
1412 ADIO_Offset *sourceAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
1413 ADIO_Offset *sourceAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
1414 int numSourceAggs = 0;
1415
1416
1417
1418
1419 int **sourceAggsForMyDataFirstOffLenIndex = (int **)ADIOI_Malloc(numberOfRounds * sizeof(int *));
1420 for (i=0;i<numberOfRounds;i++) {
1421 sourceAggsForMyDataFirstOffLenIndex[i] = (int *)ADIOI_Malloc(naggs * sizeof(int));
1422 for (j=0;j<naggs;j++)
1423 sourceAggsForMyDataFirstOffLenIndex[i][j] = -1;
1424 }
1425
1426
1427
1428
1429 int **sourceAggsForMyDataLastOffLenIndex = (int **)ADIOI_Malloc(numberOfRounds * sizeof(int *));
1430 for (i=0;i<numberOfRounds;i++)
1431 sourceAggsForMyDataLastOffLenIndex[i] = (int *)ADIOI_Malloc(naggs * sizeof(int));
1432
1433 #ifdef onesidedtrace
1434 printf("NumberOfRounds is %d\n",numberOfRounds);
1435 for (i=0;i<naggs;i++)
1436 printf("fd->hints->ranklist[%d]is %d fd_start is %ld fd_end is %ld\n",i,fd->hints->ranklist[i],fd_start[i],fd_end[i]);
1437 for (j=0;j<contig_access_count;j++)
1438 printf("offset_list[%d] is %ld len_list is %ld\n",j,offset_list[j],len_list[j]);
1439 #endif
1440
1441 int currentAggRankListIndex = 0;
1442 int maxNumNonContigSourceChunks = 0;
1443
1444 ADIO_Offset currentRecvBufferOffset = 0;
1445 int currentDataTypeExtent = 0;
1446 int currentFlatBufIndice=0;
1447 ADIO_Offset currentIndiceOffset = 0;
1448
1449
1450
1451 ADIO_Offset intraRoundCollBufsizeOffset = 0;
1452
1453
1454
1455 int *sourceAggsForMyDataCurrentRoundIter = (int *)ADIOI_Malloc(naggs * sizeof(int));
1456 for (i=0;i<naggs;i++)
1457 sourceAggsForMyDataCurrentRoundIter[i] = 0;
1458
1459
1460
1461
1462
1463
1464 if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) {
1465 int blockIter;
1466 for (blockIter=0;blockIter<contig_access_count;blockIter++) {
1467
1468
1469
1470 if (blockIter>0) {
1471 if (bufTypeIsContig) {
1472 currentRecvBufferOffset += len_list[blockIter-1];
1473 }
1474 else {
1475
1476
1477
1478 ADIO_Offset sourceBlockTotal = 0;
1479 int lastIndiceUsed = currentFlatBufIndice;
1480 int numNonContigSourceChunks = 0;
1481 while (sourceBlockTotal < len_list[blockIter-1]) {
1482 numNonContigSourceChunks++;
1483 sourceBlockTotal += (flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset);
1484 lastIndiceUsed = currentFlatBufIndice;
1485 currentFlatBufIndice++;
1486 if (currentFlatBufIndice == flatBuf->count) {
1487 currentFlatBufIndice = 0;
1488 currentDataTypeExtent++;
1489 }
1490 currentIndiceOffset = (ADIO_Offset)0;
1491 }
1492 if (sourceBlockTotal > len_list[blockIter-1]) {
1493 currentFlatBufIndice--;
1494 if (currentFlatBufIndice < 0 ) {
1495 currentDataTypeExtent--;
1496 currentFlatBufIndice = flatBuf->count-1;
1497 }
1498 currentIndiceOffset = len_list[blockIter-1] - (sourceBlockTotal - flatBuf->blocklens[lastIndiceUsed]);
1499
1500 }
1501 else
1502 currentIndiceOffset = 0;
1503 maxNumContigOperations += (numNonContigSourceChunks+2);
1504 if (numNonContigSourceChunks > maxNumNonContigSourceChunks)
1505 maxNumNonContigSourceChunks = numNonContigSourceChunks;
1506
1507 #ifdef onesidedtrace
1508 printf("block iter %d currentFlatBufIndice is now %d currentDataTypeExtent is now %d currentIndiceOffset is now %ld maxNumContigOperations is now %d\n",blockIter,currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset,maxNumContigOperations);
1509 #endif
1510 }
1511 }
1512
1513
1514
1515
1516 if ((blockIter == (contig_access_count-1)) && (!bufTypeIsContig)) {
1517 ADIO_Offset sourceBlockTotal = 0;
1518 int tmpCurrentFlatBufIndice = currentFlatBufIndice;
1519 int lastNumNonContigSourceChunks = 0;
1520 while (sourceBlockTotal < len_list[blockIter]) {
1521 lastNumNonContigSourceChunks++;
1522 sourceBlockTotal += flatBuf->blocklens[tmpCurrentFlatBufIndice];
1523 tmpCurrentFlatBufIndice++;
1524 if (tmpCurrentFlatBufIndice == flatBuf->count) {
1525 tmpCurrentFlatBufIndice = 0;
1526 }
1527 }
1528 maxNumContigOperations += (lastNumNonContigSourceChunks+2);
1529 if (lastNumNonContigSourceChunks > maxNumNonContigSourceChunks)
1530 maxNumNonContigSourceChunks = lastNumNonContigSourceChunks;
1531
1532 }
1533
1534 ADIO_Offset blockStart = offset_list[blockIter], blockEnd = offset_list[blockIter]+len_list[blockIter]-(ADIO_Offset)1;
1535
1536
1537
1538
1539 if (!((blockStart >= fd_start[currentAggRankListIndex]) && (blockStart <= fd_end[currentAggRankListIndex]))) {
1540 while (!((blockStart >= fd_start[currentAggRankListIndex]) && (blockStart <= fd_end[currentAggRankListIndex])))
1541 currentAggRankListIndex++;
1542 };
1543
1544
1545
1546 if (blockIter>0) {
1547 if ((offset_list[blockIter-1]+len_list[blockIter-1]-(ADIO_Offset)1) < fd_start[currentAggRankListIndex])
1548 numSourceAggs++;
1549 }
1550
1551
1552
1553 if ((blockStart - fd_start[currentAggRankListIndex]) >= coll_bufsize) {
1554 ADIO_Offset currentRoundBlockStart = fd_start[currentAggRankListIndex];
1555 int startingRound = 0;
1556 while (blockStart > (currentRoundBlockStart + coll_bufsize - (ADIO_Offset)1)) {
1557 currentRoundBlockStart+=coll_bufsize;
1558 startingRound++;
1559 }
1560 sourceAggsForMyDataCurrentRoundIter[numSourceAggs] = startingRound;
1561 }
1562
1563
1564
1565 if (sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] == -1) {
1566 sourceAggsForMyData[numSourceAggs] = fd->hints->ranklist[currentAggRankListIndex];
1567 sourceAggsForMyDataFDStart[numSourceAggs] = fd_start[currentAggRankListIndex];
1568
1569
1570 if (currentAggRankListIndex == smallestFileDomainAggRank) {
1571 if (sourceAggsForMyDataFDStart[numSourceAggs] < firstFileOffset)
1572 sourceAggsForMyDataFDStart[numSourceAggs] = firstFileOffset;
1573 }
1574 sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[currentAggRankListIndex];
1575
1576
1577 if (currentAggRankListIndex == greatestFileDomainAggRank) {
1578 if (sourceAggsForMyDataFDEnd[numSourceAggs] > lastFileOffset)
1579 sourceAggsForMyDataFDEnd[numSourceAggs] = lastFileOffset;
1580 }
1581 sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
1582
1583
1584
1585 if (bufTypeIsContig) {
1586 if (currentFDSourceBufferState[numSourceAggs].sourceBufferOffset == -1) {
1587
1588 currentFDSourceBufferState[numSourceAggs].sourceBufferOffset = currentRecvBufferOffset;
1589 #ifdef onesidedtrace
1590 printf("For agg %d sourceBufferOffset initialized to %ld\n",currentAggRankListIndex,currentRecvBufferOffset);
1591 #endif
1592 }
1593 }
1594 else {
1595 if (currentFDSourceBufferState[numSourceAggs].indiceOffset == -1) {
1596 currentFDSourceBufferState[numSourceAggs].indiceOffset = currentIndiceOffset;
1597 currentFDSourceBufferState[numSourceAggs].bufTypeExtent = bufTypeExtent;
1598 currentFDSourceBufferState[numSourceAggs].dataTypeExtent = currentDataTypeExtent;
1599 currentFDSourceBufferState[numSourceAggs].flatBufIndice = currentFlatBufIndice;
1600 #ifdef onesidedtrace
1601 printf("For agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld\n",numSourceAggs,currentDataTypeExtent,currentFlatBufIndice,currentIndiceOffset);
1602 #endif
1603 }
1604 }
1605 intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + ((ADIO_Offset)(sourceAggsForMyDataCurrentRoundIter[numSourceAggs]+1) * coll_bufsize);
1606 #ifdef onesidedtrace
1607 printf("init settings numSourceAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld currentRecvBufferOffset set to %ld intraRoundCollBufsizeOffset set to %ld\n",numSourceAggs,blockIter,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter],currentRecvBufferOffset,intraRoundCollBufsizeOffset);
1608 #endif
1609
1610 }
1611
1612
1613
1614 sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
1615
1616
1617
1618 if (blockEnd > fd_end[currentAggRankListIndex]) {
1619 #ifdef onesidedtrace
1620 printf("block extends past current fd, blockEnd %ld >= fd_end[currentAggRankListIndex] %ld total block size is %ld blockStart was %ld\n",blockEnd,fd_end[currentAggRankListIndex], len_list[blockIter],blockStart);
1621 #endif
1622 ADIO_Offset amountToAdvanceSBOffsetForFD = 0;
1623 int additionalFDCounter = 0;
1624 while (blockEnd >= fd_end[currentAggRankListIndex]) {
1625 ADIO_Offset thisAggBlockEnd = fd_end[currentAggRankListIndex];
1626 if (thisAggBlockEnd >= intraRoundCollBufsizeOffset) {
1627 while (thisAggBlockEnd >= intraRoundCollBufsizeOffset) {
1628 sourceAggsForMyDataCurrentRoundIter[numSourceAggs]++;
1629 intraRoundCollBufsizeOffset += coll_bufsize;
1630 sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
1631 sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
1632 #ifdef onesidedtrace
1633 printf("sourceAggsForMyDataCurrentRoundI%d] is now %d intraRoundCollBufsizeOffset is now %ld\n",numSourceAggs,sourceAggsForMyDataCurrentRoundIter[numSourceAggs],intraRoundCollBufsizeOffset);
1634 #endif
1635 }
1636 }
1637
1638 int prevAggRankListIndex = currentAggRankListIndex;
1639 currentAggRankListIndex++;
1640
1641
1642
1643 if (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex]) {
1644 while (fd_start[currentAggRankListIndex] > fd_end[currentAggRankListIndex])
1645 currentAggRankListIndex++;
1646
1647 }
1648
1649
1650
1651 if (blockEnd >= fd_start[currentAggRankListIndex]) {
1652 numSourceAggs++;
1653 sourceAggsForMyData[numSourceAggs] = fd->hints->ranklist[currentAggRankListIndex];
1654 sourceAggsForMyDataFDStart[numSourceAggs] = fd_start[currentAggRankListIndex];
1655
1656
1657 if (currentAggRankListIndex == smallestFileDomainAggRank) {
1658 if (sourceAggsForMyDataFDStart[numSourceAggs] < firstFileOffset)
1659 sourceAggsForMyDataFDStart[numSourceAggs] = firstFileOffset;
1660 }
1661 sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[currentAggRankListIndex];
1662
1663
1664 if (currentAggRankListIndex == greatestFileDomainAggRank) {
1665 if (sourceAggsForMyDataFDEnd[numSourceAggs] > lastFileOffset)
1666 sourceAggsForMyDataFDEnd[numSourceAggs] = lastFileOffset;
1667 }
1668 sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
1669
1670
1671
1672
1673
1674
1675
1676 if (additionalFDCounter == 0)
1677 amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex] - blockStart) + (ADIO_Offset)1;
1678 else
1679 amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex]-fd_start[prevAggRankListIndex]) +(ADIO_Offset)1;
1680
1681 if (bufTypeIsContig) {
1682 ADIOI_Assert(numSourceAggs > 0);
1683 if (currentFDSourceBufferState[numSourceAggs].sourceBufferOffset == -1) {
1684 if (additionalFDCounter == 0) {
1685 currentFDSourceBufferState[numSourceAggs].sourceBufferOffset = currentRecvBufferOffset+amountToAdvanceSBOffsetForFD;
1686 }
1687 else {
1688 currentFDSourceBufferState[numSourceAggs].sourceBufferOffset = currentFDSourceBufferState[numSourceAggs-1].sourceBufferOffset+amountToAdvanceSBOffsetForFD;
1689 }
1690
1691 #ifdef onesidedtrace
1692 printf("Crossed into new FD - for agg %d sourceBufferOffset initialized to %ld amountToAdvanceSBOffsetForFD is %ld\n",numSourceAggs,currentFDSourceBufferState[numSourceAggs].sourceBufferOffset,amountToAdvanceSBOffsetForFD);
1693 #endif
1694 }
1695 }
1696 else if (currentFDSourceBufferState[numSourceAggs].indiceOffset == -1) {
1697
1698 ADIOI_Assert(numSourceAggs > 0);
1699
1700
1701
1702
1703 if (additionalFDCounter == 0) {
1704
1705 currentFDSourceBufferState[numSourceAggs].indiceOffset = currentIndiceOffset;
1706 currentFDSourceBufferState[numSourceAggs].bufTypeExtent = bufTypeExtent;
1707 currentFDSourceBufferState[numSourceAggs].dataTypeExtent = currentDataTypeExtent;
1708 currentFDSourceBufferState[numSourceAggs].flatBufIndice = currentFlatBufIndice;
1709 }
1710 else {
1711
1712
1713 currentFDSourceBufferState[numSourceAggs].indiceOffset = currentFDSourceBufferState[numSourceAggs-1].indiceOffset;
1714 currentFDSourceBufferState[numSourceAggs].bufTypeExtent = currentFDSourceBufferState[numSourceAggs-1].bufTypeExtent;
1715 currentFDSourceBufferState[numSourceAggs].dataTypeExtent = currentFDSourceBufferState[numSourceAggs-1].dataTypeExtent;
1716 currentFDSourceBufferState[numSourceAggs].flatBufIndice = currentFDSourceBufferState[numSourceAggs-1].flatBufIndice;
1717 }
1718 nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, (int)amountToAdvanceSBOffsetForFD, 0, ¤tFDSourceBufferState[numSourceAggs], NULL);
1719 #ifdef onesidedtrace
1720 printf("Crossed into new FD - for agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld amountToAdvanceSBOffsetForFD is %d\n",numSourceAggs,currentFDSourceBufferState[numSourceAggs].dataTypeExtent,currentFDSourceBufferState[numSourceAggs].flatBufIndice,currentFDSourceBufferState[numSourceAggs].indiceOffset,amountToAdvanceSBOffsetForFD);
1721 #endif
1722 }
1723
1724 additionalFDCounter++;
1725
1726
1727 #ifdef onesidedtrace
1728 printf("block extended beyond fd init settings numSourceAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld\n",numSourceAggs,i,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter]);
1729 #endif
1730 intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + coll_bufsize;
1731 sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
1732 }
1733 }
1734 }
1735
1736
1737
1738
1739 if (blockEnd >= intraRoundCollBufsizeOffset) {
1740 ADIO_Offset currentBlockEnd = blockEnd;
1741 while (currentBlockEnd >= intraRoundCollBufsizeOffset) {
1742 sourceAggsForMyDataCurrentRoundIter[numSourceAggs]++;
1743 intraRoundCollBufsizeOffset += coll_bufsize;
1744 sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
1745 sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
1746 #ifdef onesidedtrace
1747 printf("block less than fd currentBlockEnd is now %ld intraRoundCollBufsizeOffset is now %ld sourceAggsForMyDataCurrentRoundIter[%d] is now %d\n",currentBlockEnd, intraRoundCollBufsizeOffset, numSourceAggs,sourceAggsForMyDataCurrentRoundIter[numSourceAggs]);
1748 #endif
1749 }
1750 }
1751
1752
1753
1754
1755 if (blockIter == (contig_access_count-1))
1756 numSourceAggs++;
1757 }
1758
1759 #ifdef onesidedtrace
1760 printf("numSourceAggs is %d\n",numSourceAggs);
1761 for (i=0;i<numSourceAggs;i++) {
1762 for (j=0;j<=sourceAggsForMyDataCurrentRoundIter[i];j++)
1763 printf("sourceAggsForMyData[%d] is %d sourceAggsForMyDataFDStart[%d] is %ld sourceAggsForMyDataFDEnd is %ld sourceAggsForMyDataFirstOffLenIndex is %d with value %ld sourceAggsForMyDataLastOffLenIndex is %d with value %ld\n",i,sourceAggsForMyData[i],i,sourceAggsForMyDataFDStart[i],sourceAggsForMyDataFDEnd[i],sourceAggsForMyDataFirstOffLenIndex[j][i],offset_list[sourceAggsForMyDataFirstOffLenIndex[j][i]],sourceAggsForMyDataLastOffLenIndex[j][i],offset_list[sourceAggsForMyDataLastOffLenIndex[j][i]]);
1764
1765 }
1766 #endif
1767
1768 }
1769
1770 ADIOI_Free(sourceAggsForMyDataCurrentRoundIter);
1771
1772
1773
1774 char *read_buf0 = fd->io_buf;
1775 char *read_buf1 = fd->io_buf + coll_bufsize;
1776
1777 char *read_buf = read_buf0;
1778
1779 int currentReadBuf = 0;
1780 int useIOBuffer = 0;
1781 #ifdef ROMIO_GPFS
1782 if (gpfsmpio_pthreadio && (numberOfRounds>1)) {
1783 useIOBuffer = 1;
1784 io_thread = pthread_self();
1785 }
1786 #endif
1787
1788 MPI_Win read_buf_window = fd->io_buf_window;
1789
1790 ADIO_Offset currentRoundFDStart = 0, nextRoundFDStart = 0;
1791 ADIO_Offset currentRoundFDEnd = 0, nextRoundFDEnd = 0;
1792
1793 if (iAmUsedAgg) {
1794 currentRoundFDStart = fd_start[myAggRank];
1795 nextRoundFDStart = fd_start[myAggRank];
1796 if (myAggRank == smallestFileDomainAggRank) {
1797 if (currentRoundFDStart < firstFileOffset)
1798 currentRoundFDStart = firstFileOffset;
1799 if (nextRoundFDStart < firstFileOffset)
1800 nextRoundFDStart = firstFileOffset;
1801 }
1802 else if (myAggRank == greatestFileDomainAggRank) {
1803 if (currentRoundFDEnd > lastFileOffset)
1804 currentRoundFDEnd = lastFileOffset;
1805 if (nextRoundFDEnd > lastFileOffset)
1806 nextRoundFDEnd = lastFileOffset;
1807 }
1808 #ifdef onesidedtrace
1809 printf("iAmUsedAgg - currentRoundFDStart initialized "
1810 "to %ld currentRoundFDEnd to %ld\n",
1811 currentRoundFDStart,currentRoundFDEnd);
1812 #endif
1813
1814
1815 }
1816
1817 #ifdef ROMIO_GPFS
1818 endTimeBase = MPI_Wtime();
1819 gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SETUP] += (endTimeBase-startTimeBase);
1820 startTimeBase = MPI_Wtime();
1821 #endif
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831 int roundIter;
1832 for (roundIter=0;roundIter<numberOfRounds;roundIter++) {
1833
1834 if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero)
1835 {
1836
1837 if (iAmUsedAgg) {
1838
1839 currentRoundFDStart = nextRoundFDStart;
1840
1841 if (!useIOBuffer || (roundIter == 0)) {
1842 int amountDataToReadThisRound;
1843 if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
1844 currentRoundFDEnd = fd_end[myAggRank];
1845 amountDataToReadThisRound = ((currentRoundFDEnd-currentRoundFDStart)+1);
1846 }
1847 else {
1848 currentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset)1;
1849 amountDataToReadThisRound = coll_bufsize;
1850 }
1851
1852
1853 ADIO_ReadContig(fd, read_buf,amountDataToReadThisRound,
1854 MPI_BYTE, ADIO_EXPLICIT_OFFSET, currentRoundFDStart,
1855 &status, error_code);
1856 currentReadBuf = 1;
1857
1858 }
1859 if (useIOBuffer) {
1860
1861
1862 if (roundIter > 0)
1863 currentRoundFDEnd = nextRoundFDEnd;
1864
1865 if (roundIter < (numberOfRounds-1)) {
1866 nextRoundFDStart += coll_bufsize;
1867 int amountDataToReadNextRound;
1868 if ((fd_end[myAggRank] - nextRoundFDStart) < coll_bufsize) {
1869 nextRoundFDEnd = fd_end[myAggRank];
1870 amountDataToReadNextRound = ((nextRoundFDEnd-nextRoundFDStart)+1);
1871 }
1872 else {
1873 nextRoundFDEnd = nextRoundFDStart + coll_bufsize - (ADIO_Offset)1;
1874 amountDataToReadNextRound = coll_bufsize;
1875 }
1876
1877 if(!pthread_equal(io_thread, pthread_self())) {
1878 pthread_join(io_thread, &thread_ret);
1879 *error_code = *(int *)thread_ret;
1880 if (*error_code != MPI_SUCCESS) return;
1881 io_thread = pthread_self();
1882
1883 }
1884 io_thread_args.fd = fd;
1885
1886
1887
1888 if (currentReadBuf == 0) {
1889 io_thread_args.buf = read_buf0;
1890 currentReadBuf = 1;
1891 read_buf = read_buf1;
1892 }
1893 else {
1894 io_thread_args.buf = read_buf1;
1895 currentReadBuf = 0;
1896 read_buf = read_buf0;
1897 }
1898 io_thread_args.io_kind = ADIOI_READ;
1899 io_thread_args.size = amountDataToReadNextRound;
1900 io_thread_args.offset = nextRoundFDStart;
1901 io_thread_args.status = &status;
1902 io_thread_args.error_code = *error_code;
1903 if ( (pthread_create(&io_thread, NULL,
1904 ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
1905 io_thread = pthread_self();
1906
1907 }
1908 else {
1909
1910 if(!pthread_equal(io_thread, pthread_self())) {
1911 pthread_join(io_thread, &thread_ret);
1912 *error_code = *(int *)thread_ret;
1913 if (*error_code != MPI_SUCCESS) return;
1914 io_thread = pthread_self();
1915
1916 }
1917 if (currentReadBuf == 0) {
1918 read_buf = read_buf0;
1919 }
1920 else {
1921 read_buf = read_buf1;
1922 }
1923
1924 }
1925 }
1926 }
1927 else if (useIOBuffer) {
1928 if (roundIter < (numberOfRounds-1)) {
1929 if (currentReadBuf == 0) {
1930 currentReadBuf = 1;
1931 read_buf = read_buf1;
1932 }
1933 else {
1934 currentReadBuf = 0;
1935 read_buf = read_buf0;
1936 }
1937 }
1938 else {
1939 if (currentReadBuf == 0) {
1940 read_buf = read_buf0;
1941 }
1942 else {
1943 read_buf = read_buf1;
1944 }
1945 }
1946
1947 }
1948
1949 MPI_Barrier(fd->comm);
1950
1951 int aggIter;
1952 for (aggIter=0;aggIter<numSourceAggs;aggIter++) {
1953
1954
1955
1956 if (sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter] != -1) {
1957
1958 ADIO_Offset currentRoundFDStartForMySourceAgg = (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)roundIter*coll_bufsize));
1959 ADIO_Offset currentRoundFDEndForMySourceAgg = (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)(roundIter+1)*coll_bufsize) - (ADIO_Offset)1);
1960
1961 int sourceAggContigAccessCount = 0;
1962
1963
1964
1965
1966 int *sourceAggBlockLengths=NULL;
1967 MPI_Aint *sourceAggDisplacements=NULL, *recvBufferDisplacements=NULL;
1968 MPI_Datatype *sourceAggDataTypes=NULL;
1969 char *derivedTypePackedSourceBuffer;
1970 int derivedTypePackedSourceBufferOffset = 0;
1971 int allocatedDerivedTypeArrays = 0;
1972 ADIO_Offset amountOfDataReadThisRoundAgg = 0;
1973
1974
1975
1976 int offsetIter;
1977 int startingOffLenIndex = sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter], endingOffLenIndex = sourceAggsForMyDataLastOffLenIndex[roundIter][aggIter];
1978 for (offsetIter=startingOffLenIndex;offsetIter<=endingOffLenIndex;offsetIter++) {
1979 if (currentRoundFDEndForMySourceAgg > sourceAggsForMyDataFDEnd[aggIter])
1980 currentRoundFDEndForMySourceAgg = sourceAggsForMyDataFDEnd[aggIter];
1981
1982 ADIO_Offset offsetStart = offset_list[offsetIter], offsetEnd = (offset_list[offsetIter]+len_list[offsetIter]-(ADIO_Offset)1);
1983
1984
1985
1986 int bufferAmountToRecv = 0;
1987
1988 if ((offsetStart >= currentRoundFDStartForMySourceAgg) && (offsetStart <= currentRoundFDEndForMySourceAgg)) {
1989 if (offsetEnd > currentRoundFDEndForMySourceAgg)
1990 bufferAmountToRecv = (currentRoundFDEndForMySourceAgg - offsetStart) +1;
1991 else
1992 bufferAmountToRecv = (offsetEnd - offsetStart) +1;
1993 }
1994 else if ((offsetEnd >= currentRoundFDStartForMySourceAgg) && (offsetEnd <= currentRoundFDEndForMySourceAgg)) {
1995 if (offsetEnd > currentRoundFDEndForMySourceAgg)
1996 bufferAmountToRecv = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) +1;
1997 else
1998 bufferAmountToRecv = (offsetEnd - currentRoundFDStartForMySourceAgg) +1;
1999 if (offsetStart < currentRoundFDStartForMySourceAgg) {
2000 offsetStart = currentRoundFDStartForMySourceAgg;
2001 }
2002 }
2003 else if ((offsetStart <= currentRoundFDStartForMySourceAgg) && (offsetEnd >= currentRoundFDEndForMySourceAgg)) {
2004 bufferAmountToRecv = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) +1;
2005 offsetStart = currentRoundFDStartForMySourceAgg;
2006 }
2007
2008 if (bufferAmountToRecv > 0) {
2009 if (gpfsmpio_read_aggmethod == 2) {
2010
2011
2012 if (!allocatedDerivedTypeArrays) {
2013 sourceAggBlockLengths = (int *)ADIOI_Malloc(maxNumContigOperations * sizeof(int));
2014 sourceAggDisplacements = (MPI_Aint *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Aint));
2015 recvBufferDisplacements = (MPI_Aint *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Aint));
2016 sourceAggDataTypes = (MPI_Datatype *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Datatype));
2017 if (!bufTypeIsContig) {
2018 int k;
2019 for (k=sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter];k<=sourceAggsForMyDataLastOffLenIndex[roundIter][aggIter];k++)
2020 amountOfDataReadThisRoundAgg += len_list[k];
2021
2022 #ifdef onesidedtrace
2023 printf("derivedTypePackedSourceBuffer mallocing %ld\n",amountOfDataReadThisRoundAgg);
2024 #endif
2025 if (amountOfDataReadThisRoundAgg > 0)
2026 derivedTypePackedSourceBuffer = (char *)ADIOI_Malloc(amountOfDataReadThisRoundAgg * sizeof(char));
2027 else
2028 derivedTypePackedSourceBuffer = NULL;
2029 }
2030 allocatedDerivedTypeArrays = 1;
2031 }
2032 }
2033
2034
2035
2036 MPI_Aint sourceDisplacementToUseThisRound = (MPI_Aint) (offsetStart - currentRoundFDStartForMySourceAgg);
2037
2038
2039
2040 if (useIOBuffer && (read_buf == read_buf1)) {
2041 sourceDisplacementToUseThisRound += (MPI_Aint)coll_bufsize;
2042 }
2043
2044
2045
2046
2047
2048 if (gpfsmpio_read_aggmethod == 1) {
2049 MPI_Win_lock(MPI_LOCK_SHARED, sourceAggsForMyData[aggIter], 0, read_buf_window);
2050 char *getSourceData = NULL;
2051 if (bufTypeIsContig) {
2052 MPI_Get(((char*)buf) + currentFDSourceBufferState[aggIter].sourceBufferOffset,bufferAmountToRecv, MPI_BYTE,sourceAggsForMyData[aggIter],sourceDisplacementToUseThisRound, bufferAmountToRecv,MPI_BYTE,read_buf_window);
2053 currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset)bufferAmountToRecv;
2054
2055 }
2056 else {
2057 getSourceData = (char *) ADIOI_Malloc(bufferAmountToRecv*sizeof(char));
2058 MPI_Get(getSourceData,bufferAmountToRecv, MPI_BYTE,sourceAggsForMyData[aggIter],sourceDisplacementToUseThisRound, bufferAmountToRecv,MPI_BYTE,read_buf_window);
2059
2060 }
2061 MPI_Win_unlock(sourceAggsForMyData[aggIter], read_buf_window);
2062 if (!bufTypeIsContig) {
2063 nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, bufferAmountToRecv, 0, ¤tFDSourceBufferState[aggIter], getSourceData);
2064 ADIOI_Free(getSourceData);
2065 }
2066 }
2067
2068
2069
2070
2071
2072 else if (gpfsmpio_read_aggmethod == 2) {
2073 if (bufTypeIsContig) {
2074 sourceAggBlockLengths[sourceAggContigAccessCount]= bufferAmountToRecv;
2075 sourceAggDataTypes[sourceAggContigAccessCount] = MPI_BYTE;
2076 sourceAggDisplacements[sourceAggContigAccessCount] = sourceDisplacementToUseThisRound;
2077 recvBufferDisplacements[sourceAggContigAccessCount] = (MPI_Aint)currentFDSourceBufferState[aggIter].sourceBufferOffset;
2078 currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset)bufferAmountToRecv;
2079 sourceAggContigAccessCount++;
2080 }
2081 else {
2082 sourceAggBlockLengths[sourceAggContigAccessCount]= bufferAmountToRecv;
2083 sourceAggDataTypes[sourceAggContigAccessCount] = MPI_BYTE;
2084 sourceAggDisplacements[sourceAggContigAccessCount] = sourceDisplacementToUseThisRound;
2085 recvBufferDisplacements[sourceAggContigAccessCount] = (MPI_Aint)derivedTypePackedSourceBufferOffset;
2086 derivedTypePackedSourceBufferOffset += (ADIO_Offset)bufferAmountToRecv;
2087 sourceAggContigAccessCount++;
2088 }
2089 }
2090 }
2091 }
2092
2093
2094
2095 if (gpfsmpio_read_aggmethod == 2) {
2096 MPI_Datatype recvBufferDerivedDataType, sourceBufferDerivedDataType;
2097
2098 MPI_Type_create_struct(sourceAggContigAccessCount, sourceAggBlockLengths, recvBufferDisplacements, sourceAggDataTypes, &recvBufferDerivedDataType);
2099 MPI_Type_commit(&recvBufferDerivedDataType);
2100 MPI_Type_create_struct(sourceAggContigAccessCount, sourceAggBlockLengths, sourceAggDisplacements, sourceAggDataTypes, &sourceBufferDerivedDataType);
2101 MPI_Type_commit(&sourceBufferDerivedDataType);
2102
2103 if (sourceAggContigAccessCount > 0) {
2104
2105 MPI_Win_lock(MPI_LOCK_SHARED, sourceAggsForMyData[aggIter], 0, read_buf_window);
2106 if (bufTypeIsContig) {
2107 MPI_Get(((char*)buf),1, recvBufferDerivedDataType,sourceAggsForMyData[aggIter],0, 1,sourceBufferDerivedDataType,read_buf_window);
2108 }
2109 else {
2110 MPI_Get(derivedTypePackedSourceBuffer,1, recvBufferDerivedDataType,sourceAggsForMyData[aggIter],0, 1,sourceBufferDerivedDataType,read_buf_window);
2111 }
2112
2113 MPI_Win_unlock(sourceAggsForMyData[aggIter], read_buf_window);
2114 if (!bufTypeIsContig) {
2115 nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, derivedTypePackedSourceBufferOffset, 0, ¤tFDSourceBufferState[aggIter], derivedTypePackedSourceBuffer);
2116 }
2117 }
2118
2119 if (allocatedDerivedTypeArrays) {
2120 ADIOI_Free(sourceAggBlockLengths);
2121 ADIOI_Free(sourceAggDisplacements);
2122 ADIOI_Free(sourceAggDataTypes);
2123 ADIOI_Free(recvBufferDisplacements);
2124 if (!bufTypeIsContig)
2125 if (derivedTypePackedSourceBuffer != NULL)
2126 ADIOI_Free(derivedTypePackedSourceBuffer);
2127 }
2128 if (sourceAggContigAccessCount > 0) {
2129 MPI_Type_free(&recvBufferDerivedDataType);
2130 MPI_Type_free(&sourceBufferDerivedDataType);
2131 }
2132 }
2133 }
2134 }
2135 }
2136
2137
2138 MPI_Barrier(fd->comm);
2139
2140 nextRoundFDStart = currentRoundFDStart + coll_bufsize;
2141
2142 }
2143
2144 #ifdef ROMIO_GPFS
2145 endTimeBase = MPI_Wtime();
2146 gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH] += (endTimeBase-startTimeBase);
2147 #endif
2148
2149 if (useIOBuffer) {
2150
2151 if ( !pthread_equal(io_thread, pthread_self()) ) {
2152 pthread_join(io_thread, &thread_ret);
2153 *error_code = *(int *)thread_ret;
2154 }
2155
2156 }
2157
2158 ADIOI_Free(sourceAggsForMyData);
2159 ADIOI_Free(sourceAggsForMyDataFDStart);
2160 ADIOI_Free(sourceAggsForMyDataFDEnd);
2161
2162 for (i=0;i<numberOfRounds;i++) {
2163 ADIOI_Free(sourceAggsForMyDataFirstOffLenIndex[i]);
2164 ADIOI_Free(sourceAggsForMyDataLastOffLenIndex[i]);
2165 }
2166 ADIOI_Free(sourceAggsForMyDataFirstOffLenIndex);
2167 ADIOI_Free(sourceAggsForMyDataLastOffLenIndex);
2168
2169 ADIOI_Free(currentFDSourceBufferState);
2170
2171 if (!bufTypeIsContig)
2172 ADIOI_Delete_flattened(datatype);
2173 return;
2174 }