使用run.Group管理协程,实现异步任务优雅启停
使用run.Group管理协程,实现异步任务优雅启停
背景
在日常的业务开发中我们可能会遇见这样的情形,主业务程序不间断运行,异步任务(消费 MQ 等)可以通过一些方式(例如接口,配置中心等)手动控制启停,在异步任务启停的的同时,不能影响主业务线程的运行。有时候,这两种类型的任务是被编排在同一个 Go 项目中的,此时我们可以使用 run.Group 和 context 来实现异步任务的优雅启停,而不影响主业务线程
run.Group
run.Group
是一个由 oklog/run 包提供的 Go 工具,用于管理多个 Goroutine 的启动和优雅关闭流程。它允许你注册多个任务,并在以下几种情况下统一控制其生命周期:
- 启动多个并发任务:
run.Group
能添加多个带启动和停止功能的任务。每个任务包括一个启动函数和一个终止函数。 - 自动优雅退出:
run.Group
会等待直到一个任务返回错误或上下文信号(如 SIGINT、SIGTERM)触发时,调用所有任务的终止函数来优雅地关闭程序。 - 控制并发和清理:你可以用
Add
方法将任务注册到Group
,并确保这些任务会按顺序执行和安全清理。
run.Group 的基本用法如下:
1 |
|
在 run.Group 编排的任务中,如果其中一个任务出现了问题发生了停止,则会导致同一个编排组里的所有任务都停止。为了实现背景中的需求,我们可以向 run.Group 中注册一个管理异步任务的服务,并将异步任务的启停开关和需要启动的协程数量放在配置中心中(如 apollo),由这个服务来实时监听配置中心的变动,并通过向协程中传递 context 来实时通知协程的启动和停止。以此,来实现在不影响主线程的前提下,异步任务的优雅启停
终止信号的创建和捕获
向 Goroutine 中传递一个带有取消功能的 context(例如 context.WithCancel()、context.WithTimeout()、context.WithDealine() 等),该 context 可以返回一个 ctx 和一个取消函数 cancel。同时,在创建 Goroutine 的时候将该 ctx 传入,并使用 select 来阻塞监听。select 会监听多个 channel,当 select 语句执行的时候,它会一直阻塞,直到有一个 case 语句可以执行,即有一个通道中有数据。context 对象的 Done()方法也会返回一个 channel,取消信号就是通过这个 channel 来传递的,因此我们可以在 Goroutine 内部监听这个 channel,一旦通道中有数据吗,说明上下文被取消,因此就可以及时终止 Goroutine 的执行
示例代码如下:
1 |
|
业务代码整合
回到正题中,我们的业务系统使用的是 run.Group 编排的读个服务,因此我们需要将异步消费的逻辑整合进一个服务中,并注册到 run.Group 中
一、创建一个通用的管理异步任务的服务
1 |
|
二、注册服务
编写实际的处理函数 HandleMessage,并将服务注册到 run.Group 中,交由 run.Group 管理
1 |
|
这样,通过创建一个异步管理一批 Goruntine 的服务,并将该服务注册到 run.Group 中,即可实现在不影响主线程的前提下优雅启停异步任务