前面学习了channel的底层数据结构hchan以及hchan是如何被创建的。 使用类似ch <- 3往channel中发送数据时,会调用runtime.chansend1chansend1调用了chansend,本节将主要学习runtime.chansend这个函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
> runtime.chansend1() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:143 (PC: 0x1003a8e)
Warning: debugging optimized function
   138: }
   139:
   140: // entry point for c <- x from compiled code
   141: //go:nosplit
   142: func chansend1(c *hchan, elem unsafe.Pointer) {
=> 143:         chansend(c, elem, true, getcallerpc())
   144: }
   145:
   146: /*
   147:  * generic single channel send/recv
   148:  * If block is not nil,

runtime.chansend函数签名如下:

1
2
3
4
5
6
   158: func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
=> 159:         if c == nil {
   160:                 if !block {
   161:                         return false
   162:                 }
   163:                 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)

chansend函数的各个参数如下:

  • c是hchan,即channel
  • ep为要发送的数据
  • blockchansend1传值为true,表示发送操作是一个阻塞操作,对于使用select语句往channel中发送数据时,调用的是runtime.selectnbsend,selectnbsend表示非阻塞发送,它也是调用的runtime.chansend只是这个block参数传递是false
  • callerpc是调用方的PC/IP寄存器值,基本阅读runtime.chansend的流程可以先忽略它

学习了chansend函数的各个参数后,下面进入对chansend函数实现的学习。

1.runtime.chansend的内部执行流程

下图是在阅读runtime.chansend源码,梳理其内部执行流程时,绘制的活动图,只是梳理大致脉络加深理解,不一定完全准确:

runtime-chansend.png

1.1 对channel为nil的处理逻辑

gopher在刚开始学习channel时,一定会看到这句话"向一个nil channel中发送数据,将会一直阻塞",其实这句话说的不太严谨,应该不包含select send的情况。 runtime.chansend函数的第一部分就是对channel为nil的情况的处理:

1
2
3
4
5
6
7
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
  • 如果channel为nil,block为false时,会直接return false,block为false对应非阻塞发送的情况。对应select send情况,会直接返回不会阻塞。
  • 如果channel为nil,block为true时,调用gopark函数,waitReason是waitReasonChanSendNilChan,将会一直阻塞

1.2 对channel还未关闭且缓冲区已满时的select send时的处理逻辑

1
2
3
	if !block && c.closed == 0 && full(c) {
		return false
	}

如果bolck为false并且channel未关闭并且channel已经满了,直接返回,block为false赌赢select send的情况,如果channel已经满了,不能再往里发送了直接返回。 注意full(c)函数的实现除了判断对有缓冲区channel判断缓冲区是否已经满了,对于无缓冲区的channel则判断recevq等待队列上是否有正在等待的接收者。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func full(c *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz == 0 {
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}

1.3 对channel已经关闭时的处理逻辑

经过前面1, 2两步,执行到第3步,此时channel缓冲区未满(或者等待队列recevq不为空),就满足向channel中发送数据的前提条件之一了。 因此后续的操作需要加锁,调用了lock函数。即在发送数据的逻辑执行之前会先为当前channel加锁,防止出现竞争条件的问题。 lock之后,判断channel是否已经通过closed字段被标记成了关闭,就不能再写入了,unlock后马上panic。即向一个已经关闭的channel中发送数据会panic

1
2
3
4
5
6
	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

1.4 尝试直接发送给recevq上的接收者

向channel发送数据时,先尝试发给等待队列中的接收者。如果有接收者挂载在recevq上,那说明channel的缓冲区内一定没有数据,要不然接受者不会在等待队列中。 因此存在接收者的情况下,直接把数据发送给接收者。

具体的操作是会通过recvq.dequeue从recvq中取出最先陷入等待的Goroutine,获取sudog结构,并直接向它发送数据(runtime.send函数),发送完成后返回true结束发送流程。

1
2
3
4
5
6
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

发送数据时调用的send函数如下,这个函数中调用了两个关键的函数sendDirectgoready,另外执行了unlock操作。

  • sendDirect将发送的数据拷贝到接收方持有的目标内存地址上,即把数据发送给等待的接收方
  • goready函数: 会将sudog上的Goroutine标记成Grunnable,可以理解为将这个G放到发送方G所在P上(放到runnext上)等待下一次调度时有可能被M执行,即唤醒之前被阻塞的接收方的G,这是Go运行时调度相关的知识。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

1.5 往channel的缓冲区中发送

如果前面第4步中,recevq上没有接收者在等待,则在第5步中会尝试往channel的缓冲区中发送。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

如果channel的缓冲区还未满,则将发送的数据写入channel缓冲区的循环队列中,这段代码也比较少,主要是入队以及循环队列上一些状态字段维护工作,数据写入后,释放锁,并返回true结束发送流程。 具体的步骤如下:

  • 使用chanbuf函数计算出下一个可以放置数据的位置
  • 使用typedmemmove函数将数据拷贝到缓冲区
  • 维护channel及缓冲区循环队列的sendxqcount字段
  • unlock解锁

1.6 阻塞发送方

如果能到第6步的话,说明没有recevq中没有等待的接收方,缓冲区也满了,也没有位置放发送方的数据了,这个时候需要将发送方阻塞。 但这里要处理一个小例外,就是select send的情况,即block=false的非阻塞发送的情况,如果是非阻塞发送,解锁后直接返回false。

1
2
3
4
	if !block {
		unlock(&c.lock)
		return false
	}

如果是block=true的情况,需要将发送方阻塞,执行下面的代码,主要的执行步骤如下:

  • 调用getg函数获得发送方的Goroutine
  • 调用acquireSudog获取一个sudog,并将此次发送的Goroutine、发送的channel、发送的数据等"发送现场"保存到sudog上
  • c.sendq.enqueue(mysg)将刚才打包好的sudog放到channel的sendq等待队列中
  • 调用gopark函数,waitingReason为waitReasonChanSend,将当前发送方的Goroutine的状态更新成Gwaiting。注意gopark函数的第二个参数为channel上的lock,所以释放锁的操作应该是在gopark中执行。
  • 执行到gopark后,当前发送方的Goroutine就会在这里陷入阻塞状态,并等待被调度器唤醒了
  • gopark函数调用之后的代码,主要是如果被调度器唤醒后要执行的一些清理工作,在最后返回true,表示这次之前被阻塞的发送运行结束。(ps: 之前被阻塞的数据发送结束了,数据是如何被取走的,这里并没有,而是在从channel中接收数据的流程里)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true

2.总结

最后简单总结一下使用类似ch <- 3向channel中发送数据时大致会有几种情况:

  • channel为nil时,如果是阻塞式发送,将会一直阻塞;如果是非阻塞式发送(select send),将会直接返回false
  • 如果是非阻塞式发送,且缓冲区已满,且channel未关闭,将会直接返回false
  • 如果channel已经关闭,将会panic
  • 如果recevq等待队列上有等待的接收方,将直接发送给等待的接收方,并唤醒接收方的G
  • 如果channel缓冲区未满,将发送数据到缓冲区
  • 如果缓冲区已满,且是阻塞式发送的话,将会阻塞发送方,并等待唤醒

返回false的情况主要发生在非阻塞发送(select send),返回false就意味着发送失败,如果是for select send下一次还会重新尝试