workqueue

介绍

Workqueue 是 client-go 提供的一个工作队列,主要的作用是提供一个先进先出的队列,顺序执行队列中的任务。

workqueue 与普通队列的不同之处在于,它支持对队列中的元素进行标记和去重。

workqueue 的基本特性如下:

  • 有序:所有的元素按照添加的顺序进行处理

  • 标记元素:标记元素是否被处理,在处理时重新排队

  • 去重:相同的元素在同一时间不会被并发的处理,一个元素在被多次添加进队列的情况下,也只会被处理一次

  • 并发性:支持多个生产者和消费者

  • 通知:ShutDown 方法通过信号量通知队列不在接收新的元素,同时通知 metrics goroutine 退出

其他额外的特性:

  • 延时队列:元素取出后延时一段时间再加入队列
  • 限速队列:限制元素入队的速率,对元素加入队列的次数进行限制
  • 监控:支持 promethus 监控

workerqueue 支持下列三种队列,对外提供三种接口

  • Interface:FIFO 基础队列,支持去重
  • DelayingInterface:延迟队列,基于 Interface ,提供额外的 AddAfter(item interface{}, duration time.Duration) 方法, 延迟一段时间后再将元素加入队列
  • RateLimitingInterface: 限速队列,基于 DelayingInterface, 增加AddRateLimited(item interface{}), Forget(item interface{}), NumRequeues(item interface{}) int 方法,对元素进入队列的速率进行限制

FIFO 队列

FIFO 是最基本的 workqueue,支持基本的队列操作,接口定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Interface interface {
// Add 向队列中添加元素
Add(item interface{})
// 获取队列的长度
Len() int
// 获取队列头部的元素,当 shutdown 为 true 时,消费者应该关闭 goroutine
Get() (item interface{}, shutdown bool)
// 标记元素已被处理
Done(item interface{})
// 关闭队列
ShutDown()
// 停止元素入队,等待所有元素处理完成后,关闭队列
ShutDownWithDrain()
// 查询队列是否已关闭
ShuttingDown() bool
}

FIFO 队列的结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Type struct {
// queue 按顺序存放了需要处理的元素,
// queue 中的每个元素都应该在 dirty 中,而不在 processing 中
queue []t

// dirty 存放了所有需要被处理的元素
dirty set

// processing 中的元素表示正在被处理,处理完成后,从其中移除元素
// 并检查 dirty 中是否有相同的元素,如果有,将元素加入到 queue 中
processing set

cond *sync.Cond

shuttingDown bool
drain bool

metrics queueMetrics

unfinishedWorkUpdatePeriod time.Duration
clock clock.WithTicker
}

元素入队的过程

workqueue-add.png

使用 Get() 获取元素执行时,如果 queue 为空,则直接返回空指针,以及 shutdown 为 true 的结果。如果队列中不为空,会先从 queue 中获取队头的元素,执行代码如下:

1
2
3
4
5
6
7
8
item := queue[0]
queue[0] = nil // 底层的数组仍然持有对象,不会被垃圾回收,所以需要设置为 nil
queue = queue[1:]

q.metrics.get(item)

q.processing.insert(item)
q.dirty.delete(item)

当对元素的处理完成后,调用 Done() 方法来标记元素已被处理,首先从 processing 中删除元素,然后查看 dirty 中是否有相同的元素,如果有就加入到 queue 中。

1
2
3
4
5
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}

在并发执行的情况下,只有既不在 processing 也不再 dirty 中的元素会被直接加入到 queue 中被执行,其它情况下只有等待当前的元素执行完成才会被取出,保证了去重。

延迟队列

基于 FIFO 队列接口再次封装,增加了 AddAfter方法

1
2
3
4
5
6
type DelayingInterface interface {
Interface
// AddAfter 在指定的 duration 之后将元素加入到队列中
// 当 duration < 0 时,元素会被直接插入到其中
AddAfter(item interface{}, duration time.Duration)
}

