Go源码学习: 从channel中接收数据的内部实现是什么样的?

Go源码学习: 从channel中接收数据的内部实现是什么样的?

📅 2021-11-03 | 🖱️
🔖 go

本节将学习从channel中接收数据的内部实现。

在Go中从channel中接收数据有两种方式:

1v <- ch
2v, ok <- ch

使用v <- ch从channel中接收数据时,会调用runtime.chanrecv1chanrecv1调用了runtime.chanrecv

1runtime.chanrecv1() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:439 (PC: 0x100448e)
2Warning: debugging optimized function
3   434: }
4   435:
5   436: // entry points for <- c from compiled code
6   437: //go:nosplit
7   438: func chanrecv1(c *hchan, elem unsafe.Pointer) {
8=> 439:         chanrecv(c, elem, true)
9   440: }

使用v, ok <- ch从channel中接收数据时,会调用runtime.chanrecv2chanrecv2调用了runtime.chanrecv

 1> runtime.chanrecv2() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:444 (PC: 0x1004e0e)
 2Warning: debugging optimized function
 3   439:         chanrecv(c, elem, true)
 4   440: }
 5   441:
 6   442: //go:nosplit
 7   443: func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
 8=> 444:         _, received = chanrecv(c, elem, true)
 9   445:         return
10   446: }

可以看到chanrecv1函数和chanrecv2函数调用chanrecv的传参都是一样的,区别只是chanrecv2获取了received的这个返回结果以确认channel是否已经被关闭。 本节主要学习runtime.chanrecv函数。

1.runtime.chanrecv函数 #

runtime.chanrecv函数的签名如下:

1 runtime.chanrecv() /usr/local/Cellar/go/1.17.2/libexec/src/runtime/chan.go:454 (PC: 0x1004e4f)
2Warning: debugging optimized function
3   449: // ep may be nil, in which case received data is ignored.
4   450: // If block == false and no elements are available, returns (false, false).
5   451: // Otherwise, if c is closed, zeros *ep and returns (true, false).
6   452: // Otherwise, fills in *ep with an element and returns (true, true).
7   453: // A non-nil ep must point to the heap or the caller's stack.
8=> 454: func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

该函数有以下参数:

  • c是hchan,即channel
  • ep指向用于接收数据变量地址,用于接收数据
  • block表示接收操作是否是阻塞操作,v <- chv, ok <- ch两种都是阻塞的,对于使用select语句从channel中接收数据时,调用的是runtime.selectnbrecv,selectnbrecv表示非阻塞接收。selectnbrecv的实现也是调用的runtime.chanrecv,只是传参的时候block=false。因此这个block参数主要是区分是否是select receive

该函数有以下返回值:

  • selected表示是否是selectnbrecv
  • received表示是否成功从channel中接收到数据

2.runtime.chanrecv的内部执行流程 #

下图是在阅读runtime.chanrecv源码,梳理其内部执行流程时,绘制的活动图,只是梳理大致脉络加深理解,不一定完全准确:

runtime-chanrecv.png

2.1 处理channel为nil的情况 #

runtime.chanrecv的第一步是处理channel为nil的情况:

1	if c == nil {
2		if !block {
3			return
4		}
5		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
6		throw("unreachable")
7	}

channel为nil涉及两种情况:

  • block为true时,即v <- chv, ok <- ch两种阻塞式接收,将为调用gopark一直阻塞,waitResaon是ChanReceiveNilChan。对应了Go中的那句话: 从一个nil channel中接收数据时,会一直阻塞,注意这是不严谨的,因为不包含select receive的情况
  • block为false时,即select receive的情况,将直接返回

2.2 处理select receive且无从channel中接收的情况 #

第二步是对select receive非阻塞式接收并且因channel中没有数据而无法从中接收数据的情况处理:

block=false表示无阻塞式接收,empty函数的实现用于判断能否从channel中接收数据,对于无缓冲区的channel,判断sendq上没有正在等待的发送者,对于有缓冲区的channel判断缓冲区中是否是空的。

  • 在channel未关闭时直接返回selected=false, received=false
  • 如果channel已经关闭且再次检查channel中没有数据可被接收时,返回selected=true, received=false
 1	if !block && empty(c) {
 2		if atomic.Load(&c.closed) == 0 {
 3			return
 4		}
 5
 6		if empty(c) {
 7			if raceenabled {
 8				raceacquire(c.raceaddr())
 9			}
10			if ep != nil {
11				typedmemclr(c.elemtype, ep)
12			}
13			return true, false
14		}
15	}
1func empty(c *hchan) bool {
2	// c.dataqsiz is immutable.
3	if c.dataqsiz == 0 {
4		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
5	}
6	return atomic.Loaduint(&c.qcount) == 0
7}

