简介

说到异步IO,高并发之类的名词, 可能很多人第一反应就是 select, poll, epoll, kqueue 之类的底层代码库。 但是其实除非你要写一个 Nginx 性能级别的服务器, 否则直接使用 epoll 之类的还是太过底层, 诸多不便,要榨干整个异步编程的高并发性能还需要开发很多相关组件, 而 Libevent 就是作为更好用的高性能异步编程网络库而生, 他帮你包装了各种 buffer 和 event, 甚至也提供了更加高层的 http 和 rpc 等接口, 可以让你脱离底层细节,更加专注于服务的其他核心功能的实现。 当然,要真正用好它,还是需要懂不少关于他的实现原理。

如果是第一次接触 Libevent 的可以先看一篇非常好的入门文章: Libevent-book , 文章主要从 C10K 问题的发展循序渐进, 分别讲了在高并发连接的情况下, 多线程解决方案, 多进程解放方案会遇到的问题, 从而引出为什么异步IO是当前解决高并发连接最有效的方案。

ideawu 实现的 C1000K 服务器 icomet 核心就是基于 Libevent 实现的。

本文源码分析基于 Libevent master 分支 commit 6dba1694c89119c44cef03528945e5a5978ab43a 版本的代码。

事件循环

既然是异步IO,事件驱动,自然会有事件循环(event loop) 。 Libevent 的事件循环是通过调用 event_base_dispatch 来实现, 其实 event_base_dispatch 函数也是调用了 event_base_loop, 代码如下:

int event_base_dispatch(struct event_base *event_base) { return (event_base_loop(event_base, 0)); } 

运行事件循环的函数肯定是阻塞函数, 拿 linux 平台来说, libevent 的事件循环其实就是循环调用 epoll_wait 函数,

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); 

在没有任何事件被触发的时候,epoll_wait 是阻塞等待的, 而且,在 Libevent 里,定时器的实现其实就是通过 epoll_wait 的 timeout 参数实现的。

如果只有一个单线程的话,一旦调用了 event_base_dispatch 之后,这个线程就会被事件完完全全的霸占, 无法进行任何其他的操作。

如果我们需要临时手动激活任何其他事件的话, 则需要借助另一个线程来操作(因为主线程仍然在阻塞等待中)。

在 event_active 函数的解释里面就有一句话说的就是这个事情:

One common use in multithreaded programs is to wake the thread running event_base_loop() from another thread. 

事件是 Libevent 的最小单位

这里的事件指的就是 struct event 数据结构。

事件可以注册的各种信号如下:

#define EV_TIMEOUT 0x01 Indicates that a timeout has occurred. #define EV_READ 0x02 Wait for a socket or FD to become readable. #define EV_WRITE 0x04 Wait for a socket or FD to become writeable. #define EV_SIGNAL 0x08 Wait for a POSIX signal to be raised. #define EV_PERSIST 0x10 Persistent event: won't get removed automatically when activated. #define EV_ET 0x20 Select edge-triggered behavior, if supported by the backend. 

几乎所有其它更上层的数据结构都是基于 struct event 的包装来完成的。

核心数据结构

就拿 基于 Libevent 写一个 HTTP 服务器举例来说说, 编程时需要理解的几个核心数据结构是:

  • evhttp_request
  • evhttp_connection
  • bufferevent
  • evbuffer

从上到下是从高层到底层的关系, 下文的顺序也是从高层往底层分析。

evhttp_request

