0%

Linux 等待队列 (wait queue)

Linux 的等待队列(wait queue)是实现 阻塞 I/O 和 I/O复用 (select, poll, epoll 等)的关键同步原语。

本文基于Linux 5.7,主要介绍:

  • wait queue 的数据结构 和 创建方式
    • wait_queue_head : DECLARE_WAIT_QUEUE_HEAD, init_wait_queue_head
    • wait_queue_entry: DECLARE_WAIT_QUEUE, init_wait_queue_entry, init_waitqueue_func_entry
  • wait queue 的手动添加和删除的API

    • add_wait_queueadd_wait_queue_exclusive
    • remove_wait_queue
  • wait queue 的基本API

    • wait_event系列macro
    • wake_up系列macro
      • 使用WQ_FLAG_BOOKMARK来分段遍历,减少单次对自旋锁的占用时间。
      • 使用WQ_FLAG_EXCLUSIVE来实现单独唤醒,避免“惊群”。
  • 结合eventfd的实现来分析 wait queue 的使用

数据结构

wait_queue_head_t

等待队列是一个双向链表。等待队列头是一个wait_queue_head_t由自旋锁lock和一个侵入式链表head组成。lock用来保护等待队列。

1
2
3
4
5
6
7
// include/linux/wait.h
struct wait_queue_head {
spinlock_t lock;
struct list_head head;
};

typedef struct wait_queue_head wait_queue_head_t;

等待队列头可以通过两种方式初始化:

  • 对于全局变量或者栈上的变量,可以使用宏进行静态初始化
1
2
3
4
5
6
7
// include/linux/wait.h
#define DECLARE_WAIT_QUEUE_HEAD(name) \
struct wait_queue_head name = __WAIT_QUEUE_HEAD_INITIALIZER(name)

#define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \
.lock = __SPIN_LOCK_UNLOCKED(name.lock), \
.head = { &(name).head, &(name).head } }
  • 也提供动态初始化的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// include/linux/wait.h
#define init_waitqueue_head(wq_head) \
do { \
static struct lock_class_key __key; \
\
__init_waitqueue_head((wq_head), #wq_head, &__key); \
} while (0)

// kernel/sched/wait.c
void __init_waitqueue_head(struct wait_queue_head *wq_head, const char *name, struct lock_class_key *key)
{
spin_lock_init(&wq_head->lock);
lockdep_set_class_and_name(&wq_head->lock, key, name);
INIT_LIST_HEAD(&wq_head->head);
}

wait_queue_entry

连接在等待队列上的数据结构是wait_queue_entry

1
2
3
4
5
6
7
8
9
10
struct wait_queue_entry {
unsigned int flags;
void *private;
wait_queue_func_t func;
struct list_head entry;
};

typedef int (*wait_queue_func_t)(struct wait_queue_entry *wq_entry, unsigned mode, int flags, void *key);


类似地,wait_queue_entry也可以进行静态初始化。

1
2
3
4
5
6
7
8
// /include/linux/wait.h
#define __WAITQUEUE_INITIALIZER(name, tsk) { \
.private = tsk, \
.func = default_wake_function, \
.entry = { NULL, NULL } }

#define DECLARE_WAITQUEUE(name, tsk) \
struct wait_queue_entry name = __WAITQUEUE_INITIALIZER(name, tsk)

该静态初始化宏DECLARE_WAITQUEUE用来初始化一个回调函数为default_wake_functionprivatetsk(就像名字暗示的一样,tsk一般为一个task_struct),且flag为空的struct wait_queue_entry

wait_queue_entry的动态初始化函数则提供了可以自行设置其回调函数的init_waitqueue_func_entry和默认回调函数为default_wake_functioninit_waitqueue_entry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// /include/linux/wait.h
static inline void init_waitqueue_entry(struct wait_queue_entry *wq_entry, struct task_struct *p)
{
wq_entry->flags = 0;
wq_entry->private = p;
wq_entry->func = default_wake_function;
}

static inline void
init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func)
{
wq_entry->flags = 0;
wq_entry->private = NULL;
wq_entry->func = func;
}

