root/ompi/mca/io/romio321/romio/adio/common/p2p_aggregation.c


DEFINITIONS

This source file includes the following definitions:
  1. ADIOI_P2PContigWriteAggregation
  2. ADIOI_P2PContigReadAggregation

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2014 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "adio.h"
#include "adio_extern.h"
#include "../ad_gpfs/ad_gpfs_tuning.h"

#include <pthread.h>

/* #define p2pcontigtrace 1 */
void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
        const void *buf,
        int *error_code,
        ADIO_Offset *st_offsets,
        ADIO_Offset *end_offsets,
        ADIO_Offset *fd_start,
        ADIO_Offset* fd_end)
{

    *error_code = MPI_SUCCESS; /* initialize to success */

#ifdef ROMIO_GPFS
    double startTimeBase,endTimeBase;
#endif

    MPI_Status status;
    pthread_t io_thread;
    void *thread_ret;
    ADIOI_IO_ThreadFuncData io_thread_args;

    int nprocs,myrank;
    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

    ADIO_Offset myOffsetStart = st_offsets[myrank], myOffsetEnd = end_offsets[myrank];

    int myAggRank = -1; /* if I am an aggregator this is my index into fd->hints->ranklist */
    int iAmUsedAgg = 0;

#ifdef ROMIO_GPFS
    startTimeBase = MPI_Wtime();
#endif

    int naggs = fd->hints->cb_nodes;
    int coll_bufsize = fd->hints->cb_buffer_size;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio == 1) {
        /* split buffer in half for a kind of double buffering with the threads*/
        coll_bufsize = fd->hints->cb_buffer_size/2;
    }
#endif

    int j;
    for (j=0;j<naggs;j++) {
        if (fd->hints->ranklist[j] == myrank) {
            myAggRank = j;
            if (fd_end[j] > fd_start[j]) {
                iAmUsedAgg = 1;
            }
        }
    }

    /* Determine how much data I need to send and to whom.  Also determine
     * the target aggregators' file domain offsets locally to reduce
     * communication overhead */
    int *targetAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
    ADIO_Offset *targetAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    ADIO_Offset *targetAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    int numTargetAggs = 0;
    int i;
    for (i=0;i<naggs;i++) {
        if ( ((myOffsetStart >= fd_start[i]) &&  (myOffsetStart <= fd_end[i])) || ((myOffsetEnd >= fd_start[i]) &&  (myOffsetEnd <= fd_end[i]))) {
            targetAggsForMyData[numTargetAggs] = fd->hints->ranklist[i];
            targetAggsForMyDataFDStart[numTargetAggs] = fd_start[i];
            targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[i];
            numTargetAggs++;
        }
    }

    /* these 3 arrays track info on the procs that feed an aggregator */
    int *sourceProcsForMyData=NULL;
    int *remainingDataAmountToGetPerProc=NULL;
    ADIO_Offset *remainingDataOffsetToGetPerProc=NULL;

    int numSourceProcs = 0;

    if (iAmUsedAgg) { /* for the used aggregators figure out how much data I
                         need from what procs */

        /* count numSourceProcs so we know how large to make the arrays */
        for (i=0;i<nprocs;i++)
            if ( ((st_offsets[i] >= fd_start[myAggRank]) &&  (st_offsets[i] <= fd_end[myAggRank])) || ((end_offsets[i] >= fd_start[myAggRank]) &&  (end_offsets[i] <= fd_end[myAggRank])))
                numSourceProcs++;

        sourceProcsForMyData = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
        remainingDataAmountToGetPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
        remainingDataOffsetToGetPerProc = (ADIO_Offset *)ADIOI_Malloc(numSourceProcs * sizeof(ADIO_Offset));

        /* everybody has the st_offsets and end_offsets for all ranks so if I am a
         * used aggregator go thru them and figure out which ranks have data that
         * falls into the file domain assigned to me */
        numSourceProcs = 0;
        for (i=0;i<nprocs;i++) {
            if ( ((st_offsets[i] >= fd_start[myAggRank]) &&  (st_offsets[i] <= fd_end[myAggRank])) || ((end_offsets[i] >= fd_start[myAggRank]) &&  (end_offsets[i] <= fd_end[myAggRank]))) {
                sourceProcsForMyData[numSourceProcs] = i;
                if ( ((st_offsets[i] >= fd_start[myAggRank]) &&  (st_offsets[i] <= fd_end[myAggRank])) && ((end_offsets[i] >= fd_start[myAggRank]) &&  (end_offsets[i] <= fd_end[myAggRank]))) {
                    remainingDataAmountToGetPerProc[numSourceProcs] = (end_offsets[i] - st_offsets[i])+1;
                    remainingDataOffsetToGetPerProc[numSourceProcs] = st_offsets[i];
                }
                else if ((st_offsets[i] >= fd_start[myAggRank]) &&  (st_offsets[i] <= fd_end[myAggRank])) {/* starts in this fd and goes past it */
                    remainingDataAmountToGetPerProc[numSourceProcs] = (fd_end[myAggRank] - st_offsets[i]) +1;
                    remainingDataOffsetToGetPerProc[numSourceProcs] = st_offsets[i];
                }
                else { /* starts in fd before this and ends in it */
                    remainingDataAmountToGetPerProc[numSourceProcs] = (end_offsets[i] - fd_start[myAggRank]) +1;
                    remainingDataOffsetToGetPerProc[numSourceProcs] = fd_start[myAggRank];
                }
#ifdef p2pcontigtrace
                printf("getting %d bytes from source proc %d in fd rank %d with borders %ld to %ld\n",remainingDataAmountToGetPerProc[numSourceProcs],i,fd->hints->ranklist[myAggRank],fd_start[myAggRank],fd_end[myAggRank]);
#endif
                numSourceProcs++;
            }
        }
    }

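    /* Per-round handshake buffer: for every round after the first, each
     * target aggregator Isends the byte count it expects from this rank, and
     * the matching MPI_Irecv below stores it here so it can be checked
     * against what we are about to send. */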
    int *amountOfDataReqestedByTargetAgg = (int *)ADIOI_Malloc(naggs * sizeof(int));
    for (i=0;i<numTargetAggs;i++) {
        amountOfDataReqestedByTargetAgg[i] = 0;
    }

    int totalAmountDataReceived = 0;
    MPI_Request *mpiSizeToSendRequest = (MPI_Request *) ADIOI_Malloc(numTargetAggs * sizeof(MPI_Request));
    MPI_Request *mpiRecvDataRequest = (MPI_Request *) ADIOI_Malloc(numSourceProcs * sizeof(MPI_Request));
    MPI_Request *mpiSendDataSizeRequest = (MPI_Request *) ADIOI_Malloc(numSourceProcs * sizeof(MPI_Request));

    MPI_Request *mpiSendDataToTargetAggRequest = (MPI_Request *) ADIOI_Malloc(numTargetAggs * sizeof(MPI_Request));
    MPI_Status mpiWaitAnyStatusFromTargetAggs,mpiWaitAnyStatusFromSourceProcs;
    MPI_Status mpiIsendStatusForSize,  mpiIsendStatusForData;

    /* use the write buffer allocated in the file_open */
    char *write_buf0 = fd->io_buf;
    char *write_buf1 = fd->io_buf + coll_bufsize;

    /* start off pointing to the first buffer. If we use the 2nd buffer (threaded
     * case) we'll swap later */
    char *write_buf = write_buf0;

    /* compute number of rounds */
    ADIO_Offset numberOfRounds = (ADIO_Offset)((((ADIO_Offset)(end_offsets[nprocs-1]-st_offsets[0]))/((ADIO_Offset)((ADIO_Offset)coll_bufsize*(ADIO_Offset)naggs)))) + 1;
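    /* i.e. the total extent of the collective access range (first rank's
     * start offset through the last rank's end offset) divided by the
     * combined per-round capacity of all aggregators (coll_bufsize * naggs),
     * plus one. */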

    int currentWriteBuf = 0;
    int useIOBuffer = 0;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio && (numberOfRounds>1)) {
        useIOBuffer = 1;
        io_thread = pthread_self();
    }
#endif

    ADIO_Offset currentRoundFDStart = 0;
    ADIO_Offset currentRoundFDEnd = 0;

    if (iAmUsedAgg) {
        currentRoundFDStart = fd_start[myAggRank];
    }

    int *dataSizeGottenThisRoundPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
    int *mpiRequestMapPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
    int *targetAggIndexesForMyDataThisRound = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));
    int *sendBufferOffsetsThisRound = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));
    int *bufferAmountsToSendThisRound = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));

#ifdef ROMIO_GPFS
    endTimeBase = MPI_Wtime();
    gpfsmpio_prof_cw[GPFSMPIO_CIO_T_MYREQ] += (endTimeBase-startTimeBase);
    startTimeBase = MPI_Wtime();
#endif

    /* each iteration of this loop writes a coll_bufsize portion of the file
     * domain */
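    /* Each round proceeds as follows: every rank works out which target
     * aggregators it must feed this round and (for rounds after the first)
     * posts an MPI_Irecv for the byte count the aggregator will request; the
     * used aggregators Isend those byte counts to their source procs; the
     * source procs Isend the corresponding slices of buf; the aggregators
     * Irecv the slices into write_buf and then either call ADIO_WriteContig
     * directly or hand the buffer to the pthread writer, swapping halves of
     * fd->io_buf when double buffering is enabled. */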
    int roundIter;
    for (roundIter=0;roundIter<numberOfRounds;roundIter++) {

        /* determine what target aggs I need to send data to this round */
        int numTargetAggsThisRound = 0;
        for (i=0;i<numTargetAggs;i++) {
            if ( ((myOffsetStart >= targetAggsForMyDataFDStart[i]) && (myOffsetStart <= targetAggsForMyDataFDEnd[i])) ||
                    ((myOffsetEnd >= targetAggsForMyDataFDStart[i]) && (myOffsetEnd <= targetAggsForMyDataFDEnd[i]))) {
                /* we know that we need to send data to this target agg at some point, now need to figure out how much this round */

                /* here are the offsets currently being collected by the aggregator during this round */
                ADIO_Offset currentRoundFDStartForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[i] + (ADIO_Offset)((ADIO_Offset)roundIter*(ADIO_Offset)coll_bufsize));
                ADIO_Offset currentRoundFDEndForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[i] + (ADIO_Offset)((ADIO_Offset)(roundIter+1)*(ADIO_Offset)coll_bufsize) - (ADIO_Offset)1);
                if (currentRoundFDEndForMyTargetAgg > targetAggsForMyDataFDEnd[i])
                    currentRoundFDEndForMyTargetAgg = targetAggsForMyDataFDEnd[i];

#ifdef p2pcontigtrace
                printf("roundIter %d target iter %d targetAggsForMyData is %d myOffsetStart is %ld myOffsetEnd is %ld targetAggsForMyDataFDStart is %ld targetAggsForMyDataFDEnd is %ld currentRoundFDStartForMyTargetAgg is %ld currentRoundFDEndForMyTargetAgg is %ld\n",
                        roundIter,i,targetAggsForMyData[i],myOffsetStart,myOffsetEnd,
                        targetAggsForMyDataFDStart[i],targetAggsForMyDataFDEnd[i],
                        currentRoundFDStartForMyTargetAgg,currentRoundFDEndForMyTargetAgg);
#endif

                /* send the portion of my data that is within
                 * currentRoundFDStartForMyTargetAgg to
                 * currentRoundFDEndForMyTargetAgg */
                /* find the offset into the send buffer and the amount
                 * of data to send */
                int sendBufferOffset = 0;
                int bufferAmountToSend = 0;

                if ((myOffsetStart >= currentRoundFDStartForMyTargetAgg) && (myOffsetStart <= currentRoundFDEndForMyTargetAgg)) {
                    if (myOffsetEnd > currentRoundFDEndForMyTargetAgg)
                        bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - myOffsetStart) +1;
                    else
                        bufferAmountToSend = (myOffsetEnd - myOffsetStart) +1;
                }
                else if ((myOffsetEnd >= currentRoundFDStartForMyTargetAgg) && (myOffsetEnd <= currentRoundFDEndForMyTargetAgg)) {
                    sendBufferOffset = (int) (currentRoundFDStartForMyTargetAgg - myOffsetStart);
                    if (myOffsetEnd > currentRoundFDEndForMyTargetAgg)
                        bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1;
                    else
                        bufferAmountToSend = (myOffsetEnd - currentRoundFDStartForMyTargetAgg) +1;
                }
                else if ((myOffsetStart <= currentRoundFDStartForMyTargetAgg) && (myOffsetEnd >= currentRoundFDEndForMyTargetAgg)) {
                    sendBufferOffset = (int) (currentRoundFDStartForMyTargetAgg - myOffsetStart);
                    bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1;
                }

                if (bufferAmountToSend > 0) { /* we have data to send this round */
                    targetAggIndexesForMyDataThisRound[numTargetAggsThisRound] = i;
                    sendBufferOffsetsThisRound[numTargetAggsThisRound] = sendBufferOffset;
                    bufferAmountsToSendThisRound[numTargetAggsThisRound] = bufferAmountToSend;
#ifdef p2pcontigtrace
                    printf("bufferAmountToSend is %d sendBufferOffset is %d\n",bufferAmountToSend,sendBufferOffset);
#endif
                    /* only need to be pinged by the agg for rounds after the first one - for the first one just
                     * send the data without being pinged */
                    if (roundIter > 0)
                        MPI_Irecv(&amountOfDataReqestedByTargetAgg[numTargetAggsThisRound],1,
                                MPI_INT,targetAggsForMyData[i],0,
                                fd->comm,&mpiSizeToSendRequest[numTargetAggsThisRound]);
                    numTargetAggsThisRound++;

                }
            }
        }

        /* determine what offsets define the portion of the file domain the agg is writing this round */
        if (iAmUsedAgg) {
            if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
                currentRoundFDEnd = fd_end[myAggRank];
            }
            else
                currentRoundFDEnd = currentRoundFDStart + coll_bufsize - 1;
#ifdef p2pcontigtrace
            printf("currentRoundFDStart is %ld currentRoundFDEnd is %ld within file domain %ld to %ld\n",currentRoundFDStart,currentRoundFDEnd,fd_start[myAggRank],fd_end[myAggRank]);
#endif
        }

        int irecv,isend;
        int numSourceProcsSentData = 0;

        /* the aggs send the amount of data they need to their source procs */
        for (i=0;i<numSourceProcs;i++) {
            if ((remainingDataOffsetToGetPerProc[i] >= currentRoundFDStart) && (remainingDataOffsetToGetPerProc[i] <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToGetPerProc[i] + remainingDataAmountToGetPerProc[i]) <= currentRoundFDEnd)
                    dataSizeGottenThisRoundPerProc[i] = remainingDataAmountToGetPerProc[i];
                else
                    dataSizeGottenThisRoundPerProc[i] = (currentRoundFDEnd - remainingDataOffsetToGetPerProc[i]) +1;
            }
            else if (((remainingDataOffsetToGetPerProc[i]+remainingDataAmountToGetPerProc[i]) >= currentRoundFDStart) && ((remainingDataOffsetToGetPerProc[i]+remainingDataAmountToGetPerProc[i]) <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToGetPerProc[i]) >= currentRoundFDStart)
                    dataSizeGottenThisRoundPerProc[i] = remainingDataAmountToGetPerProc[i];
                else
                    dataSizeGottenThisRoundPerProc[i] = (remainingDataOffsetToGetPerProc[i]-currentRoundFDStart) +1;
            }
            else
                dataSizeGottenThisRoundPerProc[i] = 0;

#ifdef p2pcontigtrace
            printf("dataSizeGottenThisRoundPerProc[%d] set to %d - remainingDataOffsetToGetPerProc is %d remainingDataAmountToGetPerProc is %d currentRoundFDStart is %d currentRoundFDEnd is %d\n",i,dataSizeGottenThisRoundPerProc[i],remainingDataOffsetToGetPerProc[i],remainingDataAmountToGetPerProc[i],currentRoundFDStart,currentRoundFDEnd);
#endif
            if (dataSizeGottenThisRoundPerProc[i] > 0) {
                if (roundIter > 0) {
                    MPI_Isend(&dataSizeGottenThisRoundPerProc[i],1,MPI_INT,
                            sourceProcsForMyData[i],0,fd->comm,
                            &mpiSendDataSizeRequest[numSourceProcsSentData]);
                    numSourceProcsSentData++;
                }
            }
        }

        int numDataSendToWaitFor = 0;
        /* the source procs send the requested data to the aggs */
        for (i = 0; i < numTargetAggsThisRound; i++) {

            /* the source procs aren't pinged by the target aggs on the first round */
            if (roundIter > 0) {

                MPI_Waitany(numTargetAggsThisRound,mpiSizeToSendRequest,
                        &irecv,&mpiWaitAnyStatusFromTargetAggs);

#ifdef p2pcontigtrace
                printf("irecv is %d amountOfDataReqestedByTargetAgg is %d bufferAmountsToSendThisRound is %d sendBufferOffsetsThisRound is %d targetAggsForMyData is %d\n",irecv,amountOfDataReqestedByTargetAgg[irecv], bufferAmountsToSendThisRound[irecv], sendBufferOffsetsThisRound[irecv],targetAggsForMyData[targetAggIndexesForMyDataThisRound[irecv]]);
#endif
                ADIOI_Assert(amountOfDataReqestedByTargetAgg[irecv] == bufferAmountsToSendThisRound[irecv]);
                MPI_Isend(&((char*)buf)[sendBufferOffsetsThisRound[irecv]],
                        bufferAmountsToSendThisRound[irecv],MPI_BYTE,
                        targetAggsForMyData[targetAggIndexesForMyDataThisRound[irecv]],
                        0,fd->comm,&mpiSendDataToTargetAggRequest[irecv]);

            }
            else {
#ifdef p2pcontigtrace
                printf("i is %d bufferAmountsToSendThisRound is %d sendBufferOffsetsThisRound is %d targetAggsForMyData is %d\n",i, bufferAmountsToSendThisRound[i], sendBufferOffsetsThisRound[i],targetAggsForMyData[targetAggIndexesForMyDataThisRound[i]]);
#endif
                MPI_Isend(&((char*)buf)[sendBufferOffsetsThisRound[i]],bufferAmountsToSendThisRound[i],MPI_BYTE,
                        targetAggsForMyData[targetAggIndexesForMyDataThisRound[i]],0,fd->comm,&mpiSendDataToTargetAggRequest[i]);
            }
            numDataSendToWaitFor++;
        }

#ifdef ROMIO_GPFS
        gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_SETUP] += (endTimeBase-startTimeBase);
        startTimeBase = MPI_Wtime();
#endif

        /* the aggs receive the data from the source procs */
        int numDataRecvToWaitFor = 0;
        for (i=0;i<numSourceProcs;i++) {

            int currentWBOffset = 0;
            for (j=0;j<i;j++)
                currentWBOffset += dataSizeGottenThisRoundPerProc[j];

            /* only receive from source procs that will send > 0 count data */
            if (dataSizeGottenThisRoundPerProc[i] > 0) {
#ifdef p2pcontigtrace
                printf("receiving data from rank %d dataSizeGottenThisRoundPerProc is %d currentWBOffset is %d\n",sourceProcsForMyData[i],dataSizeGottenThisRoundPerProc[i],currentWBOffset);
#endif
                MPI_Irecv(&((char*)write_buf)[currentWBOffset],dataSizeGottenThisRoundPerProc[i],
                        MPI_BYTE,sourceProcsForMyData[i],0,
                        fd->comm,&mpiRecvDataRequest[numDataRecvToWaitFor]);
                mpiRequestMapPerProc[numDataRecvToWaitFor] = i;
                numDataRecvToWaitFor++;
            }

#ifdef p2pcontigtrace
            printf("MPI_Irecv from rank %d\n",sourceProcsForMyData[i]);
#endif
        }

        int totalDataReceivedThisRound = 0;
        for (i = 0; i < numDataRecvToWaitFor; i++) {
            MPI_Waitany(numDataRecvToWaitFor,mpiRecvDataRequest,
                    &irecv,&mpiWaitAnyStatusFromSourceProcs);
            totalDataReceivedThisRound +=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];
            totalAmountDataReceived +=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];

#ifdef p2pcontigtrace
            printf("numDataRecvToWaitFor is %d was sent %d bytes data for %d remaining bytes from rank %d irecv index %d\n",numDataRecvToWaitFor,dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]],remainingDataAmountToGetPerProc[mpiRequestMapPerProc[irecv]],sourceProcsForMyData[mpiRequestMapPerProc[irecv]],irecv);
#endif
            remainingDataAmountToGetPerProc[mpiRequestMapPerProc[irecv]] -=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];
            remainingDataOffsetToGetPerProc[mpiRequestMapPerProc[irecv]] +=
                dataSizeGottenThisRoundPerProc[mpiRequestMapPerProc[irecv]];

        }

        /* clean up the MPI_Request object for the MPI_Isend which told the
         * source procs how much data to send */
        for (i=0;i<numSourceProcsSentData;i++) {
           MPI_Waitany(numSourceProcsSentData,mpiSendDataSizeRequest,
                   &isend,&mpiIsendStatusForSize);
        }


#ifdef ROMIO_GPFS
        endTimeBase = MPI_Wtime();
        gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_NET] += (endTimeBase-startTimeBase);
#endif
        /* the aggs now write the data */
        if (numDataRecvToWaitFor > 0) {

#ifdef p2pcontigtrace
            printf("totalDataReceivedThisRound is %d\n",totalDataReceivedThisRound);
#endif
            if (!useIOBuffer) {

                ADIO_WriteContig(fd, write_buf, (int)totalDataReceivedThisRound,
                        MPI_BYTE, ADIO_EXPLICIT_OFFSET,
                        currentRoundFDStart, &status, error_code);
            } else { /* use the thread writer */

                if(!pthread_equal(io_thread, pthread_self())) {
                    pthread_join(io_thread, &thread_ret);
                    *error_code = *(int *)thread_ret;
                    if (*error_code != MPI_SUCCESS) return;
                    io_thread = pthread_self();

                }
                io_thread_args.fd = fd;
                /* do a little pointer shuffling: background I/O works from one
                 * buffer while two-phase machinery fills up another */

                if (currentWriteBuf == 0) {
                    io_thread_args.buf = write_buf0;
                    currentWriteBuf = 1;
                    write_buf = write_buf1;
                }
                else {
                    io_thread_args.buf = write_buf1;
                    currentWriteBuf = 0;
                    write_buf = write_buf0;
                }
                io_thread_args.io_kind = ADIOI_WRITE;
                io_thread_args.size = totalDataReceivedThisRound;
                io_thread_args.offset = currentRoundFDStart;
                io_thread_args.status = &status;
                io_thread_args.error_code = *error_code;
                if ( (pthread_create(&io_thread, NULL,
                                ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
                    io_thread = pthread_self();

            }

        } /* numDataRecvToWaitFor > 0 */

        if (iAmUsedAgg)
            currentRoundFDStart += coll_bufsize;
        for (i = 0; i < numDataSendToWaitFor; i++) {
          MPI_Wait(&mpiSendDataToTargetAggRequest[i],
                  &mpiIsendStatusForData);
        }

    } /* for-loop roundIter */

#ifdef ROMIO_GPFS
    endTimeBase = MPI_Wtime();
    gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH] += (endTimeBase-startTimeBase);
#endif

    if (useIOBuffer) { /* thread writer cleanup */

        if ( !pthread_equal(io_thread, pthread_self()) ) {
            pthread_join(io_thread, &thread_ret);
            *error_code = *(int *)thread_ret;
        }

    }



    if (iAmUsedAgg) {
        ADIOI_Free(sourceProcsForMyData);
        ADIOI_Free(remainingDataAmountToGetPerProc);
        ADIOI_Free(remainingDataOffsetToGetPerProc);
    }

    ADIOI_Free(targetAggsForMyData);
    ADIOI_Free(targetAggsForMyDataFDStart);
    ADIOI_Free(targetAggsForMyDataFDEnd);
    ADIOI_Free(targetAggIndexesForMyDataThisRound);
    ADIOI_Free(sendBufferOffsetsThisRound);
    ADIOI_Free(bufferAmountsToSendThisRound);
    ADIOI_Free(amountOfDataReqestedByTargetAgg);
    ADIOI_Free(mpiSizeToSendRequest);
    ADIOI_Free(mpiRecvDataRequest);
    ADIOI_Free(mpiSendDataSizeRequest);
    ADIOI_Free(mpiSendDataToTargetAggRequest);
    ADIOI_Free(dataSizeGottenThisRoundPerProc);
    ADIOI_Free(mpiRequestMapPerProc);

    /* TODO: still need a barrier here? */
    MPI_Barrier(fd->comm);
    return;
}

void ADIOI_P2PContigReadAggregation(ADIO_File fd,
        const void *buf,
        int *error_code,
        ADIO_Offset *st_offsets,
        ADIO_Offset *end_offsets,
        ADIO_Offset *fd_start,
        ADIO_Offset* fd_end)
{

    *error_code = MPI_SUCCESS; /* initialize to success */

#ifdef ROMIO_GPFS
    double startTimeBase,endTimeBase;
#endif

    MPI_Status status;
    pthread_t io_thread;
    void *thread_ret;
    ADIOI_IO_ThreadFuncData io_thread_args;

#ifdef ROMIO_GPFS
    startTimeBase = MPI_Wtime();
#endif

    int nprocs,myrank;
    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

    ADIO_Offset myOffsetStart = st_offsets[myrank], myOffsetEnd = end_offsets[myrank];

    int myAggRank = -1; /* if I am an aggregator this is my index into fd->hints->ranklist */
    int iAmUsedAgg = 0;

    int naggs = fd->hints->cb_nodes;
    int coll_bufsize = fd->hints->cb_buffer_size;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio == 1)
        /* split the buffer between the working threads */
        coll_bufsize = coll_bufsize/2;
#endif

    int j;
    for (j=0;j<naggs;j++) {
        if (fd->hints->ranklist[j] == myrank) {
            myAggRank = j;
            if (fd_end[j] > fd_start[j]) {
                iAmUsedAgg = 1;
            }
        }
    }

    /* For my offset range determine how much data I need to get and from
     * which aggregators.  Also determine the source aggregators' file domain
     * offsets locally to reduce communication overhead */
    int *sourceAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
    ADIO_Offset *sourceAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    ADIO_Offset *sourceAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
    int numSourceAggs = 0;
    int i;
    for (i=0;i<naggs;i++) {
        if ( ((myOffsetStart >= fd_start[i]) && (myOffsetStart <= fd_end[i])) ||
                ((myOffsetEnd >= fd_start[i]) &&  (myOffsetEnd <= fd_end[i]))) {
            sourceAggsForMyData[numSourceAggs] = fd->hints->ranklist[i];
            sourceAggsForMyDataFDStart[numSourceAggs] = fd_start[i];
            sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[i];
            numSourceAggs++;
        }
    }

    /* These 3 arrays track info on the procs that are fed from an aggregator.
     * To save memory at the cost of some setup time, instead of sizing the
     * arrays at nprocs the exact number of target procs is counted first and
     * then that size is allocated */
    int *targetProcsForMyData=NULL;
    int *remainingDataAmountToSendPerProc=NULL;
    ADIO_Offset *remainingDataOffsetToSendPerProc=NULL;

    int numTargetProcs = 0;

    if (iAmUsedAgg) {
        /* for the used aggregators figure out how much data I need to send and to which procs */

        /* count numTargetProcs so we know how large to make the arrays */
        for (i=0;i<nprocs;i++)
            if ( ((st_offsets[i] >= fd_start[myAggRank]) &&
                        (st_offsets[i] <= fd_end[myAggRank])) ||
                    ((end_offsets[i] >= fd_start[myAggRank]) &&
                     (end_offsets[i] <= fd_end[myAggRank]))  )
                numTargetProcs++;

        targetProcsForMyData =
            (int *)ADIOI_Malloc(numTargetProcs * sizeof(int));
        remainingDataAmountToSendPerProc =
            (int *)ADIOI_Malloc(numTargetProcs * sizeof(int));
        remainingDataOffsetToSendPerProc =
            (ADIO_Offset *)ADIOI_Malloc(numTargetProcs * sizeof(ADIO_Offset));

        /* everybody has the st_offsets and end_offsets for all ranks so if I am a
         * used aggregator go thru them and figure out which ranks have data that
         * falls into the file domain assigned to me */
        numTargetProcs = 0;
        for (i=0;i<nprocs;i++) {
            if ( ((st_offsets[i] >= fd_start[myAggRank]) &&  (st_offsets[i] <= fd_end[myAggRank])) || ((end_offsets[i] >= fd_start[myAggRank]) &&  (end_offsets[i] <= fd_end[myAggRank]))) {
                targetProcsForMyData[numTargetProcs] = i;
                if ( ((st_offsets[i] >= fd_start[myAggRank]) &&  (st_offsets[i] <= fd_end[myAggRank])) && ((end_offsets[i] >= fd_start[myAggRank]) &&  (end_offsets[i] <= fd_end[myAggRank]))) {
                    remainingDataAmountToSendPerProc[numTargetProcs] = (end_offsets[i] - st_offsets[i])+1;
                    remainingDataOffsetToSendPerProc[numTargetProcs] = st_offsets[i];
                }
                else if ((st_offsets[i] >= fd_start[myAggRank]) &&  (st_offsets[i] <= fd_end[myAggRank])) {/* starts in this fd and goes past it */
                    remainingDataAmountToSendPerProc[numTargetProcs] = (fd_end[myAggRank] - st_offsets[i]) +1;
                    remainingDataOffsetToSendPerProc[numTargetProcs] = st_offsets[i];
                }
                else { /* starts in fd before this and ends in it */
                    remainingDataAmountToSendPerProc[numTargetProcs] = (end_offsets[i] - fd_start[myAggRank]) +1;
                    remainingDataOffsetToSendPerProc[numTargetProcs] = fd_start[myAggRank];
                }
                numTargetProcs++;
            }
        }
    }


    MPI_Request *mpiRecvDataFromSourceAggsRequest = (MPI_Request *) ADIOI_Malloc(numSourceAggs * sizeof(MPI_Request));
    MPI_Request *mpiSendDataToTargetProcRequest = (MPI_Request *) ADIOI_Malloc(numTargetProcs * sizeof(MPI_Request));
    MPI_Status mpiWaitAnyStatusFromSourceProcs,mpiIsendStatusForData;

    /* use the two-phase buffer allocated in the file_open - no app should ever
     * be both reading and writing at the same time */
    char *read_buf0 = fd->io_buf;
    char *read_buf1 = fd->io_buf + coll_bufsize;
    /* if threaded i/o selected, we'll do a kind of double buffering */
    char *read_buf = read_buf0;

    /* compute number of rounds */
    ADIO_Offset numberOfRounds = (ADIO_Offset)((((ADIO_Offset)(end_offsets[nprocs-1]-st_offsets[0]))/((ADIO_Offset)((ADIO_Offset)coll_bufsize*(ADIO_Offset)naggs)))) + 1;
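    /* same round computation as the write path: the total collective access
     * extent divided by the combined per-round aggregator buffer capacity
     * (coll_bufsize * naggs), plus one */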

    ADIO_Offset currentRoundFDStart = 0, nextRoundFDStart = 0;
    ADIO_Offset currentRoundFDEnd = 0, nextRoundFDEnd = 0;

    if (iAmUsedAgg) {
        currentRoundFDStart = fd_start[myAggRank];
        nextRoundFDStart = fd_start[myAggRank];
    }

    int *dataSizeSentThisRoundPerProc = (int *)ADIOI_Malloc(numTargetProcs * sizeof(int));
    int *sourceAggIndexesForMyDataThisRound = (int *)ADIOI_Malloc(numSourceAggs * sizeof(int));
    int *recvBufferOffsetsThisRound = (int *)ADIOI_Malloc(numSourceAggs * sizeof(int));
    int *bufferAmountsToGetThisRound = (int *)ADIOI_Malloc(numSourceAggs * sizeof(int));
    *error_code = MPI_SUCCESS;

    int currentReadBuf = 0;
    int useIOBuffer = 0;
#ifdef ROMIO_GPFS
    if (gpfsmpio_pthreadio && (numberOfRounds>1)) {
        useIOBuffer = 1;
        io_thread = pthread_self();
    }
#endif

#ifdef ROMIO_GPFS
    endTimeBase = MPI_Wtime();
    gpfsmpio_prof_cw[GPFSMPIO_CIO_T_MYREQ] += (endTimeBase-startTimeBase);
#endif


    /* each iteration of this loop reads a coll_bufsize portion of the file domain */
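    /* Each round proceeds as follows: the used aggregators read the next
     * coll_bufsize chunk of their file domain with ADIO_ReadContig (or, with
     * the pthread reader enabled, join the previous read and start the next
     * one into the other half of fd->io_buf); every rank posts MPI_Irecvs
     * straight into its user buffer for the slices it needs from its source
     * aggregators; the aggregators Isend each target proc its slice of
     * read_buf; everyone waits on their requests and a barrier ends the round
     * so read_buf is not overwritten before the data has been delivered. */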
    int roundIter;
    for (roundIter=0;roundIter<numberOfRounds;roundIter++) {

        int irecv,isend;
        /* determine what offsets define the portion of the file domain the agg is reading this round */
        if (iAmUsedAgg) {

            currentRoundFDStart = nextRoundFDStart;

            if (!useIOBuffer || (roundIter == 0)) {
                int amountDataToReadThisRound;
                if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
                    currentRoundFDEnd = fd_end[myAggRank];
                    amountDataToReadThisRound = ((currentRoundFDEnd-currentRoundFDStart)+1);
                }
                else {
                    currentRoundFDEnd = currentRoundFDStart + coll_bufsize - 1;
                    amountDataToReadThisRound = coll_bufsize;
                }

                /* read amountDataToReadThisRound bytes */
                ADIO_ReadContig(fd, read_buf,amountDataToReadThisRound,
                        MPI_BYTE, ADIO_EXPLICIT_OFFSET, currentRoundFDStart,
                        &status, error_code);
                currentReadBuf = 1;

#ifdef ROMIO_GPFS
                endTimeBase = MPI_Wtime();
#endif
            }

            if (useIOBuffer) { /* use the thread reader for the next round */
                /* switch back and forth between the read buffers so that the
                 * data aggregation code is disseminating 1 buffer while the
                 * thread is reading into the other */

                if (roundIter > 0)
                    currentRoundFDEnd = nextRoundFDEnd;

                if (roundIter < (numberOfRounds-1)) {
                    nextRoundFDStart += coll_bufsize;
                    int amountDataToReadNextRound;
                    if ((fd_end[myAggRank] - nextRoundFDStart) < coll_bufsize) {
                        nextRoundFDEnd = fd_end[myAggRank];
                        amountDataToReadNextRound = ((nextRoundFDEnd-nextRoundFDStart)+1);
                    }
                    else {
                        nextRoundFDEnd = nextRoundFDStart + coll_bufsize - 1;
                        amountDataToReadNextRound = coll_bufsize;
                    }

                    if(!pthread_equal(io_thread, pthread_self())) {
                        pthread_join(io_thread, &thread_ret);
                        *error_code = *(int *)thread_ret;
                        if (*error_code != MPI_SUCCESS) return;
                        io_thread = pthread_self();

                    }
                    io_thread_args.fd = fd;
                    /* do a little pointer shuffling: background I/O works from one
                     * buffer while two-phase machinery fills up another */

                    if (currentReadBuf == 0) {
                        io_thread_args.buf = read_buf0;
                        currentReadBuf = 1;
                        read_buf = read_buf1;
                    }
                    else {
                        io_thread_args.buf = read_buf1;
                        currentReadBuf = 0;
                        read_buf = read_buf0;
                    }
                    io_thread_args.io_kind = ADIOI_READ;
                    io_thread_args.size = amountDataToReadNextRound;
                    io_thread_args.offset = nextRoundFDStart;
                    io_thread_args.status = &status;
                    io_thread_args.error_code = *error_code;
                    if ( (pthread_create(&io_thread, NULL,
                                    ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
                        io_thread = pthread_self();

                }
                else { /* last round */

                    if(!pthread_equal(io_thread, pthread_self())) {
                        pthread_join(io_thread, &thread_ret);
                        *error_code = *(int *)thread_ret;
                        if (*error_code != MPI_SUCCESS) return;
                        io_thread = pthread_self();

                    }
                    if (currentReadBuf == 0) {
                        read_buf = read_buf1;
                    }
                    else {
                        read_buf = read_buf0;
                    }

                }
            } /* useIOBuffer */
        } /* IAmUsedAgg */

        /* determine what source aggs I need to get data from this round and
         * recv only from them */
        int numSourceAggsThisRound = 0;
        for (i=0;i<numSourceAggs;i++) {
            if ( ((myOffsetStart >= sourceAggsForMyDataFDStart[i]) && (myOffsetStart <= sourceAggsForMyDataFDEnd[i]))
                    || ((myOffsetEnd >= sourceAggsForMyDataFDStart[i]) && (myOffsetEnd <= sourceAggsForMyDataFDEnd[i])) ) {
                /* we know that we need to get data from this source agg at
                 * some point, now need to figure out how much this round */

                /* here are the offsets currently being sent by the aggregator
                 * during this round */
                ADIO_Offset currentRoundFDStartForMySourceAgg =
                    (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[i] +
                            (ADIO_Offset)((ADIO_Offset)roundIter*(ADIO_Offset)coll_bufsize));
                ADIO_Offset currentRoundFDEndForMySourceAgg =
                    (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[i] +
                            (ADIO_Offset)((ADIO_Offset)(roundIter+1)*(ADIO_Offset)coll_bufsize) - (ADIO_Offset)1);
                if (currentRoundFDEndForMySourceAgg > sourceAggsForMyDataFDEnd[i])
                    currentRoundFDEndForMySourceAgg = sourceAggsForMyDataFDEnd[i];

#ifdef p2pcontigtrace
                printf("roundIter %d source iter %d sourceAggsForMyData is %d myOffsetStart is %ld myOffsetEnd is %ld sourceAggsForMyDataFDStart is %ld sourceAggsForMyDataFDEnd is %ld currentRoundFDStartForMySourceAgg is %ld currentRoundFDEndForMySourceAgg is %ld\n",roundIter,i,sourceAggsForMyData[i],myOffsetStart,myOffsetEnd,sourceAggsForMyDataFDStart[i],sourceAggsForMyDataFDEnd[i],currentRoundFDStartForMySourceAgg,currentRoundFDEndForMySourceAgg);
#endif

                /* get the portion of my data that is within currentRoundFDStartForMySourceAgg to currentRoundFDEndForMySourceAgg */
                /* find the offset into the recv buffer and the amount of data to get */
                int recvBufferOffset = 0;
                int bufferAmountToGet = 0;

                if ((myOffsetStart >= currentRoundFDStartForMySourceAgg) && (myOffsetStart <= currentRoundFDEndForMySourceAgg)) {
                    if (myOffsetEnd > currentRoundFDEndForMySourceAgg)
                        bufferAmountToGet = (currentRoundFDEndForMySourceAgg - myOffsetStart) +1;
                    else
                        bufferAmountToGet = (myOffsetEnd - myOffsetStart) +1;
                }
                else if ((myOffsetEnd >= currentRoundFDStartForMySourceAgg) && (myOffsetEnd <= currentRoundFDEndForMySourceAgg)) {
                    recvBufferOffset = (int) (currentRoundFDStartForMySourceAgg - myOffsetStart);
                    if (myOffsetEnd > currentRoundFDEndForMySourceAgg)
                        bufferAmountToGet = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) +1;
                    else
                        bufferAmountToGet = (myOffsetEnd - currentRoundFDStartForMySourceAgg) +1;
                }
                else if ((myOffsetStart <= currentRoundFDStartForMySourceAgg) && (myOffsetEnd >= currentRoundFDEndForMySourceAgg)) {
                    recvBufferOffset = (int) (currentRoundFDStartForMySourceAgg - myOffsetStart);
                    bufferAmountToGet = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) +1;
                }


                if (bufferAmountToGet > 0) { /* we have data to get this round */
                    sourceAggIndexesForMyDataThisRound[numSourceAggsThisRound] = i;
                    recvBufferOffsetsThisRound[numSourceAggsThisRound] = recvBufferOffset;
                    bufferAmountsToGetThisRound[numSourceAggsThisRound] = bufferAmountToGet;
#ifdef p2pcontigtrace
                    printf("bufferAmountToGet is %d recvBufferOffset is %d\n",bufferAmountToGet,recvBufferOffset);
#endif
                    numSourceAggsThisRound++;
                }
            }
        }

        /* the aggs determine the amount of data they will be sending to their
         * target procs this round */
        for (i=0;i<numTargetProcs;i++) {
            if ((remainingDataOffsetToSendPerProc[i] >= currentRoundFDStart) &&
                    (remainingDataOffsetToSendPerProc[i] <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToSendPerProc[i] +
                            remainingDataAmountToSendPerProc[i]) <= currentRoundFDEnd)
                    dataSizeSentThisRoundPerProc[i] = remainingDataAmountToSendPerProc[i];
                else
                    dataSizeSentThisRoundPerProc[i] =
                        (currentRoundFDEnd - remainingDataOffsetToSendPerProc[i]) +1;
            }
            else if (((remainingDataOffsetToSendPerProc[i]+
                            remainingDataAmountToSendPerProc[i]) >=
                        currentRoundFDStart) &&
                    ((remainingDataOffsetToSendPerProc[i]+
                      remainingDataAmountToSendPerProc[i]) <= currentRoundFDEnd)) {
                if ((remainingDataOffsetToSendPerProc[i]) >= currentRoundFDStart)
                    dataSizeSentThisRoundPerProc[i] = remainingDataAmountToSendPerProc[i];
                else
                    dataSizeSentThisRoundPerProc[i] =
                        (remainingDataOffsetToSendPerProc[i]-currentRoundFDStart) +1;
            }
            else
                dataSizeSentThisRoundPerProc[i] = 0;

        }

        /* the target procs get the data from the source aggs */
        for (i = 0; i < numSourceAggsThisRound; i++) {
            MPI_Irecv(&((char*)buf)[recvBufferOffsetsThisRound[i]],
                    bufferAmountsToGetThisRound[i],MPI_BYTE,
                    sourceAggsForMyData[sourceAggIndexesForMyDataThisRound[i]],0,fd->comm,
                    &mpiRecvDataFromSourceAggsRequest[i]);
        }

        /* the source aggs send the data to the target procs */
        int numTargetProcsSentThisRound = 0;
        for (i=0;i<numTargetProcs;i++) {

            int currentWBOffset = 0;
            for (j=0;j<i;j++)
                currentWBOffset += dataSizeSentThisRoundPerProc[j];

            /* only send to target procs that will recv > 0 count data */
            if (dataSizeSentThisRoundPerProc[i] > 0) {
                MPI_Isend(&((char*)read_buf)[currentWBOffset],
                        dataSizeSentThisRoundPerProc[i],
                        MPI_BYTE,targetProcsForMyData[i],0,
                        fd->comm,&mpiSendDataToTargetProcRequest[numTargetProcsSentThisRound]);
                numTargetProcsSentThisRound++;
                remainingDataAmountToSendPerProc[i] -= dataSizeSentThisRoundPerProc[i];
                remainingDataOffsetToSendPerProc[i] += dataSizeSentThisRoundPerProc[i];
            }
        }

        /* wait for the target procs to get their data */
        for (i = 0; i < numSourceAggsThisRound; i++) {
            MPI_Waitany(numSourceAggsThisRound,mpiRecvDataFromSourceAggsRequest,
                    &irecv,&mpiWaitAnyStatusFromSourceProcs);
        }

        nextRoundFDStart = currentRoundFDStart + coll_bufsize;

        /* clean up the MPI_Isend MPI_Requests */
        for (i=0;i<numTargetProcsSentThisRound;i++) {
          MPI_Waitany(numTargetProcsSentThisRound,mpiSendDataToTargetProcRequest,
                  &isend,&mpiIsendStatusForData);
        }

        /* sync up the source aggs (which did the isends) with the target procs
         * (which did the irecvs) so the target procs have received the data
         * before the next round's readcontig overwrites the buffer */
        MPI_Barrier(fd->comm);

    } /* for-loop roundIter */

    if (useIOBuffer) { /* thread reader cleanup */

        if ( !pthread_equal(io_thread, pthread_self()) ) {
            pthread_join(io_thread, &thread_ret);
            *error_code = *(int *)thread_ret;
        }
    }

    if (iAmUsedAgg) {
        ADIOI_Free(targetProcsForMyData);
        ADIOI_Free(remainingDataAmountToSendPerProc);
        ADIOI_Free(remainingDataOffsetToSendPerProc);
    }

    ADIOI_Free(sourceAggsForMyData);
    ADIOI_Free(sourceAggsForMyDataFDStart);
    ADIOI_Free(sourceAggsForMyDataFDEnd);

    ADIOI_Free(mpiRecvDataFromSourceAggsRequest);
    ADIOI_Free(mpiSendDataToTargetProcRequest);
    ADIOI_Free(dataSizeSentThisRoundPerProc);
    ADIOI_Free(sourceAggIndexesForMyDataThisRound);
    ADIOI_Free(recvBufferOffsetsThisRound);
    ADIOI_Free(bufferAmountsToGetThisRound);

    /* TODO: is Barrier here needed? */
    MPI_Barrier(fd->comm);

    return;

}