struct evhttp_request { // 每个 evhttp_request 都内含一个 evhttp_connection 来负责数据传输 struct evhttp_connection *evcon; //输入输出的两个buffer(这两个buffer的数据是从 evhttp_connection 里面的 input_buffer 和 output_buffer 拷贝过来的。) struct evbuffer *input_buffer; /* read data */ ev_int64_t ntoread; unsigned chunked:1, /* a chunked request */ userdone:1; /* the user has sent all data */ struct evbuffer *output_buffer; /* outgoing post or data */ //和HTTP协议有关的各种回调函数: void (*cb)(struct evhttp_request *, void *); void *cb_arg; void (*chunk_cb)(struct evhttp_request *, void *); int (*header_cb)(struct evhttp_request *, void *); void (*error_cb)(enum evhttp_request_error, void *); void (*on_complete_cb)(struct evhttp_request *, void *); void *on_complete_cb_arg; // 其它 // ...... }; 

evhttp_connection

evhttp_request 和 evhttp_connection 的关系很简单,拿协议栈来对比的话, 前者代表的是 HTTP 协议,即应用层协议, 后者代表的是 TCP 协议,即传输层协议。 前者需要管理所有和 HTTP 相关的数据内容,比如 HTTP header 数据和 body 数据。

struct evhttp_connection { // socket文件描述符 evutil_socket_t fd; // evhttp_connection 含有一个bufferevent,基于它进行数据传输。 struct bufferevent *bufev; // 和数据传输有关的状态 enum evhttp_connection_state state; //和 HTTP 协议无关的数据传输回调函数 void (*cb)(struct evhttp_connection *, void *); void *cb_arg; void (*closecb)(struct evhttp_connection *, void *); void *closecb_arg; // 其它 // ...... }; 

evhttp_connection 结构里面含有 enum evhttp_connection_state state 变量, 这个和 Thrift异步IO服务器源码分析 里面的 保持每个连接的状态是一个道理。 维护状态变化是异步IO服务编程的必要条件。

enum evhttp_connection_state { EVCON_DISCONNECTED, /**< not currently connected not trying either*/ EVCON_CONNECTING, /**< tries to currently connect */ EVCON_IDLE, /**< connection is established */ EVCON_READING_FIRSTLINE,/**< reading Request-Line (incoming conn) or **< Status-Line (outgoing conn) */ EVCON_READING_HEADERS, /**< reading request/response headers */ EVCON_READING_BODY, /**< reading request/response body */ EVCON_READING_TRAILER, /**< reading request/response chunked trailer */ EVCON_WRITING /**< writing request/response headers/body */ }; 

bufferevent

bufferevent 就是包装了 EV_READ event 和 EV_WRITE event , 并且带有读写缓冲区(evbuffer)的更高层的单位。

struct bufferevent { // 读写事件 struct event ev_read; struct event ev_write; // 读写缓冲区 struct evbuffer *input; struct evbuffer *output; // 注意三个回调函数是核心 bufferevent_data_cb readcb; bufferevent_data_cb writecb; bufferevent_event_cb errorcb; void *cbarg; // 其他 // ...... }; 

我觉得上面代码就非常简洁易懂了, 两个读写时间没什么好说的, 两个读写缓冲区也是必须的(evbuffer的实现在后面会谈到), 三个回调函数就是核心。 读和写的回调函数没什么好说的, 唯一需要注意的是 bufferevent_event_cb errorcb 这个回调函数是必须注册的, 它关系到当该 bufferevent 对应的时间发生读写外的任何行为(比如socket关闭)时, 都会触发。

整理一遍 bufferevent 的事件处理过程就是:

  • 当可读事件发生时,调用 readcb 将 socket 的数据 通过 recv 读出来存入 input 缓冲区;
  • 当可写事件发生时,调用 writecb 将 output 缓冲区里面的数据通过 socket send 发送出去;
  • 当其他事件发生时,比如 socket close 发生,进行相应的数据清理退出工作。

evbuffer

基本上的异步IO服务里的buffer都是一个德行(包括Nginx也是这样), 都是即是数组又是链表(类似C++ STL里面的deque)。 对于 libevent 来说, evbuffer 是一个链表,管理整个缓冲区的头指针和尾指针,

struct evbuffer { struct evbuffer_chain *first; struct evbuffer_chain *last; size_t total_len; //...... }; 

对于 evbuffer 这个链表的每个单元,也就是 evbuffer_chain 来说, 则是数组(连续内存空间),

struct evbuffer_chain { struct evbuffer_chain *next; size_t buffer_len; size_t off; unsigned char *buffer; //...... }; 

总结

对于我个人而言,读源码的时候主要是从核心数据结构入手, 如果理解了这几个核心数据结构, 一般就能猜到这些数据结构的相关函数都有哪些。 可以围绕着这些结构去找相关的函数为己所用。

C1000K之Libevent源码分析
标签: