This source file includes following definitions.
- ev_token_bucket_init
- ev_token_bucket_update
- bufferevent_update_buckets
- ev_token_bucket_get_tick
- ev_token_bucket_cfg_new
- ev_token_bucket_cfg_free
- _bufferevent_get_rlim_max
- _bufferevent_get_read_max
- _bufferevent_get_write_max
- _bufferevent_decrement_read_buckets
- _bufferevent_decrement_write_buckets
- _bev_group_suspend_reading
- _bev_group_suspend_writing
- _bev_refill_callback
- _bev_group_random_element
- _bev_group_unsuspend_reading
- _bev_group_unsuspend_writing
- _bev_group_refill_callback
- bufferevent_set_rate_limit
- bufferevent_rate_limit_group_new
- bufferevent_rate_limit_group_set_cfg
- bufferevent_rate_limit_group_set_min_share
- bufferevent_rate_limit_group_free
- bufferevent_add_to_rate_limit_group
- bufferevent_remove_from_rate_limit_group
- bufferevent_remove_from_rate_limit_group_internal
- bufferevent_get_read_limit
- bufferevent_get_write_limit
- bufferevent_get_max_to_read
- bufferevent_get_max_to_write
- bufferevent_rate_limit_group_get_read_limit
- bufferevent_rate_limit_group_get_write_limit
- bufferevent_decrement_read_limit
- bufferevent_decrement_write_limit
- bufferevent_rate_limit_group_decrement_read
- bufferevent_rate_limit_group_decrement_write
- bufferevent_rate_limit_group_get_totals
- bufferevent_rate_limit_group_reset_totals
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include <sys/types.h>
30 #include <limits.h>
31 #include <string.h>
32 #include <stdlib.h>
33
34 #include "event2/event.h"
35 #include "event2/event_struct.h"
36 #include "event2/util.h"
37 #include "event2/bufferevent.h"
38 #include "event2/bufferevent_struct.h"
39 #include "event2/buffer.h"
40
41 #include "ratelim-internal.h"
42
43 #include "bufferevent-internal.h"
44 #include "mm-internal.h"
45 #include "util-internal.h"
46 #include "event-internal.h"
47
/** Initialize the token bucket "bucket" from configuration "cfg" as of tick
 * "current_tick".  If "reinitialize" is true, the bucket already holds a
 * possibly partially drained value from an earlier cfg, so we only clamp it
 * downward instead of refilling it.  Always returns 0. */
int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* On reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket. */
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		/* Fresh bucket: start with one tick's worth of tokens and
		 * remember when we filled it. */
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}
71
/** Add the tokens for every tick that has elapsed since bucket->last_updated
 * to "bucket", capping at cfg's maxima.  Returns 1 if the bucket changed,
 * 0 if no ticks have passed (or the tick counter appears to have moved
 * backwards). */
