工作模型和工作流程

服务器端和客户端模型图

服务器端工作流程图

客户端流程图

Code实现

服务器端实现

SERVER/
├── build/            # 编译产物目录(存放 CMake 生成的中间文件和可执行程序)
├── include/          # 头文件目录
│   └── server.h      # 服务器类定义头文件
├── src/              # 源文件目录
│   ├── main.cpp      # 程序入口文件
│   └── server.cpp    # 服务器类实现文件
├── CMakeLists.txt    # CMake 项目构建脚本
└── msg.proto         # Protobuf 消息定义文件
#pragma once
#include"msg.pb.h"
#include<string>
#include<map>
#include"arpa/inet.h"
#include<sys/socket.h>
#include<thread>
#include<mutex>
#include<queue>
#include<condition_variable>
#include<functional>

enum class Msg_Type{
    LOGIN=1,
    CHAT=2,
    QUIT=3
};

class server
{
private:
    const int BUF_SIZE=128;

    //套接字文件
    int sockfd;
    //客户端容器
    std::map<int,struct sockaddr_in>clients;
    //保护客户端的互斥锁
    std::mutex client_mutex;

    //存储线程的线程容器
    std::vector<std::thread>workers;
    //存储任务的工作队列
    std::queue<std::function<void()>>tasks;
    //互斥锁:解决线程池的互斥问题
    std::mutex task_mutex;
    //条件变量,用于解决同步问题
    std::condition_variable task_cv;
    //线程池工作状态的标志
    bool isRun;

private:
    //启动线程池
    void startThreadPool(const int ThreadSize);
    //@task:指定的任务
    //添加任务到线程池的工作队列
    void addTask(std::function<void()>task);
    //@client_fd:客户端的文件描述符
    //@cin:客户端的信息结构体
    // 处理客户端连接的函数
    void handleClient(int client_fd, struct sockaddr_in cin);
    //@msg:发送客户端的消息
    //@exclude_fd:不转发消息的客户端
    // 广播消息函数        
    void broadcast(const Msg&msg, int exclude_fd = -1);             
public:
    //@ip:服务器端绑定的ip
    //@port:服务器端绑定的端口号
    //@ThreadSize:服务器端线程池初始化的工作线程数量
    server(const std::string&ip,const int port,const int ThreadSize=4);
    //析构函数  
    ~server();
    //服务器端主线程主流程
    void run();
};


#include"server.h"
#include <unistd.h>
#include <netinet/in.h>

// 用户打印错误信息的宏
#define ERR_LOG(msg)                                                      \
    do                                                                    \
    {                                                                     \
        perror(msg);                                                      \
        std::cout << __LINE__ << "  " << __func__ << "  " << __FILE__ << std::endl; \
    } while (0)

server::server(const std::string&ip,const int port,const int ThreadSize):isRun(true){
    sockfd=socket(AF_INET,SOCK_STREAM,0);
    //1.创建套接字
    if(sockfd<0){
        ERR_LOG("socket error");
        return;
    }

    //2.设置端口快速复用
    int res=1;
    if(setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&res,sizeof(res))<0){
        ERR_LOG("setsockopt error");
        close(sockfd);
        return;
    }

    //3.绑定ip和端口号
    struct sockaddr_in sin;
    sin.sin_family=AF_INET;
    sin.sin_addr.s_addr=inet_addr(ip.c_str());
    sin.sin_port=htons(port);
    socklen_t len=sizeof(sin);
    if(bind(sockfd,(const sockaddr*)&sin,len)<0){
        ERR_LOG("bind error");
        close(sockfd);
        return;
    }

    //4.启动监听状态
    if(listen(sockfd,128)<0){
        ERR_LOG("listen error");
        return;
    }

    //5.初始化线程池
    startThreadPool(ThreadSize);
}

server::~server(){
    //避免死锁:防止当工作线程在执行任务就被通知
    {
        //主线程抢锁,防止工作线程影响
        std::unique_lock<std::mutex> lock(task_mutex);
        //关闭线程池
        isRun=false;
        //当局部变量离开作用域时调用析构函数自动释放锁
    }
    //主线程唤醒所有工作线程
    task_cv.notify_all();
    //主线程等待回收所有工作线程
    for(auto&worker:workers)worker.join();
    close(sockfd);
}

