本节将学习从channel中接收数据的内部实现。

在Go中从channel中接收数据有两种方式:

1
2
v <- ch
v, ok <- ch

使用v <- ch从channel中接收数据时,会调用runtime.chanrecv1chanrecv1调用了runtime.chanrecv

1
2
3
4
5
6
7
8
9
runtime.chanrecv1() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:439 (PC: 0x100448e)
Warning: debugging optimized function
   434: }
   435:
   436: // entry points for <- c from compiled code
   437: //go:nosplit
   438: func chanrecv1(c *hchan, elem unsafe.Pointer) {
=> 439:         chanrecv(c, elem, true)
   440: }

使用v, ok <- ch从channel中接收数据时,会调用runtime.chanrecv2chanrecv2调用了runtime.chanrecv

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
> runtime.chanrecv2() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:444 (PC: 0x1004e0e)
Warning: debugging optimized function
   439:         chanrecv(c, elem, true)
   440: }
   441:
   442: //go:nosplit
   443: func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
=> 444:         _, received = chanrecv(c, elem, true)
   445:         return
   446: }

可以看到chanrecv1函数和chanrecv2函数调用chanrecv的传参都是一样的,区别只是chanrecv2获取了received的这个返回结果以确认channel是否已经被关闭。 本节主要学习runtime.chanrecv函数。

1.runtime.chanrecv函数

runtime.chanrecv函数的签名如下:

1
2
3
4
5
6
7
8
 runtime.chanrecv() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:454 (PC: 0x1004e4f)
Warning: debugging optimized function
   449: // ep may be nil, in which case received data is ignored.
   450: // If block == false and no elements are available, returns (false, false).
   451: // Otherwise, if c is closed, zeros *ep and returns (true, false).
   452: // Otherwise, fills in *ep with an element and returns (true, true).
   453: // A non-nil ep must point to the heap or the caller's stack.
=> 454: func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

该函数有以下参数:

  • c是hchan,即channel
  • ep指向用于接收数据变量地址,用于接收数据
  • block表示接收操作是否是阻塞操作,v <- chv, ok <- ch两种都是阻塞的,对于使用select语句从channel中接收数据时,调用的是runtime.selectnbrecv,selectnbrecv表示非阻塞接收。selectnbrecv的实现也是调用的runtime.chanrecv,只是传参的时候block=false。因此这个block参数主要是区分是否是select receive

该函数有以下返回值:

  • selected表示是否是selectnbrecv
  • received表示是否成功从channel中接收到数据

2.runtime.chanrecv的内部执行流程

下图是在阅读runtime.chanrecv源码,梳理其内部执行流程时,绘制的活动图,只是梳理大致脉络加深理解,不一定完全准确:

runtime-chanrecv.png

2.1 处理channel为nil的情况

runtime.chanrecv的第一步是处理channel为nil的情况:

1
2
3
4
5
6
7
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

channel为nil涉及两种情况:

  • block为true时,即v <- chv, ok <- ch两种阻塞式接收,将为调用gopark一直阻塞,waitResaon是ChanReceiveNilChan。对应了Go中的那句话: 从一个nil channel中接收数据时,会一直阻塞,注意这是不严谨的,因为不包含select receive的情况
  • block为false时,即select receive的情况,将直接返回

2.2 处理select receive且无从channel中接收的情况

第二步是对select receive非阻塞式接收并且因channel中没有数据而无法从中接收数据的情况处理:

block=false表示无阻塞式接收,empty函数的实现用于判断能否从channel中接收数据,对于无缓冲区的channel,判断sendq上没有正在等待的发送者,对于有缓冲区的channel判断缓冲区中是否是空的。

  • 在channel未关闭时直接返回selected=false, received=false
  • 如果channel已经关闭且再次检查channel中没有数据可被接收时,返回selected=true, received=false
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
	if !block && empty(c) {
		if atomic.Load(&c.closed) == 0 {
			return
		}

		if empty(c) {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}
1
2
3
4
5
6
7
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	return atomic.Loaduint(&c.qcount) == 0
}

2.3 处理channel已经关闭且缓冲区中没有数据可接收的情况

能执行到第三步的话,就要从channel接收数据了,需要先加锁。接收数据时有几种情况。

