This source file includes the following definitions.
- hash_debug_entry
- eq_debug_entry
- detect_monotonic
- gettime
- event_base_gettimeofday_cached
- clear_time_cache
- update_time_cache
- event_init
- event_base_new
- event_config_is_avoided_method
- event_is_method_disabled
- event_base_get_features
- event_deferred_cb_queue_init
- notify_base_cbq_callback
- event_base_get_deferred_cb_queue
- event_enable_debug_mode
- event_disable_debug_mode
- event_base_new_with_config
- event_base_start_iocp
- event_base_stop_iocp
- event_base_free
- event_reinit
- event_get_supported_methods
- event_config_new
- event_config_entry_free
- event_config_free
- event_config_set_flag
- event_config_avoid_method
- event_config_require_features
- event_config_set_num_cpus_hint
- event_priority_init
- event_base_priority_init
- event_haveevents
- event_signal_closure
- is_common_timeout
- is_same_common_timeout
- get_common_timeout_list
- common_timeout_ok
- common_timeout_schedule
- common_timeout_callback
- event_base_init_common_timeout
- event_persist_closure
- event_process_active_single_queue
- event_process_deferred_callbacks
- event_process_active
- event_dispatch
- event_base_dispatch
- event_base_get_method
- event_loopexit_cb
- event_loopexit
- event_base_loopexit
- event_loopbreak
- event_base_loopbreak
- event_base_got_break
- event_base_got_exit
- event_loop
- event_base_loop
- event_once_cb
- event_once
- event_base_once
- event_assign
- event_base_set
- event_set
- event_new
- event_free
- event_debug_unassign
- event_priority_set
- event_pending
- event_initialized
- event_get_assignment
- event_get_struct_event_size
- event_get_fd
- event_get_base
- event_get_events
- event_get_callback
- event_get_callback_arg
- event_add
- evthread_notify_base_default
- evthread_notify_base_eventfd
- evthread_notify_base
- event_add_internal
- event_del
- event_del_internal
- event_active
- event_active_nolock
- event_deferred_cb_init
- event_deferred_cb_cancel
- event_deferred_cb_schedule
- timeout_next
- timeout_correct
- timeout_process
- event_queue_remove
- insert_common_timeout_inorder
- event_queue_insert
- event_get_version
- event_get_version_number
- event_get_method
- event_mm_malloc_
- event_mm_calloc_
- event_mm_strdup_
- event_mm_realloc_
- event_mm_free_
- event_set_mem_functions
- evthread_notify_drain_eventfd
- evthread_notify_drain_default
- evthread_make_base_notifiable
- event_base_dump_events
- event_base_add_virtual
- event_base_del_virtual
- event_global_setup_locks_
- event_base_assert_ok
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 #include "event2/event-config.h"
28
29 #ifdef WIN32
30 #include <winsock2.h>
31 #define WIN32_LEAN_AND_MEAN
32 #include <windows.h>
33 #undef WIN32_LEAN_AND_MEAN
34 #endif
35 #include <sys/types.h>
36 #if !defined(WIN32) && defined(_EVENT_HAVE_SYS_TIME_H)
37 #include <sys/time.h>
38 #endif
39 #include <sys/queue.h>
40 #ifdef _EVENT_HAVE_SYS_SOCKET_H
41 #include <sys/socket.h>
42 #endif
43 #include <stdio.h>
44 #include <stdlib.h>
45 #ifdef _EVENT_HAVE_UNISTD_H
46 #include <unistd.h>
47 #endif
48 #ifdef _EVENT_HAVE_SYS_EVENTFD_H
49 #include <sys/eventfd.h>
50 #endif
51 #include <ctype.h>
52 #include <errno.h>
53 #include <signal.h>
54 #include <string.h>
55 #include <time.h>
56
57 #include "event2/event.h"
58 #include "event2/event_struct.h"
59 #include "event2/event_compat.h"
60 #include "event-internal.h"
61 #include "defer-internal.h"
62 #include "evthread-internal.h"
63 #include "event2/thread.h"
64 #include "event2/util.h"
65 #include "log-internal.h"
66 #include "evmap-internal.h"
67 #include "iocp-internal.h"
68 #include "changelist-internal.h"
69 #include "ht-internal.h"
70 #include "util-internal.h"
71
72
/* Backend descriptors defined outside this file.  Each declaration is only
 * visible when its _EVENT_HAVE_* feature macro is defined and non-zero
 * (win32ops: when WIN32 is defined). */
#if defined(_EVENT_HAVE_EVENT_PORTS) && _EVENT_HAVE_EVENT_PORTS
extern const struct eventop evportops;
#endif
#if defined(_EVENT_HAVE_SELECT) && _EVENT_HAVE_SELECT
extern const struct eventop selectops;
#endif
#if defined(_EVENT_HAVE_EPOLL) && _EVENT_HAVE_EPOLL
extern const struct eventop epollops;
#endif
#if defined(_EVENT_HAVE_POLL) && _EVENT_HAVE_POLL
extern const struct eventop pollops;
#endif
#if defined(_EVENT_HAVE_WORKING_KQUEUE) && _EVENT_HAVE_WORKING_KQUEUE
extern const struct eventop kqops;
#endif
#if defined(_EVENT_HAVE_DEVPOLL) && _EVENT_HAVE_DEVPOLL
extern const struct eventop devpollops;
#endif
#ifdef WIN32
extern const struct eventop win32ops;
#endif
94
95
/* NULL-terminated list of the backends compiled into this build.
 * event_base_new_with_config() scans it from the front and keeps the first
 * entry that passes its filters and whose init() returns non-NULL, so the
 * order of the entries is the order of preference. */
static const struct eventop *ompi_eventops[] = {
#if defined(_EVENT_HAVE_EVENT_PORTS) && _EVENT_HAVE_EVENT_PORTS
	&evportops,
#endif
#if defined(_EVENT_HAVE_WORKING_KQUEUE) && _EVENT_HAVE_WORKING_KQUEUE
	&kqops,
#endif
#if defined(_EVENT_HAVE_EPOLL) && _EVENT_HAVE_EPOLL
	&epollops,
#endif
#if defined(_EVENT_HAVE_DEVPOLL) && _EVENT_HAVE_DEVPOLL
	&devpollops,
#endif
#if defined(_EVENT_HAVE_POLL) && _EVENT_HAVE_POLL
	&pollops,
#endif
#if defined(_EVENT_HAVE_SELECT) && _EVENT_HAVE_SELECT
	&selectops,
#endif
#ifdef WIN32
	&win32ops,
#endif
	NULL
};
120
121
122
/* Default base for the entry points that take no explicit base.  Set by
 * event_init(); reset to NULL by event_base_free() when that base is freed. */
struct event_base *ompi_event_global_current_base_ = NULL;
#define current_base ompi_event_global_current_base_

/* Set to 1 by detect_monotonic() once clock_gettime(CLOCK_MONOTONIC)
 * has been seen to succeed; read by gettime(). */
static int use_monotonic;

/* Forward declarations of file-local helpers. */
static inline int event_add_internal(struct event *ev,
    const struct timeval *tv, int tv_is_absolute);
static inline int event_del_internal(struct event *ev);

static void event_queue_insert(struct event_base *, struct event *, int);
static void event_queue_remove(struct event_base *, struct event *, int);
static int event_haveevents(struct event_base *);

static int event_process_active(struct event_base *);

static int timeout_next(struct event_base *, struct timeval **);
static void timeout_process(struct event_base *);
static void timeout_correct(struct event_base *, struct timeval *);

static inline void event_signal_closure(struct event_base *, struct event *ev);
static inline void event_persist_closure(struct event_base *, struct event *ev);

static int evthread_notify_base(struct event_base *base);
149
150 #ifndef _EVENT_DISABLE_DEBUG_MODE
151
152
153
154
155
156
/* One record in global_debug_map, describing a single tracked event. */
struct event_debug_entry {
	HT_ENTRY(event_debug_entry) node;	/* hash-table linkage */
	const struct event *ptr;		/* tracked event; the hash/equality key */
	unsigned added : 1;			/* set by note_add, cleared by note_setup/note_del */
};
162
163 static inline unsigned
164 hash_debug_entry(const struct event_debug_entry *e)
165 {
166
167
168
169
170 unsigned u = (unsigned) ((ev_uintptr_t) e->ptr);
171
172
173
174 return (u >> 6);
175 }
176
177 static inline int
178 eq_debug_entry(const struct event_debug_entry *a,
179 const struct event_debug_entry *b)
180 {
181 return a->ptr == b->ptr;
182 }
183
/* Non-zero once event_enable_debug_mode() has switched debugging on. */
int ompi__event_debug_mode_on = 0;
/* Latched to 1 by the note_* macros and by event_base_new_with_config();
 * once set, event_enable_debug_mode() refuses to run. */
static int event_debug_mode_too_late = 0;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
/* Lock handed to EVLOCK_LOCK/EVLOCK_UNLOCK around accesses to
 * global_debug_map. */
static void *_event_debug_map_lock = NULL;
#endif
/* Hash table holding one event_debug_entry per tracked event. */
static HT_HEAD(event_debug_map, event_debug_entry) global_debug_map =
	HT_INITIALIZER();

HT_PROTOTYPE(event_debug_map, event_debug_entry, node, hash_debug_entry,
    eq_debug_entry)
HT_GENERATE(event_debug_map, event_debug_entry, node, hash_debug_entry,
    eq_debug_entry, 0.5, mm_malloc, mm_realloc, mm_free)
197
198
/* Note that (ev) has been set up.  With debug mode on, the event gets an
 * entry in global_debug_map (allocated on first sight) whose "added" bit is
 * cleared.  Regardless of mode, event_debug_mode_too_late is latched. */
#define _event_debug_note_setup(ev) do {				\
	if (ompi__event_debug_mode_on) {				\
		struct event_debug_entry lookup_;			\
		struct event_debug_entry *found_;			\
		lookup_.ptr = (ev);					\
		EVLOCK_LOCK(_event_debug_map_lock, 0);			\
		found_ = HT_FIND(event_debug_map, &global_debug_map,	\
		    &lookup_);						\
		if (found_ == NULL) {					\
			found_ = mm_malloc(sizeof(*found_));		\
			if (found_ == NULL)				\
				event_err(1,				\
				    "Out of memory in debugging code");	\
			found_->ptr = (ev);				\
			found_->added = 0;				\
			HT_INSERT(event_debug_map, &global_debug_map,	\
			    found_);					\
		} else {						\
			found_->added = 0;				\
		}							\
		EVLOCK_UNLOCK(_event_debug_map_lock, 0);		\
	}								\
	event_debug_mode_too_late = 1;					\
	} while (0)
220
/* Note that (ev) is no longer set up.  With debug mode on, its entry (if
 * any) is removed from global_debug_map and freed.  Regardless of mode,
 * event_debug_mode_too_late is latched. */
#define _event_debug_note_teardown(ev) do {				\
	if (ompi__event_debug_mode_on) {				\
		struct event_debug_entry lookup_;			\
		struct event_debug_entry *removed_;			\
		lookup_.ptr = (ev);					\
		EVLOCK_LOCK(_event_debug_map_lock, 0);			\
		removed_ = HT_REMOVE(event_debug_map,			\
		    &global_debug_map, &lookup_);			\
		if (removed_ != NULL)					\
			mm_free(removed_);				\
		EVLOCK_UNLOCK(_event_debug_map_lock, 0);		\
	}								\
	event_debug_mode_too_late = 1;					\
	} while (0)
233
/* Note that (ev) has been added.  With debug mode on, the event's entry
 * gets its "added" bit set; an event with no entry is reported through
 * event_errx(_EVENT_ERR_ABORT, ...).  Regardless of mode,
 * event_debug_mode_too_late is latched. */
#define _event_debug_note_add(ev) do {					\
	if (ompi__event_debug_mode_on) {				\
		struct event_debug_entry lookup_;			\
		struct event_debug_entry *found_;			\
		lookup_.ptr = (ev);					\
		EVLOCK_LOCK(_event_debug_map_lock, 0);			\
		found_ = HT_FIND(event_debug_map, &global_debug_map,	\
		    &lookup_);						\
		if (found_ == NULL) {					\
			event_errx(_EVENT_ERR_ABORT,			\
			    "%s: noting an add on a non-setup event %p"	\
			    " (events: 0x%x, fd: "EV_SOCK_FMT		\
			    ", flags: 0x%x)",				\
			    __func__, (ev), (ev)->ev_events,		\
			    EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags);	\
		} else {						\
			found_->added = 1;				\
		}							\
		EVLOCK_UNLOCK(_event_debug_map_lock, 0);		\
	}								\
	event_debug_mode_too_late = 1;					\
	} while (0)
