root/opal/mca/event/libevent2022/libevent/evrpc.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. evrpc_init
  2. evrpc_free
  3. evrpc_add_hook
  4. evrpc_remove_hook_internal
  5. evrpc_remove_hook
  6. evrpc_process_hooks
  7. evrpc_construct_uri
  8. evrpc_register_rpc
  9. evrpc_unregister_rpc
  10. evrpc_request_cb
  11. evrpc_request_cb_closure
  12. evrpc_reqstate_free
  13. evrpc_request_done
  14. evrpc_get_request
  15. evrpc_get_reply
  16. evrpc_request_done_closure
  17. evrpc_pool_new
  18. evrpc_request_wrapper_free
  19. evrpc_pool_free
  20. evrpc_pool_add_connection
  21. evrpc_pool_remove_connection
  22. evrpc_pool_set_timeout
  23. evrpc_pool_find_connection
  24. evrpc_schedule_request
  25. evrpc_schedule_request_closure
  26. evrpc_pause_request
  27. evrpc_resume_request
  28. evrpc_make_request
  29. evrpc_make_request_ctx
  30. evrpc_reply_done
  31. evrpc_reply_done_closure
  32. evrpc_pool_schedule
  33. evrpc_request_timeout
  34. evrpc_meta_data_free
  35. evrpc_hook_meta_new
  36. evrpc_hook_associate_meta
  37. evrpc_hook_context_free
  38. evrpc_hook_add_meta
  39. evrpc_hook_find_meta
  40. evrpc_hook_get_connection
  41. evrpc_send_request_generic
  42. evrpc_register_object
  43. evrpc_register_generic
  44. evrpc_request_get_pool
  45. evrpc_request_set_pool
  46. evrpc_request_set_cb

   1 /*
   2  * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
   3  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
   4  *
   5  * Redistribution and use in source and binary forms, with or without
   6  * modification, are permitted provided that the following conditions
   7  * are met:
   8  * 1. Redistributions of source code must retain the above copyright
   9  *    notice, this list of conditions and the following disclaimer.
  10  * 2. Redistributions in binary form must reproduce the above copyright
  11  *    notice, this list of conditions and the following disclaimer in the
  12  *    documentation and/or other materials provided with the distribution.
  13  * 3. The name of the author may not be used to endorse or promote products
  14  *    derived from this software without specific prior written permission.
  15  *
  16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
  19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
  20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
  21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26  */
  27 #include "event2/event-config.h"
  28 
  29 #ifdef WIN32
  30 #define WIN32_LEAN_AND_MEAN
  31 #include <winsock2.h>
  32 #include <windows.h>
  33 #undef WIN32_LEAN_AND_MEAN
  34 #endif
  35 
  36 #include <sys/types.h>
  37 #ifndef WIN32
  38 #include <sys/socket.h>
  39 #endif
  40 #ifdef _EVENT_HAVE_SYS_TIME_H
  41 #include <sys/time.h>
  42 #endif
  43 #include <sys/queue.h>
  44 #include <stdio.h>
  45 #include <stdlib.h>
  46 #ifndef WIN32
  47 #include <unistd.h>
  48 #endif
  49 #include <errno.h>
  50 #include <signal.h>
  51 #include <string.h>
  52 
  53 #include <sys/queue.h>
  54 
  55 #include "event2/event.h"
  56 #include "event2/event_struct.h"
  57 #include "event2/rpc.h"
  58 #include "event2/rpc_struct.h"
  59 #include "evrpc-internal.h"
  60 #include "event2/http.h"
  61 #include "event2/buffer.h"
  62 #include "event2/tag.h"
  63 #include "event2/http_struct.h"
  64 #include "event2/http_compat.h"
  65 #include "event2/util.h"
  66 #include "util-internal.h"
  67 #include "log-internal.h"
  68 #include "mm-internal.h"
  69 
  70 struct evrpc_base *
  71 evrpc_init(struct evhttp *http_server)
  72 {
  73         struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
  74         if (base == NULL)
  75                 return (NULL);
  76 
  77         /* we rely on the tagging sub system */
  78         evtag_init();
  79 
  80         TAILQ_INIT(&base->registered_rpcs);
  81         TAILQ_INIT(&base->input_hooks);
  82         TAILQ_INIT(&base->output_hooks);
  83 
  84         TAILQ_INIT(&base->paused_requests);
  85 
  86         base->http_server = http_server;
  87 
  88         return (base);
  89 }
  90 
  91 void
  92 evrpc_free(struct evrpc_base *base)
  93 {
  94         struct evrpc *rpc;
  95         struct evrpc_hook *hook;
  96         struct evrpc_hook_ctx *pause;
  97         int r;
  98 
  99         while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
 100                 r = evrpc_unregister_rpc(base, rpc->uri);
 101                 EVUTIL_ASSERT(r == 0);
 102         }
 103         while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
 104                 TAILQ_REMOVE(&base->paused_requests, pause, next);
 105                 mm_free(pause);
 106         }
 107         while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
 108                 r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
 109                 EVUTIL_ASSERT(r);
 110         }
 111         while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
 112                 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
 113                 EVUTIL_ASSERT(r);
 114         }
 115         mm_free(base);
 116 }
 117 
 118 void *
 119 evrpc_add_hook(void *vbase,
 120     enum EVRPC_HOOK_TYPE hook_type,
 121     int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
 122     void *cb_arg)
 123 {
 124         struct _evrpc_hooks *base = vbase;
 125         struct evrpc_hook_list *head = NULL;
 126         struct evrpc_hook *hook = NULL;
 127         switch (hook_type) {
 128         case EVRPC_INPUT:
 129                 head = &base->in_hooks;
 130                 break;
 131         case EVRPC_OUTPUT:
 132                 head = &base->out_hooks;
 133                 break;
 134         default:
 135                 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
 136         }
 137 
 138         hook = mm_calloc(1, sizeof(struct evrpc_hook));
 139         EVUTIL_ASSERT(hook != NULL);
 140 
 141         hook->process = cb;
 142         hook->process_arg = cb_arg;
 143         TAILQ_INSERT_TAIL(head, hook, next);
 144 
 145         return (hook);
 146 }
 147 
 148 static int
 149 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
 150 {
 151         struct evrpc_hook *hook = NULL;
 152         TAILQ_FOREACH(hook, head, next) {
 153                 if (hook == handle) {
 154                         TAILQ_REMOVE(head, hook, next);
 155                         mm_free(hook);
 156                         return (1);
 157                 }
 158         }
 159 
 160         return (0);
 161 }
 162 
 163 /*
 164  * remove the hook specified by the handle
 165  */
 166 
 167 int
 168 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
 169 {
 170         struct _evrpc_hooks *base = vbase;
 171         struct evrpc_hook_list *head = NULL;
 172         switch (hook_type) {
 173         case EVRPC_INPUT:
 174                 head = &base->in_hooks;
 175                 break;
 176         case EVRPC_OUTPUT:
 177                 head = &base->out_hooks;
 178                 break;
 179         default:
 180                 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
 181         }
 182 
 183         return (evrpc_remove_hook_internal(head, handle));
 184 }
 185 
 186 static int
 187 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
 188     struct evhttp_request *req, struct evbuffer *evbuf)
 189 {
 190         struct evrpc_hook *hook;
 191         TAILQ_FOREACH(hook, head, next) {
 192                 int res = hook->process(ctx, req, evbuf, hook->process_arg);
 193                 if (res != EVRPC_CONTINUE)
 194                         return (res);
 195         }
 196 
 197         return (EVRPC_CONTINUE);
 198 }
 199 
 200 static void evrpc_pool_schedule(struct evrpc_pool *pool);
 201 static void evrpc_request_cb(struct evhttp_request *, void *);
 202 
 203 /*
 204  * Registers a new RPC with the HTTP server.   The evrpc object is expected
 205  * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
 206  * calls this function.
 207  */
 208 
 209 static char *
 210 evrpc_construct_uri(const char *uri)
 211 {
 212         char *constructed_uri;
 213         size_t constructed_uri_len;
 214 
 215         constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
 216         if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
 217                 event_err(1, "%s: failed to register rpc at %s",
 218                     __func__, uri);
 219         memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
 220         memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
 221         constructed_uri[constructed_uri_len - 1] = '\0';
 222 
 223         return (constructed_uri);
 224 }
 225 
 226 int
 227 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
 228     void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
 229 {
 230         char *constructed_uri = evrpc_construct_uri(rpc->uri);
 231 
 232         rpc->base = base;
 233         rpc->cb = cb;
 234         rpc->cb_arg = cb_arg;
 235 
 236         TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
 237 
 238         evhttp_set_cb(base->http_server,
 239             constructed_uri,
 240             evrpc_request_cb,
 241             rpc);
 242 
 243         mm_free(constructed_uri);
 244 
 245         return (0);
 246 }
 247 
 248 int
 249 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
 250 {
 251         char *registered_uri = NULL;
 252         struct evrpc *rpc;
 253         int r;
 254 
 255         /* find the right rpc; linear search might be slow */
 256         TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
 257                 if (strcmp(rpc->uri, name) == 0)
 258                         break;
 259         }
 260         if (rpc == NULL) {
 261                 /* We did not find an RPC with this name */
 262                 return (-1);
 263         }
 264         TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
 265 
 266         registered_uri = evrpc_construct_uri(name);
 267 
 268         /* remove the http server callback */
 269         r = evhttp_del_cb(base->http_server, registered_uri);
 270         EVUTIL_ASSERT(r == 0);
 271 
 272         mm_free(registered_uri);
 273 
 274         mm_free((char *)rpc->uri);
 275         mm_free(rpc);
 276         return (0);
 277 }
 278 
 279 static int evrpc_pause_request(void *vbase, void *ctx,
 280     void (*cb)(void *, enum EVRPC_HOOK_RESULT));
 281 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
 282 
 283 static void
 284 evrpc_request_cb(struct evhttp_request *req, void *arg)
 285 {
 286         struct evrpc *rpc = arg;
 287         struct evrpc_req_generic *rpc_state = NULL;
 288 
 289         /* let's verify the outside parameters */
 290         if (req->type != EVHTTP_REQ_POST ||
 291             evbuffer_get_length(req->input_buffer) <= 0)
 292                 goto error;
 293 
 294         rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
 295         if (rpc_state == NULL)
 296                 goto error;
 297         rpc_state->rpc = rpc;
 298         rpc_state->http_req = req;
 299         rpc_state->rpc_data = NULL;
 300 
 301         if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
 302                 int hook_res;
 303 
 304                 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
 305 
 306                 /*
 307                  * allow hooks to modify the outgoing request
 308                  */
 309                 hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
 310                     rpc_state, req, req->input_buffer);
 311                 switch (hook_res) {
 312                 case EVRPC_TERMINATE:
 313                         goto error;
 314                 case EVRPC_PAUSE:
 315                         evrpc_pause_request(rpc->base, rpc_state,
 316                             evrpc_request_cb_closure);
 317                         return;
 318                 case EVRPC_CONTINUE:
 319                         break;
 320                 default:
 321                         EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
 322                             hook_res == EVRPC_CONTINUE ||
 323                             hook_res == EVRPC_PAUSE);
 324                 }
 325         }
 326 
 327         evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
 328         return;
 329 
 330 error:
 331         if (rpc_state != NULL)
 332                 evrpc_reqstate_free(rpc_state);
 333         evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
 334         return;
 335 }
 336 
 337 static void
 338 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
 339 {
 340         struct evrpc_req_generic *rpc_state = arg;
 341         struct evrpc *rpc;
 342         struct evhttp_request *req;
 343 
 344         EVUTIL_ASSERT(rpc_state);
 345         rpc = rpc_state->rpc;
 346         req = rpc_state->http_req;
 347 
 348         if (hook_res == EVRPC_TERMINATE)
 349                 goto error;
 350 
 351         /* let's check that we can parse the request */
 352         rpc_state->request = rpc->request_new(rpc->request_new_arg);
 353         if (rpc_state->request == NULL)
 354                 goto error;
 355 
 356         if (rpc->request_unmarshal(
 357                     rpc_state->request, req->input_buffer) == -1) {
 358                 /* we failed to parse the request; that's a bummer */
 359                 goto error;
 360         }
 361 
 362         /* at this point, we have a well formed request, prepare the reply */
 363 
 364         rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
 365         if (rpc_state->reply == NULL)
 366                 goto error;
 367 
 368         /* give the rpc to the user; they can deal with it */
 369         rpc->cb(rpc_state, rpc->cb_arg);
 370 
 371         return;
 372 
 373 error:
 374         if (rpc_state != NULL)
 375                 evrpc_reqstate_free(rpc_state);
 376         evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
 377         return;
 378 }
 379 
 380 
 381 void
 382 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
 383 {
 384         struct evrpc *rpc;
 385         EVUTIL_ASSERT(rpc_state != NULL);
 386         rpc = rpc_state->rpc;
 387 
 388         /* clean up all memory */
 389         if (rpc_state->hook_meta != NULL)
 390                 evrpc_hook_context_free(rpc_state->hook_meta);
 391         if (rpc_state->request != NULL)
 392                 rpc->request_free(rpc_state->request);
 393         if (rpc_state->reply != NULL)
 394                 rpc->reply_free(rpc_state->reply);
 395         if (rpc_state->rpc_data != NULL)
 396                 evbuffer_free(rpc_state->rpc_data);
 397         mm_free(rpc_state);
 398 }
 399 
 400 static void
 401 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
 402 
 403 void
 404 evrpc_request_done(struct evrpc_req_generic *rpc_state)
 405 {
 406         struct evhttp_request *req;
 407         struct evrpc *rpc;
 408 
 409         EVUTIL_ASSERT(rpc_state);
 410 
 411         req = rpc_state->http_req;
 412         rpc = rpc_state->rpc;
 413 
 414         if (rpc->reply_complete(rpc_state->reply) == -1) {
 415                 /* the reply was not completely filled in.  error out */
 416                 goto error;
 417         }
 418 
 419         if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
 420                 /* out of memory */
 421                 goto error;
 422         }
 423 
 424         /* serialize the reply */
 425         rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
 426 
 427         if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
 428                 int hook_res;
 429 
 430                 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
 431 
 432                 /* do hook based tweaks to the request */
 433                 hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
 434                     rpc_state, req, rpc_state->rpc_data);
 435                 switch (hook_res) {
 436                 case EVRPC_TERMINATE:
 437                         goto error;
 438                 case EVRPC_PAUSE:
 439                         if (evrpc_pause_request(rpc->base, rpc_state,
 440                                 evrpc_request_done_closure) == -1)
 441                                 goto error;
 442                         return;
 443                 case EVRPC_CONTINUE:
 444                         break;
 445                 default:
 446                         EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
 447                             hook_res == EVRPC_CONTINUE ||
 448                             hook_res == EVRPC_PAUSE);
 449                 }
 450         }
 451 
 452         evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
 453         return;
 454 
 455 error:
 456         if (rpc_state != NULL)
 457                 evrpc_reqstate_free(rpc_state);
 458         evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
 459         return;
 460 }
 461 
 462 void *
 463 evrpc_get_request(struct evrpc_req_generic *req)
 464 {
 465         return req->request;
 466 }
 467 
 468 void *
 469 evrpc_get_reply(struct evrpc_req_generic *req)
 470 {
 471         return req->reply;
 472 }
 473 
 474 static void
 475 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
 476 {
 477         struct evrpc_req_generic *rpc_state = arg;
 478         struct evhttp_request *req;
 479         EVUTIL_ASSERT(rpc_state);
 480         req = rpc_state->http_req;
 481 
 482         if (hook_res == EVRPC_TERMINATE)
 483                 goto error;
 484 
 485         /* on success, we are going to transmit marshaled binary data */
 486         if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
 487                 evhttp_add_header(req->output_headers,
 488                     "Content-Type", "application/octet-stream");
 489         }
 490         evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
 491 
 492         evrpc_reqstate_free(rpc_state);
 493 
 494         return;
 495 
 496 error:
 497         if (rpc_state != NULL)
 498                 evrpc_reqstate_free(rpc_state);
 499         evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
 500         return;
 501 }
 502 
 503 
 504 /* Client implementation of RPC site */
 505 
 506 static int evrpc_schedule_request(struct evhttp_connection *connection,
 507     struct evrpc_request_wrapper *ctx);
 508 
 509 struct evrpc_pool *
 510 evrpc_pool_new(struct event_base *base)
 511 {
 512         struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
 513         if (pool == NULL)
 514                 return (NULL);
 515 
 516         TAILQ_INIT(&pool->connections);
 517         TAILQ_INIT(&pool->requests);
 518 
 519         TAILQ_INIT(&pool->paused_requests);
 520 
 521         TAILQ_INIT(&pool->input_hooks);
 522         TAILQ_INIT(&pool->output_hooks);
 523 
 524         pool->base = base;
 525         pool->timeout = -1;
 526 
 527         return (pool);
 528 }
 529 
 530 static void
 531 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
 532 {
 533         if (request->hook_meta != NULL)
 534                 evrpc_hook_context_free(request->hook_meta);
 535         mm_free(request->name);
 536         mm_free(request);
 537 }
 538 
 539 void
 540 evrpc_pool_free(struct evrpc_pool *pool)
 541 {
 542         struct evhttp_connection *connection;
 543         struct evrpc_request_wrapper *request;
 544         struct evrpc_hook_ctx *pause;
 545         struct evrpc_hook *hook;
 546         int r;
 547 
 548         while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
 549                 TAILQ_REMOVE(&pool->requests, request, next);
 550                 evrpc_request_wrapper_free(request);
 551         }
 552 
 553         while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
 554                 TAILQ_REMOVE(&pool->paused_requests, pause, next);
 555                 mm_free(pause);
 556         }
 557 
 558         while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
 559                 TAILQ_REMOVE(&pool->connections, connection, next);
 560                 evhttp_connection_free(connection);
 561         }
 562 
 563         while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
 564                 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
 565                 EVUTIL_ASSERT(r);
 566         }
 567 
 568         while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
 569                 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
 570                 EVUTIL_ASSERT(r);
 571         }
 572 
 573         mm_free(pool);
 574 }
 575 
 576 /*
 577  * Add a connection to the RPC pool.   A request scheduled on the pool
 578  * may use any available connection.
 579  */
 580 
 581 void
 582 evrpc_pool_add_connection(struct evrpc_pool *pool,
 583     struct evhttp_connection *connection)
 584 {
 585         EVUTIL_ASSERT(connection->http_server == NULL);
 586         TAILQ_INSERT_TAIL(&pool->connections, connection, next);
 587 
 588         /*
 589          * associate an event base with this connection
 590          */
 591         if (pool->base != NULL)
 592                 evhttp_connection_set_base(connection, pool->base);
 593 
 594         /*
 595          * unless a timeout was specifically set for a connection,
 596          * the connection inherits the timeout from the pool.
 597          */
 598         if (connection->timeout == -1)
 599                 connection->timeout = pool->timeout;
 600 
 601         /*
 602          * if we have any requests pending, schedule them with the new
 603          * connections.
 604          */
 605 
 606         if (TAILQ_FIRST(&pool->requests) != NULL) {
 607                 struct evrpc_request_wrapper *request =
 608                     TAILQ_FIRST(&pool->requests);
 609                 TAILQ_REMOVE(&pool->requests, request, next);
 610                 evrpc_schedule_request(connection, request);
 611         }
 612 }
 613 
 614 void
 615 evrpc_pool_remove_connection(struct evrpc_pool *pool,
 616     struct evhttp_connection *connection)
 617 {
 618         TAILQ_REMOVE(&pool->connections, connection, next);
 619 }
 620 
 621 void
 622 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
 623 {
 624         struct evhttp_connection *evcon;
 625         TAILQ_FOREACH(evcon, &pool->connections, next) {
 626                 evcon->timeout = timeout_in_secs;
 627         }
 628         pool->timeout = timeout_in_secs;
 629 }
 630 
 631 
 632 static void evrpc_reply_done(struct evhttp_request *, void *);
 633 static void evrpc_request_timeout(evutil_socket_t, short, void *);
 634 
 635 /*
 636  * Finds a connection object associated with the pool that is currently
 637  * idle and can be used to make a request.
 638  */
 639 static struct evhttp_connection *
 640 evrpc_pool_find_connection(struct evrpc_pool *pool)
 641 {
 642         struct evhttp_connection *connection;
 643         TAILQ_FOREACH(connection, &pool->connections, next) {
 644                 if (TAILQ_FIRST(&connection->requests) == NULL)
 645                         return (connection);
 646         }
 647 
 648         return (NULL);
 649 }
 650 
 651 /*
 652  * Prototypes responsible for evrpc scheduling and hooking
 653  */
 654 
 655 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
 656 
 657 /*
 658  * We assume that the ctx is no longer queued on the pool.
 659  */
 660 static int
 661 evrpc_schedule_request(struct evhttp_connection *connection,
 662     struct evrpc_request_wrapper *ctx)
 663 {
 664         struct evhttp_request *req = NULL;
 665         struct evrpc_pool *pool = ctx->pool;
 666         struct evrpc_status status;
 667 
 668         if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
 669                 goto error;
 670 
 671         /* serialize the request data into the output buffer */
 672         ctx->request_marshal(req->output_buffer, ctx->request);
 673 
 674         /* we need to know the connection that we might have to abort */
 675         ctx->evcon = connection;
 676 
 677         /* if we get paused we also need to know the request */
 678         ctx->req = req;
 679 
 680         if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
 681                 int hook_res;
 682 
 683                 evrpc_hook_associate_meta(&ctx->hook_meta, connection);
 684 
 685                 /* apply hooks to the outgoing request */
 686                 hook_res = evrpc_process_hooks(&pool->output_hooks,
 687                     ctx, req, req->output_buffer);
 688 
 689                 switch (hook_res) {
 690                 case EVRPC_TERMINATE:
 691                         goto error;
 692                 case EVRPC_PAUSE:
 693                         /* we need to be explicitly resumed */
 694                         if (evrpc_pause_request(pool, ctx,
 695                                 evrpc_schedule_request_closure) == -1)
 696                                 goto error;
 697                         return (0);
 698                 case EVRPC_CONTINUE:
 699                         /* we can just continue */
 700                         break;
 701                 default:
 702                         EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
 703                             hook_res == EVRPC_CONTINUE ||
 704                             hook_res == EVRPC_PAUSE);
 705                 }
 706         }
 707 
 708         evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
 709         return (0);
 710 
 711 error:
 712         memset(&status, 0, sizeof(status));
 713         status.error = EVRPC_STATUS_ERR_UNSTARTED;
 714         (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 715         evrpc_request_wrapper_free(ctx);
 716         return (-1);
 717 }
 718 
 719 static void
 720 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
 721 {
 722         struct evrpc_request_wrapper *ctx = arg;
 723         struct evhttp_connection *connection = ctx->evcon;
 724         struct evhttp_request *req = ctx->req;
 725         struct evrpc_pool *pool = ctx->pool;
 726         struct evrpc_status status;
 727         char *uri = NULL;
 728         int res = 0;
 729 
 730         if (hook_res == EVRPC_TERMINATE)
 731                 goto error;
 732 
 733         uri = evrpc_construct_uri(ctx->name);
 734         if (uri == NULL)
 735                 goto error;
 736 
 737         if (pool->timeout > 0) {
 738                 /*
 739                  * a timeout after which the whole rpc is going to be aborted.
 740                  */
 741                 struct timeval tv;
 742                 evutil_timerclear(&tv);
 743                 tv.tv_sec = pool->timeout;
 744                 evtimer_add(&ctx->ev_timeout, &tv);
 745         }
 746 
 747         /* start the request over the connection */
 748         res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
 749         mm_free(uri);
 750 
 751         if (res == -1)
 752                 goto error;
 753 
 754         return;
 755 
 756 error:
 757         memset(&status, 0, sizeof(status));
 758         status.error = EVRPC_STATUS_ERR_UNSTARTED;
 759         (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 760         evrpc_request_wrapper_free(ctx);
 761 }
 762 
 763 /* we just queue the paused request on the pool under the req object */
 764 static int
 765 evrpc_pause_request(void *vbase, void *ctx,
 766     void (*cb)(void *, enum EVRPC_HOOK_RESULT))
 767 {
 768         struct _evrpc_hooks *base = vbase;
 769         struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
 770         if (pause == NULL)
 771                 return (-1);
 772 
 773         pause->ctx = ctx;
 774         pause->cb = cb;
 775 
 776         TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
 777         return (0);
 778 }
 779 
 780 int
 781 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
 782 {
 783         struct _evrpc_hooks *base = vbase;
 784         struct evrpc_pause_list *head = &base->pause_requests;
 785         struct evrpc_hook_ctx *pause;
 786 
 787         TAILQ_FOREACH(pause, head, next) {
 788                 if (pause->ctx == ctx)
 789                         break;
 790         }
 791 
 792         if (pause == NULL)
 793                 return (-1);
 794 
 795         (*pause->cb)(pause->ctx, res);
 796         TAILQ_REMOVE(head, pause, next);
 797         mm_free(pause);
 798         return (0);
 799 }
 800 
 801 int
 802 evrpc_make_request(struct evrpc_request_wrapper *ctx)
 803 {
 804         struct evrpc_pool *pool = ctx->pool;
 805 
 806         /* initialize the event structure for this rpc */
 807         evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
 808 
 809         /* we better have some available connections on the pool */
 810         EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
 811 
 812         /*
 813          * if no connection is available, we queue the request on the pool,
 814          * the next time a connection is empty, the rpc will be send on that.
 815          */
 816         TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
 817 
 818         evrpc_pool_schedule(pool);
 819 
 820         return (0);
 821 }
 822 
 823 
 824 struct evrpc_request_wrapper *
 825 evrpc_make_request_ctx(
 826         struct evrpc_pool *pool, void *request, void *reply,
 827         const char *rpcname,
 828         void (*req_marshal)(struct evbuffer*, void *),
 829         void (*rpl_clear)(void *),
 830         int (*rpl_unmarshal)(void *, struct evbuffer *),
 831         void (*cb)(struct evrpc_status *, void *, void *, void *),
 832         void *cbarg)
 833 {
 834         struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
 835             mm_malloc(sizeof(struct evrpc_request_wrapper));
 836         if (ctx == NULL)
 837                 return (NULL);
 838 
 839         ctx->pool = pool;
 840         ctx->hook_meta = NULL;
 841         ctx->evcon = NULL;
 842         ctx->name = mm_strdup(rpcname);
 843         if (ctx->name == NULL) {
 844                 mm_free(ctx);
 845                 return (NULL);
 846         }
 847         ctx->cb = cb;
 848         ctx->cb_arg = cbarg;
 849         ctx->request = request;
 850         ctx->reply = reply;
 851         ctx->request_marshal = req_marshal;
 852         ctx->reply_clear = rpl_clear;
 853         ctx->reply_unmarshal = rpl_unmarshal;
 854 
 855         return (ctx);
 856 }
 857 
 858 static void
 859 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
 860 
 861 static void
 862 evrpc_reply_done(struct evhttp_request *req, void *arg)
 863 {
 864         struct evrpc_request_wrapper *ctx = arg;
 865         struct evrpc_pool *pool = ctx->pool;
 866         int hook_res = EVRPC_CONTINUE;
 867 
 868         /* cancel any timeout we might have scheduled */
 869         event_del(&ctx->ev_timeout);
 870 
 871         ctx->req = req;
 872 
 873         /* we need to get the reply now */
 874         if (req == NULL) {
 875                 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
 876                 return;
 877         }
 878 
 879         if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
 880                 evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon);
 881 
 882                 /* apply hooks to the incoming request */
 883                 hook_res = evrpc_process_hooks(&pool->input_hooks,
 884                     ctx, req, req->input_buffer);
 885 
 886                 switch (hook_res) {
 887                 case EVRPC_TERMINATE:
 888                 case EVRPC_CONTINUE:
 889                         break;
 890                 case EVRPC_PAUSE:
 891                         /*
 892                          * if we get paused we also need to know the
 893                          * request.  unfortunately, the underlying
 894                          * layer is going to free it.  we need to
 895                          * request ownership explicitly
 896                          */
 897                         if (req != NULL)
 898                                 evhttp_request_own(req);
 899 
 900                         evrpc_pause_request(pool, ctx,
 901                             evrpc_reply_done_closure);
 902                         return;
 903                 default:
 904                         EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
 905                             hook_res == EVRPC_CONTINUE ||
 906                             hook_res == EVRPC_PAUSE);
 907                 }
 908         }
 909 
 910         evrpc_reply_done_closure(ctx, hook_res);
 911 
 912         /* http request is being freed by underlying layer */
 913 }
 914 
 915 static void
 916 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
 917 {
 918         struct evrpc_request_wrapper *ctx = arg;
 919         struct evhttp_request *req = ctx->req;
 920         struct evrpc_pool *pool = ctx->pool;
 921         struct evrpc_status status;
 922         int res = -1;
 923 
 924         memset(&status, 0, sizeof(status));
 925         status.http_req = req;
 926 
 927         /* we need to get the reply now */
 928         if (req == NULL) {
 929                 status.error = EVRPC_STATUS_ERR_TIMEOUT;
 930         } else if (hook_res == EVRPC_TERMINATE) {
 931                 status.error = EVRPC_STATUS_ERR_HOOKABORTED;
 932         } else {
 933                 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
 934                 if (res == -1)
 935                         status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
 936         }
 937 
 938         if (res == -1) {
 939                 /* clear everything that we might have written previously */
 940                 ctx->reply_clear(ctx->reply);
 941         }
 942 
 943         (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 944 
 945         evrpc_request_wrapper_free(ctx);
 946 
 947         /* the http layer owned the original request structure, but if we
 948          * got paused, we asked for ownership and need to free it here. */
 949         if (req != NULL && evhttp_request_is_owned(req))
 950                 evhttp_request_free(req);
 951 
 952         /* see if we can schedule another request */
 953         evrpc_pool_schedule(pool);
 954 }
 955 
 956 static void
 957 evrpc_pool_schedule(struct evrpc_pool *pool)
 958 {
 959         struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
 960         struct evhttp_connection *evcon;
 961 
 962         /* if no requests are pending, we have no work */
 963         if (ctx == NULL)
 964                 return;
 965 
 966         if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
 967                 TAILQ_REMOVE(&pool->requests, ctx, next);
 968                 evrpc_schedule_request(evcon, ctx);
 969         }
 970 }
 971 
 972 static void
 973 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
 974 {
 975         struct evrpc_request_wrapper *ctx = arg;
 976         struct evhttp_connection *evcon = ctx->evcon;
 977         EVUTIL_ASSERT(evcon != NULL);
 978 
 979         evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
 980 }
 981 
 982 /*
 983  * frees potential meta data associated with a request.
 984  */
 985 
 986 static void
 987 evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
 988 {
 989         struct evrpc_meta *entry;
 990         EVUTIL_ASSERT(meta_data != NULL);
 991 
 992         while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
 993                 TAILQ_REMOVE(meta_data, entry, next);
 994                 mm_free(entry->key);
 995                 mm_free(entry->data);
 996                 mm_free(entry);
 997         }
 998 }
 999 
