大家好,我是极客老墨。

Goroutine 和 Channel 是 Go 并发的基础,但有些场景它们不够用。频繁创建对象导致 GC 压力大?用 sync.Pool。并发读写 map 会 panic?用 sync.Map。简单的计数器用锁太重?用 atomic。

这篇就聊聊 Go 的高级并发工具,看看它们各自适合什么场景。

sync.Pool:对象复用

频繁创建和销毁大对象会给 GC 带来压力,sync.Pool 用于对象复用。

基本用法

 1package main
 2
 3import (
 4    "fmt"
 5    "sync"
 6)
 7
 8// 创建对象池
 9var bufferPool = sync.Pool{
10    // New 函数:池为空时创建新对象
11    New: func() interface{} {
12        fmt.Println("Creating new buffer")
13        return make([]byte, 1024)
14    },
15}
16
17func main() {
18    // 从池中获取对象
19    buf := bufferPool.Get().([]byte)
20    
21    // 使用对象
22    copy(buf, []byte("Hello, World!"))
23    fmt.Println(string(buf[:13]))
24    
25    // 归还到池中
26    bufferPool.Put(buf)
27    
28    // 再次获取(会复用刚才的对象)
29    buf2 := bufferPool.Get().([]byte)
30    fmt.Println(string(buf2[:13]))  // 还是 "Hello, World!"
31}

要点

  • Get() 从池中获取对象
  • Put() 归还对象到池中
  • New 函数在池为空时创建新对象
  • 对象可能被 GC 回收

实际应用

 1import (
 2    "bytes"
 3    "sync"
 4)
 5
 6var bufferPool = sync.Pool{
 7    New: func() interface{} {
 8        return new(bytes.Buffer)
 9    },
10}
11
12func processData(data []byte) string {
13    // 获取 buffer
14    buf := bufferPool.Get().(*bytes.Buffer)
15    defer bufferPool.Put(buf)
16    
17    // 重置 buffer(重要!)
18    buf.Reset()
19    
20    // 使用 buffer
21    buf.Write(data)
22    buf.WriteString(" processed")
23    
24    return buf.String()
25}
26
27func main() {
28    result := processData([]byte("data"))
29    fmt.Println(result)  // data processed
30}