254
/* Note that (ev) has been deleted.  With debug mode on, the event's entry
 * gets its "added" bit cleared; an event with no entry is reported through
 * event_errx(_EVENT_ERR_ABORT, ...).  Regardless of mode,
 * event_debug_mode_too_late is latched. */
#define _event_debug_note_del(ev) do {					\
	if (ompi__event_debug_mode_on) {				\
		struct event_debug_entry lookup_;			\
		struct event_debug_entry *found_;			\
		lookup_.ptr = (ev);					\
		EVLOCK_LOCK(_event_debug_map_lock, 0);			\
		found_ = HT_FIND(event_debug_map, &global_debug_map,	\
		    &lookup_);						\
		if (found_ == NULL) {					\
			event_errx(_EVENT_ERR_ABORT,			\
			    "%s: noting a del on a non-setup event %p"	\
			    " (events: 0x%x, fd: "EV_SOCK_FMT		\
			    ", flags: 0x%x)",				\
			    __func__, (ev), (ev)->ev_events,		\
			    EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags);	\
		} else {						\
			found_->added = 0;				\
		}							\
		EVLOCK_UNLOCK(_event_debug_map_lock, 0);		\
	}								\
	event_debug_mode_too_late = 1;					\
	} while (0)
275
/* With debug mode on, check that (ev) has an entry in global_debug_map and
 * report through event_errx(_EVENT_ERR_ABORT, ...) when it has none.
 * Does nothing when debug mode is off. */
#define _event_debug_assert_is_setup(ev) do {				\
	if (ompi__event_debug_mode_on) {				\
		struct event_debug_entry lookup_;			\
		struct event_debug_entry *found_;			\
		lookup_.ptr = (ev);					\
		EVLOCK_LOCK(_event_debug_map_lock, 0);			\
		found_ = HT_FIND(event_debug_map, &global_debug_map,	\
		    &lookup_);						\
		if (found_ == NULL) {					\
			event_errx(_EVENT_ERR_ABORT,			\
			    "%s called on a non-initialized event %p"	\
			    " (events: 0x%x, fd: "EV_SOCK_FMT		\
			    ", flags: 0x%x)",				\
			    __func__, (ev), (ev)->ev_events,		\
			    EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags);	\
		}							\
		EVLOCK_UNLOCK(_event_debug_map_lock, 0);		\
	}								\
	} while (0)
293
294
/* With debug mode on, check that (ev) is not currently marked as added and
 * report through event_errx(_EVENT_ERR_ABORT, ...) when it is.  An event
 * with no entry at all passes.  Does nothing when debug mode is off. */
#define _event_debug_assert_not_added(ev) do {				\
	if (ompi__event_debug_mode_on) {				\
		struct event_debug_entry lookup_;			\
		struct event_debug_entry *found_;			\
		lookup_.ptr = (ev);					\
		EVLOCK_LOCK(_event_debug_map_lock, 0);			\
		found_ = HT_FIND(event_debug_map, &global_debug_map,	\
		    &lookup_);						\
		if (found_ != NULL && found_->added) {			\
			event_errx(_EVENT_ERR_ABORT,			\
			    "%s called on an already added event %p"	\
			    " (events: 0x%x, fd: "EV_SOCK_FMT", "	\
			    "flags: 0x%x)",				\
			    __func__, (ev), (ev)->ev_events,		\
			    EV_SOCK_ARG((ev)->ev_fd), (ev)->ev_flags);	\
		}							\
		EVLOCK_UNLOCK(_event_debug_map_lock, 0);		\
	}								\
	} while (0)
312 #else
/* Debug mode compiled out: every tracking hook is a no-op expression. */
#define _event_debug_note_setup(ev)		((void)0)
#define _event_debug_note_teardown(ev)		((void)0)
#define _event_debug_note_add(ev)		((void)0)
#define _event_debug_note_del(ev)		((void)0)
#define _event_debug_assert_is_setup(ev)	((void)0)
#define _event_debug_assert_not_added(ev)	((void)0)
325 #endif
326
/* Assert, via EVLOCK_ASSERT_LOCKED, that (base)'s th_base_lock is held. */
#define EVENT_BASE_ASSERT_LOCKED(base) EVLOCK_ASSERT_LOCKED((base)->th_base_lock)
329
330
331
/* Probe once for a working CLOCK_MONOTONIC and, if clock_gettime() succeeds,
 * set use_monotonic.  Later calls return immediately.  Compiles to an empty
 * function when clock_gettime or CLOCK_MONOTONIC is unavailable. */
static void
detect_monotonic(void)
{
#if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
	static int already_probed = 0;
	struct timespec probe;

	if (!already_probed) {
		if (clock_gettime(CLOCK_MONOTONIC, &probe) == 0)
			use_monotonic = 1;
		already_probed = 1;
	}
#endif
}
348
349
350
351 #define CLOCK_SYNC_INTERVAL -1
352
353
354
355
356
357
358 static int
359 gettime(struct event_base *base, struct timeval *tp)
360 {
361 EVENT_BASE_ASSERT_LOCKED(base);
362
363 if (base->tv_cache.tv_sec) {
364 *tp = base->tv_cache;
365 return (0);
366 }
367
368 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
369 if (use_monotonic) {
370 struct timespec ts;
371
372 if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1)
373 return (-1);
374
375 tp->tv_sec = ts.tv_sec;
376 tp->tv_usec = ts.tv_nsec / 1000;
377 if (base->last_updated_clock_diff + CLOCK_SYNC_INTERVAL
378 < ts.tv_sec) {
379 struct timeval tv;
380 evutil_gettimeofday(&tv,NULL);
381 evutil_timersub(&tv, tp, &base->tv_clock_diff);
382 base->last_updated_clock_diff = ts.tv_sec;
383 }
384
385 return (0);
386 }
387 #endif
388
389 return (evutil_gettimeofday(tp, NULL));
390 }
391
392 int
393 event_base_gettimeofday_cached(struct event_base *base, struct timeval *tv)
394 {
395 int r;
396 if (!base) {
397 base = current_base;
398 if (!current_base)
399 return evutil_gettimeofday(tv, NULL);
400 }
401
402 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
403 if (base->tv_cache.tv_sec == 0) {
404 r = evutil_gettimeofday(tv, NULL);
405 } else {
406 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
407 evutil_timeradd(&base->tv_cache, &base->tv_clock_diff, tv);
408 #else
409 *tv = base->tv_cache;
410 #endif
411 r = 0;
412 }
413 EVBASE_RELEASE_LOCK(base, th_base_lock);
414 return r;
415 }
416
417
418 static inline void
419 clear_time_cache(struct event_base *base)
420 {
421 base->tv_cache.tv_sec = 0;
422 }
423
424
425 static inline void
426 update_time_cache(struct event_base *base)
427 {
428 base->tv_cache.tv_sec = 0;
429 if (!(base->flags & EVENT_BASE_FLAG_NO_CACHE_TIME))
430 gettime(base, &base->tv_cache);
431 }
432
433 struct event_base *
434 event_init(void)
435 {
436 struct event_base *base = event_base_new_with_config(NULL);
437
438 if (base == NULL) {
439 event_errx(1, "%s: Unable to construct event_base", __func__);
440 return NULL;
441 }
442
443 current_base = base;
444
445 return (base);
446 }
447
448 struct event_base *
449 event_base_new(void)
450 {
451 struct event_base *base = NULL;
452 struct event_config *cfg = event_config_new();
453 if (cfg) {
454 base = event_base_new_with_config(cfg);
455 event_config_free(cfg);
456 }
457 return base;
458 }
459
460
461
462 static int
463 event_config_is_avoided_method(const struct event_config *cfg,
464 const char *method)
465 {
466 struct event_config_entry *entry;
467
468 TAILQ_FOREACH(entry, &cfg->entries, next) {
469 if (entry->avoid_method != NULL &&
470 strcmp(entry->avoid_method, method) == 0)
471 return (1);
472 }
473
474 return (0);
475 }
476
477
478 static int
479 event_is_method_disabled(const char *name)
480 {
481 char environment[64];
482 int i;
483
484 evutil_snprintf(environment, sizeof(environment), "EVENT_NO%s", name);
485 for (i = 8; environment[i] != '\0'; ++i)
486 environment[i] = EVUTIL_TOUPPER(environment[i]);
487
488
489 return (evutil_getenv(environment) != NULL);
490 }
491
492 int
493 event_base_get_features(const struct event_base *base)
494 {
495 return base->evsel->features;
496 }
497
498 void
499 event_deferred_cb_queue_init(struct deferred_cb_queue *cb)
500 {
501 memset(cb, 0, sizeof(struct deferred_cb_queue));
502 TAILQ_INIT(&cb->deferred_cb_list);
503 }
504
505
/* notify_fn installed on a base's deferred-callback queue: wakes the base
 * (given as 'baseptr') through evthread_notify_base() whenever
 * EVBASE_NEED_NOTIFY says it needs it.  'cb' is not used. */
static void
notify_base_cbq_callback(struct deferred_cb_queue *cb, void *baseptr)
{
	struct event_base *owner = baseptr;

	if (!EVBASE_NEED_NOTIFY(owner))
		return;

	evthread_notify_base(owner);
}
513
514 struct deferred_cb_queue *
515 event_base_get_deferred_cb_queue(struct event_base *base)
516 {
517 return base ? &base->defer_queue : NULL;
518 }
519
520 void
521 event_enable_debug_mode(void)
522 {
523 #ifndef _EVENT_DISABLE_DEBUG_MODE
524 if (ompi__event_debug_mode_on)
525 event_errx(1, "%s was called twice!", __func__);
526 if (event_debug_mode_too_late)
527 event_errx(1, "%s must be called *before* creating any events "
528 "or event_bases",__func__);
529
530 ompi__event_debug_mode_on = 1;
531
532 HT_INIT(event_debug_map, &global_debug_map);
533 #endif
534 }
535
#if 0
/* Compiled out.  Would empty global_debug_map under the map lock, freeing
 * every entry, and then clear the table itself. */
void
event_disable_debug_mode(void)
{
	struct event_debug_entry **cursor;

	EVLOCK_LOCK(_event_debug_map_lock, 0);
	cursor = HT_START(event_debug_map, &global_debug_map);
	while (cursor != NULL) {
		struct event_debug_entry *doomed = *cursor;

		cursor = HT_NEXT_RMV(event_debug_map,&global_debug_map, cursor);
		mm_free(doomed);
	}
	HT_CLEAR(event_debug_map, &global_debug_map);
	EVLOCK_UNLOCK(_event_debug_map_lock , 0);
}
#endif
552
553 struct event_base *
554 event_base_new_with_config(const struct event_config *cfg)
555 {
556 int i;
557 struct event_base *base;
558 int should_check_environment;
559
560 #ifndef _EVENT_DISABLE_DEBUG_MODE
561 event_debug_mode_too_late = 1;
562 #endif
563
564 if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
565 event_warn("%s: calloc", __func__);
566 return NULL;
567 }
568 detect_monotonic();
569 gettime(base, &base->event_tv);
570
571 min_heap_ctor(&base->timeheap);
572 TAILQ_INIT(&base->eventqueue);
573 base->sig.ev_signal_pair[0] = -1;
574 base->sig.ev_signal_pair[1] = -1;
575 base->th_notify_fd[0] = -1;
576 base->th_notify_fd[1] = -1;
577
578 event_deferred_cb_queue_init(&base->defer_queue);
579 base->defer_queue.notify_fn = notify_base_cbq_callback;
580 base->defer_queue.notify_arg = base;
581 if (cfg)
582 base->flags = cfg->flags;
583
584 evmap_io_initmap(&base->io);
585 evmap_signal_initmap(&base->sigmap);
586 event_changelist_init(&base->changelist);
587
588 base->evbase = NULL;
589
590 should_check_environment =
591 !(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));
592
593 for (i = 0; ompi_eventops[i] && !base->evbase; i++) {
594 if (cfg != NULL) {
595
596 if (event_config_is_avoided_method(cfg,
597 ompi_eventops[i]->name))
598 continue;
599 if ((ompi_eventops[i]->features & cfg->require_features)
600 != cfg->require_features)
601 continue;
602 }
603
604
605 if (should_check_environment &&
606 event_is_method_disabled(ompi_eventops[i]->name))
607 continue;
608
609 base->evsel = ompi_eventops[i];
610
611 base->evbase = base->evsel->init(base);
612 }
613
614 if (base->evbase == NULL) {
615 event_warnx("%s: no event mechanism available",
616 __func__);
617 base->evsel = NULL;
618 event_base_free(base);
619 return NULL;
620 }
621
622 if (evutil_getenv("EVENT_SHOW_METHOD"))
623 event_msgx("libevent using: %s", base->evsel->name);
624
625
626 if (event_base_priority_init(base, 1) < 0) {
627 event_base_free(base);
628 return NULL;
629 }
630
631
632
633 #ifndef _EVENT_DISABLE_THREAD_SUPPORT
634 if (EVTHREAD_LOCKING_ENABLED() &&
635 (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
636 int r;
637 EVTHREAD_ALLOC_LOCK(base->th_base_lock,
638 EVTHREAD_LOCKTYPE_RECURSIVE);
639 base->defer_queue.lock = base->th_base_lock;
640 EVTHREAD_ALLOC_COND(base->current_event_cond);
641 r = evthread_make_base_notifiable(base);
642 if (r<0) {
643 event_warnx("%s: Unable to make base notifiable.", __func__);
644 event_base_free(base);
645 return NULL;
646 }
647 }
648 #endif
649
650 #ifdef WIN32
651 if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP))
652 event_base_start_iocp(base, cfg->n_cpus_hint);
653 #endif
654
655 return (base);
656 }
657
/**
 * Launch an IOCP port for 'base' if it does not have one yet (WIN32 only).
 *
 * @return 0 if a port already exists or was launched, -1 if the launch
 *         failed; always -1 on non-WIN32 builds.
 */
