大家好,我是极客老墨。

传统语言里写并发,要创建线程、加锁、处理竞态条件,一不小心就死锁。Go 的并发模型完全不同:用 Goroutine 代替线程,用 Channel 代替锁。这种 CSP(通信顺序进程)模型,让并发编程变得简单多了。

这篇就聊聊 Go 的并发基础,看看 Goroutine 和 Channel 是怎么配合工作的。

Goroutine 基础

Goroutine 是 Go 的轻量级协程,比线程轻量得多。

启动 Goroutine

使用 go 关键字启动一个 Goroutine。

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func sayHello() {
 9    fmt.Println("Hello from goroutine")
10}
11
12func main() {
13    // 启动一个 Goroutine
14    go sayHello()
15
16    // 主 Goroutine 继续执行
17    fmt.Println("Hello from main")
18
19    // 等待一下,否则程序会立即退出
20    time.Sleep(time.Second)
21}

要点

  • go funcName() 启动一个新的 Goroutine
  • Goroutine 是并发执行的,不会阻塞主程序
  • main 函数结束时,所有 Goroutine 都会被强制终止

Goroutine 的特点

 1func printNumbers() {
 2    for i := 1; i <= 5; i++ {
 3        fmt.Printf("%d ", i)
 4        time.Sleep(100 * time.Millisecond)
 5    }
 6}
 7
 8func printLetters() {
 9    for i := 'A'; i <= 'E'; i++ {
10        fmt.Printf("%c ", i)
11        time.Sleep(100 * time.Millisecond)
12    }
13}
14
15func main() {
16    go printNumbers()
17    go printLetters()
18
19    time.Sleep(time.Second)
20    fmt.Println("\nDone")
21}

输出可能是1 A 2 B 3 C 4 D 5 E 或其他顺序(不确定)

要点

  • Goroutine 的执行顺序不确定
  • 多个 Goroutine 会并发执行
  • 启动成本极低,可以轻松启动成千上万个

匿名函数 Goroutine

 1func main() {
 2    // 使用匿名函数
 3    go func() {
 4        fmt.Println("Anonymous goroutine")
 5    }()
 6
 7    // 带参数的匿名函数
 8    go func(msg string) {
 9        fmt.Println(msg)
10    }("Hello")
11
12    time.Sleep(time.Second)
13}

Channel 基础

Channel 是 Goroutine 之间通信的管道。

创建和使用 Channel

 1// 创建一个 int 类型的 channel
 2ch := make(chan int)
 3
 4// 发送数据到 channel
 5ch <- 42
 6
 7// 从 channel 接收数据
 8value := <-ch
 9
10// 接收数据但不使用
11<-ch

无缓冲 Channel

无缓冲 Channel 是同步的,发送和接收必须同时准备好。

 1func main() {
 2    ch := make(chan string)
 3
 4    // 启动一个 Goroutine 接收数据
 5    go func() {
 6        msg := <-ch
 7        fmt.Println("Received:", msg)
 8    }()
 9
10    // 发送数据
11    ch <- "Hello"
12
13    time.Sleep(time.Second)
14}

要点

  • 发送操作会阻塞,直到有接收方
  • 接收操作会阻塞,直到有发送方
  • 这是一种同步机制

缓冲 Channel

缓冲 Channel 可以存储一定数量的数据。

 1func main() {
 2    // 创建容量为 3 的缓冲 channel
 3    ch := make(chan int, 3)
 4
 5    // 发送 3 个数据不会阻塞
 6    ch <- 1
 7    ch <- 2
 8    ch <- 3
 9
10    // 接收数据
11    fmt.Println(<-ch) // 1
12    fmt.Println(<-ch) // 2
13    fmt.Println(<-ch) // 3
14}

要点

  • 缓冲区未满时,发送不阻塞
  • 缓冲区为空时,接收阻塞
  • 缓冲区满时,发送阻塞

