channel源代码实现
·
文章目录
channel实现方式
说明
- chan怎么使用不是本文的主题
- 本文的chan的是基于golang 1.13,系统是mac os
- 本文的思路:自己思考chan有哪些点,他是哪些特点,带着这些特点,去代码中寻找答案
思考包含的主要场景
从过程来看,chan主要涉及到三个大点,
第一怎么创建一个chan
第二怎么发送和接受数据
第三怎么关闭chan
具体上,应该包含这些
- 创建channel
- 有缓存
- 无缓存
- 往channel里面发送数据
- 有缓存
- 无缓存
- 有接收者
- 无接受者
- 从channel里面读取数据
- 有缓存
- 无缓存
- 有数据
- 无数据
- 关闭chan
源码实现
当前代码从/runtime/hchan.go
hchan 介绍
hchan结构体,主要是缓冲区的存贮buf,是一个数组,以及两个队列(双向链表实现)sendq和recvq,主要是利用队列的先进先出的特性,完成chan的发送和接受的顺序,通过lock保证并发正确性
type hchan struct {
qcount uint // total data in the queue,当前data数
dataqsiz uint // size of the circular queue //缓存的大小
buf unsafe.Pointer // points to an array of dataqsiz elements 存放缓存的一个数组,通过index实现的环形数组
elemsize uint16 //chan里面的元素的大小,如chan int,则表示int的大小
elemtype *_type // element type 元素的类型信息
closed uint32 //是否已经关闭,1表示已经关闭,0表示开着
sendx uint // send index 发送者的index, 有缓冲区使用
recvx uint // receive index 接收者的index,有缓冲区使用
recvq waitq // list of recv waiters 接受者的队列(使用双向链表实现),当缓冲区无数据,会放到里面
sendq waitq // list of send waiters 发送者的队列(使用双向链表实现),缓冲区已经满了或者无缓冲区的时候,放到里面
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //锁,保护hchan的所有元素已经sudogs的元素
}
创建 hchan
代码入口:
函数:makechan64或者makechan,我们从makechan入手
func makechan(t *chantype, size int) *hchan {
...
}
size即缓存的大小
chantype是通过编译过来的chan的类型,包含chan类型信息
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// elem.size*size算出缓冲区的大小,以及是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0: //无缓冲区
// Queue or element size is zero.
//直接分配hchan size的内存,c指向它
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers. 元素不包含指针
// Allocate hchan and buf in one call. 直接一次分配hchan和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers. 包含指针,使用new先把hchan分配好,在分配buf的内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size) //元素大小
c.elemtype = elem //元素信息
c.dataqsiz = uint(size) //缓冲区的大小
if debugChan { // 可以自己设置debugChan为true,自己想打印源代码,可以使用print函数
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
往chan里面发送数据
函数 chansend
- 无缓冲区
- 如果有recvq里面有receiver,直接把值给它
- 如果没有recvq,将数据封装成sudog,放到sendq里,将当前g挂起来
- 有缓冲区
- 如果有recvq里面有receiver,直接把值给它
- 如果没有recvq
- 如果缓冲区没有满,直接放到缓冲区buf里面
- 如果缓冲区满了,将数据封装成sudog,放到 sendq里面,将当前g挂起来
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
}
// c hchan的指针,
// block是否阻塞,select 有个default是不阻塞的即selectnbsend函数的实现
//ep 发送的数据的指针
// callerpc 调用者的程序计数器
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
//针对不阻塞的场景,未关闭,无缓冲区,则直接返回false,走select的default场景
//或者有缓冲区,但是缓冲区已经满了场景直接返回false
if !block && c.closed == 0 && (c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 开始上锁,保证并发安全性
lock(&c.lock)
//如果已经关闭,再发送数据,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 在recvq的双向链表里面获取第一个sudog,如果获取到,表明当前已经有接受者,则进行发送数据,send将讲述怎么发送,在send函数的解释里面说
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 有缓冲区的场景
if c.qcount < c.dataqsiz {
//下面的操作是ep位置的数据copy到buf sendx位置,即实现数据的缓冲,然后unlock,返回true
// Space is available in the channel buffer. Enqueue the element to send. buf对应sendx的指针
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
//把ep地址指向的数据move到qp地址上面
typedmemmove(c.elemtype, qp, ep)
c.sendx++
//循环列表
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 如果是不阻塞的,直接返回,因为下面的逻辑都是缓冲区是空的,所以直接返回
if !block {
unlock(&c.lock)
return false
}
// 缓冲区为空,则将ep放到sudog里面,然后将sudog放到sendq队列里面去,然后将当前g挂起
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
//这之前当前的g是挂住的,后续代码不会进行,一直到接受者,接受这个数据,才会将会将g恢复到运行状态
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
函数 send
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//表示是否启用数据竞争检测
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
//如果要返回值的话,将ep的数据move到receiver的elem去
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 释放receiver对应的g,让对应的g继续运行
goready(gp, skip+1)
}
从chan里面获取数据
函数 chanrecv
- 无缓冲区
- 如果sendq里面有数据,直接sendq获取第一个sudog,获取值
- 如果没有sendq,将数据封装成sudog,放到recvq里面
- 有缓冲区
- 如果sendq里面有数据,取出一个sudog,从缓冲区位置recvx取出值, 放置到返回值里面,然后将sudog里面的值放到刚刚从缓冲区取值的位置recvx里面
- 如果缓冲区有数据,从缓冲区取值
- 如果缓冲区无数据,封装成sudog,放到recvq里面
// c 当前hchan
// ep返回的值的地址,如果为nil,则忽略返回值
// block是否阻塞,跟send的block一个含义
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//如果不阻塞,无缓冲去为0 或者当前缓冲区已经满了 且未关闭,直接返回,因为这些都是阻塞式的
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 开始上锁
lock(&c.lock)
// 当前chan已经关闭,并且已经没有数据了,直接返回
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 从发送队列里面获取一个sudog,如果获取不为空,则进行接收数据,具体recv的说明在后面,这个分为两个场景,一个为缓冲区已经满了,放到了sendq队列,一个是缓冲区为0,直接放到sendq,需要先把sendq里面的数据取出来,将buf里第recv个数据赋值给ep(如果有返回值的话),然后将取出来的sudog里的elem放到buf里的第recv,保证是队列的先进先出
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 缓冲区里面有数据,分为两个场景,一个为chan已经关闭,一个chan正常
if c.qcount > 0 {
// Receive directly from queue
// 从buf里面获取当前recvx的数据
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 有返回值,则将qp指针指向的数据拷贝到ep指针位置
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
//然后将buf revcx位置的数据清空
typedmemclr(c.elemtype, qp)
// recvx加1,如果buf数组都读取完了,直接将recvx设置为0,实现一个循环数组
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 当前数据个数减1,unlock,返回
c.qcount--
unlock(&c.lock)
return true, true
}
// 无缓冲区,不阻塞的读数据,直接返回
if !block {
unlock(&c.lock)
return false, false
}
//如果缓冲区为0或者缓冲区里面没有数据,则将当前ep封装为sudog,然后放到队列recvq里面,然后将当前g挂起
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
函数 recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
// 无缓冲区,也要返回elem,则把sg里面存的值赋值给ep,返回
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
// 要返回值,则将buf里面的第recvx个里面对应值,赋值给ep
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
// 然后把sg里面的elem值赋值到buf里面的第recvx个里面(即qp),环形的buf,保证先把buf里面的值读取完,然后读取sendq里面的sudog里面的值
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
//环形数组
c.recvx = 0
}
// 下个要往缓冲区写入的index
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将sender的g不在阻塞
goready(gp, skip+1)
}
关闭 chan
close chan,主要是两个释放和将closed设置为1,sendq里面都是为发送成功的数据,直接丢弃,缓冲里面的数据,还可以继续读,如果recvq有等待者,表面chan里面是无数据的,直接释放。
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
// chan已经关闭,再关闭,panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
//将closed设置为1
c.closed = 1
var glist gList
// release all readers,释放所有的readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
//要返回值,将值设置为nil
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
放置到glist
glist.push(gp)
}
// release all writers (they will panic),释放所有挂住的的sender
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
总结
主旨:使用队列的先进先出能力,进行channel发送和接受任务管理
有缓冲则使用 环形数组+双向链表实现带缓存功能的channel
不带缓存功能的:双向链表
更多推荐
已为社区贡献1条内容
所有评论(0)