解析muduo源码之 TcpServer.h & TcpServer.cc
·
目录
3. removeConnectionInLoop(...)
2. 构造函数 TcpServer::TcpServer ()
4. 接收新连接:TcpServer::newConnection ()
5. 移除连接:TcpServer::removeConnection ()
一、 TcpServer.h
先贴出完整代码,再逐部分解释:
// Copyright 2010, Shuo Chen. 保留所有权利。
// http://code.google.com/p/muduo/
//
// 本源代码使用 BSD 许可证
// 可在 License 文件中找到详细条款
// 作者:陈硕 (chenshuo at chenshuo dot com)
//
// 这是公共头文件,只能包含公共头文件
#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H
#include "muduo/base/Atomic.h"
#include "muduo/base/Types.h"
#include "muduo/net/TcpConnection.h"
#include <map>
namespace muduo
{
namespace net
{
class Acceptor;
class EventLoop;
class EventLoopThreadPool;
///
/// TCP 服务器类,支持单线程和线程池模式
///
/// 这是接口类,不对外暴露过多内部细节
/// 是用户使用 muduo 编写服务端的主要入口
class TcpServer : noncopyable
{
public:
/// 线程初始化回调类型
typedef std::function<void(EventLoop*)> ThreadInitCallback;
/// 端口复用选项
enum Option
{
kNoReusePort, // 不使用端口复用
kReusePort, // 使用端口复用(SO_REUSEPORT)
};
/// 构造函数
/// @param loop 主事件循环(用于接收新连接)
/// @param listenAddr 监听地址
/// @param nameArg 服务器名称
/// @param option 是否开启端口复用
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option = kNoReusePort);
~TcpServer(); // 强制在外部实现析构,用于 unique_ptr 成员
/// 获取监听的 IP:端口
const string& ipPort() const { return ipPort_; }
/// 获取服务器名称
const string& name() const { return name_; }
/// 获取主事件循环
EventLoop* getLoop() const { return loop_; }
/// 设置处理 IO 的线程数量
///
/// 新连接永远在主 loop 线程中接收
/// 必须在 start() 之前调用
/// @param numThreads 线程数量
/// - 0:所有 IO 都在主线程,不创建额外线程(默认)
/// - 1:所有 IO 都在一个子线程
/// - N:使用 N 个线程的线程池,新连接采用轮询方式分配
void setThreadNum(int numThreads);
/// 设置线程初始化回调
void setThreadInitCallback(const ThreadInitCallback& cb)
{ threadInitCallback_ = cb; }
/// 获取线程池(start() 调用后有效)
std::shared_ptr<EventLoopThreadPool> threadPool()
{ return threadPool_; }
/// 启动服务器(如果尚未监听)
///
/// 多次调用无害
/// 线程安全
void start();
/// 设置连接建立/断开回调
/// 非线程安全
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
/// 设置消息到达回调
/// 非线程安全
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
/// 设置写完成回调
/// 非线程安全
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }
private:
/// 接收新连接(非线程安全,但在 loop 线程中执行)
void newConnection(int sockfd, const InetAddress& peerAddr);
/// 移除连接(线程安全)
void removeConnection(const TcpConnectionPtr& conn);
/// 在 loop 线程中移除连接(非线程安全,但在 loop 线程执行)
void removeConnectionInLoop(const TcpConnectionPtr& conn);
/// 连接映射表:连接名 -> TcpConnection 智能指针
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
EventLoop* loop_; // 主事件循环(Acceptor 所在线程)
const string ipPort_; // 监听的 IP:端口
const string name_; // 服务器名称
std::unique_ptr<Acceptor> acceptor_; // 接收器(隐藏内部实现)
std::shared_ptr<EventLoopThreadPool> threadPool_; // IO 线程池
/// 用户设置的回调
ConnectionCallback connectionCallback_; // 连接状态回调
MessageCallback messageCallback_; // 消息到达回调
WriteCompleteCallback writeCompleteCallback_; // 写完成回调
ThreadInitCallback threadInitCallback_; // 线程初始化回调
AtomicInt32 started_; // 服务器是否已启动(原子变量)
int nextConnId_; // 下一个连接的 ID(仅在 loop 线程使用)
ConnectionMap connections_; // 保存所有连接的映射表
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_TCPSERVER_H
1. 整体定位
TcpServer = 整个 TCP 服务器的总控制器
- 对外:给用户提供
setConnectionCallback、setMessageCallback、start()接口 - 对内:管理 Acceptor + IO 线程池 + 所有 TcpConnection
- 实现 主从 Reactor 模型(one loop per thread)
2. TcpServer 在架构中的位置
用户代码
↓
TcpServer(大脑) ←—————— 你现在看的这个
↓ ↓
Acceptor(主Reactor) EventLoopThreadPool(从Reactor/IO线程)
↓ ↓
接收新连接 处理连接读写
↓
创建 TcpConnection
3. 核心成员变量一览
EventLoop* loop_; // 主线程(main Reactor)
const string ipPort_; // 监听地址
const string name_; // 服务器名称
std::unique_ptr<Acceptor> acceptor_; // 接收连接的接收器
std::shared_ptr<EventLoopThreadPool> threadPool_; // IO线程池
ConnectionCallback connectionCallback_; // 连接回调
MessageCallback messageCallback_; // 消息回调
WriteCompleteCallback writeCompleteCallback_; // 发送完成回调
AtomicInt32 started_; // 服务器是否启动
ConnectionMap connections_; // 保存所有连接 map<string, TcpConnectionPtr>
4. 核心接口
1. 构造函数
TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& name);
- 创建 Acceptor
- 初始化线程池
2. 设置线程数
void setThreadNum(int numThreads);
- 0:单线程(accept + IO 都在主线程)
- N:创建 N 个 IO 线程(主从 Reactor 模式)
3. 设置三大回调
void setConnectionCallback(...); // 连接建立/断开
void setMessageCallback(...); // 收到消息
void setWriteCompleteCallback(...);// 发送完毕
4. 启动服务器
void start();
- 启动线程池
- 让 Acceptor 开始 listen
5. 核心内部函数
1. newConnection(...)
Acceptor 收到新连接后调用
- 从线程池轮询选取一个 IO 线程
- 创建 TcpConnection
- 加入
connections_map 管理 - 设置回调
- 建立连接
2. removeConnection(...)
连接断开时调用
- 线程安全
- 转到
removeConnectionInLoop
3. removeConnectionInLoop(...)
- 从
connections_map 中删除连接 - 销毁 TcpConnection
6. 主从 Reactor 模型
工作流程:
- 主线程(loop_):
- 只做一件事:Acceptor 接收新连接
- IO 线程池(threadPool_):
- 每个线程是一个 EventLoop
- 新连接采用 round-robin(轮询) 分配
- 连接建立后,所有读写、定时、IO 都在该 IO 线程执行
为什么快?
- 主线程不处理业务,只负责接收连接
- IO 线程充分利用多核 CPU
- 无锁设计(一个连接只属于一个线程)
7. 连接管理 map
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
ConnectionMap connections_;
- key:连接名(由 server name + 自增 id 组成)
- value:TcpConnection 智能指针
- 作用:保存所有活跃连接,服务器关闭时统一释放
二、 TcpServer.cc
先贴出完整代码,再逐部分解释:
// Copyright 2010, Shuo Chen. 保留所有权利。
// http://code.google.com/p/muduo/
//
// 本源代码的使用受 BSD 许可证约束
// 可在 License 文件中找到许可证条款。
// 作者:陈硕 (chenshuo at chenshuo dot com)
#include "muduo/net/TcpServer.h"
#include "muduo/base/Logging.h"
#include "muduo/net/Acceptor.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/EventLoopThreadPool.h"
#include "muduo/net/SocketsOps.h"
#include <stdio.h> // snprintf
using namespace muduo;
using namespace muduo::net;
// 构造函数:初始化服务器核心组件
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)), // 主事件循环(负责接收新连接)
ipPort_(listenAddr.toIpPort()), // 监听的IP:端口
name_(nameArg), // 服务器名称
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)), // 连接接收器
threadPool_(new EventLoopThreadPool(loop, name_)), // IO线程池
connectionCallback_(defaultConnectionCallback), // 默认连接回调
messageCallback_(defaultMessageCallback), // 默认消息回调
nextConnId_(1) // 下一个连接ID从1开始
{
// 设置Acceptor收到新连接时的回调
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}
// 析构函数:销毁所有连接
TcpServer::~TcpServer()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] 正在销毁";
// 遍历并销毁所有连接
for (auto& item : connections_)
{
TcpConnectionPtr conn(item.second);
item.second.reset();
// 让连接所在的IO线程执行销毁逻辑
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}
}
// 设置IO线程数量
void TcpServer::setThreadNum(int numThreads)
{
assert(0 <= numThreads);
threadPool_->setThreadNum(numThreads);
}
// 启动服务器(线程安全)
void TcpServer::start()
{
// 保证只启动一次
if (started_.getAndSet(1) == 0)
{
// 启动IO线程池
threadPool_->start(threadInitCallback_);
// 开始监听端口
assert(!acceptor_->listening());
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
// 接收新连接(由Acceptor回调调用)
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
// 从线程池轮询获取一个IO loop
EventLoop* ioLoop = threadPool_->getNextLoop();
// 生成唯一的连接名称
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - 新连接 [" << connName
<< "] 来自 " << peerAddr.toIpPort();
// 获取本地地址
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// 创建TcpConnection对象
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
// 把新连接存入map管理
connections_[connName] = conn;
// 设置用户注册的回调函数
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
// 设置连接关闭时的回调(通知TcpServer删除该连接)
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, _1));
// 在IO线程中把连接设置为已建立状态
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
// 移除连接(线程安全接口)
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// 投递到主线程执行删除
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
// 主线程中真正执行移除连接
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - 连接 " << conn->name();
// 从连接map中删除
size_t n = connections_.erase(conn->name());
(void)n;
assert(n == 1);
// 在连接所属的IO线程中执行销毁逻辑
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}
1. 总体定位
TcpServer = 服务器大脑
- 持有 main EventLoop(主 Reactor)
- 持有 Acceptor(负责接收连接)
- 持有 EventLoopThreadPool(IO 线程池)
- 持有所有 TcpConnection(用 map 管理)
- 对外提供用户接口,对内调度所有组件
2. 构造函数 TcpServer::TcpServer ()
TcpServer::TcpServer(EventLoop* loop, ...)
: loop_(loop), // 主线程(main reactor)
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(...)), // 创建接收器
threadPool_(new EventLoopThreadPool(...)), // 创建线程池
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
// Acceptor 收到新连接 → 调用 TcpServer::newConnection
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}
作用:
- 绑定主线程 loop
- 创建 Acceptor(监听 socket)
- 创建 IO 线程池
- 设置 Acceptor 回调:有连接就交给 newConnection
3. 启动服务器:TcpServer::start ()
void TcpServer::start()
{
if (started_.getAndSet(1) == 0) // 原子操作,保证只启动一次
{
threadPool_->start(threadInitCallback_); // 启动 IO 线程池
// 让 Acceptor 开始 listen(必须在主线程)
loop_->runInLoop(
std::bind(&Acceptor::listen, acceptor_.get()));
}
}
启动流程:
- 启动 IO 线程池
- 主线程调用 Acceptor::listen ()
- 服务器开始接受客户端连接
4. 接收新连接:TcpServer::newConnection ()
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
// 1. 从线程池中**轮询**选一个 IO 线程
EventLoop* ioLoop = threadPool_->getNextLoop();
// 2. 生成连接名称(serverName-ip:port#id)
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_++);
string connName = name_ + buf;
// 3. 创建 TcpConnection
TcpConnectionPtr conn(new TcpConnection(
ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
// 4. 加入 map 管理
connections_[connName] = conn;
// 5. 设置用户回调
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
// 6. 连接关闭时 → 调用 removeConnection
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, _1));
// 7. 在 IO 线程中初始化连接(开始监听读事件)
ioLoop->runInLoop(
std::bind(&TcpConnection::connectEstablished, conn));
}
流程:
- Acceptor 接收连接(main loop)
- TcpServer 轮询选择一个 IO 线程
- 创建 TcpConnection,绑定到该 IO 线程
- 所有后续读写、事件、定时 全部在该 IO 线程执行
- 主线程完全不参与业务,只负责分发连接
5. 移除连接:TcpServer::removeConnection ()
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// 切换到主线程执行
loop_->runInLoop(
std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
// 从 map 中删除
connections_.erase(conn->name());
// 在连接所属的 IO 线程中销毁
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}
- 所有连接管理 必须在 main loop 线程
- 连接销毁 必须在其所属 IO 线程
- 线程安全,无锁,不崩溃
6. 析构函数~TcpServer ()
TcpServer::~TcpServer()
{
for (auto& item : connections_)
{
TcpConnectionPtr conn(item.second);
item.second.reset();
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}
}
关闭时安全销毁所有连接。
7. TcpServer 工作流程图
用户调用 server.start()
↓
启动线程池 → Acceptor listen
↓
客户端连接 → Acceptor::handleRead()
↓
TcpServer::newConnection()
↓
轮询选择 IO 线程 → 创建 TcpConnection
↓
连接交由 IO 线程管理
↓
读写/回调/定时 全部在 IO 线程执行
8. TcpServer 核心设计亮点
- 主从 Reactor 模型
- main Reactor:只 accept
- sub Reactor:处理所有连接 IO
- one loop per thread一个连接只属于一个线程,无锁、高性能
- 轮询分配连接负载均衡
- map 管理所有连接安全、可追踪、方便关闭
- 全程线程安全所有跨线程操作通过 runInLoop 调度
- 完全非阻塞无任何阻塞调用,支持百万并发
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)