singleflight凭什么这么快?源码告诉你答案
大家好,我是极客老墨。
前边老墨写了一篇 自从用了singleflight,再也不怕服务器崩了,知道它可以抑制多个重复请求,极大地节约带宽、提升系统性能。用起来确实爽,但你有没有好奇过:这玩意儿底层到底是怎么实现的?为什么几行代码就能搞定并发控制?
今天我们就来扒一扒 singleflight[1] 的源码,看看它的魔法到底藏在哪里。
核心思路:一个请求干活,其他请求白嫖
singleflight 的核心思路很简单:同一时间段内,对于相同的数据请求,只让第一个请求真正执行,其他请求全部阻塞等待。等第一个请求拿到结果后,直接把结果分享给所有等待的请求。
这就像食堂打饭,第一个人去窗口打饭,后面排队的人都等着。等第一个人打完,大家直接复制他的饭菜,不用再排队了。虽然这个比喻有点扯,但意思就是这么个意思。
回顾一下 singleflight 的公开 API:
Group对象:管理所有请求的大管家Result对象:执行结果的包装Do方法:同步执行,阻塞等待结果DoChan方法:异步执行,通过 channel 返回结果
从这些 API 可以推测:对于同一个 key,首个调用会执行真正的逻辑,后续相同 key 的调用都会阻塞,直到第一个请求返回。
singleflight 的源码不多,算上注释一共就 200 来行。我们来逐一分析。
Group:请求管理的大管家
先看 Group 的定义:
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
Group 用一个 map[string]*call 存储所有正在执行的请求。为了保证并发安全,内部持有 sync.Mutex 锁来保护这个 map 的读写。
Group 有两个重要方法 Do 和 DoChan,在 上一篇 已经介绍过了。
再来回顾一下 Do 方法的定义:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool
参数说明:
key:标记同一请求的 key,相同 key 认为是相同请求fn:真正执行业务逻辑的方法
返回值:
v:fn 方法返回的结果err:fn 方法返回的错误shared:如果抑制了其他请求,返回true
DoChan 方法与 Do 方法的区别在于返回值:
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Resul
DoChan 返回一个只读的 <-chan Result,调用方可以通过 channel 异步接收结果。
call:真正干活的结构
再来看 singleflight 的核心结构 call:
type call struct {
wg sync.WaitGroup
// val、err 均表示 Group 的 Do 方法的返回值
// 在 WaitGroup 完成之前只能写入一次,完成之后只能读
val interface{}
err error
// dups 表示重复调用 Do 方法的次数
// chans 表示抑制调用的返回 chan,调用 DoChan 方法时会向通道中写入结果
dups int
chans []chan<- Result
}
call 表示一次真正的业务方法调用,它内部持有 sync.WaitGroup,用来控制并发:
- 首次执行时调用
WaitGroup.Add(1) - 重复请求调用
WaitGroup.Wait()阻塞 - 执行完成后调用
WaitGroup.Done()释放
这就是 singleflight 的核心机制:用 WaitGroup 来控制并发,简单粗暴,但非常有效。
Do 方法:同步执行的实现
来看 Do 的实现代码:
func (g*Group)Do(keystring,fnfunc()(interface{},error))(vinterface{},errerror,sharedbool){
g.mu.Lock()
// 第一次创建 map
if g.m == nil {
g.m = make(map[string]*call)
}
// 如果 key 已经存在,说明是重复请求
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
// 关键点:等待 fn 方法调用结束
c.wg.Wait()
// 处理错误
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
// 返回结果
return c.val, c.err, true
}
// 创建 call
c := new(call)
// WaitGroup 设置为 1,其他重复调用均会 wait
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 调用真正业务逻辑方法 fn
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
逻辑很清晰:
- 加锁,检查
map中是否已经有相同 key 的请求 - 如果有,说明是重复请求,记录重复次数,然后调用
WaitGroup.Wait()阻塞 - 如果没有,创建新的
call,设置WaitGroup为 1,然后执行业务方法 - 执行完成后,阻塞的请求会被唤醒,直接返回结果
关键点在于 call 上的 WaitGroup,这是实现的核心。
再来看业务方法 fn 是如何调用的,也就是 doCall() 方法:
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// 使用两次 defer 来区分错误
defer func() {
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
// fn 调用结束,WaitGroup done,阻塞调用可以返回了
c.wg.Done()
// 调用完成后删除
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// 确保 panic 不能被 recover,防止 chan 永久阻塞
if len(c.chans) > 0 {
go panic(e)
select {} // 保留此 goroutine,以便它出现在 crash dump 中
} else {
panic(e)
}
} else if c.err == errGoexit {
} else {
// 正常返回,向 call 的 chans 写入结果
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
// 调用 fn
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
整个方法虽然代码看起来多,其实都是在处理错误。真正的逻辑就一句:
c.val, c.err = fn()
调用 fn,并将返回值和错误赋值给 call。结果和错误处理都在 defer 的匿名函数中,defer 中会调用 WaitGroup.Done(),被阻塞的请求就可以拿到结果了。
DoChan:异步执行的实现
看完了 Do 方法,DoChan 方法的实现就很简单了:
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
与 Do 方法逻辑类似,只是每次调用都会创建一个 channel,并放入 call 的 chans 属性中。同样,只有第一个调用会创建 call 并执行业务方法。
在调用 Do 方法时,call 结构体中的 chans 属性都是 nil,用不到。它是专门给 DoChan 方法设计的。在 doCall 方法中,会向 chans 写入结果:
// 正常返回,向 call 的 chans 写入结果
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
至此,DoChan 方法的逻辑就很清楚了:为每个调用方创建一个 channel,它们可以通过 channel 异步接收结果。重复调用读取 channel 被阻塞,直到第一次调用完成,向 channel 写入结果。由于 channel 本身是阻塞的,不再需要调用 WaitGroup.Wait() 了。
总结
singleflight 的实现主要依赖两个标准库:
sync.WaitGroup:控制并发,只让一个请求执行sync.Mutex:保护 map 的并发读写
DoChan 和 Do 方法的区别在于处理结果的方式,前者多了对 channel 的管理。
理解了这些核心机制,我们就能更好地使用 singleflight 来优化系统性能了。说实话,Go 标准库的设计真的很优雅,简单的几个组件组合起来,就能实现强大的功能。
你在项目中用过 singleflight 吗?有没有遇到什么坑?欢迎评论区讨论!
极客老墨,继续折腾!
引用链接
- [1] singleflight: https://pkg.go.dev/golang.org/x/sync/singleflight
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)