基于TCP的socket编程

我们首先来看一下基于TCP协议的Socket程序的调用过程。

服务器的程序要先跑起来,然后等待客户端的连接和数据,我们先来看看服务端的 Socket 编程过程是怎样的。

服务端首先调用 socket() 函数,创建网络协议为 IPv4,以及传输协议为 TCP 的 Socket ,接着调用 bind() 函数,给这个 Socket 绑定一个 IP 地址和端口,绑定这两个的目的是什么?

  • 绑定端口的目的:当内核收到 TCP 报文,通过 TCP 头里面的端口号,来找到我们的应用程序,然后把数据传递给我们。
  • 绑定 IP 地址的目的:一台机器是可以有多个网卡的,每个网卡都有对应的 IP 地址,当绑定一个网卡时,内核在收到该网卡上的包,才会发给我们;

绑定完 IP 地址和端口后,就可以调用 listen() 函数进行监听,此时对应 TCP 状态图中的 listen,如果我们要判定服务器中一个网络程序有没有启动,可以通过 netstat 命令查看对应的端口号是否有被监听。

服务端进入了监听状态后,通过调用 accept() 函数,来从内核获取客户端的连接,如果没有客户端连接,则会阻塞等待客户端连接的到来。

那客户端是怎么发起连接的呢?客户端在创建好 Socket 后,调用 connect() 函数发起连接,该函数的参数要指明服务端的 IP 地址和端口号,然后万众期待的 TCP 三次握手就开始了。

在 TCP 连接的过程中,服务器的内核实际上为每个 Socket 维护了两个队列:

  • 一个是「还没完全建立」连接的队列,称为 TCP 半连接队列,这个队列都是没有完成三次握手的连接,此时服务端处于 syn_rcvd 的状态;
  • 一个是「已经建立」连接的队列,称为 TCP 全连接队列,这个队列都是完成了三次握手的连接,此时服务端处于 established 状态;

当 TCP 全连接队列不为空后,服务端的 accept() 函数,就会从内核中的 TCP 全连接队列里拿出一个已经完成连接的 Socket 返回应用程序,后续数据传输都用这个 Socket。

注意,监听的 Socket 和真正用来传数据的 Socket 是两个:

  • 一个叫作监听 Socket
  • 一个叫作已连接 Socket

连接建立后,客户端和服务端就开始相互传输数据了,双方都可以通过 read() 和 write() 函数来读写数据。

多进程模型

基于最原始的阻塞网络 I/O, 如果服务器要支持多个客户端,其中比较传统的方式,就是使用多进程模型,也就是为每个客户端分配一个进程来处理请求。

主进程只负责监听,accept到新连接后,fork一个子进程去处理这个连接。主进程继续监听,子进程专心处理业务。

由于子进程会复制父进程的文件描述符,因此子进程可以直接使用已连接socket和客户端通信。父进程只需要将客户服务交给子进程来处理,不需要关心「已连接 Socket」,只需要关心「监听 Socket」。;子进程不需要关心「监听 Socket」,只需要关心「已连接 Socket」

当「子进程」退出时,实际上内核里还会保留该进程的一些信息,也是会占用内存的,如果不做好“回收”工作,就会变成僵尸进程,随着僵尸进程越多,会慢慢耗尽我们的系统资源。因此要调用wait() 和 waitpid() 函数来进行”善后“。

用多进程的方式来对付多个客户端连接的方式,对于少量的客户端连接是剋性的,但是由于每个进程都会占用一定的系统资源,而且

进程的上下文切换不仅包含了虚拟内存、栈、全局变量等用户空间的资源,还包括了内核堆栈、寄存器等内核空间的资源,因此不适用于大量客户端连接。

多线程模型

相较于进程,线程只是运行在进程中的一个执行流程,一个进程可以运行多个线程,且相同进程中的线程可以共享进程的部分资源,包括文件描述符列表等,因此他们的上下文切换要小得多。

