作为一个Gopher在刚开始学习channel的时候,一定见过以下关于关闭channel会发生什么情况的总结:

  • 关闭一个值为nil的channel将会panic
  • 关闭一个已经关闭的channel将会panic
  • 向一个已经关闭的channel发送数据会panic

本节将学习关闭channel,即close(ch)的内部实现。关闭channel实际上调用的是runtime.closechan这个函数。

runtime.closechan函数签名如下,它只有一个*hcan参数指向要关闭的channel,它没有返回值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
> runtime.closechan() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:355 (hits goroutine(1):1 total:1) (PC: 0x1004b4a)
Warning: debugging optimized function
   350:         src := sg.elem
   351:         typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
   352:         memmove(dst, src, t.size)
   353: }
   354:
=> 355: func closechan(c *hchan) {
   356:         if c == nil {
   357:                 panic(plainError("close of nil channel"))
   358:         }
   359:
   360:         lock(&c.lock)

runtime.closechan的内部执行流程

下图是在阅读runtime.closechan源码,梳理其内部执行流程时,绘制的活动图,只是梳理大致脉络加深理解,不一定完全准确:

runtime-closechan.png

对异常情况处理channel为ni的情况和channel已经关闭的情况

clonsechan函数内部首先是对异常情况的判断,如果channel是nil将直接panic,如果channel已经关闭,将直接panic。

1
2
3
4
5
6
7
8
9
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

清理并释放两个等待队列recvq和sendq

到这里进入关闭channel的主逻辑:

  • 首先将channel的底层数据结构hchan上的closed字段设置成1,表示当前channel已经关闭了。
  • 接下来,会初始化一个gList结构,用来存放recvq和sendq两个等待队列中的接收方和发送方的Goroutine。
  • 将接收方等待队列recvq上的sudog出队,清理未被处理的元素,并将接收方的G都添加到gList中
  • 将发送方等待队列sendq上的sudog出队,清理未被处理的元素,并将发送方的G都添加到gList中
  • 最后执行解锁操作
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

唤醒原来在两个等待队列recvq和sendq中阻塞的Goroutine

执行到这一步,原来在两个等待队列recvq和sendq中阻塞的Goroutine都被放到了glist中,最后为这些被阻塞的G调用goready函数唤醒这些G,将重新对这些G进行调度。

1
2
3
4
5
6
	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}