root/opal/mca/event/libevent2022/libevent/bufferevent_sock.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. bufferevent_socket_outbuf_cb
  2. bufferevent_readcb
  3. bufferevent_writecb
  4. bufferevent_socket_new
  5. bufferevent_socket_connect
  6. bufferevent_connect_getaddrinfo_cb
  7. bufferevent_socket_connect_hostname
  8. bufferevent_socket_get_dns_error
  9. bufferevent_new
  10. be_socket_enable
  11. be_socket_disable
  12. be_socket_destruct
  13. be_socket_adj_timeouts
  14. be_socket_flush
  15. be_socket_setfd
  16. bufferevent_priority_set
  17. bufferevent_base_set
  18. be_socket_ctrl

   1 /*
   2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
   3  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
   4  * All rights reserved.
   5  *
   6  * Redistribution and use in source and binary forms, with or without
   7  * modification, are permitted provided that the following conditions
   8  * are met:
   9  * 1. Redistributions of source code must retain the above copyright
  10  *    notice, this list of conditions and the following disclaimer.
  11  * 2. Redistributions in binary form must reproduce the above copyright
  12  *    notice, this list of conditions and the following disclaimer in the
  13  *    documentation and/or other materials provided with the distribution.
  14  * 3. The name of the author may not be used to endorse or promote products
  15  *    derived from this software without specific prior written permission.
  16  *
  17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  27  */
  28 
  29 #include <sys/types.h>
  30 
  31 #include "event2/event-config.h"
  32 
  33 #ifdef _EVENT_HAVE_SYS_TIME_H
  34 #include <sys/time.h>
  35 #endif
  36 
  37 #include <errno.h>
  38 #include <stdio.h>
  39 #include <stdlib.h>
  40 #include <string.h>
  41 #ifdef _EVENT_HAVE_STDARG_H
  42 #include <stdarg.h>
  43 #endif
  44 #ifdef _EVENT_HAVE_UNISTD_H
  45 #include <unistd.h>
  46 #endif
  47 
  48 #ifdef WIN32
  49 #include <winsock2.h>
  50 #include <ws2tcpip.h>
  51 #endif
  52 
  53 #ifdef _EVENT_HAVE_SYS_SOCKET_H
  54 #include <sys/socket.h>
  55 #endif
  56 #ifdef _EVENT_HAVE_NETINET_IN_H
  57 #include <netinet/in.h>
  58 #endif
  59 #ifdef _EVENT_HAVE_NETINET_IN6_H
  60 #include <netinet/in6.h>
  61 #endif
  62 
  63 #include "event2/util.h"
  64 #include "event2/bufferevent.h"
  65 #include "event2/buffer.h"
  66 #include "event2/bufferevent_struct.h"
  67 #include "event2/bufferevent_compat.h"
  68 #include "event2/event.h"
  69 #include "log-internal.h"
  70 #include "mm-internal.h"
  71 #include "bufferevent-internal.h"
  72 #include "util-internal.h"
  73 #ifdef WIN32
  74 #include "iocp-internal.h"
  75 #endif
  76 
/* prototypes */
/* Backend hooks of the socket bufferevent.  They are defined further down
 * in this file and installed in bufferevent_ops_socket below. */
static int be_socket_enable(struct bufferevent *, short);
static int be_socket_disable(struct bufferevent *, short);
static void be_socket_destruct(struct bufferevent *);
static int be_socket_adj_timeouts(struct bufferevent *);
static int be_socket_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
static int be_socket_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

/* Re-targets both events of a socket bufferevent at another fd; reached
 * through be_socket_ctrl() with BEV_CTRL_SET_FD. */
static void be_socket_setfd(struct bufferevent *, evutil_socket_t);

/*
 * Operations table for the "socket" bufferevent backend.
 *
 * The initializer is positional: a type-name string, the offset of the
 * public 'bev' member inside struct bufferevent_private, and then the
 * enable, disable, destruct, adj_timeouts, flush and ctrl hooks (going by
 * the names of the functions installed here).
 */
const struct bufferevent_ops bufferevent_ops_socket = {
        "socket",
        evutil_offsetof(struct bufferevent_private, bev),
        be_socket_enable,
        be_socket_disable,
        be_socket_destruct,
        be_socket_adj_timeouts,
        be_socket_flush,
        be_socket_ctrl,
};

