Linux下套接字详解(九)---poll模式下的IO多路复用服务器
参照
poll调用深入解析-从poll的实现来讲poll多路复用模型,很有深度
poll多路复用
poll的机制与select类似,与select在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
poll编程模型
函数原型
函数格式如下所示:
# include <poll.h>
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);
参数说明
fds
是一个struct pollfd结构类型的数组,用于存放需要检测其状态的Socket描述符;每当调用这个函数之后,系统不会清空这个数组,操作起来比较方便;特别是对于socket连接比较多的情况下,在一定程度上可以提高处理的效率;这一点与select()函数不同,调用select()函数之后,select()函数会清空它所检测的socket描述符集合,导致每次调用select()之前都必须把socket描述符重新加入到待检测的集合中;因此,select()函数适合于只检测一个socket描述符的情况,而poll()函数适合于大量socket描述符的情况;
nfds
nfds_t类型的参数,用于标记数组fds中的结构体元素的总数量;
timeout
是poll函数调用阻塞的时间,单位:毫秒;
和 select 一样,最后一个参数 timeout 指定 poll() 将在超时前等待一个事件多长事件。这里有 3 种情况:
timeout 为 -1
这会造成 poll 永远等待。poll() 只有在一个描述符就绪时返回,或者在调用进程捕捉到信号时返回(在这里,poll 返回 -1),并且设置 errno 值为 EINTR 。-1 可以用宏定义常量 INFTIM 来代替(在 pth.h 中有定义) 。timeout 等于0
在这种情况下,测试所有的描述符,并且 poll() 立刻返回。这允许在 poll 中没有阻塞的情况下找出多个文件描述符的状态。time > 0
这将以毫秒为单位指定 timeout 的超时周期。poll() 只有在超时到期时返回,除非一个描述符变为就绪,在这种情况下,它立刻返回。如果超时周期到齐,poll() 返回 0。这里也可能会因为某个信号而中断该等待。
和 select 一样,文件描述符是否阻塞对 poll 是否阻塞没有任何影响。
返回值和错误代码
成功时,poll()返回结构体中revents域不为0的文件描述符个数;如果在超时前没有任何事件发生,poll()返回0;失败时,poll()返回-1
>0
数组fds中准备好读、写或出错状态的那些socket描述符的总数量;
==0
数组fds中没有任何socket描述符准备好读、写,或出错;此时poll超时,超时时间是timeout毫秒;换句话说,如果所检测的socket描述符上没有任何事件发生的话,那么poll()函数会阻塞timeout所指定的毫秒时间长度之后返回,如果timeout==0,那么poll() 函数立即返回而不阻塞,如果timeout==INFTIM,那么poll() 函数会一直阻塞下去,直到所检测的socket描述符上的感兴趣的事件发生是才返回,如果感兴趣的事件永远不发生,那么poll()就会永远阻塞下去;
-1
poll函数调用失败,同时会自动设置全局变量errno为下列值之一
errno | 描述 |
---|---|
EBADF | 一个或多个结构体中指定的文件描述符无效 |
EFAULTfds | 指针指向的地址超出进程的地址空间 |
EINTR | 请求的事件之前产生一个信号,调用可以重新发起 |
EINVALnfds | 参数超出PLIMIT_NOFILE值 |
ENOMEM | 可用内存不足,无法完成请求 |
pollfd结构体
struct pollfd
{
int fd; /* 文件描述符 */
short events; /* 等待的事件 */
short revents; /* 实际发生了的事件 */
} ;
fd 成员表示感兴趣的,且打开了的文件描述符;
events 成员是位掩码,用于指定针对这个文件描述符感兴趣的事件;
revents 成员是位掩码,用于指定当 poll 返回时,在该文件描述符上已经发生了哪些事情。
每一个pollfd结构体指定了一个被监视的文件描述符,可以传递多个结构体,指示poll()监视多个文件描述符。
事件
在poll返回时,我们可以检查revents中的标志,对应于文件描述符请求的events结构体。如果POLLIN事件被设置,则文件描述符可以被读取而不阻塞。
如果POLLOUT被设置,则文件描述符可以写入而不导致阻塞。这些标志并不是互斥的:它们可能被同时设置,表示这个文件描述符的读取和写入操作都会正常返回而不阻塞。
timeout参数指定等待的毫秒数,无论I/O是否准备好,poll都会返回。timeout指定为负数值表示无限超时,使poll()一直挂起直到一个指定事件发生;timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。这种情况下,poll()就像它的名字那样,一旦选举出来,立即返回。
event注册的事件,通过revents返回
每个结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域。
revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。
events域中请求的任何事件都可能在revents域中返回。
例如fds[0].events = POLLIN; /将测试条件设置成普通或优先级带数据可读/
然后 int pollresult = poll(fds,xx,xx); //这样就可以监听fds里面文件描述符了,当满足特定条件就返回,并将结果保存在revents中。
事件描述符概述
合法的事件如下:
事件 | 描述 |
---|---|
POLLIN | 有数据可读 |
POLLRDNORM | 有普通数据可读 |
POLLRDBAND | 有优先数据可读。 |
POLLPRI | 有紧迫数据可读。 |
POLLOUT | 写数据不会导致阻塞 |
POLLWRNORM | 写普通数据不会导致阻塞 |
POLLWRBAND | 写优先数据不会导致阻塞 |
POLLMSGSIGPOLL | 消息可用。 |
此外,revents域中还可能返回下列事件:
事件 | 描述 |
---|---|
POLLER | 指定的文件描述符发生错误 |
POLLHUP | 指定的文件描述符挂起事件 |
POLLNVAL | 指定的文件描述符非法 |
这些事件在events域中无意义,因为它们在合适的时候总是会从revents中返回。
事件使用技巧
- POLLIN
events 中使用该宏常数,能够在折本文件的可读情况下,结束 poll() 函数。相反,revents 上使用该宏常数,在检查 poll() 函数结束后,可依此判断设备文件是否处于可读状态(即使消息长度是 0)。
- POLLPRI
在 events 域中使用该宏常数,能够在设备文件的高优先级数据读取状态下,结束 poll() 函数。相反,revents 上使用该宏常数,在检查 poll() 函数结束后,可依此判断设备文件是否处于可读高优先级数据的状态(即使消息长度是 0)。该宏常数用于处理网络信息包(packet) 的数据传递。
- POLLOUT
在 events 域中使用该宏常数,能够在设备文件的写入状态下,结束 poll() 函数。相反,revents 域上使用该宏常数,在检查 poll() 结束后,可依此判断设备文件是否处于可写状态。
- POLLERR
在 events 域中使用该宏常数,能够在设备文件上发生错误时,结束 poll() 函数。相反,revents 域上使用该宏函数,在检查 poll() 函数结束后,可依此判断设备文件是否出错。
- POLLHUP
在 events域中使用该宏常数,能够在设备文件中发生 hungup 时,结束 poll() 函数 。相反,在检查 poll() 结束后,可依此判断设备文件是否发生 hungup 。
- POLLNVAL
在 events 域中使用该宏函数,能够在文件描述符的值无效时,结束 poll() 。相反,在 revents 域上使用该宏函数时,在检查 poll() 函数后,文件描述符是否有效。可用于处理网络信息时,检查 socket handler 是否已经无效。
poll与select的区别与联系
使用poll()和select()不一样,你不需要显式地请求异常情况报告。
POLLIN | POLLPRI等价于select()的读事件,
POLLOUT |POLLWRBAND等价于select()的写事件。
POLLIN等价于POLLRDNORM |POLLRDBAND,
而POLLOUT则等价于POLLWRNORM。
例如,要同时监视一个文件描述符是否可读和可写,我们可以设置 events为POLLIN |POLLOUT。
示例
poll的本质是轮训,就是监听我们所有的文件描述符的所注册的事件,当有事件请求时,poll返回,然后我们轮询所有的描述符,找到有时间请求的那个即可
服务器server
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <poll.h>
#include <fcntl.h>
#include <sys/poll.h>
#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535
typedef struct client_data // 客户端的数据结构
{
struct sockaddr_in address; //
char* write_buf; // 发送数据缓冲区
char buf[ BUFFER_SIZE ]; // 接收数据缓冲区
}client_data;
int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
int listenfd;
int ret = 0;
struct sockaddr_in address;
client_data *users = NULL;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
//
// 创建服务器的监听套接字
//
if( ( listenfd = socket( PF_INET, SOCK_STREAM, 0 ) ) < 0)
{
perror("create socket error...\n");
exit(-1);
}
else
{
printf("create socket success...\n");
}
//
// 命名服务器的监听套接字
//
if((ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address))) < 0 )
{
perror("bind socket error...\n");
exit(-1);
}
else
{
printf("bind socket success...\n");
}
if((ret = listen(listenfd, 5)) < 0)
{
perror("listen error...\n");
}
else
{
printf("start listen...\n");
}
assert( ret != -1 );
if((users = (client_data *)malloc(sizeof(client_data) * FD_LIMIT)) == NULL)
{
perror("malloc client_data error...");
}
else
{
printf("malloc client_data success...");
}
struct pollfd fds[USER_LIMIT + 1];
/* Data structure describing a polling request.
struct pollfd
{
int fd; poll 的文件描述符.
short int events; fd 上感兴趣的事件(等待的事件).
short int revents; fd 上实际发生的事件.
};
*/
/// 初始化poll的
int user_counter = 0;
for( int i = 1; i <= USER_LIMIT; ++i )
{
fds[i].fd = -1;
fds[i].events = 0;
}
fds[0].fd = listenfd;
fds[0].events = POLLIN | POLLERR; // POLLIN表示有数据可读, POLLERR表示出错
fds[0].revents = 0;
while( 1 )
{
ret = poll( fds, // 准备轮训的套接字文件描述符
user_counter + 1, //
-1); // poll 永远等待。poll() 只有在一个描述符就绪时返回,或者在调用进程捕捉到信号时返回
if ( ret < 0 )
{
printf( "poll failure\n" );
break;
}
///
/// poll模型的本质就是轮训, 在pull返回时,轮询所有的文件描述符, 查找到有事情请求的那个文件
///
for( int i = 0; i < user_counter + 1; ++i )
{
if((fds[i].fd == listenfd) /* 监听的是服务器套接字, 此时如果有数据可读,说明有客户端请求链接*/
&& (fds[i].revents & POLLIN)) /* 有数据可读取 */
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
// 开始接收客户端的链接
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
if ( connfd < 0 )
{
printf( "errno is: %d\n", errno );
continue;
}
if( user_counter >= USER_LIMIT )
{
const char* info = "too many users\n";
printf( "%s", info );
send( connfd, info, strlen( info ), 0 );
close( connfd );
continue;
}
user_counter++;
users[connfd].address = client_address;
setnonblocking( connfd );
fds[user_counter].fd = connfd;
fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;
fds[user_counter].revents = 0;
printf( "comes a new user, now have %d users\n", user_counter );
}
else if( fds[i].revents & POLLERR ) // 数据出错
{
printf( "get an error from %d\n", fds[i].fd );
char errors[ 100 ];
memset( errors, '\0', 100 );
socklen_t length = sizeof( errors );
if( getsockopt( fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 )
{
printf( "get socket option failed\n" );
}
continue;
}
else if( fds[i].revents & POLLRDHUP ) // 被挂起---断开
{
users[fds[i].fd] = users[fds[user_counter].fd];
close( fds[i].fd );
fds[i] = fds[user_counter];
i--;
user_counter--;
printf( "a client left\n" );
}
else if( fds[i].revents & POLLIN ) // 客户端套接字有数据可写
{
int connfd = fds[i].fd;
memset( users[connfd].buf, '\0', BUFFER_SIZE );
ret = recv( connfd, users[connfd].buf, BUFFER_SIZE-1, 0 );
printf( "get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd );
if( ret < 0 )
{
if( errno != EAGAIN )
{
close( connfd );
users[fds[i].fd] = users[fds[user_counter].fd];
fds[i] = fds[user_counter];
i--;
user_counter--;
}
}
else if( ret == 0 )
{
printf( "code should not come to here\n" );
}
else
{
for( int j = 1; j <= user_counter; ++j )
{
if( fds[j].fd == connfd )
{
continue;
}
fds[j].events |= ~POLLIN;
fds[j].events |= POLLOUT;
users[fds[j].fd].write_buf = users[connfd].buf;
}
}
}
else if( fds[i].revents & POLLOUT ) // 服务器向外发送数据
{
int connfd = fds[i].fd;
if( ! users[connfd].write_buf )
{
continue;
}
ret = send( connfd, users[connfd].write_buf, strlen( users[connfd].write_buf ), 0 );
users[connfd].write_buf = NULL;
fds[i].events |= ~POLLOUT;
fds[i].events |= POLLIN;
}
}
}
free(users);
close( listenfd );
return 0;
}
服务器client
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <fcntl.h>
#define BUFFER_SIZE 64
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
struct sockaddr_in server_address;
bzero( &server_address, sizeof( server_address ) );
server_address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &server_address.sin_addr );
server_address.sin_port = htons( port );
int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( sockfd >= 0 );
if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )
{
printf( "connection failed\n" );
close( sockfd );
return 1;
}
struct pollfd fds[2];
// 添加标准输入
fds[0].fd = STDIN_FILENO;
fds[0].events = POLLIN;
fds[0].revents = 0;
// 添加套接字描述符
fds[1].fd = sockfd;
fds[1].events = POLLIN | POLLRDHUP;
fds[1].revents = 0;
char read_buf[BUFFER_SIZE];
int pipefd[2];
int ret = pipe( pipefd );
assert( ret != -1 );
while( 1 )
{
ret = poll( fds, 2, -1 );
if( ret < 0 )
{
printf( "poll failure\n" );
break;
}
if( fds[1].revents & POLLRDHUP )
{
printf( "server close the connection\n" );
break;
}
else if( fds[1].revents & POLLIN )
{
memset( read_buf, '\0', BUFFER_SIZE );
recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 );
printf( "%s\n", read_buf );
}
if( fds[0].revents & POLLIN )
{
ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
}
}
close( sockfd );
return 0;
}
更多推荐
所有评论(0)