关闭 Channel

 1func main() {
 2    ch := make(chan int, 3)
 3
 4    // 发送数据
 5    ch <- 1
 6    ch <- 2
 7    ch <- 3
 8
 9    // 关闭 channel
10    close(ch)
11
12    // 可以继续接收已有数据
13    fmt.Println(<-ch) // 1
14    fmt.Println(<-ch) // 2
15    fmt.Println(<-ch) // 3
16
17    // 从已关闭的 channel 接收,返回零值
18    fmt.Println(<-ch) // 0
19}

要点

  • 关闭后不能再发送数据(会 panic)
  • 可以继续接收已有数据
  • 接收完后,继续接收会得到零值

检查 Channel 是否关闭

 1func main() {
 2    ch := make(chan int, 2)
 3    ch <- 1
 4    ch <- 2
 5    close(ch)
 6
 7    // 使用 ok 检查 channel 是否关闭
 8    for {
 9        value, ok := <-ch
10        if !ok {
11            fmt.Println("Channel closed")
12            break
13        }
14        fmt.Println("Received:", value)
15    }
16}

Range 遍历 Channel

 1func main() {
 2    ch := make(chan int, 3)
 3    ch <- 1
 4    ch <- 2
 5    ch <- 3
 6    close(ch)
 7
 8    // range 会自动检测 channel 是否关闭
 9    for value := range ch {
10        fmt.Println("Received:", value)
11    }
12}

要点

  • range 会持续接收数据,直到 channel 关闭
  • 如果 channel 没有关闭,range 会一直等待

单向 Channel

可以限制 Channel 的方向,提高代码安全性。

定义单向 Channel

 1// 只能发送的 channel
 2func send(ch chan<- int) {
 3    ch <- 42
 4}
 5
 6// 只能接收的 channel
 7func receive(ch <-chan int) {
 8    value := <-ch
 9    fmt.Println(value)
10}
11
12func main() {
13    ch := make(chan int)
14
15    go send(ch)
16    receive(ch)
17}

要点

  • chan<- T 表示只能发送
  • <-chan T 表示只能接收
  • 双向 channel 可以转换为单向 channel

Select 多路复用

select 用于同时等待多个 Channel 操作。

基本用法

 1func main() {
 2    ch1 := make(chan string)
 3    ch2 := make(chan string)
 4
 5    go func() {
 6        time.Sleep(1 * time.Second)
 7        ch1 <- "from ch1"
 8    }()
 9
10    go func() {
11        time.Sleep(2 * time.Second)
12        ch2 <- "from ch2"
13    }()
14
15    // select 会等待第一个就绪的 case
16    select {
17    case msg1 := <-ch1:
18        fmt.Println(msg1)
19    case msg2 := <-ch2:
20        fmt.Println(msg2)
21    }
22}

要点

  • select 会阻塞,直到某个 case 可以执行
  • 如果多个 case 同时就绪,随机选择一个
  • 类似于 switch,但专门用于 Channel

超时控制

 1func main() {
 2    ch := make(chan string)
 3
 4    go func() {
 5        time.Sleep(2 * time.Second)
 6        ch <- "result"
 7    }()
 8
 9    select {
10    case result := <-ch:
11        fmt.Println("Got:", result)
12    case <-time.After(1 * time.Second):
13        fmt.Println("Timeout!")
14    }
15}

非阻塞操作

 1func main() {
 2    ch := make(chan int)
 3
 4    select {
 5    case msg := <-ch:
 6        fmt.Println("Received:", msg)
 7    default:
 8        fmt.Println("No message available")
 9    }
10}

要点

  • default 分支在所有 case 都未就绪时执行
  • 可以实现非阻塞的 Channel 操作

同时监听多个 Channel

 1func main() {
 2    ch1 := make(chan int)
 3    ch2 := make(chan int)
 4    quit := make(chan bool)
 5
 6    go func() {
 7        for i := 0; i < 5; i++ {
 8            ch1 <- i
 9            time.Sleep(500 * time.Millisecond)
10        }
11        quit <- true
12    }()
13
14    go func() {
15        for i := 0; i < 5; i++ {
16            ch2 <- i * 10
17            time.Sleep(700 * time.Millisecond)
18        }
19    }()
20
21    for {
22        select {
23        case msg1 := <-ch1:
24            fmt.Println("From ch1:", msg1)
25        case msg2 := <-ch2:
26            fmt.Println("From ch2:", msg2)
27        case <-quit:
28            fmt.Println("Quit")
29            return
30        }
31    }
32}