延迟队列的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type delayingType struct {
Interface

// clock tracks time for delayed firing
clock clock.Clock

// stopCh 发送信号来停止 waiting loop
stopCh chan struct{}
// stopOnce 保证只发送一次停止信号
stopOnce sync.Once

// heartbeat ensures we wait no more than maxWait before firing
// heartbeat 用来保证没有超过最大等待时间
heartbeat clock.Ticker

// waitingForAddCh 是一个缓冲 channel 提供 waitingForAdd 的元素
waitingForAddCh chan *waitFor

// metrics 计算重试的次数
metrics retryMetrics
}

waitingForAddCh 的默认初始大小为 1000, 当使用 AddAfter 加入元素后,会被加入到 waitingForAddCh 中。

waitingForChan 中的元素结构如下:

1
2
3
4
5
6
type waitFor struct {
data t
readyAt time.Time
// 在 waitForProrityQueue 中的位置
index int
}

所有 waitingForAddCh 中的元素都是通过 waitingLoop 函数进行执行的。当 delayingType 被初始化时,会启动新的 goroutine 来执行 waitingLoop 方法。

当元素的延迟时间不大于当前时间的时候,即延迟时间还未到,元素插入到 waitForPriorityQueue 中。否则就将元素插入到 FIFO 队列中。

waitForPriorityQueue 是一个根据延迟时间排序的堆,堆顶的元素是最先就绪的元素,底层结构是一个 waitFor 的 slice

1
type waitForPriorityQueue []*waitFor

在遍历的过程中,查看 waitForPriorityQueue 堆顶的元素是是否就绪,如果就绪则 Pop 出堆顶元素,插入的 FIFO 队列中,然后继续查看下一个元素。

限速队列

限速队列,基于延迟队列与 FIFO 队列接口封装。新增了 AddRateLimited, Forget, NumRequeues 方法。

限速队列达通过延迟某个元素的插入时间,达到特定时间段内插入指定数量元素的目标。

定义如下:

1
2
3
4
5
6
7
8
type RateLimiter interface {
// 获取元素应该等待的时间
When(item interface{}) time.Duration
// 释放元素,清空元素的失败次数
Forget(item interface{})
// 返回元素的失败次数
NumRequeues(item interface{}) int
}

限速周期:执行 AddRateLimited 方法到执行完 Forget 方法之间的时间。

限速算法

令牌桶算法

令牌桶算法

workqueue 的令牌桶算法借助 rate 库实现。

令牌桶内部是一个存放 token 的 bucket,初始时 bucket 为空,token 以固定的速率往 bucket 中填入。当 bucket 满了的时候,token 会被直接丢弃。

只有获取到了 token 的元素才会被 accept,没有获取到 token 的元素只能排队等待。令牌桶通过发放 token 的方式达到限制速率。

workqueue 默认队列为

1
2
3
4
5
6
7
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 每秒填入 10 个令牌,bucket 的容量为 100.
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}

在限速周期内插入的元素,通过 r.LimiterReserve().Delay 返回指定元素等待的时间,前 bucket 容量的元素会被立即返回,之后的元素的延迟时间为 100/100 ms, 101/200ms, 102/300ms。每 100ms 放入一个令牌,后续的令牌得在 100 ms 后才能获取到新令牌。

排队指数算法

将元素的排队数量作为指数,排队数增大,速率就以 2 的指数形式增长,但最大不超过 maxDelay。

基础的延迟速率为 baseDelay,如果有十个相同的元素在排队,那么第一个元素的延迟时间为 1* baseDelay, 第二个元素的延迟时间就是 2 * baseDelay,第三个元素的延迟时间就为 4 * baseDelay,第四个元素的延迟时间就为 8 * baseDelay,以此类推,直到超过 maxDelay 后,设置为 maxDelay。

计数器算法

在一个计数周期内,最多只允许通过规定数量的元素。

比如,设定在 1 分钟内只能通过 100 个元素。开始的时候,计数器归零,每通过一个元素,计数器 +1,当达到上限 100 时不允许新的元素通过。当 1 分钟时间到了后,重新开始计时,计数器归零,重新开始计数。

基于计数器算法,拓展增加 slow 和 fast 功能,当元素排队的数量小于 maxFastAttempts 使用 fastDelay,超过之后使用 fastDelay。

混合模式

混合多种算法,在一个队列中有多个算法同时生效。


workqueue
https://blog.zhangliangliang.cc/post/workqueue.html
作者
Bobby Zhang
发布于
2022年7月11日
许可协议