int
event_base_start_iocp(struct event_base *base, int n_cpus)
{
#ifdef WIN32
	if (base->iocp == NULL) {
		base->iocp = event_iocp_port_launch(n_cpus);
		if (base->iocp == NULL) {
			event_warnx("%s: Couldn't launch IOCP", __func__);
			return -1;
		}
	}
	return 0;
#else
	return -1;
#endif
}
674
/* Shut down the IOCP port of 'base', if it has one, and forget it (WIN32
 * only).  A failing shutdown trips EVUTIL_ASSERT.  No-op elsewhere. */
void
event_base_stop_iocp(struct event_base *base)
{
#ifdef WIN32
	int rv;

	if (base->iocp != NULL) {
		rv = event_iocp_shutdown(base->iocp, -1);
		EVUTIL_ASSERT(rv >= 0);
		base->iocp = NULL;
	}
#endif
}
688
689 void
690 event_base_free(struct event_base *base)
691 {
692 int i, n_deleted=0;
693 struct event *ev;
694
695
696
697
698
699 if (base == NULL && current_base)
700 base = current_base;
701
702 if (base == current_base)
703 current_base = NULL;
704
705 if (base == NULL) {
706 event_warnx("%s: no base to free", __func__);
707 return;
708 }
709
710
711 #ifdef WIN32
712 event_base_stop_iocp(base);
713 #endif
714
715
716 if (base->th_notify_fd[0] != -1) {
717 event_del(&base->th_notify);
718 EVUTIL_CLOSESOCKET(base->th_notify_fd[0]);
719 if (base->th_notify_fd[1] != -1)
720 EVUTIL_CLOSESOCKET(base->th_notify_fd[1]);
721 base->th_notify_fd[0] = -1;
722 base->th_notify_fd[1] = -1;
723 event_debug_unassign(&base->th_notify);
724 }
725
726
727 for (ev = TAILQ_FIRST(&base->eventqueue); ev; ) {
728 struct event *next = TAILQ_NEXT(ev, ev_next);
729 if (!(ev->ev_flags & EVLIST_INTERNAL)) {
730 event_del(ev);
731 ++n_deleted;
732 }
733 ev = next;
734 }
735 while ((ev = min_heap_top(&base->timeheap)) != NULL) {
736 event_del(ev);
737 ++n_deleted;
738 }
739 for (i = 0; i < base->n_common_timeouts; ++i) {
740 struct common_timeout_list *ctl =
741 base->common_timeout_queues[i];
742 event_del(&ctl->timeout_event);
743 event_debug_unassign(&ctl->timeout_event);
744 for (ev = TAILQ_FIRST(&ctl->events); ev; ) {
745 struct event *next = TAILQ_NEXT(ev,
746 ev_timeout_pos.ev_next_with_common_timeout);
747 if (!(ev->ev_flags & EVLIST_INTERNAL)) {
748 event_del(ev);
749 ++n_deleted;
750 }
751 ev = next;
752 }
753 mm_free(ctl);
754 }
755 if (base->common_timeout_queues)
756 mm_free(base->common_timeout_queues);
757
758 for (i = 0; i < base->nactivequeues; ++i) {
759 for (ev = TAILQ_FIRST(&base->activequeues[i]); ev; ) {
760 struct event *next = TAILQ_NEXT(ev, ev_active_next);
761 if (!(ev->ev_flags & EVLIST_INTERNAL)) {
762 event_del(ev);
763 ++n_deleted;
764 }
765 ev = next;
766 }
767 }
768
769 if (n_deleted)
770 event_debug(("%s: %d events were still set in base",
771 __func__, n_deleted));
772
773 if (base->evsel != NULL && base->evsel->dealloc != NULL)
774 base->evsel->dealloc(base);
775
776 for (i = 0; i < base->nactivequeues; ++i)
777 EVUTIL_ASSERT(TAILQ_EMPTY(&base->activequeues[i]));
778
779 EVUTIL_ASSERT(min_heap_empty(&base->timeheap));
780 min_heap_dtor(&base->timeheap);
781
782 mm_free(base->activequeues);
783
784 EVUTIL_ASSERT(TAILQ_EMPTY(&base->eventqueue));
785
786 evmap_io_clear(&base->io);
787 evmap_signal_clear(&base->sigmap);
788 event_changelist_freemem(&base->changelist);
789
790 EVTHREAD_FREE_LOCK(base->th_base_lock, EVTHREAD_LOCKTYPE_RECURSIVE);
791 EVTHREAD_FREE_COND(base->current_event_cond);
792
793 mm_free(base);
794 }
795
796
797 int
798 event_reinit(struct event_base *base)
799 {
800 const struct eventop *evsel;
801 int res = 0;
802 struct event *ev;
803 int was_notifiable = 0;
804
805 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
806
807 evsel = base->evsel;
808
809 #if 0
810
811
812
813
814
815
816 if (!evsel->need_reinit)
817 goto done;
818 #endif
819
820
821 if (base->sig.ev_signal_added) {
822
823
824 event_queue_remove(base, &base->sig.ev_signal,
825 EVLIST_INSERTED);
826 if (base->sig.ev_signal.ev_flags & EVLIST_ACTIVE)
827 event_queue_remove(base, &base->sig.ev_signal,
828 EVLIST_ACTIVE);
829 if (base->sig.ev_signal_pair[0] != -1)
830 EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[0]);
831 if (base->sig.ev_signal_pair[1] != -1)
832 EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[1]);
833 base->sig.ev_signal_added = 0;
834 }
835 if (base->th_notify_fd[0] != -1) {
836
837
838 was_notifiable = 1;
839 event_queue_remove(base, &base->th_notify,
840 EVLIST_INSERTED);
841 if (base->th_notify.ev_flags & EVLIST_ACTIVE)
842 event_queue_remove(base, &base->th_notify,
843 EVLIST_ACTIVE);
844 base->sig.ev_signal_added = 0;
845 EVUTIL_CLOSESOCKET(base->th_notify_fd[0]);
846 if (base->th_notify_fd[1] != -1)
847 EVUTIL_CLOSESOCKET(base->th_notify_fd[1]);
848 base->th_notify_fd[0] = -1;
849 base->th_notify_fd[1] = -1;
850 event_debug_unassign(&base->th_notify);
851 }
852
853 if (base->evsel->dealloc != NULL)
854 base->evsel->dealloc(base);
855 base->evbase = evsel->init(base);
856 if (base->evbase == NULL) {
857 event_errx(1, "%s: could not reinitialize event mechanism",
858 __func__);
859 res = -1;
860 goto done;
861 }
862
863 event_changelist_freemem(&base->changelist);
864 evmap_io_clear(&base->io);
865 evmap_signal_clear(&base->sigmap);
866
867 TAILQ_FOREACH(ev, &base->eventqueue, ev_next) {
868 if (ev->ev_events & (EV_READ|EV_WRITE)) {
869 if (ev == &base->sig.ev_signal) {
870
871
872
873
874 continue;
875 }
876 if (evmap_io_add(base, ev->ev_fd, ev) == -1)
877 res = -1;
878 } else if (ev->ev_events & EV_SIGNAL) {
879 if (evmap_signal_add(base, (int)ev->ev_fd, ev) == -1)
880 res = -1;
881 }
882 }
883
884 if (was_notifiable && res == 0)
885 res = evthread_make_base_notifiable(base);
886
887 done:
888 EVBASE_RELEASE_LOCK(base, th_base_lock);
889 return (res);
890 }
891
892 const char **
893 event_get_supported_methods(void)
894 {
895 static const char **methods = NULL;
896 const struct eventop **method;
897 const char **tmp;
898 int i = 0, k;
899
900
901 for (method = &ompi_eventops[0]; *method != NULL; ++method) {
902 ++i;
903 }
904
905
906 tmp = mm_calloc((i + 1), sizeof(char *));
907 if (tmp == NULL)
908 return (NULL);
909
910
911 for (k = 0, i = 0; ompi_eventops[k] != NULL; ++k) {
912 tmp[i++] = ompi_eventops[k]->name;
913 }
914 tmp[i] = NULL;
915
916 if (methods != NULL)
917 mm_free((char**)methods);
918
919 methods = tmp;
920
921 return (methods);
922 }
923
924 struct event_config *
925 event_config_new(void)
926 {
927 struct event_config *cfg = mm_calloc(1, sizeof(*cfg));
928
929 if (cfg == NULL)
930 return (NULL);
931
932 TAILQ_INIT(&cfg->entries);
933
934 return (cfg);
935 }
936
937 static void
938 event_config_entry_free(struct event_config_entry *entry)
939 {
940 if (entry->avoid_method != NULL)
941 mm_free((char *)entry->avoid_method);
942 mm_free(entry);
943 }
944
945 void
946 event_config_free(struct event_config *cfg)
947 {
948 struct event_config_entry *entry;
949
950 while ((entry = TAILQ_FIRST(&cfg->entries)) != NULL) {
951 TAILQ_REMOVE(&cfg->entries, entry, next);
952 event_config_entry_free(entry);
953 }
954 mm_free(cfg);
955 }
956
957 int
958 event_config_set_flag(struct event_config *cfg, int flag)
959 {
960 if (!cfg)
961 return -1;
962 cfg->flags |= flag;
963 return 0;
964 }
965
966 int
967 event_config_avoid_method(struct event_config *cfg, const char *method)
968 {
969 struct event_config_entry *entry = mm_malloc(sizeof(*entry));
970 if (entry == NULL)
971 return (-1);
972
973 if ((entry->avoid_method = mm_strdup(method)) == NULL) {
974 mm_free(entry);
975 return (-1);
976 }
977
978 TAILQ_INSERT_TAIL(&cfg->entries, entry, next);
979
980 return (0);
981 }
982
983 int
984 event_config_require_features(struct event_config *cfg,
985 int features)
986 {
987 if (!cfg)
988 return (-1);
989 cfg->require_features = features;
990 return (0);
991 }
992
993 int
994 event_config_set_num_cpus_hint(struct event_config *cfg, int cpus)
995 {
996 if (!cfg)
997 return (-1);
998 cfg->n_cpus_hint = cpus;
999 return (0);
1000 }
1001
1002 int
1003 event_priority_init(int npriorities)
1004 {
1005 return event_base_priority_init(current_base, npriorities);
1006 }
1007
1008 int
1009 event_base_priority_init(struct event_base *base, int npriorities)
1010 {
1011 int i;
1012
1013 if (N_ACTIVE_CALLBACKS(base) || npriorities < 1
1014 || npriorities >= EVENT_MAX_PRIORITIES)
1015 return (-1);
1016
1017 if (npriorities == base->nactivequeues)
1018 return (0);
1019
1020 if (base->nactivequeues) {
1021 mm_free(base->activequeues);
1022 base->nactivequeues = 0;
1023 }
1024
1025
1026 base->activequeues = (struct event_list *)
1027 mm_calloc(npriorities, sizeof(struct event_list));
1028 if (base->activequeues == NULL) {
1029 event_warn("%s: calloc", __func__);
1030 return (-1);
1031 }
1032 base->nactivequeues = npriorities;
1033
1034 for (i = 0; i < base->nactivequeues; ++i) {
1035 TAILQ_INIT(&base->activequeues[i]);
1036 }
1037
1038 return (0);
1039 }
1040
1041
1042 static int
1043 event_haveevents(struct event_base *base)
1044 {
1045
1046 return (base->virtual_event_count > 0 || base->event_count > 0);
1047 }
1048
1049
1050 static inline void
1051 event_signal_closure(struct event_base *base, struct event *ev)
1052 {
1053 short ncalls;
1054 int should_break;
1055
1056
1057 ncalls = ev->ev_ncalls;
1058 if (ncalls != 0)
1059 ev->ev_pncalls = &ncalls;
1060 EVBASE_RELEASE_LOCK(base, th_base_lock);
1061 while (ncalls) {
1062 ncalls--;
1063 ev->ev_ncalls = ncalls;
1064 if (ncalls == 0)
1065 ev->ev_pncalls = NULL;
1066 (*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
1067
1068 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
1069 should_break = base->event_break;
1070 EVBASE_RELEASE_LOCK(base, th_base_lock);
1071
1072 if (should_break) {
1073 if (ncalls != 0)
1074 ev->ev_pncalls = NULL;
1075 return;
1076 }
1077 }
1078 }
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
/* Bit layout of tv_usec for a "common timeout" timeval:
 *   bits selected by COMMON_TIMEOUT_MASK     -- must equal COMMON_TIMEOUT_MAGIC
 *   bits selected by COMMON_TIMEOUT_IDX_MASK -- index into common_timeout_queues
 *   bits selected by MICROSECONDS_MASK       -- the real microseconds value
 */
#define COMMON_TIMEOUT_MAGIC	0x50000000
#define COMMON_TIMEOUT_MASK	0xf0000000
#define COMMON_TIMEOUT_IDX_SHIFT 20
#define COMMON_TIMEOUT_IDX_MASK 0x0ff00000
#define MICROSECONDS_MASK	COMMON_TIMEOUT_MICROSECONDS_MASK

/* Extract the queue index encoded in (tv)->tv_usec. */
#define COMMON_TIMEOUT_IDX(tv) \
	(((tv)->tv_usec & COMMON_TIMEOUT_IDX_MASK)>>COMMON_TIMEOUT_IDX_SHIFT)
1102
1103
1104 static inline int
1105 is_common_timeout(const struct timeval *tv,
1106 const struct event_base *base)
1107 {
1108 int idx;
1109 if ((tv->tv_usec & COMMON_TIMEOUT_MASK) != COMMON_TIMEOUT_MAGIC)
1110 return 0;
1111 idx = COMMON_TIMEOUT_IDX(tv);
1112 return idx < base->n_common_timeouts;
1113 }
1114
1115
1116
1117 static inline int
1118 is_same_common_timeout(const struct timeval *tv1, const struct timeval *tv2)
1119 {
1120 return (tv1->tv_usec & ~MICROSECONDS_MASK) ==
1121 (tv2->tv_usec & ~MICROSECONDS_MASK);
1122 }
1123
1124
1125
1126 static inline struct common_timeout_list *
1127 get_common_timeout_list(struct event_base *base, const struct timeval *tv)
1128 {
1129 return base->common_timeout_queues[COMMON_TIMEOUT_IDX(tv)];
1130 }
1131
#if 0
/* Compiled out.  Would check that 'tv' is exactly the duration stored in
 * the common-timeout list it indexes. */
static inline int
common_timeout_ok(const struct timeval *tv,
    struct event_base *base)
{
	const struct common_timeout_list *ctl =
	    get_common_timeout_list(base, tv);
	const struct timeval *expect = &ctl->duration;

	if (tv->tv_sec != expect->tv_sec)
		return 0;
	return tv->tv_usec == expect->tv_usec;
}
#endif
1143
1144
1145
1146 static void
1147 common_timeout_schedule(struct common_timeout_list *ctl,
1148 const struct timeval *now, struct event *head)
1149 {
1150 struct timeval timeout = head->ev_timeout;
1151 timeout.tv_usec &= MICROSECONDS_MASK;
1152 event_add_internal(&ctl->timeout_event, &timeout, 1);
1153 }
1154
1155
1156
1157
1158 static void
1159 common_timeout_callback(evutil_socket_t fd, short what, void *arg)
1160 {
1161 struct timeval now;
1162 struct common_timeout_list *ctl = arg;
1163 struct event_base *base = ctl->base;
1164 struct event *ev = NULL;
1165 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
1166 gettime(base, &now);
1167 while (1) {
1168 ev = TAILQ_FIRST(&ctl->events);
1169 if (!ev || ev->ev_timeout.tv_sec > now.tv_sec ||
1170 (ev->ev_timeout.tv_sec == now.tv_sec &&
1171 (ev->ev_timeout.tv_usec&MICROSECONDS_MASK) > now.tv_usec))
1172 break;
1173 event_del_internal(ev);
1174 event_active_nolock(ev, EV_TIMEOUT, 1);
1175 }
1176 if (ev)
1177 common_timeout_schedule(ctl, &now, ev);
1178 EVBASE_RELEASE_LOCK(base, th_base_lock);
1179 }
1180
1181 #define MAX_COMMON_TIMEOUTS 256
1182
1183 const struct timeval *
1184 event_base_init_common_timeout(struct event_base *base,
1185 const struct timeval *duration)
1186 {
1187 int i;
1188 struct timeval tv;
1189 const struct timeval *result=NULL;
1190 struct common_timeout_list *new_ctl;
1191
1192 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
1193 if (duration->tv_usec > 1000000) {
1194 memcpy(&tv, duration, sizeof(struct timeval));
1195 if (is_common_timeout(duration, base))
1196 tv.tv_usec &= MICROSECONDS_MASK;
1197 tv.tv_sec += tv.tv_usec / 1000000;
1198 tv.tv_usec %= 1000000;
1199 duration = &tv;
1200 }
1201 for (i = 0; i < base->n_common_timeouts; ++i) {
1202 const struct common_timeout_list *ctl =
1203 base->common_timeout_queues[i];
1204 if (duration->tv_sec == ctl->duration.tv_sec &&
1205 duration->tv_usec ==
1206 (ctl->duration.tv_usec & MICROSECONDS_MASK)) {
1207 EVUTIL_ASSERT(is_common_timeout(&ctl->duration, base));
1208 result = &ctl->duration;
1209 goto done;
1210 }
1211 }
1212 if (base->n_common_timeouts == MAX_COMMON_TIMEOUTS) {
1213 event_warnx("%s: Too many common timeouts already in use; "
1214 "we only support %d per event_base", __func__,
1215 MAX_COMMON_TIMEOUTS);
1216 goto done;
1217 }
1218 if (base->n_common_timeouts_allocated == base->n_common_timeouts) {
1219 int n = base->n_common_timeouts < 16 ? 16 :
1220 base->n_common_timeouts*2;
1221 struct common_timeout_list **newqueues =
1222 mm_realloc(base->common_timeout_queues,
1223 n*sizeof(struct common_timeout_queue *));
1224 if (!newqueues) {
1225 event_warn("%s: realloc",__func__);
1226 goto done;
1227 }
1228 base->n_common_timeouts_allocated = n;
1229 base->common_timeout_queues = newqueues;
1230 }
1231 new_ctl = mm_calloc(1, sizeof(struct common_timeout_list));
1232 if (!new_ctl) {
1233 event_warn("%s: calloc",__func__);
1234 goto done;
1235 }
1236 TAILQ_INIT(&new_ctl->events);
1237 new_ctl->duration.tv_sec = duration->tv_sec;
1238 new_ctl->duration.tv_usec =
1239 duration->tv_usec | COMMON_TIMEOUT_MAGIC |
1240 (base->n_common_timeouts << COMMON_TIMEOUT_IDX_SHIFT);
1241 evtimer_assign(&new_ctl->timeout_event, base,
1242 common_timeout_callback, new_ctl);
1243 new_ctl->timeout_event.ev_flags |= EVLIST_INTERNAL;
1244 event_priority_set(&new_ctl->timeout_event, 0);
1245 new_ctl->base = base;
1246 base->common_timeout_queues[base->n_common_timeouts++] = new_ctl;
1247 result = &new_ctl->duration;
1248
1249 done:
1250 if (result)
1251 EVUTIL_ASSERT(is_common_timeout(result, base));
1252
1253 EVBASE_RELEASE_LOCK(base, th_base_lock);
1254 return result;
1255 }
1256
1257
/*
 * Closure used for events whose ev_closure is EV_CLOSURE_PERSIST.
 *
 * If the event has a non-zero ev_io_timeout, its timeout is re-added before
 * the callback runs.  The callback is then invoked with th_base_lock
 * released; this function does not re-acquire the lock.
 */
static inline void
event_persist_closure(struct event_base *base, struct event *ev)
{
	/* Local copy of the callback pointer, taken before the lock is dropped. */
	void (*evcb_callback)(evutil_socket_t, short, void *);

	/* Local copies of the callback arguments, taken before the lock is
	 * dropped. */
	evutil_socket_t evcb_fd;
	short evcb_res;
	void *evcb_arg;

	/* Reschedule only when the event has a timeout interval recorded. */
	if (ev->ev_io_timeout.tv_sec || ev->ev_io_timeout.tv_usec) {
		/* run_at: next absolute expiry.  relative_to: the time the
		 * interval is added to.  delay: the interval itself. */
		struct timeval run_at, relative_to, delay, now;
		/* Non-microsecond bits of tv_usec, OR-ed back onto run_at. */
		ev_uint32_t usec_mask = 0;
		EVUTIL_ASSERT(is_same_common_timeout(&ev->ev_timeout,
			&ev->ev_io_timeout));
		gettime(base, &now);
		if (is_common_timeout(&ev->ev_timeout, base)) {
			/* Strip the extra bits so the arithmetic below works
			 * on plain microseconds. */
			delay = ev->ev_io_timeout;
			usec_mask = delay.tv_usec & ~MICROSECONDS_MASK;
			delay.tv_usec &= MICROSECONDS_MASK;
			if (ev->ev_res & EV_TIMEOUT) {
				/* Fired because of the timeout: count from the
				 * previous scheduled expiry. */
				relative_to = ev->ev_timeout;
				relative_to.tv_usec &= MICROSECONDS_MASK;
			} else {
				/* Fired for another reason: count from now. */
				relative_to = now;
			}
		} else {
			delay = ev->ev_io_timeout;
			if (ev->ev_res & EV_TIMEOUT) {
				relative_to = ev->ev_timeout;
			} else {
				relative_to = now;
			}
		}
		evutil_timeradd(&relative_to, &delay, &run_at);
		if (evutil_timercmp(&run_at, &now, <)) {
			/* The computed expiry is already in the past; schedule
			 * one full interval from the current time instead. */
			evutil_timeradd(&now, &delay, &run_at);
		}
		run_at.tv_usec |= usec_mask;
		/* run_at is absolute, hence the final argument of 1. */
		event_add_internal(ev, &run_at, 1);
	}

	/* Snapshot everything the callback needs while the lock is held. */
	evcb_callback = ev->ev_callback;
	evcb_fd = ev->ev_fd;
	evcb_res = ev->ev_res;
	evcb_arg = ev->ev_arg;

	/* Drop the base lock before running user code. */
	EVBASE_RELEASE_LOCK(base, th_base_lock);

	/* Run the callback using only the local copies. */
	(evcb_callback)(evcb_fd, evcb_res, evcb_arg);
}
1323
1324
1325
1326
1327
1328
1329
1330
/*
 * Run the events on one active queue, taking them from the front one at a
 * time.
 *
 * Returns the number of processed events that do not carry EVLIST_INTERNAL,
 * or -1 if base->event_break is set after a callback.  Processing also stops
 * early (returning the count so far) when base->event_continue is set.
 */
static int
event_process_active_single_queue(struct event_base *base,
    struct event_list *activeq)
{
	struct event *ev;
	int count = 0;

	EVUTIL_ASSERT(activeq != NULL);

	/* Always re-read the head: the queue may change while callbacks run. */
	for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
		/* Persistent events only leave the active queue; all others
		 * are deleted completely. */
		if (ev->ev_events & EV_PERSIST)
			event_queue_remove(base, ev, EVLIST_ACTIVE);
		else
			event_del_internal(ev);
		if (!(ev->ev_flags & EVLIST_INTERNAL))
			++count;

		event_debug((
			 "event_process_active: event: %p, %s%scall %p",
			ev,
			ev->ev_res & EV_READ ? "EV_READ " : " ",
			ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
			ev->ev_callback));

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
		/* Record which event is running; event_del_internal() and
		 * event_active_nolock() wait on current_event_cond when
		 * another thread targets this event. */
		base->current_event = ev;
		base->current_event_waiters = 0;
#endif

		/* Both closures visible in this file release th_base_lock
		 * before calling user code; event_signal_closure() presumably
		 * does the same -- TODO confirm. */
		switch (ev->ev_closure) {
		case EV_CLOSURE_SIGNAL:
			event_signal_closure(base, ev);
			break;
		case EV_CLOSURE_PERSIST:
			event_persist_closure(base, ev);
			break;
		default:
		case EV_CLOSURE_NONE:
			EVBASE_RELEASE_LOCK(base, th_base_lock);
			(*ev->ev_callback)(
				ev->ev_fd, ev->ev_res, ev->ev_arg);
			break;
		}

		/* Take the lock back after the callback has returned. */
		EVBASE_ACQUIRE_LOCK(base, th_base_lock);
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
		base->current_event = NULL;
		/* Wake any threads that were waiting for this event to finish. */
		if (base->current_event_waiters) {
			base->current_event_waiters = 0;
			EVTHREAD_COND_BROADCAST(base->current_event_cond);
		}
#endif

		if (base->event_break)
			return -1;
		if (base->event_continue)
			break;
	}
	return count;
}
1391
1392
1393
1394
1395
1396
1397
1398 static int
1399 event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
1400 {
1401 int count = 0;
1402 struct deferred_cb *cb;
1403
1404 #define MAX_DEFERRED 16
1405 while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
1406 cb->queued = 0;
1407 TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
1408 --queue->active_count;
1409 UNLOCK_DEFERRED_QUEUE(queue);
1410
1411 cb->cb(cb, cb->arg);
1412
1413 LOCK_DEFERRED_QUEUE(queue);
1414 if (*breakptr)
1415 return -1;
1416 if (++count == MAX_DEFERRED)
1417 break;
1418 }
1419 #undef MAX_DEFERRED
1420 return count;
1421 }
1422
1423
1424
1425
1426
1427
1428
1429 static int
1430 event_process_active(struct event_base *base)
1431 {
1432
1433 struct event_list *activeq = NULL;
1434 int i, c = 0;
1435
1436 for (i = 0; i < base->nactivequeues; ++i) {
1437 if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
1438 base->event_running_priority = i;
1439 activeq = &base->activequeues[i];
1440 c = event_process_active_single_queue(base, activeq);
1441 if (c < 0) {
1442 base->event_running_priority = -1;
1443 return -1;
1444 } else if (c > 0)
1445 break;
1446
1447
1448
1449 }
1450 }
1451
1452 event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
1453 base->event_running_priority = -1;
1454 return c;
1455 }
1456
1457
1458
1459
1460
/* Run the event loop via event_loop() with no flags set. */
int
event_dispatch(void)
{
	const int no_flags = 0;

	return event_loop(no_flags);
}
1466
/* Run the event loop of 'event_base' via event_base_loop() with no flags. */
int
event_base_dispatch(struct event_base *event_base)
{
	const int no_flags = 0;

	return event_base_loop(event_base, no_flags);
}
1472
1473 const char *
1474 event_base_get_method(const struct event_base *base)
1475 {
1476 EVUTIL_ASSERT(base);
1477 return (base->evsel->name);
1478 }
1479
1480
1481
1482 static void
1483 event_loopexit_cb(evutil_socket_t fd, short what, void *arg)
1484 {
1485 struct event_base *base = arg;
1486 base->event_gotterm = 1;
1487 }
1488
1489 int
1490 event_loopexit(const struct timeval *tv)
1491 {
1492 return (event_once(-1, EV_TIMEOUT, event_loopexit_cb,
1493 current_base, tv));
1494 }
1495
1496 int
1497 event_base_loopexit(struct event_base *event_base, const struct timeval *tv)
1498 {
1499 return (event_base_once(event_base, -1, EV_TIMEOUT, event_loopexit_cb,
1500 event_base, tv));
1501 }
1502
1503 int
1504 event_loopbreak(void)
1505 {
1506 return (event_base_loopbreak(current_base));
1507 }
1508
1509 int
1510 event_base_loopbreak(struct event_base *event_base)
1511 {
1512 int r = 0;
1513 if (event_base == NULL)
1514 return (-1);
1515
1516 EVBASE_ACQUIRE_LOCK(event_base, th_base_lock);
1517 event_base->event_break = 1;
1518
1519 if (EVBASE_NEED_NOTIFY(event_base)) {
1520 r = evthread_notify_base(event_base);
1521 } else {
1522 r = (0);
1523 }
1524 EVBASE_RELEASE_LOCK(event_base, th_base_lock);
1525 return r;
1526 }
1527
1528 int
1529 event_base_got_break(struct event_base *event_base)
1530 {
1531 int res;
1532 EVBASE_ACQUIRE_LOCK(event_base, th_base_lock);
1533 res = event_base->event_break;
1534 EVBASE_RELEASE_LOCK(event_base, th_base_lock);
1535 return res;
1536 }
1537
1538 int
1539 event_base_got_exit(struct event_base *event_base)
1540 {
1541 int res;
1542 EVBASE_ACQUIRE_LOCK(event_base, th_base_lock);
1543 res = event_base->event_gotterm;
1544 EVBASE_RELEASE_LOCK(event_base, th_base_lock);
1545 return res;
1546 }
1547
1548
1549
1550 int
1551 event_loop(int flags)
1552 {
1553 return event_base_loop(current_base, flags);
1554 }
1555
/*
 * Run the event loop for 'base'.
 *
 * 'flags' is tested against EVLOOP_ONCE and EVLOOP_NONBLOCK.
 *
 * Returns 0 when the loop ends normally (break/exit requested, or the
 * EVLOOP_ONCE / EVLOOP_NONBLOCK condition is met), 1 when there are no
 * registered events and no active callbacks, and -1 on a reentrant call or
 * when the backend's dispatch function returns -1.
 */
