何为 Reactor 模式

Reactor 模式是一种网络编程的模型,它基于操作系统提供的 IO 多路复用的相关 API 实现了基于事件的网络编程模式。基于事件驱动的方式,可以充分利用计算机资源。

要想搞懂 Reactor 模式,必须要知道 select/poll/epoll 这样的 API,因为 Reactor 模式依赖于这些系统 API。如果不清楚这些 API 的用法,需要先去搞懂,否则肯定学不明白 Reactor。后文中以 epoll 为例进行说明。

一个例子

既然 epoll 可以在文件描述符可读、可写或出错的时候提醒用户,那么可以采用如下思路:

  • 创建套接字 server_socklisten,把 server_sock 加入 epoll,注册读事件。
  • server_sock 可读时,可以无阻塞地 accept,把 accept 到的 client_sock 加入 epoll,注册读事件。
  • client_sock 可读时,无阻塞地读取 client_sock 上的数据,然后处理。处理完成后注册 client_sock 的可写事件。
  • client_sock 可写时,把处理后的结果写入。如果写完了,那就取消注册写事件。
  • 如果 client_sock 读写出错,那就把它从 epoll 中删除。

以上步骤,accept read write process 这些操作都是由 epoll 的事件驱动的。上面这个步骤就是 Reactor 模式。

下面基于 reactor 模式实现了一个 server,功能是客户传来的输入中的小写字母转换大写字母,然后返回给客户。

#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unordered_map>
#include <string>
#include <algorithm>
#include <iterator>

// 这个头文件中封装了 socket 的常用操作
// https://github.com/wy-ei/ws/blob/master/net/Socket.h
#include "Socket.h"

// 根据前文的描述,我们不难发现,每个套接字上都需要存有两个缓存区
// 接受到的数据,需要加入读缓冲区中,等待处理。处理完成后的数据需要加入
// 写缓冲区中,等待写事件的到来。这里我定义了一个类,用来维护缓冲区
struct Connection{
    Socket sock;
    std::string input_buffer;
    std::string output_buffer;
    void process(){
        // 把输入内容转为大写
        std::transform(input_buffer.begin(), input_buffer.end(), std::back_inserter(output_buffer), [](char ch){
            return toupper(ch);
        });
        input_buffer.clear();
    }
};


int main(int argc, char *argv[]){
    // 创建套接字并监听
    Socket server_socket(AF_INET, SOCK_STREAM);
    server_socket.bind({"127.0.0.1", 8001});
    server_socket.listen();

    int efd = epoll_create(5);

    // 把 server_sock 加入 epoll    
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = server_socket.fd();
    epoll_ctl(efd, EPOLL_CTL_ADD, server_socket.fd(), &ev);

    epoll_event events[10];
    char buffer[1024];
    std::unordered_map<int, Connection> fd_to_conn;

    while(true){
        // 等待事件发生,如果在 3000ms 内没有事件发生,就会超时
        int ms = 30000;
        int n = epoll_wait(efd, events, 10, ms);
        if(n == -1){
            perror("epoll_wait");
            exit(1);
        }
        if(n == 0) {
            printf("no input arrive after %d ms\n", ms);
            continue;
        }

        for(int i=0; i < n;i++){
            epoll_event event = events[i];
            int fd = event.data.fd;

            // 如果是 server_socket 上的事件,就说明有新的连接到来
            // 此时可以无阻塞地 accept
            if(fd == server_socket.fd()){
                Socket client_socket = server_socket.accept();

                // 把客户加入 epoll,并注册可读事件。`EPOLLIN` 表示可读事件。
                ev.events = EPOLLIN;
                ev.data.fd = client_socket.fd();
                epoll_ctl(efd, EPOLL_CTL_ADD, client_socket.fd(), &ev);

                // 记录下此连接
                Connection conn;
                conn.sock = client_socket;
                fd_to_conn.emplace(client_socket.fd(), conn);
            }else{
                // 根据事件对应的 fd 取出连接对象,注意是引用
                Connection& conn = fd_to_conn[fd];

                // 可读事件
                if(event.events & EPOLLIN){
                    // 读套接字
                    int n_read = conn.sock.recv(buffer, sizeof(buffer));
                    if(n_read > 0) {
                        // 读取成功了,加入读缓冲区
                        conn.input_buffer.append(buffer, n_read);
                        
                        // 处理读取到的内容,这一步如果耗时较多,通常需要在别的线程中进行
                        conn.process();

                        // 处理完成后,就会有可写的数据,因此在 epoll 上注册可写事件
                        ev.data.fd = conn.sock.fd();
                        ev.events = EPOLLOUT | EPOLLIN;
                        epoll_ctl(efd, EPOLL_CTL_MOD, conn.sock.fd(), &ev);
                    }else{
                        // 如果出错或者断开,从 epoll 中删除 fd
                        epoll_ctl(efd, EPOLL_CTL_DEL, conn.sock.fd(), nullptr);
                        fd_to_conn.erase(fd);
                        close(fd);
                    }
                }
                else if(event.events & EPOLLOUT){
                    // 把写缓冲区中的内容写入套接字
                    int n_write = conn.sock.send(conn.output_buffer.data(), conn.output_buffer.size());
                    if(n_write > 0){
                        // 把写过的内容删除掉。这里的实现很不高效,只是为了演示原理
                        conn.output_buffer.erase(conn.output_buffer.begin(), conn.output_buffer.begin() + n_write);
                        // 如果没有数据可写了,取消注册写事件
                        if(conn.output_buffer.empty()){
                            ev.data.fd = conn.sock.fd();
                            ev.events = EPOLLIN;
                            epoll_ctl(efd, EPOLL_CTL_MOD, conn.sock.fd(), &ev);
                        }
                    }else{
                        // 如果出错,从 epoll 中删除 fd
                        epoll_ctl(efd, EPOLL_CTL_DEL, conn.sock.fd(), nullptr);
                        fd_to_conn.erase(fd);
                        close(fd);
                    }
                }
            }
        } // end for
    } // end while
}

