Go源码学习: 从channel中接收数据的内部实现是什么样的?
2021-11-03
本节将学习从channel中接收数据的内部实现。
在Go中从channel中接收数据有两种方式:
1v <- ch
2v, ok <- ch
使用v <- ch
从channel中接收数据时,会调用runtime.chanrecv1
,chanrecv1
调用了runtime.chanrecv
。
1runtime.chanrecv1() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:439 (PC: 0x100448e)
2Warning: debugging optimized function
3 434: }
4 435:
5 436: // entry points for <- c from compiled code
6 437: //go:nosplit
7 438: func chanrecv1(c *hchan, elem unsafe.Pointer) {
8=> 439: chanrecv(c, elem, true)
9 440: }
使用v, ok <- ch
从channel中接收数据时,会调用runtime.chanrecv2
,chanrecv2
调用了runtime.chanrecv
。
1> runtime.chanrecv2() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:444 (PC: 0x1004e0e)
2Warning: debugging optimized function
3 439: chanrecv(c, elem, true)
4 440: }
5 441:
6 442: //go:nosplit
7 443: func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
8=> 444: _, received = chanrecv(c, elem, true)
9 445: return
10 446: }
可以看到chanrecv1函数和chanrecv2函数调用chanrecv的传参都是一样的,区别只是chanrecv2获取了received的这个返回结果以确认channel是否已经被关闭。
本节主要学习runtime.chanrecv
函数。
1.runtime.chanrecv函数 #
runtime.chanrecv
函数的签名如下:
1 runtime.chanrecv() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:454 (PC: 0x1004e4f)
2Warning: debugging optimized function
3 449: // ep may be nil, in which case received data is ignored.
4 450: // If block == false and no elements are available, returns (false, false).
5 451: // Otherwise, if c is closed, zeros *ep and returns (true, false).
6 452: // Otherwise, fills in *ep with an element and returns (true, true).
7 453: // A non-nil ep must point to the heap or the caller's stack.
8=> 454: func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
该函数有以下参数:
c
是hchan,即channelep
指向用于接收数据变量地址,用于接收数据block
表示接收操作是否是阻塞操作,v <- ch
和v, ok <- ch
两种都是阻塞的,对于使用select语句从channel中接收数据时,调用的是runtime.selectnbrecv
,selectnbrecv表示非阻塞接收。selectnbrecv的实现也是调用的runtime.chanrecv
,只是传参的时候block=false
。因此这个block参数主要是区分是否是select receive
。
该函数有以下返回值:
selected
表示是否是selectnbrecv
received
表示是否成功从channel中接收到数据
2.runtime.chanrecv的内部执行流程 #
下图是在阅读runtime.chanrecv
源码,梳理其内部执行流程时,绘制的活动图,只是梳理大致脉络加深理解,不一定完全准确:
2.1 处理channel为nil的情况 #
runtime.chanrecv的第一步是处理channel为nil的情况:
1 if c == nil {
2 if !block {
3 return
4 }
5 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
6 throw("unreachable")
7 }
channel为nil涉及两种情况:
- block为true时,即
v <- ch
和v, ok <- ch
两种阻塞式接收,将为调用gopark
一直阻塞,waitResaon是ChanReceiveNilChan。对应了Go中的那句话:从一个nil channel中接收数据时,会一直阻塞
,注意这是不严谨的,因为不包含select receive
的情况 - block为false时,即
select receive
的情况,将直接返回
2.2 处理select receive且无从channel中接收的情况 #
第二步是对select receive
非阻塞式接收并且因channel中没有数据而无法从中接收数据的情况处理:
block=false
表示无阻塞式接收,empty
函数的实现用于判断能否从channel中接收数据,对于无缓冲区的channel,判断sendq
上没有正在等待的发送者,对于有缓冲区的channel判断缓冲区中是否是空的。
- 在channel未关闭时直接返回
selected=false, received=false
- 如果channel已经关闭且再次检查channel中没有数据可被接收时,返回
selected=true, received=false
1 if !block && empty(c) {
2 if atomic.Load(&c.closed) == 0 {
3 return
4 }
5
6 if empty(c) {
7 if raceenabled {
8 raceacquire(c.raceaddr())
9 }
10 if ep != nil {
11 typedmemclr(c.elemtype, ep)
12 }
13 return true, false
14 }
15 }
1func empty(c *hchan) bool {
2 // c.dataqsiz is immutable.
3 if c.dataqsiz == 0 {
4 return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
5 }
6 return atomic.Loaduint(&c.qcount) == 0
7}
2.3 处理channel已经关闭且缓冲区中没有数据可接收的情况 #
能执行到第三步的话,就要从channel接收数据了,需要先加锁。接收数据时有几种情况。
本步骤处理的是当channel已经关闭并且channel的缓冲区中没有数据可以接收的情况,即如果channel已关闭且缓冲区为空,就会直接解锁,并清除ep
指针中的数据,后直接返回selected=true, received=false
。
对应那句从一个已经关闭的channel接收数据,如果缓存区为空将会返回零值。可以使用ok-idom,判断通道是否已被关闭。
。
1 lock(&c.lock)
2
3 if c.closed != 0 && c.qcount == 0 {
4 if raceenabled {
5 raceacquire(c.raceaddr())
6 }
7 unlock(&c.lock)
8 if ep != nil {
9 typedmemclr(c.elemtype, ep)
10 }
11 return true, false
12 }
2.4 尝试从等待队列sendq上的发送方接收 #
从channel接收数据时,先尝试从阻塞在等待队列sendq中的发送方那里接收。
如下面的代码,会从sendq队列中将队头出列,取出最先陷入等待的发送方Groutine,获取sudog结构,调用runtime.recv
函数接收数据。
1 if sg := c.sendq.dequeue(); sg != nil {
2 // Found a waiting sender. If buffer is size 0, receive value
3 // directly from sender. Otherwise, receive from head of queue
4 // and add sender's value to the tail of the queue (both map to
5 // the same buffer slot because the queue is full).
6 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
7 return true, true
8 }
runtime.recv
函数如下:
1func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
2 if c.dataqsiz == 0 {
3 if raceenabled {
4 racesync(c, sg)
5 }
6 if ep != nil {
7 // copy data from sender
8 recvDirect(c.elemtype, sg, ep)
9 }
10 } else {
11 // Queue is full. Take the item at the
12 // head of the queue. Make the sender enqueue
13 // its item at the tail of the queue. Since the
14 // queue is full, those are both the same slot.
15 qp := chanbuf(c, c.recvx)
16 if raceenabled {
17 racenotify(c, c.recvx, nil)
18 racenotify(c, c.recvx, sg)
19 }
20 // copy data from queue to receiver
21 if ep != nil {
22 typedmemmove(c.elemtype, ep, qp)
23 }
24 // copy data from sender to queue
25 typedmemmove(c.elemtype, qp, sg.elem)
26 c.recvx++
27 if c.recvx == c.dataqsiz {
28 c.recvx = 0
29 }
30 c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
31 }
32 sg.elem = nil
33 gp := sg.g
34 unlockf()
35 gp.param = unsafe.Pointer(sg)
36 sg.success = true
37 if sg.releasetime != 0 {
38 sg.releasetime = cputicks()
39 }
40 goready(gp, skip+1)
41}
接收的流程大致如下:
- 如果channel的缓冲区大小dataqsiz为0,就会调用
recvDirect
函数,这个函数会将sendq中出队的那个Goroutine的sudog的elem数据拷贝到目标内存地址中 - 如果channel有缓冲区,则说明此时缓冲区一定是满的,因为sendq上有发送方等待嘛,此时从缓冲区环形队列按照recvx取出一个数据拷贝到接收目标内存地址中
typedmemmove(c.elemtype, ep, qp)
,并将sendq中出队的那个sudog的elem数据拷贝到缓冲区环形队列中typedmemmove(c.elemtype, ep, qp)
,并维护好channel和缓冲区环形队列的一些状态字段。 - 最后解锁unlockf,并调用
goready
函数,将sudog上的Goroutine标记成Grunnable,可以理解为将这个G放到接收方G所在P上(放到runnext上)等待下一次调度时有可能被M执行,即唤醒之前被阻塞的发送方的G。
2.5 从缓冲区接收 #
如果执行到这一步,没有发送方阻塞在sendq中,此时判断如果缓冲区中有数据,则从缓冲区中接收。 缓冲区中有数据时,从缓冲区循环队列的recvx索引位置接收数据,并维护好channel和缓冲区环形队列的一些状态字段。最后unLock解锁后返回。
1 if c.qcount > 0 {
2 // Receive directly from queue
3 qp := chanbuf(c, c.recvx)
4 if raceenabled {
5 racenotify(c, c.recvx, nil)
6 }
7 if ep != nil {
8 typedmemmove(c.elemtype, ep, qp)
9 }
10 typedmemclr(c.elemtype, qp)
11 c.recvx++
12 if c.recvx == c.dataqsiz {
13 c.recvx = 0
14 }
15 c.qcount--
16 unlock(&c.lock)
17 return true, true
18 }
2.6 阻塞接收方 #
如果执行到这一步,说明没有发送方阻塞在sendq,而且缓冲区中也没有数据可接收,这个时候之后能将接收方阻塞了。
但这里要处理一个小例外,就是select recev的情况,即block=false
的非阻塞接收的情况,如果是非阻塞接收,解锁后直接返回。
1 if !block {
2 unlock(&c.lock)
3 return false, false
4 }
如果是block=true
的情况,需要将接收方阻塞,执行下面的代码主要步骤如下:
- 调用
getg
函数获得接收方的Goroutine - 调用
acquireSudog
获取一个sudog,并将此次接收方的Goroutine、接收的channel等"接收现场"保存到sudog上 c.recvq.enqueue(mysg)
将打包好的sudog放到channel的revq等待队列中- 调用
gopark
函数,waitingReason为waitReasonChanReceive,将当前接收方的Goroutine的状态更新成Gwaiting
。注意gopark函数的第二个参数为channel上的lock,所以释放锁的操作应该是在gopark中执行。 - 执行到
gopark
后,当前接收方的Goroutine就会在这里陷入阻塞状态,并等待被调度器唤醒了 - gopark函数调用之后的代码,主要是如果被调度器唤醒后要执行的一些清理工作,在最后返回true,表示这次之前被阻塞的接收操作运行结束。
1 // no sender available: block on this channel.
2 gp := getg()
3 mysg := acquireSudog()
4 mysg.releasetime = 0
5 if t0 != 0 {
6 mysg.releasetime = -1
7 }
8 // No stack splits between assigning elem and enqueuing mysg
9 // on gp.waiting where copystack can find it.
10 mysg.elem = ep
11 mysg.waitlink = nil
12 gp.waiting = mysg
13 mysg.g = gp
14 mysg.isSelect = false
15 mysg.c = c
16 gp.param = nil
17 c.recvq.enqueue(mysg)
18 // Signal to anyone trying to shrink our stack that we're about
19 // to park on a channel. The window between when this G's status
20 // changes and when we set gp.activeStackChans is not safe for
21 // stack shrinking.
22 atomic.Store8(&gp.parkingOnChan, 1)
23 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
24
25 // someone woke us up
26 if mysg != gp.waiting {
27 throw("G waiting list is corrupted")
28 }
29 gp.waiting = nil
30 gp.activeStackChans = false
31 if mysg.releasetime > 0 {
32 blockevent(mysg.releasetime-t0, 2)
33 }
34 success := mysg.success
35 gp.param = nil
36 mysg.c = nil
37 releaseSudog(mysg)
38 return true, success
3.总结 #
从channel中接收数据时大致有以下几种情况:
- channel为nil时,如果是阻塞式接收,将会一直阻塞;如果是非阻塞式接收(select recev),将会直接返回
- 如果非阻塞并且channel中没有数据可接收(包括无发送方在等待和缓冲区为空的情况),直接返回
- 如果channel已经被关闭,并且channel中没有数据可接收(包括无发送方在等待和缓冲区为空的情况),将直接返回
- 有发送方在sendq上等待时,从发送方接收,并唤醒发送方的G,包含两种情况:
- 对于没有缓冲区的channel,直接接收等待队列中第一个等待的发送方的数据
- 如果有缓冲区,则从缓冲区循环队列中接收数据,并将等待队列中第一个等待的发送方的数据放入缓冲区
- 如果没有发送方在等待,但缓冲区有数据,则从缓冲区中接收数据
- 如果连缓冲区中也没有数据,则将接收方阻塞