run.Group
是一个由 oklog/run 包提供的 Go 工具,用于管理多个 Goroutine 的启动和优雅关闭流程。它允许你注册多个任务,并在以下几种情况下统一控制其生命周期:
启动多个并发任务 :run.Group
能添加多个带启动和停止功能的任务。每个任务包括一个启动函数和一个终止函数。
自动优雅退出 :run.Group
会等待直到一个任务返回错误或上下文信号(如 SIGINT、SIGTERM)触发时,调用所有任务的终止函数来优雅地关闭程序。
控制并发和清理 :你可以用 Add
方法将任务注册到 Group
,并确保这些任务会按顺序执行和安全清理。
以下是一个简单的使用 run.Group 的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package mainimport ( "fmt" "os" "os/signal" "syscall" "time" "github.com/oklog/run" )func main () { var g run.Group g.Add(func () error { fmt.Println("Task 1 is running" ) time.Sleep(5 * time.Second) return nil }, func (error ) { fmt.Println("Cleaning up Task 1" ) }) g.Add(func () error { sig := make (chan os.Signal, 1 ) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) fmt.Println("Waiting for exit signal" ) <-sig return nil }, func (error ) { fmt.Println("Cleaning up after exit signal" ) }) if err := g.Run(); err != nil { fmt.Println("Error:" , err) } }
解释
**g.Add(startFunc, stopFunc)**
:Add
方法将一个任务添加到 Group
,接受两个参数:
startFunc
:启动任务的函数。
stopFunc
:在任务停止时调用的清理函数。
优雅退出 :任何一个 <font style="color:#DF2A3F;">startFunc</font>
返回后, <font style="color:#DF2A3F;">run.Group</font>
会依次调用所有注册的 <font style="color:#DF2A3F;">stopFunc</font>
函数,以实现安全退出。
使用场景
管理多服务、处理任务和系统信号(如 HTTP 服务器、消息队列消费者、或监控任务),使它们统一启动和关闭。
以下是一些使用 run.Group
的详细示例,展示如何管理多个并发任务:
示例 1:管理多个 HTTP 服务器 在这个例子中,我们启动了两个 HTTP 服务器,一个用于主应用流量,另一个用于监控数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package mainimport ( "fmt" "net/http" "os" "os/signal" "syscall" "time" "github.com/oklog/run" )func main () { var g run.Group { server := &http.Server{Addr: ":8080" , Handler: http.DefaultServeMux} g.Add(func () error { fmt.Println("Starting main server on :8080" ) return server.ListenAndServe() }, func (error ) { fmt.Println("Shutting down main server" ) server.Close() }) } { metricsServer := &http.Server{Addr: ":9090" , Handler: http.DefaultServeMux} g.Add(func () error { fmt.Println("Starting metrics server on :9090" ) return metricsServer.ListenAndServe() }, func (error ) { fmt.Println("Shutting down metrics server" ) metricsServer.Close() }) } { sigChan := make (chan os.Signal, 1 ) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) g.Add(func () error { fmt.Println("Waiting for termination signal..." ) <-sigChan return nil }, func (error ) { fmt.Println("Received termination signal, shutting down gracefully..." ) }) } if err := g.Run(); err != nil { fmt.Println("Error:" , err) } }
解释:
添加服务器任务 :使用 g.Add
启动和停止 HTTP 服务器。g.Add
接受两个参数:第一个是启动函数,第二个是停止(清理)函数。
捕获退出信号 :使用 signal.Notify
来捕获操作系统的终止信号(如 SIGINT
或 SIGTERM
),并优雅地关闭服务器。
优雅退出 :当任一任务返回错误(如服务器失败),run.Group
会触发所有已注册的停止函数,确保资源被释放,避免进程僵死。
示例 2:启动多个消息队列消费者 如果你需要同时运行多个消息队列消费者,你可以用 run.Group
来管理它们。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package mainimport ( "fmt" "time" "github.com/oklog/run" )func main () { var g run.Group for i := 0 ; i < 3 ; i++ { consumerID := i + 1 g.Add(func () error { fmt.Printf("Consumer %d started\n" , consumerID) time.Sleep(10 * time.Second) return nil }, func (error ) { fmt.Printf("Consumer %d stopped\n" , consumerID) }) } if err := g.Run(); err != nil { fmt.Println("Error:" , err) } }
解释:
多个消费者 :我们启动了三个消费者,每个消费者独立运行,并处理消息。每个消费者都注册了启动和停止函数。
统一管理 :通过 run.Group
,所有消费者可以统一管理,确保当任何一个消费者任务退出时,其他任务也可以按顺序停止。
示例 3:整合 Web 服务和 异步消费 MQ 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 package mainimport ( "context" "fmt" "github.com/gin-gonic/gin" "github.com/oklog/run" "net/http" "time" )func main () { var g run.Group startWebServer(&g) startMQConsumer(&g) if err := g.Run(); err != nil { fmt.Println("服务异常终止:" , err) } }func startWebServer (g *run.Group) { app := gin.New() app.GET("/aaa" , func (c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "code" : 200 , "msg" : "success" , }) }) srv := &http.Server{ Addr: ":15672" , Handler: app, } g.Add(func () error { fmt.Println("启动 Web 服务..." ) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { fmt.Println("Web 服务启动失败:" , err) return err } return nil }, func (err error ) { fmt.Println("停止 Web 服务..." ) ctx, cancel := context.WithTimeout(context.Background(), 5 *time.Second) defer cancel() srv.Shutdown(ctx) }) }func startMQConsumer (g *run.Group) { server := newAsyncSmsServer() ctx, cancel := context.WithCancel(context.Background()) g.Add(func () error { fmt.Println("启动 MQ 消费服务..." ) return server.StartConsumeMessage(ctx, 3 ) }, func (err error ) { fmt.Println("停止 MQ 消费服务..." ) cancel() }) }type AsyncConsumerMessage struct { }func (a *AsyncConsumerMessage) StartConsumeMessage(ctx context.Context, consumerCount int ) error { done := make (chan bool ) for i := 0 ; i < consumerCount; i++ { go func (i int ) { for { select { case <-done: fmt.Println(i, ":接收到退出信号,优雅退出" ) return default : fmt.Println(i, ":执行业务操作...." ) time.Sleep(1 * time.Second) } } }(i) } <-ctx.Done() close (done) return nil }func newAsyncSmsServer () *AsyncConsumerMessage { return &AsyncConsumerMessage{} }
解释 整合了 Web 服务和一个异步消费 MQ 的服务,在Web 服务启动的时候使其使用一个已被占用的端口,来模拟其中一个服务失败,其他服务优雅退出的情形
程序启动过程中打印日志如下
1 2 3 4 5 6 7 8 9 10 11 [GIN-debug] GET /aaa --> main.startWebServer.func1 (1 handlers) 启动 MQ 消费服务... 2 :执行业务操作.... 0 :执行业务操作.... 1 :执行业务操作.... 启动 Web 服务... Web 服务启动失败: listen tcp :15672: bind: address already in use 停止 Web 服务... 停止 MQ 消费服务... 服务异常终止: listen tcp :15672: bind: address already in use
总结 run.Group
是一个非常强大的工具,适用于管理多个并发任务。它能够保证任务按顺序启动,并确保在出现错误或收到退出信号时优雅关闭所有任务。