void server::run(){
    //创建接受客户端的信息结构体
    struct sockaddr_in cli;
    socklen_t len=sizeof(cli);

    //循环接受客户端
    while(1){
        int fd=accept(sockfd,(sockaddr*)&cli,&len);
        if(fd<0){
            ERR_LOG("accept error");
            break;
        }
        //添加客户端任务到任务队列中
        addTask([this,fd,cli]{
            handleClient(fd,cli);
        });
    }
}

void server::addTask(std::function<void()>task){
    {
        //主线程抢锁,防止工作线程影响
        std::unique_lock<std::mutex> lock(task_mutex);
        //将任务加入到工作队列中,等待有空间的工作线程执行任务
        tasks.emplace(task);
        //当局部变量离开作用域时调用析构函数自动释放锁
    }
    //主线程通知一个线程执行任务
    task_cv.notify_one();
}

void server::startThreadPool(const int ThreadSize){
    //创建并初始化指定数量的线程
    for(int i=0;i<ThreadSize;i++){
        //emplace_back:直接创建一个线程添加到线程容器中
        //lambda表达式作为线程的任务
        workers.emplace_back([this]{
            while(1){
                //创建一个任务,工作线程可能同时执行各自任务,所以执行任务时不占用锁
                //但是从工作队列中拿取任务时操作临界资源需要抢占锁
                std::function<void()>task;
                {
                    //抢占锁
                    std::unique_lock<std::mutex> lock(task_mutex);
                    //等待被主线程唤醒:当线程池停止或有任务可取时返回
                    task_cv.wait(lock,[this]{
                        return isRun==false || !tasks.empty();
                    });
                    //当线程池停止且工作队列为空时,线程退出
                    if(isRun==false && tasks.empty()) return;
                    //线程池正常时,工作线程从工作队列中取出任务
                    task=std::move(tasks.front());
                    tasks.pop();
                }
                //拿到任务后开始执行
                task();
            }
        });
    }
}

void server::handleClient(int client_fd, struct sockaddr_in cin){
    //创建接受消息的容器
    char buf[BUF_SIZE]="";
    Msg msg;
    std::string input;
    int recv_len=0;
    
    while(1){
        memset(buf,0,BUF_SIZE);
        recv_len=recv(client_fd,buf,BUF_SIZE,0);
        //std::cout<<"recv_len is "<<recv_len<<std::endl;
        //表示客户端下线,删除客户端并结束工作线程
        if(recv_len<=0){
            //删除客户端
            for(auto&[fd,cli]:clients){
                if(fd==client_fd){
                    clients.erase(fd);
                    break;
                }
            }
            close(client_fd);
            break;
        }
        //成功收到客户端消息,反序列化消息,并判断消息类型
        msg.ParseFromArray(buf,recv_len);
        //std::cout<<msg.type()<<" "<<msg.name()<<" "<<msg.data()<<std::endl;

        switch (msg.type())
        {
        case (int)Msg_Type::LOGIN:
            //保护客户端容器(临界资源)
            {
                std::unique_lock<std::mutex>lock(client_mutex);
                //把新连接的客户端放入到服务器端的客户端容器中
                clients.emplace(client_fd,cin);
                //组装消息并发送给所有其他客户端
                sprintf(buf,"---------%s:login succeed---------",msg.name().c_str());
                msg.set_data(buf);
                //转发所有客户端该用户登录成功,包括自己
                //std::cout<<msg.type()<<" "<<msg.name()<<" "<<msg.data()<<std::endl;
                broadcast(msg);
            }
            break;
        
        case (int)Msg_Type::CHAT:
            {
                std::unique_lock<std::mutex>lock(client_mutex);
                //转发消息给所有客户端,但不包括自己
                broadcast(msg,client_fd);
            }
            break;
        
        case (int)Msg_Type::QUIT:
            //删除客户端
            for(auto&[fd,cli]:clients){
                if(fd==client_fd){
                    clients.erase(fd);
                    break;
                }
            }
            close(client_fd);
            break;
        
        default:
            ERR_LOG("recv invalid msg");
            close(client_fd);
            return;
        }
    }
}

