Go 的一种并发模式:管道

Go 管道模式

开始

这片文章是看完 Golang 官方博客后写下,记录下。原文在这里

首先是 What is pipeline?官方博客上将管道看作是并发的程序的一种。一系列的 stage 通过 channel 串联起来。其中,每个 stage 都是一组执行相同任务的 goroutine。在每个 Stage 中,每个 goroutine 都会做三件事情:

  • 通过一个 channel 接收到要处理的上游数据。
  • 对接收的数据做处理,通常是产生一个新的值
  • 将产生的新数据传入另一个 channel 中,传递到下游。

除了第一个和最后一个 Stage 以外,中间的 Stage 都会有几个传入和传出的数据通道,第一个只有传出,而最后一个只有传入。数据由第一个 stage 流入到最后一个 stage。

也就是说,在 Go 中的管道(pipeline)就像是一条生产线,一个零件从被生产到被消费,会经过几个不同的车间处理。第一个车间会生产最初的数据,而最后一个车间去消费这个数据。

为了更好的帮助理解,官方博客给出了例子。首先是最简单的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

// gen 根据传入的数字列表产生 channel, 并且将数字传入其中。
// 在完成传输后,gen 会关闭相应的 channel.
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

// sq 从 上游的 channel 中接收数据,做平方后传入新的 channel 中。
// 与 gen 相同,sq 也会在完成任务后,将他产生的 channel 关闭。
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

// main 函数负责设置 channel,并且消费 channel 中产生的数据。
func main() {
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n)
}
}

// Output:
// 16
// 81

Fan-out 和 Fan-in

fan-out: 多个程序从一个 channel 中读取数据,直到 channel 关闭。就像是一个管理人,分发工作给多个工人的情况。

fan-in: 与 fan-out 相反,一个程序从多个 channel 中读取数据,直到所有的 channel 都关闭了。于此同时,会将处理后的数据送入一个新的 channel 中,并在完成工作后关闭它。

例如,现在有一个 merge 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func merge(done <-chan struct, cs ... <-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}

wg.Add(len(cs))
for _, c := range cs {
go output(c)
}

// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}

原有的 gen 方法会被改造成下面的样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}

main 函数则变成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)

in := gen(done, 2, 3)

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)

// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9

// done will be closed by the deferred call.
}

通过设置一个名为 done 的 channel, 下游的程序可以给上游的程序发送信号,通知他们停止生产数据,作为了一个反馈途径。

因此,如果想要构建一个 pipeline 系统, 有以下的要点:

- 当所有的操作完成后,stage 关闭通向外界的 channel。
- stage 不断从流入 channel 中接收数据,直到这些 channel 关闭了或者是发送者解除阻塞状态。

Go 的一种并发模式:管道
https://blog.zhangliangliang.cc/post/go-concurrency-pipeline.html
作者
Bobby Zhang
发布于
2020年1月9日
许可协议