前言

在前面的 EventLoop、Channel、Poller、Acceptor、Buffer、TcpConnection 等组件基础之上, 今天我们迎来整套 Reactor 模型网络框架的收官之作TcpServer(服务器)+ TcpClient(客户端)


一、整体架构总览(最清晰)

框架是标准主从 Reactor 多线程架构

1. Main Reactor(baseloop)

  • 只做 accept 新连接
  • 不处理任何 IO

2. Sub Reactor(EventLoopThreadPool)

  • 多个 EventLoop,每个一个线程
  • 负责 read/write/close
  • 实现多核 CPU 负载均衡

3. TcpServer

  • 管理 Acceptor + 线程池 + 连接池
  • 新连接到来 → 分配 Sub Reactor

4. TcpConnection

  • 一条连接的全权管理者
  • 收发数据、事件处理、状态机

5. TcpClient

  • 客户端主动发起连接
  • 支持重连、异步建立、线程安全获取连接

二、代码精讲

1. TcpServer.h 知识点

#pragma once
#include "eventloop.h"
#include "connection.h"
#include "acceptor.h"

namespace net
{
    class TcpServer
    {
    public:
        //初始化成员,设置acceptor回调函数
        TcpServer(EventLoop *loop,const InetAddress &addr);
        //将当前连接管理池中的所有连接关闭释放
        ~TcpServer();
        // 设置工作线程数
        void setThreadNum(int count);
        // 设置连接建立/关闭回调
        void setConnectionCallback(ConnectionCallback cb);
        // 设置消息到达回调
        void setMessageCallback(MessageCallback cb);
        //启动服务器
        void start();
    private:
        // 新连接到来
        void onNewConnection(int fd,InetAddress addr);
        // 移除连接
        void removeConnection(TcpConnectionPtr conn);
        void removeConnectionInLoop(TcpConnectionPtr conn);
    private:
        EventLoop *_baseloop;//主reactor(负责监听)
        EventLoopTreadPool _pool;//从属reactor线程池(负责IO)
        Acceptor _acceptor;//监听套接字管理对象
        std::atomic<int64_t> _next_conn_id;//连接标识
        std::unordered_map<int64_t,TcpConnectionPtr> _connections;//连接管理池
        ConnectionCallback _onConnection;
        MessageCallback _onMessage;
    };
}
  • Reactor 主从架构设计
  • Acceptor 与 EventLoop 绑定
  • EventLoopThreadPool 线程池管理
  • unordered_map 保存 TcpConnection 智能指针
  • atomic<int64_t> 原子连接 ID 分配
  • 连接回调、消息回调的类型定义
  • 线程安全的连接新增与移除接口

2. TcpServer.cc 知识点

#include "server.h"

namespace net
{
    // 初始化成员,设置acceptor回调函数
    TcpServer::TcpServer(EventLoop *loop,const InetAddress &addr)
    :_baseloop(loop),
    _pool(loop),
    _acceptor(loop,addr),
    _next_conn_id(1)
    {
        _acceptor.setNewConnectionCallback(std::bind(&TcpServer::onNewConnection,this,
            std::placeholders::_1,std::placeholders::_2));
    }
    // 将当前连接管理池中的所有连接关闭释放
    TcpServer::~TcpServer(){
        _baseloop->assertInLoopThread();
        for(auto &it:_connections){
            TcpConnectionPtr conn(it.second);
            it.second.reset();
            conn->loop()->queueInLoop(std::bind(&TcpConnection::connectDestroyed,conn));
        }
    }
    void TcpServer::setThreadNum(int count){
        _pool.setThreadNum(count);
    }
    void TcpServer::setConnectionCallback(ConnectionCallback cb){
        _onConnection=std::move(cb);
    }
    void TcpServer::setMessageCallback(MessageCallback cb){
        _onMessage=std::move(cb);
    }
    // 启动服务器
    void TcpServer::start()
    {
        //1.启动事件循环池
        _pool.start();
        //2.开始监听
        _baseloop->runInLoop(std::bind(&Acceptor::listen,&_acceptor));
    }
    void TcpServer::onNewConnection(int fd, InetAddress addr){
        _baseloop->assertInLoopThread();
        //新连接的处理:
        //1.给新连接分配一个loop事件循环
        EventLoop *ioloop=_pool.getNextLoop();
        //2.为新连接构造Connection对象
        auto id=_next_conn_id.fetch_add(1);
        TcpConnectionPtr conn(new TcpConnection(ioloop,fd,id));
        conn->setConnectionCallback(_onConnection);
        conn->setMessageCallback(_onMessage);
        conn->setCloseCallback(std::bind(&TcpServer::removeConnection,this,std::placeholders::_1));
        //3.添加conn管理
        _connections.insert(std::make_pair(id,conn));
        //4.调用连接就绪接口
        ioloop->runInLoop(std::bind(&TcpConnection::connectEstablished,conn));
    }
    void TcpServer::removeConnection(TcpConnectionPtr conn){
        
        _baseloop->runInLoop(std::bind(&TcpServer::removeConnectionInLoop,this,conn));
    }
    void TcpServer::removeConnectionInLoop(TcpConnectionPtr conn){
        _baseloop->assertInLoopThread();
        //从连接管理池中移除连接管理
        _connections.erase(conn->id());
        //执行连接销毁操作
        conn->loop()->queueInLoop(std::bind(&TcpConnection::connectDestroyed,conn));
    }
}
  • 主从 Reactor 线程池启动
  • Acceptor 新连接回调绑定
  • 轮询(RR)负载均衡分配连接
  • TcpConnection 智能指针安全管理
  • 跨线程调用 runInLoop
  • 连接建立 connectEstablished
  • 连接销毁 connectDestroyed
  • 服务器关闭时安全释放所有连接
  • 线程归属检查 assertInLoopThread

