root/opal/mca/event/libevent2022/libevent/bufferevent.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. bufferevent_suspend_read
  2. bufferevent_unsuspend_read
  3. bufferevent_suspend_write
  4. bufferevent_unsuspend_write
  5. bufferevent_inbuf_wm_cb
  6. bufferevent_run_deferred_callbacks_locked
  7. bufferevent_run_deferred_callbacks_unlocked
  8. _bufferevent_run_readcb
  9. _bufferevent_run_writecb
  10. _bufferevent_run_eventcb
  11. bufferevent_init_common
  12. bufferevent_setcb
  13. bufferevent_get_input
  14. bufferevent_get_output
  15. bufferevent_get_base
  16. bufferevent_write
  17. bufferevent_write_buffer
  18. bufferevent_read
  19. bufferevent_read_buffer
  20. bufferevent_enable
  21. bufferevent_set_timeouts
  22. bufferevent_settimeout
  23. bufferevent_disable_hard
  24. bufferevent_disable
  25. bufferevent_setwatermark
  26. bufferevent_flush
  27. _bufferevent_incref_and_lock
  28. _bufferevent_transfer_lock_ownership
  29. _bufferevent_decref_and_unlock
  30. bufferevent_decref
  31. bufferevent_free
  32. bufferevent_incref
  33. bufferevent_enable_locking
  34. bufferevent_setfd
  35. bufferevent_getfd
  36. _bufferevent_cancel_all
  37. bufferevent_get_enabled
  38. bufferevent_get_underlying
  39. bufferevent_generic_read_timeout_cb
  40. bufferevent_generic_write_timeout_cb
  41. _bufferevent_init_generic_timeout_cbs
  42. _bufferevent_del_generic_timeout_cbs
  43. _bufferevent_generic_adj_timeouts
  44. _bufferevent_add_event
  45. bufferevent_lock
  46. bufferevent_unlock

   1 /*
   2  * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu>
   3  * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson
   4  *
   5  * Redistribution and use in source and binary forms, with or without
   6  * modification, are permitted provided that the following conditions
   7  * are met:
   8  * 1. Redistributions of source code must retain the above copyright
   9  *    notice, this list of conditions and the following disclaimer.
  10  * 2. Redistributions in binary form must reproduce the above copyright
  11  *    notice, this list of conditions and the following disclaimer in the
  12  *    documentation and/or other materials provided with the distribution.
  13  * 3. The name of the author may not be used to endorse or promote products
  14  *    derived from this software without specific prior written permission.
  15  *
  16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26  */
  27 
  28 #include <sys/types.h>
  29 
  30 #include "event2/event-config.h"
  31 
  32 #ifdef _EVENT_HAVE_SYS_TIME_H
  33 #include <sys/time.h>
  34 #endif
  35 
  36 #include <errno.h>
  37 #include <stdio.h>
  38 #include <stdlib.h>
  39 #include <string.h>
  40 #ifdef _EVENT_HAVE_STDARG_H
  41 #include <stdarg.h>
  42 #endif
  43 
  44 #ifdef WIN32
  45 #include <winsock2.h>
  46 #endif
  47 #include <errno.h>
  48 
  49 #include "event2/util.h"
  50 #include "event2/buffer.h"
  51 #include "event2/buffer_compat.h"
  52 #include "event2/bufferevent.h"
  53 #include "event2/bufferevent_struct.h"
  54 #include "event2/bufferevent_compat.h"
  55 #include "event2/event.h"
  56 #include "log-internal.h"
  57 #include "mm-internal.h"
  58 #include "bufferevent-internal.h"
  59 #include "evbuffer-internal.h"
  60 #include "util-internal.h"
  61 
  62 static void _bufferevent_cancel_all(struct bufferevent *bev);
  63 
  64 
  65 void
  66 bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
  67 {
  68         struct bufferevent_private *bufev_private =
  69             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  70         BEV_LOCK(bufev);
  71         if (!bufev_private->read_suspended)
  72                 bufev->be_ops->disable(bufev, EV_READ);
  73         bufev_private->read_suspended |= what;
  74         BEV_UNLOCK(bufev);
  75 }
  76 
  77 void
  78 bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
  79 {
  80         struct bufferevent_private *bufev_private =
  81             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  82         BEV_LOCK(bufev);
  83         bufev_private->read_suspended &= ~what;
  84         if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
  85                 bufev->be_ops->enable(bufev, EV_READ);
  86         BEV_UNLOCK(bufev);
  87 }
  88 
  89 void
  90 bufferevent_suspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what)
  91 {
  92         struct bufferevent_private *bufev_private =
  93             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  94         BEV_LOCK(bufev);
  95         if (!bufev_private->write_suspended)
  96                 bufev->be_ops->disable(bufev, EV_WRITE);
  97         bufev_private->write_suspended |= what;
  98         BEV_UNLOCK(bufev);
  99 }
 100 
 101 void
 102 bufferevent_unsuspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what)
 103 {
 104         struct bufferevent_private *bufev_private =
 105             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 106         BEV_LOCK(bufev);
 107         bufev_private->write_suspended &= ~what;
 108         if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE))
 109                 bufev->be_ops->enable(bufev, EV_WRITE);
 110         BEV_UNLOCK(bufev);
 111 }
 112 
 113 
 114 /* Callback to implement watermarks on the input buffer.  Only enabled
 115  * if the watermark is set. */
 116 static void
 117 bufferevent_inbuf_wm_cb(struct evbuffer *buf,
 118     const struct evbuffer_cb_info *cbinfo,
 119     void *arg)
 120 {
 121         struct bufferevent *bufev = arg;
 122         size_t size;
 123 
 124         size = evbuffer_get_length(buf);
 125 
 126         if (size >= bufev->wm_read.high)
 127                 bufferevent_wm_suspend_read(bufev);
 128         else
 129                 bufferevent_wm_unsuspend_read(bufev);
 130 }
 131 
 132 static void
 133 bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg)
 134 {
 135         struct bufferevent_private *bufev_private = arg;
 136         struct bufferevent *bufev = &bufev_private->bev;
 137 
 138         BEV_LOCK(bufev);
 139         if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
 140             bufev->errorcb) {
 141                 /* The "connected" happened before any reads or writes, so
 142                    send it first. */
 143                 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
 144                 bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
 145         }
 146         if (bufev_private->readcb_pending && bufev->readcb) {
 147                 bufev_private->readcb_pending = 0;
 148                 bufev->readcb(bufev, bufev->cbarg);
 149         }
 150         if (bufev_private->writecb_pending && bufev->writecb) {
 151                 bufev_private->writecb_pending = 0;
 152                 bufev->writecb(bufev, bufev->cbarg);
 153         }
 154         if (bufev_private->eventcb_pending && bufev->errorcb) {
 155                 short what = bufev_private->eventcb_pending;
 156                 int err = bufev_private->errno_pending;
 157                 bufev_private->eventcb_pending = 0;
 158                 bufev_private->errno_pending = 0;
 159                 EVUTIL_SET_SOCKET_ERROR(err);
 160                 bufev->errorcb(bufev, what, bufev->cbarg);
 161         }
 162         _bufferevent_decref_and_unlock(bufev);
 163 }
 164 
 165 static void
 166 bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
 167 {
 168         struct bufferevent_private *bufev_private = arg;
 169         struct bufferevent *bufev = &bufev_private->bev;
 170 
 171         BEV_LOCK(bufev);
 172 #define UNLOCKED(stmt) \
 173         do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)
 174 
 175         if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
 176             bufev->errorcb) {
 177                 /* The "connected" happened before any reads or writes, so
 178                    send it first. */
 179                 bufferevent_event_cb errorcb = bufev->errorcb;
 180                 void *cbarg = bufev->cbarg;
 181                 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
 182                 UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
 183         }
 184         if (bufev_private->readcb_pending && bufev->readcb) {
 185                 bufferevent_data_cb readcb = bufev->readcb;
 186                 void *cbarg = bufev->cbarg;
 187                 bufev_private->readcb_pending = 0;
 188                 UNLOCKED(readcb(bufev, cbarg));
 189         }
 190         if (bufev_private->writecb_pending && bufev->writecb) {
 191                 bufferevent_data_cb writecb = bufev->writecb;
 192                 void *cbarg = bufev->cbarg;
 193                 bufev_private->writecb_pending = 0;
 194                 UNLOCKED(writecb(bufev, cbarg));
 195         }
 196         if (bufev_private->eventcb_pending && bufev->errorcb) {
 197                 bufferevent_event_cb errorcb = bufev->errorcb;
 198                 void *cbarg = bufev->cbarg;
 199                 short what = bufev_private->eventcb_pending;
 200                 int err = bufev_private->errno_pending;
 201                 bufev_private->eventcb_pending = 0;
 202                 bufev_private->errno_pending = 0;
 203                 EVUTIL_SET_SOCKET_ERROR(err);
 204                 UNLOCKED(errorcb(bufev,what,cbarg));
 205         }
 206         _bufferevent_decref_and_unlock(bufev);
 207 #undef UNLOCKED
 208 }
 209 
 210 #define SCHEDULE_DEFERRED(bevp)                                         \
 211         do {                                                            \
 212                 bufferevent_incref(&(bevp)->bev);                       \
 213                 event_deferred_cb_schedule(                             \
 214                         event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
 215                         &(bevp)->deferred);                             \
 216         } while (0)
 217 
 218 
 219 void
 220 _bufferevent_run_readcb(struct bufferevent *bufev)
 221 {
 222         /* Requires that we hold the lock and a reference */
 223         struct bufferevent_private *p =
 224             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 225         if (bufev->readcb == NULL)
 226                 return;
 227         if (p->options & BEV_OPT_DEFER_CALLBACKS) {
 228                 p->readcb_pending = 1;
 229                 if (!p->deferred.queued)
 230                         SCHEDULE_DEFERRED(p);
 231         } else {
 232                 bufev->readcb(bufev, bufev->cbarg);
 233         }
 234 }
 235 
 236 void
 237 _bufferevent_run_writecb(struct bufferevent *bufev)
 238 {
 239         /* Requires that we hold the lock and a reference */
 240         struct bufferevent_private *p =
 241             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 242         if (bufev->writecb == NULL)
 243                 return;
 244         if (p->options & BEV_OPT_DEFER_CALLBACKS) {
 245                 p->writecb_pending = 1;
 246                 if (!p->deferred.queued)
 247                         SCHEDULE_DEFERRED(p);
 248         } else {
 249                 bufev->writecb(bufev, bufev->cbarg);
 250         }
 251 }
 252 
 253 void
 254 _bufferevent_run_eventcb(struct bufferevent *bufev, short what)
 255 {
 256         /* Requires that we hold the lock and a reference */
 257         struct bufferevent_private *p =
 258             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 259         if (bufev->errorcb == NULL)
 260                 return;
 261         if (p->options & BEV_OPT_DEFER_CALLBACKS) {
 262                 p->eventcb_pending |= what;
 263                 p->errno_pending = EVUTIL_SOCKET_ERROR();
 264                 if (!p->deferred.queued)
 265                         SCHEDULE_DEFERRED(p);
 266         } else {
 267                 bufev->errorcb(bufev, what, bufev->cbarg);
 268         }
 269 }
 270 
 271 int
 272 bufferevent_init_common(struct bufferevent_private *bufev_private,
 273     struct event_base *base,
 274     const struct bufferevent_ops *ops,
 275     enum bufferevent_options options)
 276 {
 277         struct bufferevent *bufev = &bufev_private->bev;
 278 
 279         if (!bufev->input) {
 280                 if ((bufev->input = evbuffer_new()) == NULL)
 281                         return -1;
 282         }
 283 
 284         if (!bufev->output) {
 285                 if ((bufev->output = evbuffer_new()) == NULL) {
 286                         evbuffer_free(bufev->input);
 287                         return -1;
 288                 }
 289         }
 290 
 291         bufev_private->refcnt = 1;
 292         bufev->ev_base = base;
 293 
 294         /* Disable timeouts. */
 295         evutil_timerclear(&bufev->timeout_read);
 296         evutil_timerclear(&bufev->timeout_write);
 297 
 298         bufev->be_ops = ops;
 299 
 300         /*
 301          * Set to EV_WRITE so that using bufferevent_write is going to
 302          * trigger a callback.  Reading needs to be explicitly enabled
 303          * because otherwise no data will be available.
 304          */
 305         bufev->enabled = EV_WRITE;
 306 
 307 #ifndef _EVENT_DISABLE_THREAD_SUPPORT
 308         if (options & BEV_OPT_THREADSAFE) {
 309                 if (bufferevent_enable_locking(bufev, NULL) < 0) {
 310                         /* cleanup */
 311                         evbuffer_free(bufev->input);
 312                         evbuffer_free(bufev->output);
 313                         bufev->input = NULL;
 314                         bufev->output = NULL;
 315                         return -1;
 316                 }
 317         }
 318 #endif
 319         if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
 320             == BEV_OPT_UNLOCK_CALLBACKS) {
 321                 event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
 322                 return -1;
 323         }
 324         if (options & BEV_OPT_DEFER_CALLBACKS) {
 325                 if (options & BEV_OPT_UNLOCK_CALLBACKS)
 326                         event_deferred_cb_init(&bufev_private->deferred,
 327                             bufferevent_run_deferred_callbacks_unlocked,
 328                             bufev_private);
 329                 else
 330                         event_deferred_cb_init(&bufev_private->deferred,
 331                             bufferevent_run_deferred_callbacks_locked,
 332                             bufev_private);
 333         }
 334 
 335         bufev_private->options = options;
 336 
 337         evbuffer_set_parent(bufev->input, bufev);
 338         evbuffer_set_parent(bufev->output, bufev);
 339 
 340         return 0;
 341 }
 342 
 343 void
 344 bufferevent_setcb(struct bufferevent *bufev,
 345     bufferevent_data_cb readcb, bufferevent_data_cb writecb,
 346     bufferevent_event_cb eventcb, void *cbarg)
 347 {
 348         BEV_LOCK(bufev);
 349 
 350         bufev->readcb = readcb;
 351         bufev->writecb = writecb;
 352         bufev->errorcb = eventcb;
 353 
 354         bufev->cbarg = cbarg;
 355         BEV_UNLOCK(bufev);
 356 }
 357 
 358 struct evbuffer *
 359 bufferevent_get_input(struct bufferevent *bufev)
 360 {
 361         return bufev->input;
 362 }
 363 
 364 struct evbuffer *
 365 bufferevent_get_output(struct bufferevent *bufev)
 366 {
 367         return bufev->output;
 368 }
 369 
 370 struct event_base *
 371 bufferevent_get_base(struct bufferevent *bufev)
 372 {
 373         return bufev->ev_base;
 374 }
 375 
 376 int
 377 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
 378 {
 379         if (evbuffer_add(bufev->output, data, size) == -1)
 380                 return (-1);
 381 
 382         return 0;
 383 }
 384 
 385 int
 386 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
 387 {
 388         if (evbuffer_add_buffer(bufev->output, buf) == -1)
 389                 return (-1);
 390 
 391         return 0;
 392 }
 393 
 394 size_t
 395 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
 396 {
 397         return (evbuffer_remove(bufev->input, data, size));
 398 }
 399 
 400 int
 401 bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
 402 {
 403         return (evbuffer_add_buffer(buf, bufev->input));
 404 }
 405 
 406 int
 407 bufferevent_enable(struct bufferevent *bufev, short event)
 408 {
 409         struct bufferevent_private *bufev_private =
 410             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 411         short impl_events = event;
 412         int r = 0;
 413 
 414         _bufferevent_incref_and_lock(bufev);
 415         if (bufev_private->read_suspended)
 416                 impl_events &= ~EV_READ;
 417         if (bufev_private->write_suspended)
 418                 impl_events &= ~EV_WRITE;
 419 
 420         bufev->enabled |= event;
 421 
 422         if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
 423                 r = -1;
 424 
 425         _bufferevent_decref_and_unlock(bufev);
 426         return r;
 427 }
 428 
 429 int
 430 bufferevent_set_timeouts(struct bufferevent *bufev,
 431                          const struct timeval *tv_read,
 432                          const struct timeval *tv_write)
 433 {
 434         int r = 0;
 435         BEV_LOCK(bufev);
 436         if (tv_read) {
 437                 bufev->timeout_read = *tv_read;
 438         } else {
 439                 evutil_timerclear(&bufev->timeout_read);
 440         }
 441         if (tv_write) {
 442                 bufev->timeout_write = *tv_write;
 443         } else {
 444                 evutil_timerclear(&bufev->timeout_write);
 445         }
 446 
 447         if (bufev->be_ops->adj_timeouts)
 448                 r = bufev->be_ops->adj_timeouts(bufev);
 449         BEV_UNLOCK(bufev);
 450 
 451         return r;
 452 }
 453 
 454 
 455 /* Obsolete; use bufferevent_set_timeouts */
 456 void
 457 bufferevent_settimeout(struct bufferevent *bufev,
 458                        int timeout_read, int timeout_write)
 459 {
 460         struct timeval tv_read, tv_write;
 461         struct timeval *ptv_read = NULL, *ptv_write = NULL;
 462 
 463         memset(&tv_read, 0, sizeof(tv_read));
 464         memset(&tv_write, 0, sizeof(tv_write));
 465 
 466         if (timeout_read) {
 467                 tv_read.tv_sec = timeout_read;
 468                 ptv_read = &tv_read;
 469         }
 470         if (timeout_write) {
 471                 tv_write.tv_sec = timeout_write;
 472                 ptv_write = &tv_write;
 473         }
 474 
 475         bufferevent_set_timeouts(bufev, ptv_read, ptv_write);
 476 }
 477 
 478 
 479 int
 480 bufferevent_disable_hard(struct bufferevent *bufev, short event)
 481 {
 482         int r = 0;
 483         struct bufferevent_private *bufev_private =
 484             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 485 
 486         BEV_LOCK(bufev);
 487         bufev->enabled &= ~event;
 488 
 489         bufev_private->connecting = 0;
 490         if (bufev->be_ops->disable(bufev, event) < 0)
 491                 r = -1;
 492 
 493         BEV_UNLOCK(bufev);
 494         return r;
 495 }
 496 
 497 int
 498 bufferevent_disable(struct bufferevent *bufev, short event)
 499 {
 500         int r = 0;
 501 
 502         BEV_LOCK(bufev);
 503         bufev->enabled &= ~event;
 504 
 505         if (bufev->be_ops->disable(bufev, event) < 0)
 506                 r = -1;
 507 
 508         BEV_UNLOCK(bufev);
 509         return r;
 510 }
 511 
 512 /*
 513  * Sets the water marks
 514  */
 515 
 516 void
 517 bufferevent_setwatermark(struct bufferevent *bufev, short events,
 518     size_t lowmark, size_t highmark)
 519 {
 520         struct bufferevent_private *bufev_private =
 521             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 522 
 523         BEV_LOCK(bufev);
 524         if (events & EV_WRITE) {
 525                 bufev->wm_write.low = lowmark;
 526                 bufev->wm_write.high = highmark;
 527         }
 528 
 529         if (events & EV_READ) {
 530                 bufev->wm_read.low = lowmark;
 531                 bufev->wm_read.high = highmark;
 532 
 533                 if (highmark) {
 534                         /* There is now a new high-water mark for read.
 535                            enable the callback if needed, and see if we should
 536                            suspend/bufferevent_wm_unsuspend. */
 537 
 538                         if (bufev_private->read_watermarks_cb == NULL) {
 539                                 bufev_private->read_watermarks_cb =
 540                                     evbuffer_add_cb(bufev->input,
 541                                                     bufferevent_inbuf_wm_cb,
 542                                                     bufev);
 543                         }
 544                         evbuffer_cb_set_flags(bufev->input,
 545                                       bufev_private->read_watermarks_cb,
 546                                       EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);
 547 
 548                         if (evbuffer_get_length(bufev->input) >= highmark)
 549                                 bufferevent_wm_suspend_read(bufev);
 550                         else if (evbuffer_get_length(bufev->input) < highmark)
 551                                 bufferevent_wm_unsuspend_read(bufev);
 552                 } else {
 553                         /* There is now no high-water mark for read. */
 554                         if (bufev_private->read_watermarks_cb)
 555                                 evbuffer_cb_clear_flags(bufev->input,
 556                                     bufev_private->read_watermarks_cb,
 557                                     EVBUFFER_CB_ENABLED);
 558                         bufferevent_wm_unsuspend_read(bufev);
 559                 }
 560         }
 561         BEV_UNLOCK(bufev);
 562 }
 563 
 564 int
 565 bufferevent_flush(struct bufferevent *bufev,
 566     short iotype,
 567     enum bufferevent_flush_mode mode)
 568 {
 569         int r = -1;
 570         BEV_LOCK(bufev);
 571         if (bufev->be_ops->flush)
 572                 r = bufev->be_ops->flush(bufev, iotype, mode);
 573         BEV_UNLOCK(bufev);
 574         return r;
 575 }
 576 
 577 void
 578 _bufferevent_incref_and_lock(struct bufferevent *bufev)
 579 {
 580         struct bufferevent_private *bufev_private =
 581             BEV_UPCAST(bufev);
 582         BEV_LOCK(bufev);
 583         ++bufev_private->refcnt;
 584 }
 585 
 586 #if 0
 587 static void
 588 _bufferevent_transfer_lock_ownership(struct bufferevent *donor,
 589     struct bufferevent *recipient)
 590 {
 591         struct bufferevent_private *d = BEV_UPCAST(donor);
 592         struct bufferevent_private *r = BEV_UPCAST(recipient);
 593         if (d->lock != r->lock)
 594                 return;
 595         if (r->own_lock)
 596                 return;
 597         if (d->own_lock) {
 598                 d->own_lock = 0;
 599                 r->own_lock = 1;
 600         }
 601 }
 602 #endif
 603 
 604 int
 605 _bufferevent_decref_and_unlock(struct bufferevent *bufev)
 606 {
 607         struct bufferevent_private *bufev_private =
 608             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 609         struct bufferevent *underlying;
 610 
 611         EVUTIL_ASSERT(bufev_private->refcnt > 0);
 612 
 613         if (--bufev_private->refcnt) {
 614                 BEV_UNLOCK(bufev);
 615                 return 0;
 616         }
 617 
 618         underlying = bufferevent_get_underlying(bufev);
 619 
 620         /* Clean up the shared info */
 621         if (bufev->be_ops->destruct)
 622                 bufev->be_ops->destruct(bufev);
 623 
 624         /* XXX what happens if refcnt for these buffers is > 1?
 625          * The buffers can share a lock with this bufferevent object,
 626          * but the lock might be destroyed below. */
 627         /* evbuffer will free the callbacks */
 628         evbuffer_free(bufev->input);
 629         evbuffer_free(bufev->output);
 630 
 631         if (bufev_private->rate_limiting) {
 632                 if (bufev_private->rate_limiting->group)
 633                         bufferevent_remove_from_rate_limit_group_internal(bufev,0);
 634                 if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
 635                         event_del(&bufev_private->rate_limiting->refill_bucket_event);
 636                 event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event);
 637                 mm_free(bufev_private->rate_limiting);
 638                 bufev_private->rate_limiting = NULL;
 639         }
 640 
 641         event_debug_unassign(&bufev->ev_read);
 642         event_debug_unassign(&bufev->ev_write);
 643 
 644         BEV_UNLOCK(bufev);
 645         if (bufev_private->own_lock)
 646                 EVTHREAD_FREE_LOCK(bufev_private->lock,
 647                     EVTHREAD_LOCKTYPE_RECURSIVE);
 648 
 649         /* Free the actual allocated memory. */
 650         mm_free(((char*)bufev) - bufev->be_ops->mem_offset);
 651 
 652         /* Release the reference to underlying now that we no longer need the
 653          * reference to it.  We wait this long mainly in case our lock is
 654          * shared with underlying.
 655          *
 656          * The 'destruct' function will also drop a reference to underlying
 657          * if BEV_OPT_CLOSE_ON_FREE is set.
 658          *
 659          * XXX Should we/can we just refcount evbuffer/bufferevent locks?
 660          * It would probably save us some headaches.
 661          */
 662         if (underlying)
 663                 bufferevent_decref(underlying);
 664 
 665         return 1;
 666 }
 667 
 668 int
 669 bufferevent_decref(struct bufferevent *bufev)
 670 {
 671         BEV_LOCK(bufev);
 672         return _bufferevent_decref_and_unlock(bufev);
 673 }
 674 
 675 void
 676 bufferevent_free(struct bufferevent *bufev)
 677 {
 678         BEV_LOCK(bufev);
 679         bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
 680         _bufferevent_cancel_all(bufev);
 681         _bufferevent_decref_and_unlock(bufev);
 682 }
 683 
 684 void
 685 bufferevent_incref(struct bufferevent *bufev)
 686 {
 687         struct bufferevent_private *bufev_private =
 688             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 689 
 690         BEV_LOCK(bufev);
 691         ++bufev_private->refcnt;
 692         BEV_UNLOCK(bufev);
 693 }
 694 
 695 int
 696 bufferevent_enable_locking(struct bufferevent *bufev, void *lock)
 697 {
 698 #ifdef _EVENT_DISABLE_THREAD_SUPPORT
 699         return -1;
 700 #else
 701         struct bufferevent *underlying;
 702 
 703         if (BEV_UPCAST(bufev)->lock)
 704                 return -1;
 705         underlying = bufferevent_get_underlying(bufev);
 706 
 707         if (!lock && underlying && BEV_UPCAST(underlying)->lock) {
 708                 lock = BEV_UPCAST(underlying)->lock;
 709                 BEV_UPCAST(bufev)->lock = lock;
 710                 BEV_UPCAST(bufev)->own_lock = 0;
 711         } else if (!lock) {
 712                 EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
 713                 if (!lock)
 714                         return -1;
 715                 BEV_UPCAST(bufev)->lock = lock;
 716                 BEV_UPCAST(bufev)->own_lock = 1;
 717         } else {
 718                 BEV_UPCAST(bufev)->lock = lock;
 719                 BEV_UPCAST(bufev)->own_lock = 0;
 720         }
 721         evbuffer_enable_locking(bufev->input, lock);
 722         evbuffer_enable_locking(bufev->output, lock);
 723 
 724         if (underlying && !BEV_UPCAST(underlying)->lock)
 725                 bufferevent_enable_locking(underlying, lock);
 726 
 727         return 0;
 728 #endif
 729 }
 730 
 731 int
 732 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
 733 {
 734         union bufferevent_ctrl_data d;
 735         int res = -1;
 736         d.fd = fd;
 737         BEV_LOCK(bev);
 738         if (bev->be_ops->ctrl)
 739                 res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d);
 740         BEV_UNLOCK(bev);
 741         return res;
 742 }
 743 
 744 evutil_socket_t
 745 bufferevent_getfd(struct bufferevent *bev)
 746 {
 747         union bufferevent_ctrl_data d;
 748         int res = -1;
 749         d.fd = -1;
 750         BEV_LOCK(bev);
 751         if (bev->be_ops->ctrl)
 752                 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
 753         BEV_UNLOCK(bev);
 754         return (res<0) ? -1 : d.fd;
 755 }
 756 
 757 static void
 758 _bufferevent_cancel_all(struct bufferevent *bev)
 759 {
 760         union bufferevent_ctrl_data d;
 761         memset(&d, 0, sizeof(d));
 762         BEV_LOCK(bev);
 763         if (bev->be_ops->ctrl)
 764                 bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
 765         BEV_UNLOCK(bev);
 766 }
 767 
 768 short
 769 bufferevent_get_enabled(struct bufferevent *bufev)
 770 {
 771         short r;
 772         BEV_LOCK(bufev);
 773         r = bufev->enabled;
 774         BEV_UNLOCK(bufev);
 775         return r;
 776 }
 777 
 778 struct bufferevent *
 779 bufferevent_get_underlying(struct bufferevent *bev)
 780 {
 781         union bufferevent_ctrl_data d;
 782         int res = -1;
 783         d.ptr = NULL;
 784         BEV_LOCK(bev);
 785         if (bev->be_ops->ctrl)
 786                 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d);
 787         BEV_UNLOCK(bev);
 788         return (res<0) ? NULL : d.ptr;
 789 }
 790 
 791 static void
 792 bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
 793 {
 794         struct bufferevent *bev = ctx;
 795         _bufferevent_incref_and_lock(bev);
 796         bufferevent_disable(bev, EV_READ);
 797         _bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
 798         _bufferevent_decref_and_unlock(bev);
 799 }
 800 static void
 801 bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
 802 {
 803         struct bufferevent *bev = ctx;
 804         _bufferevent_incref_and_lock(bev);
 805         bufferevent_disable(bev, EV_WRITE);
 806         _bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
 807         _bufferevent_decref_and_unlock(bev);
 808 }
 809 
 810 void
 811 _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev)
 812 {
 813         evtimer_assign(&bev->ev_read, bev->ev_base,
 814             bufferevent_generic_read_timeout_cb, bev);
 815         evtimer_assign(&bev->ev_write, bev->ev_base,
 816             bufferevent_generic_write_timeout_cb, bev);
 817 }
 818 
 819 int
 820 _bufferevent_del_generic_timeout_cbs(struct bufferevent *bev)
 821 {
 822         int r1,r2;
 823         r1 = event_del(&bev->ev_read);
 824         r2 = event_del(&bev->ev_write);
 825         if (r1<0 || r2<0)
 826                 return -1;
 827         return 0;
 828 }
 829 
 830 int
 831 _bufferevent_generic_adj_timeouts(struct bufferevent *bev)
 832 {
 833         const short enabled = bev->enabled;
 834         struct bufferevent_private *bev_p =
 835             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
 836         int r1=0, r2=0;
 837         if ((enabled & EV_READ) && !bev_p->read_suspended &&
 838             evutil_timerisset(&bev->timeout_read))
 839                 r1 = event_add(&bev->ev_read, &bev->timeout_read);
 840         else
 841                 r1 = event_del(&bev->ev_read);
 842 
 843         if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
 844             evutil_timerisset(&bev->timeout_write) &&
 845             evbuffer_get_length(bev->output))
 846                 r2 = event_add(&bev->ev_write, &bev->timeout_write);
 847         else
 848                 r2 = event_del(&bev->ev_write);
 849         if (r1 < 0 || r2 < 0)
 850                 return -1;
 851         return 0;
 852 }
 853 
 854 int
 855 _bufferevent_add_event(struct event *ev, const struct timeval *tv)
 856 {
 857         if (tv->tv_sec == 0 && tv->tv_usec == 0)
 858                 return event_add(ev, NULL);
 859         else
 860                 return event_add(ev, tv);
 861 }
 862 
 863 /* For use by user programs only; internally, we should be calling
 864    either _bufferevent_incref_and_lock(), or BEV_LOCK. */
 865 void
 866 bufferevent_lock(struct bufferevent *bev)
 867 {
 868         _bufferevent_incref_and_lock(bev);
 869 }
 870 
 871 void
 872 bufferevent_unlock(struct bufferevent *bev)
 873 {
 874         _bufferevent_decref_and_unlock(bev);
 875 }

/* [<][>][^][v][top][bottom][index][help] */