要点

  • 归还前要重置对象状态(Reset()
  • 使用 defer 确保对象归还
  • 适合频繁创建销毁的临时对象

使用场景

适合的场景

  • HTTP 请求的 buffer
  • JSON 编码的 buffer
  • 临时的 byte slice
  • 频繁创建的临时对象

不适合的场景

  • 数据库连接(需要长期保持)
  • 网络连接(需要状态管理)
  • 需要保证存在的对象

注意事项

 1// ❌ 不好:依赖 Pool 中的对象
 2var pool = sync.Pool{
 3    New: func() interface{} {
 4        return &MyObject{}
 5    },
 6}
 7
 8func bad() {
 9    obj := pool.Get().(*MyObject)
10    // 假设对象一直存在,可能被 GC 回收
11}
12
13// ✅ 好:不依赖对象存在
14func good() {
15    obj := pool.Get().(*MyObject)
16    defer pool.Put(obj)
17    
18    // 使用对象
19    // ...
20}

⚠️ 重要:Pool 中的对象随时可能被 GC 回收,不要依赖对象的持久性。

sync.Map:并发安全的 Map

原生 map 不是并发安全的,多个 Goroutine 同时读写会 panic。

问题演示

 1// ❌ 会 panic
 2func badMap() {
 3    m := make(map[string]int)
 4    
 5    go func() {
 6        for i := 0; i < 1000; i++ {
 7            m["key"] = i  // 写
 8        }
 9    }()
10    
11    go func() {
12        for i := 0; i < 1000; i++ {
13            _ = m["key"]  // 读
14        }
15    }()
16    
17    time.Sleep(time.Second)
18}

sync.Map 基本用法

 1import (
 2    "fmt"
 3    "sync"
 4)
 5
 6func main() {
 7    var m sync.Map
 8    
 9    // 存储
10    m.Store("name", "Alice")
11    m.Store("age", 30)
12    
13    // 读取
14    if value, ok := m.Load("name"); ok {
15        fmt.Println("Name:", value)
16    }
17    
18    // 删除
19    m.Delete("age")
20    
21    // 遍历
22    m.Range(func(key, value interface{}) bool {
23        fmt.Printf("%v: %v\n", key, value)
24        return true  // 返回 false 停止遍历
25    })
26}

要点

  • Store(key, value) 存储
  • Load(key) 读取
  • Delete(key) 删除
  • Range(func) 遍历

LoadOrStore

原子性的读取或存储操作。

 1func main() {
 2    var m sync.Map
 3    
 4    // 如果 key 存在,返回现有值
 5    // 如果 key 不存在,存储新值
 6    actual, loaded := m.LoadOrStore("key", "value1")
 7    fmt.Println(actual, loaded)  // value1 false
 8    
 9    // 再次调用,返回已存在的值
10    actual, loaded = m.LoadOrStore("key", "value2")
11    fmt.Println(actual, loaded)  // value1 true
12}

LoadAndDelete

原子性的读取并删除操作。

 1func main() {
 2    var m sync.Map
 3    m.Store("key", "value")
 4    
 5    // 读取并删除
 6    value, loaded := m.LoadAndDelete("key")
 7    fmt.Println(value, loaded)  // value true
 8    
 9    // 再次读取,已经不存在
10    value, loaded = m.Load("key")
11    fmt.Println(value, loaded)  // <nil> false
12}

实际应用

 1// 并发安全的缓存
 2type Cache struct {
 3    data sync.Map
 4}
 5
 6func (c *Cache) Get(key string) (interface{}, bool) {
 7    return c.data.Load(key)
 8}
 9
10func (c *Cache) Set(key string, value interface{}) {
11    c.data.Store(key, value)
12}
13
14func (c *Cache) GetOrSet(key string, value interface{}) interface{} {
15    actual, _ := c.data.LoadOrStore(key, value)
16    return actual
17}
18
19func main() {
20    cache := &Cache{}
21    
22    // 并发写入
23    var wg sync.WaitGroup
24    for i := 0; i < 100; i++ {
25        wg.Add(1)
26        go func(i int) {
27            defer wg.Done()
28            cache.Set(fmt.Sprintf("key%d", i), i)
29        }(i)
30    }
31    
32    wg.Wait()
33    
34    // 读取
35    if value, ok := cache.Get("key0"); ok {
36        fmt.Println("Value:", value)
37    }
38}

使用场景

适合的场景

  • 读多写少的缓存
  • 配置信息存储
  • 并发安全的注册表

不适合的场景

  • 写多读少(性能不如 map + Mutex)
  • 需要复杂操作(如批量更新)
  • 需要类型安全(都是 interface{})

性能对比

 1// 方案 1:map + RWMutex
 2type SafeMap1 struct {
 3    mu sync.RWMutex
 4    m  map[string]int
 5}
 6
 7func (sm *SafeMap1) Get(key string) int {
 8    sm.mu.RLock()
 9    defer sm.mu.RUnlock()
10    return sm.m[key]
11}
12
13// 方案 2:sync.Map
14type SafeMap2 struct {
15    m sync.Map
16}
17
18func (sm *SafeMap2) Get(key string) int {
19    v, _ := sm.m.Load(key)
20    return v.(int)
21}

结论

  • 读多写少:sync.Map 更快
  • 写多读少:map + RWMutex 更快
  • 需要类型安全:map + RWMutex

atomic:原子操作

锁的开销较大,简单的计数器用 atomic 更高效。

基本操作

 1import (
 2    "fmt"
 3    "sync"
 4    "sync/atomic"
 5)
 6
 7func main() {
 8    var count int64
 9    var wg sync.WaitGroup
10    
11    // 1000 个 Goroutine 同时增加
12    for i := 0; i < 1000; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            atomic.AddInt64(&count, 1)
17        }()
18    }
19    
20    wg.Wait()
21    fmt.Println("Count:", count)  // 1000
22}

要点

  • AddInt64 原子加法
  • 比 Mutex 快得多
  • CPU 指令级别的原子性

常用函数

 1func main() {
 2    var value int64
 3    
 4    // 加法
 5    atomic.AddInt64(&value, 10)
 6    fmt.Println(value)  // 10
 7    
 8    // 减法(加负数)
 9    atomic.AddInt64(&value, -5)
10    fmt.Println(value)  // 5
11    
12    // 读取
13    v := atomic.LoadInt64(&value)
14    fmt.Println(v)  // 5
15    
16    // 存储
17    atomic.StoreInt64(&value, 100)
18    fmt.Println(value)  // 100
19    
20    // 交换
21    old := atomic.SwapInt64(&value, 200)
22    fmt.Println(old, value)  // 100 200
23    
24    // CAS(Compare And Swap)
25    swapped := atomic.CompareAndSwapInt64(&value, 200, 300)
26    fmt.Println(swapped, value)  // true 300
27}

支持的类型

 1// 整数类型
 2var i32 int32
 3var i64 int64
 4var u32 uint32
 5var u64 uint64
 6
 7atomic.AddInt32(&i32, 1)
 8atomic.AddInt64(&i64, 1)
 9atomic.AddUint32(&u32, 1)
