LINUX内核信号量设计与实现

taoistf just for fun

taoistf@gmail.com 2008/08/18

LINUX内核信号量简介

为了同步对内核共享资源的访问,内核提供了 down函数和 up函数用于获取和释放资源。down和up所保护的访问资源的内核代码区域,就构成一个临界区。在等待获取资源进入临界区的过程中,代表进程运行的内核控制路径可以睡眠。

我们从 LINUX内核信号量最直观的设计/实现出发,通过一步步改进,揭示在 x86平台上完整的信号量设计/实现,然后探讨在不同平台上通用的信号量设计/实现。

LINUX内核信号量的初步设计与实现

1 数据结构

我们首先分析信号量 semphore应具备的数据结构。它需要一个计数 count,表示能进入临界区的进程个数。conut一般初始化为 1,表示信号量是互斥信号量,一次只允许一个进程进入临界区。它也能被初始化为其他正值,表示可有多个进程同时进入临界区。

当进程不能进入临界区时,它必须在信号量上睡眠,因此需要一个表示 “等待队列头 ”的字段 wait。在 SMP中,等待队列需要自旋锁来保护,因此 wait结构中应含有自旋锁 lock,和指向等待队列链表的指针 task_list。而插入等待队列的 “等待元素 ”,其字段中应含有指向进程结构的指针 task,及能够链入等待队列的指针 task_list。

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

具体表示如下:

struct wait_queue_t{

struct task_struct * task;

struct list_head task_list; }; /*等待元素结构 */

struct wait_queue_head_t{

spinlock_t lock;

struct list_head task_list; }; /*等待队列头结构 */

struct semphore{

int count;

wait_queue_head_t wait; } /*信号量结构 */

2 算法

当进程需要获取信号量时,它调用down函数进入临界区。 down将semphore的count字段减1,若count非负,则可进入临界区;否则,加入睡眠队列。当进程离开临界区时,它调用up函数。 up将semphore的count字段加 1,若count非正,表示有进程睡眠,则将进程从wait等待队列中移走并唤醒它。

这里有一个问题:当up唤醒等待队列中睡眠进程时,是唤醒所有等待进程,还是只唤醒一个?如果唤醒所有进程,它们醒来后,就会去竞争一个获得信号量的机会,竞争失败的进程只能再度睡眠,这就使得系统有许多无谓的 schedule切换,降低了系统性能。所以, up只唤醒一个进程。为了保持 FIFO的唤醒顺序,down会将新进程以独占方式(表示唤醒时只唤醒一个)加在等待队列尾部,up则去唤醒等待队列头部的第一个独占进程。

3 实现

由于信号量的实现效率与系统性能息息相关,为了达到高效的信号量实现,不同的 cpu系统根据各自特点,提供了不同版本。我们以 x86平台为例,介绍信号量的实现与优化。

我们要注意一点:由于 x86平台对原子指令有很好的支持,而并发的多个进程可能

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

会同时修改或读取 count值,所以对 count的修改与读取都要使用原子指令。为了达到高效, down函数与 up函数都使用汇编语言实现。 down函数特别要保证:进程能以最快的速度,判断出能否进入临界区。这样才能保证高效的系统性能。

按上述算法,其实现过程是很直接的。设sem是类型为 struct semphore的信号量地址。 down:

movl $sem, %ecx lock;decl (%ecx); jns 1f /* 如非负,则往前跳到标号 1处进入临界区 */ /*调用者堆栈保存约定 */ pushl %eax

pushl %edx pushl %ecx call __down /*如为负,调用 __down加入睡眠队列 */ popl %ecx popl %edx popl %eax

1:

void __down(struct semaphore * sem)

{ struct task_struct *tsk = current; /*申明“等待元素结构 ”,并将结构中的进程指针初始化为 tsk */ DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; /*将wait表示的“等待元素 ”以独占方式加在 sem->wait表示的等待队列尾部 */ add_wait_queue_exclusive(&sem->wait, &wait); schedule();

}

up: movl $sem, %ecx lock;incl (%ecx); jg1f /* 如为正,则往前跳到标号 1处执行 up后第一条指令 */ /*调用者堆栈保存约定 */ pushl %eax

pushl %edx pushl %ecx call __up /*如为非正,调用 __up唤醒睡眠进程 */ popl %ecx popl %edx popl %eax

1:

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

void __up(struct semaphore * sem)

{

/*在sem->wait表示的等待队列中,获取其第一个 “等待元素 ”wait */

struct wait_queue_t *wait = list_entry(&sem->wait.task_list,struct wait_queue_t ,

task_list);

remove_wait_queue(&sem->wait, &wait); /* 在等待队列中删除 “等待元素 ” */

wake_up(sem->wait); /*唤醒等待队列中的第一个 “等待元素 ”所表示的独占进程 */ }

三一步步改进信号量的设计与实现

前面提到过,信号量的实现效率与系统性能息息相关。下面,我们围绕如何提高性能,对上述信号量的设计/实现进行改进。

1 分析上述实现中的性能问题

我们设想这样的情景:当 wait等待队列不空时, up出现,它唤醒 wait队列的第一个进程prev。与此同时,有一个进程 next(通过内核控制路径)调用 down试图进入临界区。但prev原来调用 down试图进入临界区时对 count减1,在未能进入临界区而睡眠时并未恢复,直到prev被唤醒并调度执行进入临界区,而且执行完临界区代码,再调用up时才会对count增1。所以,进程 next只可能在 prev之后,进入临界区。也就是说:进程完全是按照发出down调用的次序,以 FIFO方式进入临界区,这非常公平。

但我们要注意一点:当 up出现时,说明可有一个进程进入临界区。如果 prev的优先级较低,它被唤醒后,很可能长时间都不能获得调度,在这段时间内,尽管允许进程进入临界区,而且进程next又试图进入临界区,但任何试图进入临界区的进程发只能睡眠!

我们从系统性能的角度来考察是否允许next在prev之前进入临界区。对内核信号量的争用,影响系统性能的关键就是进程上下文切换schedule的代价。如果不允许 next在prev之前进入临界区,则 next必须被shedule切换掉。而允许 next在prev之前进入临界区,则 prev“至多 ”再次被 shedule。

而在以下两种情形下, prev无须被再次 shedule切换掉。 a在单 cpu上, up唤醒prev后,由于进程 next在prev之前获得调度运行, prev无须再次进行shedule切换。

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

b在多 cpu上,如果 next在prev获得调度之前,执行完临界区代码,则也可减少一次对prev的shedule切换。

所以,如果允许进程 next在up后,但在 prev获得调度前进入临界区,能减少系统进行 schedule切换的次数,则系统的性能就会获得提高,系统会更有效率。其弊端是:后发出down调用的进程可能先获取信号量,一定程度上违背了公平原则。

如果允许next提前进入临界区,则原来的代码必须进行修改:当进程不能获取信号量而睡眠时,count要增1,抵消调用 down时的无条件减1。当进程睡眠醒来时,相当于进程调用 down之前的状态:可能与新进入的调用 down的进程竞争信号量。它再次尝试进入临界区的方式也应该与 down一致: count先减1,再判断能否进入临界区。

因此__down将被改写为:

void __down(struct semaphore * sem)

{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk);

for (;;) { /* 被唤醒后,还需再次判断能否进入临界区 */ atomic_inc(&sem->count); /* 睡眠前恢复 count */ tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait);

schedule();

tsk->state = TASK_UNINTERRUPTIBLE; atomic_dec(&sem->count); /* 尝试进入临界区前,先减 1 */ if(atomic_read((&sem->count) >= 0)/* 重新判断能否进入临界区 */

break; } tsk->state = TASK_RUNNING;

}

在上述实现中,进程不一定按 FIFO进入临界区。而在进程尝试进入临界区时,总是无条件先减1,直到不能进入临界区时,才恢复 count值。这种对 count“优先减 1”的处理方法,使得在一些局部的时段, count的值会比“能进入临界区的”进程个数要小。这种实现上的特点,使得它能确保同时进入临界区的进程不会超过 count初值所限定的数目,但可能导致本来能进入临界区的进程无法进入临界区,特别是导致唤醒事件的丢失或延迟,这是我们在下面要尤其关注的。

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

2 引入全局 spinlock保护对 count的修改与读取

分析一下,上述实现其实存在错误。考察下述情景:假设进程 A已进入临界区,进程B在等待队列。 A调用 up增加 sem->count,唤醒进程 B,同时进程 C也调用 __down试图进入临界区。假设事件发生的时序如下(表示形式:指令[所属进程顺序 ]): lock;decl(%ecx)[C1] lock;incl(%ecx)[A2] atomic_dec(&sem->count)[B3] if(atomic_read((&sem->count) >= 0)[B4] atomic_inc(&sem->count)[B5] atomic_inc(&sem->count)[C6]注:上述%ecx的值就是 &sem->count

显然,如果按照这个时序,不管进程 B还是C都会睡眠。只有下一个 up才能将它们唤醒,这就相当于丢失了一次唤醒事件。出现这个问题的原因是:进程 B对sem->count进行修改与判断时,进程 C对sem->count的修改与判断也同时进行。要解决这个问题,必须把对sem->count的修改与判断放在一个自旋锁保护的“原子块”中,使得在同一个原子块中,对sem->count的判断与修改是一致的,而后一个原子块只能看到一致结果。我们引入专门用于信号量的全局自旋锁 semaphore_lock,因此 __down将被改写为:

void __down(struct semaphore * sem)

{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); spin_lock(&semaphore_lock); /* 原子化 sem->count的修改与判断 */ for (;;) {

atomic_inc(&sem->count); spin_unlock(&semaphore_lock);

tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait); schedule();

tsk->state = TASK_UNINTERRUPTIBLE; spin_lock(&semaphore_lock);/* 原子化 sem->count的修改与判断 */ atomic_dec(&sem->count); if(atomic_read((&sem->count) >= 0) /* 重新判断能否进入临界区 */

break; } tsk->state = TASK_RUNNING;

}

但上述实现仍有问题。我们还是沿用同样的情景,假设进程B比进程 C先获得 semaphore_lock,则原来的时序也依然有效:

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

lock;decl(%ecx)[C1] lock;incl(%ecx)[A2] atomic_dec(&sem->count)[B3] if(atomic_read((&sem->count) >= 0)[B4] atomic_inc(&sem->count)[B5] atomic_inc(&sem->count)[C6]

不管进程B还是 C仍旧会睡眠。不过,现在的原因是非常明显的:进程 C获得 semaphore_lock时,sem->count已经发生变化,但进程 C并未读取判断sem->count,而是直接睡眠。只要将对 sem->count的判断放在 schedule之前,进程 C就可以进入临界区。这种改变使得任何情形下执行的原子块都含有对sem->count的读取判断,因而就能“看到“前一个原子块对sem->count的一致修改。因此__down将被改写为:

void __down(struct semaphore * sem)

{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); spin_lock(&semaphore_lock); /* 原子化 sem->count的修改与判断 */ for (;;) {

if(atomic_read((&sem->count) >= 0) /* 将判断放在 schedule之前 */

break; atomic_inc(&sem->count); spin_unlock(&semaphore_lock);

tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait); schedule();

tsk->state = TASK_UNINTERRUPTIBLE; spin_lock(&semaphore_lock);/* 原子化 sem->count的修改与判断 */ atomic_dec(&sem->count);

} tsk->state = TASK_RUNNING; }

在下面的讨论中,我们还会将自旋锁所保护的代码区域称作一个“原子块”。

为了对上述实现有一个更好的直观理解,我们来做一个小测试:

假设count的初值为 1。进程 A已在临界区,进程 B在等待队列,C,D,E同时调用 __down试图进入临界区(即同时调用 lock;decl(%ecx))。然后, C发现不能进入临界区,首先获取自旋锁,此时进程 A调用up唤醒进程 B,它们按B,D,E的顺序获取自旋锁。那么,B,C,D,E中的哪一个进程将进入临界区?

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

3 无竞争地进入睡眠

上述实现仍在错误。考察下述情景:假设进程A已进入临界区,进程 B调用 __down试图进入临界区,同时, A调用 up增加 sem->count,唤醒睡眠进程。假设事件发生的时序如下:

lock;decl((%ecx) [B1] if(atomic_read((&sem->count) >= 0)[B2] lock;incl (%ecx)[A3] wake_up(sem->wait)[A4] add_wait_queue_exclusive(&sem->wait, &wait)[B5] 注:上述%ecx的值就是 &sem->count

结果是:进程 B无法进入临界区而睡眠,但进程 A却没有唤醒它!也是相当于丢失了一次唤醒事件。引起这个问题的原因,是因为进程 B先判断 sem->count,如果为负,然后加入睡眠队列。而进程 A的唤醒事件在进程 B“判断 sem->count是否为负 ”和“加入睡眠队列”之间到来。如果将原代码改成:先加入等待队列,然后再判断 sem->count,则不会出现上述竞态条件,问题迎刃而解。

在LDD“中断处理”一章“无竞争地睡眠”一节也讲到了类似内容,将其称作“半睡眠状态时的检查”。在内核中,引入wait_event,wait_event_interruptible,就是为了解决由sleep_on,sleep_on_interrupitle引发的同样的唤醒丢失问题,而 down进入睡眠的方式与wait_event一致。

因此__down将被改写为:

void __down(struct semaphore * sem)

{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE;

spin_lock(&semaphore_lock);

for (;;) { /* 将加入等待队列操作放在判断之前 */ add_wait_queue_exclusive(&sem->wait, &wait); if (atomic_read((&sem->count) >= 0)

break; atomic_inc(&sem->count); spin_unlock(&semaphore_lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock(&semaphore_lock); atomic_dec(&sem->count);

} spin_unlock(&semaphore_lock);

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

remove_wait_queue(&sem->wait, &wait); /* 如能进入临界区,从等待队列中删除 */ tsk->state = TASK_RUNNING; }

考察这个实现,需要注意一点,add_wait_queue_exclusive函数需要获取 sem->wait.lock自旋锁,它本身又在 semaphore_lock自旋锁保护之内。如果获取 sem->wait.lock自旋锁需要较长时间,就会使得由 semaphore_lock自旋锁所保护临界区代码,执行时间过长,其他等待获得自旋锁的进程也会等待很长时间。另一方面,获取不同自旋锁的方式一般是:获取一把锁,释放后再获取另一把锁,以避免发生死锁。所以,需要在上述实现中将add_wait_queue_exclusive函数移到 spin_lock(&semaphore_lock)之前,也即for循环之外。

此时,会出现另一个问题:如果进程不能进入临界区而睡眠,当它被唤醒时,同时被__up从等待队列中删除,但由于add_wait_queue_exclusive在for循环之外,它不会重新加入等待队列!此时,需要我们改进__up:__up唤醒进程时,并不把它从等待队列中删除。这种改进还有一个额外的功效,能避免进程在尝试进入临界区的过程中,反复加入等待队列和从等待队列中删除,从而提高了性能。(但也会引发新的问题,我们在下一个步骤中就会看到)

因此__down将被改写为:

void __down(struct semaphore * sem)

{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; /*将加入等待队列操作放在获取 semaphore_lock之前 */ add_wait_queue_exclusive(&sem->wait, &wait); spin_lock(&semaphore_lock); for (;;) {

if (atomic_read((&sem->count) >= 0)

break; atomic_inc(&sem->count); spin_unlock(&semaphore_lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock(&semaphore_lock); atomic_dec(&sem->count);

} spin_unlock(&semaphore_lock); remove_wait_queue(&sem->wait, &wait); tsk->state = TASK_RUNNING;

}

而__up将被改写为:

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

void __up(struct semaphore * sem) {

wake_up(sem->wait); /*去掉从等待队列中删除进程的操作 */ }

4进入临界区前,弥补可能丢失的唤醒事件

上述实现仍在问题。在上述实现中,进程一开始加入等待队列,被唤醒时仍然在等待队列里。直到获取sem,进入临界区之前,才从等待队列里删除。前面提到过, sem->count的初值一般为 1,表示sem为互斥信号量。但 sem的设计允许count的初值为任何正值,表示可有 count个进程同时进入临界区。设想这样的情形: count的初值为2。进程A,B都在临界区里,进程 C,D在睡眠队列里等待进入临界区,进程 C在睡眠队列头部,进程A,B几乎同时调用up唤醒睡眠进程。

假设事件发生的时序如下:

wake_up(sem->wait)[A1] wake_up(sem->wait)[B2] remove_wait_queue(&sem->wait, &wait)[C3]

这样,虽然有两次唤醒操作,但针对的都是进程 C,进程 D依然处于睡眠状态,相当于又丢失了一次唤醒事件。只有在进程C被调度执行并进入临界区,执行完临界区代码,然后调用up,进程 D才能被唤醒。所以,严格来说,唤醒事件并未丢失,而是被严重延迟。引起这个问题的原因是:一个唤醒操作只能唤醒睡眠队列第一个进程,而且并未将其从等待队列中删除,使其仍然处在睡眠队列头,下一个唤醒操作可能又针对同一个进程。为了解决这个问题,在进程进入临界区之前,需再次调用 wake_up(sem->wait),以弥补可能延迟的唤醒事件。

需要额外注意的一点是,在linux中,还提供了一种 down操作的非阻塞版本 down_trylock:当不能获取信号量时,进程不睡眠而直接退出,它一般用在不能睡眠的中断服务例程和可延迟函数中。因此在上述实现中,如果在spin_lock保护的原子块中发生中断,而中断中又调用 down_trylock,down_trylock也会调用 spin_lock,就会导致死锁。因此,需将上述实现中涉及到spin_lock的地方,换成其带中断控制的版本。

因此__down将被改写为:

void __down(struct semaphore * sem)

{

struct task_struct *tsk = current;

DECLARE_WAITQUEUE(wait, tsk);

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait);

spin_lock_irq(&semaphore_lock); /* 加上 “禁止中断 ”功能 */ for (;;) { if (atomic_read(&sem->count) >= 0)

break; atomic_inc(&sem->count); spin_unlock_irq(&semaphore_lock); /* 加上“允许中断 ”功能 */ schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&semaphore_lock); /*加上“禁止中断 ”功能 */ atomic_dec(&sem->count);

} spin_unlock_irq(&semaphore_lock); remove_wait_queue(&sem->wait, &wait); tsk->state = TASK_RUNNING; wake_up(&sem->wait); /*弥补可能丢失的唤醒事件 */

}

从直观上看,当存在“可能丢失的唤醒事件”时, sem->count的值应该大于0,那么,在__down最后对 wake_up函数的调用能否改成条件判断,即: if (atomic_read(&sem->count) > 0)

wake_up(&sem->wait)当条件判断不满足时,就能减少对wake_up的调用,从而优化了程序性能。

仔细一想,其实这是有问题的!设想这样的情形: count的初值为2。进程 A正准备进入临界区,但尚未从等待队列中删除,依然位于等待队列头部。进程 B已在临界区里,进程C在等待队列中,位于进程 A之后。当进程 A执行到 __down的尾部 spin_unlock_irq(&semaphore_lock)与 remove_wait_queue(&sem->wait, &wait)之间时候,进程B调用up,与此同时,有一个进程 D调用down试图进入临界区。假设事件发生时序如下:

spin_unlock_irq(&semaphore_lock)[A1] lock;decl (%ecx) [D2] spin_lock_irq(&semaphore_lock) [D3] if atomic_read(&sem->count)[D4] lock;incl (%ecx) [B5] wake_up(&sem->wait)[B6] remove_wait_queue(&sem->wait, &wait)[A7] if (atomic_read(&sem->count) > 0)[A8] atomic_inc(&sem->count)[D9]

按照这个时序,虽然进程 B调用了 up,但它不能唤醒进程 C,也不能使进程 D进入临界区,而 [A8]处的判断,使得并不重新调用 wake_up,因此,唤醒事件仍然丢失了!问题的根源在于: [A8]处的判断不在semaphore_lock的保护之下,这种判断是不可靠的,因而是无意义的。

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

5 引入 sleepers,尽量减少原子指令

在上述实现中,用到了很多原子指令。而原子指令是很耗时的指令,并且在几乎所有系统中,它相当于memory barrier,使得编译器和 cpu都不能改变它前后的读写内存指令的顺序,因此也无法进行优化。因此,从程序性能的角度来讲,应该尽量避免使用原子指令。我们回忆一下:为避免 “只有先调用 down的进程才能先进入临界区 ”导致的系统性能损失,当进程无法进入临界区睡眠和被唤醒时,引入原子指令对 count计数进行直接修改,使新调用 down的进程有可能先进入临界区。

在保证同样功能的前提下,要减少原子指令,关键是:当进程无法进入临界区而睡眠时,及当进程被唤醒时,尽量不要直接修改count的值,而且尽可能合并 “修改与读取判断count的值 ”为一个原子指令。所以,我们可考虑一个两阶段修改 count的方案:当进程无法进入临界区而睡眠时,不修改 count,而是由下一个调用 down的进程在count中将此 1加上。

具体实现中,我们在 semphore结构中引入新的字段 sleepers,当进程能够进入临界区时,设置其值为0;当进程需要睡眠时,设置其值为 1。设想前一个进程已经设置 sleepers,则当本进程在 __down中读取count时(不管是第一次进入 for循环,还是从睡眠中醒来),需将sleepers加上,它表示:如果前一个进程已经进入临界区,则count不变;如果前一个进程睡眠,则需在 count中将 1补上。如果本进程不能进入临界区而睡眠,则当本进程醒来时,它仍相当于进程调用 down之前的状态,与原来的处理一样:使 count先减 1,再判断能否进入临界区。

我们可以考虑将__down中for循环改成如下代码:

spin_lock_irq(&semaphore_lock); /* 加上“禁止中断 ”功能 */

for (;;) {

int sleepers = sem->sleepers;

if (!atomic_add_negative(sleepers, &sem->count)) {

sem->sleepers = 0; /* 本进程能进入临界区,后一个进程不用增加 count */

break;

}

sem->sleepers = 1; /*本进程不能进入临界区,后一个进程需要增加 count */

spin_unlock_irq(&semaphore_lock);

schedule();

tsk->state = TASK_UNINTERRUPTIBLE;

spin_lock_irq(&semaphore_lock);

atomic_dec(&sem->count); /* 尝试进入临界区前,先减 1 */ }

我们在这里要注意一点:对函数atomic_add_negative的调用与对sleepers的赋值

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

(sem->sleepers = 0 or 1)一定要放在 semaphore_lock自旋锁的保护之下,这与步骤 2中要将atomic_read与atomic_inc放在自旋锁内是一个道理,否则会引发相似的并发问题。

接下来思考:能否将atomic_dec(&sem->count)的功能,考虑由 atomic_add_negative(sleepers, &sem->count)来代替,这样就又能减少一条原子指令。我们注意到一点:进程只有在睡眠醒来后才需要对count减1,而在第一次进入 for循环时,并不需要减 1。因此,我们考虑将 atomic_add_negative函数中的 sleepers参数替换为sleepers-1,这样,就使得 count无条件减 1。但在进入 for循环时,使 sleepers增1,以抵消原子指令无条件减1的影响。

因此__down将被改写为:

void __down(struct semaphore * sem)

{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait);

spin_lock_irq(&semaphore_lock); sem->sleepers++; /* 抵消原子指令中无条件减 1的影响 */ for (;;) {

int sleepers = sem->sleepers; /* 相当于先赋值: count = count + sleepers, 然后使count无条件减 1 */ if (!atomic_add_negative(sleepers - 1, &sem->count)) {

sem->sleepers = 0;

break; } sem->sleepers = 1; spin_unlock_irq(&semaphore_lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&semaphore_lock);

} spin_unlock_irq(&semaphore_lock); remove_wait_queue(&sem->wait, &wait); tsk->state = TASK_RUNNING; wake_up(&sem->wait);

}以上实现对应linux2.4版本信号量实现。

性能分析:对比上一个实现,本实现减少了两条原子指令,但在进程不能进入临界区睡眠时,并没有及时减1,而是靠下一个试图进入临界区的进程将 1补上。这样也会引发一些性能问题,考虑这样的情形:当前一个进程睡眠后,up出现,唤醒睡眠

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

队列头的进程A,在进程 A被调度执行之前,如果有新进程 B调用 down试图进入临界区,此时,在上一个实现中,进程 B通过 down的第一条指令,就会马上进入临界区。而在本实现中,进程 B需要进入 __down的for循环,通过原子指令 atomic_add_negative的判断后,才能进入临界区,进程B就会多执行一些指令。但要注意的是: up出现后,进程B只有在特定时间段内出现,才会出现上述问题。但由于进程调用 down试图进入临界区的时机完全是随机的,所以进程B在特定时间段内出现的概率几乎为 0,而原子指令的减少所带来的程序性能优化,则是完全确定的。因此,整体来看,上述实现仍然提高了程序性能。

6 去掉全局自旋锁

在上述实现中,我们用来保护原子块的自旋锁是一个全局自旋锁。也就是说,不同的信号量使用的是完全相同的一个锁,这会使得不同的信号量之间也会存在竞争。如果希望每个信号量都使用一个特定的自旋锁,最简单的方法是在信号量结构中增加一个字段lock作为自旋锁,则原实现中所有的 semaphore_lock都可替换成sem.lock。

假设原实现中所有的semaphore_lock都已替换成 sem.lock。我们注意到,在 add_wait_queue_exclusive和remove_wait_queue函数中,因为要对等待队列进行修改操作,必须先获取sem->wait结构中用来保护等待队列的自旋锁 lock,修改完睡眠队列,然后释放自旋锁;在 wake_up(&sem->wait)函数中要扫描等待队列,也必须先获取wait.lcok自旋锁。所以,在实现中两个自旋锁的使用是息息相关的:总是先释放一个自旋锁,然后马上去获取另外一个自旋锁。还要注意到一点:对这两个自旋锁的使用只存在于__down与__up之中,没有其他内核函数会使用它们。(当然, __down_interuptible也会使用这两个自旋锁,但它本质上与 _down在并发处理方式上是完全一致的,只要把__down考虑清楚了,它自然就没有问题了),因此,对这两个自旋锁的竞争只存在于__down与__down,__down与__up,__up与__up之间。

因此,我们有一个直观的想法:能否在 __down中将这两个自旋锁合并?使其所保护的原子指令块合二为一。因为wait.lock是与特定信号量相关的局部自旋锁,如果合并成wait.lock,还能避免引入 sem.lock而增加信号量空间。我们先分析一下,如果合并成wait.lock,上述实现中的代码应如何变化?先将 sem.lock简单地替换为 wait.lock。于是在__down中就会出现刚释放 wait.lock,紧接着又去获取 wait.lock的情形,这是一种无谓的锁争用,需要我们引入 add_wait_queue_exclusive,remove_wait_queue和wake_up的locked版本:他们都假设相应函数在调用之前,已经获取自旋锁,因此它们本身都不带有获取自旋锁的操作。

__down可被改写为:

void __down(struct semaphore * sem)

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&sem->wait.lcok);

/*不带获取等待队列自旋锁操作 */ add_wait_queue_exclusive_locked(&sem->wait, &wait); sem->sleepers++; for (;;) {

int sleepers = sem->sleepers;

if (!atomic_add_negative(sleepers - 1, &sem->count)) { sem->sleepers = 0; break;

} sem->sleepers = 1; spin_unlock_irq(&sem->wait.lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&sem->wait.lock);

} /*不带获取等待队列自旋锁操作 */ remove_wait_queue_locked(&sem->wait, &wait); wake_up_locked(&sem->wait); /* 不带获取等待队列自旋锁操作 */ spin_unlock_irq(&sem->wait.lock); tsk->state = TASK_RUNNING;

}

以上实现对应linux2.6版本中在 2.6.16之前的信号量实现。

我们接着分析上述修改对自旋锁竞争的影响。这种修改不会对__up之间竞争自旋锁有任何影响,我们只用考虑对__down与__down之间, __down与__up之间竞争自旋锁的影响。

先考虑__down与__down竞争自旋锁的情形。使用两个自旋锁时, __down在每次尝试进入临界区时至少需获取两次自旋锁(只在睡眠醒来又不能进入临界区时,只获取一次自旋锁),现在改成了一个自旋锁,在所有情形下,只需获取一次自旋锁即可完成尝试进入临界区的功能,因此,减少了 __down与__down竞争自旋锁的频率。

再者,需要注意的是:有两个自旋锁时, __down中两个自旋锁各自保护的原子块区域,比只有一个自旋锁时所保护的原子块区域要小,因此 __down之间竞争某一个锁时,时间可能会减少,但 __down只有依次获得这两个锁,执行完这两个原子块区域才能尝试进入临界区,所以的总的执行时间不可能减少。因此,整体来看,改成一个自旋锁wait.lock,提高了 __down之间竞争自旋锁的性能。

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

再考虑__down与__up竞争自旋锁的情形。我们以进程能进入临界区的情形为例来说明,其他情形是一样的。

在使用两个自旋锁时,__down在进入临界区时候,会执行 add_wait_queue_exclusive, remove_wait_queue,wake_up三个函数,它们执行时都必须先获取 wait.lock自旋锁,因此都可能与__up形成三次竞争。

在只使用wait.lock时, __down在执行过程中,只须获取一次 wait.lock自旋锁,也就只有一次可能与__up形成竞争。我们要注意到一点,虽然减少了可能的竞争次数,但如果__down中wait.lcok保护的原子块执行时间明显增长,就会使得 __up等待自旋锁的时间明显增长,还是会影响系统性能。所以,我们考察一下 __down中原子块的内容:

1 add_wait_queue_exclusive_locked 2对count的修改判断操作 3 remove_wait_queue_locked 4 wake_up_locked而add_wait_queue_exclusive_locked,remove_wait_queue_locked两个函数都是很简单的一个链表增/删操作,“对 count的处理”也是很简单的修改判断操作,只有 wake_up_locked比较复杂:它读取睡眠队列头元素,如被唤醒进程不在运行队列中,获取运行队列自旋锁,将其加入运行队列,并尝试在其他 cpu上重新调度它。因此,wait.lock自旋锁所保护的原子块的大小只相当于比 wake_up_locked有很小增长,加上获取 wait.lock自旋锁的时间,其执行时间只比 wake_up函数执行时间有很小的增长。因此,__down与__up的竞争类似于 wake_up函数与 __up的竞争,显然,它比使用两个自旋锁时有效得多。

综上所述,去掉全局自旋锁,只用一个wait.lock,提高了系统性能。

还有一个类似于步骤4的小问题:能否将 __down结尾处的wake_up_locked调用改成条件判断,即:

if (atomic_read(&sem->count) > 0) wake_up_locked(&sem->wait);我感觉似乎是可行的,因为整个判断都处于 wait.lock的保护之下。但内核不这么写,问题在哪呢?

四通用信号量

在信号量的设计中,其 count字段可初始化为任何非负值。在信号量的实际使用中,其初值一般都为1,即作为互斥信号量来使用。在 2.6.16中,内核引入了 mutex,其功

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

能相当于count初值为 1的信号量,因为不用考虑多个进程同时进入临界区的情形,就可更加优化代码的性能。对性能敏感的内核代码如果要使用信号量,一般都会使用mutex,因而通常的信号量实现就不必过于苛求性能。

我们在上面的讨论中,都是从优化性能的角度来改进信号量的设计 /实现,例如通过原子指令直接判断能否进入临界区,进程睡眠时对count的增加,sleepers字段的引入等等。对性能的关注,还使得不同 cpu平台中的信号量实现代码会有很大不同。(我们上面讨论的信号量实现,其实是基于 x86平台的)比喻说,不同的平台,对原子指令的支持就不一样,有的平台就无法支持__down中的原子指令 atomic_add_negative的实现。如果不过于关注性能,就无须使用原子指令,也无须根据特定平台对代码进行优化,那么,信号量代码就能统一,形成通用的信号量实现。

通用的信号量实现,关键就在于对 count字段的处理。由于不能使用原子指令,在任何时候读取或修改count时,都需要放在自旋锁 wait.lock的保护之下。按照这个思路,我们考虑如此使用count和等待队列:当进程能进入临界区,我们对 count减1。当进程不能进入临界区,不修改 count,只是将进程加入睡眠队列。当调用 up时,如果睡眠队列为空,对 count增1,否则,不修改 count,只是唤醒睡眠队列头的进程。这样, count永远为非负值,而且,进程永远按照 “先调用 down先进入临界区 ”的公平原则顺序进入临界区。

因此,信号量的实现可是如下形式:

void down(struct semaphore *sem) {

unsigned long flags;

spin_lock_irq(&sem->wait.lock); if (sem->count > 0)) sem->count--; else __down(sem); spin_unlock_irq(&sem->wait.lock); }