/* Shorthand for _bufferevent_add_event(): used throughout this file to
 * (re)add event 'ev' together with its timeout 't'.  Callers treat a
 * negative result as failure. */
#define be_socket_add(ev, t)                    \
        _bufferevent_add_event((ev), (t))
 100 
 101 static void
 102 bufferevent_socket_outbuf_cb(struct evbuffer *buf,
 103     const struct evbuffer_cb_info *cbinfo,
 104     void *arg)
 105 {
 106         struct bufferevent *bufev = arg;
 107         struct bufferevent_private *bufev_p =
 108             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 109 
 110         if (cbinfo->n_added &&
 111             (bufev->enabled & EV_WRITE) &&
 112             !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
 113             !bufev_p->write_suspended) {
 114                 /* Somebody added data to the buffer, and we would like to
 115                  * write, and we were not writing.  So, start writing. */
 116                 if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
 117                     /* Should we log this? */
 118                 }
 119         }
 120 }
 121 
/*
 * Callback of the read event (ev_read) of a socket bufferevent.
 *
 * Reads from 'fd' into bufev->input, limited by the read high-watermark
 * (when one is set) and by _bufferevent_get_read_max().  On a successful
 * read the read buckets are decremented and, if the input buffer has
 * reached the low watermark, the user's read callback is run.
 *
 * On a pure timeout, on EOF, or on a non-retriable read error, reading is
 * disabled and the event callback is run with BEV_EVENT_READING combined
 * with BEV_EVENT_TIMEOUT, BEV_EVENT_EOF or BEV_EVENT_ERROR respectively.
 *
 * 'arg' is the struct bufferevent.  A reference is taken and the lock is
 * held for the whole duration of the callback.
 */
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
        struct bufferevent *bufev = arg;
        struct bufferevent_private *bufev_p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        struct evbuffer *input;
        int res = 0;
        short what = BEV_EVENT_READING;
        ev_ssize_t howmuch = -1, readmax=-1;

        _bufferevent_incref_and_lock(bufev);

        if (event == EV_TIMEOUT) {
                /* Note that we only check for event==EV_TIMEOUT. If
                 * event==EV_TIMEOUT|EV_READ, we can safely ignore the
                 * timeout, since a read has occurred */
                what |= BEV_EVENT_TIMEOUT;
                goto error;
        }

        input = bufev->input;

        /*
         * If we have a high watermark configured then we don't want to
         * read more data than would make us reach the watermark.
         */
        if (bufev->wm_read.high != 0) {
                howmuch = bufev->wm_read.high - evbuffer_get_length(input);
                /* we somehow lowered the watermark, stop reading */
                if (howmuch <= 0) {
                        bufferevent_wm_suspend_read(bufev);
                        goto done;
                }
        }
        /* Clamp the amount to read to the current read maximum; howmuch
         * is still -1 here when no high watermark is configured. */
        readmax = _bufferevent_get_read_max(bufev_p);
        if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
                                               * uglifies this code. XXXX */
                howmuch = readmax;
        if (bufev_p->read_suspended)
                goto done;

        /* The input buffer is kept frozen at this end (see
         * bufferevent_socket_new()); unfreeze it only around the read. */
        evbuffer_unfreeze(input, 0);
        res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
        evbuffer_freeze(input, 0);

        if (res == -1) {
                int err = evutil_socket_geterror(fd);
                if (EVUTIL_ERR_RW_RETRIABLE(err))
                        goto reschedule;
                /* error case */
                what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
                /* eof case */
                what |= BEV_EVENT_EOF;
        }

        if (res <= 0)
                goto error;

        _bufferevent_decrement_read_buckets(bufev_p, res);

        /* Invoke the user callback - must always be called last */
        if (evbuffer_get_length(input) >= bufev->wm_read.low)
                _bufferevent_run_readcb(bufev);

        goto done;

 reschedule:
        /* Retriable error: nothing to do here, just return. */
        goto done;

 error:
        bufferevent_disable(bufev, EV_READ);
        _bufferevent_run_eventcb(bufev, what);

 done:
        _bufferevent_decref_and_unlock(bufev);
}
 200 
