root/ompi/mca/sharedfp/individual/sharedfp_individual_collaborate_data.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_sharedfp_individual_collaborate_data
  2. mca_sharedfp_individual_get_timestamps_and_reclengths
  3. mca_sharedfp_individual_create_buff
  4. mca_sharedfp_individual_sort_timestamps
  5. mca_sharedfp_individual_assign_globaloffset
  6. mca_sharedfp_individual_getoffset

   1 /*
   2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2004-2017 The University of Tennessee and The University
   6  *                         of Tennessee Research Foundation.  All rights
   7  *                         reserved.
   8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
   9  *                         University of Stuttgart.  All rights reserved.
  10  * Copyright (c) 2004-2005 The Regents of the University of California.
  11  *                         All rights reserved.
  12  * Copyright (c) 2013-2018 University of Houston. All rights reserved.
  13  * Copyright (c) 2018      Research Organization for Information Science
  14  *                         and Technology (RIST). All rights reserved.
  15  * $COPYRIGHT$
  16  *
  17  * Additional copyrights may follow
  18  *
  19  * $HEADER$
  20  */
  21 
  22 
  23 #include "ompi_config.h"
  24 #include "sharedfp_individual.h"
  25 
  26 #include "mpi.h"
  27 #include "ompi/constants.h"
  28 #include "ompi/mca/sharedfp/sharedfp.h"
  29 #include "ompi/mca/sharedfp/base/base.h"
  30 #include "ompi/mca/common/ompio/common_ompio.h"
  31 
  32 #include <stdlib.h>
  33 #include <stdio.h>
  34 
  35 int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh, ompio_file_t *ompio_fh)
  36 {
  37     int ret = OMPI_SUCCESS;
  38     mca_sharedfp_individual_header_record *headnode = NULL;
  39     char *buff=NULL;
  40     int nodesoneachprocess = 0;
  41     int idx=0,i=0,j=0, l=0;
  42     int *ranks = NULL;
  43     double *timestampbuff = NULL;
  44     OMPI_MPI_OFFSET_TYPE *offsetbuff = NULL;
  45     int *countbuff = NULL;
  46     int *displ = NULL;
  47     double *ind_ts = NULL;
  48     long *ind_recordlength = NULL;
  49     OMPI_MPI_OFFSET_TYPE *local_off = NULL;
  50     int totalnodes = 0;
  51     ompi_status_public_t status;
  52     int recordlength=0;
  53 
  54     headnode = (mca_sharedfp_individual_header_record*)sh->selected_module_data;
  55     if ( NULL == headnode)  {
  56         opal_output(0, "sharedfp_individual_collaborate_data: headnode is NULL but file is open\n");
  57         return OMPI_ERROR;
  58     }
  59 
  60     /* Number of nodes on each process is the sum of records
  61      * on file and records in the linked list
  62      */
  63     nodesoneachprocess = headnode->numofrecordsonfile + headnode->numofrecords;
  64 
  65     if ( mca_sharedfp_individual_verbose ) {
  66         opal_output(ompi_sharedfp_base_framework.framework_output,
  67                     "Nodes of each process = %d\n",nodesoneachprocess);
  68     }
  69 
  70     countbuff = (int*)malloc(ompio_fh->f_size * sizeof(int));
  71     if ( NULL == countbuff  ) {
  72         return OMPI_ERR_OUT_OF_RESOURCE;
  73     }
  74 
  75     displ = (int*)malloc(sizeof(int) * ompio_fh->f_size);
  76     if ( NULL == displ ) {
  77         ret = OMPI_ERR_OUT_OF_RESOURCE;
  78         goto exit;
  79     }
  80 
  81     /* Each process counts the number of nodes
  82      * in its linked list for which global offset */
  83     ret =  mca_sharedfp_individual_get_timestamps_and_reclengths ( &ind_ts, &ind_recordlength,
  84                                                                    &local_off, sh );
  85     if ( OMPI_SUCCESS != ret ) {
  86         goto exit;
  87     }
  88 
  89     ret = ompio_fh->f_comm->c_coll->coll_allgather ( &nodesoneachprocess, 
  90                                                      1, 
  91                                                      MPI_INT,
  92                                                      countbuff, 
  93                                                      1, 
  94                                                      MPI_INT, 
  95                                                      ompio_fh->f_comm,
  96                                                      ompio_fh->f_comm->c_coll->coll_allgather_module );
  97 
  98     if ( OMPI_SUCCESS != ret ) {
  99         goto exit;
 100     }
 101 
 102 
 103     if ( mca_sharedfp_individual_verbose) {
 104         for (i = 0; i < ompio_fh->f_size ; i++) {
 105             opal_output(ompi_sharedfp_base_framework.framework_output,"sharedfp_individual_collaborate_data: "
 106                         "Countbuff[%d] = %d\n", i, countbuff[i]);
 107         }
 108     }
 109 
 110     if ( 0 == nodesoneachprocess )    {
 111         ind_ts[0] = 0;
 112         ind_recordlength[0] = 0;
 113         local_off[0] = 0;
 114     }
 115 
 116     for(i = 0; i < ompio_fh->f_size; i++) {
 117         displ[i]    = totalnodes;
 118         if ( mca_sharedfp_individual_verbose ) {
 119             opal_output(ompi_sharedfp_base_framework.framework_output,
 120                         "sharedfp_individual_collaborate_data: displ[%d] = %d\n",i,displ[i]);
 121         }
 122         totalnodes  = totalnodes + countbuff[i];
 123     }
 124 
 125     if (totalnodes <= 0 ) {
 126         goto exit;
 127     }
 128 
 129     ranks = (int *) malloc ( totalnodes * sizeof(int));
 130     if ( NULL == ranks ) {
 131         ret = OMPI_ERR_OUT_OF_RESOURCE;
 132         goto exit;
 133     }
 134     for ( l=0, i=0; i< ompio_fh->f_size; i++ ) {
 135         for ( j=0; j< countbuff[i]; j++ ) {
 136             ranks[l++]=i;
 137         }
 138     }
 139 
 140     ret =  mca_sharedfp_individual_create_buff ( &timestampbuff, &offsetbuff, totalnodes, ompio_fh->f_size);
 141     if ( OMPI_SUCCESS != ret ) {
 142         goto exit;
 143     }
 144 
 145     ret = ompio_fh->f_comm->c_coll->coll_allgatherv ( ind_ts, 
 146                                                       countbuff[ompio_fh->f_rank], 
 147                                                       MPI_DOUBLE,
 148                                                       timestampbuff, 
 149                                                       countbuff, 
 150                                                       displ, 
 151                                                       MPI_DOUBLE,
 152                                                       ompio_fh->f_comm, 
 153                                                       ompio_fh->f_comm->c_coll->coll_allgatherv_module );
 154     if ( OMPI_SUCCESS != ret ) {
 155         goto exit;
 156     }
 157 
 158     ret = ompio_fh->f_comm->c_coll->coll_allgatherv ( ind_recordlength, 
 159                                                       countbuff[ompio_fh->f_rank], 
 160                                                       OMPI_OFFSET_DATATYPE,
 161                                                       offsetbuff, 
 162                                                       countbuff, 
 163                                                       displ, 
 164                                                       OMPI_OFFSET_DATATYPE,
 165                                                       ompio_fh->f_comm, 
 166                                                       ompio_fh->f_comm->c_coll->coll_allgatherv_module );
 167     if ( OMPI_SUCCESS != ret ) {
 168         goto exit;
 169     }
 170 
 171     ret =  mca_sharedfp_individual_sort_timestamps(&timestampbuff, &offsetbuff, &ranks, totalnodes);
 172     if ( OMPI_SUCCESS != ret ) {
 173         goto exit;
 174     }
 175 
 176     sh->global_offset = mca_sharedfp_individual_assign_globaloffset ( &offsetbuff, totalnodes, sh);
 177 
 178     recordlength = ind_recordlength[0] * 1.2;
 179     buff = (char * ) malloc( recordlength );
 180     if  ( NULL == buff ) {
 181         ret = OMPI_ERR_OUT_OF_RESOURCE;
 182         goto exit;
 183     }
 184 
 185     for (i = 0; i < nodesoneachprocess ; i++)  {
 186         if ( ind_recordlength[i] > recordlength ) {
 187             recordlength = ind_recordlength[i] * 1.2;
 188             buff = (char *) realloc ( buff, recordlength );
 189             if  ( NULL == buff ) {
 190                 ret = OMPI_ERR_OUT_OF_RESOURCE;
 191                 goto exit;
 192             }
 193         }
 194 
 195         /*Read from the local data file*/
 196         ret = mca_common_ompio_file_read_at ( headnode->datafilehandle,
 197                                               local_off[i], buff, ind_recordlength[i],
 198                                               MPI_BYTE, &status);
 199         if ( OMPI_SUCCESS != ret ) {
 200             goto exit;
 201         }
 202 
 203         idx =  mca_sharedfp_individual_getoffset(ind_ts[i],timestampbuff, ranks, ompio_fh->f_rank, totalnodes);
 204 
 205         if ( mca_sharedfp_individual_verbose ) {
 206             opal_output(ompi_sharedfp_base_framework.framework_output,
 207                         "sharedfp_individual_collaborate_data: Process %d writing %ld bytes to main file at position"
 208                         "%lld (%d)\n", ompio_fh->f_rank, ind_recordlength[i], offsetbuff[idx], idx);
 209         }
 210 
 211         /*Write into main data file*/
 212         ret = mca_common_ompio_file_write_at( ompio_fh, offsetbuff[idx], buff,
 213                                               ind_recordlength[i], MPI_BYTE, &status);
 214         if ( OMPI_SUCCESS != ret ) {
 215             goto exit;
 216         }
 217 
 218     }
 219 
 220 exit:
 221     if ( NULL != countbuff ) {
 222         free ( countbuff );
 223     }
 224     if ( NULL != displ ) {
 225         free ( displ );
 226     }
 227 
 228     if( NULL != timestampbuff ){
 229         free ( timestampbuff );
 230     }
 231     if ( NULL != offsetbuff ){
 232         free ( offsetbuff );
 233     }
 234     if ( NULL != ind_ts ) {
 235         free ( ind_ts );
 236     }
 237     if ( NULL != ind_recordlength ) {
 238         free ( ind_recordlength );
 239     }
 240     if ( NULL != local_off ) {
 241         free ( local_off );
 242     }
 243     if ( NULL != buff ) {
 244         free ( buff );
 245     }
 246     if ( NULL != ranks ) {
 247         free ( ranks );
 248     }
 249 
 250     return ret;
 251 }
 252 
 253 /* Count the number of nodes and create and array of the timestamps*/
 254 int  mca_sharedfp_individual_get_timestamps_and_reclengths ( double **buff, long **rec_length, 
 255                                                              MPI_Offset **offbuff,struct mca_sharedfp_base_data_t *sh)
 256 {
 257     int num = 0, i= 0, ctr = 0;
 258     int ret=OMPI_SUCCESS;
 259     mca_sharedfp_individual_metadata_node *currnode;
 260     mca_sharedfp_individual_header_record *headnode;
 261     OMPI_MPI_OFFSET_TYPE metaoffset = 0;
 262     struct  mca_sharedfp_individual_record2 rec;
 263     MPI_Status status;
 264 
 265     headnode = (mca_sharedfp_individual_header_record*)(sh->selected_module_data);
 266     num = ( headnode->numofrecords + headnode->numofrecordsonfile);
 267     currnode = headnode->next;
 268 
 269     if ( mca_sharedfp_individual_verbose ) {
 270         opal_output(ompi_sharedfp_base_framework.framework_output,"Num is %d\n",num);
 271     }
 272 
 273     if ( 0 == num )   {
 274         *buff       = (double*) malloc ( sizeof ( double ));
 275         *rec_length = (long *) malloc ( sizeof ( long ));
 276         *offbuff    = (OMPI_MPI_OFFSET_TYPE *)malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) );
 277         if ( NULL == *buff || NULL == *rec_length || NULL == *offbuff ) {
 278             ret = OMPI_ERR_OUT_OF_RESOURCE;
 279             goto exit;
 280         }
 281     }
 282     else {
 283         *buff       = (double* ) malloc(sizeof ( double) * num);
 284         *rec_length = (long *) malloc(sizeof ( long) * num);
 285         *offbuff    = (OMPI_MPI_OFFSET_TYPE *) malloc ( sizeof(OMPI_MPI_OFFSET_TYPE) * num);
 286         if ( NULL == *buff || NULL == *rec_length || NULL == *offbuff ) {
 287             ret = OMPI_ERR_OUT_OF_RESOURCE;
 288             goto exit;
 289         }
 290     }
 291 
 292     if ( mca_sharedfp_individual_verbose ) {
 293         opal_output(ompi_sharedfp_base_framework.framework_output,
 294                     "sharedfp_individual_get_timestamps_and_reclengths: Numofrecords on file %d\n",
 295                     headnode->numofrecordsonfile);
 296     }
 297 
 298     if (headnode->numofrecordsonfile >  0)  {
 299         metaoffset = headnode->metafile_start_offset;
 300         ctr = 0;
 301         for (i = 0; i < headnode->numofrecordsonfile ; i++)  {
 302 
 303             ret = mca_common_ompio_file_read_at(headnode->metadatafilehandle,metaoffset, 
 304                                                 &rec, 32, MPI_BYTE,&status);
 305             if ( OMPI_SUCCESS != ret ) {
 306                 goto exit;
 307             }
 308 
 309             *(*rec_length + ctr) = rec.recordlength;
 310             *(*buff + ctr) = rec.timestamp;
 311             *(*offbuff + ctr) = rec.localposition;
 312 
 313             metaoffset = metaoffset +  sizeof(struct  mca_sharedfp_individual_record2);
 314 
 315             if ( mca_sharedfp_individual_verbose ) {
 316                 opal_output(ompi_sharedfp_base_framework.framework_output,
 317                             "sharedfp_individual_get_timestamps_and_reclengths: Ctr = %d\n",ctr);
 318             }
 319             ctr++;
 320         }
 321 
 322         headnode->numofrecordsonfile = 0;
 323         headnode->metafile_start_offset = metaoffset;
 324 
 325     }   /* End of if (headnode->numofrecordsonfile > 0) */
 326 
 327     /* Add the records from the linked list */
 328     currnode = headnode->next;
 329     while (currnode)  {
 330         if ( mca_sharedfp_individual_verbose ) {
 331             opal_output(ompi_sharedfp_base_framework.framework_output,"Ctr = %d\n",ctr);
 332         }
 333         /* Some error over here..need to check this code again */
 334         /*while(headnode->next  != NULL)*/
 335 
 336         *(*rec_length + ctr) = currnode->recordlength;
 337         *(*buff + ctr) = currnode->timestamp;
 338         *(*offbuff + ctr) = currnode->localposition;
 339 
 340         ctr = ctr + 1;
 341 
 342         headnode->next = currnode->next;
 343         if ( mca_sharedfp_individual_verbose ) {
 344             opal_output(ompi_sharedfp_base_framework.framework_output,
 345                         "sharedfp_individual_get_timestamps_and_reclengths: node deleted from the metadatalinked list\n");
 346         }
 347         free(currnode);
 348         currnode = headnode->next;
 349 
 350     }   /*End of while(currnode) loop*/
 351 
 352 
 353     /*Reset the numofrecords*/
 354     headnode->numofrecords = 0;
 355 
 356 exit:
 357 
 358     return ret;
 359 }
 360 
 361 int  mca_sharedfp_individual_create_buff(double **ts,MPI_Offset **off,int totalnodes, int size)
 362 {
 363 
 364     if ( totalnodes)  {
 365         *off = (OMPI_MPI_OFFSET_TYPE *) malloc ( totalnodes * sizeof(OMPI_MPI_OFFSET_TYPE));
 366         if ( NULL == *off ) {
 367             return OMPI_ERR_OUT_OF_RESOURCE;
 368         }
 369 
 370         *ts = (double *) malloc ( totalnodes * sizeof(double) );
 371         if (NULL == *ts ) {
 372             return OMPI_ERR_OUT_OF_RESOURCE;
 373         }
 374 
 375     }
 376 
 377     return OMPI_SUCCESS;
 378 }
 379 
 380 /*Sort the timestamp buffer*/
 381 int  mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int **ranks, int totalnodes)
 382 {
 383 
 384     int i = 0;
 385     int j = 0;
 386     int flag = 1;
 387     double tempts = 0.0;
 388     OMPI_MPI_OFFSET_TYPE tempoffset = 0;
 389     int temprank = 0;
 390 
 391     for (i= 1; (i <= totalnodes)&&(flag) ; i++)  {
 392         flag = 0;
 393         for (j = 0; j < (totalnodes - 1); j++)  {
 394             if ( *(*ts + j + 1) < *(*ts + j ))  {
 395                 /*swap timestamp*/
 396                 tempts = *(*ts + j );
 397                 *(*ts + j) = *(*ts + j + 1);
 398                 *(*ts + j + 1) = tempts;
 399 
 400                 /*swap offset*/
 401                 tempoffset = *(*off + j);
 402                 *(*off + j) = *(*off + j + 1);
 403                 *(*off + j + 1) = tempoffset;
 404 
 405                 /*swap ranks*/
 406                 temprank = *(*ranks + j);
 407                 *(*ranks + j) = *(*ranks + j + 1);
 408                 *(*ranks + j + 1) = temprank;
 409 
 410                 flag = 1;
 411             }
 412         }
 413 
 414     }
 415 
 416     return OMPI_SUCCESS;
 417 }
 418 
 419 
 420 MPI_Offset  mca_sharedfp_individual_assign_globaloffset(MPI_Offset **offsetbuff,int totalnodes,
 421                                                         struct mca_sharedfp_base_data_t *sh)
 422 {
 423     int i = 0;
 424     OMPI_MPI_OFFSET_TYPE temp = 0,prevoffset = 0;
 425     OMPI_MPI_OFFSET_TYPE global_offset = 0;
 426 
 427     for (i = 0; i < totalnodes; i++) {
 428         temp = *(*offsetbuff + i);
 429 
 430         if (i == 0) {
 431             *(*offsetbuff + i ) = sh->global_offset;
 432         }
 433         else {
 434             *(*offsetbuff + i) = *(*offsetbuff + i - 1) + prevoffset;
 435         }
 436         prevoffset = temp;
 437     }
 438     global_offset =   *(*offsetbuff + i - 1) + prevoffset;
 439 
 440     return global_offset;
 441 }
 442 
 443 
 444 int  mca_sharedfp_individual_getoffset(double timestamp, double *ts, int *ranks, int myrank, int totalnodes)
 445 {
 446     int i = 0;
 447     int notfound = 1;
 448 
 449 
 450     while (notfound) {
 451         if (ts[i] == timestamp && ranks[i] == myrank )
 452             break;
 453 
 454         i++;
 455 
 456         if (i == totalnodes)  {
 457             notfound = 0;
 458         }
 459     }
 460 
 461     if (!notfound) {
 462         return -1;
 463     }
 464 
 465     return i;
 466 }

/* [<][>][^][v][top][bottom][index][help] */