channel是Go语言核心数据结构之一。Go提供了CSP(通信顺序进程Communicating sequential processes)的并发模型,关于Go并发的那句经典的谚语不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存,goroutine之间的通信通过channel来完成。 本节将通过阅读源码(Go 1.17.2)了解Go语言中channel的数据结构,以及channel是如何被创建的。

我们编写的代码中channel是类似chan int类型的,channel在Go内部真实的数据结构可以使用delve调试的方式找到。编写以下简单的代码:

1
2
3
4
5
6
7
8
package main

func main() {
	ch := make(chan int, 3)
	ch <- 3
	<-ch
	close(ch)
}

这段代码虽然简单,但几乎包含了关于channe的除了for rangeselect之外的所有基本操作: 创建channel、往channel中发送数据、从channel接收数据、关闭channel。

将断点打到make(chan int, 3)这行:

1
2
3
4
5
6
7
8
9
> main.main() ./main.go:4 (PC: 0x1058e34)
     1: package main
     2:
     3: func main() {
=>   4:         ch := make(chan int, 3)
     5:         ch <- 3
     6:         <-ch
     7:         close(ch)
     8: }

使用disass切换到汇编模式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
TEXT main.main(SB) /Users/Erich/ViewChain/vhscops/go-hubot-api/cmd/hello/main.go
        main.go:3       0x1058e20       493b6610        cmp rsp, qword ptr [r14+0x10]
        main.go:3       0x1058e24       7653            jbe 0x1058e79
        main.go:3       0x1058e26*      4883ec20        sub rsp, 0x20
        main.go:3       0x1058e2a       48896c2418      mov qword ptr [rsp+0x18], rbp
        main.go:3       0x1058e2f       488d6c2418      lea rbp, ptr [rsp+0x18]
=>      main.go:4       0x1058e34       488d05e53f0000  lea rax, ptr [rip+0x3fe5]
        main.go:4       0x1058e3b       bb03000000      mov ebx, 0x3
        main.go:4       0x1058e40       e8bba9faff      call $runtime.makechan
        main.go:4       0x1058e45       4889442410      mov qword ptr [rsp+0x10], rax
        main.go:5       0x1058e4a       488d1d9f360100  lea rbx, ptr [rip+0x1369f]
        main.go:5       0x1058e51       e82aacfaff      call $runtime.chansend1
        main.go:6       0x1058e56       488b442410      mov rax, qword ptr [rsp+0x10]
        main.go:6       0x1058e5b       31db            xor ebx, ebx
        main.go:6       0x1058e5d       0f1f00          nop dword ptr [rax], eax
        main.go:6       0x1058e60       e81bb6faff      call $runtime.chanrecv1
        main.go:7       0x1058e65       488b442410      mov rax, qword ptr [rsp+0x10]
        main.go:7       0x1058e6a       e891b3faff      call $runtime.closechan
        main.go:8       0x1058e6f       488b6c2418      mov rbp, qword ptr [rsp+0x18]
        main.go:8       0x1058e74       4883c420        add rsp, 0x20
        main.go:8       0x1058e78       c3              ret
        main.go:3       0x1058e79       e882d0ffff      call $runtime.morestack_noctxt
        main.go:3       0x1058e7e       6690            data16 nop
        main.go:3       0x1058e80       eb9e            jmp $main.main

看出make(chan int, 3)这行对应会调用runtime.makechan就是今天要阅读的channel如何被创建的源码。runtime.chansend1对应发送数据,runtime.chanrecv1对应接收数据,runtime.closechan对应关闭channel。

channel的底层数据结构runtime.hchan

本节先看channel是如何被创建的,将断点打到runtime.makechan并执行到这个断点上。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
> runtime.makechan() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:71 (hits goroutine(1):1 total:1) (PC: 0x100380a)
Warning: debugging optimized function
    66:         }
    67:
    68:         return makechan(t, int(size))
    69: }
    70:
=>  71: func makechan(t *chantype, size int) *hchan {
    72:         elem := t.elem
    73:
    74:         // compiler checks this but be safe.
    75:         if elem.size >= 1<<16 {
    76:                 throw("makechan: invalid channel element type")

这样就找到了runtime.makechan的源码在runtime/chan.go的71行。在第91行,声明了一个var c *hchan, *hchan类型的结构体指针,hchan结构体就是channel的数据结构。

在chan.go的第32行找到了hchan结构体的具体定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

hchan结构体中qcount, dataqsiz, buf, sendx, recvx这5个字段用于构建hchan的底层数据结构循环队列:

  • qcount是循环队列中元素的个数,也就是当前channel中的元素个数
  • dataqsize是循环队列的长度,也就是channel缓冲区的大小
  • buf指向了一个长度为dataqsiz的数组,即循环队列是使用数组实现的
  • sendx表示当前channel发送已经到了数组中的哪个位置
  • recvx表示当前channel接收已经到了数组中的哪个位置

elemtypeelemsize分别表示channel收发元素的类型和大小。

sendq用来存储当channel的缓冲区已满时阻塞的等待发送数据的goroutine。recvq用来存储当channel的缓冲区为空时,阻塞的等待接收数据的goroutine。 sendqrecvq的类型是一个waitq的结构体:

1
2
3
4
type waitq struct {
	first *sudog
	last  *sudog
}

waitq从名称看是等待队列,waitq这个结构体中有firstlast两个类型为sudog的指针,sudog的源码位于runtime/runtime2.go中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	success bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

一个sudog就表示一个在等待队列中的goroutine,其内部包含其在等待队列中前后两个sudog的指针prevnext

hchan是如何被创建的

回到runtime.makechan函数本身,只关注主要脉络来看一下hchan是如何被创建的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

runtime.makechan中创建hchan的代码,在当前的实现中还比较简单,switch语句中考虑了channel中不存在缓冲区,channel中不保存指针类型的数据,以及之外的默认情况如何为hchan和缓冲区buf分配内存。 在makechan函数的最后更新elemsize, elemtype, dataqsiz等字段。makechan中只是创建了底层hchan及hchan内部的buf循环队列。