int
event_base_loop(struct event_base *base, int flags)
{
	const struct eventop *evsel = base->evsel;
	struct timeval tv;
	struct timeval *tv_p;
	int res, done, retval = 0;

	/* Acquired here; released just before returning. */
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	/* Refuse to run two loops on the same base at once. */
	if (base->running_loop) {
		/* The warning is only emitted when OPAL_ENABLE_DEBUG is set. */
#if OPAL_ENABLE_DEBUG
		event_warnx("%s: reentrant invocation. Only one event_base_loop"
		    " can run on each event_base at once.", __func__);
#endif
		EVBASE_RELEASE_LOCK(base, th_base_lock);
		return -1;
	}

	base->running_loop = 1;

	clear_time_cache(base);

	if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
		evsig_set_base(base);

	done = 0;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	/* Remember which thread is running the loop. */
	base->th_owner_id = EVTHREAD_GET_ID();
#endif

	/* Clear any exit/break request left over from before this run. */
	base->event_gotterm = base->event_break = 0;

	while (!done) {
		base->event_continue = 0;

		/* Stop if an exit or break was requested. */
		if (base->event_gotterm) {
			break;
		}

		if (base->event_break) {
			break;
		}

		timeout_correct(base, &tv);

		tv_p = &tv;
		if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
			/* Nothing is active and blocking is allowed: compute
			 * how long dispatch may wait (the return value of
			 * timeout_next() is not checked here). */
			timeout_next(base, &tv_p);
		} else {
			/* Callbacks are already active, or the caller asked
			 * for non-blocking behaviour: use a zero timeout. */
			evutil_timerclear(&tv);
		}

		/* With nothing registered and nothing active, exit with 1. */
		if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
			event_debug(("%s: no events registered.", __func__));
			retval = 1;
			goto done;
		}

		/* Record the time just before dispatching. */
		gettime(base, &base->event_tv);

		clear_time_cache(base);

		res = evsel->dispatch(base, tv_p);

		if (res == -1) {
			event_debug(("%s: dispatch returned unsuccessfully.",
				__func__));
			retval = -1;
			goto done;
		}

		update_time_cache(base);

		timeout_process(base);

		if (N_ACTIVE_CALLBACKS(base)) {
			int n = event_process_active(base);
			/* EVLOOP_ONCE: finish once something was processed
			 * and no callbacks remain active. */
			if ((flags & EVLOOP_ONCE)
			    && N_ACTIVE_CALLBACKS(base) == 0
			    && n != 0)
				done = 1;
		} else if (flags & EVLOOP_NONBLOCK)
			done = 1;
	}
	event_debug(("%s: asked to terminate loop.", __func__));

