This source file includes the following definitions.
- evrpc_init
- evrpc_free
- evrpc_add_hook
- evrpc_remove_hook_internal
- evrpc_remove_hook
- evrpc_process_hooks
- evrpc_construct_uri
- evrpc_register_rpc
- evrpc_unregister_rpc
- evrpc_request_cb
- evrpc_request_cb_closure
- evrpc_reqstate_free
- evrpc_request_done
- evrpc_get_request
- evrpc_get_reply
- evrpc_request_done_closure
- evrpc_pool_new
- evrpc_request_wrapper_free
- evrpc_pool_free
- evrpc_pool_add_connection
- evrpc_pool_remove_connection
- evrpc_pool_set_timeout
- evrpc_pool_find_connection
- evrpc_schedule_request
- evrpc_schedule_request_closure
- evrpc_pause_request
- evrpc_resume_request
- evrpc_make_request
- evrpc_make_request_ctx
- evrpc_reply_done
- evrpc_reply_done_closure
- evrpc_pool_schedule
- evrpc_request_timeout
- evrpc_meta_data_free
- evrpc_hook_meta_new
- evrpc_hook_associate_meta
- evrpc_hook_context_free
- evrpc_hook_add_meta
- evrpc_hook_find_meta
- evrpc_hook_get_connection
- evrpc_send_request_generic
- evrpc_register_object
- evrpc_register_generic
- evrpc_request_get_pool
- evrpc_request_set_pool
- evrpc_request_set_cb
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 
  24 
  25 
  26 
  27 #include "event2/event-config.h"
  28 
  29 #ifdef WIN32
  30 #define WIN32_LEAN_AND_MEAN
  31 #include <winsock2.h>
  32 #include <windows.h>
  33 #undef WIN32_LEAN_AND_MEAN
  34 #endif
  35 
  36 #include <sys/types.h>
  37 #ifndef WIN32
  38 #include <sys/socket.h>
  39 #endif
  40 #ifdef _EVENT_HAVE_SYS_TIME_H
  41 #include <sys/time.h>
  42 #endif
  43 #include <sys/queue.h>
  44 #include <stdio.h>
  45 #include <stdlib.h>
  46 #ifndef WIN32
  47 #include <unistd.h>
  48 #endif
  49 #include <errno.h>
  50 #include <signal.h>
  51 #include <string.h>
  52 
  53 #include <sys/queue.h>
  54 
  55 #include "event2/event.h"
  56 #include "event2/event_struct.h"
  57 #include "event2/rpc.h"
  58 #include "event2/rpc_struct.h"
  59 #include "evrpc-internal.h"
  60 #include "event2/http.h"
  61 #include "event2/buffer.h"
  62 #include "event2/tag.h"
  63 #include "event2/http_struct.h"
  64 #include "event2/http_compat.h"
  65 #include "event2/util.h"
  66 #include "util-internal.h"
  67 #include "log-internal.h"
  68 #include "mm-internal.h"
  69 
  70 struct evrpc_base *
  71 evrpc_init(struct evhttp *http_server)
  72 {
  73         struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
  74         if (base == NULL)
  75                 return (NULL);
  76 
  77         
  78         evtag_init();
  79 
  80         TAILQ_INIT(&base->registered_rpcs);
  81         TAILQ_INIT(&base->input_hooks);
  82         TAILQ_INIT(&base->output_hooks);
  83 
  84         TAILQ_INIT(&base->paused_requests);
  85 
  86         base->http_server = http_server;
  87 
  88         return (base);
  89 }
  90 
  91 void
  92 evrpc_free(struct evrpc_base *base)
  93 {
  94         struct evrpc *rpc;
  95         struct evrpc_hook *hook;
  96         struct evrpc_hook_ctx *pause;
  97         int r;
  98 
  99         while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
 100                 r = evrpc_unregister_rpc(base, rpc->uri);
 101                 EVUTIL_ASSERT(r == 0);
 102         }
 103         while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
 104                 TAILQ_REMOVE(&base->paused_requests, pause, next);
 105                 mm_free(pause);
 106         }
 107         while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
 108                 r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
 109                 EVUTIL_ASSERT(r);
 110         }
 111         while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
 112                 r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
 113                 EVUTIL_ASSERT(r);
 114         }
 115         mm_free(base);
 116 }
 117 
 118 void *
 119 evrpc_add_hook(void *vbase,
 120     enum EVRPC_HOOK_TYPE hook_type,
 121     int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
 122     void *cb_arg)
 123 {
 124         struct _evrpc_hooks *base = vbase;
 125         struct evrpc_hook_list *head = NULL;
 126         struct evrpc_hook *hook = NULL;
 127         switch (hook_type) {
 128         case EVRPC_INPUT:
 129                 head = &base->in_hooks;
 130                 break;
 131         case EVRPC_OUTPUT:
 132                 head = &base->out_hooks;
 133                 break;
 134         default:
 135                 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
 136         }
 137 
 138         hook = mm_calloc(1, sizeof(struct evrpc_hook));
 139         EVUTIL_ASSERT(hook != NULL);
 140 
 141         hook->process = cb;
 142         hook->process_arg = cb_arg;
 143         TAILQ_INSERT_TAIL(head, hook, next);
 144 
 145         return (hook);
 146 }
 147 
 148 static int
 149 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
 150 {
 151         struct evrpc_hook *hook = NULL;
 152         TAILQ_FOREACH(hook, head, next) {
 153                 if (hook == handle) {
 154                         TAILQ_REMOVE(head, hook, next);
 155                         mm_free(hook);
 156                         return (1);
 157                 }
 158         }
 159 
 160         return (0);
 161 }
 162 
 163 
 164 
 165 
 166 
 167 int
 168 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
 169 {
 170         struct _evrpc_hooks *base = vbase;
 171         struct evrpc_hook_list *head = NULL;
 172         switch (hook_type) {
 173         case EVRPC_INPUT:
 174                 head = &base->in_hooks;
 175                 break;
 176         case EVRPC_OUTPUT:
 177                 head = &base->out_hooks;
 178                 break;
 179         default:
 180                 EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
 181         }
 182 
 183         return (evrpc_remove_hook_internal(head, handle));
 184 }
 185 
 186 static int
 187 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
 188     struct evhttp_request *req, struct evbuffer *evbuf)
 189 {
 190         struct evrpc_hook *hook;
 191         TAILQ_FOREACH(hook, head, next) {
 192                 int res = hook->process(ctx, req, evbuf, hook->process_arg);
 193                 if (res != EVRPC_CONTINUE)
 194                         return (res);
 195         }
 196 
 197         return (EVRPC_CONTINUE);
 198 }
 199 
 200 static void evrpc_pool_schedule(struct evrpc_pool *pool);
 201 static void evrpc_request_cb(struct evhttp_request *, void *);
 202 
 203 
 204 
 205 
 206 
 207 
 208 
 209 static char *
 210 evrpc_construct_uri(const char *uri)
 211 {
 212         char *constructed_uri;
 213         size_t constructed_uri_len;
 214 
 215         constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
 216         if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
 217                 event_err(1, "%s: failed to register rpc at %s",
 218                     __func__, uri);
 219         memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
 220         memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
 221         constructed_uri[constructed_uri_len - 1] = '\0';
 222 
 223         return (constructed_uri);
 224 }
 225 
 226 int
 227 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
 228     void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
 229 {
 230         char *constructed_uri = evrpc_construct_uri(rpc->uri);
 231 
 232         rpc->base = base;
 233         rpc->cb = cb;
 234         rpc->cb_arg = cb_arg;
 235 
 236         TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
 237 
 238         evhttp_set_cb(base->http_server,
 239             constructed_uri,
 240             evrpc_request_cb,
 241             rpc);
 242 
 243         mm_free(constructed_uri);
 244 
 245         return (0);
 246 }
 247 
 248 int
 249 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
 250 {
 251         char *registered_uri = NULL;
 252         struct evrpc *rpc;
 253         int r;
 254 
 255         
 256         TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
 257                 if (strcmp(rpc->uri, name) == 0)
 258                         break;
 259         }
 260         if (rpc == NULL) {
 261                 
 262                 return (-1);
 263         }
 264         TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
 265 
 266         registered_uri = evrpc_construct_uri(name);
 267 
 268         
 269         r = evhttp_del_cb(base->http_server, registered_uri);
 270         EVUTIL_ASSERT(r == 0);
 271 
 272         mm_free(registered_uri);
 273 
 274         mm_free((char *)rpc->uri);
 275         mm_free(rpc);
 276         return (0);
 277 }
 278 
 279 static int evrpc_pause_request(void *vbase, void *ctx,
 280     void (*cb)(void *, enum EVRPC_HOOK_RESULT));
 281 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
 282 
 283 static void
 284 evrpc_request_cb(struct evhttp_request *req, void *arg)
 285 {
 286         struct evrpc *rpc = arg;
 287         struct evrpc_req_generic *rpc_state = NULL;
 288 
 289         
 290         if (req->type != EVHTTP_REQ_POST ||
 291             evbuffer_get_length(req->input_buffer) <= 0)
 292                 goto error;
 293 
 294         rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
 295         if (rpc_state == NULL)
 296                 goto error;
 297         rpc_state->rpc = rpc;
 298         rpc_state->http_req = req;
 299         rpc_state->rpc_data = NULL;
 300 
 301         if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
 302                 int hook_res;
 303 
 304                 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
 305 
 306                 
 307 
 308 
 309                 hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
 310                     rpc_state, req, req->input_buffer);
 311                 switch (hook_res) {
 312                 case EVRPC_TERMINATE:
 313                         goto error;
 314                 case EVRPC_PAUSE:
 315                         evrpc_pause_request(rpc->base, rpc_state,
 316                             evrpc_request_cb_closure);
 317                         return;
 318                 case EVRPC_CONTINUE:
 319                         break;
 320                 default:
 321                         EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
 322                             hook_res == EVRPC_CONTINUE ||
 323                             hook_res == EVRPC_PAUSE);
 324                 }
 325         }
 326 
 327         evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
 328         return;
 329 
 330 error:
 331         if (rpc_state != NULL)
 332                 evrpc_reqstate_free(rpc_state);
 333         evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
 334         return;
 335 }
 336 
 337 static void
 338 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
 339 {
 340         struct evrpc_req_generic *rpc_state = arg;
 341         struct evrpc *rpc;
 342         struct evhttp_request *req;
 343 
 344         EVUTIL_ASSERT(rpc_state);
 345         rpc = rpc_state->rpc;
 346         req = rpc_state->http_req;
 347 
 348         if (hook_res == EVRPC_TERMINATE)
 349                 goto error;
 350 
 351         
 352         rpc_state->request = rpc->request_new(rpc->request_new_arg);
 353         if (rpc_state->request == NULL)
 354                 goto error;
 355 
 356         if (rpc->request_unmarshal(
 357                     rpc_state->request, req->input_buffer) == -1) {
 358                 
 359                 goto error;
 360         }
 361 
 362         
 363 
 364         rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
 365         if (rpc_state->reply == NULL)
 366                 goto error;
 367 
 368         
 369         rpc->cb(rpc_state, rpc->cb_arg);
 370 
 371         return;
 372 
 373 error:
 374         if (rpc_state != NULL)
 375                 evrpc_reqstate_free(rpc_state);
 376         evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
 377         return;
 378 }
 379 
 380 
 381 void
 382 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
 383 {
 384         struct evrpc *rpc;
 385         EVUTIL_ASSERT(rpc_state != NULL);
 386         rpc = rpc_state->rpc;
 387 
 388         
 389         if (rpc_state->hook_meta != NULL)
 390                 evrpc_hook_context_free(rpc_state->hook_meta);
 391         if (rpc_state->request != NULL)
 392                 rpc->request_free(rpc_state->request);
 393         if (rpc_state->reply != NULL)
 394                 rpc->reply_free(rpc_state->reply);
 395         if (rpc_state->rpc_data != NULL)
 396                 evbuffer_free(rpc_state->rpc_data);
 397         mm_free(rpc_state);
 398 }
 399 
 400 static void
 401 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
 402 
 403 void
 404 evrpc_request_done(struct evrpc_req_generic *rpc_state)
 405 {
 406         struct evhttp_request *req;
 407         struct evrpc *rpc;
 408 
 409         EVUTIL_ASSERT(rpc_state);
 410 
 411         req = rpc_state->http_req;
 412         rpc = rpc_state->rpc;
 413 
 414         if (rpc->reply_complete(rpc_state->reply) == -1) {
 415                 
 416                 goto error;
 417         }
 418 
 419         if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
 420                 
 421                 goto error;
 422         }
 423 
 424         
 425         rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
 426 
 427         if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
 428                 int hook_res;
 429 
 430                 evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
 431 
 432                 
 433                 hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
 434                     rpc_state, req, rpc_state->rpc_data);
 435                 switch (hook_res) {
 436                 case EVRPC_TERMINATE:
 437                         goto error;
 438                 case EVRPC_PAUSE:
 439                         if (evrpc_pause_request(rpc->base, rpc_state,
 440                                 evrpc_request_done_closure) == -1)
 441                                 goto error;
 442                         return;
 443                 case EVRPC_CONTINUE:
 444                         break;
 445                 default:
 446                         EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
 447                             hook_res == EVRPC_CONTINUE ||
 448                             hook_res == EVRPC_PAUSE);
 449                 }
 450         }
 451 
 452         evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
 453         return;
 454 
 455 error:
 456         if (rpc_state != NULL)
 457                 evrpc_reqstate_free(rpc_state);
 458         evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
 459         return;
 460 }
 461 
 462 void *
 463 evrpc_get_request(struct evrpc_req_generic *req)
 464 {
 465         return req->request;
 466 }
 467 
 468 void *
 469 evrpc_get_reply(struct evrpc_req_generic *req)
 470 {
 471         return req->reply;
 472 }
 473 
 474 static void
 475 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
 476 {
 477         struct evrpc_req_generic *rpc_state = arg;
 478         struct evhttp_request *req;
 479         EVUTIL_ASSERT(rpc_state);
 480         req = rpc_state->http_req;
 481 
 482         if (hook_res == EVRPC_TERMINATE)
 483                 goto error;
 484 
 485         
 486         if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
 487                 evhttp_add_header(req->output_headers,
 488                     "Content-Type", "application/octet-stream");
 489         }
 490         evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
 491 
 492         evrpc_reqstate_free(rpc_state);
 493 
 494         return;
 495 
 496 error:
 497         if (rpc_state != NULL)
 498                 evrpc_reqstate_free(rpc_state);
 499         evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
 500         return;
 501 }
 502 
 503 
 504 
 505 
 506 static int evrpc_schedule_request(struct evhttp_connection *connection,
 507     struct evrpc_request_wrapper *ctx);
 508 
 509 struct evrpc_pool *
 510 evrpc_pool_new(struct event_base *base)
 511 {
 512         struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
 513         if (pool == NULL)
 514                 return (NULL);
 515 
 516         TAILQ_INIT(&pool->connections);
 517         TAILQ_INIT(&pool->requests);
 518 
 519         TAILQ_INIT(&pool->paused_requests);
 520 
 521         TAILQ_INIT(&pool->input_hooks);
 522         TAILQ_INIT(&pool->output_hooks);
 523 
 524         pool->base = base;
 525         pool->timeout = -1;
 526 
 527         return (pool);
 528 }
 529 
 530 static void
 531 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
 532 {
 533         if (request->hook_meta != NULL)
 534                 evrpc_hook_context_free(request->hook_meta);
 535         mm_free(request->name);
 536         mm_free(request);
 537 }
 538 
 539 void
 540 evrpc_pool_free(struct evrpc_pool *pool)
 541 {
 542         struct evhttp_connection *connection;
 543         struct evrpc_request_wrapper *request;
 544         struct evrpc_hook_ctx *pause;
 545         struct evrpc_hook *hook;
 546         int r;
 547 
 548         while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
 549                 TAILQ_REMOVE(&pool->requests, request, next);
 550                 evrpc_request_wrapper_free(request);
 551         }
 552 
 553         while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
 554                 TAILQ_REMOVE(&pool->paused_requests, pause, next);
 555                 mm_free(pause);
 556         }
 557 
 558         while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
 559                 TAILQ_REMOVE(&pool->connections, connection, next);
 560                 evhttp_connection_free(connection);
 561         }
 562 
 563         while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
 564                 r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
 565                 EVUTIL_ASSERT(r);
 566         }
 567 
 568         while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
 569                 r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
 570                 EVUTIL_ASSERT(r);
 571         }
 572 
 573         mm_free(pool);
 574 }
 575 
 576 
 577 
 578 
 579 
 580 
 581 void
 582 evrpc_pool_add_connection(struct evrpc_pool *pool,
 583     struct evhttp_connection *connection)
 584 {
 585         EVUTIL_ASSERT(connection->http_server == NULL);
 586         TAILQ_INSERT_TAIL(&pool->connections, connection, next);
 587 
 588         
 589 
 590 
 591         if (pool->base != NULL)
 592                 evhttp_connection_set_base(connection, pool->base);
 593 
 594         
 595 
 596 
 597 
 598         if (connection->timeout == -1)
 599                 connection->timeout = pool->timeout;
 600 
 601         
 602 
 603 
 604 
 605 
 606         if (TAILQ_FIRST(&pool->requests) != NULL) {
 607                 struct evrpc_request_wrapper *request =
 608                     TAILQ_FIRST(&pool->requests);
 609                 TAILQ_REMOVE(&pool->requests, request, next);
 610                 evrpc_schedule_request(connection, request);
 611         }
 612 }
 613 
 614 void
 615 evrpc_pool_remove_connection(struct evrpc_pool *pool,
 616     struct evhttp_connection *connection)
 617 {
 618         TAILQ_REMOVE(&pool->connections, connection, next);
 619 }
 620 
 621 void
 622 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
 623 {
 624         struct evhttp_connection *evcon;
 625         TAILQ_FOREACH(evcon, &pool->connections, next) {
 626                 evcon->timeout = timeout_in_secs;
 627         }
 628         pool->timeout = timeout_in_secs;
 629 }
 630 
 631 
 632 static void evrpc_reply_done(struct evhttp_request *, void *);
 633 static void evrpc_request_timeout(evutil_socket_t, short, void *);
 634 
 635 
 636 
 637 
 638 
 639 static struct evhttp_connection *
 640 evrpc_pool_find_connection(struct evrpc_pool *pool)
 641 {
 642         struct evhttp_connection *connection;
 643         TAILQ_FOREACH(connection, &pool->connections, next) {
 644                 if (TAILQ_FIRST(&connection->requests) == NULL)
 645                         return (connection);
 646         }
 647 
 648         return (NULL);
 649 }
 650 
 651 
 652 
 653 
 654 
 655 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
 656 
 657 
 658 
 659 
 660 static int
 661 evrpc_schedule_request(struct evhttp_connection *connection,
 662     struct evrpc_request_wrapper *ctx)
 663 {
 664         struct evhttp_request *req = NULL;
 665         struct evrpc_pool *pool = ctx->pool;
 666         struct evrpc_status status;
 667 
 668         if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
 669                 goto error;
 670 
 671         
 672         ctx->request_marshal(req->output_buffer, ctx->request);
 673 
 674         
 675         ctx->evcon = connection;
 676 
 677         
 678         ctx->req = req;
 679 
 680         if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
 681                 int hook_res;
 682 
 683                 evrpc_hook_associate_meta(&ctx->hook_meta, connection);
 684 
 685                 
 686                 hook_res = evrpc_process_hooks(&pool->output_hooks,
 687                     ctx, req, req->output_buffer);
 688 
 689                 switch (hook_res) {
 690                 case EVRPC_TERMINATE:
 691                         goto error;
 692                 case EVRPC_PAUSE:
 693                         
 694                         if (evrpc_pause_request(pool, ctx,
 695                                 evrpc_schedule_request_closure) == -1)
 696                                 goto error;
 697                         return (0);
 698                 case EVRPC_CONTINUE:
 699                         
 700                         break;
 701                 default:
 702                         EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
 703                             hook_res == EVRPC_CONTINUE ||
 704                             hook_res == EVRPC_PAUSE);
 705                 }
 706         }
 707 
 708         evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
 709         return (0);
 710 
 711 error:
 712         memset(&status, 0, sizeof(status));
 713         status.error = EVRPC_STATUS_ERR_UNSTARTED;
 714         (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 715         evrpc_request_wrapper_free(ctx);
 716         return (-1);
 717 }
 718 
 719 static void
 720 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
 721 {
 722         struct evrpc_request_wrapper *ctx = arg;
 723         struct evhttp_connection *connection = ctx->evcon;
 724         struct evhttp_request *req = ctx->req;
 725         struct evrpc_pool *pool = ctx->pool;
 726         struct evrpc_status status;
 727         char *uri = NULL;
 728         int res = 0;
 729 
 730         if (hook_res == EVRPC_TERMINATE)
 731                 goto error;
 732 
 733         uri = evrpc_construct_uri(ctx->name);
 734         if (uri == NULL)
 735                 goto error;
 736 
 737         if (pool->timeout > 0) {
 738                 
 739 
 740 
 741                 struct timeval tv;
 742                 evutil_timerclear(&tv);
 743                 tv.tv_sec = pool->timeout;
 744                 evtimer_add(&ctx->ev_timeout, &tv);
 745         }
 746 
 747         
 748         res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
 749         mm_free(uri);
 750 
 751         if (res == -1)
 752                 goto error;
 753 
 754         return;
 755 
 756 error:
 757         memset(&status, 0, sizeof(status));
 758         status.error = EVRPC_STATUS_ERR_UNSTARTED;
 759         (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 760         evrpc_request_wrapper_free(ctx);
 761 }
 762 
 763 
 764 static int
 765 evrpc_pause_request(void *vbase, void *ctx,
 766     void (*cb)(void *, enum EVRPC_HOOK_RESULT))
 767 {
 768         struct _evrpc_hooks *base = vbase;
 769         struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
 770         if (pause == NULL)
 771                 return (-1);
 772 
 773         pause->ctx = ctx;
 774         pause->cb = cb;
 775 
 776         TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
 777         return (0);
 778 }
 779 
 780 int
 781 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
 782 {
 783         struct _evrpc_hooks *base = vbase;
 784         struct evrpc_pause_list *head = &base->pause_requests;
 785         struct evrpc_hook_ctx *pause;
 786 
 787         TAILQ_FOREACH(pause, head, next) {
 788                 if (pause->ctx == ctx)
 789                         break;
 790         }
 791 
 792         if (pause == NULL)
 793                 return (-1);
 794 
 795         (*pause->cb)(pause->ctx, res);
 796         TAILQ_REMOVE(head, pause, next);
 797         mm_free(pause);
 798         return (0);
 799 }
 800 
 801 int
 802 evrpc_make_request(struct evrpc_request_wrapper *ctx)
 803 {
 804         struct evrpc_pool *pool = ctx->pool;
 805 
 806         
 807         evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
 808 
 809         
 810         EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
 811 
 812         
 813 
 814 
 815 
 816         TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
 817 
 818         evrpc_pool_schedule(pool);
 819 
 820         return (0);
 821 }
 822 
 823 
 824 struct evrpc_request_wrapper *
 825 evrpc_make_request_ctx(
 826         struct evrpc_pool *pool, void *request, void *reply,
 827         const char *rpcname,
 828         void (*req_marshal)(struct evbuffer*, void *),
 829         void (*rpl_clear)(void *),
 830         int (*rpl_unmarshal)(void *, struct evbuffer *),
 831         void (*cb)(struct evrpc_status *, void *, void *, void *),
 832         void *cbarg)
 833 {
 834         struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
 835             mm_malloc(sizeof(struct evrpc_request_wrapper));
 836         if (ctx == NULL)
 837                 return (NULL);
 838 
 839         ctx->pool = pool;
 840         ctx->hook_meta = NULL;
 841         ctx->evcon = NULL;
 842         ctx->name = mm_strdup(rpcname);
 843         if (ctx->name == NULL) {
 844                 mm_free(ctx);
 845                 return (NULL);
 846         }
 847         ctx->cb = cb;
 848         ctx->cb_arg = cbarg;
 849         ctx->request = request;
 850         ctx->reply = reply;
 851         ctx->request_marshal = req_marshal;
 852         ctx->reply_clear = rpl_clear;
 853         ctx->reply_unmarshal = rpl_unmarshal;
 854 
 855         return (ctx);
 856 }
 857 
 858 static void
 859 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
 860 
 861 static void
 862 evrpc_reply_done(struct evhttp_request *req, void *arg)
 863 {
 864         struct evrpc_request_wrapper *ctx = arg;
 865         struct evrpc_pool *pool = ctx->pool;
 866         int hook_res = EVRPC_CONTINUE;
 867 
 868         
 869         event_del(&ctx->ev_timeout);
 870 
 871         ctx->req = req;
 872 
 873         
 874         if (req == NULL) {
 875                 evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
 876                 return;
 877         }
 878 
 879         if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
 880                 evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon);
 881 
 882                 
 883                 hook_res = evrpc_process_hooks(&pool->input_hooks,
 884                     ctx, req, req->input_buffer);
 885 
 886                 switch (hook_res) {
 887                 case EVRPC_TERMINATE:
 888                 case EVRPC_CONTINUE:
 889                         break;
 890                 case EVRPC_PAUSE:
 891                         
 892 
 893 
 894 
 895 
 896 
 897                         if (req != NULL)
 898                                 evhttp_request_own(req);
 899 
 900                         evrpc_pause_request(pool, ctx,
 901                             evrpc_reply_done_closure);
 902                         return;
 903                 default:
 904                         EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
 905                             hook_res == EVRPC_CONTINUE ||
 906                             hook_res == EVRPC_PAUSE);
 907                 }
 908         }
 909 
 910         evrpc_reply_done_closure(ctx, hook_res);
 911 
 912         
 913 }
 914 
 915 static void
 916 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
 917 {
 918         struct evrpc_request_wrapper *ctx = arg;
 919         struct evhttp_request *req = ctx->req;
 920         struct evrpc_pool *pool = ctx->pool;
 921         struct evrpc_status status;
 922         int res = -1;
 923 
 924         memset(&status, 0, sizeof(status));
 925         status.http_req = req;
 926 
 927         
 928         if (req == NULL) {
 929                 status.error = EVRPC_STATUS_ERR_TIMEOUT;
 930         } else if (hook_res == EVRPC_TERMINATE) {
 931                 status.error = EVRPC_STATUS_ERR_HOOKABORTED;
 932         } else {
 933                 res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
 934                 if (res == -1)
 935                         status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
 936         }
 937 
 938         if (res == -1) {
 939                 
 940                 ctx->reply_clear(ctx->reply);
 941         }
 942 
 943         (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 944 
 945         evrpc_request_wrapper_free(ctx);
 946 
 947         
 948 
 949         if (req != NULL && evhttp_request_is_owned(req))
 950                 evhttp_request_free(req);
 951 
 952         
 953         evrpc_pool_schedule(pool);
 954 }
 955 
 956 static void
 957 evrpc_pool_schedule(struct evrpc_pool *pool)
 958 {
 959         struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
 960         struct evhttp_connection *evcon;
 961 
 962         
 963         if (ctx == NULL)
 964                 return;
 965 
 966         if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
 967                 TAILQ_REMOVE(&pool->requests, ctx, next);
 968                 evrpc_schedule_request(evcon, ctx);
 969         }
 970 }
 971 
 972 static void
 973 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
 974 {
 975         struct evrpc_request_wrapper *ctx = arg;
 976         struct evhttp_connection *evcon = ctx->evcon;
 977         EVUTIL_ASSERT(evcon != NULL);
 978 
 979         evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
 980 }
 981 
 982 
 983 
 984 
 985 
 986 static void
 987 evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
 988 {
 989         struct evrpc_meta *entry;
 990         EVUTIL_ASSERT(meta_data != NULL);
 991 
 992         while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
 993                 TAILQ_REMOVE(meta_data, entry, next);
 994                 mm_free(entry->key);
 995                 mm_free(entry->data);
 996                 mm_free(entry);
 997         }
 998 }
 999 
