基于TCP的网络聊天室
·
工作模型和工作流程
服务器端和客户端模型图

服务器端工作流程图

客户端流程图

Code实现
服务器端实现
SERVER/
├── build/ # 编译产物目录(存放 CMake 生成的中间文件和可执行程序)
├── include/ # 头文件目录
│ └── server.h # 服务器类定义头文件
├── src/ # 源文件目录
│ ├── main.cpp # 程序入口文件
│ └── server.cpp # 服务器类实现文件
├── CMakeLists.txt # CMake 项目构建脚本
└── msg.proto # Protobuf 消息定义文件
#pragma once
#include"msg.pb.h"
#include<string>
#include<map>
#include"arpa/inet.h"
#include<sys/socket.h>
#include<thread>
#include<mutex>
#include<queue>
#include<condition_variable>
#include<functional>
enum class Msg_Type{
LOGIN=1,
CHAT=2,
QUIT=3
};
class server
{
private:
const int BUF_SIZE=128;
//套接字文件
int sockfd;
//客户端容器
std::map<int,struct sockaddr_in>clients;
//保护客户端的互斥锁
std::mutex client_mutex;
//存储线程的线程容器
std::vector<std::thread>workers;
//存储任务的工作队列
std::queue<std::function<void()>>tasks;
//互斥锁:解决线程池的互斥问题
std::mutex task_mutex;
//条件变量,用于解决同步问题
std::condition_variable task_cv;
//线程池工作状态的标志
bool isRun;
private:
//启动线程池
void startThreadPool(const int ThreadSize);
//@task:指定的任务
//添加任务到线程池的工作队列
void addTask(std::function<void()>task);
//@client_fd:客户端的文件描述符
//@cin:客户端的信息结构体
// 处理客户端连接的函数
void handleClient(int client_fd, struct sockaddr_in cin);
//@msg:发送客户端的消息
//@exclude_fd:不转发消息的客户端
// 广播消息函数
void broadcast(const Msg&msg, int exclude_fd = -1);
public:
//@ip:服务器端绑定的ip
//@port:服务器端绑定的端口号
//@ThreadSize:服务器端线程池初始化的工作线程数量
server(const std::string&ip,const int port,const int ThreadSize=4);
//析构函数
~server();
//服务器端主线程主流程
void run();
};
#include"server.h"
#include <unistd.h>
#include <netinet/in.h>
// 用户打印错误信息的宏
#define ERR_LOG(msg) \
do \
{ \
perror(msg); \
std::cout << __LINE__ << " " << __func__ << " " << __FILE__ << std::endl; \
} while (0)
server::server(const std::string&ip,const int port,const int ThreadSize):isRun(true){
sockfd=socket(AF_INET,SOCK_STREAM,0);
//1.创建套接字
if(sockfd<0){
ERR_LOG("socket error");
return;
}
//2.设置端口快速复用
int res=1;
if(setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&res,sizeof(res))<0){
ERR_LOG("setsockopt error");
close(sockfd);
return;
}
//3.绑定ip和端口号
struct sockaddr_in sin;
sin.sin_family=AF_INET;
sin.sin_addr.s_addr=inet_addr(ip.c_str());
sin.sin_port=htons(port);
socklen_t len=sizeof(sin);
if(bind(sockfd,(const sockaddr*)&sin,len)<0){
ERR_LOG("bind error");
close(sockfd);
return;
}
//4.启动监听状态
if(listen(sockfd,128)<0){
ERR_LOG("listen error");
return;
}
//5.初始化线程池
startThreadPool(ThreadSize);
}
server::~server(){
//避免死锁:防止当工作线程在执行任务就被通知
{
//主线程抢锁,防止工作线程影响
std::unique_lock<std::mutex> lock(task_mutex);
//关闭线程池
isRun=false;
//当局部变量离开作用域时调用析构函数自动释放锁
}
//主线程唤醒所有工作线程
task_cv.notify_all();
//主线程等待回收所有工作线程
for(auto&worker:workers)worker.join();
close(sockfd);
}
void server::run(){
//创建接受客户端的信息结构体
struct sockaddr_in cli;
socklen_t len=sizeof(cli);
//循环接受客户端
while(1){
int fd=accept(sockfd,(sockaddr*)&cli,&len);
if(fd<0){
ERR_LOG("accept error");
break;
}
//添加客户端任务到任务队列中
addTask([this,fd,cli]{
handleClient(fd,cli);
});
}
}
void server::addTask(std::function<void()>task){
{
//主线程抢锁,防止工作线程影响
std::unique_lock<std::mutex> lock(task_mutex);
//将任务加入到工作队列中,等待有空间的工作线程执行任务
tasks.emplace(task);
//当局部变量离开作用域时调用析构函数自动释放锁
}
//主线程通知一个线程执行任务
task_cv.notify_one();
}
void server::startThreadPool(const int ThreadSize){
//创建并初始化指定数量的线程
for(int i=0;i<ThreadSize;i++){
//emplace_back:直接创建一个线程添加到线程容器中
//lambda表达式作为线程的任务
workers.emplace_back([this]{
while(1){
//创建一个任务,工作线程可能同时执行各自任务,所以执行任务时不占用锁
//但是从工作队列中拿取任务时操作临界资源需要抢占锁
std::function<void()>task;
{
//抢占锁
std::unique_lock<std::mutex> lock(task_mutex);
//等待被主线程唤醒:当线程池停止或有任务可取时返回
task_cv.wait(lock,[this]{
return isRun==false || !tasks.empty();
});
//当线程池停止且工作队列为空时,线程退出
if(isRun==false && tasks.empty()) return;
//线程池正常时,工作线程从工作队列中取出任务
task=std::move(tasks.front());
tasks.pop();
}
//拿到任务后开始执行
task();
}
});
}
}
void server::handleClient(int client_fd, struct sockaddr_in cin){
//创建接受消息的容器
char buf[BUF_SIZE]="";
Msg msg;
std::string input;
int recv_len=0;
while(1){
memset(buf,0,BUF_SIZE);
recv_len=recv(client_fd,buf,BUF_SIZE,0);
//std::cout<<"recv_len is "<<recv_len<<std::endl;
//表示客户端下线,删除客户端并结束工作线程
if(recv_len<=0){
//删除客户端
for(auto&[fd,cli]:clients){
if(fd==client_fd){
clients.erase(fd);
break;
}
}
close(client_fd);
break;
}
//成功收到客户端消息,反序列化消息,并判断消息类型
msg.ParseFromArray(buf,recv_len);
//std::cout<<msg.type()<<" "<<msg.name()<<" "<<msg.data()<<std::endl;
switch (msg.type())
{
case (int)Msg_Type::LOGIN:
//保护客户端容器(临界资源)
{
std::unique_lock<std::mutex>lock(client_mutex);
//把新连接的客户端放入到服务器端的客户端容器中
clients.emplace(client_fd,cin);
//组装消息并发送给所有其他客户端
sprintf(buf,"---------%s:login succeed---------",msg.name().c_str());
msg.set_data(buf);
//转发所有客户端该用户登录成功,包括自己
//std::cout<<msg.type()<<" "<<msg.name()<<" "<<msg.data()<<std::endl;
broadcast(msg);
}
break;
case (int)Msg_Type::CHAT:
{
std::unique_lock<std::mutex>lock(client_mutex);
//转发消息给所有客户端,但不包括自己
broadcast(msg,client_fd);
}
break;
case (int)Msg_Type::QUIT:
//删除客户端
for(auto&[fd,cli]:clients){
if(fd==client_fd){
clients.erase(fd);
break;
}
}
close(client_fd);
break;
default:
ERR_LOG("recv invalid msg");
close(client_fd);
return;
}
}
}
void server::broadcast(const Msg&msg, int exclude_fd){
//序列化消息
std::string output;
msg.SerializePartialToString(&output);
for(auto&[fd,cli]:clients){
if(fd!=exclude_fd){
if(send(fd,output.c_str(),output.size(),0)<0){
ERR_LOG("send error");
break;
}
}
}
}
#include"server.h"
#include<iostream>
int main(int argc,const char*argv[]){
if(argc<3){
std::cout<<"please input the ip and the port of the server"<<std::endl;
return -1;
}
try{
std::string ip=argv[1];
int port=atoi(argv[2]);
server s(ip,port);
s.run();
}catch(const std::exception&e){
std::cout<<e.what()<<std::endl;
return -1;
}
return 0;
}
cmake_minimum_required(VERSION 3.20)
project(CHATROOM_SERVER CXX)
# 1. 查找依赖包
find_package(Protobuf REQUIRED)
find_package(Threads REQUIRED) # 推荐用这种方式处理 pthread
# 2. 自动生成 Protobuf 源代码
# 注意顺序:第一个是生成的 .cc 列表,第二个是生成的 .h 列表
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS msg.proto)
# 3. 设置头文件搜索路径
# 必须包含 CMAKE_CURRENT_BINARY_DIR,因为生成的 msg.pb.h 在这里
include_directories(
${CMAKE_SOURCE_DIR}/include
${CMAKE_CURRENT_BINARY_DIR}
)
# 4. 生成可执行程序
# 必须把生成的 ${PROTO_SRCS} 丢进去一起编译
add_executable(server
src/main.cpp
src/server.cpp
${PROTO_SRCS}
)
# 5. 链接库
# 使用官方推荐的变量名
target_link_libraries(server
PUBLIC
${Protobuf_LIBRARIES}
Threads::Threads
)
syntax = "proto3";
message Msg {
int32 type=1;
bytes name=2;
bytes data=3;
}
客户端实现
SERVER/
├── build/ # 编译产物目录(存放 CMake 生成的中间文件和可执行程序)
├── include/ # 头文件目录
│ └── server.h # 服务器类定义头文件
├── src/ # 源文件目录
│ ├── main.cpp # 程序入口文件
│ └── server.cpp # 服务器类实现文件
├── CMakeLists.txt # CMake 项目构建脚本
└── msg.proto # Protobuf 消息定义文件
#include<msg.pb.h>
#include<string>
enum class Msg_Type{
LOGIN=1,
CHAT=2,
QUIT=3
};
class client
{
private:
const int BUF_SIZE=128;
int sockfd;
std::string name;
bool running;
private:
void recvMsg();
void sendMsg(Msg_Type type,const std::string&data="");
public:
client(const std::string&ip,const int port,const std::string& name);
~client();
void run();
};
#include"client.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include"arpa/inet.h"
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#define ERR_LOG(msg) \
do \
{ \
perror(msg); \
std::cout << __LINE__ << " " << __func__ << " " << __FILE__ << std::endl; \
} while (0)
client::client(const std::string&ip,const int port,const std::string& name):name(name)
{
//1.创建套接字
sockfd=socket(AF_INET,SOCK_STREAM,0);
if(sockfd<0){
ERR_LOG("socket error");
close(sockfd);
return;
}
//2.连接服务器端
struct sockaddr_in sin;
sin.sin_addr.s_addr=inet_addr(ip.c_str());
sin.sin_family=AF_INET;
sin.sin_port=htons(port);
socklen_t len=sizeof(sin);
if(connect(sockfd,(const sockaddr*)&sin,len)<0){
ERR_LOG("connect error");
close(sockfd);
return;
}
//3.发送登录消息
sendMsg(Msg_Type::LOGIN);
}
client::~client()
{
sendMsg(Msg_Type::QUIT);
if(sockfd>0)close(sockfd);
}
void client::run()
{
// 1. 创建 epoll 实例
int epfd = epoll_create(1);
if (epfd == -1) {
ERR_LOG("epoll_create error");
return;
}
struct epoll_event ev, events[2];
// 2. 将标准输入 (stdin) 加入 epoll 监听 —— 用于读取用户输入
ev.events = EPOLLIN;
ev.data.fd = STDIN_FILENO;
epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);
// 3. 将服务器套接字 (sockfd) 加入 epoll 监听 —— 用于接收消息
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
std::string text;
running=true;
while (running)
{
// 4. 等待事件发生(阻塞)
int n = epoll_wait(epfd, events, 2, -1);
if (n < 0) {
if (errno == EINTR) continue;
ERR_LOG("epoll_wait error");
break;
}
for (int i = 0; i < n; i++)
{
int fd = events[i].data.fd;
// 情况 A:标准输入有数据(用户打了字)
if (fd == STDIN_FILENO)
{
if (!std::getline(std::cin, text)) break;
if (text == "quit") {
running=false;
break;
}
// 发送消息
sendMsg(Msg_Type::CHAT, text);
}
// 情况 B:套接字有数据(服务器发来了消息)
else if (fd == sockfd)
{
recvMsg();
}
}
}
close(epfd);
}
void client::recvMsg(){
char buf[BUF_SIZE];
memset(buf,0,BUF_SIZE);
int recv_len=recv(sockfd,buf,BUF_SIZE,0);
//std::cout<<"recv_len is "<<recv_len<<std::endl;
if(recv_len<=0){
ERR_LOG("recv error");
running=false;
return;
}
Msg msg;
msg.ParseFromArray(buf,recv_len);
std::cout<<msg.name()<<": "<<msg.data()<<std::endl;
}
void client::sendMsg(Msg_Type type,const std::string&data){
Msg msg;
msg.set_data(data.c_str());
msg.set_type((int)type);
msg.set_name(name.c_str());
//序列化
std::string output;
msg.SerializeToString(&output);
if(send(sockfd,output.c_str(),output.size(),0)<0){
ERR_LOG("send error");
running=false;
return;
}
}
#include"client.h"
int main(int argc,const char*argv[]){
if(argc<4){
std::cout<<"please input the ip and the port of the server,and your name"<<std::endl;
return -1;
}
try{
std::string ip=argv[1];
int port=atoi(argv[2]);
std::string name=argv[3];
client s(ip,port,name);
s.run();
}catch(const std::exception&e){
std::cerr<<e.what()<<std::endl;
return -1;
}
return 0;
}
cmake_minimum_required(VERSION 3.20)
project(CHATROOM_CLIENT CXX)
find_package(Threads REQUIRED)
find_package(Protobuf REQUIRED)
include_directories(${CMAKE_SOURCE_DIR}/include)
include_directories(${CMAKE_CURRENT_BINARY_DIR})
protobuf_generate_cpp(PROTO_SRCS PROTO_HADS msg.proto)
add_executable(client src/main.cpp src/client.cpp ${PROTO_SRCS})
target_link_libraries(client PUBLIC Threads::Threads ${Protobuf_LIBRARIES})
syntax = "proto3";
message Msg {
int32 type=1;
bytes name=2;
bytes data=3;
}
注意的点
Protobuf
该项目中采用protobuf来进行消息传输的序列化和反序列化,具体的消息定义如下:
message Msg {
//消息类型
int32 type=1;
//发送消息的用户名
bytes name=2;
//消息正文
bytes data=3;
}
我们的消息类型的定义在.h文件中,具体消息类型如下:
enum class Msg_Type{
LOGIN=1,
CHAT=2,
QUIT=3
};
事实上,消息类型的定义也可以在.proto文件中,因为protobuf本身支持枚举类型。
在项目中有以下几点使用时机:
- 客户端向服务器端发送消息:组装消息类,并序列化后发送消息
- 服务器端接受客户端消息:反序列化消息,判断消息类型并读取消息数据
- 服务端端转发客户消息:将加工后的消息(已序列化)发送给其他客户端
- 客户端接受服务器端的消息:反序列化消息,在终端输出消息数据
在 cmake 中需要加入 protobuf 的处理:
- 查找依赖包
- 自动生成 protobuf 源文件
- 设置头文件搜索路径
- 生成可执行文件时编译 protobuf 源文件
- 动态链接 protobuf 库
# 1. 查找依赖包
find_package(Protobuf REQUIRED)
# 2. 自动生成 Protobuf 源代码
# 注意顺序:第一个是生成的 .cc 列表,第二个是生成的 .h 列表
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS msg.proto)
# 3. 设置头文件搜索路径
# 必须包含 CMAKE_CURRENT_BINARY_DIR,因为生成的 msg.pb.h 在这里
include_directories(${CMAKE_CURRENT_BINARY_DIR})
# 4. 生成可执行程序
# 必须把生成的 ${PROTO_SRCS} 丢进去一起编译
add_executable(server
src/main.cpp
src/server.cpp
${PROTO_SRCS}
)
# 5. 链接库
# 使用官方推荐的变量名
target_link_libraries(server
PUBLIC
${Protobuf_LIBRARIES}
)
牢记TCP通信流程

