在上一篇文章《深入理解 go chan》中,我们讲解了 chan
相关的一些概念、原理等东西, 今天让我们再深入一下,读一下它的源码,看看底层实际上是怎么实现的。
整体设计
我们可以从以下三个角度看 chan
的设计(源码位于 runtime/chan.go
,结构体 hchan
就是 chan
的底层数据结构):
- 存储:
chan
里面的数据是通过一个环形队列来存储的(实际上是一个数组,但是我们视作环形队列来操作。无缓冲chan
不用存储,会直接从sender
复制到receiver
) - 发送:数据发送到
chan
的时候,如果chan
满了,则会将发送数据的协程挂起,将其放入一个协程队列中,chan
空闲的时候会唤醒这个协程队列。如果chan
没满,则发送队列为空。 - 接收:从
chan
中接收数据的时候,如果chan
是空的,则会将接收数据的协程挂起,将其放入一个协程队列中,当chan
有数据的时候会唤醒这个协程队列。如果chan
有数据,则接收队列为空。
文中一些比较关键的名词解释:
sender
: 表示尝试写入chan
的goroutine
。receiver
: 表示尝试从chan
读取数据的goroutine
。sendq
是一个队列,存储那些尝试写入channel
但被阻塞的goroutine
。recvq
是一个队列,存储那些尝试读取channel
但被阻塞的goroutine
。g
表示一个协程。gopark
是将协程挂起的函数,协程状态:_Grunning
=>_Gwaiting
。goready
是将协程改为可运行状态的函数,协程状态:_Gwaiting
=>_Grunnable
。
现在,假设我们有下面这样的一段代码,通过这段代码,我们可以大概看一下 chan
的总体设计:
package main
func main() {
// 创建一个缓冲区大小为 9 的 chan
ch := make(chan int, 9)
// 往 chan 写入 [1,2,3,4,5,6,7]
for i := 0; i < 7; i++ {
ch <- i + 1
}
// 将 1 从缓冲区移出来
<-ch
}
现在,我们的 chan
大概长得像下面这个样子,后面会详细展开将这个图中的所有元素:
上图为了说明而在 recvq 和 sendq 都画了 3 个 G,但实际上 recvq 和 sendq 至少有一个为空。因为不可能有协程正在等待接收数据的时候,还有协程的数据因为发不出去数据而阻塞。
数据结构
在底层,go 是使用 hchan
这个结构体来表示 chan
的,下面是结构体的定义:
type hchan struct {
qcount uint // 缓冲区(环形队列)元素个数
dataqsiz uint // 缓冲区的大小(最多可容纳的元素个数)
buf unsafe.Pointer // 指向缓冲区入口的指针(从 buf 开始 qcount * elemsize 大小的内存就是缓冲区所用的内存)
elemsize uint16 // chan 对应类型元素的大小(主要用以计算第 i 个元素的内存地址)
closed uint32 // chan 是否已经关闭(0-未关闭,1-已关闭)
elemtype *_type // chan 的元素类型
sendx uint // chan 发送操作处理到的位置
recvx uint // chan 接收操作处理到的位置
recvq waitq // 等待接收数据的协程队列(双向链表)
sendq waitq // 等待发送数据的协程队列(双向链表)
// 锁
lock mutex
}
waitq
的数据结构如下:
type waitq struct {
first *sudog
last *sudog
}
waitq
用来保存阻塞在等待或接收数据的协程列表(是一个双向链表),在解除阻塞的时候,需要唤醒这两个队列中的数据。
对应上图各字段详细说明
hchan
,对于 hchan
这个结构体,我们知道,在 go 里面,结构体字段是存储在一段连续的内存上的(可以看看《深入理解 go unsafe》),所以图中用了连续的一段单元格表示。
下面是各字段说明:
qcount
: 写入chan
缓冲区元素个数。我们的代码往chan
中存入了7
个数,然后从中取出了一个数,最终还剩6
个,因此qcount
是6
。dataqsiz
:hchan
缓冲区的长度。它在内存中是连续的一段内存,是一个数组,是通过make
创建的时候传入的,是9
。buf
:hchan
缓冲区指针。指向了一个数组,这个数组就是用来保存发送到chan
的数据的。sendx
、recvx
:写、读操作的下标。指向了buf
指向的数组中的下标,sendx
是下一个发送操作保存的下标,recvx
是下一个接收操作的下标。recvq
、sendq
: 阻塞在chan
读写上的协程列表。底层是双向链表,链表的元素是sudog
(sudog
是一个对g
的封装),我们可以简单地理解为recvq
和sendq
的元素就是g
(协程)。
g 和 sudog 是什么?
上面提到了 g
和 sudog
,g
是底层用来表示协程的结构体,而 sudog
是对 g
的封装,记录了一些额外的信息,比如关联的 hchan
。
在 go 里面,协程调度的模型是 GMP
模型,G
代表协程、M
代表线程、P
表示协程调度器。我上图里面的 G
就是代表协程(当然,实际上是 sudog
)。 还有一个下面会提到的就是 g0
,g0
表示 P
上启动的第一个协程。
GMP
模型是另外一个庞大的话题了,大家可以自行去了解一下,对理解本文也很有好处。因为在 chan
阻塞的时候实际上也是一个协程调度的过程。 具体来说,就是从 g
的栈切换到 g0
的栈,然后重新进行协程调度。这个时候 g
因为从运行状态修改为了等待状态,所以在协程调度中不会将它调度来执行, 而是会去找其他可执行的协程来执行。
创建 chan
我们的 make(chan int, 9)
最终会调用 makechan
方法:
// chantype 是 chan 元素类型,size 是缓冲区大小
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 检查元素个数是否合法(不能超过 1<<16 个)
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 判断内存是否对齐
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// mem 是 chan 缓冲区(环形队列)所需要的内存大小
// mem = 元素大小 * 元素个数
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 定义 hchan
var c *hchan
switch {
case mem == 0:
// 队列或者元素大小是 0(比如 make(chan int, 0))
// 只需要分配 hchan 所需要的内存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// ...
case elem.ptrdata == 0:
// elem 类型里面不包含指针
// 分配的内存 = hchan 所需内存 + 缓冲区内存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 分配的是连续的一段内存,缓冲区内存在 hchan 后面
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素类型里面包含指针
c = new(hchan)
// buf 需要另外分配内存
c.buf = mallocgc(mem, elem, true)
}
// 单个元素的大小
c.elemsize = uint16(elem.size)
// 元素类型
c.elemtype = elem
// 缓冲区大小
c.dataqsiz = uint(size)
// ...
}
创建 chan
的过程主要就是给 hchan
分配内存的过程:
- 非缓冲
chan
,只需要分配hchan
结构体所需要的内存,无需分配环形队列内存(数据会直接从sender
复制到receiver
) - 缓冲
chan
(不包含指针),分配hchan
所需要的内存和环形队列所需要的内存,其中buf
会紧挨着hchan
- 缓冲
chan
(含指针),hchan
和环形队列所需要的内存单独进行分配
对应到文章开头的图就是,底下的
hchan
和buf
那两段内存。
发送数据
<- 语法糖
在《深入理解 go chan》中,我们说也过,<-
这个操作符号是一种语法糖, 实际上,<-
会被编译成一个函数调用,对于发送操作而言,c <- x
会编译为对下面的函数的调用:
// elem 是被发送到 chan 的数据的指针。
// 对于 ch <- x,ch 对应参数中的 c,unsafe.Pointer(&x) 对应参数中的 elem。
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
另外,对于 select
里面的调用,chansend
会返回一个布尔值给 select
用来判断是否是要选中当前 case
分支。 如果 chan
发送成功,则返回 true
,则 select
的那个分支得以执行。(select...case
本质上是 if...else
,返回 false
表示判断失败。)
chansend 第二个参数的含义
chansend
第二个参数 true
表示是一个阻塞调用,另外一种是在 select
里面的发送操作,在 select
中的操作是非阻塞的。
package main
func main() {
ch := make(chan int, 2)
ch <- 1 // 如果 ch 满了,会阻塞
select {
case ch <- 3: // 非阻塞
}
}
在 select
中对 chan
的读写是非阻塞的,不会导致当前协程阻塞,如果是因为 chan
满或者空无法发送或接收, 则不会导致阻塞在 case
的某一个分支上,还可以继续判断其他 case
分支。
select
中的 send
实现:
// go 代码:
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// 实际效果:
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
// select 里面往 chan 发送数据的分支,返回的 selected 表示当前的分支是否被选中
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
chansend 发送实现
- 发送到
nil chan
(select
中发送不阻塞,其他情况阻塞)
如果是在 select
的 case
里面发送,则不会阻塞,其他情况会导致当前 goroutine 挂起,永远阻塞:
示例代码:
// 下面的代码运行会报错:
var ch chan int
// 发送到 nil chan 会永久阻塞
ch <- 1
select {
// 这个发送失败,但是不会阻塞,可继续判断其他分支。
case ch <- 3:
}
- 发送到满了的
chan
(select
中发送不阻塞,其他情况阻塞)
对于无缓冲而且又没有 receiver
,或者是有缓冲但是缓冲满了的情况,发送也会阻塞(我们称其为 full
,也就是满了,满了的 chan
是放不下任何数据了的,所以就无法再往 chan
发送数据了):
receiver 表示等待从 chan 接收数据的协程。
对于满了的 chan
,什么时候可以再次发送呢?那就是有 receiver
接收数据的时候。chan
之所以会满就是因为没有 receiver
,也就是没有从 chan
接收数据的协程。
A. 对于无缓冲的 chan
,在满了的情况下,当有 receiver
来读取数据的时候,数据会直接从 sender
复制到 receiver
中:
B. 对于有缓冲,但是缓冲满了的情况(图中 chan
满了,并且有两个 g
正在等待写入 chan
):
这个发送过程大概如下:
receiver
从chan
中获取到chan
队头元素,然后chan
的队头元素出队。- 发送队列
sendq
对头元素出队,将其要发送的数据写入到chan
缓冲中。最后,sendq
只剩下一个等待写入chan
的g
示例代码:
package main
// 注意:以下代码可能不能正常执行,只是为了描述问题。
func main() {
// 情况 2.A.
var ch1 = make(chan int) // 无缓冲的 chan
ch1 <- 1 // 阻塞
select {
// 不阻塞,但是不会执行这个分支
case ch1 <- 1:
}
// 情况 2.B.
var ch2 = make(chan int, 1) // 有缓冲,缓冲区容量为 1
ch2 <- 1 // 1 写入之后,ch2 的缓冲区满了
go func() {
ch2 <- 2 // 阻塞,调用 gopark 挂起
}()
go func() {
ch2 <- 3 // 阻塞
}()
select {
// 不阻塞,但是不会执行这个分支
case ch2 <- 4:
}
}
- 发送到有缓冲,但是缓冲还没满的
chan
(不阻塞,发送成功)
这种情况比较简单,就是将 sender
要发送的数据写入到 chan
缓冲区:
示例代码:
var ch = make(chan int, 1)
// 不阻塞,1 写入 chan 缓冲区
ch <- 1
chansend 源码解读
阻塞模式下,在发送的过程中,如果遇到无法发送成功的情况,会调用 gopark
来将协程挂起,然后当前协程陷入阻塞状态。
非阻塞模式下(select
),在发送过程中,任何无法发送的情况,都会直接返回 false
,表示发送失败。
// 参数说明:
// c 表示 hchan 实例
// ep 表示要发送的数据所在的地址
// block 是否是阻塞模式(select 语句的 case 里面的发送是非阻塞模式,其他情况是阻塞模式)
// 非阻塞模式下,遇到无法发送的情况,会返回 false。阻塞模式下,遇到无法发送的情况,协程会挂起。
// 返回值:表示是否发送成功。false 的时候,如果是 select 的 case,则表示没有选中这个 case。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 情况 1:nil chan
if c == nil {
// select 语句里面发送数据到 chan 的操作失败,直接返回 false,表示当前的 case 没有被选中。
if !block {
// select 分支没有被选中
return false
}
// 阻塞模式,协程挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// ... 其他代码...
// 不获取锁的情况下快速失败。select 中 chan 满了的时候无法发送成功,直接返回 false,协程无需挂起。
// 场景:非阻塞模式、chan 未关闭、chan 已满(无缓冲且没有接收数据的协程、或者有缓冲但是缓冲区满)
if !block && c.closed == 0 && full(c) {
return false
}
// ... 其他代码...
// 获取锁
lock(&c.lock)
// 如果 chan 已经关闭,则释放锁并 panic,不能往一个已经关闭的 chan 发送数据
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 情况 2.A,又或者是有缓冲但是缓冲区空,有一个正在等待接收数据的 receiver。
// 如果有协程在等待接收数据(说明 chan 缓冲区空、或者 chan 是无缓冲的)
// 则直接将元素传递给这个接收数据的协程,这样就避免了 sender -> chan -> receiver 这个数据复制的过程,效率更高。
// 返回 true 表示 select 的分支可以执行(发送成功)
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 情况 3,发送到缓冲 chan,且 chan 未满
// 没有协程在等待接收数据。
// 缓冲区还有空余,则将数据写入到 chan 的缓冲区
if c.qcount < c.dataqsiz {
// 获取写入的地址
qp := chanbuf(c, c.sendx)
// 通过内存复制的方式写入
typedmemmove(c.elemtype, qp, ep)
// 写入的下标指向下一个位置
c.sendx++
// 如果到超出环形队列尾了,则指向第一个位置
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// chan 里面的元素个数加上 1
c.qcount++
// 释放锁
unlock(&c.lock)
// 发送成功,返回 true
return true
}
// 没有协程在接收数据,而且缓冲区满了。
// 如果是 select 语句里面的发送,则释放锁,返回 false
if !block {
unlock(&c.lock)
return false
}
// 发不出去,当前协程阻塞。
// 阻塞模式下,缓冲区满了,需要将当前协程挂起。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // chan 要操作的元素指针
mysg.waitlink = nil
mysg.g = gp // sudog 上的 g 属性
mysg.isSelect = false // 如果是 select,上面已经返回了,因此这里是 false
mysg.c = c // sudog 上的 c 属性
gp.waiting = mysg // g 正在等待的 sudog
gp.param = nil // 当通道操作唤醒被阻塞的 goroutine 时,它将 param 设置为指向已完成的阻塞操作的 sudog
c.sendq.enqueue(mysg) // 将 sudog 放入发送队列
// 在 chan 读写上阻塞的标志
gp.parkingOnChan.Store(true)
// 最关键的一步:将当前协程挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 保证 ep 指向的地址不被垃圾回收器回收
KeepAlive(ep)
// ...被唤醒了之后的一些收尾操作...
return true
}
// 参数说明:c 是 chan 实例,sg 是等待接收数据的 g,ep 是被发送进 chan 的数据,unlockf 是释放锁的函数。
// 空 chan 上发送,会直接发送给等待接收数据的协程。
// ep 指向的值会被复制到 sg 中(ep -> sg,ep 是被发送的值,sg 是要接收数据的 g)。
// 接收数据的协程会被唤醒。
// 通道 c 必须是空的并且获取了锁。send 会通过 unlockf 来释放锁。
// sg 必须已从 c 中退出队列(从 recvq 这个接收队列中移除)。
// ep 必须不能为 nil,同时指向堆或者调用者的栈。
// sg 是接收队列上的 g。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ...其他代码...
// 如果没有忽略返回值,将值直接从 ep 复制到 sg 中
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
// 释放锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 最关键的一步:唤醒等待队列中的那个接收到数据的 g
//(也就是之前因为接收不到数据而被阻塞的那个 g)
goready(gp, skip+1)
}
// 参数:t 是 chan 的元素类型,sg 是接收数据的 g(协程),src 是被发送的数据的指针。
// 场景:无缓冲 chan、有缓冲但是缓冲区没数据。
// 作用:将数据直接从发送数据的协程复制到接收数据的协程。
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// 将 ep 的值直接复制到 sg 中
memmove(dst, src, t.size)
}
// full 报告 c 上的发送是否会阻塞(即通道已满)。
func full(c *hchan) bool {
// c.dataqsiz 是不可变的(创建 chan 后不会再去修改)
// 因此在 chan 操作期间的任何时间读取都是安全的。
if c.dataqsiz == 0 {
// 如果是非缓冲 chan,则看接收队列有没有数据,有则表明满了(没有正在发送的 g)
return c.recvq.first == nil
}
// 如果是缓冲 chan,只需要比较实际元素总数跟缓冲区容量即可
return c.qcount == c.dataqsiz
}
接收数据
<- 语法糖
在发送数据的那一节我们提到了,ch <- x
编译之后,实际上是对 chansend1
的函数调用。同样的,在接收数据的时候, <-
这个操作符也会根据不同情况编译成不同的函数调用:
// elem 是用来保存从 c 中接收到的值的地址的指针
// <- c 编译器处理之后实际上就是下面的这个函数调用。(从通道接收,但是忽略接收到的值)
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// received 表示是否是从 chan 中接收到的(如果 chan 关闭,则接收到的是零值,received 是 false)
// v, ok := <-c 编译之后的函数(从通道接收,第一个 v 对应 elem,第二个 ok 对应 received)
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// select 里面的接收操作:
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// 实际 go 实现
//
// if selected, ok = selectnbrecv(&v, c); selected {
// ... foo
// } else {
// ... bar
// }
//
// select 里面从 chan 接收数据的分支,返回的 selected 表示当前的分支是否被选中,received 表示是否有数据被接收到
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
还需要再提醒一下的是:chan
关闭之后,并且 chan
缓冲区所有数据被接收完之后,received
才会是 false
,并不是一关闭 received
马上返回 false
chanrecv 函数 block 参数的含义
跟 chansend
中的 block
参数的作用一样,用来判断是否是 select
模式下的接收操作,如果是,则在需要阻塞的时候不会阻塞,取而代之的是直接返回。
chanrecv 接收数据实现
- 从
nil chan
接收(select
中接收不阻塞,其他情况阻塞)
从 nil chan
中读取的时候,如果是阻塞模式,会调用 gopark
将协程阻塞起来。
示例代码:
var ch chan int
<-ch
- 从空
chan
接收(select
中接收不阻塞,其他情况阻塞)
判断空的条件为:无缓冲并且没有等待发送数据的 g,或者有缓冲但是缓冲区无数据。
示例代码:
package main
// 注意:以下代码执行不了,只是展示一下实际中对应的代码
func main() {
// 情况 1,无缓冲的 chan,空的
var ch1 = make(chan int)
<-ch1 // 阻塞
select {
// 不阻塞,但是该分支不会执行
case <-ch1:
}
// 情况 2,有缓冲的 chan,空的
var ch2 = make(chan int, 1)
<-ch2 // 阻塞
select {
// 不阻塞,但是该分支不会执行
case <-ch2:
}
}
- 从缓冲区满的
chan
接收(不会阻塞,这个时候sendq
一定不为空)
这种情况不会阻塞,上面已经有图了,这里不再贴了。
- 从缓冲区不满的
chan
接收(不会阻塞)
示例代码:
package main
func main() {
var ch = make(chan int, 2)
ch <- 1
// 从缓冲区没满的 chan 接收
<-ch
}
chanrecv 源码解读
chanrecv
函数:
- 参数:
c
是chan
实例,ep
是用来接收数据的指针,block
表示是否是阻塞模式。 - 返回值:
selected
表示select
语句的case
是否被选中,received
表示接收到的值是否有效。 - 功能:从
c
这个通道接收数据,同时将接收到的数据写入到ep
里。
概览:
ep
可能是nil
,这意味着接收到的值被忽略了(对应<-c
这种形式的接收)。- 如果是非阻塞模式,并且通道无数据,返回
(false, false)
,也就是select
语句中的case
不会被选中。 - 否则,如果
c
关闭了,会对ep
指向的地址设置零值,然后返回(true, false)
。如果是select
语句,意味被选中, - 但是
received
为false
表明返回的数不是通道关闭之前发送的。 - 否则,将从通道中获取到的值写入
ep
指向的地址,并且返回(true, true)
- 一个非
nil
的ep
必须指向堆或者调用者的栈。
// 从 c 读取数据,写入到 ep 指向的地址。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// c 是 nil chan
if c == nil {
// select 里面的 case 不会被选中
if !block {
return
}
// 阻塞模式时,协程挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
// 在实际执行的时候,如果其他协程都执行完了,只剩下这一个协程(又或者全部协程都是睡眠状态,并且无法被唤醒的那种),那么会报错:
// "fatal error: all goroutines are asleep - deadlock!"
throw("unreachable")
}
// 如果是非阻塞模式(select),并且 c 是空的
if !block && empty(c) {
// chan 未关闭,并且是空的,返回 false,false
if atomic.Load(&c.closed) == 0 {
return
}
// chan 已经关闭,并且 chan 是空的
if empty(c) {
// ...
// 返回一个零值
if ep != nil {
typedmemclr(c.elemtype, ep)
}
// select 分支被选中,但是返回值是无效的,是一个零值
return true, false
}
}
// ...
// 获取锁
lock(&c.lock)
// chan 已关闭
if c.closed != 0 {
// chan 已经关闭,同时也没有数据
if c.qcount == 0 {
// ...
// 释放锁
unlock(&c.lock)
if ep != nil {
// 设置零值
typedmemclr(c.elemtype, ep)
}
// select 的分支被选中,但是返回值无效
return true, false
}
} else {
// chan 未关闭,并且有一个等待发送的元素(对应情况:chan 是满的或者无缓冲而且没有 receiver)
// 如果无缓冲:则将元素直接从 sender 复制到 receiver 中。
// 否则:意味着 c 的缓冲区满了,从环形队列中接收值,将 sg 需要发送的值添加到环形队列尾,
// 实际上这个时候,队列头和队列尾都是同一个位置,因为队列满了。
// 只不过,队列头和队列尾指向的位置会发生变化(都加 1,然后对缓冲区长度取模)。
if sg := c.sendq.dequeue(); sg != nil {
// 找到一个 sender。
// 如果无缓冲,直接从 sender 复制到 receiver
// 否则,环形队列对头元素复制给 receiver,sender 要发送的元素复制进环形队列队尾。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// select 分支被选中,接收成功,并且接收的值是有效的。
return true, true
}
}
// 缓冲区有数据,并且缓冲区没满
if c.qcount > 0 {
// qp 是被接收元素的地址
qp := chanbuf(c, c.recvx)
// ...
// 将 qp 指向的值复制到 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清空队列中 ep 的空间(设置为零值)
typedmemclr(c.elemtype, qp)
// 被接收的下标指向下一个元素
c.recvx++
// 环形队列,回到开头
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 缓冲区长度减 1
c.qcount--
// 释放锁
unlock(&c.lock)
// select 分支被选中,并且接收的值是有效的。
return true, true
}
// 缓冲区空的,并且是非阻塞(select)
if !block {
// 释放锁
unlock(&c.lock)
// 返回 false,false
return false, false
}
// 缓冲区空,并且是阻塞模式,同时没有等待发送的 g
// 没有 sender,阻塞
gp := getg()
mysg := acquireSudog()
// ...
// c 的 recvq,也就是等待接收的队列,在队尾添加当前的 g
c.recvq.enqueue(mysg)
// ...
// g 挂起,等待下一个发送数据的协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// ... 被唤醒后的操作 ...
return true, success
}
// recv 处理缓冲区已满的 chan 的接收操作(或者无缓冲,这个函数处理这两种情况)。
// 有两部分:
// 1. 等待发送数据的协程(sender),会将其要发送的数据放入 chan 中,然后这个协程会被唤醒
// 2. 被接收协程接收的值会写入到 ep 中
//
// 对于同步 chan(无缓冲 chan),两个值是同一个。
// 对于异步 chan,接收者从 chan 的缓冲区获取数据,发送方的输入放入 chan 缓冲区。
// 通道 c 必须已满并锁定。recv 会使用 unlockf 来解锁 c。
// sg 必须已经从 c 中移除(准确来说是 c.sendq)。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果无缓冲区
if c.dataqsiz == 0 {
// ...
// 直接将 sender 的要发送的值复制到 ep
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
// 有缓冲区,但是缓冲区满了。
// 从队列头获取元素,将要发送的值放入队列尾。(实际上操作的是同一个位置,因为环形队列满了)
// 需要获取的值的指针地址
qp := chanbuf(c, c.recvx)
// ...
// 如果需要接收值,则将 qp 复制到 ep(没有忽略返回值)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将要发送的值写入到 qp(sendq 对头元素要发送的值写入到 qp,也就是 chan 刚刚空出来的位置)
typedmemmove(c.elemtype, qp, sg.elem)
// 队列头、尾指针移动
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
// 释放锁
unlockf()
// ...
// 唤醒协程(这个被唤醒的协程是之前因为发送不出去被阻塞的协程)
goready(gp, skip+1)
}
// 将数据直接从 sender 复制到 receiver
// 场景:发送到无缓冲的 chan
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// dst 是 receiver 栈里保存接收值的地址,src 是 sender 栈里要被发送的值的地址
memmove(dst, src, t.size)
}
关闭 chan
chan
关闭的过程比较简单,修改 closed
为 1,然后唤醒发送队列和接收队列里面的 g
,如果发送队列有 g
,被唤醒之后会 panic
,因为不能往一个已经关闭的 chan
发送数据。
// 关闭 chan
func closechan(c *hchan) {
// 不能关闭 nil chan
if c == nil {
panic(plainError("close of nil channel"))
}
// 开启锁
lock(&c.lock)
if c.closed != 0 {
// chan 已经关闭,panic,不能重复关闭。释放锁
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
// ...
// 设置 closed 标志
c.closed = 1
// gList 用来保存阻塞在 chan 上的 g(链表,包括了 sender 和 receiver)
var glist gList
// 释放所有等待读取 chan 的协程(解除阻塞状态)
for {
// recvq 队头元素出队
sg := c.recvq.dequeue()
if sg == nil {
// sendq 已经没有元素了
break
}
// 关闭之后,从 chan 接收到的都是零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
// ...
glist.push(gp)
}
// 释放所有正在等待写入 chan 的协程(解除阻塞状态,这些协程会 panic)
for {
// sendq 队头元素出队
sg := c.sendq.dequeue()
if sg == nil {
// sendq 已经没有元素了
break
}
// ...
glist.push(gp)
}
// 释放锁
unlock(&c.lock)
// 将所有等待的协程修改为就绪态
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// g 状态修改为可运行状态
goready(gp, 3)
}
}
对于实际开发的作用
在上一篇文章和本文中,花了很大的篇幅来讲述 chan
的设计、实现与使用,这么多东西对我们有什么用呢?
其中非常重要的一个作用是,清楚地了解 chan
的工作机制,便于我们对程序实际运行情况进行分析, 尤其是一些非常隐晦的读写 chan
场景,毕竟稍有不慎就会导致协程泄漏,这对进程影响可能是非常大的。
比如下面的这种代码:
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
for i := 0; i < 10000; i++ {
time.Sleep(time.Second)
go func() {
// 永远阻塞,协程泄漏
var ch chan int
ch <- 1
}()
// 我们会看到协程数量逐渐增长。
// 但是这部分挂起的协程永远不会被调度。
fmt.Printf("goroutine count: %d\n", runtime.NumGoroutine())
}
time.Sleep(time.Hour)
}
tips:在
chan
读写的地方需要注意自己的写法会不会让 goroutine 永远陷入阻塞,或者长时间阻塞。
总结
chan
底层是hchan
结构体。- go 语法里面的
<-
不过是语法糖,在编译的时候,会编译成hchan
相关的方法调用。最终都会调用chansend
或者chanrecv
。select...case
里面的chan
读写最终也会编译为对chansend
或chanrecv
的调用。 chan
总体设计:维护了三个队列:hchan.buf
:chan
中暂存sender
发送数据的队列(在有receiver
读取的时候会从这个队列中复制到receiver
中)hchan.recvq
: 接收队列,存储那些尝试读取channel
但被阻塞的goroutine
。hchan.sendq
: 发送队列,存储那些尝试写入channel
但被阻塞的goroutine
。
- 读写
chan
的协程阻塞是通过gopark
实现的,而从阻塞态转换为可运行状态是通过goready
实现的。 - 在
chan
读写操作阻塞的时候,如果是在select
语句中,则会直接返回(表示当前的分支没有被选中),否则,会调用gopark
挂起当前协程。 - 在关闭
chan
的时候,会调用goready
唤醒阻塞在发送或者接收操作上的g
(协程)。 - 无缓冲
chan
的操作有点特殊,对于无缓冲chan
,必须同时有sender
和receiver
才能发送和接收成功,否则另一边都会陷入阻塞(当然,select
不会阻塞)。
今天的文章go chan 设计与实现分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/14557.html