10atomic.AddUint64(&u64, 1)
11
12// 指针类型
13var ptr unsafe.Pointer
14atomic.StorePointer(&ptr, unsafe.Pointer(&i64))
15
16// 任意类型(Go 1.19+)
17var value atomic.Value
18value.Store("hello")
19v := value.Load().(string)
20fmt.Println(v)  // hello

atomic.Value

存储任意类型的值。

 1import (
 2    "fmt"
 3    "sync/atomic"
 4)
 5
 6type Config struct {
 7    Host string
 8    Port int
 9}
10
11func main() {
12    var config atomic.Value
13    
14    // 存储
15    config.Store(&Config{
16        Host: "localhost",
17        Port: 8080,
18    })
19    
20    // 读取
21    c := config.Load().(*Config)
22    fmt.Printf("Host: %s, Port: %d\n", c.Host, c.Port)
23    
24    // 更新(原子替换整个对象)
25    config.Store(&Config{
26        Host: "0.0.0.0",
27        Port: 9090,
28    })
29}

要点

  • 可以存储任意类型
  • 原子性替换整个对象
  • 适合配置热更新

实际应用

 1// 并发安全的计数器
 2type Counter struct {
 3    value int64
 4}
 5
 6func (c *Counter) Inc() {
 7    atomic.AddInt64(&c.value, 1)
 8}
 9
10func (c *Counter) Dec() {
11    atomic.AddInt64(&c.value, -1)
12}
13
14func (c *Counter) Value() int64 {
15    return atomic.LoadInt64(&c.value)
16}
17
18// 并发安全的标志位
19type Flag struct {
20    value int32
21}
22
23func (f *Flag) Set() {
24    atomic.StoreInt32(&f.value, 1)
25}
26
27func (f *Flag) Clear() {
28    atomic.StoreInt32(&f.value, 0)
29}
30
31func (f *Flag) IsSet() bool {
32    return atomic.LoadInt32(&f.value) == 1
33}

性能对比

 1import (
 2    "sync"
 3    "sync/atomic"
 4    "testing"
 5)
 6
 7// 使用 Mutex
 8func BenchmarkMutex(b *testing.B) {
 9    var mu sync.Mutex
10    var count int64
11    
12    b.RunParallel(func(pb *testing.PB) {
13        for pb.Next() {
14            mu.Lock()
15            count++
16            mu.Unlock()
17        }
18    })
19}
20
21// 使用 Atomic
22func BenchmarkAtomic(b *testing.B) {
23    var count int64
24    
25    b.RunParallel(func(pb *testing.PB) {
26        for pb.Next() {
27            atomic.AddInt64(&count, 1)
28        }
29    })
30}

结果:atomic 比 Mutex 快 5-10 倍。

数据竞争检测

数据竞争是并发程序最难调试的 Bug。

什么是数据竞争

两个或多个 Goroutine 同时访问同一块内存,且至少有一个在写,没有同步机制。

 1// ❌ 数据竞争
 2func badCode() {
 3    count := 0
 4    
 5    go func() {
 6        count++  // 写
 7    }()
 8    
 9    fmt.Println(count)  // 读
10}

使用 -race 检测

1# 运行时检测
2go run -race main.go
3
4# 测试时检测
5go test -race
6
7# 构建时检测
8go build -race

检测示例

 1// main.go
 2package main
 3
 4import (
 5    "fmt"
 6    "time"
 7)
 8
 9func main() {
10    count := 0
11    
12    go func() {
13        for i := 0; i < 1000; i++ {
14            count++
15        }
16    }()
17    
18    go func() {
19        for i := 0; i < 1000; i++ {
20            count++
21        }
22    }()
23    
24    time.Sleep(time.Second)
25    fmt.Println("Count:", count)
26}

运行检测

 1$ go run -race main.go
 2
 3==================
 4WARNING: DATA RACE
 5Write at 0x00c000018090 by goroutine 7:
 6  main.main.func1()
 7      /path/to/main.go:12 +0x4c
 8
 9Previous write at 0x00c000018090 by goroutine 8:
10  main.main.func2()
11      /path/to/main.go:18 +0x4c
12
13Goroutine 7 (running) created at:
14  main.main()
15      /path/to/main.go:10 +0x84
16==================