WaitGroup 等待组

sync.WaitGroup 用于等待一组 Goroutine 完成。

基本用法

 1import (
 2    "fmt"
 3    "sync"
 4    "time"
 5)
 6
 7func worker(id int, wg *sync.WaitGroup) {
 8    defer wg.Done() // 完成时调用
 9
10    fmt.Printf("Worker %d starting\n", id)
11    time.Sleep(time.Second)
12    fmt.Printf("Worker %d done\n", id)
13}
14
15func main() {
16    var wg sync.WaitGroup
17
18    for i := 1; i <= 3; i++ {
19        wg.Add(1) // 增加计数
20        go worker(i, &wg)
21    }
22
23    wg.Wait() // 等待所有 Goroutine 完成
24    fmt.Println("All workers done")
25}

要点

  • Add(n) 增加计数器
  • Done() 减少计数器(通常用 defer)
  • Wait() 阻塞直到计数器为 0

并发安全

多个 Goroutine 访问共享数据时需要同步。

使用 Mutex

 1import (
 2    "fmt"
 3    "sync"
 4)
 5
 6type Counter struct {
 7    mu    sync.Mutex
 8    value int
 9}
10
11func (c *Counter) Increment() {
12    c.mu.Lock()
13    defer c.mu.Unlock()
14    c.value++
15}
16
17func (c *Counter) Value() int {
18    c.mu.Lock()
19    defer c.mu.Unlock()
20    return c.value
21}
22
23func main() {
24    var wg sync.WaitGroup
25    counter := Counter{}
26
27    for i := 0; i < 1000; i++ {
28        wg.Add(1)
29        go func() {
30            defer wg.Done()
31            counter.Increment()
32        }()
33    }
34
35    wg.Wait()
36    fmt.Println("Final value:", counter.Value())
37}

要点

  • Mutex 提供互斥锁
  • Lock() 加锁,Unlock() 解锁
  • 使用 defer 确保解锁

使用 Channel 代替锁

 1func main() {
 2    counter := 0
 3    ch := make(chan int)
 4
 5    // 启动 10 个 Goroutine
 6    for i := 0; i < 10; i++ {
 7        go func() {
 8            for j := 0; j < 100; j++ {
 9                ch <- 1
10            }
11        }()
12    }
13
14    // 接收并累加
15    go func() {
16        for v := range ch {
17            counter += v
18        }
19    }()
20
21    time.Sleep(time.Second)
22    fmt.Println("Counter:", counter)
23}

完整示例

把前面的知识点串起来,看个完整的例子:

 1package main
 2
 3import (
 4    "fmt"
 5    "sync"
 6    "time"
 7)
 8
 9// 任务结构
10type Job struct {
11    ID     int
12    Number int
13}
14
15// 结果结构
16type Result struct {
17    Job    Job
18    Result int
19}
20
21// Worker 函数
22func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
23    defer wg.Done()
24
25    for job := range jobs {
26        fmt.Printf("Worker %d processing job %d\n", id, job.ID)
27        time.Sleep(500 * time.Millisecond) // 模拟耗时操作
28
29        // 计算结果
30        result := Result{
31            Job:    job,
32            Result: job.Number * 2,
33        }
34
35        results <- result
36    }
37}
38
39func main() {
40    const numJobs = 5
41    const numWorkers = 3
42
43    jobs := make(chan Job, numJobs)
44    results := make(chan Result, numJobs)
45
46    // 启动 Worker Pool
47    var wg sync.WaitGroup
48    for w := 1; w <= numWorkers; w++ {
49        wg.Add(1)
50        go worker(w, jobs, results, &wg)
51    }
52
53    // 发送任务
54    for j := 1; j <= numJobs; j++ {
55        jobs <- Job{ID: j, Number: j * 10}
56    }
57    close(jobs)
58
59    // 等待所有 Worker 完成
60    go func() {
61        wg.Wait()
62        close(results)
63    }()
64
65    // 收集结果
66    for result := range results {
67        fmt.Printf("Job %d result: %d\n", result.Job.ID, result.Result)
68    }
69
70    fmt.Println("All jobs completed")
71}

