Go语言并发秘籍:掌握上下文context存储与并发控制之道
概述
在 Go 语言的并发编程中,context
包提供了一个极为重要的机制,用于在多个 goroutine 之间传播控制信号(例如取消信号、超时/截止日期)以及“请求级”数据(例如用户 ID、Trace ID)。本文将从 context
的基础概念入手,结合代码示例、ASCII 图解和详细说明,带你系统掌握如何使用 context
存储值、进行取消控制以及在并发场景下优雅地管理生命周期。通过掌握这些“并发秘籍”,你将能够写出更健壮、可控且容易维护的 Go 并发程序。
一、为什么需要 context
在典型的并发应用中,往往存在以下需求:
- 取消传播(Cancellation Propagation)
某个请求到达服务器后,可能触发多个子任务(goroutine)并发执行。如果用户或调用方超时或取消,所有相关的子任务都要及时响应并退出,避免资源浪费。 - 超时/截止日期(Timeout / Deadline)
为了保证系统的可控性,常常需要给一整条调用链或一组并发操作设置“最晚完成时间”。一旦超过这个时间,要关闭或放弃相应逻辑。 - 请求范围内的数据传递(Request-scoped Values)
比如在 Web 服务器场景中,为了统计日志、链路追踪,我们需要在整个请求上下文中传递诸如“TraceID”、“UserID”、“Locale”等信息,使各个层级或中间件都能访问到。
传统做法往往依赖全局变量或显式参数传递,既冗长又容易出错。Go 语言的 context
包正是为了解决上述问题而设计,通过**上下文(Context)**对象,将取消/超时信号与键值对“请求属性”捆绑在一起,一并传递给所有相关的 goroutine,实现统一管理。
二、context
基础概念与核心接口
2.1 Context
接口的定义
type Context interface {
// Done 返回一个只读 channel,表示上下文被取消或者过期时会关闭该 channel
Done() <-chan struct{}
// Err: 当且仅当 Done() 关闭后,Err() 会返回 “context.Canceled” 或 “context.DeadlineExceeded”
Err() error
// Deadline 返回上下文关联的截止时间(time.Time)和一个 bool,表示是否设置了截止日期
Deadline() (deadline time.Time, ok bool)
// Value 根据 key 返回与该 key 对应的值(如果不存在则返回 nil)
Value(key interface{}) interface{}
}
Done()
- 返回一个
<-chan struct{}
,当上下文被取消(被调用者调用Cancel()
)或者截止日期到达时,这个 channel 会被关闭。 - 通过
<-ctx.Done()
方式可以等待取消信号。
- 返回一个
Err()
当
Done()
关闭后,Err()
会返回具体的错误:context.Canceled
:显式调用取消函数(cancel()
)导致的取消;context.DeadlineExceeded
:截止日期到达或超时导致的取消。
Deadline()
- 返回上下文关联的截止日期和一个 bool(表示是否设置)。如果没有设置截止日期,
ok
为false
。
- 返回上下文关联的截止日期和一个 bool(表示是否设置)。如果没有设置截止日期,
Value(key)
- 返回在该上下文中存储的与
key
对应的值。常用于跨 API 边界传递“请求级”信息。 - 注意:
key
建议使用自定义类型,以避免与其他包冲突。
- 返回在该上下文中存储的与
2.2 context
的四种常见构造方式
标准库中提供了多种创建 Context
的函数,它们位于 context
包中:
context.Background()
- 返回一个空的根上下文,永远不会被取消,也没有值和截止日期。可以作为程序的根上下文 (root)。
- 在
main
函数、顶层测试(TestMain
)或初始化时使用。
context.TODO()
- 类似于
Background()
,但表明“这里还不知道使用什么上下文,后续再补充”。通常在原型或开发阶段用于占位。
- 类似于
context.WithCancel(parent Context)
- 基于
parent
创建一个可取消上下文,并返回新上下文ctx
以及一个取消函数cancelFunc
。 - 调用
cancelFunc()
会关闭ctx.Done()
,向其所有下游派生子上下文以及监视ctx.Done()
的 goroutine 发送取消信号。 原型:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
- 基于
context.WithDeadline(parent Context, deadline time.Time)
- 基于
parent
创建一个带截止日期的上下文,返回新的ctx
和取消函数cancelFunc
。 - 当当前时间到达
deadline
时,自动调用cancelFunc()
,关闭ctx.Done()
并让Err()
返回DeadlineExceeded
。 原型:
func WithDeadline(parent Context, d time.Time) (ctx Context, cancel CancelFunc)
- 基于
context.WithTimeout(parent Context, timeout time.Duration)
- 语法糖,在内部调用了
WithDeadline(parent, time.Now().Add(timeout))
。 - 返回
ctx
和cancelFunc
,超时后与WithDeadline
行为一致。
- 语法糖,在内部调用了
context.WithValue(parent Context, key, val interface{})
- 基于
parent
创建一个能存储键值对的上下文,返回新的ctx
。 Value
操作会在当前ctx
的值和其所有父级上下文中按链向上查找。- 注意:不要用上下文存储大量数据或应该主动释放的对象,应仅用于传递轻量级请求范围内的数据,例如“认证令牌”、“TraceID”等。
- 基于
三、取消与超时管理:WithCancel
、WithTimeout
、WithDeadline
3.1 WithCancel
的使用
当需要让多个 goroutine 可以手动触发取消操作时,使用 WithCancel
最为直接。
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: 收到取消信号, err=%v\n", id, ctx.Err())
return
default:
fmt.Printf("Worker %d: 正在工作...\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
// 1. 创建带取消功能的上下文
ctx, cancel := context.WithCancel(context.Background())
// 2. 启动多个 worker
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
// 3. 运行一段时间后手动取消
time.Sleep(2 * time.Second)
fmt.Println("main: 调用 cancel() 取消所有 worker")
cancel()
// 4. 等待一段时间,观察程序退出
time.Sleep(1 * time.Second)
fmt.Println("main: 退出程序")
}
执行结果示例:
Worker 1: 正在工作...
Worker 2: 正在工作...
Worker 3: 正在工作...
Worker 1: 正在工作...
Worker 2: 正在工作...
Worker 3: 正在工作...
Worker 1: 正在工作...
Worker 2: 正在工作...
Worker 3: 正在工作...
main: 调用 cancel() 取消所有 worker
Worker 1: 收到取消信号, err=context.Canceled
Worker 3: 收到取消信号, err=context.Canceled
Worker 2: 收到取消信号, err=context.Canceled
main: 退出程序
WithCancel
返回的ctx
与cancel
形成一对,如果任意一处调用cancel()
,会关闭ctx.Done()
,下游所有监听ctx.Done()
的 goroutine 都会收到信号并退出。- 即使在多个 goroutine 中使用同一个
ctx
,只要调用一次cancel()
,所有 goroutine 都会“广播”收到取消通知。
3.2 WithTimeout
与 WithDeadline
当你想让操作在指定时间后自动超时并取消时,可以使用 WithTimeout
或 WithDeadline
。
package main
import (
"context"
"fmt"
"time"
)
func doWork(ctx context.Context) {
select {
case <-time.After(2 * time.Second):
fmt.Println("任务完成")
case <-ctx.Done():
fmt.Println("任务被取消: err =", ctx.Err())
}
}
func main() {
// 1. 设置 1 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// 2. 启动任务
doWork(ctx)
// 3. 等待
fmt.Println("main: 结束")
}
执行结果:
任务被取消: err = context.DeadlineExceeded
main: 结束
- 在上面示例中,
doWork
内部用select
同时等待 “模拟 2 秒完成的任务” 与ctx.Done()
。因为我们设置了 1 秒超时,time.After(2s)
先于超时完成之前,ctx.Done()
会优先被选择,从而打印 “任务被取消”。 WithDeadline
与此类似,只是你需要传入一个固定的时间点,而不是一个持续时间。
3.3 取消链(Cancellation Propagation)示意图
当你从一个根上下文(context.Background()
)依次创建多个派生上下文时,取消信号会沿着**“值链”**向下传播。下图为简单示意(ASCII 画图):
Root (没有取消)
│
ctx1,cancel1 := WithCancel(Root)
│
┌─────────┴─────────┐
│ │
ctx2,cancel2 := ctx3,cancel3 :=
WithCancel(ctx1) WithTimeout(ctx1, 5s)
│ │
┌───┴───┐ ┌───┴───┐
│ │ │ │
子任务A 子任务B 子任务C 子任务D
节点含义
Root
:根上下文,不会被取消;ctx1
:第一级派生,可通过调用cancel1()
进行取消;ctx2
、ctx3
:第二级派生,分别基于不同场景创建,可手动取消或自动超时;- 底层的子任务(goroutine)都共享相应的
ctx
,并监听ctx.Done()
。
取消流程
- 如果调用了
cancel1()
,则ctx1.Done()
关闭,下游所有基于ctx1
或其子上下文(ctx2
/ctx3
)的Done()
也会立即关闭。 - 如果
ctx3
因超时到期而自行取消,仅会关闭ctx3.Done()
及其子上下文,ctx1
、ctx2
不受影响。
- 如果调用了
Context 链式调用示意(伪代码):
root := context.Background() ctx1, cancel1 := context.WithCancel(root) ctx2, cancel2 := context.WithCancel(ctx1) ctx3, cancel3 := context.WithTimeout(ctx1, 5*time.Second) // 子任务 A、B 监听 ctx2.Done() go taskA(ctx2) go taskB(ctx2) // 子任务 C、D 监听 ctx3.Done() go taskC(ctx3) go taskD(ctx3) // …… 若此时调用 cancel1(),则所有 taskA/B/C/D 都会被取消 // 若 ctx3 超时,则仅 taskC、taskD 被取消,taskA、taskB 不受影响
四、在 context
中存储与获取值(Value)
4.1 WithValue
的使用场景与注意事项
WithValue
允许你在上下文中附带轻量级键值对,以便在函数调用链或多个 goroutine 间传递一些“请求级”信息。常见用途包括:
- 链路追踪 ID(TraceID)
- 认证信息(UserID、Token)
- 日志记录字段(RequestID)
- 本地化信息(Locale)
注意事项:
- 尽量仅用于传递“只读”数据,且对性能开销敏感的场景。不要把上下文当成“通用 map”,避免存储大量数据或可变数据。
Key 应使用自定义类型,例如:
type userKey struct{}
再这样使用:
ctx = context.WithValue(ctx, userKey{}, "Tom")
这样可以避免不同包之间“key 名称冲突”。
4.2 简单示例:请求链路中传递 TraceID
下面模拟一个“HTTP 请求处理链”,在顶层生成一个 TraceID
,并通过 context.WithValue
传递给下层中间件或处理器。
package main
import (
"context"
"fmt"
"time"
)
type ctxKey string
const (
traceIDKey ctxKey = "traceID"
)
// 第一级:创建带 TraceID 的上下文
func handler() {
// 生成 TraceID(此处简化为时间戳字符串)
tid := fmt.Sprintf("trace-%d", time.Now().UnixNano())
ctx := context.WithValue(context.Background(), traceIDKey, tid)
fmt.Println("handler: TraceID =", tid)
// 调用下级服务
svcA(ctx)
}
// 第二级:某个微服务 A
func svcA(ctx context.Context) {
// 从 ctx 中取 TraceID
tid := ctx.Value(traceIDKey).(string)
fmt.Println("svcA: 拿到 TraceID =", tid)
// 传给下一级
svcB(ctx)
}
// 第三级:微服务 B
func svcB(ctx context.Context) {
// 仍然可以取到同一个 TraceID
tid := ctx.Value(traceIDKey).(string)
fmt.Println("svcB: 继续使用 TraceID =", tid)
}
func main() {
handler()
}
输出示例:
handler: TraceID = trace-1612345678901234567
svcA: 拿到 TraceID = trace-1612345678901234567
svcB: 继续使用 TraceID = trace-1612345678901234567
- 在
handler()
函数中,我们通过context.WithValue
在ctx
中存储了traceIDKey
对应的值。 - 之后传递
ctx
给svcA
、svcB
,它们可以随时通过ctx.Value(traceIDKey)
取到同一个TraceID
。
4.3 Value
查找规则
当调用
ctx.Value(key)
时,Go 运行时会沿着上下文继承链向上查找:- 首先检查当前
ctx
是否是通过WithValue
创建,如果是且 key 匹配,则返回对应的值。 - 否则继续检查当前
ctx
的父级ctx
,直到到达根上下文(Background()
或TODO()
)。 - 如果都没有找到,则返回
nil
。
- 首先检查当前
示意图(ASCII):
ctx0 = context.Background() // 根上下文 │ ctx1 = context.WithValue(ctx0, K1, V1) │ ctx2 = context.WithValue(ctx1, K2, V2) │ ctx3 = context.WithTimeout(ctx2, 1*time.Second)
当调用
ctx3.Value(K2)
时,查询链为:- ctx3 不是 valueCtx,跳过
- ctx2 是 valueCtx,key==K2 → 返回 V2
当调用
ctx3.Value(K1)
时,链依次为:- ctx3 → 跳过
- ctx2 → key 不匹配(K2 != K1)
- ctx1 → key==K1 → 返回 V1
五、并发场景下的 context
管理
在并发程序中,往往会按照一定模式启动多个 goroutine 并共享同一个 context
。常见模式包括:
- Fan-out / Fan-in 模式
- Worker Pool(工作池)
- Pipeline(管道)
- 组合超时与取消控制
下面通过示例演示如何结合 context
在这些场景中优雅地管理并发控制。
5.1 Fan-out / Fan-in 模式
场景示意:
- 主 goroutine 需要并发地向多个下游服务发起请求,并收集它们的结果。
- 如果主 goroutine 决定撤销整个操作,所有下游的 goroutine 必须停止,并及时清理资源。
5.1.1 代码示例
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// 模拟下游任务:随机耗时然后返回结果
func doTask(ctx context.Context, id int) (int, error) {
// 随机 100ms~800ms 之间
duration := time.Duration(100+rand.Intn(700)) * time.Millisecond
select {
case <-time.After(duration):
result := id * 10 // 举例计算
return result, nil
case <-ctx.Done():
return 0, ctx.Err() // 被取消或超时
}
}
// Fan-out 并发启动所有任务
func fanOut(ctx context.Context, taskCount int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(taskCount)
for i := 1; i <= taskCount; i++ {
go func(taskID int) {
defer wg.Done()
// 每个子任务都监听同一个 ctx
res, err := doTask(ctx, taskID)
if err != nil {
fmt.Printf("Task %d 取消: %v\n", taskID, err)
return
}
select {
case out <- res:
case <-ctx.Done():
// 如果主协程已取消,则不再发送
return
}
}(i)
}
// 当所有子任务完成后,关闭 out channel
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
rand.Seed(time.Now().UnixNano())
// 1. 创建带 500ms 超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// 2. 并发执行 5 个子任务
results := fanOut(ctx, 5)
// 3. Fan-in:收集子任务结果
for r := range results {
fmt.Println("收到任务结果:", r)
}
fmt.Println("main: 所有处理完毕或已超时退出")
}
5.1.2 运行示例与解读
- 上述例子中,主 goroutine 使用
ctx, cancel := context.WithTimeout(...)
设置了500ms的超时时间。 fanOut
会并发启动 5 个子任务,每个子任务都会随机耗时 100\~800ms 不等。- 如果某个子任务在 500ms 内完成,就会通过
out
通道将结果发送给主 goroutine;否则会因监听到ctx.Done()
而被取消。 - 主 goroutine 在
for r := range results
中不断读取结果,直到results
通道关闭(即所有子任务都退出)。 - 最终,如果多数子任务超时被取消,则只会收到部分结果,其他任务在
doTask
内收到ctx.Err()
后直接返回,不再向out
发送。
示意 ASCII 图:
┌──────────────────────────────────┐
│ ctx: WithTimeout(500ms) │
└──────────────────────────────────┘
│
▼
┌──────────────────────────────────┐
│ Fan-out 阶段 │
│ 启动 Task1~Task5 并行 │
└──────────────────────────────────┘
↑ ↑ ↑ ↑ ↑
... ... ... ... ...
│ │ │ │ │
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│ doTask(ctx, 1) │ │ doTask(ctx, 2) │ │ doTask(ctx, 3) │
│ (耗时 300ms) │ │ (耗时 600ms ) │ │ (耗时 200ms) │
└───────────────────┘ └───────────────────┘ └───────────────────┘
▲ ▲ ▲
result1→ out ←未完成取消 result3→ out
... ...
Task4/Task5
└───────────────────┘
│ Fan-in 阶段 │
│ 收集 out 通道结果 │
└──────────────────────────────────┘
- Task1(耗时 300ms)先完成,向
out
发送结果; - Task2(耗时 600ms)超出 500ms 超时,会先收到
<-ctx.Done()
,直接返回,不向out
发送; - Task3(耗时 200ms)先完成,向
out
发送; - 以此类推,最终
Task1
、Task3
、Task4
(如耗时 < 500ms)会成功,其他超时。
5.2 Worker Pool(工作池)模式
场景示意:
- 有大量任务需要处理,但我们希望限制同时进行的 goroutine 数量,以控制资源消耗。
- 并且希望可响应取消或超时信号,及时关闭所有 worker。
5.2.1 代码示例
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// 模拟工作任务:随机耗时 100~400ms 后返回结果
func process(ctx context.Context, id, data int) (int, error) {
duration := time.Duration(100+rand.Intn(300)) * time.Millisecond
select {
case <-time.After(duration):
return data * 2, nil // 举例返回 data*2
case <-ctx.Done():
return 0, ctx.Err()
}
}
func worker(ctx context.Context, id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case data, ok := <-jobs:
if !ok {
// jobs 通道被关闭,退出
return
}
res, err := process(ctx, id, data)
if err != nil {
fmt.Printf("Worker %d 任务被取消: %v\n", id, err)
return
}
select {
case results <- res:
case <-ctx.Done():
return
}
case <-ctx.Done():
// 上下文取消,直接退出
fmt.Printf("Worker %d: 收到全局取消信号\n", id)
return
}
}
}
func main() {
rand.Seed(time.Now().UnixNano())
// 1. 创建带超时的 Context
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
numWorkers := 5
jobs := make(chan int, 10)
results := make(chan int, 10)
// 2. 启动 Worker Pool
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(ctx, i, jobs, results, &wg)
}
// 3. 发送任务
go func() {
for i := 1; i <= 20; i++ {
select {
case jobs <- i:
case <-ctx.Done():
// 如果上下文超时或取消,不再发送
return
}
}
close(jobs)
}()
// 4. 收集结果
go func() {
wg.Wait()
close(results)
}()
// 5. 主协程遍历 results
for r := range results {
fmt.Println("主: 收到结果:", r)
}
fmt.Println("main: 所有 worker 已结束或超时退出")
}
5.2.2 解读
整体流程
- 主协程创建了一个 1 秒超时的上下文
ctx
; - 启动 5 个
worker
,每个 worker 持有同一个ctx
,从jobs
通道中接收整数data
,模拟耗时处理后将结果写入results
; - 另一个 goroutine 向
jobs
通道发送 1\~20 的数字,若ctx.Done()
已关闭,则停止发送并退出; - Worker 在处理每个
data
时,也会监听ctx.Done()
,如果超时或被取消,会提前退出。 - 当所有 worker 退出后,关闭
results
通道,主协程在遍历results
后退出。
- 主协程创建了一个 1 秒超时的上下文
并发控制
- 这里用
jobs
缓冲区配合 5 个 worker 限制并发:最多只有 5 个 goroutine 同时从jobs
中取任务执行。 - 如果任务较多,但
ctx
在 1 秒内没取消完,worker
和发送者都会因为监听到ctx.Done()
而提前退出,避免因过多堆积而浪费资源。
- 这里用
取消流程
jobs
的发送者会因为<-ctx.Done()
导致停止发送并返回;- 同时,所有
worker
因监听到<-ctx.Done()
也会打印并返回,最终wg.Wait()
完成后关闭results
,主协程读取完毕后结束。
5.3 Pipeline(管道)模式:多阶段并发
**场景示意:**数据流经多个处理阶段(Stage 1 → Stage 2 → …),每个阶段都有其独立的并发度。整个流水线希望能被优雅取消。
5.3.1 代码示例
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Stage1:生成数据
func stage1(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= 10; i++ {
select {
case out <- i:
case <-ctx.Done():
return
}
time.Sleep(50 * time.Millisecond) // 模拟耗时
}
}()
return out
}
// Stage2:对数据进行 +100 处理
func stage2(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
select {
case out <- v + 100:
case <-ctx.Done():
return
}
time.Sleep(80 * time.Millisecond)
}
}()
return out
}
// Stage3:将数据转成字符串并打印
func stage3(ctx context.Context, in <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for v := range in {
select {
case <-ctx.Done():
return
default:
fmt.Println("最终结果:", v)
}
}
}
func main() {
// 整个 Pipeline 设置 500ms 超时
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// 构建多个阶段的 Pipeline
c1 := stage1(ctx)
c2 := stage2(ctx, c1)
var wg sync.WaitGroup
wg.Add(1)
go stage3(ctx, c2, &wg)
wg.Wait()
fmt.Println("main: Pipeline 结束或超时退出")
}
5.3.2 解读
Pipeline 分为三个阶段:
- Stage1:在
c1
通道中生成 1\~10 的整数(每 50ms 一次)。 - Stage2:从
c1
中读取,将每个整数加 100 后写入c2
通道(每 80ms 一次)。 - Stage3:读取
c2
中的整数并打印输出。
- Stage1:在
取消传播
- 主函数创建带 500ms 超时的上下文
ctx
。 - 每个阶段都监听
<-ctx.Done()
,一旦超时(500ms)到达或外部调用cancel()
,各阶段都会返回并关闭自己的通道或退出。 - 因此整个 Pipeline 会在 500ms 后整体终止,不会出现“生产者卡住”、“下游阻塞”等死锁风险。
- 主函数创建带 500ms 超时的上下文
ASCII 图解 Pipeline 流程:
┌──────────────────────────┐
│ ctx: WithTimeout(500ms) │
└──────────────────────────┘
│
▼
┌────────────────┐
│ Stage1 (c1) │
│ i=1→out │
│ i=2→out │
│ ⋮ │
└────────────────┘
│
▼
┌────────────────┐
│ Stage2 (c2) │
│ v+100→out │
│ ⋮ │
└────────────────┘
│
▼
┌──────────────────┐
│ Stage3 输出 │
└──────────────────┘
如果 500ms 到达 → ctx.Done() 关闭 → 所有阶段退出
- 由于 Stage2 每次处理耗时 80ms,而 Stage1 产生速度 50ms,到了第 7\~8 个数据时可能会趋近 500ms 超时时间,从而后续数据未能完全通过 Stage3 即被取消。
六、结合 context
的并发控制示例:带 Value、取消与并发管理的综合案例
下面给出一个更完整的示例,结合前面所述的要点,在一个“伪 RPC 调用”场景中使用 context
:
- 存储请求上下文值:用户 ID(UserID)、TraceID
- 设置超时:整个调用链最大耗时 1 秒
- 并发发起多个子任务:模拟对多个后端服务的并发调用
- 统一取消:若超时或收到外部取消,则所有未完成子任务立即退出
- 结果收集:将返回结果聚合后输出
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// 定义 Context Key 类型,避免冲突
type ctxKey string
const (
traceIDKey ctxKey = "traceID"
userIDKey ctxKey = "userID"
)
// 模拟下游 RPC 调用:读取 ctx 中的 UserID 和 TraceID,并随机耗时
func rpcCall(ctx context.Context, serviceName string) (string, error) {
// 取出上下文值
userID, _ := ctx.Value(userIDKey).(string)
traceID, _ := ctx.Value(traceIDKey).(string)
// 模拟随机耗时 100~700ms
d := time.Duration(100+rand.Intn(600)) * time.Millisecond
select {
case <-time.After(d):
// 模拟返回结果
return fmt.Sprintf("[%s] user=%s trace=%s result=%d", serviceName, userID, traceID, rand.Intn(1000)), nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func mainHandler(parentCtx context.Context, userID string) {
// 1. 在父级 Context 上添加 UserID、TraceID
// 先添加 UserID
ctx := context.WithValue(parentCtx, userIDKey, userID)
// 再添加 TraceID(可根据时间戳或 UUID 生成)
traceID := fmt.Sprintf("trace-%d", time.Now().UnixNano())
ctx = context.WithValue(ctx, traceIDKey, traceID)
fmt.Printf("主处理: userID=%s traceID=%s\n", userID, traceID)
// 2. 设置 1 秒超时
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// 3. 并发调用 3 个后端服务
services := []string{"AuthService", "OrderService", "PaymentService"}
var wg sync.WaitGroup
resultCh := make(chan string, len(services))
for _, svc := range services {
wg.Add(1)
go func(s string) {
defer wg.Done()
res, err := rpcCall(ctx, s)
if err != nil {
fmt.Printf("[%s] 调用失败:%v\n", s, err)
return
}
select {
case resultCh <- res:
case <-ctx.Done():
return
}
}(svc)
}
// 4. WaitGroup 等待+超时控制:启动一个 goroutine,在所有子任务结束后关闭 resultCh
go func() {
wg.Wait()
close(resultCh)
}()
// 5. 主协程收集结果或超时退出
for r := range resultCh {
fmt.Println("收到结果:", r)
}
// 6. 检查 ctx.Err(),判断是否是超时或手动取消
if ctx.Err() == context.DeadlineExceeded {
fmt.Println("主处理: 已超时,已取消未完成任务")
} else {
fmt.Println("主处理: 所有任务处理完毕")
}
}
func main() {
rand.Seed(time.Now().UnixNano())
// 顶层使用 Background()
mainHandler(context.Background(), "user-123")
}
6.1 示例解读
Context 值存储
ctx := context.WithValue(parentCtx, userIDKey, userID)
:在根上下文上绑定userID
;ctx = context.WithValue(ctx, traceIDKey, traceID)
:为同一个ctx
再绑定traceID
。- 子函数
rpcCall
内可以通过ctx.Value
取出这两个值,用于日志、链路追踪等。
超时控制
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
:整个调用链及其派生的子任务都基于这个带 1 秒超时的ctx
。
并发调用子任务
for _, svc := range services { go … }
:并发向AuthService
、OrderService
、PaymentService
三个服务模拟 RPC 调用。- 每个子任务都传入同一个
ctx
,并在内部监听<-ctx.Done()
,一旦超时或外部调用cancel()
,就会提前退出并返回错误。
结果收集与退出
- 通过
sync.WaitGroup
等待所有子任务结束,并在wg.Wait()
完成后关闭resultCh
,让主协程的for r := range resultCh
能正常结束。 - 同时,主协程在
for
循环中也会因为<-ctx.Done()
导致所有子任务退出,resultCh
尽早关闭。
- 通过
Cancellation Propagation
- 如果某个子任务因
rpcCall
耗时过久(>1s)而未返回,会被ctx.Done()
触发ctx.Err() == DeadlineExceeded
,进而退出。 - 一旦第一个超时动作发生,其它子任务在
select
里也会优先进入case <-ctx.Done()
,快速停止,避免无谓计算。
- 如果某个子任务因
七、context
常见误区与最佳实践
在掌握了 context
的基本功能后,需要注意一些常见误区和推荐的最佳实践,防止滥用或出现隐蔽的并发问题。
7.1 不要将 Context
存储在结构体中(或作为结构体字段)
错误示例:
type MyService struct { ctx context.Context }
Context
本身是“一次性”的:通常应该将Context
作为函数参数传入,而不是作为全局或结构体字段持久保存。- 如果把
ctx
存在MyService
里,可能导致多个并发请求共用同一个ctx
,失去“请求范围”的隔离,也增加了取消时的复杂度。
7.2 在 context.WithValue
中存储“轻量且只读”的信息
- 不要把大量数据、或者应该及时关闭的对象(例如数据库链接)存入
Value
。 Value
主要用于“跨层级传递一些元信息”,而不是存储业务数据。
7.3 始终在派生的 Context
上调用 cancel()
类似于:
func handleRequest(w http.ResponseWriter, r *http.Request) { // 错误示例:直接使用 r.Context() ctx := r.Context() // ... 忘记调用 cancel ctx, cancel := context.WithTimeout(ctx, 2*time.Second) // … // 最后没有 defer cancel() }
正确做法:
func handleRequest(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second) defer cancel() // 确保在函数退出前释放资源 // … 使用 ctx 执行下游操作 … }
- 如果忘记
cancel()
,即使ctx
到期自动结束了,Go 运行时内部的定时器等资源也不会及时释放,可能造成“资源泄漏”。
7.4 在 select
中使用 case <-ctx.Done()
永远放在最前面(可选,但推荐)
这样可以保证在等待其它 channel 或操作时,能够优先响应取消信号,减少无谓等待。例如:
select { case <-ctx.Done(): return case data := <-dataCh: process(data) case <-time.After(1 * time.Second): // 超时逻辑 }
- 使得“取消优先”策略更明确、可靠。
7.5 避免向已关闭的 Context
派生过多子上下文
- 每次调用
WithCancel
、WithTimeout
、WithValue
等,都会生成一个新的context
对象,如果在一个“到期或取消”的ctx
上频繁派生,容易导致 GC 压力增大。 - 建议在需要多个子 goroutine 共享同一个取消/超时信号时,只派生一次,然后在多个 goroutine 中共享这个
ctx
,而不是每个 goroutine 都从父级一直往上找。
八、小结与思考
为何使用
context
?- 统一管理取消信号与超时控制,避免不同 goroutine 或不同代码路径各自实现取消逻辑而混乱。
- 在同一请求范围内,传递请求级元数据(例如 TraceID、UserID)时,能够避免在函数调用链中“到处传参”的烦恼。
关键 API 复习
context.Background()
/context.TODO()
:根上下文或占位上下文;WithCancel(parent)
:手动取消;WithTimeout(parent, dur)
/WithDeadline(parent, time)
:自动超时;WithValue(parent, key, val)
:传递轻量级、只读的键值对;ctx.Done()
、ctx.Err()
、ctx.Value(key)
:监听取消、检查错误、获取上下文值。
并发模式结合
context
- Fan-out/Fan-in:并发调用子任务并集中收集结果,可通过
ctx
取消所有子任务。 - Worker Pool:启动固定数量的 worker,所有 worker 共享同一个
ctx
,在取消时全部退出。 - Pipeline(多阶段流水线):各阶段监听同一个
ctx
,在截止日期到或取消时整体终止,避免死锁。
- Fan-out/Fan-in:并发调用子任务并集中收集结果,可通过
最佳实践要点
- 始终在派生
ctx
后defer cancel()
; - 将
ctx
作为函数显式参数,避免全局或结构体中保存; - 用
WithValue
传递轻量级只读信息,尽量不要存储大型对象; - 在
select
中优先监听ctx.Done()
,保证“取消优先”; - 避免在同一个已过期/已取消的
ctx
上频繁派生新上下文。
- 始终在派生
通过本文的基础概念、取消与超时机制、Value 存储、并发模式示例以及最佳实践,相信你已经掌握了 Go 语言 context
包在并发编程中的核心用法。结合代码示例与 ASCII 图解,在实际开发中根据具体场景合理使用 context
,你将能够编写出“既高效又可控”的并发程序,轻松应对复杂的并发需求。
评论已关闭