Go源码学习: 往channel中发送数据的内部实现是什么样的?

Go源码学习: 往channel中发送数据的内部实现是什么样的?

📅 2021-11-02 | 🖱️
🔖 go

前面学习了channel的底层数据结构hchan以及hchan是如何被创建的。 使用类似ch <- 3往channel中发送数据时,会调用runtime.chansend1chansend1调用了chansend,本节将主要学习runtime.chansend这个函数。

 1> runtime.chansend1() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:143 (PC: 0x1003a8e)
 2Warning: debugging optimized function
 3   138: }
 4   139:
 5   140: // entry point for c <- x from compiled code
 6   141: //go:nosplit
 7   142: func chansend1(c *hchan, elem unsafe.Pointer) {
 8=> 143:         chansend(c, elem, true, getcallerpc())
 9   144: }
10   145:
11   146: /*
12   147:  * generic single channel send/recv
13   148:  * If block is not nil,

runtime.chansend函数签名如下:

1   158: func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
2=> 159:         if c == nil {
3   160:                 if !block {
4   161:                         return false
5   162:                 }
6   163:                 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)

chansend函数的各个参数如下:

  • c是hchan,即channel
  • ep为要发送的数据
  • blockchansend1传值为true,表示发送操作是一个阻塞操作,对于使用select语句往channel中发送数据时,调用的是runtime.selectnbsend,selectnbsend表示非阻塞发送,它也是调用的runtime.chansend只是这个block参数传递是false
  • callerpc是调用方的PC/IP寄存器值,基本阅读runtime.chansend的流程可以先忽略它

学习了chansend函数的各个参数后,下面进入对chansend函数实现的学习。

1.runtime.chansend的内部执行流程 #

下图是在阅读runtime.chansend源码,梳理其内部执行流程时,绘制的活动图,只是梳理大致脉络加深理解,不一定完全准确:

runtime-chansend.png

1.1 对channel为nil的处理逻辑 #

gopher在刚开始学习channel时,一定会看到这句话"向一个nil channel中发送数据,将会一直阻塞",其实这句话说的不太严谨,应该不包含select send的情况。 runtime.chansend函数的第一部分就是对channel为nil的情况的处理:

1	if c == nil {
2		if !block {
3			return false
4		}
5		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
6		throw("unreachable")
7	}
  • 如果channel为nil,block为false时,会直接return false,block为false对应非阻塞发送的情况。对应select send情况,会直接返回不会阻塞。
  • 如果channel为nil,block为true时,调用gopark函数,waitReason是waitReasonChanSendNilChan,将会一直阻塞

1.2 对channel还未关闭且缓冲区已满时的select send时的处理逻辑 #

1	if !block && c.closed == 0 && full(c) {
2		return false
3	}

如果bolck为false并且channel未关闭并且channel已经满了,直接返回,block为false赌赢select send的情况,如果channel已经满了,不能再往里发送了直接返回。 注意full(c)函数的实现除了判断对有缓冲区channel判断缓冲区是否已经满了,对于无缓冲区的channel则判断recevq等待队列上是否有正在等待的接收者。

 1func full(c *hchan) bool {
 2	// c.dataqsiz is immutable (never written after the channel is created)
 3	// so it is safe to read at any time during channel operation.
 4	if c.dataqsiz == 0 {
 5		// Assumes that a pointer read is relaxed-atomic.
 6		return c.recvq.first == nil
 7	}
 8	// Assumes that a uint read is relaxed-atomic.
 9	return c.qcount == c.dataqsiz
10}

1.3 对channel已经关闭时的处理逻辑 #

经过前面1, 2两步,执行到第3步,此时channel缓冲区未满(或者等待队列recevq不为空),就满足向channel中发送数据的前提条件之一了。 因此后续的操作需要加锁,调用了lock函数。即在发送数据的逻辑执行之前会先为当前channel加锁,防止出现竞争条件的问题。 lock之后,判断channel是否已经通过closed字段被标记成了关闭,就不能再写入了,unlock后马上panic。即向一个已经关闭的channel中发送数据会panic

1	lock(&c.lock)
2
3	if c.closed != 0 {
4		unlock(&c.lock)
5		panic(plainError("send on closed channel"))
6	}

1.4 尝试直接发送给recevq上的接收者 #

向channel发送数据时,先尝试发给等待队列中的接收者。如果有接收者挂载在recevq上,那说明channel的缓冲区内一定没有数据,要不然接受者不会在等待队列中。 因此存在接收者的情况下,直接把数据发送给接收者。

具体的操作是会通过recvq.dequeue从recvq中取出最先陷入等待的Goroutine,获取sudog结构,并直接向它发送数据(runtime.send函数),发送完成后返回true结束发送流程。

1	if sg := c.recvq.dequeue(); sg != nil {
2		// Found a waiting receiver. We pass the value we want to send
3		// directly to the receiver, bypassing the channel buffer (if any).
4		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
5		return true
6	}

发送数据时调用的send函数如下,这个函数中调用了两个关键的函数sendDirectgoready,另外执行了unlock操作。

  • sendDirect将发送的数据拷贝到接收方持有的目标内存地址上,即把数据发送给等待的接收方
  • goready函数: 会将sudog上的Goroutine标记成Grunnable,可以理解为将这个G放到发送方G所在P上(放到runnext上)等待下一次调度时有可能被M执行,即唤醒之前被阻塞的接收方的G,这是Go运行时调度相关的知识。
 1func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 2	if raceenabled {
 3		if c.dataqsiz == 0 {
 4			racesync(c, sg)
 5		} else {
 6			// Pretend we go through the buffer, even though
 7			// we copy directly. Note that we need to increment
 8			// the head/tail locations only when raceenabled.
 9			racenotify(c, c.recvx, nil)
10			racenotify(c, c.recvx, sg)
11			c.recvx++
12			if c.recvx == c.dataqsiz {
13				c.recvx = 0
14			}
15			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
16		}
17	}
18	if sg.elem != nil {
19		sendDirect(c.elemtype, sg, ep)
20		sg.elem = nil
21	}
22	gp := sg.g
23	unlockf()
24	gp.param = unsafe.Pointer(sg)
25	sg.success = true
26	if sg.releasetime != 0 {
27		sg.releasetime = cputicks()
28	}
29	goready(gp, skip+1)
30}

1.5 往channel的缓冲区中发送 #

如果前面第4步中,recevq上没有接收者在等待,则在第5步中会尝试往channel的缓冲区中发送。

 1	if c.qcount < c.dataqsiz {
 2		// Space is available in the channel buffer. Enqueue the element to send.
 3		qp := chanbuf(c, c.sendx)
 4		if raceenabled {
 5			racenotify(c, c.sendx, nil)
 6		}
 7		typedmemmove(c.elemtype, qp, ep)
 8		c.sendx++
 9		if c.sendx == c.dataqsiz {
10			c.sendx = 0
11		}
12		c.qcount++
13		unlock(&c.lock)
14		return true
15	}

如果channel的缓冲区还未满,则将发送的数据写入channel缓冲区的循环队列中,这段代码也比较少,主要是入队以及循环队列上一些状态字段维护工作,数据写入后,释放锁,并返回true结束发送流程。 具体的步骤如下:

  • 使用chanbuf函数计算出下一个可以放置数据的位置
  • 使用typedmemmove函数将数据拷贝到缓冲区
  • 维护channel及缓冲区循环队列的sendxqcount字段
  • unlock解锁

1.6 阻塞发送方 #

如果能到第6步的话,说明没有recevq中没有等待的接收方,缓冲区也满了,也没有位置放发送方的数据了,这个时候需要将发送方阻塞。 但这里要处理一个小例外,就是select send的情况,即block=false的非阻塞发送的情况,如果是非阻塞发送,解锁后直接返回false。

1	if !block {
2		unlock(&c.lock)
3		return false
4	}

如果是block=true的情况,需要将发送方阻塞,执行下面的代码,主要的执行步骤如下:

  • 调用getg函数获得发送方的Goroutine
  • 调用acquireSudog获取一个sudog,并将此次发送的Goroutine、发送的channel、发送的数据等"发送现场"保存到sudog上
  • c.sendq.enqueue(mysg)将刚才打包好的sudog放到channel的sendq等待队列中
  • 调用gopark函数,waitingReason为waitReasonChanSend,将当前发送方的Goroutine的状态更新成Gwaiting。注意gopark函数的第二个参数为channel上的lock,所以释放锁的操作应该是在gopark中执行。
  • 执行到gopark后,当前发送方的Goroutine就会在这里陷入阻塞状态,并等待被调度器唤醒了
  • gopark函数调用之后的代码,主要是如果被调度器唤醒后要执行的一些清理工作,在最后返回true,表示这次之前被阻塞的发送运行结束。(ps: 之前被阻塞的数据发送结束了,数据是如何被取走的,这里并没有,而是在从channel中接收数据的流程里)
 1	// Block on the channel. Some receiver will complete our operation for us.
 2	gp := getg()
 3	mysg := acquireSudog()
 4	mysg.releasetime = 0
 5	if t0 != 0 {
 6		mysg.releasetime = -1
 7	}
 8	// No stack splits between assigning elem and enqueuing mysg
 9	// on gp.waiting where copystack can find it.
10	mysg.elem = ep
11	mysg.waitlink = nil
12	mysg.g = gp
13	mysg.isSelect = false
14	mysg.c = c
15	gp.waiting = mysg
16	gp.param = nil
17	c.sendq.enqueue(mysg)
18	// Signal to anyone trying to shrink our stack that we're about
19	// to park on a channel. The window between when this G's status
20	// changes and when we set gp.activeStackChans is not safe for
21	// stack shrinking.
22	atomic.Store8(&gp.parkingOnChan, 1)
23	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
24	// Ensure the value being sent is kept alive until the
25	// receiver copies it out. The sudog has a pointer to the
26	// stack object, but sudogs aren't considered as roots of the
27	// stack tracer.
28	KeepAlive(ep)
29
30	// someone woke us up.
31	if mysg != gp.waiting {
32		throw("G waiting list is corrupted")
33	}
34	gp.waiting = nil
35	gp.activeStackChans = false
36	closed := !mysg.success
37	gp.param = nil
38	if mysg.releasetime > 0 {
39		blockevent(mysg.releasetime-t0, 2)
40	}
41	mysg.c = nil
42	releaseSudog(mysg)
43	if closed {
44		if c.closed == 0 {
45			throw("chansend: spurious wakeup")
46		}
47		panic(plainError("send on closed channel"))
48	}
49	return true

2.总结 #

最后简单总结一下使用类似ch <- 3向channel中发送数据时大致会有几种情况:

  • channel为nil时,如果是阻塞式发送,将会一直阻塞;如果是非阻塞式发送(select send),将会直接返回false
  • 如果是非阻塞式发送,且缓冲区已满,且channel未关闭,将会直接返回false
  • 如果channel已经关闭,将会panic
  • 如果recevq等待队列上有等待的接收方,将直接发送给等待的接收方,并唤醒接收方的G
  • 如果channel缓冲区未满,将发送数据到缓冲区
  • 如果缓冲区已满,且是阻塞式发送的话,将会阻塞发送方,并等待唤醒

返回false的情况主要发生在非阻塞发送(select send),返回false就意味着发送失败,如果是for select send下一次还会重新尝试

© 2025 青蛙小白 | 总访问量 | 总访客数