root/ompi/mca/common/monitoring/common_monitoring.c


DEFINITIONS

This source file includes the following definitions.
  1. mca_common_monitoring_set_flush
  2. mca_common_monitoring_get_flush
  3. mca_common_monitoring_notify_flush
  4. mca_common_monitoring_comm_size_notify
  5. mca_common_monitoring_init
  6. mca_common_monitoring_finalize
  7. mca_common_monitoring_register
  8. mca_common_monitoring_add_procs
  9. mca_common_monitoring_reset
  10. mca_common_monitoring_record_pml
  11. mca_common_monitoring_get_pml_count
  12. mca_common_monitoring_get_pml_size
  13. mca_common_monitoring_record_osc
  14. mca_common_monitoring_get_osc_sent_count
  15. mca_common_monitoring_get_osc_sent_size
  16. mca_common_monitoring_get_osc_recv_count
  17. mca_common_monitoring_get_osc_recv_size
  18. mca_common_monitoring_record_coll
  19. mca_common_monitoring_get_coll_count
  20. mca_common_monitoring_get_coll_size
  21. mca_common_monitoring_output
  22. mca_common_monitoring_flush

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2013-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2013-2017 Inria.  All rights reserved.
 * Copyright (c) 2015      Bull SAS.  All rights reserved.
 * Copyright (c) 2016-2017 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017-2018 Los Alamos National Security, LLC. All rights
 *                         reserved.
 * Copyright (c) 2018      Amazon.com, Inc. or its affiliates.  All Rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include <ompi_config.h>
#include "common_monitoring.h"
#include "common_monitoring_coll.h"
#include <ompi/constants.h>
#include <ompi/communicator/communicator.h>
#include <opal/mca/base/mca_base_component_repository.h>
#include <opal/class/opal_hash_table.h>
#include <opal/util/output.h>
#include "opal/util/printf.h"
#include <math.h>

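/* Select the MCA pvar integer type whose width matches size_t, so the
 * size_t counters below can be exposed directly as pvars. */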
#if SIZEOF_LONG_LONG == SIZEOF_SIZE_T
#define MCA_MONITORING_VAR_TYPE MCA_BASE_VAR_TYPE_UNSIGNED_LONG_LONG
#elif SIZEOF_LONG == SIZEOF_SIZE_T
#define MCA_MONITORING_VAR_TYPE MCA_BASE_VAR_TYPE_UNSIGNED_LONG
#endif

/*** Monitoring specific variables ***/
/* Keeps track of how many components are currently using the common part */
static opal_atomic_int32_t mca_common_monitoring_hold = 0;
/* Output parameters */
int mca_common_monitoring_output_stream_id = -1;
static opal_output_stream_t mca_common_monitoring_output_stream_obj = {
    .lds_verbose_level = 0,
    .lds_want_syslog = false,
    .lds_prefix = NULL,
    .lds_suffix = NULL,
    .lds_is_debugging = true,
    .lds_want_stdout = false,
    .lds_want_stderr = true,
    .lds_want_file = false,
    .lds_want_file_append = false,
    .lds_file_suffix = NULL
};

/*** MCA params to mark the monitoring as enabled. ***/
/* This signals that the monitoring will hijack the PML, OSC and COLL */
int mca_common_monitoring_enabled = 0;
int mca_common_monitoring_current_state = 0;
/* Signals there will be an output of the monitored data at component close */
static int mca_common_monitoring_output_enabled = 0;
/* File where to output the monitored data */
static char* mca_common_monitoring_initial_filename = "";
static char* mca_common_monitoring_current_filename = NULL;

/* Arrays for storing monitoring data */
static opal_atomic_size_t* pml_data = NULL;
static opal_atomic_size_t* pml_count = NULL;
static opal_atomic_size_t* filtered_pml_data = NULL;
static opal_atomic_size_t* filtered_pml_count = NULL;
static opal_atomic_size_t* osc_data_s = NULL;
static opal_atomic_size_t* osc_count_s = NULL;
static opal_atomic_size_t* osc_data_r = NULL;
static opal_atomic_size_t* osc_count_r = NULL;
static opal_atomic_size_t* coll_data = NULL;
static opal_atomic_size_t* coll_count = NULL;

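/* Per-peer histogram of message sizes: for each peer, bucket 0 counts
 * zero-byte messages and bucket k (1 <= k <= 65) counts messages whose
 * size s satisfies floor(log2(s)) == k-1, with the last bucket absorbing
 * anything larger. */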
static opal_atomic_size_t* size_histogram = NULL;
static const int max_size_histogram = 66;
static double log10_2 = 0.;

static int rank_world = -1;
static int nprocs_world = 0;

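/* Maps a peer's opal_process_name_t (reinterpreted as a 64-bit key) to its
 * rank in MPI_COMM_WORLD; filled in mca_common_monitoring_add_procs(). */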
opal_hash_table_t *common_monitoring_translation_ht = NULL;

/* Reset all the monitoring arrays */
static void mca_common_monitoring_reset ( void );

/* Flush the monitored data and reset the values */
static int mca_common_monitoring_flush (int fd, char* filename);

/* Retrieve the PML recorded count of messages sent */
static int mca_common_monitoring_get_pml_count (const struct mca_base_pvar_t *pvar,
                                                void *value, void *obj_handle);

/* Retrieve the PML recorded amount of data sent */
static int mca_common_monitoring_get_pml_size (const struct mca_base_pvar_t *pvar,
                                               void *value, void *obj_handle);

/* Retrieve the OSC recorded count of messages sent */
static int mca_common_monitoring_get_osc_sent_count (const struct mca_base_pvar_t *pvar,
                                                     void *value, void *obj_handle);

/* Retrieve the OSC recorded amount of data sent */
static int mca_common_monitoring_get_osc_sent_size (const struct mca_base_pvar_t *pvar,
                                                    void *value, void *obj_handle);

/* Retrieve the OSC recorded count of messages received */
static int mca_common_monitoring_get_osc_recv_count (const struct mca_base_pvar_t *pvar,
                                                     void *value, void *obj_handle);

/* Retrieve the OSC recorded amount of data received */
static int mca_common_monitoring_get_osc_recv_size (const struct mca_base_pvar_t *pvar,
                                                    void *value, void *obj_handle);

/* Retrieve the COLL recorded count of messages sent */
static int mca_common_monitoring_get_coll_count (const struct mca_base_pvar_t *pvar,
                                                 void *value, void *obj_handle);

/* Retrieve the COLL recorded amount of data sent */
static int mca_common_monitoring_get_coll_size (const struct mca_base_pvar_t *pvar,
                                                void *value, void *obj_handle);

/* Set the filename where to output the monitored data */
static int mca_common_monitoring_set_flush(struct mca_base_pvar_t *pvar,
                                           const void *value, void *obj);

/* Does nothing, as reading the pml_monitoring_flush pvar is meaningless */
static int mca_common_monitoring_get_flush(const struct mca_base_pvar_t *pvar,
                                           void *value, void *obj);

/* pml_monitoring_count, pml_monitoring_size,
   osc_monitoring_sent_count, osc_monitoring_sent_size,
   osc_monitoring_recv_size and osc_monitoring_recv_count pvar notify
   function */
static int mca_common_monitoring_comm_size_notify(mca_base_pvar_t *pvar,
                                                  mca_base_pvar_event_t event,
                                                  void *obj_handle, int *count);

/* pml_monitoring_flush pvar notify function */
static int mca_common_monitoring_notify_flush(struct mca_base_pvar_t *pvar,
                                              mca_base_pvar_event_t event,
                                              void *obj, int *count);

static int mca_common_monitoring_set_flush(struct mca_base_pvar_t *pvar,
                                           const void *value, void *obj)
{
    if( NULL != mca_common_monitoring_current_filename ) {
        free(mca_common_monitoring_current_filename);
    }
    if( NULL == *(char**)value || 0 == strlen((char*)value) ) {  /* No more output */
        mca_common_monitoring_current_filename = NULL;
    } else {
        mca_common_monitoring_current_filename = strdup((char*)value);
        if( NULL == mca_common_monitoring_current_filename )
            return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}

static int mca_common_monitoring_get_flush(const struct mca_base_pvar_t *pvar,
                                           void *value, void *obj)
{
    return OMPI_SUCCESS;
}

static int mca_common_monitoring_notify_flush(struct mca_base_pvar_t *pvar,
                                              mca_base_pvar_event_t event,
                                              void *obj, int *count)
{
    switch (event) {
    case MCA_BASE_PVAR_HANDLE_BIND:
        mca_common_monitoring_reset();
        *count = (NULL == mca_common_monitoring_current_filename
                  ? 0 : strlen(mca_common_monitoring_current_filename));
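        /* fall through */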
    case MCA_BASE_PVAR_HANDLE_UNBIND:
        return OMPI_SUCCESS;
    case MCA_BASE_PVAR_HANDLE_START:
        mca_common_monitoring_current_state = mca_common_monitoring_enabled;
        mca_common_monitoring_output_enabled = 0;  /* one cannot control the monitoring via MPIT
                                                    * and still expect an accurate dump at MPI_Finalize. */
        return OMPI_SUCCESS;
    case MCA_BASE_PVAR_HANDLE_STOP:
        return mca_common_monitoring_flush(3, mca_common_monitoring_current_filename);
    }
    return OMPI_ERROR;
}

static int mca_common_monitoring_comm_size_notify(mca_base_pvar_t *pvar,
                                                  mca_base_pvar_event_t event,
                                                  void *obj_handle,
                                                  int *count)
{
    switch (event) {
    case MCA_BASE_PVAR_HANDLE_BIND:
        /* Return the size of the communicator as the number of values */
        *count = ompi_comm_size ((ompi_communicator_t *) obj_handle);
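        /* fall through */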
    case MCA_BASE_PVAR_HANDLE_UNBIND:
        return OMPI_SUCCESS;
    case MCA_BASE_PVAR_HANDLE_START:
        mca_common_monitoring_current_state = mca_common_monitoring_enabled;
        return OMPI_SUCCESS;
    case MCA_BASE_PVAR_HANDLE_STOP:
        mca_common_monitoring_current_state = 0;
        return OMPI_SUCCESS;
    }

    return OMPI_ERROR;
}

int mca_common_monitoring_init( void )
{
    if( !mca_common_monitoring_enabled ) return OMPI_ERROR;
    if( 1 < opal_atomic_add_fetch_32(&mca_common_monitoring_hold, 1) ) return OMPI_SUCCESS; /* Already initialized */

    char hostname[OPAL_MAXHOSTNAMELEN] = "NA";
    /* Initialize constant */
    log10_2 = log10(2.);
    /* Open the opal_output stream */
    gethostname(hostname, sizeof(hostname));
    opal_asprintf(&mca_common_monitoring_output_stream_obj.lds_prefix,
                  "[%s:%06d] monitoring: ", hostname, getpid());
    mca_common_monitoring_output_stream_id =
        opal_output_open(&mca_common_monitoring_output_stream_obj);
    /* Initialize proc translation hashtable */
    common_monitoring_translation_ht = OBJ_NEW(opal_hash_table_t);
    opal_hash_table_init(common_monitoring_translation_ht, 2048);
    return OMPI_SUCCESS;
}

void mca_common_monitoring_finalize( void )
{
    if( ! mca_common_monitoring_enabled || /* Don't release if not last */
        0 < opal_atomic_sub_fetch_32(&mca_common_monitoring_hold, 1) ) return;

    OPAL_MONITORING_PRINT_INFO("common_component_finish");
    /* Dump monitoring information */
    mca_common_monitoring_flush(mca_common_monitoring_output_enabled,
                                mca_common_monitoring_current_filename);
    /* Disable all monitoring */
    mca_common_monitoring_enabled = 0;
    /* Close the opal_output stream */
    opal_output_close(mca_common_monitoring_output_stream_id);
    free(mca_common_monitoring_output_stream_obj.lds_prefix);
    /* Free internal data structures */
    free((void *) pml_data);  /* a single allocation */
    opal_hash_table_remove_all( common_monitoring_translation_ht );
    OBJ_RELEASE(common_monitoring_translation_ht);
    mca_common_monitoring_coll_finalize();
    if( NULL != mca_common_monitoring_current_filename ) {
        free(mca_common_monitoring_current_filename);
        mca_common_monitoring_current_filename = NULL;
    }
}

void mca_common_monitoring_register(void* pml_monitoring_component)
{
    /* Because we are playing tricks with the component close, we should not
     * use mca_base_component_var_register but instead stay with the basic
     * version mca_base_var_register.
     */
    (void)mca_base_var_register("ompi", "pml", "monitoring", "enable",
                                "Enable the monitoring at the PML level. A value of 0 "
                                "will disable the monitoring (default). A value of 1 will "
                                "aggregate all monitoring information (point-to-point and "
                                "collective). Any other value will enable filtered monitoring",
                                MCA_BASE_VAR_TYPE_INT, NULL, MPI_T_BIND_NO_OBJECT,
                                MCA_BASE_VAR_FLAG_DWG, OPAL_INFO_LVL_4,
                                MCA_BASE_VAR_SCOPE_READONLY,
                                &mca_common_monitoring_enabled);

    mca_common_monitoring_current_state = mca_common_monitoring_enabled;

    (void)mca_base_var_register("ompi", "pml", "monitoring", "enable_output",
                                "Enable the PML monitoring textual output at MPI_Finalize "
                                "(it will be automatically turned off when MPIT is used to "
                                "monitor communications). This value should be different "
                                "from 0 in order for the output to be enabled (disabled by default)",
                                MCA_BASE_VAR_TYPE_INT, NULL, MPI_T_BIND_NO_OBJECT,
                                MCA_BASE_VAR_FLAG_DWG, OPAL_INFO_LVL_9,
                                MCA_BASE_VAR_SCOPE_READONLY,
                                &mca_common_monitoring_output_enabled);

    (void)mca_base_var_register("ompi", "pml", "monitoring", "filename",
                                "The name of the file where the monitoring information "
                                "should be saved (the filename will be extended with the "
                                "process rank and the \".prof\" extension). If this field "
                                "is NULL the monitoring will not be saved.",
                                MCA_BASE_VAR_TYPE_STRING, NULL, MPI_T_BIND_NO_OBJECT,
                                MCA_BASE_VAR_FLAG_DWG, OPAL_INFO_LVL_9,
                                MCA_BASE_VAR_SCOPE_READONLY,
                                &mca_common_monitoring_initial_filename);

    /* Now that the MCA variables are automatically unregistered when
     * their component closes, we need to keep a safe copy of the
     * filename.
     * Keep the copy completely separate so that the initial filename
     * stays under the framework's control; this makes the string
     * lifetime easier to manage.
     */
    if( NULL != mca_common_monitoring_initial_filename )
        mca_common_monitoring_current_filename = strdup(mca_common_monitoring_initial_filename);

    /* Register PVARs */

    /* PML PVARs */
    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "flush", "Flush the monitoring "
                                 "information into the provided file. The filename is appended with "
                                 "the .%d.prof suffix, where %d is replaced with the process "
                                 "rank in MPI_COMM_WORLD.",
                                 OPAL_INFO_LVL_1, MCA_BASE_PVAR_CLASS_GENERIC,
                                 MCA_BASE_VAR_TYPE_STRING, NULL, MPI_T_BIND_NO_OBJECT, MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_flush, mca_common_monitoring_set_flush,
                                 mca_common_monitoring_notify_flush, NULL);

    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count", "Number of "
                                 "messages sent to each peer through the PML framework.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_pml_count, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_size", "Size of messages "
                                 "sent to each peer in a communicator through the PML framework.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_pml_size, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    /* OSC PVARs */
    (void)mca_base_pvar_register("ompi", "osc", "monitoring", "messages_sent_count", "Number of "
                                 "messages sent through the OSC framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_osc_sent_count, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "osc", "monitoring", "messages_sent_size", "Size of "
                                 "messages sent through the OSC framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_osc_sent_size, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "osc", "monitoring", "messages_recv_count", "Number of "
                                 "messages received through the OSC framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_osc_recv_count, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "osc", "monitoring", "messages_recv_size", "Size of "
                                 "messages received through the OSC framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_osc_recv_size, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    /* COLL PVARs */
    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "messages_count", "Number of "
                                 "messages exchanged through the COLL framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_coll_count, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "messages_size", "Size of "
                                 "messages exchanged through the COLL framework with each peer.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_get_coll_size, NULL,
                                 mca_common_monitoring_comm_size_notify, NULL);

    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "o2a_count", "Number of messages "
                                 "exchanged as one-to-all operations in a communicator.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_COUNTER,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_coll_get_o2a_count, NULL,
                                 mca_common_monitoring_coll_messages_notify, NULL);

    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "o2a_size", "Size of messages "
                                 "exchanged as one-to-all operations in a communicator.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_AGGREGATE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_coll_get_o2a_size, NULL,
                                 mca_common_monitoring_coll_messages_notify, NULL);

    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "a2o_count", "Number of messages "
                                 "exchanged as all-to-one operations in a communicator.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_COUNTER,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_coll_get_a2o_count, NULL,
                                 mca_common_monitoring_coll_messages_notify, NULL);

    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "a2o_size", "Size of messages "
                                 "exchanged as all-to-one operations in a communicator.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_AGGREGATE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_coll_get_a2o_size, NULL,
                                 mca_common_monitoring_coll_messages_notify, NULL);

    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "a2a_count", "Number of messages "
                                 "exchanged as all-to-all operations in a communicator.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_COUNTER,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_coll_get_a2a_count, NULL,
                                 mca_common_monitoring_coll_messages_notify, NULL);

    (void)mca_base_pvar_register("ompi", "coll", "monitoring", "a2a_size", "Size of messages "
                                 "exchanged as all-to-all operations in a communicator.",
                                 OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_AGGREGATE,
                                 MCA_MONITORING_VAR_TYPE, NULL, MPI_T_BIND_MPI_COMM,
                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_IWG,
                                 mca_common_monitoring_coll_get_a2a_size, NULL,
                                 mca_common_monitoring_coll_messages_notify, NULL);
}
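
/*
 * Typical usage sketch (MCA parameter names as registered above; the
 * filename is just an example, see test/monitoring for complete examples):
 *
 *   mpirun --mca pml_monitoring_enable 2 \
 *          --mca pml_monitoring_enable_output 3 \
 *          --mca pml_monitoring_filename monitoring_prof \
 *          ./my_app
 *
 * At MPI_Finalize the data is then dumped into monitoring_prof.<rank>.prof;
 * enable_output values 1 and 2 select stdout and stderr instead.
 */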

/**
 * This PML monitors only the processes in MPI_COMM_WORLD. As OMPI now lazily
 * adds peers on the first call to add_procs, we need to check how many
 * processes are in MPI_COMM_WORLD to create the storage with the right size.
 */
int mca_common_monitoring_add_procs(struct ompi_proc_t **procs,
                                    size_t nprocs)
{
    opal_process_name_t tmp, wp_name;
    size_t i;
    int peer_rank;
    uint64_t key;
    if( 0 > rank_world )
        rank_world = ompi_comm_rank((ompi_communicator_t*)&ompi_mpi_comm_world);
    if( !nprocs_world )
        nprocs_world = ompi_comm_size((ompi_communicator_t*)&ompi_mpi_comm_world);

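    /* All counters live in a single allocation: ten per-peer arrays of
     * nprocs_world entries each (data and count for pml, filtered pml,
     * osc send, osc recv and coll), followed by the per-peer size
     * histograms of max_size_histogram entries each. */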
    if( NULL == pml_data ) {
        int array_size = (10 + max_size_histogram) * nprocs_world;
        pml_data           = (opal_atomic_size_t*)calloc(array_size, sizeof(size_t));
        if( NULL == pml_data )
            return OMPI_ERR_OUT_OF_RESOURCE;
        pml_count          = pml_data + nprocs_world;
        filtered_pml_data  = pml_count + nprocs_world;
        filtered_pml_count = filtered_pml_data + nprocs_world;
        osc_data_s         = filtered_pml_count + nprocs_world;
        osc_count_s        = osc_data_s + nprocs_world;
        osc_data_r         = osc_count_s + nprocs_world;
        osc_count_r        = osc_data_r + nprocs_world;
        coll_data          = osc_count_r + nprocs_world;
        coll_count         = coll_data + nprocs_world;

        size_histogram     = coll_count + nprocs_world;
    }

    /* For all procs in the same MPI_COMM_WORLD we need to add them to the hash table */
    for( i = 0; i < nprocs; i++ ) {

        /* Extract the peer procname from the procs array */
        if( ompi_proc_is_sentinel(procs[i]) ) {
            tmp = ompi_proc_sentinel_to_name((uintptr_t)procs[i]);
        } else {
            tmp = procs[i]->super.proc_name;
        }
        if( tmp.jobid != ompi_proc_local_proc->super.proc_name.jobid )
            continue;

        /* each process will only be added once, so there is no way it already exists in the hash */
        for( peer_rank = 0; peer_rank < nprocs_world; peer_rank++ ) {
            wp_name = ompi_group_get_proc_name(((ompi_communicator_t*)&ompi_mpi_comm_world)->c_remote_group, peer_rank);
            if( 0 != opal_compare_proc( tmp, wp_name ) )
                continue;

            key = *((uint64_t*)&tmp);
            /* save the rank of the process in MPI_COMM_WORLD in the hash using the proc_name as the key */
            if( OPAL_SUCCESS != opal_hash_table_set_value_uint64(common_monitoring_translation_ht,
                                                                 key, (void*)(uintptr_t)peer_rank) ) {
                return OMPI_ERR_OUT_OF_RESOURCE;  /* failed to allocate memory or grow the hash table */
            }
            break;
        }
    }
    return OMPI_SUCCESS;
}

static void mca_common_monitoring_reset( void )
{
    int array_size = (10 + max_size_histogram) * nprocs_world;
    if( NULL == pml_data ) return;  /* no storage allocated yet */
    memset((void *) pml_data, 0, array_size * sizeof(size_t));
    mca_common_monitoring_coll_reset();
}

void mca_common_monitoring_record_pml(int world_rank, size_t data_size, int tag)
{
    if( 0 == mca_common_monitoring_current_state ) return;  /* right now the monitoring is not started */

    /* Keep track of the data_size distribution */
    if( 0 == data_size ) {
        opal_atomic_add_fetch_size_t(&size_histogram[world_rank * max_size_histogram], 1);
    } else {
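        /* log10(x)/log10(2) == log2(x); the truncated result selects the
         * power-of-two bucket, shifted by one to skip the zero-size bucket */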
        int log2_size = log10(data_size)/log10_2;
        if(log2_size > max_size_histogram - 2) /* Avoid out-of-bound write */
            log2_size = max_size_histogram - 2;
        opal_atomic_add_fetch_size_t(&size_histogram[world_rank * max_size_histogram + log2_size + 1], 1);
    }

    /* distinguishes positive and negative tags if requested */
    if( (tag < 0) && (mca_common_monitoring_filter()) ) {
        opal_atomic_add_fetch_size_t(&filtered_pml_data[world_rank], data_size);
        opal_atomic_add_fetch_size_t(&filtered_pml_count[world_rank], 1);
    } else { /* if filtered monitoring is not activated, data is aggregated regardless of the tag */
        opal_atomic_add_fetch_size_t(&pml_data[world_rank], data_size);
        opal_atomic_add_fetch_size_t(&pml_count[world_rank], 1);
    }
}

static int mca_common_monitoring_get_pml_count(const struct mca_base_pvar_t *pvar,
                                               void *value,
                                               void *obj_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
    int i, comm_size = ompi_comm_size (comm);
    size_t *values = (size_t*) value;

    if(comm != &ompi_mpi_comm_world.comm || NULL == pml_count)
        return OMPI_ERROR;

    for (i = 0 ; i < comm_size ; ++i) {
        values[i] = pml_count[i];
    }

    return OMPI_SUCCESS;
}

static int mca_common_monitoring_get_pml_size(const struct mca_base_pvar_t *pvar,
                                              void *value,
                                              void *obj_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
    int comm_size = ompi_comm_size (comm);
    size_t *values = (size_t*) value;
    int i;

    if(comm != &ompi_mpi_comm_world.comm || NULL == pml_data)
        return OMPI_ERROR;

    for (i = 0 ; i < comm_size ; ++i) {
        values[i] = pml_data[i];
    }

    return OMPI_SUCCESS;
}

void mca_common_monitoring_record_osc(int world_rank, size_t data_size,
                                      enum mca_monitoring_osc_direction dir)
{
    if( 0 == mca_common_monitoring_current_state ) return;  /* right now the monitoring is not started */

    if( SEND == dir ) {
        opal_atomic_add_fetch_size_t(&osc_data_s[world_rank], data_size);
        opal_atomic_add_fetch_size_t(&osc_count_s[world_rank], 1);
    } else {
        opal_atomic_add_fetch_size_t(&osc_data_r[world_rank], data_size);
        opal_atomic_add_fetch_size_t(&osc_count_r[world_rank], 1);
    }
}

static int mca_common_monitoring_get_osc_sent_count(const struct mca_base_pvar_t *pvar,
                                                    void *value,
                                                    void *obj_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
    int i, comm_size = ompi_comm_size (comm);
    size_t *values = (size_t*) value;

    if(comm != &ompi_mpi_comm_world.comm || NULL == osc_count_s)
        return OMPI_ERROR;

    for (i = 0 ; i < comm_size ; ++i) {
        values[i] = osc_count_s[i];
    }

    return OMPI_SUCCESS;
}

static int mca_common_monitoring_get_osc_sent_size(const struct mca_base_pvar_t *pvar,
                                                   void *value,
                                                   void *obj_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
    int comm_size = ompi_comm_size (comm);
    size_t *values = (size_t*) value;
    int i;

    if(comm != &ompi_mpi_comm_world.comm || NULL == osc_data_s)
        return OMPI_ERROR;

    for (i = 0 ; i < comm_size ; ++i) {
        values[i] = osc_data_s[i];
    }

    return OMPI_SUCCESS;
}

static int mca_common_monitoring_get_osc_recv_count(const struct mca_base_pvar_t *pvar,
                                                    void *value,
                                                    void *obj_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
    int i, comm_size = ompi_comm_size (comm);
    size_t *values = (size_t*) value;

    if(comm != &ompi_mpi_comm_world.comm || NULL == osc_count_r)
        return OMPI_ERROR;

    for (i = 0 ; i < comm_size ; ++i) {
        values[i] = osc_count_r[i];
    }

    return OMPI_SUCCESS;
}

static int mca_common_monitoring_get_osc_recv_size(const struct mca_base_pvar_t *pvar,
                                                   void *value,
                                                   void *obj_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
    int comm_size = ompi_comm_size (comm);
    size_t *values = (size_t*) value;
    int i;

    if(comm != &ompi_mpi_comm_world.comm || NULL == osc_data_r)
        return OMPI_ERROR;

    for (i = 0 ; i < comm_size ; ++i) {
        values[i] = osc_data_r[i];
    }

    return OMPI_SUCCESS;
}

void mca_common_monitoring_record_coll(int world_rank, size_t data_size)
{
    if( 0 == mca_common_monitoring_current_state ) return;  /* right now the monitoring is not started */

    opal_atomic_add_fetch_size_t(&coll_data[world_rank], data_size);
    opal_atomic_add_fetch_size_t(&coll_count[world_rank], 1);
}

static int mca_common_monitoring_get_coll_count(const struct mca_base_pvar_t *pvar,
                                                void *value,
                                                void *obj_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
    int i, comm_size = ompi_comm_size (comm);
    size_t *values = (size_t*) value;

    if(comm != &ompi_mpi_comm_world.comm || NULL == coll_count)
        return OMPI_ERROR;

    for (i = 0 ; i < comm_size ; ++i) {
        values[i] = coll_count[i];
    }

    return OMPI_SUCCESS;
}

static int mca_common_monitoring_get_coll_size(const struct mca_base_pvar_t *pvar,
                                               void *value,
                                               void *obj_handle)
{
    ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
    int comm_size = ompi_comm_size (comm);
    size_t *values = (size_t*) value;
    int i;

    if(comm != &ompi_mpi_comm_world.comm || NULL == coll_data)
        return OMPI_ERROR;

    for (i = 0 ; i < comm_size ; ++i) {
        values[i] = coll_data[i];
    }

    return OMPI_SUCCESS;
}

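/* Each record is one tab-separated line: a tag, the local rank, the peer
 * rank, the byte count and the message count. Tags: E = external
 * (unfiltered) point-to-point, I = internal (filtered) point-to-point,
 * S/R = one-sided sent/received, C = collective. E lines (or I lines when
 * no E record was emitted for that peer) also carry the per-peer size
 * histogram. */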
static void mca_common_monitoring_output( FILE *pf, int my_rank, int nbprocs )
{
    /* Dump outgoing messages */
    fprintf(pf, "# POINT TO POINT\n");
    for (int i = 0 ; i < nbprocs ; i++) {
        if(pml_count[i] > 0) {
            fprintf(pf, "E\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs sent\t",
                    my_rank, i, pml_data[i], pml_count[i]);
            for(int j = 0 ; j < max_size_histogram ; ++j)
                fprintf(pf, "%zu%s", size_histogram[i * max_size_histogram + j],
                        j < max_size_histogram - 1 ? "," : "\n");
        }
    }

    /* Dump outgoing synchronization/collective messages */
    if( mca_common_monitoring_filter() ) {
        for (int i = 0 ; i < nbprocs ; i++) {
            if(filtered_pml_count[i] > 0) {
                fprintf(pf, "I\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs sent%s",
                        my_rank, i, filtered_pml_data[i], filtered_pml_count[i],
                        0 == pml_count[i] ? "\t" : "\n");
                /*
                 * If no external messages were exchanged between the two
                 * processes, the histogram has not been dumped yet, so we
                 * need to append it at the end of the internal category.
                 */
                if(0 == pml_count[i]) {
                    for(int j = 0 ; j < max_size_histogram ; ++j)
                        fprintf(pf, "%zu%s", size_histogram[i * max_size_histogram + j],
                                j < max_size_histogram - 1 ? "," : "\n");
                }
            }
        }
    }

    /* Dump one-sided messages (sent and received) */
    fprintf(pf, "# OSC\n");
    for (int i = 0 ; i < nbprocs ; i++) {
        if(osc_count_s[i] > 0) {
            fprintf(pf, "S\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n",
                    my_rank, i, osc_data_s[i], osc_count_s[i]);
        }
        if(osc_count_r[i] > 0) {
            fprintf(pf, "R\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs recv\n",
                    my_rank, i, osc_data_r[i], osc_count_r[i]);
        }
    }

    /* Dump collectives */
    fprintf(pf, "# COLLECTIVES\n");
    for (int i = 0 ; i < nbprocs ; i++) {
        if(coll_count[i] > 0) {
            fprintf(pf, "C\t%" PRId32 "\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n",
                    my_rank, i, coll_data[i], coll_count[i]);
        }
    }
    mca_common_monitoring_coll_flush_all(pf);
}

/*
 * Flushes the monitoring data into filename.
 * Useful for phases (see example in test/monitoring).
 */
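/* fd selects the sink: 0 disables the dump, 1 writes to stdout, 2 writes to
 * stderr; any other value dumps into <filename>.<rank>.prof. */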
static int mca_common_monitoring_flush(int fd, char* filename)
{
    /* If we are not driven by MPIT then dump the monitoring information */
    if( 0 == mca_common_monitoring_current_state || 0 == fd ) /* if disabled do nothing */
        return OMPI_SUCCESS;

    if( 1 == fd ) {
        OPAL_MONITORING_PRINT_INFO("Proc %" PRId32 " flushing monitoring to stdout", rank_world);
        mca_common_monitoring_output( stdout, rank_world, nprocs_world );
    } else if( 2 == fd ) {
        OPAL_MONITORING_PRINT_INFO("Proc %" PRId32 " flushing monitoring to stderr", rank_world);
        mca_common_monitoring_output( stderr, rank_world, nprocs_world );
    } else {
        FILE *pf = NULL;
        char* tmpfn = NULL;

        if( NULL == filename ) { /* No filename */
            OPAL_MONITORING_PRINT_ERR("Error while flushing: no filename provided");
            return OMPI_ERROR;
        } else {
            opal_asprintf(&tmpfn, "%s.%" PRId32 ".prof", filename, rank_world);
            pf = fopen(tmpfn, "w");
            free(tmpfn);
        }

        if(NULL == pf) {  /* Error during open */
            OPAL_MONITORING_PRINT_ERR("Error while flushing to: %s.%" PRId32 ".prof",
                                      filename, rank_world);
            return OMPI_ERROR;
        }

        OPAL_MONITORING_PRINT_INFO("Proc %d flushing monitoring to: %s.%" PRId32 ".prof",
                                   rank_world, filename, rank_world);

        mca_common_monitoring_output( pf, rank_world, nprocs_world );

        fclose(pf);
    }
    /* Reset all monitored data to 0 */
    mca_common_monitoring_reset();
    return OMPI_SUCCESS;
}
