面试准备-通道
面试准备-通道
1、什么是CSP(Communicating Sequential Processes)
CSP
是一种发编程模型,它是一种用于描述并发系统中独立进程之间通信和同步的形式化模型。它描述了两个独立的并发实体通过共享的通讯 channel(管道)实现并发的并发模型。Go是第一个将CSP思想引入并融入到语言的核心里。相较于java,python等通过共享内存的同步访问等实现并发编程,Go 的并发编程的模型则用 goroutine 和 channel 来替代。
2、channel的底层数据结构
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint // receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters
// 保护 hchan 中所有字段
lock mutex
}
其中有几个比较重要的字段:
buf
:指向底层循环数组,只有缓冲型的channel才有。一个通道相当于一个先进先出(FIFO)的队列。也就是说,通道中的各个元素值都是严格地按照发送的顺序排列的,先被发送通道的元素值一定会先被接收。
sendx
,recvx
:均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。
sendq
,recvq
:分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。
waitq
是 sudog
的一个双向链表,而 sudog
实际上是对 goroutine 的一个封装:
type waitq struct {
first *sudog
last *sudog
}
lock
用来保证每个读 channel 或写 channel 的操作都是原子的。
例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :
创建
我们一般使用make创建一个收发通道
// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)
编译器在编译的时候,会调用makechan
来创建一个通道
func makechan(t *chantype, size int64) *hchan
从函数原型来看,创建的chan是一个指针,所以我们能在函数直接传递channel,而不是传递channel的指针。
具体来看一下代码:
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// 省略了检查 channel size,align 的代码
// ……
var c *hchan
// 如果元素类型不含指针 或者 size 大小为 0(无缓冲类型)
// 只进行一次内存分配
if elem.kind&kindNoPointers != 0 || size == 0 {
// 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
// 只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
// 如果是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{})
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
// 1. 非缓冲型的,buf 没用,直接指向 chan 起始地址处
// 2. 缓冲型的,能进入到这里,说明元素无指针且元素类型为 struct{},也无影响
// 因为只会用到接收和发送游标,不会真正拷贝东西到 c.buf 处(这会覆盖 chan的内容)
c.buf = unsafe.Pointer(c)
}
} else {
// 进行两次内存分配操作
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
// 循环数组长度
c.dataqsiz = uint(size)
// 返回 hchan 指针
return c
}
新建一个 chan 后,内存在堆上分配,大概长这样:
3、有缓冲和无缓冲通道
无缓冲通道(Unbuffered Channel):
- 无缓冲通道没有内部存储空间,每次发送操作都要等待对应的接收操作,每次接收操作都要等待对应的发送操作。
- 发送操作和接收操作是同步的,发送和接收两个goroutine会彼此阻塞,直到另一个goroutine准备好进行匹配的操作。
- 无缓冲通道用于强制进行同步,确保数据在发送和接收之间进行直接传递。
- 通过
<-
操作符进行发送和接收操作时,会引发发送和接收操作的阻塞,直到另一个goroutine准备好进行匹配的操作。
缓冲通道(Buffered Channel):
- 缓冲通道具有内部存储空间,可以在通道中存储一定数量的元素。
- 发送操作在通道未满时可以立即完成,而接收操作在通道非空时可以立即完成。
- 缓冲通道允许发送操作和接收操作以异步方式进行,发送和接收两个goroutine不会彼此直接阻塞等待。
- 通过使用带有缓冲区大小的
make
函数来创建缓冲通道,如ch := make(chan int, bufferSize)
。
缓冲相关的五个属性:
qcount
当前缓冲中元素个数dataqsize
缓冲最大数量buf
指向缓冲区内存,这块内存空间可容纳dataqsize
个元素sendx
缓冲区中下一个元素写入时的位置recvx
缓冲区中下一个被读取的元素的位置
其中通过sendx,recvx可以实现类似于环形队列的机制。这样就保证了保证缓冲区有序,并且不需要在每次取出元素时对缓冲区重新排序。
4、通道的阻塞和非阻塞模式
在Go语言中,通道可以以阻塞或非阻塞的方式进行操作,这两种模式决定了在通道上发送和接收数据的行为。
1、阻塞模式
- 发送数据:如果通道已满(缓冲区已满),发送操作将被阻塞,直到有空间可用为止。
- 接收数据:如果通道为空(缓冲区为空),接收操作将被阻塞,直到有数据可用为止。
2、非阻塞模式
发送数据:如果通道已满(缓冲区已满),发送操作将立即返回一个错误(通常是
false
)。接收数据:如果通道为空(缓冲区为空),接收操作将立即返回一个错误(通常是
false
)。package main import ( "fmt" "time" ) func main() { ch := make(chan int, 1) // 创建一个带有一个缓冲区的通道 // 非阻塞发送 select { case ch <- 42: // 发送数据到通道 fmt.Println("数据已发送") default: fmt.Println("通道已满,发送失败") } // 非阻塞接收 select { case value := <-ch: // 从通道接收数据 fmt.Println("接收到数据:", value) default: fmt.Println("通道为空,接收失败") } // 延时一段时间以便观察输出 time.Sleep(time.Second) }
5、向channel发送数据
要了解向c *hchan
中发送数据时需要的一些字段
c.qcount 通道缓存中的元素个数,无缓冲通道一直为0
c.datasiz 通道底层数组中可存储的元素个数
c.closed 通道状态标志,=0表示为关闭,=1表示关闭
c.sendx 下一个要发送进来的元素在循环数组中的下标位置
c.recvx 下一个要被接受者接收的元素在循环数组中的下标位置
c.recvq 等待接收消息的协程队列
c.sendq 等到发送消息的协程队列
一些状态
c.qcount<c.datasiz: 只在缓冲型通道中出现,说明缓冲空间没可用。
c.recvq.first == nil: 等待队列中有协程,缓存空间为空
c.sendq.first == nil: 发送队列中有协程,缓存空间满
不阻塞的发送:向chan中发送元素时,无需向chan中的buf中拷贝元素值,而是将消息直接拷贝到接收协程的工作栈中,这有两种情况
a、cahnnel是非缓冲型的,这里会检测recvq中是否有等待接收的协程,如果有,直接将数据拷贝到等待协程中,没有的话就挂起发送协程
b、channel是缓冲型的:这里如果检测到缓冲空间已经存满元素就直接挂起发送协程,如果检测到recvq中有等待的接收协程,说明通道缓存中没有元素,这时候就可以直接将元素拷贝到等待携程中去。
这种不向缓冲区中写元素,而是直接将元素发送到指定协程的栈空间中的做法的好处是可以减少一次加锁和内存拷贝的次数,性能更高。
阻塞发送:
如果
c.qcount < c.dataqsiz
,说明缓冲区可用(肯定是缓冲型的 channel)。先通过函数取出待发送元素应该去到的位置:qp := chanbuf(c, c.sendx) // 返回循环队列里第 i 个元素的地址处 func chanbuf(c *hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)*uintptr(c.elemsize)) }
如果没有命中以上条件的,说明 channel 已经满了。不管这个 channel 是缓冲型的还是非缓冲型的,都要将这个 sender “关起来”(goroutine 被阻塞)。如果 block 为 false,直接解锁,返回 false。
拓展
当有多个goroutine等待一个通道的时候如果没有使用同步机制对协程的调用顺序进行控制的话,无法确哪个goroutine将首先接收到数据。这是由Go调度器决定的,它在运行时负责调度goroutine的执行。
6、从channel接收数据
接收操作有两种写法,一种带“OK”,反应channel是够关闭,一种不带 “ok”
经过编译器的处理后,这两种写法最后对应源码里的这两个函数:
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
两个函数都指向chanrecv函数:
chanrecv接收三个参数:c:表示绑定的通道; ep:接收元素的写入地址; block:表示接收是以阻塞模式进行还是以非阻塞的方式进行。
如果channel是一个空值(nil),在非阻塞模式下,会直接返回。在阻塞模式下,会调用gopark函数挂起,而且永远不会被唤醒。因为关闭一个nil的cahnnel会直接引发panic
和发送函数一样,接下来搞了一个在非阻塞模式下,不用获取锁,快速检测到失败并且返回的操作。
// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回 (false, false) if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return }
当我们观察到 channel 没准备好接收:
- 非缓冲型,等待发送列队里没有 goroutine 在等待
- 缓冲型,但 buf 里没有元素
之后,又观察到 closed == 0,即 channel 未关闭。此时并没由被选中,而且也没有取到数据,最后的返回值是 (false, false)。
7、关闭cahnnel
close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中分别保存了阻塞的发送者和接收者。关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有接收者的情况下,不能贸然关闭 channel。
close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。
唤醒之后,该干嘛干嘛。sender 会继续执行 chansend 函数里 goparkunlock 函数之后的代码,很不幸,检测到 channel 已经关闭了,panic。receiver 则比较幸运,进行一些扫尾工作后,返回。这里,selected 返回 true,而返回值 received 则要根据 channel 是否关闭,返回不同的值。如果 channel 关闭,received 为 false,否则为 true。这我们分析的这种情况下,received 返回 false。
8、从一个关闭的channel仍然能读取出数据吗?
从一个有缓冲的 channel 里读数据,当 channel 被关闭,依然能读出有效值。只有当返回的 ok 为 false 时,读出的数据才是无效的。
func main() {
ch := make(chan int, 5)
ch <- 18
close(ch)
x, ok := <-ch
if ok {
fmt.Println("received: ", x)
}
x, ok = <-ch
if !ok {
fmt.Println("channel closed, data invalid.")
}
}
运行结果:
received: 18
channel closed, data invalid.
先创建了一个有缓冲的 channel,向其发送一个元素,然后关闭此 channel。之后两次尝试从 channel 中读取数据,第一次仍然能正常读出值。第二次返回的 ok 为 false,说明 channel 已关闭,且通道里没有数据。
9、操作channel的情况总结
操作 | nil channel | closed channel | not nil, not closed channel |
---|---|---|---|
close | panic | panic | 正常关闭 |
读 <- ch | 阻塞 | 读到对应类型的零值 | 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞 |
写 ch <- | 阻塞 | panic | 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞 |
总结一下,发生 panic 的情况有三种:向一个关闭的 channel 进行写操作;关闭一个 nil 的 channel;重复关闭一个 channel。
读、写一个 nil channel 都会被阻塞
10、优雅的关闭channel
关于 channel 的使用,有几点不方便的地方:
- 在不改变 channel 自身状态的情况下,无法获知一个 channel 是否关闭。
- 关闭一个 closed channel 会导致 panic。所以,如果关闭 channel 的一方在不知道 channel 是否处于关闭状态时就去贸然关闭 channel 是很危险的事情。
- 向一个 closed channel 发送数据会导致 panic。所以,如果向 channel 发送数据的一方不知道 channel 是否处于关闭状态时就去贸然向 channel 发送数据是很危险的事情。
一个比较粗糙的检查channel是否关闭的函数
func IsClosed(ch <-chan T) bool {
select {
case <-ch:
return true
default:
}
return false
}
func main() {
c := make(chan T)
fmt.Println(IsClosed(c)) // false
close(c)
fmt.Println(IsClosed(c)) // true
}
这个函数其实有很多的问题。首先是每调用一次IsClosed函数都会读出channel中的一个元素,改变了channel的状态。
其次IsClosed函数每次返回的结果仅表示调用的那个瞬间,并不能表示调用之后会不会有其他的goroutine对他进行了操作,改变了它的状态。
有一条流传比较广泛的channel原则:不要从一个receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。
有两个不那么优雅的关闭channel的方法:
使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即使发生了 panic,有 defer-recover 在兜底。
func SafeCloseChannel(ch chan int) (justClosed bool) { defer func() { if recover() != nil { justClosed = false } }() close(ch) return true }
使用 sync.Once 来保证只关闭一次。
type MyChannel struct{
C chan struct{}
once sync.Once
}
func NewMyChannel() *MyChannel{
return &MyChannel{C:make(chan struct{})}
}
func (mc *MyChannel) SafeClose(){
mc.once.Do(func(){
close(mc.C)
})
}
那么如何来优雅的关闭通道呢?我们先总结一下sender 和 receiver 的个数有几种情况:
- 一个 sender,一个 receiver
- 一个 sender, M 个 receiver
- N 个 sender,一个 reciver
- N 个 sender, M 个 receiver
对于第1, 2中情况,不必多说,直接在sender中关闭就好了。
第3中情况下,我们的解决方案是增加一个传递关闭信号的channel,receiver通过信号channel下达关闭数据channel指令。senders监听到关闭信号后,停止发送数据。
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumSenders = 1000
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
for i := 0; i < NumSenders; i++ {
go func() {
for {
select {
case <-stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}()
}
go func() {
for value := range dataCh {
if value == Max-1 {
fmt.Println("send stop signal to senders")
// 当我们关闭一个cahnnel的时候,所有的receiver会受到对应类型的零值。
// 这时候select分支case <-stopCh被选中,退出函数,不再发送数据。
close(stopCh)
return
}
fmt.Println(value)
}
}()
select {
case <-time.After(time.Hour):
}
}
第四种情况下,这里有 M 个 receiver,如果直接还是采取第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,就会重复关闭一个 channel,导致 panic。因此需要增加一个中间人goroutine,M 个 receiver 都向它发送关闭 stopCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 stopCh 的指令(通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也可以向中间人发送关闭 stopCh 的请求。
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumSenders = 1000
const NumReceiver = 10
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
toStop := make(chan string, 1)
// moderator
go func() {
<-toStop
close(stopCh)
}()
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
select {
case toStop <- "sender#" + id:
default:
}
return
}
select {
case <-stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
for i := 0; i < NumReceiver; i++ {
go func(id string) {
for {
select {
case <-stopCh:
return
case value := <-dataCh:
if value == Max-1 {
select {
case toStop <- "receiver#" + id:
default:
}
return
}
fmt.Println(value)
}
}
}(strconv.Itoa(i))
}
select {
case <-time.After(time.Hour):
}
}
代码中的toStop就是中间人的角色,这里将其声明为一个缓冲型的channel。假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 stopCh 请求可能会丢失。因为无论是 sender 还是 receiver 都是通过 select 语句来发送请求,如果中间人所在的 goroutine 没有准备好,那 select 语句就不会选中,直接走 default 选项,什么也不做。这样,第一个关闭 stopCh 的请求就会丢失。
11、Channel 发送和接收元素的本质是什么?
channel 的发送和接收操作本质上都是 “值的拷贝”,无论是从 sender goroutine 的栈到 chan buf,还是从 chan buf 到 receiver goroutine,或者是直接从 sender goroutine 到 receiver goroutine。
举个例子:
type user struct {
name string
age int
}
var u = user{name: "Ankur", age: 25}
var g = &u
func modifyUser(pu *user) {
fmt.Println("modifyUser Received Vaule", pu)
pu.name = "Anand"
}
func printUser(u <-chan *user) {
time.Sleep(2 * time.Second)
fmt.Println("printUser goRoutine called", <-u)
}
func main() {
c := make(chan *user, 5)
c <- g
fmt.Println(g)
// modify g
g = &user{name: "Ankur Anand", age: 100}
go printUser(c)
go modifyUser(g)
time.Sleep(5 * time.Second)
fmt.Println(g)
}
运行结果:
&{Ankur 25}
modifyUser Received Vaule &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}
一开始构造一个结构体 u,地址是 0x56420,图中地址上方就是它的内容。接着把 &u
赋值给指针 g
,g 的地址是 0x565bb0,它的内容就是一个地址,指向 u。
main 程序里,先把 g 发送到 c,根据 copy value
的本质,进入到 chan buf 里的就是 0x56420
,它是指针 g 的值(不是它指向的内容),所以打印从 channel 接收到的元素时,它就是 &{Ankur 25}
。因此,这里并不是将指针 g “发送” 到了 channel 里,只是拷贝它的值而已。
12、channel会在什么情况下引起资源泄露
channel可能会引起goroutine的泄露,因为有的goroutine会因为channel处于满或者空的状态一直得不到改变。同时垃圾回收器也不会回收此类资源,进而导致goroutine一直处于被挂起的状态,资源得不到释放。
13 channel有哪些应用
1、停止信号
这个在“如何优雅地关闭 channel”那一节已经讲得很多了
2、任务定时
与timer结合一般有两种方法:实现超时控制,实现定期执行某个任务。
有时候我们想要执行一个任务,但是又不想它浪费太多时间,上一个定时器就可以搞定
select{
case <- time.After(100 * time.Millisecond):
case <- s.stopc:
return false
}
等待 100 ms 后,如果 s.stopc 还没有读出数据或者被关闭,就直接结束。这是来自 etcd 源码里的一个例子,这样的写法随处可见。
定时执行某个任务,也比较简单:
func worker() {
ticker := time.Tick(1 * time.Second)
for {
select {
case <- ticker:
// 执行定时任务
fmt.Println("执行 1s 定时任务")
}
}
ticker := time.Tick(1 * time.Second)
// 创建一个信号通道
sigs := make(chan os.Signal, 1)
// 监听 SIGINT 和 SIGTERM 信号
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
var i = 0
for {
select {
case <-ticker:
// 执行定时任务
fmt.Println(i)
case <-sigs:
// 结束定时任务
fmt.Println("shutdown!!!")
return
}
i++
}
}
每隔 1 秒种,执行一次定时任务。
3、解耦生产方和消费方
服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {}
无限循环里,从某个 channel 消费工作任务并执行:
func worker(taskCh <-chan int) {
const N = 5
for i := 0; i < N; i++ {
go func(id int) {
for {
task := <-taskCh
fmt.Println("finish task: %d by worker %d\n", task, id)
time.After(1 * time.Second)
}
}(i)
}
}
func main() {
// 创建一个信号通道
sigs := make(chan os.Signal, 1)
// 监听 SIGINT 和 SIGTERM 信号
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
taskCh := make(chan int, 100)
go worker(taskCh)
for i := 0; i < 10; i++ {
taskCh <- i
}
select {
case <-time.After(time.Hour):
case <-sigs:
fmt.Println("shutdown!!!")
return
}
}
5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。
输出结果:
finish task: %d by worker %d
0 4
finish task: %d by worker %d
5 4
finish task: %d by worker %d
6 4
finish task: %d by worker %d
7 4
finish task: %d by worker %d
8 4
finish task: %d by worker %d
9 4
finish task: %d by worker %d
2 0
finish task: %d by worker %d
3 2
finish task: %d by worker %d
4 3
finish task: %d by worker %d
1 1
4、控制并发数
有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。
下面是一个例子:
var limit = make(chan int, 3)
func main() {
// …………
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
// …………
}
构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。
这里,limit <- 1
放在 func 内部而不是外部,原因是:
如果在外层,就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑。
limit 其实和逻辑无关,只是性能调优,放在内层和外层的语义不太一样。
还有一点要注意的是,如果 w() 发生 panic,那“许可证”可能就还不回去了,因此需要使用 defer 来保证。