目录

一、 TcpServer.h

1. 整体定位

2. TcpServer 在架构中的位置

3. 核心成员变量一览

4. 核心接口

1. 构造函数

2. 设置线程数

3. 设置三大回调

4. 启动服务器

5. 核心内部函数

1. newConnection(...)

2. removeConnection(...)

3. removeConnectionInLoop(...)

6. 主从 Reactor 模型

7. 连接管理 map

二、 TcpServer.cc

1. 总体定位

2. 构造函数 TcpServer::TcpServer ()

作用:

3. 启动服务器:TcpServer::start ()

4. 接收新连接:TcpServer::newConnection ()

5. 移除连接:TcpServer::removeConnection ()

6. 析构函数~TcpServer ()

7. TcpServer 工作流程图

8. TcpServer 核心设计亮点


一、 TcpServer.h

先贴出完整代码,再逐部分解释:

// Copyright 2010, Shuo Chen. 保留所有权利。
// http://code.google.com/p/muduo/
//
// 本源代码使用 BSD 许可证
// 可在 License 文件中找到详细条款

// 作者:陈硕 (chenshuo at chenshuo dot com)
//
// 这是公共头文件,只能包含公共头文件

#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H

#include "muduo/base/Atomic.h"
#include "muduo/base/Types.h"
#include "muduo/net/TcpConnection.h"

#include <map>

namespace muduo
{
namespace net
{

class Acceptor;
class EventLoop;
class EventLoopThreadPool;

///
/// TCP 服务器类,支持单线程和线程池模式
///
/// 这是接口类,不对外暴露过多内部细节
/// 是用户使用 muduo 编写服务端的主要入口
class TcpServer : noncopyable
{
 public:
  /// 线程初始化回调类型
  typedef std::function<void(EventLoop*)> ThreadInitCallback;

  /// 端口复用选项
  enum Option
  {
    kNoReusePort,  // 不使用端口复用
    kReusePort,    // 使用端口复用(SO_REUSEPORT)
  };

  /// 构造函数
  /// @param loop 主事件循环(用于接收新连接)
  /// @param listenAddr 监听地址
  /// @param nameArg 服务器名称
  /// @param option 是否开启端口复用
  TcpServer(EventLoop* loop,
            const InetAddress& listenAddr,
            const string& nameArg,
            Option option = kNoReusePort);

  ~TcpServer();  // 强制在外部实现析构,用于 unique_ptr 成员

  /// 获取监听的 IP:端口
  const string& ipPort() const { return ipPort_; }
  /// 获取服务器名称
  const string& name() const { return name_; }
  /// 获取主事件循环
  EventLoop* getLoop() const { return loop_; }

  /// 设置处理 IO 的线程数量
  ///
  /// 新连接永远在主 loop 线程中接收
  /// 必须在 start() 之前调用
  /// @param numThreads 线程数量
  /// - 0:所有 IO 都在主线程,不创建额外线程(默认)
  /// - 1:所有 IO 都在一个子线程
  /// - N:使用 N 个线程的线程池,新连接采用轮询方式分配
  void setThreadNum(int numThreads);

  /// 设置线程初始化回调
  void setThreadInitCallback(const ThreadInitCallback& cb)
  { threadInitCallback_ = cb; }

  /// 获取线程池(start() 调用后有效)
  std::shared_ptr<EventLoopThreadPool> threadPool()
  { return threadPool_; }

  /// 启动服务器(如果尚未监听)
  ///
  /// 多次调用无害
  /// 线程安全
  void start();

  /// 设置连接建立/断开回调
  /// 非线程安全
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  /// 设置消息到达回调
  /// 非线程安全
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  /// 设置写完成回调
  /// 非线程安全
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }

 private:
  /// 接收新连接(非线程安全,但在 loop 线程中执行)
  void newConnection(int sockfd, const InetAddress& peerAddr);

  /// 移除连接(线程安全)
  void removeConnection(const TcpConnectionPtr& conn);

  /// 在 loop 线程中移除连接(非线程安全,但在 loop 线程执行)
  void removeConnectionInLoop(const TcpConnectionPtr& conn);