关于默认回调函数default_wake_function,我们将在分析等待队列的原理时讲解。


  • flags的可选值为
    1
    2
    3
    4
    #define WQ_FLAG_EXCLUSIVE   0x01
    #define WQ_FLAG_WOKEN 0x02
    #define WQ_FLAG_BOOKMARK 0x04
    #define WQ_FLAG_CUSTOM 0x08

关于WQ_FLAG_BOOKMARKWQ_FLAG_EXCLUSIVE的介绍,见wake_up的实现

  • private用来存放和回调函数func功能相关的数据结构。

    • 如果该entry被用来唤醒某个等待在该队列上的task,则此处一般为一个task_struct的指针,这种entry在进程阻塞在I/O上时常见,它一般来自宏DECLARE_WAITQUEUE或者默认回调函数的初始化函数init_waitqueue_entry
  • func是在等待的事件发生时,队列上的该entry会被调用的函数。他一般用来唤醒一个task(default_wake_function)。在epoll的实现中,当该entry所在的等待队列等待的事件发生时,一个独特的回调函数回会将该entry关联的epitem放入rdllist(也可能是ovflist)中,以待epoll_wait对该列表中的内容进行收割。

  • entry:用来和wait_queue_head_t或者其他struct wait_queue_entry相连的侵入式链表项。

wait_queue API

队列中entry的增加与移除

使用队列头中的自旋锁保证队列操作的原子性。

1
2
3
4
void add_wait_queue(struct wait_queue_head *wq_head, 
struct wait_queue_entry *wq_entry);
void remove_wait_queue(struct wait_queue_head *wq_head,
struct wait_queue_entry *wq_entry);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

// include/linux/wait.h
static inline void __add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
list_add(&wq_entry->entry, &wq_head->head); // #3
}

// kernel/sched/wati.c
void add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
unsigned long flags;

wq_entry->flags &= ~WQ_FLAG_EXCLUSIVE; // #1
spin_lock_irqsave(&wq_head->lock, flags); // #2
__add_wait_queue(wq_head, wq_entry);
spin_unlock_irqrestore(&wq_head->lock, flags);
}
EXPORT_SYMBOL(add_wait_queue);

将一个wait_queue_entry加入到wait_queue_head所代表的等待队列中

  • #1 使用add_wait_queue时不允许该entry的flag为WQ_FLAG_EXCLUSICE,关于WQ_FLAG_EXCLUSICE,见Exclusive waits
  • #2 使用关闭中断并记录之前关闭中断状态的自旋锁加锁方式spin_lock_irqsave
  • #3 将entry中的侵入式链表项与原等待队列相连

类似地,移除 entry 时也要加锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// include/linux/wait.h
static inline void
__remove_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
list_del(&wq_entry->entry);
}

// kernel/sched/wati.c
void remove_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
unsigned long flags;

spin_lock_irqsave(&wq_head->lock, flags);
__remove_wait_queue(wq_head, wq_entry);
spin_unlock_irqrestore(&wq_head->lock, flags);
}
EXPORT_SYMBOL(remove_wait_queue);

wait_event和wait_event_interruptible

/include/linux/wait.h中提供一系列的wait_event macro以供某一task等待在某一条件上。

1
2
3
wait_event_interruptible(wq_head, condition)
wait_event(wq_head, condition)
// ...

wait_event系列macro会将当前进程置于阻塞状态,直到condition为true。因为condition可能被运算多次,因此condition的计算一般不应有副作用。

一般来讲不会使用wait_event,因为其会将task置于uninterruptible sleep状态(也就是使用ps命令中列出的那些处于恶心的”D”状态的task)。uninterruptible sleep状态的task不被SIGNAL从睡眠中唤醒,这也就意味着不可以使用SIGKILL将其终止。

uninterruptible sleep的一个臭名昭著的例子是在Linux NFS上使用mkdir(2)系统调用而导致的阻塞。mkdir(2)是一个不可中断的系统调用,其返回值没有EINTR。我们可以通过观察mkdir(2)的man手册来确认这一点。在一个正常的本地文件系统上,mkdir(2)最差的情况也仅仅是进行几次硬盘寻道。但是在一个像NFS这样的网络文件系统上,mkdir(2)需要进行多个RPC调用,这些RPC调用又可能阻塞在其网络I/O上。因此,这种mkdir(2)可能会阻塞很久很久。但其uninterruptible sleep的状态使得task变成了“不死僵尸”,消灭它的唯一办法只有重启(或者不理他😊)。

