Go源码学习: channel底层的数据结构是什么, 是如何被创建的
📅 2021-11-01 | 🖱️
channel是Go语言核心数据结构之一。Go提供了CSP(通信顺序进程Communicating sequential processes)的并发模型,关于Go并发的那句经典的谚语不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存
,goroutine之间的通信通过channel来完成。
本节将通过阅读源码(Go 1.17.2)了解Go语言中channel的数据结构,以及channel是如何被创建的。
我们编写的代码中channel是类似chan int
类型的,channel在Go内部真实的数据结构可以使用delve调试的方式找到。编写以下简单的代码:
1package main
2
3func main() {
4 ch := make(chan int, 3)
5 ch <- 3
6 <-ch
7 close(ch)
8}
这段代码虽然简单,但几乎包含了关于channe的除了for range
和select
之外的所有基本操作: 创建channel、往channel中发送数据、从channel接收数据、关闭channel。
将断点打到make(chan int, 3)
这行:
1> main.main() ./main.go:4 (PC: 0x1058e34)
2 1: package main
3 2:
4 3: func main() {
5=> 4: ch := make(chan int, 3)
6 5: ch <- 3
7 6: <-ch
8 7: close(ch)
9 8: }
使用disass切换到汇编模式:
1TEXT main.main(SB) main.go
2 main.go:3 0x1058e20 493b6610 cmp rsp, qword ptr [r14+0x10]
3 main.go:3 0x1058e24 7653 jbe 0x1058e79
4 main.go:3 0x1058e26* 4883ec20 sub rsp, 0x20
5 main.go:3 0x1058e2a 48896c2418 mov qword ptr [rsp+0x18], rbp
6 main.go:3 0x1058e2f 488d6c2418 lea rbp, ptr [rsp+0x18]
7=> main.go:4 0x1058e34 488d05e53f0000 lea rax, ptr [rip+0x3fe5]
8 main.go:4 0x1058e3b bb03000000 mov ebx, 0x3
9 main.go:4 0x1058e40 e8bba9faff call $runtime.makechan
10 main.go:4 0x1058e45 4889442410 mov qword ptr [rsp+0x10], rax
11 main.go:5 0x1058e4a 488d1d9f360100 lea rbx, ptr [rip+0x1369f]
12 main.go:5 0x1058e51 e82aacfaff call $runtime.chansend1
13 main.go:6 0x1058e56 488b442410 mov rax, qword ptr [rsp+0x10]
14 main.go:6 0x1058e5b 31db xor ebx, ebx
15 main.go:6 0x1058e5d 0f1f00 nop dword ptr [rax], eax
16 main.go:6 0x1058e60 e81bb6faff call $runtime.chanrecv1
17 main.go:7 0x1058e65 488b442410 mov rax, qword ptr [rsp+0x10]
18 main.go:7 0x1058e6a e891b3faff call $runtime.closechan
19 main.go:8 0x1058e6f 488b6c2418 mov rbp, qword ptr [rsp+0x18]
20 main.go:8 0x1058e74 4883c420 add rsp, 0x20
21 main.go:8 0x1058e78 c3 ret
22 main.go:3 0x1058e79 e882d0ffff call $runtime.morestack_noctxt
23 main.go:3 0x1058e7e 6690 data16 nop
24 main.go:3 0x1058e80 eb9e jmp $main.main
看出make(chan int, 3)
这行对应会调用runtime.makechan
就是今天要阅读的channel如何被创建的源码。runtime.chansend1
对应发送数据,runtime.chanrecv1
对应接收数据,runtime.closechan
对应关闭channel。
channel的底层数据结构runtime.hchan #
本节先看channel是如何被创建的,将断点打到runtime.makechan
并执行到这个断点上。
1> runtime.makechan() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:71 (hits goroutine(1):1 total:1) (PC: 0x100380a)
2Warning: debugging optimized function
3 66: }
4 67:
5 68: return makechan(t, int(size))
6 69: }
7 70:
8=> 71: func makechan(t *chantype, size int) *hchan {
9 72: elem := t.elem
10 73:
11 74: // compiler checks this but be safe.
12 75: if elem.size >= 1<<16 {
13 76: throw("makechan: invalid channel element type")
这样就找到了runtime.makechan
的源码在runtime/chan.go的71行。在第91行,声明了一个var c *hchan, *hchan类型的结构体指针,hchan结构体就是channel的数据结构。
在chan.go的第32行找到了hchan结构体的具体定义:
1type hchan struct {
2 qcount uint // total data in the queue
3 dataqsiz uint // size of the circular queue
4 buf unsafe.Pointer // points to an array of dataqsiz elements
5 elemsize uint16
6 closed uint32
7 elemtype *_type // element type
8 sendx uint // send index
9 recvx uint // receive index
10 recvq waitq // list of recv waiters
11 sendq waitq // list of send waiters
12
13 // lock protects all fields in hchan, as well as several
14 // fields in sudogs blocked on this channel.
15 //
16 // Do not change another G's status while holding this lock
17 // (in particular, do not ready a G), as this can deadlock
18 // with stack shrinking.
19 lock mutex
20}
hchan结构体中qcount
, dataqsiz
, buf
, sendx
, recvx
这5个字段用于构建hchan的底层数据结构循环队列
:
- qcount是循环队列中元素的个数,也就是当前channel中的元素个数
- dataqsize是循环队列的长度,也就是channel缓冲区的大小
- buf指向了一个长度为dataqsiz的数组,即循环队列是使用数组实现的
- sendx表示当前channel发送已经到了数组中的哪个位置
- recvx表示当前channel接收已经到了数组中的哪个位置
elemtype
和elemsize
分别表示channel收发元素的类型和大小。
sendq
用来存储当channel的缓冲区已满时阻塞的等待发送数据的goroutine。recvq
用来存储当channel的缓冲区为空时,阻塞的等待接收数据的goroutine。
sendq
和recvq
的类型是一个waitq
的结构体:
1type waitq struct {
2 first *sudog
3 last *sudog
4}
waitq从名称看是等待队列,waitq这个结构体中有first
和last
两个类型为sudog
的指针,sudog
的源码位于runtime/runtime2.go
中:
1type sudog struct {
2 // The following fields are protected by the hchan.lock of the
3 // channel this sudog is blocking on. shrinkstack depends on
4 // this for sudogs involved in channel ops.
5
6 g *g
7
8 next *sudog
9 prev *sudog
10 elem unsafe.Pointer // data element (may point to stack)
11
12 // The following fields are never accessed concurrently.
13 // For channels, waitlink is only accessed by g.
14 // For semaphores, all fields (including the ones above)
15 // are only accessed when holding a semaRoot lock.
16
17 acquiretime int64
18 releasetime int64
19 ticket uint32
20
21 // isSelect indicates g is participating in a select, so
22 // g.selectDone must be CAS'd to win the wake-up race.
23 isSelect bool
24
25 // success indicates whether communication over channel c
26 // succeeded. It is true if the goroutine was awoken because a
27 // value was delivered over channel c, and false if awoken
28 // because c was closed.
29 success bool
30
31 parent *sudog // semaRoot binary tree
32 waitlink *sudog // g.waiting list or semaRoot
33 waittail *sudog // semaRoot
34 c *hchan // channel
35}
一个sudog
就表示一个在等待队列中的goroutine,其内部包含其在等待队列中前后两个sudog的指针prev
和next
。
hchan是如何被创建的 #
回到runtime.makechan
函数本身,只关注主要脉络来看一下hchan是如何被创建的:
1func makechan(t *chantype, size int) *hchan {
2 elem := t.elem
3
4 // compiler checks this but be safe.
5 if elem.size >= 1<<16 {
6 throw("makechan: invalid channel element type")
7 }
8 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
9 throw("makechan: bad alignment")
10 }
11
12 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
13 if overflow || mem > maxAlloc-hchanSize || size < 0 {
14 panic(plainError("makechan: size out of range"))
15 }
16
17 // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
18 // buf points into the same allocation, elemtype is persistent.
19 // SudoG's are referenced from their owning thread so they can't be collected.
20 // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
21 var c *hchan
22 switch {
23 case mem == 0:
24 // Queue or element size is zero.
25 c = (*hchan)(mallocgc(hchanSize, nil, true))
26 // Race detector uses this location for synchronization.
27 c.buf = c.raceaddr()
28 case elem.ptrdata == 0:
29 // Elements do not contain pointers.
30 // Allocate hchan and buf in one call.
31 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
32 c.buf = add(unsafe.Pointer(c), hchanSize)
33 default:
34 // Elements contain pointers.
35 c = new(hchan)
36 c.buf = mallocgc(mem, elem, true)
37 }
38
39 c.elemsize = uint16(elem.size)
40 c.elemtype = elem
41 c.dataqsiz = uint(size)
42 lockInit(&c.lock, lockRankHchan)
43
44 if debugChan {
45 print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
46 }
47 return c
48}
runtime.makechan
中创建hchan的代码,在当前的实现中还比较简单,switch语句中考虑了channel中不存在缓冲区,channel中不保存指针类型的数据,以及之外的默认情况如何为hchan和缓冲区buf分配内存。
在makechan函数的最后更新elemsize
, elemtype
, dataqsiz
等字段。makechan中只是创建了底层hchan及hchan内部的buf循环队列。