done:
	/* Common exit path: reset loop state and release the lock. */
	clear_time_cache(base);
	base->running_loop = 0;

	EVBASE_RELEASE_LOCK(base, th_base_lock);

	return (retval);
}
1662
1663
/* Bookkeeping for a one-shot event created by event_base_once(); allocated
 * there and freed by event_once_cb() after the user callback has run. */
struct event_once {
	/* The underlying event; set up with event_once_cb as its callback. */
	struct event ev;

	/* User-supplied callback and argument, invoked from event_once_cb(). */
	void (*cb)(evutil_socket_t, short, void *);
	void *arg;
};
1670
1671
1672
1673 static void
1674 event_once_cb(evutil_socket_t fd, short events, void *arg)
1675 {
1676 struct event_once *eonce = arg;
1677
1678 (*eonce->cb)(fd, events, eonce->arg);
1679 event_debug_unassign(&eonce->ev);
1680 mm_free(eonce);
1681 }
1682
1683
1684 int
1685 event_once(evutil_socket_t fd, short events,
1686 void (*callback)(evutil_socket_t, short, void *),
1687 void *arg, const struct timeval *tv)
1688 {
1689 return event_base_once(current_base, fd, events, callback, arg, tv);
1690 }
1691
1692
1693 int
1694 event_base_once(struct event_base *base, evutil_socket_t fd, short events,
1695 void (*callback)(evutil_socket_t, short, void *),
1696 void *arg, const struct timeval *tv)
1697 {
1698 struct event_once *eonce;
1699 struct timeval etv;
1700 int res = 0;
1701
1702
1703
1704 if (events & (EV_SIGNAL|EV_PERSIST))
1705 return (-1);
1706
1707 if ((eonce = mm_calloc(1, sizeof(struct event_once))) == NULL)
1708 return (-1);
1709
1710 eonce->cb = callback;
1711 eonce->arg = arg;
1712
1713 if (events == EV_TIMEOUT) {
1714 if (tv == NULL) {
1715 evutil_timerclear(&etv);
1716 tv = &etv;
1717 }
1718
1719 evtimer_assign(&eonce->ev, base, event_once_cb, eonce);
1720 } else if (events & (EV_READ|EV_WRITE)) {
1721 events &= EV_READ|EV_WRITE;
1722
1723 event_assign(&eonce->ev, base, fd, events, event_once_cb, eonce);
1724 } else {
1725
1726 mm_free(eonce);
1727 return (-1);
1728 }
1729
1730 if (res == 0)
1731 res = event_add(&eonce->ev, tv);
1732 if (res != 0) {
1733 mm_free(eonce);
1734 return (res);
1735 }
1736
1737 return (0);
1738 }
1739
1740 int
1741 event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
1742 {
1743 if (!base)
1744 base = current_base;
1745
1746 _event_debug_assert_not_added(ev);
1747
1748 ev->ev_base = base;
1749
1750 ev->ev_callback = callback;
1751 ev->ev_arg = arg;
1752 ev->ev_fd = fd;
1753 ev->ev_events = events;
1754 ev->ev_res = 0;
1755 ev->ev_flags = EVLIST_INIT;
1756 ev->ev_ncalls = 0;
1757 ev->ev_pncalls = NULL;
1758
1759 if (events & EV_SIGNAL) {
1760 if ((events & (EV_READ|EV_WRITE)) != 0) {
1761 event_warnx("%s: EV_SIGNAL is not compatible with "
1762 "EV_READ or EV_WRITE", __func__);
1763 return -1;
1764 }
1765 ev->ev_closure = EV_CLOSURE_SIGNAL;
1766 } else {
1767 if (events & EV_PERSIST) {
1768 evutil_timerclear(&ev->ev_io_timeout);
1769 ev->ev_closure = EV_CLOSURE_PERSIST;
1770 } else {
1771 ev->ev_closure = EV_CLOSURE_NONE;
1772 }
1773 }
1774
1775 min_heap_elem_init(ev);
1776
1777 if (base != NULL) {
1778
1779 ev->ev_pri = base->nactivequeues / 2;
1780 }
1781
1782 _event_debug_note_setup(ev);
1783
1784 return 0;
1785 }
1786
1787 int
1788 event_base_set(struct event_base *base, struct event *ev)
1789 {
1790
1791 if (ev->ev_flags != EVLIST_INIT)
1792 return (-1);
1793
1794 _event_debug_assert_is_setup(ev);
1795
1796 ev->ev_base = base;
1797 ev->ev_pri = base->nactivequeues/2;
1798
1799 return (0);
1800 }
1801
1802 void
1803 event_set(struct event *ev, evutil_socket_t fd, short events,
1804 void (*callback)(evutil_socket_t, short, void *), void *arg)
1805 {
1806 int r;
1807 r = event_assign(ev, current_base, fd, events, callback, arg);
1808 EVUTIL_ASSERT(r == 0);
1809 }
1810
1811 struct event *
1812 event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
1813 {
1814 struct event *ev;
1815 ev = mm_malloc(sizeof(struct event));
1816 if (ev == NULL)
1817 return (NULL);
1818 if (event_assign(ev, base, fd, events, cb, arg) < 0) {
1819 mm_free(ev);
1820 return (NULL);
1821 }
1822
1823 return (ev);
1824 }
1825
/* Delete 'ev' from its base (the result of event_del() is not inspected),
 * record its teardown for debug tracking, and free its memory. */
