CSP(CommunicatingSequentialProcess)中文翻译"通信顺序进程"或"交换信息的循序进程", CSP描述了一种并发系统进行交互的模式。 CSP允许使用进程组件来描述系统,这些进程组件独立运行并且通过消息传递的方式通信。

Go推荐的并发编程模型为CSP,Go通过Channel来实现CSP,Channel是Go里面CSP的核心。 CSP并发模型不同于传统的多线程并发通过共享内存来通信,CSP是以通信的方式来共享内存。 Go并发的核心哲学是尽量通过消息传递取代共享内存来尽可能的避免显示锁。

1.Channel的定义

在Go中,channel要求操作双方必须明确数据类型,操作的一方并不关心另外一端操作者的数量。 可以把channel理解成一个队列,但有其特殊性,从同步和异步两种模式来理解:

  • 同步模式:要求channel的发送方和接收方成对出现,如果一方没有就绪,则另一方将阻塞。
  • 异步模式:异步模式下将抢夺channel的缓冲区,发送方要求有缓冲区有空位可写入,如果没有则发送方阻塞;接收方要求有缓存数据可读取,如果没有则接收方阻塞。

1.1 nil是chan类型的零值

  • 向值为nil的chan发送数据将一直阻塞
  • 从值为nil的chan接收数据将一直阻塞

1.2 带缓冲区的channel

1c := make(chan string, 3)
2c <- "hello"
3c <- "world"                //异步通道缓冲区未满,写入数据不会阻塞
4fmt.Println(len(c), cap(c)) // 2 3 len和cap分别为已经缓冲数量和缓冲区大小
5fmt.Println(<-c)
6fmt.Println(<-c) // 缓冲区中还有数据,读取数据不会阻塞

2.向channel中发送数据

向channel发送数据的语法十分简单

  • 向已经关闭的channel发送数据将panic

3.从channel中接收数据

3.1 直接读取

1msg := <-ch

3.2 ok-idom,判断通道是否被关闭

从已经关闭的channel中接收数据,可能返回已经缓冲的数据或者零值,可使用ok-idom s, ok := <-ch判断通道是否被关闭。

 1func main() {
 2   done := make(chan struct{})
 3   ch := make(chan string)
 4   go func() {
 5   	defer close(done)
 6   	for {
 7   		s, ok := <-ch
 8   		if !ok { // 判断通道是否被关闭
 9   			fmt.Println("the channel ch is closed")
10   			return
11   		}
12   		fmt.Println(s)
13   	}
14   }()
15   ch <- "a"
16   ch <- "b"
17   close(ch)
18   <-done
19}
20
21输出:
22a
23b
24the channel ch is closed

ok为true的时候,表示从channel中读取出了数据,此时channel有可能还没关闭,也有可能已经关闭了(读取的是缓冲区的数据);ok为false的时候,则通道一定是关闭了。

 1func main() {
 2	ch := make(chan string, 3)
 3	ch <- "hello"
 4	ch <- "world"
 5	close(ch)
 6	for i := 0; i < cap(ch); i++ {
 7		s, ok := <-ch
 8		fmt.Println(i, ":", s, ok)
 9	}
10}
11
12输出:
130 : hello true
141 : world true
152 :  false

4.单向channel

channel默认是双向的,不区分发送端和接收端。 可以通过单向channel限制收发方向。

 1func main() {
 2	var wg sync.WaitGroup
 3	wg.Add(2)
 4
 5	ch := make(chan string)
 6
 7	go func(c <-chan string) {
 8		defer wg.Done()
 9		for x := range c {
10			fmt.Println(x)
11		}
12	}(ch)
13
14	go func(c chan<- string) {
15		defer wg.Done()
16		defer close(c)
17		for i := 0; i < 3; i++ {
18			c <- fmt.Sprint("msg", i)
19		}
20	}(ch)
21	wg.Wait()
22}
23
24msg0
25msg1
26msg2
  • 单向通道一般用来实现对通道更严谨的操作逻辑,因为不能在通道上做逆向操作。
  • close函数无法用于关闭接收端的通道<-chan
  • 无法将单向通道类型转换回普通通道

5.channel和select-case语句

select-case语句可以用来监听多个channel,会随机选择一个可用channel进行接收操作。

下面的代码有点多路选择的意思:

1select {
2	case v1 := <-ch1:
3		log.Println("recieve from ch1: ", v1)
4	case v2 := <-ch2:
5		log.Println("recieve from ch2: ", v2)
6}

上面的代码会从ch1和ch2中随机选择一个可以接收数据的channel接收数据,否则将一直阻塞。 可以为该代码加上超时控制

1select {
2	case v1 := <-ch1:
3		log.Println("recieve from ch1: ", v1)
4	case v2 := <-ch2:
5		log.Println("recieve from ch2: ", v2)
6	case <-time.After(time.Second * 2):
7		log.Println("time out")
8}

如果超过2秒还没有接收到数据的话,将触发超时的操作。

 1func main() {
 2	var wg sync.WaitGroup
 3	wg.Add(3)
 4
 5	ch1 := make(chan string)
 6	ch2 := make(chan string)
 7
 8	go func() {
 9		defer wg.Done()
10		for {
11			select {
12			case x, ok := <-ch1:
13				if !ok { // ch1已结关闭,设置为nil,从nil通道接收数据将一直阻塞
14					ch1 = nil
15					break
16				}
17				fmt.Println(x)
18			case x, ok := <-ch2:
19				if !ok { // ch2已结关闭,设置为nil,从nil通道接收数据将一直阻塞
20					ch2 = nil
21					break
22				}
23				fmt.Println( x)
24			}
25			if ch1 == nil && ch2 == nil {
26				return
27			}
28		}
29	}()
30
31	go func() {
32		defer wg.Done()
33		defer close(ch1)
34		for i := 0; i < 4; i++ {
35			ch1 <- fmt.Sprint("ch1 ", i)
36		}
37	}()
38
39	go func() {
40		defer wg.Done()
41		defer close(ch2)
42		for i := 0; i < 8; i++ {
43			ch2 <- fmt.Sprint("ch2 ", i)
44		}
45	}()
46	wg.Wait()
47}

如果所有的case下的通道都不可用的话,会阻塞操作,此时可以使用default语句避开阻塞,但要注意处理外层循环避免产生空耗。

总结

  • 向nil channel中发送数据,将会一直阻塞
  • 从nil channel中接收数据,将会一直阻塞
  • 从一个已经关闭的channel接收数据,如果缓存区为空将会返回零值。可以使用ok-idom,判断通道是否已被关闭。
  • close一个值为nil的channel会panic
  • close一个已经close的channel会panic
  • 向一个已经关闭的channel发送数据会引发panic