并发编程进阶
2026/3/20大约 8 分钟
并发编程进阶
一、Context 详解
1.1 Context 概述
context.Context 是 Go 并发编程的核心工具,用于传递取消信号、超时控制和请求级数据。
Context 层级关系
Background(根 Context)
├── WithCancel → 手动取消
├── WithDeadline → 截止时间
├── WithTimeout → 超时时间
└── WithValue → 传递数据
├── 子 Context 1
│ └── 子 Context 1.1
└── 子 Context 2
1.2 Context 类型与使用
package main
import (
"context"
"fmt"
"time"
)
func main() {
// ========== WithCancel ==========
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WithCancel: 收到取消信号 -", ctx.Err())
return
default:
fmt.Println("WithCancel: 工作中...")
time.Sleep(200 * time.Millisecond)
}
}
}(ctx)
time.Sleep(500 * time.Millisecond)
cancel() // 手动取消
time.Sleep(100 * time.Millisecond)
// ========== WithTimeout ==========
ctx2, cancel2 := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel2() // 即使超时也要调用
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WithTimeout:", ctx.Err())
return
default:
fmt.Println("WithTimeout: 工作中...")
time.Sleep(100 * time.Millisecond)
}
}
}(ctx2)
time.Sleep(500 * time.Millisecond)
// ========== WithDeadline ==========
deadline := time.Now().Add(200 * time.Millisecond)
ctx3, cancel3 := context.WithDeadline(context.Background(), deadline)
defer cancel3()
select {
case <-time.After(1 * time.Second):
fmt.Println("完成")
case <-ctx3.Done():
fmt.Println("WithDeadline:", ctx3.Err())
}
// ========== WithValue ==========
type contextKey string
const (
keyRequestID contextKey = "request_id"
keyUserID contextKey = "user_id"
)
ctx4 := context.WithValue(context.Background(), keyRequestID, "req-12345")
ctx4 = context.WithValue(ctx4, keyUserID, 42)
processRequest(ctx4)
}
func processRequest(ctx context.Context) {
type contextKey string
const keyRequestID contextKey = "request_id"
if reqID, ok := ctx.Value(keyRequestID).(string); ok {
fmt.Println("请求 ID:", reqID)
}
}
1.3 Context 实战:HTTP 请求链路
package main
import (
"context"
"fmt"
"time"
)
// 模拟微服务调用链
func apiGateway(ctx context.Context) error {
// 设置总超时
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
fmt.Println("[Gateway] 开始处理请求")
// 调用用户服务
user, err := callUserService(ctx)
if err != nil {
return fmt.Errorf("user service: %w", err)
}
fmt.Println("[Gateway] 用户:", user)
// 调用订单服务
order, err := callOrderService(ctx)
if err != nil {
return fmt.Errorf("order service: %w", err)
}
fmt.Println("[Gateway] 订单:", order)
return nil
}
func callUserService(ctx context.Context) (string, error) {
// 子服务设置自己的超时(不能超过父级)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
select {
case <-time.After(100 * time.Millisecond):
return "admin", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func callOrderService(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
select {
case <-time.After(100 * time.Millisecond):
return "order-001", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
// 运维场景:带超时的健康检查
func healthCheckWithContext(ctx context.Context, servers []string) map[string]bool {
results := make(map[string]bool)
ch := make(chan struct {
server string
healthy bool
}, len(servers))
for _, server := range servers {
go func(srv string) {
// 模拟检查
select {
case <-time.After(50 * time.Millisecond):
ch <- struct {
server string
healthy bool
}{srv, true}
case <-ctx.Done():
ch <- struct {
server string
healthy bool
}{srv, false}
}
}(server)
}
for range servers {
result := <-ch
results[result.server] = result.healthy
}
return results
}
func main() {
// 微服务调用链
ctx := context.Background()
if err := apiGateway(ctx); err != nil {
fmt.Println("错误:", err)
}
// 健康检查
fmt.Println("\n--- 健康检查 ---")
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
servers := []string{"web-01", "web-02", "db-01", "cache-01"}
results := healthCheckWithContext(ctx, servers)
for server, healthy := range results {
status := "正常"
if !healthy {
status = "异常"
}
fmt.Printf("%s: %s\n", server, status)
}
}
1.4 Context 最佳实践
package main
import (
"context"
"fmt"
)
// ========== 最佳实践 ==========
// 1. Context 作为第一个参数
func DoSomething(ctx context.Context, arg string) error {
// ...
return nil
}
// 2. 不要将 Context 存储在结构体中(除非有特殊理由)
// 差:
type BadService struct {
ctx context.Context // 不推荐
}
// 好:
type GoodService struct {
// 不存储 context
}
func (s *GoodService) Process(ctx context.Context) error {
// context 作为参数传入
return nil
}
// 3. 使用自定义类型作为 Value 的 key,避免冲突
type contextKey string
const (
requestIDKey contextKey = "request_id"
traceIDKey contextKey = "trace_id"
)
// 4. 提供辅助函数获取 Context 值
func RequestIDFromContext(ctx context.Context) string {
if id, ok := ctx.Value(requestIDKey).(string); ok {
return id
}
return ""
}
func ContextWithRequestID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, requestIDKey, id)
}
// 5. 总是调用 cancel(即使 context 已过期)
func example() {
ctx, cancel := context.WithTimeout(context.Background(), 0)
defer cancel() // 必须调用,释放资源
_ = ctx
}
// 6. 不要传递 nil context,使用 context.TODO()
func earlyDev() {
// 开发初期不确定用什么 context
ctx := context.TODO()
_ = ctx
}
func main() {
ctx := context.Background()
ctx = ContextWithRequestID(ctx, "req-001")
id := RequestIDFromContext(ctx)
fmt.Println("Request ID:", id)
}
二、高级并发模式
2.1 errgroup(并发错误处理)
package main
import (
"context"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
// 并发获取多个服务的数据
func fetchAllData(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
// 并发获取用户数据
var userData string
g.Go(func() error {
select {
case <-time.After(100 * time.Millisecond):
userData = "user data"
return nil
case <-ctx.Done():
return ctx.Err()
}
})
// 并发获取订单数据
var orderData string
g.Go(func() error {
select {
case <-time.After(150 * time.Millisecond):
orderData = "order data"
return nil
case <-ctx.Done():
return ctx.Err()
}
})
// 并发获取商品数据
var productData string
g.Go(func() error {
select {
case <-time.After(200 * time.Millisecond):
productData = "product data"
return nil
case <-ctx.Done():
return ctx.Err()
}
})
// 等待所有任务完成或第一个错误
if err := g.Wait(); err != nil {
return fmt.Errorf("fetch data: %w", err)
}
fmt.Printf("User: %s, Order: %s, Product: %s\n",
userData, orderData, productData)
return nil
}
// 限制并发度的 errgroup
func batchProcess(ctx context.Context, items []string) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(5) // 最多 5 个并发
for _, item := range items {
item := item // Go 1.22 之前需要
g.Go(func() error {
fmt.Printf("处理: %s\n", item)
time.Sleep(100 * time.Millisecond)
return nil
})
}
return g.Wait()
}
func main() {
ctx := context.Background()
fetchAllData(ctx)
fmt.Println("\n--- 批量处理 ---")
items := make([]string, 20)
for i := range items {
items[i] = fmt.Sprintf("item-%d", i)
}
batchProcess(ctx, items)
}
2.2 Rate Limiter(限流器)
package main
import (
"fmt"
"sync"
"time"
)
// 令牌桶限流器
type TokenBucket struct {
capacity int
tokens int
rate time.Duration // 每隔多久产生一个令牌
mu sync.Mutex
lastRefill time.Time
}
func NewTokenBucket(capacity int, rate time.Duration) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
rate: rate,
lastRefill: time.Now(),
}
}
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
// 补充令牌
now := time.Now()
elapsed := now.Sub(tb.lastRefill)
newTokens := int(elapsed / tb.rate)
if newTokens > 0 {
tb.tokens = min(tb.capacity, tb.tokens+newTokens)
tb.lastRefill = now
}
// 尝试获取令牌
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
func main() {
// 每秒 10 个请求的限流器
limiter := NewTokenBucket(10, 100*time.Millisecond)
var wg sync.WaitGroup
allowed := 0
denied := 0
var mu sync.Mutex
// 模拟 50 个并发请求
for i := 0; i < 50; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
if limiter.Allow() {
mu.Lock()
allowed++
mu.Unlock()
} else {
mu.Lock()
denied++
mu.Unlock()
}
}(i)
}
wg.Wait()
fmt.Printf("通过: %d, 拒绝: %d\n", allowed, denied)
}
2.3 Circuit Breaker(熔断器)
package main
import (
"errors"
"fmt"
"sync"
"time"
)
type State int
const (
StateClosed State = iota // 正常,允许请求
StateOpen // 熔断,拒绝请求
StateHalfOpen // 半开,允许少量请求探测
)
func (s State) String() string {
return [...]string{"Closed", "Open", "HalfOpen"}[s]
}
type CircuitBreaker struct {
mu sync.Mutex
state State
failureCount int
successCount int
failureThreshold int
successThreshold int
timeout time.Duration
lastFailureTime time.Time
}
func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
failureThreshold: failureThreshold,
successThreshold: successThreshold,
timeout: timeout,
}
}
var ErrCircuitOpen = errors.New("circuit breaker is open")
func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.mu.Lock()
switch cb.state {
case StateOpen:
// 检查是否可以进入半开状态
if time.Since(cb.lastFailureTime) > cb.timeout {
cb.state = StateHalfOpen
cb.successCount = 0
fmt.Println("[熔断器] Open -> HalfOpen")
} else {
cb.mu.Unlock()
return ErrCircuitOpen
}
}
cb.mu.Unlock()
// 执行操作
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failureCount++
cb.lastFailureTime = time.Now()
if cb.state == StateHalfOpen || cb.failureCount >= cb.failureThreshold {
cb.state = StateOpen
fmt.Printf("[熔断器] -> Open (失败 %d 次)\n", cb.failureCount)
}
return err
}
// 成功
if cb.state == StateHalfOpen {
cb.successCount++
if cb.successCount >= cb.successThreshold {
cb.state = StateClosed
cb.failureCount = 0
fmt.Println("[熔断器] HalfOpen -> Closed")
}
} else {
cb.failureCount = 0
}
return nil
}
func main() {
cb := NewCircuitBreaker(3, 2, 1*time.Second)
callCount := 0
operation := func() error {
callCount++
if callCount <= 5 {
return errors.New("service unavailable")
}
return nil
}
for i := 0; i < 10; i++ {
err := cb.Execute(operation)
if err != nil {
fmt.Printf("请求 %d: 错误 - %v\n", i+1, err)
} else {
fmt.Printf("请求 %d: 成功\n", i+1)
}
time.Sleep(300 * time.Millisecond)
}
}
三、竞态检测
3.1 数据竞争检测
package main
import (
"fmt"
"sync"
)
// 有竞态的代码
func raceExample() {
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 竞态!多个 goroutine 同时读写
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // 结果不确定
}
// 修复方式一:Mutex
func fixedWithMutex() {
var mu sync.Mutex
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println("Mutex Counter:", counter)
}
func main() {
// 运行:go run -race main.go
// -race 标志会检测数据竞争
raceExample()
fixedWithMutex()
}
# 竞态检测命令
go run -race main.go
go test -race ./...
go build -race -o myapp
四、GMP 调度模型
4.1 调度器概览
GMP 调度模型
├── G(Goroutine)
│ ├── 用户态轻量级线程
│ ├── 初始栈 2KB,可动态增长(最大 1GB)
│ ├── 状态:Gidle, Grunnable, Grunning, Gsyscall, Gwaiting, Gdead
│ └── 由 Go 运行时管理
├── M(Machine,OS 线程)
│ ├── 操作系统线程
│ ├── 最大数量由 GOMAXPROCS 间接控制
│ ├── 需要绑定 P 才能执行 G
│ └── 系统调用时会释放 P
├── P(Processor,逻辑处理器)
│ ├── 数量 = GOMAXPROCS(默认 CPU 核数)
│ ├── 维护本地运行队列(Local Run Queue)
│ ├── 提供执行 G 所需的资源
│ └── 工作窃取(Work Stealing)
└── 调度流程
├── P 从本地队列获取 G
├── 本地为空则从全局队列获取
├── 全局为空则从其他 P 窃取
└── 系统调用时 Hand-Off P 给空闲 M
4.2 调度器调优
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
// 查看和设置 GOMAXPROCS
fmt.Println("CPU 核数:", runtime.NumCPU())
fmt.Println("当前 GOMAXPROCS:", runtime.GOMAXPROCS(0))
// 设置 GOMAXPROCS
old := runtime.GOMAXPROCS(4)
fmt.Println("旧 GOMAXPROCS:", old)
fmt.Println("新 GOMAXPROCS:", runtime.GOMAXPROCS(0))
// 查看 Goroutine 数量
fmt.Println("Goroutine 数:", runtime.NumGoroutine())
// 主动让出 CPU 时间片
go func() {
for i := 0; i < 5; i++ {
fmt.Println("Goroutine:", i)
runtime.Gosched() // 让出 CPU
}
}()
time.Sleep(time.Second)
// runtime.LockOSThread: 将 Goroutine 绑定到当前线程
// 常用于 CGO、图形界面等需要线程亲和性的场景
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// 获取调用者信息
pc, file, line, ok := runtime.Caller(0)
if ok {
fn := runtime.FuncForPC(pc)
fmt.Printf("函数: %s\n", fn.Name())
fmt.Printf("文件: %s:%d\n", file, line)
}
}
# 调度器跟踪
GODEBUG=schedtrace=1000 ./myapp # 每 1000ms 打印调度信息
GODEBUG=scheddetail=1,schedtrace=1000 ./myapp # 详细信息
五、并发安全的数据结构
5.1 并发安全的队列
package main
import (
"fmt"
"sync"
)
// 并发安全的队列
type ConcurrentQueue[T any] struct {
mu sync.Mutex
items []T
cond *sync.Cond
}
func NewConcurrentQueue[T any]() *ConcurrentQueue[T] {
q := &ConcurrentQueue[T]{
items: make([]T, 0),
}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *ConcurrentQueue[T]) Enqueue(item T) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // 通知等待的消费者
}
func (q *ConcurrentQueue[T]) Dequeue() T {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 等待生产者
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func (q *ConcurrentQueue[T]) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.items)
}
func main() {
queue := NewConcurrentQueue[string]()
var wg sync.WaitGroup
// 生产者
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task := fmt.Sprintf("task-%d", id)
queue.Enqueue(task)
fmt.Printf("生产: %s\n", task)
}(i)
}
// 消费者
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
item := queue.Dequeue()
fmt.Printf("消费: %s\n", item)
}()
}
wg.Wait()
}
六、本章小结
| 主题 | 核心要点 |
|---|---|
| Context | 取消信号、超时控制、请求级数据传递 |
| errgroup | 并发任务组,统一错误处理 |
| 限流器 | 令牌桶算法,控制请求速率 |
| 熔断器 | Closed/Open/HalfOpen 三状态保护 |
| 竞态检测 | -race 标志检测数据竞争 |
| GMP 模型 | G(Goroutine) + M(线程) + P(处理器) |
运维开发建议
- 所有涉及超时的操作必须传递 Context
- 调用外部服务时使用熔断器保护
- 高并发场景使用限流器防止过载
- 开发和测试阶段开启
-race检测 - 使用
GODEBUG=schedtrace=1000排查调度问题