root/opal/mca/event/libevent2022/libevent/bufferevent_async.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. upcast
  2. upcast_connect
  3. upcast_read
  4. upcast_write
  5. bev_async_del_write
  6. bev_async_del_read
  7. bev_async_add_write
  8. bev_async_add_read
  9. bev_async_consider_writing
  10. bev_async_consider_reading
  11. be_async_outbuf_callback
  12. be_async_inbuf_callback
  13. be_async_enable
  14. be_async_disable
  15. be_async_destruct
  16. bev_async_set_wsa_error
  17. be_async_flush
  18. connect_complete
  19. read_complete
  20. write_complete
  21. bufferevent_async_new
  22. bufferevent_async_set_connected
  23. bufferevent_async_can_connect
  24. bufferevent_async_connect
  25. be_async_ctrl

   1 /*
   2  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
   3  *
   4  * All rights reserved.
   5  *
   6  * Redistribution and use in source and binary forms, with or without
   7  * modification, are permitted provided that the following conditions
   8  * are met:
   9  * 1. Redistributions of source code must retain the above copyright
  10  *    notice, this list of conditions and the following disclaimer.
  11  * 2. Redistributions in binary form must reproduce the above copyright
  12  *    notice, this list of conditions and the following disclaimer in the
  13  *    documentation and/or other materials provided with the distribution.
  14  * 3. The name of the author may not be used to endorse or promote products
  15  *    derived from this software without specific prior written permission.
  16  *
  17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  27  */
  28 
  29 #include "event2/event-config.h"
  30 
  31 #ifdef _EVENT_HAVE_SYS_TIME_H
  32 #include <sys/time.h>
  33 #endif
  34 
  35 #include <errno.h>
  36 #include <stdio.h>
  37 #include <stdlib.h>
  38 #include <string.h>
  39 #ifdef _EVENT_HAVE_STDARG_H
  40 #include <stdarg.h>
  41 #endif
  42 #ifdef _EVENT_HAVE_UNISTD_H
  43 #include <unistd.h>
  44 #endif
  45 
  46 #ifdef WIN32
  47 #include <winsock2.h>
  48 #include <ws2tcpip.h>
  49 #endif
  50 
  51 #include <sys/queue.h>
  52 
  53 #include "event2/util.h"
  54 #include "event2/bufferevent.h"
  55 #include "event2/buffer.h"
  56 #include "event2/bufferevent_struct.h"
  57 #include "event2/event.h"
  58 #include "event2/util.h"
  59 #include "event-internal.h"
  60 #include "log-internal.h"
  61 #include "mm-internal.h"
  62 #include "bufferevent-internal.h"
  63 #include "util-internal.h"
  64 #include "iocp-internal.h"
  65 
  66 #ifndef SO_UPDATE_CONNECT_CONTEXT
  67 /* Mingw is sometimes missing this */
  68 #define SO_UPDATE_CONNECT_CONTEXT 0x7010
  69 #endif
  70 
  71 /* prototypes */
  72 static int be_async_enable(struct bufferevent *, short);
  73 static int be_async_disable(struct bufferevent *, short);
  74 static void be_async_destruct(struct bufferevent *);
  75 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
  76 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
  77 
  78 struct bufferevent_async {
  79         struct bufferevent_private bev;
  80         struct event_overlapped connect_overlapped;
  81         struct event_overlapped read_overlapped;
  82         struct event_overlapped write_overlapped;
  83         size_t read_in_progress;
  84         size_t write_in_progress;
  85         unsigned ok : 1;
  86         unsigned read_added : 1;
  87         unsigned write_added : 1;
  88 };
  89 
  90 const struct bufferevent_ops bufferevent_ops_async = {
  91         "socket_async",
  92         evutil_offsetof(struct bufferevent_async, bev.bev),
  93         be_async_enable,
  94         be_async_disable,
  95         be_async_destruct,
  96         _bufferevent_generic_adj_timeouts,
  97         be_async_flush,
  98         be_async_ctrl,
  99 };
 100 
 101 static inline struct bufferevent_async *
 102 upcast(struct bufferevent *bev)
 103 {
 104         struct bufferevent_async *bev_a;
 105         if (bev->be_ops != &bufferevent_ops_async)
 106                 return NULL;
 107         bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
 108         return bev_a;
 109 }
 110 
 111 static inline struct bufferevent_async *
 112 upcast_connect(struct event_overlapped *eo)
 113 {
 114         struct bufferevent_async *bev_a;
 115         bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
 116         EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
 117         return bev_a;
 118 }
 119 
 120 static inline struct bufferevent_async *
 121 upcast_read(struct event_overlapped *eo)
 122 {
 123         struct bufferevent_async *bev_a;
 124         bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
 125         EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
 126         return bev_a;
 127 }
 128 
 129 static inline struct bufferevent_async *
 130 upcast_write(struct event_overlapped *eo)
 131 {
 132         struct bufferevent_async *bev_a;
 133         bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
 134         EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
 135         return bev_a;
 136 }
 137 
 138 static void
 139 bev_async_del_write(struct bufferevent_async *beva)
 140 {
 141         struct bufferevent *bev = &beva->bev.bev;
 142 
 143         if (beva->write_added) {
 144                 beva->write_added = 0;
 145                 event_base_del_virtual(bev->ev_base);
 146         }
 147 }
 148 
 149 static void
 150 bev_async_del_read(struct bufferevent_async *beva)
 151 {
 152         struct bufferevent *bev = &beva->bev.bev;
 153 
 154         if (beva->read_added) {
 155                 beva->read_added = 0;
 156                 event_base_del_virtual(bev->ev_base);
 157         }
 158 }
 159 
 160 static void
 161 bev_async_add_write(struct bufferevent_async *beva)
 162 {
 163         struct bufferevent *bev = &beva->bev.bev;
 164 
 165         if (!beva->write_added) {
 166                 beva->write_added = 1;
 167                 event_base_add_virtual(bev->ev_base);
 168         }
 169 }
 170 
 171 static void
 172 bev_async_add_read(struct bufferevent_async *beva)
 173 {
 174         struct bufferevent *bev = &beva->bev.bev;
 175 
 176         if (!beva->read_added) {
 177                 beva->read_added = 1;
 178                 event_base_add_virtual(bev->ev_base);
 179         }
 180 }
 181 
 182 static void
 183 bev_async_consider_writing(struct bufferevent_async *beva)
 184 {
 185         size_t at_most;
 186         int limit;
 187         struct bufferevent *bev = &beva->bev.bev;
 188 
 189         /* Don't write if there's a write in progress, or we do not
 190          * want to write, or when there's nothing left to write. */
 191         if (beva->write_in_progress || beva->bev.connecting)
 192                 return;
 193         if (!beva->ok || !(bev->enabled&EV_WRITE) ||
 194             !evbuffer_get_length(bev->output)) {
 195                 bev_async_del_write(beva);
 196                 return;
 197         }
 198 
 199         at_most = evbuffer_get_length(bev->output);
 200 
 201         /* This is safe so long as bufferevent_get_write_max never returns
 202          * more than INT_MAX.  That's true for now. XXXX */
 203         limit = (int)_bufferevent_get_write_max(&beva->bev);
 204         if (at_most >= (size_t)limit && limit >= 0)
 205                 at_most = limit;
 206 
 207         if (beva->bev.write_suspended) {
 208                 bev_async_del_write(beva);
 209                 return;
 210         }
 211 
 212         /*  XXXX doesn't respect low-water mark very well. */
 213         bufferevent_incref(bev);
 214         if (evbuffer_launch_write(bev->output, at_most,
 215             &beva->write_overlapped)) {
 216                 bufferevent_decref(bev);
 217                 beva->ok = 0;
 218                 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
 219         } else {
 220                 beva->write_in_progress = at_most;
 221                 _bufferevent_decrement_write_buckets(&beva->bev, at_most);
 222                 bev_async_add_write(beva);
 223         }
 224 }
 225 
 226 static void
 227 bev_async_consider_reading(struct bufferevent_async *beva)
 228 {
 229         size_t cur_size;
 230         size_t read_high;
 231         size_t at_most;
 232         int limit;
 233         struct bufferevent *bev = &beva->bev.bev;
 234 
 235         /* Don't read if there is a read in progress, or we do not
 236          * want to read. */
 237         if (beva->read_in_progress || beva->bev.connecting)
 238                 return;
 239         if (!beva->ok || !(bev->enabled&EV_READ)) {
 240                 bev_async_del_read(beva);
 241                 return;
 242         }
 243 
 244         /* Don't read if we're full */
 245         cur_size = evbuffer_get_length(bev->input);
 246         read_high = bev->wm_read.high;
 247         if (read_high) {
 248                 if (cur_size >= read_high) {
 249                         bev_async_del_read(beva);
 250                         return;
 251                 }
 252                 at_most = read_high - cur_size;
 253         } else {
 254                 at_most = 16384; /* FIXME totally magic. */
 255         }
 256 
 257         /* XXXX This over-commits. */
 258         /* XXXX see also not above on cast on _bufferevent_get_write_max() */
 259         limit = (int)_bufferevent_get_read_max(&beva->bev);
 260         if (at_most >= (size_t)limit && limit >= 0)
 261                 at_most = limit;
 262 
 263         if (beva->bev.read_suspended) {
 264                 bev_async_del_read(beva);
 265                 return;
 266         }
 267 
 268         bufferevent_incref(bev);
 269         if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
 270                 beva->ok = 0;
 271                 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
 272                 bufferevent_decref(bev);
 273         } else {
 274                 beva->read_in_progress = at_most;
 275                 _bufferevent_decrement_read_buckets(&beva->bev, at_most);
 276                 bev_async_add_read(beva);
 277         }
 278 
 279         return;
 280 }
 281 
 282 static void
 283 be_async_outbuf_callback(struct evbuffer *buf,
 284     const struct evbuffer_cb_info *cbinfo,
 285     void *arg)
 286 {
 287         struct bufferevent *bev = arg;
 288         struct bufferevent_async *bev_async = upcast(bev);
 289 
 290         /* If we added data to the outbuf and were not writing before,
 291          * we may want to write now. */
 292 
 293         _bufferevent_incref_and_lock(bev);
 294 
 295         if (cbinfo->n_added)
 296                 bev_async_consider_writing(bev_async);
 297 
 298         _bufferevent_decref_and_unlock(bev);
 299 }
 300 
 301 static void
 302 be_async_inbuf_callback(struct evbuffer *buf,
 303     const struct evbuffer_cb_info *cbinfo,
 304     void *arg)
 305 {
 306         struct bufferevent *bev = arg;
 307         struct bufferevent_async *bev_async = upcast(bev);
 308 
 309         /* If we drained data from the inbuf and were not reading before,
 310          * we may want to read now */
 311 
 312         _bufferevent_incref_and_lock(bev);
 313 
 314         if (cbinfo->n_deleted)
 315                 bev_async_consider_reading(bev_async);
 316 
 317         _bufferevent_decref_and_unlock(bev);
 318 }
 319 
 320 static int
 321 be_async_enable(struct bufferevent *buf, short what)
 322 {
 323         struct bufferevent_async *bev_async = upcast(buf);
 324 
 325         if (!bev_async->ok)
 326                 return -1;
 327 
 328         if (bev_async->bev.connecting) {
 329                 /* Don't launch anything during connection attempts. */
 330                 return 0;
 331         }
 332 
 333         if (what & EV_READ)
 334                 BEV_RESET_GENERIC_READ_TIMEOUT(buf);
 335         if (what & EV_WRITE)
 336                 BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
 337 
 338         /* If we newly enable reading or writing, and we aren't reading or
 339            writing already, consider launching a new read or write. */
 340 
 341         if (what & EV_READ)
 342                 bev_async_consider_reading(bev_async);
 343         if (what & EV_WRITE)
 344                 bev_async_consider_writing(bev_async);
 345         return 0;
 346 }
 347 
 348 static int
 349 be_async_disable(struct bufferevent *bev, short what)
 350 {
 351         struct bufferevent_async *bev_async = upcast(bev);
 352         /* XXXX If we disable reading or writing, we may want to consider
 353          * canceling any in-progress read or write operation, though it might
 354          * not work. */
 355 
 356         if (what & EV_READ) {
 357                 BEV_DEL_GENERIC_READ_TIMEOUT(bev);
 358                 bev_async_del_read(bev_async);
 359         }
 360         if (what & EV_WRITE) {
 361                 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
 362                 bev_async_del_write(bev_async);
 363         }
 364 
 365         return 0;
 366 }
 367 
 368 static void
 369 be_async_destruct(struct bufferevent *bev)
 370 {
 371         struct bufferevent_async *bev_async = upcast(bev);
 372         struct bufferevent_private *bev_p = BEV_UPCAST(bev);
 373         evutil_socket_t fd;
 374 
 375         EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
 376                         !upcast(bev)->read_in_progress);
 377 
 378         bev_async_del_read(bev_async);
 379         bev_async_del_write(bev_async);
 380 
 381         fd = _evbuffer_overlapped_get_fd(bev->input);
 382         if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
 383                 /* XXXX possible double-close */
 384                 evutil_closesocket(fd);
 385         }
 386         /* delete this in case non-blocking connect was used */
 387         if (event_initialized(&bev->ev_write)) {
 388                 event_del(&bev->ev_write);
 389                 _bufferevent_del_generic_timeout_cbs(bev);
 390         }
 391 }
 392 
 393 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
 394  * we use WSAGetOverlappedResult to translate. */
 395 static void
 396 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
 397 {
 398         DWORD bytes, flags;
 399         evutil_socket_t fd;
 400 
 401         fd = _evbuffer_overlapped_get_fd(bev->input);
 402         WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
 403 }
 404 
 405 static int
 406 be_async_flush(struct bufferevent *bev, short what,
 407     enum bufferevent_flush_mode mode)
 408 {
 409         return 0;
 410 }
 411 
 412 static void
 413 connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
 414     ev_ssize_t nbytes, int ok)
 415 {
 416         struct bufferevent_async *bev_a = upcast_connect(eo);
 417         struct bufferevent *bev = &bev_a->bev.bev;
 418         evutil_socket_t sock;
 419 
 420         BEV_LOCK(bev);
 421 
 422         EVUTIL_ASSERT(bev_a->bev.connecting);
 423         bev_a->bev.connecting = 0;
 424         sock = _evbuffer_overlapped_get_fd(bev_a->bev.bev.input);
 425         /* XXXX Handle error? */
 426         setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
 427 
 428         if (ok)
 429                 bufferevent_async_set_connected(bev);
 430         else
 431                 bev_async_set_wsa_error(bev, eo);
 432 
 433         _bufferevent_run_eventcb(bev,
 434                         ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
 435 
 436         event_base_del_virtual(bev->ev_base);
 437 
 438         _bufferevent_decref_and_unlock(bev);
 439 }
 440 
 441 static void
 442 read_complete(struct event_overlapped *eo, ev_uintptr_t key,
 443     ev_ssize_t nbytes, int ok)
 444 {
 445         struct bufferevent_async *bev_a = upcast_read(eo);
 446         struct bufferevent *bev = &bev_a->bev.bev;
 447         short what = BEV_EVENT_READING;
 448         ev_ssize_t amount_unread;
 449         BEV_LOCK(bev);
 450         EVUTIL_ASSERT(bev_a->read_in_progress);
 451 
 452         amount_unread = bev_a->read_in_progress - nbytes;
 453         evbuffer_commit_read(bev->input, nbytes);
 454         bev_a->read_in_progress = 0;
 455         if (amount_unread)
 456                 _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread);
 457 
 458         if (!ok)
 459                 bev_async_set_wsa_error(bev, eo);
 460 
 461         if (bev_a->ok) {
 462                 if (ok && nbytes) {
 463                         BEV_RESET_GENERIC_READ_TIMEOUT(bev);
 464                         if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
 465                                 _bufferevent_run_readcb(bev);
 466                         bev_async_consider_reading(bev_a);
 467                 } else if (!ok) {
 468                         what |= BEV_EVENT_ERROR;
 469                         bev_a->ok = 0;
 470                         _bufferevent_run_eventcb(bev, what);
 471                 } else if (!nbytes) {
 472                         what |= BEV_EVENT_EOF;
 473                         bev_a->ok = 0;
 474                         _bufferevent_run_eventcb(bev, what);
 475                 }
 476         }
 477 
 478         _bufferevent_decref_and_unlock(bev);
 479 }
 480 
 481 static void
 482 write_complete(struct event_overlapped *eo, ev_uintptr_t key,
 483     ev_ssize_t nbytes, int ok)
 484 {
 485         struct bufferevent_async *bev_a = upcast_write(eo);
 486         struct bufferevent *bev = &bev_a->bev.bev;
 487         short what = BEV_EVENT_WRITING;
 488         ev_ssize_t amount_unwritten;
 489 
 490         BEV_LOCK(bev);
 491         EVUTIL_ASSERT(bev_a->write_in_progress);
 492 
 493         amount_unwritten = bev_a->write_in_progress - nbytes;
 494         evbuffer_commit_write(bev->output, nbytes);
 495         bev_a->write_in_progress = 0;
 496 
 497         if (amount_unwritten)
 498                 _bufferevent_decrement_write_buckets(&bev_a->bev,
 499                                                      -amount_unwritten);
 500 
 501 
 502         if (!ok)
 503                 bev_async_set_wsa_error(bev, eo);
 504 
 505         if (bev_a->ok) {
 506                 if (ok && nbytes) {
 507                         BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
 508                         if (evbuffer_get_length(bev->output) <=
 509                             bev->wm_write.low)
 510                                 _bufferevent_run_writecb(bev);
 511                         bev_async_consider_writing(bev_a);
 512                 } else if (!ok) {
 513                         what |= BEV_EVENT_ERROR;
 514                         bev_a->ok = 0;
 515                         _bufferevent_run_eventcb(bev, what);
 516                 } else if (!nbytes) {
 517                         what |= BEV_EVENT_EOF;
 518                         bev_a->ok = 0;
 519                         _bufferevent_run_eventcb(bev, what);
 520                 }
 521         }
 522 
 523         _bufferevent_decref_and_unlock(bev);
 524 }
 525 
 526 struct bufferevent *
 527 bufferevent_async_new(struct event_base *base,
 528     evutil_socket_t fd, int options)
 529 {
 530         struct bufferevent_async *bev_a;
 531         struct bufferevent *bev;
 532         struct event_iocp_port *iocp;
 533 
 534         options |= BEV_OPT_THREADSAFE;
 535 
 536         if (!(iocp = event_base_get_iocp(base)))
 537                 return NULL;
 538 
 539         if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) {
 540                 int err = GetLastError();
 541                 /* We may have alrady associated this fd with a port.
 542                  * Let's hope it's this port, and that the error code
 543                  * for doing this neer changes. */
 544                 if (err != ERROR_INVALID_PARAMETER)
 545                         return NULL;
 546         }
 547 
 548         if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
 549                 return NULL;
 550 
 551         bev = &bev_a->bev.bev;
 552         if (!(bev->input = evbuffer_overlapped_new(fd))) {
 553                 mm_free(bev_a);
 554                 return NULL;
 555         }
 556         if (!(bev->output = evbuffer_overlapped_new(fd))) {
 557                 evbuffer_free(bev->input);
 558                 mm_free(bev_a);
 559                 return NULL;
 560         }
 561 
 562         if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async,
 563                 options)<0)
 564                 goto err;
 565 
 566         evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
 567         evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
 568 
 569         event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
 570         event_overlapped_init(&bev_a->read_overlapped, read_complete);
 571         event_overlapped_init(&bev_a->write_overlapped, write_complete);
 572 
 573         bev_a->ok = fd >= 0;
 574         if (bev_a->ok)
 575                 _bufferevent_init_generic_timeout_cbs(bev);
 576 
 577         return bev;
 578 err:
 579         bufferevent_free(&bev_a->bev.bev);
 580         return NULL;
 581 }
 582 
 583 void
 584 bufferevent_async_set_connected(struct bufferevent *bev)
 585 {
 586         struct bufferevent_async *bev_async = upcast(bev);
 587         bev_async->ok = 1;
 588         _bufferevent_init_generic_timeout_cbs(bev);
 589         /* Now's a good time to consider reading/writing */
 590         be_async_enable(bev, bev->enabled);
 591 }
 592 
 593 int
 594 bufferevent_async_can_connect(struct bufferevent *bev)
 595 {
 596         const struct win32_extension_fns *ext =
 597             event_get_win32_extension_fns();
 598 
 599         if (BEV_IS_ASYNC(bev) &&
 600             event_base_get_iocp(bev->ev_base) &&
 601             ext && ext->ConnectEx)
 602                 return 1;
 603 
 604         return 0;
 605 }
 606 
 607 int
 608 bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
 609         const struct sockaddr *sa, int socklen)
 610 {
 611         BOOL rc;
 612         struct bufferevent_async *bev_async = upcast(bev);
 613         struct sockaddr_storage ss;
 614         const struct win32_extension_fns *ext =
 615             event_get_win32_extension_fns();
 616 
 617         EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
 618 
 619         /* ConnectEx() requires that the socket be bound to an address
 620          * with bind() before using, otherwise it will fail. We attempt
 621          * to issue a bind() here, taking into account that the error
 622          * code is set to WSAEINVAL when the socket is already bound. */
 623         memset(&ss, 0, sizeof(ss));
 624         if (sa->sa_family == AF_INET) {
 625                 struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
 626                 sin->sin_family = AF_INET;
 627                 sin->sin_addr.s_addr = INADDR_ANY;
 628         } else if (sa->sa_family == AF_INET6) {
 629                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
 630                 sin6->sin6_family = AF_INET6;
 631                 sin6->sin6_addr = in6addr_any;
 632         } else {
 633                 /* Well, the user will have to bind() */
 634                 return -1;
 635         }
 636         if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
 637             WSAGetLastError() != WSAEINVAL)
 638                 return -1;
 639 
 640         event_base_add_virtual(bev->ev_base);
 641         bufferevent_incref(bev);
 642         rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
 643                             &bev_async->connect_overlapped.overlapped);
 644         if (rc || WSAGetLastError() == ERROR_IO_PENDING)
 645                 return 0;
 646 
 647         event_base_del_virtual(bev->ev_base);
 648         bufferevent_decref(bev);
 649 
 650         return -1;
 651 }
 652 
 653 static int
 654 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
 655     union bufferevent_ctrl_data *data)
 656 {
 657         switch (op) {
 658         case BEV_CTRL_GET_FD:
 659                 data->fd = _evbuffer_overlapped_get_fd(bev->input);
 660                 return 0;
 661         case BEV_CTRL_SET_FD: {
 662                 struct event_iocp_port *iocp;
 663 
 664                 if (data->fd == _evbuffer_overlapped_get_fd(bev->input))
 665                         return 0;
 666                 if (!(iocp = event_base_get_iocp(bev->ev_base)))
 667                         return -1;
 668                 if (event_iocp_port_associate(iocp, data->fd, 1) < 0)
 669                         return -1;
 670                 _evbuffer_overlapped_set_fd(bev->input, data->fd);
 671                 _evbuffer_overlapped_set_fd(bev->output, data->fd);
 672                 return 0;
 673         }
 674         case BEV_CTRL_CANCEL_ALL: {
 675                 struct bufferevent_async *bev_a = upcast(bev);
 676                 evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);
 677                 if (fd != (evutil_socket_t)INVALID_SOCKET &&
 678                     (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
 679                         closesocket(fd);
 680                 }
 681                 bev_a->ok = 0;
 682                 return 0;
 683         }
 684         case BEV_CTRL_GET_UNDERLYING:
 685         default:
 686                 return -1;
 687         }
 688 }
 689 
 690 

/* [<][>][^][v][top][bottom][index][help] */