  /// 连接映射表:连接名 -> TcpConnection 智能指针
  typedef std::map<string, TcpConnectionPtr> ConnectionMap;

  EventLoop* loop_;                       // 主事件循环(Acceptor 所在线程)
  const string ipPort_;                    // 监听的 IP:端口
  const string name_;                      // 服务器名称
  std::unique_ptr<Acceptor> acceptor_;     // 接收器(隐藏内部实现)
  std::shared_ptr<EventLoopThreadPool> threadPool_;  // IO 线程池

  /// 用户设置的回调
  ConnectionCallback connectionCallback_;        // 连接状态回调
  MessageCallback messageCallback_;              // 消息到达回调
  WriteCompleteCallback writeCompleteCallback_;  // 写完成回调
  ThreadInitCallback threadInitCallback_;       // 线程初始化回调

  AtomicInt32 started_;                          // 服务器是否已启动(原子变量)
  int nextConnId_;                               // 下一个连接的 ID(仅在 loop 线程使用)
  ConnectionMap connections_;                    // 保存所有连接的映射表
};

}  // namespace net
}  // namespace muduo

#endif  // MUDUO_NET_TCPSERVER_H

1. 整体定位

TcpServer = 整个 TCP 服务器的总控制器

  • 对外:给用户提供 setConnectionCallbacksetMessageCallbackstart() 接口
  • 对内:管理 Acceptor + IO 线程池 + 所有 TcpConnection
  • 实现 主从 Reactor 模型(one loop per thread)

2. TcpServer 在架构中的位置

用户代码
   ↓
TcpServer(大脑) ←—————— 你现在看的这个
   ↓            ↓
Acceptor(主Reactor)  EventLoopThreadPool(从Reactor/IO线程)
   ↓                      ↓
接收新连接            处理连接读写
   ↓
创建 TcpConnection

3. 核心成员变量一览

EventLoop* loop_;                // 主线程(main Reactor)
const string ipPort_;            // 监听地址
const string name_;              // 服务器名称

std::unique_ptr<Acceptor> acceptor_;       // 接收连接的接收器
std::shared_ptr<EventLoopThreadPool> threadPool_; // IO线程池

ConnectionCallback connectionCallback_;    // 连接回调
MessageCallback messageCallback_;          // 消息回调
WriteCompleteCallback writeCompleteCallback_; // 发送完成回调

AtomicInt32 started_;                      // 服务器是否启动
ConnectionMap connections_;                // 保存所有连接 map<string, TcpConnectionPtr>

4. 核心接口

1. 构造函数
TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& name);
  • 创建 Acceptor
  • 初始化线程池
2. 设置线程数
void setThreadNum(int numThreads);
  • 0:单线程(accept + IO 都在主线程)
  • N:创建 N 个 IO 线程(主从 Reactor 模式)
3. 设置三大回调
void setConnectionCallback(...);  // 连接建立/断开
void setMessageCallback(...);     // 收到消息
void setWriteCompleteCallback(...);// 发送完毕
4. 启动服务器
void start();
  • 启动线程池
  • 让 Acceptor 开始 listen

5. 核心内部函数

1. newConnection(...)

Acceptor 收到新连接后调用

  • 从线程池轮询选取一个 IO 线程
  • 创建 TcpConnection
  • 加入 connections_ map 管理
  • 设置回调
  • 建立连接
2. removeConnection(...)

连接断开时调用

  • 线程安全
  • 转到 removeConnectionInLoop
3. removeConnectionInLoop(...)
  • connections_ map 中删除连接
  • 销毁 TcpConnection

6. 主从 Reactor 模型

工作流程:

  1. 主线程(loop_)
    • 只做一件事:Acceptor 接收新连接
  2. IO 线程池(threadPool_)
    • 每个线程是一个 EventLoop
    • 新连接采用 round-robin(轮询) 分配
    • 连接建立后,所有读写、定时、IO 都在该 IO 线程执行

