互斥锁(mutex)

在信号量最后的部分说,当count=1的时候可以用信号量实现互斥。在早期的Linux版本中就是当count=1来实现mutex的。
在2.6.11版本中,如下:
typedef struct semaphore	mutex_t;

#define mutex_init(lock, type, name)		sema_init(lock, 1)

但是在最新的内核3.18左右, 内核重新定义了一个新的数据结构 struct mutex, 将其称为互斥锁或者互斥体。同时对信号量的DOWN和UP操作针对struct mutex做了修改。

互斥锁的定义和初始化

因为struct mutex的定义中有一些调试相关的成员,在这里去掉调试信息。
/*
 * Simple, straightforward mutexes with strict semantics:
 *
 * - only one task can hold the mutex at a time
 * - only the owner can unlock the mutex
 * - multiple unlocks are not permitted
 * - recursive locking is not permitted
 * - a mutex object must be initialized via the API
 * - a mutex object must not be initialized via memset or copying
 * - task may not exit with mutex held
 * - memory areas where held locks reside must not be freed
 * - held mutexes must not be reinitialized
 * - mutexes may not be used in hardware or software interrupt
 *   contexts such as tasklets and timers
 *
 * These semantics are fully enforced when DEBUG_MUTEXES is
 * enabled. Furthermore, besides enforcing the above rules, the mutex
 * debugging code also implements a number of additional features
 * that make lock debugging easier and faster:
 *
 * - uses symbolic names of mutexes, whenever they are printed in debug output
 * - point-of-acquire tracking, symbolic lookup of function names
 * - list of all locks held in the system, printout of them
 * - owner tracking
 * - detects self-recursing locks and prints out all relevant info
 * - detects multi-task circular deadlocks and prints out all affected
 *   locks and tasks (and only those tasks)
 */
struct mutex {
	/* 1: unlocked, 0: locked, negative: locked, possible waiters */
	atomic_t		count;
	spinlock_t		wait_lock;
	struct list_head	wait_list;
};
这里必须要对注释详细说明一下,其中说明一些mutex的严格语法信息:
a.  在同一时刻只能有一个task获得互斥锁
b.  只有锁的获得者才能有资格释放锁
c.  多处释放锁是不允许的
d.  递归获取锁是不允许的
e.  互斥锁必须使用系统的API初始化,不允许直接操作使用memset/memcpy
f.   获得锁的task是不允许退出
g.  持有锁驻留的内存区域不能被释放
h. 互斥锁不能用于中断上下文中, spin_lock是可以用于中断上下文的 。

再解释下struct mutex成员的含义:
count:        count是一个原子变量,(关于原子变量不懂的,可以看前面的原子变量文章)。 当count=1代表资源可用,等于0代表资源不可用,加锁状态。 负值代表有等待者。
wait_lock: 是一个自旋锁变量, 用于对wait_list的操作变为原子变量
wait_list  : 用于管理那些在获取mutex的进程,在无法获取互斥锁的时候,进入wait_List睡眠。

是不是和semaphore一样。 既然一样,互斥锁的定义和初始化也不能直接操作,必须使用系统提供的API:

定义一个静态的struct mutex变量的同时初始化的方法:
#define __MUTEX_INITIALIZER(lockname) \
		{ .count = ATOMIC_INIT(1) \
		, .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
		, .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
		__DEBUG_MUTEX_INITIALIZER(lockname) \
		__DEP_MAP_MUTEX_INITIALIZER(lockname) }

#define DEFINE_MUTEX(mutexname) \
	struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

如果需要初始化一个互斥锁,则可以使用mutex_init宏:
/**
 * mutex_init - initialize the mutex
 * @mutex: the mutex to be initialized
 *
 * Initialize the mutex to unlocked state.
 *
 * It is not allowed to initialize an already locked mutex.
 */
# define mutex_init(mutex) \
do {							\
	static struct lock_class_key __key;		\
							\
	__mutex_init((mutex), #mutex, &__key);		\
} while (0)

void __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
	atomic_set(&lock->count, 1);
	spin_lock_init(&lock->wait_lock);
	INIT_LIST_HEAD(&lock->wait_list);
	mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
	osq_lock_init(&lock->osq);
#endif

	debug_mutex_init(lock, name, key);
}
上面的两种初始化,效果最后都是一样。将count初始化为1, 将wait_lock设置为unlock状态,初始化wait_list链表头。

互斥锁的DOWN操作

互斥锁的DOWN操作在linux内核中定义为mutex_lock函数,如下:
/**
 * mutex_lock - acquire the mutex
 * @lock: the mutex to be acquired
 *
 * Lock the mutex exclusively for this task. If the mutex is not
 * available right now, it will sleep until it can get it.
 *
 * The mutex must later on be released by the same task that
 * acquired it. Recursive locking is not allowed. The task
 * may not exit without first unlocking the mutex. Also, kernel
 * memory where the mutex resides mutex must not be freed with
 * the mutex still locked. The mutex must first be initialized
 * (or statically defined) before it can be locked. memset()-ing
 * the mutex to 0 is not allowed.
 *
 * ( The CONFIG_DEBUG_MUTEXES .config option turns on debugging
 *   checks that will enforce the restrictions and will also do
 *   deadlock debugging. )
 *
 * This function is similar to (but not equivalent to) down().
 */