3. TcpClient.h 知识点

#pragma once
#include "eventloop.h"
#include "connection.h"

namespace net
{
    class TcpClient
    {
    public:
        // 初始化成员
        TcpClient(EventLoop *loop, InetAddress addr);
        // 关闭连接
        ~TcpClient();
        // 连接服务端
        void connect();
        // 获取通信连接对象
        TcpConnectionPtr connection();
        void setConnectionCallback(ConnectionCallback cb);
        void setMessageCallback(MessageCallback cb);

    private:
        // 连接重试接口
        void retry(int fd);
        void retryInLoop();
        // 连接成功时,为描述符构造通信连接对象
        void newConnection(int fd);
        // 连接关闭时,调用的回调函数,移除连接管理
        void removeConnection(TcpConnectionPtr conn);

    private:
        std::mutex _mutex;
        std::condition_variable _cond; // 等待连接建立完成
        InetAddress _srvAddr;          // 服务器地址
        EventLoop *_baseloop;          // 客户端事件循环
        TcpConnectionPtr _connection;  // 客户端连接
        ConnectionCallback _onConnection;
        MessageCallback _onMessage;
    };

}
  • 客户端非阻塞连接模型
  • mutex + condition_variable 线程安全等待连接
  • 客户端重连机制
  • 连接成功 / 失败错误码处理
  • 线程安全获取 TcpConnectionPtr
  • 连接关闭自动清理

4. TcpClient.cc 知识点

#include "client.h"

namespace net
{
    // 初始化成员
    TcpClient::TcpClient(EventLoop *loop, InetAddress addr)
    :_srvAddr(addr),
    _baseloop(loop){}
    // 关闭连接
    TcpClient::~TcpClient(){
        if(_connection){
            _connection->forceClose();
        }
    }
    void TcpClient::setConnectionCallback(ConnectionCallback cb){
        _onConnection=std::move(cb);
    }
    void TcpClient::setMessageCallback(MessageCallback cb){
        _onMessage=std::move(cb);
    }
    // 连接服务端:创建套接字,连接服务器(成功构造connection对象,否则错误处理)
    void TcpClient::connect(){
        int sockfd = sockets::createBlockSocket();
        int ret=sockets::connect(sockfd,_srvAddr.getSockAddr());
        int errNo=ret==0?0:errno;
        switch(errNo)
        {
            case 0://连接成功
            case EINTR://阻塞连接过程被信号中断
            case EISCONN://套接字已经连接成功
            case EINPROGRESS://套接字非阻塞的情况下连接服务器,返回正在连接中,需要监控可写事件,描述符可写了才代表连接成功
                _baseloop->runInLoop(std::bind(&TcpClient::newConnection,this,sockfd));
                break;
            case EAGAIN://连接立即返回无法确定是否连接成功
            case EADDRINUSE://当前连接绑定的地址被占用
            case EADDRNOTAVAIL://地址信息无效(通常在没有绑定地址内核会从空闲端口池绑定地址,但是端口池无空闲端口)
            case ECONNREFUSED://连接被拒绝,通常指服务端没有对连接端口进行监听
            case ENETUNREACH://网络不可达,通常指网络断开
            case ETIMEDOUT://网络连接超时
                retry(sockfd);
                break;
            case EACCES://权限错误
            case EPROTOTYPE://套接字类型错误
            case ENOTSOCK://描述符不是一个套接字描述符
            case EFAULT://用户空间地址不够用了
            case EBADF://文件描述符损坏
            case EALREADY://非阻塞套接字上已有连接
            case EAFNOSUPPORT://不支持的地址域
            default:
                LOG_FATAL("连接服务器失败");
                break;
        }
    }