为了避免产生上面所说的“不死僵尸”,我们一般采用可以被信号中断的wait_event_intterruptible。如果其返回一个非零值,则证明该task在等待过程中被某个信号中断。

wait_event的实现

wait_event系列macro都是对___wait_event macro的封装,我们以wait_event_interruptible为例来分析其实现,为了方便分析,我们将该macro展开:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// /include/linux/wait.h

({
int __ret = 0;
might_sleep(); // #1

if(!(condition)) // #2
{
__ret = ({
__label__ __out; // #3
struct wait_queue_entry __wq_entry;
long __ret = ret; /* explicit shadow */

init_wait_entry(&__wq_entry, exclusive ? WQ_FLAG_EXCLUSIVE : 0); // #4
for (;;) {
long __int = prepare_to_wait_event(&wq_head, &__wq_entry, state); // #5

if (condition) // #6
break;

if (___wait_is_interruptible(state) && __int) {
__ret = __int;
goto __out;
} // #7

schedule(); // #8
}
finish_wait(&wq_head, &__wq_entry);// #9
__out: __ret;
})
}
_ret;
})
  • #1 might_sleep()是一个用来帮助debug的函数,用来标注那些可能会sleep的函数。当这些被标注的函数在不允许sleep的条件下(例如持有一个自旋锁时)进行了sleep,该函数就会发出警告。
  • #2 首先判断该条件是否满足,如果满足,整个表达式的值就是为0的_ret
  • #3 __label__ __out是一个局部label该label只能在其作用域内被引用。局部label被用做在含有循环的复杂macro中跳出循环。
  • #4 根据是否进行Exclusive wait来初始化__wq_entry。不同与之前介绍的init_waitqueue_entry,这里绑定的回调函数funcautoremove_wake_function
  • #5 调用prepare_to_wait_event,因为涉及队列的操作,所以全程带自旋锁
    • #5.1,检查此时是否有未处理的信号,如果有,则停止加入等待队列并返回-ERESTARTSYS
    • #5.1 如果要进行exclusive waits,则将该entry放在队尾
    • #5.2 否则,放在队首。
    • #5.4 将当前进程的状态设置为TASK_INTERRUPTIBLE,以后scheduler将不会再schedule它。
  • #6 检查条件是否满足
  • #7 检查在可中断的条件下,是否有中断产生
  • #8 让出CPU。当它被再次唤醒时,将会从schedule函数返回。在#5处被重新添加到队列,并进行#6#7的检查。
  • #9 从循环跳出后,恢复TASK_RUNNING状态,并释放wq_entry->entry的资源。

wake_up和wake_up_interruptible

1
2
3
4
#define wake_up(x)      __wake_up(x, TASK_NORMAL, 1, NULL)
#define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)
void __wake_up(struct wait_queue_head *wq_head, unsigned int mode, int nr, void *key);

一般来说,当用wait_event_interruptible进行等待,之后使用wake_upwake_up_interruptible唤醒,其效果是没有差别的。实践中,我们通常按照wait_event对应wake_upwait_event_interruptible对应wake_up_interruptible来使用。

wake_up的实现

wake_up的主要逻辑实现在__wake_up_common中。它遍历等待队列中的所有entry,并在#1.5处调用该entry在之前设置好的回调函数。代码中的其他细节解释如下:

  • #1.1 处使用macro lockdep_assert_held 来assert已经持有等待队列头的自旋锁。因此该函数只能在持有锁的情况下被调用。__wake_up_common_lock中的#2.2使用spin_lock_irqsave验证了这一点。

分批次唤醒

