【C++ 高性能网络框架收官篇】TcpServer / TcpClient 架构解析与实现
·
前言
在前面的 EventLoop、Channel、Poller、Acceptor、Buffer、TcpConnection 等组件基础之上, 今天我们迎来整套 Reactor 模型网络框架的收官之作: TcpServer(服务器)+ TcpClient(客户端)。
一、整体架构总览(最清晰)
框架是标准主从 Reactor 多线程架构:
1. Main Reactor(baseloop)
- 只做 accept 新连接
- 不处理任何 IO
2. Sub Reactor(EventLoopThreadPool)
- 多个 EventLoop,每个一个线程
- 负责 read/write/close
- 实现多核 CPU 负载均衡
3. TcpServer
- 管理 Acceptor + 线程池 + 连接池
- 新连接到来 → 分配 Sub Reactor
4. TcpConnection
- 一条连接的全权管理者
- 收发数据、事件处理、状态机
5. TcpClient
- 客户端主动发起连接
- 支持重连、异步建立、线程安全获取连接
二、代码精讲
1. TcpServer.h 知识点
#pragma once
#include "eventloop.h"
#include "connection.h"
#include "acceptor.h"
namespace net
{
class TcpServer
{
public:
//初始化成员,设置acceptor回调函数
TcpServer(EventLoop *loop,const InetAddress &addr);
//将当前连接管理池中的所有连接关闭释放
~TcpServer();
// 设置工作线程数
void setThreadNum(int count);
// 设置连接建立/关闭回调
void setConnectionCallback(ConnectionCallback cb);
// 设置消息到达回调
void setMessageCallback(MessageCallback cb);
//启动服务器
void start();
private:
// 新连接到来
void onNewConnection(int fd,InetAddress addr);
// 移除连接
void removeConnection(TcpConnectionPtr conn);
void removeConnectionInLoop(TcpConnectionPtr conn);
private:
EventLoop *_baseloop;//主reactor(负责监听)
EventLoopTreadPool _pool;//从属reactor线程池(负责IO)
Acceptor _acceptor;//监听套接字管理对象
std::atomic<int64_t> _next_conn_id;//连接标识
std::unordered_map<int64_t,TcpConnectionPtr> _connections;//连接管理池
ConnectionCallback _onConnection;
MessageCallback _onMessage;
};
}
- Reactor 主从架构设计
- Acceptor 与 EventLoop 绑定
- EventLoopThreadPool 线程池管理
unordered_map保存 TcpConnection 智能指针atomic<int64_t>原子连接 ID 分配- 连接回调、消息回调的类型定义
- 线程安全的连接新增与移除接口
2. TcpServer.cc 知识点
#include "server.h"
namespace net
{
// 初始化成员,设置acceptor回调函数
TcpServer::TcpServer(EventLoop *loop,const InetAddress &addr)
:_baseloop(loop),
_pool(loop),
_acceptor(loop,addr),
_next_conn_id(1)
{
_acceptor.setNewConnectionCallback(std::bind(&TcpServer::onNewConnection,this,
std::placeholders::_1,std::placeholders::_2));
}
// 将当前连接管理池中的所有连接关闭释放
TcpServer::~TcpServer(){
_baseloop->assertInLoopThread();
for(auto &it:_connections){
TcpConnectionPtr conn(it.second);
it.second.reset();
conn->loop()->queueInLoop(std::bind(&TcpConnection::connectDestroyed,conn));
}
}
void TcpServer::setThreadNum(int count){
_pool.setThreadNum(count);
}
void TcpServer::setConnectionCallback(ConnectionCallback cb){
_onConnection=std::move(cb);
}
void TcpServer::setMessageCallback(MessageCallback cb){
_onMessage=std::move(cb);
}
// 启动服务器
void TcpServer::start()
{
//1.启动事件循环池
_pool.start();
//2.开始监听
_baseloop->runInLoop(std::bind(&Acceptor::listen,&_acceptor));
}
void TcpServer::onNewConnection(int fd, InetAddress addr){
_baseloop->assertInLoopThread();
//新连接的处理:
//1.给新连接分配一个loop事件循环
EventLoop *ioloop=_pool.getNextLoop();
//2.为新连接构造Connection对象
auto id=_next_conn_id.fetch_add(1);
TcpConnectionPtr conn(new TcpConnection(ioloop,fd,id));
conn->setConnectionCallback(_onConnection);
conn->setMessageCallback(_onMessage);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection,this,std::placeholders::_1));
//3.添加conn管理
_connections.insert(std::make_pair(id,conn));
//4.调用连接就绪接口
ioloop->runInLoop(std::bind(&TcpConnection::connectEstablished,conn));
}
void TcpServer::removeConnection(TcpConnectionPtr conn){
_baseloop->runInLoop(std::bind(&TcpServer::removeConnectionInLoop,this,conn));
}
void TcpServer::removeConnectionInLoop(TcpConnectionPtr conn){
_baseloop->assertInLoopThread();
//从连接管理池中移除连接管理
_connections.erase(conn->id());
//执行连接销毁操作
conn->loop()->queueInLoop(std::bind(&TcpConnection::connectDestroyed,conn));
}
}
- 主从 Reactor 线程池启动
- Acceptor 新连接回调绑定
- 轮询(RR)负载均衡分配连接
- TcpConnection 智能指针安全管理
- 跨线程调用
runInLoop - 连接建立
connectEstablished - 连接销毁
connectDestroyed - 服务器关闭时安全释放所有连接
- 线程归属检查
assertInLoopThread
3. TcpClient.h 知识点
#pragma once
#include "eventloop.h"
#include "connection.h"
namespace net
{
class TcpClient
{
public:
// 初始化成员
TcpClient(EventLoop *loop, InetAddress addr);
// 关闭连接
~TcpClient();
// 连接服务端
void connect();
// 获取通信连接对象
TcpConnectionPtr connection();
void setConnectionCallback(ConnectionCallback cb);
void setMessageCallback(MessageCallback cb);
private:
// 连接重试接口
void retry(int fd);
void retryInLoop();
// 连接成功时,为描述符构造通信连接对象
void newConnection(int fd);
// 连接关闭时,调用的回调函数,移除连接管理
void removeConnection(TcpConnectionPtr conn);
private:
std::mutex _mutex;
std::condition_variable _cond; // 等待连接建立完成
InetAddress _srvAddr; // 服务器地址
EventLoop *_baseloop; // 客户端事件循环
TcpConnectionPtr _connection; // 客户端连接
ConnectionCallback _onConnection;
MessageCallback _onMessage;
};
}
- 客户端非阻塞连接模型
mutex + condition_variable线程安全等待连接- 客户端重连机制
- 连接成功 / 失败错误码处理
- 线程安全获取 TcpConnectionPtr
- 连接关闭自动清理
4. TcpClient.cc 知识点
#include "client.h"
namespace net
{
// 初始化成员
TcpClient::TcpClient(EventLoop *loop, InetAddress addr)
:_srvAddr(addr),
_baseloop(loop){}
// 关闭连接
TcpClient::~TcpClient(){
if(_connection){
_connection->forceClose();
}
}
void TcpClient::setConnectionCallback(ConnectionCallback cb){
_onConnection=std::move(cb);
}
void TcpClient::setMessageCallback(MessageCallback cb){
_onMessage=std::move(cb);
}
// 连接服务端:创建套接字,连接服务器(成功构造connection对象,否则错误处理)
void TcpClient::connect(){
int sockfd = sockets::createBlockSocket();
int ret=sockets::connect(sockfd,_srvAddr.getSockAddr());
int errNo=ret==0?0:errno;
switch(errNo)
{
case 0://连接成功
case EINTR://阻塞连接过程被信号中断
case EISCONN://套接字已经连接成功
case EINPROGRESS://套接字非阻塞的情况下连接服务器,返回正在连接中,需要监控可写事件,描述符可写了才代表连接成功
_baseloop->runInLoop(std::bind(&TcpClient::newConnection,this,sockfd));
break;
case EAGAIN://连接立即返回无法确定是否连接成功
case EADDRINUSE://当前连接绑定的地址被占用
case EADDRNOTAVAIL://地址信息无效(通常在没有绑定地址内核会从空闲端口池绑定地址,但是端口池无空闲端口)
case ECONNREFUSED://连接被拒绝,通常指服务端没有对连接端口进行监听
case ENETUNREACH://网络不可达,通常指网络断开
case ETIMEDOUT://网络连接超时
retry(sockfd);
break;
case EACCES://权限错误
case EPROTOTYPE://套接字类型错误
case ENOTSOCK://描述符不是一个套接字描述符
case EFAULT://用户空间地址不够用了
case EBADF://文件描述符损坏
case EALREADY://非阻塞套接字上已有连接
case EAFNOSUPPORT://不支持的地址域
default:
LOG_FATAL("连接服务器失败");
break;
}
}
// 获取通信连接对象:连接对象还没有初始完毕就阻塞等待
TcpConnectionPtr TcpClient::connection(){
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock,[this](){return (bool)_connection;});
return _connection;
}
// 连接重试接口
void TcpClient::retry(int fd){
sockets::close(fd);
_baseloop->runAfter(3,std::bind(&TcpClient::retryInLoop,this));
}
void TcpClient::retryInLoop(){
_baseloop->assertInLoopThread();
connect();//调用connect连接服务器端
}
// 连接成功时,为描述符构造通信连接对象
void TcpClient::newConnection(int fd){
_baseloop->assertInLoopThread();
TcpConnectionPtr conn(new TcpConnection(_baseloop,fd,0));
conn->setConnectionCallback(_onConnection);
conn->setMessageCallback(_onMessage);
conn->setCloseCallback(std::bind(&TcpClient::removeConnection,this,std::placeholders::_1));
conn->connectEstablished();//将连接挂到loop上开始读事件监控
{
std::unique_lock<std::mutex> lock(_mutex);
_connection=conn;
_cond.notify_all();
}
}
// 连接关闭时,调用的回调函数,移除连接管理
void TcpClient::removeConnection(TcpConnectionPtr conn){
_baseloop->assertInLoopThread();
{
std::unique_lock<std::mutex> lock(_mutex);
_connection.reset();
}
conn->connectDestroyed();
}
}
- 阻塞 / 非阻塞 socket 连接
connect系统调用错误码精细化处理EINPROGRESS非阻塞连接处理- 连接失败自动延迟重连(3s)
- 客户端建立
newConnection - 客户端关闭
removeConnection runAfter定时器重连- 线程安全等待连接建立
5. 测试 main 函数知识点
服务端:
#include "../net/poller.h"
#include "../net/channel.h"
#include "../net/eventloop.h"
#include "../net/timestamp.h"
#include "../net/timer.h"
#include "../net/buffer.h"
#include "../net/socket.h"
#include "../net/acceptor.h"
#include "../net/connection.h"
#include "../net/server.h"
#include <iostream>
#if 0
int main()
{
net::EventLoop baseloop;
net::Timestamp now = net::Timestamp::now();
std::cout<<now.toFormatString()<<std::endl;
baseloop.runAt(net::addTime(now,2),[](){
std::cout << "这是一个2秒后的定时任务" << std::endl;
});
baseloop.runAfter(4,[](){
std::cout << "这是一个4秒后的延迟定时任务" << std::endl;
});
auto tid = baseloop.runEvery(1,[](){
static int count=0;
std::cout << "这是一个1秒后的循环定时任务" << std::endl;
});
baseloop.loop();
return 0;
}
#endif
#if 0
int main()
{
net::Buffer buffer;
//添加1000条 HelloWorld+N\n
uint32_t size=0;
for(int i=0;i<1000;i++)
{
std::string str="HelloWorld+"+std::to_string(i)+"\n";
buffer.append(str.c_str(),str.size());
size+=str.size();
}
buffer.prepend(&size,sizeof(size));//添加前置数据
assert(buffer.readableBytes()==sizeof(size)+size);
uint32_t num;
memcpy(&num,buffer.peek(),sizeof(num));//取出前四字节数据
assert(num==size);
buffer.retrieve(sizeof(num));
for(int i=0;i<1000;i++)
{
std::optional<std::string> line=buffer.getline();
std::string str ="HelloWorld+"+std::to_string(i)+"\n";
assert(line);
assert(*line==str);
}
assert(buffer.readableBytes()==0);
return 0;
}
#endif
void onConnection(net::TcpConnectionPtr conn){
if(conn->connected()){
//LOG_DEBUG("连接建立");
}else{
//LOG_DEBUG("连接关闭");
}
}
void onMessage(net::TcpConnectionPtr conn,net::Buffer* buf,net::Timestamp rtime){
std::string str=buf->retrieveAllAsString();
std::cout<<str<<std::endl;
conn->send(str.data(),str.size());
//conn->forceClose();
}
int main()
{
net::EventLoop baseloop;
net::TcpServer server(&baseloop,net::InetAddress(8080));
server.setThreadNum(3);
server.setConnectionCallback(onConnection);
server.setMessageCallback(onMessage);
server.start();
baseloop.loop();//启动主事件循环
return 0;
}
客户端:
#include "../net/eventloop.h"
#include "../net/client.h"
#include <iostream>
void onConnection(net::TcpConnectionPtr conn)
{
if (conn->connected())
{
// LOG_DEBUG("连接建立");
}
else
{
// LOG_DEBUG("连接关闭");
}
}
void onMessage(net::TcpConnectionPtr conn, net::Buffer *buf, net::Timestamp rtime)
{
std::string str = buf->retrieveAllAsString();
std::cout << str << std::endl;
}
int main()
{
net::EventLoopThread loopthread;
{
net::TcpClient client(loopthread.startLoop(), net::InetAddress("127.0.0.1", 8080));
client.connect();
client.setConnectionCallback(onConnection);
client.setMessageCallback(onMessage);
auto connection = client.connection();
for (int i = 0; i < 10; i++)
{
std::string str = "Hello World +" + std::to_string(i);
connection->send(str.data(), str.size());
}
sleep(1);
}
sleep(1);
return 0;
}
- 服务器 Echo 服务实现
- 客户端批量发送数据
- EventLoop 启动与事件循环
- 多线程客户端测试
- 定时器使用演示
- Buffer 粘包 / 半包测试
- 完整网络框架使用示例
三、TcpServer:服务器核心调度器
核心职责
- 启动服务器监听
- 接收新连接
- 分配 IO 线程(负载均衡)
- 管理所有客户端连接
- 安全关闭服务器
工作流程
start()启动线程池- Acceptor 开始监听
- 新连接 →
onNewConnection - 轮询选择一个 SubReactor
- 创建 TcpConnection
- 加入连接管理 map
- 启动连接
connectEstablished
设计亮点
1. 主从 Reactor 多线程
one loop per thread
- MainReactor:只负责接受连接
- SubReactor:每个连接绑定一个 loop
- 无锁、高性能、真正多核并行
2. 轮询负载均衡
ioloop = _pool.getNextLoop();
均匀分配连接,避免单核过热。
3. 连接自动管理
unordered_map<int64_t, TcpConnectionPtr> _connections;
服务器关闭时自动释放所有连接。
四、TcpClient:客户端连接器
核心职责
- 主动连接服务器
- 处理连接错误
- 自动重连
- 线程安全获取连接
- 安全关闭
设计亮点
1. 完整错误码处理
支持:
- EINTR
- EISCONN
- EINPROGRESS
- ECONNREFUSED
- ETIMEDOUT
- ENETUNREACH
- EAGAIN 等全部常见错误。
2. 自动重连机制
_baseloop->runAfter(3, std::bind(&TcpClient::retryInLoop, this));
连接失败 3 秒后自动重试。
3. 线程安全等待连接
_cond.wait(lock, [this]() {
return (bool)_connection;
});
未连接成功前阻塞等待。
五、整套框架的工作流程(最终版)
服务端
- 启动 TcpServer
- MainReactor 监听端口
- 客户端连接 → Acceptor 接受
- 分配给 SubReactor
- 创建 TcpConnection
- 开始读写事件监听
- 数据到达 → 消息回调
- 连接关闭 → 自动释放
客户端
- 创建 TcpClient
- 调用 connect ()
- 处理连接错误 / 重连
- 连接建立 → 回调通知
- 发送 / 接收数据
- 断开连接 → 自动清理
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)