static inline int __down(struct semaphore *sem)

{

struct task_struct *task = current;

DECLARE_WAITQUEUE(wait, tsk);

tsk->state = TASK_UNINTERRUPTIBLE;

add_wait_queue_exclusive_locked(&sem->wait.lock, &wait);

schedule; }

void up(struct semaphore *sem)

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

{

spin_lock_irq(&sem->wait.lock);

if (list_empty(&sem->wait.list))

sem->count++; else __up(sem); spin_unlock_irq(&sem->wait.lock); }

static void __up(struct semaphore *sem)

{

/*获取等待队列头元素 */

struct wait_queue_t *wait = list_entry(&sem->wait.task_list,struct wait_queue_t ,

task_list);

remove_wait_queue_locked(&sem->wait, &wait);

wake_up_locked(&sem->wait);

tsk->state = TASK_RUNNING; }

以上实现对应linux2.6版本中在 2.6.16之后的信号量实现(只是思路一致,具体形式有区别。因为内核的实现将down,down_interruptible,down_trylock,down_timeout 的代码进行了整合)它大大简化了以前的实现。

从某种意义上来说,好象终点回到了起点,与刚开始信号量代码实现思路有很大相似。

六总结

我们从最直观的设计/实现出发,一步步提高性能,一步步发现问题,一步步解决问题,导出了 LINUX多个版本内核信号量的实现逻辑。从中可以看出,虽然内核信号量的设计非常简单,但其实现却需要考虑到很多问题,要做出一个较为完美的实现,相当不易!

值得一提的是,严格来说,down的设计/实现还应该考虑与 down_interruptible, down_trylock的并发情形。但由于 down_interruptible除了信号处理之外,与 down的实现逻辑完全一致,而down_trylock非常简单,可看作 down实现逻辑的子集。因此,我们完全可先抛开这两个函数,只考虑down,up等的并发情形,实现 down和up。然后,仿照down,实现 down_interruptible,down_trylock。再检查一下,这两个函数与down和up是否会有意料之外的竟态条件。限于篇幅,就不再阐释了。

Generated by Foxit PDF Creator © Foxit Software

http://www.foxitsoftware.com For evaluation only.

七参考文献

1 LINUX内核代码 2.2.14,2.4.18,2.6.11,2.6.26 2《Understanding the Linux Kernel 2nd,3rd》 3《Linux Device Driver 2nd》

GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