这个例子展示了:

  • Worker Pool 模式
  • 使用 Channel 分发任务和收集结果
  • 使用 WaitGroup 等待所有 Worker 完成
  • 单向 Channel 提高安全性
  • Goroutine 的并发执行

老墨踩过的坑

坑 1:忘记等待 Goroutine 完成

老墨刚学 Go 时,经常写出这样的代码:

1func main() {
2    for i := 0; i < 5; i++ {
3        go func(n int) {
4            fmt.Println(n)
5        }(i)
6    }
7    // ❌ 程序立即退出,Goroutine 还没来得及执行
8}

问题:main 函数结束时,所有 Goroutine 都会被强制终止,可能什么都没打印出来。

正确做法

 1func main() {
 2    var wg sync.WaitGroup
 3    for i := 0; i < 5; i++ {
 4        wg.Add(1)
 5        go func(n int) {
 6            defer wg.Done()
 7            fmt.Println(n)
 8        }(i)
 9    }
10    wg.Wait() // ✅ 等待所有 Goroutine 完成
11}

坑 2:闭包捕获循环变量

这是最经典的坑,老墨在生产环境踩过:

 1func main() {
 2    var wg sync.WaitGroup
 3    for i := 0; i < 5; i++ {
 4        wg.Add(1)
 5        go func() {
 6            defer wg.Done()
 7            fmt.Println(i) // ❌ 所有 Goroutine 可能都打印 5
 8        }()
 9    }
10    wg.Wait()
11}

问题:所有 Goroutine 共享同一个变量 i,当它们执行时,循环可能已经结束,i 的值是 5。

正确做法

 1// 方式 1:传参
 2for i := 0; i < 5; i++ {
 3    wg.Add(1)
 4    go func(n int) {
 5        defer wg.Done()
 6        fmt.Println(n) // ✅ 每个 Goroutine 有自己的副本
 7    }(i)
 8}
 9
10// 方式 2:在循环内创建新变量(Go 1.22+)
11for i := 0; i < 5; i++ {
12    i := i // 创建新变量
13    wg.Add(1)
14    go func() {
15        defer wg.Done()
16        fmt.Println(i) // ✅ 捕获的是新变量
17    }()
18}

坑 3:向已关闭的 Channel 发送数据

1func main() {
2    ch := make(chan int, 2)
3    ch <- 1
4    close(ch)
5
6    ch <- 2 // ❌ panic: send on closed channel
7}

教训:关闭 Channel 后不能再发送数据,但可以继续接收。

正确做法

 1func main() {
 2    ch := make(chan int, 2)
 3    ch <- 1
 4    ch <- 2
 5    close(ch)
 6
 7    // ✅ 可以继续接收
 8    fmt.Println(<-ch) // 1
 9    fmt.Println(<-ch) // 2
10    fmt.Println(<-ch) // 0(零值)
11}

最佳实践:由发送方关闭 Channel,接收方不要关闭。

坑 4:Goroutine 泄漏

老墨曾经写过这样的代码,导致内存泄漏:

1func leak() {
2    ch := make(chan int)
3    go func() {
4        val := <-ch // ❌ 永远阻塞,Goroutine 泄漏
5        fmt.Println(val)
6    }()
7    // 忘记发送数据或关闭 channel
8}

问题:Goroutine 一直等待接收数据,但没有人发送,导致 Goroutine 永远不会结束。

正确做法

 1func noLeak() {
 2    ch := make(chan int)
 3    go func() {
 4        select {
 5        case val := <-ch:
 6            fmt.Println(val)
 7        case <-time.After(5 * time.Second):
 8            fmt.Println("Timeout") // ✅ 超时退出
 9        }
10    }()
11
12    // 或者确保发送数据
13    ch <- 42
14}

坑 5:无缓冲 Channel 的死锁