int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;
		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	   if adding n_ticks*rate would exceed maximum (tested by division,
	   which cannot overflow), clamp to maximum; otherwise add. */
	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	/* Same overflow-safe refill for the write direction. */
	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	/* Remember when we last refilled. */
	bucket->last_updated = current_tick;

	return 1;
}
111
112 static inline void
113 bufferevent_update_buckets(struct bufferevent_private *bev)
114 {
115
116 struct timeval now;
117 unsigned tick;
118 event_base_gettimeofday_cached(bev->bev.ev_base, &now);
119 tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
120 if (tick != bev->rate_limiting->limit.last_updated)
121 ev_token_bucket_update(&bev->rate_limiting->limit,
122 bev->rate_limiting->cfg, tick);
123 }
124
/** Return the tick number corresponding to the wall-clock time "tv", given
 * the tick length cached in cfg->msec_per_tick.
 *
 * NOTE(review): divides by cfg->msec_per_tick, which
 * ev_token_bucket_cfg_new() can compute as zero for a tick_len shorter than
 * one millisecond — confirm callers never construct such a cfg. */
ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more. */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
140
141 struct ev_token_bucket_cfg *
142 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
143 size_t write_rate, size_t write_burst,
144 const struct timeval *tick_len)
145 {
146 struct ev_token_bucket_cfg *r;
147 struct timeval g;
148 if (! tick_len) {
149 g.tv_sec = 1;
150 g.tv_usec = 0;
151 tick_len = &g;
152 }
153 if (read_rate > read_burst || write_rate > write_burst ||
154 read_rate < 1 || write_rate < 1)
155 return NULL;
156 if (read_rate > EV_RATE_LIMIT_MAX ||
157 write_rate > EV_RATE_LIMIT_MAX ||
158 read_burst > EV_RATE_LIMIT_MAX ||
159 write_burst > EV_RATE_LIMIT_MAX)
160 return NULL;
161 r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
162 if (!r)
163 return NULL;
164 r->read_rate = read_rate;
165 r->write_rate = write_rate;
166 r->read_maximum = read_burst;
167 r->write_maximum = write_burst;
168 memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
169 r->msec_per_tick = (tick_len->tv_sec * 1000) +
170 (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
171 return r;
172 }
173
/** Release a configuration allocated by ev_token_bucket_cfg_new(). */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
179
180
181
/* Default cap on bytes to read in one go when nothing else limits us. */
#define MAX_TO_READ_EVER 16384

/* Default cap on bytes to write in one go when nothing else limits us. */
#define MAX_TO_WRITE_EVER 16384

/* Acquire/release a rate-limit group's own (recursive) lock. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Forward declarations: suspend/unsuspend IO on every member of a group. */
static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
194
195
196
197
198
/** Return the maximum number of bytes that bev may read (is_write==0) or
 * write (is_write==1) right now, considering both its private bucket and its
 * group's bucket.  Mostly you don't want to call this directly;
 * _bufferevent_get_read_max()/_bufferevent_get_write_max() are the usual
 * entry points. */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* Needs lock on bev. */
	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;

/* Select the relevant direction's limit from a bucket. */
#define LIM(x) \
	(is_write ? (x).write_limit : (x).read_limit)

/* Is the group currently suspended in the relevant direction? */
#define GROUP_SUSPENDED(g) \
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x) \
	do { \
		if (max_so_far > (x)) \
			max_so_far = (x); \
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit. */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can reach here if the group suspension failed to
			 * take this bufferevent's lock at suspend time;
			 * suspend ourselves now and report no capacity. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
261
/** Return how many bytes bev may read right now; caller holds bev's lock. */
ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}
267
/** Return how many bytes bev may write right now; caller holds bev's lock. */
ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}
273
/** Deduct "bytes" from bev's read buckets (private and group), suspending or
 * unsuspending reads as the buckets cross zero.  Negative "bytes" adds
 * tokens back.  Returns 0 on success, -1 if the refill timer could not be
 * scheduled.  Caller must hold bev's lock. */
int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* Need to hold lock on bev. */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			/* Bucket empty: stop reading and wake up next tick to
			 * refill. */
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* Bucket refilled (bytes < 0): only drop the refill
			 * event if writes no longer need it either. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
311
/** Deduct "bytes" from bev's write buckets (private and group), suspending or
 * unsuspending writes as the buckets cross zero.  Negative "bytes" adds
 * tokens back.  Returns 0 on success, -1 if the refill timer could not be
 * scheduled.  Caller must hold bev's lock. */
int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* Need to hold lock on bev. */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			/* Bucket empty: stop writing and wake up next tick to
			 * refill. */
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			/* Bucket refilled (bytes < 0): only drop the refill
			 * event if reads no longer need it either. */
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
349
350
/** Stop reading on every bufferevent in group g.  Needs group lock held. */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Mark all members while holding the group lock.  We only try-lock
	 * each member (taking a bev lock while holding the group lock would
	 * invert the usual bev-then-group order and could deadlock); members
	 * we fail to lock see g->read_suspended set and suspend themselves
	 * in _bufferevent_get_rlim_max(). */
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
374
375
/** Stop writing on every bufferevent in group g.  Needs group lock held.
 * Same try-lock strategy as _bev_group_suspend_reading(): members we can't
 * lock will notice g->write_suspended on their next limit check. */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
392
393
394
/** Timer callback invoked on a single bufferevent whose private bucket(s)
 * ran dry: refill them for the current tick and unsuspend whichever
 * directions have tokens again.  Reschedules itself if a bucket is still
 * non-positive (it may have gone negative). */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		/* The limit was removed while this event was pending. */
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want. */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}
442
443
444 static struct bufferevent_private *
445 _bev_group_random_element(struct bufferevent_rate_limit_group *group)
446 {
447 int which;
448 struct bufferevent_private *bev;
449
450
451
452 if (!group->n_members)
453 return NULL;
454
455 EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
456
457 which = _evutil_weakrand() % group->n_members;
458
459 bev = TAILQ_FIRST(&group->members);
460 while (which--)
461 bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);
462
463 return bev;
464 }
465
466
467
468
469
470
471
472
/** Iterate over the members of rate-limit group 'g' starting at a random
    element, assigning each member to the variable 'bev' and executing
    'block' on it.  Requires 'first' and 'bev' (struct bufferevent_private *)
    to be declared in the enclosing scope, and the group lock to be held.
    Two passes: random element to end of list, then head of list up to (but
    not including) the random element. */