1000 static struct evrpc_hook_meta *
1001 evrpc_hook_meta_new(void)
1002 {
1003         struct evrpc_hook_meta *ctx;
1004         ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
1005         EVUTIL_ASSERT(ctx != NULL);
1006 
1007         TAILQ_INIT(&ctx->meta_data);
1008         ctx->evcon = NULL;
1009 
1010         return (ctx);
1011 }
1012 
1013 static void
1014 evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
1015     struct evhttp_connection *evcon)
1016 {
1017         struct evrpc_hook_meta *ctx = *pctx;
1018         if (ctx == NULL)
1019                 *pctx = ctx = evrpc_hook_meta_new();
1020         ctx->evcon = evcon;
1021 }
1022 
1023 static void
1024 evrpc_hook_context_free(struct evrpc_hook_meta *ctx)
1025 {
1026         evrpc_meta_data_free(&ctx->meta_data);
1027         mm_free(ctx);
1028 }
1029 
1030 /* Adds meta data */
1031 void
1032 evrpc_hook_add_meta(void *ctx, const char *key,
1033     const void *data, size_t data_size)
1034 {
1035         struct evrpc_request_wrapper *req = ctx;
1036         struct evrpc_hook_meta *store = NULL;
1037         struct evrpc_meta *meta = NULL;
1038 
1039         if ((store = req->hook_meta) == NULL)
1040                 store = req->hook_meta = evrpc_hook_meta_new();
1041 
1042         meta = mm_malloc(sizeof(struct evrpc_meta));
1043         EVUTIL_ASSERT(meta != NULL);
1044         meta->key = mm_strdup(key);
1045         EVUTIL_ASSERT(meta->key != NULL);
1046         meta->data_size = data_size;
1047         meta->data = mm_malloc(data_size);
1048         EVUTIL_ASSERT(meta->data != NULL);
1049         memcpy(meta->data, data, data_size);
1050 
1051         TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
1052 }
1053 
1054 int
1055 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
1056 {
1057         struct evrpc_request_wrapper *req = ctx;
1058         struct evrpc_meta *meta = NULL;
1059 
1060         if (req->hook_meta == NULL)
1061                 return (-1);
1062 
1063         TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
1064                 if (strcmp(meta->key, key) == 0) {
1065                         *data = meta->data;
1066                         *data_size = meta->data_size;
1067                         return (0);
1068                 }
1069         }
1070 
1071         return (-1);
1072 }
1073 
1074 struct evhttp_connection *
1075 evrpc_hook_get_connection(void *ctx)
1076 {
1077         struct evrpc_request_wrapper *req = ctx;
1078         return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
1079 }
1080 
1081 int
1082 evrpc_send_request_generic(struct evrpc_pool *pool,
1083     void *request, void *reply,
1084     void (*cb)(struct evrpc_status *, void *, void *, void *),
1085     void *cb_arg,
1086     const char *rpcname,
1087     void (*req_marshal)(struct evbuffer *, void *),
1088     void (*rpl_clear)(void *),
1089     int (*rpl_unmarshal)(void *, struct evbuffer *))
1090 {
1091         struct evrpc_status status;
1092         struct evrpc_request_wrapper *ctx;
1093         ctx = evrpc_make_request_ctx(pool, request, reply,
1094             rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
1095         if (ctx == NULL)
1096                 goto error;
1097         return (evrpc_make_request(ctx));
1098 error:
1099         memset(&status, 0, sizeof(status));
1100         status.error = EVRPC_STATUS_ERR_UNSTARTED;
1101         (*(cb))(&status, request, reply, cb_arg);
1102         return (-1);
1103 }
1104 
1105 /** Takes a request object and fills it in with the right magic */
1106 static struct evrpc *
1107 evrpc_register_object(const char *name,
1108     void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
1109     int (*req_unmarshal)(void *, struct evbuffer *),
1110     void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
1111     int (*rpl_complete)(void *),
1112     void (*rpl_marshal)(struct evbuffer *, void *))
1113 {
1114         struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
1115         if (rpc == NULL)
1116                 return (NULL);
1117         rpc->uri = mm_strdup(name);
1118         if (rpc->uri == NULL) {
1119                 mm_free(rpc);
1120                 return (NULL);
1121         }
1122         rpc->request_new = req_new;
1123         rpc->request_new_arg = req_new_arg;
1124         rpc->request_free = req_free;
1125         rpc->request_unmarshal = req_unmarshal;
1126         rpc->reply_new = rpl_new;
1127         rpc->reply_new_arg = rpl_new_arg;
1128         rpc->reply_free = rpl_free;
1129         rpc->reply_complete = rpl_complete;
1130         rpc->reply_marshal = rpl_marshal;
1131         return (rpc);
1132 }
1133 
1134 int
1135 evrpc_register_generic(struct evrpc_base *base, const char *name,
1136     void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
1137     void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
1138     int (*req_unmarshal)(void *, struct evbuffer *),
1139     void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
1140     int (*rpl_complete)(void *),
1141     void (*rpl_marshal)(struct evbuffer *, void *))
1142 {
1143         struct evrpc* rpc =
1144             evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal,
1145                 rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
1146         if (rpc == NULL)
1147                 return (-1);
1148         evrpc_register_rpc(base, rpc,
1149             (void (*)(struct evrpc_req_generic*, void *))callback, cbarg);
1150         return (0);
1151 }
1152 
1153 /** accessors for obscure and undocumented functionality */
1154 struct evrpc_pool *
1155 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
1156 {
1157         return (ctx->pool);
1158 }
1159 
1160 void
1161 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
1162     struct evrpc_pool *pool)
1163 {
1164         ctx->pool = pool;
1165 }
1166 
1167 void
1168 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
1169     void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
1170     void *cb_arg)
1171 {
1172         ctx->cb = cb;
1173         ctx->cb_arg = cb_arg;
1174 }

/* [<][>][^][v][top][bottom][index][help] */