1000 static struct evrpc_hook_meta *
1001 evrpc_hook_meta_new(void)
1002 {
1003         struct evrpc_hook_meta *ctx;
1004         ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
1005         EVUTIL_ASSERT(ctx != NULL);
1006 
1007         TAILQ_INIT(&ctx->meta_data);
1008         ctx->evcon = NULL;
1009 
1010         return (ctx);
1011 }
1012 
1013 static void
1014 evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
1015     struct evhttp_connection *evcon)
1016 {
1017         struct evrpc_hook_meta *ctx = *pctx;
1018         if (ctx == NULL)
1019                 *pctx = ctx = evrpc_hook_meta_new();
1020         ctx->evcon = evcon;
1021 }
1022 
1023 static void
1024 evrpc_hook_context_free(struct evrpc_hook_meta *ctx)
1025 {
1026         evrpc_meta_data_free(&ctx->meta_data);
1027         mm_free(ctx);
1028 }
1029 
1030 
1031 void
1032 evrpc_hook_add_meta(void *ctx, const char *key,
1033     const void *data, size_t data_size)
1034 {
1035         struct evrpc_request_wrapper *req = ctx;
1036         struct evrpc_hook_meta *store = NULL;
1037         struct evrpc_meta *meta = NULL;
1038 
1039         if ((store = req->hook_meta) == NULL)
1040                 store = req->hook_meta = evrpc_hook_meta_new();
1041 
1042         meta = mm_malloc(sizeof(struct evrpc_meta));
1043         EVUTIL_ASSERT(meta != NULL);
1044         meta->key = mm_strdup(key);
1045         EVUTIL_ASSERT(meta->key != NULL);
1046         meta->data_size = data_size;
1047         meta->data = mm_malloc(data_size);
1048         EVUTIL_ASSERT(meta->data != NULL);
1049         memcpy(meta->data, data, data_size);
1050 
1051         TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
1052 }
1053 
1054 int
1055 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
1056 {
1057         struct evrpc_request_wrapper *req = ctx;
1058         struct evrpc_meta *meta = NULL;
1059 
1060         if (req->hook_meta == NULL)
1061                 return (-1);
1062 
1063         TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
1064                 if (strcmp(meta->key, key) == 0) {
1065                         *data = meta->data;
1066                         *data_size = meta->data_size;
1067                         return (0);
1068                 }
1069         }
1070 
1071         return (-1);
1072 }
1073 
1074 struct evhttp_connection *
1075 evrpc_hook_get_connection(void *ctx)
1076 {
1077         struct evrpc_request_wrapper *req = ctx;
1078         return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
1079 }
1080 
1081 int
1082 evrpc_send_request_generic(struct evrpc_pool *pool,
1083     void *request, void *reply,
1084     void (*cb)(struct evrpc_status *, void *, void *, void *),
1085     void *cb_arg,
1086     const char *rpcname,
1087     void (*req_marshal)(struct evbuffer *, void *),
1088     void (*rpl_clear)(void *),
1089     int (*rpl_unmarshal)(void *, struct evbuffer *))
1090 {
1091         struct evrpc_status status;
1092         struct evrpc_request_wrapper *ctx;
1093         ctx = evrpc_make_request_ctx(pool, request, reply,
1094             rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
1095         if (ctx == NULL)
1096                 goto error;
1097         return (evrpc_make_request(ctx));
1098 error:
1099         memset(&status, 0, sizeof(status));
1100         status.error = EVRPC_STATUS_ERR_UNSTARTED;
1101         (*(cb))(&status, request, reply, cb_arg);
1102         return (-1);
1103 }
1104 
1105 
1106 static struct evrpc *
1107 evrpc_register_object(const char *name,
1108     void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
1109     int (*req_unmarshal)(void *, struct evbuffer *),
1110     void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
1111     int (*rpl_complete)(void *),
1112     void (*rpl_marshal)(struct evbuffer *, void *))
1113 {
1114         struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
1115         if (rpc == NULL)
1116                 return (NULL);
1117         rpc->uri = mm_strdup(name);
1118         if (rpc->uri == NULL) {
1119                 mm_free(rpc);
1120                 return (NULL);
1121         }
1122         rpc->request_new = req_new;
1123         rpc->request_new_arg = req_new_arg;
1124         rpc->request_free = req_free;
1125         rpc->request_unmarshal = req_unmarshal;
1126         rpc->reply_new = rpl_new;
1127         rpc->reply_new_arg = rpl_new_arg;
1128         rpc->reply_free = rpl_free;
1129         rpc->reply_complete = rpl_complete;
1130         rpc->reply_marshal = rpl_marshal;
1131         return (rpc);
1132 }
1133 
/*
 * Create an RPC object from the given handlers and register it on
 * 'base' with 'callback' and 'cbarg'.
 *
 * Returns -1 when the RPC object cannot be created and 0 otherwise.
 * Any result of evrpc_register_rpc() is not inspected.
 */
int
evrpc_register_generic(struct evrpc_base *base, const char *name,
    void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
    void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
    int (*req_unmarshal)(void *, struct evbuffer *),
    void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
    int (*rpl_complete)(void *),
    void (*rpl_marshal)(struct evbuffer *, void *))
{
        struct evrpc *rpc;

        rpc = evrpc_register_object(name,
            req_new, req_new_arg, req_free, req_unmarshal,
            rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
        if (rpc == NULL)
                return (-1);

        evrpc_register_rpc(base, rpc, callback, cbarg);

        return (0);
}
1152 
1153 
1154 struct evrpc_pool *
1155 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
1156 {
1157         return (ctx->pool);
1158 }
1159 
1160 void
1161 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
1162     struct evrpc_pool *pool)
1163 {
1164         ctx->pool = pool;
1165 }
1166 
1167 void
1168 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
1169     void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
1170     void *cb_arg)
1171 {
1172         ctx->cb = cb;
1173         ctx->cb_arg = cb_arg;
1174 }