linux内核的等待队列是在内核中运用非常广泛的数据结构,它是以双循环链表为基础的数据结构,与进程的休眠---唤醒机制紧密相连,可以用来同步对系统资源的访问、异步事件通知、跨进程通信等。

        假设进程A想要获取某资源(例如读取网卡数据),但此时资源尚未准备好(网卡还未接收到数据),这时内核必须让进程A休眠并切换到其他进程运行,直到资源准备好后再唤醒进程A。

1、等待队列头

// Head of a wait queue: a lock plus the list that wait queue entries are linked onto.
struct wait_queue_head {
	spinlock_t		lock;  // spinlock used for mutually exclusive access
	struct list_head	head;  // anchor of the list of queued wait_queue_entry items
};
typedef struct wait_queue_head wait_queue_head_t;

可通过宏DECLARE_WAIT_QUEUE_HEAD(name)静态创建类型为wait_queue_head_t的等待队列头name,或者通过init_waitqueue_head(&name)在运行时动态初始化一个已有的等待队列头name。

// Static creation: defines a wait queue head called `name` and initializes it
// at its point of definition via __WAIT_QUEUE_HEAD_INITIALIZER(name).
#define DECLARE_WAIT_QUEUE_HEAD(name) \
	struct wait_queue_head name = __WAIT_QUEUE_HEAD_INITIALIZER(name)

// Brace initializer for a wait queue head called `name`.
// .lock is set from __SPIN_LOCK_UNLOCKED(name.lock); both pointers of .head
// refer back to name.head itself, i.e. the list starts out empty.
#define __WAIT_QUEUE_HEAD_INITIALIZER(name) {					\
	.lock		= __SPIN_LOCK_UNLOCKED(name.lock),			\
	.head		= { &(name).head, &(name).head } }


// Dynamic creation: initializes an existing wait queue head at run time.
// Every expansion of this macro declares its own static lock_class_key and
// passes it, together with the stringified argument (#wq_head), on to
// __init_waitqueue_head().
#define init_waitqueue_head(wq_head)						\
	do {									\
		static struct lock_class_key __key;				\
										\
		__init_waitqueue_head((wq_head), #wq_head, &__key);		\
	} while (0)

/*
 * Run-time initialization of a wait queue head.
 *
 * @wq_head: head to set up
 * @name:    name handed to lockdep_set_class_and_name() for the head's lock
 * @key:     lock class key handed to lockdep_set_class_and_name()
 */
void __init_waitqueue_head(struct wait_queue_head *wq_head, const char *name,
			   struct lock_class_key *key)
{
	/* The list of waiters starts out empty. */
	INIT_LIST_HEAD(&wq_head->head);

	/* Initialize the lock first, then attach its lockdep class and name. */
	spin_lock_init(&wq_head->lock);
	lockdep_set_class_and_name(&wq_head->lock, key, name);
}

2、等待队列元素

// One element (waiter) of a wait queue.
struct wait_queue_entry {
	unsigned int		flags;  // attribute bits; WQ_FLAG_EXCLUSIVE marks an exclusive waiter
	void			*private;  // points to the task_struct of the process waiting on the queue
	wait_queue_func_t	func;  // wakeup function
	struct list_head	entry;  // list node that hangs this wait_queue_entry on a wait_queue_head_t
};

与等待队列头类似,等待队列元素也有两种创建方式:使用宏DECLARE_WAITQUEUE(name, tsk)静态定义一个名为name的等待队列元素,或者使用init_waitqueue_entry(&name, tsk)在运行时动态初始化。

// Static creation macro: defines a wait queue entry called `name` for task
// `tsk`, initialized via __WAITQUEUE_INITIALIZER(name, tsk).
#define DECLARE_WAITQUEUE(name, tsk)						\
	struct wait_queue_entry name = __WAITQUEUE_INITIALIZER(name, tsk)

// Brace initializer for a wait queue entry: .private is the task `tsk`, the
// wakeup callback is default_wake_function, and both list pointers are NULL
// (the entry is not linked anywhere yet). .flags is not listed, so as with
// any designated initializer it is zero-initialized.
#define __WAITQUEUE_INITIALIZER(name, tsk) {					\
	.private	= tsk,							\
	.func		= default_wake_function,				\
	.entry		= { NULL, NULL } }


/*
 * Dynamic creation: bind an existing wait queue entry to task @p.
 *
 * Clears the flags and installs default_wake_function as the wakeup callback.
 * wq_entry->entry (the list node) is not touched here.
 */
static inline void
init_waitqueue_entry(struct wait_queue_entry *wq_entry, struct task_struct *p)
{
	wq_entry->private = p;
	wq_entry->func = default_wake_function;
	wq_entry->flags = 0;
}

/*
 * Alternative to init_waitqueue_entry(): initialize an entry with a
 * caller-supplied wakeup callback @func.
 *
 * No task is attached (private is NULL) and the flags are cleared.
 * wq_entry->entry (the list node) is not touched here.
 */
static inline void init_waitqueue_func_entry(struct wait_queue_entry *wq_entry,
					     wait_queue_func_t func)
{
	wq_entry->func = func;
	wq_entry->private = NULL;
	wq_entry->flags = 0;
}

3、添加移除等待队列

内核提供了几个函数将元素添加删除至等待队列,实现位于kernel/sched/wait.c

add_wait_queue():向等待队列添加非独占的普通等待队列元素(清除flags中的WQ_FLAG_EXCLUSIVE标志)

add_wait_queue_exclusive():在等待队列尾部添加独占等待队列元素(在flags中设置WQ_FLAG_EXCLUSIVE标志)

remove_wait_queue():移除元素

/*
 * Add a non-exclusive waiter to a wait queue.
 *
 * @wq_head:  queue to add to
 * @wq_entry: entry to add; WQ_FLAG_EXCLUSIVE is cleared in its flags
 *
 * The entry is inserted at the HEAD of the list, under wq_head->lock.
 * Exclusive waiters are appended at the tail (add_wait_queue_exclusive()),
 * and __wake_up_common() stops walking the list once enough exclusive
 * waiters were woken. Inserting non-exclusive waiters at the tail, behind
 * exclusive ones, could therefore leave them unwoken. prepare_to_wait_event()
 * uses the same head/tail split.
 */
void add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
	unsigned long flags;

	wq_entry->flags &= ~WQ_FLAG_EXCLUSIVE;
	spin_lock_irqsave(&wq_head->lock, flags);
	__add_wait_queue(wq_head, wq_entry);	/* head insertion, not tail */
	spin_unlock_irqrestore(&wq_head->lock, flags);
}
EXPORT_SYMBOL(add_wait_queue);

/*
 * Add an exclusive waiter to a wait queue.
 *
 * @wq_head:  queue to add to
 * @wq_entry: entry to add; WQ_FLAG_EXCLUSIVE is set in its flags
 *
 * The entry is appended at the tail of the list while wq_head->lock is held
 * (taken with spin_lock_irqsave()).
 */
void add_wait_queue_exclusive(struct wait_queue_head *wq_head,
			      struct wait_queue_entry *wq_entry)
{
	unsigned long irq_state;

	wq_entry->flags |= WQ_FLAG_EXCLUSIVE;

	spin_lock_irqsave(&wq_head->lock, irq_state);
	__add_wait_queue_entry_tail(wq_head, wq_entry);
	spin_unlock_irqrestore(&wq_head->lock, irq_state);
}
EXPORT_SYMBOL(add_wait_queue_exclusive);

/*
 * Remove an entry from a wait queue.
 *
 * @wq_head:  queue the entry is linked on
 * @wq_entry: entry to unlink
 *
 * The unlink is done by __remove_wait_queue() while wq_head->lock is held
 * (taken with spin_lock_irqsave()).
 */
void remove_wait_queue(struct wait_queue_head *wq_head,
		       struct wait_queue_entry *wq_entry)
{
	unsigned long irq_state;

	spin_lock_irqsave(&wq_head->lock, irq_state);
	__remove_wait_queue(wq_head, wq_entry);
	spin_unlock_irqrestore(&wq_head->lock, irq_state);
}
EXPORT_SYMBOL(remove_wait_queue);


// 以下两个内部辅助函数不加锁,调用者需先持有wq_head->lock
// Links wq_entry at the tail of wq_head's list. No locking is done here; the
// callers visible in this file take wq_head->lock before calling.
static inline void __add_wait_queue_entry_tail(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
	list_add_tail(&wq_entry->entry, &wq_head->head); // appended at the TAIL of the queue
}

// Unlinks wq_entry from whatever list it is on via list_del(). wq_head is not
// used by the body. No locking is done here; remove_wait_queue() takes
// wq_head->lock before calling.
static inline void
__remove_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
	list_del(&wq_entry->entry);
}

示意图如下:

4、进程 休眠——唤醒

把进程(task_struct)添加到等待队列后,就可以休眠该进程,让出cpu给其他进程运行,内核提供wait_event宏和它的几个变种来实现进程休眠,直到condition成立。

下列宏中,wq_head为等待队列头,condition是一个布尔表达式:

wait_event(wq_head, condition)  //不可中断休眠
wait_event_timeout(wq_head, condition, timeout)  //同上,但等待超过timeout限定的时间后,不论condition是否成立都会返回
wait_event_interruptible(wq_head, condition) //进程可以被信号打断
wait_event_interruptible_timeout(wq_head, condition, timeout)   //类似上面
io_wait_event(wq_head, condition)

/*
 * Sleep until `condition` evaluates to true.
 *
 * The condition is tested once up front, so nothing is queued when it already
 * holds; otherwise the waiting is delegated to __wait_event(), which sleeps
 * in TASK_UNINTERRUPTIBLE. might_sleep() is called first, before either path.
 */
#define wait_event(wq_head, condition)						\
do {										\
	might_sleep();								\
	if (condition)								\
		break;								\
	__wait_event(wq_head, condition);					\
} while (0)


/*
 * Slow path of wait_event(): expands to ___wait_event() with
 *   state     = TASK_UNINTERRUPTIBLE (per the original note, the
 *               *_interruptible variants pass TASK_INTERRUPTIBLE instead),
 *   exclusive = 0  (non-exclusive waiter),
 *   ret       = 0  (initial result value; the original note labels this slot
 *                   "timeout" -- presumably the *_timeout variants pass their
 *                   timeout here, TODO confirm),
 *   cmd       = schedule().
 * The result of ___wait_event() is discarded.
 *
 * The per-argument notes live in this header because a comment placed after
 * a line-continuation backslash breaks the macro definition.
 */
#define __wait_event(wq_head, condition)					\
	(void)___wait_event(wq_head, condition, TASK_UNINTERRUPTIBLE, 0, 0,	\
			    schedule())


/*
 * Core of the wait_event*() family (GNU statement expression).
 *
 * Defines a wait queue entry on the stack, binds it to the current task and
 * loops until `condition` holds or, for interruptible states, a signal is
 * pending.
 *
 * @wq_head:   wait queue head (an lvalue; its address is taken)
 * @condition: expression re-evaluated on every loop iteration
 * @state:     task state to sleep in
 * @exclusive: non-zero to queue the entry with WQ_FLAG_EXCLUSIVE
 * @ret:       initial value of the result
 * @cmd:       statement that actually gives up the CPU, e.g. schedule()
 *
 * Value: `ret` when the loop ends because `condition` became true (0 for
 * wait_event()). When interrupted, the value returned by
 * prepare_to_wait_event(), i.e. -ERESTARTSYS. On the interrupted path
 * finish_wait() is skipped; prepare_to_wait_event() has already unlinked
 * the entry in that case.
 *
 * Only block comments are used inside the body: a line comment or a line
 * without a trailing backslash would end or swallow the macro definition.
 */
#define ___wait_event(wq_head, condition, state, exclusive, ret, cmd)		\
({										\
	__label__ __out;							\
	struct wait_queue_entry __wq_entry;					\
	long __ret = ret;	/* explicit shadow */				\
										\
	/* bind to current; wakeup callback is autoremove_wake_function() */	\
	init_wait_entry(&__wq_entry, exclusive ? WQ_FLAG_EXCLUSIVE : 0);	\
	for (;;) {								\
		/* non-zero when a signal is pending for this sleep state */	\
		long __int = prepare_to_wait_event(&wq_head, &__wq_entry, state);\
										\
		/* re-check the condition before giving up the CPU */		\
		if (condition)							\
			break;							\
										\
		/* interrupted by a signal: bail out with the error code */	\
		if (___wait_is_interruptible(state) && __int) {			\
			__ret = __int;						\
			goto __out;						\
		}								\
										\
		/* normally schedule(): the current task gives up the CPU */	\
		cmd;								\
	}									\
	/* back to TASK_RUNNING; unlink the entry if it is still queued */	\
	finish_wait(&wq_head, &__wq_entry);					\
__out:	__ret;									\
})


/*
 * Prepare a wait queue entry for the current task.
 *
 * @wq_entry: entry to initialize
 * @flags:    initial flag bits (e.g. WQ_FLAG_EXCLUSIVE or 0)
 *
 * The list node is initialized empty, the entry is bound to `current`, and
 * autoremove_wake_function is installed as the wakeup callback.
 */
void init_wait_entry(struct wait_queue_entry *wq_entry, int flags)
{
	INIT_LIST_HEAD(&wq_entry->entry);

	wq_entry->func = autoremove_wake_function;
	wq_entry->private = current;
	wq_entry->flags = flags;
}
EXPORT_SYMBOL(init_wait_entry);

// Queues wq_entry on wq_head (if not already queued) and sets the current
// task's state, or reports a pending signal. Returns 0, or -ERESTARTSYS when
// a signal is pending for `state`.
// Original note: guards against waiting forever when the event has already
// happened while the waiter was not yet on the queue.
long prepare_to_wait_event(struct wait_queue_head *wq_head, 
                            struct wait_queue_entry *wq_entry, int state)
{
	unsigned long flags;
	long ret = 0;

	spin_lock_irqsave(&wq_head->lock, flags);
    // Non-zero return case: `state` allows interruption by signals and a signal is actually pending
	if (unlikely(signal_pending_state(state, current))) {
        // Unlink the entry from the wait queue and return -ERESTARTSYS
		list_del_init(&wq_entry->entry);
		ret = -ERESTARTSYS;
	} else {
        // list_empty() on wq_entry->entry tells whether the entry has already been added to a wait queue
		if (list_empty(&wq_entry->entry)) {
            // With WQ_FLAG_EXCLUSIVE set, the entry goes to the tail of the queue (exclusive wait);
            // otherwise it is added at the head of the queue.
			if (wq_entry->flags & WQ_FLAG_EXCLUSIVE)
				__add_wait_queue_entry_tail(wq_head, wq_entry);
			else
				__add_wait_queue(wq_head, wq_entry);
		}
        // Change the state of the current task (done while wq_head->lock is still held)
		set_current_state(state);
	}
	spin_unlock_irqrestore(&wq_head->lock, flags);

	return ret;
}
EXPORT_SYMBOL(prepare_to_wait_event);

// 用state_value改变当前进程的状态
/*
 * Stores state_value into current->state using smp_store_mb(), after first
 * recording _THIS_IP_ in current->task_state_change.
 * NOTE(review): task_state_change looks like a debugging aid that remembers
 * where the state was last changed -- confirm against the kernel headers.
 */
#define set_current_state(state_value)				\
	do {							\
		current->task_state_change = _THIS_IP_;		\
		smp_store_mb(current->state, (state_value));	\
	} while (0)


/*
 * Puts the current task back into the runnable state and removes the wait
 * queue entry from the wait queue if it is still linked there.
 */
void finish_wait(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
	unsigned long flags;

    // Set the current task's state to runnable (TASK_RUNNING).
    // Like set_current_state(), except that no memory barrier is issued.
	__set_current_state(TASK_RUNNING);

    // If the entry is still on the wait queue, take the queue lock and unlink it.
    // The emptiness check itself is made without holding wq_head->lock.
	if (!list_empty_careful(&wq_entry->entry)) {
		spin_lock_irqsave(&wq_head->lock, flags);
		list_del_init(&wq_entry->entry);
		spin_unlock_irqrestore(&wq_head->lock, flags);
	}
}
EXPORT_SYMBOL(finish_wait);

简单总结下进程进入休眠的步骤:

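1、将当前进程关联的等待队列元素添加到等待队列(wait_event中由prepare_to_wait_event()完成,也可以手动调用add_wait_queue())

2、调用set_current_state设置进程状态(TASK_INTERRUPTIBLE或TASK_UNINTERRUPTIBLE)

3、判断资源是否已就绪,或者是否有待处理的信号

4、若资源未就绪且没有待处理的信号,调用schedule()让出CPU,进入休眠状态

5、被唤醒且资源得到满足后,将进程恢复为可运行状态,并将等待队列元素从等待队列中删除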

唤醒等待队列

当资源准备好后,就可以唤醒等待队列中的进程,内核通过wake_up()和它的几个变种来唤醒等待队列中的进程

wake_up(&wq_head)  //唤醒等待队列上的所有非独占进程,以及至多1个独占进程(nr_exclusive为1)
wake_up_interruptible(&wq_head)  //只唤醒那些执行可中断睡眠的进程
wake_up_nr(&wq_head, nr) //唤醒给定数目的独占等待进程
wake_up_interruptible_nr(&wq_head, nr)
wake_up_interruptible_all(&wq_head)

// Wakeup mode mask covering both sleep states.
#define TASK_NORMAL         (TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE)
// So a TASK_NORMAL wakeup applies to tasks sleeping in either TASK_INTERRUPTIBLE or TASK_UNINTERRUPTIBLE

// All three expand to __wake_up(head, mode, nr_exclusive, key) with key == NULL:
//   wake_up:               mode TASK_NORMAL,        nr_exclusive 1
//   wake_up_nr:            mode TASK_NORMAL,        nr_exclusive nr
//   wake_up_interruptible: mode TASK_INTERRUPTIBLE, nr_exclusive 1
#define wake_up(x)			        __wake_up(x, TASK_NORMAL, 1, NULL)
#define wake_up_nr(x, nr)	        __wake_up(x, TASK_NORMAL, nr, NULL)
#define wake_up_interruptible(x)	__wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)

/*
 * Wake up waiters on a wait queue.
 *
 * @wq_head:      the wait queue
 * @mode:         task-state mask passed through to each entry's wakeup callback
 * @nr_exclusive: number of exclusive waiters to wake before stopping
 * @key:          opaque value passed through to each wakeup callback
 *
 * Runs __wake_up_common() with wake_flags == 0 while holding wq_head->lock
 * (taken with spin_lock_irqsave()).
 */
void __wake_up(struct wait_queue_head *wq_head, unsigned int mode,
	       int nr_exclusive, void *key)
{
	unsigned long irq_state;

	spin_lock_irqsave(&wq_head->lock, irq_state);
	__wake_up_common(wq_head, mode, nr_exclusive, 0, key);
	spin_unlock_irqrestore(&wq_head->lock, irq_state);
}
EXPORT_SYMBOL(__wake_up);


static void __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
			int nr_exclusive, int wake_flags, void *key)
{
	wait_queue_entry_t *curr, *next;

     在等待队列头指向的链表上,从curr指向的元素开始依次遍历元素
	list_for_each_entry_safe(curr, next, &wq_head->head, entry) {
		unsigned flags = curr->flags;

        // 调用等待队列元素绑定的唤醒回调函数
        // 注意,具体唤醒何种进程(TASK_INTERRUPTIBLE/TASK_UNINTERRUPTIBLE),作为参数传递给唤        
        // 醒函数处理
        // 当进程不符合唤醒条件时,ret为0,详见try_to_wake_up()
		int ret = curr->func(curr, mode, wake_flags, key);
		if (ret < 0)
			break;

        // 如果当前等待队列元素为独占等待,并且独占等待个数已经等于nr_exclusive,提前退出循环
        // 独占等待进程被加入到等待队列的尾部,因此此时表明所有唤醒工作已经完成
		if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
			break;
	}
}

wake_up会遍历等待队列上的元素,并调用每个等待队列元素所绑定的唤醒函数

DECLARE_WAITQUEUE(name, tsk)使用default_wake_function()

init_wait_entry(&name,flag)中使用autoremove_wake_function()

default_wake_function

/*
 * Default wakeup callback of a wait queue entry.
 *
 * @curr:       the entry; curr->private holds the task to wake
 * @mode:       task-state mask forwarded to try_to_wake_up()
 * @wake_flags: forwarded to try_to_wake_up()
 * @key:        not used by this callback
 *
 * Returns whatever try_to_wake_up() returns. The entry stays on its queue.
 */
int default_wake_function(wait_queue_entry_t *curr, unsigned mode, int wake_flags,
			  void *key)
{
	struct task_struct *task = curr->private;

	return try_to_wake_up(task, mode, wake_flags);
}
EXPORT_SYMBOL(default_wake_function);

/*
 * Try to wake up task p if its current state matches the `state` mask.
 * Returns 1 when the state matched (success is set before the actual
 * queueing work), 0 when the task was skipped. The whole body runs under
 * p->pi_lock. Part of the function is elided in this excerpt ("...").
 */
static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
	unsigned long flags;
	int cpu, success = 0;

	smp_mb__before_spinlock();
	raw_spin_lock_irqsave(&p->pi_lock, flags);

    // Filter on the task's state here: tasks whose state is not in the mask (TASK_INTERRUPTIBLE/TASK_UNINTERRUPTIBLE) are skipped
	if (!(p->state & state))
		goto out;

	trace_sched_waking(p);

	/* We're going to change ->state: */
	success = 1;
	cpu = task_cpu(p);

	smp_rmb();
	if (p->on_rq && ttwu_remote(p, wake_flags)) // the task is already on a runqueue (rq), so no further wakeup work is needed
		goto stat;

...

	ttwu_queue(p, cpu, wake_flags);
stat:
	ttwu_stat(p, cpu, wake_flags);
out:
	raw_spin_unlock_irqrestore(&p->pi_lock, flags);

	return success;
}


/*
 * Hand task p over to CPU `cpu` by adding it to that CPU's wake_list.
 *
 * @p:          task being woken
 * @cpu:        CPU whose runqueue receives the task
 * @wake_flags: only WF_MIGRATED is looked at; it is recorded in
 *              p->sched_remote_wakeup
 */
static void ttwu_queue_remote(struct task_struct *p, int cpu, int wake_flags)
{
	struct rq *rq = cpu_rq(cpu);	/* runqueue of the target CPU `cpu` */

	p->sched_remote_wakeup = !!(wake_flags & WF_MIGRATED);

	/*
	 * NOTE(review): presumably llist_add() returns true only when the
	 * list was empty beforehand, so the target CPU is notified once per
	 * batch -- confirm against the llist documentation.
	 */
	if (!llist_add(&p->wake_entry, &rq->wake_list))
		return;

	if (set_nr_if_polling(rq->idle))
		trace_sched_wake_idle_without_ipi(cpu);
	else
		smp_send_reschedule(cpu);
}
...
default_wake_function函数调用顺序为:

default_wake_function() --> try_to_wake_up() --> ttwu_queue() --> ttwu_do_activate() --> ttwu_do_wakeup()

autoremove_wake_function

/*
 * Wakeup callback that also dequeues the entry.
 *
 * Delegates to default_wake_function(); when that reports a non-zero result
 * the entry is unlinked from its wait queue with list_del_init(). The result
 * of default_wake_function() is returned unchanged.
 */
int autoremove_wake_function(struct wait_queue_entry *wq_entry, unsigned mode, int sync, void *key)
{
	int woken = default_wake_function(wq_entry, mode, sync, key);

	if (woken != 0)
		list_del_init(&wq_entry->entry);

	return woken;
}
EXPORT_SYMBOL(autoremove_wake_function);

/*
 * Default wakeup callback (listed again here for comparison with
 * autoremove_wake_function()): passes curr->private, mode and wake_flags to
 * try_to_wake_up() and returns its result. `key` is not used, and the entry
 * is NOT removed from its wait queue.
 */
int default_wake_function(wait_queue_entry_t *curr, unsigned mode, int wake_flags,
			  void *key)
{
	return try_to_wake_up(curr->private, mode, wake_flags);
}
EXPORT_SYMBOL(default_wake_function);

可以看到autoremove_wake_function相比default_wake_function,在成功执行唤醒工作后,会自动将等待队列元素从等待队列中移除,所以使用default_wake_function()时不能忘记将元素移除。

GitHub 加速计划 / li / linux-dash
6
1
下载
A beautiful web dashboard for Linux
最近提交(Master分支:4 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