root/opal/mca/event/libevent2022/libevent/bufferevent_filter.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. upcast
  2. be_underlying_writebuf_full
  3. be_readbuf_full
  4. be_null_filter
  5. bufferevent_filter_new
  6. be_filter_destruct
  7. be_filter_enable
  8. be_filter_disable
  9. be_filter_process_input
  10. be_filter_process_output
  11. bufferevent_filtered_outbuf_cb
  12. be_filter_readcb
  13. be_filter_writecb
  14. be_filter_eventcb
  15. be_filter_flush
  16. be_filter_ctrl

   1 /*
   2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
   3  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
   4  * All rights reserved.
   5  *
   6  * Redistribution and use in source and binary forms, with or without
   7  * modification, are permitted provided that the following conditions
   8  * are met:
   9  * 1. Redistributions of source code must retain the above copyright
  10  *    notice, this list of conditions and the following disclaimer.
  11  * 2. Redistributions in binary form must reproduce the above copyright
  12  *    notice, this list of conditions and the following disclaimer in the
  13  *    documentation and/or other materials provided with the distribution.
  14  * 3. The name of the author may not be used to endorse or promote products
  15  *    derived from this software without specific prior written permission.
  16  *
  17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  27  */
  28 
  29 #include <sys/types.h>
  30 
  31 #include "event2/event-config.h"
  32 
  33 #ifdef _EVENT_HAVE_SYS_TIME_H
  34 #include <sys/time.h>
  35 #endif
  36 
  37 #include <errno.h>
  38 #include <stdio.h>
  39 #include <stdlib.h>
  40 #include <string.h>
  41 #ifdef _EVENT_HAVE_STDARG_H
  42 #include <stdarg.h>
  43 #endif
  44 
  45 #ifdef WIN32
  46 #include <winsock2.h>
  47 #endif
  48 
  49 #include "event2/util.h"
  50 #include "event2/bufferevent.h"
  51 #include "event2/buffer.h"
  52 #include "event2/bufferevent_struct.h"
  53 #include "event2/event.h"
  54 #include "log-internal.h"
  55 #include "mm-internal.h"
  56 #include "bufferevent-internal.h"
  57 #include "util-internal.h"
  58 
  59 /* prototypes */
  60 static int be_filter_enable(struct bufferevent *, short);
  61 static int be_filter_disable(struct bufferevent *, short);
  62 static void be_filter_destruct(struct bufferevent *);
  63 
  64 static void be_filter_readcb(struct bufferevent *, void *);
  65 static void be_filter_writecb(struct bufferevent *, void *);
  66 static void be_filter_eventcb(struct bufferevent *, short, void *);
  67 static int be_filter_flush(struct bufferevent *bufev,
  68     short iotype, enum bufferevent_flush_mode mode);
  69 static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
  70 
  71 static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
  72     const struct evbuffer_cb_info *info, void *arg);
  73 
  74 struct bufferevent_filtered {
  75         struct bufferevent_private bev;
  76 
  77         /** The bufferevent that we read/write filtered data from/to. */
  78         struct bufferevent *underlying;
  79         /** A callback on our outbuf to notice when somebody adds data */
  80         struct evbuffer_cb_entry *outbuf_cb;
  81         /** True iff we have received an EOF callback from the underlying
  82          * bufferevent. */
  83         unsigned got_eof;
  84 
  85         /** Function to free context when we're done. */
  86         void (*free_context)(void *);
  87         /** Input filter */
  88         bufferevent_filter_cb process_in;
  89         /** Output filter */
  90         bufferevent_filter_cb process_out;
  91         /** User-supplied argument to the filters. */
  92         void *context;
  93 };
  94 
  95 const struct bufferevent_ops bufferevent_ops_filter = {
  96         "filter",
  97         evutil_offsetof(struct bufferevent_filtered, bev.bev),
  98         be_filter_enable,
  99         be_filter_disable,
 100         be_filter_destruct,
 101         _bufferevent_generic_adj_timeouts,
 102         be_filter_flush,
 103         be_filter_ctrl,
 104 };
 105 
 106 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
 107  * return that bufferevent_filtered. Returns NULL otherwise.*/
 108 static inline struct bufferevent_filtered *
 109 upcast(struct bufferevent *bev)
 110 {
 111         struct bufferevent_filtered *bev_f;
 112         if (bev->be_ops != &bufferevent_ops_filter)
 113                 return NULL;
 114         bev_f = (void*)( ((char*)bev) -
 115                          evutil_offsetof(struct bufferevent_filtered, bev.bev));
 116         EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter);
 117         return bev_f;
 118 }
 119 
 120 #define downcast(bev_f) (&(bev_f)->bev.bev)
 121 
 122 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
 123  * over its high watermark such that we should not write to it in a given
 124  * flush mode. */
 125 static int
 126 be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
 127     enum bufferevent_flush_mode state)
 128 {
 129         struct bufferevent *u = bevf->underlying;
 130         return state == BEV_NORMAL &&
 131             u->wm_write.high &&
 132             evbuffer_get_length(u->output) >= u->wm_write.high;
 133 }
 134 
 135 /** Return 1 if our input buffer is at or over its high watermark such that we
 136  * should not write to it in a given flush mode. */
 137 static int
 138 be_readbuf_full(struct bufferevent_filtered *bevf,
 139     enum bufferevent_flush_mode state)
 140 {
 141         struct bufferevent *bufev = downcast(bevf);
 142         return state == BEV_NORMAL &&
 143             bufev->wm_read.high &&
 144             evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
 145 }
 146 
 147 
 148 /* Filter to use when we're created with a NULL filter. */
 149 static enum bufferevent_filter_result
 150 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
 151                enum bufferevent_flush_mode state, void *ctx)
 152 {
 153         (void)state;
 154         if (evbuffer_remove_buffer(src, dst, lim) == 0)
 155                 return BEV_OK;
 156         else
 157                 return BEV_ERROR;
 158 }
 159 
 160 struct bufferevent *
 161 bufferevent_filter_new(struct bufferevent *underlying,
 162                        bufferevent_filter_cb input_filter,
 163                        bufferevent_filter_cb output_filter,
 164                        int options,
 165                        void (*free_context)(void *),
 166                        void *ctx)
 167 {
 168         struct bufferevent_filtered *bufev_f;
 169         int tmp_options = options & ~BEV_OPT_THREADSAFE;
 170 
 171         if (!underlying)
 172                 return NULL;
 173 
 174         if (!input_filter)
 175                 input_filter = be_null_filter;
 176         if (!output_filter)
 177                 output_filter = be_null_filter;
 178 
 179         bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
 180         if (!bufev_f)
 181                 return NULL;
 182 
 183         if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
 184                                     &bufferevent_ops_filter, tmp_options) < 0) {
 185                 mm_free(bufev_f);
 186                 return NULL;
 187         }
 188         if (options & BEV_OPT_THREADSAFE) {
 189                 bufferevent_enable_locking(downcast(bufev_f), NULL);
 190         }
 191 
 192         bufev_f->underlying = underlying;
 193 
 194         bufev_f->process_in = input_filter;
 195         bufev_f->process_out = output_filter;
 196         bufev_f->free_context = free_context;
 197         bufev_f->context = ctx;
 198 
 199         bufferevent_setcb(bufev_f->underlying,
 200             be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
 201 
 202         bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
 203            bufferevent_filtered_outbuf_cb, bufev_f);
 204 
 205         _bufferevent_init_generic_timeout_cbs(downcast(bufev_f));
 206         bufferevent_incref(underlying);
 207 
 208         bufferevent_enable(underlying, EV_READ|EV_WRITE);
 209         bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ);
 210 
 211         return downcast(bufev_f);
 212 }
 213 
 214 static void
 215 be_filter_destruct(struct bufferevent *bev)
 216 {
 217         struct bufferevent_filtered *bevf = upcast(bev);
 218         EVUTIL_ASSERT(bevf);
 219         if (bevf->free_context)
 220                 bevf->free_context(bevf->context);
 221 
 222         if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
 223                 /* Yes, there is also a decref in bufferevent_decref.
 224                  * That decref corresponds to the incref when we set
 225                  * underlying for the first time.  This decref is an
 226                  * extra one to remove the last reference.
 227                  */
 228                 if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
 229                         event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
 230                             "bufferevent with too few references");
 231                 } else {
 232                         bufferevent_free(bevf->underlying);
 233                 }
 234         } else {
 235                 if (bevf->underlying) {
 236                         if (bevf->underlying->errorcb == be_filter_eventcb)
 237                                 bufferevent_setcb(bevf->underlying,
 238                                     NULL, NULL, NULL, NULL);
 239                         bufferevent_unsuspend_read(bevf->underlying,
 240                             BEV_SUSPEND_FILT_READ);
 241                 }
 242         }
 243 
 244         _bufferevent_del_generic_timeout_cbs(bev);
 245 }
 246 
 247 static int
 248 be_filter_enable(struct bufferevent *bev, short event)
 249 {
 250         struct bufferevent_filtered *bevf = upcast(bev);
 251         if (event & EV_WRITE)
 252                 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
 253 
 254         if (event & EV_READ) {
 255                 BEV_RESET_GENERIC_READ_TIMEOUT(bev);
 256                 bufferevent_unsuspend_read(bevf->underlying,
 257                     BEV_SUSPEND_FILT_READ);
 258         }
 259         return 0;
 260 }
 261 
 262 static int
 263 be_filter_disable(struct bufferevent *bev, short event)
 264 {
 265         struct bufferevent_filtered *bevf = upcast(bev);
 266         if (event & EV_WRITE)
 267                 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
 268         if (event & EV_READ) {
 269                 BEV_DEL_GENERIC_READ_TIMEOUT(bev);
 270                 bufferevent_suspend_read(bevf->underlying,
 271                     BEV_SUSPEND_FILT_READ);
 272         }
 273         return 0;
 274 }
 275 
 276 static enum bufferevent_filter_result
 277 be_filter_process_input(struct bufferevent_filtered *bevf,
 278                         enum bufferevent_flush_mode state,
 279                         int *processed_out)
 280 {
 281         enum bufferevent_filter_result res;
 282         struct bufferevent *bev = downcast(bevf);
 283 
 284         if (state == BEV_NORMAL) {
 285                 /* If we're in 'normal' mode, don't urge data on the filter
 286                  * unless we're reading data and under our high-water mark.*/
 287                 if (!(bev->enabled & EV_READ) ||
 288                     be_readbuf_full(bevf, state))
 289                         return BEV_OK;
 290         }
 291 
 292         do {
 293                 ev_ssize_t limit = -1;
 294                 if (state == BEV_NORMAL && bev->wm_read.high)
 295                         limit = bev->wm_read.high -
 296                             evbuffer_get_length(bev->input);
 297 
 298                 res = bevf->process_in(bevf->underlying->input,
 299                     bev->input, limit, state, bevf->context);
 300 
 301                 if (res == BEV_OK)
 302                         *processed_out = 1;
 303         } while (res == BEV_OK &&
 304                  (bev->enabled & EV_READ) &&
 305                  evbuffer_get_length(bevf->underlying->input) &&
 306                  !be_readbuf_full(bevf, state));
 307 
 308         if (*processed_out)
 309                 BEV_RESET_GENERIC_READ_TIMEOUT(bev);
 310 
 311         return res;
 312 }
 313 
 314 
 315 static enum bufferevent_filter_result
 316 be_filter_process_output(struct bufferevent_filtered *bevf,
 317                          enum bufferevent_flush_mode state,
 318                          int *processed_out)
 319 {
 320         /* Requires references and lock: might call writecb */
 321         enum bufferevent_filter_result res = BEV_OK;
 322         struct bufferevent *bufev = downcast(bevf);
 323         int again = 0;
 324 
 325         if (state == BEV_NORMAL) {
 326                 /* If we're in 'normal' mode, don't urge data on the
 327                  * filter unless we're writing data, and the underlying
 328                  * bufferevent is accepting data, and we have data to
 329                  * give the filter.  If we're in 'flush' or 'finish',
 330                  * call the filter no matter what. */
 331                 if (!(bufev->enabled & EV_WRITE) ||
 332                     be_underlying_writebuf_full(bevf, state) ||
 333                     !evbuffer_get_length(bufev->output))
 334                         return BEV_OK;
 335         }
 336 
 337         /* disable the callback that calls this function
 338            when the user adds to the output buffer. */
 339         evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0);
 340 
 341         do {
 342                 int processed = 0;
 343                 again = 0;
 344 
 345                 do {
 346                         ev_ssize_t limit = -1;
 347                         if (state == BEV_NORMAL &&
 348                             bevf->underlying->wm_write.high)
 349                                 limit = bevf->underlying->wm_write.high -
 350                                     evbuffer_get_length(bevf->underlying->output);
 351 
 352                         res = bevf->process_out(downcast(bevf)->output,
 353                             bevf->underlying->output,
 354                             limit,
 355                             state,
 356                             bevf->context);
 357 
 358                         if (res == BEV_OK)
 359                                 processed = *processed_out = 1;
 360                 } while (/* Stop if the filter wasn't successful...*/
 361                         res == BEV_OK &&
 362                         /* Or if we aren't writing any more. */
 363                         (bufev->enabled & EV_WRITE) &&
 364                         /* Of if we have nothing more to write and we are
 365                          * not flushing. */
 366                         evbuffer_get_length(bufev->output) &&
 367                         /* Or if we have filled the underlying output buffer. */
 368                         !be_underlying_writebuf_full(bevf,state));
 369 
 370                 if (processed &&
 371                     evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
 372                         /* call the write callback.*/
 373                         _bufferevent_run_writecb(bufev);
 374 
 375                         if (res == BEV_OK &&
 376                             (bufev->enabled & EV_WRITE) &&
 377                             evbuffer_get_length(bufev->output) &&
 378                             !be_underlying_writebuf_full(bevf, state)) {
 379                                 again = 1;
 380                         }
 381                 }
 382         } while (again);
 383 
 384         /* reenable the outbuf_cb */
 385         evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
 386             EVBUFFER_CB_ENABLED);
 387 
 388         if (*processed_out)
 389                 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
 390 
 391         return res;
 392 }
 393 
 394 /* Called when the size of our outbuf changes. */
 395 static void
 396 bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
 397     const struct evbuffer_cb_info *cbinfo, void *arg)
 398 {
 399         struct bufferevent_filtered *bevf = arg;
 400         struct bufferevent *bev = downcast(bevf);
 401 
 402         if (cbinfo->n_added) {
 403                 int processed_any = 0;
 404                 /* Somebody added more data to the output buffer. Try to
 405                  * process it, if we should. */
 406                 _bufferevent_incref_and_lock(bev);
 407                 be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
 408                 _bufferevent_decref_and_unlock(bev);
 409         }
 410 }
 411 
 412 /* Called when the underlying socket has read. */
 413 static void
 414 be_filter_readcb(struct bufferevent *underlying, void *_me)
 415 {
 416         struct bufferevent_filtered *bevf = _me;
 417         enum bufferevent_filter_result res;
 418         enum bufferevent_flush_mode state;
 419         struct bufferevent *bufev = downcast(bevf);
 420         int processed_any = 0;
 421 
 422         _bufferevent_incref_and_lock(bufev);
 423 
 424         if (bevf->got_eof)
 425                 state = BEV_FINISHED;
 426         else
 427                 state = BEV_NORMAL;
 428 
 429         /* XXXX use return value */
 430         res = be_filter_process_input(bevf, state, &processed_any);
 431         (void)res;
 432 
 433         /* XXX This should be in process_input, not here.  There are
 434          * other places that can call process-input, and they should
 435          * force readcb calls as needed. */
 436         if (processed_any &&
 437             evbuffer_get_length(bufev->input) >= bufev->wm_read.low)
 438                 _bufferevent_run_readcb(bufev);
 439 
 440         _bufferevent_decref_and_unlock(bufev);
 441 }
 442 
 443 /* Called when the underlying socket has drained enough that we can write to
 444    it. */
 445 static void
 446 be_filter_writecb(struct bufferevent *underlying, void *_me)
 447 {
 448         struct bufferevent_filtered *bevf = _me;
 449         struct bufferevent *bev = downcast(bevf);
 450         int processed_any = 0;
 451 
 452         _bufferevent_incref_and_lock(bev);
 453         be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
 454         _bufferevent_decref_and_unlock(bev);
 455 }
 456 
 457 /* Called when the underlying socket has given us an error */
 458 static void
 459 be_filter_eventcb(struct bufferevent *underlying, short what, void *_me)
 460 {
 461         struct bufferevent_filtered *bevf = _me;
 462         struct bufferevent *bev = downcast(bevf);
 463 
 464         _bufferevent_incref_and_lock(bev);
 465         /* All we can really to is tell our own eventcb. */
 466         _bufferevent_run_eventcb(bev, what);
 467         _bufferevent_decref_and_unlock(bev);
 468 }
 469 
 470 static int
 471 be_filter_flush(struct bufferevent *bufev,
 472     short iotype, enum bufferevent_flush_mode mode)
 473 {
 474         struct bufferevent_filtered *bevf = upcast(bufev);
 475         int processed_any = 0;
 476         EVUTIL_ASSERT(bevf);
 477 
 478         _bufferevent_incref_and_lock(bufev);
 479 
 480         if (iotype & EV_READ) {
 481                 be_filter_process_input(bevf, mode, &processed_any);
 482         }
 483         if (iotype & EV_WRITE) {
 484                 be_filter_process_output(bevf, mode, &processed_any);
 485         }
 486         /* XXX check the return value? */
 487         /* XXX does this want to recursively call lower-level flushes? */
 488         bufferevent_flush(bevf->underlying, iotype, mode);
 489 
 490         _bufferevent_decref_and_unlock(bufev);
 491 
 492         return processed_any;
 493 }
 494 
 495 static int
 496 be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
 497     union bufferevent_ctrl_data *data)
 498 {
 499         struct bufferevent_filtered *bevf;
 500         switch (op) {
 501         case BEV_CTRL_GET_UNDERLYING:
 502                 bevf = upcast(bev);
 503                 data->ptr = bevf->underlying;
 504                 return 0;
 505         case BEV_CTRL_GET_FD:
 506         case BEV_CTRL_SET_FD:
 507         case BEV_CTRL_CANCEL_ALL:
 508         default:
 509                 return -1;
 510         }
 511 }

/* [<][>][^][v][top][bottom][index][help] */