如果每来一个连接就创建一个线程,线程运行完后,还得操作系统还得销毁线程,虽说线程切换的上写文开销不大,但是如果频繁创建和销毁线程,系统开销也是不小的。

那么,我们可以使用线程池的方式来避免线程的频繁创建和销毁,所谓的线程池,就是提前创建若干个线程,这样当由新连接建立时,将这个已连接的 Socket 放入到一个队列里,然后线程池里的线程负责从队列中取出「已连接 Socket 」进行处理。

需要注意的是,这个队列是全局的,每个线程都会操作,为了避免多线程竞争,线程在操作这个队列前要加锁。

但是当客户端连接高达C10K时,意味着要维护一万个线程,不太现实。

I/O多路复用

I/O复用是网络编程里的经典技术,本质就是用一个进程同时监控多个文件描述符,让它们谁有数据来了就处理谁,而不是傻等一个。

传统阻塞I/O的问题是:

  • 一个进程只能等一个连接
  • 如果这个连接没数据,线程就卡住了
  • 想处理1000个连接,就要开1000个线程?代价太大

我们熟悉的 select/poll/epoll 内核提供给用户态的多路复用系统调用,进程可以通过一个系统调用函数从内核中获取多个事件

1.select

select的底层实现是一个位图。

// 内核里的定义
typedef struct {
    unsigned long fds_bits[__FDSET_LONGS];
} fd_set;

本质

  • 一个很长的bit数组,每个bit对应一个文件描述符
  • fd=5,就把第5个bit置1
  • 最大1024个bit(默认FD_SETSIZE),所以最多监控1024个fd
fd_set readfds;   // 可读事件位图
fd_set writefds;  // 可写事件位图
fd_set exceptfds; // 异常事件位图

操作过程

  1. 用户态:用FD_SET宏设置要监控的fd
  2. 调用select整个位图(文件描述符)从用户态拷贝到内核态
  3. 内核:遍历所有bit,检查对应的fd是否有事件,将此Socket 标记为可读或可写
  4. 返回:修改后的位图再拷贝回用户态

这期间发送可 次「遍历」文件描述符集合,一次是在内核态里,一个次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符集合,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中。

2.poll

poll的底层是数组。

struct pollfd {
    int fd;         // 文件描述符
    short events;   // 关注的事件(POLLIN、POLLOUT等)
    short revents;  // 返回的事件(内核填写)
};

// 调用方式
struct pollfd fds[1000];
int nfds = 1000;
poll(fds, nfds, timeout);

结构对比select

  • 没有fd数量硬限制了(数组多大你说了算)
  • 不用每次都重新设置位图,数组结构更清晰
  • 可以同时监控多个事件类型(用位掩码组合)

相较于select,poll 不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了 select 的文件描述符个数限制,当然还会受到系统文件描述符限制。

select和poll本质并没有太大的区别,都是通过线性结构存储进程关注的Socket集合,都需要遍历文件描述符集合来寻找可读或可写的socket,时间复杂度为o(n),而且也需要在用户态与内核态之间拷贝文件描述符集合,这种方式随着并发数上来,性能的损耗会呈指数级增长。

3.epoll

相较于select和poll,epoll把"遍历检查"变成"事件通知"

select/poll是被动轮询:"我来看看你们谁有数据了"(每次都问一遍)

epoll是主动通知:"你有数据了再叫我"(注册回调机制)

核心数据结构

epoll在内核里维护了三个关键结构:

1. 红黑树(存储所有注册的fd)

// 内核里的结构(简化)
struct eventpoll {
    struct rb_root rbr;          // 红黑树根节点,存所有监控的fd
    struct list_head rdllist;    // 就绪链表,存有事件的fd
    wait_queue_head_t wq;        // 等待队列,epoll_wait时进程睡在这
};

为什么用红黑树?

  • 增删改查都是O(log n)
  • 平衡二叉树,不会退化成链表
  • 支持范围查询和遍历