线程池的使用
在服务器端中使用线程池来处理并发客户端。
客户端的存储容器
该项目中使用 std::map<int,struct sockaddr_in>类型的容器存储客户端,其原理是数据结构中的红黑树,插入、查询和删除的效率都较适中,其他容器只要能实现插入、查询和删除均可。
在该类型中以客户端文件描述符作为键值,容器内客户端的排序以键值来作为排序标准,默认为升序。
//删除客户端
for(auto&[fd,cli]:clients){
if(fd==client_fd){
clients.erase(fd);
break;
}
}
//把新连接的客户端放入到服务器端的客户端容器中
clients.emplace(client_fd,cin);
for(auto&[fd,cli]:clients){
if(fd!=exclude_fd){
if(send(fd,output.c_str(),output.size(),0)<0){
ERR_LOG("send error");
break;
}
}
}
IO 多路复用(epoll)
在客户端中需要并发处理两个任务:
- 从终端读取数据,发送消息
- 接受服务器端消息,输出在终端
一种方法是:创建一个子线程处理其中一个任务,另一个任务主线程处理。
但考虑到客户端处理任务复杂度并不高,可以考虑采用 IO 多路复用进行非阻塞处理任务,效率较高。
IO 多路复用常见的有三种:
- select 机制
- poll 机制
- epoll 机制
这里使用** epoll 机制,**具体处理流程:
// 1. 初始化阶段:创建“监控大厅”
int epoll_fd = epoll_create();
// 2. 注册阶段:把关心的“目标”和“事件”加入名单
for (auto& target : watch_list) {
epoll_event ev;
ev.events = EPOLLIN; // 监控“可读”事件
ev.data.fd = target.fd; // 绑定文件描述符
epoll_ctl(epoll_fd, ADD, target.fd, &ev);
}
// 3. 循环监听阶段
while (is_running) {
epoll_event active_events[MAX_EVENTS];
// 阻塞等待,直到有 fd 触发了事件
int count = epoll_wait(epoll_fd, active_events, MAX_EVENTS, -1);
// 4. 事件分发处理阶段
for (int i = 0; i < count; ++i) {
int fd = active_events[i].data.fd;
uint32_t mode = active_events[i].events;
if (mode & EPOLLIN) {
// 根据 fd 的类型执行对应的逻辑
handle_read_event(fd);
} else if (mode & EPOLLOUT) {
handle_write_event(fd);
} else if (mode & EPOLLERR) {
handle_error(fd);
}
}
}
// 5. 资源销毁
close(epoll_fd);
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)