goroutine生命周期管理-errgroup和run

Go 语言有提供了多个包来将多个 goroutine 的生命周期编组管理。最简单的是标准库的 sync .WaitGroup,应用比较普遍的是 google 的 errgroup,Prometheus 用的是 oklog 的 run。下面学习后两个包的用法。

errgroup

errgroup 为作为一个任务中的多个子任务的多组 goroutine 提供了同步、错误传播和取消上下文。

Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.

Group 结构体是一组 goroutine 的集合。零值 Group 是有效的,在 error 的时候不会 cancel。使用方法如下:

1. 提供错误处理的 sync.WaitGroup

WaitGroup 只管理 goroutine 同步,如果发生错误要自己处理,errgroup 提供了错误处理的能力,不过仅能记录多个 goroutine 中第一个出错的。

// 初始化 Group 实例作为组
g := new(errgroup.Group)

// 启动组中的多个 goroutine,第一次返回非空的 error 会 cancel 整个组并返回这个 error。
g.Go(func() error{})
g.Go(func() error{})

// 阻塞等待组完成,如果有错误,获取第一个 error
if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}

2. 传入 context.Context

WithContext(ctx context.Context) 返回的 Group 在传入 Go 中的多个 goroutine 中第一次出错或第一次 Wait 返回时 cancel。

run

跟 errgroup 类似,run.Group 创建以后通过 Add() 方法添加 goroutine。其有两个函数参数,第一个是执行函数,第二个是中断函数,调用中断函数的时候会使执行函数返回。调用 Run(0) 函数的时候并发执行所有 goroutine,并阻塞等待知道第一个 goroutine 退出,调用中断函数,待全部 goroutine 返回后返回到调用者。run 与 errgroup 相似,除了它不需要 goroutine 理解 context 语义。

示例1

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/oklog/run"
)

func main() {
	var g run.Group
	{
		cancel := make(chan struct{})
		g.Add(func() error {
			for {
				select {
				case <-time.After(time.Second):
					fmt.Println("the first actor had its time elapsed 1 second")
					// return nil
				case <-cancel:
					fmt.Println("The first actor was canceled")
					return nil
				}
			}
		}, func(err error) {
			fmt.Printf("the first actor was interrupted with: %v\n", err)
			close(cancel)
		})
	}
	{
		g.Add(func() error {
			time.Sleep(2 * time.Second)
			fmt.Println("the second actor is returning after 2 second")
			return errors.New("second actor teardown")
		}, func(err error) {
			// 在对应的执行函数返回后,这个取消函数也会被执行
			fmt.Printf("the second actor was interrupted with %v\n", err)
		})
	}
	g.Run()
}
$ go run main.go
the first actor had its time elapsed 1 second
the second actor is returning after 2 second
the first actor was interrupted with: second actor teardown
the second actor was interrupted with second actor teardown
The first actor was canceled

run.Group 中每个 Add 的成员是由执行函数和取消函数组成的一对函数,某一个执行函数返回错误以后,将执行它对应的取消函数,同时其他的取消函数。

示例2 Context

package main

import (
	"context"
	"fmt"

	"github.com/oklog/run"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var g run.Group
	{
		ctx1, cancel := context.WithCancel(ctx)
		g.Add(func() error {
			return runUntilCanceled(ctx1)
		}, func(error) {
			cancel()
		})
	}
	go cancel()
	fmt.Printf("The group was terminated with: %v\n", g.Run())
}

func runUntilCanceled(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}
The group was terminated with: context canceled
上一篇:Golang 之禅


下一篇:理解go中goroutine的GPM