#define FOREACH_RANDOM_ORDER(block) \
	do { \
		first = _bev_group_random_element(g); \
		for (bev = first; bev != TAILQ_END(&g->members); \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
	} while (0)
485
/** Resume reading on every member of g whose lock we can grab.  If any
 * try-lock fails, set pending_unsuspend_read so the next group refill tick
 * retries.  Needs group lock held. */
static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}
504
/** Resume writing on every member of g whose lock we can grab.  If any
 * try-lock fails, set pending_unsuspend_write so the next group refill tick
 * retries.  Needs group lock held. */
static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
523
524
525
526
/** Per-tick timer callback for a rate-limit group: refill the group bucket
 * and unsuspend members if the bucket has recovered to at least min_share,
 * or if a previous unsuspend pass left stragglers pending. */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_read/write, we should do it on the
	 * next iteration of the mainloop. */

	UNLOCK_GROUP(g);
}
557
/** Set, replace, or (with cfg==NULL) remove the per-bufferevent rate-limit
 * configuration on bev.  Returns 0 on success, -1 on allocation failure.
 * cfg is neither copied nor reference-counted: the caller must keep it
 * alive as long as it is attached to any bufferevent. */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Removing the limit: clear the cfg, lift any BW suspensions,
		 * and cancel the refill timer.  The rate_limit struct itself
		 * is kept, since the bufferevent may still belong to a group. */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	/* "reinit" means we're replacing an existing cfg, so the bucket is
	 * clamped rather than refilled (see ev_token_bucket_init). */
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	/* Apply the initial bucket state: suspend any direction whose bucket
	 * is already empty, and schedule a refill if we suspended anything. */
	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
636
637 struct bufferevent_rate_limit_group *
638 bufferevent_rate_limit_group_new(struct event_base *base,
639 const struct ev_token_bucket_cfg *cfg)
640 {
641 struct bufferevent_rate_limit_group *g;
642 struct timeval now;
643 ev_uint32_t tick;
644
645 event_base_gettimeofday_cached(base, &now);
646 tick = ev_token_bucket_get_tick(&now, cfg);
647
648 g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
649 if (!g)
650 return NULL;
651 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
652 TAILQ_INIT(&g->members);
653
654 ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
655
656 event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
657 _bev_group_refill_callback, g);
658
659 event_add(&g->master_refill_event, &cfg->tick_timeout);
660
661 EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
662
663 bufferevent_rate_limit_group_set_min_share(g, 64);
664
665 return g;
666 }
667
/** Replace the configuration of group g with a copy of cfg, clamping the
 * current bucket down to the new maxima.  Returns 0 on success, -1 if
 * either argument is NULL. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	/* Clip the bucket to the new maxima, as in ev_token_bucket_init
	 * with reinitialize set. */
	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* The tick length changed; reschedule the refill timer.
		 * This can cause a hiccup in the schedule. */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}
698
699 int
700 bufferevent_rate_limit_group_set_min_share(
701 struct bufferevent_rate_limit_group *g,
702 size_t share)
703 {
704 if (share > EV_SSIZE_MAX)
705 return -1;
706
707 g->configured_min_share = share;
708
709
710
711 if (share > g->rate_limit_cfg.read_rate)
712 share = g->rate_limit_cfg.read_rate;
713 if (share > g->rate_limit_cfg.write_rate)
714 share = g->rate_limit_cfg.write_rate;
715
716 g->min_share = share;
717 return 0;
718 }
719
/** Destroy rate-limit group g.  Every member must already have been removed
 * (asserted); the refill timer is cancelled before the lock and the group
 * itself are freed. */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}
730
/** Add bev to rate-limit group g, allocating its rate_limit bookkeeping if
 * it has none yet, and removing it from any previous group first.  If g is
 * currently suspended in either direction, the new member is suspended too.
 * Lock order: bev's lock, then the group lock.  Returns 0 on success, -1 on
 * allocation failure. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		/* No per-bev cfg yet: allocate the bookkeeping struct so we
		 * have a list link and a refill event. */
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		/* Already a member; nothing to do. */
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the group's suspension state so we can apply it after
	 * dropping the group lock. */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
