在基于 Reactor 模型的 C++ 高性能网络库中,应用层缓冲区(Buffer)TCP 连接管理(TcpConnection) 是连接建立、数据收发、异常处理的核心组件。


一、整体结构与核心定位

Buffer + TcpConnection 是网络库连接层 + IO 层的核心:

  • Buffer:用户态动态缓冲区,解决非阻塞 socket 数据收发的粘包、半包、缓冲区不足问题
  • TcpConnection:封装一条 TCP 连接的完整生命周期,管理 socket、channel、读写缓冲区、回调、状态

二者共同构成网络库的数据通道 + 连接载体


二、代码精讲

第一段:buffer.h 

#pragma once

/*
    封装实现一个用户态的套接字收发缓冲区,用于后边实现的Connection连接对象
*/
#include <vector>
#include <string>
#include <optional>
namespace net
{
    class Buffer
    {
    public:
        Buffer();
        Buffer(const Buffer &other);
        Buffer(Buffer &&other);
        Buffer &operator=(const Buffer &other);
        Buffer &operator=(Buffer &&other);
        void swap(Buffer &other);
        // 获取可读起始地址
        const char *peek()const;
        // 获取可读数据大小
        size_t readableBytes() const;
        // 可写空间⼤⼩
        size_t writableBytes() const;
        // 向后移动_readerIndex,若⼩于可读数据⼤⼩则向后偏移,否则就重置读写索引
        void retrieve(size_t len);
        // 数据被全部取出,则重置读写索引
        void retrieveAll();
        // 获取所有数据到string中,并偏移读索引
        std::string retrieveAllAsString();
        // 获取指定⻓度数据到string中,并偏移读索引
        std::string retrieveAsString(size_t len);
        // 判断扩容,向末尾追加数据,偏移写索引 std::copy
        void append(const void *data, size_t len);
        // 若可写空间⼤⼩⼩于len则扩容
        void ensureWritableBytes(size_t len);
        // 获取写起始地址
        char *beginWrite();
        // 偏移写索引
        void hasWritten(size_t len);
        // 分块读取数据到buffer和临时空间中
        // 若实际读取⻓度⼩于等于buffer可写空间⼤⼩,则偏移写索引即可
        // 若实际读取⻓度⼤于buffer可写空间⼤⼩,则需要将临时空间的数据追加到buffer中
        ssize_t readFd(int fd, int *savedErrno);

        //追加getline && prependableBytes prepend
        size_t prependableBytes();
        void prepend(const void *data,size_t len);
        //c++17特性 optional int size()
        std::optional<std::string> getline();
    private:
        // 获取空间起始地址
        const char *begin()const; 
        char *begin();
        // 空间扩容:若前置⼤⼩+末尾空闲⼤⼩ < ⽬标⻓度+预留,则直接扩容到末尾
        // 若前置⼤⼩+末尾空闲⼤⼩ >= ⽬标⻓度+预留,则将后边的数据拷⻉到前边去
        void makeSpace(size_t len);

    private:
        const static int kInitialSize = 1024; // 默认缓冲区大小
        const static int kCheapPrepend = 8;    // 前置默认预留空间大小
        // 使用vector进行空间管理
        std::vector<char> _buffer;
        size_t _reader_idx; // 读位置索引
        size_t _writer_idx; // 写位置索引
    };
}

  1. 头文件依赖

    • <vector>:底层动态内存管理
    • <string>:数据返回与格式化
    • <optional>:C++17 安全返回(getline)
    • <cassert>:运行时断言校验
  2. Buffer 类设计思想

    • 应用层非阻塞 socket 收发缓冲区
    • 支持自动扩容、数据移动、内存紧凑、前置预留空间
    • 面向 TcpConnection 提供统一读写接口
  3. 核心成员变量

    • std::vector<char> _buffer:动态内存容器
    • _reader_idx / _writer_idx:读写指针索引
    • kInitialSize:默认缓冲区大小(1024)
    • kCheapPrepend:头部预留空间(8 字节)
  4. 公开接口设计

    • 构造 / 拷贝 / 移动 / 赋值 /swap:现代 C++ 五法则
    • 可读 / 可写空间获取
    • 数据追加、读取、回收、格式化
    • readFd:从 fd 分散读取数据
    • prepend:头部插入数据
    • getline:按行读取(支持 \n)
  5. 私有接口

    • begin():缓冲区起始地址
    • makeSpace():内存扩容与紧凑算法