/*
 * Callback of the write event (ev_write) of a socket bufferevent.
 *
 * While bufev_p->connecting is set, the callback first resolves the
 * pending connect with evutil_socket_finished_connecting():
 *   - a result of 0 leaves 'connecting' set and returns, to be called
 *     again on the next event;
 *   - a negative result (or a recorded connection_refused) deletes both
 *     events and runs the event callback with BEV_EVENT_ERROR;
 *   - otherwise the event callback is run with BEV_EVENT_CONNECTED.
 *
 * After that, if the output buffer holds data, up to
 * _bufferevent_get_write_max() bytes are written to 'fd'.  The write event
 * is deleted once the output buffer is empty, and the user's write
 * callback is run when the buffer is at or below the write low-watermark.
 *
 * On a pure timeout, a zero-length write, or a non-retriable write error,
 * writing is disabled and the event callback is run with
 * BEV_EVENT_WRITING combined with the specific condition.
 *
 * 'arg' is the struct bufferevent.  A reference is taken and the lock is
 * held for the whole duration of the callback.
 */
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
        struct bufferevent *bufev = arg;
        struct bufferevent_private *bufev_p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        int res = 0;
        short what = BEV_EVENT_WRITING;
        int connected = 0;
        ev_ssize_t atmost = -1;

        _bufferevent_incref_and_lock(bufev);

        if (event == EV_TIMEOUT) {
                /* Note that we only check for event==EV_TIMEOUT. If
                 * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
                 * timeout, since a write has occurred */
                what |= BEV_EVENT_TIMEOUT;
                goto error;
        }
        if (bufev_p->connecting) {
                int c = evutil_socket_finished_connecting(fd);
                /* we need to fake the error if the connection was refused
                 * immediately - usually connection to localhost on BSD */
                if (bufev_p->connection_refused) {
                  bufev_p->connection_refused = 0;
                  c = -1;
                }

                /* Not finished yet: keep 'connecting' set and wait. */
                if (c == 0)
                        goto done;

                bufev_p->connecting = 0;
                if (c < 0) {
                        event_del(&bufev->ev_write);
                        event_del(&bufev->ev_read);
                        _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
                        goto done;
                } else {
                        connected = 1;
#ifdef WIN32
                        if (BEV_IS_ASYNC(bufev)) {
                                event_del(&bufev->ev_write);
                                bufferevent_async_set_connected(bufev);
                                _bufferevent_run_eventcb(bufev,
                                                BEV_EVENT_CONNECTED);
                                goto done;
                        }
#endif
                        _bufferevent_run_eventcb(bufev,
                                        BEV_EVENT_CONNECTED);
                        /* The write event was only borrowed for the
                         * connect; drop it if the user does not want to
                         * write right now. */
                        if (!(bufev->enabled & EV_WRITE) ||
                            bufev_p->write_suspended) {
                                event_del(&bufev->ev_write);
                                goto done;
                        }
                }
        }

        atmost = _bufferevent_get_write_max(bufev_p);

        if (bufev_p->write_suspended)
                goto done;

        if (evbuffer_get_length(bufev->output)) {
                /* The output buffer is kept frozen at this end (see
                 * bufferevent_socket_new()); unfreeze it only around the
                 * write. */
                evbuffer_unfreeze(bufev->output, 1);
                res = evbuffer_write_atmost(bufev->output, fd, atmost);
                evbuffer_freeze(bufev->output, 1);
                if (res == -1) {
                        int err = evutil_socket_geterror(fd);
                        if (EVUTIL_ERR_RW_RETRIABLE(err))
                                goto reschedule;
                        what |= BEV_EVENT_ERROR;
                } else if (res == 0) {
                        /* eof case
                           XXXX Actually, a 0 on write doesn't indicate
                           an EOF. An ECONNRESET might be more typical.
                         */
                        what |= BEV_EVENT_EOF;
                }
                if (res <= 0)
                        goto error;

                _bufferevent_decrement_write_buckets(bufev_p, res);
        }

        /* Nothing left to send: stop polling for writability. */
        if (evbuffer_get_length(bufev->output) == 0) {
                event_del(&bufev->ev_write);
        }

        /*
         * Invoke the user callback if our buffer is drained or below the
         * low watermark.  The callback is skipped when this invocation
         * only completed a connect and wrote nothing (res == 0 and
         * connected != 0).
         */
        if ((res || !connected) &&
            evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
                _bufferevent_run_writecb(bufev);
        }

        goto done;

 reschedule:
        if (evbuffer_get_length(bufev->output) == 0) {
                event_del(&bufev->ev_write);
        }
        goto done;

 error:
        bufferevent_disable(bufev, EV_WRITE);
        _bufferevent_run_eventcb(bufev, what);

 done:
        _bufferevent_decref_and_unlock(bufev);
}
 315 
 316 struct bufferevent *
 317 bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
 318     int options)
 319 {
 320         struct bufferevent_private *bufev_p;
 321         struct bufferevent *bufev;
 322 
 323 #ifdef WIN32
 324         if (base && event_base_get_iocp(base))
 325                 return bufferevent_async_new(base, fd, options);
 326 #endif
 327 
 328         if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
 329                 return NULL;
 330 
 331         if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
 332                                     options) < 0) {
 333                 mm_free(bufev_p);
 334                 return NULL;
 335         }
 336         bufev = &bufev_p->bev;
 337         evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
 338 
 339         event_assign(&bufev->ev_read, bufev->ev_base, fd,
 340             EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
 341         event_assign(&bufev->ev_write, bufev->ev_base, fd,
 342             EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
 343 
 344         evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
 345 
 346         evbuffer_freeze(bufev->input, 0);
 347         evbuffer_freeze(bufev->output, 1);
 348 
 349         return bufev;
 350 }
 351 
 352 int
 353 bufferevent_socket_connect(struct bufferevent *bev,
 354     struct sockaddr *sa, int socklen)
 355 {
 356         struct bufferevent_private *bufev_p =
 357             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
 358 
 359         evutil_socket_t fd;
 360         int r = 0;
 361         int result=-1;
 362         int ownfd = 0;
 363 
 364         _bufferevent_incref_and_lock(bev);
 365 
 366         if (!bufev_p)
 367                 goto done;
 368 
 369         fd = bufferevent_getfd(bev);
 370         if (fd < 0) {
 371                 if (!sa)
 372                         goto done;
 373                 fd = socket(sa->sa_family, SOCK_STREAM, 0);
 374                 if (fd < 0)
 375                         goto done;
 376                 if (evutil_make_socket_nonblocking(fd)<0)
 377                         goto done;
 378                 ownfd = 1;
 379         }
 380         if (sa) {
 381 #ifdef WIN32
 382                 if (bufferevent_async_can_connect(bev)) {
 383                         bufferevent_setfd(bev, fd);
 384                         r = bufferevent_async_connect(bev, fd, sa, socklen);
 385                         if (r < 0)
 386                                 goto freesock;
 387                         bufev_p->connecting = 1;
 388                         result = 0;
 389                         goto done;
 390                 } else
 391 #endif
 392                 r = evutil_socket_connect(&fd, sa, socklen);
 393                 if (r < 0)
 394                         goto freesock;
 395         }
 396 #ifdef WIN32
 397         /* ConnectEx() isn't always around, even when IOCP is enabled.
 398          * Here, we borrow the socket object's write handler to fall back
 399          * on a non-blocking connect() when ConnectEx() is unavailable. */
 400         if (BEV_IS_ASYNC(bev)) {
 401                 event_assign(&bev->ev_write, bev->ev_base, fd,
 402                     EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
 403         }
 404 #endif
 405         bufferevent_setfd(bev, fd);
 406         if (r == 0) {
 407                 if (! be_socket_enable(bev, EV_WRITE)) {
 408                         bufev_p->connecting = 1;
 409                         result = 0;
 410                         goto done;
 411                 }
 412         } else if (r == 1) {
 413                 /* The connect succeeded already. How very BSD of it. */
 414                 result = 0;
 415                 bufev_p->connecting = 1;
 416                 event_active(&bev->ev_write, EV_WRITE, 1);
 417         } else {
 418                 /* The connect failed already.  How very BSD of it. */
 419                 bufev_p->connection_refused = 1;
 420                 bufev_p->connecting = 1;
 421                 result = 0;
 422                 event_active(&bev->ev_write, EV_WRITE, 1);
 423         }
 424 
 425         goto done;
 426 
 427 freesock:
 428         _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
 429         if (ownfd)
 430                 evutil_closesocket(fd);
 431         /* do something about the error? */
 432 done:
 433         _bufferevent_decref_and_unlock(bev);
 434         return result;
 435 }
 436 
 437 static void
 438 bufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai,
 439     void *arg)
 440 {
 441         struct bufferevent *bev = arg;
 442         struct bufferevent_private *bev_p =
 443             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
 444         int r;
 445         BEV_LOCK(bev);
 446 
 447         bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP);
 448         bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP);
 449 
 450         if (result != 0) {
 451                 bev_p->dns_error = result;
 452                 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
 453                 _bufferevent_decref_and_unlock(bev);
 454                 if (ai)
 455                         evutil_freeaddrinfo(ai);
 456                 return;
 457         }
 458 
 459         /* XXX use the other addrinfos? */
 460         /* XXX use this return value */
 461         r = bufferevent_socket_connect(bev, ai->ai_addr, (int)ai->ai_addrlen);
 462         (void)r;
 463         _bufferevent_decref_and_unlock(bev);
 464         evutil_freeaddrinfo(ai);
 465 }
 466 
 467 int
 468 bufferevent_socket_connect_hostname(struct bufferevent *bev,
 469     struct evdns_base *evdns_base, int family, const char *hostname, int port)
 470 {
 471         char portbuf[10];
 472         struct evutil_addrinfo hint;
 473         int err;
 474         struct bufferevent_private *bev_p =
 475             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
 476 
 477         if (family != AF_INET && family != AF_INET6 && family != AF_UNSPEC)
 478                 return -1;
 479         if (port < 1 || port > 65535)
 480                 return -1;
 481 
 482         BEV_LOCK(bev);
 483         bev_p->dns_error = 0;
 484         BEV_UNLOCK(bev);
 485 
 486         evutil_snprintf(portbuf, sizeof(portbuf), "%d", port);
 487 
 488         memset(&hint, 0, sizeof(hint));
 489         hint.ai_family = family;
 490         hint.ai_protocol = IPPROTO_TCP;
 491         hint.ai_socktype = SOCK_STREAM;
 492 
 493         bufferevent_suspend_write(bev, BEV_SUSPEND_LOOKUP);
 494         bufferevent_suspend_read(bev, BEV_SUSPEND_LOOKUP);
 495 
 496         bufferevent_incref(bev);
 497         err = evutil_getaddrinfo_async(evdns_base, hostname, portbuf,
 498             &hint, bufferevent_connect_getaddrinfo_cb, bev);
 499 
 500         if (err == 0) {
 501                 return 0;
 502         } else {
 503                 bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP);
 504                 bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP);
 505                 return -1;
 506         }
 507 }
 508 
 509 int
 510 bufferevent_socket_get_dns_error(struct bufferevent *bev)
 511 {
 512         int rv;
 513         struct bufferevent_private *bev_p =
 514             EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
 515 
 516         BEV_LOCK(bev);
 517         rv = bev_p->dns_error;
 518         BEV_UNLOCK(bev);
 519 
 520         return rv;
 521 }
 522 
 523 /*
 524  * Create a new buffered event object.
 525  *
 526  * The read callback is invoked whenever we read new data.
 527  * The write callback is invoked whenever the output buffer is drained.
 528  * The error callback is invoked on a write/read error or on EOF.
 529  *
 530  * Both read and write callbacks maybe NULL.  The error callback is not
 531  * allowed to be NULL and have to be provided always.
 532  */
 533 
 534 struct bufferevent *
 535 bufferevent_new(evutil_socket_t fd,
 536     bufferevent_data_cb readcb, bufferevent_data_cb writecb,
 537     bufferevent_event_cb eventcb, void *cbarg)
 538 {
 539         struct bufferevent *bufev;
 540 
 541         if (!(bufev = bufferevent_socket_new(NULL, fd, 0)))
 542                 return NULL;
 543 
 544         bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg);
 545 
 546         return bufev;
 547 }
 548 
 549 
 550 static int
 551 be_socket_enable(struct bufferevent *bufev, short event)
 552 {
 553         if (event & EV_READ) {
 554                 if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
 555                         return -1;
 556         }
 557         if (event & EV_WRITE) {
 558                 if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
 559                         return -1;
 560         }
 561         return 0;
 562 }
 563 
 564 static int
 565 be_socket_disable(struct bufferevent *bufev, short event)
 566 {
 567         struct bufferevent_private *bufev_p =
 568             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 569         if (event & EV_READ) {
 570                 if (event_del(&bufev->ev_read) == -1)
 571                         return -1;
 572         }
 573         /* Don't actually disable the write if we are trying to connect. */
 574         if ((event & EV_WRITE) && ! bufev_p->connecting) {
 575                 if (event_del(&bufev->ev_write) == -1)
 576                         return -1;
 577         }
 578         return 0;
 579 }
 580 
 581 static void
 582 be_socket_destruct(struct bufferevent *bufev)
 583 {
 584         struct bufferevent_private *bufev_p =
 585             EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
 586         evutil_socket_t fd;
 587         EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);
 588 
 589         fd = event_get_fd(&bufev->ev_read);
 590 
 591         event_del(&bufev->ev_read);
 592         event_del(&bufev->ev_write);
 593 
 594         if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0)
 595                 EVUTIL_CLOSESOCKET(fd);
 596 }
 597 
 598 static int
 599 be_socket_adj_timeouts(struct bufferevent *bufev)
 600 {
 601         int r = 0;
 602         if (event_pending(&bufev->ev_read, EV_READ, NULL))
 603                 if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0)
 604                         r = -1;
 605         if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
 606                 if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0)
 607                         r = -1;
 608         }
 609         return r;
 610 }
 611 
 612 static int
 613 be_socket_flush(struct bufferevent *bev, short iotype,
 614     enum bufferevent_flush_mode mode)
 615 {
 616         return 0;
 617 }
 618 
 619 