为什么快?

  • 主线程不处理业务,只负责接收连接
  • IO 线程充分利用多核 CPU
  • 无锁设计(一个连接只属于一个线程)

7. 连接管理 map

typedef std::map<string, TcpConnectionPtr> ConnectionMap;
ConnectionMap connections_;
  • key:连接名(由 server name + 自增 id 组成)
  • value:TcpConnection 智能指针
  • 作用:保存所有活跃连接,服务器关闭时统一释放

二、 TcpServer.cc

先贴出完整代码,再逐部分解释:

// Copyright 2010, Shuo Chen. 保留所有权利。
// http://code.google.com/p/muduo/
//
// 本源代码的使用受 BSD 许可证约束
// 可在 License 文件中找到许可证条款。

// 作者:陈硕 (chenshuo at chenshuo dot com)

#include "muduo/net/TcpServer.h"

#include "muduo/base/Logging.h"
#include "muduo/net/Acceptor.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/EventLoopThreadPool.h"
#include "muduo/net/SocketsOps.h"

#include <stdio.h>  // snprintf

using namespace muduo;
using namespace muduo::net;

// 构造函数:初始化服务器核心组件
TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),                // 主事件循环(负责接收新连接)
    ipPort_(listenAddr.toIpPort()),            // 监听的IP:端口
    name_(nameArg),                            // 服务器名称
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),  // 连接接收器
    threadPool_(new EventLoopThreadPool(loop, name_)),                 // IO线程池
    connectionCallback_(defaultConnectionCallback),  // 默认连接回调
    messageCallback_(defaultMessageCallback),        // 默认消息回调
    nextConnId_(1)                                    // 下一个连接ID从1开始
{
  // 设置Acceptor收到新连接时的回调
  acceptor_->setNewConnectionCallback(
      std::bind(&TcpServer::newConnection, this, _1, _2));
}

// 析构函数:销毁所有连接
TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] 正在销毁";

  // 遍历并销毁所有连接
  for (auto& item : connections_)
  {
    TcpConnectionPtr conn(item.second);
    item.second.reset();
    // 让连接所在的IO线程执行销毁逻辑
    conn->getLoop()->runInLoop(
      std::bind(&TcpConnection::connectDestroyed, conn));
  }
}

// 设置IO线程数量
void TcpServer::setThreadNum(int numThreads)
{
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads);
}

// 启动服务器(线程安全)
void TcpServer::start()
{
  // 保证只启动一次
  if (started_.getAndSet(1) == 0)
  {
    // 启动IO线程池
    threadPool_->start(threadInitCallback_);

    // 开始监听端口
    assert(!acceptor_->listening());
    loop_->runInLoop(
        std::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}

// 接收新连接(由Acceptor回调调用)
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  // 从线程池轮询获取一个IO loop
  EventLoop* ioLoop = threadPool_->getNextLoop();

  // 生成唯一的连接名称
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - 新连接 [" << connName
           << "] 来自 " << peerAddr.toIpPort();

  // 获取本地地址
  InetAddress localAddr(sockets::getLocalAddr(sockfd));

  // 创建TcpConnection对象
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  // 把新连接存入map管理
  connections_[connName] = conn;

  // 设置用户注册的回调函数
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);

  // 设置连接关闭时的回调(通知TcpServer删除该连接)
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1));

  // 在IO线程中把连接设置为已建立状态
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

// 移除连接(线程安全接口)
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  // 投递到主线程执行删除
  loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

// 主线程中真正执行移除连接
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - 连接 " << conn->name();

  // 从连接map中删除
  size_t n = connections_.erase(conn->name());
  (void)n;
  assert(n == 1);

  // 在连接所属的IO线程中执行销毁逻辑
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      std::bind(&TcpConnection::connectDestroyed, conn));
}

1. 总体定位

TcpServer = 服务器大脑

  • 持有 main EventLoop(主 Reactor)
  • 持有 Acceptor(负责接收连接)
  • 持有 EventLoopThreadPool(IO 线程池)
  • 持有所有 TcpConnection(用 map 管理)
  • 对外提供用户接口,对内调度所有组件