第二段:buffer.cc 知识点

#include "buffer.h"
#include <cassert>
#include<cstring>
#include<sys/uio.h>
#include <iostream>

namespace net
{
    Buffer::Buffer()
        : _buffer(kInitialSize),
          _reader_idx(kCheapPrepend),
          _writer_idx(kCheapPrepend)
    {
    }
    Buffer::Buffer(const Buffer &other)
        : _buffer(other._buffer),
          _reader_idx(other._reader_idx),
          _writer_idx(other._writer_idx)
    {
    }
    Buffer::Buffer(Buffer &&other)
    {
        Buffer tmp;
        tmp.swap(other);
        tmp.swap(*this);
    }
    Buffer &Buffer::operator=(const Buffer &other)
    {
        Buffer tmp(other);
        tmp.swap(*this);
        return *this;
    }
    Buffer &Buffer::operator=(Buffer &&other)
    {
        Buffer tmp(std::move(other));
        tmp.swap(*this);
        return*this;
    }
    void Buffer::swap(Buffer &other)
    {
        _buffer.swap(other._buffer);
        std::swap(_reader_idx, other._reader_idx);
        std::swap(_writer_idx, other._writer_idx);
    }
    // 获取可读起始地址
    const char *Buffer::peek() const
    {
        return begin() + _reader_idx;
    }
    // 获取可读数据大小
    size_t Buffer::readableBytes() const
    {
        return _writer_idx - _reader_idx;
    }
    // 可写空间⼤⼩
    size_t Buffer::writableBytes() const
    {
        return _buffer.size() - _writer_idx;
    }
    // 向后移动_readerIndex,若⼩于可读数据⼤⼩则向后偏移,否则就重置读写索引
    void Buffer::retrieve(size_t len)
    {

        assert(len <= readableBytes());
        _reader_idx += len;
    }
    // 数据被全部取出,则重置读写索引
    void Buffer::retrieveAll()
    {
        retrieve(readableBytes());
    }
    // 获取所有数据到string中,并偏移读索引
    std::string Buffer::retrieveAllAsString()
    {
        return retrieveAsString(readableBytes());
    }
    // 获取指定⻓度数据到string中,并偏移读索引
    std::string Buffer::retrieveAsString(size_t len)
    {
        assert(len <= readableBytes());
        std::string retval;
        // 获取数据到string 中
        retval.assign(peek(), len);
        // 偏移读指针
        retrieve(len);
        return retval;
    }
    // 判断扩容,向末尾追加数据,偏移写索引 std::copy
    void Buffer::append(const void *data, size_t len)
    {
        // 1.确保空闲空间足够(不够就需要扩容)
        ensureWritableBytes(len);
        // 2.将数据拷贝到缓冲区中
        std::copy((const char*)data, (const char*)data + len, beginWrite());
        // 3.将写指针向后偏移
        hasWritten(len);
        //std::cout<<"["<<std::string((const char*)data,len)<<"]\n";
    }
    // 若可写空间⼤⼩⼩于len则扩容
    void Buffer::ensureWritableBytes(size_t len)
    {
        // 检查后置空闲空间是否足够:够了就返回,不够就扩容
        if (len <= writableBytes())
        {
            return;
        }
        else
        {
            makeSpace(len);
        }
    }
    // 获取写入起始地址
    char *Buffer::beginWrite()
    {
        return begin() + _writer_idx;
    }
    // 偏移写索引
    void Buffer::hasWritten(size_t len)
    {
        assert(len <= writableBytes());
        _writer_idx += len;
    }

