root/opal/mca/event/libevent2022/libevent/bufferevent_ratelim.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. ev_token_bucket_init
  2. ev_token_bucket_update
  3. bufferevent_update_buckets
  4. ev_token_bucket_get_tick
  5. ev_token_bucket_cfg_new
  6. ev_token_bucket_cfg_free
  7. _bufferevent_get_rlim_max
  8. _bufferevent_get_read_max
  9. _bufferevent_get_write_max
  10. _bufferevent_decrement_read_buckets
  11. _bufferevent_decrement_write_buckets
  12. _bev_group_suspend_reading
  13. _bev_group_suspend_writing
  14. _bev_refill_callback
  15. _bev_group_random_element
  16. _bev_group_unsuspend_reading
  17. _bev_group_unsuspend_writing
  18. _bev_group_refill_callback
  19. bufferevent_set_rate_limit
  20. bufferevent_rate_limit_group_new
  21. bufferevent_rate_limit_group_set_cfg
  22. bufferevent_rate_limit_group_set_min_share
  23. bufferevent_rate_limit_group_free
  24. bufferevent_add_to_rate_limit_group
  25. bufferevent_remove_from_rate_limit_group
  26. bufferevent_remove_from_rate_limit_group_internal
  27. bufferevent_get_read_limit
  28. bufferevent_get_write_limit
  29. bufferevent_get_max_to_read
  30. bufferevent_get_max_to_write
  31. bufferevent_rate_limit_group_get_read_limit
  32. bufferevent_rate_limit_group_get_write_limit
  33. bufferevent_decrement_read_limit
  34. bufferevent_decrement_write_limit
  35. bufferevent_rate_limit_group_decrement_read
  36. bufferevent_rate_limit_group_decrement_write
  37. bufferevent_rate_limit_group_get_totals
  38. bufferevent_rate_limit_group_reset_totals

   1 /*
   2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
   3  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
   4  * All rights reserved.
   5  *
   6  * Redistribution and use in source and binary forms, with or without
   7  * modification, are permitted provided that the following conditions
   8  * are met:
   9  * 1. Redistributions of source code must retain the above copyright
  10  *    notice, this list of conditions and the following disclaimer.
  11  * 2. Redistributions in binary form must reproduce the above copyright
  12  *    notice, this list of conditions and the following disclaimer in the
  13  *    documentation and/or other materials provided with the distribution.
  14  * 3. The name of the author may not be used to endorse or promote products
  15  *    derived from this software without specific prior written permission.
  16  *
  17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  27  */
  28 
  29 #include <sys/types.h>
  30 #include <limits.h>
  31 #include <string.h>
  32 #include <stdlib.h>
  33 
  34 #include "event2/event.h"
  35 #include "event2/event_struct.h"
  36 #include "event2/util.h"
  37 #include "event2/bufferevent.h"
  38 #include "event2/bufferevent_struct.h"
  39 #include "event2/buffer.h"
  40 
  41 #include "ratelim-internal.h"
  42 
  43 #include "bufferevent-internal.h"
  44 #include "mm-internal.h"
  45 #include "util-internal.h"
  46 #include "event-internal.h"
  47 
  48 int
  49 ev_token_bucket_init(struct ev_token_bucket *bucket,
  50     const struct ev_token_bucket_cfg *cfg,
  51     ev_uint32_t current_tick,
  52     int reinitialize)
  53 {
  54         if (reinitialize) {
  55                 /* on reinitialization, we only clip downwards, since we've
  56                    already used who-knows-how-much bandwidth this tick.  We
  57                    leave "last_updated" as it is; the next update will add the
  58                    appropriate amount of bandwidth to the bucket.
  59                 */
  60                 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
  61                         bucket->read_limit = cfg->read_maximum;
  62                 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
  63                         bucket->write_limit = cfg->write_maximum;
  64         } else {
  65                 bucket->read_limit = cfg->read_rate;
  66                 bucket->write_limit = cfg->write_rate;
  67                 bucket->last_updated = current_tick;
  68         }
  69         return 0;
  70 }
  71 
  72 int
  73 ev_token_bucket_update(struct ev_token_bucket *bucket,
  74     const struct ev_token_bucket_cfg *cfg,
  75     ev_uint32_t current_tick)
  76 {
  77         /* It's okay if the tick number overflows, since we'll just
  78          * wrap around when we do the unsigned substraction. */
  79         unsigned n_ticks = current_tick - bucket->last_updated;
  80 
  81         /* Make sure some ticks actually happened, and that time didn't
  82          * roll back. */
  83         if (n_ticks == 0 || n_ticks > INT_MAX)
  84                 return 0;
  85 
  86         /* Naively, we would say
  87                 bucket->limit += n_ticks * cfg->rate;
  88 
  89                 if (bucket->limit > cfg->maximum)
  90                         bucket->limit = cfg->maximum;
  91 
  92            But we're worried about overflow, so we do it like this:
  93         */
  94 
  95         if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
  96                 bucket->read_limit = cfg->read_maximum;
  97         else
  98                 bucket->read_limit += n_ticks * cfg->read_rate;
  99 
 100 
 101         if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
 102                 bucket->write_limit = cfg->write_maximum;
 103         else
 104                 bucket->write_limit += n_ticks * cfg->write_rate;
 105 
 106 
 107         bucket->last_updated = current_tick;
 108 
 109         return 1;
 110 }
 111 
 112 static inline void
 113 bufferevent_update_buckets(struct bufferevent_private *bev)
 114 {
 115         /* Must hold lock on bev. */
 116         struct timeval now;
 117         unsigned tick;
 118         event_base_gettimeofday_cached(bev->bev.ev_base, &now);
 119         tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
 120         if (tick != bev->rate_limiting->limit.last_updated)
 121                 ev_token_bucket_update(&bev->rate_limiting->limit,
 122                     bev->rate_limiting->cfg, tick);
 123 }
 124 
 125 ev_uint32_t
 126 ev_token_bucket_get_tick(const struct timeval *tv,
 127     const struct ev_token_bucket_cfg *cfg)
 128 {
 129         /* This computation uses two multiplies and a divide.  We could do
 130          * fewer if we knew that the tick length was an integer number of
 131          * seconds, or if we knew it divided evenly into a second.  We should
 132          * investigate that more.
 133          */
 134 
 135         /* We cast to an ev_uint64_t first, since we don't want to overflow
 136          * before we do the final divide. */
 137         ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
 138         return (unsigned)(msec / cfg->msec_per_tick);
 139 }
 140 
 141 struct ev_token_bucket_cfg *
 142 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
 143     size_t write_rate, size_t write_burst,
 144     const struct timeval *tick_len)
 145 {
 146         struct ev_token_bucket_cfg *r;
 147         struct timeval g;
 148         if (! tick_len) {
 149                 g.tv_sec = 1;
 150                 g.tv_usec = 0;
 151                 tick_len = &g;
 152         }
 153         if (read_rate > read_burst || write_rate > write_burst ||
 154             read_rate < 1 || write_rate < 1)
 155                 return NULL;
 156         if (read_rate > EV_RATE_LIMIT_MAX ||
 157             write_rate > EV_RATE_LIMIT_MAX ||
 158             read_burst > EV_RATE_LIMIT_MAX ||
 159             write_burst > EV_RATE_LIMIT_MAX)
 160                 return NULL;
 161         r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
 162         if (!r)
 163                 return NULL;
 164         r->read_rate = read_rate;
 165         r->write_rate = write_rate;
 166         r->read_maximum = read_burst;
 167         r->write_maximum = write_burst;
 168         memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
 169         r->msec_per_tick = (tick_len->tv_sec * 1000) +
 170             (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
 171         return r;
 172 }
 173 
 174 void
 175 ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
 176 {
 177         mm_free(cfg);
 178 }
 179 
 180 /* No matter how big our bucket gets, don't try to read more than this
 181  * much in a single read operation. */
 182 #define MAX_TO_READ_EVER 16384
 183 /* No matter how big our bucket gets, don't try to write more than this
 184  * much in a single write operation. */
 185 #define MAX_TO_WRITE_EVER 16384
 186 
 187 #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
 188 #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
 189 
 190 static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
 191 static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
 192 static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
 193 static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
 194 
 195 /** Helper: figure out the maximum amount we should write if is_write, or
 196     the maximum amount we should read if is_read.  Return that maximum, or
 197     0 if our bucket is wholly exhausted.
 198  */
 199 static inline ev_ssize_t
 200 _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
 201 {
 202         /* needs lock on bev. */
 203         ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
 204 
 205 #define LIM(x)                                          \
 206         (is_write ? (x).write_limit : (x).read_limit)
 207 
 208 #define GROUP_SUSPENDED(g)                      \
 209         (is_write ? (g)->write_suspended : (g)->read_suspended)
 210 
 211         /* Sets max_so_far to MIN(x, max_so_far) */
 212 #define CLAMPTO(x)                              \
 213         do {                                    \
 214                 if (max_so_far > (x))           \
 215                         max_so_far = (x);       \
 216         } while (0);
 217 
 218         if (!bev->rate_limiting)
 219                 return max_so_far;
 220 
 221         /* If rate-limiting is enabled at all, update the appropriate
 222            bucket, and take the smaller of our rate limit and the group
 223            rate limit.
 224          */
 225 
 226         if (bev->rate_limiting->cfg) {
 227                 bufferevent_update_buckets(bev);
 228                 max_so_far = LIM(bev->rate_limiting->limit);
 229         }
 230         if (bev->rate_limiting->group) {
 231                 struct bufferevent_rate_limit_group *g =
 232                     bev->rate_limiting->group;
 233                 ev_ssize_t share;
 234                 LOCK_GROUP(g);
 235                 if (GROUP_SUSPENDED(g)) {
 236                         /* We can get here if we failed to lock this
 237                          * particular bufferevent while suspending the whole
 238                          * group. */
 239                         if (is_write)
 240                                 bufferevent_suspend_write(&bev->bev,
 241                                     BEV_SUSPEND_BW_GROUP);
 242                         else
 243                                 bufferevent_suspend_read(&bev->bev,
 244                                     BEV_SUSPEND_BW_GROUP);
 245                         share = 0;
 246                 } else {
 247                         /* XXXX probably we should divide among the active
 248                          * members, not the total members. */
 249                         share = LIM(g->rate_limit) / g->n_members;
 250                         if (share < g->min_share)
 251                                 share = g->min_share;
 252                 }
 253                 UNLOCK_GROUP(g);
 254                 CLAMPTO(share);
 255         }
 256 
 257         if (max_so_far < 0)
 258                 max_so_far = 0;
 259         return max_so_far;
 260 }
 261 
 262 ev_ssize_t
 263 _bufferevent_get_read_max(struct bufferevent_private *bev)
 264 {
 265         return _bufferevent_get_rlim_max(bev, 0);
 266 }
 267 
 268 ev_ssize_t
 269 _bufferevent_get_write_max(struct bufferevent_private *bev)
 270 {
 271         return _bufferevent_get_rlim_max(bev, 1);
 272 }
 273 
 274 int
 275 _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
 276 {
 277         /* XXXXX Make sure all users of this function check its return value */
 278         int r = 0;
 279         /* need to hold lock on bev */
 280         if (!bev->rate_limiting)
 281                 return 0;
 282 
 283         if (bev->rate_limiting->cfg) {
 284                 bev->rate_limiting->limit.read_limit -= bytes;
 285                 if (bev->rate_limiting->limit.read_limit <= 0) {
 286                         bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
 287                         if (event_add(&bev->rate_limiting->refill_bucket_event,
 288                                 &bev->rate_limiting->cfg->tick_timeout) < 0)
 289                                 r = -1;
 290                 } else if (bev->read_suspended & BEV_SUSPEND_BW) {
 291                         if (!(bev->write_suspended & BEV_SUSPEND_BW))
 292                                 event_del(&bev->rate_limiting->refill_bucket_event);
 293                         bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
 294                 }
 295         }
 296 
 297         if (bev->rate_limiting->group) {
 298                 LOCK_GROUP(bev->rate_limiting->group);
 299                 bev->rate_limiting->group->rate_limit.read_limit -= bytes;
 300                 bev->rate_limiting->group->total_read += bytes;
 301                 if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
 302                         _bev_group_suspend_reading(bev->rate_limiting->group);
 303                 } else if (bev->rate_limiting->group->read_suspended) {
 304                         _bev_group_unsuspend_reading(bev->rate_limiting->group);
 305                 }
 306                 UNLOCK_GROUP(bev->rate_limiting->group);
 307         }
 308 
 309         return r;
 310 }
 311 
 312 int
 313 _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
 314 {
 315         /* XXXXX Make sure all users of this function check its return value */
 316         int r = 0;
 317         /* need to hold lock */
 318         if (!bev->rate_limiting)
 319                 return 0;
 320 
 321         if (bev->rate_limiting->cfg) {
 322                 bev->rate_limiting->limit.write_limit -= bytes;
 323                 if (bev->rate_limiting->limit.write_limit <= 0) {
 324                         bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
 325                         if (event_add(&bev->rate_limiting->refill_bucket_event,
 326                                 &bev->rate_limiting->cfg->tick_timeout) < 0)
 327                                 r = -1;
 328                 } else if (bev->write_suspended & BEV_SUSPEND_BW) {
 329                         if (!(bev->read_suspended & BEV_SUSPEND_BW))
 330                                 event_del(&bev->rate_limiting->refill_bucket_event);
 331                         bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
 332                 }
 333         }
 334 
 335         if (bev->rate_limiting->group) {
 336                 LOCK_GROUP(bev->rate_limiting->group);
 337                 bev->rate_limiting->group->rate_limit.write_limit -= bytes;
 338                 bev->rate_limiting->group->total_written += bytes;
 339                 if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
 340                         _bev_group_suspend_writing(bev->rate_limiting->group);
 341                 } else if (bev->rate_limiting->group->write_suspended) {
 342                         _bev_group_unsuspend_writing(bev->rate_limiting->group);
 343                 }
 344                 UNLOCK_GROUP(bev->rate_limiting->group);
 345         }
 346 
 347         return r;
 348 }
 349 
 350 /** Stop reading on every bufferevent in <b>g</b> */
 351 static int
 352 _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
 353 {
 354         /* Needs group lock */
 355         struct bufferevent_private *bev;
 356         g->read_suspended = 1;
 357         g->pending_unsuspend_read = 0;
 358 
 359         /* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
 360            to prevent a deadlock.  (Ordinarily, the group lock nests inside
 361            the bufferevent locks.  If we are unable to lock any individual
 362            bufferevent, it will find out later when it looks at its limit
 363            and sees that its group is suspended.
 364         */
 365         TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
 366                 if (EVLOCK_TRY_LOCK(bev->lock)) {
 367                         bufferevent_suspend_read(&bev->bev,
 368                             BEV_SUSPEND_BW_GROUP);
 369                         EVLOCK_UNLOCK(bev->lock, 0);
 370                 }
 371         }
 372         return 0;
 373 }
 374 
 375 /** Stop writing on every bufferevent in <b>g</b> */
 376 static int
 377 _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
 378 {
 379         /* Needs group lock */
 380         struct bufferevent_private *bev;
 381         g->write_suspended = 1;
 382         g->pending_unsuspend_write = 0;
 383         TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
 384                 if (EVLOCK_TRY_LOCK(bev->lock)) {
 385                         bufferevent_suspend_write(&bev->bev,
 386                             BEV_SUSPEND_BW_GROUP);
 387                         EVLOCK_UNLOCK(bev->lock, 0);
 388                 }
 389         }
 390         return 0;
 391 }
 392 
 393 /** Timer callback invoked on a single bufferevent with one or more exhausted
 394     buckets when they are ready to refill. */
 395 static void
 396 _bev_refill_callback(evutil_socket_t fd, short what, void *arg)
 397 {
 398         unsigned tick;
 399         struct timeval now;
 400         struct bufferevent_private *bev = arg;
 401         int again = 0;
 402         BEV_LOCK(&bev->bev);
 403         if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
 404                 BEV_UNLOCK(&bev->bev);
 405                 return;
 406         }
 407 
 408         /* First, update the bucket */
 409         event_base_gettimeofday_cached(bev->bev.ev_base, &now);
 410         tick = ev_token_bucket_get_tick(&now,
 411             bev->rate_limiting->cfg);
 412         ev_token_bucket_update(&bev->rate_limiting->limit,
 413             bev->rate_limiting->cfg,
 414             tick);
 415 
 416         /* Now unsuspend any read/write operations as appropriate. */
 417         if ((bev->read_suspended & BEV_SUSPEND_BW)) {
 418                 if (bev->rate_limiting->limit.read_limit > 0)
 419                         bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
 420                 else
 421                         again = 1;
 422         }
 423         if ((bev->write_suspended & BEV_SUSPEND_BW)) {
 424                 if (bev->rate_limiting->limit.write_limit > 0)
 425                         bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
 426                 else
 427                         again = 1;
 428         }
 429         if (again) {
 430                 /* One or more of the buckets may need another refill if they
 431                    started negative.
 432 
 433                    XXXX if we need to be quiet for more ticks, we should
 434                    maybe figure out what timeout we really want.
 435                 */
 436                 /* XXXX Handle event_add failure somehow */
 437                 event_add(&bev->rate_limiting->refill_bucket_event,
 438                     &bev->rate_limiting->cfg->tick_timeout);
 439         }
 440         BEV_UNLOCK(&bev->bev);
 441 }
 442 
 443 /** Helper: grab a random element from a bufferevent group. */
 444 static struct bufferevent_private *
 445 _bev_group_random_element(struct bufferevent_rate_limit_group *group)
 446 {
 447         int which;
 448         struct bufferevent_private *bev;
 449 
 450         /* requires group lock */
 451 
 452         if (!group->n_members)
 453                 return NULL;
 454 
 455         EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
 456 
 457         which = _evutil_weakrand() % group->n_members;
 458 
 459         bev = TAILQ_FIRST(&group->members);
 460         while (which--)
 461                 bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);
 462 
 463         return bev;
 464 }
 465 
 466 /** Iterate over the elements of a rate-limiting group 'g' with a random
 467     starting point, assigning each to the variable 'bev', and executing the
 468     block 'block'.
 469 
 470     We do this in a half-baked effort to get fairness among group members.
 471     XXX Round-robin or some kind of priority queue would be even more fair.
 472  */
 473 #define FOREACH_RANDOM_ORDER(block)                     \
 474         do {                                            \
 475                 first = _bev_group_random_element(g);   \
 476                 for (bev = first; bev != TAILQ_END(&g->members); \
 477                     bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
 478                         block ;                                  \
 479                 }                                                \
 480                 for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
 481                     bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
 482                         block ;                                         \
 483                 }                                                       \
 484         } while (0)
 485 
 486 static void
 487 _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
 488 {
 489         int again = 0;
 490         struct bufferevent_private *bev, *first;
 491 
 492         g->read_suspended = 0;
 493         FOREACH_RANDOM_ORDER({
 494                 if (EVLOCK_TRY_LOCK(bev->lock)) {
 495                         bufferevent_unsuspend_read(&bev->bev,
 496                             BEV_SUSPEND_BW_GROUP);
 497                         EVLOCK_UNLOCK(bev->lock, 0);
 498                 } else {
 499                         again = 1;
 500                 }
 501         });
 502         g->pending_unsuspend_read = again;
 503 }
 504 
 505 static void
 506 _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
 507 {
 508         int again = 0;
 509         struct bufferevent_private *bev, *first;
 510         g->write_suspended = 0;
 511 
 512         FOREACH_RANDOM_ORDER({
 513                 if (EVLOCK_TRY_LOCK(bev->lock)) {
 514                         bufferevent_unsuspend_write(&bev->bev,
 515                             BEV_SUSPEND_BW_GROUP);
 516                         EVLOCK_UNLOCK(bev->lock, 0);
 517                 } else {
 518                         again = 1;
 519                 }
 520         });
 521         g->pending_unsuspend_write = again;
 522 }
 523 
 524 /** Callback invoked every tick to add more elements to the group bucket
 525     and unsuspend group members as needed.
 526  */
 527 static void
 528 _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
 529 {
 530         struct bufferevent_rate_limit_group *g = arg;
 531         unsigned tick;
 532         struct timeval now;
 533 
 534         event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
 535 
 536         LOCK_GROUP(g);
 537 
 538         tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
 539         ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
 540 
 541         if (g->pending_unsuspend_read ||
 542             (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
 543                 _bev_group_unsuspend_reading(g);
 544         }
 545         if (g->pending_unsuspend_write ||
 546             (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
 547                 _bev_group_unsuspend_writing(g);
 548         }
 549 
 550         /* XXXX Rather than waiting to the next tick to unsuspend stuff
 551          * with pending_unsuspend_write/read, we should do it on the
 552          * next iteration of the mainloop.
 553          */
 554 
 555         UNLOCK_GROUP(g);
 556 }
 557 
 558 int
 559 bufferevent_set_rate_limit(struct bufferevent *bev,
 560     struct ev_token_bucket_cfg *cfg)
 561 {
 562         struct bufferevent_private *bevp =
 563             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
 564         int r = -1;
 565         struct bufferevent_rate_limit *rlim;
 566         struct timeval now;
 567         ev_uint32_t tick;
 568         int reinit = 0, suspended = 0;
 569         /* XXX reference-count cfg */
 570 
 571         BEV_LOCK(bev);
 572 
 573         if (cfg == NULL) {
 574                 if (bevp->rate_limiting) {
 575                         rlim = bevp->rate_limiting;
 576                         rlim->cfg = NULL;
 577                         bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
 578                         bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
 579                         if (event_initialized(&rlim->refill_bucket_event))
 580                                 event_del(&rlim->refill_bucket_event);
 581                 }
 582                 r = 0;
 583                 goto done;
 584         }
 585 
 586         event_base_gettimeofday_cached(bev->ev_base, &now);
 587         tick = ev_token_bucket_get_tick(&now, cfg);
 588 
 589         if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
 590                 /* no-op */
 591                 r = 0;
 592                 goto done;
 593         }
 594         if (bevp->rate_limiting == NULL) {
 595                 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
 596                 if (!rlim)
 597                         goto done;
 598                 bevp->rate_limiting = rlim;
 599         } else {
 600                 rlim = bevp->rate_limiting;
 601         }
 602         reinit = rlim->cfg != NULL;
 603 
 604         rlim->cfg = cfg;
 605         ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);
 606 
 607         if (reinit) {
 608                 EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
 609                 event_del(&rlim->refill_bucket_event);
 610         }
 611         evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
 612             _bev_refill_callback, bevp);
 613 
 614         if (rlim->limit.read_limit > 0) {
 615                 bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
 616         } else {
 617                 bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
 618                 suspended=1;
 619         }
 620         if (rlim->limit.write_limit > 0) {
 621                 bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
 622         } else {
 623                 bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
 624                 suspended = 1;
 625         }
 626 
 627         if (suspended)
 628                 event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
 629 
 630         r = 0;
 631 
 632 done:
 633         BEV_UNLOCK(bev);
 634         return r;
 635 }
 636 
 637 struct bufferevent_rate_limit_group *
 638 bufferevent_rate_limit_group_new(struct event_base *base,
 639     const struct ev_token_bucket_cfg *cfg)
 640 {
 641         struct bufferevent_rate_limit_group *g;
 642         struct timeval now;
 643         ev_uint32_t tick;
 644 
 645         event_base_gettimeofday_cached(base, &now);
 646         tick = ev_token_bucket_get_tick(&now, cfg);
 647 
 648         g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
 649         if (!g)
 650                 return NULL;
 651         memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
 652         TAILQ_INIT(&g->members);
 653 
 654         ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
 655 
 656         event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
 657             _bev_group_refill_callback, g);
 658         /*XXXX handle event_add failure */
 659         event_add(&g->master_refill_event, &cfg->tick_timeout);
 660 
 661         EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
 662 
 663         bufferevent_rate_limit_group_set_min_share(g, 64);
 664 
 665         return g;
 666 }
 667 
 668 int
 669 bufferevent_rate_limit_group_set_cfg(
 670         struct bufferevent_rate_limit_group *g,
 671         const struct ev_token_bucket_cfg *cfg)
 672 {
 673         int same_tick;
 674         if (!g || !cfg)
 675                 return -1;
 676 
 677         LOCK_GROUP(g);
 678         same_tick = evutil_timercmp(
 679                 &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
 680         memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
 681 
 682         if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
 683                 g->rate_limit.read_limit = cfg->read_maximum;
 684         if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
 685                 g->rate_limit.write_limit = cfg->write_maximum;
 686 
 687         if (!same_tick) {
 688                 /* This can cause a hiccup in the schedule */
 689                 event_add(&g->master_refill_event, &cfg->tick_timeout);
 690         }
 691 
 692         /* The new limits might force us to adjust min_share differently. */
 693         bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
 694 
 695         UNLOCK_GROUP(g);
 696         return 0;
 697 }
 698 
 699 int
 700 bufferevent_rate_limit_group_set_min_share(
 701         struct bufferevent_rate_limit_group *g,
 702         size_t share)
 703 {
 704         if (share > EV_SSIZE_MAX)
 705                 return -1;
 706 
 707         g->configured_min_share = share;
 708 
 709         /* Can't set share to less than the one-tick maximum.  IOW, at steady
 710          * state, at least one connection can go per tick. */
 711         if (share > g->rate_limit_cfg.read_rate)
 712                 share = g->rate_limit_cfg.read_rate;
 713         if (share > g->rate_limit_cfg.write_rate)
 714                 share = g->rate_limit_cfg.write_rate;
 715 
 716         g->min_share = share;
 717         return 0;
 718 }
 719 
 720 void
 721 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
 722 {
 723         LOCK_GROUP(g);
 724         EVUTIL_ASSERT(0 == g->n_members);
 725         event_del(&g->master_refill_event);
 726         UNLOCK_GROUP(g);
 727         EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
 728         mm_free(g);
 729 }
 730 
/** Add 'bev' to the rate-limit group 'g', allocating the bufferevent's
 * per-connection rate-limiting state lazily if it does not exist yet.
 * Adding to the group it is already in is a no-op; being in another
 * group first removes it from that group.  Returns 0 on success, -1 on
 * allocation failure. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	/* Lazily allocate the per-bufferevent rate-limit state and wire up
	 * its refill timer.  The same state/timer is shared with per-bev
	 * (non-group) rate limiting. */
	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	/* Already a member of this group: nothing to do. */
	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	/* A bufferevent belongs to at most one group at a time. */
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the group's suspend flags while holding the group lock;
	 * the suspend calls below are made after dropping it (the bev lock
	 * is still held, so the state can't change under us from the bev's
	 * point of view). */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	/* If the group is currently out of budget, the new member must be
	 * suspended too. */
	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
 777 
/** Public wrapper: remove 'bev' from its rate-limit group (if any) and
 * lift any group-imposed read/write suspensions.  Returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}
 783 
/** Remove 'bev' from its rate-limit group, if it is in one.  When
 * 'unsuspend' is nonzero, also clear the BEV_SUSPEND_BW_GROUP reason
 * from both directions; callers pass 0 when they intend to keep the
 * suspension in place.  Always returns 0, even if the bufferevent was
 * not in a group. */
int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		/* Group lock nests inside the bev lock, matching the
		 * ordering used in bufferevent_add_to_rate_limit_group. */
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
 807 
 808 /* ===
 809  * API functions to expose rate limits.
 810  *
 811  * Don't use these from inside Libevent; they're meant to be for use by
 812  * the program.
 813  * === */
 814 
 815 /* Mostly you don't want to use this function from inside libevent;
 816  * _bufferevent_get_read_max() is more likely what you want*/
 817 ev_ssize_t
 818 bufferevent_get_read_limit(struct bufferevent *bev)
 819 {
 820         ev_ssize_t r;
 821         struct bufferevent_private *bevp;
 822         BEV_LOCK(bev);
 823         bevp = BEV_UPCAST(bev);
 824         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
 825                 bufferevent_update_buckets(bevp);
 826                 r = bevp->rate_limiting->limit.read_limit;
 827         } else {
 828                 r = EV_SSIZE_MAX;
 829         }
 830         BEV_UNLOCK(bev);
 831         return r;
 832 }
 833 
 834 /* Mostly you don't want to use this function from inside libevent;
 835  * _bufferevent_get_write_max() is more likely what you want*/
 836 ev_ssize_t
 837 bufferevent_get_write_limit(struct bufferevent *bev)
 838 {
 839         ev_ssize_t r;
 840         struct bufferevent_private *bevp;
 841         BEV_LOCK(bev);
 842         bevp = BEV_UPCAST(bev);
 843         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
 844                 bufferevent_update_buckets(bevp);
 845                 r = bevp->rate_limiting->limit.write_limit;
 846         } else {
 847                 r = EV_SSIZE_MAX;
 848         }
 849         BEV_UNLOCK(bev);
 850         return r;
 851 }
 852 
 853 ev_ssize_t
 854 bufferevent_get_max_to_read(struct bufferevent *bev)
 855 {
 856         ev_ssize_t r;
 857         BEV_LOCK(bev);
 858         r = _bufferevent_get_read_max(BEV_UPCAST(bev));
 859         BEV_UNLOCK(bev);
 860         return r;
 861 }
 862 
 863 ev_ssize_t
 864 bufferevent_get_max_to_write(struct bufferevent *bev)
 865 {
 866         ev_ssize_t r;
 867         BEV_LOCK(bev);
 868         r = _bufferevent_get_write_max(BEV_UPCAST(bev));
 869         BEV_UNLOCK(bev);
 870         return r;
 871 }
 872 
 873 
 874 /* Mostly you don't want to use this function from inside libevent;
 875  * _bufferevent_get_read_max() is more likely what you want*/
 876 ev_ssize_t
 877 bufferevent_rate_limit_group_get_read_limit(
 878         struct bufferevent_rate_limit_group *grp)
 879 {
 880         ev_ssize_t r;
 881         LOCK_GROUP(grp);
 882         r = grp->rate_limit.read_limit;
 883         UNLOCK_GROUP(grp);
 884         return r;
 885 }
 886 
 887 /* Mostly you don't want to use this function from inside libevent;
 888  * _bufferevent_get_write_max() is more likely what you want. */
 889 ev_ssize_t
 890 bufferevent_rate_limit_group_get_write_limit(
 891         struct bufferevent_rate_limit_group *grp)
 892 {
 893         ev_ssize_t r;
 894         LOCK_GROUP(grp);
 895         r = grp->rate_limit.write_limit;
 896         UNLOCK_GROUP(grp);
 897         return r;
 898 }
 899 
