This source file includes the following definitions.
- ev_token_bucket_init
- ev_token_bucket_update
- bufferevent_update_buckets
- ev_token_bucket_get_tick
- ev_token_bucket_cfg_new
- ev_token_bucket_cfg_free
- _bufferevent_get_rlim_max
- _bufferevent_get_read_max
- _bufferevent_get_write_max
- _bufferevent_decrement_read_buckets
- _bufferevent_decrement_write_buckets
- _bev_group_suspend_reading
- _bev_group_suspend_writing
- _bev_refill_callback
- _bev_group_random_element
- _bev_group_unsuspend_reading
- _bev_group_unsuspend_writing
- _bev_group_refill_callback
- bufferevent_set_rate_limit
- bufferevent_rate_limit_group_new
- bufferevent_rate_limit_group_set_cfg
- bufferevent_rate_limit_group_set_min_share
- bufferevent_rate_limit_group_free
- bufferevent_add_to_rate_limit_group
- bufferevent_remove_from_rate_limit_group
- bufferevent_remove_from_rate_limit_group_internal
- bufferevent_get_read_limit
- bufferevent_get_write_limit
- bufferevent_get_max_to_read
- bufferevent_get_max_to_write
- bufferevent_rate_limit_group_get_read_limit
- bufferevent_rate_limit_group_get_write_limit
- bufferevent_decrement_read_limit
- bufferevent_decrement_write_limit
- bufferevent_rate_limit_group_decrement_read
- bufferevent_rate_limit_group_decrement_write
- bufferevent_rate_limit_group_get_totals
- bufferevent_rate_limit_group_reset_totals
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 
  28 
  29 #include <sys/types.h>
  30 #include <limits.h>
  31 #include <string.h>
  32 #include <stdlib.h>
  33 
  34 #include "event2/event.h"
  35 #include "event2/event_struct.h"
  36 #include "event2/util.h"
  37 #include "event2/bufferevent.h"
  38 #include "event2/bufferevent_struct.h"
  39 #include "event2/buffer.h"
  40 
  41 #include "ratelim-internal.h"
  42 
  43 #include "bufferevent-internal.h"
  44 #include "mm-internal.h"
  45 #include "util-internal.h"
  46 #include "event-internal.h"
  47 
  48 int
  49 ev_token_bucket_init(struct ev_token_bucket *bucket,
  50     const struct ev_token_bucket_cfg *cfg,
  51     ev_uint32_t current_tick,
  52     int reinitialize)
  53 {
  54         if (reinitialize) {
  55                 
  56 
  57 
  58 
  59 
  60                 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
  61                         bucket->read_limit = cfg->read_maximum;
  62                 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
  63                         bucket->write_limit = cfg->write_maximum;
  64         } else {
  65                 bucket->read_limit = cfg->read_rate;
  66                 bucket->write_limit = cfg->write_rate;
  67                 bucket->last_updated = current_tick;
  68         }
  69         return 0;
  70 }
  71 
  72 int
  73 ev_token_bucket_update(struct ev_token_bucket *bucket,
  74     const struct ev_token_bucket_cfg *cfg,
  75     ev_uint32_t current_tick)
  76 {
  77         
  78 
  79         unsigned n_ticks = current_tick - bucket->last_updated;
  80 
  81         
  82 
  83         if (n_ticks == 0 || n_ticks > INT_MAX)
  84                 return 0;
  85 
  86         
  87 
  88 
  89 
  90 
  91 
  92 
  93 
  94 
  95         if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
  96                 bucket->read_limit = cfg->read_maximum;
  97         else
  98                 bucket->read_limit += n_ticks * cfg->read_rate;
  99 
 100 
 101         if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
 102                 bucket->write_limit = cfg->write_maximum;
 103         else
 104                 bucket->write_limit += n_ticks * cfg->write_rate;
 105 
 106 
 107         bucket->last_updated = current_tick;
 108 
 109         return 1;
 110 }
 111 
 112 static inline void
 113 bufferevent_update_buckets(struct bufferevent_private *bev)
 114 {
 115         
 116         struct timeval now;
 117         unsigned tick;
 118         event_base_gettimeofday_cached(bev->bev.ev_base, &now);
 119         tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
 120         if (tick != bev->rate_limiting->limit.last_updated)
 121                 ev_token_bucket_update(&bev->rate_limiting->limit,
 122                     bev->rate_limiting->cfg, tick);
 123 }
 124 
 125 ev_uint32_t
 126 ev_token_bucket_get_tick(const struct timeval *tv,
 127     const struct ev_token_bucket_cfg *cfg)
 128 {
 129         
 130 
 131 
 132 
 133 
 134 
 135         
 136 
 137         ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
 138         return (unsigned)(msec / cfg->msec_per_tick);
 139 }
 140 
 141 struct ev_token_bucket_cfg *
 142 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
 143     size_t write_rate, size_t write_burst,
 144     const struct timeval *tick_len)
 145 {
 146         struct ev_token_bucket_cfg *r;
 147         struct timeval g;
 148         if (! tick_len) {
 149                 g.tv_sec = 1;
 150                 g.tv_usec = 0;
 151                 tick_len = &g;
 152         }
 153         if (read_rate > read_burst || write_rate > write_burst ||
 154             read_rate < 1 || write_rate < 1)
 155                 return NULL;
 156         if (read_rate > EV_RATE_LIMIT_MAX ||
 157             write_rate > EV_RATE_LIMIT_MAX ||
 158             read_burst > EV_RATE_LIMIT_MAX ||
 159             write_burst > EV_RATE_LIMIT_MAX)
 160                 return NULL;
 161         r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
 162         if (!r)
 163                 return NULL;
 164         r->read_rate = read_rate;
 165         r->write_rate = write_rate;
 166         r->read_maximum = read_burst;
 167         r->write_maximum = write_burst;
 168         memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
 169         r->msec_per_tick = (tick_len->tv_sec * 1000) +
 170             (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
 171         return r;
 172 }
 173 
 174 void
 175 ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
 176 {
 177         mm_free(cfg);
 178 }
 179 
 180 
 181 
 182 #define MAX_TO_READ_EVER 16384
 183 
 184 
 185 #define MAX_TO_WRITE_EVER 16384
 186 
 187 #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
 188 #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
 189 
 190 static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
 191 static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
 192 static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
 193 static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
 194 
 195 
 196 
 197 
 198 
 199 static inline ev_ssize_t
 200 _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
 201 {
 202         
 203         ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
 204 
 205 #define LIM(x)                                          \
 206         (is_write ? (x).write_limit : (x).read_limit)
 207 
 208 #define GROUP_SUSPENDED(g)                      \
 209         (is_write ? (g)->write_suspended : (g)->read_suspended)
 210 
 211         
 212 #define CLAMPTO(x)                              \
 213         do {                                    \
 214                 if (max_so_far > (x))           \
 215                         max_so_far = (x);       \
 216         } while (0);
 217 
 218         if (!bev->rate_limiting)
 219                 return max_so_far;
 220 
 221         
 222 
 223 
 224 
 225 
 226         if (bev->rate_limiting->cfg) {
 227                 bufferevent_update_buckets(bev);
 228                 max_so_far = LIM(bev->rate_limiting->limit);
 229         }
 230         if (bev->rate_limiting->group) {
 231                 struct bufferevent_rate_limit_group *g =
 232                     bev->rate_limiting->group;
 233                 ev_ssize_t share;
 234                 LOCK_GROUP(g);
 235                 if (GROUP_SUSPENDED(g)) {
 236                         
 237 
 238 
 239                         if (is_write)
 240                                 bufferevent_suspend_write(&bev->bev,
 241                                     BEV_SUSPEND_BW_GROUP);
 242                         else
 243                                 bufferevent_suspend_read(&bev->bev,
 244                                     BEV_SUSPEND_BW_GROUP);
 245                         share = 0;
 246                 } else {
 247                         
 248 
 249                         share = LIM(g->rate_limit) / g->n_members;
 250                         if (share < g->min_share)
 251                                 share = g->min_share;
 252                 }
 253                 UNLOCK_GROUP(g);
 254                 CLAMPTO(share);
 255         }
 256 
 257         if (max_so_far < 0)
 258                 max_so_far = 0;
 259         return max_so_far;
 260 }
 261 
 262 ev_ssize_t
 263 _bufferevent_get_read_max(struct bufferevent_private *bev)
 264 {
 265         return _bufferevent_get_rlim_max(bev, 0);
 266 }
 267 
 268 ev_ssize_t
 269 _bufferevent_get_write_max(struct bufferevent_private *bev)
 270 {
 271         return _bufferevent_get_rlim_max(bev, 1);
 272 }
 273 
 274 int
 275 _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
 276 {
 277         
 278         int r = 0;
 279         
 280         if (!bev->rate_limiting)
 281                 return 0;
 282 
 283         if (bev->rate_limiting->cfg) {
 284                 bev->rate_limiting->limit.read_limit -= bytes;
 285                 if (bev->rate_limiting->limit.read_limit <= 0) {
 286                         bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
 287                         if (event_add(&bev->rate_limiting->refill_bucket_event,
 288                                 &bev->rate_limiting->cfg->tick_timeout) < 0)
 289                                 r = -1;
 290                 } else if (bev->read_suspended & BEV_SUSPEND_BW) {
 291                         if (!(bev->write_suspended & BEV_SUSPEND_BW))
 292                                 event_del(&bev->rate_limiting->refill_bucket_event);
 293                         bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
 294                 }
 295         }
 296 
 297         if (bev->rate_limiting->group) {
 298                 LOCK_GROUP(bev->rate_limiting->group);
 299                 bev->rate_limiting->group->rate_limit.read_limit -= bytes;
 300                 bev->rate_limiting->group->total_read += bytes;
 301                 if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
 302                         _bev_group_suspend_reading(bev->rate_limiting->group);
 303                 } else if (bev->rate_limiting->group->read_suspended) {
 304                         _bev_group_unsuspend_reading(bev->rate_limiting->group);
 305                 }
 306                 UNLOCK_GROUP(bev->rate_limiting->group);
 307         }
 308 
 309         return r;
 310 }
 311 
 312 int
 313 _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
 314 {
 315         
 316         int r = 0;
 317         
 318         if (!bev->rate_limiting)
 319                 return 0;
 320 
 321         if (bev->rate_limiting->cfg) {
 322                 bev->rate_limiting->limit.write_limit -= bytes;
 323                 if (bev->rate_limiting->limit.write_limit <= 0) {
 324                         bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
 325                         if (event_add(&bev->rate_limiting->refill_bucket_event,
 326                                 &bev->rate_limiting->cfg->tick_timeout) < 0)
 327                                 r = -1;
 328                 } else if (bev->write_suspended & BEV_SUSPEND_BW) {
 329                         if (!(bev->read_suspended & BEV_SUSPEND_BW))
 330                                 event_del(&bev->rate_limiting->refill_bucket_event);
 331                         bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
 332                 }
 333         }
 334 
 335         if (bev->rate_limiting->group) {
 336                 LOCK_GROUP(bev->rate_limiting->group);
 337                 bev->rate_limiting->group->rate_limit.write_limit -= bytes;
 338                 bev->rate_limiting->group->total_written += bytes;
 339                 if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
 340                         _bev_group_suspend_writing(bev->rate_limiting->group);
 341                 } else if (bev->rate_limiting->group->write_suspended) {
 342                         _bev_group_unsuspend_writing(bev->rate_limiting->group);
 343                 }
 344                 UNLOCK_GROUP(bev->rate_limiting->group);
 345         }
 346 
 347         return r;
 348 }
 349 
 350 
 351 static int
 352 _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
 353 {
 354         
 355         struct bufferevent_private *bev;
 356         g->read_suspended = 1;
 357         g->pending_unsuspend_read = 0;
 358 
 359         
 360 
 361 
 362 
 363 
 364 
 365         TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
 366                 if (EVLOCK_TRY_LOCK(bev->lock)) {
 367                         bufferevent_suspend_read(&bev->bev,
 368                             BEV_SUSPEND_BW_GROUP);
 369                         EVLOCK_UNLOCK(bev->lock, 0);
 370                 }
 371         }
 372         return 0;
 373 }
 374 
 375 
 376 static int
 377 _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
 378 {
 379         
 380         struct bufferevent_private *bev;
 381         g->write_suspended = 1;
 382         g->pending_unsuspend_write = 0;
 383         TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
 384                 if (EVLOCK_TRY_LOCK(bev->lock)) {
 385                         bufferevent_suspend_write(&bev->bev,
 386                             BEV_SUSPEND_BW_GROUP);
 387                         EVLOCK_UNLOCK(bev->lock, 0);
 388                 }
 389         }
 390         return 0;
 391 }
 392 
 393 
 394 
 395 static void
 396 _bev_refill_callback(evutil_socket_t fd, short what, void *arg)
 397 {
 398         unsigned tick;
 399         struct timeval now;
 400         struct bufferevent_private *bev = arg;
 401         int again = 0;
 402         BEV_LOCK(&bev->bev);
 403         if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
 404                 BEV_UNLOCK(&bev->bev);
 405                 return;
 406         }
 407 
 408         
 409         event_base_gettimeofday_cached(bev->bev.ev_base, &now);
 410         tick = ev_token_bucket_get_tick(&now,
 411             bev->rate_limiting->cfg);
 412         ev_token_bucket_update(&bev->rate_limiting->limit,
 413             bev->rate_limiting->cfg,
 414             tick);
 415 
 416         
 417         if ((bev->read_suspended & BEV_SUSPEND_BW)) {
 418                 if (bev->rate_limiting->limit.read_limit > 0)
 419                         bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
 420                 else
 421                         again = 1;
 422         }
 423         if ((bev->write_suspended & BEV_SUSPEND_BW)) {
 424                 if (bev->rate_limiting->limit.write_limit > 0)
 425                         bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
 426                 else
 427                         again = 1;
 428         }
 429         if (again) {
 430                 
 431 
 432 
 433 
 434 
 435 
 436                 
 437                 event_add(&bev->rate_limiting->refill_bucket_event,
 438                     &bev->rate_limiting->cfg->tick_timeout);
 439         }
 440         BEV_UNLOCK(&bev->bev);
 441 }
 442 
 443 
 444 static struct bufferevent_private *
 445 _bev_group_random_element(struct bufferevent_rate_limit_group *group)
 446 {
 447         int which;
 448         struct bufferevent_private *bev;
 449 
 450         
 451 
 452         if (!group->n_members)
 453                 return NULL;
 454 
 455         EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
 456 
 457         which = _evutil_weakrand() % group->n_members;
 458 
 459         bev = TAILQ_FIRST(&group->members);
 460         while (which--)
 461                 bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);
 462 
 463         return bev;
 464 }
 465 
 466 
 467 
 468 
 469 
 470 
 471 
 472 
 473 #define FOREACH_RANDOM_ORDER(block)                     \
 474         do {                                            \
 475                 first = _bev_group_random_element(g);   \
 476                 for (bev = first; bev != TAILQ_END(&g->members); \
 477                     bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
 478                         block ;                                  \
 479                 }                                                \
 480                 for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
 481                     bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
 482                         block ;                                         \
 483                 }                                                       \
 484         } while (0)
 485 
 486 static void
 487 _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
 488 {
 489         int again = 0;
 490         struct bufferevent_private *bev, *first;
 491 
 492         g->read_suspended = 0;
 493         FOREACH_RANDOM_ORDER({
 494                 if (EVLOCK_TRY_LOCK(bev->lock)) {
 495                         bufferevent_unsuspend_read(&bev->bev,
 496                             BEV_SUSPEND_BW_GROUP);
 497                         EVLOCK_UNLOCK(bev->lock, 0);
 498                 } else {
 499                         again = 1;
 500                 }
 501         });
 502         g->pending_unsuspend_read = again;
 503 }
 504 
 505 static void
 506 _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
 507 {
 508         int again = 0;
 509         struct bufferevent_private *bev, *first;
 510         g->write_suspended = 0;
 511 
 512         FOREACH_RANDOM_ORDER({
 513                 if (EVLOCK_TRY_LOCK(bev->lock)) {
 514                         bufferevent_unsuspend_write(&bev->bev,
 515                             BEV_SUSPEND_BW_GROUP);
 516                         EVLOCK_UNLOCK(bev->lock, 0);
 517                 } else {
 518                         again = 1;
 519                 }
 520         });
 521         g->pending_unsuspend_write = again;
 522 }
 523 
 524 
 525 
 526 
 527 static void
 528 _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
 529 {
 530         struct bufferevent_rate_limit_group *g = arg;
 531         unsigned tick;
 532         struct timeval now;
 533 
 534         event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
 535 
 536         LOCK_GROUP(g);
 537 
 538         tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
 539         ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
 540 
 541         if (g->pending_unsuspend_read ||
 542             (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
 543                 _bev_group_unsuspend_reading(g);
 544         }
 545         if (g->pending_unsuspend_write ||
 546             (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
 547                 _bev_group_unsuspend_writing(g);
 548         }
 549 
 550         
 551 
 552 
 553 
 554 
 555         UNLOCK_GROUP(g);
 556 }
 557 
 558 int
 559 bufferevent_set_rate_limit(struct bufferevent *bev,
 560     struct ev_token_bucket_cfg *cfg)
 561 {
 562         struct bufferevent_private *bevp =
 563             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
 564         int r = -1;
 565         struct bufferevent_rate_limit *rlim;
 566         struct timeval now;
 567         ev_uint32_t tick;
 568         int reinit = 0, suspended = 0;
 569         
 570 
 571         BEV_LOCK(bev);
 572 
 573         if (cfg == NULL) {
 574                 if (bevp->rate_limiting) {
 575                         rlim = bevp->rate_limiting;
 576                         rlim->cfg = NULL;
 577                         bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
 578                         bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
 579                         if (event_initialized(&rlim->refill_bucket_event))
 580                                 event_del(&rlim->refill_bucket_event);
 581                 }
 582                 r = 0;
 583                 goto done;
 584         }
 585 
 586         event_base_gettimeofday_cached(bev->ev_base, &now);
 587         tick = ev_token_bucket_get_tick(&now, cfg);
 588 
 589         if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
 590                 
 591                 r = 0;
 592                 goto done;
 593         }
 594         if (bevp->rate_limiting == NULL) {
 595                 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
 596                 if (!rlim)
 597                         goto done;
 598                 bevp->rate_limiting = rlim;
 599         } else {
 600                 rlim = bevp->rate_limiting;
 601         }
 602         reinit = rlim->cfg != NULL;
 603 
 604         rlim->cfg = cfg;
 605         ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);
 606 
 607         if (reinit) {
 608                 EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
 609                 event_del(&rlim->refill_bucket_event);
 610         }
 611         evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
 612             _bev_refill_callback, bevp);
 613 
 614         if (rlim->limit.read_limit > 0) {
 615                 bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
 616         } else {
 617                 bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
 618                 suspended=1;
 619         }
 620         if (rlim->limit.write_limit > 0) {
 621                 bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
 622         } else {
 623                 bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
 624                 suspended = 1;
 625         }
 626 
 627         if (suspended)
 628                 event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
 629 
 630         r = 0;
 631 
 632 done:
 633         BEV_UNLOCK(bev);
 634         return r;
 635 }
 636 
 637 struct bufferevent_rate_limit_group *
 638 bufferevent_rate_limit_group_new(struct event_base *base,
 639     const struct ev_token_bucket_cfg *cfg)
 640 {
 641         struct bufferevent_rate_limit_group *g;
 642         struct timeval now;
 643         ev_uint32_t tick;
 644 
 645         event_base_gettimeofday_cached(base, &now);
 646         tick = ev_token_bucket_get_tick(&now, cfg);
 647 
 648         g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
 649         if (!g)
 650                 return NULL;
 651         memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
 652         TAILQ_INIT(&g->members);
 653 
 654         ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
 655 
 656         event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
 657             _bev_group_refill_callback, g);
 658         
 659         event_add(&g->master_refill_event, &cfg->tick_timeout);
 660 
 661         EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
 662 
 663         bufferevent_rate_limit_group_set_min_share(g, 64);
 664 
 665         return g;
 666 }
 667 
 668 int
 669 bufferevent_rate_limit_group_set_cfg(
 670         struct bufferevent_rate_limit_group *g,
 671         const struct ev_token_bucket_cfg *cfg)
 672 {
 673         int same_tick;
 674         if (!g || !cfg)
 675                 return -1;
 676 
 677         LOCK_GROUP(g);
 678         same_tick = evutil_timercmp(
 679                 &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
 680         memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
 681 
 682         if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
 683                 g->rate_limit.read_limit = cfg->read_maximum;
 684         if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
 685                 g->rate_limit.write_limit = cfg->write_maximum;
 686 
 687         if (!same_tick) {
 688                 
 689                 event_add(&g->master_refill_event, &cfg->tick_timeout);
 690         }
 691 
 692         
 693         bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
 694 
 695         UNLOCK_GROUP(g);
 696         return 0;
 697 }
 698 
 699 int
 700 bufferevent_rate_limit_group_set_min_share(
 701         struct bufferevent_rate_limit_group *g,
 702         size_t share)
 703 {
 704         if (share > EV_SSIZE_MAX)
 705                 return -1;
 706 
 707         g->configured_min_share = share;
 708 
 709         
 710 
 711         if (share > g->rate_limit_cfg.read_rate)
 712                 share = g->rate_limit_cfg.read_rate;
 713         if (share > g->rate_limit_cfg.write_rate)
 714                 share = g->rate_limit_cfg.write_rate;
 715 
 716         g->min_share = share;
 717         return 0;
 718 }
 719 
 720 void
 721 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
 722 {
 723         LOCK_GROUP(g);
 724         EVUTIL_ASSERT(0 == g->n_members);
 725         event_del(&g->master_refill_event);
 726         UNLOCK_GROUP(g);
 727         EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
 728         mm_free(g);
 729 }
 730 
 731 int
 732 bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
 733     struct bufferevent_rate_limit_group *g)
 734 {
 735         int wsuspend, rsuspend;
 736         struct bufferevent_private *bevp =
 737             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
 738         BEV_LOCK(bev);
 739 
 740         if (!bevp->rate_limiting) {
 741                 struct bufferevent_rate_limit *rlim;
 742                 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
 743                 if (!rlim) {
 744                         BEV_UNLOCK(bev);
 745                         return -1;
 746                 }
 747                 evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
 748                     _bev_refill_callback, bevp);
 749                 bevp->rate_limiting = rlim;
 750         }
 751 
 752         if (bevp->rate_limiting->group == g) {
 753                 BEV_UNLOCK(bev);
 754                 return 0;
 755         }
 756         if (bevp->rate_limiting->group)
 757                 bufferevent_remove_from_rate_limit_group(bev);
 758 
 759         LOCK_GROUP(g);
 760         bevp->rate_limiting->group = g;
 761         ++g->n_members;
 762         TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);
 763 
 764         rsuspend = g->read_suspended;
 765         wsuspend = g->write_suspended;
 766 
 767         UNLOCK_GROUP(g);
 768 
 769         if (rsuspend)
 770                 bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
 771         if (wsuspend)
 772                 bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);
 773 
 774         BEV_UNLOCK(bev);
 775         return 0;
 776 }
 777 
 778 int
 779 bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
 780 {
 781         return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
 782 }
 783 
 784 int
 785 bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
 786     int unsuspend)
 787 {
 788         struct bufferevent_private *bevp =
 789             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
 790         BEV_LOCK(bev);
 791         if (bevp->rate_limiting && bevp->rate_limiting->group) {
 792                 struct bufferevent_rate_limit_group *g =
 793                     bevp->rate_limiting->group;
 794                 LOCK_GROUP(g);
 795                 bevp->rate_limiting->group = NULL;
 796                 --g->n_members;
 797                 TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
 798                 UNLOCK_GROUP(g);
 799         }
 800         if (unsuspend) {
 801                 bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
 802                 bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
 803         }
 804         BEV_UNLOCK(bev);
 805         return 0;
 806 }
 807 
 808 
 809 
 810 
 811 
 812 
 813 
 814 
 815 
 816 
 817 ev_ssize_t
 818 bufferevent_get_read_limit(struct bufferevent *bev)
 819 {
 820         ev_ssize_t r;
 821         struct bufferevent_private *bevp;
 822         BEV_LOCK(bev);
 823         bevp = BEV_UPCAST(bev);
 824         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
 825                 bufferevent_update_buckets(bevp);
 826                 r = bevp->rate_limiting->limit.read_limit;
 827         } else {
 828                 r = EV_SSIZE_MAX;
 829         }
 830         BEV_UNLOCK(bev);
 831         return r;
 832 }
 833 
 834 
 835 
 836 ev_ssize_t
 837 bufferevent_get_write_limit(struct bufferevent *bev)
 838 {
 839         ev_ssize_t r;
 840         struct bufferevent_private *bevp;
 841         BEV_LOCK(bev);
 842         bevp = BEV_UPCAST(bev);
 843         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
 844                 bufferevent_update_buckets(bevp);
 845                 r = bevp->rate_limiting->limit.write_limit;
 846         } else {
 847                 r = EV_SSIZE_MAX;
 848         }
 849         BEV_UNLOCK(bev);
 850         return r;
 851 }
 852 
 853 ev_ssize_t
 854 bufferevent_get_max_to_read(struct bufferevent *bev)
 855 {
 856         ev_ssize_t r;
 857         BEV_LOCK(bev);
 858         r = _bufferevent_get_read_max(BEV_UPCAST(bev));
 859         BEV_UNLOCK(bev);
 860         return r;
 861 }
 862 
 863 ev_ssize_t
 864 bufferevent_get_max_to_write(struct bufferevent *bev)
 865 {
 866         ev_ssize_t r;
 867         BEV_LOCK(bev);
 868         r = _bufferevent_get_write_max(BEV_UPCAST(bev));
 869         BEV_UNLOCK(bev);
 870         return r;
 871 }
 872 
 873 
 874 
 875 
 876 ev_ssize_t
 877 bufferevent_rate_limit_group_get_read_limit(
 878         struct bufferevent_rate_limit_group *grp)
 879 {
 880         ev_ssize_t r;
 881         LOCK_GROUP(grp);
 882         r = grp->rate_limit.read_limit;
 883         UNLOCK_GROUP(grp);
 884         return r;
 885 }
 886 
 887 
 888 
 889 ev_ssize_t
 890 bufferevent_rate_limit_group_get_write_limit(
 891         struct bufferevent_rate_limit_group *grp)
 892 {
 893         ev_ssize_t r;
 894         LOCK_GROUP(grp);
 895         r = grp->rate_limit.write_limit;
 896         UNLOCK_GROUP(grp);
 897         return r;
 898 }
 899 
 900 int
 901 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
 902 {
 903         int r = 0;
 904         ev_ssize_t old_limit, new_limit;
 905         struct bufferevent_private *bevp;
 906         BEV_LOCK(bev);
 907         bevp = BEV_UPCAST(bev);
 908         EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
 909         old_limit = bevp->rate_limiting->limit.read_limit;
 910 
 911         new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
 912         if (old_limit > 0 && new_limit <= 0) {
 913                 bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
 914                 if (event_add(&bevp->rate_limiting->refill_bucket_event,
 915                         &bevp->rate_limiting->cfg->tick_timeout) < 0)
 916                         r = -1;
 917         } else if (old_limit <= 0 && new_limit > 0) {
 918                 if (!(bevp->write_suspended & BEV_SUSPEND_BW))
 919                         event_del(&bevp->rate_limiting->refill_bucket_event);
 920                 bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
 921         }
 922 
 923         BEV_UNLOCK(bev);
 924         return r;
 925 }
 926 
 927 int
 928 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
 929 {
 930         
 931 
 932         int r = 0;
 933         ev_ssize_t old_limit, new_limit;
 934         struct bufferevent_private *bevp;
 935         BEV_LOCK(bev);
 936         bevp = BEV_UPCAST(bev);
 937         EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
 938         old_limit = bevp->rate_limiting->limit.write_limit;
 939 
 940         new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
 941         if (old_limit > 0 && new_limit <= 0) {
 942                 bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
 943                 if (event_add(&bevp->rate_limiting->refill_bucket_event,
 944                         &bevp->rate_limiting->cfg->tick_timeout) < 0)
 945                         r = -1;
 946         } else if (old_limit <= 0 && new_limit > 0) {
 947                 if (!(bevp->read_suspended & BEV_SUSPEND_BW))
 948                         event_del(&bevp->rate_limiting->refill_bucket_event);
 949                 bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
 950         }
 951 
 952         BEV_UNLOCK(bev);
 953         return r;
 954 }
 955 
 956 int
 957 bufferevent_rate_limit_group_decrement_read(
 958         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
 959 {
 960         int r = 0;
 961         ev_ssize_t old_limit, new_limit;
 962         LOCK_GROUP(grp);
 963         old_limit = grp->rate_limit.read_limit;
 964         new_limit = (grp->rate_limit.read_limit -= decr);
 965 
 966         if (old_limit > 0 && new_limit <= 0) {
 967                 _bev_group_suspend_reading(grp);
 968         } else if (old_limit <= 0 && new_limit > 0) {
 969                 _bev_group_unsuspend_reading(grp);
 970         }
 971 
 972         UNLOCK_GROUP(grp);
 973         return r;
 974 }
 975 
 976 int
 977 bufferevent_rate_limit_group_decrement_write(
 978         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
 979 {
 980         int r = 0;
 981         ev_ssize_t old_limit, new_limit;
 982         LOCK_GROUP(grp);
 983         old_limit = grp->rate_limit.write_limit;
 984         new_limit = (grp->rate_limit.write_limit -= decr);
 985 
 986         if (old_limit > 0 && new_limit <= 0) {
 987                 _bev_group_suspend_writing(grp);
 988         } else if (old_limit <= 0 && new_limit > 0) {
 989                 _bev_group_unsuspend_writing(grp);
 990         }
 991 
 992         UNLOCK_GROUP(grp);
 993         return r;
 994 }
 995 
 996 void
 997 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
 998     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
 999 {
1000         EVUTIL_ASSERT(grp != NULL);
1001         if (total_read_out)
1002                 *total_read_out = grp->total_read;
1003         if (total_written_out)
1004                 *total_written_out = grp->total_written;
1005 }
1006 
1007 void
1008 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1009 {
1010         grp->total_read = grp->total_written = 0;
1011 }