1func main() {
2    ch := make(chan int)
3    ch <- 42 // ❌ fatal error: all goroutines are asleep - deadlock!
4    fmt.Println(<-ch)
5}

问题:无缓冲 Channel 的发送操作会阻塞,直到有接收方。但这里没有其他 Goroutine 接收,导致死锁。

正确做法

 1// 方式 1:使用缓冲 Channel
 2func main() {
 3    ch := make(chan int, 1) // ✅ 缓冲区大小为 1
 4    ch <- 42
 5    fmt.Println(<-ch)
 6}
 7
 8// 方式 2:在 Goroutine 中发送
 9func main() {
10    ch := make(chan int)
11    go func() {
12        ch <- 42 // ✅ 在另一个 Goroutine 中发送
13    }()
14    fmt.Println(<-ch)
15}

实战建议

1. Worker Pool 模式

这是处理大量任务的标准模式:

 1func workerPool(numWorkers int, jobs <-chan int, results chan<- int) {
 2    var wg sync.WaitGroup
 3
 4    // 启动固定数量的 Worker
 5    for i := 0; i < numWorkers; i++ {
 6        wg.Add(1)
 7        go func(id int) {
 8            defer wg.Done()
 9            for job := range jobs {
10                // 处理任务
11                results <- job * 2
12            }
13        }(i)
14    }
15
16    // 等待所有 Worker 完成后关闭结果 Channel
17    go func() {
18        wg.Wait()
19        close(results)
20    }()
21}
22
23func main() {
24    jobs := make(chan int, 100)
25    results := make(chan int, 100)
26
27    // 启动 Worker Pool
28    workerPool(10, jobs, results)
29
30    // 发送任务
31    for i := 1; i <= 100; i++ {
32        jobs <- i
33    }
34    close(jobs)
35
36    // 收集结果
37    for result := range results {
38        fmt.Println(result)
39    }
40}

2. 使用 Context 控制 Goroutine 生命周期

 1import "context"
 2
 3func worker(ctx context.Context, id int) {
 4    for {
 5        select {
 6        case <-ctx.Done():
 7            fmt.Printf("Worker %d stopping\n", id)
 8            return
 9        default:
10            // 执行任务
11            fmt.Printf("Worker %d working\n", id)
12            time.Sleep(time.Second)
13        }
14    }
15}
16
17func main() {
18    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
19    defer cancel()
20
21    for i := 1; i <= 3; i++ {
22        go worker(ctx, i)
23    }
24
25    <-ctx.Done()
26    fmt.Println("All workers stopped")
27}

3. 限流器(Rate Limiter)

 1func rateLimiter() {
 2    // 每秒最多处理 5 个请求
 3    limiter := time.Tick(200 * time.Millisecond)
 4
 5    requests := make(chan int, 10)
 6    for i := 1; i <= 10; i++ {
 7        requests <- i
 8    }
 9    close(requests)
10
11    for req := range requests {
12        <-limiter // 等待限流器
13        fmt.Printf("Processing request %d at %v\n", req, time.Now())
14    }
15}

4. 扇入扇出模式(Fan-in/Fan-out)

 1// Fan-out: 一个输入,多个输出
 2func fanOut(input <-chan int, workers int) []<-chan int {
 3    outputs := make([]<-chan int, workers)
 4    for i := 0; i < workers; i++ {
 5        outputs[i] = process(input)
 6    }
 7    return outputs
 8}
 9
10func process(input <-chan int) <-chan int {
11    output := make(chan int)
12    go func() {
13        defer close(output)
14        for val := range input {
15            output <- val * 2
16        }
17    }()
18    return output
19}
20
21// Fan-in: 多个输入,一个输出
22func fanIn(channels ...<-chan int) <-chan int {
23    output := make(chan int)
24    var wg sync.WaitGroup
25
26    for _, ch := range channels {
27        wg.Add(1)
28        go func(c <-chan int) {
29            defer wg.Done()
30            for val := range c {
31                output <- val
32            }
33        }(ch)
34    }
35
36    go func() {
37        wg.Wait()
38        close(output)
39    }()
40
41    return output
42}

