Go源码学习: 往channel中发送数据的内部实现是什么样的?
2021-11-02
前面学习了channel的底层数据结构hchan以及hchan是如何被创建的。
使用类似ch <- 3
往channel中发送数据时,会调用runtime.chansend1
,chansend1
调用了chansend
,本节将主要学习runtime.chansend
这个函数。
1> runtime.chansend1() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:143 (PC: 0x1003a8e)
2Warning: debugging optimized function
3 138: }
4 139:
5 140: // entry point for c <- x from compiled code
6 141: //go:nosplit
7 142: func chansend1(c *hchan, elem unsafe.Pointer) {
8=> 143: chansend(c, elem, true, getcallerpc())
9 144: }
10 145:
11 146: /*
12 147: * generic single channel send/recv
13 148: * If block is not nil,
runtime.chansend
函数签名如下:
1 158: func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
2=> 159: if c == nil {
3 160: if !block {
4 161: return false
5 162: }
6 163: gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
chansend函数的各个参数如下:
c
是hchan,即channelep
为要发送的数据block
从chansend1
传值为true,表示发送操作是一个阻塞操作,对于使用select语句往channel中发送数据时,调用的是runtime.selectnbsend
,selectnbsend表示非阻塞发送,它也是调用的runtime.chansend
只是这个block
参数传递是falsecallerpc
是调用方的PC/IP寄存器值,基本阅读runtime.chansend
的流程可以先忽略它
学习了chansend函数的各个参数后,下面进入对chansend函数实现的学习。
1.runtime.chansend的内部执行流程 #
下图是在阅读runtime.chansend
源码,梳理其内部执行流程时,绘制的活动图,只是梳理大致脉络加深理解,不一定完全准确:
1.1 对channel为nil的处理逻辑 #
gopher在刚开始学习channel时,一定会看到这句话"向一个nil channel中发送数据,将会一直阻塞",其实这句话说的不太严谨,应该不包含select send的情况。
runtime.chansend
函数的第一部分就是对channel为nil的情况的处理:
1 if c == nil {
2 if !block {
3 return false
4 }
5 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
6 throw("unreachable")
7 }
- 如果channel为nil,block为false时,会直接return false,block为false对应非阻塞发送的情况。对应select send情况,会直接返回不会阻塞。
- 如果channel为nil,block为true时,调用gopark函数,waitReason是
waitReasonChanSendNilChan
,将会一直阻塞
1.2 对channel还未关闭且缓冲区已满时的select send时的处理逻辑 #
1 if !block && c.closed == 0 && full(c) {
2 return false
3 }
如果bolck为false并且channel未关闭并且channel已经满了,直接返回,block为false赌赢select send的情况,如果channel已经满了,不能再往里发送了直接返回。
注意full(c)
函数的实现除了判断对有缓冲区channel判断缓冲区是否已经满了,对于无缓冲区的channel则判断recevq
等待队列上是否有正在等待的接收者。
1func full(c *hchan) bool {
2 // c.dataqsiz is immutable (never written after the channel is created)
3 // so it is safe to read at any time during channel operation.
4 if c.dataqsiz == 0 {
5 // Assumes that a pointer read is relaxed-atomic.
6 return c.recvq.first == nil
7 }
8 // Assumes that a uint read is relaxed-atomic.
9 return c.qcount == c.dataqsiz
10}
1.3 对channel已经关闭时的处理逻辑 #
经过前面1, 2两步,执行到第3步,此时channel缓冲区未满(或者等待队列recevq
不为空),就满足向channel中发送数据的前提条件之一了。
因此后续的操作需要加锁,调用了lock函数。即在发送数据的逻辑执行之前会先为当前channel加锁,防止出现竞争条件的问题。
lock之后,判断channel是否已经通过closed字段被标记成了关闭,就不能再写入了,unlock后马上panic。即向一个已经关闭的channel中发送数据会panic
。
1 lock(&c.lock)
2
3 if c.closed != 0 {
4 unlock(&c.lock)
5 panic(plainError("send on closed channel"))
6 }
1.4 尝试直接发送给recevq上的接收者 #
向channel发送数据时,先尝试发给等待队列中的接收者。如果有接收者挂载在recevq上,那说明channel的缓冲区内一定没有数据,要不然接受者不会在等待队列中。 因此存在接收者的情况下,直接把数据发送给接收者。
具体的操作是会通过recvq.dequeue从recvq中取出最先陷入等待的Goroutine,获取sudog结构,并直接向它发送数据(runtime.send函数),发送完成后返回true结束发送流程。
1 if sg := c.recvq.dequeue(); sg != nil {
2 // Found a waiting receiver. We pass the value we want to send
3 // directly to the receiver, bypassing the channel buffer (if any).
4 send(c, sg, ep, func() { unlock(&c.lock) }, 3)
5 return true
6 }
发送数据时调用的send
函数如下,这个函数中调用了两个关键的函数sendDirect
和goready
,另外执行了unlock操作。
sendDirect
将发送的数据拷贝到接收方持有的目标内存地址上,即把数据发送给等待的接收方goready
函数: 会将sudog上的Goroutine标记成Grunnable
,可以理解为将这个G放到发送方G所在P上(放到runnext上)等待下一次调度时有可能被M执行,即唤醒之前被阻塞的接收方的G,这是Go运行时调度相关的知识。
1func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
2 if raceenabled {
3 if c.dataqsiz == 0 {
4 racesync(c, sg)
5 } else {
6 // Pretend we go through the buffer, even though
7 // we copy directly. Note that we need to increment
8 // the head/tail locations only when raceenabled.
9 racenotify(c, c.recvx, nil)
10 racenotify(c, c.recvx, sg)
11 c.recvx++
12 if c.recvx == c.dataqsiz {
13 c.recvx = 0
14 }
15 c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
16 }
17 }
18 if sg.elem != nil {
19 sendDirect(c.elemtype, sg, ep)
20 sg.elem = nil
21 }
22 gp := sg.g
23 unlockf()
24 gp.param = unsafe.Pointer(sg)
25 sg.success = true
26 if sg.releasetime != 0 {
27 sg.releasetime = cputicks()
28 }
29 goready(gp, skip+1)
30}
1.5 往channel的缓冲区中发送 #
如果前面第4步中,recevq上没有接收者在等待,则在第5步中会尝试往channel的缓冲区中发送。
1 if c.qcount < c.dataqsiz {
2 // Space is available in the channel buffer. Enqueue the element to send.
3 qp := chanbuf(c, c.sendx)
4 if raceenabled {
5 racenotify(c, c.sendx, nil)
6 }
7 typedmemmove(c.elemtype, qp, ep)
8 c.sendx++
9 if c.sendx == c.dataqsiz {
10 c.sendx = 0
11 }
12 c.qcount++
13 unlock(&c.lock)
14 return true
15 }
如果channel的缓冲区还未满,则将发送的数据写入channel缓冲区的循环队列中,这段代码也比较少,主要是入队以及循环队列上一些状态字段维护工作,数据写入后,释放锁,并返回true结束发送流程。 具体的步骤如下:
- 使用
chanbuf
函数计算出下一个可以放置数据的位置 - 使用
typedmemmove
函数将数据拷贝到缓冲区 - 维护channel及缓冲区循环队列的
sendx
和qcount
字段 - unlock解锁
1.6 阻塞发送方 #
如果能到第6步的话,说明没有recevq
中没有等待的接收方,缓冲区也满了,也没有位置放发送方的数据了,这个时候需要将发送方阻塞。
但这里要处理一个小例外,就是select send
的情况,即block=false
的非阻塞发送的情况,如果是非阻塞发送,解锁后直接返回false。
1 if !block {
2 unlock(&c.lock)
3 return false
4 }
如果是block=true
的情况,需要将发送方阻塞,执行下面的代码,主要的执行步骤如下:
- 调用
getg
函数获得发送方的Goroutine - 调用
acquireSudog
获取一个sudog,并将此次发送的Goroutine、发送的channel、发送的数据等"发送现场"保存到sudog上 c.sendq.enqueue(mysg)
将刚才打包好的sudog放到channel的sendq等待队列中- 调用
gopark
函数,waitingReason为waitReasonChanSend
,将当前发送方的Goroutine的状态更新成Gwaiting
。注意gopark
函数的第二个参数为channel上的lock,所以释放锁的操作应该是在gopark中执行。 - 执行到
gopark
后,当前发送方的Goroutine就会在这里陷入阻塞状态,并等待被调度器唤醒了 gopark
函数调用之后的代码,主要是如果被调度器唤醒后要执行的一些清理工作,在最后返回true,表示这次之前被阻塞的发送运行结束。(ps: 之前被阻塞的数据发送结束了,数据是如何被取走的,这里并没有,而是在从channel中接收数据的流程里)
1 // Block on the channel. Some receiver will complete our operation for us.
2 gp := getg()
3 mysg := acquireSudog()
4 mysg.releasetime = 0
5 if t0 != 0 {
6 mysg.releasetime = -1
7 }
8 // No stack splits between assigning elem and enqueuing mysg
9 // on gp.waiting where copystack can find it.
10 mysg.elem = ep
11 mysg.waitlink = nil
12 mysg.g = gp
13 mysg.isSelect = false
14 mysg.c = c
15 gp.waiting = mysg
16 gp.param = nil
17 c.sendq.enqueue(mysg)
18 // Signal to anyone trying to shrink our stack that we're about
19 // to park on a channel. The window between when this G's status
20 // changes and when we set gp.activeStackChans is not safe for
21 // stack shrinking.
22 atomic.Store8(&gp.parkingOnChan, 1)
23 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
24 // Ensure the value being sent is kept alive until the
25 // receiver copies it out. The sudog has a pointer to the
26 // stack object, but sudogs aren't considered as roots of the
27 // stack tracer.
28 KeepAlive(ep)
29
30 // someone woke us up.
31 if mysg != gp.waiting {
32 throw("G waiting list is corrupted")
33 }
34 gp.waiting = nil
35 gp.activeStackChans = false
36 closed := !mysg.success
37 gp.param = nil
38 if mysg.releasetime > 0 {
39 blockevent(mysg.releasetime-t0, 2)
40 }
41 mysg.c = nil
42 releaseSudog(mysg)
43 if closed {
44 if c.closed == 0 {
45 throw("chansend: spurious wakeup")
46 }
47 panic(plainError("send on closed channel"))
48 }
49 return true
2.总结 #
最后简单总结一下使用类似ch <- 3
向channel中发送数据时大致会有几种情况:
- channel为nil时,如果是阻塞式发送,将会一直阻塞;如果是非阻塞式发送(select send),将会直接返回false
- 如果是非阻塞式发送,且缓冲区已满,且channel未关闭,将会直接返回false
- 如果channel已经关闭,将会panic
- 如果recevq等待队列上有等待的接收方,将直接发送给等待的接收方,并唤醒接收方的G
- 如果channel缓冲区未满,将发送数据到缓冲区
- 如果缓冲区已满,且是阻塞式发送的话,将会阻塞发送方,并等待唤醒
返回false的情况主要发生在非阻塞发送(select send),返回false就意味着发送失败,如果是
for select send
下一次还会重新尝试