Linux 的等待队列(wait queue)是实现 阻塞 I/O 和 I/O复用 (select, poll, epoll 等)的关键同步原语。
本文基于Linux 5.7,主要介绍:
- wait queue 的数据结构 和 创建方式
wait_queue_head
:DECLARE_WAIT_QUEUE_HEAD
,init_wait_queue_head
wait_queue_entry
:DECLARE_WAIT_QUEUE
,init_wait_queue_entry
,init_waitqueue_func_entry
wait queue 的手动添加和删除的API
add_wait_queue
和add_wait_queue_exclusive
remove_wait_queue
wait queue 的基本API
wait_event
系列macrowake_up
系列macro- 使用
WQ_FLAG_BOOKMARK
来分段遍历,减少单次对自旋锁的占用时间。 - 使用
WQ_FLAG_EXCLUSIVE
来实现单独唤醒,避免“惊群”。
- 使用
结合eventfd的实现来分析 wait queue 的使用
数据结构
wait_queue_head_t
等待队列是一个双向链表。等待队列头是一个wait_queue_head_t
由自旋锁lock
和一个侵入式链表head
组成。lock
用来保护等待队列。
1 | // include/linux/wait.h |
等待队列头可以通过两种方式初始化:
- 对于全局变量或者栈上的变量,可以使用宏进行静态初始化
1 | // include/linux/wait.h |
- 也提供动态初始化的方法
1 | // include/linux/wait.h |
wait_queue_entry
连接在等待队列上的数据结构是wait_queue_entry
1 | struct wait_queue_entry { |
类似地,wait_queue_entry
也可以进行静态初始化。
1 | // /include/linux/wait.h |
该静态初始化宏DECLARE_WAITQUEUE
用来初始化一个回调函数为default_wake_function
,private
为tsk
(就像名字暗示的一样,tsk
一般为一个task_struct
),且flag
为空的struct wait_queue_entry
对wait_queue_entry
的动态初始化函数则提供了可以自行设置其回调函数的init_waitqueue_func_entry
和默认回调函数为default_wake_function
的init_waitqueue_entry
。
1 | // /include/linux/wait.h |
关于默认回调函数default_wake_function
,我们将在分析等待队列的原理时讲解。
flags
的可选值为1
2
3
4
关于WQ_FLAG_BOOKMARK
和WQ_FLAG_EXCLUSIVE
的介绍,见wake_up的实现
private
用来存放和回调函数func
功能相关的数据结构。- 如果该entry被用来唤醒某个等待在该队列上的task,则此处一般为一个
task_struct
的指针,这种entry在进程阻塞在I/O上时常见,它一般来自宏DECLARE_WAITQUEUE
或者默认回调函数的初始化函数init_waitqueue_entry
- 如果该entry被用来唤醒某个等待在该队列上的task,则此处一般为一个
func
是在等待的事件发生时,队列上的该entry会被调用的函数。他一般用来唤醒一个task(default_wake_function
)。在epoll的实现中,当该entry所在的等待队列等待的事件发生时,一个独特的回调函数回会将该entry关联的epitem
放入rdllist
(也可能是ovflist
)中,以待epoll_wait
对该列表中的内容进行收割。entry
:用来和wait_queue_head_t
或者其他struct wait_queue_entry
相连的侵入式链表项。
wait_queue API
队列中entry的增加与移除
使用队列头中的自旋锁保证队列操作的原子性。
1 | void add_wait_queue(struct wait_queue_head *wq_head, |
1 |
|
将一个wait_queue_entry
加入到wait_queue_head
所代表的等待队列中
#1
使用add_wait_queue
时不允许该entry的flag为WQ_FLAG_EXCLUSICE
,关于WQ_FLAG_EXCLUSICE
,见Exclusive waits#2
使用关闭中断并记录之前关闭中断状态的自旋锁加锁方式spin_lock_irqsave
#3
将entry中的侵入式链表项与原等待队列相连
类似地,移除 entry 时也要加锁:
1 | // include/linux/wait.h |
wait_event和wait_event_interruptible
在/include/linux/wait.h
中提供一系列的wait_event
macro以供某一task等待在某一条件上。
1 | wait_event_interruptible(wq_head, condition) |
wait_event系列macro会将当前进程置于阻塞状态,直到condition为true。因为condition可能被运算多次,因此condition的计算一般不应有副作用。
一般来讲不会使用wait_event
,因为其会将task置于uninterruptible sleep状态(也就是使用ps
命令中列出的那些处于恶心的”D”状态的task)。uninterruptible sleep状态的task不被SIGNAL从睡眠中唤醒,这也就意味着不可以使用SIGKILL
将其终止。
uninterruptible sleep的一个臭名昭著的例子是在Linux NFS上使用mkdir(2)
系统调用而导致的阻塞。mkdir(2)
是一个不可中断的系统调用,其返回值没有EINTR
。我们可以通过观察mkdir(2)
的man手册来确认这一点。在一个正常的本地文件系统上,mkdir(2)
最差的情况也仅仅是进行几次硬盘寻道。但是在一个像NFS这样的网络文件系统上,mkdir(2)
需要进行多个RPC调用,这些RPC调用又可能阻塞在其网络I/O上。因此,这种mkdir(2)
可能会阻塞很久很久。但其uninterruptible sleep的状态使得task变成了“不死僵尸”,消灭它的唯一办法只有重启(或者不理他😊)。
为了避免产生上面所说的“不死僵尸”,我们一般采用可以被信号中断的wait_event_intterruptible
。如果其返回一个非零值,则证明该task在等待过程中被某个信号中断。
wait_event的实现
wait_event系列macro都是对___wait_event
macro的封装,我们以wait_event_interruptible
为例来分析其实现,为了方便分析,我们将该macro展开:
1 | // /include/linux/wait.h |
1 | void init_wait_entry(struct wait_queue_entry *wq_entry, int flags) |
1 | int autoremove_wake_function(struct wait_queue_entry *wq_entry, unsigned mode, int sync, void *key) |
1 | // kernel/sched/wait.c |
1 | // kernel/sched/wait.c |
#1
might_sleep()
是一个用来帮助debug的函数,用来标注那些可能会sleep的函数。当这些被标注的函数在不允许sleep的条件下(例如持有一个自旋锁时)进行了sleep,该函数就会发出警告。#2
首先判断该条件是否满足,如果满足,整个表达式的值就是为0的_ret
。#3
__label__ __out
是一个局部label该label只能在其作用域内被引用。局部label被用做在含有循环的复杂macro中跳出循环。#4
根据是否进行Exclusive wait来初始化__wq_entry
。不同与之前介绍的init_waitqueue_entry
,这里绑定的回调函数func
是autoremove_wake_function
#5
调用prepare_to_wait_event
,因为涉及队列的操作,所以全程带自旋锁#5.1
,检查此时是否有未处理的信号,如果有,则停止加入等待队列并返回-ERESTARTSYS
#5.1
如果要进行exclusive waits,则将该entry放在队尾#5.2
否则,放在队首。#5.4
将当前进程的状态设置为TASK_INTERRUPTIBLE
,以后scheduler将不会再schedule它。
#6
检查条件是否满足#7
检查在可中断的条件下,是否有中断产生#8
让出CPU。当它被再次唤醒时,将会从schedule函数返回。在#5
处被重新添加到队列,并进行#6
,#7
的检查。#9
从循环跳出后,恢复TASK_RUNNING
状态,并释放wq_entry->entry
的资源。
wake_up和wake_up_interruptible
1 |
|
一般来说,当用wait_event_interruptible
进行等待,之后使用wake_up
和wake_up_interruptible
唤醒,其效果是没有差别的。实践中,我们通常按照wait_event
对应wake_up
;wait_event_interruptible
对应wake_up_interruptible
来使用。
wake_up的实现
wake_up
的主要逻辑实现在__wake_up_common
中。它遍历等待队列中的所有entry,并在#1.5
处调用该entry在之前设置好的回调函数。代码中的其他细节解释如下:
#1.1
处使用macrolockdep_assert_held
来assert已经持有等待队列头的自旋锁。因此该函数只能在持有锁的情况下被调用。__wake_up_common_lock
中的#2.2
使用spin_lock_irqsave
验证了这一点。
分批次唤醒
优化:分批次遍历较长的等待队列,以免持有自旋锁时间过长,影响性能。
#1.2
用来处理bookmark
逻辑的代码。- 首次进入函数
__wake_up_common
时,#1.2
处,bookmark
的WQ_FLAG_BOOKMARK
还没有设置(见#2.1
),因此将curr
置为等待队列的第一个entry(#1.3
)。
- 首次进入函数
- 之后进入循环
list_for_each_entry_safe_from
,在#1.7处
检查当前已经遍历过的节点个数是否已经超过WAITQUEUE_WALK_BREAK_CNT
(在当前版本该值被定义为64),如果已经超过,就在 下一个entrynext
之前将bookmark
设置好WQ_FLAG_BOOKMARK
后,用macrolist_add_tail
插入到next
节点之前,并结束循环。 - 结束循环后在
#2.4
处放弃了持有自旋锁。这样就保证了自旋锁的持有时间不会过长。 - 之后循环继续进行,重新进入
__wake_up_common
函数后,在#1.2
位置,因为WQ_FLAG_BOOKMARK
已经被设置,因此curr
会被置为bookmark
的后继节点,也就是继续之前的循环。
关于bookmark
,详见sched/wait: Break up long wake list walk
避免惊群
优化:防止惊群的Exclusive waits
由上面关于wake_up
的行为分析可知,wake_up
会一段一段地调用等待队列中的entry的回调函数func
。由前面关于wait_event
的实现分析可知,使用wait_event
系列macro添加到队列中的entry的回调函数autoremove_wake_function
。其作用为将当前entry从等待队列中移除,并将对应的task唤醒。
当有许多任务等待在等待队列上等待某一资源,而该资源却只能由被唤醒的task中的其中一个得到时,被唤醒的,未得到该资源的其他task就会重新进入sleep。这导致了不必要的context switch,从而降低了内核性能。我们把这种状况称为”惊群“(thundering herd)。
为了避免惊群,wait_queue
提供了一个WQ_FLAG_EXCLUSIVE
。当一个entry被标记为exclusive时,在prepare_to_wait_event
函数中,其会被添加到等待队列的末尾。
注意:手动添加entry到等待队列的函数
add_wait_queue
会清除WQ_FLAG_EXCLUSIVE
,要用add_wait_queue_exclusive
来添加带有该flag的entry。
当wake_up
对等待队列中的entry进行唤醒时,#1.6
说明对整个等待队列的遍历会停止在第一个遇到的flag
为WQ_FLAG_EXCLUSIVE
的entry上面。
因此,如果所有的entry都带有该flag,且结果就是每次只有一个task被唤醒,不会产生“惊群”。
当多个task之间对某种资源有很严重的竞争,并且当该资源可用时,只唤醒一个task便足以将该资源完全消耗时,我们考虑使用Exclusive waits。一个网络编程中典型的例子是:当多个acceptors竞争一个新到来的tcp握手请求,进行accept系统调用时,我们只需要唤醒其中一个acceptor即可完全处理这个新到来的tcp链接。唤醒其他acceptor是不必要的。
1 | static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode, |
1 | // kernel/sched/wait.c |
eventfd
eventfd是Linux一种轻量化的线程间通信方式。eventfd(2)
会产生一个”eventfd object”,该对象包含一个uint64_t
的计数器。使用eventfd(2)
成功后会返回一个fd。在使用阻塞I/O时,eventfd的逻辑如下:
- 对eventfd进行
read
- 如果计数器的值为0,则该次
read
调用阻塞,直到计数器的值不为零。 - 如果计数器的值不为0,则将计数器的值置0,并返回原来原来计数器的值。
- 如果计数器的值为0,则该次
- 对eventfd进行
write
- 将buffer中的数值加到计数器上,如果相加后计数器的值超过的
UINT64_MAX
,则write
阻塞直到有其他进程调用read
- 将buffer中的数值加到计数器上,如果相加后计数器的值超过的
eventfd的“阻塞”功能,是通过wait_queue
来实现的,下面我们来分析eventfd的一部分的代码。
eventfd的数据结构与初始化
1 | struct eventfd_ctx { |
wqh
是一个等待队列头wait_queue_head_t
,而计数器就是count
。
eventfd(2)
系统调用通过do_eventfd
来完成evnetfd的创建和初始化:
#1
对等待队列头进行了初始化。#2
取得一个未使用的文件描述符#3
创建一个匿名inode#4
将该fd和匿名inode安装到当前线程上
1 | static int do_eventfd(unsigned int count, int flags) |
eventfd并没有使用之前介绍的封装好的wait_event
和wake_up
系列macro,而是使用DECLARE_WAITQUEUE
,__add_wait_queue
等自行维护wait_queue
:
#1
在栈上静态初始化一个wait_queue_entry
。其回调函数为default_wake_function
。#2
不允许读取的内容少于8个字节#3
获得等待队列的自旋锁,该自旋锁不但保护等待队列,也保护counter。可能这就是重新实现一次wait_queue逻辑的原因。因为如果我们使用封装好的wait_evnet
系列宏,就可能需要另一个同步原语来保证计数器的读写。#4
如果计数器的值大于零,不需等待,直接跳到#14
,遍历等待队列,并唤醒等待队列上所有的task。#5
如果计数器的值为零,且不是非阻塞I/O,就进行将该task加入到等待队列的一系列工作,首先将之前声明在栈上的wait_queue_entry
放入等待队列#6
开始进入循环,每次程序被唤醒时从schedule()
中返回,并从#11
处开始执行#7
将该task状态设置为TASK_INTERRUPTIBLE
,准备将该进程置于interruptible sleep#8
检查counter,如果不为零,就结束sleep#9
因为处于interruptible sleep,检查信号,如果有待处理的信号,返回-ERESTARTSYS
#10
马上要放弃CPU进入sleep,因为绝对不可带着自旋锁进入休眠,因此将释放自旋锁。#11
当从该task被唤醒,并从schedule()
返回后,重新取得自旋锁,进行#7
,#8
,#9
,#10
重新进行条件检查。
由此可见,其逻辑基本和wait_event_interruptible
相同。
1 | static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count, |
1 | // /include/linux/wait.h |
对于eventfd_write
的分析和eventfd_read
类似,这里就留给读者了。
参考:
- Linux Device Drivers 3rd Edition - Chapter 6 - Blocking I/O
- Linux Wait Queue 等待队列 - RobotKarel
- 等待队列wait queue - kaedehao
- 源码解读Linux等待队列 - Gityuan
- Uninterruptible Sleep - eklitzke.org
- Locally Decalred Labels - gcc online documents
- sched/wait: Break up long wake list walk
- Linux中的惊群问题 - JackT’s Blog
- eventfd 分析 - huazq’s blog