/*
 * Point both events of a socket bufferevent at a new file descriptor.
 *
 * Under the bufferevent lock: the read and write events are deleted,
 * re-assigned to 'fd' with the same flags and callbacks that
 * bufferevent_socket_new() uses, and - when 'fd' is non-negative - the
 * directions recorded in bufev->enabled are enabled again.
 */
static void
be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
        BEV_LOCK(bufev);
        EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);

        /* The events are removed before being re-assigned below. */
        event_del(&bufev->ev_read);
        event_del(&bufev->ev_write);

        event_assign(&bufev->ev_read, bufev->ev_base, fd,
            EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
        event_assign(&bufev->ev_write, bufev->ev_base, fd,
            EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

        /* With a negative fd the events are left un-added. */
        if (fd >= 0)
                bufferevent_enable(bufev, bufev->enabled);

        BEV_UNLOCK(bufev);
}
 639 
 640 /* XXXX Should non-socket bufferevents support this? */
 641 int
 642 bufferevent_priority_set(struct bufferevent *bufev, int priority)
 643 {
 644         int r = -1;
 645 
 646         BEV_LOCK(bufev);
 647         if (bufev->be_ops != &bufferevent_ops_socket)
 648                 goto done;
 649 
 650         if (event_priority_set(&bufev->ev_read, priority) == -1)
 651                 goto done;
 652         if (event_priority_set(&bufev->ev_write, priority) == -1)
 653                 goto done;
 654 
 655         r = 0;
 656 done:
 657         BEV_UNLOCK(bufev);
 658         return r;
 659 }
 660 
 661 /* XXXX Should non-socket bufferevents support this? */
 662 int
 663 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
 664 {
 665         int res = -1;
 666 
 667         BEV_LOCK(bufev);
 668         if (bufev->be_ops != &bufferevent_ops_socket)
 669                 goto done;
 670 
 671         bufev->ev_base = base;
 672 
 673         res = event_base_set(base, &bufev->ev_read);
 674         if (res == -1)
 675                 goto done;
 676 
 677         res = event_base_set(base, &bufev->ev_write);
 678 done:
 679         BEV_UNLOCK(bufev);
 680         return res;
 681 }
 682 
 683 static int
 684 be_socket_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
 685     union bufferevent_ctrl_data *data)
 686 {
 687         switch (op) {
 688         case BEV_CTRL_SET_FD:
 689                 be_socket_setfd(bev, data->fd);
 690                 return 0;
 691         case BEV_CTRL_GET_FD:
 692                 data->fd = event_get_fd(&bev->ev_read);
 693                 return 0;
 694         case BEV_CTRL_GET_UNDERLYING:
 695         case BEV_CTRL_CANCEL_ALL:
 696         default:
 697                 return -1;
 698         }
 699 }

/* [<][>][^][v][top][bottom][index][help] */