777
/** Remove bev from its group (if any), lifting any group-imposed
 * suspensions.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}
783
/** Remove bev from its group, if any.  If "unsuspend" is true, also clear
 * any BEV_SUSPEND_BW_GROUP suspensions on bev; the unsuspend==0 variant is
 * presumably for teardown paths where the bufferevent is going away —
 * TODO(review): confirm against callers.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
807
808
809
810
811
812
813
814
815
816
817 ev_ssize_t
818 bufferevent_get_read_limit(struct bufferevent *bev)
819 {
820 ev_ssize_t r;
821 struct bufferevent_private *bevp;
822 BEV_LOCK(bev);
823 bevp = BEV_UPCAST(bev);
824 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
825 bufferevent_update_buckets(bevp);
826 r = bevp->rate_limiting->limit.read_limit;
827 } else {
828 r = EV_SSIZE_MAX;
829 }
830 BEV_UNLOCK(bev);
831 return r;
832 }
833
834
835
836 ev_ssize_t
837 bufferevent_get_write_limit(struct bufferevent *bev)
838 {
839 ev_ssize_t r;
840 struct bufferevent_private *bevp;
841 BEV_LOCK(bev);
842 bevp = BEV_UPCAST(bev);
843 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
844 bufferevent_update_buckets(bevp);
845 r = bevp->rate_limiting->limit.write_limit;
846 } else {
847 r = EV_SSIZE_MAX;
848 }
849 BEV_UNLOCK(bev);
850 return r;
851 }
852
/** Public accessor: bytes bev may read now, considering both the private
 * and group buckets.  Takes the bufferevent lock. */
ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}
862
/** Public accessor: bytes bev may write now, considering both the private
 * and group buckets.  Takes the bufferevent lock. */
ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}
872
873
874
875
/** Return the current contents of the group's read bucket (may be
 * negative).  Takes the group lock. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}
886
887
888
/** Return the current contents of the group's write bucket (may be
 * negative).  Takes the group lock. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}
899
/** Manually drain "decr" bytes from bev's private read bucket (a negative
 * decr adds tokens), suspending or unsuspending reads when the bucket
 * crosses zero.  bev must already have a rate-limit cfg (asserted).
 * Returns 0 on success, -1 if the refill timer could not be scheduled. */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	/* We allow the bucket to go negative here. */
	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Crossed into empty: stop reading, refill next tick. */
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Crossed back to positive: resume, and drop the refill
		 * event unless writes still need it. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
926
/** Manually drain "decr" bytes from bev's private write bucket (a negative
 * decr adds tokens), suspending or unsuspending writes when the bucket
 * crosses zero.  bev must already have a rate-limit cfg (asserted).
 * Returns 0 on success, -1 if the refill timer could not be scheduled. */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	/* We allow the bucket to go negative here. */
	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Crossed into empty: stop writing, refill next tick. */
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Crossed back to positive: resume, and drop the refill
		 * event unless reads still need it. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
955
956 int
957 bufferevent_rate_limit_group_decrement_read(
958 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
959 {
960 int r = 0;
961 ev_ssize_t old_limit, new_limit;
962 LOCK_GROUP(grp);
963 old_limit = grp->rate_limit.read_limit;
964 new_limit = (grp->rate_limit.read_limit -= decr);
965
966 if (old_limit > 0 && new_limit <= 0) {
967 _bev_group_suspend_reading(grp);
968 } else if (old_limit <= 0 && new_limit > 0) {
969 _bev_group_unsuspend_reading(grp);
970 }
971
972 UNLOCK_GROUP(grp);
973 return r;
974 }
975
976 int
977 bufferevent_rate_limit_group_decrement_write(
978 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
979 {
980 int r = 0;
981 ev_ssize_t old_limit, new_limit;
982 LOCK_GROUP(grp);
983 old_limit = grp->rate_limit.write_limit;
984 new_limit = (grp->rate_limit.write_limit -= decr);
985
986 if (old_limit > 0 && new_limit <= 0) {
987 _bev_group_suspend_writing(grp);
988 } else if (old_limit <= 0 && new_limit > 0) {
989 _bev_group_unsuspend_writing(grp);
990 }
991
992 UNLOCK_GROUP(grp);
993 return r;
994 }
995
996 void
997 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
998 ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
999 {
1000 EVUTIL_ASSERT(grp != NULL);
1001 if (total_read_out)
1002 *total_read_out = grp->total_read;
1003 if (total_written_out)
1004 *total_written_out = grp->total_written;
1005 }
1006
1007 void
1008 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1009 {
1010 grp->total_read = grp->total_written = 0;
1011 }