root/ompi/mca/io/romio321/romio/adio/ad_gpfs/pe/ad_pe_aggrs.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ADIOI_PE_gen_agg_ranklist

   1 /* ---------------------------------------------------------------- */
   2 /* (C)Copyright IBM Corp.  2007, 2008                               */
   3 /* ---------------------------------------------------------------- */
   4 /**
   5  * \file ad_pe_aggrs.c
   6  * \brief The externally used function from this file is declared in ad_pe_aggrs.h
   7  */
   8 
   9 /* -*- Mode: C; c-basic-offset:4 ; -*- */
  10 /*
  11  *   Copyright (C) 1997-2001 University of Chicago.
  12  *   See COPYRIGHT notice in top-level directory.
  13  */
  14 
  15 /*#define TRACE_ON */
  16 
  17 #include "adio.h"
  18 #include "adio_cb_config_list.h"
  19 #include "../ad_gpfs.h"
  20 #include "ad_pe_aggrs.h"
  21 #include "mpiimpl.h"
  22 
  23 #ifdef AGGREGATION_PROFILE
  24 #include "mpe.h"
  25 #endif
  26 
  27 #ifdef USE_DBG_LOGGING
  28   #define AGG_DEBUG 1
  29 #endif
  30 
  31 #ifndef TRACE_ERR
  32 #  define TRACE_ERR(format...)
  33 #endif
  34 
  35 /*
  36  * Compute the aggregator-related parameters that are required in 2-phase
  37  * collective IO of ADIO.
  38  * The parameters are
  39  *      . the number of aggregators (proxies) : fd->hints->cb_nodes
  40  *      . the ranks of the aggregators :        fd->hints->ranklist
  41  * If MP_IONODEFILE is defined, POE determines all tasks on every node listed
  42  * in the node file and defines MP_IOTASKLIST with them, making them all
  43  * aggregators.  Alternatively, the user can explictly set MP_IOTASKLIST
  44  * themselves.  The format of the MP_IOTASKLIST is a colon-delimited list of
  45  * task ids, the first entry being the total number of aggregators, for example
  46  * to specify 4 aggregators on task ids 0,8,16,24  the value would be:
  47  * 4:0:8:16:24.  If there is no MP_IONODEFILE, or MP_IOTASKLIST, then the
  48  * default aggregator selection is 1 task per node for every node of the job -
  49  * additionally, an environment variable MP_IOAGGR_CNT  can be specified, which
  50  * defines the total number of aggregators, spread evenly across all the nodes.
  51  * The romio_cb_nodes and romio_cb_config_list hint user settings are ignored.
  52  */
  53 int
  54 ADIOI_PE_gen_agg_ranklist(ADIO_File fd)
  55 {
  56 
  57     int numAggs = 0;
  58     char *ioTaskList = getenv( "MP_IOTASKLIST" );
  59     char *ioAggrCount = getenv("MP_IOAGGR_CNT");
  60     int i,j;
  61     int inTERcommFlag = 0;
  62 
  63     int myRank,commSize;
  64     MPI_Comm_rank(fd->comm, &myRank);
  65     MPI_Comm_size(fd->comm, &commSize);
  66 
  67     MPI_Comm_test_inter(fd->comm, &inTERcommFlag);
  68     if (inTERcommFlag) {
  69       FPRINTF(stderr,"ERROR: ATTENTION: inTERcomms are not supported in MPI-IO - aborting....\n");
  70       perror("ADIOI_PE_gen_agg_ranklist:");
  71       MPI_Abort(MPI_COMM_WORLD, 1);
  72     }
  73 
  74     if (ioTaskList) {
  75       int ioTaskListLen = strlen(ioTaskList);
  76       int ioTaskListPos = 0;
  77       char tmpBuf[8];   /* Big enough for 1M tasks (7 digits task ID). */
  78       tmpBuf[7] = '\0';
  79       for (i=0; i<7; i++) {
  80          tmpBuf[i] = *ioTaskList++;      /* Maximum is 7 digits for 1 million. */
  81          ioTaskListPos++;
  82          if (*ioTaskList == ':') {       /* If the next char is a ':' ends it. */
  83              tmpBuf[i+1] = '\0';
  84              break;
  85          }
  86       }
  87       numAggs = atoi(tmpBuf);
  88       if (numAggs == 0)
  89         FPRINTF(stderr,"ERROR: ATTENTION: Number of aggregators specified in MP_IOTASKLIST set at 0 - default aggregator selection will be used.\n");
  90       else if (!((numAggs > 0 ) && (numAggs <= commSize))) {
  91         FPRINTF(stderr,"ERROR: ATTENTION: The number of aggregators (%s) specified in MP_IOTASKLIST is outside the communicator task range of %d.\n",tmpBuf,commSize);
  92         numAggs = commSize;
  93       }
  94       fd->hints->ranklist = (int *) ADIOI_Malloc (numAggs * sizeof(int));
  95 
  96       int aggIndex = 0;
  97       while (aggIndex < numAggs) {
  98          ioTaskList++;                /* Advance past the ':' */
  99          ioTaskListPos++;
 100          int allDigits=1;
 101          for (i=0; i<7; i++) {
 102             if (*ioTaskList < '0' || *ioTaskList > '9')
 103               allDigits=0;
 104             tmpBuf[i] = *ioTaskList++;
 105             ioTaskListPos++;
 106             if ( (*ioTaskList == ':') || (*ioTaskList == '\0') ) {
 107                 tmpBuf[i+1] = '\0';
 108                 break;
 109             }
 110          }
 111          if (allDigits) {
 112            int newAggRank = atoi(tmpBuf);
 113            if (!((newAggRank >= 0 ) && (newAggRank < commSize))) {
 114              FPRINTF(stderr,"ERROR: ATTENTION: The aggregator '%s' specified in MP_IOTASKLIST is not within the communicator task range of 0 to %d  - it will be ignored.\n",tmpBuf,commSize-1);
 115            }
 116            else {
 117              int aggAlreadyAdded = 0;
 118              for (i=0;i<aggIndex;i++)
 119                if (fd->hints->ranklist[i] == newAggRank) {
 120                  aggAlreadyAdded = 1;
 121                  break;
 122                }
 123              if (!aggAlreadyAdded)
 124                fd->hints->ranklist[aggIndex++] = newAggRank;
 125              else
 126                FPRINTF(stderr,"ERROR: ATTENTION: The aggregator '%d' is specified multiple times in MP_IOTASKLIST - duplicates are ignored.\n",newAggRank);
 127            }
 128          }
 129          else {
 130            FPRINTF(stderr,"ERROR: ATTENTION: The aggregator '%s' specified in MP_IOTASKLIST is not a valid integer task id  - it will be ignored.\n",tmpBuf);
 131          }
 132 
 133          /* At the end check whether the list is shorter than specified. */
 134          if (ioTaskListPos == ioTaskListLen) {
 135            if (aggIndex == 0) {
 136              FPRINTF(stderr,"ERROR: ATTENTION: No aggregators were correctly specified in MP_IOTASKLIST - default aggregator selection will be used.\n");
 137              ADIOI_Free(fd->hints->ranklist);
 138            }
 139            else if (aggIndex < numAggs)
 140              FPRINTF(stderr,"ERROR: ATTENTION: %d aggregators were specified in MP_IOTASKLIST but only %d were correctly specified - setting the number of aggregators to %d.\n",numAggs, aggIndex,aggIndex);
 141            numAggs = aggIndex;
 142          }
 143       }
 144     }
 145     if (numAggs == 0)  {
 146       MPID_Comm *mpidCommData;
 147 
 148       MPID_Comm_get_ptr(fd->comm,mpidCommData);
 149       int localSize = mpidCommData->local_size;
 150 
 151       // get my node rank
 152       int myNodeRank = mpidCommData->intranode_table[mpidCommData->rank];
 153 
 154       int *allNodeRanks = (int *) ADIOI_Malloc (localSize * sizeof(int));
 155 
 156       allNodeRanks[myRank] = myNodeRank;
 157       MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, allNodeRanks, 1, MPI_INT, fd->comm);
 158 
 159 #ifdef AGG_DEBUG
 160       printf("MPID_Comm data: local_size is %d\nintranode_table entries:\n",mpidCommData->local_size);
 161       for (i=0;i<localSize;i++) {
 162         printf("%d ",mpidCommData->intranode_table[i]);
 163       }
 164       printf("\ninternode_table entries:\n");
 165       for (i=0;i<localSize;i++) {
 166         printf("%d ",mpidCommData->internode_table[i]);
 167       }
 168       printf("\n");
 169 
 170       printf("\nallNodeRanks entries:\n");
 171       for (i=0;i<localSize;i++) {
 172         printf("%d ",allNodeRanks[i]);
 173       }
 174       printf("\n");
 175 
 176 #endif
 177 
 178       if (ioAggrCount) {
 179         int cntType = -1;
 180 
 181         if ( strcasecmp(ioAggrCount, "ALL") ) {
 182            if ( (cntType = atoi(ioAggrCount)) <= 0 ) {
 183               /* Input is other non-digit or less than 1 the  assume */
 184               /* 1 aggregator per node.  Note: atoi(-1) reutns -1.   */
 185               /* No warning message given here -- done earlier.      */
 186               cntType = -1;
 187            }
 188         }
 189         else {
 190            /* ALL is specified set aggr count to localSize */
 191            cntType = -2;
 192         }
 193         switch(cntType) {
 194            case -1:
 195               /* 1 aggr/node case */
 196             {
 197              int rankListIndex = 0;
 198              fd->hints->ranklist = (int *) ADIOI_Malloc (localSize * sizeof(int));
 199              for (i=0;i<localSize;i++) {
 200                if (allNodeRanks[i] == 0) {
 201                  fd->hints->ranklist[rankListIndex++] = i;
 202                  numAggs++;
 203                }
 204              }
 205             }
 206               break;
 207            case -2:
 208               /* ALL tasks case */
 209              fd->hints->ranklist = (int *) ADIOI_Malloc (localSize * sizeof(int));
 210              for (i=0;i<localSize;i++) {
 211                fd->hints->ranklist[i] = i;
 212                numAggs++;
 213              }
 214               break;
 215            default:
 216               /* Specific aggr count case -- MUST be less than localSize, otherwise set to localSize */
 217              if (cntType > localSize)
 218                cntType = localSize;
 219 
 220              numAggs = cntType;
 221              // Round-robin thru allNodeRanks - pick the 0's, then the 1's, etc
 222              int currentNodeRank = 0;  // node rank currently being selected as aggregator
 223              int rankListIndex = 0;
 224              int currentAllNodeIndex = 0;
 225 
 226              fd->hints->ranklist = (int *) ADIOI_Malloc (numAggs * sizeof(int));
 227 
 228              while (rankListIndex < numAggs) {
 229                int foundEntry = 0;
 230                while (!foundEntry && (currentAllNodeIndex < localSize)) {
 231                  if (allNodeRanks[currentAllNodeIndex] == currentNodeRank) {
 232                    fd->hints->ranklist[rankListIndex++] = currentAllNodeIndex;
 233                    foundEntry = 1;
 234                  }
 235                  currentAllNodeIndex++;
 236                }
 237                if (!foundEntry) {
 238                  currentNodeRank++;
 239                  currentAllNodeIndex = 0;
 240                }
 241              } // while
 242           break;
 243         } // switch(cntType)
 244       } // if (ioAggrCount)
 245 
 246       else { // default is 1 aggregator per node
 247         // take the 0 entries from allNodeRanks
 248         int rankListIndex = 0;
 249         fd->hints->ranklist = (int *) ADIOI_Malloc (localSize * sizeof(int));
 250         for (i=0;i<localSize;i++) {
 251           if (allNodeRanks[i] == 0) {
 252             fd->hints->ranklist[rankListIndex++] = i;
 253             numAggs++;
 254           }
 255         }
 256       }
 257 
 258       ADIOI_Free(allNodeRanks);
 259 
 260     }
 261 
 262     if ( getenv("MP_I_SHOW_AGGRS") ) {
 263       if (myRank == 0) {
 264         printf("Agg rank list of %d generated:\n", numAggs);
 265         for (i=0;i<numAggs;i++) {
 266           printf("%d ",fd->hints->ranklist[i]);
 267         }
 268         printf("\n");
 269       }
 270     }
 271 
 272     fd->hints->cb_nodes = numAggs;
 273 
 274     return 0;
 275 }
 276 

/* [<][>][^][v][top][bottom][index][help] */