大家好,我是极客老墨。
传统语言里写并发,要创建线程、加锁、处理竞态条件,一不小心就死锁。Go 的并发模型完全不同:用 Goroutine 代替线程,用 Channel 代替锁。这种 CSP(通信顺序进程)模型,让并发编程变得简单多了。
这篇就聊聊 Go 的并发基础,看看 Goroutine 和 Channel 是怎么配合工作的。
Goroutine 基础
Goroutine 是 Go 的轻量级协程,比线程轻量得多。
启动 Goroutine
使用 go 关键字启动一个 Goroutine。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func sayHello() {
9 fmt.Println("Hello from goroutine")
10}
11
12func main() {
13 // 启动一个 Goroutine
14 go sayHello()
15
16 // 主 Goroutine 继续执行
17 fmt.Println("Hello from main")
18
19 // 等待一下,否则程序会立即退出
20 time.Sleep(time.Second)
21}
要点:
go funcName()启动一个新的 Goroutine- Goroutine 是并发执行的,不会阻塞主程序
- main 函数结束时,所有 Goroutine 都会被强制终止
Goroutine 的特点
1func printNumbers() {
2 for i := 1; i <= 5; i++ {
3 fmt.Printf("%d ", i)
4 time.Sleep(100 * time.Millisecond)
5 }
6}
7
8func printLetters() {
9 for i := 'A'; i <= 'E'; i++ {
10 fmt.Printf("%c ", i)
11 time.Sleep(100 * time.Millisecond)
12 }
13}
14
15func main() {
16 go printNumbers()
17 go printLetters()
18
19 time.Sleep(time.Second)
20 fmt.Println("\nDone")
21}
输出可能是:1 A 2 B 3 C 4 D 5 E 或其他顺序(不确定)
要点:
- Goroutine 的执行顺序不确定
- 多个 Goroutine 会并发执行
- 启动成本极低,可以轻松启动成千上万个
匿名函数 Goroutine
1func main() {
2 // 使用匿名函数
3 go func() {
4 fmt.Println("Anonymous goroutine")
5 }()
6
7 // 带参数的匿名函数
8 go func(msg string) {
9 fmt.Println(msg)
10 }("Hello")
11
12 time.Sleep(time.Second)
13}
Channel 基础
Channel 是 Goroutine 之间通信的管道。
创建和使用 Channel
1// 创建一个 int 类型的 channel
2ch := make(chan int)
3
4// 发送数据到 channel
5ch <- 42
6
7// 从 channel 接收数据
8value := <-ch
9
10// 接收数据但不使用
11<-ch
无缓冲 Channel
无缓冲 Channel 是同步的,发送和接收必须同时准备好。
1func main() {
2 ch := make(chan string)
3
4 // 启动一个 Goroutine 接收数据
5 go func() {
6 msg := <-ch
7 fmt.Println("Received:", msg)
8 }()
9
10 // 发送数据
11 ch <- "Hello"
12
13 time.Sleep(time.Second)
14}
要点:
- 发送操作会阻塞,直到有接收方
- 接收操作会阻塞,直到有发送方
- 这是一种同步机制
缓冲 Channel
缓冲 Channel 可以存储一定数量的数据。
1func main() {
2 // 创建容量为 3 的缓冲 channel
3 ch := make(chan int, 3)
4
5 // 发送 3 个数据不会阻塞
6 ch <- 1
7 ch <- 2
8 ch <- 3
9
10 // 接收数据
11 fmt.Println(<-ch) // 1
12 fmt.Println(<-ch) // 2
13 fmt.Println(<-ch) // 3
14}
要点:
- 缓冲区未满时,发送不阻塞
- 缓冲区为空时,接收阻塞
- 缓冲区满时,发送阻塞
关闭 Channel
1func main() {
2 ch := make(chan int, 3)
3
4 // 发送数据
5 ch <- 1
6 ch <- 2
7 ch <- 3
8
9 // 关闭 channel
10 close(ch)
11
12 // 可以继续接收已有数据
13 fmt.Println(<-ch) // 1
14 fmt.Println(<-ch) // 2
15 fmt.Println(<-ch) // 3
16
17 // 从已关闭的 channel 接收,返回零值
18 fmt.Println(<-ch) // 0
19}
要点:
- 关闭后不能再发送数据(会 panic)
- 可以继续接收已有数据
- 接收完后,继续接收会得到零值
检查 Channel 是否关闭
1func main() {
2 ch := make(chan int, 2)
3 ch <- 1
4 ch <- 2
5 close(ch)
6
7 // 使用 ok 检查 channel 是否关闭
8 for {
9 value, ok := <-ch
10 if !ok {
11 fmt.Println("Channel closed")
12 break
13 }
14 fmt.Println("Received:", value)
15 }
16}
Range 遍历 Channel
1func main() {
2 ch := make(chan int, 3)
3 ch <- 1
4 ch <- 2
5 ch <- 3
6 close(ch)
7
8 // range 会自动检测 channel 是否关闭
9 for value := range ch {
10 fmt.Println("Received:", value)
11 }
12}
要点:
range会持续接收数据,直到 channel 关闭- 如果 channel 没有关闭,
range会一直等待
单向 Channel
可以限制 Channel 的方向,提高代码安全性。
定义单向 Channel
1// 只能发送的 channel
2func send(ch chan<- int) {
3 ch <- 42
4}
5
6// 只能接收的 channel
7func receive(ch <-chan int) {
8 value := <-ch
9 fmt.Println(value)
10}
11
12func main() {
13 ch := make(chan int)
14
15 go send(ch)
16 receive(ch)
17}
要点:
chan<- T表示只能发送<-chan T表示只能接收- 双向 channel 可以转换为单向 channel
Select 多路复用
select 用于同时等待多个 Channel 操作。
基本用法
1func main() {
2 ch1 := make(chan string)
3 ch2 := make(chan string)
4
5 go func() {
6 time.Sleep(1 * time.Second)
7 ch1 <- "from ch1"
8 }()
9
10 go func() {
11 time.Sleep(2 * time.Second)
12 ch2 <- "from ch2"
13 }()
14
15 // select 会等待第一个就绪的 case
16 select {
17 case msg1 := <-ch1:
18 fmt.Println(msg1)
19 case msg2 := <-ch2:
20 fmt.Println(msg2)
21 }
22}
要点:
select会阻塞,直到某个 case 可以执行- 如果多个 case 同时就绪,随机选择一个
- 类似于
switch,但专门用于 Channel
超时控制
1func main() {
2 ch := make(chan string)
3
4 go func() {
5 time.Sleep(2 * time.Second)
6 ch <- "result"
7 }()
8
9 select {
10 case result := <-ch:
11 fmt.Println("Got:", result)
12 case <-time.After(1 * time.Second):
13 fmt.Println("Timeout!")
14 }
15}
非阻塞操作
1func main() {
2 ch := make(chan int)
3
4 select {
5 case msg := <-ch:
6 fmt.Println("Received:", msg)
7 default:
8 fmt.Println("No message available")
9 }
10}
要点:
default分支在所有 case 都未就绪时执行- 可以实现非阻塞的 Channel 操作
同时监听多个 Channel
1func main() {
2 ch1 := make(chan int)
3 ch2 := make(chan int)
4 quit := make(chan bool)
5
6 go func() {
7 for i := 0; i < 5; i++ {
8 ch1 <- i
9 time.Sleep(500 * time.Millisecond)
10 }
11 quit <- true
12 }()
13
14 go func() {
15 for i := 0; i < 5; i++ {
16 ch2 <- i * 10
17 time.Sleep(700 * time.Millisecond)
18 }
19 }()
20
21 for {
22 select {
23 case msg1 := <-ch1:
24 fmt.Println("From ch1:", msg1)
25 case msg2 := <-ch2:
26 fmt.Println("From ch2:", msg2)
27 case <-quit:
28 fmt.Println("Quit")
29 return
30 }
31 }
32}
WaitGroup 等待组
sync.WaitGroup 用于等待一组 Goroutine 完成。
基本用法
1import (
2 "fmt"
3 "sync"
4 "time"
5)
6
7func worker(id int, wg *sync.WaitGroup) {
8 defer wg.Done() // 完成时调用
9
10 fmt.Printf("Worker %d starting\n", id)
11 time.Sleep(time.Second)
12 fmt.Printf("Worker %d done\n", id)
13}
14
15func main() {
16 var wg sync.WaitGroup
17
18 for i := 1; i <= 3; i++ {
19 wg.Add(1) // 增加计数
20 go worker(i, &wg)
21 }
22
23 wg.Wait() // 等待所有 Goroutine 完成
24 fmt.Println("All workers done")
25}
要点:
Add(n)增加计数器Done()减少计数器(通常用 defer)Wait()阻塞直到计数器为 0
并发安全
多个 Goroutine 访问共享数据时需要同步。
使用 Mutex
1import (
2 "fmt"
3 "sync"
4)
5
6type Counter struct {
7 mu sync.Mutex
8 value int
9}
10
11func (c *Counter) Increment() {
12 c.mu.Lock()
13 defer c.mu.Unlock()
14 c.value++
15}
16
17func (c *Counter) Value() int {
18 c.mu.Lock()
19 defer c.mu.Unlock()
20 return c.value
21}
22
23func main() {
24 var wg sync.WaitGroup
25 counter := Counter{}
26
27 for i := 0; i < 1000; i++ {
28 wg.Add(1)
29 go func() {
30 defer wg.Done()
31 counter.Increment()
32 }()
33 }
34
35 wg.Wait()
36 fmt.Println("Final value:", counter.Value())
37}
要点:
Mutex提供互斥锁Lock()加锁,Unlock()解锁- 使用
defer确保解锁
使用 Channel 代替锁
1func main() {
2 counter := 0
3 ch := make(chan int)
4
5 // 启动 10 个 Goroutine
6 for i := 0; i < 10; i++ {
7 go func() {
8 for j := 0; j < 100; j++ {
9 ch <- 1
10 }
11 }()
12 }
13
14 // 接收并累加
15 go func() {
16 for v := range ch {
17 counter += v
18 }
19 }()
20
21 time.Sleep(time.Second)
22 fmt.Println("Counter:", counter)
23}
完整示例
把前面的知识点串起来,看个完整的例子:
1package main
2
3import (
4 "fmt"
5 "sync"
6 "time"
7)
8
9// 任务结构
10type Job struct {
11 ID int
12 Number int
13}
14
15// 结果结构
16type Result struct {
17 Job Job
18 Result int
19}
20
21// Worker 函数
22func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
23 defer wg.Done()
24
25 for job := range jobs {
26 fmt.Printf("Worker %d processing job %d\n", id, job.ID)
27 time.Sleep(500 * time.Millisecond) // 模拟耗时操作
28
29 // 计算结果
30 result := Result{
31 Job: job,
32 Result: job.Number * 2,
33 }
34
35 results <- result
36 }
37}
38
39func main() {
40 const numJobs = 5
41 const numWorkers = 3
42
43 jobs := make(chan Job, numJobs)
44 results := make(chan Result, numJobs)
45
46 // 启动 Worker Pool
47 var wg sync.WaitGroup
48 for w := 1; w <= numWorkers; w++ {
49 wg.Add(1)
50 go worker(w, jobs, results, &wg)
51 }
52
53 // 发送任务
54 for j := 1; j <= numJobs; j++ {
55 jobs <- Job{ID: j, Number: j * 10}
56 }
57 close(jobs)
58
59 // 等待所有 Worker 完成
60 go func() {
61 wg.Wait()
62 close(results)
63 }()
64
65 // 收集结果
66 for result := range results {
67 fmt.Printf("Job %d result: %d\n", result.Job.ID, result.Result)
68 }
69
70 fmt.Println("All jobs completed")
71}
这个例子展示了:
- Worker Pool 模式
- 使用 Channel 分发任务和收集结果
- 使用 WaitGroup 等待所有 Worker 完成
- 单向 Channel 提高安全性
- Goroutine 的并发执行
老墨踩过的坑
坑 1:忘记等待 Goroutine 完成
老墨刚学 Go 时,经常写出这样的代码:
1func main() {
2 for i := 0; i < 5; i++ {
3 go func(n int) {
4 fmt.Println(n)
5 }(i)
6 }
7 // ❌ 程序立即退出,Goroutine 还没来得及执行
8}
问题:main 函数结束时,所有 Goroutine 都会被强制终止,可能什么都没打印出来。
正确做法:
1func main() {
2 var wg sync.WaitGroup
3 for i := 0; i < 5; i++ {
4 wg.Add(1)
5 go func(n int) {
6 defer wg.Done()
7 fmt.Println(n)
8 }(i)
9 }
10 wg.Wait() // ✅ 等待所有 Goroutine 完成
11}
坑 2:闭包捕获循环变量
这是最经典的坑,老墨在生产环境踩过:
1func main() {
2 var wg sync.WaitGroup
3 for i := 0; i < 5; i++ {
4 wg.Add(1)
5 go func() {
6 defer wg.Done()
7 fmt.Println(i) // ❌ 所有 Goroutine 可能都打印 5
8 }()
9 }
10 wg.Wait()
11}
问题:所有 Goroutine 共享同一个变量 i,当它们执行时,循环可能已经结束,i 的值是 5。
正确做法:
1// 方式 1:传参
2for i := 0; i < 5; i++ {
3 wg.Add(1)
4 go func(n int) {
5 defer wg.Done()
6 fmt.Println(n) // ✅ 每个 Goroutine 有自己的副本
7 }(i)
8}
9
10// 方式 2:在循环内创建新变量(Go 1.22+)
11for i := 0; i < 5; i++ {
12 i := i // 创建新变量
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 fmt.Println(i) // ✅ 捕获的是新变量
17 }()
18}
坑 3:向已关闭的 Channel 发送数据
1func main() {
2 ch := make(chan int, 2)
3 ch <- 1
4 close(ch)
5
6 ch <- 2 // ❌ panic: send on closed channel
7}
教训:关闭 Channel 后不能再发送数据,但可以继续接收。
正确做法:
1func main() {
2 ch := make(chan int, 2)
3 ch <- 1
4 ch <- 2
5 close(ch)
6
7 // ✅ 可以继续接收
8 fmt.Println(<-ch) // 1
9 fmt.Println(<-ch) // 2
10 fmt.Println(<-ch) // 0(零值)
11}
最佳实践:由发送方关闭 Channel,接收方不要关闭。
坑 4:Goroutine 泄漏
老墨曾经写过这样的代码,导致内存泄漏:
1func leak() {
2 ch := make(chan int)
3 go func() {
4 val := <-ch // ❌ 永远阻塞,Goroutine 泄漏
5 fmt.Println(val)
6 }()
7 // 忘记发送数据或关闭 channel
8}
问题:Goroutine 一直等待接收数据,但没有人发送,导致 Goroutine 永远不会结束。
正确做法:
1func noLeak() {
2 ch := make(chan int)
3 go func() {
4 select {
5 case val := <-ch:
6 fmt.Println(val)
7 case <-time.After(5 * time.Second):
8 fmt.Println("Timeout") // ✅ 超时退出
9 }
10 }()
11
12 // 或者确保发送数据
13 ch <- 42
14}
坑 5:无缓冲 Channel 的死锁
1func main() {
2 ch := make(chan int)
3 ch <- 42 // ❌ fatal error: all goroutines are asleep - deadlock!
4 fmt.Println(<-ch)
5}
问题:无缓冲 Channel 的发送操作会阻塞,直到有接收方。但这里没有其他 Goroutine 接收,导致死锁。
正确做法:
1// 方式 1:使用缓冲 Channel
2func main() {
3 ch := make(chan int, 1) // ✅ 缓冲区大小为 1
4 ch <- 42
5 fmt.Println(<-ch)
6}
7
8// 方式 2:在 Goroutine 中发送
9func main() {
10 ch := make(chan int)
11 go func() {
12 ch <- 42 // ✅ 在另一个 Goroutine 中发送
13 }()
14 fmt.Println(<-ch)
15}
实战建议
1. Worker Pool 模式
这是处理大量任务的标准模式:
1func workerPool(numWorkers int, jobs <-chan int, results chan<- int) {
2 var wg sync.WaitGroup
3
4 // 启动固定数量的 Worker
5 for i := 0; i < numWorkers; i++ {
6 wg.Add(1)
7 go func(id int) {
8 defer wg.Done()
9 for job := range jobs {
10 // 处理任务
11 results <- job * 2
12 }
13 }(i)
14 }
15
16 // 等待所有 Worker 完成后关闭结果 Channel
17 go func() {
18 wg.Wait()
19 close(results)
20 }()
21}
22
23func main() {
24 jobs := make(chan int, 100)
25 results := make(chan int, 100)
26
27 // 启动 Worker Pool
28 workerPool(10, jobs, results)
29
30 // 发送任务
31 for i := 1; i <= 100; i++ {
32 jobs <- i
33 }
34 close(jobs)
35
36 // 收集结果
37 for result := range results {
38 fmt.Println(result)
39 }
40}
2. 使用 Context 控制 Goroutine 生命周期
1import "context"
2
3func worker(ctx context.Context, id int) {
4 for {
5 select {
6 case <-ctx.Done():
7 fmt.Printf("Worker %d stopping\n", id)
8 return
9 default:
10 // 执行任务
11 fmt.Printf("Worker %d working\n", id)
12 time.Sleep(time.Second)
13 }
14 }
15}
16
17func main() {
18 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
19 defer cancel()
20
21 for i := 1; i <= 3; i++ {
22 go worker(ctx, i)
23 }
24
25 <-ctx.Done()
26 fmt.Println("All workers stopped")
27}
3. 限流器(Rate Limiter)
1func rateLimiter() {
2 // 每秒最多处理 5 个请求
3 limiter := time.Tick(200 * time.Millisecond)
4
5 requests := make(chan int, 10)
6 for i := 1; i <= 10; i++ {
7 requests <- i
8 }
9 close(requests)
10
11 for req := range requests {
12 <-limiter // 等待限流器
13 fmt.Printf("Processing request %d at %v\n", req, time.Now())
14 }
15}
4. 扇入扇出模式(Fan-in/Fan-out)
1// Fan-out: 一个输入,多个输出
2func fanOut(input <-chan int, workers int) []<-chan int {
3 outputs := make([]<-chan int, workers)
4 for i := 0; i < workers; i++ {
5 outputs[i] = process(input)
6 }
7 return outputs
8}
9
10func process(input <-chan int) <-chan int {
11 output := make(chan int)
12 go func() {
13 defer close(output)
14 for val := range input {
15 output <- val * 2
16 }
17 }()
18 return output
19}
20
21// Fan-in: 多个输入,一个输出
22func fanIn(channels ...<-chan int) <-chan int {
23 output := make(chan int)
24 var wg sync.WaitGroup
25
26 for _, ch := range channels {
27 wg.Add(1)
28 go func(c <-chan int) {
29 defer wg.Done()
30 for val := range c {
31 output <- val
32 }
33 }(ch)
34 }
35
36 go func() {
37 wg.Wait()
38 close(output)
39 }()
40
41 return output
42}
5. 使用 sync.Once 确保只执行一次
1var (
2 instance *Singleton
3 once sync.Once
4)
5
6type Singleton struct {
7 data string
8}
9
10func GetInstance() *Singleton {
11 once.Do(func() {
12 fmt.Println("Creating singleton instance")
13 instance = &Singleton{data: "singleton"}
14 })
15 return instance
16}
17
18func main() {
19 var wg sync.WaitGroup
20 for i := 0; i < 10; i++ {
21 wg.Add(1)
22 go func() {
23 defer wg.Done()
24 GetInstance() // 只会创建一次
25 }()
26 }
27 wg.Wait()
28}
老墨总结
Go 并发编程的 5 个关键点:
- Goroutine 轻量:用
go关键字启动,比线程轻量得多,可以轻松启动成千上万个 - Channel 通信:通过 Channel 在 Goroutine 间传递数据,避免共享内存
- Select 多路复用:同时等待多个 Channel,支持超时和非阻塞操作
- WaitGroup 同步:等待一组 Goroutine 完成,避免主程序提前退出
- 并发安全:使用 Mutex 或 Channel 保护共享数据,避免竞态条件
实战建议:
- 使用 WaitGroup 确保 Goroutine 完成,避免程序提前退出
- 循环中启动 Goroutine 时,传参而不是捕获循环变量
- 由发送方关闭 Channel,接收方不要关闭
- 使用
select和context避免 Goroutine 泄漏 - 优先使用 Channel 而不是锁,“通过通信来共享内存”
练习题
练习题 1:Goroutine 执行顺序(⭐)
编写一个程序,启动 5 个 Goroutine,每个打印自己的 ID 和当前时间,观察执行顺序。
要求:
- 使用 WaitGroup 等待所有 Goroutine 完成
- 观察输出顺序是否确定
- 尝试添加随机延迟
练习题 2:生产者-消费者模式(⭐⭐)
实现一个生产者-消费者模式:一个 Goroutine 生成数据,另一个 Goroutine 消费数据。
要求:
- 使用无缓冲 Channel
- 生产者生成 10 个数据后关闭 Channel
- 消费者使用
range接收数据
练习题 3:任务队列(⭐⭐)
使用 Channel 实现一个简单的任务队列,支持添加任务和获取任务。
要求:
- 定义
Task结构体 - 实现
AddTask和GetTask方法 - 支持并发添加和获取
练习题 4:超时控制(⭐⭐⭐)
编写一个程序,使用 select 实现超时控制,如果 3 秒内没有收到数据就退出。
要求:
- 使用
time.After实现超时 - 模拟一个耗时操作
- 打印超时或成功的消息
练习题 5:Worker Pool(⭐⭐⭐)
实现一个 Worker Pool,处理 100 个任务,使用 10 个 Worker 并发处理。
要求:
- 定义
Job和Result结构体 - 实现 Worker 函数
- 收集所有结果并打印
练习题 6:并发安全计数器(⭐⭐⭐)
使用 Mutex 和 WaitGroup 实现一个并发安全的计数器,1000 个 Goroutine 同时增加计数。
要求:
- 定义
Counter结构体 - 实现
Increment和Value方法 - 验证最终结果是否正确
思考题
- Goroutine 和线程有什么区别? 为什么 Goroutine 可以轻松启动成千上万个?
- 什么时候用缓冲 Channel,什么时候用无缓冲 Channel? 它们的使用场景有什么不同?
- 如何检测和避免 Goroutine 泄漏? 有哪些工具可以帮助排查?
- Channel 和 Mutex 如何选择? “通过通信来共享内存"是否总是最佳实践?
快来评论区秀出你的思考,大家一起讨论!
你在项目中是怎么使用 Goroutine 和 Channel 的?遇到过哪些并发问题?欢迎评论区聊聊。
极客老墨,继续折腾!💪
如果有任何问题,欢迎在评论区留言或关注公众号「极客老墨」交流。
完整示例代码在 go-tutorial-code/10-concurrency。