RT-Thread 隐藏的宝藏之工作队列

1. 工作队列是什么

工作队列:我们可以将不是很紧急处理的事情放到 workqueue 中处理,等待系统空闲时就会去执行 workqueue 里的事情。工作队列的本质就是开一个线程去处理排列好的任务,所以工作队列中不能有死循环,尽可能的不要使用使用会导致线程阻塞的API

workqueue 的作用就是将工作延迟处理, workqueue 是中断处理的后半程。

2. 工作队列怎么使用

  1. 初始化 work
rt_inline void rt_work_init(struct rt_work *work, void (*work_func)(struct rt_work *work, void *work_data),
                               void *work_data)

work : 工作队列的控制块

*work_func : 指定任务回调函数的指针

*work_data : 待处理的数据指针

  1. 创建工作队列

    struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
    

    name : 创建工作队列线程的名字

    stack_size : 创建工作队列线程的栈大小

    priority :创建工作队列线程的优先级

  2. 删除工作队列

    rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
    

    queue :工作队列线程块的删除

  3. 增加工作队列任务

    rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
    

    queue : 工作队列控制块

    work : work 控制块

  4. 提交工作队列任务

    rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time)
    

    queue : 工作队列控制块

    work : work 控制块

    time : 超时时间

  5. 提交临界工作队列

    rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work)
    

    queue : 工作队列控制块

    work : work 控制块

  6. 取消工作队列任务

    rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
    

    queue : 工作队列控制块

    work : work 控制块

  7. 同步取消工作队列任务

    rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
    

    queue : 工作队列控制块

    work : work 控制块

  8. 取消工作队列所有任务

    rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
    

    queue : 工作队列控制块

  9. 初始化延迟队列

void rt_delayed_work_init(struct rt_delayed_work *work, void (*work_func)(struct rt_work *work,
                       void *work_data), void *work_data)

3. 工作队列的实现原理

先看一下工作队列相关的结构体

/* workqueue implementation */
struct rt_workqueue
{
    rt_list_t      work_list;
    struct rt_work *work_current; /* current work */

    struct rt_semaphore sem;
    rt_thread_t    work_thread;
};

struct rt_work
{
    rt_list_t list;

    void (*work_func)(struct rt_work *work, void *work_data);
    void *work_data;
    rt_uint16_t flags;
    rt_uint16_t type;
    struct rt_timer timer;
    struct rt_workqueue *workqueue;
};

struct rt_delayed_work
{
    struct rt_work work;
};

