This source file includes following definitions.
- fake_cb
- opal_progress_finalize
- opal_progress_init
- opal_progress_events
- opal_progress
- opal_progress_set_event_flag
- opal_progress_event_users_increment
- opal_progress_event_users_decrement
- opal_progress_set_yield_when_idle
- opal_progress_set_event_poll_rate
- opal_progress_find_cb
- _opal_progress_register
- opal_progress_register
- opal_progress_register_lp
- _opal_progress_unregister
- opal_progress_unregister
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "opal_config.h"
27
28 #ifdef HAVE_SCHED_H
29 #include <sched.h>
30 #endif
31
32 #include "opal/runtime/opal_progress.h"
33 #include "opal/mca/event/event.h"
34 #include "opal/mca/base/mca_base_var.h"
35 #include "opal/constants.h"
36 #include "opal/mca/timer/base/base.h"
37 #include "opal/util/output.h"
38 #include "opal/runtime/opal_params.h"
39
40 #define OPAL_PROGRESS_USE_TIMERS (OPAL_TIMER_CYCLE_SUPPORTED || OPAL_TIMER_USEC_SUPPORTED)
41 #define OPAL_PROGRESS_ONLY_USEC_NATIVE (OPAL_TIMER_USEC_NATIVE && !OPAL_TIMER_CYCLE_NATIVE)
42
43 #if OPAL_ENABLE_DEBUG
44 bool opal_progress_debug = false;
45 #endif
46
47
48
49
50 static int opal_progress_event_flag = OPAL_EVLOOP_ONCE | OPAL_EVLOOP_NONBLOCK;
51 int opal_progress_spin_count = 10000;
52
53
54
55
56
57 static opal_atomic_lock_t progress_lock;
58
59
60 static volatile opal_progress_callback_t *callbacks = NULL;
61 static size_t callbacks_len = 0;
62 static size_t callbacks_size = 0;
63
64 static volatile opal_progress_callback_t *callbacks_lp = NULL;
65 static size_t callbacks_lp_len = 0;
66 static size_t callbacks_lp_size = 0;
67
68
69 bool opal_progress_yield_when_idle = false;
70
71 #if OPAL_PROGRESS_USE_TIMERS
72 static opal_timer_t event_progress_last_time = 0;
73 static opal_timer_t event_progress_delta = 0;
74 #else
75
76 static opal_atomic_int32_t event_progress_counter = 0;
77
78 static int32_t event_progress_delta = 0;
79 #endif
80
81
82 static opal_atomic_int32_t num_event_users = 0;
83
84 #if OPAL_ENABLE_DEBUG
85 static int debug_output = -1;
86 #endif
87
88
89
90
91
92
93
94
95
96 static int fake_cb(void) { return 0; }
97
98 static int _opal_progress_unregister (opal_progress_callback_t cb, volatile opal_progress_callback_t *callback_array,
99 size_t *callback_array_len);
100
101 static void opal_progress_finalize (void)
102 {
103
104 opal_atomic_lock(&progress_lock);
105
106 callbacks_len = 0;
107 callbacks_size = 0;
108 free ((void *) callbacks);
109 callbacks = NULL;
110
111 callbacks_lp_len = 0;
112 callbacks_lp_size = 0;
113 free ((void *) callbacks_lp);
114 callbacks_lp = NULL;
115
116 opal_atomic_unlock(&progress_lock);
117 }
118
119
120
121 int
122 opal_progress_init(void)
123 {
124
125 opal_atomic_lock_init(&progress_lock, OPAL_ATOMIC_LOCK_UNLOCKED);
126
127
128 opal_progress_set_event_poll_rate(10000);
129
130 #if OPAL_ENABLE_DEBUG
131 if (opal_progress_debug) {
132 debug_output = opal_output_open(NULL);
133 }
134 #endif
135
136 callbacks_size = callbacks_lp_size = 8;
137
138 callbacks = malloc (callbacks_size * sizeof (callbacks[0]));
139 callbacks_lp = malloc (callbacks_lp_size * sizeof (callbacks_lp[0]));
140
141 if (NULL == callbacks || NULL == callbacks_lp) {
142 free ((void *) callbacks);
143 free ((void *) callbacks_lp);
144 callbacks_size = callbacks_lp_size = 0;
145 callbacks = callbacks_lp = NULL;
146 return OPAL_ERR_OUT_OF_RESOURCE;
147 }
148
149 for (size_t i = 0 ; i < callbacks_size ; ++i) {
150 callbacks[i] = fake_cb;
151 }
152
153 for (size_t i = 0 ; i < callbacks_lp_size ; ++i) {
154 callbacks_lp[i] = fake_cb;
155 }
156
157 OPAL_OUTPUT((debug_output, "progress: initialized event flag to: %x",
158 opal_progress_event_flag));
159 OPAL_OUTPUT((debug_output, "progress: initialized yield_when_idle to: %s",
160 opal_progress_yield_when_idle ? "true" : "false"));
161 OPAL_OUTPUT((debug_output, "progress: initialized num users to: %d",
162 num_event_users));
163 OPAL_OUTPUT((debug_output, "progress: initialized poll rate to: %ld",
164 (long) event_progress_delta));
165
166 opal_finalize_register_cleanup (opal_progress_finalize);
167
168 return OPAL_SUCCESS;
169 }
170
171 static int opal_progress_events(void)
172 {
173 static opal_atomic_int32_t lock = 0;
174 int events = 0;
175
176 if( opal_progress_event_flag != 0 && !OPAL_THREAD_SWAP_32(&lock, 1) ) {
177 #if OPAL_HAVE_WORKING_EVENTOPS
178 #if OPAL_PROGRESS_USE_TIMERS
179 #if OPAL_PROGRESS_ONLY_USEC_NATIVE
180 opal_timer_t now = opal_timer_base_get_usec();
181 #else
182 opal_timer_t now = opal_timer_base_get_cycles();
183 #endif
184
185
186 if (now - event_progress_last_time > event_progress_delta ) {
187 event_progress_last_time = (num_event_users > 0) ?
188 now - event_progress_delta : now;
189
190 events += opal_event_loop(opal_sync_event_base, opal_progress_event_flag);
191 }
192
193 #else
194
195
196 if (OPAL_THREAD_ADD_FETCH32(&event_progress_counter, -1) <= 0 ) {
197 event_progress_counter =
198 (num_event_users > 0) ? 0 : event_progress_delta;
199 events += opal_event_loop(opal_sync_event_base, opal_progress_event_flag);
200 }
201 #endif
202
203 #endif
204 lock = 0;
205 }
206
207 return events;
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221 void
222 opal_progress(void)
223 {
224 static uint32_t num_calls = 0;
225 size_t i;
226 int events = 0;
227
228
229 for (i = 0 ; i < callbacks_len ; ++i) {
230 events += (callbacks[i])();
231 }
232
233
234
235
236
237
238
239 if (((num_calls++) & 0x7) == 0) {
240 for (i = 0 ; i < callbacks_lp_len ; ++i) {
241 events += (callbacks_lp[i])();
242 }
243
244 opal_progress_events();
245 } else if (num_event_users > 0) {
246 opal_progress_events();
247 }
248
249 #if OPAL_HAVE_SCHED_YIELD
250 if (opal_progress_yield_when_idle && events <= 0) {
251
252
253
254
255
256 sched_yield();
257 }
258 #endif
259 }
260
261
262 int
263 opal_progress_set_event_flag(int flag)
264 {
265 int tmp = opal_progress_event_flag;
266 opal_progress_event_flag = flag;
267
268 OPAL_OUTPUT((debug_output, "progress: set_event_flag setting to %d", flag));
269
270 return tmp;
271 }
272
273
274 void
275 opal_progress_event_users_increment(void)
276 {
277 #if OPAL_ENABLE_DEBUG
278 int32_t val;
279 val = opal_atomic_add_fetch_32(&num_event_users, 1);
280
281 OPAL_OUTPUT((debug_output, "progress: event_users_increment setting count to %d", val));
282 #else
283 (void)opal_atomic_add_fetch_32(&num_event_users, 1);
284 #endif
285
286 #if OPAL_PROGRESS_USE_TIMERS
287
288 event_progress_last_time -= event_progress_delta;
289 #else
290
291 event_progress_counter = 0;
292 #endif
293 }
294
295
296 void
297 opal_progress_event_users_decrement(void)
298 {
299 #if OPAL_ENABLE_DEBUG || ! OPAL_PROGRESS_USE_TIMERS
300 int32_t val;
301 val = opal_atomic_sub_fetch_32(&num_event_users, 1);
302
303 OPAL_OUTPUT((debug_output, "progress: event_users_decrement setting count to %d", val));
304 #else
305 (void)opal_atomic_sub_fetch_32(&num_event_users, 1);
306 #endif
307
308 #if !OPAL_PROGRESS_USE_TIMERS
309
310 if (val >= 0) {
311 event_progress_counter = event_progress_delta;
312 }
313 #endif
314 }
315
316
317 bool
318 opal_progress_set_yield_when_idle(bool yieldopt)
319 {
320 bool tmp = opal_progress_yield_when_idle;
321 opal_progress_yield_when_idle = (yieldopt) ? 1 : 0;
322
323 OPAL_OUTPUT((debug_output, "progress: progress_set_yield_when_idle to %s",
324 opal_progress_yield_when_idle ? "true" : "false"));
325
326 return tmp;
327 }
328
329
330 void
331 opal_progress_set_event_poll_rate(int polltime)
332 {
333 OPAL_OUTPUT((debug_output, "progress: progress_set_event_poll_rate(%d)", polltime));
334
335 #if OPAL_PROGRESS_USE_TIMERS
336 event_progress_delta = 0;
337 # if OPAL_PROGRESS_ONLY_USEC_NATIVE
338 event_progress_last_time = opal_timer_base_get_usec();
339 # else
340 event_progress_last_time = opal_timer_base_get_cycles();
341 # endif
342 #else
343 event_progress_counter = event_progress_delta = 0;
344 #endif
345
346 if (polltime == 0) {
347 #if OPAL_PROGRESS_USE_TIMERS
348
349 event_progress_delta = 60 * 1000000;
350 #else
351
352 event_progress_delta = INT_MAX;
353 #endif
354 } else {
355 #if OPAL_PROGRESS_USE_TIMERS
356 event_progress_delta = polltime;
357 #else
358
359
360 event_progress_delta = polltime - 1;
361 #endif
362 }
363
364 #if OPAL_PROGRESS_USE_TIMERS && !OPAL_PROGRESS_ONLY_USEC_NATIVE
365
366 event_progress_delta = event_progress_delta * opal_timer_base_get_freq() / 1000000;
367 #endif
368 }
369
370 static int opal_progress_find_cb (opal_progress_callback_t cb, volatile opal_progress_callback_t *cbs,
371 size_t cbs_len)
372 {
373 for (size_t i = 0 ; i < cbs_len ; ++i) {
374 if (cbs[i] == cb) {
375 return (int) i;
376 }
377 }
378
379 return OPAL_ERR_NOT_FOUND;
380 }
381
382 static int _opal_progress_register (opal_progress_callback_t cb, volatile opal_progress_callback_t **cbs,
383 size_t *cbs_size, size_t *cbs_len)
384 {
385 int ret = OPAL_SUCCESS;
386
387 if (OPAL_ERR_NOT_FOUND != opal_progress_find_cb (cb, *cbs, *cbs_len)) {
388 return OPAL_SUCCESS;
389 }
390
391
392 if (*cbs_len + 1 > *cbs_size) {
393 opal_progress_callback_t *tmp, *old;
394
395 tmp = (opal_progress_callback_t *) malloc (sizeof (tmp[0]) * 2 * *cbs_size);
396 if (tmp == NULL) {
397 return OPAL_ERR_TEMP_OUT_OF_RESOURCE;
398 }
399
400 if (*cbs) {
401
402 memcpy (tmp, (void *) *cbs, sizeof(tmp[0]) * *cbs_size);
403 }
404
405 for (size_t i = *cbs_len ; i < 2 * *cbs_size ; ++i) {
406 tmp[i] = fake_cb;
407 }
408
409 opal_atomic_wmb ();
410
411
412 old = (opal_progress_callback_t *) opal_atomic_swap_ptr ((opal_atomic_intptr_t *) cbs, (intptr_t) tmp);
413
414 opal_atomic_wmb ();
415
416 free (old);
417 *cbs_size *= 2;
418 }
419
420 cbs[0][*cbs_len] = cb;
421 ++*cbs_len;
422
423 opal_atomic_wmb ();
424
425 return ret;
426 }
427
428 int opal_progress_register (opal_progress_callback_t cb)
429 {
430 int ret;
431
432 opal_atomic_lock(&progress_lock);
433
434 (void) _opal_progress_unregister (cb, callbacks_lp, &callbacks_lp_len);
435
436 ret = _opal_progress_register (cb, &callbacks, &callbacks_size, &callbacks_len);
437
438 opal_atomic_unlock(&progress_lock);
439
440 return ret;
441 }
442
443 int opal_progress_register_lp (opal_progress_callback_t cb)
444 {
445 int ret;
446
447 opal_atomic_lock(&progress_lock);
448
449 (void) _opal_progress_unregister (cb, callbacks, &callbacks_len);
450
451 ret = _opal_progress_register (cb, &callbacks_lp, &callbacks_lp_size, &callbacks_lp_len);
452
453 opal_atomic_unlock(&progress_lock);
454
455 return ret;
456 }
457
458 static int _opal_progress_unregister (opal_progress_callback_t cb, volatile opal_progress_callback_t *callback_array,
459 size_t *callback_array_len)
460 {
461 int ret = opal_progress_find_cb (cb, callback_array, *callback_array_len);
462 if (OPAL_ERR_NOT_FOUND == ret) {
463 return ret;
464 }
465
466
467
468
469
470 for (size_t i = (size_t) ret ; i < *callback_array_len - 1 ; ++i) {
471
472
473 (void) opal_atomic_swap_ptr ((opal_atomic_intptr_t *) (callback_array + i), (intptr_t) callback_array[i+1]);
474 }
475
476 callback_array[*callback_array_len] = fake_cb;
477 --*callback_array_len;
478
479 return OPAL_SUCCESS;
480 }
481
482 int opal_progress_unregister (opal_progress_callback_t cb)
483 {
484 int ret;
485
486 opal_atomic_lock(&progress_lock);
487
488 ret = _opal_progress_unregister (cb, callbacks, &callbacks_len);
489
490 if (OPAL_SUCCESS != ret) {
491
492
493 ret = _opal_progress_unregister (cb, callbacks_lp, &callbacks_lp_len);
494 }
495
496 opal_atomic_unlock(&progress_lock);
497
498 return ret;
499 }