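The channel is one of Go's core data structures. Go provides a CSP (Communicating Sequential Processes) concurrency model, summed up by the classic Go proverb "Do not communicate by sharing memory; instead, share memory by communicating." Goroutines communicate with each other through channels. This section reads the source code (Go 1.17.2) to understand the data structure behind a Go channel and how a channel is created.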

In the code we write, a channel has a type such as chan int. Its actual internal data structure can be found by debugging with delve. Write the following simple program:

package main

func main() {
	ch := make(chan int, 3)
	ch <- 3
	<-ch
	close(ch)
}

Simple as it is, this code covers almost every basic channel operation apart from for range and select: creating a channel, sending data to it, receiving data from it, and closing it.
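For completeness, here is a small sketch of the two operations the program leaves out. The helper names sumWithRange and trySend are hypothetical. for range keeps receiving until a closed channel has been drained, and select with a default case turns a send into a non-blocking attempt.

```go
package main

import "fmt"

// sumWithRange fills a buffered channel, closes it, and drains it with
// for range; the loop ends once the buffer is empty and the channel is closed.
func sumWithRange() int {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)

	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

// trySend attempts a send without blocking: if no buffer slot is free
// (and no receiver is waiting), the default case runs instead.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(sumWithRange()) // 6

	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1), trySend(ch, 2)) // true false
}
```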

Set a breakpoint on the make(chan int, 3) line:

> main.main() ./main.go:4 (PC: 0x1058e34)
     1: package main
     2:
     3: func main() {
=>   4:         ch := make(chan int, 3)
     5:         ch <- 3
     6:         <-ch
     7:         close(ch)
     8: }

Use disass to switch to the assembly view:

TEXT main.main(SB) /Users/Erich/ViewChain/vhscops/go-hubot-api/cmd/hello/main.go
        main.go:3       0x1058e20       493b6610        cmp rsp, qword ptr [r14+0x10]
        main.go:3       0x1058e24       7653            jbe 0x1058e79
        main.go:3       0x1058e26*      4883ec20        sub rsp, 0x20
        main.go:3       0x1058e2a       48896c2418      mov qword ptr [rsp+0x18], rbp
        main.go:3       0x1058e2f       488d6c2418      lea rbp, ptr [rsp+0x18]
=>      main.go:4       0x1058e34       488d05e53f0000  lea rax, ptr [rip+0x3fe5]
        main.go:4       0x1058e3b       bb03000000      mov ebx, 0x3
        main.go:4       0x1058e40       e8bba9faff      call $runtime.makechan
        main.go:4       0x1058e45       4889442410      mov qword ptr [rsp+0x10], rax
        main.go:5       0x1058e4a       488d1d9f360100  lea rbx, ptr [rip+0x1369f]
        main.go:5       0x1058e51       e82aacfaff      call $runtime.chansend1
        main.go:6       0x1058e56       488b442410      mov rax, qword ptr [rsp+0x10]
        main.go:6       0x1058e5b       31db            xor ebx, ebx
        main.go:6       0x1058e5d       0f1f00          nop dword ptr [rax], eax
        main.go:6       0x1058e60       e81bb6faff      call $runtime.chanrecv1
        main.go:7       0x1058e65       488b442410      mov rax, qword ptr [rsp+0x10]
        main.go:7       0x1058e6a       e891b3faff      call $runtime.closechan
        main.go:8       0x1058e6f       488b6c2418      mov rbp, qword ptr [rsp+0x18]
        main.go:8       0x1058e74       4883c420        add rsp, 0x20
        main.go:8       0x1058e78       c3              ret
        main.go:3       0x1058e79       e882d0ffff      call $runtime.morestack_noctxt
        main.go:3       0x1058e7e       6690            data16 nop
        main.go:3       0x1058e80       eb9e            jmp $main.main

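The line make(chan int, 3) compiles to a call to runtime.makechan, which is the channel-creation code we will read here. Likewise, runtime.chansend1 implements sending, runtime.chanrecv1 implements receiving, and runtime.closechan implements closing the channel.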

The underlying data structure of a channel: runtime.hchan

This section starts with how a channel is created: set a breakpoint on runtime.makechan and continue until execution stops there.

> runtime.makechan() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:71 (hits goroutine(1):1 total:1) (PC: 0x100380a)
Warning: debugging optimized function
    66:         }
    67:
    68:         return makechan(t, int(size))
    69: }
    70:
=>  71: func makechan(t *chantype, size int) *hchan {
    72:         elem := t.elem
    73:
    74:         // compiler checks this but be safe.
    75:         if elem.size >= 1<<16 {
    76:                 throw("makechan: invalid channel element type")

This shows that runtime.makechan is defined at line 71 of runtime/chan.go. At line 91 it declares var c *hchan, a pointer to an hchan struct; hchan is the data structure that represents a channel.

The full definition of the hchan struct is at line 32 of chan.go:

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

In hchan, the five fields qcount, dataqsiz, buf, sendx, and recvx together implement the channel's underlying circular queue:

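  • qcount is the number of elements in the circular queue, i.e., the number of elements currently buffered in the channel
  • dataqsiz is the length of the circular queue, i.e., the size of the channel's buffer
  • buf points to an array of dataqsiz elements; in other words, the circular queue is backed by an array
  • sendx is the array index where the next send will store its element
  • recvx is the array index from which the next receive will read its element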
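To see how these five fields cooperate, here is a deliberately simplified model. The ring type and its methods are hypothetical: it stores int values in a slice instead of raw memory behind buf, takes no lock, and returns false where a real channel would block. The wrap-around step, resetting an index to 0 when it reaches dataqsiz, follows the pattern of the runtime's send and receive paths in chan.go.

```go
package main

import "fmt"

// ring is a hypothetical, simplified model of hchan's circular queue.
type ring struct {
	qcount   uint  // number of elements currently queued
	dataqsiz uint  // capacity of the circular queue
	buf      []int // backing array of length dataqsiz
	sendx    uint  // index where the next send writes
	recvx    uint  // index where the next receive reads
}

func newRing(size uint) *ring {
	return &ring{dataqsiz: size, buf: make([]int, size)}
}

// send stores v at sendx and advances sendx, wrapping to 0 at the end.
// It returns false when the queue is full (a real channel would block).
func (r *ring) send(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0
	}
	r.qcount++
	return true
}

// recv reads the element at recvx, clears the slot, and advances recvx
// with the same wrap-around. It returns false when the queue is empty.
func (r *ring) recv() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(3)
	r.send(1)
	r.send(2)
	r.send(3) // sendx wraps back to 0
	v, _ := r.recv()
	r.send(4) // reuses index 0, freed by the receive
	fmt.Println(v, r.sendx, r.recvx, r.qcount) // 1 1 1 3
}
```

After three sends the queue is full and sendx has wrapped to 0; once a receive frees index 0, the fourth send lands there while recvx points at the oldest remaining element.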

elemtype and elemsize are the type and the size of the elements sent and received on the channel.
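Since elemsize is a uint16, element types of 1<<16 bytes or more cannot be represented; the compiler rejects them, and makechan's `elem.size >= 1<<16` check is a safety net. As a quick way to see the sizes a channel would record, this sketch uses reflect; the helper chanElemSize is hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
)

// chanElemSize returns the size in bytes of a channel's element type,
// the value makechan stores (as a uint16) in hchan.elemsize.
func chanElemSize(ch interface{}) uintptr {
	return reflect.TypeOf(ch).Elem().Size()
}

func main() {
	fmt.Println(chanElemSize(make(chan int64)))    // 8
	fmt.Println(chanElemSize(make(chan struct{}))) // 0
	fmt.Println(chanElemSize(make(chan [16]byte))) // 16
}
```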

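sendq holds goroutines blocked waiting to send because no buffer slot is free (the buffer is full, or the channel is unbuffered and no receiver is waiting). recvq holds goroutines blocked waiting to receive because there is nothing to take (the buffer is empty and no sender is waiting). Both sendq and recvq are of type waitq, a struct: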
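To watch a goroutine wait for buffer space, the sketch below (the function blockedSendOrder is hypothetical) fills a one-slot buffer and then starts a second sender. That sender is expected to block and wait in sendq until the main goroutine receives; the values still come out in send order. The result is the same whether or not the sender actually blocks first; the short sleep only makes the blocking path the likely one.

```go
package main

import (
	"fmt"
	"time"
)

// blockedSendOrder fills a one-slot buffer, starts a sender that must wait
// for space, and then receives both values.
func blockedSendOrder() [2]int {
	ch := make(chan int, 1)
	ch <- 1 // buffer is now full: qcount == dataqsiz == 1

	done := make(chan struct{})
	go func() {
		ch <- 2 // blocks while the buffer is full
		close(done)
	}()

	// Give the sender a chance to block before we receive.
	time.Sleep(10 * time.Millisecond)

	first := <-ch // takes 1 from the buffer, making room for the sender
	<-done        // wait until the sender's send has completed
	second := <-ch
	return [2]int{first, second}
}

func main() {
	fmt.Println(blockedSendOrder()) // [1 2]
}
```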

type waitq struct {
	first *sudog
	last  *sudog
}

As its name suggests, waitq is a wait queue. It holds two pointers of type *sudog, first and last. sudog is defined in runtime/runtime2.go:

type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	success bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

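A sudog represents a goroutine waiting in a wait queue. Its prev and next fields point to the sudogs before and after it in that queue. Because a goroutine blocked in a select can wait on several channels at once, one goroutine may be represented by several sudogs.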
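A waitq is therefore a doubly linked FIFO list addressed by its first and last elements. The sketch below mirrors that shape with hypothetical types: waiter stands in for sudog (keeping only prev, next, and an id) and waitQueue stands in for waitq. It is a simplification: the runtime's own dequeue additionally skips sudogs whose goroutine, waiting in a select, has already been woken through another case, which this sketch omits.

```go
package main

import "fmt"

// waiter is a hypothetical stand-in for sudog.
type waiter struct {
	id         int
	prev, next *waiter
}

// waitQueue is a hypothetical stand-in for waitq: a doubly linked list
// tracked by its head (first) and tail (last).
type waitQueue struct {
	first, last *waiter
}

// enqueue appends w at the tail of the queue.
func (q *waitQueue) enqueue(w *waiter) {
	w.next = nil
	tail := q.last
	if tail == nil {
		w.prev = nil
		q.first, q.last = w, w
		return
	}
	w.prev = tail
	tail.next = w
	q.last = w
}

// dequeue removes and returns the head of the queue, or nil if it is empty.
func (q *waitQueue) dequeue() *waiter {
	w := q.first
	if w == nil {
		return nil
	}
	q.first = w.next
	if q.first == nil {
		q.last = nil
	} else {
		q.first.prev = nil
	}
	w.next = nil
	return w
}

func main() {
	var q waitQueue
	for i := 1; i <= 3; i++ {
		q.enqueue(&waiter{id: i})
	}
	// Waiters come out in FIFO order: 1, 2, 3.
	for w := q.dequeue(); w != nil; w = q.dequeue() {
		fmt.Println(w.id)
	}
}
```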

How hchan is created

Back in runtime.makechan itself, let's follow just the main path to see how hchan is created:

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

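In the current implementation, the hchan-creation code in runtime.makechan is fairly simple. After checking that elem.size * size neither overflows nor exceeds the allocation limit, the switch statement chooses how to allocate memory for hchan and its buffer buf in three cases: when no buffer memory is needed (mem == 0, i.e., an unbuffered channel or a zero-size element type), only hchan itself is allocated; when the elements contain no pointers, hchan and buf are allocated together in a single block; otherwise hchan is allocated with new and buf is allocated separately, typed with the element type so the GC can scan the pointers it holds. Finally, makechan sets the elemsize, elemtype, and dataqsiz fields. makechan does nothing more than create the underlying hchan and the circular queue buf inside it.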
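The size check and the three allocation branches can be summarized with a hypothetical helper, allocPlan, that only reports which strategy makechan would pick and allocates nothing. It uses math/bits.Mul64 to detect multiplication overflow in the spirit of the runtime-internal math.MulUintptr, and it omits the maxAlloc-hchanSize bound and the alignment checks, which depend on the platform and runtime internals. Element sizes in the sample cases assume a 64-bit platform.

```go
package main

import (
	"errors"
	"fmt"
	"math/bits"
)

var errOutOfRange = errors.New("makechan: size out of range")

// allocPlan mirrors the branches of makechan's switch for an element of
// elemSize bytes and a buffer of size elements.
func allocPlan(elemSize uint64, size int, elemHasPointers bool) (string, error) {
	if size < 0 {
		return "", errOutOfRange
	}
	// hi != 0 means elemSize*size does not fit in 64 bits.
	hi, mem := bits.Mul64(elemSize, uint64(size))
	if hi != 0 {
		return "", errOutOfRange
	}
	switch {
	case mem == 0:
		return "hchan only", nil // unbuffered, or zero-size elements
	case !elemHasPointers:
		return "hchan+buf in one block", nil
	default:
		return "hchan and buf allocated separately", nil
	}
}

func main() {
	cases := []struct {
		name     string
		elemSize uint64
		size     int
		ptrs     bool
	}{
		{"make(chan int)", 8, 0, false},
		{"make(chan struct{}, 10)", 0, 10, false},
		{"make(chan int, 3)", 8, 3, false},
		{"make(chan *int, 3)", 8, 3, true},
	}
	for _, c := range cases {
		plan, err := allocPlan(c.elemSize, c.size, c.ptrs)
		fmt.Println(c.name, "->", plan, err)
	}
}
```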