结构说明:以fd作为索引,存放在block中;当一个fd到来时,通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。

数据结构的代码实现如下:

代码语言:javascript

AI代码解释

struct ntyevnt{
   
    int fd;//事件fd
    char buffer[BUFFER_LENGTH];//缓冲区
    int length;//缓存长度
    int status;//状态

    int events;//事件
    void *arg;//callback的参数
    int(*callback)(int fd, int events, void* arg);//回调函数
};
struct eventblock{
   
    struct *sock_items;//事件集合
    struct eventblock *next;//指向下一个内存块
};
struct reactor{
   
    int epfd;//epoll的文件描述符
    int blkcnt;//事件块的数量
    struct eventblock *evtblk;//事件块的起始地址
};

step 2:实现Reactor容器初始化功能

我们这里使用epoll作为IO多路复用器。 思路:初始化reactor内存块,避免脏数据;创建events和block并初始化,将events添加到block中,将block添加到reactor的链表中管理。

代码语言:javascript

AI代码解释

int ntyreactor_init(struct ntyreactor *reactor)
{
   
    if (reactor == NULL)
        return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));
    //创建epoll,作为IO多路复用器
    reactor->epfd = epoll_create(1);
    if (reactor->epfd <= 0)
    {
   
        printf("create epfd in %s error %s\n", __func__, strerror(errno));
        return -2;
    }

    // 创建事件集
    struct ntyevnt *events = (struct ntyevnt *)malloc(MAX_EPOLL_EVENTS * sizeof(struct ntyevnt));
    if (events == NULL)
    {
   
        printf("create ntyevnt in %s error %s\n", __func__, strerror(errno));
        close(reactor->epfd);
        return -3;
    }
    memset(events, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));

    //创建事件内存块
    struct eventblock *block = (struct eventblock*)malloc(sizeof(struct eventblock));
    if (block == NULL)
    {
   
        printf("create eventblock in %s error %s\n", __func__, strerror(errno));
        free(events);
        close(reactor->epfd);
        return -4;
    }
    block->events = events;
    block->next = NULL;

    // reactor初始化赋值
    reactor->evblks=block;
    reactor->blkcnt = 1;

    return 0;
}

step 3:实现socket初始化功能

定义成一个函数,方便初始化多个监听端口。

代码语言:javascript

AI代码解释

int init_sock(short port)
{
   
    int ret = 0;
    int fd = socket(AF_INET, SOCK_STREAM, 0);//创建套字接
    if (fd == -1)
    {
   
        printf("create socket in %s error %s\n", __func__, strerror(errno));
        return -1;
    }
    ret=fcntl(fd, F_SETFL, O_NONBLOCK);//设置非阻塞
    if (ret == -1)
    {
   
        printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
        return -1;
    }

    // 设置属性
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;// IPV4
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);

    // 绑定
    ret = bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    if (ret == -1)
    {
   
        printf("bind() in %s error %s\n", __func__, strerror(errno));
        return -1;
    }

    //监听
    ret = listen(fd, 20);
    if (ret < 0)
    {
   
        printf("listen failed : %s\n", strerror(errno));
        return -1;
    }

    printf("listen server port : %d\n", port);

    return fd;
}

step 4:实现Reactor动态扩容功能

为了实现高并发,服务器需要监听多个端口。当高并发时需要reactor容器进行扩容管理。 核心思路:找到链表的末端,分别为events和block分配内存并初始化,将events添加到block中,将block添加到reactor的链表中管理。

代码语言:javascript

AI代码解释

int ntyreactor_alloc(struct ntyreactor *reactor)
{
   
    if (reactor == NULL)
        return -1;
    if (reactor->evblks == NULL)
        return -1;

    //找到链表末端
    struct eventblock *blk = reactor->evblks;
    while (blk->next != NULL)
        blk = blk->next;

    // 创建事件集
    struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL)
    {
   
        printf("ntyreactor_alloc ntyevent failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));

    // 创建事件块
    struct eventblock *block = (struct eventblock*)malloc(sizeof(struct eventblock));
    if (block == NULL)
    {
   
        printf("ntyreactor_alloc eventblock failed\n");
        return -3;
    }
    block->events = evs;
    block->next = NULL;

    //实现扩容
    blk->next = block;
    reactor->blkcnt++;

    return 0;
}

step 5:实现Reactor索引功能

思路:通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。 例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。

代码语言:javascript

AI代码解释

struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd)
{
   
    if (reactor == NULL)
        return NULL;
    if (reactor->evblks == NULL)
        return NULL;
    // fd所在block序号
    int blkidx = sockfd / MAX_EPOLL_EVENTS;
    while (blkidx >= reactor->blkcnt)
    {
   
        // 扩容
        ntyreactor_alloc(reactor);
    }

    //找到fd对应block的位置
    int i = 0;
    struct eventblock *blk = reactor->evblks;
    while (i++ != blkidx && blk != NULL)
    {
   
        blk = blk->next;
    }

    // 返回item 地址
    return &blk->events[sockfd%MAX_EPOLL_EVENTS];
}

step 6:实现设置事件信息功能

将事件的相关信息保存到数据结构中。主要实现填充关键信息到event结构体中。

代码语言:javascript

AI代码解释

void nty_event_set(struct ntyevent *ev,int fd,NCALLBACK callback,void *arg)
{
   
    ev->fd = fd;
    ev->events = 0;
    ev->callback = callback;
    ev->arg = arg;
}

step 7:实现IO事件监听功能

这里使用epoll作为IO多路复用器,将事件添加到epoll中监听。 思路:主要是epoll_ctl操作,将事件添加到reactor的event结构体中。

代码语言:javascript

AI代码解释

int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
   
    // 设置epoll事件信息
    struct epoll_event ep_ev = {
    0,{
   0} };
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;

    // 判断,设置epfd的操作模式
    int op;
    if (ev->status == 1)
        op = EPOLL_CTL_MOD;
    else
    {
   
        op = EPOLL_CTL_ADD;
        ev->status = 1;
    }


    // 设置epoll
    int ret = epoll_ctl(epfd, op, ev->fd, &ep_ev);
    if (ret < 0)
    {
   
        printf("event add failed [fd=%d], events[%d],ret:%d\n", ev->fd, events,ret);
        printf("event add failed in %s error %s\n", __func__, strerror(errno));
        return -1;
    }

    return 0;
}

step 8:实现IO事件移除功能

由于设置了非阻塞模式,当事件到来时,需要暂时移除监听,避免干扰。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