上面的代码中,我实现了基于 reactor 模型的单线程 echo 服务器。可以使用 nc 测试一下:

$ nc 127.0.0.1 8001

上面的代码中没有使用任何抽象,主函数写的很冗长,只是为了便于理解。如果理解了这种基于 epoll 的事件驱动方式,那么就很容易做其他改进了。

把上面的代码做一总结,如果了解 JavaScript 中注册回调的那个套路。这里就可以把 epoll 看做某个可以在上面注册事件的对象,这里叫做 reactor。一开始只是注册了 server_socketread 事件,在事件的回调中又注册了更多的事件。但整个逻辑都在回调函数中。这便是事件驱动了。

Reactor reactor;

void write_data(Socket client){
    client.write();
}

void read_data(Socket client){
    client.read();
    reactor.addEventListener(client, 'write', write_data)
}

reactor.addEventListener(server_socket, 'read', []{
    client = server_socket.accept();
    reactor.addEventListener(client, 'read', read_data);
});

单线程 Reactor 模型

单个线程中使用一个 reactor 来监听所有事件,每个事件的处理函数都需要执行的很快才行,否则就会阻塞在某个事件的处理上。稍微复杂点的程序,往往在处理环节都需要消耗不少时间,比如涉及到同步地读取文件。因此单线程的 Reactor 模型往往只能运用于较简单的场合。

如果处理环节需要消耗不少时间,可以考虑使用工作线程。使用一个线程池,把需要进行的处理操作放到进程池的任务队列中,等待处理。主线程可以继续监听 epoll 上的事件。处理任务被执行完成后,可以注册可写入的事件。等到被读取事件驱动。

多线程 Reactor 模型

既然套接字的读取到的结果的处理往往需要消耗不少时间,需要借助多线程来处理。那么可以在多个线程中使用 epoll,每个线程中只监听部分套接字,然后套接字上读取到的数据的处理可以就在本线程中进行。

这就形成了如下方案:

在一个线程中只执行 accept,然后将 accept 到的套接字 client 加入到其他线程中的 reactor 中,client 自始至终都在该线程中被处理。这样,就把整个任务分散到不同的线程中去了。

这种设计也叫做主从模式。要想实现主从模式,就需要一番仔细设计了,向前面那样面条式的代码就行不通了。

主从 Reactor 模式代码实现

下面的代码实现了一个简单的主从 Reactor 模型,在主线程中执行 accept 操作,然后把接收到的套接字交给其他 reactor 来管理。

#include <unistd.h>
#include <sys/epoll.h>
#include <unordered_map>
#include <string>
#include <algorithm>
#include <iterator>
#include <mutex>
#include <thread>
#include <vector>
#include <functional>
#include "Socket.h"