    // 获取空间起始地址
    const char *Buffer::begin() const
    {
        return &_buffer[0];
    }
    char *Buffer::begin()
    {
        return &_buffer[0];
    }
    size_t Buffer::prependableBytes()
    {
        return _reader_idx;
    }
    // 空间扩容:若前置⼤⼩+末尾空闲⼤⼩ < ⽬标⻓度(len)+预留,则直接扩容到末尾
    // 若前置⼤⼩+末尾空闲⼤⼩ >= ⽬标⻓度+预留,则将后边的数据拷⻉到前边去
    void Buffer::makeSpace(size_t len)
    {
        if (prependableBytes() + writableBytes() <= len + kCheapPrepend)
        {
            _buffer.resize(_writer_idx + len);
        }
        else
        {
            assert(kCheapPrepend < _reader_idx);
            // 1.获取可读数据大小
            size_t dataSize = readableBytes();
            // 2.将数据拷贝到起始位置(前置还是有一个固定的预留空间)
            std::copy(begin() + _reader_idx,
                      begin() + _writer_idx,
                      begin() + kCheapPrepend);

            // 3.重置读写指针位置
            _reader_idx = kCheapPrepend;
            _writer_idx = _reader_idx + dataSize;
            assert(dataSize == readableBytes());
        }
    }
    void Buffer::prepend(const void *data, size_t len){
        assert(len<=prependableBytes());
        //将读指针向前偏移len字节
        _reader_idx-=len;
        //拷贝数据到读指针指向位置
        std::copy((const char*)data,//要拷贝的目标数据的起始地址
            (const char*)data+len,//要拷贝的目标数据的结束地址
            begin()+_reader_idx);
    }
    // c++17特性 optional int size()
    std::optional<std::string> Buffer::getline(){
        //memchr 从可读数据中查找"\n"
        void* pos=memchr((void*)peek(),'\n',readableBytes());
        if(pos==NULL)
        {
            return std::nullopt;
        }
        //构造string对象,拷贝数据进去
        size_t lineSize=(char*)pos-peek()+1;
        //std::string str(peek(),lineSize);
        //std::cout<<"获取数据:["<<str<<"/\n";
        //将读指针向后偏移
        return retrieveAsString(lineSize);
    }
    // 分块读取数据到buffer和临时空间中
    // 若实际读取⻓度⼩于等于buffer可写空间⼤⼩,则偏移写索引即可
    // 若实际读取⻓度⼤于buffer可写空间⼤⼩,则需要将临时空间的数据追加到buffer中

    // 从描述符中读取数据到缓冲区中
    // 缓冲区剩余空间大小是固定的,但是fd能读到的数据是不固定,要用到的分块读取
    ssize_t Buffer::readFd(int fd, int *savedErrno)
    {
        //ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
        //ssize_t read(int fd, void *buf, size_t count);
        ssize_t wableSize=writableBytes();
        char temp[65536];//64k
        struct iovec vec[2];
        vec[0].iov_base=beginWrite();//读取的数据优先写入的位置
        vec[0].iov_len=wableSize;//该位置最大写入的数据长度
        vec[1].iov_base=temp;//第二个备用空间
        vec[1].iov_len=65536;
        //1.判断是否需要使用备用空间
        int count=wableSize>65536?1:2;
        ssize_t ret=readv(fd,vec,count);
        //返回值是在实际读取到的数据量,判断是否实际读取的数据大小小于writableBytes()
        //ret <= writableBytes(),代表第二块备用空间没用上,偏移指针即可
        //ret>writableBytes(),代表备用空间中必然有一部分数据:ret-writeableBytes()
        //这种情况就需要将备用空间的数据追加到缓冲区中
        if(ret<0)
        {
            *savedErrno=errno;
        } else if(ret <= wableSize)
        {
            hasWritten(ret);//将写指针向后偏移
        }else{
            hasWritten(wableSize);//将写指针直接偏移到末尾
            append(temp,ret-wableSize);
        }
        return ret;
    }
}

  1. 现代 C++ 内存管理

    • 拷贝构造、移动构造、拷贝赋值、移动赋值
    • swap 高效交换资源
    • vector 自动内存管理
  2. 缓冲区核心算法

    • 双指针读写模型:读指针、写指针分离
    • 内存紧凑机制:空间不足时将数据前移,减少扩容
    • 自动扩容策略:不够则扩容,够则紧凑
    • 前置预留空间:支持头部快速插入数据
  3. Linux 高性能 IO

    • readv 分散读:一次读取两块不连续内存
    • 64KB 临时栈缓冲区,避免内存浪费
    • 错误码保存机制
  4. 字符串与数据处理

    • memchr:查找换行符
    • std::copy:安全内存拷贝
    • assign:string 安全构造
    • retrieve:读指针后移,自动回收空间
  5. C++17 特性

    • std::optional:安全返回,避免空字符串歧义
    • 移动语义优化性能
  6. 工程健壮性

    • 大量 assert 保证逻辑合法性
    • 边界条件全覆盖
    • 自动内存整理,无内存泄漏