每个注册的fd对应一个epitem结构:

struct epitem {
    struct rb_node rbn;          // 红黑树节点
    struct list_head rdllink;    // 就绪链表节点
    struct epoll_filefd ffd;     // 存储fd和file指针
    struct eventpoll *ep;        // 所属的epoll实例
    struct epoll_event event;    // 关注的事件类型
    wait_queue_t wait;           // 等待队列项
};

2. 就绪链表(存储有事件的fd,事件驱动机制)

这是个双向链表,所有就绪的fd会被链在这里。当某个 socket 有事件发生时,通过回调函数内核会将其加入到这个就绪事件列表中,当用户调用 epoll_wait() 函数时,只会返回有事件发生的文件描述符的个数,不需要像 select/poll 那样轮询扫描整个 socket 集合,大大提高了检测的效率。

3. 等待队列

epoll_wait时,如果就绪链表为空,当前进程就睡在这个队列上。

核心API

1. epoll_create1(int flags)

创建一个epoll实例:

int epfd = epoll_create1(0);  // 返回epoll文件描述符

2. epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

操作红黑树,增删改监控的fd:

struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;  // 可读事件 + 边缘触发
ev.data.fd = sockfd;

epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);   // 添加
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);   // 修改
epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);  // 删除

底层做了什么

  • EPOLL_CTL_ADD:在红黑树中插入一个节点,同时给这个socket注册回调函数
  • EPOLL_CTL_DEL:从红黑树删除节点,取消回调注册
  • EPOLL_CTL_MOD:修改节点的事件类型

关键点:只操作红黑树,不涉及遍历,所以是O(log n)。

3. epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)

等待事件发生:

struct epoll_event events[100];
int n = epoll_wait(epfd, events, 100, -1);  // 阻塞等待

for (int i = 0; i < n; i++) {
    int fd = events[i].data.fd;
    if (events[i].events & EPOLLIN) {
        // fd可读了
    }
}

底层做了什么

  1. 检查就绪链表是否为空
  2. 如果不为空,把链表中的事件拷贝到用户态的events数组
  3. 如果为空,当前进程加入等待队列,睡眠
  4. 被唤醒后,再检查就绪链表

底层工作原理(核心)

回调机制

每个socket(准确说是struct file)内部都有一个等待队列。当调用epoll_ctl(EPOLL_CTL_ADD)时:

// 伪代码
ep_insert(struct eventpoll *ep, struct epoll_event *event, int fd) {
    // 1. 创建epitem
    struct epitem *epi = kmalloc(sizeof(*epi));
    epi->ffd.fd = fd;
    epi->event = *event;
    
    // 2. 插入红黑树
    rb_insert(ep->rbr, epi);
    
    // 3. 注册回调函数(关键!)
    init_waitqueue_func_entry(&epi->wait, ep_poll_callback);
    add_wait_queue(fd的等待队列, &epi->wait);
}

回调函数ep_poll_callback做什么

int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) {
    struct epitem *epi = container_of(wait, struct epitem, wait);
    struct eventpoll *ep = epi->ep;
    
    // 1. 把这个epitem加入就绪链表
    list_add_tail(&epi->rdllink, &ep->rdllist);
    
    // 2. 唤醒等待的进程(如果有进程在epoll_wait睡眠)
    wake_up(&ep->wq);
    
    return 1;
}

完整流程

注册阶段

epoll_ctl(ADD, sockfd)
    ↓
创建epitem,插入红黑树
    ↓
给sockfd注册回调函数 ep_poll_callback

事件发生时

网卡收到数据 → 硬件中断
    ↓
驱动程序处理,调用 sock_def_readable
    ↓
唤醒socket等待队列上的所有项
    ↓
触发 ep_poll_callback
    ↓
把epitem加入就绪链表,唤醒epoll_wait的进程

获取事件

epoll_wait()
    ↓
检查就绪链表
    ↓