void
event_free(struct event *ev)
{
	_event_debug_assert_is_setup(ev);

	/* Remove the event first so no queue still references it. */
	(void)event_del(ev);

	_event_debug_note_teardown(ev);
	mm_free(ev);
}
1837
1838 void
1839 event_debug_unassign(struct event *ev)
1840 {
1841 _event_debug_assert_not_added(ev);
1842 _event_debug_note_teardown(ev);
1843
1844 ev->ev_flags &= ~EVLIST_INIT;
1845 }
1846
1847
1848
1849
1850
1851
1852 int
1853 event_priority_set(struct event *ev, int pri)
1854 {
1855 _event_debug_assert_is_setup(ev);
1856
1857 if (ev->ev_flags & EVLIST_ACTIVE)
1858 return (-1);
1859 if (pri < 0 || pri >= ev->ev_base->nactivequeues)
1860 return (-1);
1861
1862 ev->ev_pri = pri;
1863
1864 return (0);
1865 }
1866
1867
1868
1869
1870
1871 int
1872 event_pending(const struct event *ev, short event, struct timeval *tv)
1873 {
1874 int flags = 0;
1875
1876 if (EVUTIL_FAILURE_CHECK(ev->ev_base == NULL)) {
1877 event_warnx("%s: event has no event_base set.", __func__);
1878 return 0;
1879 }
1880
1881 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
1882 _event_debug_assert_is_setup(ev);
1883
1884 if (ev->ev_flags & EVLIST_INSERTED)
1885 flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL));
1886 if (ev->ev_flags & EVLIST_ACTIVE)
1887 flags |= ev->ev_res;
1888 if (ev->ev_flags & EVLIST_TIMEOUT)
1889 flags |= EV_TIMEOUT;
1890
1891 event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL);
1892
1893
1894 if (tv != NULL && (flags & event & EV_TIMEOUT)) {
1895 struct timeval tmp = ev->ev_timeout;
1896 tmp.tv_usec &= MICROSECONDS_MASK;
1897 #if defined(_EVENT_HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
1898
1899 evutil_timeradd(&ev->ev_base->tv_clock_diff, &tmp, tv);
1900 #else
1901 *tv = tmp;
1902 #endif
1903 }
1904
1905 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
1906
1907 return (flags & event);
1908 }
1909
1910 int
1911 event_initialized(const struct event *ev)
1912 {
1913 if (!(ev->ev_flags & EVLIST_INIT))
1914 return 0;
1915
1916 return 1;
1917 }
1918
1919 void
1920 event_get_assignment(const struct event *event, struct event_base **base_out, evutil_socket_t *fd_out, short *events_out, event_callback_fn *callback_out, void **arg_out)
1921 {
1922 _event_debug_assert_is_setup(event);
1923
1924 if (base_out)
1925 *base_out = event->ev_base;
1926 if (fd_out)
1927 *fd_out = event->ev_fd;
1928 if (events_out)
1929 *events_out = event->ev_events;
1930 if (callback_out)
1931 *callback_out = event->ev_callback;
1932 if (arg_out)
1933 *arg_out = event->ev_arg;
1934 }
1935
1936 size_t
1937 event_get_struct_event_size(void)
1938 {
1939 return sizeof(struct event);
1940 }
1941
1942 evutil_socket_t
1943 event_get_fd(const struct event *ev)
1944 {
1945 _event_debug_assert_is_setup(ev);
1946 return ev->ev_fd;
1947 }
1948
1949 struct event_base *
1950 event_get_base(const struct event *ev)
1951 {
1952 _event_debug_assert_is_setup(ev);
1953 return ev->ev_base;
1954 }
1955
1956 short
1957 event_get_events(const struct event *ev)
1958 {
1959 _event_debug_assert_is_setup(ev);
1960 return ev->ev_events;
1961 }
1962
1963 event_callback_fn
1964 event_get_callback(const struct event *ev)
1965 {
1966 _event_debug_assert_is_setup(ev);
1967 return ev->ev_callback;
1968 }
1969
1970 void *
1971 event_get_callback_arg(const struct event *ev)
1972 {
1973 _event_debug_assert_is_setup(ev);
1974 return ev->ev_arg;
1975 }
1976
1977 int
1978 event_add(struct event *ev, const struct timeval *tv)
1979 {
1980 int res;
1981
1982 if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
1983 event_warnx("%s: event has no event_base set.", __func__);
1984 return -1;
1985 }
1986
1987 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
1988
1989 res = event_add_internal(ev, tv, 0);
1990
1991 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
1992
1993 return (res);
1994 }
1995
1996
1997
1998
1999
2000 static int
2001 evthread_notify_base_default(struct event_base *base)
2002 {
2003 char buf[1];
2004 int r;
2005 buf[0] = (char) 0;
2006 #ifdef WIN32
2007 r = send(base->th_notify_fd[1], buf, 1, 0);
2008 #else
2009 r = write(base->th_notify_fd[1], buf, 1);
2010 #endif
2011 return (r < 0 && errno != EAGAIN) ? -1 : 0;
2012 }
2013
#if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H)
/*
 * Notify 'base' by writing the 64-bit value 1 to th_notify_fd[0], retrying
 * for as long as the write fails with errno == EAGAIN.
 *
 * Returns 0 on success, -1 if the write fails for any other reason.
 */
static int
evthread_notify_base_eventfd(struct event_base *base)
{
	ev_uint64_t msg = 1;
	int r;

	for (;;) {
		r = write(base->th_notify_fd[0], (void*) &msg, sizeof(msg));
		if (!(r < 0 && errno == EAGAIN))
			break;
	}

	if (r < 0)
		return -1;
	return 0;
}
#endif
2029
2030
2031
2032
2033 static int
2034 evthread_notify_base(struct event_base *base)
2035 {
2036 EVENT_BASE_ASSERT_LOCKED(base);
2037 if (!base->th_notify_fn)
2038 return -1;
2039 if (base->is_notify_pending)
2040 return 0;
2041 base->is_notify_pending = 1;
2042 return base->th_notify_fn(base);
2043 }
2044
2045
2046
2047
2048
/*
 * Core of event_add(); the base lock is asserted to be held.
 *
 * Registers 'ev' with its base for the I/O or signal conditions in
 * ev_events (if not already inserted or active) and, when 'tv' is non-NULL,
 * schedules a timeout.  'tv' is an absolute time if 'tv_is_absolute' is
 * non-zero and an interval from the current time otherwise.
 *
 * Returns 0 on success and -1 on failure (the timer heap could not be grown,
 * or the evmap add call returned -1).
 */
static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
    int tv_is_absolute)
{
	struct event_base *base = ev->ev_base;
	int res = 0;
	/* Set when the loop should be notified about this change. */
	int notify = 0;

	EVENT_BASE_ASSERT_LOCKED(base);
	_event_debug_assert_is_setup(ev);

	event_debug((
		 "event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p",
		 ev,
		 EV_SOCK_ARG(ev->ev_fd),
		 ev->ev_events & EV_READ ? "EV_READ " : " ",
		 ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
		 tv ? "EV_TIMEOUT " : " ",
		 ev->ev_callback));

	EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

	/* Reserve room in the timer heap up front, so that a failure is
	 * reported before any other state has been changed. */
	if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
		if (min_heap_reserve(&base->timeheap,
			1 + min_heap_size(&base->timeheap)) == -1)
			return (-1);
	}

	/* If another thread is currently running this signal event's
	 * callback, wait on current_event_cond before continuing. */
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
	    && !EVBASE_IN_THREAD(base)) {
		++base->current_event_waiters;
		EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
	}
#endif

	/* Register for I/O or signal delivery unless already inserted/active. */
	if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
	    !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
		if (ev->ev_events & (EV_READ|EV_WRITE))
			res = evmap_io_add(base, ev->ev_fd, ev);
		else if (ev->ev_events & EV_SIGNAL)
			res = evmap_signal_add(base, (int)ev->ev_fd, ev);
		if (res != -1)
			event_queue_insert(base, ev, EVLIST_INSERTED);
		if (res == 1) {
			/* A return of 1 from evmap is turned into success
			 * plus a notification request. */
			notify = 1;
			res = 0;
		}
	}

	/* Schedule (or reschedule) the timeout, but only if the step above
	 * did not fail. */
	if (res != -1 && tv != NULL) {
		struct timeval now;
		int common_timeout;

		/* For persistent events given a relative timeout, remember
		 * the interval; event_persist_closure() reads it back to
		 * re-arm the event. */
		if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
			ev->ev_io_timeout = *tv;

		/* A timeout is already scheduled: take it out first. */
		if (ev->ev_flags & EVLIST_TIMEOUT) {
			/* Removing the top of the heap changes the earliest
			 * expiry, so the loop needs to be notified. */
			if (min_heap_elt_is_top(ev))
				notify = 1;
			event_queue_remove(base, ev, EVLIST_TIMEOUT);
		}

		/* The event is active because of an earlier timeout: pull it
		 * off the active queue so it is rescheduled instead. */
		if ((ev->ev_flags & EVLIST_ACTIVE) &&
		    (ev->ev_res & EV_TIMEOUT)) {
			if (ev->ev_events & EV_SIGNAL) {
				/* Zero the in-progress call counter through
				 * ev_pncalls when one is set. */
				if (ev->ev_ncalls && ev->ev_pncalls) {
					*ev->ev_pncalls = 0;
				}
			}

			event_queue_remove(base, ev, EVLIST_ACTIVE);
		}

		gettime(base, &now);

		common_timeout = is_common_timeout(tv, base);
		if (tv_is_absolute) {
			ev->ev_timeout = *tv;
		} else if (common_timeout) {
			/* Add only the microsecond part to 'now', then put
			 * the non-microsecond bits of tv_usec back. */
			struct timeval tmp = *tv;
			tmp.tv_usec &= MICROSECONDS_MASK;
			evutil_timeradd(&now, &tmp, &ev->ev_timeout);
			ev->ev_timeout.tv_usec |=
			    (tv->tv_usec & ~MICROSECONDS_MASK);
		} else {
			evutil_timeradd(&now, tv, &ev->ev_timeout);
		}

		event_debug((
			 "event_add: timeout in %d seconds, call %p",
			 (int)tv->tv_sec, ev->ev_callback));

		event_queue_insert(base, ev, EVLIST_TIMEOUT);
		if (common_timeout) {
			/* If this event is now first in its common-timeout
			 * list, re-arm the list's timer for it. */
			struct common_timeout_list *ctl =
			    get_common_timeout_list(base, &ev->ev_timeout);
			if (ev == TAILQ_FIRST(&ctl->events)) {
				common_timeout_schedule(ctl, &now, ev);
			}
		} else {
			/* The new timeout became the earliest in the heap:
			 * request a notification. */
			if (min_heap_elt_is_top(ev))
				notify = 1;
		}
	}

	/* Notify the base if needed (see EVBASE_NEED_NOTIFY). */
	if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
		evthread_notify_base(base);

	/* Note: recorded for debug tracking regardless of 'res'. */
	_event_debug_note_add(ev);

	return (res);
}
2198
2199 int
2200 event_del(struct event *ev)
2201 {
2202 int res;
2203
2204 if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
2205 event_warnx("%s: event has no event_base set.", __func__);
2206 return -1;
2207 }
2208
2209 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
2210
2211 res = event_del_internal(ev);
2212
2213 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
2214
2215 return (res);
2216 }
2217
2218
/*
 * Core of event_del(); the base lock is asserted to be held.
 *
 * Removes 'ev' from the timeout, active and inserted queues it is on and
 * unregisters it from the I/O or signal map.
 *
 * Returns -1 if the event has no base or the evmap delete call returned -1;
 * otherwise 0.
 */
