root/opal/mca/event/libevent2022/libevent/buffer_iocp.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. upcast_evbuffer
  2. pin_release
  3. evbuffer_commit_read
  4. evbuffer_commit_write
  5. evbuffer_overlapped_new
  6. evbuffer_launch_write
  7. evbuffer_launch_read
  8. _evbuffer_overlapped_get_fd
  9. _evbuffer_overlapped_set_fd

   1 /*
   2  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
   3  *
   4  * Redistribution and use in source and binary forms, with or without
   5  * modification, are permitted provided that the following conditions
   6  * are met:
   7  * 1. Redistributions of source code must retain the above copyright
   8  *    notice, this list of conditions and the following disclaimer.
   9  * 2. Redistributions in binary form must reproduce the above copyright
  10  *    notice, this list of conditions and the following disclaimer in the
  11  *    documentation and/or other materials provided with the distribution.
  12  * 3. The name of the author may not be used to endorse or promote products
  13  *    derived from this software without specific prior written permission.
  14  *
  15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  17  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  18  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  19  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  20  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  21  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  22  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  24  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  25  */
  26 
  27 /**
  28    @file buffer_iocp.c
  29 
  30    This module implements overlapped read and write functions for evbuffer
  31    objects on Windows.
  32 */
  33 
  34 #include "event2/buffer.h"
  35 #include "event2/buffer_compat.h"
  36 #include "event2/util.h"
  37 #include "event2/thread.h"
  38 #include "event2/event-config.h"
  39 #include "util-internal.h"
  40 #include "evthread-internal.h"
  41 #include "evbuffer-internal.h"
  42 #include "iocp-internal.h"
  43 #include "mm-internal.h"
  44 
  45 #include <winsock2.h>
  46 #include <windows.h>
  47 #include <stdio.h>
  48 
  49 #define MAX_WSABUFS 16
  50 
  51 /** An evbuffer that can handle overlapped IO. */
  52 struct evbuffer_overlapped {
  53         struct evbuffer buffer;
  54         /** The socket that we're doing overlapped IO on. */
  55         evutil_socket_t fd;
  56 
  57         /** pending I/O type */
  58         unsigned read_in_progress : 1;
  59         unsigned write_in_progress : 1;
  60 
  61         /** The first pinned chain in the buffer. */
  62         struct evbuffer_chain *first_pinned;
  63 
  64         /** How many chains are pinned; how many of the fields in buffers
  65          * are we using. */
  66         int n_buffers;
  67         WSABUF buffers[MAX_WSABUFS];
  68 };
  69 
  70 /** Given an evbuffer, return the correponding evbuffer structure, or NULL if
  71  * the evbuffer isn't overlapped. */
  72 static inline struct evbuffer_overlapped *
  73 upcast_evbuffer(struct evbuffer *buf)
  74 {
  75         if (!buf || !buf->is_overlapped)
  76                 return NULL;
  77         return EVUTIL_UPCAST(buf, struct evbuffer_overlapped, buffer);
  78 }
  79 
  80 /** Unpin all the chains noted as pinned in 'eo'. */
  81 static void
  82 pin_release(struct evbuffer_overlapped *eo, unsigned flag)
  83 {
  84         int i;
  85         struct evbuffer_chain *next, *chain = eo->first_pinned;
  86 
  87         for (i = 0; i < eo->n_buffers; ++i) {
  88                 EVUTIL_ASSERT(chain);
  89                 next = chain->next;
  90                 _evbuffer_chain_unpin(chain, flag);
  91                 chain = next;
  92         }
  93 }
  94 
  95 void
  96 evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes)
  97 {
  98         struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
  99         struct evbuffer_chain **chainp;
 100         size_t remaining, len;
 101         unsigned i;
 102 
 103         EVBUFFER_LOCK(evbuf);
 104         EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress);
 105         EVUTIL_ASSERT(nBytes >= 0); /* XXXX Can this be false? */
 106 
 107         evbuffer_unfreeze(evbuf, 0);
 108 
 109         chainp = evbuf->last_with_datap;
 110         if (!((*chainp)->flags & EVBUFFER_MEM_PINNED_R))
 111                 chainp = &(*chainp)->next;
 112         remaining = nBytes;
 113         for (i = 0; remaining > 0 && i < (unsigned)buf->n_buffers; ++i) {
 114                 EVUTIL_ASSERT(*chainp);
 115                 len = buf->buffers[i].len;
 116                 if (remaining < len)
 117                         len = remaining;
 118                 (*chainp)->off += len;
 119                 evbuf->last_with_datap = chainp;
 120                 remaining -= len;
 121                 chainp = &(*chainp)->next;
 122         }
 123 
 124         pin_release(buf, EVBUFFER_MEM_PINNED_R);
 125 
 126         buf->read_in_progress = 0;
 127 
 128         evbuf->total_len += nBytes;
 129         evbuf->n_add_for_cb += nBytes;
 130 
 131         evbuffer_invoke_callbacks(evbuf);
 132 
 133         _evbuffer_decref_and_unlock(evbuf);
 134 }
 135 
 136 void
 137 evbuffer_commit_write(struct evbuffer *evbuf, ev_ssize_t nBytes)
 138 {
 139         struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
 140 
 141         EVBUFFER_LOCK(evbuf);
 142         EVUTIL_ASSERT(buf->write_in_progress && !buf->read_in_progress);
 143         evbuffer_unfreeze(evbuf, 1);
 144         evbuffer_drain(evbuf, nBytes);
 145         pin_release(buf,EVBUFFER_MEM_PINNED_W);
 146         buf->write_in_progress = 0;
 147         _evbuffer_decref_and_unlock(evbuf);
 148 }
 149 
 150 struct evbuffer *
 151 evbuffer_overlapped_new(evutil_socket_t fd)
 152 {
 153         struct evbuffer_overlapped *evo;
 154 
 155         evo = mm_calloc(1, sizeof(struct evbuffer_overlapped));
 156         if (!evo)
 157                 return NULL;
 158 
 159         TAILQ_INIT(&evo->buffer.callbacks);
 160         evo->buffer.refcnt = 1;
 161         evo->buffer.last_with_datap = &evo->buffer.first;
 162 
 163         evo->buffer.is_overlapped = 1;
 164         evo->fd = fd;
 165 
 166         return &evo->buffer;
 167 }
 168 
 169 int
 170 evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,
 171                 struct event_overlapped *ol)
 172 {
 173         struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
 174         int r = -1;
 175         int i;
 176         struct evbuffer_chain *chain;
 177         DWORD bytesSent;
 178 
 179         if (!buf) {
 180                 /* No buffer, or it isn't overlapped */
 181                 return -1;
 182         }
 183 
 184         EVBUFFER_LOCK(buf);
 185         EVUTIL_ASSERT(!buf_o->read_in_progress);
 186         if (buf->freeze_start || buf_o->write_in_progress)
 187                 goto done;
 188         if (!buf->total_len) {
 189                 /* Nothing to write */
 190                 r = 0;
 191                 goto done;
 192         } else if (at_most < 0 || (size_t)at_most > buf->total_len) {
 193                 at_most = buf->total_len;
 194         }
 195         evbuffer_freeze(buf, 1);
 196 
 197         buf_o->first_pinned = NULL;
 198         buf_o->n_buffers = 0;
 199         memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
 200 
 201         chain = buf_o->first_pinned = buf->first;
 202 
 203         for (i=0; i < MAX_WSABUFS && chain; ++i, chain=chain->next) {
 204                 WSABUF *b = &buf_o->buffers[i];
 205                 b->buf = (char*)( chain->buffer + chain->misalign );
 206                 _evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_W);
 207 
 208                 if ((size_t)at_most > chain->off) {
 209                         /* XXXX Cast is safe for now, since win32 has no
 210                            mmaped chains.  But later, we need to have this
 211                            add more WSAbufs if chain->off is greater than
 212                            ULONG_MAX */
 213                         b->len = (unsigned long)chain->off;
 214                         at_most -= chain->off;
 215                 } else {
 216                         b->len = (unsigned long)at_most;
 217                         ++i;
 218                         break;
 219                 }
 220         }
 221 
 222         buf_o->n_buffers = i;
 223         _evbuffer_incref(buf);
 224         if (WSASend(buf_o->fd, buf_o->buffers, i, &bytesSent, 0,
 225                 &ol->overlapped, NULL)) {
 226                 int error = WSAGetLastError();
 227                 if (error != WSA_IO_PENDING) {
 228                         /* An actual error. */
 229                         pin_release(buf_o, EVBUFFER_MEM_PINNED_W);
 230                         evbuffer_unfreeze(buf, 1);
 231                         evbuffer_free(buf); /* decref */
 232                         goto done;
 233                 }
 234         }
 235 
 236         buf_o->write_in_progress = 1;
 237         r = 0;
 238 done:
 239         EVBUFFER_UNLOCK(buf);
 240         return r;
 241 }
 242 
 243 int
 244 evbuffer_launch_read(struct evbuffer *buf, size_t at_most,
 245                 struct event_overlapped *ol)
 246 {
 247         struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
 248         int r = -1, i;
 249         int nvecs;
 250         int npin=0;
 251         struct evbuffer_chain *chain=NULL, **chainp;
 252         DWORD bytesRead;
 253         DWORD flags = 0;
 254         struct evbuffer_iovec vecs[MAX_WSABUFS];
 255 
 256         if (!buf_o)
 257                 return -1;
 258         EVBUFFER_LOCK(buf);
 259         EVUTIL_ASSERT(!buf_o->write_in_progress);
 260         if (buf->freeze_end || buf_o->read_in_progress)
 261                 goto done;
 262 
 263         buf_o->first_pinned = NULL;
 264         buf_o->n_buffers = 0;
 265         memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
 266 
 267         if (_evbuffer_expand_fast(buf, at_most, MAX_WSABUFS) == -1)
 268                 goto done;
 269         evbuffer_freeze(buf, 0);
 270 
 271         nvecs = _evbuffer_read_setup_vecs(buf, at_most,
 272             vecs, MAX_WSABUFS, &chainp, 1);
 273         for (i=0;i<nvecs;++i) {
 274                 WSABUF_FROM_EVBUFFER_IOV(
 275                         &buf_o->buffers[i],
 276                         &vecs[i]);
 277         }
 278 
 279         buf_o->n_buffers = nvecs;
 280         buf_o->first_pinned = chain = *chainp;
 281 
 282         npin=0;
 283         for ( ; chain; chain = chain->next) {
 284                 _evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_R);
 285                 ++npin;
 286         }
 287         EVUTIL_ASSERT(npin == nvecs);
 288 
 289         _evbuffer_incref(buf);
 290         if (WSARecv(buf_o->fd, buf_o->buffers, nvecs, &bytesRead, &flags,
 291                     &ol->overlapped, NULL)) {
 292                 int error = WSAGetLastError();
 293                 if (error != WSA_IO_PENDING) {
 294                         /* An actual error. */
 295                         pin_release(buf_o, EVBUFFER_MEM_PINNED_R);
 296                         evbuffer_unfreeze(buf, 0);
 297                         evbuffer_free(buf); /* decref */
 298                         goto done;
 299                 }
 300         }
 301 
 302         buf_o->read_in_progress = 1;
 303         r = 0;
 304 done:
 305         EVBUFFER_UNLOCK(buf);
 306         return r;
 307 }
 308 
 309 evutil_socket_t
 310 _evbuffer_overlapped_get_fd(struct evbuffer *buf)
 311 {
 312         struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
 313         return buf_o ? buf_o->fd : -1;
 314 }
 315 
 316 void
 317 _evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd)
 318 {
 319         struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
 320         EVBUFFER_LOCK(buf);
 321         /* XXX is this right?, should it cancel current I/O operations? */
 322         if (buf_o)
 323                 buf_o->fd = fd;
 324         EVBUFFER_UNLOCK(buf);
 325 }

/* [<][>][^][v][top][bottom][index][help] */