workqueue
介绍
Workqueue 是 client-go 提供的一个工作队列,主要的作用是提供一个先进先出的队列,顺序执行队列中的任务。
workqueue 与普通队列的不同之处在于,它支持对队列中的元素进行标记和去重。
workqueue 的基本特性如下:
有序:所有的元素按照添加的顺序进行处理
标记元素:标记元素是否被处理,在处理时重新排队
去重:相同的元素在同一时间不会被并发的处理,一个元素在被多次添加进队列的情况下,也只会被处理一次
并发性:支持多个生产者和消费者
通知:ShutDown 方法通过信号量通知队列不在接收新的元素,同时通知 metrics goroutine 退出
其他额外的特性:
- 延时队列:元素取出后延时一段时间再加入队列
- 限速队列:限制元素入队的速率,对元素加入队列的次数进行限制
- 监控:支持 promethus 监控
workerqueue 支持下列三种队列,对外提供三种接口
Interface
:FIFO 基础队列,支持去重DelayingInterface
:延迟队列,基于 Interface ,提供额外的AddAfter(item interface{}, duration time.Duration)
方法, 延迟一段时间后再将元素加入队列RateLimitingInterface
: 限速队列,基于 DelayingInterface, 增加AddRateLimited(item interface{})
,Forget(item interface{})
,NumRequeues(item interface{}) int
方法,对元素进入队列的速率进行限制
FIFO 队列
FIFO 是最基本的 workqueue,支持基本的队列操作,接口定义如下
1 |
|
FIFO 队列的结构如下
1 |
|
元素入队的过程
使用 Get()
获取元素执行时,如果 queue 为空,则直接返回空指针,以及 shutdown 为 true 的结果。如果队列中不为空,会先从 queue 中获取队头的元素,执行代码如下:
1 |
|
当对元素的处理完成后,调用 Done()
方法来标记元素已被处理,首先从 processing
中删除元素,然后查看 dirty
中是否有相同的元素,如果有就加入到 queue
中。
1 |
|
在并发执行的情况下,只有既不在 processing 也不再 dirty 中的元素会被直接加入到 queue 中被执行,其它情况下只有等待当前的元素执行完成才会被取出,保证了去重。
延迟队列
基于 FIFO 队列接口再次封装,增加了 AddAfter
方法
1 |
|
延迟队列的实现
1 |
|
waitingForAddCh 的默认初始大小为 1000, 当使用 AddAfter 加入元素后,会被加入到 waitingForAddCh 中。
waitingForChan 中的元素结构如下:
1 |
|
所有 waitingForAddCh 中的元素都是通过 waitingLoop 函数进行执行的。当 delayingType 被初始化时,会启动新的 goroutine 来执行 waitingLoop 方法。
当元素的延迟时间不大于当前时间的时候,即延迟时间还未到,元素插入到 waitForPriorityQueue 中。否则就将元素插入到 FIFO 队列中。
waitForPriorityQueue 是一个根据延迟时间排序的堆,堆顶的元素是最先就绪的元素,底层结构是一个 waitFor 的 slice
1 |
|
在遍历的过程中,查看 waitForPriorityQueue 堆顶的元素是是否就绪,如果就绪则 Pop 出堆顶元素,插入的 FIFO 队列中,然后继续查看下一个元素。
限速队列
限速队列,基于延迟队列与 FIFO 队列接口封装。新增了 AddRateLimited
, Forget
, NumRequeues
方法。
限速队列达通过延迟某个元素的插入时间,达到特定时间段内插入指定数量元素的目标。
定义如下:
1 |
|
限速周期:执行 AddRateLimited 方法到执行完 Forget 方法之间的时间。
限速算法
令牌桶算法
workqueue 的令牌桶算法借助 rate 库实现。
令牌桶内部是一个存放 token 的 bucket,初始时 bucket 为空,token 以固定的速率往 bucket 中填入。当 bucket 满了的时候,token 会被直接丢弃。
只有获取到了 token 的元素才会被 accept,没有获取到 token 的元素只能排队等待。令牌桶通过发放 token 的方式达到限制速率。
workqueue 默认队列为
1 |
|
在限速周期内插入的元素,通过 r.LimiterReserve().Delay
返回指定元素等待的时间,前 bucket 容量的元素会被立即返回,之后的元素的延迟时间为 100/100 ms, 101/200ms, 102/300ms。每 100ms 放入一个令牌,后续的令牌得在 100 ms 后才能获取到新令牌。
排队指数算法
将元素的排队数量作为指数,排队数增大,速率就以 2 的指数形式增长,但最大不超过 maxDelay。
基础的延迟速率为 baseDelay,如果有十个相同的元素在排队,那么第一个元素的延迟时间为 1* baseDelay, 第二个元素的延迟时间就是 2 * baseDelay,第三个元素的延迟时间就为 4 * baseDelay,第四个元素的延迟时间就为 8 * baseDelay,以此类推,直到超过 maxDelay 后,设置为 maxDelay。
计数器算法
在一个计数周期内,最多只允许通过规定数量的元素。
比如,设定在 1 分钟内只能通过 100 个元素。开始的时候,计数器归零,每通过一个元素,计数器 +1,当达到上限 100 时不允许新的元素通过。当 1 分钟时间到了后,重新开始计时,计数器归零,重新开始计数。
基于计数器算法,拓展增加 slow 和 fast 功能,当元素排队的数量小于 maxFastAttempts 使用 fastDelay,超过之后使用 fastDelay。
混合模式
混合多种算法,在一个队列中有多个算法同时生效。