如果你已经使用 Go 一段时间,可能了解了一些基本的 Go 并发原语:
goroutine
)的 go
关键字;channel
),用于协程之间的通信;context
)包;sync
和 sync/atomic
包这些语言特性和包相结合,为构建并发应用程序提供了一套非常丰富的工具。你可能还没有发现 golang.org/x/sync
上的“扩展标准库”中提供的一组更高级别的并发原语。我们将在本文中讨论这些内容。
singleflight
包正如 包文档 所述,该包提供了函数重复调用抑制机制。
当你为了响应用户活动而执行计算成本较高(或速度较慢,例如网络访问)的情况时,此包非常有用。例如,假设你有一个包含每个城市的天气信息的数据库,并且你希望将其公开为 API。在某些情况下,你可能会有多个用户同时请求同一城市的天气信息。
当发生这种情况时,如果你可以查询数据库,然后将结果共享给所有等待的请求,那不是很好吗?这正是 singleflight
包的作用!
要使用它,请在某处创建一个 singleflight.Group
。它需要在所有请求之间共享才能正常工作。然后将速度缓慢或开销较大的操作封装在对 group.Do(key, fn)
的调用中。对同一个 key
的多个并发请求只会调用 fn
一次,并且一旦 fn
返回,结果就会返回给所有调用方。
以下是它在实践中的样子:
package weather
type Info struct {
TempC, TempF int // 温度(摄氏度,华氏度)
Conditions string // "sunny"、"snowing"(晴天、下雪)等
}
var group singleflight.Group
func City(city string) (*Info, error) {
results, err, _ := group.Do(city, func() (interface{}, error) {
info, err := fetchWeatherFromDB(city) // 慢操作:从数据库中获取特定城市的天气信息
return info, err
})
if err != nil {
return nil, fmt.Errorf("weather.City %s: %w", city, err)
}
return results.(*Info), nil
}
请注意,我们传递给 group.Do
的闭包必须返回 (interface{}, error)
才能与 Go 类型系统配合使用。在上面的示例中忽略了 group.Do
的第三个返回值,指示结果是否在多个调用方之间共享。
errgroup
包另一个非常宝贵的包是 errgroup 包。最好将其理解为 sync.WaitGroup
,但其中任务返回的错误会传递回等待方。
当你要等待多个操作时,但还想确定这些操作是否全部成功完成时,此包非常有用。例如,在上面的天气示例的基础上,假设你想要一次查询多个城市的天气,如果任何一个查询失败,则会失败。
首先定义一个 errgroup.Group
,然后对每个城市使用 group.Go(fn func() error)
方法。此方法会生成一个协程来运行任务。当你生成所需的所有任务后,请使用 group.Wait()
等待它们完成。请注意,此方法返回 error
,这与 sync.WaitGroup
的等效方法不同。当且仅当所有任务都返回 nil
错误时,此 error
才为 nil
。
在实践中看起来像这样:
func Cities(cities ...string) ([]*Info, error) {
var g errgroup.Group
var mu sync.Mutex
res := make([]*Info, len(cities)) // res[i] 对应 cities[i]
for i, city := range cities {
i, city := i, city // 为下面的闭包创建本地变量
g.Go(func() error {
info, err := City(city)
mu.Lock()
res[i] = info
mu.Unlock()
return err
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return res, nil
}
这里我们分配一个结果切片 res
,以便每个协程都可以写入自己的索引。虽然即使没有 mu
互斥锁,上述代码也是安全的,但由于每个协程都在切片中写入自己的条目,因此我们无论如何都要使用一个互斥锁,以防代码随着时间的推移而发生变化。
上面的代码将同时查找所有给定城市的天气信息。当城市数量较少时,这是可以的,但如果城市数量很多,则可能会导致性能问题。在这些情况下,引入 有界并发(bounded concurrency)是很有用的。
通过使用 信号量,Go 可以很轻松地创建有界限的并发。信号量是一种并发原语,如果你学习过计算机科学,你可能会遇到过它,但如果没有,也不必担心。你可以将信号量用于多种目的,但我们只是使用它们来跟踪正在运行的任务数量,并进行阻塞,直到有空间启动另一个任务为止。
在 Go 中,我们可以通过巧妙地使用通道来实现这一点!如果我们希望一次最多能运行 10 个任务,我们可以使用空的结构体创建一个可容纳 10 个项目的通道: semaphore := make(chan struct{}, 10)
。你可以将其想象为一根可以容纳 10 个球的管子。
要启动一个新任务,如果已经运行太多任务,则阻塞,我们只需尝试在通道上发送一个值: semaphore <- struct{}{}
。这类似于尝试将另一个球推入管道中。如果管道已满,它将等待直到有空间为止。
当一个任务完成时,通过从通道中取出一个值来标记它: <-semaphore
。这类似于从管道的另一端拉出一个球,为另一个球被推入留出空间(另一个任务开始)。
就是这样!我们修改后的 Cities
函数如下所示:
func Cities(cities ...string) ([]*Info, error) {
var g errgroup.Group
var mu sync.Mutex
res := make([]*Info, len(cities)) // res[i] 对应 cities[i]
sem := make(chan struct{}, 10)
for i, city := range cities {
i, city := i, city // 为下面的闭包创建本地变量
sem <- struct{}{} // 任务开始
g.Go(func() error {
info, err := City(city)
mu.Lock()
res[i] = info
mu.Unlock()
<-sem // 任务结束
return err
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return res, nil
}
最后,有时你需要有限的并发性,但并非所有任务都有同样高昂的成本。在这种情况下,我们消耗的资源量将根据廉价和昂贵任务的分配,以及它们的启动方式而有很大的差异。
对于此用例,更好的解决方案是使用 加权有界并发。它的工作原理很简单:我们不需要推测想要同时运行的任务数量,而是为每个任务计算一个“成本”,并从信号量中获取和释放该成本。
我们不能再用通道来模拟这一点了,因为我们需要立即获取和释放全部成本。幸运的是,“扩展标准库”再次拯救了我们!golang.org/x/sync/sempahore 包正是为此目的提供了一个加权信号量的实现。
sem <- struct{}{}
操作称为“获取”(Acquire
),<-sem
操作称为“释放”(Release
)。你会注意到 semaphore.Acquire
方法返回一个错误,这是因为它可以与 context
包一起使用,以便提前中止操作。出于本示例的目的,我们将忽略它。
天气查询示例实际上过于简单,无法保证加权信号量,但为了简单起见,我们假设成本随城市名称的长度而变化。
然后我们得到以下结果:
func Cities(cities ...string) ([]*Info, error) {
ctx := context.TODO() // 替换为实际上下文
var g errgroup.Group
var mu sync.Mutex
res := make([]*Info, len(cities)) // res[i] 对应 cities[i]
sem := semaphore.NewWeighted(100) // 同时处理100个字符
for i, city := range cities {
i, city := i, city // 为下面的闭包创建本地变量
cost := int64(len(city)) // 计算成本
if err := sem.Acquire(ctx, cost); err != nil {
break
}
g.Go(func() error {
info, err := City(city)
mu.Lock()
res[i] = info
mu.Unlock()
sem.Release(cost)
return err
})
}
if err := g.Wait(); err != nil {
return nil, err
} else if err := ctx.Err(); err != nil {
return nil, err
}
return res, nil
}
上面的示例展示了向 Go 程序添加并发性,然后根据需求对其进行微调是多么容易。
- 原文地址:https://encore.dev/blog/advanced-go-concurrency
- 版权归原作者所有
内容声明 | |
---|---|
标题: 【译文】Go 高级并发 | |
链接: https://zixizixi.cn/advanced-go-concurrency | 来源: iTanken |
本作品采用知识共享署名-相同方式共享 4.0 国际许可协议进行许可,转载请保留此声明。
|