    // 获取通信连接对象:连接对象还没有初始完毕就阻塞等待
    TcpConnectionPtr TcpClient::connection(){
        std::unique_lock<std::mutex> lock(_mutex);
        _cond.wait(lock,[this](){return (bool)_connection;});
        return _connection;
    }

    // 连接重试接口
    void TcpClient::retry(int fd){
        sockets::close(fd);
        _baseloop->runAfter(3,std::bind(&TcpClient::retryInLoop,this));
    }
    void TcpClient::retryInLoop(){
        _baseloop->assertInLoopThread();
        connect();//调用connect连接服务器端
    }
    // 连接成功时,为描述符构造通信连接对象
    void TcpClient::newConnection(int fd){
        _baseloop->assertInLoopThread();
        TcpConnectionPtr conn(new TcpConnection(_baseloop,fd,0));
        conn->setConnectionCallback(_onConnection);
        conn->setMessageCallback(_onMessage);
        conn->setCloseCallback(std::bind(&TcpClient::removeConnection,this,std::placeholders::_1));
        conn->connectEstablished();//将连接挂到loop上开始读事件监控
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _connection=conn;
            _cond.notify_all();
        }
    }
    // 连接关闭时,调用的回调函数,移除连接管理
    void TcpClient::removeConnection(TcpConnectionPtr conn){
        _baseloop->assertInLoopThread();
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _connection.reset();
        }
        conn->connectDestroyed();
    }
}
  • 阻塞 / 非阻塞 socket 连接
  • connect 系统调用错误码精细化处理
  • EINPROGRESS 非阻塞连接处理
  • 连接失败自动延迟重连(3s)
  • 客户端建立 newConnection
  • 客户端关闭 removeConnection
  • runAfter 定时器重连
  • 线程安全等待连接建立

5. 测试 main 函数知识点

服务端:

#include "../net/poller.h"
#include "../net/channel.h"
#include "../net/eventloop.h"
#include "../net/timestamp.h"
#include "../net/timer.h"
#include "../net/buffer.h"
#include "../net/socket.h"
#include "../net/acceptor.h"
#include "../net/connection.h"
#include "../net/server.h"
#include <iostream>

#if 0
int main()
{
    net::EventLoop baseloop;
    net::Timestamp now = net::Timestamp::now();
    std::cout<<now.toFormatString()<<std::endl;
    baseloop.runAt(net::addTime(now,2),[](){
        std::cout << "这是一个2秒后的定时任务" << std::endl;
    });
    baseloop.runAfter(4,[](){
        std::cout << "这是一个4秒后的延迟定时任务" << std::endl;
    });
    auto tid = baseloop.runEvery(1,[](){
        static int count=0;
        std::cout << "这是一个1秒后的循环定时任务" << std::endl;
    });
    baseloop.loop();
    return 0;
}
#endif

#if 0
int main()
{
    net::Buffer buffer;
    //添加1000条 HelloWorld+N\n
    uint32_t size=0;
    for(int i=0;i<1000;i++)
    {
        std::string str="HelloWorld+"+std::to_string(i)+"\n";
        buffer.append(str.c_str(),str.size());
        size+=str.size();
    }
    buffer.prepend(&size,sizeof(size));//添加前置数据
    assert(buffer.readableBytes()==sizeof(size)+size);

    uint32_t num;
    memcpy(&num,buffer.peek(),sizeof(num));//取出前四字节数据
    assert(num==size);
    buffer.retrieve(sizeof(num));
    
    for(int i=0;i<1000;i++)
    {
        std::optional<std::string> line=buffer.getline();

        std::string str ="HelloWorld+"+std::to_string(i)+"\n";
        assert(line);

        assert(*line==str);
    }
    assert(buffer.readableBytes()==0);
    return 0;
}
#endif