static inline int
event_del_internal(struct event *ev)
{
	struct event_base *base;
	int res = 0, notify = 0;

	event_debug(("event_del: %p (fd "EV_SOCK_FMT"), callback %p",
		ev, EV_SOCK_ARG(ev->ev_fd), ev->ev_callback));

	/* An event without a base cannot be on any queue. */
	if (ev->ev_base == NULL)
		return (-1);

	EVENT_BASE_ASSERT_LOCKED(ev->ev_base);

	/* If this event's callback is currently running in another thread,
	 * wait on current_event_cond before touching the event. */
	base = ev->ev_base;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
		++base->current_event_waiters;
		EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
	}
#endif

	EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

	/* For signal events, zero the in-progress call counter through
	 * ev_pncalls when one is set. */
	if (ev->ev_events & EV_SIGNAL) {
		if (ev->ev_ncalls && ev->ev_pncalls) {
			*ev->ev_pncalls = 0;
		}
	}

	if (ev->ev_flags & EVLIST_TIMEOUT) {
		/* No notification is requested for a removed timeout. */
		event_queue_remove(base, ev, EVLIST_TIMEOUT);
	}

	if (ev->ev_flags & EVLIST_ACTIVE)
		event_queue_remove(base, ev, EVLIST_ACTIVE);

	if (ev->ev_flags & EVLIST_INSERTED) {
		event_queue_remove(base, ev, EVLIST_INSERTED);
		if (ev->ev_events & (EV_READ|EV_WRITE))
			res = evmap_io_del(base, ev->ev_fd, ev);
		else
			res = evmap_signal_del(base, (int)ev->ev_fd, ev);
		if (res == 1) {
			/* A return of 1 from evmap is turned into success
			 * plus a notification request. */
			notify = 1;
			res = 0;
		}
	}

	/* Notify the base if needed (see EVBASE_NEED_NOTIFY). */
	if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
		evthread_notify_base(base);

	_event_debug_note_del(ev);

	return (res);
}
2292
2293 void
2294 event_active(struct event *ev, int res, short ncalls)
2295 {
2296 if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
2297 event_warnx("%s: event has no event_base set.", __func__);
2298 return;
2299 }
2300
2301 EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
2302
2303 _event_debug_assert_is_setup(ev);
2304
2305 event_active_nolock(ev, res, ncalls);
2306
2307 EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
2308 }
2309
2310
2311 void
2312 event_active_nolock(struct event *ev, int res, short ncalls)
2313 {
2314 struct event_base *base;
2315
2316 event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p",
2317 ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback));
2318
2319
2320
2321 if (ev->ev_flags & EVLIST_ACTIVE) {
2322 ev->ev_res |= res;
2323 return;
2324 }
2325
2326 base = ev->ev_base;
2327
2328 EVENT_BASE_ASSERT_LOCKED(base);
2329
2330 ev->ev_res = res;
2331
2332 if (ev->ev_pri < base->event_running_priority)
2333 base->event_continue = 1;
2334
2335 if (ev->ev_events & EV_SIGNAL) {
2336 #ifndef _EVENT_DISABLE_THREAD_SUPPORT
2337 if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
2338 ++base->current_event_waiters;
2339 EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
2340 }
2341 #endif
2342 ev->ev_ncalls = ncalls;
2343 ev->ev_pncalls = NULL;
2344 }
2345
2346 event_queue_insert(base, ev, EVLIST_ACTIVE);
2347
2348 if (EVBASE_NEED_NOTIFY(base))
2349 evthread_notify_base(base);
2350 }
2351
2352 void
2353 event_deferred_cb_init(struct deferred_cb *cb, deferred_cb_fn fn, void *arg)
2354 {
2355 memset(cb, 0, sizeof(struct deferred_cb));
2356 cb->cb = fn;
2357 cb->arg = arg;
2358 }
2359
2360 void
2361 event_deferred_cb_cancel(struct deferred_cb_queue *queue,
2362 struct deferred_cb *cb)
2363 {
2364 if (!queue) {
2365 if (current_base)
2366 queue = ¤t_base->defer_queue;
2367 else
2368 return;
2369 }
2370
2371 LOCK_DEFERRED_QUEUE(queue);
2372 if (cb->queued) {
2373 TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
2374 --queue->active_count;
2375 cb->queued = 0;
2376 }
2377 UNLOCK_DEFERRED_QUEUE(queue);
2378 }
2379
2380 void
2381 event_deferred_cb_schedule(struct deferred_cb_queue *queue,
2382 struct deferred_cb *cb)
2383 {
2384 if (!queue) {
2385 if (current_base)
2386 queue = ¤t_base->defer_queue;
2387 else
2388 return;
2389 }
2390
2391 LOCK_DEFERRED_QUEUE(queue);
2392 if (!cb->queued) {
2393 cb->queued = 1;
2394 TAILQ_INSERT_TAIL(&queue->deferred_cb_list, cb, cb_next);
2395 ++queue->active_count;
2396 if (queue->notify_fn)
2397 queue->notify_fn(queue, queue->notify_arg);
2398 }
2399 UNLOCK_DEFERRED_QUEUE(queue);
2400 }
2401
2402 static int
2403 timeout_next(struct event_base *base, struct timeval **tv_p)
2404 {
2405
2406 struct timeval now;
2407 struct event *ev;
2408 struct timeval *tv = *tv_p;
2409 int res = 0;
2410
2411 ev = min_heap_top(&base->timeheap);
2412
2413 if (ev == NULL) {
2414
2415 *tv_p = NULL;
2416 goto out;
2417 }
2418
2419 if (gettime(base, &now) == -1) {
2420 res = -1;
2421 goto out;
2422 }
2423
2424 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {
2425 evutil_timerclear(tv);
2426 goto out;
2427 }
2428
2429 evutil_timersub(&ev->ev_timeout, &now, tv);
2430
2431 EVUTIL_ASSERT(tv->tv_sec >= 0);
2432 EVUTIL_ASSERT(tv->tv_usec >= 0);
2433 event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
2434
2435 out:
2436 return (res);
2437 }
2438
2439
2440
2441
2442
2443
2444
/*
 * Detect the clock having moved backwards since base->event_tv was recorded
 * and, if it has, shift every pending timeout earlier by the same amount.
 *
 * Does nothing when use_monotonic is set.  On return *tv holds the time read
 * by gettime() and base->event_tv has been updated to it.
 */
static void
timeout_correct(struct event_base *base, struct timeval *tv)
{
	struct event **pev;
	unsigned int size;
	struct timeval off;
	int i;

	if (use_monotonic)
		return;

	/* Compare the current time against the last recorded time. */
	gettime(base, tv);

	if (evutil_timercmp(tv, &base->event_tv, >=)) {
		/* Time did not go backwards: just record it. */
		base->event_tv = *tv;
		return;
	}

	event_debug(("%s: time is running backwards, corrected",
		    __func__));
	/* off = how far the clock jumped back. */
	evutil_timersub(&base->event_tv, tv, &off);

	/* Subtracting the same offset from every heap entry is done in
	 * place, walking the heap's backing array directly. */
	pev = base->timeheap.p;
	size = base->timeheap.n;
	for (; size-- > 0; ++pev) {
		struct timeval *ev_tv = &(**pev).ev_timeout;
		evutil_timersub(ev_tv, &off, ev_tv);
	}
	/* Same adjustment for events on the common-timeout lists. */
	for (i=0; i<base->n_common_timeouts; ++i) {
		struct event *ev;
		struct common_timeout_list *ctl =
		    base->common_timeout_queues[i];
		TAILQ_FOREACH(ev, &ctl->events,
		    ev_timeout_pos.ev_next_with_common_timeout) {
			struct timeval *ev_tv = &ev->ev_timeout;
			/* Strip the magic/index bits before the arithmetic
			 * and re-encode them for list 'i' afterwards. */
			ev_tv->tv_usec &= MICROSECONDS_MASK;
			evutil_timersub(ev_tv, &off, ev_tv);
			ev_tv->tv_usec |= COMMON_TIMEOUT_MAGIC |
			    (i<<COMMON_TIMEOUT_IDX_SHIFT);
		}
	}

	/* Record the corrected time as the new reference point. */
	base->event_tv = *tv;
}
2496
2497
2498 static void
2499 timeout_process(struct event_base *base)
2500 {
2501
2502 struct timeval now;
2503 struct event *ev;
2504
2505 if (min_heap_empty(&base->timeheap)) {
2506 return;
2507 }
2508
2509 gettime(base, &now);
2510
2511 while ((ev = min_heap_top(&base->timeheap))) {
2512 if (evutil_timercmp(&ev->ev_timeout, &now, >))
2513 break;
2514
2515
2516 event_del_internal(ev);
2517
2518 event_debug(("timeout_process: call %p",
2519 ev->ev_callback));
2520 event_active_nolock(ev, EV_TIMEOUT, 1);
2521 }
2522 }
2523
2524
2525 static void
2526 event_queue_remove(struct event_base *base, struct event *ev, int queue)
2527 {
2528 EVENT_BASE_ASSERT_LOCKED(base);
2529
2530 if (!(ev->ev_flags & queue)) {
2531 event_errx(1, "%s: %p(fd "EV_SOCK_FMT") not on queue %x", __func__,
2532 ev, EV_SOCK_ARG(ev->ev_fd), queue);
2533 return;
2534 }
2535
2536 if (~ev->ev_flags & EVLIST_INTERNAL)
2537 base->event_count--;
2538
2539 ev->ev_flags &= ~queue;
2540 switch (queue) {
2541 case EVLIST_INSERTED:
2542 TAILQ_REMOVE(&base->eventqueue, ev, ev_next);
2543 break;
2544 case EVLIST_ACTIVE:
2545 base->event_count_active--;
2546 TAILQ_REMOVE(&base->activequeues[ev->ev_pri],
2547 ev, ev_active_next);
2548 break;
2549 case EVLIST_TIMEOUT:
2550 if (is_common_timeout(&ev->ev_timeout, base)) {
2551 struct common_timeout_list *ctl =
2552 get_common_timeout_list(base, &ev->ev_timeout);
2553 TAILQ_REMOVE(&ctl->events, ev,
2554 ev_timeout_pos.ev_next_with_common_timeout);
2555 } else {
2556 min_heap_erase(&base->timeheap, ev);
2557 }
2558 break;
2559 default:
2560 event_errx(1, "%s: unknown queue %x", __func__, queue);
2561 }
2562 }
2563
2564
2565 static void
2566 insert_common_timeout_inorder(struct common_timeout_list *ctl,
2567 struct event *ev)
2568 {
2569 struct event *e;
2570
2571
2572
2573
2574
2575
2576
2577 TAILQ_FOREACH_REVERSE(e, &ctl->events,
2578 event_list, ev_timeout_pos.ev_next_with_common_timeout) {
2579
2580
2581
2582
2583 EVUTIL_ASSERT(
2584 is_same_common_timeout(&e->ev_timeout, &ev->ev_timeout));
2585 if (evutil_timercmp(&ev->ev_timeout, &e->ev_timeout, >=)) {
2586 TAILQ_INSERT_AFTER(&ctl->events, e, ev,
2587 ev_timeout_pos.ev_next_with_common_timeout);
2588 return;
2589 }
2590 }
2591 TAILQ_INSERT_HEAD(&ctl->events, ev,
2592 ev_timeout_pos.ev_next_with_common_timeout);
2593 }
2594
2595 static void
2596 event_queue_insert(struct event_base *base, struct event *ev, int queue)
2597 {
2598 EVENT_BASE_ASSERT_LOCKED(base);
2599
2600 if (ev->ev_flags & queue) {
2601
2602 if (queue & EVLIST_ACTIVE)
2603 return;
2604
2605 event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already on queue %x", __func__,
2606 ev, EV_SOCK_ARG(ev->ev_fd), queue);
2607 return;
2608 }
2609
2610 if (~ev->ev_flags & EVLIST_INTERNAL)
2611 base->event_count++;
2612
2613 ev->ev_flags |= queue;
2614 switch (queue) {
2615 case EVLIST_INSERTED:
2616 TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
2617 break;
2618 case EVLIST_ACTIVE:
2619 base->event_count_active++;
2620 TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
2621 ev,ev_active_next);
2622 break;
2623 case EVLIST_TIMEOUT: {
2624 if (is_common_timeout(&ev->ev_timeout, base)) {
2625 struct common_timeout_list *ctl =
2626 get_common_timeout_list(base, &ev->ev_timeout);
2627 insert_common_timeout_inorder(ctl, ev);
2628 } else
2629 min_heap_push(&base->timeheap, ev);
2630 break;
2631 }
2632 default:
2633 event_errx(1, "%s: unknown queue %x", __func__, queue);
2634 }
2635 }
2636
2637
2638
2639 const char *
2640 event_get_version(void)
2641 {
2642 return (_EVENT_VERSION);
2643 }
2644
2645 ev_uint32_t
2646 event_get_version_number(void)
2647 {
2648 return (_EVENT_NUMERIC_VERSION);
2649 }
2650
2651
2652
2653
2654
2655
2656 const char *
2657 event_get_method(void)
2658 {
2659 return (current_base->evsel->name);
2660 }
2661
2662 #ifndef _EVENT_DISABLE_MM_REPLACEMENT
/* Replaceable allocator hooks.  A NULL hook means "use the C library
 * function"; they are assigned by event_set_mem_functions(). */