如果不为空,拷贝事件到用户态
如果为空,睡眠等待

两种触发模式

LT(Level Triggered,水平触发)

默认模式,特点:

  • 只要socket可读,每次epoll_wait都会返回
  • 不用一次性读完,没读完下次还会通知
  • 编程简单,不容易出错

实现原理

epoll_wait返回时:
    检查就绪链表 → 拷贝事件 → 返回
    
epoll_wait下次调用时:
    如果socket还有数据可读 → 再次加入就绪链表 → 再次返回
ET(Edge Triggered,边缘触发)

高性能模式,特点:

  • 只在状态变化时通知一次(从不可读变成可读)
  • 必须一次性读完所有数据(用循环read直到EAGAIN)
  • 编程难度大,但性能最高
  • 一般和非阻塞 I/O 搭配使用

实现原理

epoll_wait返回时:
    检查就绪链表 → 拷贝事件 → 从就绪链表移除 → 返回
    
后续:
    即使socket还有数据,也不再加入就绪链表
    直到又有新数据到达,状态再次变化

ET模式必须循环读取

while (1) {
    int n = read(fd, buf, sizeof(buf));
    if (n == -1) {
        if (errno == EAGAIN) {
            // 读完了,没问题
            break;
        } else {
            // 真正的错误
            perror("read");
            break;
        }
    } else if (n == 0) {
        // 对方关闭连接
        close(fd);
        break;
    }
    // 处理数据
}

epoll为什么快?

1. 避免了拷贝

select/poll:每次调用都要把所有fd从用户态拷贝到内核态

epoll

  • epoll_ctl时通过mmap建立共享内存
  • 内核和用户态共享eventpoll结构
  • epoll_wait只拷贝就绪的事件,而不是所有fd
2. 避免了遍历

select/poll:O(n)遍历所有fd

epoll

  • O(1)检查就绪链表
  • 只返回有事件的fd
  • 10万连接只有10个活跃?只处理这10个
3. 避免了重复注册

select/poll:每次都要重新设置要监控的fd

epoll

  • 红黑树持久化存储
  • epoll_ctl一次注册,永久生效(除非删除)

性能对比

连接数 select/poll epoll
100
1000
10000 很慢
100000 卡死

完整代码示例

#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>

#define MAX_EVENTS 100

int main() {
    int listenfd = socket(...);
    bind(listenfd, ...);
    listen(listenfd, ...);
    
    // 创建epoll实例
    int epfd = epoll_create1(0);
    
    // 添加监听socket
    struct epoll_event ev, events[MAX_EVENTS];
    ev.events = EPOLLIN;
    ev.data.fd = listenfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
    
    // 事件循环
    while (1) {
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == listenfd) {
                // 新连接
                int connfd = accept(listenfd, NULL, NULL);
                setnonblocking(connfd);  // 重要!ET模式必须非阻塞
                
                ev.events = EPOLLIN | EPOLLET;  // ET模式
                ev.data.fd = connfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            } else {
                // 已连接socket有数据
                int fd = events[i].data.fd;
                handle_request(fd);  // 处理请求
            }
        }
    }
}

// 设置非阻塞
void setnonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

// ET模式必须循环读
void handle_request(int fd) {
    char buf[1024];
    while (1) {
        int n = read(fd, buf, sizeof(buf));
        if (n == -1) {
            if (errno == EAGAIN) break;  // 读完了
            else { close(fd); break; }
        } else if (n == 0) {
            close(fd);  // 对方关闭
            break;
        }
        // 处理数据...
    }
}

注意事项

  1. ET模式必须用非阻塞socket,否则read会阻塞
  2. 不要频繁epoll_ctl,虽然O(log n)但也有开销
  3. EPOLLONESHOT:处理完一个事件后,需要重新arm
  4. 惊群问题:多线程epoll_wait同一个fd,要用EPOLLEXCLUSIVE(Linux 4.5+)
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