第三段:connection.h 知识点

#pragma once
#include "channel.h"
#include "details.h"
#include "socket.h"
#include "buffer.h"
#include <any>
#include <cassert>

namespace net
{
    class TcpConnection;
    using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
    using ConnectionCallback = std::function<void(TcpConnectionPtr)>;
    using MessageCallback = std::function<void(TcpConnectionPtr, Buffer *, Timestamp)>;
    using CloseCallback = std::function<void(TcpConnectionPtr)>;

    // 让 TcpConnection 这个类具备在成员函数里安全获取自身 shared_ptr
    // 的能力; 防止对象还在使用中就被析构(悬空指针)
    class TcpConnection : public std::enable_shared_from_this<TcpConnection>
    {
        enum State
        {
            kDisconnected,
            kConnecting,
            kConnected,
            kDisconnecting
        };

    public:
        // 初始化成员;设置channel回调;设置连接保活
        TcpConnection(EventLoop *loop, int fd, int id) : 
        _id(id),
        _state(kConnecting),
        _loop(loop),
        _socket(new Socket(fd)),
        _channel(new Channel(loop, fd))
        {
            // 设置套接字选项:连接保活
            _socket->setKeepAlive(true);
            // 设置channel回调
            _channel->setReadCallback(std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));
            _channel->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));
            _channel->setCloseCallback(std::bind(&TcpConnection::handleClose, this));
            _channel->setErrorCallback(std::bind(&TcpConnection::handleError, this));
            LOG_DEBUG("construct TcpConnection at:%p - %d", this, _socket->fd());
        }
        // 没有什么要做的
        ~TcpConnection()
        {
            LOG_DEBUG("distruct TcpConnection at:%p - %d", this, _socket->fd());
            assert(_state == kDisconnected);
        }
        // 成员操作
        int id() { return _id; }
        bool connected() { return _state == kConnected; }
        bool disconnected() { return _state == kDisconnected; }
        EventLoop *loop() { return _loop; }
        Buffer *inputBuffer() { return &_inputBuffer; }
        Buffer *outputBuffer() { return &_outputBuffer; }
        void setContext(std::any context) { _context = std::move(context); }
        const std::any &context() const { return _context; }
        std::any &mutableContext() { return _context; }
        void setConnectionCallback(ConnectionCallback cb) { _onConnection = std::move(cb); }
        void setMessageCallback(MessageCallback cb) { _onMessage = std::move(cb); }
        void setCloseCallback(CloseCallback cb) { _onClose = std::move(cb); }

        void send(const void *data, size_t len);
        void forceClose();
        void connectEstablished();
        void connectDestroyed();

    private:
        void sendInLoop(const void *data, size_t len);
        void sendInLoop(const std::string &str);
        void forceCLoseInLoop();
        void handleRead(Timestamp recvTime);
        void handleWrite();
        void handleClose();
        void handleError();

    private:
        const int64_t _id; // 标识ID
        State _state;      // 连接状态

        EventLoop *_loop;
        std::unique_ptr<Socket> _socket;   // 描述符对应的套接字对象
        std::unique_ptr<Channel> _channel; // 描述符事件处理对象
        Buffer _inputBuffer;               // 输入缓冲区(内核-》用户)
        Buffer _outputBuffer;              // 输出缓冲区(用户-》内核)
        std::any _context;                 // 上下文对象
        ConnectionCallback _onConnection;  //
        MessageCallback _onMessage;        //消息到达回调
        CloseCallback _onClose;
    };

}

  1. 头文件依赖

    • channel.h:事件分发
    • socket.h:套接字管理
    • buffer.h:读写缓冲区
    • <any>:连接上下文存储
    • <memory>:智能指针
    • <functional>:函数回调
  2. 回调类型定义

    • ConnectionCallback:连接建立 / 关闭回调
    • MessageCallback:消息到达回调
    • CloseCallback:内部关闭回调
    • TcpConnectionPtrshared_ptr<TcpConnection>
  3. TcpConnection 核心设计

    • 继承 enable_shared_from_this:安全获取自身智能指针
    • 连接状态机:kDisconnected/kConnecting/kConnected/kDisconnecting
  4. 核心成员

    • EventLoop* _loop:归属 IO 线程
    • unique_ptr<Socket>:套接字管理
    • unique_ptr<Channel>:事件管理
    • Buffer _inputBuffer / _outputBuffer:读写缓冲区
    • std::any _context:用户自定义上下文
    • 各类回调函数对象
  5. 公开接口

    • send():发送数据
    • forceClose():强制关闭连接
    • connectEstablished/Destroyed:连接建立 / 销毁
    • 缓冲区、状态、ID 获取接口

