子兮子兮 子兮子兮

No can, but will.

目录
【译文】Go 高级并发
/        

【译文】Go 高级并发

原文地址:https://encore.dev/blog/advanced-go-concurrency

如果你已经使用 Go 一段时间,可能了解了一些基本的 Go 并发原语:

  • 用于生成 协程goroutine)的 go 关键字;
  • 通道channel),用于协程之间的通信;
  • 用于传递取消的 上下文context)包;
  • 用于较低级别原语(如互斥锁和原子内存访问)的 syncsync/atomic

这些语言特性和包相结合,为构建并发应用程序提供了一套非常丰富的工具。你可能还没有发现 golang.org/x/sync上的“扩展标准库”中提供的一组更高级别的并发原语。我们将在本文中讨论这些内容。

singleflight

正如 包文档 所述,该包提供了函数重复调用抑制机制。

当你为了响应用户活动而执行计算成本较高(或速度较慢,例如网络访问)的情况时,此包非常有用。例如,假设你有一个包含每个城市的天气信息的数据库,并且你希望将其公开为 API。在某些情况下,你可能会有多个用户同时请求同一城市的天气信息。

当发生这种情况时,如果你可以查询数据库,然后将结果共享给所有等待的请求,那不是很好吗?这正是 singleflight 包的作用!

要使用它,请在某处创建一个 singleflight.Group。它需要在所有请求之间共享才能正常工作。然后将速度缓慢或开销较大的操作封装在对 group.Do(key, fn) 的调用中。对同一个 key 的多个并发请求只会调用 fn 一次,并且一旦 fn 返回,结果就会返回给所有调用方。

以下是它在实践中的样子:

package weather

type Info struct {
	TempC, TempF int    // 温度(摄氏度,华氏度)
	Conditions   string // "sunny"、"snowing"(晴天、下雪)等
}

var group singleflight.Group

func City(city string) (*Info, error) {
	results, err, _ := group.Do(city, func() (interface{}, error) {
		info, err := fetchWeatherFromDB(city) // 慢操作:从数据库中获取特定城市的天气信息
		return info, err
	})
	if err != nil {
		return nil, fmt.Errorf("weather.City %s: %w", city, err)
	}
	return results.(*Info), nil
}

请注意,我们传递给 group.Do 的闭包必须返回 (interface{}, error) 才能与 Go 类型系统配合使用。在上面的示例中忽略了 group.Do 的第三个返回值,指示结果是否在多个调用方之间共享。

errgroup

另一个非常宝贵的包是 errgroup 包。最好将其理解为 sync.WaitGroup,但其中任务返回的错误会传递回等待方。

当你要等待多个操作时,但还想确定这些操作是否全部成功完成时,此包非常有用。例如,在上面的天气示例的基础上,假设你想要一次查询多个城市的天气,如果任何一个查询失败,则会失败。

首先定义一个 errgroup.Group,然后对每个城市使用 group.Go(fn func() error) 方法。此方法会生成一个协程来运行任务。当你生成所需的所有任务后,请使用 group.Wait() 等待它们完成。请注意,此方法返回 error,这与 sync.WaitGroup 的等效方法不同。当且仅当所有任务都返回 nil 错误时,此 error 才为 nil

在实践中看起来像这样:

func Cities(cities ...string) ([]*Info, error) {
	var g errgroup.Group
	var mu sync.Mutex
	res := make([]*Info, len(cities)) // res[i] 对应 cities[i]

	for i, city := range cities {
		i, city := i, city // 为下面的闭包创建本地变量
		g.Go(func() error {
			info, err := City(city)
			mu.Lock()
			res[i] = info
			mu.Unlock()
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return res, nil
}

这里我们分配一个结果切片 res,以便每个协程都可以写入自己的索引。虽然即使没有 mu 互斥锁,上述代码也是安全的,但由于每个协程都在切片中写入自己的条目,因此我们无论如何都要使用一个互斥锁,以防代码随着时间的推移而发生变化。

有界并发

上面的代码将同时查找所有给定城市的天气信息。当城市数量较少时,这是可以的,但如果城市数量很多,则可能会导致性能问题。在这些情况下,引入 有界并发bounded concurrency)是很有用的。

通过使用 信号量,Go 可以很轻松地创建有界限的并发。信号量是一种并发原语,如果你学习过计算机科学,你可能会遇到过它,但如果没有,也不必担心。你可以将信号量用于多种目的,但我们只是使用它们来跟踪正在运行的任务数量,并进行阻塞,直到有空间启动另一个任务为止。

在 Go 中,我们可以通过巧妙地使用通道来实现这一点!如果我们希望一次最多能运行 10 个任务,我们可以使用空的结构体创建一个可容纳 10 个项目的通道: semaphore := make(chan struct{}, 10)。你可以将其想象为一根可以容纳 10 个球的管子。

要启动一个新任务,如果已经运行太多任务,则阻塞,我们只需尝试在通道上发送一个值: semaphore <- struct{}{}。这类似于尝试将另一个球推入管道中。如果管道已满,它将等待直到有空间为止。

当一个任务完成时,通过从通道中取出一个值来标记它: <-semaphore。这类似于从管道的另一端拉出一个球,为另一个球被推入留出空间(另一个任务开始)。

就是这样!我们修改后的 Cities 函数如下所示:

func Cities(cities ...string) ([]*Info, error) {
	var g errgroup.Group
	var mu sync.Mutex
	res := make([]*Info, len(cities)) // res[i] 对应 cities[i]
	sem := make(chan struct{}, 10)
	for i, city := range cities {
		i, city := i, city // 为下面的闭包创建本地变量
		sem <- struct{}{}  // 任务开始
		g.Go(func() error {
			info, err := City(city)
			mu.Lock()
			res[i] = info
			mu.Unlock()
			<-sem // 任务结束
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return res, nil
}

加权有界并发

最后,有时你需要有限的并发性,但并非所有任务都有同样高昂的成本。在这种情况下,我们消耗的资源量将根据廉价和昂贵任务的分配,以及它们的启动方式而有很大的差异。

对于此用例,更好的解决方案是使用 加权有界并发。它的工作原理很简单:我们不需要推测想要同时运行的任务数量,而是为每个任务计算一个“成本”,并从信号量中获取和释放该成本。

我们不能再用通道来模拟这一点了,因为我们需要立即获取和释放全部成本。幸运的是,“扩展标准库”再次拯救了我们!golang.org/x/sync/sempahore 包正是为此目的提供了一个加权信号量的实现。

sem <- struct{}{} 操作称为“获取”(Acquire),<-sem 操作称为“释放”(Release)。你会注意到 semaphore.Acquire 方法返回一个错误,这是因为它可以与 context 包一起使用,以便提前中止操作。出于本示例的目的,我们将忽略它。

天气查询示例实际上过于简单,无法保证加权信号量,但为了简单起见,我们假设成本随城市名称的长度而变化。
然后我们得到以下结果:

func Cities(cities ...string) ([]*Info, error) {
	ctx := context.TODO() // 替换为实际上下文
	var g errgroup.Group
	var mu sync.Mutex
	res := make([]*Info, len(cities)) // res[i] 对应 cities[i]
	sem := semaphore.NewWeighted(100) // 同时处理100个字符
	for i, city := range cities {
		i, city := i, city       // 为下面的闭包创建本地变量
		cost := int64(len(city)) // 计算成本
		if err := sem.Acquire(ctx, cost); err != nil {
			break
		}
		g.Go(func() error {
			info, err := City(city)
			mu.Lock()
			res[i] = info
			mu.Unlock()
			sem.Release(cost)
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	} else if err := ctx.Err(); err != nil {
		return nil, err
	}
	return res, nil
}

总结

上面的示例展示了向 Go 程序添加并发性,然后根据需求对其进行微调是多么容易。


内容声明
标题: 【译文】Go 高级并发
链接: https://zixizixi.cn/advanced-go-concurrency 来源: iTanken
本作品采用知识共享署名-相同方式共享 4.0 国际许可协议进行许可,转载请保留此声明