channel实现方式

说明

  1. chan怎么使用不是本文的主题
  2. 本文的chan的是基于golang 1.13,系统是mac os
  3. 本文的思路:自己思考chan有哪些点,他是哪些特点,带着这些特点,去代码中寻找答案

思考包含的主要场景

从过程来看,chan主要涉及到三个大点,
第一怎么创建一个chan
第二怎么发送和接受数据
第三怎么关闭chan
具体上,应该包含这些

  1. 创建channel
    • 有缓存
    • 无缓存
  2. 往channel里面发送数据
    • 有缓存
    • 无缓存
    • 有接收者
    • 无接受者
  3. 从channel里面读取数据
    • 有缓存
    • 无缓存
    • 有数据
    • 无数据
  4. 关闭chan

源码实现

当前代码从/runtime/hchan.go

hchan 介绍

hchan结构体,主要是缓冲区的存贮buf,是一个数组,以及两个队列(双向链表实现)sendq和recvq,主要是利用队列的先进先出的特性,完成chan的发送和接受的顺序,通过lock保证并发正确性

type hchan struct {
	qcount   uint           // total data in the queue,当前data数
	dataqsiz uint           // size of the circular queue //缓存的大小
	buf      unsafe.Pointer // points to an array of dataqsiz elements 存放缓存的一个数组,通过index实现的环形数组
	elemsize uint16  //chan里面的元素的大小,如chan int,则表示int的大小
	elemtype *_type // element type 元素的类型信息
	closed   uint32  //是否已经关闭,1表示已经关闭,0表示开着

	sendx    uint   // send index 发送者的index, 有缓冲区使用
	recvx    uint   // receive index 接收者的index,有缓冲区使用
	recvq    waitq  // list of recv waiters  接受者的队列(使用双向链表实现),当缓冲区无数据,会放到里面
	sendq    waitq  // list of send waiters 发送者的队列(使用双向链表实现),缓冲区已经满了或者无缓冲区的时候,放到里面

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex //锁,保护hchan的所有元素已经sudogs的元素
}

创建 hchan

代码入口:
函数:makechan64或者makechan,我们从makechan入手

func makechan(t *chantype, size int) *hchan {
 ...
}

size即缓存的大小
chantype是通过编译过来的chan的类型,包含chan类型信息

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

   // elem.size*size算出缓冲区的大小,以及是否溢出
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0: //无缓冲区
		// Queue or element size is zero.
		//直接分配hchan size的内存,c指向它
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers. 元素不包含指针
		// Allocate hchan and buf in one call.  直接一次分配hchan和buf
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers. 包含指针,使用new先把hchan分配好,在分配buf的内存
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)  //元素大小
	c.elemtype = elem //元素信息
	c.dataqsiz = uint(size) //缓冲区的大小
	
	if debugChan { // 可以自己设置debugChan为true,自己想打印源代码,可以使用print函数
		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
	}
	return c
}

往chan里面发送数据