第四段:connection.cc 知识点

#include "connection.h"
#include "eventloop.h"
#include <cassert>
namespace net
{
    void TcpConnection::send(const void *data, size_t len){
        //若发送数据的时候就在loop线程中,则直接发送,否则将send任务压入到loop任务池中
        if(_loop->isInLoopThread())
        {
            sendInLoop(data,len);
        }else{
            std::string str((char*)data,len);
            void(TcpConnection::*fp)(const std::string&)=&TcpConnection::sendInLoop;
            _loop->runInLoop(std::bind(fp,this,str));
        }
    }
    void TcpConnection::sendInLoop(const void *data, size_t len){
        _loop->assertInLoopThread();
        //1.若当前没有进行描述符的写事件监控,且发送缓冲区中没有数据:
        //  则直接write/send数据; 否则将数据放入到发送缓冲区中
        //2.若直接发送数据,数据没有发送完,就要把剩余数据放入到自己的发送缓冲区,开启写事件监控
        bool errFlag=false;//错误标志,用于保存write是否出错了
        int32_t leftSize=len;
        int32_t nwrote=0;
        if(!_channel->isWriting()&&_outputBuffer.readableBytes()==0)
        {
            nwrote=sockets::write(_socket->fd(),data,len);
            if(nwrote>=0)
            {
                leftSize-=nwrote;
            }else{
                LOG_ERROR("写入数据失败:%d!",_socket->fd());
                errFlag=true;
            }
        }
        if(leftSize>0&&errFlag==false){
            //将剩余数据放入缓冲区
            _outputBuffer.append((const char*)data+nwrote,leftSize);
            if(!_channel->isWriting()){
                _channel->enableReading();//开启写事件jiankong
            }
        }

    }
    void TcpConnection::sendInLoop(const std::string &str){
        sendInLoop((const void*)str.c_str(),str.size());
    }
    void TcpConnection::forceClose(){
        //连接不管是不是在线程中,都不能直接关闭,必须压任务到loop任务池中进行
        //主要是为了避免任务池中有send任务
        _state=kDisconnecting;//连接状态设置为关闭
        _loop->queueInLoop(std::bind(&TcpConnection::forceCLoseInLoop,shared_from_this()));
    }
    void TcpConnection::forceCLoseInLoop(){
        _loop->assertInLoopThread();
        //设置状态为kdisConnected,解除事件监控,移除监控管理,调用回调函数...
        if(_state==kConnected||_state==kDisconnecting){
            handleClose();
        }
        //kDisconnecting状态说明:这是调用forceClose的时候设置的
        //kConnected状态说明:整个服务端释放资源的时候会调用connectionCallback
        //  内部可能用户调用conn->forceClose()
    }
     //这是实际的释放资源回调
    void TcpConnection::handleClose(){
        //设置状态为kDisconnected,解除事件监控,调用回调函数
        _loop->assertInLoopThread();
        //这个断言保证了handleClose永远只会被调用一次
        assert(_state==kConnected||_state==kDisconnecting);
        _state=kDisconnected;
        _channel->disableAll();
        //额外保存一份只能指针对象,防止提前释放
        TcpConnectionPtr gardThis(shared_from_this());
        if(_onConnection) _onConnection(gardThis);//这是用户设置的连接建立/关闭回调函数
        if(_onClose) _onClose(gardThis);//这是框架内部的资源释放回调处理函数
    }
    void TcpConnection::connectEstablished(){
        //连接对象所有初始化工作都完成后,才会被调用的接口,而且必须在loop线程中
        _loop->assertInLoopThread();
        assert(_state==kConnecting);
        _state=kConnected;
        //设置channel中的观察者对象
        _channel->tie(shared_from_this());// 绑定自己,防止析构
        //启动channel的读事件监控
        _channel->enableReading();// 开启读事件监听
        //调用连接建立回调函数(通知用户)
        if(_onConnection) _onConnection(shared_from_this());
        
    }
    void TcpConnection::connectDestroyed(){
        //这个接口是整个服务器TcpServer析构的时候释放连接对象时调用
        _loop->assertInLoopThread();
        if(_state==kConnected){//连接建立中
            _channel->disableAll();
            if(_onConnection) _onConnection(shared_from_this());
        }
        _channel->remove();//移除事件监控管理
    }
    