void __sched mutex_lock(struct mutex *lock)
{
	might_sleep();
	/*
	 * The locking fastpath is the 1->0 transition from
	 * 'unlocked' into 'locked' state.
	 */
	__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
	mutex_set_owner(lock);
}

mutex_lock是用来获得互斥锁,如果不能立刻获得互斥锁,进程将睡眠直到获得锁为止。

#ifdef CONFIG_DEBUG_ATOMIC_SLEEP
  void __might_sleep(const char *file, int line, int preempt_offset);
/**
 * might_sleep - annotation for functions that can sleep
 *
 * this macro will print a stack trace if it is executed in an atomic
 * context (spinlock, irq-handler, ...).
 *
 * This is a useful debugging help to be able to catch problems early and not
 * be bitten later when the calling function happens to sleep when it is not
 * supposed to.
 */
# define might_sleep() \
	do { __might_sleep(__FILE__, __LINE__, 0); might_resched(); } while (0)
#else
  static inline void __might_sleep(const char *file, int line,
				   int preempt_offset) { }
# define might_sleep() do { might_resched(); } while (0)
#endif
might_sleep用于调试使用,可以很好的判断是否在原子上下文中是否睡眠,只有当CONFIG_DEBUG_ATOMIC_SLEEP宏开启的时候有效。

/*
 * The locking fastpath is the 1->0 transition from 'unlocked' into 'locked' state.
 */
__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);

__mutex_fastpath_lock函数用于锁的状态从unlocked转为locked。函数的设计思想体现在__mutex_fastpath_lock和__mutex_lock_slowpath两条主线上。__mutex_fastpath_lock用于快速判断是否可以获得互斥锁,如果成功获得,就直接返回。否则就进入到__mutex_lock_slowpath函数中。这样设计是基于一个事实,就是获得某一个互斥锁绝大多数是可以获得的。也可一说进入到__mutex_lock_slowpath的几率很低。

/**
 *  __mutex_fastpath_lock - try to take the lock by moving the count
 *                          from 1 to a 0 value
 *  @count: pointer of type atomic_t
 *  @fail_fn: function to call if the original value was not 1
 *
 * Change the count from 1 to a value lower than 1, and call <fail_fn> if
 * it wasn't 1 originally. This function MUST leave the value lower than
 * 1 even when the "1" assertion wasn't true.
 */
static inline void
__mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
	if (unlikely(atomic_dec_return(count) < 0))
		fail_fn(count);
}
该函数是给count执行减1的操作,如果执行失败就执行slowpath函数。此函数减1的操作是调用的原子操作中的dec函数执行的,详细可见原子操作小节。

__visible void __sched __mutex_lock_slowpath(atomic_t *lock_count)
{
	struct mutex *lock = container_of(lock_count, struct mutex, count);

	__mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
			    NULL, _RET_IP_, NULL, 0);
}
调用__mutex_lock_common函数来执行最后的获得互斥锁的旅途。
/*
 * Lock a mutex (possibly interruptible), slowpath:
 */
static __always_inline int __sched __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
		    struct lockdep_map *nest_lock, unsigned long ip,
		    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
	struct task_struct *task = current;
	struct mutex_waiter waiter;
	unsigned long flags;
	int ret;

	preempt_disable();
	mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

	if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
		/* got the lock, yay! */
		preempt_enable();
		return 0;
	}

	spin_lock_mutex(&lock->wait_lock, flags);

	/*
	 * Once more, try to acquire the lock. Only try-lock the mutex if
	 * it is unlocked to reduce unnecessary xchg() operations.
	 */
	if (!mutex_is_locked(lock) && (atomic_xchg(&lock->count, 0) == 1))
		goto skip_wait;

	debug_mutex_lock_common(lock, &waiter);
	debug_mutex_add_waiter(lock, &waiter, task_thread_info(task));

	/* add waiting tasks to the end of the waitqueue (FIFO): */
	list_add_tail(&waiter.list, &lock->wait_list);
	waiter.task = task;

	lock_contended(&lock->dep_map, ip);

	for (;;) {
		/*
		 * Lets try to take the lock again - this is needed even if
		 * we get here for the first time (shortly after failing to
		 * acquire the lock), to make sure that we get a wakeup once
		 * it's unlocked. Later on, if we sleep, this is the
		 * operation that gives us the lock. We xchg it to -1, so
		 * that when we release the lock, we properly wake up the
		 * other waiters. We only attempt the xchg if the count is
		 * non-negative in order to avoid unnecessary xchg operations:
		 */
		if (atomic_read(&lock->count) >= 0 &&
		    (atomic_xchg(&lock->count, -1) == 1))
			break;

		/*
		 * got a signal? (This code gets eliminated in the
		 * TASK_UNINTERRUPTIBLE case.)
		 */
		if (unlikely(signal_pending_state(state, task))) {
			ret = -EINTR;
			goto err;
		}

		if (use_ww_ctx && ww_ctx->acquired > 0) {
			ret = __mutex_lock_check_stamp(lock, ww_ctx);
			if (ret)
				goto err;
		}

		__set_task_state(task, state);

		/* didn't get the lock, go to sleep: */
		spin_unlock_mutex(&lock->wait_lock, flags);
		schedule_preempt_disabled();
		spin_lock_mutex(&lock->wait_lock, flags);
	}
	mutex_remove_waiter(lock, &waiter, current_thread_info());
	/* set it to 0 if there are no waiters left: */
	if (likely(list_empty(&lock->wait_list)))
		atomic_set(&lock->count, 0);
	debug_mutex_free_waiter(&waiter);