// 由于 Connection 和 Reactor 两个类交叉引用了,因此这里采用前置声明
class Reactor;

struct Connection{
    void read();
    void write();
    void process();

    Reactor* reactor { nullptr };
    Socket sock;
    std::string input_buffer;
    std::string output_buffer;
};


class Reactor{
public:
    void add(Connection& conn);
    void remove(const Connection& conn);
    void update(const Connection& conn, int events);
    void run();
private:
    std::unordered_map<int, Connection> fd_to_conn_;
    std::mutex mutex_;
    int epoll_fd_ { -1 };
};


void Connection::read() {
    char buffer[1024];
    int n_read = sock.recv(buffer, sizeof(buffer));
    if(n_read > 0) {
        input_buffer.append(buffer, n_read);
        process();

        // 让 epoll 开始监听可写事件
        reactor->update(*this, EPOLLOUT | EPOLLIN);
    }else{
        // 出错了,让 reactor 删除本连接
        reactor->remove(*this);
    }
}

void Connection::write() {
    int n_write = sock.send(output_buffer.data(), output_buffer.size());
    if(n_write > 0) {
        output_buffer.erase(output_buffer.begin(), output_buffer.begin() + n_write);
        if(output_buffer.empty()){
            // 没得写了,不在监听可写事件
            reactor->update(*this, EPOLLIN);
        }
    }else{
        reactor->remove(*this);
    }
}

void Connection::process() {
    std::transform(input_buffer.begin(), input_buffer.end(), std::back_inserter(output_buffer), [](char ch){
        return toupper(ch);
    });
    input_buffer.clear();
}


void Reactor::add(Connection &conn) {
    conn.reactor = this;
    int fd = conn.sock.fd();
    {
        std::unique_lock<std::mutex> lock(mutex_);
        fd_to_conn_.emplace(fd, conn);
    }

    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
}

void Reactor::remove(const Connection &conn) {
    int fd = conn.sock.fd();
    epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
    {
        std::unique_lock<std::mutex> lock(mutex_);
        fd_to_conn_.erase(fd);;
    }

}

void Reactor::update(const Connection &conn, int events) {
    int fd = conn.sock.fd();
    epoll_event ev{};
    ev.events = events;
    ev.data.fd = fd;
    epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev);
}

void Reactor::run() {
    // 启动后立刻创建一个 epoll
    epoll_fd_ = epoll_create(5);

    epoll_event events[10];
    while(true){
        // 等待事件发生
        int n = epoll_wait(epoll_fd_, events, 10, -1);
        for(int i=0; i < n;i++){
            epoll_event event = events[i];
            int fd = event.data.fd;

            std::unique_lock<std::mutex> lock(mutex_);
            Connection& conn = fd_to_conn_[fd];
            lock.unlock();

            if(event.events & EPOLLIN){
                conn.read();
            }else if(event.events & EPOLLOUT){
                conn.write();
            }
        }
    }
}


int main(int argc, char *argv[]){
    // 创建套接字并监听
    Socket server_socket(AF_INET, SOCK_STREAM);
    server_socket.bind({"127.0.0.1", 8001});
    server_socket.listen();

    int efd = epoll_create(5);

    // 把 server_socket 加入 epoll
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = server_socket.fd();
    epoll_ctl(efd, EPOLL_CTL_ADD, server_socket.fd(), &ev);

    // 创建子 reactor
    std::vector<Reactor*> reactors;
    for(int i=0;i<10;i++){
        reactors.push_back(new Reactor);
        // 这里创建了线程,线程启动后会立刻执行 Reactor::run
        std::thread t(std::bind(&Reactor::run, reactors.back()));
        t.detach();
    }

    int i = 0;
    while(true){
        epoll_event events[1];
        int n = epoll_wait(efd, events, 1, -1);
        if(n == -1){
            perror("epoll_wait");
            exit(1);
        }
        Socket client_socket = server_socket.accept();

        // 选择一个 reactor,这里采取轮流选择的策略
        Reactor* reactor = reactors[i++];
        if(i == reactors.size()){
            i = 0;
        }

        // 把此连接加入 reactor
        Connection conn;
        conn.sock = client_socket;
        reactor->add(conn);
    }
}

以上代码实现了一个 server,它把输入转换成大写然后返回回去。代码中抽象出了 Reactor 类,这个类和 server 的功能完全没有耦合。只需要在 connection 中修改业务逻辑即可。因此,Reactor 模式是一个很通用的模型。