    void TcpConnection::handleRead(Timestamp recvTime){
        _loop->assertInLoopThread();
        //将socket内核缓冲区的数据读取到inputbuffer中,然后调用mssageCallback
        int errNum;
        ssize_t n=_inputBuffer.readFd(_socket->fd(),&errNum);
        if(n==0){
            //连接断开
            return handleClose();
        }else if(n<0){
            //读取出错
            return handleError();
        }
        if(_onMessage) _onMessage(shared_from_this(),&_inputBuffer,recvTime);

    }
    void TcpConnection::handleWrite(){//写事件触发的时候调用
        _loop->assertInLoopThread();
        if(_channel->isWriting()==false)
        {
            //连接半关闭
            LOG_ERROR("CONNECTION FD %d is shutdown write",_socket->fd());
            return;
        }
        ssize_t n=sockets::write(_socket->fd(),_outputBuffer.peek(),_outputBuffer.readableBytes());
        if(n<0)
        {
            LOG_ERROR("发送数据错误");
            return;
        }
        _outputBuffer.retrieve(n);//将发送缓冲区的读指针向后偏移
        //需要考虑发送缓冲区中的数据有没有发送完毕,发送缓冲区没有数据了,关闭写事件监控
        if(_outputBuffer.readableBytes()==0){
            // 发完关闭写监听,避免空转
            _channel->disableWriting();
        }
    }
 
