并发编程基础
2026/3/20大约 10 分钟
并发编程基础
一、Go 并发模型概述
Go 语言的并发是其最强大的特性之一。Go 采用 CSP(Communicating Sequential Processes)并发模型。
Go 并发模型
├── Goroutine(轻量级线程)
│ ├── 初始栈仅 2KB(线程通常 1-8MB)
│ ├── Go 运行时管理调度
│ ├── 创建成本极低
│ └── 可轻松创建数十万个
├── Channel(通道)
│ ├── Goroutine 间通信
│ ├── 类型安全
│ ├── 支持缓冲和非缓冲
│ └── 可用于同步
├── GMP 调度模型
│ ├── G - Goroutine
│ ├── M - Machine(OS 线程)
│ ├── P - Processor(逻辑处理器)
│ └── 默认 P 数量 = CPU 核数
└── 核心理念
└── "不要通过共享内存来通信,而要通过通信来共享内存"
二、Goroutine
2.1 基础用法
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
// 查看 CPU 核数和 Goroutine 数量
fmt.Println("CPU 核数:", runtime.NumCPU())
fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
fmt.Println("初始 Goroutine 数:", runtime.NumGoroutine())
// ========== 启动 Goroutine ==========
// 方式一:普通函数
go sayHello("World")
// 方式二:匿名函数
go func() {
fmt.Println("匿名 Goroutine")
}()
// 方式三:带参数的匿名函数
name := "Gopher"
go func(n string) {
fmt.Println("Hello,", n)
}(name)
fmt.Println("当前 Goroutine 数:", runtime.NumGoroutine())
time.Sleep(time.Second) // 等待 goroutine 完成
// ========== WaitGroup 等待多个 Goroutine ==========
var wg sync.WaitGroup
servers := []string{"web-01", "web-02", "web-03", "db-01", "cache-01"}
for _, server := range servers {
wg.Add(1)
go func(srv string) {
defer wg.Done()
checkServer(srv)
}(server)
}
wg.Wait() // 等待所有 goroutine 完成
fmt.Println("所有服务器检查完成")
}
func sayHello(name string) {
fmt.Println("Hello,", name)
}
func checkServer(name string) {
fmt.Printf("[检查] %s...\n", name)
time.Sleep(100 * time.Millisecond) // 模拟检查耗时
fmt.Printf("[完成] %s 正常\n", name)
}
2.2 Goroutine 生命周期管理
package main
import (
"context"
"fmt"
"sync"
"time"
)
// 使用 channel 控制退出
func worker(id int, quit <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-quit:
fmt.Printf("Worker %d: 收到退出信号\n", id)
return
default:
fmt.Printf("Worker %d: 工作中...\n", id)
time.Sleep(200 * time.Millisecond)
}
}
}
// 使用 context 控制退出(推荐)
func workerWithContext(ctx context.Context, id int, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: %v\n", id, ctx.Err())
return
default:
fmt.Printf("Worker %d: 工作中...\n", id)
time.Sleep(200 * time.Millisecond)
}
}
}
func main() {
// ========== 方式一:channel 控制 ==========
fmt.Println("=== Channel 控制 ===")
quit := make(chan struct{})
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, quit, &wg)
}
time.Sleep(500 * time.Millisecond)
close(quit) // 通知所有 worker 退出
wg.Wait()
// ========== 方式二:context 控制 ==========
fmt.Println("\n=== Context 控制 ===")
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
var wg2 sync.WaitGroup
for i := 1; i <= 3; i++ {
wg2.Add(1)
go workerWithContext(ctx, i, &wg2)
}
wg2.Wait()
fmt.Println("所有 Worker 已退出")
}
三、Channel
3.1 Channel 基础
package main
import "fmt"
func main() {
// ========== 无缓冲 Channel(同步)==========
ch := make(chan string)
// 发送方
go func() {
ch <- "hello" // 阻塞,直到接收方准备好
fmt.Println("发送完毕")
}()
// 接收方
msg := <-ch // 阻塞,直到发送方发送数据
fmt.Println("收到:", msg)
// ========== 有缓冲 Channel(异步)==========
buffered := make(chan int, 3) // 容量为 3
buffered <- 1 // 不阻塞
buffered <- 2 // 不阻塞
buffered <- 3 // 不阻塞
// buffered <- 4 // 阻塞!缓冲已满
fmt.Println(<-buffered) // 1
fmt.Println(<-buffered) // 2
fmt.Println(<-buffered) // 3
// ========== Channel 方向(限制读写)==========
// chan<- int : 只写 channel
// <-chan int : 只读 channel
jobs := make(chan int, 5)
results := make(chan int, 5)
// producer 只能写
go producer(jobs)
// consumer 只读 jobs,只写 results
go consumer(jobs, results)
// 读取结果
for r := range results {
fmt.Println("结果:", r)
}
}
func producer(out chan<- int) {
for i := 1; i <= 5; i++ {
out <- i
}
close(out) // 生产完毕,关闭 channel
}
func consumer(in <-chan int, out chan<- int) {
for v := range in {
out <- v * v
}
close(out)
}
3.2 Channel 模式
package main
import (
"fmt"
"sync"
"time"
)
// ========== 模式一:生产者-消费者 ==========
func producerConsumer() {
fmt.Println("=== 生产者-消费者 ===")
tasks := make(chan string, 10)
var wg sync.WaitGroup
// 生产者
go func() {
servers := []string{"web-01", "web-02", "web-03", "db-01", "cache-01"}
for _, s := range servers {
tasks <- s
}
close(tasks)
}()
// 多个消费者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range tasks {
fmt.Printf("Worker %d: 处理 %s\n", workerID, task)
time.Sleep(50 * time.Millisecond)
}
}(i)
}
wg.Wait()
}
// ========== 模式二:扇出/扇入(Fan-out/Fan-in)==========
func fanOutFanIn() {
fmt.Println("\n=== 扇出/扇入 ===")
// 生成数据
genNumbers := func(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 处理数据(扇出:多个 goroutine 读同一个 channel)
square := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 合并结果(扇入:多个 channel 合并为一个)
merge := func(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// 使用管道
input := genNumbers(1, 2, 3, 4, 5, 6, 7, 8)
// 扇出:3 个 worker 并行处理
c1 := square(input)
c2 := square(input)
c3 := square(input)
// 扇入:合并结果
for result := range merge(c1, c2, c3) {
fmt.Println("结果:", result)
}
}
// ========== 模式三:Pipeline(管道)==========
func pipeline() {
fmt.Println("\n=== Pipeline ===")
// 阶段1:生成数据
gen := func(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段2:平方
sq := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 阶段3:加倍
double := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
}
close(out)
}()
return out
}
// 连接管道
for v := range double(sq(gen(1, 2, 3, 4, 5))) {
fmt.Println(v)
}
}
// ========== 模式四:超时控制 ==========
func timeoutPattern() {
fmt.Println("\n=== 超时控制 ===")
result := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second) // 模拟慢操作
result <- "完成"
}()
select {
case res := <-result:
fmt.Println("结果:", res)
case <-time.After(1 * time.Second):
fmt.Println("操作超时")
}
}
// ========== 模式五:信号量(限制并发)==========
func semaphorePattern() {
fmt.Println("\n=== 信号量限制并发 ===")
const maxConcurrent = 3
sem := make(chan struct{}, maxConcurrent) // 信号量
var wg sync.WaitGroup
tasks := []string{"t1", "t2", "t3", "t4", "t5", "t6", "t7", "t8"}
for _, task := range tasks {
wg.Add(1)
go func(t string) {
defer wg.Done()
sem <- struct{}{} // 获取信号量(可能阻塞)
defer func() { <-sem }() // 释放信号量
fmt.Printf("执行 %s (并发数: %d)\n", t, len(sem))
time.Sleep(100 * time.Millisecond)
}(task)
}
wg.Wait()
}
func main() {
producerConsumer()
fanOutFanIn()
pipeline()
timeoutPattern()
semaphorePattern()
}
四、select 语句
4.1 select 基础
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "来自 channel 1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "来自 channel 2"
}()
// select 等待多个 channel
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("收到:", msg1)
case msg2 := <-ch2:
fmt.Println("收到:", msg2)
}
}
// ========== 非阻塞操作 ==========
ch := make(chan int)
select {
case v := <-ch:
fmt.Println("收到:", v)
default:
fmt.Println("没有数据可读(非阻塞)")
}
// ========== 超时控制 ==========
slowCh := make(chan string)
go func() {
time.Sleep(2 * time.Second)
slowCh <- "慢操作完成"
}()
select {
case result := <-slowCh:
fmt.Println(result)
case <-time.After(500 * time.Millisecond):
fmt.Println("超时")
}
}
4.2 运维场景:多源监控
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// 模拟不同来源的告警
type Alert struct {
Source string
Level string
Message string
Time time.Time
}
func generateAlerts(ctx context.Context, source string, interval time.Duration) <-chan Alert {
out := make(chan Alert)
go func() {
defer close(out)
levels := []string{"INFO", "WARN", "ERROR", "CRITICAL"}
for {
select {
case <-ctx.Done():
return
case <-time.After(interval + time.Duration(rand.Intn(100))*time.Millisecond):
alert := Alert{
Source: source,
Level: levels[rand.Intn(len(levels))],
Message: fmt.Sprintf("%s 检测到异常", source),
Time: time.Now(),
}
out <- alert
}
}
}()
return out
}
// 心跳检测
func heartbeat(ctx context.Context, interval time.Duration) <-chan time.Time {
out := make(chan time.Time)
go func() {
defer close(out)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-ticker.C:
out <- t
}
}
}()
return out
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// 多个告警源
cpuAlerts := generateAlerts(ctx, "CPU", 300*time.Millisecond)
memAlerts := generateAlerts(ctx, "Memory", 400*time.Millisecond)
diskAlerts := generateAlerts(ctx, "Disk", 500*time.Millisecond)
hb := heartbeat(ctx, 1*time.Second)
fmt.Println("开始监控...")
for {
select {
case alert, ok := <-cpuAlerts:
if !ok {
cpuAlerts = nil
continue
}
fmt.Printf("[%s][%s] %s (%s)\n",
alert.Level, alert.Source, alert.Message,
alert.Time.Format("15:04:05"))
case alert, ok := <-memAlerts:
if !ok {
memAlerts = nil
continue
}
fmt.Printf("[%s][%s] %s (%s)\n",
alert.Level, alert.Source, alert.Message,
alert.Time.Format("15:04:05"))
case alert, ok := <-diskAlerts:
if !ok {
diskAlerts = nil
continue
}
fmt.Printf("[%s][%s] %s (%s)\n",
alert.Level, alert.Source, alert.Message,
alert.Time.Format("15:04:05"))
case t, ok := <-hb:
if !ok {
hb = nil
continue
}
fmt.Printf("[HEARTBEAT] %s\n", t.Format("15:04:05"))
case <-ctx.Done():
fmt.Println("\n监控结束:", ctx.Err())
return
}
}
}
五、同步原语
5.1 sync.Mutex 互斥锁
package main
import (
"fmt"
"sync"
)
// 非线程安全的计数器
type UnsafeCounter struct {
count int
}
// 线程安全的计数器
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
// 读写锁(适合读多写少)
type MetricsCollector struct {
mu sync.RWMutex
metrics map[string]float64
}
func NewMetricsCollector() *MetricsCollector {
return &MetricsCollector{
metrics: make(map[string]float64),
}
}
func (mc *MetricsCollector) Set(key string, value float64) {
mc.mu.Lock()
defer mc.mu.Unlock()
mc.metrics[key] = value
}
func (mc *MetricsCollector) Get(key string) (float64, bool) {
mc.mu.RLock() // 读锁,多个读取可以并发
defer mc.mu.RUnlock()
val, ok := mc.metrics[key]
return val, ok
}
func (mc *MetricsCollector) GetAll() map[string]float64 {
mc.mu.RLock()
defer mc.mu.RUnlock()
result := make(map[string]float64, len(mc.metrics))
for k, v := range mc.metrics {
result[k] = v
}
return result
}
func main() {
// 安全计数器
counter := &SafeCounter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("计数器:", counter.Value()) // 1000
// 监控指标收集器
mc := NewMetricsCollector()
var wg2 sync.WaitGroup
// 写入
for i := 0; i < 10; i++ {
wg2.Add(1)
go func(id int) {
defer wg2.Done()
mc.Set(fmt.Sprintf("metric_%d", id), float64(id)*1.5)
}(i)
}
// 读取
for i := 0; i < 20; i++ {
wg2.Add(1)
go func(id int) {
defer wg2.Done()
key := fmt.Sprintf("metric_%d", id%10)
if val, ok := mc.Get(key); ok {
_ = val
}
}(i)
}
wg2.Wait()
fmt.Println("指标数量:", len(mc.GetAll()))
}
5.2 sync.Once
package main
import (
"fmt"
"sync"
)
// 单例模式
type Database struct {
Name string
}
var (
dbInstance *Database
dbOnce sync.Once
)
func GetDB() *Database {
dbOnce.Do(func() {
fmt.Println("初始化数据库连接...(只执行一次)")
dbInstance = &Database{Name: "production"}
})
return dbInstance
}
// 配置初始化
type AppConfig struct {
Host string
Port int
}
var (
config *AppConfig
configOnce sync.Once
)
func GetConfig() *AppConfig {
configOnce.Do(func() {
config = &AppConfig{
Host: "localhost",
Port: 8080,
}
})
return config
}
func main() {
var wg sync.WaitGroup
// 多个 goroutine 同时获取数据库实例
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
db := GetDB()
fmt.Println("获取到数据库:", db.Name)
}()
}
wg.Wait()
}
5.3 sync.Pool
package main
import (
"bytes"
"fmt"
"sync"
)
// 对象池:减少内存分配和 GC 压力
var bufferPool = sync.Pool{
New: func() interface{} {
fmt.Println("创建新 Buffer")
return new(bytes.Buffer)
},
}
func processRequest(id int) string {
// 从池中获取
buf := bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset() // 重置内容
bufferPool.Put(buf) // 归还到池中
}()
// 使用 buffer
fmt.Fprintf(buf, "Request %d processed", id)
return buf.String()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := processRequest(id)
fmt.Println(result)
}(i)
}
wg.Wait()
}
5.4 sync.Cond
package main
import (
"fmt"
"sync"
"time"
)
// 运维场景:等待服务就绪
type ServiceRegistry struct {
mu sync.Mutex
cond *sync.Cond
services map[string]bool
required int
}
func NewServiceRegistry(required int) *ServiceRegistry {
sr := &ServiceRegistry{
services: make(map[string]bool),
required: required,
}
sr.cond = sync.NewCond(&sr.mu)
return sr
}
func (sr *ServiceRegistry) Register(name string) {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.services[name] = true
fmt.Printf("[注册] %s (已注册: %d/%d)\n", name, len(sr.services), sr.required)
// 广播通知等待的 goroutine
sr.cond.Broadcast()
}
func (sr *ServiceRegistry) WaitForAll() {
sr.mu.Lock()
defer sr.mu.Unlock()
for len(sr.services) < sr.required {
sr.cond.Wait() // 等待条件满足
}
fmt.Println("所有服务就绪!")
}
func main() {
registry := NewServiceRegistry(3)
// 等待所有服务就绪
go func() {
registry.WaitForAll()
}()
// 模拟服务启动
services := []string{"database", "cache", "auth"}
for _, svc := range services {
time.Sleep(200 * time.Millisecond)
registry.Register(svc)
}
time.Sleep(100 * time.Millisecond)
}
5.5 原子操作
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
// ========== 原子计数器 ==========
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
fmt.Println("原子计数:", atomic.LoadInt64(&counter))
// ========== atomic.Value(存储任意类型)==========
var config atomic.Value
type Config struct {
Host string
Port int
}
// 存储
config.Store(&Config{Host: "localhost", Port: 8080})
// 读取
cfg := config.Load().(*Config)
fmt.Printf("配置: %s:%d\n", cfg.Host, cfg.Port)
// 热更新
config.Store(&Config{Host: "10.0.0.1", Port: 9090})
newCfg := config.Load().(*Config)
fmt.Printf("新配置: %s:%d\n", newCfg.Host, newCfg.Port)
// ========== Go 1.19+ atomic 泛型类型 ==========
var atomicBool atomic.Bool
atomicBool.Store(true)
fmt.Println("Bool:", atomicBool.Load())
var atomicInt atomic.Int64
atomicInt.Add(100)
atomicInt.Add(50)
fmt.Println("Int64:", atomicInt.Load())
// CompareAndSwap
swapped := atomicInt.CompareAndSwap(150, 200)
fmt.Printf("CAS: swapped=%v, value=%d\n", swapped, atomicInt.Load())
}
六、Worker Pool 模式
package main
import (
"fmt"
"sync"
"time"
)
// 任务定义
type Task struct {
ID int
Payload string
}
// 结果定义
type Result struct {
TaskID int
Output string
Error error
}
// Worker Pool
type WorkerPool struct {
workerCount int
taskChan chan Task
resultChan chan Result
wg sync.WaitGroup
}
func NewWorkerPool(workerCount, bufferSize int) *WorkerPool {
return &WorkerPool{
workerCount: workerCount,
taskChan: make(chan Task, bufferSize),
resultChan: make(chan Result, bufferSize),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for task := range wp.taskChan {
fmt.Printf("Worker %d: 处理任务 %d\n", id, task.ID)
time.Sleep(50 * time.Millisecond) // 模拟处理
wp.resultChan <- Result{
TaskID: task.ID,
Output: fmt.Sprintf("任务 %d (%s) 完成", task.ID, task.Payload),
}
}
}
func (wp *WorkerPool) Submit(task Task) {
wp.taskChan <- task
}
func (wp *WorkerPool) Results() <-chan Result {
return wp.resultChan
}
func (wp *WorkerPool) Wait() {
close(wp.taskChan)
wp.wg.Wait()
close(wp.resultChan)
}
func main() {
pool := NewWorkerPool(3, 20)
pool.Start()
// 提交任务
go func() {
servers := []string{"web-01", "web-02", "web-03", "db-01", "db-02",
"cache-01", "cache-02", "queue-01", "queue-02", "proxy-01"}
for i, server := range servers {
pool.Submit(Task{
ID: i + 1,
Payload: server,
})
}
pool.Wait()
}()
// 收集结果
for result := range pool.Results() {
if result.Error != nil {
fmt.Printf("任务 %d 失败: %v\n", result.TaskID, result.Error)
} else {
fmt.Printf("结果: %s\n", result.Output)
}
}
fmt.Println("所有任务完成")
}
七、本章小结
| 主题 | 核心要点 |
|---|---|
| Goroutine | 轻量级协程,使用 go 关键字启动 |
| Channel | 类型安全的通信管道,分缓冲和非缓冲 |
| select | 多路复用,监听多个 Channel |
| WaitGroup | 等待一组 Goroutine 完成 |
| Mutex | 互斥锁保护共享数据 |
| RWMutex | 读写锁,适合读多写少 |
| Once | 确保只执行一次(单例、初始化) |
| Pool | 对象池减少 GC 压力 |
| atomic | 原子操作,无锁并发 |
| Worker Pool | 控制并发度的经典模式 |
运维开发建议
- 优先使用 Channel 通信,而非共享内存
- 使用
context.Context控制 Goroutine 生命周期 - Worker Pool 控制并发数,避免资源耗尽
- 读多写少场景使用
sync.RWMutex - 简单的计数器使用
atomic比Mutex更高效