Linux内核工作队列讲解和源码详细注释
1. 前言
工作队列(workqueue)是Linux内核中定义的一种机制,用于以回调的方式处理不是很紧急的事件。
以下代码的Linux内核版本为2.6.19.2,源代码文件主要为include/linux/workqueue.h和kernel/workqueue.c。
2. 数据结构
/* include/linux/workqueue.h */ // ⼯作节点结构struct work_struct { // 等待时间unsigned long pending;// 链表节点struct list_head entry;// workqueue回调函数void (*func)(void *);// 回调函数func的数据void *data;// 指向CPU相关数据,⼀般指向struct
cpu_workqueue_struct结构void *wq_data;// 定时器struct timer_list timer;};
struct execute_work { struct work_struct work;};
/* kernel/workqueue.c */ /* * The per-CPU workqueue (if single thread, we always use the first * possible cpu)。
* * The sequence counters are for flush_scheduled_work()。 It wants to wait * until all currently-scheduled works are completed,but it doesn't * want to be livelocked by new, incoming ones. So it waits until * remove_sequence is >= the insert_sequence which pertained when * flush_scheduled_work() was called. */ // 这个结构是针对每个CPU的struct cpu_workqueue_struct { // 结构锁spinlock_t lock;// 下⼀个要执⾏的节点序号long remove_sequence; /* Least-recently added (next to run) */ // 下⼀个要插⼊节点的序号long insert_sequence; /* Next to add */ // ⼯作机构链表节点struct list_head worklist;// 要进⾏处理的等待队列wait_queue_head_t
more_work;// 处理完的等待队列wait_queue_head_t work_done;// ⼯作队列节点struct workqueue_struct *wq;// 进程指针struct
task_struct *thread;int run_depth; /* Detect run_workqueue() recursion depth */ } ____cacheline_aligned;/* * The externally visible workqueue abstraction is an array of * per-CPU workqueues:*/ // ⼯作队列结构struct workqueue_struct { struct cpu_workqueue_struct
*cpu_wq;const char *name;struct list_head list; /* Empty if single thread */ };
kernel/workqueue.c中定义了一个工作队列链表,所有多线程工作队列都会挂接到这个链表中(单线程工作队列不加入该链表):
/* Global list of multi-threaded workqueues, filled by __create_workqueue(). */
static LIST_HEAD(workqueues);
3. 一些宏定义
/* include/linux/workqueue.h */

/*
 * Static initializer for a work_struct: self-linked (empty) list entry,
 * callback, callback argument and an inactive timer.  Fields not named
 * here (pending, wq_data) are zero-initialized.
 */
#define __WORK_INITIALIZER(n, f, d) {				\
	.entry	= { &(n).entry, &(n).entry },			\
	.func = (f),						\
	.data = (d),						\
	.timer = TIMER_INITIALIZER(NULL, 0, 0),			\
	}

/* Define a work_struct named n and initialize it statically. */
#define DECLARE_WORK(n, f, d)					\
	struct work_struct n = __WORK_INITIALIZER(n, f, d)

/*
 * initialize a work-struct's func and data pointers:
 */
#define PREPARE_WORK(_work, _func, _data)			\
	do {							\
		(_work)->func = (_func);			\
		(_work)->data = (_data);			\
	} while (0)

/*
 * initialize all of a work-struct:
 *
 * Runtime counterpart of __WORK_INITIALIZER: __WORK_INITIALIZER is for
 * initializing a definition, INIT_WORK is for (re)initializing a
 * work_struct from code.
 */
#define INIT_WORK(_work, _func, _data)				\
	do {							\
		INIT_LIST_HEAD(&(_work)->entry);		\
		(_work)->pending = 0;				\
		PREPARE_WORK((_work), (_func), (_data));	\
		init_timer(&(_work)->timer);			\
	} while (0)
4. 操作函数
4.1 创建工作队列
一般的创建函数是create_workqueue,但这其实只是一个宏:
/* include/linux/workqueue.h */
/* Multi-threaded workqueue: singlethread == 0, one worker per online CPU. */
#define create_workqueue(name)					\
	__create_workqueue((name), 0)
