linux的动态定时器--时间轮
定时器—有时也称为动态定时器或内核定时器—是管理内核时间的基础。定时器是一种软件功能,即允许在将来的某个时刻,函数在给定的时间间隔用完时被调用。注意的是定时器并不会周期运行,它在超时后就自行销毁,这也是定时器被称为动态定时器的一个原因。动态定时器不断地创建和销毁,而且它的运行次数也不受限制。
定时器在内核代码中属于一个基础组件。要想完全弄清楚linux2.6中内核定时器的实现,得先从初始化开始。
在start_kernel(void)-->init_timers(void)
void __init init_timers(void)
{
int err = timer_cpu_notify(&timers_nb, (unsigned long)CPU_UP_PREPARE,
(void *)(long)smp_processor_id());
init_timer_stats();
BUG_ON(err == NOTIFY_BAD);
register_cpu_notifier(&timers_nb);
open_softirq(TIMER_SOFTIRQ, run_timer_softirq);
}
在timer_cpu_notify(&timers_nb,(unsigned long)CPU_UP_PREPARE,
(void*)(long)smp_processor_id());
中执行
init_timers_cpu(cpu) //初始化本cpu中的timers
初始化的主要代码是:spin_lock_init(&base->lock);
for (j = 0; j < TVN_SIZE; j++) {
INIT_LIST_HEAD(base->tv5.vec + j);
INIT_LIST_HEAD(base->tv4.vec + j);
INIT_LIST_HEAD(base->tv3.vec + j);
INIT_LIST_HEAD(base->tv2.vec + j);
}
for (j = 0; j < TVR_SIZE; j++)
INIT_LIST_HEAD(base->tv1.vec + j);
base->timer_jiffies = jiffies;
base->next_timer = base->timer_jiffies;
这段代码的主体是base,base的定义是:structtvec_base *base;
这个tvec_base是动态定时器的主要数据结构,每个cpu上有一个,它包含相应cpu中处理动态定时器需要的所有数据。为简化分析仅考虑单cpu。给出这个数据机构:
struct tvec_base {
spinlock_t lock;
struct timer_list *running_timer;
unsigned long timer_jiffies;
unsigned long next_timer;
struct tvec_root tv1;
struct tvec tv2;
struct tvec tv3;
struct tvec tv4;
struct tvec tv5;
} ____cacheline_aligned;
其中,timer_list是具体定时器的结构体(后面再具体看timer_list结构体);上面包含tv1,tv2,tv3,tv4,tv5;内核定时器的巧妙设计就在于此。
#define TVN_BITS (CONFIG_BASE_SMALL ? 4 : 6)
#define TVR_BITS (CONFIG_BASE_SMALL ? 6 : 8)
#define TVN_SIZE (1 << TVN_BITS)
#define TVR_SIZE (1 << TVR_BITS)
#define TVN_MASK (TVN_SIZE - 1)
#define TVR_MASK (TVR_SIZE - 1)
struct tvec {
struct list_head vec[TVN_SIZE];
};
struct tvec_root {
struct list_head vec[TVR_SIZE];
};
从这个定义看到,tv1就是长度为256的数组,数组成员是list_head;同样的,tv2,tv3,tv4和tv5都是长度为64的数组,数组成员是list_head。List_head就是linux内核代码中广泛使用的双向链表。在阻塞和非阻塞中的等待队列时就看到了list_head的应用。那么,tv1-tv5都是数组+链表的实现,其实hash的有一种简单实现就是数组+链表的组合,那么这几个就有些hash的味道,具体是不是,还要分析道后面才知道。
再回到前面的初始化中,
for(j = 0; j < TVN_SIZE; j++) {
INIT_LIST_HEAD(base->tv5.vec+ j);
INIT_LIST_HEAD(base->tv4.vec+ j);
INIT_LIST_HEAD(base->tv3.vec+ j);
INIT_LIST_HEAD(base->tv2.vec+ j);
}
for(j = 0; j < TVR_SIZE; j++)
INIT_LIST_HEAD(base->tv1.vec+ j);
就tv1-tv5这5个结构体中的数组中每个list_head进行初始化。
base->timer_jiffies= jiffies;
base->next_timer= base->timer_jiffies;
将base中的timer_jiffies和next_timer都初始化为jiffies。
初始化完成后,在init_timers函数的第二个重要的步骤是:
open_softirq(TIMER_SOFTIRQ,run_timer_softirq);
这个函数注册定时器软中断,简单的软中断分析见《linux的软中断》;
下面来看具体定时器的初始化和添加操作:
初始化的函数:
#define init_timer(timer) \
do { \
static struct lock_class_key __key; \
init_timer_key((timer), #timer, &__key); \
} while (0)
void init_timer_key(struct timer_list *timer,
const char *name,
struct lock_class_key *key)
{
debug_init(timer);
__init_timer(timer, name, key);
}
static void __init_timer(struct timer_list *timer,
const char *name,
struct lock_class_key *key)
{
timer->entry.next = NULL;
timer->base = __raw_get_cpu_var(tvec_bases);
#ifdef CONFIG_TIMER_STATS
timer->start_site = NULL;
timer->start_pid = -1;
memset(timer->start_comm, 0, TASK_COMM_LEN);
#endif
lockdep_init_map(&timer->lockdep_map, name, key, 0);
}
初始化的宏定义:
#define TIMER_INITIALIZER(_function, _expires, _data) { \
.entry = { .prev = TIMER_ENTRY_STATIC }, \
.function = (_function), \
.expires = (_expires), \
.data = (_data), \
.base = &boot_tvec_bases, \
__TIMER_LOCKDEP_MAP_INITIALIZER( \
__FILE__ ":" __stringify(__LINE__)) \
}
定时器的添加:
void add_timer(struct timer_list *timer)
{
BUG_ON(timer_pending(timer));
mod_timer(timer, timer->expires);
}
Timer_list结构体的定义:
struct timer_list {
struct list_head entry;
unsigned long expires;
void (*function)(unsigned long);
unsigned long data;
struct tvec_base *base;
#ifdef CONFIG_TIMER_STATS
void *start_site;
char start_comm[16];
int start_pid;
#endif
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
int mod_timer(struct timer_list *timer, unsigned long expires)
{
/*
* This is a common optimization triggered by the
* networking code - if the timer is re-modified
* to be the same thing then just return:
*/
if (timer_pending(timer) && timer->expires == expires)
return 1;
return __mod_timer(timer, expires, false, TIMER_NOT_PINNED);
}
1、 if(timer_pending(timer) && timer->expires == expires)
return1;
如果该timer已经在已经加入了timer_base中的某个向量(tv1—tv5)中,函数什么也不做,直接返回。
2、__mod_timer(timer,expires, false, TIMER_NOT_PINNED);
这个函数的关键代码:
timer->expires = expires;
if (time_before(timer->expires, base->next_timer) &&
!tbase_get_deferrable(timer->base))
base->next_timer = timer->expires;
internal_add_timer(base, timer);
internal_add_timer(base,timer);函数完成实际的添加操作:
static void internal_add_timer(struct tvec_base *base, struct timer_list *timer)
{
unsigned long expires = timer->expires;
unsigned long idx = expires - base->timer_jiffies;
struct list_head *vec;
if (idx < TVR_SIZE) {
int i = expires & TVR_MASK;
vec = base->tv1.vec + i;
} else if (idx < 1 << (TVR_BITS + TVN_BITS)) {
int i = (expires >> TVR_BITS) & TVN_MASK;
vec = base->tv2.vec + i;
} else if (idx < 1 << (TVR_BITS + 2 * TVN_BITS)) {
int i = (expires >> (TVR_BITS + TVN_BITS)) & TVN_MASK;
vec = base->tv3.vec + i;
} else if (idx < 1 << (TVR_BITS + 3 * TVN_BITS)) {
int i = (expires >> (TVR_BITS + 2 * TVN_BITS)) & TVN_MASK;
vec = base->tv4.vec + i;
} else if ((signed long) idx < 0) {
/*
* Can happen if you add a timer with expires == jiffies,
* or you set a timer to go off in the past
*/
vec = base->tv1.vec + (base->timer_jiffies & TVR_MASK);
} else {
int i;
/* If the timeout is larger than 0xffffffff on 64-bit
* architectures then we use the maximum timeout:
*/
if (idx > 0xffffffffUL) {
idx = 0xffffffffUL;
expires = idx + base->timer_jiffies;
}
i = (expires >> (TVR_BITS + 3 * TVN_BITS)) & TVN_MASK;
vec = base->tv5.vec + i;
}
/*
* Timers are FIFO:
*/
list_add_tail(&timer->entry, vec);
}
internal_add_timer函数仔细分析:
1、 unsigned long expires = timer->expires;
unsigned long idx = expires - base->timer_jiffies;
struct list_head *vec;
unsignedlong expires = timer->expires;这个是该定时器定的时间,
而base->timer_jiffies表示需要检查的动态定时器的最早到期时间。
那么unsignedlong idx = expires - base->timer_jiffies;
他们的差值表示表示这个新插入的定时器相对于最早到期的定时器还需要多长时间间隔才到期。
差值越大,这个定时器的处理可以放后,差值越小,这个定时器就越被关心。
2、if (idx < TVR_SIZE) {
int i = expires & TVR_MASK;
vec = base->tv1.vec + i;
} else if (idx < 1 << (TVR_BITS + TVN_BITS)) {
int i = (expires >> TVR_BITS) & TVN_MASK;
vec = base->tv2.vec + i;
} else if (idx < 1 << (TVR_BITS + 2 * TVN_BITS)) {
int i = (expires >> (TVR_BITS + TVN_BITS)) & TVN_MASK;
vec = base->tv3.vec + i;
} else if (idx < 1 << (TVR_BITS + 3 * TVN_BITS)) {
int i = (expires >> (TVR_BITS + 2 * TVN_BITS)) & TVN_MASK;
vec = base->tv4.vec + i;
} else if ((signed long) idx < 0) {
/*
* Can happen if you add a timer with expires == jiffies,
* or you set a timer to go off in the past
*/
vec = base->tv1.vec + (base->timer_jiffies & TVR_MASK);
} else {
int i;
/* If the timeout is larger than 0xffffffff on 64-bit
* architectures then we use the maximum timeout:
*/
if (idx > 0xffffffffUL) {
idx = 0xffffffffUL;
expires = idx + base->timer_jiffies;
}
i = (expires >> (TVR_BITS + 3 * TVN_BITS)) & TVN_MASK;
vec = base->tv5.vec + i;
}
这一步就是根据idx的值,来确定这个定时器该插入哪个级别的定时器向量中:tv1--tv5.
举两个例子,如果idx= 240;那么表明这个定时器在接下来的256个jiffies单位中就会被处理,将它加入到tv1中:
if(idx < TVR_SIZE) {
inti = expires & TVR_MASK;
vec= base->tv1.vec + i;
}
如果idx在256和19384之间,则将它加入到tv2中:
elseif (idx < 1 << (TVR_BITS + TVN_BITS)) {
inti = (expires >> TVR_BITS) & TVN_MASK;
vec= base->tv2.vec + i;
}
只要(expires>> TVR_BITS) & TVN_MASK值相同的定时器放在tv2下的同一个链表中。
依此类推。
这个地方再仔细想一想:因为按照idx的间隔大小做了区分,那tv1--tv5中每个数组的链表的长度是有限定的:
数组大小 每个数组的链表的粒度
tv1: 256 1
tv2: 64 2<<8 =256
tv3: 64 2<<(8+6)= 16384
tv4: 64 2<<(8+2*6)=1048576
tv5: 64 2<<(8+3*6)=67108864
在tv1中,每个链表头只能包含时间相同的定时器。
而在tv2中,64个数组项中的每个链表中都可以包含256个连续的定时长度的定时器。(当然对于每个定时长度,可以有多个定时器)
其实,这些都是部分排序的,比如tv2中的第一个数组中的链表的所有定时器肯定比第二个数组中的链表的任一个定时器小,但第一个数组中的链表中的定时器不一定是排好序的。tv1中的定时器<tv2<tv3<tv4<tv5.
总的加起来,也可以表示2^32的时间范围。这里才用了256+64+64+64+64 = 256 个链表头。
3、list_add_tail(&timer->entry,vec);
将这个定时器timer内嵌的list_head(entry)加入到上面确定的timer向量vec的尾部。
接下来,我们来分析动态定时器的执行:
再回头看看最开始定时器初始化部分中提到的定时器中断服务函数:
run_timer_softirq
/*
* This function runs timers and the timer-tq in bottom half context.
*/
static void run_timer_softirq(struct softirq_action *h)
{
struct tvec_base *base = __get_cpu_var(tvec_bases);
hrtimer_run_pending();
if (time_after_eq(jiffies, base->timer_jiffies))
__run_timers(base);
}
if(time_after_eq(jiffies, base->timer_jiffies))
__run_timers(base);
当当前系统的jiffies大于或等于base->timer_jiffies(动态定时器中的最早到期时间)时,就会运行__run_timers(base);
这个函数的注释如下:
/**
*__run_timers - run all expired timers (if any) on this CPU.
*@base: the timer vector to be processed.
*
*This function cascades all vectors andexecutes all expired timer
*vectors.
*/
__run_timers会将所有的定时器向量层叠起来,执行到期的定时器向量。
__run_timers的执行核心如下:
pin_lock_irq(&base->lock);
while (time_after_eq(jiffies, base->timer_jiffies)) {
struct list_head work_list;
struct list_head *head = &work_list;
int index = base->timer_jiffies & TVR_MASK;
/*
* Cascade timers:
*/
if (!index &&
(!cascade(base, &base->tv2, INDEX(0))) &&
(!cascade(base, &base->tv3, INDEX(1))) &&
!cascade(base, &base->tv4, INDEX(2)))
cascade(base, &base->tv5, INDEX(3));
++base->timer_jiffies;
list_replace_init(base->tv1.vec + index, &work_list);
while (!list_empty(head)) {
void (*fn)(unsigned long);
unsigned long data;
timer = list_first_entry(head, struct timer_list,entry);
fn = timer->function;
data = timer->data;
timer_stats_account_timer(timer);
set_running_timer(base, timer);
detach_timer(timer, 1);
-
spin_lock_irq(&base->lock);
获取自旋锁并禁止本地中断;
-
进入while循环while(time_after_eq(jiffies, base->timer_jiffies))
当base->timer_jiffies大于jiffies时候,while循环才会终止。
每一次循环的各个子步骤:
A、 structlist_head work_list;
struct list_head *head =&work_list;
intindex = base->timer_jiffies & TVR_MASK;
获取base->tv1中的索引index,如果index的值等于0,说明tv1中各个数组项都
已经执行过了,需要从tv2中将base->tv2.vec+1这个桶中的定时器搬移到tv1中,因为随着时间的推移,tv2中第1个桶中(包含还有256到511才到期的,而tv2的桶编号是第0个桶到第63个桶,因为tv1中包含0到255到期的,tv2的第0个桶是空的,实际上在添加定时器的时候,tv2 tv3 tv4的第0个桶不可能有定时器,因为不可能在保证差值的情况下还保证那个hash用到的索引值等于0;以下面的第一张图:tv3中的桶的粒度16k = 63*256 (tv2的63个桶)+ 256*1(tv1的256个桶))
需要说明的是,每次执行__run_timers是将tv2中某个桶的定时器搬移到tv1中,但最开始是从第一个桶开始的。
上面的index是tv1的索引
INDEX(0)是tv2的索引
INDEX(1)是tv3的索引
INDEX(2)是tv4的索引
INDEX(3)是tv5的索引
给出INDEX的宏定义:
#defineINDEX(N) ((base->timer_jiffies >> (TVR_BITS + (N) *TVN_BITS)) & TVN_MASK)
这几个和index的用法是类似的。
将tv2的某个桶的定时器搬移到tv1中是用casecade函数完成的:
cascade(base,&base->tv2, INDEX(0))
staticint cascade(struct tvec_base *base, struct tvec *tv, int index)
{
/*cascade all the timers from tv up one level */
structtimer_list *timer, *tmp;
structlist_head tv_list;
list_replace_init(tv->vec+ index, &tv_list);
/*
* We are removing _all_ timers from the list, so we
* don't have to detach them individually.
*/
list_for_each_entry_safe(timer,tmp, &tv_list, entry) {
BUG_ON(tbase_get_base(timer->base)!= base);
internal_add_timer(base,timer);
}
returnindex;
}
以最开始从tv2中的第一个桶搬移到tv1中为例:这时候的INDEX(0)就是1;
list_replace_init(tv->vec+ index, &tv_list);
首先找到这个tv2下这个桶挂着的链表;
list_for_each_entry_safe(timer,tmp, &tv_list, entry) {
BUG_ON(tbase_get_base(timer->base)!= base);
internal_add_timer(base,timer);
}
然后遍历刚才找到的链表,对每一个链表项上的定时器进行添加操作;
举个数据例子:
最开始tv1中包含的还有256节拍以内到期的定时器,而tv2的第一个桶包含的还有256—512节拍到期的定时器,因为时间在流逝,tv1中的定时器都被处理了,这时候tv2中第一个桶中的定时器就变为还有256节拍内到期的定时器了;
因此在internal_add_timer(base,timer);函数中根据idx这个差值来选择具体的tv向量,所以这里的定时器都被插入到tv1中。
后面的搬移是同样的道理,比如tv3中的第一个桶会搬移到tv1和tv2中;不再赘述。
给出两张好图,方便理解:
B、++base->timer_jiffies;值加1;
C、list_replace_init(base->tv1.vec+ index, &work_list);//我们始终最关心的还是tv1中的定时器,因为它们很快就要到期了
while(!list_empty(head)) {
……
}
对于用index索引到的base->tv1.vec+ index的那256个桶中的一个,这个while循环遍历该桶中的链表中挂着的每一个定时器:
{
void(*fn)(unsigned long);
unsignedlong data;
timer= list_first_entry(head, struct timer_list,entry);
fn= timer->function;
data= timer->data;
timer_stats_account_timer(timer);
set_running_timer(base,timer);
detach_timer(timer,1);
……
trace_timer_expire_entry(timer);
fn(data);
trace_timer_expire_exit(timer);
timer= list_first_entry(head, structtimer_list,entry);是利用container_of宏定义来根据list_head指针获取定时器的指针;
fn = timer->function;
data= timer->data;
fn和data就是初始化定时器时设定的回调函数;
detach_timer(timer,1);将该定时器从链表中删除;
fn(data); 调用fn(timer->function )函数;
到此,利用软中断机制和特殊设计的hash,linux内核中的动态定时器在算法复杂度和内存上都比较优化。仔细揣摩一下这个动态定时器的设计,就觉得设计的还是相当的巧妙,
利用多级数组+双向链表,用hash的思想做了时间轮的设计。在算法复杂度和内存上都不错。
如果对我的blog不够理解,可以参考下面两篇,写的比较简洁清晰的:
如果对该算法的复杂度,内存使用有一般的分析,或者该算法和其他算法的对比,可以参考下面两篇文章:
Linux 下定时器的实现方式分析
kernel/timer.c designIngo Molnar 对 linux 中的时间轮的实现做了最好的解释。
Linux的动态定时器从本质上是single-shottimer,也就是说timer到期了,调用回调函数后,这个timer就会删掉。如果要实现repeatingtimer,可以在回调函数中再次注册自己,这样该定时器就成了一个周期性的定时器。理解了定时器的算法,做一个周期性的定时器就很容易了~~
更多推荐
所有评论(0)