Linux程序开发经验总结
1. 进程和线程
1.1. 进程
进程终止
有8种方式使进程终止,其中5种为正常终止,3种异常终止方式:
1) 从main返回
2) 调用exit
3) 调用_exit或_Exit
4) 最后一个线程从其启动例程返回
5) 最后一个线程调用pthread_exit
6) 调用abort
7) 接到一个信号并终止
8) 最后一个线程对取消请求作出响应
程序存储空间布局
正文段:存放程序文本,可共享。
初始化数据段:出现在任何函数之外的声明,并且已经初始化。
未初始化数据段:出现在任何函数之外的声明,未初始化。
栈:自动变量和临时变量以及每次函数调用时的环境信息、返回值、函数参数等。
堆:动态内存分配,由于历史原因,堆位于未初始化数据段和栈之间。
进程资源
1)每个程序都有一张环境表,通过environ指针数组来保存,也可通过getenv函数来获取。
extern char **environ;
char *getenv(const char *name);
2)每个进程都有一组资源限制,可以用getrlimit和setrlimit函数查询和获取。
int getrlimit(int resource, struct rlimit*rlim);
int setrlimit(int resource, const structrlimit *rlim);
子进程
由fork创建的新进程称为子进程。子进程是父进程的副本,获得父进程数据空间、堆、栈的副本。父、子进程共享正文段。一般来说父子进程的执行顺序是不固定的。
注意:在fork之前不能用标准IO库,因为标准IO库是带缓冲的,该缓冲区也被复制到子进程中。
除了打开文件之外,父进程的很多其他属性也由子进程继承,包括:
1) 用户ID和组ID相关
2) 会话ID
3) 控制终端
4) 环境、根目录和当前工作目录
5) 文件模式创建屏蔽字
6) 信号屏蔽字和安排
7) 连接的共享存储段和存储映射
8) 资源限制
父子进程区别:
1) fork返回值不同,子进程返回0
2) 进程ID和父进程ID不同
3) 子进程的tms_utime、tms_stime、tms_cutime以及tms_ustime均被设置为0
4) 父进程设置的文件锁不会被子进程继承
5) 子进程的未处理的闹钟、信号集被清除
vfork用于创建一个新进程,子进程在父进程的地址空间中运行,例如shell,vfork能保证子进程先运行。
当一个进程终止时,内核就向其父进程发送SIGCHLD信号,父进程可以通过wait或waitpid来获取。其中wait为阻塞调用,waitpid可选非阻塞。
pid = waitpid(-1, NULL, WNOHANG); //非阻塞检查任意子进程退出
信号
信号是异步事件的经典实例,产生信号的事件是随机出现的,必须告诉内核,出现此信号时执行哪个操作。如忽略信号、捕捉信号、默认动作。其中,SIGKILL和SIGSTOP这两个信号是不能被忽略的。
void (*signal(int sig, void(*disp)(int)))(int);
typedef void (*sighandler_t)(int);
sighandler_t signal(int signum,sighandler_t handler);
kill函数将信号发送给指定进程或进程组。
int kill(pid_t pid, int sig);
int raise(int sig);等价于kill(getpid(),sig)
unsigned int alarm(unsigned int seconds);指定井seconds秒后产生信号。
int pause(void);使调用进程挂起直到捕捉到一个信号。
守护进程
例子:
void process_detach()
{
int i;
pid_t pid;
int fd0, fd1, fd2;
umask(0);
if((pid = fork()) < 0)
printf("系统创建子进程失败\n");
if(pid > 0){
printf("系统守护进程在后台运行\n");
exit(0);
}
signal(SIGHUP,SIG_IGN);
setsid();
if((pid = fork()) < 0)
printf("系统创建子进程失败\n");
else if(pid > 0)
exit(0);
for(i=0; i<getdtablesize(); i++)
(void)close(i);
fd0 = open("/dev/null", O_RDWR);
fd1 = dup(0);
fd2 = dup(0);
if(fd0 != 0 || fd1 != 1 || fd2 != 2){
printf("打开012描述符失败\n");
exit(1);
}
sleep(3);
return ;
}
1.2. 线程
线程特点
1) 通过为每种事件类型的处理分配单独的线程,能够简化处理异步事件的代码。每个线程在进行事件处理时可以采用同步编程模式,同步编程模式要比异步编程模式简单很多。
2) 多个进程必须使用操作系统提供的复杂机制才能实现内存和文件描述符的共享,而多个线程自动地可以访问相同的存储地址空间和文件描述符。
3) 有些问题可以通过将其分解而改善整个程序的吞吐量。当然,多进程的也可以。
4) 交互的程序同样可以使用多线程实现响应时间的改善,例如,可以把程序中处理输入输出的部分与其他部分分开。
线程包含了表示进程内执行环境必需的信息,包括线程ID、一组寄存器值、栈、调度优先级和策略、信号屏蔽字、errno变量以及线程私有数据。
线程终止
1) 线程从启动例程中返回,调用return。
2) 线程可以被同一进程中的其他线程取消。
3) 线程调用pthrea_exit。
线程同步
互斥量pthread_mutex_t
如果锁的粒度太粗,就会出现很多线程阻塞等待相同的锁,源自并发性的改善微乎其微。
如果锁的粒度太细,那么过多的锁开销会使系统性能受到影响,而且代码会变得相当复杂。
作为一个程序员,需要在满足锁需求的情况下,在代码复杂性和优化性能之间找好平衡点。
条件变量,通常与互斥量一起使用
读写锁可以有三种状态:读模式下加锁状态;写模式下加锁状态;不加锁状态。
读写锁缺点:当出现写锁时,会等待所有的读锁结束,同时,加写锁时,所有的读锁也不能使用,会导致整个系统出现停顿现象。
读写锁解决方法:可以通过设计优化,降低加锁范围;可以通过多个互斥锁代替读写锁。
int pthread_mutexattr_init(pthread_mutexattr_t*attr);
intpthread_mutexattr_destroy(pthread_mutexattr_t *attr);
int pthread_mutex_init(pthread_mutex_t*restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t*mutex);
int pthread_mutex_lock(pthread_mutex_t*mutex);
int pthread_mutex_unlock(pthread_mutex_t*mutex);
intpthread_rwlockattr_init(pthread_rwlockattr_t *attr);
intpthread_rwlockattr_destroy(pthread_rwlockattr_t *attr);
int pthread_rwlock_init(pthread_rwlock_t*restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t*rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t*rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t*rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t*rwlock);
互斥量属性
int pthread_mutexattr_getpshared(constpthread_mutexattr_t * restrict attr, int *restrict pshared);
intpthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared);
互斥锁变量可以是进程专用的(进程内)变量,也可以是系统范围内的(进程间)变量。
要在多个进程中的线程之间共享互斥锁,可以在共享内存中创建互斥锁,并将pshared属性设置为 PTHREAD_PROCESS_SHARED。 此行为与最初的 Solaris 线程实现中mutex_init()中的USYNC_PROCESS 标志等效。
如果互斥锁的pshared属性设置为 PTHREAD_PROCESS_PRIVATE,则仅有那些由同一个进程创建的线程才能够处理该互斥锁。如果不同进程的线程访问该互斥量,该行为未定义。
互斥量类型
int pthread_mutexattr_gettype(constpthread_mutexattr_t *restrict attr, int *restrict type);
intpthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);
类型属性的缺省值为 PTHREAD_MUTEX_DEFAULT。
type参数指定互斥锁的类型。以下列出了有效的互斥锁类型:
PTHREAD_MUTEX_NORMAL
描述:此类型的互斥锁不会检测死锁。如果线程在不首先解除互斥锁的情况下尝试重新锁定该互斥锁,则会产生死锁。尝试解除由其他线程锁定的互斥锁会产生不确定的行为。如果尝试解除锁定的互斥锁未锁定,则会产生不确定的行为。
PTHREAD_MUTEX_ERRORCHECK
描述:此类型的互斥锁可提供错误检查。如果线程在不首先解除锁定互斥锁的情况下尝试重新锁定该互斥锁,则会返回错误。如果线程尝试解除锁定的互斥锁已经由其他线程锁定,则会返回错误。如果线程尝试解除锁定的互斥锁未锁定,则会返回错误。
PTHREAD_MUTEX_RECURSIVE
描述:如果线程在不首先解除锁定互斥锁的情况下尝试重新锁定该互斥锁,则可成功锁定该互斥锁。与 PTHREAD_MUTEX_NORMAL 类型的互斥锁不同,对此类型互斥锁进行重新锁定时不会产生死锁情况。多次锁定互斥锁需要进行相同次数的解除锁定才可以释放该锁,然后其他线程才能获取该互斥锁。如果线程尝试解除锁定的互斥锁已经由其他线程锁定,则会返回错误。如果线程尝试解除锁定的互斥锁未锁定,则会返回错误。
PTHREAD_MUTEX_DEFAULT
描述:如果尝试以递归方式锁定此类型的互斥锁,则会产生不确定的行为。对于不是由调用线程锁定的此类型互斥锁,如果尝试对它解除锁定,则会产生不确定的行为。对于尚未锁定的此类型互斥锁,如果尝试对它解除锁定,也会产生不确定的行为。允许在实现中将该互斥锁映射到其他互斥锁类型之一。对于 Solaris 线程,PTHREAD_PROCESS_DEFAULT 会映射到PTHREAD_PROCESS_NORMAL。
例子:
int com_lock_init(pthread_mutex_t *mutex,int type)
{
int stat;
pthread_mutexattr_t mattr;
stat = pthread_mutexattr_init(&mattr);
if(stat != 0){
log_error(pthread_mutexattr_init, stat);
return stat;
}
stat = pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK);
if(stat != 0){
log_error(pthread_mutexattr_settype, stat);
goto over;
}
if(type == COM_LOCK_NORMAL){
goto init;
}else if(type == COM_LOCK_SHARE){
}else if(type == COM_LOCK_REUSE){
stat =pthread_mutexattr_setprotocol(&mattr, PTHREAD_PRIO_INHERIT);
if(stat){
log_error(pthread_mutexattr_setprotocol, stat);
goto over;
}
stat =pthread_mutexattr_setrobust_np(&mattr, PTHREAD_MUTEX_ROBUST_NP);
if(stat){
log_error(pthread_mutexattr_setrobust_np, stat);
goto over;
}
}
stat = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
if(stat != 0){
log_error(pthread_mutexattr_setpshared, stat);
goto over;
}
init:
stat = pthread_mutex_init(mutex, &mattr);
if(stat != 0){
log_error(pthread_mutex_init,stat);
}
over:
pthread_mutexattr_destroy(&mattr);
return stat;
}
条件变量
条件变量是线程可用的另一种同步机制。条件变量与互斥量一起使用,允许线程以无竞争的方式等待特定的条件发生。
条件本省是由互斥量保护的。线程在该表条件状态前必须首先锁住互斥量。
int pthread_cond_init(pthread_cond_t*restrict cond, const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t*cond);
int pthread_cond_wait(pthread_cond_t*restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t*restrict cond, pthread_mutex_t *restrict mutex, const struct timespec*restrict abstime);
int pthread_cond_signal(pthread_cond_t*cond);
int pthread_cond_broadcast(pthread_cond_t*cond);
线程私有数据
为了使基于进程的程序能够在多线程环境下使用,增加了线程私有数据errno。除了使用寄存器外,线程没有办法阻止其他线程访问它的数据,线程私有数据也不例外。但管理线程私有数据的函数可以提高线程间的数据独立性。
int pthread_key_create(pthread_key_t *key,void (*destructor)(void*));
void *pthread_getspecific(pthread_key_tkey);
int pthread_setspecific(pthread_key_t key,const void *value);
线程信号
每个线程都有自己的信号屏蔽字,但是信号的处理是进程中所有线程共享的。尽管单个线程可以阻止某些信号,但当线程修改了与某个信号相关的处理行为后,所有的线程都必须共享这个处理行为的改变。直到另外的线程重新设置信号处理。
进程中的信号是递送到单个线程的。如果信号与硬件故障或计时器超时相关,该信号就被发送到引起该事件的线程中去,而其他信号则被发送到任意一个线程。
2. IO和内存
1.
2.
2.1. IO多路复用
select
int select(int nfds, fd_set *readfds,fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
原本的IO多路复用功能可以由poll完成,select目前能看到的唯一使用方法是做微秒级超时判断。
例子:
void sleep(uint32_t ms)
{
struct timeval tm;
tm.tv_sec = ms / 1000000;
tm.tv_usec = ms % 1000000);
(void) select(1,NULL,NULL,NULL,&tm);
return ;
}
poll
int poll(struct pollfd *fds, nfds_t nfds,int timeout);
加入中断和超时判断的poll监听方法:
例子:
status_t poll(pollfd_t *dplset, int32_t*nsds, int32_t timeout)
{
int rv;
time_t tin;
time_t tout;
if (timeout > 0) {
timeout *= 1000;
tin = time(NULL);
}
do{
rv = poll(dplset->pollset,dplset->curpos, timeout);
if((rv < 0) &&(errno == EINTR) && (timeout > 0)){
tout = time(NULL);
if(tout - tin >=timeout/1000){
rv = 0;
break;
}else{
timeout -=(tout - tin)*1000;
tin = tout;
continue;
}
}
}while((rv < 0) && (errno == EINTR));
(*nsds) = rv;
if ((*nsds) < 0) {
return errno;
}
return SUCCESS;
}
epoll
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
epoll数据结构
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
典型epoll程序,参考man epoll
事件 | 值 | 触发条件 |
EPOLLIN | 0x1 | 新连接,断连接,可读 |
EPOLLPRI | 0x2 | 有带外数据到来 |
EPOLLOUT | 0x4 | 新连接,可写 |
EPOLLERR | 0x8 | 写已关闭socket pipe broken |
EPOLLHUP | 0x10 | 断连接,收到RST包。在注册事件的时候这个事件是默认添加 |
EPOLLET | 0x80000000 | 边沿触发模式 |
EPOLLONESHOT | 0x40000000 | 只监听一次事件模式 |
对端异常断开连接(拔网线) |
| 没触发任何事件 |
EPOLL ET 模式下事件触发条件:
EPOLLOUT事件只有在连接时触发一次,表示可写,其他时候想要触发,那你要先准备好下面条件:
1.某次write,写满了发送缓冲区,返回错误码为EAGAIN。
2.对端读取了一些数据,又重新可写了,此时会触发EPOLLOUT。
简单地说:EPOLLOUT事件只有在不可写到可写的转变时刻,才会触发一次,所以叫边缘触发。
其实,如果你真的想强制触发一次,也是有办法的,直接调用epoll_ctl重新设置一下event就可以了,event跟原来的设置一模一样都行(但必须包含EPOLLOUT),关键是重新设置,就会马上触发一次EPOLLOUT事件。
EPOLLIN事件:
EPOLLIN事件则只有当对端有数据写入时才会触发,所以触发一次后需要不断读取所有数据直到读完EAGAIN为止。否则剩下的数据只有在下次对端有写入时才能一起取出来了。
注意事项
1、如果fd被注册到两个epoll中时,如果有事件发生则两个epoll都会触发事件。
2、如果注册到epoll中的fd被关闭,则其会自动被清除出epoll监听列表。
3、如果多个事件同时触发epoll,则多个事件会被联合在一起返回。
4、epoll_wait会一直监听epollhup事件发生,所以其不需要添加到events中。
5、为了避免大数据量io时,et模式下只处理一个fd,其他fd被饿死的情况发生。linux建议可以在fd联系到的结构中增加ready位,然后epoll_wait触发事件之后仅将其置位为ready模式,然后在下边轮询readyfd列表。
6、在水平触发模式下,如果对端socket关闭,则会一直触发epollin事件,驱动去处理clientsocket。
7、在边沿触发模式下,如果client首先发送协议然后shutdown写端。则会触发epollin事件。但是如果处理程序只进行一次recv操作时,根据recv收取到得数据长度来判读后边是否还有需要处理的协议时,将丢失客户端关闭事件。
2.2. 非阻塞IO
阻塞的调用:
1) 如果某些文件类型(管道、终端设备和网络设备)的数据不存在,则读操作可能会使调用者永远阻塞。
2) 如果不能立即被上述同样类型的文件接受(由于管道中无空间、网络流控制等),则写操作也会是调用者永远阻塞。
3) 在某种条件发生之前,打开某些类型的文件会被阻塞(例如打开一个终端设备可能需要等到与之连接的调制解调器应答;又例如在没有其他进程已经用读模式打开该FIFO时若以只写模式打开FIFO,那么也要等待,此时可以使用读写模式打开FIFO)。
4) 对已经加上强制性记录锁的文件进行读写。
5) 某些ioctl操作。
6) 某些进程间通信函数。
对于一个给定的描述符有两种方法对其指定非阻塞IO:
1) 如果调用open获取描述符,则可指定O_NONBLOCK标志。
2) 对于已经打开的一个描述符,可使用fcntl函数指定O_NONBLOCK标志。
2.3. 进程IPC
Pipe
应用场景:异步唤醒多进程间select、poll等。
缺点:需要有父子关系的进程间使用。
当管道的一段被关闭后:
1) 当读一个写端被关闭的管道时,在所有的数据都被读取后,read返回0。
2) 当写一个读端被关闭的管道时,则产生SIGPIPE信号。
Fifo
应用场景:通常用作临时的多模块间的共享内存。Solaris系统中默认大小为16K,Linux系统中默认大小为64K。
默认只读FIFO要阻塞到某个进程为写而打开此FIFO,类似的,只写要阻塞到某个进程为读而打开它。
如果指定了O_NONBLOCK,则只读立即返回,只写将出错返回-1,errno是ENXIO。
采用O_RDWR方式打开的FIFO可以在关闭读端时,将FIFO缓冲区写满。
最后一个将某个FIFO打开着的进程关闭后,该FIFO中的数据都被丢弃。
FIFO是一种只能在单台主机上使用的IPC形式,不同用在通过NFS安装的文件系统上。
从FIFO中读取数据:
1.如果有进程写打开FIFO,且当前FIFO内没有数据,则对于设置了阻塞标志的读操作来说,将一直阻塞。对于没有设置阻塞标志的读操作来说则返回-1,当前errno值为EAGAIN,提醒以后再试。
2.对于设置了阻塞标志的读操作来说,造成阻塞的原因有两种:一种是当前FIFO内有数据,但有其它进程再读这些数据;另一种是FIFO内没有数据,阻塞原因是FIFO中有新的数据写入,而不论新写入数据量的大小,也不论读操作请求多少数据量。
3.读打开的阻塞标志只对本进程第一个读操作施加作用,如果本进程内有多个读操作序列,则在第一个读操作被唤醒并完成读操作后,其他将要执行的读操作将不再阻塞,即使在执行读操作时,FIFO中没有数据也一样(此时,读操作返回0)
4.如果没有进程写打开FIFO,则设置了阻塞标志的读操作会阻塞
5.如果FIFO中有数据,则设置了阻塞标志的读操作不会因为FIFO中的字节数小于请求读的字节数而阻塞,此时,读操作会返回FIFO中现有的数据量。
向FIFO中写入数据:
对于设置了阻塞标志的写操作:
1.当要写入的数据量不大于PIPE_BUF时,Linux将保证写入的原子性。如果此时管道空闲缓冲区不足以容纳要写入的字节数,则进入睡眠,知道当缓冲区中能够容纳要写入的字节数时,才开始进行一次性写操作
2.当要写入的数据量大于PIPE_BUF时,Linux将不再保证写入的原子性。FIFO缓冲区一有空闲区域,写进程就会试图向管道写入数据,写操作在写完所有请求写的数据后返回。
对于没有设置阻塞标志的写操作:
1.当要写入的数据量大于PIPE_BUF时,Linux将不再保证写入的原子性。再写满所有FIFO空闲缓冲区后,写操作返回。
2.当要写入的数据量不大于PIPE_BUF时,Linux将保证写入的原子性。如果当前FIFO空闲缓冲区能够容纳请求写入的字节数,写完后成功返回;如果当前FIFO空闲缓冲区不能够容纳请求写入的字节数,则返回EAGAIN错误,提醒以后再写。
例子:
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <limits.h>
#define WRITENUM 4
typedef struct writeattr writeattr;
struct writeattr{
int id;
int fd;
};
int run_flag;
writeattr wattr[WRITENUM];
void *readfifo(void *arg)
{
int n;
int fd = (int)arg;
char buf[PIPE_BUF];
while(run_flag){
n = read(fd, buf, PIPE_BUF);
if(n>0){
printf("%s\n", buf);
}
}
}
void *writefifo(void *arg)
{
int id =((writeattr*)arg)->id;
int fd =((writeattr*)arg)->fd;
char buf[PIPE_BUF];
memset(buf, '0'+id, PIPE_BUF);
while(run_flag){
write(fd, buf, PIPE_BUF);
}
}
int main()
{
int stat;
char fifo[]="gytest.fifo";
int fd1, fd2, n, s=0;
pthread_t th_w[WRITENUM],th_r;
int i;
//printf("PIPE_BUF:%d\n", PIPE_BUF);
/* create fifo */
remake:
if((fd1 = open(fifo, O_RDONLY | O_NONBLOCK, 0)) < 0){
mkfifo(fifo,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
goto remake;
}
if((fd2 = open(fifo, O_RDWR | O_NONBLOCK, 0)) < 0){
mkfifo(fifo,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
goto remake;
}
/* create thread */
run_flag = 1;
pthread_create(&th_r, NULL, readfifo, fd1);
for(i=0; i<WRITENUM; i++){
wattr[i].id = i;
wattr[i].fd = fd2;
pthread_create(&th_w[i],NULL, writefifo, &wattr[i]);
}
/* destroy thread */
pthread_join(th_r, NULL);
for(i=0; i<WRITENUM; i++)
pthread_join(&th_w[i],NULL);
close(fd1);
close(fd2);
return 0;
}
校验写入FIFO的不同消息间是否有交叉:
#include <stdio.h>
#include <errno.h>
#include <limits.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
int main()
{
int fd, i;
char buf[PIPE_BUF+1];
int n, m=0;
fd = open("1.log", O_RDONLY, 0);
if(fd < 0)
return errno;
while((n = read(fd, buf, PIPE_BUF+1)) > 0){
m++;
for(i=1; i<n-1; i++)
if(buf[0] != buf[i]){
printf("parse failed, %d\n", m);
break;
}
}
printf("M:%d\n", m);
}
Socketpair
应用场景:多进程间信号通知。
sockerpair与pipe的实现略有差异,pipe断连时没有EPOLLIN事件,只有EPOLLHUP,sockerpair两种都有
2.4. 内核IPC(消息队列、信号量、共享内存)
共享内存应用编程接口API允许一个进程使用公共内存区段。共享内存的优点是简易性。
消息队列
消息队列是消息的链接表,存放在内核中并由消息队列标识符标识。
Posix消息队列:
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag,mode_t mode, struct mq_attr *attr);
mqd_t mq_close(mqd_t mqdes);
mqd_t mq_getattr(mqd_t mqdes, structmq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, structmq_attr *newattr, struct mq_attr *oldattr);
mqd_t mq_send(mqd_t mqdes, const char*msg_ptr, size_t msg_len, unsigned msg_prio);
mqd_t mq_timedreceive(mqd_t mqdes, char*msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec*abs_timeout);
mqd_t mq_notify(mqd_t mqdes, const structsigevent *notification);
System V消息队列:
key_t ftok(const char *path, int id);
int msgget(key_t key, int msgflg);
int msgsnd(int msqid, const void *msgp,size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp,size_t msgsz, long msgtyp, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds*buf);
Posix消息队列和System V系统的消息队列区别:
1、 对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列得读则可以返回任意指定优先级的消息
2、 当往一个队列放置一个消息时,Posix消息队列允许产生一个信号或启动一个线程,System V消息队列则不提供类似的机制
消息的属性:
1、 一个无符号整数优先级(Posix)或是长整类型(SystemV) 。
2、 消息的数据部分长度 。
3、 数据本身(System V系统发送消息时需要定义消息的结构,而Posix系统不需要定义这样的结构) 。
msgsnd(int msqid,const void * ptr,size_t length,int flag)中long类型的消息类型不能为0,否则发送会失败。
msgrcv(int msqid,void *ptr ,size_t length,long type ,int flag)中ptr必须是一个结构体的地址,并且该结构体第一个成员要是long类型。否则会接收失败。length参数以字节为单位指定代发送消息的长度。这是位于长整数消息类型之后的用户自定义数据的长度。该长度可以是0。如上例长度为 sizeof(struct my_msgbuf)-sizeof(long)。
系统配置 | 配置含义 | 最大值 | 最小值 | 默认值 | 建议值 |
fs.mqueue.msg_max | 每个消息队列中消息个数最大限制 | 32768 | 10 | 10 | 10000 |
fs.mqueue.msgsize_max | 消息队列消息长度最大限制 | 2147483647 | 8192 | 8192 | 8192 |
fs.mqueue.queues_max | 消息队列个数最大限制 | 2147483647 | 0 | 256 | 10 |
注意:修改完/etc/sysctl.conf配置文件后,需要执行下面命令使其生效 /sbin/sysctl -p |
在linux里,消息队列被创建在虚拟文件系统,因此需要挂载文件系统:
在/etc/rc.local里添加如下两行:
mkdir /dev/mqueue
mount -t mqueue none /dev/mqueue
在文件系统挂载后,消息队列可以作为文件操作,比如
$ ls /dev/mqueue/mymq
QSIZE:129 NOTIFY:2 SIGNO:0 NOTIFY_PID:8260
$rm /dev/mqueue/mymq
信号量
信号量是一个计数器,用于多进程对共享数据对象的访问。
为了获得共享资源,进程需要执行下列操作:
1) 测试控制该资源的信号量;
2) 若此信号量的值为正,则进程可以使用该资源。进程将信号量值减1,表示它使用了1个单位资源。
3) 若此信号量的值为0,则进程进入休眠状态,直至信号量值大于0,。进程被唤醒后,它返回至第1步。
#表示设置的信号量,有4个值(semmsl,semmns,semopm,semmni)
kernel.sem = 250 32000 100 128
共享内存
共享内存区是最快的可用IPC形式。一旦这样的内存区映射到共享它的进程的地址空间,这些进程间数据的传递就不再涉及内核。
然而往该共享内存写入或读出数据的进程间通常需要某种形式的同步。例如:互斥锁、条件变量、读写锁、记录锁、信号灯。
这里的“不在涉及内核”的含义是:进程不是通过执行任何进入内核的系统调用来传递彼此数据的。显然,内核必须建立允许各个进程共享该内存区的内存映射关系,然后一直管理该内存区。而管道、FIFO和System V消息队列的write或msgsnd都涉及从进程到内核的数据拷贝,他们的read或msgrcv都涉及从内核到进程的数据拷贝。通常需要四次数据拷贝,而且这四次拷贝是在内核和进程间进行的,往往开销很大。而采用共享内存通常指需要拷贝两次。
mmap
例子:
void *mmap(void *addr, size_t len, intprot, int flags, int fildes, off_t off);
int mmap_create(void **pmmap, size_t size)
{
int fd;
fd = open("/dev/zero",O_RDWR);
if(fd < 0)
return errno;
(*pmmap) = mmap(NULL,(off_t)size,PROT_READ|PROT_WRITE, MAP_SHARED,fd,0);
close(fd);
if((*pmmap) == MAP_FAILED)
return errno;
memset(*pmmap, 0, (off_t)size);
return 0;
}
int mmap_destroy(void *pmmap, size_t len)
{
if(munmap(pmmap, len) < 0)
return errno;
return 0;
}
SHM
key_t ftok(const char *path, int id);
int shmget(key_t key, size_t size, intshmflg);
void *shmat(int shmid, const void *shmaddr,int shmflg);
int shmctl(int shmid, int cmd, structshmid_ds *buf);
可通过修改系统参数来设置共享内存区大小。
Solaris系统修改方法:
在project文件中添加该主机所有帐户的限制信息
$vi etc/project
添加如下信息(每个模块添加一条记录):(里面的红色色部分需要修改,com1修改为实际的帐户名,1000表示ID,必须在该文件中唯一,需要修改为一个与其他行不同的值)
user.com1:1000::::project.max-shm-memory=(privileged,85899345920,deny)
Linux系统默认值较大,不需要修改:
[host]$ sysctl -a|grep shm
vm.hugetlb_shm_group = 0
kernel.shmmni = 4096
kernel.shmall = 4294967296
kernel.shmmax = 68719476736
例子:
int shm_create(void **pshm, char *fn,size_t *size, uint32_t *first_create)
{
key_t key = IPC_PRIVATE;
int shm_id = 0;
void *shm_buf = NULL;
*first_create = 1;
key = ftok(fn, IPCKEY);
if(-1 == key) {
printf("获取shm key失败。call ftokerror = %d\n", errno);
return errno;
}
shm_id = shmget(key, *size, SHM_R|SHM_W);
//首次创建
if(-1== shm_id) {
shm_id = shmget(key, *size,IPC_CREAT|SHM_R|SHM_W);
if(-1== shm_id) {
printf("创建shm共享内存失败。callshmget error= %d\n", errno);
return errno;
}
shm_buf = shmat(shm_id, NULL,SHM_RND);
if(-1 == (int)shm_buf){
printf("获取共享内存地址失败.call shmat errno = %d\n", errno);
return errno;
} else{
printf("shm共享内存首次创建成功\n");
}
//初始化为空
memset(shm_buf, 0, *size);
}else{
shm_buf = shmat(shm_id, NULL,SHM_RND);
if(-1 == (int)shm_buf){
printf("获取共享内存地址失败.call shmat errno = %d\n", errno);
return errno;
}else{
printf("shm共享内存恢复成功\n");
}
*first_create = 0;
}
(*pshm) = shm_buf;
return 0;
}
2.5. 文件系统IPC
2.6. 进程间通信(IPC)技术总结
1) 消息传递:管道、FIFO、Posix和System V消息队列。
2) 同步:互斥锁、条件变量、读写锁、文件和记录锁、Posix和System V信号灯。
3) 共享内存区:匿名共享内存区、有名Posix和System V共享内存区。
4) 过程调用:Solaris门、Sun RPC。
它们之间的差异:
1) 管道和FIFO是字节流,没有消息边界。Posix和System V消息则有从发送者向接收者维护的记录边界。
2) 当有一个消息放置到空队列时,Posix消息队列可向一个进程发送一个信号,或者启动一个新的线程。这两种消息队列都不能直接跟select或poll一起使用。
3) 管道和FIFO是先进先出的。Posix和System V消息具备由发送者赋予的优先级。从一个Posix消息队列读出时,首先返回的总是最高优先级的消息。从一个System V消息队列读出时,读出者可以要求想要的任意优先级的消息。
4) 当有一个消息发送到一个消息队列,或者写入一个管道或FIFO时,只有一个拷贝递交给仅仅一个线程。
5) 互斥锁、条件变量和读写锁都是无名的,也就是说它们是基于内存的。它们能够很容易在单个进程内的不同线程间共享。然而只有存放在不同进程间共享的共享内存区中时,它们才可能为这些进程所共享。有名信号灯总能在不同进程间共享。
6) 如果持有某个锁的进程没有释放它就终止,内核就自动释放fcntl记录锁。System V信号灯将这一特性作为一个选项提供。互斥锁、条件变量、读写锁和Posix信号灯不具备该特性。
7) Posix和System V共享内存都有随内核的持续性,即使当前没有任何进程使用它,Posix共享内存区的大小可在使用期间扩张,System V共享内存区的大小则是在创建时固定下来的。
8) 在众多同步技术中,可从一个信号处理程序中调用的函数只有sem_post和fcntl。
9) 在众多消息传递技术中,可从一个信号处理程序中调用的函数只有read和write(适用于管道和FIFO)。
3. 网络IPC(socket)
1.
2.
3.
3.1. 网络连接处理
建立连接
int socket(int domain, int type, intprotocol);
int bind(int s, const struct sockaddr*name, int namelen);
int listen(int sockfd, int backlog);
int accept(int sockfd, struct sockaddr*addr, socklen_t *addrlen);
int connect(int s, const struct sockaddr*name, int namelen);
获取地址
int getsockname(int s, struct sockaddr*name, socklen_t *namelen);
int getpeername(int s, struct sockaddr*name, socklen_t *namelen);
收发数据
ssize_t send(int s, const void *buf, size_tlen, int flags);
ssize_t recv(int s, void *buf, size_t len,int flags);
描述
在数据传送过程中所产生的错误不会返回给send。如果发生本地错误,则返回-1。
当要发送的消息长度大于套接字当前可用缓冲区时,send将阻塞,除非在套接字上设置了非阻塞式输入输出模式。对于非阻塞模式,这种情况下将返回 EAGAIN 错误。系统调用 select(2) 可以用来检测何时可以发送更多的数据。
参数flags
MSG_OOB:将送出 out-of-band (带外)数据(比如,SOCK_STREAM 类型的套接字);下层协议也必须支持。带外 数据。
MSG_DONTROUTE:在送出分组时不使用网关。只有直接连接在网络上的主机才能接收到数据。这个标志通常仅用于诊断和路由程序。可路由的协议族才能使用这个标志;包套接字不可以。
MSG_DONTWAIT:使用非阻塞式操作;如果操作需要阻塞,将返回 EAGAIN 错误(也可以用 F_SETFL fcntl(2) 设置 O_NONBLOCK 实现这个功能。)
MSG_NOSIGNAL:当流式套接字的另一端中断连接时不发送SIGPIPE 信号,但仍然返回 EPIPE 错误。
MSG_CONFIRM:(仅用于Linux 2。3以上版本)通知链路层发生了转发过程:得到了另一端的成功应答。 如果链路层没有收到通知,它将按照常规探测网络上的相邻主机(比如通过免费arp)。只能用于 SOCK_DGRAM 和 SOCK_RAW 类型的套接字,且仅对IPv4和IPv6有效。
MSG_PEEK:窥看外来消息,允许我们窥看已可读取的数据,不过系统不丢弃有recv或recvfrom窥看的数据。
MSG_WAITALL:等待所有数据,它告知内核不要在尚未读入请求数目的字节之前让一个读操作返回。
返回值
成功时返回发送的字符个数,否则返回-1。
错误代码
EBADF:指定了非法描述符。
ENOTSOCK:参数 s 不是一个套接字。
EFAULT:参数指定的用户地址空间非法。
EMSGSIZE:消息长度越界。
EAGAIN或者EWOULDBLOCK:套接字设置为非阻塞式,但所请求的操作需要阻塞。
ENOBUFS:网络接口输出队列已满。这通常表明接口已停止发送,也有可能是暂时性的拥挤(这不会发生在linux下,当设备队列溢出时数据报只是被简单丢弃。
EINTR:接收到信号。
ENOMEM:没有可用内存。
EINVAL:传递的参数非法。
EPIPE:连接套接字的本地端已关闭。这种情况下进程还会接收到 SIGPIPE 信号。
设置参数
int getsockopt(int s, int level, intoptname, void *optval, socklen_t *optlen);
int setsockopt(int s, int level, intoptname, const void *optval, socklen_t optlen);
int fcntl(int fd, int cmd, long arg);
int ioctl(int d, int request, ...);
例子:
int socket_create_tcp_listen(int *sockfd,char *ip, int port,int backlog)
{
int stat;
struct sockaddr_in serv_addr;
if(sockfd == NULL || ip == NULL)
return COM_EINVAL;
*sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(-1 == *sockfd){
return errno;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
serv_addr.sin_addr.s_addr = inet_addr(ip);
bzero(&serv_addr.sin_zero, 8);
stat = bind(*sockfd,(struct sockaddr *)&serv_addr,sizeof(structsockaddr));
if(-1 == stat){
return errno;
}
stat = listen(*sockfd, backlog);
if(-1 == stat){
return errno;
}
return 0;
}
fcntl(sockfd, F_SETFL, fcntl(sockfd,F_GETFD, 0)|O_NONBLOCK);
3.2. 网络事件处理
处理流程
1. 连接的建立,包括服务端接受(accept)新连接和客户端成功发起(connect)连接。TCP 连接一旦建立,客户端和服务端是平等的,可以各自收发数据。
2. 连接的断开,包括主动断开(close、shutdown)和被动断开(read(2)返回0)。
3. 消息到达,文件描述符可读。这是最为重要的一个事件,对它的处理方式决定了网络编程的风格(阻塞还是非阻塞,如何处理分包,应用层的缓冲如何设计,等等)。
3.5 消息发送完毕,这算半个。对于低流量的服务,可以不必关心这个事件;另外,这里的“发送完毕”是指将数据写入操作系统的缓冲区,将由TCP 协议栈负责数据的发送与重传,不代表对方已经收到数据。
例子:
int com_event_loop(com_event_t *efd)
{
int stat = 0;
int i, nfds, sock;
struct epoll_event events[MAX_EVENTS];
if(efd == NULL)
return COM_EINVAL;
// epoll wait
while(efd->run_flag){
memset(events, 0,sizeof(events));
nfds = epoll_wait(efd->epfd,events, MAX_EVENTS, 100);
for(i=0; i<nfds; ++i){
sock =events[i].data.fd;
if(events[i].events& EPOLLIN){
//log_printf("[event]:EPOLLIN[%d]\n", sock);
stat =recv_socket(efd, sock);
if(stat != 0){
log_error(recv_socket, stat);
stat =com_event_socket_del(efd, sock);
if(stat!= 0){
log_error(com_event_socket_del, stat);
}
}
}else if(events[i].events& EPOLLOUT){
//log_printf("[event]:EPOLLOUT[%d]\n", sock);
stat =efd->handle_out(efd, sock, efd->sock[sock].arg);
if(stat != 0){
log_error(handle_out, stat);
}
}else{
log_printf("[event]:com_event_loop,socket[%d],events[%#x]\n",sock, events[i].events);
}
}
stat = timer_update(efd);
if(stat != 0){
log_error(timer_update,stat);
}
}
return stat;
}
3.3. 非阻塞IO
可能阻塞的套接口调用可分为以下四类:
1) 输入操作:read、readv、recv、recvfrom和recvmsg。
阻塞socket:如果某个进程对一个阻塞的socket调用这些函数,如果此时接收缓冲区没有数据可读,则阻塞,如果缓冲区有数据,则返回成功。如果想等到某个固定数目的数据,可以指定MSG_WAITALL标志。
非阻塞socket,如果接收缓冲区没有数据,则立即返回EWOULDBLOCK。
2) 输出操作:write、writev、send、sendto和sendmsg。
阻塞socket:情况同上。
非阻塞socket:如果发送缓冲区没有空间,则立即返回EWOULDBLOCK。
3) 接收外来连接:accept。
阻塞socket:如果对一个阻塞的socket调用accept,并且尚无新的连接到达,则阻塞。
非阻塞socket:如果调用accept,并且尚无新的连接到达,则立即返回EWOULDBLOCK。
4) 发起外出连接:connect。
阻塞socket:connet函数一直要等到客户端收到对于自己的SYN的ACK位置才返回。这意味着个connect总是阻塞其调用进程至少一个RTT时间。
非阻塞socket:如果连接不能立即建立,那么连接的建立照样发起,不过返回一个EINPROGRESS。
3.4. 非阻塞accept
当有一个已完成的连接准备好被accept时,select将作为可读描述符返回该连接的监听socket。因此,如果我们使用select在某个监听socket上等待一个外来连接,那就没必要把该监听socket设置为非阻塞,这是因为select告诉我们该socket上已有连接就绪,那么随后的accept调用不应该阻塞。
不幸的是,这里存在一个可能让我们掉入陷阱的定时问题,例如:
1) 客户端建立一个连接并随后夭折它。
2) Select向服务器进程返回可读条件,不过服务器要过一小段时间才调用accept。
3) 在服务器从select返回到调用accept期间,服务器TCP收到来自客户的RST。
4) 这个已完成的连接被服务器TCP驱除出队列,我们假设队列中没有其他已完成的连接。
5) 服务器调用accept,但是由于没有任何已完成的连接,服务器于是阻塞。
服务器会一直阻塞在accept调用上,直到某个客户建立一个连接为止。但是在此期间,服务器无法处理任何其他的已就绪的描述符。
本问题的结晶办法如下:
1) 当使用select获悉某个监听socket上何时有已完成连接准备好被accept时,总是把这个监听socket设置为非阻塞。
2) 在后续的accept调用中忽略以下错误:EWOULDBLOCK、ECONNABORTED、EPROTO和EINTR。
例子:
create_tcp_listen(listenfd);
fcntl(listenfd,F_SETFL, fcntl(listenfd, F_GETFD, 0)|O_NONBLOCK);
pfds[0].fd = listenfd;
pfds[0].events = POLLIN;
while(run_flag){
do{
nfds = poll(pfds, 1,3000);
}while(nfds<0 &&errno==EINTR);
//new conn
clilen = sizeof(structsockaddr_in);
newfd = accept(listenfd,(structsockaddr *)&addr, &clilen);
if(newfd < 0)
continue;
fcntl(newfd, F_SETFL,fcntl(newfd, F_GETFD, 0)|O_NONBLOCK);
//add event
stat =com_event_socket_add(&g_res.efd, newfd, EPOLLIN|EPOLLERR, DEF_TIMEOUT,NULL);
if(stat != 0)
log_error(com_event_socket_add, stat);
else
log_printf("[smsc]:accept_socket[%d],ip[%s]\n", newfd,inet_ntoa(addr.sin_addr));
//update smsc
g_smsc.sock[newfd] = 1;
g_smsc.type[newfd] =SOCKET_NONE;
}
3.5. 客户/服务器程序设计范式
3.6. 网络通信模式
模式1:单进程单线程
应用场景:适合处理简单的连接类或超时类的事件,网关老代码中多数采用该模式。
使用方法:一般采用poll + event loop的循环事件处理方式,属于典型的reactor模式。
优点:逻辑简单。
缺点:性能不足,特别是某个事件处理过程占用较多CPU时间时,会造成其他事件响应延迟。
注意:在需要使用fork函数的线程中必须使用单进程,避免程序出现异常错误。
举例:在网关与短信中心通信过程中,通常采用poll监听父进程fd、对端连接fd、发送信号fd等描述符,分别处理超时事件、父进程事件、对端连接事件、发送消息事件。
模式2:单进程多线程
IO线程+工作线程
这种模型下,IO线程负责event loop、IO操作,工作线程负责实际的业务逻辑处理,IO线程与工作线程可以通过阻塞队列/共享内存等方式进行数据交换,队列/共享内存的访问需要加锁。
实际上,这种模型本质上与单线程是类似的,只不过这里的业务逻辑交由单独的工作线程处理。
多个IO线程/工作线程
这种模型下,每个IO线程都有一个event loop,另外,这里的工作线程可有可无,而且通常是没有,即IO线程既处理IO,又进行业务逻辑计算。
例如,主从式leader/follower(L/F)和简单复制muti-reactor(M-R)模型都属于这类。Memcached使用的M-R,ICE使用的L/F。
模式3:多进程单线程
简单复制模式1
优点:逻辑简单,可部分提高处理性能。
缺点:使用多进程间共享锁时,必须要求锁可恢复,否则当一个进程加锁后阻塞或死掉时,其他进程会全部挂死。而使用可恢复锁会影响加锁性能。
主进程+工作进程
优点:进程间没有共享数据或共享数据多数为只读,相比模式2来说,逻辑简单。
缺点:使用多进程间共享锁时,必须要求锁可恢复,否则当一个进程加锁后阻塞或死掉时,其他进程会全部挂死。而使用可恢复锁会影响加锁性能。
模式4:多进程多线程
通常是模式2与模式3的混搭。
各个模式原则总结
One event loop per thread
这种方式的好处是:
1)线程数目基本固定,不会频繁创建和销毁;
2)不需要考虑多线程并发处理sock。
一个程序到底使用一个还是多个event loop呢?ZeroMQ的手册给出的建议是:按照每千兆比特/秒吞吐量配置一个event loop的比例来设置。
按照这个建议,在千兆以太网上用一个event loop就足以应付所有网络IO。
在网关中,由于所有模块共享千兆以太网,既包括与外部网元通信的模块,如SMSC、SMG、BOSS等,又包括内部通信模块,如Kernel和SR,CMS等模块。要充分发挥整体系统性能,既需要提高单模块的处理性能,也需要减少冗余模块数量。
模式2与模式3区别如何取舍
在其他条件相同的情况下,可根据程序响应一次请求所访问的内存大小来决定,
如果访问的内存较大,那么就使用多线程,避免CPU cache换入换出操作,影响性能;
如果访问内存较小,就使用多进程,编码简单。
IO多路复用函数缺点
Select、poll、epoll、dev/poll、kqueue等都不能根据每个fd设置超时时间。
Libevent库优点
libevent是一个事件触发的网络库,适用于windows、linux、bsd等多种平台,内部使用select、epoll、kqueue等系统调用管理事件机制。著名分布式缓存软件memcached也是libevent based,而且libevent在使用上可以做到跨平台,而且根据libevent官方网站上公布的数据统计,似乎也有着非凡的性能。
libevent支持用户使用三种类型的事件,分别是网络IO、定时器、信号三种,
1)在定时器的实现上使用了最小堆(MinHeap),以达到高效查找、排序、删除定时器的目的,网络IO上,主要关注了一下linux上的epoll(因为目前的开发主要在linux平台),结果发现libevent的epoll居然用的EPOLLLT,水平触发的方式用起来比较方便,不容易出错,但是在效率上可能比EPOLLET要低一些。
2)网络IO和信号的数据结构采用了双向队列(TAILQ)。在实现上主要有3种链表: EVLIST_INSERTED, EVLIST_ACTIVE, EVLIST_TIMEOUT,一个ev在这3种链表之间被插入或删除,处于EVLIST_ACTIVE链表中的ev最后将会被调度执行。
Libevent提供了DNS,HTTP Server,RPC等组件,HTTPServer可以说是Libevent的经典应用。从http.c可看到Libevent的很多标准写法。
写非阻塞式的HTTP Server很容易将socket处理与HTTP协议处理纠缠在一起,Libevent在这点上似乎也有值得推敲的地方。
libevent支持多线程编程,每个事件需要关联到自己的event_base。
Libev库优点
libevent首个版本发布于2000-11-14.是高性能事件循环,支持简单的API,两种事件类型( I/O+timeout,signal+timeout),支持select,poll,epoll,kqueue等。
libev第一个版本发布于2007-11-12,也是高性能事件循环,支持八种事件类型(I/O,real time timers,wall clock timers,signals,child status changes, idle,check and prepare handlers)
它使用一种优先队列的方式管理计数器以及使用数组作为基础数据结构。对于相同事件监视器数没有人为的限制。它为libevent提供一个竞争层。
基于libev具有更低的消耗,因此要快于libevent,在api设计问题也在结果中扮演重要的角色,当采用计时器的时候native api比emulation API更有效。尽管这使得libev处于劣势,但它仍然要快于libevent。
避免多线程同时操作一个socket描述符
每个文件描述符只由一个线程操作。
新的fd加入linux内核的的版本
signalfd:2.6.22
timerfd:2.6.25
eventfd:2.6.22
三种fd的意义:
signalfd:传统的处理信号的方式是注册信号处理函数;由于信号是异步发生的,要解决数据的并发访问,可重入问题。signalfd可以将信号抽象为一个文件描述符,当有信号发生时可以对其read,这样可以将信号的监听放到select、poll、epoll等监听队列中。
timerfd:可以实现定时器的功能,将定时器抽象为文件描述符,当定时器到期时可以对其read,这样也可以放到监听队列的主循环中。
eventfd:实现了线程之间事件通知的方式,eventfd的缓冲区大小是sizeof(uint64_t);向其write可以递增这个计数 器,read操作可以读取,并进行清零;eventfd也可以放到监听队列中,当计数器不是0时,有可读事件发生,可以进行读取。
从Linux内核2.6.27起,凡是会创建文件描述符的syscall一般都增加了额外的flags参数,可以直接指定O_NONBLOCK和FD_CLOEXEC,例如:
accept4 – 2.6.28
eventfd2 – 2.6.27
inotify_init1 – 2.6.27
pipe2 – 2.6.27
signalfd4 – 2.6.27
timerfd_create 2.6.25
推荐模式
1、 一个epoll线程完成所有的IO操作,收发消息通过回调函数定义。
2、 多个worker线程处理业务,根据CPU数据决定线程数目。
3、 将socket与其timeout事件绑定,采用数组实现socket和timeout管理。
例子:
//监听的最大sock数量
#define MAX_SOCKET 65536
//消息缓冲区最大值
#define MAX_WMEM 16384
//消息包最大值
#define MAX_PACK 1024
//连接最大超时时间
#define MAX_TIMEOUT 300
enum{
SOCKET_UNUSE,
SOCKET_INUSE,
SOCKET_ERROR,
};
typedef struct com_event_socket_t com_event_socket_t;
typedef struct com_event_async_t com_event_async_t;
typedef struct com_event_timer_t com_event_timer_t;
typedef struct com_event_attr_t com_event_attr_t;
typedef struct com_event_t com_event_t;
struct com_event_socket_t{
int status;
short timeout; //固定超时时间
short timenext; //下次超时时间,0~MAX_TIMEOUT
int bufrn;
int bufsn;
char bufr[MAX_PACK]; //接收缓冲区
char bufs[MAX_PACK]; //发送缓冲区
void *arg; //回调参数
};
struct com_event_timer_t{
int timelast; //上次更新时间
short timecur; //当前超时秒,0~MAX_TIMEOUT
};
struct com_event_attr_t{
int port;
int max_conn;
int (*handle_in_decode)(com_event_t *efd, int sock, void *ptr, int len, int*msglen);
int (*handle_in)(com_event_t*efd, int sock, void *ptr, int msglen, void *arg);
int (*handle_out)(com_event_t*efd, int sock, void *arg);
int (*handle_error)(com_event_t *efd, int sock, void *arg);
int (*handle_timeout)(com_event_t *efd, int sock, void *arg);
};
struct com_event_t{
int run_flag; //运行标志
int max_conn; //最大连接数,避免fd用光
int epfd; //epoll描述符
/* socket struct callback */
com_event_socket_t sock[MAX_SOCKET]; //连接信息
int (*handle_in_decode)(com_event_t *efd, int sock, void *ptr, int len, int*msglen); //分包接收
int (*handle_in)(com_event_t*efd, int sock, void *ptr, int msglen, void *arg);
int (*handle_out)(com_event_t*efd, int sock, void *arg);
int (*handle_error)(com_event_t *efd, int sock, void *arg);
/* timer struct callback */
com_event_timer_t timer;
int (*handle_timeout)(com_event_t*efd, int sock, void *arg);
};
/* interface */
int com_event_init(com_event_t *efd,com_event_attr_t *attr);
int com_event_loop(com_event_t *efd);
int com_event_exit(com_event_t *efd);
int com_event_socket_add(com_event_t *efd,int sock, int events, int timeout, void *arg);
int com_event_socket_mod(com_event_t *efd,int sock, int events, int timeout, void *arg);
int com_event_socket_del(com_event_t *efd,int sock);
int com_event_socket_send(com_event_t *efd,int sock, void *ptr, int msglen, void *arg);
4. 数据结构
4.
4.1. 队列
一般采用双向链表来实现队列操作,额外需要在数据结构里增加com_list类型变量。
typedef struct com_list com_list;
struct com_list{
struct com_list *next, *prev;
};
typedef struct com_queue com_queue;
struct com_queue{
com_list head;
pthread_mutex_t lock;
int count;
};
4.2. 数组和HASH
能用数组地方不用HASH,能用HASH的地方不用链表,避免采用链表时查找。
4.3. 树和堆
平衡二叉树
应用场景:带有区间比较的,例如号段范围。
typedef struct bbtree_node bbtree_node;
typedef struct bbtree_tree bbtree_tree;
struct bbtree_node{
bbtree_node *left; //left child pointer
bbtree_node *right; //right child pointer
bbtree_node *parent; //parent pointer
int bf; //balance factor
void *data; //value pointer
};
struct bbtree_tree{
bbtree_node *root; //root pointer
pthread_mutex_t lock;
int keysize;
int (*compare)(constvoid*,const void*);
};
B+树
应用场景:文件系统,带有单个值比较的,例如消息ID。
typedef struct dpl_bptree_t dpl_bptree_t;
typedef struct bptree_node bptree_node;
struct bptree_node{
com_list link;
int isleaf; /* 0-node,1-leaf */
int numrec; /* number of elem */
bptree_node *parent;
void *data;
};
struct bptree_tree{
bptree_node *root;
pthread_mutex_t lock;
int msgc;
int nodec;
int keysize;
int rank;
int (*compare)(constvoid*,const void*);
};
基数树
应用场景:手机号码精确查找。
typedef struct digittree_nodedigittree_node;
typedef struct digittree_treedigittree_tree;
struct digittree_node{
digittree_node *parent;
char label; //节点标签
char level; //节点高度
char valid; //是否有效
void *data; //业务数据
digittree_node *child[10];
};
struct digittree_tree{
int nodes;
pthread_mutex_t lock;
digittree_node *root;
};
堆
应用场景:带有最小或最大N个值的,例如socket超时时间排序。
跳表
如果你不想一边看算法一边在网上找代码示例来实现各种树结构,那么可以考虑跳表。
typedef struct skip_node skip_node;
typedef struct skip_list skip_list;
struct skip_node{
keyType key;
valueType value;
skip_node *forward[1]; //向前指针数组,根据该节点层数的不同指向不同大小的数组
};
struct skip_list{
int level;
structskip_node *header;
};
5. 其他
1.
2.
3.
4.
5.
5.1. 硬件处理速度
寄存器:1ns
一级Cache:5~10ns
二级Cache:40~60ns
内存100~150ns
硬盘:3~15ms
5.2. 软件处理速度
在硬件E5320,Linux2.6.32平台下:
fork()+exit():160us
pthread_create()+pthread_join():42.5us,其中创建线程用了26.1us
push/pop a blocking queue:11.5us
context switch :1.5us
5.3. Read-copy update
对效率要求较高的地方可以采用此方法,例如,处理一个加锁更新时间较长的操作,可以先在备用配置上操作,然后加锁替换配置。
5.4. 禁止使用system函数等临时创建资源
可通过标准IO库函数替代system函数避免新生成进程耗费系统资源。
5.5. Setjmp和longjmp
在C语言中,goto语句是不能跨越函数的,可以使用setjmp和longjmp。通常,在多进程的程序中,用longjmp来响应退出信号,跳转到main函数的退出位置。
例子:
jmp_buf exit_jb;
int is_running = 1;
static void sig_fatal(int signo)
{
if(is_running){
is_running = 0;
longjmp(exit_jb,signo);
}
}
void set_root_signal(void)
{
signal(SIGINT, sig_fatal);
signal(SIGQUIT, sig_fatal);
signal(SIGILL, sig_fatal);
signal(SIGTRAP, sig_fatal);
signal(SIGIOT, sig_fatal);
signal(SIGFPE, sig_fatal);
signal(SIGTERM, sig_fatal);
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGUSR1, SIG_IGN);
signal(SIGUSR2, SIG_IGN);
}
Int main()
{
if(setjmp(exit_jb))
goto shutdown;
shutdown:
clost(fd[1]);
return0;
}
5.6. 动态连接库
重要的dlfcn.h头文件
LINUX下使用动态链接库,源程序需要包含dlfcn.h头文件,此文件定义了调用动态链接库的函数的原型。下面详细说明一下这些函数。
1、原型为: const char*dlerror(void);
当动态链接库操作函数执行失败时,dlerror可以返回出错信息,返回值为NULL时表示操作函数执行成功。
2、原型为: void *dlopen(const char *filename, int flag);
dlopen用于打开指定名字(filename)的动态链接库,并返回操作句柄。
filename: 如果名字不以/开头,则非绝对路径名,将按下列先后顺序查找该文件。
(1) 用户环境变量中的LD_LIBRARY值;
(2) 动态链接缓冲文件/etc/ld.so.cache
(3) 目录/lib,/usr/lib
flag表示在什么时候解决未定义的符号(调用)。取值有两个:
1) RTLD_LAZY : 表明在动态链接库的函数代码执行时解决。
2) RTLD_NOW : 表明在dlopen返回前就解决所有未定义的符号,一旦未解决,dlopen将返回错误。
dlopen调用失败时,将返回NULL值,否则返回的是操作句柄。
3、原型为: void*dlsym(void *handle, char *symbol);
dlsym根据动态链接库操作句柄(handle)与符号(symbol),返回符号对应的函数的执行代码地址。由此地址,可以带参数执行相应的函数。
如程序代码: void (*add)(int x,int y); /* 说明一下要调用的动态函数add */
add=dlsym("xxx.so","add");/* 打开xxx.so共享库,取add函数地址 */
add(89,369); /* 带两个参数89和369调用add函数 */
4、原型为: int dlclose(void *handle);
dlclose用于关闭指定句柄的动态链接库,只有当此动态链接库的使用计数为0时,才会真正被系统卸载。
更多推荐
所有评论(0)