1.初始化

Windows上需要自己初始化系统网络库,即调用

WSADATA WSAData;
WSAStartup(0x201, &WSAData);

结束使用时再

WSACleanup();

另外还需要初始化线程设置
windows上
evthread_use_windows_threads
linux上
evthread_use_pthreads

综上,使用其它libevent的函数前需要这样:

#if defined(WIN32)
  WSADATA WSAData;
  WSAStartup(0x201, &WSAData);
  evthread_use_windows_threads();
#else
  evthread_use_pthreads();
#endif

2.Loop

因为event_base_loop在没有event的时候就会结束,所以作为事件循环,需要加一个超时时间很长的timer event作为默认的负载,在真正结束事件循环的时候再把这个event delete掉。

3.Timer

event_new出来的对象是需要自己释放的,而且无法从event_base里再获取所有的还没执行的events,所以需要把event*保存起来,执行后再event_del。也就是需要自己做一个封装,callback函数的arg肯定不会是目标的对象。

4.多线程

libevent即使提供了多线程初始化函数,它也不能简单地跨线程发消息。一般来说会想当然地在当前线程中对另一个线程中的event_base加入一个超时为0的timer event,这样就会在另一个线程执行那个事件,达到发线程消息的效果。然而如果短时间内连续这样做,这些timer events并不能保证按序执行。
安全的做法是:

  1. 每个线程创建自己的消息队列,发线程消息就是往队列增加一个消息。当然,这个过程得加锁。
  2. 创建一对互相连接好的socket,线程的event_base加入event监听EV_READ事件,触发就按序执行消息队列。
  3. 每发一个线程消息就等于往队列增加一个消息,然后往socket写一个字节(任意内容)。即目标线程因为socket可读而结束等待继而执行消息。
    综上,用libevent来作为多线程间的事件循环和互发消息是还有工作要做的。

5.HTTP client

evhttp有挺多的坑,如回调的时机不明确、回调的参数有可能为空,connection和request的生命周期奇异等。这些在文档中没有过多的说明,只能吃一堑长一智。用C语言来写的库虽然效率高,但确实不怎么好维护。
下面是自己的实验代码,不想过多解释,请自行参考。

// HttpTransaction.h

struct event_base;
struct evhttp_connection;
struct evhttp_request;
struct event;

class HttpTransaction {
 public:
  explicit HttpTransaction(event_base* base);
  ~HttpTransaction();

  class Delegate {
   public:
    virtual ~Delegate() {}

    virtual void OnResponseReceived(HttpTransaction* transaction,
        const HttpResponse& response) = 0;

    virtual void OnDataReceived(HttpTransaction* transaction,
        const char* data, size_t length) = 0;

    virtual void OnFinished(HttpTransaction* transaction) = 0;

    virtual void OnError(HttpTransaction* transaction, int error_code) = 0;
  };

  virtual void Start(int* out_error_code);

  virtual void Cancel();

  virtual HttpRequest* request() const { return http_request_; }
  virtual void set_request(HttpRequest* request) {
    http_request_ = request;
  }

  virtual Delegate* delegate() const { return delegate_; }
  virtual void set_delegate(Delegate* delegate) { delegate_ = delegate; }

  void OnEvRequestCallback(evhttp_request* req);
  void OnTimerCallback();

 private:
  enum State {
    STATE_NONE,
    STATE_WAITING_REQUEST_CALLBACK,
    STATE_REPORT_RESPONSE,
    STATE_REPORT_DATA,
    STATE_REPORT_FINISH,
    STATE_REPORT_ERROR
  };

  void SetupTimer();
  void DoReportResponse();
  void DoReportData();
  void DoReportFinish();
  void DoReportError();

  event_base* base_;
  HttpRequest* http_request_;
  Delegate* delegate_;
  evhttp_connection* ev_connection_;
  evhttp_request* ev_request_;
  event* call_delegate_timer_;
  State next_state_;
};

// HttpTransaction.cpp
#include <string.h>
#include <assert.h>
#include "event2/event.h"
#include "event2/http.h"
#include "event2/buffer.h"
#include "event2/keyvalq_struct.h"
#include "http_transaction_impl.h"
#include "http_request.h"
#include "mutable_http_response.h"

void RequestCallback(struct evhttp_request* req, void *arg) {
  HttpTransaction* transaction = static_cast<HttpTransaction*>(arg);
  transaction->OnEvRequestCallback(req);
}

void TimerCallback(evutil_socket_t fd, short what, void* arg) {
  HttpTransaction* transaction = static_cast<HttpTransaction*>(arg);
  transaction->OnTimerCallback();
}

HttpTransaction::HttpTransaction(struct event_base* base)
    : base_(base),
      http_request_(NULL),
      delegate_(NULL),
      ev_connection_(NULL),
      ev_request_(NULL),
      call_delegate_timer_(NULL),
      next_state_(STATE_NONE) {
}

HttpTransaction::~HttpTransaction() {
  Cancel();
}