void server::broadcast(const Msg&msg, int exclude_fd){
    //序列化消息
    std::string output;
    msg.SerializePartialToString(&output);

    for(auto&[fd,cli]:clients){
        if(fd!=exclude_fd){
            if(send(fd,output.c_str(),output.size(),0)<0){
                ERR_LOG("send error");
                break;
            }
        }
    }
}
#include"server.h"
#include<iostream>

int main(int argc,const char*argv[]){
    if(argc<3){
        std::cout<<"please input the ip and the port of the server"<<std::endl;
        return -1;
    }
    try{
        std::string ip=argv[1];
        int port=atoi(argv[2]);
        server s(ip,port);
        s.run();
    }catch(const std::exception&e){
        std::cout<<e.what()<<std::endl;
        return -1;
    }
    return 0;
}
cmake_minimum_required(VERSION 3.20)
project(CHATROOM_SERVER CXX)

# 1. 查找依赖包
find_package(Protobuf REQUIRED)
find_package(Threads REQUIRED) # 推荐用这种方式处理 pthread

# 2. 自动生成 Protobuf 源代码
# 注意顺序:第一个是生成的 .cc 列表,第二个是生成的 .h 列表
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS msg.proto)

# 3. 设置头文件搜索路径
# 必须包含 CMAKE_CURRENT_BINARY_DIR,因为生成的 msg.pb.h 在这里
include_directories(
    ${CMAKE_SOURCE_DIR}/include
    ${CMAKE_CURRENT_BINARY_DIR}
)

# 4. 生成可执行程序
# 必须把生成的 ${PROTO_SRCS} 丢进去一起编译
add_executable(server 
    src/main.cpp 
    src/server.cpp 
    ${PROTO_SRCS}
)

# 5. 链接库
# 使用官方推荐的变量名
target_link_libraries(server 
    PUBLIC 
    ${Protobuf_LIBRARIES} 
    Threads::Threads
)
syntax = "proto3";

message Msg {
  int32 type=1;
  bytes name=2;
  bytes data=3;
}

客户端实现

SERVER/
├── build/            # 编译产物目录(存放 CMake 生成的中间文件和可执行程序)
├── include/          # 头文件目录
│   └── server.h      # 服务器类定义头文件
├── src/              # 源文件目录
│   ├── main.cpp      # 程序入口文件
│   └── server.cpp    # 服务器类实现文件
├── CMakeLists.txt    # CMake 项目构建脚本
└── msg.proto         # Protobuf 消息定义文件
#include<msg.pb.h>
#include<string>

enum class Msg_Type{
LOGIN=1,
CHAT=2,
QUIT=3
};

class client
{
private:
const int BUF_SIZE=128;
int sockfd;
std::string name;
bool running;

private:
void recvMsg();
void sendMsg(Msg_Type type,const std::string&data="");

public:
client(const std::string&ip,const int port,const std::string& name);
~client();
void run();
};


#include"client.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include"arpa/inet.h"
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>

#define ERR_LOG(msg)                                                      \
    do                                                                    \
    {                                                                     \
        perror(msg);                                                      \
        std::cout << __LINE__ << "  " << __func__ << "  " << __FILE__ << std::endl; \
    } while (0)

client::client(const std::string&ip,const int port,const std::string& name):name(name)
{
    //1.创建套接字
    sockfd=socket(AF_INET,SOCK_STREAM,0);
    if(sockfd<0){
        ERR_LOG("socket error");
        close(sockfd);
        return;
    }
    
    //2.连接服务器端
    struct sockaddr_in sin;
    sin.sin_addr.s_addr=inet_addr(ip.c_str());
    sin.sin_family=AF_INET;
    sin.sin_port=htons(port);
    socklen_t len=sizeof(sin);
    if(connect(sockfd,(const sockaddr*)&sin,len)<0){
        ERR_LOG("connect error");
        close(sockfd);
        return;
    }

    //3.发送登录消息
    sendMsg(Msg_Type::LOGIN);
}

client::~client()
{
    sendMsg(Msg_Type::QUIT);
    if(sockfd>0)close(sockfd);
}

