大家好,我是极客老墨。
并发编程中,Channel 很好用,但不是万能的。有时候需要更精细的控制:等待一组任务完成、保护共享数据、限制并发数量。这时候就需要 sync 包的同步工具了。
这篇就聊聊 Go 的并发进阶工具,看看它们各自适合什么场景。
WaitGroup:等待组
WaitGroup 用于等待一组 Goroutine 完成,是最常用的同步工具。
基本用法
1import (
2 "fmt"
3 "sync"
4 "time"
5)
6
7func worker(id int, wg *sync.WaitGroup) {
8 defer wg.Done() // 完成时调用
9
10 fmt.Printf("Worker %d starting\n", id)
11 time.Sleep(time.Second)
12 fmt.Printf("Worker %d done\n", id)
13}
14
15func main() {
16 var wg sync.WaitGroup
17
18 for i := 1; i <= 5; i++ {
19 wg.Add(1) // 增加计数
20 go worker(i, &wg)
21 }
22
23 wg.Wait() // 等待所有完成
24 fmt.Println("All workers completed")
25}
要点:
Add(n)增加计数器Done()减少计数器(等价于Add(-1))Wait()阻塞直到计数器为 0- 必须传递指针
*sync.WaitGroup
常见错误
1// ❌ 错误:在 goroutine 内部 Add
2go func() {
3 wg.Add(1) // 可能在 Wait 之后执行
4 defer wg.Done()
5 // ...
6}()
7
8// ✅ 正确:在启动前 Add
9wg.Add(1)
10go func() {
11 defer wg.Done()
12 // ...
13}()
批量添加
1func main() {
2 var wg sync.WaitGroup
3
4 // 一次性添加多个
5 wg.Add(5)
6
7 for i := 1; i <= 5; i++ {
8 go func(id int) {
9 defer wg.Done()
10 fmt.Printf("Task %d\n", id)
11 }(i)
12 }
13
14 wg.Wait()
15}
Mutex:互斥锁
Mutex 用于保护共享数据,确保同一时间只有一个 Goroutine 访问。
基本用法
1type SafeCounter struct {
2 mu sync.Mutex
3 count int
4}
5
6func (c *SafeCounter) Inc() {
7 c.mu.Lock()
8 defer c.mu.Unlock()
9 c.count++
10}
11
12func (c *SafeCounter) Value() int {
13 c.mu.Lock()
14 defer c.mu.Unlock()
15 return c.count
16}
17
18func main() {
19 counter := SafeCounter{}
20 var wg sync.WaitGroup
21
22 for i := 0; i < 1000; i++ {
23 wg.Add(1)
24 go func() {
25 defer wg.Done()
26 counter.Inc()
27 }()
28 }
29
30 wg.Wait()
31 fmt.Println("Final count:", counter.Value()) // 1000
32}
要点:
Lock()加锁,Unlock()解锁- 使用
defer确保解锁 - 锁的粒度要小,避免长时间持有
不使用锁的后果
1// ❌ 不安全:数据竞争
2type UnsafeCounter struct {
3 count int
4}
5
6func (c *UnsafeCounter) Inc() {
7 c.count++ // 多个 goroutine 同时修改
8}
9
10// 最终结果可能小于 1000
锁的粒度
1// ❌ 不好:锁的粒度太大
2func (c *SafeCounter) Process() {
3 c.mu.Lock()
4 defer c.mu.Unlock()
5
6 // 耗时操作
7 time.Sleep(time.Second)
8 c.count++
9}
10
11// ✅ 好:只锁必要的部分
12func (c *SafeCounter) Process() {
13 // 耗时操作在锁外
14 time.Sleep(time.Second)
15
16 c.mu.Lock()
17 c.count++
18 c.mu.Unlock()
19}
RWMutex:读写锁
RWMutex 允许多个读操作同时进行,但写操作是独占的。
基本用法
1type Cache struct {
2 mu sync.RWMutex
3 data map[string]string
4}
5
6func (c *Cache) Get(key string) string {
7 c.mu.RLock() // 读锁
8 defer c.mu.RUnlock()
9 return c.data[key]
10}
11
12func (c *Cache) Set(key, value string) {
13 c.mu.Lock() // 写锁
14 defer c.mu.Unlock()
15 c.data[key] = value
16}
要点:
RLock()读锁,多个 Goroutine 可以同时持有Lock()写锁,独占访问- 读多写少的场景性能更好
性能对比
1// 读多写少场景
2func benchmark() {
3 cache := Cache{data: make(map[string]string)}
4 cache.Set("key", "value")
5
6 var wg sync.WaitGroup
7
8 // 1000 个读操作(并发执行)
9 for i := 0; i < 1000; i++ {
10 wg.Add(1)
11 go func() {
12 defer wg.Done()
13 cache.Get("key")
14 }()
15 }
16
17 // 10 个写操作(独占执行)
18 for i := 0; i < 10; i++ {
19 wg.Add(1)
20 go func(i int) {
21 defer wg.Done()
22 cache.Set("key", fmt.Sprintf("value%d", i))
23 }(i)
24 }
25
26 wg.Wait()
27}
何时使用
- ✅ 读操作远多于写操作
- ✅ 读操作耗时较长
- ❌ 写操作频繁(用 Mutex 更简单)
Once:单次执行
Once 确保某个操作只执行一次,常用于初始化。
单例模式
1var (
2 instance *Singleton
3 once sync.Once
4)
5
6type Singleton struct {
7 data string
8}
9
10func GetInstance() *Singleton {
11 once.Do(func() {
12 fmt.Println("Creating instance")
13 instance = &Singleton{data: "singleton"}
14 })
15 return instance
16}
17
18func main() {
19 var wg sync.WaitGroup
20
21 for i := 0; i < 10; i++ {
22 wg.Add(1)
23 go func() {
24 defer wg.Done()
25 s := GetInstance()
26 fmt.Println(s.data)
27 }()
28 }
29
30 wg.Wait()
31 // "Creating instance" 只打印一次
32}
配置加载
1var (
2 config *Config
3 once sync.Once
4)
5
6type Config struct {
7 Host string
8 Port int
9}
10
11func LoadConfig() *Config {
12 once.Do(func() {
13 fmt.Println("Loading config...")
14 config = &Config{
15 Host: "localhost",
16 Port: 8080,
17 }
18 })
19 return config
20}
要点:
Do(func())只执行一次- 即使多个 Goroutine 同时调用,也只执行一次
- 线程安全,无需额外加锁
使用场景
- 单例模式
- 配置文件加载
- 数据库连接初始化
- 日志初始化
Cond:条件变量
Cond 用于 Goroutine 之间的协调,等待或通知条件发生。
基本用法
1func main() {
2 var mu sync.Mutex
3 cond := sync.NewCond(&mu)
4 ready := false
5
6 // 等待者
7 go func() {
8 mu.Lock()
9 for !ready {
10 cond.Wait() // 等待条件
11 }
12 fmt.Println("Condition met")
13 mu.Unlock()
14 }()
15
16 // 通知者
17 time.Sleep(time.Second)
18 mu.Lock()
19 ready = true
20 cond.Signal() // 通知一个等待者
21 mu.Unlock()
22
23 time.Sleep(time.Second)
24}
生产者-消费者
1type Queue struct {
2 mu sync.Mutex
3 cond *sync.Cond
4 items []int
5}
6
7func NewQueue() *Queue {
8 q := &Queue{}
9 q.cond = sync.NewCond(&q.mu)
10 return q
11}
12
13func (q *Queue) Put(item int) {
14 q.mu.Lock()
15 defer q.mu.Unlock()
16
17 q.items = append(q.items, item)
18 q.cond.Signal() // 通知消费者
19}
20
21func (q *Queue) Get() int {
22 q.mu.Lock()
23 defer q.mu.Unlock()
24
25 for len(q.items) == 0 {
26 q.cond.Wait() // 等待生产者
27 }
28
29 item := q.items[0]
30 q.items = q.items[1:]
31 return item
32}
要点:
Wait()释放锁并等待,被唤醒后重新获取锁Signal()唤醒一个等待的 GoroutineBroadcast()唤醒所有等待的 Goroutine- 必须在持有锁的情况下调用
何时使用
- 需要等待某个条件满足
- 生产者-消费者模式
- 事件通知机制
⚠️ 注意:大多数情况下,Channel 更简单易用。
Map:并发安全的 Map
Go 1.9+ 提供了 sync.Map,适合特定场景。
基本用法
1func main() {
2 var m sync.Map
3
4 // 存储
5 m.Store("key1", "value1")
6 m.Store("key2", "value2")
7
8 // 读取
9 if value, ok := m.Load("key1"); ok {
10 fmt.Println("Found:", value)
11 }
12
13 // 删除
14 m.Delete("key2")
15
16 // 遍历
17 m.Range(func(key, value interface{}) bool {
18 fmt.Printf("%v: %v\n", key, value)
19 return true // 继续遍历
20 })
21}
LoadOrStore
1// 如果存在则返回,否则存储并返回
2actual, loaded := m.LoadOrStore("key", "value")
3if loaded {
4 fmt.Println("Key exists:", actual)
5} else {
6 fmt.Println("Key stored:", actual)
7}
何时使用
- ✅ 键值对只写入一次,读取多次
- ✅ 多个 Goroutine 读写不同的键
- ❌ 频繁修改同一个键(用
map + Mutex更好)
Pool:对象池
Pool 用于复用对象,减少 GC 压力。
基本用法
1var bufferPool = sync.Pool{
2 New: func() interface{} {
3 return new(bytes.Buffer)
4 },
5}
6
7func processData(data string) {
8 // 从池中获取
9 buf := bufferPool.Get().(*bytes.Buffer)
10 defer bufferPool.Put(buf) // 归还到池
11
12 buf.Reset() // 重置状态
13 buf.WriteString(data)
14
15 // 使用 buf
16 fmt.Println(buf.String())
17}
要点:
Get()从池中获取对象Put()归还对象到池New函数在池为空时创建新对象- 对象可能被 GC 回收
使用场景
- 频繁创建和销毁的对象
- 对象创建成本高
- 减少 GC 压力
信号量
信号量用于限制并发数量。
使用 golang.org/x/sync/semaphore
1import (
2 "context"
3 "fmt"
4 "golang.org/x/sync/semaphore"
5 "time"
6)
7
8func main() {
9 // 最多 3 个并发
10 sem := semaphore.NewWeighted(3)
11 ctx := context.Background()
12
13 for i := 1; i <= 10; i++ {
14 // 获取信号量
15 if err := sem.Acquire(ctx, 1); err != nil {
16 fmt.Println("Failed:", err)
17 break
18 }
19
20 go func(id int) {
21 defer sem.Release(1) // 释放
22
23 fmt.Printf("Task %d started\n", id)
24 time.Sleep(time.Second)
25 fmt.Printf("Task %d done\n", id)
26 }(i)
27 }
28
29 // 等待所有完成
30 sem.Acquire(ctx, 3)
31}
使用场景
- 限制并发数(如数据库连接池)
- 控制资源访问(如 API 限流)
- 避免资源耗尽
ErrGroup:带错误处理
ErrGroup 提供了更方便的错误处理和上下文管理。
基本用法
1import (
2 "context"
3 "fmt"
4 "golang.org/x/sync/errgroup"
5 "time"
6)
7
8func fetchURL(url string) error {
9 time.Sleep(time.Second)
10 if url == "bad-url" {
11 return fmt.Errorf("failed: %s", url)
12 }
13 fmt.Printf("Fetched %s\n", url)
14 return nil
15}
16
17func main() {
18 g, ctx := errgroup.WithContext(context.Background())
19
20 urls := []string{"url1", "url2", "bad-url", "url3"}
21
22 for _, url := range urls {
23 url := url // 避免闭包问题
24 g.Go(func() error {
25 select {
26 case <-ctx.Done():
27 return ctx.Err()
28 default:
29 return fetchURL(url)
30 }
31 })
32 }
33
34 // 等待所有完成,返回第一个错误
35 if err := g.Wait(); err != nil {
36 fmt.Println("Error:", err)
37 }
38}
要点:
- 自动管理 WaitGroup
- 返回第一个非 nil 错误
- 提供 Context 取消机制
- 一个任务失败,其他任务可以通过 Context 感知
限制并发数
1func main() {
2 g, ctx := errgroup.WithContext(context.Background())
3
4 // 限制并发数为 3
5 g.SetLimit(3)
6
7 for i := 1; i <= 10; i++ {
8 i := i
9 g.Go(func() error {
10 fmt.Printf("Task %d\n", i)
11 time.Sleep(time.Second)
12 return nil
13 })
14 }
15
16 g.Wait()
17}
完整示例
把前面的知识点串起来,看个完整的例子:
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7 "golang.org/x/sync/semaphore"
8 "sync"
9 "time"
10)
11
12// 任务结果
13type Result struct {
14 ID int
15 Value string
16}
17
18// 并发安全的结果收集器
19type ResultCollector struct {
20 mu sync.RWMutex
21 results []Result
22}
23
24func (rc *ResultCollector) Add(r Result) {
25 rc.mu.Lock()
26 defer rc.mu.Unlock()
27 rc.results = append(rc.results, r)
28}
29
30func (rc *ResultCollector) GetAll() []Result {
31 rc.mu.RLock()
32 defer rc.mu.RUnlock()
33 return rc.results
34}
35
36// 配置(单例)
37var (
38 config *Config
39 once sync.Once
40)
41
42type Config struct {
43 MaxConcurrency int64
44}
45
46func GetConfig() *Config {
47 once.Do(func() {
48 config = &Config{MaxConcurrency: 3}
49 })
50 return config
51}
52
53// 处理任务
54func processTask(id int, sem *semaphore.Weighted) (Result, error) {
55 ctx := context.Background()
56
57 // 获取信号量
58 if err := sem.Acquire(ctx, 1); err != nil {
59 return Result{}, err
60 }
61 defer sem.Release(1)
62
63 // 模拟处理
64 time.Sleep(500 * time.Millisecond)
65
66 if id == 5 {
67 return Result{}, fmt.Errorf("task %d failed", id)
68 }
69
70 return Result{
71 ID: id,
72 Value: fmt.Sprintf("result-%d", id),
73 }, nil
74}
75
76func main() {
77 cfg := GetConfig()
78 collector := &ResultCollector{}
79
80 // 创建信号量
81 sem := semaphore.NewWeighted(cfg.MaxConcurrency)
82
83 // 使用 errgroup
84 g, _ := errgroup.WithContext(context.Background())
85
86 // 启动 10 个任务
87 for i := 1; i <= 10; i++ {
88 i := i
89 g.Go(func() error {
90 result, err := processTask(i, sem)
91 if err != nil {
92 return err
93 }
94 collector.Add(result)
95 return nil
96 })
97 }
98
99 // 等待所有完成
100 if err := g.Wait(); err != nil {
101 fmt.Println("Error:", err)
102 }
103
104 // 打印结果
105 results := collector.GetAll()
106 fmt.Printf("Collected %d results\n", len(results))
107 for _, r := range results {
108 fmt.Printf("Task %d: %s\n", r.ID, r.Value)
109 }
110}
这个例子展示了:
- Once 实现单例配置
- RWMutex 保护结果收集器
- Semaphore 限制并发数
- ErrGroup 管理任务和错误
- 多种同步工具的组合使用
老墨总结
Go 并发同步工具的 5 个关键点:
- WaitGroup 等待组:等待一组 Goroutine 完成,Add 要在 Wait 之前
- Mutex/RWMutex 锁:保护共享数据,读多写少用 RWMutex
- Once 单次执行:确保初始化只执行一次,常用于单例模式
- 信号量限流:限制并发数量,避免资源耗尽
- ErrGroup 错误处理:自动管理 WaitGroup,提供错误处理和取消机制
实战建议:
- 优先使用 Channel,必要时才用锁
- 锁的粒度要小,避免长时间持有
- 使用 defer 确保解锁
- 读多写少用 RWMutex
- 需要错误处理用 ErrGroup
- 限制并发数用信号量
选对工具很重要,不要所有场景都用 Channel,也不要所有场景都用锁。
你在项目中用过哪些同步工具?遇到过什么并发问题?欢迎评论区聊聊。
极客老墨,继续折腾!
练习题
- 使用 WaitGroup 和 Mutex 实现一个并发安全的计数器,1000 个 Goroutine 同时增加
- 使用 RWMutex 实现一个缓存系统,支持并发读写,统计读写次数
- 使用 Once 实现一个配置加载器,确保配置文件只加载一次
- 使用 sync.Map 实现一个并发安全的用户会话管理器
- 使用信号量实现一个连接池,限制最多 5 个并发连接
- 使用 ErrGroup 并发下载 10 个文件,如果任一失败则取消所有下载