skip_wait:
	/* got the lock - cleanup and rejoice! */
	lock_acquired(&lock->dep_map, ip);
	mutex_set_owner(lock);

	spin_unlock_mutex(&lock->wait_lock, flags);
	preempt_enable();
	return 0;
}

该函数先不是急于将没有获得互斥锁的进程放入到等待队列,而是在放入之前还要看是否锁已经释放的挣扎。这是基于这样的一个事实: 拥有互斥锁的进程应该在尽短的时间内释放锁,如果刚好释放了锁,就不需要进入到等待队列等待了。

挣扎一:
if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
	/* got the lock, yay! */
	preempt_enable();
	return 0;
}

挣扎二:
/*
 * Once more, try to acquire the lock. Only try-lock the mutex if
 * it is unlocked to reduce unnecessary xchg() operations.
 */
if (!mutex_is_locked(lock) && (atomic_xchg(&lock->count, 0) == 1))
	goto skip_wait;

在经过两次挣扎之后,终于选择放弃。将此进程加入到等待队列。
/* add waiting tasks to the end of the waitqueue (FIFO): */
list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;

接下来和信号量大体逻辑相似,在一个for循环中将进程设置state,  然后调度出去。 等待互斥锁的UP操作之后,返回。

互斥锁的UP操作

/**
 * mutex_unlock - release the mutex
 * @lock: the mutex to be released
 *
 * Unlock a mutex that has been locked by this task previously.
 *
 * This function must not be used in interrupt context. Unlocking
 * of a not locked mutex is not allowed.
 *
 * This function is similar to (but not equivalent to) up().
 */
void __sched mutex_unlock(struct mutex *lock)
{
	/*
	 * The unlocking fastpath is the 0->1 transition from 'locked'
	 * into 'unlocked' state:
	 */
	__mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}

mutex_unlock用于释放一个互斥锁,只有以前获得锁的task才可以释放锁,同时mutex_unlock不能用于中断上写文,因为其可能会睡眠。此函数和up函数很相似,但不等同。
__mutex_fastpath_unlock函数用于锁的状态从"locked"到"unlocked"。

static inline void
__mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
	if (unlikely(atomic_inc_return(count) <= 0))
		fail_fn(count);
}
可以看到依旧是调用的是原子操作的inc函数,然后对返回值返回。如果设置失败就调用slowpath函数。

/*
 * Release the lock, slowpath:
 */
__visible void
__mutex_unlock_slowpath(atomic_t *lock_count)
{
	struct mutex *lock = container_of(lock_count, struct mutex, count);

	__mutex_unlock_common_slowpath(lock, 1);
}
调用__mutex_unlock_common_slowpath函数执行唤醒操作。
/*
 * Release the lock, slowpath:
 */
static inline void __mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
	unsigned long flags;

	/*
	 * As a performance measurement, release the lock before doing other
	 * wakeup related duties to follow. This allows other tasks to acquire
	 * the lock sooner, while still handling cleanups in past unlock calls.
	 * This can be done as we do not enforce strict equivalence between the
	 * mutex counter and wait_list.
	 *
	 *
	 * Some architectures leave the lock unlocked in the fastpath failure
	 * case, others need to leave it locked. In the later case we have to
	 * unlock it here - as the lock counter is currently 0 or negative.
	 */
	if (__mutex_slowpath_needs_to_unlock())
		atomic_set(&lock->count, 1);

	spin_lock_mutex(&lock->wait_lock, flags);
	mutex_release(&lock->dep_map, nested, _RET_IP_);
	debug_mutex_unlock(lock);

	if (!list_empty(&lock->wait_list)) {
		/* get the first entry from the wait-list: */
		struct mutex_waiter *waiter =
				list_entry(lock->wait_list.next,
					   struct mutex_waiter, list);

		debug_mutex_wake_waiter(lock, waiter);

		wake_up_process(waiter->task);
	}

	spin_unlock_mutex(&lock->wait_lock, flags);
}

if (__mutex_slowpath_needs_to_unlock())
	atomic_set(&lock->count, 1);
以上函数是用来将count执行set为1的操作。

if (!list_empty(&lock->wait_list)) {
	/* get the first entry from the wait-list: */
	struct mutex_waiter *waiter = list_entry(lock->wait_list.next,struct mutex_waiter, list);

	debug_mutex_wake_waiter(lock, waiter);
	wake_up_process(waiter->task);
}
从等待队列取第一个等待者,然后执行唤醒操作。



GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