This source file includes the following definitions.
- mca_common_monitoring_coll_cache_name
- mca_common_monitoring_coll_cache
- mca_common_monitoring_coll_new
- mca_common_monitoring_coll_release
- mca_common_monitoring_coll_cond_release
- mca_common_monitoring_coll_finalize
- mca_common_monitoring_coll_flush
- mca_common_monitoring_coll_flush_all
- mca_common_monitoring_coll_reset
- mca_common_monitoring_coll_messages_notify
- mca_common_monitoring_coll_o2a
- mca_common_monitoring_coll_get_o2a_count
- mca_common_monitoring_coll_get_o2a_size
- mca_common_monitoring_coll_a2o
- mca_common_monitoring_coll_get_a2o_count
- mca_common_monitoring_coll_get_a2o_size
- mca_common_monitoring_coll_a2a
- mca_common_monitoring_coll_get_a2a_count
- mca_common_monitoring_coll_get_a2a_size
- mca_monitoring_coll_construct
- mca_monitoring_coll_destruct
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 #include <ompi_config.h>
17 #include "common_monitoring.h"
18 #include "common_monitoring_coll.h"
19 #include <ompi/constants.h>
20 #include <ompi/communicator/communicator.h>
21 #include <opal/mca/base/mca_base_component_repository.h>
22 #include <opal/class/opal_hash_table.h>
23 #include <assert.h>
24
25
26 struct mca_monitoring_coll_data_t {
27 opal_object_t super;
28 char*procs;
29 char*comm_name;
30 int world_rank;
31 int is_released;
32 ompi_communicator_t*p_comm;
33 opal_atomic_size_t o2a_count;
34 opal_atomic_size_t o2a_size;
35 opal_atomic_size_t a2o_count;
36 opal_atomic_size_t a2o_size;
37 opal_atomic_size_t a2a_count;
38 opal_atomic_size_t a2a_size;
39 };
40
41
42 static opal_hash_table_t *comm_data = NULL;
43
44 int mca_common_monitoring_coll_cache_name(ompi_communicator_t*comm)
45 {
46 mca_monitoring_coll_data_t*data;
47 int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
48 if( OPAL_SUCCESS == ret ) {
49 data->comm_name = strdup(comm->c_name);
50 data->p_comm = NULL;
51 }
52 return ret;
53 }
54
55 static inline void mca_common_monitoring_coll_cache(mca_monitoring_coll_data_t*data)
56 {
57 if( -1 == data->world_rank ) {
58
59 mca_common_monitoring_get_world_rank(ompi_comm_rank(data->p_comm),
60 data->p_comm->c_remote_group,
61 &data->world_rank);
62 }
63
64
65 if( (-1 != data->world_rank) && (NULL == data->procs || 0 == strlen(data->procs)) ) {
66 int i, pos = 0, size, world_size = -1, max_length, world_rank;
67 char*tmp_procs;
68 size = ompi_comm_size(data->p_comm);
69 world_size = ompi_comm_size((ompi_communicator_t*)&ompi_mpi_comm_world) - 1;
70 assert( 0 < size );
71
72 max_length = snprintf(NULL, 0, "%d,", world_size - 1) + 1;
73 tmp_procs = malloc((1 + max_length * size) * sizeof(char));
74 if( NULL == tmp_procs ) {
75 OPAL_MONITORING_PRINT_ERR("Cannot allocate memory for caching proc list.");
76 } else {
77 tmp_procs[0] = '\0';
78
79 for(i = 0; i < size; ++i) {
80 if( OPAL_SUCCESS == mca_common_monitoring_get_world_rank(i, data->p_comm->c_remote_group, &world_rank) )
81 pos += sprintf(&tmp_procs[pos], "%d,", world_rank);
82 }
83 tmp_procs[pos - 1] = '\0';
84 data->procs = realloc(tmp_procs, pos * sizeof(char));
85 }
86 }
87 }
88
89 mca_monitoring_coll_data_t*mca_common_monitoring_coll_new( ompi_communicator_t*comm )
90 {
91 mca_monitoring_coll_data_t*data = OBJ_NEW(mca_monitoring_coll_data_t);
92 if( NULL == data ) {
93 OPAL_MONITORING_PRINT_ERR("coll: new: data structure cannot be allocated");
94 return NULL;
95 }
96
97 data->p_comm = comm;
98
99
100 if( NULL == comm_data ) {
101 comm_data = OBJ_NEW(opal_hash_table_t);
102 if( NULL == comm_data ) {
103 OPAL_MONITORING_PRINT_ERR("coll: new: failed to allocate hashtable");
104 return data;
105 }
106 opal_hash_table_init(comm_data, 2048);
107 }
108
109
110 uint64_t key = *((uint64_t*)&comm);
111 if( OPAL_SUCCESS != opal_hash_table_set_value_uint64(comm_data, key, (void*)data) ) {
112 OPAL_MONITORING_PRINT_ERR("coll: new: failed to allocate memory or "
113 "growing the hash table");
114 }
115
116
117 mca_common_monitoring_coll_cache(data);
118
119 return data;
120 }
121
122 void mca_common_monitoring_coll_release(mca_monitoring_coll_data_t*data)
123 {
124 #if OPAL_ENABLE_DEBUG
125 if( NULL == data ) {
126 OPAL_MONITORING_PRINT_ERR("coll: release: data structure empty or already desallocated");
127 return;
128 }
129 #endif
130
131
132 data->is_released = 1;
133 mca_common_monitoring_coll_cache(data);
134 }
135
136 static void mca_common_monitoring_coll_cond_release(mca_monitoring_coll_data_t*data)
137 {
138 #if OPAL_ENABLE_DEBUG
139 if( NULL == data ) {
140 OPAL_MONITORING_PRINT_ERR("coll: release: data structure empty or already desallocated");
141 return;
142 }
143 #endif
144
145 if( data->is_released ) {
146 opal_hash_table_remove_value_uint64(comm_data, *((uint64_t*)&data->p_comm));
147 data->p_comm = NULL;
148 free(data->comm_name);
149 free(data->procs);
150 OBJ_RELEASE(data);
151 }
152 }
153
154 void mca_common_monitoring_coll_finalize( void )
155 {
156 if( NULL != comm_data ) {
157 opal_hash_table_remove_all( comm_data );
158 OBJ_RELEASE(comm_data);
159 }
160 }
161
162 void mca_common_monitoring_coll_flush(FILE *pf, mca_monitoring_coll_data_t*data)
163 {
164
165 fprintf(pf,
166 "D\t%s\tprocs: %s\n"
167 "O2A\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n"
168 "A2O\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n"
169 "A2A\t%" PRId32 "\t%zu bytes\t%zu msgs sent\n",
170 data->comm_name ? data->comm_name : data->p_comm ?
171 data->p_comm->c_name : "(no-name)", data->procs,
172 data->world_rank, data->o2a_size, data->o2a_count,
173 data->world_rank, data->a2o_size, data->a2o_count,
174 data->world_rank, data->a2a_size, data->a2a_count);
175 }
176
177 void mca_common_monitoring_coll_flush_all(FILE *pf)
178 {
179 if( NULL == comm_data ) return;
180
181 uint64_t key;
182 mca_monitoring_coll_data_t*previous = NULL, *data;
183
184 OPAL_HASH_TABLE_FOREACH(key, uint64, data, comm_data) {
185 if( NULL != previous && NULL == previous->p_comm ) {
186
187 mca_common_monitoring_coll_cond_release(previous);
188 }
189 mca_common_monitoring_coll_flush(pf, data);
190 previous = data;
191 }
192 mca_common_monitoring_coll_cond_release(previous);
193 }
194
195
196 void mca_common_monitoring_coll_reset(void)
197 {
198 if( NULL == comm_data ) return;
199
200 uint64_t key;
201 mca_monitoring_coll_data_t*data;
202
203 OPAL_HASH_TABLE_FOREACH(key, uint64, data, comm_data) {
204 data->o2a_count = 0; data->o2a_size = 0;
205 data->a2o_count = 0; data->a2o_size = 0;
206 data->a2a_count = 0; data->a2a_size = 0;
207 }
208 }
209
210 int mca_common_monitoring_coll_messages_notify(mca_base_pvar_t *pvar,
211 mca_base_pvar_event_t event,
212 void *obj_handle,
213 int *count)
214 {
215 switch (event) {
216 case MCA_BASE_PVAR_HANDLE_BIND:
217 *count = 1;
218 case MCA_BASE_PVAR_HANDLE_UNBIND:
219 return OMPI_SUCCESS;
220 case MCA_BASE_PVAR_HANDLE_START:
221 mca_common_monitoring_current_state = mca_common_monitoring_enabled;
222 return OMPI_SUCCESS;
223 case MCA_BASE_PVAR_HANDLE_STOP:
224 mca_common_monitoring_current_state = 0;
225 return OMPI_SUCCESS;
226 }
227
228 return OMPI_ERROR;
229 }
230
231 void mca_common_monitoring_coll_o2a(size_t size, mca_monitoring_coll_data_t*data)
232 {
233 if( 0 == mca_common_monitoring_current_state ) return;
234 #if OPAL_ENABLE_DEBUG
235 if( NULL == data ) {
236 OPAL_MONITORING_PRINT_ERR("coll: o2a: data structure empty");
237 return;
238 }
239 #endif
240 opal_atomic_add_fetch_size_t(&data->o2a_size, size);
241 opal_atomic_add_fetch_size_t(&data->o2a_count, 1);
242 }
243
244 int mca_common_monitoring_coll_get_o2a_count(const struct mca_base_pvar_t *pvar,
245 void *value,
246 void *obj_handle)
247 {
248 ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
249 size_t *value_size = (size_t*) value;
250 mca_monitoring_coll_data_t*data;
251 int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
252 if( OPAL_SUCCESS == ret ) {
253 *value_size = data->o2a_count;
254 }
255 return ret;
256 }
257
258 int mca_common_monitoring_coll_get_o2a_size(const struct mca_base_pvar_t *pvar,
259 void *value,
260 void *obj_handle)
261 {
262 ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
263 size_t *value_size = (size_t*) value;
264 mca_monitoring_coll_data_t*data;
265 int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
266 if( OPAL_SUCCESS == ret ) {
267 *value_size = data->o2a_size;
268 }
269 return ret;
270 }
271
272 void mca_common_monitoring_coll_a2o(size_t size, mca_monitoring_coll_data_t*data)
273 {
274 if( 0 == mca_common_monitoring_current_state ) return;
275 #if OPAL_ENABLE_DEBUG
276 if( NULL == data ) {
277 OPAL_MONITORING_PRINT_ERR("coll: a2o: data structure empty");
278 return;
279 }
280 #endif
281 opal_atomic_add_fetch_size_t(&data->a2o_size, size);
282 opal_atomic_add_fetch_size_t(&data->a2o_count, 1);
283 }
284
285 int mca_common_monitoring_coll_get_a2o_count(const struct mca_base_pvar_t *pvar,
286 void *value,
287 void *obj_handle)
288 {
289 ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
290 size_t *value_size = (size_t*) value;
291 mca_monitoring_coll_data_t*data;
292 int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
293 if( OPAL_SUCCESS == ret ) {
294 *value_size = data->a2o_count;
295 }
296 return ret;
297 }
298
299 int mca_common_monitoring_coll_get_a2o_size(const struct mca_base_pvar_t *pvar,
300 void *value,
301 void *obj_handle)
302 {
303 ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
304 size_t *value_size = (size_t*) value;
305 mca_monitoring_coll_data_t*data;
306 int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
307 if( OPAL_SUCCESS == ret ) {
308 *value_size = data->a2o_size;
309 }
310 return ret;
311 }
312
313 void mca_common_monitoring_coll_a2a(size_t size, mca_monitoring_coll_data_t*data)
314 {
315 if( 0 == mca_common_monitoring_current_state ) return;
316 #if OPAL_ENABLE_DEBUG
317 if( NULL == data ) {
318 OPAL_MONITORING_PRINT_ERR("coll: a2a: data structure empty");
319 return;
320 }
321 #endif
322 opal_atomic_add_fetch_size_t(&data->a2a_size, size);
323 opal_atomic_add_fetch_size_t(&data->a2a_count, 1);
324 }
325
326 int mca_common_monitoring_coll_get_a2a_count(const struct mca_base_pvar_t *pvar,
327 void *value,
328 void *obj_handle)
329 {
330 ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
331 size_t *value_size = (size_t*) value;
332 mca_monitoring_coll_data_t*data;
333 int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
334 if( OPAL_SUCCESS == ret ) {
335 *value_size = data->a2a_count;
336 }
337 return ret;
338 }
339
340 int mca_common_monitoring_coll_get_a2a_size(const struct mca_base_pvar_t *pvar,
341 void *value,
342 void *obj_handle)
343 {
344 ompi_communicator_t *comm = (ompi_communicator_t *) obj_handle;
345 size_t *value_size = (size_t*) value;
346 mca_monitoring_coll_data_t*data;
347 int ret = opal_hash_table_get_value_uint64(comm_data, *((uint64_t*)&comm), (void*)&data);
348 if( OPAL_SUCCESS == ret ) {
349 *value_size = data->a2a_size;
350 }
351 return ret;
352 }
353
354 static void mca_monitoring_coll_construct (mca_monitoring_coll_data_t*coll_data)
355 {
356 coll_data->procs = NULL;
357 coll_data->comm_name = NULL;
358 coll_data->world_rank = -1;
359 coll_data->p_comm = NULL;
360 coll_data->is_released = 0;
361 coll_data->o2a_count = 0;
362 coll_data->o2a_size = 0;
363 coll_data->a2o_count = 0;
364 coll_data->a2o_size = 0;
365 coll_data->a2a_count = 0;
366 coll_data->a2a_size = 0;
367 }
368
369 static void mca_monitoring_coll_destruct (mca_monitoring_coll_data_t*coll_data){}
370
/* Class descriptor for the record type: derives from opal_object_t and wires
   in the constructor/destructor defined above. */
OBJ_CLASS_INSTANCE(mca_monitoring_coll_data_t,
                   opal_object_t,
                   mca_monitoring_coll_construct,
                   mca_monitoring_coll_destruct);