优化:分批次遍历较长的等待队列,以免持有自旋锁时间过长,影响性能。

  • #1.2 用来处理bookmark逻辑的代码。
    • 首次进入函数__wake_up_common时, #1.2处,bookmarkWQ_FLAG_BOOKMARK还没有设置(见#2.1),因此将curr置为等待队列的第一个entry(#1.3)。
  • 之后进入循环list_for_each_entry_safe_from,在#1.7处检查当前已经遍历过的节点个数是否已经超过WAITQUEUE_WALK_BREAK_CNT(在当前版本该值被定义为64),如果已经超过,就在 下一个entrynext之前将bookmark设置好WQ_FLAG_BOOKMARK后,用macrolist_add_tail插入到next节点之前,并结束循环。
  • 结束循环后在#2.4处放弃了持有自旋锁。这样就保证了自旋锁的持有时间不会过长。
  • 之后循环继续进行,重新进入__wake_up_common函数后,在#1.2位置,因为WQ_FLAG_BOOKMARK已经被设置,因此curr会被置为bookmark的后继节点,也就是继续之前的循环。

关于bookmark,详见sched/wait: Break up long wake list walk

避免惊群

优化:防止惊群的Exclusive waits

由上面关于wake_up的行为分析可知,wake_up会一段一段地调用等待队列中的entry的回调函数func。由前面关于wait_event的实现分析可知,使用wait_event系列macro添加到队列中的entry的回调函数autoremove_wake_function。其作用为将当前entry从等待队列中移除,并将对应的task唤醒。

当有许多任务等待在等待队列上等待某一资源,而该资源却只能由被唤醒的task中的其中一个得到时,被唤醒的,未得到该资源的其他task就会重新进入sleep。这导致了不必要的context switch,从而降低了内核性能。我们把这种状况称为”惊群“(thundering herd)。

为了避免惊群,wait_queue提供了一个WQ_FLAG_EXCLUSIVE。当一个entry被标记为exclusive时,在prepare_to_wait_event函数中,其会被添加到等待队列的末尾。

注意:手动添加entry到等待队列的函数add_wait_queue会清除WQ_FLAG_EXCLUSIVE,要用add_wait_queue_exclusive来添加带有该flag的entry。

wake_up对等待队列中的entry进行唤醒时,#1.6说明对整个等待队列的遍历会停止在第一个遇到的flagWQ_FLAG_EXCLUSIVE的entry上面。

因此,如果所有的entry都带有该flag,且结果就是每次只有一个task被唤醒,不会产生“惊群”。

当多个task之间对某种资源有很严重的竞争,并且当该资源可用时,只唤醒一个task便足以将该资源完全消耗时,我们考虑使用Exclusive waits。一个网络编程中典型的例子是:当多个acceptors竞争一个新到来的tcp握手请求,进行accept系统调用时,我们只需要唤醒其中一个acceptor即可完全处理这个新到来的tcp链接。唤醒其他acceptor是不必要的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key,
wait_queue_entry_t *bookmark)
{
wait_queue_entry_t *curr, *next;
int cnt = 0;

lockdep_assert_held(&wq_head->lock); // #1.1

if (bookmark && (bookmark->flags & WQ_FLAG_BOOKMARK)) { // #1.2
curr = list_next_entry(bookmark, entry);

list_del(&bookmark->entry);
bookmark->flags = 0;
} else
curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry); // #1.3

if (&curr->entry == &wq_head->head) // #1.4
return nr_exclusive;

list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
unsigned flags = curr->flags;
int ret;

if (flags & WQ_FLAG_BOOKMARK)
continue;

ret = curr->func(curr, mode, wake_flags, key); // #1.5
if (ret < 0)
break;
if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) // #1.6
break;

if (bookmark && (++cnt > WAITQUEUE_WALK_BREAK_CNT) && // #1.7
(&next->entry != &wq_head->head)) {
bookmark->flags = WQ_FLAG_BOOKMARK;
list_add_tail(&bookmark->entry, &next->entry); // #1.8
break;
}
}

return nr_exclusive;
}

eventfd

eventfd是Linux一种轻量化的线程间通信方式。eventfd(2)会产生一个”eventfd object”,该对象包含一个uint64_t的计数器。使用eventfd(2)成功后会返回一个fd。在使用阻塞I/O时,eventfd的逻辑如下:

  • 对eventfd进行read
    • 如果计数器的值为0,则该次read调用阻塞,直到计数器的值不为零。
    • 如果计数器的值不为0,则将计数器的值置0,并返回原来原来计数器的值。
  • 对eventfd进行write
    • 将buffer中的数值加到计数器上,如果相加后计数器的值超过的UINT64_MAX,则write阻塞直到有其他进程调用read