void HttpTransaction::Start(int* out_error_code) {
  if (!delegate_) {
    *out_error_code = 1;
    return;
  }

  if (!http_request_) {
    *out_error_code = 4;
    return;
  }

  const char* url = http_request_->url();
  if (!url) {
    *out_error_code = 11;
    return;
  }
  struct evhttp_uri *uri = evhttp_uri_parse(url);
  if (!uri) {
    *out_error_code = 12;
    return;
  }

  const char* host = evhttp_uri_get_host(uri);
  if (!host) {
    evhttp_uri_free(uri);
    *out_error_code = 13;
    return;
  }
  int port = evhttp_uri_get_port(uri);
  if (port == -1)
    port = 80;

  const char* method = http_request_->method();
  if (!method) {
    evhttp_uri_free(uri);
    *out_error_code = 14;
    return;
  }
  evhttp_cmd_type cmd_type;
  if (strcmp(method, "GET") == 0) {
    cmd_type = EVHTTP_REQ_GET;
  } else if (strcmp(method, "POST") == 0) {
    cmd_type = EVHTTP_REQ_POST;
  } else {
    evhttp_uri_free(uri);
    *out_error_code = 15;
    return;
  }

  ev_connection_ = evhttp_connection_base_new(base_, NULL, host, port);
  if (!ev_connection_) {
    evhttp_uri_free(uri);
    *out_error_code = 2;
    return;
  }

  ev_request_ = evhttp_request_new(&RequestCallback, this);
  void* iter = NULL;
  const char* name = NULL;
  const char* value = NULL;
  struct evkeyvalq *headers = evhttp_request_get_output_headers(ev_request_);
  while (http_request_->EnumerateHeaderLines(&iter, &name, &value)) {
    evhttp_add_header(headers, name, value);
  }
  value = http_request_->GetHeader("host");
  if (!value) {
    evhttp_add_header(headers, "host", host);
  }

  const char* body = NULL;
  size_t length = 0;
  if (http_request_->body(&body, &length)) {
    struct evbuffer *buffer = evhttp_request_get_output_buffer(ev_request_);
    evbuffer_add(buffer, body, length);
  }

  // evhttp_make_request() may trigger RequestCallback() synchronously and the
  // return code is 0. An example is DNS error.
  int rv = evhttp_make_request(ev_connection_, ev_request_,
      cmd_type, evhttp_uri_get_path(uri));
  evhttp_uri_free(uri);
  if (rv == -1) {
    ev_request_ = NULL;
    evhttp_connection_free(ev_connection_);
    ev_connection_ = NULL;
    *out_error_code = 3;
    return;
  }

  if (next_state_ == STATE_NONE)
    next_state_ = STATE_WAITING_REQUEST_CALLBACK;
  *out_error_code = 0;
  // evhttp_connection_set_timeout(m_conn, (int)request.timeoutInterval());
}

void HttpTransaction::Cancel() {
  next_state_ = STATE_NONE;
  if (call_delegate_timer_) {
    event_del(call_delegate_timer_);
    event_free(call_delegate_timer_);
    call_delegate_timer_ = NULL;
  }
  if (ev_request_) {
    if (evhttp_request_is_owned(ev_request_))
      evhttp_request_free(ev_request_);
    ev_request_ = NULL;
  }
  if (ev_connection_) {
    evhttp_connection_free(ev_connection_);
    ev_connection_ = NULL;
  }
}

void HttpTransaction::OnEvRequestCallback(evhttp_request* req) {
  SetupTimer();
  if (req == NULL ||
      evhttp_request_get_response_code(ev_request_) == 0) {
    ev_request_ = NULL;
    next_state_ = STATE_REPORT_ERROR;
  } else {
    assert(ev_request_ == req);
    evhttp_request_own(req);
    next_state_ = STATE_REPORT_RESPONSE;
  }
}

void HttpTransaction::OnTimerCallback() {
  State state = next_state_;
  next_state_ = STATE_NONE;
  switch (state) {
  case STATE_REPORT_RESPONSE:
    DoReportResponse();
    break;
  case STATE_REPORT_DATA:
    DoReportData();
    break;
  case STATE_REPORT_FINISH:
    DoReportFinish();
    break;
  case STATE_REPORT_ERROR:
    DoReportError();
    break;
  default:
    assert(0);
    break;
  }
}

void HttpTransaction::SetupTimer() {
  if (!call_delegate_timer_) {
    call_delegate_timer_ = evtimer_new(base_, &TimerCallback, this);
  }
  struct timeval tv = { 0, 0 };
  event_add(call_delegate_timer_, &tv);
}

void HttpTransaction::DoReportResponse() {
  SetupTimer();
  next_state_ = STATE_REPORT_DATA;
  MutableHttpResponse* response =
      MutableHttpResponse::Create();
  const char* url = http_request_->url();
  response->set_url(url);
  response->set_status_code(evhttp_request_get_response_code(ev_request_));
  struct evkeyvalq* headers = evhttp_request_get_input_headers(ev_request_);
  for (struct evkeyval* header = headers->tqh_first; header;
    header = header->next.tqe_next) {
    response->AddHeader(header->key, header->value);
  }
  delegate_->OnResponseReceived(this, *response);
  delete response;
}

void HttpTransaction::DoReportData() {
  struct evbuffer *buf = evhttp_request_get_input_buffer(ev_request_);
  if (evbuffer_get_length(buf)) {
    const int kBufferSize = 1024 * 12;
    char* buffer = new char[kBufferSize];
    int n = evbuffer_remove(buf, buffer, kBufferSize);

    SetupTimer();
    if (evbuffer_get_length(buf))
      next_state_ = STATE_REPORT_DATA;
    else
      next_state_ = STATE_REPORT_FINISH;

    if (n > 0) {
      struct timeval tv = { 0, 0 };
      event_add(call_delegate_timer_, &tv);
      delegate_->OnDataReceived(this, buffer, n);
    }
    delete buffer;
  } else {
    DoReportFinish();
  }
}

void HttpTransaction::DoReportFinish() {
  Cancel();
  delegate_->OnFinished(this);
}

void HttpTransaction::DoReportError() {
  Cancel();
  delegate_->OnError(this, 0xdead);
}

转载请注明出处:http://blog.csdn.net/hursing

GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