static void *(*_mm_malloc_fn)(size_t sz) = NULL;
static void *(*_mm_realloc_fn)(void *p, size_t sz) = NULL;
static void (*_mm_free_fn)(void *p) = NULL;

/**
 * Allocate 'sz' bytes through the installed malloc hook, or through
 * malloc() when no hook is installed.
 *
 * @return the new allocation, or whatever the underlying allocator
 *    returns on failure.
 */
void *
event_mm_malloc_(size_t sz)
{
	if (_mm_malloc_fn)
		return _mm_malloc_fn(sz);
	else
		return malloc(sz);
}

/**
 * Allocate a zero-filled array of 'count' elements of 'size' bytes each.
 *
 * With a malloc hook installed the total size is computed here, so the
 * multiplication is checked for overflow first: a wrapped product would
 * hand the caller a buffer much smaller than requested.  Without a hook
 * the request is forwarded to calloc().
 *
 * @return the zeroed allocation, or NULL if the total size does not fit
 *    in a size_t or the underlying allocator fails.
 */
void *
event_mm_calloc_(size_t count, size_t size)
{
	if (_mm_malloc_fn) {
		size_t sz;
		void *p;

		/* (size_t)-1 is the largest size_t value; refuse products
		 * that would exceed it instead of letting them wrap. */
		if (size != 0 && count > ((size_t)-1) / size)
			return NULL;

		sz = count * size;
		p = _mm_malloc_fn(sz);
		if (p)
			memset(p, 0, sz);
		return p;
	} else
		return calloc(count, size);
}
2688
2689 char *
2690 event_mm_strdup_(const char *str)
2691 {
2692 if (_mm_malloc_fn) {
2693 size_t ln = strlen(str);
2694 void *p = _mm_malloc_fn(ln+1);
2695 if (p)
2696 memcpy(p, str, ln+1);
2697 return p;
2698 } else
2699 #ifdef WIN32
2700 return _strdup(str);
2701 #else
2702 return strdup(str);
2703 #endif
2704 }
2705
2706 void *
2707 event_mm_realloc_(void *ptr, size_t sz)
2708 {
2709 if (_mm_realloc_fn)
2710 return _mm_realloc_fn(ptr, sz);
2711 else
2712 return realloc(ptr, sz);
2713 }
2714
2715 void
2716 event_mm_free_(void *ptr)
2717 {
2718 if (_mm_free_fn)
2719 _mm_free_fn(ptr);
2720 else
2721 free(ptr);
2722 }
2723
2724 void
2725 event_set_mem_functions(void *(*malloc_fn)(size_t sz),
2726 void *(*realloc_fn)(void *ptr, size_t sz),
2727 void (*free_fn)(void *ptr))
2728 {
2729 _mm_malloc_fn = malloc_fn;
2730 _mm_realloc_fn = realloc_fn;
2731 _mm_free_fn = free_fn;
2732 }
2733 #endif
2734
2735 #if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H)
2736 static void
2737 evthread_notify_drain_eventfd(evutil_socket_t fd, short what, void *arg)
2738 {
2739 ev_uint64_t msg;
2740 ev_ssize_t r;
2741 struct event_base *base = arg;
2742
2743 r = read(fd, (void*) &msg, sizeof(msg));
2744 if (r<0 && errno != EAGAIN) {
2745 event_sock_warn(fd, "Error reading from eventfd");
2746 }
2747 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2748 base->is_notify_pending = 0;
2749 EVBASE_RELEASE_LOCK(base, th_base_lock);
2750 }
2751 #endif
2752
2753 static void
2754 evthread_notify_drain_default(evutil_socket_t fd, short what, void *arg)
2755 {
2756 unsigned char buf[1024];
2757 struct event_base *base = arg;
2758 #ifdef WIN32
2759 while (recv(fd, (char*)buf, sizeof(buf), 0) > 0)
2760 ;
2761 #else
2762 while (read(fd, (char*)buf, sizeof(buf)) > 0)
2763 ;
2764 #endif
2765
2766 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2767 base->is_notify_pending = 0;
2768 EVBASE_RELEASE_LOCK(base, th_base_lock);
2769 }
2770
2771 int
2772 evthread_make_base_notifiable(struct event_base *base)
2773 {
2774 void (*cb)(evutil_socket_t, short, void *) = evthread_notify_drain_default;
2775 int (*notify)(struct event_base *) = evthread_notify_base_default;
2776
2777
2778 if (!base)
2779 return -1;
2780
2781 if (base->th_notify_fd[0] >= 0)
2782 return 0;
2783
2784 #if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H)
2785 #ifndef EFD_CLOEXEC
2786 #define EFD_CLOEXEC 0
2787 #endif
2788 base->th_notify_fd[0] = eventfd(0, EFD_CLOEXEC);
2789 if (base->th_notify_fd[0] >= 0) {
2790 evutil_make_socket_closeonexec(base->th_notify_fd[0]);
2791 notify = evthread_notify_base_eventfd;
2792 cb = evthread_notify_drain_eventfd;
2793 }
2794 #endif
2795 #if defined(_EVENT_HAVE_PIPE)
2796 if (base->th_notify_fd[0] < 0) {
2797 if ((base->evsel->features & EV_FEATURE_FDS)) {
2798 if (pipe(base->th_notify_fd) < 0) {
2799 event_warn("%s: pipe", __func__);
2800 } else {
2801 evutil_make_socket_closeonexec(base->th_notify_fd[0]);
2802 evutil_make_socket_closeonexec(base->th_notify_fd[1]);
2803 }
2804 }
2805 }
2806 #endif
2807
2808 #ifdef WIN32
2809 #define LOCAL_SOCKETPAIR_AF AF_INET
2810 #else
2811 #define LOCAL_SOCKETPAIR_AF AF_UNIX
2812 #endif
2813 if (base->th_notify_fd[0] < 0) {
2814 if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0,
2815 base->th_notify_fd) == -1) {
2816 event_sock_warn(-1, "%s: socketpair", __func__);
2817 return (-1);
2818 } else {
2819 evutil_make_socket_closeonexec(base->th_notify_fd[0]);
2820 evutil_make_socket_closeonexec(base->th_notify_fd[1]);
2821 }
2822 }
2823
2824 evutil_make_socket_nonblocking(base->th_notify_fd[0]);
2825
2826 base->th_notify_fn = notify;
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836 if (base->th_notify_fd[1] > 0)
2837 evutil_make_socket_nonblocking(base->th_notify_fd[1]);
2838
2839
2840 event_assign(&base->th_notify, base, base->th_notify_fd[0],
2841 EV_READ|EV_PERSIST, cb, base);
2842
2843
2844 base->th_notify.ev_flags |= EVLIST_INTERNAL;
2845 event_priority_set(&base->th_notify, 0);
2846
2847 return event_add(&base->th_notify, NULL);
2848 }
2849
2850 void
2851 event_base_dump_events(struct event_base *base, FILE *output)
2852 {
2853 struct event *e;
2854 int i;
2855 fprintf(output, "Inserted events:\n");
2856 TAILQ_FOREACH(e, &base->eventqueue, ev_next) {
2857 fprintf(output, " %p [fd "EV_SOCK_FMT"]%s%s%s%s%s\n",
2858 (void*)e, EV_SOCK_ARG(e->ev_fd),
2859 (e->ev_events&EV_READ)?" Read":"",
2860 (e->ev_events&EV_WRITE)?" Write":"",
2861 (e->ev_events&EV_SIGNAL)?" Signal":"",
2862 (e->ev_events&EV_TIMEOUT)?" Timeout":"",
2863 (e->ev_events&EV_PERSIST)?" Persist":"");
2864
2865 }
2866 for (i = 0; i < base->nactivequeues; ++i) {
2867 if (TAILQ_EMPTY(&base->activequeues[i]))
2868 continue;
2869 fprintf(output, "Active events [priority %d]:\n", i);
2870 TAILQ_FOREACH(e, &base->eventqueue, ev_next) {
2871 fprintf(output, " %p [fd "EV_SOCK_FMT"]%s%s%s%s\n",
2872 (void*)e, EV_SOCK_ARG(e->ev_fd),
2873 (e->ev_res&EV_READ)?" Read active":"",
2874 (e->ev_res&EV_WRITE)?" Write active":"",
2875 (e->ev_res&EV_SIGNAL)?" Signal active":"",
2876 (e->ev_res&EV_TIMEOUT)?" Timeout active":"");
2877 }
2878 }
2879 }
2880
2881 void
2882 event_base_add_virtual(struct event_base *base)
2883 {
2884 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2885 base->virtual_event_count++;
2886 EVBASE_RELEASE_LOCK(base, th_base_lock);
2887 }
2888
2889 void
2890 event_base_del_virtual(struct event_base *base)
2891 {
2892 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2893 EVUTIL_ASSERT(base->virtual_event_count > 0);
2894 base->virtual_event_count--;
2895 if (base->virtual_event_count == 0 && EVBASE_NEED_NOTIFY(base))
2896 evthread_notify_base(base);
2897 EVBASE_RELEASE_LOCK(base, th_base_lock);
2898 }
2899
2900 #ifndef _EVENT_DISABLE_THREAD_SUPPORT
2901 int
2902 event_global_setup_locks_(const int enable_locks)
2903 {
2904 #ifndef _EVENT_DISABLE_DEBUG_MODE
2905 EVTHREAD_SETUP_GLOBAL_LOCK(_event_debug_map_lock, 0);
2906 #endif
2907 if (evsig_global_setup_locks_(enable_locks) < 0)
2908 return -1;
2909 if (evutil_secure_rng_global_setup_locks_(enable_locks) < 0)
2910 return -1;
2911 return 0;
2912 }
2913 #endif
2914
2915 void
2916 event_base_assert_ok(struct event_base *base)
2917 {
2918 int i;
2919 EVBASE_ACQUIRE_LOCK(base, th_base_lock);
2920 evmap_check_integrity(base);
2921
2922
2923 for (i = 1; i < (int)base->timeheap.n; ++i) {
2924 int parent = (i - 1) / 2;
2925 struct event *ev, *p_ev;
2926 ev = base->timeheap.p[i];
2927 p_ev = base->timeheap.p[parent];
2928 EVUTIL_ASSERT(ev->ev_flags & EV_TIMEOUT);
2929 EVUTIL_ASSERT(evutil_timercmp(&p_ev->ev_timeout, &ev->ev_timeout, <=));
2930 EVUTIL_ASSERT(ev->ev_timeout_pos.min_heap_idx == i);
2931 }
2932
2933
2934 for (i = 0; i < base->n_common_timeouts; ++i) {
2935 struct common_timeout_list *ctl = base->common_timeout_queues[i];
2936 struct event *last=NULL, *ev;
2937 TAILQ_FOREACH(ev, &ctl->events, ev_timeout_pos.ev_next_with_common_timeout) {
2938 if (last)
2939 EVUTIL_ASSERT(evutil_timercmp(&last->ev_timeout, &ev->ev_timeout, <=));
2940 EVUTIL_ASSERT(ev->ev_flags & EV_TIMEOUT);
2941 EVUTIL_ASSERT(is_common_timeout(&ev->ev_timeout,base));
2942 EVUTIL_ASSERT(COMMON_TIMEOUT_IDX(&ev->ev_timeout) == i);
2943 last = ev;
2944 }
2945 }
2946
2947 EVBASE_RELEASE_LOCK(base, th_base_lock);
2948 }