2.3 处理channel已经关闭且缓冲区中没有数据可接收的情况 #

能执行到第三步的话,就要从channel接收数据了,需要先加锁。接收数据时有几种情况。

本步骤处理的是当channel已经关闭并且channel的缓冲区中没有数据可以接收的情况,即如果channel已关闭且缓冲区为空,就会直接解锁,并清除ep指针中的数据,后直接返回selected=true, received=false。 对应那句从一个已经关闭的channel接收数据,如果缓存区为空将会返回零值。可以使用ok-idom,判断通道是否已被关闭。

 1	lock(&c.lock)
 2
 3	if c.closed != 0 && c.qcount == 0 {
 4		if raceenabled {
 5			raceacquire(c.raceaddr())
 6		}
 7		unlock(&c.lock)
 8		if ep != nil {
 9			typedmemclr(c.elemtype, ep)
10		}
11		return true, false
12	}

2.4 尝试从等待队列sendq上的发送方接收 #

从channel接收数据时,先尝试从阻塞在等待队列sendq中的发送方那里接收。 如下面的代码,会从sendq队列中将队头出列,取出最先陷入等待的发送方Groutine,获取sudog结构,调用runtime.recv函数接收数据。

1	if sg := c.sendq.dequeue(); sg != nil {
2		// Found a waiting sender. If buffer is size 0, receive value
3		// directly from sender. Otherwise, receive from head of queue
4		// and add sender's value to the tail of the queue (both map to
5		// the same buffer slot because the queue is full).
6		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
7		return true, true
8	}

runtime.recv函数如下:

 1func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 2	if c.dataqsiz == 0 {
 3		if raceenabled {
 4			racesync(c, sg)
 5		}
 6		if ep != nil {
 7			// copy data from sender
 8			recvDirect(c.elemtype, sg, ep)
 9		}
10	} else {
11		// Queue is full. Take the item at the
12		// head of the queue. Make the sender enqueue
13		// its item at the tail of the queue. Since the
14		// queue is full, those are both the same slot.
15		qp := chanbuf(c, c.recvx)
16		if raceenabled {
17			racenotify(c, c.recvx, nil)
18			racenotify(c, c.recvx, sg)
19		}
20		// copy data from queue to receiver
21		if ep != nil {
22			typedmemmove(c.elemtype, ep, qp)
23		}
24		// copy data from sender to queue
25		typedmemmove(c.elemtype, qp, sg.elem)
26		c.recvx++
27		if c.recvx == c.dataqsiz {
28			c.recvx = 0
29		}
30		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
31	}
32	sg.elem = nil
33	gp := sg.g
34	unlockf()
35	gp.param = unsafe.Pointer(sg)
36	sg.success = true
37	if sg.releasetime != 0 {
38		sg.releasetime = cputicks()
39	}
40	goready(gp, skip+1)
41}

接收的流程大致如下:

  • 如果channel的缓冲区大小dataqsiz为0,就会调用recvDirect函数,这个函数会将sendq中出队的那个Goroutine的sudog的elem数据拷贝到目标内存地址中
  • 如果channel有缓冲区,则说明此时缓冲区一定是满的,因为sendq上有发送方等待嘛,此时从缓冲区环形队列按照recvx取出一个数据拷贝到接收目标内存地址中typedmemmove(c.elemtype, ep, qp),并将sendq中出队的那个sudog的elem数据拷贝到缓冲区环形队列中typedmemmove(c.elemtype, ep, qp),并维护好channel和缓冲区环形队列的一些状态字段。
  • 最后解锁unlockf,并调用goready函数,将sudog上的Goroutine标记成Grunnable,可以理解为将这个G放到接收方G所在P上(放到runnext上)等待下一次调度时有可能被M执行,即唤醒之前被阻塞的发送方的G。

2.5 从缓冲区接收 #

