Reactor 模式原理
何为 Reactor 模式
Reactor 模式是一种网络编程的模型,它基于操作系统提供的 IO 多路复用的相关 API 实现了基于事件的网络编程模式。基于事件驱动的方式,可以充分利用计算机资源。
要想搞懂 Reactor 模式,必须要知道 select
/poll
/epoll
这样的 API,因为 Reactor 模式依赖于这些系统 API。如果不清楚这些 API 的用法,需要先去搞懂,否则肯定学不明白 Reactor。后文中以 epoll
为例进行说明。
一个例子
既然 epoll 可以在文件描述符可读、可写或出错的时候提醒用户,那么可以采用如下思路:
- 创建套接字
server_sock
并listen
,把server_sock
加入 epoll,注册读事件。 - 当
server_sock
可读时,可以无阻塞地accept
,把accept
到的client_sock
加入 epoll,注册读事件。 - 当
client_sock
可读时,无阻塞地读取client_sock
上的数据,然后处理。处理完成后注册client_sock
的可写事件。 - 当
client_sock
可写时,把处理后的结果写入。如果写完了,那就取消注册写事件。 - 如果
client_sock
读写出错,那就把它从 epoll 中删除。
以上步骤,accept
read
write
process
这些操作都是由 epoll 的事件驱动的。上面这个步骤就是 Reactor 模式。
下面基于 reactor 模式实现了一个 server,功能是客户传来的输入中的小写字母转换大写字母,然后返回给客户。
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unordered_map>
#include <string>
#include <algorithm>
#include <iterator>
// 这个头文件中封装了 socket 的常用操作
// https://github.com/wy-ei/ws/blob/master/net/Socket.h
#include "Socket.h"
// 根据前文的描述,我们不难发现,每个套接字上都需要存有两个缓存区
// 接受到的数据,需要加入读缓冲区中,等待处理。处理完成后的数据需要加入
// 写缓冲区中,等待写事件的到来。这里我定义了一个类,用来维护缓冲区
struct Connection{
Socket sock;
std::string input_buffer;
std::string output_buffer;
void process(){
// 把输入内容转为大写
std::transform(input_buffer.begin(), input_buffer.end(), std::back_inserter(output_buffer), [](char ch){
return toupper(ch);
});
input_buffer.clear();
}
};
int main(int argc, char *argv[]){
// 创建套接字并监听
Socket server_socket(AF_INET, SOCK_STREAM);
server_socket.bind({"127.0.0.1", 8001});
server_socket.listen();
int efd = epoll_create(5);
// 把 server_sock 加入 epoll
epoll_event ev{};
ev.events = EPOLLIN;
ev.data.fd = server_socket.fd();
epoll_ctl(efd, EPOLL_CTL_ADD, server_socket.fd(), &ev);
epoll_event events[10];
char buffer[1024];
std::unordered_map<int, Connection> fd_to_conn;
while(true){
// 等待事件发生,如果在 3000ms 内没有事件发生,就会超时
int ms = 30000;
int n = epoll_wait(efd, events, 10, ms);
if(n == -1){
perror("epoll_wait");
exit(1);
}
if(n == 0) {
printf("no input arrive after %d ms\n", ms);
continue;
}
for(int i=0; i < n;i++){
epoll_event event = events[i];
int fd = event.data.fd;
// 如果是 server_socket 上的事件,就说明有新的连接到来
// 此时可以无阻塞地 accept
if(fd == server_socket.fd()){
Socket client_socket = server_socket.accept();
// 把客户加入 epoll,并注册可读事件。`EPOLLIN` 表示可读事件。
ev.events = EPOLLIN;
ev.data.fd = client_socket.fd();
epoll_ctl(efd, EPOLL_CTL_ADD, client_socket.fd(), &ev);
// 记录下此连接
Connection conn;
conn.sock = client_socket;
fd_to_conn.emplace(client_socket.fd(), conn);
}else{
// 根据事件对应的 fd 取出连接对象,注意是引用
Connection& conn = fd_to_conn[fd];
// 可读事件
if(event.events & EPOLLIN){
// 读套接字
int n_read = conn.sock.recv(buffer, sizeof(buffer));
if(n_read > 0) {
// 读取成功了,加入读缓冲区
conn.input_buffer.append(buffer, n_read);
// 处理读取到的内容,这一步如果耗时较多,通常需要在别的线程中进行
conn.process();
// 处理完成后,就会有可写的数据,因此在 epoll 上注册可写事件
ev.data.fd = conn.sock.fd();
ev.events = EPOLLOUT | EPOLLIN;
epoll_ctl(efd, EPOLL_CTL_MOD, conn.sock.fd(), &ev);
}else{
// 如果出错或者断开,从 epoll 中删除 fd
epoll_ctl(efd, EPOLL_CTL_DEL, conn.sock.fd(), nullptr);
fd_to_conn.erase(fd);
close(fd);
}
}
else if(event.events & EPOLLOUT){
// 把写缓冲区中的内容写入套接字
int n_write = conn.sock.send(conn.output_buffer.data(), conn.output_buffer.size());
if(n_write > 0){
// 把写过的内容删除掉。这里的实现很不高效,只是为了演示原理
conn.output_buffer.erase(conn.output_buffer.begin(), conn.output_buffer.begin() + n_write);
// 如果没有数据可写了,取消注册写事件
if(conn.output_buffer.empty()){
ev.data.fd = conn.sock.fd();
ev.events = EPOLLIN;
epoll_ctl(efd, EPOLL_CTL_MOD, conn.sock.fd(), &ev);
}
}else{
// 如果出错,从 epoll 中删除 fd
epoll_ctl(efd, EPOLL_CTL_DEL, conn.sock.fd(), nullptr);
fd_to_conn.erase(fd);
close(fd);
}
}
}
} // end for
} // end while
}
上面的代码中,我实现了基于 reactor 模型的单线程 echo 服务器。可以使用 nc
测试一下:
$ nc 127.0.0.1 8001
上面的代码中没有使用任何抽象,主函数写的很冗长,只是为了便于理解。如果理解了这种基于 epoll 的事件驱动方式,那么就很容易做其他改进了。
把上面的代码做一总结,如果了解 JavaScript
中注册回调的那个套路。这里就可以把 epoll 看做某个可以在上面注册事件的对象,这里叫做 reactor。一开始只是注册了 server_socket
的 read
事件,在事件的回调中又注册了更多的事件。但整个逻辑都在回调函数中。这便是事件驱动了。
Reactor reactor;
void write_data(Socket client){
client.write();
}
void read_data(Socket client){
client.read();
reactor.addEventListener(client, 'write', write_data)
}
reactor.addEventListener(server_socket, 'read', []{
client = server_socket.accept();
reactor.addEventListener(client, 'read', read_data);
});
单线程 Reactor 模型
单个线程中使用一个 reactor 来监听所有事件,每个事件的处理函数都需要执行的很快才行,否则就会阻塞在某个事件的处理上。稍微复杂点的程序,往往在处理环节都需要消耗不少时间,比如涉及到同步地读取文件。因此单线程的 Reactor 模型往往只能运用于较简单的场合。
如果处理环节需要消耗不少时间,可以考虑使用工作线程。使用一个线程池,把需要进行的处理操作放到进程池的任务队列中,等待处理。主线程可以继续监听 epoll 上的事件。处理任务被执行完成后,可以注册可写入的事件。等到被读取事件驱动。
多线程 Reactor 模型
既然套接字的读取到的结果的处理往往需要消耗不少时间,需要借助多线程来处理。那么可以在多个线程中使用 epoll,每个线程中只监听部分套接字,然后套接字上读取到的数据的处理可以就在本线程中进行。
这就形成了如下方案:
在一个线程中只执行 accept
,然后将 accept
到的套接字 client
加入到其他线程中的 reactor
中,client
自始至终都在该线程中被处理。这样,就把整个任务分散到不同的线程中去了。
这种设计也叫做主从模式。要想实现主从模式,就需要一番仔细设计了,向前面那样面条式的代码就行不通了。
主从 Reactor 模式代码实现
下面的代码实现了一个简单的主从 Reactor 模型,在主线程中执行 accept
操作,然后把接收到的套接字交给其他 reactor 来管理。
#include <unistd.h>
#include <sys/epoll.h>
#include <unordered_map>
#include <string>
#include <algorithm>
#include <iterator>
#include <mutex>
#include <thread>
#include <vector>
#include <functional>
#include "Socket.h"
// 由于 Connection 和 Reactor 两个类交叉引用了,因此这里采用前置声明
class Reactor;
struct Connection{
void read();
void write();
void process();
Reactor* reactor { nullptr };
Socket sock;
std::string input_buffer;
std::string output_buffer;
};
class Reactor{
public:
void add(Connection& conn);
void remove(const Connection& conn);
void update(const Connection& conn, int events);
void run();
private:
std::unordered_map<int, Connection> fd_to_conn_;
std::mutex mutex_;
int epoll_fd_ { -1 };
};
void Connection::read() {
char buffer[1024];
int n_read = sock.recv(buffer, sizeof(buffer));
if(n_read > 0) {
input_buffer.append(buffer, n_read);
process();
// 让 epoll 开始监听可写事件
reactor->update(*this, EPOLLOUT | EPOLLIN);
}else{
// 出错了,让 reactor 删除本连接
reactor->remove(*this);
}
}
void Connection::write() {
int n_write = sock.send(output_buffer.data(), output_buffer.size());
if(n_write > 0) {
output_buffer.erase(output_buffer.begin(), output_buffer.begin() + n_write);
if(output_buffer.empty()){
// 没得写了,不在监听可写事件
reactor->update(*this, EPOLLIN);
}
}else{
reactor->remove(*this);
}
}
void Connection::process() {
std::transform(input_buffer.begin(), input_buffer.end(), std::back_inserter(output_buffer), [](char ch){
return toupper(ch);
});
input_buffer.clear();
}
void Reactor::add(Connection &conn) {
conn.reactor = this;
int fd = conn.sock.fd();
{
std::unique_lock<std::mutex> lock(mutex_);
fd_to_conn_.emplace(fd, conn);
}
epoll_event ev{};
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
}
void Reactor::remove(const Connection &conn) {
int fd = conn.sock.fd();
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
{
std::unique_lock<std::mutex> lock(mutex_);
fd_to_conn_.erase(fd);;
}
}
void Reactor::update(const Connection &conn, int events) {
int fd = conn.sock.fd();
epoll_event ev{};
ev.events = events;
ev.data.fd = fd;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev);
}
void Reactor::run() {
// 启动后立刻创建一个 epoll
epoll_fd_ = epoll_create(5);
epoll_event events[10];
while(true){
// 等待事件发生
int n = epoll_wait(epoll_fd_, events, 10, -1);
for(int i=0; i < n;i++){
epoll_event event = events[i];
int fd = event.data.fd;
std::unique_lock<std::mutex> lock(mutex_);
Connection& conn = fd_to_conn_[fd];
lock.unlock();
if(event.events & EPOLLIN){
conn.read();
}else if(event.events & EPOLLOUT){
conn.write();
}
}
}
}
int main(int argc, char *argv[]){
// 创建套接字并监听
Socket server_socket(AF_INET, SOCK_STREAM);
server_socket.bind({"127.0.0.1", 8001});
server_socket.listen();
int efd = epoll_create(5);
// 把 server_socket 加入 epoll
epoll_event ev{};
ev.events = EPOLLIN;
ev.data.fd = server_socket.fd();
epoll_ctl(efd, EPOLL_CTL_ADD, server_socket.fd(), &ev);
// 创建子 reactor
std::vector<Reactor*> reactors;
for(int i=0;i<10;i++){
reactors.push_back(new Reactor);
// 这里创建了线程,线程启动后会立刻执行 Reactor::run
std::thread t(std::bind(&Reactor::run, reactors.back()));
t.detach();
}
int i = 0;
while(true){
epoll_event events[1];
int n = epoll_wait(efd, events, 1, -1);
if(n == -1){
perror("epoll_wait");
exit(1);
}
Socket client_socket = server_socket.accept();
// 选择一个 reactor,这里采取轮流选择的策略
Reactor* reactor = reactors[i++];
if(i == reactors.size()){
i = 0;
}
// 把此连接加入 reactor
Connection conn;
conn.sock = client_socket;
reactor->add(conn);
}
}
以上代码实现了一个 server,它把输入转换成大写然后返回回去。代码中抽象出了 Reactor 类,这个类和 server 的功能完全没有耦合。只需要在 connection 中修改业务逻辑即可。因此,Reactor 模式是一个很通用的模型。