root/ompi/mca/common/monitoring/common_monitoring_coll.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. mca_common_monitoring_coll_cache_name
  2. mca_common_monitoring_coll_cache
  3. mca_common_monitoring_coll_new
  4. mca_common_monitoring_coll_release
  5. mca_common_monitoring_coll_cond_release
  6. mca_common_monitoring_coll_finalize
  7. mca_common_monitoring_coll_flush
  8. mca_common_monitoring_coll_flush_all
  9. mca_common_monitoring_coll_reset
  10. mca_common_monitoring_coll_messages_notify
  11. mca_common_monitoring_coll_o2a
  12. mca_common_monitoring_coll_get_o2a_count
  13. mca_common_monitoring_coll_get_o2a_size
  14. mca_common_monitoring_coll_a2o
  15. mca_common_monitoring_coll_get_a2o_count
  16. mca_common_monitoring_coll_get_a2o_size
  17. mca_common_monitoring_coll_a2a
  18. mca_common_monitoring_coll_get_a2a_count
  19. mca_common_monitoring_coll_get_a2a_size
  20. mca_monitoring_coll_construct
  21. mca_monitoring_coll_destruct

   1 /*
   2  * Copyright (c) 2013-2016 The University of Tennessee and The University
   3  *                         of Tennessee Research Foundation.  All rights
   4  *                         reserved.
   5  * Copyright (c) 2013-2018 Inria.  All rights reserved.
   6  * Copyright (c) 2015      Bull SAS.  All rights reserved.
   7  * Copyright (c) 2016-2017 Research Organization for Information Science
   8  *                         and Technology (RIST). All rights reserved.
   9  * $COPYRIGHT$
  10  *
  11  * Additional copyrights may follow
  12  *
  13  * $HEADER$
  14  */
  15 
  16 #include <ompi_config.h>
  17 #include "common_monitoring.h"
  18 #include "common_monitoring_coll.h"
  19 #include <ompi/constants.h>
  20 #include <ompi/communicator/communicator.h>
  21 #include <opal/mca/base/mca_base_component_repository.h>
  22 #include <opal/class/opal_hash_table.h>
  23 #include <assert.h>
  24 
  25 /*** Monitoring specific variables ***/
  26 struct mca_monitoring_coll_data_t {
  27     opal_object_t super;
  28     char*procs;
  29     char*comm_name;
  30     int world_rank;
  31     int is_released;
  32     ompi_communicator_t*p_comm;
  33     opal_atomic_size_t o2a_count;
  34     opal_atomic_size_t o2a_size;
  35     opal_atomic_size_t a2o_count;
  36     opal_atomic_size_t a2o_size;
  37     opal_atomic_size_t a2a_count;
  38     opal_atomic_size_t a2a_size;
  39 };
  40 
  41 /* Collectives operation monitoring */
  42 static opal_hash_table_t *comm_data = NULL;
  43 
  44 int mca_common_monitoring_coll_cache_name(ompi_communicator_t*comm)
  45 {
  46     mca_monitoring_coll_data_t*data;
  47     int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
  48     if( OPAL_SUCCESS == ret ) {
  49         data->comm_name = strdup(comm->c_name);
  50         data->p_comm = NULL;
  51     }
  52     return ret;
  53 }
  54 
  55 static inline void mca_common_monitoring_coll_cache(mca_monitoring_coll_data_t*data)
  56 {
  57     if( -1 == data->world_rank ) {
  58         /* Get current process world_rank */
  59         mca_common_monitoring_get_world_rank(ompi_comm_rank(data->p_comm),
  60                                              data->p_comm->c_remote_group,
  61                                              &data->world_rank);
  62     }
  63     /* Only list procs if the hashtable is already initialized,
  64        i.e. if the previous call worked */
  65     if( (-1 != data->world_rank) && (NULL == data->procs || 0 == strlen(data->procs)) ) {
  66         int i, pos = 0, size, world_size = -1, max_length, world_rank;
  67         char*tmp_procs;
  68         size = ompi_comm_size(data->p_comm);
  69         world_size = ompi_comm_size((ompi_communicator_t*)&ompi_mpi_comm_world) - 1;
  70         assert( 0 < size );
  71         /* Allocate enough space for list (add 1 to keep the final '\0' if already exact size) */
  72         max_length = snprintf(NULL, 0, "%d,", world_size - 1) + 1;
  73         tmp_procs = malloc((1 + max_length * size) * sizeof(char));
  74         if( NULL == tmp_procs ) {
  75             OPAL_MONITORING_PRINT_ERR("Cannot allocate memory for caching proc list.");
  76         } else {
  77             tmp_procs[0] = '\0';
  78             /* Build procs list */
  79             for(i = 0; i < size; ++i) {
  80                 if( OPAL_SUCCESS == mca_common_monitoring_get_world_rank(i, data->p_comm->c_remote_group, &world_rank) )
  81                     pos += sprintf(&tmp_procs[pos], "%d,", world_rank);
  82             }
  83             tmp_procs[pos - 1] = '\0'; /* Remove final coma */
  84             data->procs = realloc(tmp_procs, pos * sizeof(char)); /* Adjust to size required */
  85         }
  86     }
  87 }
  88 
  89 mca_monitoring_coll_data_t*mca_common_monitoring_coll_new( ompi_communicator_t*comm )
  90 {
  91     mca_monitoring_coll_data_t*data = OBJ_NEW(mca_monitoring_coll_data_t);
  92     if( NULL == data ) {
  93         OPAL_MONITORING_PRINT_ERR("coll: new: data structure cannot be allocated");
  94         return NULL;
  95     }
  96 
  97     data->p_comm      = comm;
  98 
  99     /* Allocate hashtable */
 100     if( NULL == comm_data ) {
 101         comm_data = OBJ_NEW(opal_hash_table_t);
 102         if( NULL == comm_data ) {
 103             OPAL_MONITORING_PRINT_ERR("coll: new: failed to allocate hashtable");
 104             return data;
 105         }
 106         opal_hash_table_init(comm_data, 2048);
 107     }
 108 
 109     /* Insert in hashtable */
 110     uint64_t key = *((uint64_t*)&comm);
 111     if( OPAL_SUCCESS != opal_hash_table_set_value_uint64(comm_data, key, (void*)data) ) {
 112         OPAL_MONITORING_PRINT_ERR("coll: new: failed to allocate memory or "
 113                                   "growing the hash table");
 114     }
 115 
 116     /* Cache data so the procs can be released without affecting the output */
 117     mca_common_monitoring_coll_cache(data);
 118 
 119     return data;
 120 }
 121 
 122 void mca_common_monitoring_coll_release(mca_monitoring_coll_data_t*data)
 123 {
 124 #if OPAL_ENABLE_DEBUG
 125     if( NULL == data ) {
 126         OPAL_MONITORING_PRINT_ERR("coll: release: data structure empty or already desallocated");
 127         return;
 128     }
 129 #endif /* OPAL_ENABLE_DEBUG */
 130 
 131     /* not flushed yet */
 132     data->is_released = 1;
 133     mca_common_monitoring_coll_cache(data);
 134 }
 135 
 136 static void mca_common_monitoring_coll_cond_release(mca_monitoring_coll_data_t*data)
 137 {
 138 #if OPAL_ENABLE_DEBUG
 139     if( NULL == data ) {
 140         OPAL_MONITORING_PRINT_ERR("coll: release: data structure empty or already desallocated");
 141         return;
 142     }
 143 #endif /* OPAL_ENABLE_DEBUG */
 144 
 145     if( data->is_released ) { /* if the communicator is already released */
 146         opal_hash_table_remove_value_uint64(comm_data, *((uint64_t*)&data->p_comm));
 147         data->p_comm = NULL;
 148         free(data->comm_name);
 149         free(data->procs);
 150         OBJ_RELEASE(data);
 151     }
 152 }
 153 
 154 void mca_common_monitoring_coll_finalize( void )
 155 {
 156     if( NULL != comm_data ) {
 157         opal_hash_table_remove_all( comm_data );
 158         OBJ_RELEASE(comm_data);
 159     }
 160 }
 161 
 162 void mca_common_monitoring_coll_flush(FILE *pf, mca_monitoring_coll_data_t*data)
 163 {
 164     /* Flush data */
 165     fprintf(pf,
 166             "D\t%s\tprocs: %s\n"
 167             "O2A\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n"
 168             "A2O\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n"
 169             "A2A\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n",
 170             data->comm_name ? data->comm_name : data->p_comm ?
 171             data->p_comm->c_name : "(no-name)", data->procs,
 172             data->world_rank, data->o2a_size, data->o2a_count,
 173             data->world_rank, data->a2o_size, data->a2o_count,
 174             data->world_rank, data->a2a_size, data->a2a_count);
 175 }
 176 
 177 void mca_common_monitoring_coll_flush_all(FILE *pf)
 178 {
 179     if( NULL == comm_data ) return; /* No hashtable */
 180 
 181     uint64_t key;
 182     mca_monitoring_coll_data_t*previous = NULL, *data;
 183 
 184     OPAL_HASH_TABLE_FOREACH(key, uint64, data, comm_data) {
 185         if( NULL != previous && NULL == previous->p_comm ) {
 186             /* Phase flushed -> free already released once coll_data_t */
 187             mca_common_monitoring_coll_cond_release(previous);
 188         }
 189         mca_common_monitoring_coll_flush(pf, data);
 190         previous = data;
 191     }
 192     mca_common_monitoring_coll_cond_release(previous);
 193 }
 194 
 195 
 196 void mca_common_monitoring_coll_reset(void)
 197 {
 198     if( NULL == comm_data ) return; /* No hashtable */
 199 
 200     uint64_t key;
 201     mca_monitoring_coll_data_t*data;
 202 
 203     OPAL_HASH_TABLE_FOREACH(key, uint64, data, comm_data) {
 204         data->o2a_count = 0; data->o2a_size  = 0;
 205         data->a2o_count = 0; data->a2o_size  = 0;
 206         data->a2a_count = 0; data->a2a_size  = 0;
 207     }
 208 }
 209 
 210 int mca_common_monitoring_coll_messages_notify(mca_base_pvar_t *pvar,
 211                                                mca_base_pvar_event_t event,
 212                                                void *obj_handle,
 213                                                int *count)
 214 {
 215     switch (event) {
 216     case MCA_BASE_PVAR_HANDLE_BIND:
 217         *count = 1;
 218     case MCA_BASE_PVAR_HANDLE_UNBIND:
 219         return OMPI_SUCCESS;
 220     case MCA_BASE_PVAR_HANDLE_START:
 221         mca_common_monitoring_current_state = mca_common_monitoring_enabled;
 222         return OMPI_SUCCESS;
 223     case MCA_BASE_PVAR_HANDLE_STOP:
 224         mca_common_monitoring_current_state = 0;
 225         return OMPI_SUCCESS;
 226     }
 227 
 228     return OMPI_ERROR;
 229 }
 230 
 231 void mca_common_monitoring_coll_o2a(size_t size, mca_monitoring_coll_data_t*data)
 232 {
 233     if( 0 == mca_common_monitoring_current_state ) return; /* right now the monitoring is not started */
 234 #if OPAL_ENABLE_DEBUG
 235     if( NULL == data ) {
 236         OPAL_MONITORING_PRINT_ERR("coll: o2a: data structure empty");
 237         return;
 238     }
 239 #endif /* OPAL_ENABLE_DEBUG */
 240     opal_atomic_add_fetch_size_t(&data->o2a_size, size);
 241     opal_atomic_add_fetch_size_t(&data->o2a_count, 1);
 242 }
 243 
 244 int mca_common_monitoring_coll_get_o2a_count(const struct mca_base_pvar_t *pvar,
 245                                              void *value,
 246                                              void *obj_handle)
 247 {
 248     ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
 249     size_t *value_size = (size_t*) value;
 250     mca_monitoring_coll_data_t*data;
 251     int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
 252     if( OPAL_SUCCESS == ret ) {
 253         *value_size = data->o2a_count;
 254     }
 255     return ret;
 256 }
 257 
 258 int mca_common_monitoring_coll_get_o2a_size(const struct mca_base_pvar_t *pvar,
 259                                             void *value,
 260                                             void *obj_handle)
 261 {
 262     ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
 263     size_t *value_size = (size_t*) value;
 264     mca_monitoring_coll_data_t*data;
 265     int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
 266     if( OPAL_SUCCESS == ret ) {
 267         *value_size = data->o2a_size;
 268     }
 269     return ret;
 270 }
 271 
 272 void mca_common_monitoring_coll_a2o(size_t size, mca_monitoring_coll_data_t*data)
 273 {
 274     if( 0 == mca_common_monitoring_current_state ) return; /* right now the monitoring is not started */
 275 #if OPAL_ENABLE_DEBUG
 276     if( NULL == data ) {
 277         OPAL_MONITORING_PRINT_ERR("coll: a2o: data structure empty");
 278         return;
 279     }
 280 #endif /* OPAL_ENABLE_DEBUG */
 281     opal_atomic_add_fetch_size_t(&data->a2o_size, size);
 282     opal_atomic_add_fetch_size_t(&data->a2o_count, 1);
 283 }
 284 
 285 int mca_common_monitoring_coll_get_a2o_count(const struct mca_base_pvar_t *pvar,
 286                                              void *value,
 287                                              void *obj_handle)
 288 {
 289     ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
 290     size_t *value_size = (size_t*) value;
 291     mca_monitoring_coll_data_t*data;
 292     int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
 293     if( OPAL_SUCCESS == ret ) {
 294         *value_size = data->a2o_count;
 295     }
 296     return ret;
 297 }
 298 
 299 int mca_common_monitoring_coll_get_a2o_size(const struct mca_base_pvar_t *pvar,
 300                                             void *value,
 301                                             void *obj_handle)
 302 {
 303     ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
 304     size_t *value_size = (size_t*) value;
 305     mca_monitoring_coll_data_t*data;
 306     int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
 307     if( OPAL_SUCCESS == ret ) {
 308         *value_size = data->a2o_size;
 309     }
 310     return ret;
 311 }
 312 
 313 void mca_common_monitoring_coll_a2a(size_t size, mca_monitoring_coll_data_t*data)
 314 {
 315     if( 0 == mca_common_monitoring_current_state ) return; /* right now the monitoring is not started */
 316 #if OPAL_ENABLE_DEBUG
 317     if( NULL == data ) {
 318         OPAL_MONITORING_PRINT_ERR("coll: a2a: data structure empty");
 319         return;
 320     }
 321 #endif /* OPAL_ENABLE_DEBUG */
 322     opal_atomic_add_fetch_size_t(&data->a2a_size, size);
 323     opal_atomic_add_fetch_size_t(&data->a2a_count, 1);
 324 }
 325 
 326 int mca_common_monitoring_coll_get_a2a_count(const struct mca_base_pvar_t *pvar,
 327                                              void *value,
 328                                              void *obj_handle)
 329 {
 330     ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
 331     size_t *value_size = (size_t*) value;
 332     mca_monitoring_coll_data_t*data;
 333     int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
 334     if( OPAL_SUCCESS == ret ) {
 335         *value_size = data->a2a_count;
 336     }
 337     return ret;
 338 }
 339 
 340 int mca_common_monitoring_coll_get_a2a_size(const struct mca_base_pvar_t *pvar,
 341                                             void *value,
 342                                             void *obj_handle)
 343 {
 344     ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
 345     size_t *value_size = (size_t*) value;
 346     mca_monitoring_coll_data_t*data;
 347     int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
 348     if( OPAL_SUCCESS == ret ) {
 349         *value_size = data->a2a_size;
 350     }
 351     return ret;
 352 }
 353 
 354 static void mca_monitoring_coll_construct (mca_monitoring_coll_data_t*coll_data)
 355 {
 356     coll_data->procs       = NULL;
 357     coll_data->comm_name   = NULL;
 358     coll_data->world_rank  = -1;
 359     coll_data->p_comm      = NULL;
 360     coll_data->is_released = 0;
 361     coll_data->o2a_count   = 0;
 362     coll_data->o2a_size    = 0;
 363     coll_data->a2o_count   = 0;
 364     coll_data->a2o_size    = 0;
 365     coll_data->a2a_count   = 0;
 366     coll_data->a2a_size    = 0;
 367 }
 368 
 369 static void mca_monitoring_coll_destruct (mca_monitoring_coll_data_t*coll_data){}
 370 
 371 OBJ_CLASS_INSTANCE(mca_monitoring_coll_data_t, opal_object_t, mca_monitoring_coll_construct, mca_monitoring_coll_destruct);

/* [<][>][^][v][top][bottom][index][help] */