/** Manually charge 'decr' bytes against bev's read bucket (a negative
 * 'decr' adds budget back).  Requires a per-bev rate limit to be
 * configured.  Suspends reading and schedules the refill timer when the
 * bucket crosses from positive to non-positive; resumes reading when it
 * crosses back.  Returns 0 on success, -1 if the refill timer could not
 * be scheduled. */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Ran out of read budget: stop reading until the refill
		 * event fires on the next tick. */
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* The refill event is shared with the write bucket, so only
		 * cancel it if writing isn't also waiting on a refill. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
 926 
/** Manually charge 'decr' bytes against bev's write bucket (a negative
 * 'decr' adds budget back).  Requires a per-bev rate limit to be
 * configured.  Suspends writing and schedules the refill timer when the
 * bucket crosses from positive to non-positive; resumes writing when it
 * crosses back.  Returns 0 on success, -1 if the refill timer could not
 * be scheduled. */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Ran out of write budget: stop writing until the refill
		 * event fires on the next tick. */
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* The refill event is shared with the read bucket, so only
		 * cancel it if reading isn't also waiting on a refill. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
 955 
 956 int
 957 bufferevent_rate_limit_group_decrement_read(
 958         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
 959 {
 960         int r = 0;
 961         ev_ssize_t old_limit, new_limit;
 962         LOCK_GROUP(grp);
 963         old_limit = grp->rate_limit.read_limit;
 964         new_limit = (grp->rate_limit.read_limit -= decr);
 965 
 966         if (old_limit > 0 && new_limit <= 0) {
 967                 _bev_group_suspend_reading(grp);
 968         } else if (old_limit <= 0 && new_limit > 0) {
 969                 _bev_group_unsuspend_reading(grp);
 970         }
 971 
 972         UNLOCK_GROUP(grp);
 973         return r;
 974 }
 975 
 976 int
 977 bufferevent_rate_limit_group_decrement_write(
 978         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
 979 {
 980         int r = 0;
 981         ev_ssize_t old_limit, new_limit;
 982         LOCK_GROUP(grp);
 983         old_limit = grp->rate_limit.write_limit;
 984         new_limit = (grp->rate_limit.write_limit -= decr);
 985 
 986         if (old_limit > 0 && new_limit <= 0) {
 987                 _bev_group_suspend_writing(grp);
 988         } else if (old_limit <= 0 && new_limit > 0) {
 989                 _bev_group_unsuspend_writing(grp);
 990         }
 991 
 992         UNLOCK_GROUP(grp);
 993         return r;
 994 }
 995 
 996 void
 997 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
 998     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
 999 {
1000         EVUTIL_ASSERT(grp != NULL);
1001         if (total_read_out)
1002                 *total_read_out = grp->total_read;
1003         if (total_written_out)
1004                 *total_written_out = grp->total_written;
1005 }
1006 
1007 void
1008 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1009 {
1010         grp->total_read = grp->total_written = 0;
1011 }

/* [<][>][^][v][top][bottom][index][help] */