本步骤处理的是当channel已经关闭并且channel的缓冲区中没有数据可以接收的情况,即如果channel已关闭且缓冲区为空,就会直接解锁,并清除ep指针中的数据,后直接返回selected=true, received=false。 对应那句从一个已经关闭的channel接收数据,如果缓存区为空将会返回零值。可以使用ok-idom,判断通道是否已被关闭。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

2.4 尝试从等待队列sendq上的发送方接收

从channel接收数据时,先尝试从阻塞在等待队列sendq中的发送方那里接收。 如下面的代码,会从sendq队列中将队头出列,取出最先陷入等待的发送方Groutine,获取sudog结构,调用runtime.recv函数接收数据。

1
2
3
4
5
6
7
8
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

runtime.recv函数如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

接收的流程大致如下:

  • 如果channel的缓冲区大小dataqsiz为0,就会调用recvDirect函数,这个函数会将sendq中出队的那个Goroutine的sudog的elem数据拷贝到目标内存地址中
  • 如果channel有缓冲区,则说明此时缓冲区一定是满的,因为sendq上有发送方等待嘛,此时从缓冲区环形队列按照recvx取出一个数据拷贝到接收目标内存地址中typedmemmove(c.elemtype, ep, qp),并将sendq中出队的那个sudog的elem数据拷贝到缓冲区环形队列中typedmemmove(c.elemtype, ep, qp),并维护好channel和缓冲区环形队列的一些状态字段。
  • 最后解锁unlockf,并调用goready函数,将sudog上的Goroutine标记成Grunnable,可以理解为将这个G放到接收方G所在P上(放到runnext上)等待下一次调度时有可能被M执行,即唤醒之前被阻塞的发送方的G。

2.5 从缓冲区接收

如果执行到这一步,没有发送方阻塞在sendq中,此时判断如果缓冲区中有数据,则从缓冲区中接收。 缓冲区中有数据时,从缓冲区循环队列的recvx索引位置接收数据,并维护好channel和缓冲区环形队列的一些状态字段。最后unLock解锁后返回。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

2.6 阻塞接收方

如果执行到这一步,说明没有发送方阻塞在sendq,而且缓冲区中也没有数据可接收,这个时候之后能将接收方阻塞了。 但这里要处理一个小例外,就是select recev的情况,即block=false的非阻塞接收的情况,如果是非阻塞接收,解锁后直接返回。

1
2
3
4
	if !block {
		unlock(&c.lock)
		return false, false
	}

如果是block=true的情况,需要将接收方阻塞,执行下面的代码主要步骤如下:

  • 调用getg函数获得接收方的Goroutine
  • 调用acquireSudog获取一个sudog,并将此次接收方的Goroutine、接收的channel等"接收现场"保存到sudog上
  • c.recvq.enqueue(mysg)将打包好的sudog放到channel的revq等待队列中
  • 调用gopark函数,waitingReason为waitReasonChanReceive,将当前接收方的Goroutine的状态更新成Gwaiting。注意gopark函数的第二个参数为channel上的lock,所以释放锁的操作应该是在gopark中执行。
  • 执行到gopark后,当前接收方的Goroutine就会在这里陷入阻塞状态,并等待被调度器唤醒了
  • gopark函数调用之后的代码,主要是如果被调度器唤醒后要执行的一些清理工作,在最后返回true,表示这次之前被阻塞的接收操作运行结束。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success

3.总结

从channel中接收数据时大致有以下几种情况:

  • channel为nil时,如果是阻塞式接收,将会一直阻塞;如果是非阻塞式接收(select recev),将会直接返回
  • 如果非阻塞并且channel中没有数据可接收(包括无发送方在等待和缓冲区为空的情况),直接返回
  • 如果channel已经被关闭,并且channel中没有数据可接收(包括无发送方在等待和缓冲区为空的情况),将直接返回
  • 有发送方在sendq上等待时,从发送方接收,并唤醒发送方的G,包含两种情况:
    • 对于没有缓冲区的channel,直接接收等待队列中第一个等待的发送方的数据
    • 如果有缓冲区,则从缓冲区循环队列中接收数据,并将等待队列中第一个等待的发送方的数据放入缓冲区
  • 如果没有发送方在等待,但缓冲区有数据,则从缓冲区中接收数据
  • 如果连缓冲区中也没有数据,则将接收方阻塞