大家好,我是极客老墨。

并发编程中,Channel 很好用,但不是万能的。有时候需要更精细的控制:等待一组任务完成、保护共享数据、限制并发数量。这时候就需要 sync 包的同步工具了。

这篇就聊聊 Go 的并发进阶工具,看看它们各自适合什么场景。

WaitGroup:等待组

WaitGroup 用于等待一组 Goroutine 完成,是最常用的同步工具。

基本用法

 1import (
 2    "fmt"
 3    "sync"
 4    "time"
 5)
 6
 7func worker(id int, wg *sync.WaitGroup) {
 8    defer wg.Done() // 完成时调用
 9    
10    fmt.Printf("Worker %d starting\n", id)
11    time.Sleep(time.Second)
12    fmt.Printf("Worker %d done\n", id)
13}
14
15func main() {
16    var wg sync.WaitGroup
17    
18    for i := 1; i <= 5; i++ {
19        wg.Add(1) // 增加计数
20        go worker(i, &wg)
21    }
22    
23    wg.Wait() // 等待所有完成
24    fmt.Println("All workers completed")
25}

要点

  • Add(n) 增加计数器
  • Done() 减少计数器(等价于 Add(-1)
  • Wait() 阻塞直到计数器为 0
  • 必须传递指针 *sync.WaitGroup

常见错误

 1// ❌ 错误:在 goroutine 内部 Add
 2go func() {
 3    wg.Add(1) // 可能在 Wait 之后执行
 4    defer wg.Done()
 5    // ...
 6}()
 7
 8// ✅ 正确:在启动前 Add
 9wg.Add(1)
10go func() {
11    defer wg.Done()
12    // ...
13}()

批量添加

 1func main() {
 2    var wg sync.WaitGroup
 3    
 4    // 一次性添加多个
 5    wg.Add(5)
 6    
 7    for i := 1; i <= 5; i++ {
 8        go func(id int) {
 9            defer wg.Done()
10            fmt.Printf("Task %d\n", id)
11        }(i)
12    }
13    
14    wg.Wait()
15}

Mutex:互斥锁

Mutex 用于保护共享数据,确保同一时间只有一个 Goroutine 访问。

基本用法

 1type SafeCounter struct {
 2    mu    sync.Mutex
 3    count int
 4}
 5
 6func (c *SafeCounter) Inc() {
 7    c.mu.Lock()
 8    defer c.mu.Unlock()
 9    c.count++
10}
11
12func (c *SafeCounter) Value() int {
13    c.mu.Lock()
14    defer c.mu.Unlock()
15    return c.count
16}
17
18func main() {
19    counter := SafeCounter{}
20    var wg sync.WaitGroup
21    
22    for i := 0; i < 1000; i++ {
23        wg.Add(1)
24        go func() {
25            defer wg.Done()
26            counter.Inc()
27        }()
28    }
29    
30    wg.Wait()
31    fmt.Println("Final count:", counter.Value()) // 1000
32}

要点

  • Lock() 加锁,Unlock() 解锁
  • 使用 defer 确保解锁
  • 锁的粒度要小,避免长时间持有

不使用锁的后果

 1// ❌ 不安全:数据竞争
 2type UnsafeCounter struct {
 3    count int
 4}
 5
 6func (c *UnsafeCounter) Inc() {
 7    c.count++ // 多个 goroutine 同时修改
 8}
 9
10// 最终结果可能小于 1000

锁的粒度

 1// ❌ 不好:锁的粒度太大
 2func (c *SafeCounter) Process() {
 3    c.mu.Lock()
 4    defer c.mu.Unlock()
 5    
 6    // 耗时操作
 7    time.Sleep(time.Second)
 8    c.count++
 9}
10
11// ✅ 好:只锁必要的部分
12func (c *SafeCounter) Process() {
13    // 耗时操作在锁外
14    time.Sleep(time.Second)
15    
16    c.mu.Lock()
17    c.count++
18    c.mu.Unlock()
19}

RWMutex:读写锁

RWMutex 允许多个读操作同时进行,但写操作是独占的。

基本用法

 1type Cache struct {
 2    mu   sync.RWMutex
 3    data map[string]string
 4}
 5
 6func (c *Cache) Get(key string) string {
 7    c.mu.RLock()         // 读锁
 8    defer c.mu.RUnlock()
 9    return c.data[key]
10}
11
12func (c *Cache) Set(key, value string) {
13    c.mu.Lock()          // 写锁
14    defer c.mu.Unlock()
15    c.data[key] = value
16}

要点

  • RLock() 读锁,多个 Goroutine 可以同时持有
  • Lock() 写锁,独占访问
  • 读多写少的场景性能更好

性能对比

 1// 读多写少场景
 2func benchmark() {
 3    cache := Cache{data: make(map[string]string)}
 4    cache.Set("key", "value")
 5    
 6    var wg sync.WaitGroup
 7    
 8    // 1000 个读操作(并发执行)
 9    for i := 0; i < 1000; i++ {
10        wg.Add(1)
11        go func() {
12            defer wg.Done()
13            cache.Get("key")
14        }()
15    }
16    
17    // 10 个写操作(独占执行)
18    for i := 0; i < 10; i++ {
19        wg.Add(1)
20        go func(i int) {
21            defer wg.Done()
22            cache.Set("key", fmt.Sprintf("value%d", i))
23        }(i)
24    }
25    
26    wg.Wait()
27}

何时使用

  • ✅ 读操作远多于写操作
  • ✅ 读操作耗时较长
  • ❌ 写操作频繁(用 Mutex 更简单)

Once:单次执行

Once 确保某个操作只执行一次,常用于初始化。

单例模式

 1var (
 2    instance *Singleton
 3    once     sync.Once
 4)
 5
 6type Singleton struct {
 7    data string
 8}
 9
10func GetInstance() *Singleton {
11    once.Do(func() {
12        fmt.Println("Creating instance")
13        instance = &Singleton{data: "singleton"}
14    })
15    return instance
16}
17
18func main() {
19    var wg sync.WaitGroup
20    
21    for i := 0; i < 10; i++ {
22        wg.Add(1)
23        go func() {
24            defer wg.Done()
25            s := GetInstance()
26            fmt.Println(s.data)
27        }()
28    }
29    
30    wg.Wait()
31    // "Creating instance" 只打印一次
32}

配置加载

 1var (
 2    config *Config
 3    once   sync.Once
 4)
 5
 6type Config struct {
 7    Host string
 8    Port int
 9}
10
11func LoadConfig() *Config {
12    once.Do(func() {
13        fmt.Println("Loading config...")
14        config = &Config{
15            Host: "localhost",
16            Port: 8080,
17        }
18    })
19    return config
20}

要点

  • Do(func()) 只执行一次
  • 即使多个 Goroutine 同时调用,也只执行一次
  • 线程安全,无需额外加锁

使用场景

  • 单例模式
  • 配置文件加载
  • 数据库连接初始化
  • 日志初始化

Cond:条件变量

Cond 用于 Goroutine 之间的协调,等待或通知条件发生。

基本用法

 1func main() {
 2    var mu sync.Mutex
 3    cond := sync.NewCond(&mu)
 4    ready := false
 5    
 6    // 等待者
 7    go func() {
 8        mu.Lock()
 9        for !ready {
10            cond.Wait() // 等待条件
11        }
12        fmt.Println("Condition met")
13        mu.Unlock()
14    }()
15    
16    // 通知者
17    time.Sleep(time.Second)
18    mu.Lock()
19    ready = true
20    cond.Signal() // 通知一个等待者
21    mu.Unlock()
22    
23    time.Sleep(time.Second)
24}

生产者-消费者

 1type Queue struct {
 2    mu    sync.Mutex
 3    cond  *sync.Cond
 4    items []int
 5}
 6
 7func NewQueue() *Queue {
 8    q := &Queue{}
 9    q.cond = sync.NewCond(&q.mu)
10    return q
11}
12
13func (q *Queue) Put(item int) {
14    q.mu.Lock()
15    defer q.mu.Unlock()
16    
17    q.items = append(q.items, item)
18    q.cond.Signal() // 通知消费者
19}
20
21func (q *Queue) Get() int {
22    q.mu.Lock()
23    defer q.mu.Unlock()
24    
25    for len(q.items) == 0 {
26        q.cond.Wait() // 等待生产者
27    }
28    
29    item := q.items[0]
30    q.items = q.items[1:]
31    return item
32}

要点

  • Wait() 释放锁并等待,被唤醒后重新获取锁
  • Signal() 唤醒一个等待的 Goroutine
  • Broadcast() 唤醒所有等待的 Goroutine
  • 必须在持有锁的情况下调用

何时使用

  • 需要等待某个条件满足
  • 生产者-消费者模式
  • 事件通知机制

⚠️ 注意:大多数情况下,Channel 更简单易用。

Map:并发安全的 Map

Go 1.9+ 提供了 sync.Map,适合特定场景。

基本用法

 1func main() {
 2    var m sync.Map
 3    
 4    // 存储
 5    m.Store("key1", "value1")
 6    m.Store("key2", "value2")
 7    
 8    // 读取
 9    if value, ok := m.Load("key1"); ok {
10        fmt.Println("Found:", value)
11    }
12    
13    // 删除
14    m.Delete("key2")
15    
16    // 遍历
17    m.Range(func(key, value interface{}) bool {
18        fmt.Printf("%v: %v\n", key, value)
19        return true // 继续遍历
20    })
21}

LoadOrStore

1// 如果存在则返回,否则存储并返回
2actual, loaded := m.LoadOrStore("key", "value")
3if loaded {
4    fmt.Println("Key exists:", actual)
5} else {
6    fmt.Println("Key stored:", actual)
7}

何时使用

  • ✅ 键值对只写入一次,读取多次
  • ✅ 多个 Goroutine 读写不同的键
  • ❌ 频繁修改同一个键(用 map + Mutex 更好)

Pool:对象池

Pool 用于复用对象,减少 GC 压力。

基本用法

 1var bufferPool = sync.Pool{
 2    New: func() interface{} {
 3        return new(bytes.Buffer)
 4    },
 5}
 6
 7func processData(data string) {
 8    // 从池中获取
 9    buf := bufferPool.Get().(*bytes.Buffer)
10    defer bufferPool.Put(buf) // 归还到池
11    
12    buf.Reset() // 重置状态
13    buf.WriteString(data)
14    
15    // 使用 buf
16    fmt.Println(buf.String())
17}

要点

  • Get() 从池中获取对象
  • Put() 归还对象到池
  • New 函数在池为空时创建新对象
  • 对象可能被 GC 回收

使用场景

  • 频繁创建和销毁的对象
  • 对象创建成本高
  • 减少 GC 压力

信号量

信号量用于限制并发数量。

使用 golang.org/x/sync/semaphore

 1import (
 2    "context"
 3    "fmt"
 4    "golang.org/x/sync/semaphore"
 5    "time"
 6)
 7
 8func main() {
 9    // 最多 3 个并发
10    sem := semaphore.NewWeighted(3)
11    ctx := context.Background()
12    
13    for i := 1; i <= 10; i++ {
14        // 获取信号量
15        if err := sem.Acquire(ctx, 1); err != nil {
16            fmt.Println("Failed:", err)
17            break
18        }
19        
20        go func(id int) {
21            defer sem.Release(1) // 释放
22            
23            fmt.Printf("Task %d started\n", id)
24            time.Sleep(time.Second)
25            fmt.Printf("Task %d done\n", id)
26        }(i)
27    }
28    
29    // 等待所有完成
30    sem.Acquire(ctx, 3)
31}

使用场景

  • 限制并发数(如数据库连接池)
  • 控制资源访问(如 API 限流)
  • 避免资源耗尽

ErrGroup:带错误处理

ErrGroup 提供了更方便的错误处理和上下文管理。

基本用法

 1import (
 2    "context"
 3    "fmt"
 4    "golang.org/x/sync/errgroup"
 5    "time"
 6)
 7
 8func fetchURL(url string) error {
 9    time.Sleep(time.Second)
10    if url == "bad-url" {
11        return fmt.Errorf("failed: %s", url)
12    }
13    fmt.Printf("Fetched %s\n", url)
14    return nil
15}
16
17func main() {
18    g, ctx := errgroup.WithContext(context.Background())
19    
20    urls := []string{"url1", "url2", "bad-url", "url3"}
21    
22    for _, url := range urls {
23        url := url // 避免闭包问题
24        g.Go(func() error {
25            select {
26            case <-ctx.Done():
27                return ctx.Err()
28            default:
29                return fetchURL(url)
30            }
31        })
32    }
33    
34    // 等待所有完成,返回第一个错误
35    if err := g.Wait(); err != nil {
36        fmt.Println("Error:", err)
37    }
38}

要点

  • 自动管理 WaitGroup
  • 返回第一个非 nil 错误
  • 提供 Context 取消机制
  • 一个任务失败,其他任务可以通过 Context 感知

限制并发数

 1func main() {
 2    g, ctx := errgroup.WithContext(context.Background())
 3    
 4    // 限制并发数为 3
 5    g.SetLimit(3)
 6    
 7    for i := 1; i <= 10; i++ {
 8        i := i
 9        g.Go(func() error {
10            fmt.Printf("Task %d\n", i)
11            time.Sleep(time.Second)
12            return nil
13        })
14    }
15    
16    g.Wait()
17}

完整示例

把前面的知识点串起来,看个完整的例子:

  1package main
  2
  3import (
  4    "context"
  5    "fmt"
  6    "golang.org/x/sync/errgroup"
  7    "golang.org/x/sync/semaphore"
  8    "sync"
  9    "time"
 10)
 11
 12// 任务结果
 13type Result struct {
 14    ID    int
 15    Value string
 16}
 17
 18// 并发安全的结果收集器
 19type ResultCollector struct {
 20    mu      sync.RWMutex
 21    results []Result
 22}
 23
 24func (rc *ResultCollector) Add(r Result) {
 25    rc.mu.Lock()
 26    defer rc.mu.Unlock()
 27    rc.results = append(rc.results, r)
 28}
 29
 30func (rc *ResultCollector) GetAll() []Result {
 31    rc.mu.RLock()
 32    defer rc.mu.RUnlock()
 33    return rc.results
 34}
 35
 36// 配置(单例)
 37var (
 38    config *Config
 39    once   sync.Once
 40)
 41
 42type Config struct {
 43    MaxConcurrency int64
 44}
 45
 46func GetConfig() *Config {
 47    once.Do(func() {
 48        config = &Config{MaxConcurrency: 3}
 49    })
 50    return config
 51}
 52
 53// 处理任务
 54func processTask(id int, sem *semaphore.Weighted) (Result, error) {
 55    ctx := context.Background()
 56    
 57    // 获取信号量
 58    if err := sem.Acquire(ctx, 1); err != nil {
 59        return Result{}, err
 60    }
 61    defer sem.Release(1)
 62    
 63    // 模拟处理
 64    time.Sleep(500 * time.Millisecond)
 65    
 66    if id == 5 {
 67        return Result{}, fmt.Errorf("task %d failed", id)
 68    }
 69    
 70    return Result{
 71        ID:    id,
 72        Value: fmt.Sprintf("result-%d", id),
 73    }, nil
 74}
 75
 76func main() {
 77    cfg := GetConfig()
 78    collector := &ResultCollector{}
 79    
 80    // 创建信号量
 81    sem := semaphore.NewWeighted(cfg.MaxConcurrency)
 82    
 83    // 使用 errgroup
 84    g, _ := errgroup.WithContext(context.Background())
 85    
 86    // 启动 10 个任务
 87    for i := 1; i <= 10; i++ {
 88        i := i
 89        g.Go(func() error {
 90            result, err := processTask(i, sem)
 91            if err != nil {
 92                return err
 93            }
 94            collector.Add(result)
 95            return nil
 96        })
 97    }
 98    
 99    // 等待所有完成
100    if err := g.Wait(); err != nil {
101        fmt.Println("Error:", err)
102    }
103    
104    // 打印结果
105    results := collector.GetAll()
106    fmt.Printf("Collected %d results\n", len(results))
107    for _, r := range results {
108        fmt.Printf("Task %d: %s\n", r.ID, r.Value)
109    }
110}

这个例子展示了:

  • Once 实现单例配置
  • RWMutex 保护结果收集器
  • Semaphore 限制并发数
  • ErrGroup 管理任务和错误
  • 多种同步工具的组合使用

老墨总结

Go 并发同步工具的 5 个关键点:

  1. WaitGroup 等待组:等待一组 Goroutine 完成,Add 要在 Wait 之前
  2. Mutex/RWMutex 锁:保护共享数据,读多写少用 RWMutex
  3. Once 单次执行:确保初始化只执行一次,常用于单例模式
  4. 信号量限流:限制并发数量,避免资源耗尽
  5. ErrGroup 错误处理:自动管理 WaitGroup,提供错误处理和取消机制

实战建议

  • 优先使用 Channel,必要时才用锁
  • 锁的粒度要小,避免长时间持有
  • 使用 defer 确保解锁
  • 读多写少用 RWMutex
  • 需要错误处理用 ErrGroup
  • 限制并发数用信号量

选对工具很重要,不要所有场景都用 Channel,也不要所有场景都用锁。


你在项目中用过哪些同步工具?遇到过什么并发问题?欢迎评论区聊聊。

极客老墨,继续折腾!

练习题

  1. 使用 WaitGroup 和 Mutex 实现一个并发安全的计数器,1000 个 Goroutine 同时增加
  2. 使用 RWMutex 实现一个缓存系统,支持并发读写,统计读写次数
  3. 使用 Once 实现一个配置加载器,确保配置文件只加载一次
  4. 使用 sync.Map 实现一个并发安全的用户会话管理器
  5. 使用信号量实现一个连接池,限制最多 5 个并发连接
  6. 使用 ErrGroup 并发下载 10 个文件,如果任一失败则取消所有下载

相关阅读