使用 run.Group 编排 Goroutine

run.Group 是一个由 oklog/run 包提供的 Go 工具,用于管理多个 Goroutine 的启动和优雅关闭流程。它允许你注册多个任务,并在以下几种情况下统一控制其生命周期:

  1. 启动多个并发任务run.Group 能添加多个带启动和停止功能的任务。每个任务包括一个启动函数和一个终止函数。
  2. 自动优雅退出run.Group 会等待直到一个任务返回错误或上下文信号(如 SIGINT、SIGTERM)触发时,调用所有任务的终止函数来优雅地关闭程序。
  3. 控制并发和清理:你可以用 Add 方法将任务注册到 Group,并确保这些任务会按顺序执行和安全清理。

以下是一个简单的使用 run.Group 的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/oklog/run"
)

func main() {
var g run.Group

// 添加第一个任务
g.Add(func() error {
fmt.Println("Task 1 is running")
time.Sleep(5 * time.Second)
return nil // 返回 nil 表示正常结束
}, func(error) {
fmt.Println("Cleaning up Task 1")
})

// 添加第二个任务,等待退出信号
g.Add(func() error {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Println("Waiting for exit signal")
<-sig
return nil
}, func(error) {
fmt.Println("Cleaning up after exit signal")
})

// 运行所有任务并等待结束
if err := g.Run(); err != nil {
fmt.Println("Error:", err)
}
}

解释

  • **g.Add(startFunc, stopFunc)**Add 方法将一个任务添加到 Group,接受两个参数:
    • startFunc:启动任务的函数。
    • stopFunc:在任务停止时调用的清理函数。
  • 优雅退出任何一个 <font style="color:#DF2A3F;">startFunc</font> 返回后,<font style="color:#DF2A3F;">run.Group</font> 会依次调用所有注册的 <font style="color:#DF2A3F;">stopFunc</font> 函数,以实现安全退出。

使用场景

  • 管理多服务、处理任务和系统信号(如 HTTP 服务器、消息队列消费者、或监控任务),使它们统一启动和关闭。

以下是一些使用 run.Group 的详细示例,展示如何管理多个并发任务:

示例 1:管理多个 HTTP 服务器

在这个例子中,我们启动了两个 HTTP 服务器,一个用于主应用流量,另一个用于监控数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/oklog/run"
)

func main() {
var g run.Group

// 添加主 HTTP 服务器
{
server := &http.Server{Addr: ":8080", Handler: http.DefaultServeMux}
g.Add(func() error {
fmt.Println("Starting main server on :8080")
return server.ListenAndServe()
}, func(error) {
fmt.Println("Shutting down main server")
server.Close()
})
}

// 添加监控 HTTP 服务器
{
metricsServer := &http.Server{Addr: ":9090", Handler: http.DefaultServeMux}
g.Add(func() error {
fmt.Println("Starting metrics server on :9090")
return metricsServer.ListenAndServe()
}, func(error) {
fmt.Println("Shutting down metrics server")
metricsServer.Close()
})
}

// 添加捕获退出信号
{
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
g.Add(func() error {
fmt.Println("Waiting for termination signal...")
<-sigChan
return nil
}, func(error) {
fmt.Println("Received termination signal, shutting down gracefully...")
})
}

// 运行所有任务并等待退出
if err := g.Run(); err != nil {
fmt.Println("Error:", err)
}
}

解释:

  • 添加服务器任务:使用 g.Add 启动和停止 HTTP 服务器。g.Add 接受两个参数:第一个是启动函数,第二个是停止(清理)函数。
  • 捕获退出信号:使用 signal.Notify 来捕获操作系统的终止信号(如 SIGINTSIGTERM),并优雅地关闭服务器。
  • 优雅退出:当任一任务返回错误(如服务器失败),run.Group 会触发所有已注册的停止函数,确保资源被释放,避免进程僵死。

示例 2:启动多个消息队列消费者

如果你需要同时运行多个消息队列消费者,你可以用 run.Group 来管理它们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"time"
"github.com/oklog/run"
)

func main() {
var g run.Group

// 添加多个消费者
for i := 0; i < 3; i++ {
consumerID := i + 1
g.Add(func() error {
fmt.Printf("Consumer %d started\n", consumerID)
// 模拟消费者工作
time.Sleep(10 * time.Second)
return nil
}, func(error) {
fmt.Printf("Consumer %d stopped\n", consumerID)
})
}

// 运行消费者
if err := g.Run(); err != nil {
fmt.Println("Error:", err)
}
}

解释:

  • 多个消费者:我们启动了三个消费者,每个消费者独立运行,并处理消息。每个消费者都注册了启动和停止函数。
  • 统一管理:通过 run.Group,所有消费者可以统一管理,确保当任何一个消费者任务退出时,其他任务也可以按顺序停止。

示例 3:整合 Web 服务和 异步消费 MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main

import (
"context"
"fmt"
"github.com/gin-gonic/gin"
"github.com/oklog/run"
"net/http"
"time"
)

// 使用 run.Group 编排多个服务
func main() {
var g run.Group

// Web 服务
startWebServer(&g)

// 异步消费服务
startMQConsumer(&g)

// 启动所有服务
if err := g.Run(); err != nil {
fmt.Println("服务异常终止:", err)
}
}

func startWebServer(g *run.Group) {
app := gin.New()
app.GET("/aaa", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"code": 200,
"msg": "success",
})
})
srv := &http.Server{
Addr: ":15672", // 此处设置一个已经被占用的端口,模拟 Web server 启动失败的情况
Handler: app,
}

g.Add(func() error {
fmt.Println("启动 Web 服务...")
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Println("Web 服务启动失败:", err)
return err
}
return nil
}, func(err error) {
fmt.Println("停止 Web 服务...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
srv.Shutdown(ctx)
})
}

func startMQConsumer(g *run.Group) {
server := newAsyncSmsServer()
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
fmt.Println("启动 MQ 消费服务...")
return server.StartConsumeMessage(ctx, 3)
}, func(err error) {
fmt.Println("停止 MQ 消费服务...")
cancel()
})
}

type AsyncConsumerMessage struct {
}

func (a *AsyncConsumerMessage) StartConsumeMessage(ctx context.Context, consumerCount int) error {
done := make(chan bool)
// 启动三个模拟消费MQ 的协程
for i := 0; i < consumerCount; i++ {
go func(i int) {
for {
select {
case <-done:
fmt.Println(i, ":接收到退出信号,优雅退出")
return
default:
fmt.Println(i, ":执行业务操作....")
time.Sleep(1 * time.Second) // 防止过快打印
}
}
}(i)
}

// 阻塞直到 context 被取消或超时
<-ctx.Done()
close(done)
return nil
}

func newAsyncSmsServer() *AsyncConsumerMessage {
return &AsyncConsumerMessage{}
}

解释

整合了 Web 服务和一个异步消费 MQ 的服务,在Web 服务启动的时候使其使用一个已被占用的端口,来模拟其中一个服务失败,其他服务优雅退出的情形

程序启动过程中打印日志如下

1
2
3
4
5
6
7
8
9
10
11
[GIN-debug] GET    /aaa                      --> main.startWebServer.func1 (1 handlers)
启动 MQ 消费服务...
2 :执行业务操作....
0 :执行业务操作....
1 :执行业务操作....
启动 Web 服务...
Web 服务启动失败: listen tcp :15672: bind: address already in use
停止 Web 服务...
停止 MQ 消费服务...
服务异常终止: listen tcp :15672: bind: address already in use

总结

run.Group 是一个非常强大的工具,适用于管理多个并发任务。它能够保证任务按顺序启动,并确保在出现错误或收到退出信号时优雅关闭所有任务。


使用 run.Group 编排 Goroutine
https://wuwanhao.github.io/2024/11/14/Goroutine编排/
作者
Wuuu
发布于
2024年11月14日
许可协议