函数 chansend

  • 无缓冲区
    • 如果有recvq里面有receiver,直接把值给它
    • 如果没有recvq,将数据封装成sudog,放到sendq里,将当前g挂起来
  • 有缓冲区
    • 如果有recvq里面有receiver,直接把值给它
    • 如果没有recvq
      • 如果缓冲区没有满,直接放到缓冲区buf里面
      • 如果缓冲区满了,将数据封装成sudog,放到 sendq里面,将当前g挂起来
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
}
// c hchan的指针,
// block是否阻塞,select  有个default是不阻塞的即selectnbsend函数的实现
//ep 发送的数据的指针
// callerpc 调用者的程序计数器

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	//针对不阻塞的场景,未关闭,无缓冲区,则直接返回false,走select的default场景
	//或者有缓冲区,但是缓冲区已经满了场景直接返回false
	if !block && c.closed == 0 && (c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

   // 开始上锁,保证并发安全性
	lock(&c.lock)

  //如果已经关闭,再发送数据,直接panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

   // 在recvq的双向链表里面获取第一个sudog,如果获取到,表明当前已经有接受者,则进行发送数据,send将讲述怎么发送,在send函数的解释里面说
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

   // 有缓冲区的场景
	if c.qcount < c.dataqsiz {
	    //下面的操作是ep位置的数据copy到buf sendx位置,即实现数据的缓冲,然后unlock,返回true
		// Space is available in the channel buffer. Enqueue the element to send. buf对应sendx的指针
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		//把ep地址指向的数据move到qp地址上面
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		//循环列表
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
    
    // 如果是不阻塞的,直接返回,因为下面的逻辑都是缓冲区是空的,所以直接返回
	if !block {
		unlock(&c.lock)
		return false
	}

   //  缓冲区为空,则将ep放到sudog里面,然后将sudog放到sendq队列里面去,然后将当前g挂起
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
   //这之前当前的g是挂住的,后续代码不会进行,一直到接受者,接受这个数据,才会将会将g恢复到运行状态
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

函数 send

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	//表示是否启用数据竞争检测
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			qp := chanbuf(c, c.recvx)
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
	   //如果要返回值的话,将ep的数据move到receiver的elem去
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 释放receiver对应的g,让对应的g继续运行
	goready(gp, skip+1)
}

从chan里面获取数据

函数 chanrecv

  • 无缓冲区
    • 如果sendq里面有数据,直接sendq获取第一个sudog,获取值
    • 如果没有sendq,将数据封装成sudog,放到recvq里面
  • 有缓冲区
    • 如果sendq里面有数据,取出一个sudog,从缓冲区位置recvx取出值, 放置到返回值里面,然后将sudog里面的值放到刚刚从缓冲区取值的位置recvx里面
    • 如果缓冲区有数据,从缓冲区取值
    • 如果缓冲区无数据,封装成sudog,放到recvq里面
// c 当前hchan
// ep返回的值的地址,如果为nil,则忽略返回值
// block是否阻塞,跟send的block一个含义
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	//如果不阻塞,无缓冲去为0 或者当前缓冲区已经满了 且未关闭,直接返回,因为这些都是阻塞式的
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

   // 开始上锁
	lock(&c.lock)
   
   // 当前chan已经关闭,并且已经没有数据了,直接返回
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

   // 从发送队列里面获取一个sudog,如果获取不为空,则进行接收数据,具体recv的说明在后面,这个分为两个场景,一个为缓冲区已经满了,放到了sendq队列,一个是缓冲区为0,直接放到sendq,需要先把sendq里面的数据取出来,将buf里第recv个数据赋值给ep(如果有返回值的话),然后将取出来的sudog里的elem放到buf里的第recv,保证是队列的先进先出 
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

  // 缓冲区里面有数据,分为两个场景,一个为chan已经关闭,一个chan正常
	if c.qcount > 0 {
		// Receive directly from queue
		// 从buf里面获取当前recvx的数据
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// 有返回值,则将qp指针指向的数据拷贝到ep指针位置
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		//然后将buf revcx位置的数据清空
		typedmemclr(c.elemtype, qp)

       // recvx加1,如果buf数组都读取完了,直接将recvx设置为0,实现一个循环数组
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// 当前数据个数减1,unlock,返回
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

  // 无缓冲区,不阻塞的读数据,直接返回
	if !block {
		unlock(&c.lock)
		return false, false
	}

    //如果缓冲区为0或者缓冲区里面没有数据,则将当前ep封装为sudog,然后放到队列recvq里面,然后将当前g挂起
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

函数 recv

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		// 无缓冲区,也要返回elem,则把sg里面存的值赋值给ep,返回
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
		// copy data from queue to receiver
		if ep != nil {
		   // 要返回值,则将buf里面的第recvx个里面对应值,赋值给ep
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		// 然后把sg里面的elem值赋值到buf里面的第recvx个里面(即qp),环形的buf,保证先把buf里面的值读取完,然后读取sendq里面的sudog里面的值
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
		 //环形数组
			c.recvx = 0
		}
		// 下个要往缓冲区写入的index
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 将sender的g不在阻塞
	goready(gp, skip+1)
}

关闭 chan

close chan,主要是两个释放和将closed设置为1,sendq里面都是为发送成功的数据,直接丢弃,缓冲里面的数据,还可以继续读,如果recvq有等待者,表面chan里面是无数据的,直接释放。

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
	  // chan已经关闭,再关闭,panic
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}

    //将closed设置为1
	c.closed = 1

	var glist gList

	// release all readers,释放所有的readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
		    //要返回值,将值设置为nil
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		放置到glist
		glist.push(gp)
	}

	// release all writers (they will panic),释放所有挂住的的sender
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

总结

主旨:使用队列的先进先出能力,进行channel发送和接受任务管理
有缓冲则使用 环形数组+双向链表实现带缓存功能的channel
不带缓存功能的:双向链表

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