在workqueue的初始化函数中,定义了一个针对内核中所有线程可用的事件工作队列,其他内核线程建立的事件工作结构就都挂接到该队列:
void init_workqueues(void)
{
	/* ... (other initialization elided) ... */
	/* The shared "events" queue used by schedule_work() and friends. */
	keventd_wq = create_workqueue("events");
	/* ... */
}
核心创建函数是__create_workqueue:
struct workqueue_struct *__create_workqueue(const char *name,int singlethread)
{ int cpu, destroy = 0;struct workqueue_struct *wq;struct task_struct *p;// 分配⼯作队列结构空间wq = kzalloc(sizeof(*wq),GFP_KERNEL);if (!wq)
return NULL;// 为每个CPU分配单独的⼯作队列空间wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);if (!wq-
>cpu_wq) { kfree(wq);return NULL;} wq->name = name;mutex_lock(&workqueue_mutex);if (singlethread) { // 使⽤create_workqueue宏时该参数始终为0 // 如果是单⼀线程模式,在单线程中调⽤各个⼯作队列// 建⽴⼀个的⼯作队列内核线程
INIT_LIST_HEAD(&wq->list);// 建⽴⼯作队列的线程p = create_workqueue_thread(wq, singlethread_cpu);if (!p)
destroy = 1;else // 唤醒该线程wake_up_process(p);} else { // 链表模式,将⼯作队列添加到⼯
作队列链表list_add(&wq->list,&workqueues);// 为每个CPU建⽴⼀个⼯作队列线程for_each_online_cpu(cpu) { p = create_workqueue_thread(wq, cpu);if (p) { // 绑定CPU kthread_bind(p, cpu);// 唤醒线程wake_up_process(p);} else destroy = 1;}
mutex_unlock(&workqueue_mutex);/* * Was there any error during startup? If yes then clean up:*/ if (destroy) { // 建⽴线程失败,释放⼯作队列destroy_workqueue(wq);wq = NULL;} return wq;} EXPORT_SYMBOL_GPL(__create_workqueue);
// 创建⼯作队列线程static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,int cpu)
源代码电影讲解 { // 每个CPU的⼯作队列struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);struct task_struct *p;
spin_lock_init(&cwq->lock);// 初始化cwq->wq = wq;cwq->thread = NULL;cwq->insert_sequence = 0;cwq->remove_sequence = 0;INIT_LIST_HEAD(&cwq->worklist);// 初始化等待队列more_work,该队列处理要执⾏的⼯作结构init_waitqueue_head(&cwq-
>more_work);// 初始化等待队列work_done,该队列处理执⾏完的⼯作结构init_waitqueue_head(&cwq->work_done);// 建⽴内核线程work_thread if (is_single_threaded(wq))
p = kthread_create(worker_thread, cwq, "%s", wq->name);else p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);if (IS_ERR(p))
return NULL;// 保存线程指针cwq->thread = p;return p;} static int worker_thread(void *__cwq)
{ struct cpu_workqueue_struct *cwq = __cwq;// 声明⼀个等待队列DECLARE_WAITQUEUE(wait, current);// 信号struct
k_sigaction sa;sigset_t blocked;current->flags |= PF_NOFREEZE;// 降低进程优先级,⼯作进程不是个很紧急的进程,不和其他进程抢占CPU,通常在系统空闲时运⾏set_user_nice(current, -5);/* Block and flush all signals */ // 阻塞所有信号sigfillset(&blocked);sigprocmask(SIG_BLOCK, &blocked, NULL);flush_signals(current);/* * We inherited MPOL_INTERLEAVE from the booting kernel. * Set MPOL_DEFAULT to insure node local allocations. */ numa_default_policy();/* SIG_IGN makes children autoreap: see do_notify_parent()。 */ // 信号处理都是忽略sa.sa.sa_handler = SIG_IGN;sa.sa.sa_flags = 0;siginitset(&sa.sa.sa_mask,sigmask(SIGCHLD));do_sigaction(SIGCHLD, &sa,(struct k_sigaction *)0);// 进程可中断
set_current_state(TASK_INTERRUPTIBLE);// 进⼊循环,没明确停⽌该进程就⼀直运⾏while (!kthread_should_stop()) { // 设置more_work等待队列,当有新work结构链⼊队列中时会激发此等待队列add_wait_queue(&cwq->more_work, &wait);if
(list_empty(&cwq->worklist))
// ⼯作队列为空,睡眠schedule();else // 进⾏运⾏状态__set_current_state(TASK_RUNNING);// 删除等待队列
remove_wait_queue(&cwq->more_work, &wait);// 按链表遍历执⾏⼯作任务if (!list_empty(&cwq->worklist))
run_workqueue(cwq);// 执⾏完⼯作,设置进程是可中断的,重新循环等待⼯作set_current_state(TASK_INTERRUPTIBLE);} __set_current_state(TASK_RUNNING);return 0;}
// 运⾏⼯作结构static void run_workqueue(struct cpu_workqueue_struct *cwq)
{ unsigned long flags;/* * Keep taking off work from the queue until * done. */ // 加锁spin_lock_irqsave(&cwq->lock, flags);// 统计已经递归调⽤了多少次了cwq->run_depth++;if (
cwq->run_depth > 3) { // 递归调⽤此时太多/* morton gets to eat his hat */
printk("%s: recursion depth exceeded: %dn",__FUNCTION__, cwq->run_depth);dump_stack();} // 遍历⼯作链表while
(!list_empty(&cwq->worklist)) { // 获取的是next节点的struct work_struct *work = list_entry(cwq-&,struct work_struct,entry);void (*f)(void *) = work->func;void *data = work->data;// 删除节点,同时节点中的list参数清空list_del_init(cwq-
&);// 解锁// 现在在执⾏以下代码时可以中断,run_workqueue本⾝可能会重新被调⽤,所以要判断递归深度
spin_unlock_irqrestore(&cwq->lock, flags);BUG_ON(work->wq_data != cwq);// ⼯作结构已经不在链表中clear_bit(0, &work->pending);// 执⾏⼯作函数f(data);// 重新加锁spin_lock_irqsave(&cwq->lock, flags);// 执⾏完的⼯作序列号递增cwq-
>remove_sequence++;// 唤醒⼯作完成等待队列,供释放⼯作队列wake_up(&cwq->work_done);} // 减少递归深度cwq->run_depth ——;// 解锁spin_unlock_irqrestore(&cwq->lock, flags);}
4.2 释放工作队列
/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	/* Wait for all currently queued work to finish. */
	flush_workqueue(wq);

	/* We don't need the distraction of CPUs appearing and vanishing. */
	mutex_lock(&workqueue_mutex);
	/* Stop the worker thread(s). */
	if (is_single_threaded(wq)) {
		cleanup_workqueue_thread(wq, singlethread_cpu);
	} else {
		for_each_online_cpu(cpu)
			cleanup_workqueue_thread(wq, cpu);
		/* Only multi-threaded queues were put on the global list. */
		list_del(&wq->list);
	}
	mutex_unlock(&workqueue_mutex);
	/* Free the per-CPU data, then the descriptor itself. */
	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each workqueue's current insert_sequence number and
 * will sleep until the head sequence is greater than or equal to that.  This
 * means that we sleep until all works which were queued on entry have been
 * handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
	/* Debug check: this waits for worker threads, so it may sleep. */
	might_sleep();

	if (is_single_threaded(wq)) {
		/* Always use first cpu's area. */
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
	} else {
		int cpu;

		/* Flush each online CPU's queue in turn. */
		mutex_lock(&workqueue_mutex);
		for_each_online_cpu(cpu)
			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
		mutex_unlock(&workqueue_mutex);
	}
}
EXPORT_SYMBOL_GPL(flush_workqueue);
flush_workqueue的核心处理函数为flush_cpu_workqueue:
/*
 * Wait until every work that was on @cwq when this was called has run.
 * If called from @cwq's own worker thread, run the queue inline instead.
 */
static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
	} else {
		DEFINE_WAIT(wait);
		long sequence_needed;

		spin_lock_irq(&cwq->lock);
		/* Snapshot: every work inserted so far must be removed. */
		sequence_needed = cwq->insert_sequence;

		/* Some work queued before the snapshot has not run yet. */
		while (sequence_needed - cwq->remove_sequence > 0) {
			prepare_to_wait(&cwq->work_done, &wait,
					TASK_UNINTERRUPTIBLE);
			spin_unlock_irq(&cwq->lock);
			/* Woken by wake_up(&cwq->work_done) in run_workqueue(). */
			schedule();
			spin_lock_irq(&cwq->lock);
		}
		finish_wait(&cwq->work_done, &wait);
		spin_unlock_irq(&cwq->lock);
	}
}
4.3 调度工作
在大多数情况下,并不需要自己建立工作队列,而是只定义工作,将工作结构挂接到内核预定义的事件工作队列中调度,在
kernel/workqueue.c中定义了一个静态全局量的工作队列keventd_wq:
/* The shared "events" workqueue, created in init_workqueues(). */
static struct workqueue_struct *keventd_wq;
4.3.1 立即调度
/*
 * Other code uses the functions below to schedule a work_struct, i.e.
 * to put it on a workqueue.
 */

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int fastcall schedule_work(struct work_struct *work)
{
	/* Queue on the shared keventd_wq ("events") workqueue. */
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
/** * queue_work - queue work on a workqueue * @wq: workqueue to use * @work: work to queue * * Returns 0 if @work was already on a queue, non-zero otherwise. * * We queue the work to the CPU it was submitted, but there is no * guarantee that it will be processed by that CPU. */ int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{ int ret = 0, cpu = get_cpu();if (!test_and_set_bit(0, &work->pending)) { // ⼯作结构还没在队列,设置pending标志表⽰把⼯作结构挂接到队列中if (unlikely(is_single_threaded(wq)))
cpu = singlethread_cpu;BUG_ON(!list_empty(&work->entry));// 进⾏具体的排队__queue_work(per_cpu_ptr(wq-
>cpu_wq, cpu), work);ret = 1;} put_cpu();return ret;} EXPORT_SYMBOL_GPL(queue_work);/* Preempt must be disabled. */ // 不能被抢占static void __queue_work(struct cpu_workqueue_struct *cwq,struct work_struct *work)
{ unsigned long flags;// 加锁spin_lock_irqsave(&cwq->lock, flags);// 指向CPU⼯作队列work->wq_data = cwq;// 挂接到⼯作链表list_add_tail(&work->entry, &cwq->worklist);// 递增插⼊的序列号cwq->insert_sequence++;// 唤醒等待队列准备处理⼯作结构wake_up(&cwq->more_work);spin_unlock_irqrestore(&cwq->lock, flags);}
4.3.2 延迟调度
4.3.2.1 schedule_delayed_work
/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @work: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
	/* Arm the timer now; the work reaches keventd_wq when it fires. */
	return queue_delayed_work(keventd_wq, work, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);
/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @work: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct work_struct *work, unsigned long delay)
{
	int ret = 0;
	/* The delay is implemented with the work's own timer. */
	struct timer_list *timer = &work->timer;

	if (!test_and_set_bit(0, &work->pending)) {
		/* We now own the work: its timer must be idle ... */
		BUG_ON(timer_pending(timer));
		/* ... and it must not be on any worklist. */
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		work->wq_data = wq;
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)work;
		timer->function = delayed_work_timer_fn;
		/* When the timer fires, delayed_work_timer_fn() queues the work. */
		add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
// 定时中断函数static void delayed_work_timer_fn(unsigned long __data)
{ struct work_struct *work = (struct work_struct *)__data;struct workqueue_struct *wq = work->wq_data;// 获取CPU int cpu = smp_processor_id();if (unlikely(is_single_threaded(wq)))
cpu = singlethread_cpu;// 将⼯作结构添加到⼯作队列,注意这是在时间中断调⽤__queue_work(per_cpu_ptr(wq->cpu_wq,cpu), work);}
4.3.2.2 schedule_delayed_work_on
指定CPU的延迟调度⼯作结构,和schedule_delayed_work相⽐增加了⼀个CPU参数,其他都相同/** * schedule_delayed_work_on -queue work in global workqueue on CPU after delay * @cpu: cpu to use * @work: job to be done * @delay: number of jiffies to wait * * After waiting for a given time this puts a job in the kernel-global * workqueue on the specified CPU. */ int schedule_delayed_work_on(int cpu,struct work_struct *work, unsigned long delay)
{ return queue_delayed_work_on(cpu, keventd_wq, work, delay);}
/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct work_struct *work, unsigned long delay)
{
	struct timer_list *t = &work->timer;

	/* Someone already owns this work: leave it alone. */
	if (test_and_set_bit(0, &work->pending))
		return 0;

	BUG_ON(timer_pending(t));
	BUG_ON(!list_empty(&work->entry));

	/* This stores wq for the moment, for the timer_fn */
	work->wq_data = wq;
	t->expires = jiffies + delay;
	t->data = (unsigned long)work;
	t->function = delayed_work_timer_fn;
	/*
	 * Arm the timer on @cpu; delayed_work_timer_fn() queues on the CPU
	 * it runs on, so the work lands on @cpu's queue unless @wq is
	 * single-threaded.
	 */
	add_timer_on(t, cpu);
	return 1;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
5. 结论
工作队列和定时器函数处理有点类似,都是执行一定的回调函数。但和定时器处理函数不同的是:定时器回调函数只执行一次,而且执行定时器回调函数的时候是在时钟中断中,限制比较多,因此回调程序不能太复杂;而工作队列是通过内核线程实现的,一直有效,可重复执行,工作函数运行在内核线程的进程上下文中,执行时可以休眠,因此工作队列适合处理那些不是很紧急、但可能需要休眠的任务,如垃圾回收处理等。(注意:worker_thread()中的set_user_nice(current, -5)实际上是略微提高、而不是降低了工作线程的优先级。)
在xfrm模块中就广泛使用了workqueue。使用时,只需要定义work结构,然后调用schedule_work()或schedule_delayed_work()即可。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论