修复数据竞争

 1// 方案 1:使用 Mutex
 2func fixWithMutex() {
 3    var mu sync.Mutex
 4    count := 0
 5    
 6    go func() {
 7        for i := 0; i < 1000; i++ {
 8            mu.Lock()
 9            count++
10            mu.Unlock()
11        }
12    }()
13    
14    go func() {
15        for i := 0; i < 1000; i++ {
16            mu.Lock()
17            count++
18            mu.Unlock()
19        }
20    }()
21    
22    time.Sleep(time.Second)
23    fmt.Println("Count:", count)
24}
25
26// 方案 2:使用 Atomic
27func fixWithAtomic() {
28    var count int64
29    
30    go func() {
31        for i := 0; i < 1000; i++ {
32            atomic.AddInt64(&count, 1)
33        }
34    }()
35    
36    go func() {
37        for i := 0; i < 1000; i++ {
38            atomic.AddInt64(&count, 1)
39        }
40    }()
41    
42    time.Sleep(time.Second)
43    fmt.Println("Count:", count)
44}
45
46// 方案 3:使用 Channel
47func fixWithChannel() {
48    ch := make(chan int)
49    count := 0
50    
51    go func() {
52        for i := 0; i < 1000; i++ {
53            ch <- 1
54        }
55    }()
56    
57    go func() {
58        for i := 0; i < 1000; i++ {
59            ch <- 1
60        }
61        close(ch)
62    }()
63    
64    for range ch {
65        count++
66    }
67    
68    fmt.Println("Count:", count)
69}

CI/CD 集成

 1# .github/workflows/test.yml
 2name: Test
 3
 4on: [push, pull_request]
 5
 6jobs:
 7  test:
 8    runs-on: ubuntu-latest
 9    steps:
10      - uses: actions/checkout@v3
11      - uses: actions/setup-go@v4
12        with:
13          go-version: '1.21'
14      
15      - name: Run tests with race detector
16        run: go test -race ./...

最佳实践

1. 选择合适的工具

 1// 简单计数器:使用 atomic
 2var counter int64
 3atomic.AddInt64(&counter, 1)
 4
 5// 复杂状态:使用 Mutex
 6type State struct {
 7    mu   sync.Mutex
 8    data map[string]int
 9}
10
11// 对象复用:使用 Pool
12var bufferPool = sync.Pool{
13    New: func() interface{} {
14        return new(bytes.Buffer)
15    },
16}
17
18// 并发 map:使用 sync.Map
19var cache sync.Map

2. 避免过度优化

 1// ❌ 不好:过度使用 atomic
 2type User struct {
 3    name atomic.Value
 4    age  atomic.Value
 5}
 6
 7// ✅ 好:简单场景用 Mutex
 8type User struct {
 9    mu   sync.Mutex
10    name string
11    age  int
12}

3. 始终使用 -race

1# 开发环境
2go run -race main.go
3
4# 测试环境
5go test -race ./...
6
7# CI/CD
8go test -race -coverprofile=coverage.out ./...

4. 理解性能权衡

  • Channel:最安全,但性能一般
  • Mutex:平衡性能和安全
  • RWMutex:读多写少场景
  • atomic:最快,但功能有限

5. 文档化并发设计

 1// Counter 是一个并发安全的计数器
 2// 使用 atomic 操作保证线程安全
 3type Counter struct {
 4    value int64  // 使用 atomic 操作
 5}
 6
 7// Inc 原子性地增加计数器
 8func (c *Counter) Inc() {
 9    atomic.AddInt64(&c.value, 1)
10}

老墨总结

Go 高级并发工具的 5 个关键点:

  1. sync.Pool:对象复用减轻 GC 压力,适合临时对象,不适合长期资源
  2. sync.Map:并发安全的 map,读多写少场景性能好
  3. atomic:原子操作比锁快,适合简单计数器和标志位
  4. -race:数据竞争检测器,开发和测试必开
  5. 选择合适工具:根据场景选择,不要过度优化

实战建议

  • 优先使用 Channel,必要时才用锁
  • 简单计数器用 atomic,复杂状态用 Mutex
  • 对象复用用 Pool,并发 map 用 sync.Map
  • 始终在测试环境开启 -race
  • 理解每个工具的适用场景和局限

高级并发工具是性能优化的利器,但要在正确的场景使用。


你在项目中用过这些高级并发工具吗?遇到过什么并发问题?欢迎评论区聊聊。

极客老墨,继续折腾!

练习题

  1. 使用 sync.Pool 实现一个 HTTP 请求的 buffer 池,减少内存分配
  2. 使用 sync.Map 实现一个并发安全的缓存系统,支持过期时间
  3. 使用 atomic 实现一个并发安全的限流器(令牌桶算法)
  4. 编写一个有数据竞争的程序,使用 -race 检测并修复
  5. 对比 Mutex、RWMutex、atomic 的性能,编写基准测试
  6. 使用 atomic.Value 实现配置热更新,支持并发读取

相关阅读