5. 使用 sync.Once 确保只执行一次

 1var (
 2    instance *Singleton
 3    once     sync.Once
 4)
 5
 6type Singleton struct {
 7    data string
 8}
 9
10func GetInstance() *Singleton {
11    once.Do(func() {
12        fmt.Println("Creating singleton instance")
13        instance = &Singleton{data: "singleton"}
14    })
15    return instance
16}
17
18func main() {
19    var wg sync.WaitGroup
20    for i := 0; i < 10; i++ {
21        wg.Add(1)
22        go func() {
23            defer wg.Done()
24            GetInstance() // 只会创建一次
25        }()
26    }
27    wg.Wait()
28}

老墨总结

Go 并发编程的 5 个关键点:

  1. Goroutine 轻量:用 go 关键字启动,比线程轻量得多,可以轻松启动成千上万个
  2. Channel 通信:通过 Channel 在 Goroutine 间传递数据,避免共享内存
  3. Select 多路复用:同时等待多个 Channel,支持超时和非阻塞操作
  4. WaitGroup 同步:等待一组 Goroutine 完成,避免主程序提前退出
  5. 并发安全:使用 Mutex 或 Channel 保护共享数据,避免竞态条件

实战建议

  • 使用 WaitGroup 确保 Goroutine 完成,避免程序提前退出
  • 循环中启动 Goroutine 时,传参而不是捕获循环变量
  • 由发送方关闭 Channel,接收方不要关闭
  • 使用 selectcontext 避免 Goroutine 泄漏
  • 优先使用 Channel 而不是锁,“通过通信来共享内存”

练习题

练习题 1:Goroutine 执行顺序(⭐)

编写一个程序,启动 5 个 Goroutine,每个打印自己的 ID 和当前时间,观察执行顺序。

要求:

  • 使用 WaitGroup 等待所有 Goroutine 完成
  • 观察输出顺序是否确定
  • 尝试添加随机延迟

练习题 2:生产者-消费者模式(⭐⭐)

实现一个生产者-消费者模式:一个 Goroutine 生成数据,另一个 Goroutine 消费数据。

要求:

  • 使用无缓冲 Channel
  • 生产者生成 10 个数据后关闭 Channel
  • 消费者使用 range 接收数据

练习题 3:任务队列(⭐⭐)

使用 Channel 实现一个简单的任务队列,支持添加任务和获取任务。

要求:

  • 定义 Task 结构体
  • 实现 AddTaskGetTask 方法
  • 支持并发添加和获取

练习题 4:超时控制(⭐⭐⭐)

编写一个程序,使用 select 实现超时控制,如果 3 秒内没有收到数据就退出。

要求:

  • 使用 time.After 实现超时
  • 模拟一个耗时操作
  • 打印超时或成功的消息

练习题 5:Worker Pool(⭐⭐⭐)

实现一个 Worker Pool,处理 100 个任务,使用 10 个 Worker 并发处理。

要求:

  • 定义 JobResult 结构体
  • 实现 Worker 函数
  • 收集所有结果并打印

练习题 6:并发安全计数器(⭐⭐⭐)

使用 Mutex 和 WaitGroup 实现一个并发安全的计数器,1000 个 Goroutine 同时增加计数。

要求:

  • 定义 Counter 结构体
  • 实现 IncrementValue 方法
  • 验证最终结果是否正确

思考题

  1. Goroutine 和线程有什么区别? 为什么 Goroutine 可以轻松启动成千上万个?
  2. 什么时候用缓冲 Channel,什么时候用无缓冲 Channel? 它们的使用场景有什么不同?
  3. 如何检测和避免 Goroutine 泄漏? 有哪些工具可以帮助排查?
  4. Channel 和 Mutex 如何选择? “通过通信来共享内存"是否总是最佳实践?

快来评论区秀出你的思考,大家一起讨论!


你在项目中是怎么使用 Goroutine 和 Channel 的?遇到过哪些并发问题?欢迎评论区聊聊。

极客老墨,继续折腾!💪

如果有任何问题,欢迎在评论区留言或关注公众号「极客老墨」交流。

完整示例代码在 go-tutorial-code/10-concurrency


相关阅读