void client::run()
{
    // 1. 创建 epoll 实例
    int epfd = epoll_create(1);
    if (epfd == -1) {
        ERR_LOG("epoll_create error");
        return;
    }

    struct epoll_event ev, events[2];

    // 2. 将标准输入 (stdin) 加入 epoll 监听 —— 用于读取用户输入
    ev.events = EPOLLIN;
    ev.data.fd = STDIN_FILENO; 
    epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);

    // 3. 将服务器套接字 (sockfd) 加入 epoll 监听 —— 用于接收消息
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

    std::string text;
    running=true;
    while (running)
    {
        // 4. 等待事件发生(阻塞)
        int n = epoll_wait(epfd, events, 2, -1);
        if (n < 0) {
            if (errno == EINTR) continue;
            ERR_LOG("epoll_wait error");
            break;
        }

        for (int i = 0; i < n; i++)
        {
            int fd = events[i].data.fd;

            // 情况 A:标准输入有数据(用户打了字)
            if (fd == STDIN_FILENO)
            {
                if (!std::getline(std::cin, text)) break;

                if (text == "quit") {
                    running=false;
                    break;
                }
                // 发送消息
                sendMsg(Msg_Type::CHAT, text);
            }
            // 情况 B:套接字有数据(服务器发来了消息)
            else if (fd == sockfd)
            {
                recvMsg();
            }
        }
    }

    close(epfd);
}

void client::recvMsg(){
    char buf[BUF_SIZE];
    memset(buf,0,BUF_SIZE);
    int recv_len=recv(sockfd,buf,BUF_SIZE,0);
    //std::cout<<"recv_len is "<<recv_len<<std::endl;
    if(recv_len<=0){
        ERR_LOG("recv error");
        running=false;
        return;
    }
    Msg msg;
    msg.ParseFromArray(buf,recv_len);
    std::cout<<msg.name()<<": "<<msg.data()<<std::endl;
}

void client::sendMsg(Msg_Type type,const std::string&data){
    Msg msg;
    msg.set_data(data.c_str());
    msg.set_type((int)type);
    msg.set_name(name.c_str());

    //序列化
    std::string output;
    msg.SerializeToString(&output);

    if(send(sockfd,output.c_str(),output.size(),0)<0){
        ERR_LOG("send error");
        running=false;
        return;
    }
}
#include"client.h"

int main(int argc,const char*argv[]){
    if(argc<4){
        std::cout<<"please input the ip and the port of the server,and your name"<<std::endl;
        return -1;
    }
    try{
        std::string ip=argv[1];
        int port=atoi(argv[2]);
        std::string name=argv[3];
        client s(ip,port,name);
        s.run();
    }catch(const std::exception&e){
        std::cerr<<e.what()<<std::endl;
        return -1;
    }
    return 0;
}
cmake_minimum_required(VERSION 3.20)

project(CHATROOM_CLIENT CXX)

find_package(Threads REQUIRED)
find_package(Protobuf REQUIRED)

include_directories(${CMAKE_SOURCE_DIR}/include)
include_directories(${CMAKE_CURRENT_BINARY_DIR})

protobuf_generate_cpp(PROTO_SRCS PROTO_HADS msg.proto)

add_executable(client src/main.cpp src/client.cpp ${PROTO_SRCS})

target_link_libraries(client PUBLIC Threads::Threads ${Protobuf_LIBRARIES})
syntax = "proto3";

message Msg {
    int32 type=1;
    bytes name=2;
    bytes data=3;
}


注意的点

Protobuf

该项目中采用protobuf来进行消息传输的序列化和反序列化,具体的消息定义如下:

message Msg {
    //消息类型
    int32 type=1;
    //发送消息的用户名
    bytes name=2;
    //消息正文
    bytes data=3;
}

我们的消息类型的定义在.h文件中,具体消息类型如下:

enum class Msg_Type{
    LOGIN=1,
    CHAT=2,
    QUIT=3
};

事实上,消息类型的定义也可以在.proto文件中,因为protobuf本身支持枚举类型。