eventfd的“阻塞”功能,是通过wait_queue来实现的,下面我们来分析eventfd的一部分的代码。

eventfd的数据结构与初始化

1
2
3
4
5
6
7
struct eventfd_ctx {
struct kref kref;
wait_queue_head_t wqh;
__u64 count;
unsigned int flags;
int id;
};

wqh是一个等待队列头wait_queue_head_t,而计数器就是count


eventfd(2)系统调用通过do_eventfd来完成evnetfd的创建和初始化:

  • #1 对等待队列头进行了初始化。
  • #2 取得一个未使用的文件描述符
  • #3 创建一个匿名inode
  • #4 将该fd和匿名inode安装到当前线程上

eventfd并没有使用之前介绍的封装好的wait_eventwake_up系列macro,而是使用DECLARE_WAITQUEUE__add_wait_queue等自行维护wait_queue

  • #1 在栈上静态初始化一个wait_queue_entry。其回调函数为default_wake_function

  • #2 不允许读取的内容少于8个字节

  • #3 获得等待队列的自旋锁,该自旋锁不但保护等待队列,也保护counter。可能这就是重新实现一次wait_queue逻辑的原因。因为如果我们使用封装好的wait_evnet系列宏,就可能需要另一个同步原语来保证计数器的读写。
  • #4 如果计数器的值大于零,不需等待,直接跳到#14,遍历等待队列,并唤醒等待队列上所有的task。
  • #5 如果计数器的值为零,且不是非阻塞I/O,就进行将该task加入到等待队列的一系列工作,首先将之前声明在栈上的wait_queue_entry放入等待队列
  • #6 开始进入循环,每次程序被唤醒时从schedule()中返回,并从#11处开始执行
  • #7 将该task状态设置为TASK_INTERRUPTIBLE,准备将该进程置于interruptible sleep
  • #8 检查counter,如果不为零,就结束sleep
  • #9 因为处于interruptible sleep,检查信号,如果有待处理的信号,返回-ERESTARTSYS
  • #10 马上要放弃CPU进入sleep,因为绝对不可带着自旋锁进入休眠,因此将释放自旋锁。
  • #11 当从该task被唤醒,并从schedule()返回后,重新取得自旋锁,进行#7,#8,#9,#10重新进行条件检查。

由此可见,其逻辑基本和wait_event_interruptible相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
ssize_t res;
__u64 ucnt = 0;
DECLARE_WAITQUEUE(wait, current); // #1

if (count < sizeof(ucnt)) // #2
return -EINVAL;

spin_lock_irq(&ctx->wqh.lock); // #3
res = -EAGAIN;
if (ctx->count > 0) // #4
res = sizeof(ucnt);
else if (!(file->f_flags & O_NONBLOCK)) {
__add_wait_queue(&ctx->wqh, &wait); // #5
for (;;) { // #6
set_current_state(TASK_INTERRUPTIBLE); // #7
if (ctx->count > 0) { // #8
res = sizeof(ucnt);
break;
}
if (signal_pending(current)) { // #9
res = -ERESTARTSYS;
break;
}
spin_unlock_irq(&ctx->wqh.lock); // #10
schedule();
spin_lock_irq(&ctx->wqh.lock); // #11
}
__remove_wait_queue(&ctx->wqh, &wait); // #12
__set_current_state(TASK_RUNNING); // #13
}
if (likely(res > 0)) {
eventfd_ctx_do_read(ctx, &ucnt);
if (waitqueue_active(&ctx->wqh)) // #14
wake_up_locked_poll(&ctx->wqh, EPOLLOUT); // #15
}
spin_unlock_irq(&ctx->wqh.lock);

if (res > 0 && put_user(ucnt, (__u64 __user *)buf))
return -EFAULT;

return res;
}

1
2
3
4
5
6
7
8
9
10
11
// /include/linux/wait.h
#define wake_up_locked_poll(x, m) \
__wake_up_locked_key((x), TASK_NORMAL, poll_to_key(m))


// /kernel/sched/wait.c

void __wake_up_locked_key(struct wait_queue_head *wq_head, unsigned int mode, void *key)
{
__wake_up_common(wq_head, mode, 1, 0, key, NULL);
}

对于eventfd_write的分析和eventfd_read类似,这里就留给读者了。


参考: