使用run.Group管理协程,实现异步任务优雅启停

使用run.Group管理协程,实现异步任务优雅启停

背景

在日常的业务开发中我们可能会遇见这样的情形,主业务程序不间断运行,异步任务(消费 MQ 等)可以通过一些方式(例如接口,配置中心等)手动控制启停,在异步任务启停的的同时,不能影响主业务线程的运行。有时候,这两种类型的任务是被编排在同一个 Go 项目中的,此时我们可以使用 run.Group 和 context 来实现异步任务的优雅启停,而不影响主业务线程

run.Group

run.Group 是一个由 oklog/run 包提供的 Go 工具,用于管理多个 Goroutine 的启动和优雅关闭流程。它允许你注册多个任务,并在以下几种情况下统一控制其生命周期:

  1. 启动多个并发任务run.Group 能添加多个带启动和停止功能的任务。每个任务包括一个启动函数和一个终止函数。
  2. 自动优雅退出run.Group 会等待直到一个任务返回错误或上下文信号(如 SIGINT、SIGTERM)触发时,调用所有任务的终止函数来优雅地关闭程序。
  3. 控制并发和清理:你可以用 Add 方法将任务注册到 Group,并确保这些任务会按顺序执行和安全清理。

run.Group 的基本用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/oklog/run"
)

func main() {
var g run.Group

// 添加第一个任务
g.Add(func() error {
fmt.Println("Task 1 is running")
time.Sleep(5 * time.Second)
return nil // 返回 nil 表示正常结束
}, func(error) {
fmt.Println("Cleaning up Task 1")
})

// 添加第二个任务,等待退出信号
g.Add(func() error {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Println("Waiting for exit signal")
<-sig
return nil
}, func(error) {
fmt.Println("Cleaning up after exit signal")
})

// 运行所有任务并等待结束
if err := g.Run(); err != nil {
fmt.Println("Error:", err)
}
}

在 run.Group 编排的任务中,如果其中一个任务出现了问题发生了停止,则会导致同一个编排组里的所有任务都停止。为了实现背景中的需求,我们可以向 run.Group 中注册一个管理异步任务的服务,并将异步任务的启停开关和需要启动的协程数量放在配置中心中(如 apollo),由这个服务来实时监听配置中心的变动,并通过向协程中传递 context 来实时通知协程的启动和停止。以此,来实现在不影响主线程的前提下,异步任务的优雅启停

终止信号的创建和捕获

向 Goroutine 中传递一个带有取消功能的 context(例如 context.WithCancel()、context.WithTimeout()、context.WithDealine() 等),该 context 可以返回一个 ctx 和一个取消函数 cancel。同时,在创建 Goroutine 的时候将该 ctx 传入,并使用 select 来阻塞监听。select 会监听多个 channel,当 select 语句执行的时候,它会一直阻塞,直到有一个 case 语句可以执行,即有一个通道中有数据。context 对象的 Done()方法也会返回一个 channel,取消信号就是通过这个 channel 来传递的,因此我们可以在 Goroutine 内部监听这个 channel,一旦通道中有数据吗,说明上下文被取消,因此就可以及时终止 Goroutine 的执行

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
// 调用cancel函数后,这里将能够收到通知
case <-ctx.Done():
return
default:
// do something
}
}
}(ctx)
// 在需要关闭协程时调用cancel方法发送取消信号
cancel()

业务代码整合

回到正题中,我们的业务系统使用的是 run.Group 编排的读个服务,因此我们需要将异步消费的逻辑整合进一个服务中,并注册到 run.Group 中

一、创建一个通用的管理异步任务的服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
type ConsumerManager struct {
consumerTaskName string
ctx context.Context
cancel context.CancelFunc
RabbitMQ *rabbitmq.Connection
handleMsgFunc func(ctx context.Context, msg amqp091.Delivery)
running bool
mu sync.Mutex
}

// Start 启动 Goroutine 的方法,接收要启动的 goroutine 数量和要监听的队列
func (m *ConsumerManager) Start(consumerCount int, queueName string) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
fmt.Printf("[Async Consumer Service] %s is already running\n", m.consumerTaskName)
return nil
}

deliveries, err := m.RabbitMQ.Consume(queueName)
if err != nil {
fmt.Printf("[Async Consumer Service] Failed to register "+m.consumerTaskName+" MQ consumer, err: %s \n", err.Error())
return err
}
// 启动消费者 goroutines
m.running = true
for i := 0; i < consumerCount; i++ {
go func() {
for {
select {
case msg, ok := <-deliveries:
if !ok {
return
}
// 处理消息
m.handleMsgFunc(m.ctx, msg)
case <-m.ctx.Done():
return
}
}
}()
}
return nil
}

// Stop 停止消费者服务
func (m *ConsumerManager) Stop() {
m.mu.Lock()
defer m.mu.Unlock()

if !m.running {
fmt.Printf("[Async Consumer Service] %s is not running\n", m.consumerTaskName)
return
}

m.running = false
m.cancel() // 调用 cancel 方法通知所有 goroutine 退出
// reset context
m.ctx, m.cancel = context.WithCancel(context.Background())
}

// WatchConfig 实时监听配置变更
func (m *ConsumerManager) WatchConfig(queueName string, consumerCount int) {
go func() {
for {
select {
case <-m.ctx.Done():
return
// 5S 监听一次变更
case <-time.After(5 * time.Second):
isEnabled := CheckApolloConfig("service", "task", m.consumerTaskName)
m.mu.Lock()
if isEnabled && !m.running {
fmt.Printf("[Async Consumer Service] Start %s\n", m.consumerTaskName)
m.mu.Unlock()
_ = m.Start(consumerCount, queueName)
} else if !isEnabled && m.running {
fmt.Printf("[Async Consumer Service] Stop %s\n", m.consumerTaskName)
m.mu.Unlock()
m.Stop()
} else {
m.mu.Unlock()
}
}
}
}()
}

func (m *ConsumerManager) CloseMqConn(ctx context.Context) {
fmt.Printf("[Async Consumer Service] close MQ conn\n")
m.RabbitMQ.Close()
}

// check apollo config
func CheckApolloConfig(namespace, key, task string) bool {
// 这里以 apollo 为例,检查 apollo 中对应的配置变更
// todo
}


func NewConsumerManager(ctx context.Context, mq *rabbitmq.Connection, consumerTaskName string, handleMsgFunc func(ctx context.Context, msg amqp091.Delivery)) *ConsumerManager {
managerCtx, cancelFunc := context.WithCancel(ctx)
return &ConsumerManager{
RabbitMQ: mq,
ctx: managerCtx,
cancel: cancelFunc,
consumerTaskName: consumerTaskName,
handleMsgFunc: handleMsgFunc,
}
}

二、注册服务

编写实际的处理函数 HandleMessage,并将服务注册到 run.Group 中,交由 run.Group 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// handle an async sms message
func HandleMessage(ctx context.Context, msg amqp091.Delivery) {
// 这里是实际的消息处理逻辑
}

// 将服务注册到 run.Group中
func AsyncSmsConsumerManagerServer(wg *run.Group) {
ctx, cancel := context.WithCancel(context.Background())
manager := async_task.NewConsumerManager(ctx, util.SmsMQ, consts.AsyncSmsTask, HandleMessage)
wg.Add(func() error {
// 这里假设开启了 5 个 goroutine 去处理异步消息,实际可将改配置放到配置中心中,来实现动态管理
manager.WatchConfig(consts.AsyncSmsQueue, 5)
<-ctx.Done() // 阻塞当前 goroutine,因为 watchConfig 里面没有阻塞的代码,所以在这里设置一个阻塞,防止工作函数退出
return nil
}, func(err error) {
manager.Stop()
cancel()
})
}

这样,通过创建一个异步管理一批 Goruntine 的服务,并将该服务注册到 run.Group 中,即可实现在不影响主线程的前提下优雅启停异步任务


使用run.Group管理协程,实现异步任务优雅启停
https://wuwanhao.github.io/2025/01/13/使用 run.Group管理协程,实现异步任务的优雅启停/
作者
Wuuu
发布于
2025年1月13日
许可协议