Go源码学习: 关闭channel的内部实现是什么样的?
2021-11-05
作为一个Gopher在刚开始学习channel的时候,一定见过以下关于关闭channel会发生什么情况
的总结:
- 关闭一个值为nil的channel将会panic
- 关闭一个已经关闭的channel将会panic
- 向一个已经关闭的channel发送数据会panic
本节将学习关闭channel,即close(ch)
的内部实现。关闭channel实际上调用的是runtime.closechan
这个函数。
runtime.closechan
函数签名如下,它只有一个*hcan
参数指向要关闭的channel,它没有返回值。
1> runtime.closechan() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:355 (hits goroutine(1):1 total:1) (PC: 0x1004b4a)
2Warning: debugging optimized function
3 350: src := sg.elem
4 351: typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
5 352: memmove(dst, src, t.size)
6 353: }
7 354:
8=> 355: func closechan(c *hchan) {
9 356: if c == nil {
10 357: panic(plainError("close of nil channel"))
11 358: }
12 359:
13 360: lock(&c.lock)
runtime.closechan的内部执行流程 #
下图是在阅读runtime.closechan源码,梳理其内部执行流程时,绘制的活动图,只是梳理大致脉络加深理解,不一定完全准确:
对异常情况处理channel为ni的情况和channel已经关闭的情况 #
clonsechan函数内部首先是对异常情况的判断,如果channel是nil将直接panic,如果channel已经关闭,将直接panic。
1 if c == nil {
2 panic(plainError("close of nil channel"))
3 }
4
5 lock(&c.lock)
6 if c.closed != 0 {
7 unlock(&c.lock)
8 panic(plainError("close of closed channel"))
9 }
清理并释放两个等待队列recvq和sendq #
到这里进入关闭channel的主逻辑:
- 首先将channel的底层数据结构hchan上的
closed
字段设置成1,表示当前channel已经关闭了。 - 接下来,会初始化一个
gList
结构,用来存放recvq和sendq两个等待队列中的接收方和发送方的Goroutine。 - 将接收方等待队列recvq上的sudog出队,清理未被处理的元素,并将接收方的G都添加到gList中
- 将发送方等待队列sendq上的sudog出队,清理未被处理的元素,并将发送方的G都添加到gList中
- 最后执行解锁操作
1 c.closed = 1
2
3 var glist gList
4
5 // release all readers
6 for {
7 sg := c.recvq.dequeue()
8 if sg == nil {
9 break
10 }
11 if sg.elem != nil {
12 typedmemclr(c.elemtype, sg.elem)
13 sg.elem = nil
14 }
15 if sg.releasetime != 0 {
16 sg.releasetime = cputicks()
17 }
18 gp := sg.g
19 gp.param = unsafe.Pointer(sg)
20 sg.success = false
21 if raceenabled {
22 raceacquireg(gp, c.raceaddr())
23 }
24 glist.push(gp)
25 }
26
27 // release all writers (they will panic)
28 for {
29 sg := c.sendq.dequeue()
30 if sg == nil {
31 break
32 }
33 sg.elem = nil
34 if sg.releasetime != 0 {
35 sg.releasetime = cputicks()
36 }
37 gp := sg.g
38 gp.param = unsafe.Pointer(sg)
39 sg.success = false
40 if raceenabled {
41 raceacquireg(gp, c.raceaddr())
42 }
43 glist.push(gp)
44 }
45 unlock(&c.lock)
唤醒原来在两个等待队列recvq和sendq中阻塞的Goroutine #
执行到这一步,原来在两个等待队列recvq和sendq中阻塞的Goroutine都被放到了glist中,最后为这些被阻塞的G调用goready
函数唤醒这些G,将重新对这些G进行调度。
1 // Ready all Gs now that we've dropped the channel lock.
2 for !glist.empty() {
3 gp := glist.pop()
4 gp.schedlink = 0
5 goready(gp, 3)
6 }