2. 构造函数 TcpServer::TcpServer ()

TcpServer::TcpServer(EventLoop* loop, ...)
  : loop_(loop),               // 主线程(main reactor)
    ipPort_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(...)), // 创建接收器
    threadPool_(new EventLoopThreadPool(...)), // 创建线程池
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
{
  // Acceptor 收到新连接 → 调用 TcpServer::newConnection
  acceptor_->setNewConnectionCallback(
      std::bind(&TcpServer::newConnection, this, _1, _2));
}

作用:

  1. 绑定主线程 loop
  2. 创建 Acceptor(监听 socket)
  3. 创建 IO 线程池
  4. 设置 Acceptor 回调:有连接就交给 newConnection

3. 启动服务器:TcpServer::start ()

void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)  // 原子操作,保证只启动一次
  {
    threadPool_->start(threadInitCallback_);  // 启动 IO 线程池

    // 让 Acceptor 开始 listen(必须在主线程)
    loop_->runInLoop(
        std::bind(&Acceptor::listen, acceptor_.get()));
  }
}

启动流程:

  1. 启动 IO 线程池
  2. 主线程调用 Acceptor::listen ()
  3. 服务器开始接受客户端连接

4. 接收新连接:TcpServer::newConnection ()

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();

  // 1. 从线程池中**轮询**选一个 IO 线程
  EventLoop* ioLoop = threadPool_->getNextLoop();

  // 2. 生成连接名称(serverName-ip:port#id)
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_++);
  string connName = name_ + buf;

  // 3. 创建 TcpConnection
  TcpConnectionPtr conn(new TcpConnection(
      ioLoop,
      connName,
      sockfd,
      localAddr,
      peerAddr));

  // 4. 加入 map 管理
  connections_[connName] = conn;

  // 5. 设置用户回调
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);

  // 6. 连接关闭时 → 调用 removeConnection
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1));

  // 7. 在 IO 线程中初始化连接(开始监听读事件)
  ioLoop->runInLoop(
      std::bind(&TcpConnection::connectEstablished, conn));
}

流程:

  1. Acceptor 接收连接(main loop)
  2. TcpServer 轮询选择一个 IO 线程
  3. 创建 TcpConnection,绑定到该 IO 线程
  4. 所有后续读写、事件、定时 全部在该 IO 线程执行
  5. 主线程完全不参与业务,只负责分发连接

5. 移除连接:TcpServer::removeConnection ()

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  // 切换到主线程执行
  loop_->runInLoop(
      std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  // 从 map 中删除
  connections_.erase(conn->name());

  // 在连接所属的 IO 线程中销毁
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
    std::bind(&TcpConnection::connectDestroyed, conn));
}
  • 所有连接管理 必须在 main loop 线程
  • 连接销毁 必须在其所属 IO 线程
  • 线程安全,无锁,不崩溃

6. 析构函数~TcpServer ()

TcpServer::~TcpServer()
{
  for (auto& item : connections_)
  {
    TcpConnectionPtr conn(item.second);
    item.second.reset();
    conn->getLoop()->runInLoop(
      std::bind(&TcpConnection::connectDestroyed, conn));
  }
}

关闭时安全销毁所有连接

7. TcpServer 工作流程图

用户调用 server.start()
   ↓
启动线程池 → Acceptor listen
   ↓
客户端连接 → Acceptor::handleRead()
   ↓
TcpServer::newConnection()
   ↓
轮询选择 IO 线程 → 创建 TcpConnection
   ↓
连接交由 IO 线程管理
   ↓
读写/回调/定时 全部在 IO 线程执行

8. TcpServer 核心设计亮点

  • 主从 Reactor 模型
    • main Reactor:只 accept
    • sub Reactor:处理所有连接 IO
  • one loop per thread一个连接只属于一个线程,无锁、高性能
  • 轮询分配连接负载均衡
  • map 管理所有连接安全、可追踪、方便关闭
  • 全程线程安全所有跨线程操作通过 runInLoop 调度
  • 完全非阻塞无任何阻塞调用,支持百万并发

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