通过结构体可以知道, 每一个 work 都通过链表挂载 work_queue

  1. 初始化 work

    rt_inline void rt_work_init(struct rt_work *work, void (*work_func)(struct rt_work *work, void *work_data),
                                void *work_data)
    {
        rt_list_init(&(work->list)); // 初始化 work 链表
        work->work_func = work_func; // work 回调函数
        work->work_data = work_data; // 带处理的数据缓冲区指针
        work->workqueue = RT_NULL; // 初始化的时候不挂载到任何 workqueue 上
        work->flags = 0;
        work->type = 0;
    }
    
  2. 创建工作队列

    struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
    {
        struct rt_workqueue *queue = RT_NULL;
        /* 为 queue 申请内存 */
        queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
        if (queue != RT_NULL)
        {
            /* initialize work list */
            rt_list_init(&(queue->work_list));// 初始化链表 &(queue->work_list)
            queue->work_current = RT_NULL; // 当前还没有 work
            rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO); // 初始化一个信号量
    
            /* create the work thread */
            queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10); // 创建一个线程
            if (queue->work_thread == RT_NULL)
            {
                RT_KERNEL_FREE(queue);
                return RT_NULL;
            }
    
            rt_thread_startup(queue->work_thread); // 启动这个线程
        }
    
        return queue;
    }
    
  3. 删除工作队列

    rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
    {
     RT_ASSERT(queue != RT_NULL);
    
     rt_thread_delete(queue->work_thread);// 删除创建的线程
     RT_KERNEL_FREE(queue);// 释放 queue 申请的内存
    
     return RT_EOK;
     }
    
    
  4. 增加任务到工作队列

    rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
    {
        RT_ASSERT(queue != RT_NULL);
        RT_ASSERT(work != RT_NULL);
    
        return _workqueue_submit_work(queue, work); // 提交到工作队列
    }
    
    static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work)
    {
        rt_base_t level;
    
        level = rt_hw_interrupt_disable(); // 关中断
        if (work->flags & RT_WORK_STATE_PENDING) // 确保工作队列不是在挂起状态 
        {
            rt_hw_interrupt_enable(level); // 开中断
            return -RT_EBUSY; // 返回 忙
        }
    
        if (queue->work_current == work) // 如果插入的 work 是当前的 work
        {
            rt_hw_interrupt_enable(level);
            return -RT_EBUSY; // 返回忙
        }
    
        /* NOTE: the work MUST be initialized firstly */
        rt_list_remove(&(work->list));// 把 work 的 list 移除,确保链表的首尾都指向自己
    
        rt_list_insert_after(queue->work_list.prev, &(work->list));//把work链表插入到queue
        work->flags |= RT_WORK_STATE_PENDING;//设置工作队列为挂起状态
    
        /* whether the workqueue is doing work */
        if (queue->work_current == RT_NULL) // 如果当前工作队列中没有work
        {
            rt_hw_interrupt_enable(level);// 开启中断
            /* resume work thread */
            rt_thread_resume(queue->work_thread);// 恢复当前 queue 上的线程
            rt_schedule();// 开调度
        }
        else
        {
            rt_hw_interrupt_enable(level);
        }
    
        return RT_EOK;
    }
    
  5. 提交工作到工作队列

    rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time)
    {
        RT_ASSERT(queue != RT_NULL);
        RT_ASSERT(work != RT_NULL);
    
        if (time > 0) //如果时间设置大于0
        {
            work->type |= RT_WORK_TYPE_DELAYED; //设置 WORK 的状态为等待状态
        }
    
        if (work->type & RT_WORK_TYPE_DELAYED) // 如果是等待状态
        {
            return _workqueue_submit_delayed_work(queue, work, time);
        }
        else
        {
            return _workqueue_submit_work(queue, work); // 参考前面的分析
        }
    }
    
    static rt_err_t _workqueue_submit_delayed_work(struct rt_workqueue *queue,
            struct rt_work *work, rt_tick_t ticks)
    {
        rt_base_t level;
        rt_err_t ret = RT_EOK;
    
        /* Work cannot be active in multiple queues */
        if (work->workqueue && work->workqueue != queue) // 确保工作不在多个工作队列中
        {
            ret = -RT_EINVAL;
            goto __exit;
        }
    
        /* Cancel if work has been submitted */
        if (work->workqueue == queue) //如果已经提交了
        {
            ret = _workqueue_cancel_delayed_work(work); // 那么就取消这工作
            if (ret < 0)
            {
                goto __exit;
            }
        }
    
        level = rt_hw_interrupt_disable(); // 关中断
        /* Attach workqueue so the timeout callback can submit it */
        work->workqueue = queue; // work 在 queue 上
        rt_hw_interrupt_enable(level); // 开中断
    
        if (!ticks) // 如果ticks 是 0
        {
            /* Submit work if no ticks is 0 */
            ret = _workqueue_submit_work(work->workqueue, work);
        }
        else
        {
            level = rt_hw_interrupt_disable(); // 关中断
            /* Add timeout */
            work->flags |= RT_WORK_STATE_SUBMITTING; // 设置状态为 提交状态
            rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler, work, ticks,
                          RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER); // 初始化定时器
            rt_hw_interrupt_enable(level); // 开中断
            rt_timer_start(&(work->timer)); // 启动定时器
        }
    
    __exit:
        return ret;
        }
    

  
  static void _delayed_work_timeout_handler(void *parameter)
  {
      struct rt_work *delayed_work;
      rt_base_t level;
  
      delayed_work = (struct rt_work *)parameter; // 获取参数
      level = rt_hw_interrupt_disable(); // 关中断
      rt_timer_stop(&(delayed_work->timer)); // 停止定时器
      rt_timer_detach(&(delayed_work->timer)); // 脱离定时器
      delayed_work->flags &= ~RT_WORK_STATE_SUBMITTING; // 取消标志位
      delayed_work->type &= ~RT_WORK_TYPE_DELAYED; // 取消标志位
      rt_hw_interrupt_enable(level); // 开中断 
      _workqueue_submit_work(delayed_work->workqueue, delayed_work); //把work加入到queue
  }
  1. 提交临界工作队列

    rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work)
    {
        rt_base_t level;
        RT_ASSERT(queue != RT_NULL);
        RT_ASSERT(work != RT_NULL);
    
        level = rt_hw_interrupt_disable(); // 关中断
        if (queue->work_current == work) // 确保当前队列上的工作不是当前工作
        {
            rt_hw_interrupt_enable(level);
            return -RT_EBUSY;
        }
    
        /* NOTE: the work MUST be initialized firstly */
        rt_list_remove(&(work->list)); // 确保链表是空的
    
        rt_list_insert_after(queue->work_list.prev, &(work->list));// 把work 插入到 queue
        if (queue->work_current == RT_NULL) // 如果当前的work 是空
        {
            rt_hw_interrupt_enable(level); // 使能中断
            /* resume work thread */
            rt_thread_resume(queue->work_thread); // 恢复这个线程
            rt_schedule(); // 开启调度
        }
        else rt_hw_interrupt_enable(level);
    
        return RT_EOK;
    }
    

    这里没有设置 工作状态为 work->flags |= RT_WORK_STATE_PENDING; 所以 WORK 会立即得到执行.

  2. 取消队列中的工作

    rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
    {
        RT_ASSERT(queue != RT_NULL);
        RT_ASSERT(work != RT_NULL);
    
        if (work->type & RT_WORK_TYPE_DELAYED) // 如果在等待状态
        {
            return _workqueue_cancel_delayed_work(work); // 延迟取消
        }
        else
        {
            return _workqueue_cancel_work(queue, work); // 取消工作
        }
    }
    
    static rt_err_t _workqueue_cancel_delayed_work(struct rt_work *work)
    {
        rt_base_t level;
        int ret = RT_EOK;
    
        if (!work->workqueue) // 如果 work 不在 队列上
        {
            ret = -EINVAL;
            goto __exit;
        }
    
        if (work->flags & RT_WORK_STATE_PENDING) // work 的状态为 挂起状态
        {
            /* Remove from the queue if already submitted */
            ret = _workqueue_cancel_work(work->workqueue, work);// 取消任务
            if (ret)
            {
                goto __exit;
            }
        }
        else
        {
            if (work->flags & RT_WORK_STATE_SUBMITTING) // 如果状态是提交状态
            {
                level = rt_hw_interrupt_disable(); // 关中断
                rt_timer_stop(&(work->timer)); // 停止定时器
                rt_timer_detach(&(work->timer));// 脱离定时器
                work->flags &= ~RT_WORK_STATE_SUBMITTING; // 取消标志位
                rt_hw_interrupt_enable(level);// 开中断
            }
        }
    
        level = rt_hw_interrupt_disable(); // 关中断
        /* Detach from workqueue */
        work->workqueue = RT_NULL;
        work->flags &= ~(RT_WORK_STATE_PENDING);
        rt_hw_interrupt_enable(level);
    
    __exit:
        return ret;
    }
    
    static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
    {
        rt_base_t level;
    
        level = rt_hw_interrupt_disable();
        if (queue->work_current == work) //如果 work 的工作还没做完
        {
            rt_hw_interrupt_enable(level);
            return -RT_EBUSY;
        }
        rt_list_remove(&(work->list)); //从链表上移除
        work->flags &= ~RT_WORK_STATE_PENDING; // 设置为 pending 状态
        rt_hw_interrupt_enable(level); // 开启使能
    
        return RT_EOK;
    }
    
  3. 取消同步工作

    rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
    {
        rt_base_t level;
    
        RT_ASSERT(queue != RT_NULL);
        RT_ASSERT(work != RT_NULL);
    
        level = rt_hw_interrupt_disable();
        if (queue->work_current == work) /* it's current work in the queue */
        {
            /* wait for work completion */
            rt_sem_take(&(queue->sem), RT_WAITING_FOREVER); // 等待获取信号量
        }
        else
        {
            rt_list_remove(&(work->list)); // 移除节点
        }
        work->flags &= ~RT_WORK_STATE_PENDING;
        rt_hw_interrupt_enable(level);
    
        return RT_EOK;
    }
    
  4. 取消所有工作

    rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
    {
        struct rt_list_node *node, *next;
        RT_ASSERT(queue != RT_NULL);
    
        rt_enter_critical(); // 进入临界区
        for (node = queue->work_list.next; node != &(queue->work_list); node = next)
        {
            next = node->next;
            rt_list_remove(node);// 遍历移除
        }
        rt_exit_critical(); // 退出临界区
    
        return RT_EOK;
    }
    
  5. 创建延迟工作队列

    void rt_delayed_work_init(struct rt_delayed_work *work, void (*work_func)(struct rt_work *work,
                              void *work_data), void *work_data)
    {
        rt_work_init(&work->work, work_func, work_data); // 初始化 work
    }
    

4. 总结

  1. 工作队列的本质就一个线程完成多个工作,每个工作都是一个函数,切记函数里面不能有死循环
  2. 工作队列的同步的原理是使用了信号量
  3. 延时工作是等待设定的时间后才把 work 加入到工作队列
上一篇:信号量和互斥锁


下一篇:java-有人知道如何实现类似于Google App Engine的白名单类访问方法吗?