This source file includes the following definitions.
- ADIOI_P2PContigWriteAggregation
- ADIOI_P2PContigReadAggregation
#include "adio.h"
#include "adio_extern.h"
#include "../ad_gpfs/ad_gpfs_tuning.h"

#include <pthread.h>
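
/* Point-to-point aggregation for contiguous collective I/O on GPFS.  Each
 * rank exchanges its contiguous chunk of data directly with the
 * aggregator(s) whose file domains it overlaps, in rounds of at most
 * cb_buffer_size bytes per aggregator.  In MPICH these routines are reached
 * from the GPFS collective read/write paths when the aggregate access is
 * contiguous; the exact trigger (e.g. the gpfsmpio_p2pcontig tuning switch)
 * varies by version, so treat that as an assumption rather than a statement
 * of this file. */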

void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
        const void *buf,
        int *error_code,
        ADIO_Offset *st_offsets,
        ADIO_Offset *end_offsets,
        ADIO_Offset *fd_start,
        ADIO_Offset *fd_end)
{
    *error_code = MPI_SUCCESS;

#ifdef ROMIO_GPFS
    double startTimeBase, endTimeBase;
#endif

    MPI_Status status;
    pthread_t io_thread;
    void *thread_ret;
    ADIOI_IO_ThreadFuncData io_thread_args;

    int nprocs, myrank;
    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

    ADIO_Offset myOffsetStart = st_offsets[myrank], myOffsetEnd = end_offsets[myrank];

    int myAggRank = -1;  /* if I am an aggregator, my index into fd->hints->ranklist */
    int iAmUsedAgg = 0;  /* whether I am an aggregator with a non-empty file domain */

#ifdef ROMIO_GPFS
    startTimeBase = MPI_Wtime();
#endif
    int naggs = fd->hints->cb_nodes;
    int coll_bufsize = fd->hints->cb_buffer_size;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio == 1) {
        /* split the buffer in half for double buffering with the I/O thread */
        coll_bufsize = fd->hints->cb_buffer_size/2;
    }
#endif

    /* identify my aggregator rank, if any, and whether my file domain is non-empty */
    int j;
    for (j=0;j<naggs;j++) {
        if (fd->hints->ranklist[j] == myrank) {
            myAggRank = j;
            if (fd_end[j] > fd_start[j]) {
                iAmUsedAgg = 1;
            }
        }
    }
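
    /* Worked example of the file-domain layout (illustrative numbers only):
     * with naggs = 2 and a 400-byte aggregate access, fd_start/fd_end might
     * partition the file as
     *     aggregator 0: offsets [  0,199]    aggregator 1: offsets [200,399]
     * A rank whose data spans offsets [150,249] overlaps both domains, so it
     * will send to both aggregators below. */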

    /* determine which aggregators this rank's data overlaps, along with the
     * borders of those aggregators' file domains */
    int *targetAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
    ADIO_Offset *targetAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    ADIO_Offset *targetAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    int numTargetAggs = 0;
    int i;
    for (i=0;i<naggs;i++) {
        if ( ((myOffsetStart >= fd_start[i]) && (myOffsetStart <= fd_end[i])) ||
                ((myOffsetEnd >= fd_start[i]) && (myOffsetEnd <= fd_end[i])) ) {
            targetAggsForMyData[numTargetAggs] = fd->hints->ranklist[i];
            targetAggsForMyDataFDStart[numTargetAggs] = fd_start[i];
            targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[i];
            numTargetAggs++;
        }
    }

    /* if I am an aggregator, determine which ranks will send me data and how
     * much data each will contribute to my file domain */
    int *sourceProcsForMyData=NULL;
    int *remainingDataAmountToGetPerProc=NULL;
    ADIO_Offset *remainingDataOffsetToGetPerProc=NULL;

    int numSourceProcs = 0;

    if (iAmUsedAgg) {

        /* first count the source procs so the arrays can be sized */
        for (i=0;i<nprocs;i++)
            if ( ((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) ||
                    ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank])) )
                numSourceProcs++;

        sourceProcsForMyData = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
        remainingDataAmountToGetPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
        remainingDataOffsetToGetPerProc = (ADIO_Offset *)ADIOI_Malloc(numSourceProcs * sizeof(ADIO_Offset));

        /* now fill in the source procs, with the amount of data to get from
         * each and the starting offset of that data:
         * - fully contained in my domain: the rank's whole extent
         * - only the rank's start is inside: from that start to my fd_end
         * - only the rank's end is inside: from my fd_start to that end */
        numSourceProcs = 0;
        for (i=0;i<nprocs;i++) {
            if ( ((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) ||
                    ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank])) ) {
                sourceProcsForMyData[numSourceProcs] = i;
                if ( ((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) &&
                        ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank])) ) {
                    remainingDataAmountToGetPerProc[numSourceProcs] = (end_offsets[i] - st_offsets[i]) +1;
                    remainingDataOffsetToGetPerProc[numSourceProcs] = st_offsets[i];
                }
                else if ((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) {
                    remainingDataAmountToGetPerProc[numSourceProcs] = (fd_end[myAggRank] - st_offsets[i]) +1;
                    remainingDataOffsetToGetPerProc[numSourceProcs] = st_offsets[i];
                }
                else { /* starts before my domain, ends inside it */
                    remainingDataAmountToGetPerProc[numSourceProcs] = (end_offsets[i] - fd_start[myAggRank]) +1;
                    remainingDataOffsetToGetPerProc[numSourceProcs] = fd_start[myAggRank];
                }
#ifdef p2pcontigtrace
                printf("getting %d bytes from source proc %d in fd rank %d with borders %ld to %ld\n",
                        remainingDataAmountToGetPerProc[numSourceProcs],i,
                        fd->hints->ranklist[myAggRank],fd_start[myAggRank],fd_end[myAggRank]);
#endif
                numSourceProcs++;
            }
        }
    }

    int *amountOfDataRequestedByTargetAgg = (int *)ADIOI_Malloc(naggs * sizeof(int));
    for (i=0;i<numTargetAggs;i++) {
        amountOfDataRequestedByTargetAgg[i] = 0;
    }

    int totalAmountDataReceived = 0;
    MPI_Request *mpiSizeToSendRequest = (MPI_Request *) ADIOI_Malloc(numTargetAggs * sizeof(MPI_Request));
    MPI_Request *mpiRecvDataRequest = (MPI_Request *) ADIOI_Malloc(numSourceProcs * sizeof(MPI_Request));
    MPI_Request *mpiSendDataSizeRequest = (MPI_Request *) ADIOI_Malloc(numSourceProcs * sizeof(MPI_Request));

    MPI_Request *mpiSendDataToTargetAggRequest = (MPI_Request *) ADIOI_Malloc(numTargetAggs * sizeof(MPI_Request));
    MPI_Status mpiWaitAnyStatusFromTargetAggs, mpiWaitAnyStatusFromSourceProcs;
    MPI_Status mpiIsendStatusForSize, mpiIsendStatusForData;

    /* use the two halves of fd->io_buf for double buffering when the I/O
     * thread is active; otherwise only write_buf0 is used */
    char *write_buf0 = fd->io_buf;
    char *write_buf1 = fd->io_buf + coll_bufsize;

    char *write_buf = write_buf0;

    /* compute how many rounds of collective buffer exchanges are needed to
     * cover the entire aggregate access range */
    ADIO_Offset numberOfRounds = ((end_offsets[nprocs-1] - st_offsets[0]) /
            ((ADIO_Offset)coll_bufsize * (ADIO_Offset)naggs)) + 1;
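
    /* Worked example (illustrative numbers only): with st_offsets[0] = 0,
     * end_offsets[nprocs-1] = 999, naggs = 2 and coll_bufsize = 100, each
     * round moves up to naggs*coll_bufsize = 200 bytes of the 1000-byte
     * aggregate range, so numberOfRounds = 999/200 + 1 = 5. */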

    int currentWriteBuf = 0;
    int useIOBuffer = 0;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio && (numberOfRounds>1)) {
        useIOBuffer = 1;
        io_thread = pthread_self();
    }
#endif

    ADIO_Offset currentRoundFDStart = 0;
    ADIO_Offset currentRoundFDEnd = 0;

    if (iAmUsedAgg) {
        currentRoundFDStart = fd_start[myAggRank];
    }

    int *dataSizeGottenThisRoundPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
    int *mpiRequestMapPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
    int *targetAggIndexesForMyDataThisRound = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));
    int *sendBufferOffsetsThisRound = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));
    int *bufferAmountsToSendThisRound = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));

#ifdef ROMIO_GPFS
    endTimeBase = MPI_Wtime();
    gpfsmpio_prof_cw[GPFSMPIO_CIO_T_MYREQ] += (endTimeBase-startTimeBase);
    startTimeBase = MPI_Wtime();
#endif
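
    /* Each round proceeds in two phases on every rank:
     *   1. compute which slice of my data falls into each target
     *      aggregator's collective-buffer window for this round and,
     *      after round 0, receive the size handshake, then Isend the data;
     *   2. if I am an aggregator, post Irecvs for this round's incoming
     *      data, then write the filled buffer to the file, either inline
     *      or on the I/O thread when double buffering is enabled.
     * This is a summary of the code below, not a specification. */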

    int roundIter;
    for (roundIter=0;roundIter<numberOfRounds;roundIter++) {

        /* determine what data each target aggregator needs from me this round */
        int numTargetAggsThisRound = 0;
        for (i=0;i<numTargetAggs;i++) {
            if ( ((myOffsetStart >= targetAggsForMyDataFDStart[i]) && (myOffsetStart <= targetAggsForMyDataFDEnd[i])) ||
                    ((myOffsetEnd >= targetAggsForMyDataFDStart[i]) && (myOffsetEnd <= targetAggsForMyDataFDEnd[i]))) {

                /* the portion of this aggregator's file domain covered by its
                 * collective buffer this round */
                ADIO_Offset currentRoundFDStartForMyTargetAgg =
                    targetAggsForMyDataFDStart[i] + (ADIO_Offset)roundIter*(ADIO_Offset)coll_bufsize;
                ADIO_Offset currentRoundFDEndForMyTargetAgg =
                    targetAggsForMyDataFDStart[i] + (ADIO_Offset)(roundIter+1)*(ADIO_Offset)coll_bufsize - 1;
                if (currentRoundFDEndForMyTargetAgg > targetAggsForMyDataFDEnd[i])
                    currentRoundFDEndForMyTargetAgg = targetAggsForMyDataFDEnd[i];

#ifdef p2pcontigtrace
                printf("roundIter %d target iter %d targetAggsForMyData is %d myOffsetStart is %ld myOffsetEnd is %ld targetAggsForMyDataFDStart is %ld targetAggsForMyDataFDEnd is %ld currentRoundFDStartForMyTargetAgg is %ld currentRoundFDEndForMyTargetAgg is %ld\n",
                        roundIter,i,targetAggsForMyData[i],myOffsetStart,myOffsetEnd,
                        targetAggsForMyDataFDStart[i],targetAggsForMyDataFDEnd[i],
                        currentRoundFDStartForMyTargetAgg,currentRoundFDEndForMyTargetAgg);
#endif

                /* intersect my extent [myOffsetStart,myOffsetEnd] with the
                 * aggregator's window for this round */
                int sendBufferOffset = 0;
                int bufferAmountToSend = 0;

                if ((myOffsetStart >= currentRoundFDStartForMyTargetAgg) && (myOffsetStart <= currentRoundFDEndForMyTargetAgg)) {
                    if (myOffsetEnd > currentRoundFDEndForMyTargetAgg)
                        bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - myOffsetStart) +1;
                    else
                        bufferAmountToSend = (myOffsetEnd - myOffsetStart) +1;
                }
                else if ((myOffsetEnd >= currentRoundFDStartForMyTargetAgg) && (myOffsetEnd <= currentRoundFDEndForMyTargetAgg)) {
                    sendBufferOffset = (int) (currentRoundFDStartForMyTargetAgg - myOffsetStart);
                    if (myOffsetEnd > currentRoundFDEndForMyTargetAgg)
                        bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1;
                    else
                        bufferAmountToSend = (myOffsetEnd - currentRoundFDStartForMyTargetAgg) +1;
                }
                else if ((myOffsetStart <= currentRoundFDStartForMyTargetAgg) && (myOffsetEnd >= currentRoundFDEndForMyTargetAgg)) {
                    sendBufferOffset = (int) (currentRoundFDStartForMyTargetAgg - myOffsetStart);
                    bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1;
                }
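
                /* The three overlap cases above, sketched (W = the window
                 * [currentRoundFDStartForMyTargetAgg, currentRoundFDEndForMyTargetAgg]):
                 *   case 1: my start lands inside W -> send from the start of
                 *           my buffer (sendBufferOffset = 0)
                 *   case 2: only my end lands inside W -> send from where W
                 *           begins within my buffer
                 *   case 3: my data straddles all of W -> send exactly W's
                 *           worth from where W begins within my buffer
                 * This sketch is explanatory only; the code above is
                 * authoritative. */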

                if (bufferAmountToSend > 0) {
                    /* this aggregator needs data from me this round */
                    targetAggIndexesForMyDataThisRound[numTargetAggsThisRound] = i;
                    sendBufferOffsetsThisRound[numTargetAggsThisRound] = sendBufferOffset;
                    bufferAmountsToSendThisRound[numTargetAggsThisRound] = bufferAmountToSend;
#ifdef p2pcontigtrace
                    printf("bufferAmountToSend is %d sendBufferOffset is %d\n",bufferAmountToSend,sendBufferOffset);
#endif
                    /* after the first round the aggregator tells me how much
                     * data it expects, as a handshake */
                    if (roundIter > 0)
                        MPI_Irecv(&amountOfDataRequestedByTargetAgg[numTargetAggsThisRound],1,
                                MPI_INT,targetAggsForMyData[i],0,
                                fd->comm,&mpiSizeToSendRequest[numTargetAggsThisRound]);
                    numTargetAggsThisRound++;
                }
            }
        }

        /* as an aggregator, compute the bounds of my collective buffer window
         * for this round */
        if (iAmUsedAgg) {
            if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
                currentRoundFDEnd = fd_end[myAggRank];
            }
            else
                currentRoundFDEnd = currentRoundFDStart + coll_bufsize - 1;
#ifdef p2pcontigtrace
            printf("currentRoundFDStart is %ld currentRoundFDEnd is %ld within file domain %ld to %ld\n",currentRoundFDStart,currentRoundFDEnd,fd_start[myAggRank],fd_end[myAggRank]);
#endif
        }

        int irecv,isend;
        int numSourceProcsSentData = 0;

        /* as an aggregator, compute how much data each source proc will send
         * this round and, after the first round, tell it that size */
        for (i=0;i<numSourceProcs;i++) {
            if ((remainingDataOffsetToGetPerProc[i] >= currentRoundFDStart) && (remainingDataOffsetToGetPerProc[i] <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToGetPerProc[i] + remainingDataAmountToGetPerProc[i]) <= currentRoundFDEnd)
                    dataSizeGottenThisRoundPerProc[i] = remainingDataAmountToGetPerProc[i];
                else
                    dataSizeGottenThisRoundPerProc[i] = (currentRoundFDEnd - remainingDataOffsetToGetPerProc[i]) +1;
            }
            else if (((remainingDataOffsetToGetPerProc[i]+remainingDataAmountToGetPerProc[i]) >= currentRoundFDStart) &&
                    ((remainingDataOffsetToGetPerProc[i]+remainingDataAmountToGetPerProc[i]) <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToGetPerProc[i]) >= currentRoundFDStart)
                    dataSizeGottenThisRoundPerProc[i] = remainingDataAmountToGetPerProc[i];
                else
                    /* the data starts before this round's window and ends
                     * inside it: take the portion from the window start to
                     * the end of the data */
                    dataSizeGottenThisRoundPerProc[i] =
                        (remainingDataOffsetToGetPerProc[i]+remainingDataAmountToGetPerProc[i]) - currentRoundFDStart;
            }
            else
                dataSizeGottenThisRoundPerProc[i] = 0;

#ifdef p2pcontigtrace
            printf("dataSizeGottenThisRoundPerProc[%d] set to %d - remainingDataOffsetToGetPerProc is %ld remainingDataAmountToGetPerProc is %d currentRoundFDStart is %ld currentRoundFDEnd is %ld\n",i,dataSizeGottenThisRoundPerProc[i],remainingDataOffsetToGetPerProc[i],remainingDataAmountToGetPerProc[i],currentRoundFDStart,currentRoundFDEnd);
#endif
            if (dataSizeGottenThisRoundPerProc[i] > 0) {
                if (roundIter > 0) {
                    MPI_Isend(&dataSizeGottenThisRoundPerProc[i],1,MPI_INT,
                            sourceProcsForMyData[i],0,fd->comm,
                            &mpiSendDataSizeRequest[numSourceProcsSentData]);
                    numSourceProcsSentData++;
                }
            }
        }
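
        /* The size handshake is skipped in round 0: both sides derive the
         * round-0 sizes independently from the same offset arithmetic, so
         * the exchange (checked by the ADIOI_Assert below) apparently serves
         * as a cross-check for later rounds rather than as a data
         * dependency. */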

        int numDataSendToWaitFor = 0;
        /* send my data to the target aggregators */
        for (i = 0; i < numTargetAggsThisRound; i++) {

            if (roundIter > 0) {
                /* wait for the size handshake, check it, then send the data */
                MPI_Waitany(numTargetAggsThisRound,mpiSizeToSendRequest,
                        &irecv,&mpiWaitAnyStatusFromTargetAggs);

#ifdef p2pcontigtrace
                printf("irecv is %d amountOfDataRequestedByTargetAgg is %d bufferAmountsToSendThisRound is %d sendBufferOffsetsThisRound is %d targetAggsForMyData is %d\n",irecv,amountOfDataRequestedByTargetAgg[irecv], bufferAmountsToSendThisRound[irecv], sendBufferOffsetsThisRound[irecv],targetAggsForMyData[targetAggIndexesForMyDataThisRound[irecv]]);
#endif
                ADIOI_Assert(amountOfDataRequestedByTargetAgg[irecv] == bufferAmountsToSendThisRound[irecv]);
                MPI_Isend(&((char*)buf)[sendBufferOffsetsThisRound[irecv]],
                        bufferAmountsToSendThisRound[irecv],MPI_BYTE,
                        targetAggsForMyData[targetAggIndexesForMyDataThisRound[irecv]],
                        0,fd->comm,&mpiSendDataToTargetAggRequest[irecv]);
            }
            else {
#ifdef p2pcontigtrace
                printf("i is %d bufferAmountsToSendThisRound is %d sendBufferOffsetsThisRound is %d targetAggsForMyData is %d\n",i, bufferAmountsToSendThisRound[i], sendBufferOffsetsThisRound[i],targetAggsForMyData[targetAggIndexesForMyDataThisRound[i]]);
#endif
                MPI_Isend(&((char*)buf)[sendBufferOffsetsThisRound[i]],bufferAmountsToSendThisRound[i],MPI_BYTE,
                        targetAggsForMyData[targetAggIndexesForMyDataThisRound[i]],0,fd->comm,&mpiSendDataToTargetAggRequest[i]);
            }
            numDataSendToWaitFor++;
        }

#ifdef ROMIO_GPFS
        endTimeBase = MPI_Wtime();
        gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SETUP] += (endTimeBase-startTimeBase);
        startTimeBase = MPI_Wtime();
#endif

        /* as an aggregator, post receives for this round's data, packing the
         * contributions into the collective buffer in source-proc order */
        int numDataRecvToWaitFor = 0;
        for (i=0;i<numSourceProcs;i++) {

            int currentWBOffset = 0;
            for (j=0;j<i;j++)
                currentWBOffset += dataSizeGottenThisRoundPerProc[j];

            if (dataSizeGottenThisRoundPerProc[i] > 0) {
#ifdef p2pcontigtrace
                printf("receiving data from rank %d dataSizeGottenThisRoundPerProc is %d currentWBOffset is %d\n",sourceProcsForMyData[i],dataSizeGottenThisRoundPerProc[i],currentWBOffset);
#endif
                MPI_Irecv(&((char*)write_buf)[currentWBOffset],dataSizeGottenThisRoundPerProc[i],
                        MPI_BYTE,sourceProcsForMyData[i],0,
                        fd->comm,&mpiRecvDataRequest[numDataRecvToWaitFor]);
                mpiRequestMapPerProc[numDataRecvToWaitFor] = i;
                numDataRecvToWaitFor++;
            }

#ifdef p2pcontigtrace
            printf("MPI_Irecv from rank %d\n",sourceProcsForMyData[i]);
#endif
        }

        int totalDataReceivedThisRound = 0;
        for (i = 0; i < numDataRecvToWaitFor; i++) {
            MPI_Waitany(numDataRecvToWaitFor,mpiRecvDataRequest,
                    &irecv,&mpiWaitAnyStatusFromSourceProcs);
            totalDataReceivedThisRound +=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];
            totalAmountDataReceived +=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];

#ifdef p2pcontigtrace
            printf("numDataRecvToWaitFor is %d was sent %d bytes data for %d remaining bytes from rank %d irecv index %d\n",numDataRecvToWaitFor,dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]],remainingDataAmountToGetPerProc[mpiRequestMapPerProc[irecv]],sourceProcsForMyData[mpiRequestMapPerProc[irecv]],irecv);
#endif
            /* advance this source proc's remaining amount and offset past the
             * data just received */
            remainingDataAmountToGetPerProc[mpiRequestMapPerProc[irecv]] -=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];
            remainingDataOffsetToGetPerProc[mpiRequestMapPerProc[irecv]] +=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];
        }

        /* drain the outstanding size sends */
        for (i=0;i<numSourceProcsSentData;i++) {
            MPI_Waitany(numSourceProcsSentData,mpiSendDataSizeRequest,
                    &isend,&mpiIsendStatusForSize);
        }

#ifdef ROMIO_GPFS
        endTimeBase = MPI_Wtime();
        gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_NET] += (endTimeBase-startTimeBase);
#endif

        /* write the filled collective buffer to the file */
        if (numDataRecvToWaitFor > 0) {

#ifdef p2pcontigtrace
            printf("totalDataReceivedThisRound is %d\n",totalDataReceivedThisRound);
#endif
            if (!useIOBuffer) {
                ADIO_WriteContig(fd, write_buf, (int)totalDataReceivedThisRound,
                        MPI_BYTE, ADIO_EXPLICIT_OFFSET,
                        currentRoundFDStart, &status, error_code);
            } else { /* hand the buffer to the I/O thread and swap halves */

                /* make sure the previous write has finished before reusing
                 * the thread */
                if(!pthread_equal(io_thread, pthread_self())) {
                    pthread_join(io_thread, &thread_ret);
                    *error_code = *(int *)thread_ret;
                    if (*error_code != MPI_SUCCESS) return;
                    io_thread = pthread_self();
                }
                io_thread_args.fd = fd;

                /* swap the double buffer: the thread writes the full half
                 * while the next round's data lands in the other half */
                if (currentWriteBuf == 0) {
                    io_thread_args.buf = write_buf0;
                    currentWriteBuf = 1;
                    write_buf = write_buf1;
                }
                else {
                    io_thread_args.buf = write_buf1;
                    currentWriteBuf = 0;
                    write_buf = write_buf0;
                }
                io_thread_args.io_kind = ADIOI_WRITE;
                io_thread_args.size = totalDataReceivedThisRound;
                io_thread_args.offset = currentRoundFDStart;
                io_thread_args.status = &status;
                io_thread_args.error_code = *error_code;
                if ( (pthread_create(&io_thread, NULL,
                                ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
                    io_thread = pthread_self();
            }
        }

        /* advance my window and drain this round's data sends */
        if (iAmUsedAgg)
            currentRoundFDStart += coll_bufsize;
        for (i = 0; i < numDataSendToWaitFor; i++) {
            MPI_Wait(&mpiSendDataToTargetAggRequest[i],
                    &mpiIsendStatusForData);
        }

    } /* for roundIter */

#ifdef ROMIO_GPFS
    endTimeBase = MPI_Wtime();
    gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH] += (endTimeBase-startTimeBase);
#endif

    if (useIOBuffer) { /* collect the error code from the last write */
        if ( !pthread_equal(io_thread, pthread_self()) ) {
            pthread_join(io_thread, &thread_ret);
            *error_code = *(int *)thread_ret;
        }
    }

    if (iAmUsedAgg) {
        ADIOI_Free(sourceProcsForMyData);
        ADIOI_Free(remainingDataAmountToGetPerProc);
        ADIOI_Free(remainingDataOffsetToGetPerProc);
    }

    ADIOI_Free(targetAggsForMyData);
    ADIOI_Free(targetAggsForMyDataFDStart);
    ADIOI_Free(targetAggsForMyDataFDEnd);
    ADIOI_Free(targetAggIndexesForMyDataThisRound);
    ADIOI_Free(sendBufferOffsetsThisRound);
    ADIOI_Free(bufferAmountsToSendThisRound);
    ADIOI_Free(amountOfDataRequestedByTargetAgg);
    ADIOI_Free(mpiSizeToSendRequest);
    ADIOI_Free(mpiRecvDataRequest);
    ADIOI_Free(mpiSendDataSizeRequest);
    ADIOI_Free(mpiSendDataToTargetAggRequest);
    ADIOI_Free(dataSizeGottenThisRoundPerProc);
    ADIOI_Free(mpiRequestMapPerProc);

    /* synchronize before returning to the collective layer */
    MPI_Barrier(fd->comm);
    return;
}
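
/* Note on assumptions shared by both routines: each rank's request is a
 * single contiguous extent (st_offsets[myrank]..end_offsets[myrank]) and the
 * aggregate range is bounded by st_offsets[0] and end_offsets[nprocs-1],
 * i.e. offsets ascend with rank.  This is inferred from the numberOfRounds
 * arithmetic above rather than stated explicitly anywhere in this file. */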

void ADIOI_P2PContigReadAggregation(ADIO_File fd,
        void *buf,
        int *error_code,
        ADIO_Offset *st_offsets,
        ADIO_Offset *end_offsets,
        ADIO_Offset *fd_start,
        ADIO_Offset *fd_end)
{
    *error_code = MPI_SUCCESS;

#ifdef ROMIO_GPFS
    double startTimeBase, endTimeBase;
#endif

    MPI_Status status;
    pthread_t io_thread;
    void *thread_ret;
    ADIOI_IO_ThreadFuncData io_thread_args;

#ifdef ROMIO_GPFS
    startTimeBase = MPI_Wtime();
#endif

    int nprocs, myrank;
    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

    ADIO_Offset myOffsetStart = st_offsets[myrank], myOffsetEnd = end_offsets[myrank];

    int myAggRank = -1;  /* if I am an aggregator, my index into fd->hints->ranklist */
    int iAmUsedAgg = 0;  /* whether I am an aggregator with a non-empty file domain */

    int naggs = fd->hints->cb_nodes;
    int coll_bufsize = fd->hints->cb_buffer_size;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio == 1) {
        /* split the buffer in half for double buffering with the I/O thread */
        coll_bufsize = coll_bufsize/2;
    }
#endif

    /* identify my aggregator rank, if any, and whether my file domain is non-empty */
    int j;
    for (j=0;j<naggs;j++) {
        if (fd->hints->ranklist[j] == myrank) {
            myAggRank = j;
            if (fd_end[j] > fd_start[j]) {
                iAmUsedAgg = 1;
            }
        }
    }

    /* determine which aggregators hold data this rank needs, along with the
     * borders of those aggregators' file domains */
    int *sourceAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
    ADIO_Offset *sourceAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    ADIO_Offset *sourceAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    int numSourceAggs = 0;
    int i;
    for (i=0;i<naggs;i++) {
        if ( ((myOffsetStart >= fd_start[i]) && (myOffsetStart <= fd_end[i])) ||
                ((myOffsetEnd >= fd_start[i]) && (myOffsetEnd <= fd_end[i]))) {
            sourceAggsForMyData[numSourceAggs] = fd->hints->ranklist[i];
            sourceAggsForMyDataFDStart[numSourceAggs] = fd_start[i];
            sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[i];
            numSourceAggs++;
        }
    }

    /* if I am an aggregator, determine which ranks I will send data to and
     * how much of my file domain each will consume */
    int *targetProcsForMyData=NULL;
    int *remainingDataAmountToSendPerProc=NULL;
    ADIO_Offset *remainingDataOffsetToSendPerProc=NULL;

    int numTargetProcs = 0;

    if (iAmUsedAgg) {

        /* first count the target procs so the arrays can be sized */
        for (i=0;i<nprocs;i++)
            if ( ((st_offsets[i] >= fd_start[myAggRank]) &&
                        (st_offsets[i] <= fd_end[myAggRank])) ||
                    ((end_offsets[i] >= fd_start[myAggRank]) &&
                     (end_offsets[i] <= fd_end[myAggRank]))  )
                numTargetProcs++;

        targetProcsForMyData =
            (int *)ADIOI_Malloc(numTargetProcs * sizeof(int));
        remainingDataAmountToSendPerProc =
            (int *)ADIOI_Malloc(numTargetProcs * sizeof(int));
        remainingDataOffsetToSendPerProc =
            (ADIO_Offset *)ADIOI_Malloc(numTargetProcs * sizeof(ADIO_Offset));

        /* now fill in the target procs, with the amount of data to send to
         * each and the starting offset of that data within my file domain */
        numTargetProcs = 0;
        for (i=0;i<nprocs;i++) {
            if ( ((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) ||
                    ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank])) ) {
                targetProcsForMyData[numTargetProcs] = i;
                if ( ((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) &&
                        ((end_offsets[i] >= fd_start[myAggRank]) && (end_offsets[i] <= fd_end[myAggRank])) ) {
                    remainingDataAmountToSendPerProc[numTargetProcs] = (end_offsets[i] - st_offsets[i]) +1;
                    remainingDataOffsetToSendPerProc[numTargetProcs] = st_offsets[i];
                }
                else if ((st_offsets[i] >= fd_start[myAggRank]) && (st_offsets[i] <= fd_end[myAggRank])) {
                    remainingDataAmountToSendPerProc[numTargetProcs] = (fd_end[myAggRank] - st_offsets[i]) +1;
                    remainingDataOffsetToSendPerProc[numTargetProcs] = st_offsets[i];
                }
                else { /* starts before my domain, ends inside it */
                    remainingDataAmountToSendPerProc[numTargetProcs] = (end_offsets[i] - fd_start[myAggRank]) +1;
                    remainingDataOffsetToSendPerProc[numTargetProcs] = fd_start[myAggRank];
                }
                numTargetProcs++;
            }
        }
    }

    MPI_Request *mpiRecvDataFromSourceAggsRequest = (MPI_Request *) ADIOI_Malloc(numSourceAggs * sizeof(MPI_Request));
    MPI_Request *mpiSendDataToTargetProcRequest = (MPI_Request *) ADIOI_Malloc(numTargetProcs * sizeof(MPI_Request));
    MPI_Status mpiWaitAnyStatusFromSourceProcs, mpiIsendStatusForData;

    /* use the two halves of fd->io_buf for double buffering when the I/O
     * thread is active; otherwise only read_buf0 is used */
    char *read_buf0 = fd->io_buf;
    char *read_buf1 = fd->io_buf + coll_bufsize;

    char *read_buf = read_buf0;

    /* compute how many rounds of collective buffer exchanges are needed to
     * cover the entire aggregate access range */
    ADIO_Offset numberOfRounds = ((end_offsets[nprocs-1] - st_offsets[0]) /
            ((ADIO_Offset)coll_bufsize * (ADIO_Offset)naggs)) + 1;

    ADIO_Offset currentRoundFDStart = 0, nextRoundFDStart = 0;
    ADIO_Offset currentRoundFDEnd = 0, nextRoundFDEnd = 0;

    if (iAmUsedAgg) {
        currentRoundFDStart = fd_start[myAggRank];
        nextRoundFDStart = fd_start[myAggRank];
    }

    int *dataSizeSentThisRoundPerProc = (int *)ADIOI_Malloc(numTargetProcs * sizeof(int));
    int *sourceAggIndexesForMyDataThisRound = (int *)ADIOI_Malloc(numSourceAggs * sizeof(int));
    int *recvBufferOffsetsThisRound = (int *)ADIOI_Malloc(numSourceAggs * sizeof(int));
    int *bufferAmountsToGetThisRound = (int *)ADIOI_Malloc(numSourceAggs * sizeof(int));
    *error_code = MPI_SUCCESS;

    int currentReadBuf = 0;
    int useIOBuffer = 0;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio && (numberOfRounds>1)) {
        useIOBuffer = 1;
        io_thread = pthread_self();
    }
#endif

#ifdef ROMIO_GPFS
    endTimeBase = MPI_Wtime();
    gpfsmpio_prof_cw[GPFSMPIO_CIO_T_MYREQ] += (endTimeBase-startTimeBase);
#endif
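
    /* Main exchange loop for the read side: each round an aggregator reads
     * the next coll_bufsize window of its file domain into the collective
     * buffer (inline, or one round ahead on the I/O thread when double
     * buffering) and Isends each target proc its slice, while every rank
     * posts Irecvs for its slices from the source aggregators.  This is a
     * summary of the code below, not a specification. */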

    int roundIter;
    for (roundIter=0;roundIter<numberOfRounds;roundIter++) {

        int irecv,isend;

        /* as an aggregator, read this round's window of my file domain into
         * the collective buffer */
        if (iAmUsedAgg) {

            currentRoundFDStart = nextRoundFDStart;

            if (!useIOBuffer || (roundIter == 0)) {
                int amountDataToReadThisRound;
                if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
                    currentRoundFDEnd = fd_end[myAggRank];
                    amountDataToReadThisRound = ((currentRoundFDEnd-currentRoundFDStart)+1);
                }
                else {
                    currentRoundFDEnd = currentRoundFDStart + coll_bufsize - 1;
                    amountDataToReadThisRound = coll_bufsize;
                }

                /* read currentRoundFDStart..currentRoundFDEnd synchronously */
                ADIO_ReadContig(fd, read_buf,amountDataToReadThisRound,
                        MPI_BYTE, ADIO_EXPLICIT_OFFSET, currentRoundFDStart,
                        &status, error_code);
                currentReadBuf = 1;

#ifdef ROMIO_GPFS
                endTimeBase = MPI_Wtime();
#endif
            }

            if (useIOBuffer) { /* pre-read the next round on the I/O thread */

                if (roundIter > 0)
                    currentRoundFDEnd = nextRoundFDEnd;

                if (roundIter < (numberOfRounds-1)) {
                    nextRoundFDStart += coll_bufsize;
                    int amountDataToReadNextRound;
                    if ((fd_end[myAggRank] - nextRoundFDStart) < coll_bufsize) {
                        nextRoundFDEnd = fd_end[myAggRank];
                        amountDataToReadNextRound = ((nextRoundFDEnd-nextRoundFDStart)+1);
                    }
                    else {
                        nextRoundFDEnd = nextRoundFDStart + coll_bufsize - 1;
                        amountDataToReadNextRound = coll_bufsize;
                    }

                    /* make sure the previous read has finished before reusing
                     * the thread */
                    if(!pthread_equal(io_thread, pthread_self())) {
                        pthread_join(io_thread, &thread_ret);
                        *error_code = *(int *)thread_ret;
                        if (*error_code != MPI_SUCCESS) return;
                        io_thread = pthread_self();
                    }
                    io_thread_args.fd = fd;

                    /* swap the double buffer: the thread fills one half while
                     * this round's data is served from the other half */
                    if (currentReadBuf == 0) {
                        io_thread_args.buf = read_buf0;
                        currentReadBuf = 1;
                        read_buf = read_buf1;
                    }
                    else {
                        io_thread_args.buf = read_buf1;
                        currentReadBuf = 0;
                        read_buf = read_buf0;
                    }
                    io_thread_args.io_kind = ADIOI_READ;
                    io_thread_args.size = amountDataToReadNextRound;
                    io_thread_args.offset = nextRoundFDStart;
                    io_thread_args.status = &status;
                    io_thread_args.error_code = *error_code;
                    if ( (pthread_create(&io_thread, NULL,
                                    ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
                        io_thread = pthread_self();
                }
                else { /* last round: just wait for the pre-read to finish */

                    if(!pthread_equal(io_thread, pthread_self())) {
                        pthread_join(io_thread, &thread_ret);
                        *error_code = *(int *)thread_ret;
                        if (*error_code != MPI_SUCCESS) return;
                        io_thread = pthread_self();
                    }
                    if (currentReadBuf == 0) {
                        read_buf = read_buf1;
                    }
                    else {
                        read_buf = read_buf0;
                    }
                }
            } /* useIOBuffer */
        } /* iAmUsedAgg */
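
        /* Double-buffer invariant, as read from the code above: read_buf
         * always points at the half of fd->io_buf holding this round's data
         * to be sent out, while currentReadBuf marks the half the I/O thread
         * is filling for the next round. */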

        /* determine what data to pull from each source aggregator this round */
        int numSourceAggsThisRound = 0;
        for (i=0;i<numSourceAggs;i++) {
            if ( ((myOffsetStart >= sourceAggsForMyDataFDStart[i]) && (myOffsetStart <= sourceAggsForMyDataFDEnd[i]))
                    || ((myOffsetEnd >= sourceAggsForMyDataFDStart[i]) && (myOffsetEnd <= sourceAggsForMyDataFDEnd[i])) ) {

                /* the portion of this aggregator's file domain covered by its
                 * collective buffer this round */
                ADIO_Offset currentRoundFDStartForMySourceAgg =
                    sourceAggsForMyDataFDStart[i] + (ADIO_Offset)roundIter*(ADIO_Offset)coll_bufsize;
                ADIO_Offset currentRoundFDEndForMySourceAgg =
                    sourceAggsForMyDataFDStart[i] + (ADIO_Offset)(roundIter+1)*(ADIO_Offset)coll_bufsize - 1;
                if (currentRoundFDEndForMySourceAgg > sourceAggsForMyDataFDEnd[i])
                    currentRoundFDEndForMySourceAgg = sourceAggsForMyDataFDEnd[i];

#ifdef p2pcontigtrace
                printf("roundIter %d source iter %d sourceAggsForMyData is %d myOffsetStart is %ld myOffsetEnd is %ld sourceAggsForMyDataFDStart is %ld sourceAggsForMyDataFDEnd is %ld currentRoundFDStartForMySourceAgg is %ld currentRoundFDEndForMySourceAgg is %ld\n",roundIter,i,sourceAggsForMyData[i],myOffsetStart,myOffsetEnd,sourceAggsForMyDataFDStart[i],sourceAggsForMyDataFDEnd[i],currentRoundFDStartForMySourceAgg,currentRoundFDEndForMySourceAgg);
#endif

                /* intersect my extent [myOffsetStart,myOffsetEnd] with the
                 * aggregator's window for this round; the same three overlap
                 * cases as in the write path above */
                int recvBufferOffset = 0;
                int bufferAmountToGet = 0;

                if ((myOffsetStart >= currentRoundFDStartForMySourceAgg) && (myOffsetStart <= currentRoundFDEndForMySourceAgg)) {
                    if (myOffsetEnd > currentRoundFDEndForMySourceAgg)
                        bufferAmountToGet = (currentRoundFDEndForMySourceAgg - myOffsetStart) +1;
                    else
                        bufferAmountToGet = (myOffsetEnd - myOffsetStart) +1;
                }
                else if ((myOffsetEnd >= currentRoundFDStartForMySourceAgg) && (myOffsetEnd <= currentRoundFDEndForMySourceAgg)) {
                    recvBufferOffset = (int) (currentRoundFDStartForMySourceAgg - myOffsetStart);
                    if (myOffsetEnd > currentRoundFDEndForMySourceAgg)
                        bufferAmountToGet = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) +1;
                    else
                        bufferAmountToGet = (myOffsetEnd - currentRoundFDStartForMySourceAgg) +1;
                }
                else if ((myOffsetStart <= currentRoundFDStartForMySourceAgg) && (myOffsetEnd >= currentRoundFDEndForMySourceAgg)) {
                    recvBufferOffset = (int) (currentRoundFDStartForMySourceAgg - myOffsetStart);
                    bufferAmountToGet = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) +1;
                }

                if (bufferAmountToGet > 0) { /* this aggregator has data for me this round */
                    sourceAggIndexesForMyDataThisRound[numSourceAggsThisRound] = i;
                    recvBufferOffsetsThisRound[numSourceAggsThisRound] = recvBufferOffset;
                    bufferAmountsToGetThisRound[numSourceAggsThisRound] = bufferAmountToGet;
#ifdef p2pcontigtrace
                    printf("bufferAmountToGet is %d recvBufferOffset is %d\n",bufferAmountToGet,recvBufferOffset);
#endif
                    numSourceAggsThisRound++;
                }
            }
        }

        /* as an aggregator, compute how much data each target proc gets this round */
        for (i=0;i<numTargetProcs;i++) {
            if ((remainingDataOffsetToSendPerProc[i] >= currentRoundFDStart) &&
                    (remainingDataOffsetToSendPerProc[i] <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToSendPerProc[i] +
                            remainingDataAmountToSendPerProc[i]) <= currentRoundFDEnd)
                    dataSizeSentThisRoundPerProc[i] = remainingDataAmountToSendPerProc[i];
                else
                    dataSizeSentThisRoundPerProc[i] =
                        (currentRoundFDEnd - remainingDataOffsetToSendPerProc[i]) +1;
            }
            else if (((remainingDataOffsetToSendPerProc[i]+
                            remainingDataAmountToSendPerProc[i]) >=
                        currentRoundFDStart) &&
                    ((remainingDataOffsetToSendPerProc[i]+
                      remainingDataAmountToSendPerProc[i]) <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToSendPerProc[i]) >= currentRoundFDStart)
                    dataSizeSentThisRoundPerProc[i] = remainingDataAmountToSendPerProc[i];
                else
                    /* the data starts before this round's window and ends
                     * inside it: send the portion from the window start to
                     * the end of the data */
                    dataSizeSentThisRoundPerProc[i] =
                        (remainingDataOffsetToSendPerProc[i]+remainingDataAmountToSendPerProc[i]) -
                        currentRoundFDStart;
            }
            else
                dataSizeSentThisRoundPerProc[i] = 0;
        }

        /* post receives for my data from the source aggregators */
        for (i = 0; i < numSourceAggsThisRound; i++) {
            MPI_Irecv(&((char*)buf)[recvBufferOffsetsThisRound[i]],
                    bufferAmountsToGetThisRound[i],MPI_BYTE,
                    sourceAggsForMyData[sourceAggIndexesForMyDataThisRound[i]],0,fd->comm,
                    &mpiRecvDataFromSourceAggsRequest[i]);
        }

        /* as an aggregator, send each target proc its slice of the collective
         * buffer, packed in target-proc order */
        int numTargetProcsSentThisRound = 0;
        for (i=0;i<numTargetProcs;i++) {

            int currentWBOffset = 0;
            for (j=0;j<i;j++)
                currentWBOffset += dataSizeSentThisRoundPerProc[j];

            if (dataSizeSentThisRoundPerProc[i] > 0) {
                MPI_Isend(&((char*)read_buf)[currentWBOffset],
                        dataSizeSentThisRoundPerProc[i],
                        MPI_BYTE,targetProcsForMyData[i],0,
                        fd->comm,&mpiSendDataToTargetProcRequest[numTargetProcsSentThisRound]);
                numTargetProcsSentThisRound++;
                remainingDataAmountToSendPerProc[i] -= dataSizeSentThisRoundPerProc[i];
                remainingDataOffsetToSendPerProc[i] += dataSizeSentThisRoundPerProc[i];
            }
        }

        /* wait for all of my data to arrive */
        for (i = 0; i < numSourceAggsThisRound; i++) {
            MPI_Waitany(numSourceAggsThisRound,mpiRecvDataFromSourceAggsRequest,
                    &irecv,&mpiWaitAnyStatusFromSourceProcs);
        }

        nextRoundFDStart = currentRoundFDStart + coll_bufsize;

        /* drain this round's data sends */
        for (i=0;i<numTargetProcsSentThisRound;i++) {
            MPI_Waitany(numTargetProcsSentThisRound,mpiSendDataToTargetProcRequest,
                    &isend,&mpiIsendStatusForData);
        }

        /* synchronize the rounds: all transfers use tag 0, so no rank may
         * start the next round's traffic before this round completes everywhere */
        MPI_Barrier(fd->comm);

    } /* for roundIter */

    if (useIOBuffer) { /* collect the error code from the last read */
        if ( !pthread_equal(io_thread, pthread_self()) ) {
            pthread_join(io_thread, &thread_ret);
            *error_code = *(int *)thread_ret;
        }
    }

    if (iAmUsedAgg) {
        ADIOI_Free(targetProcsForMyData);
        ADIOI_Free(remainingDataAmountToSendPerProc);
        ADIOI_Free(remainingDataOffsetToSendPerProc);
    }

    ADIOI_Free(sourceAggsForMyData);
    ADIOI_Free(sourceAggsForMyDataFDStart);
    ADIOI_Free(sourceAggsForMyDataFDEnd);

    ADIOI_Free(mpiRecvDataFromSourceAggsRequest);
    ADIOI_Free(mpiSendDataToTargetProcRequest);
    ADIOI_Free(dataSizeSentThisRoundPerProc);
    ADIOI_Free(sourceAggIndexesForMyDataThisRound);
    ADIOI_Free(recvBufferOffsetsThisRound);
    ADIOI_Free(bufferAmountsToGetThisRound);

    /* synchronize before returning to the collective layer */
    MPI_Barrier(fd->comm);

    return;
}