root/opal/mca/event/libevent2022/libevent/bufferevent_pair.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. upcast
  2. incref_and_lock
  3. decref_and_unlock
  4. bufferevent_pair_elt_new
  5. bufferevent_pair_new
  6. be_pair_transfer
  7. be_pair_wants_to_talk
  8. be_pair_outbuf_cb
  9. be_pair_enable
  10. be_pair_disable
  11. be_pair_destruct
  12. be_pair_flush
  13. bufferevent_pair_get_partner

   1 /*
   2  * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
   3  *
   4  * Redistribution and use in source and binary forms, with or without
   5  * modification, are permitted provided that the following conditions
   6  * are met:
   7  * 1. Redistributions of source code must retain the above copyright
   8  *    notice, this list of conditions and the following disclaimer.
   9  * 2. Redistributions in binary form must reproduce the above copyright
  10  *    notice, this list of conditions and the following disclaimer in the
  11  *    documentation and/or other materials provided with the distribution.
  12  * 3. The name of the author may not be used to endorse or promote products
  13  *    derived from this software without specific prior written permission.
  14  *
  15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  17  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  18  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  19  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  20  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  21  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  22  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  24  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  25  */
  26 
  27 #include <sys/types.h>
  28 
  29 #ifdef WIN32
  30 #include <winsock2.h>
  31 #endif
  32 
  33 #include "event2/event-config.h"
  34 
  35 #include "event2/util.h"
  36 #include "event2/buffer.h"
  37 #include "event2/bufferevent.h"
  38 #include "event2/bufferevent_struct.h"
  39 #include "event2/event.h"
  40 #include "defer-internal.h"
  41 #include "bufferevent-internal.h"
  42 #include "mm-internal.h"
  43 #include "util-internal.h"
  44 
  45 struct bufferevent_pair {
  46         struct bufferevent_private bev;
  47         struct bufferevent_pair *partner;
  48 };
  49 
  50 
  51 /* Given a bufferevent that's really a bev part of a bufferevent_pair,
  52  * return that bufferevent_filtered. Returns NULL otherwise.*/
  53 static inline struct bufferevent_pair *
  54 upcast(struct bufferevent *bev)
  55 {
  56         struct bufferevent_pair *bev_p;
  57         if (bev->be_ops != &bufferevent_ops_pair)
  58                 return NULL;
  59         bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev);
  60         EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair);
  61         return bev_p;
  62 }
  63 
  64 #define downcast(bev_pair) (&(bev_pair)->bev.bev)
  65 
  66 static inline void
  67 incref_and_lock(struct bufferevent *b)
  68 {
  69         struct bufferevent_pair *bevp;
  70         _bufferevent_incref_and_lock(b);
  71         bevp = upcast(b);
  72         if (bevp->partner)
  73                 _bufferevent_incref_and_lock(downcast(bevp->partner));
  74 }
  75 
  76 static inline void
  77 decref_and_unlock(struct bufferevent *b)
  78 {
  79         struct bufferevent_pair *bevp = upcast(b);
  80         if (bevp->partner)
  81                 _bufferevent_decref_and_unlock(downcast(bevp->partner));
  82         _bufferevent_decref_and_unlock(b);
  83 }
  84 
  85 /* XXX Handle close */
  86 
  87 static void be_pair_outbuf_cb(struct evbuffer *,
  88     const struct evbuffer_cb_info *, void *);
  89 
  90 static struct bufferevent_pair *
  91 bufferevent_pair_elt_new(struct event_base *base,
  92     int options)
  93 {
  94         struct bufferevent_pair *bufev;
  95         if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair))))
  96                 return NULL;
  97         if (bufferevent_init_common(&bufev->bev, base, &bufferevent_ops_pair,
  98                 options)) {
  99                 mm_free(bufev);
 100                 return NULL;
 101         }
 102         if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) {
 103                 bufferevent_free(downcast(bufev));
 104                 return NULL;
 105         }
 106 
 107         _bufferevent_init_generic_timeout_cbs(&bufev->bev.bev);
 108 
 109         return bufev;
 110 }
 111 
 112 int
 113 bufferevent_pair_new(struct event_base *base, int options,
 114     struct bufferevent *pair[2])
 115 {
 116         struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
 117         int tmp_options;
 118 
 119         options |= BEV_OPT_DEFER_CALLBACKS;
 120         tmp_options = options & ~BEV_OPT_THREADSAFE;
 121 
 122         bufev1 = bufferevent_pair_elt_new(base, options);
 123         if (!bufev1)
 124                 return -1;
 125         bufev2 = bufferevent_pair_elt_new(base, tmp_options);
 126         if (!bufev2) {
 127                 bufferevent_free(downcast(bufev1));
 128                 return -1;
 129         }
 130 
 131         if (options & BEV_OPT_THREADSAFE) {
 132                 /*XXXX check return */
 133                 bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock);
 134         }
 135 
 136         bufev1->partner = bufev2;
 137         bufev2->partner = bufev1;
 138 
 139         evbuffer_freeze(downcast(bufev1)->input, 0);
 140         evbuffer_freeze(downcast(bufev1)->output, 1);
 141         evbuffer_freeze(downcast(bufev2)->input, 0);
 142         evbuffer_freeze(downcast(bufev2)->output, 1);
 143 
 144         pair[0] = downcast(bufev1);
 145         pair[1] = downcast(bufev2);
 146 
 147         return 0;
 148 }
 149 
 150 static void
 151 be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
 152     int ignore_wm)
 153 {
 154         size_t src_size, dst_size;
 155         size_t n;
 156 
 157         evbuffer_unfreeze(src->output, 1);
 158         evbuffer_unfreeze(dst->input, 0);
 159 
 160         if (dst->wm_read.high) {
 161                 dst_size = evbuffer_get_length(dst->input);
 162                 if (dst_size < dst->wm_read.high) {
 163                         n = dst->wm_read.high - dst_size;
 164                         evbuffer_remove_buffer(src->output, dst->input, n);
 165                 } else {
 166                         if (!ignore_wm)
 167                                 goto done;
 168                         n = evbuffer_get_length(src->output);
 169                         evbuffer_add_buffer(dst->input, src->output);
 170                 }
 171         } else {
 172                 n = evbuffer_get_length(src->output);
 173                 evbuffer_add_buffer(dst->input, src->output);
 174         }
 175 
 176         if (n) {
 177                 BEV_RESET_GENERIC_READ_TIMEOUT(dst);
 178 
 179                 if (evbuffer_get_length(dst->output))
 180                         BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
 181                 else
 182                         BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
 183         }
 184 
 185         src_size = evbuffer_get_length(src->output);
 186         dst_size = evbuffer_get_length(dst->input);
 187 
 188         if (dst_size >= dst->wm_read.low) {
 189                 _bufferevent_run_readcb(dst);
 190         }
 191         if (src_size <= src->wm_write.low) {
 192                 _bufferevent_run_writecb(src);
 193         }
 194 done:
 195         evbuffer_freeze(src->output, 1);
 196         evbuffer_freeze(dst->input, 0);
 197 }
 198 
 199 static inline int
 200 be_pair_wants_to_talk(struct bufferevent_pair *src,
 201     struct bufferevent_pair *dst)
 202 {
 203         return (downcast(src)->enabled & EV_WRITE) &&
 204             (downcast(dst)->enabled & EV_READ) &&
 205             !dst->bev.read_suspended &&
 206             evbuffer_get_length(downcast(src)->output);
 207 }
 208 
 209 static void
 210 be_pair_outbuf_cb(struct evbuffer *outbuf,
 211     const struct evbuffer_cb_info *info, void *arg)
 212 {
 213         struct bufferevent_pair *bev_pair = arg;
 214         struct bufferevent_pair *partner = bev_pair->partner;
 215 
 216         incref_and_lock(downcast(bev_pair));
 217 
 218         if (info->n_added > info->n_deleted && partner) {
 219                 /* We got more data.  If the other side's reading, then
 220                    hand it over. */
 221                 if (be_pair_wants_to_talk(bev_pair, partner)) {
 222                         be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
 223                 }
 224         }
 225 
 226         decref_and_unlock(downcast(bev_pair));
 227 }
 228 
 229 static int
 230 be_pair_enable(struct bufferevent *bufev, short events)
 231 {
 232         struct bufferevent_pair *bev_p = upcast(bufev);
 233         struct bufferevent_pair *partner = bev_p->partner;
 234 
 235         incref_and_lock(bufev);
 236 
 237         if (events & EV_READ) {
 238                 BEV_RESET_GENERIC_READ_TIMEOUT(bufev);
 239         }
 240         if ((events & EV_WRITE) && evbuffer_get_length(bufev->output))
 241                 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
 242 
 243         /* We're starting to read! Does the other side have anything to write?*/
 244         if ((events & EV_READ) && partner &&
 245             be_pair_wants_to_talk(partner, bev_p)) {
 246                 be_pair_transfer(downcast(partner), bufev, 0);
 247         }
 248         /* We're starting to write! Does the other side want to read? */
 249         if ((events & EV_WRITE) && partner &&
 250             be_pair_wants_to_talk(bev_p, partner)) {
 251                 be_pair_transfer(bufev, downcast(partner), 0);
 252         }
 253         decref_and_unlock(bufev);
 254         return 0;
 255 }
 256 
 257 static int
 258 be_pair_disable(struct bufferevent *bev, short events)
 259 {
 260         if (events & EV_READ) {
 261                 BEV_DEL_GENERIC_READ_TIMEOUT(bev);
 262         }
 263         if (events & EV_WRITE)
 264                 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
 265         return 0;
 266 }
 267 
 268 static void
 269 be_pair_destruct(struct bufferevent *bev)
 270 {
 271         struct bufferevent_pair *bev_p = upcast(bev);
 272 
 273         if (bev_p->partner) {
 274                 bev_p->partner->partner = NULL;
 275                 bev_p->partner = NULL;
 276         }
 277 
 278         _bufferevent_del_generic_timeout_cbs(bev);
 279 }
 280 
 281 static int
 282 be_pair_flush(struct bufferevent *bev, short iotype,
 283     enum bufferevent_flush_mode mode)
 284 {
 285         struct bufferevent_pair *bev_p = upcast(bev);
 286         struct bufferevent *partner;
 287         incref_and_lock(bev);
 288         if (!bev_p->partner)
 289                 return -1;
 290 
 291         partner = downcast(bev_p->partner);
 292 
 293         if (mode == BEV_NORMAL)
 294                 return 0;
 295 
 296         if ((iotype & EV_READ) != 0)
 297                 be_pair_transfer(partner, bev, 1);
 298 
 299         if ((iotype & EV_WRITE) != 0)
 300                 be_pair_transfer(bev, partner, 1);
 301 
 302         if (mode == BEV_FINISHED) {
 303                 _bufferevent_run_eventcb(partner, iotype|BEV_EVENT_EOF);
 304         }
 305         decref_and_unlock(bev);
 306         return 0;
 307 }
 308 
 309 struct bufferevent *
 310 bufferevent_pair_get_partner(struct bufferevent *bev)
 311 {
 312         struct bufferevent_pair *bev_p;
 313         struct bufferevent *partner = NULL;
 314         bev_p = upcast(bev);
 315         if (! bev_p)
 316                 return NULL;
 317 
 318         incref_and_lock(bev);
 319         if (bev_p->partner)
 320                 partner = downcast(bev_p->partner);
 321         decref_and_unlock(bev);
 322         return partner;
 323 }
 324 
 325 const struct bufferevent_ops bufferevent_ops_pair = {
 326         "pair_elt",
 327         evutil_offsetof(struct bufferevent_pair, bev.bev),
 328         be_pair_enable,
 329         be_pair_disable,
 330         be_pair_destruct,
 331         _bufferevent_generic_adj_timeouts,
 332         be_pair_flush,
 333         NULL, /* ctrl */
 334 };

/* [<][>][^][v][top][bottom][index][help] */