root/ompi/mca/topo/base/topo_base_dist_graph_create.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_topo_base_dist_graph_distribute
  2. mca_topo_base_dist_graph_create
  3. mca_topo_base_comm_dist_graph_2_2_0_construct
  4. mca_topo_base_comm_dist_graph_2_2_0_destruct

   1 /*
   2  * Copyright (c) 2008      The Trustees of Indiana University and Indiana
   3  *                         University Research and Technology
   4  *                         Corporation.  All rights reserved.
   5  * Copyright (c) 2009      Cisco Systems, Inc.  All rights reserved.
   6  * Copyright (c) 2011-2017 The University of Tennessee and The University
   7  *                         of Tennessee Research Foundation.  All rights
   8  *                         reserved.
   9  * Copyright (c) 2011-2013 Inria.  All rights reserved.
  10  * Copyright (c) 2011-2013 Université Bordeaux 1
  11  * Copyright (c) 2014-2015 Research Organization for Information Science
  12  *                         and Technology (RIST). All rights reserved.
  13  * Copyright (c) 2016-2017 IBM Corporation.  All rights reserved.
  14  */
  15 
  16 #include "ompi_config.h"
  17 
  18 #include "ompi/communicator/communicator.h"
  19 #include "ompi/info/info.h"
  20 #include "ompi/mca/topo/base/base.h"
  21 #include "ompi/datatype/ompi_datatype.h"
  22 #include "ompi/mca/pml/pml.h"
  23 #include "ompi/request/request.h"
  24 
  25 #define IN_INDEX   0
  26 #define OUT_INDEX  1
  27 #define MCA_TOPO_BASE_TAG_DIST_EDGE_IN    -50
  28 #define MCA_TOPO_BASE_TAG_DIST_EDGE_OUT   -51
  29 
  30 typedef struct _dist_graph_elem {
  31     int in;
  32     int out;
  33 } mca_topo_base_dist_graph_elem_t;
  34 
  35 int mca_topo_base_dist_graph_distribute(mca_topo_base_module_t* module,
  36                                         ompi_communicator_t *comm,
  37                                         int n, const int nodes[],
  38                                         const int degrees[], const int targets[],
  39                                         const int weights[],
  40                                         mca_topo_base_comm_dist_graph_2_2_0_t** ptopo)
  41 {
  42     int i, j, err, count, left_over, pending_reqs, current_pos, index, csize;
  43     int *rin = NULL, *rout, *temp = NULL;
  44     mca_topo_base_dist_graph_elem_t *pos, *cnt, *idx;
  45     size_t int_size, how_much;
  46     ompi_status_public_t status;
  47     ompi_request_t **reqs = NULL;
  48     mca_topo_base_comm_dist_graph_2_2_0_t* topo=NULL;
  49 
  50     ompi_datatype_type_size( (ompi_datatype_t*)&ompi_mpi_int, &int_size);
  51 
  52     csize = ompi_comm_size(comm);
  53     /**
  54      * We compress the counts: for each peer we maintain an in and an out.
  55      * In addition we compute 3 arrays (that are allocated in one go):
  56      * - cnt: the number of elements for a peer
  57      * - pos: the position of the first element for a peer
  58      * - idx: temporary indexes and message count after the reduce.
  59      */
  60     cnt = (mca_topo_base_dist_graph_elem_t*)calloc(3 * csize, sizeof(mca_topo_base_dist_graph_elem_t));
  61     if( NULL == cnt ) {
  62         err = OMPI_ERR_OUT_OF_RESOURCE;
  63         goto bail_out;
  64     }
  65     pos = cnt + csize;
  66     idx = pos + csize;
  67 
  68     for( index = i = 0; i < n; i++ ) {
  69         cnt[nodes[i]].out += degrees[i];
  70         for( j = 0; j < degrees[i]; ++j ) {
  71             cnt[targets[index]].in++;
  72             index++;
  73         }
  74     }
  75 
  76     /**
  77      * Prepare the positions array. The ith element is the corresponding
  78      * starting position of the ith neighbor in the global array.
  79      */
  80     pos[0].in  = 0;
  81     pos[0].out = 0;
  82     for( i = 0; i < (csize - 1); i++ ) {
  83         pos[i + 1].in  = pos[i].in  + cnt[i].in;
  84         pos[i + 1].out = pos[i].out + cnt[i].out;
  85     }
  86 
  87     rin = (int*)calloc(2 * (pos[csize - 1].in +  cnt[csize - 1].in +
  88                             pos[csize - 1].out + cnt[csize - 1].out), sizeof(int));
  89     if( NULL == rin ) {
  90         err = OMPI_ERR_OUT_OF_RESOURCE;
  91         goto bail_out;
  92     }
  93     rout = &rin[2 * (pos[csize - 1].in +  cnt[csize - 1].in)];
  94 
  95     for( index = i = 0; i < n; ++i ) {  /* for each of the nodes */
  96         for( j = 0; j < degrees[i]; ++j ) {  /* for each node's degree */
  97             int position = pos[nodes[i]].out + idx[nodes[i]].out;
  98             if( MPI_UNWEIGHTED != weights ) {
  99                 position *= 2;
 100                 rout[position + 1] = weights[index];
 101             }
 102             rout[position + 0] = targets[index];
 103             idx[nodes[i]].out++;
 104 
 105             position = pos[targets[index]].in + idx[targets[index]].in;
 106             if( MPI_UNWEIGHTED != weights ) {
 107                 position *= 2;
 108                 rin[position + 1] = weights[index];
 109             }
 110             rin[position + 0] = nodes[i];
 111             idx[targets[index]].in++;
 112 
 113             index++;
 114         }
 115     }
 116 
 117     err = comm->c_coll->coll_reduce_scatter_block( MPI_IN_PLACE, idx, 2,
 118                                                   (ompi_datatype_t*)&ompi_mpi_int, MPI_SUM, comm,
 119                                                   comm->c_coll->coll_reduce_scatter_block_module);
 120     /**
 121      * At this point in the indexes array we have:
 122      * - idx[0].in  total number of IN  edges
 123      * - idx[0].out total number of OUT edges
 124      */
 125     topo = OBJ_NEW(mca_topo_base_comm_dist_graph_2_2_0_t);
 126     if( NULL == topo ) {
 127         err = OMPI_ERR_OUT_OF_RESOURCE;
 128         goto bail_out;
 129     }
 130     topo->indegree  = idx[0].in;
 131     topo->outdegree = idx[0].out;
 132     topo->weighted = (weights != MPI_UNWEIGHTED);
 133     if (topo->indegree > 0) {
 134         topo->in = (int*)malloc(sizeof(int) * topo->indegree);
 135         if (NULL == topo->in) {
 136             err = OMPI_ERR_OUT_OF_RESOURCE;
 137             goto bail_out;
 138         }
 139         if (MPI_UNWEIGHTED != weights) {
 140             topo->inw = (int*)malloc(sizeof(int) * topo->indegree);
 141             if (NULL == topo->inw) {
 142                 err = OMPI_ERR_OUT_OF_RESOURCE;
 143                 goto bail_out;
 144             }
 145         }
 146     }
 147     if (topo->outdegree > 0) {
 148         topo->out = (int*)malloc(sizeof(int) * topo->outdegree);
 149         if (NULL == topo->out) {
 150             err = OMPI_ERR_OUT_OF_RESOURCE;
 151             goto bail_out;
 152         }
 153         if (MPI_UNWEIGHTED != weights) {
 154             topo->outw = (int*)malloc(sizeof(int) * topo->outdegree);
 155             if (NULL == topo->outw) {
 156                 err = OMPI_ERR_OUT_OF_RESOURCE;
 157                 goto bail_out;
 158             }
 159         }
 160     }
 161 
 162     reqs = (ompi_request_t**)malloc(sizeof(ompi_request_t*) * 2 * csize);
 163     for (pending_reqs = i = 0; i < csize; ++i) {
 164         int position;
 165         if( 0 != (count = cnt[i].in) ) {
 166             position = pos[i].in;
 167             if (MPI_UNWEIGHTED != weights) {
 168                 count *= 2;  /* don't forget the weights */
 169                 position *= 2;
 170             }
 171             err = MCA_PML_CALL(isend( &rin[position], count, (ompi_datatype_t*)&ompi_mpi_int,
 172                                       i, MCA_TOPO_BASE_TAG_DIST_EDGE_IN, MCA_PML_BASE_SEND_STANDARD,
 173                                       comm, &reqs[pending_reqs]));
 174             pending_reqs++;
 175         }
 176         if( 0 != (count = cnt[i].out) ) {
 177             position = pos[i].out;
 178             if (MPI_UNWEIGHTED != weights) {
 179                 count *= 2;  /* don't forget the weights */
 180                 position *= 2;
 181             }
 182             err = MCA_PML_CALL(isend(&rout[position], count, (ompi_datatype_t*)&ompi_mpi_int,
 183                                      i, MCA_TOPO_BASE_TAG_DIST_EDGE_OUT, MCA_PML_BASE_SEND_STANDARD,
 184                                      comm, &reqs[pending_reqs]));
 185             pending_reqs++;
 186         }
 187     }
 188 
 189     /**
 190      * Now let's receive the input edges in a temporary array
 191      * and then move them to their corresponding place.
 192      */
 193     count = topo->indegree;
 194     temp = topo->in;
 195     if (MPI_UNWEIGHTED != weights) {
 196         count *= 2;  /* don't forget the weights */
 197         if (count > 0) {
 198             /* Allocate an array big enough to hold the edges and
 199                their weights */
 200             temp = (int*)malloc(count*sizeof(int));
 201             if (NULL == temp) {
 202                 err = OMPI_ERR_OUT_OF_RESOURCE;
 203                 goto bail_out;
 204             }
 205         }
 206     }
 207     for( left_over = count, current_pos = i = 0; left_over > 0; i++ ) {
 208 
 209         MCA_PML_CALL(recv( &temp[count - left_over], left_over, (ompi_datatype_t*)&ompi_mpi_int,  /* keep receiving in the same buffer */
 210                            MPI_ANY_SOURCE, MCA_TOPO_BASE_TAG_DIST_EDGE_IN,
 211                            comm, &status ));
 212         how_much = status._ucount / int_size;
 213         if (MPI_UNWEIGHTED != weights) {
 214             for( j = 0; j < ((int)how_much >> 1); j++, current_pos++ ) {
 215                 topo->in[current_pos]  = temp[2 * j + 0 + (count - left_over)];
 216                 topo->inw[current_pos] = temp[2 * j + 1 + (count - left_over)];
 217             }
 218         }
 219         left_over -= how_much;
 220     }
 221     if (MPI_UNWEIGHTED != weights) {
 222         free(temp);
 223     }
 224 
 225     /**
 226      * Now let's receive the output edges in a temporary array
 227      * and then move them to their corresponding place.
 228      */
 229     count = topo->outdegree;
 230     temp = topo->out;
 231     if (MPI_UNWEIGHTED != weights) {
 232         count *= 2;  /* don't forget the weights */
 233         if (count > 0) {
 234             /* Allocate an array big enough to hold the edges and
 235                their weights */
 236             temp = (int*)malloc(count*sizeof(int));
 237             if (NULL == temp) {
 238                 err = OMPI_ERR_OUT_OF_RESOURCE;
 239                 goto bail_out;
 240             }
 241         }
 242     }
 243     for( left_over = count, current_pos = i = 0; left_over > 0; i++ ) {
 244 
 245         MCA_PML_CALL(recv( &temp[count - left_over], left_over, (ompi_datatype_t*)&ompi_mpi_int,  /* keep receiving in the same buffer */
 246                            MPI_ANY_SOURCE, MCA_TOPO_BASE_TAG_DIST_EDGE_OUT,
 247                            comm, &status ));
 248         how_much = status._ucount / int_size;
 249 
 250         if (MPI_UNWEIGHTED != weights) {
 251             for( j = 0; j < ((int)how_much >> 1); j++, current_pos++ ) {
 252                 topo->out[current_pos]  = temp[2 * j + 0 + (count - left_over)];
 253                 topo->outw[current_pos] = temp[2 * j + 1 + (count - left_over)];
 254             }
 255         }
 256         left_over -= how_much;
 257     }
 258     if (MPI_UNWEIGHTED != weights) {
 259         free(temp);
 260     }
 261 
 262     err = ompi_request_wait_all(pending_reqs, reqs, MPI_STATUSES_IGNORE);
 263     *ptopo = topo;
 264     topo = NULL;  /* don't free it below */
 265 
 266  bail_out:
 267     if( NULL != reqs ) {
 268         free(reqs);
 269     }
 270     if( NULL != rin ) {
 271         free(rin);
 272     }
 273     if( NULL != cnt ) {
 274         free(cnt);
 275     }
 276     if( NULL != topo ) {
 277         OBJ_RELEASE(topo);
 278     }
 279     return err;
 280 }
 281 
 282 int mca_topo_base_dist_graph_create(mca_topo_base_module_t* module,
 283                                     ompi_communicator_t *comm_old,
 284                                     int n, const int nodes[],
 285                                     const int degrees[], const int targets[],
 286                                     const int weights[],
 287                                     opal_info_t *info, int reorder,
 288                                     ompi_communicator_t **newcomm)
 289 {
 290     int err;
 291 
 292     if( OMPI_SUCCESS != (err = ompi_comm_create(comm_old,
 293                                                 comm_old->c_local_group,
 294                                                 newcomm)) ) {
 295         OBJ_RELEASE(module);
 296         return err;
 297     }
 298     // But if there is an info object, the above call didn't make use
 299     // of it, so we'll do a dup-with-info to get the final comm and
 300     // free the above intermediate newcomm:
 301     if (info && info != &(MPI_INFO_NULL->super)) {
 302         ompi_communicator_t *intermediate_comm = *newcomm;
 303         ompi_comm_dup_with_info (intermediate_comm, info, newcomm);
 304         ompi_comm_free(&intermediate_comm);
 305     }
 306 
 307     assert(NULL == (*newcomm)->c_topo);
 308     (*newcomm)->c_topo             = module;
 309     (*newcomm)->c_topo->reorder    = reorder;
 310     (*newcomm)->c_flags           |= OMPI_COMM_DIST_GRAPH;
 311 
 312     err = mca_topo_base_dist_graph_distribute(module,
 313                                               *newcomm, 
 314                                               n, nodes,
 315                                               degrees, targets,
 316                                               weights,
 317                                               &((*newcomm)->c_topo->mtc.dist_graph));
 318     if( OMPI_SUCCESS != err ) {
 319         ompi_comm_free(newcomm);
 320     }
 321     return err;
 322 }
 323 
 324 static void mca_topo_base_comm_dist_graph_2_2_0_construct(mca_topo_base_comm_dist_graph_2_2_0_t * dist_graph) {
 325     dist_graph->in = NULL;
 326     dist_graph->inw = NULL;
 327     dist_graph->out = NULL;
 328     dist_graph->outw = NULL;
 329     dist_graph->indegree = 0;
 330     dist_graph->outdegree = 0;
 331     dist_graph->weighted = false;
 332 }
 333 
 334 static void mca_topo_base_comm_dist_graph_2_2_0_destruct(mca_topo_base_comm_dist_graph_2_2_0_t * dist_graph) {
 335     if (NULL != dist_graph->in) {
 336         free(dist_graph->in);
 337     }
 338     if (NULL != dist_graph->inw) {
 339         free(dist_graph->inw);
 340     }
 341     if (NULL != dist_graph->out) {
 342         free(dist_graph->out);
 343     }
 344     if (NULL != dist_graph->outw) {
 345         free(dist_graph->outw);
 346     }
 347 }
 348 
 349 OBJ_CLASS_INSTANCE(mca_topo_base_comm_dist_graph_2_2_0_t, opal_object_t,
 350                    mca_topo_base_comm_dist_graph_2_2_0_construct,
 351                    mca_topo_base_comm_dist_graph_2_2_0_destruct);

/* [<][>][^][v][top][bottom][index][help] */