void onConnection(net::TcpConnectionPtr conn){
    if(conn->connected()){
        //LOG_DEBUG("连接建立");
    }else{
        //LOG_DEBUG("连接关闭");
    }
}
void onMessage(net::TcpConnectionPtr conn,net::Buffer* buf,net::Timestamp rtime){
    std::string str=buf->retrieveAllAsString();
    std::cout<<str<<std::endl;
    conn->send(str.data(),str.size());
    //conn->forceClose();
}
int main()
{
    net::EventLoop baseloop;
    net::TcpServer server(&baseloop,net::InetAddress(8080));
    server.setThreadNum(3);
    server.setConnectionCallback(onConnection);
    server.setMessageCallback(onMessage);
    server.start();
    baseloop.loop();//启动主事件循环
    return 0;
}

客户端:

#include "../net/eventloop.h"
#include "../net/client.h"
#include <iostream>
void onConnection(net::TcpConnectionPtr conn)
{
    if (conn->connected())
    {
        // LOG_DEBUG("连接建立");
    }
    else
    {
        // LOG_DEBUG("连接关闭");
    }
}
void onMessage(net::TcpConnectionPtr conn, net::Buffer *buf, net::Timestamp rtime)
{
    std::string str = buf->retrieveAllAsString();
    std::cout << str << std::endl;
}

int main()
{
    net::EventLoopThread loopthread;
    {
        net::TcpClient client(loopthread.startLoop(), net::InetAddress("127.0.0.1", 8080));
        client.connect();
        client.setConnectionCallback(onConnection);
        client.setMessageCallback(onMessage);
        auto connection = client.connection();
        for (int i = 0; i < 10; i++)
        {
            std::string str = "Hello World +" + std::to_string(i);
            connection->send(str.data(), str.size());
        }
        sleep(1);
    }
    sleep(1);
    return 0;
}
  • 服务器 Echo 服务实现
  • 客户端批量发送数据
  • EventLoop 启动与事件循环
  • 多线程客户端测试
  • 定时器使用演示
  • Buffer 粘包 / 半包测试
  • 完整网络框架使用示例

三、TcpServer:服务器核心调度器

核心职责

  • 启动服务器监听
  • 接收新连接
  • 分配 IO 线程(负载均衡)
  • 管理所有客户端连接
  • 安全关闭服务器

工作流程

  1. start() 启动线程池
  2. Acceptor 开始监听
  3. 新连接 → onNewConnection
  4. 轮询选择一个 SubReactor
  5. 创建 TcpConnection
  6. 加入连接管理 map
  7. 启动连接 connectEstablished

设计亮点

1. 主从 Reactor 多线程

one loop per thread

  • MainReactor:只负责接受连接
  • SubReactor:每个连接绑定一个 loop
  • 无锁、高性能、真正多核并行

2. 轮询负载均衡

ioloop = _pool.getNextLoop();

均匀分配连接,避免单核过热。

3. 连接自动管理

unordered_map<int64_t, TcpConnectionPtr> _connections;

服务器关闭时自动释放所有连接。


四、TcpClient:客户端连接器

核心职责

  • 主动连接服务器
  • 处理连接错误
  • 自动重连
  • 线程安全获取连接
  • 安全关闭

设计亮点

1. 完整错误码处理

支持:

  • EINTR
  • EISCONN
  • EINPROGRESS
  • ECONNREFUSED
  • ETIMEDOUT
  • ENETUNREACH
  • EAGAIN 等全部常见错误。

2. 自动重连机制

_baseloop->runAfter(3, std::bind(&TcpClient::retryInLoop, this));

连接失败 3 秒后自动重试。

3. 线程安全等待连接

_cond.wait(lock, [this]() {
    return (bool)_connection;
});

未连接成功前阻塞等待。


五、整套框架的工作流程(最终版)

服务端

  1. 启动 TcpServer
  2. MainReactor 监听端口
  3. 客户端连接 → Acceptor 接受
  4. 分配给 SubReactor
  5. 创建 TcpConnection
  6. 开始读写事件监听
  7. 数据到达 → 消息回调
  8. 连接关闭 → 自动释放

客户端

  1. 创建 TcpClient
  2. 调用 connect ()
  3. 处理连接错误 / 重连
  4. 连接建立 → 回调通知
  5. 发送 / 接收数据
  6. 断开连接 → 自动清理

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