在项目中有以下几点使用时机:

  1. 客户端向服务器端发送消息:组装消息类,并序列化后发送消息
  2. 服务器端接受客户端消息:反序列化消息,判断消息类型并读取消息数据
  3. 服务端端转发客户消息:将加工后的消息(已序列化)发送给其他客户端
  4. 客户端接受服务器端的消息:反序列化消息,在终端输出消息数据

在 cmake 中需要加入 protobuf 的处理:

  1. 查找依赖包
  2. 自动生成 protobuf 源文件
  3. 设置头文件搜索路径
  4. 生成可执行文件时编译 protobuf 源文件
  5. 动态链接 protobuf 库
# 1. 查找依赖包
find_package(Protobuf REQUIRED)

# 2. 自动生成 Protobuf 源代码
# 注意顺序:第一个是生成的 .cc 列表,第二个是生成的 .h 列表
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS msg.proto)

# 3. 设置头文件搜索路径
# 必须包含 CMAKE_CURRENT_BINARY_DIR,因为生成的 msg.pb.h 在这里
include_directories(${CMAKE_CURRENT_BINARY_DIR})

# 4. 生成可执行程序
# 必须把生成的 ${PROTO_SRCS} 丢进去一起编译
add_executable(server 
    src/main.cpp 
    src/server.cpp 
    ${PROTO_SRCS}
)

# 5. 链接库
# 使用官方推荐的变量名
target_link_libraries(server 
    PUBLIC 
    ${Protobuf_LIBRARIES} 
)

牢记TCP通信流程

线程池的使用

在服务器端中使用线程池来处理并发客户端。

多线程

客户端的存储容器

该项目中使用 std::map<int,struct sockaddr_in>类型的容器存储客户端,其原理是数据结构中的红黑树,插入、查询和删除的效率都较适中,其他容器只要能实现插入、查询和删除均可。

在该类型中以客户端文件描述符作为键值,容器内客户端的排序以键值来作为排序标准,默认为升序

//删除客户端
for(auto&[fd,cli]:clients){
    if(fd==client_fd){
        clients.erase(fd);
        break;
    }
}
//把新连接的客户端放入到服务器端的客户端容器中
clients.emplace(client_fd,cin);
for(auto&[fd,cli]:clients){
    if(fd!=exclude_fd){
        if(send(fd,output.c_str(),output.size(),0)<0){
            ERR_LOG("send error");
            break;
        }
    }
}

IO 多路复用(epoll)

在客户端中需要并发处理两个任务:

  1. 从终端读取数据,发送消息
  2. 接受服务器端消息,输出在终端

一种方法是:创建一个子线程处理其中一个任务,另一个任务主线程处理。

但考虑到客户端处理任务复杂度并不高,可以考虑采用 IO 多路复用进行非阻塞处理任务,效率较高。

IO 多路复用常见的有三种:

  1. select 机制
  2. poll 机制
  3. epoll 机制

这里使用** epoll 机制,**具体处理流程:

// 1. 初始化阶段:创建“监控大厅”
int epoll_fd = epoll_create(); 

// 2. 注册阶段:把关心的“目标”和“事件”加入名单
for (auto& target : watch_list) {
    epoll_event ev;
    ev.events = EPOLLIN;      // 监控“可读”事件
    ev.data.fd = target.fd;   // 绑定文件描述符
    epoll_ctl(epoll_fd, ADD, target.fd, &ev);
}

// 3. 循环监听阶段
while (is_running) {
    epoll_event active_events[MAX_EVENTS];
    
    // 阻塞等待,直到有 fd 触发了事件
    int count = epoll_wait(epoll_fd, active_events, MAX_EVENTS, -1);
    
    // 4. 事件分发处理阶段
    for (int i = 0; i < count; ++i) {
        int fd = active_events[i].data.fd;
        uint32_t mode = active_events[i].events;

        if (mode & EPOLLIN) {
            // 根据 fd 的类型执行对应的逻辑
            handle_read_event(fd); 
        } else if (mode & EPOLLOUT) {
            handle_write_event(fd);
        } else if (mode & EPOLLERR) {
            handle_error(fd);
        }
    }
}

// 5. 资源销毁
close(epoll_fd);
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