    void TcpConnection::handleError(){
        _loop->assertInLoopThread();
        LOG_ERROR("连接操作出错");
    }
}

  1. Reactor 模型事件驱动

    • Channel 绑定读 / 写 / 关闭 / 错误回调
    • 事件触发 → 调用对应 handle 函数
    • 完全非阻塞 IO 模型
  2. 线程安全保证

    • isInLoopThread:判断是否在 IO 线程
    • runInLoop/queueInLoop:跨线程安全调用
    • 所有 IO 操作必须在 IO 线程执行
  3. 数据发送逻辑(send)

    • 直接发送 + 缓冲区排队双策略
    • 写事件监控按需开启 / 关闭
    • 未发完数据存入 outputBuffer
    • 避免重复开启写事件导致 CPU 空转
  4. 读事件处理(handleRead)

    • readFd 分散读取到 inputBuffer
    • 读取长度 = 0 → 对端关闭 → handleClose
    • 读取成功 → 调用 messageCallback
  5. 写事件处理(handleWrite)

    • 从 outputBuffer 取出数据发送
    • 发送完成自动关闭写事件
    • 错误处理与日志输出
  6. 连接关闭与状态机

    • handleClose:关闭状态流转
    • forceClose:安全强制关闭
    • 状态保证:只关闭一次,不重复析构
    • shared_from_this 防止连接提前释放
  7. 智能指针安全

    • gardThis 保护连接生命周期
    • tie(shared_from_this()) 绑定 Channel 防止悬空
  8. 工程级鲁棒性

    • 大量断言保证状态合法性
    • 错误日志清晰
    • 资源自动释放
    • 上下文任意类型存储
  9. 套接字选项

    • setKeepAlive:TCP 保活

三、Buffer 类:高性能应用层缓冲区设计思想

1. 双指针无拷贝设计

Buffer 使用 读指针 + 写指针 实现高效数据管理:

  • 无需频繁移动内存
  • 读取只移动指针
  • 写入直接追加
  • 空间不足自动紧凑或扩容

2. 自动内存紧凑(核心亮点)

当尾部空间不足时:

  • 将有效数据整体前移
  • 释放头部空闲空间
  • 减少不必要的内存分配
  • 大幅提升性能

3. readv 分散读(Linux 高性能 IO)

一次 readv 读取两块内存:

  1. Buffer 可写区域
  2. 栈上临时 64KB 空间 避免缓冲区不足导致数据丢失。

4. 行读取支持

getline() 使用 memchr 快速查找 \n,适合 HTTP、Redis、自定义文本协议。


四、TcpConnection 类:TCP 连接生命周期管理

1. 连接状态机(非常关键)

  • kConnecting:连接中
  • kConnected:已连接
  • kDisconnecting:关闭中
  • kDisconnected:已关闭

所有操作严格按状态流转,避免逻辑混乱。

2. 线程安全

所有 IO、事件、关闭操作必须在 IO 线程执行。 跨线程调用通过 runInLoop 转发,无锁、高效、安全。

3. 智能指针安全

继承 enable_shared_from_this,确保:

  • 回调执行时连接不会被释放
  • Channel 不会出现悬空指针
  • 资源安全释放

4. 双缓冲区设计

  • inputBuffer:内核 → 用户
  • outputBuffer:用户 → 内核

完美解决非阻塞 socket 发送不完整、粘包半包问题。


五、整体工作流程(最清晰)

  1. 连接建立 → TcpConnection 构造
  2. 绑定 Channel 回调 → 开启读事件
  3. 数据到达 → 触发 handleRead → readFd 读入 Buffer → 消息回调
  4. 发送数据 → 直接写 / 存入 outputBuffer → 开启写事件
  5. 写事件触发 → 发送缓冲区数据 → 发完关闭写事件
  6. 对端关闭 / 主动关闭 → 状态机关闭 → 资源释放 → 回调通知

六、总结(最精华)

Buffer 核心价值

  • 双指针高效管理
  • 自动扩容 + 内存紧凑
  • 分散读高性能
  • 行读取支持协议解析
  • 无拷贝、无泄漏、高稳定

TcpConnection 核心价值

  • 完整 TCP 连接生命周期管理
  • Reactor 事件驱动
  • 线程安全、跨线程安全发送
  • 状态机保证健壮
  • 智能指针防止悬空
  • 双缓冲区解决非阻塞收发问题
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