如果执行到这一步,没有发送方阻塞在sendq中,此时判断如果缓冲区中有数据,则从缓冲区中接收。 缓冲区中有数据时,从缓冲区循环队列的recvx索引位置接收数据,并维护好channel和缓冲区环形队列的一些状态字段。最后unLock解锁后返回。

 1	if c.qcount > 0 {
 2		// Receive directly from queue
 3		qp := chanbuf(c, c.recvx)
 4		if raceenabled {
 5			racenotify(c, c.recvx, nil)
 6		}
 7		if ep != nil {
 8			typedmemmove(c.elemtype, ep, qp)
 9		}
10		typedmemclr(c.elemtype, qp)
11		c.recvx++
12		if c.recvx == c.dataqsiz {
13			c.recvx = 0
14		}
15		c.qcount--
16		unlock(&c.lock)
17		return true, true
18	}

2.6 阻塞接收方 #

如果执行到这一步,说明没有发送方阻塞在sendq,而且缓冲区中也没有数据可接收,这个时候之后能将接收方阻塞了。 但这里要处理一个小例外,就是select recev的情况,即block=false的非阻塞接收的情况,如果是非阻塞接收,解锁后直接返回。

1	if !block {
2		unlock(&c.lock)
3		return false, false
4	}

如果是block=true的情况,需要将接收方阻塞,执行下面的代码主要步骤如下:

  • 调用getg函数获得接收方的Goroutine
  • 调用acquireSudog获取一个sudog,并将此次接收方的Goroutine、接收的channel等"接收现场"保存到sudog上
  • c.recvq.enqueue(mysg)将打包好的sudog放到channel的revq等待队列中
  • 调用gopark函数,waitingReason为waitReasonChanReceive,将当前接收方的Goroutine的状态更新成Gwaiting。注意gopark函数的第二个参数为channel上的lock,所以释放锁的操作应该是在gopark中执行。
  • 执行到gopark后,当前接收方的Goroutine就会在这里陷入阻塞状态,并等待被调度器唤醒了
  • gopark函数调用之后的代码,主要是如果被调度器唤醒后要执行的一些清理工作,在最后返回true,表示这次之前被阻塞的接收操作运行结束。
 1	// no sender available: block on this channel.
 2	gp := getg()
 3	mysg := acquireSudog()
 4	mysg.releasetime = 0
 5	if t0 != 0 {
 6		mysg.releasetime = -1
 7	}
 8	// No stack splits between assigning elem and enqueuing mysg
 9	// on gp.waiting where copystack can find it.
10	mysg.elem = ep
11	mysg.waitlink = nil
12	gp.waiting = mysg
13	mysg.g = gp
14	mysg.isSelect = false
15	mysg.c = c
16	gp.param = nil
17	c.recvq.enqueue(mysg)
18	// Signal to anyone trying to shrink our stack that we're about
19	// to park on a channel. The window between when this G's status
20	// changes and when we set gp.activeStackChans is not safe for
21	// stack shrinking.
22	atomic.Store8(&gp.parkingOnChan, 1)
23	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
24
25	// someone woke us up
26	if mysg != gp.waiting {
27		throw("G waiting list is corrupted")
28	}
29	gp.waiting = nil
30	gp.activeStackChans = false
31	if mysg.releasetime > 0 {
32		blockevent(mysg.releasetime-t0, 2)
33	}
34	success := mysg.success
35	gp.param = nil
36	mysg.c = nil
37	releaseSudog(mysg)
38	return true, success

3.总结 #

从channel中接收数据时大致有以下几种情况:

  • channel为nil时,如果是阻塞式接收,将会一直阻塞;如果是非阻塞式接收(select recev),将会直接返回
  • 如果非阻塞并且channel中没有数据可接收(包括无发送方在等待和缓冲区为空的情况),直接返回
  • 如果channel已经被关闭,并且channel中没有数据可接收(包括无发送方在等待和缓冲区为空的情况),将直接返回
  • 有发送方在sendq上等待时,从发送方接收,并唤醒发送方的G,包含两种情况:
    • 对于没有缓冲区的channel,直接接收等待队列中第一个等待的发送方的数据
    • 如果有缓冲区,则从缓冲区循环队列中接收数据,并将等待队列中第一个等待的发送方的数据放入缓冲区
  • 如果没有发送方在等待,但缓冲区有数据,则从缓冲区中接收数据
  • 如果连缓冲区中也没有数据,则将接收方阻塞
© 2025 青蛙小白 | 总访问量 | 总访客数