go 并发
goroutine
goroutine 是 go 并行设计的核心。goroutine 其实就是协程,它比线程更小,十几个 goroutine 可能体现在底层就是五六个线程,可同时运行成千上万个并发任务。goroutine 比 thread 更易用、更高效、更轻便
package main
import (
"fmt"
"runtime"
)
func say(s string) {
for i := 0; i < 5; i++ {
//runtime.Gosched()表示让CPU把时间片让给别人,下次某个时候继续恢复执行该goroutine
runtime.Gosched()
fmt.Println(s)
}
}
func main() {
go say("world")
say("hello")
}
//某一次运行结果,多次运行结果可能不同
//hello
//world
//hello
//world
//hello
//world
//hello
//world
//hello
注意
runtime.Gosched() 表示让 CPU 把时间片让给别人,下次某个时候继续恢复执行该 goroutine
在 Go 1.5 以前调度器仅使用单线程,也就是说只实现了并发。想要发挥多核处理器的并行,需要在我们的程序中显式调用 runtime.GOMAXPROCS(n) 告诉调度器同时使用多个线程。GOMAXPROCS 设置了同时运行逻辑代码的系统线程的最大数量,并返回之前的设置。如果 n < 1,不会改变当前设置
channels
goroutine 运行在相同的地址空间,channels 用于 goroutine 之间通信 必须使用 make 创建 channel
ci := make(chan int)
cs := make(chan string)
cf := make(chan interface{})
channel 通过操作符 <- 来接收和发送数据
package main
import (
"fmt"
_ "runtime"
)
func sum(a []int, c chan int) {
total := 0
for _, v := range a {
total += v
}
c <- total //发送 total 到 channel c
}
func main() {
a := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(a[:len(a)/2], c)
go sum(a[len(a)/2:], c)
x, y := <-c, <-c //从 c 中接收数据
fmt.Print(x, y, x+y) //-5 17 12
}
注意 默认情况下(无缓冲),channel 接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得 Goroutines 同步变的更加的简单,而不需要显式的 lock。所谓阻塞,也就是如果读取(value := <-ch)它将会被阻塞,直到有数据接收。其次,任何发送(ch<-5)将会被阻塞,直到数据被读出
Buffered Channels
默认情况下 channel 是无缓冲的,但是我们也可以指定 channel 的缓冲大小
ch := make(chan type, value)
当 value = 0 时,channel 是无缓冲阻塞读写的,当 value > 0 时,channel 有缓冲、是非阻塞的,直到写满 value 个元素才阻塞写入
package main
import (
"fmt"
_ "runtime"
)
func main() {
x := make(chan int, 2) //修改2为1就报错,修改2为3可以正常运行
x <- 1
x <- 2
//若 channel 通道缓冲大小为 1, 读入第二个元素时若前面读入的元素没有被读出则会报错
fmt.Println(<-x)
fmt.Println(<-x)
}
//1
//2
//修改为缓冲大小为 1 报如下的错误:
//fatal error: all goroutines are asleep - deadlock!
Range 和 Close
为了方便,我们可以通过 range 读取 channel 中的数据。for i := range c 能够不断的读取 channel 里面的数据,直到该 channel 被显式的关闭。生产者通过内置函数 close 关闭 channel。关闭 channel 之后就无法再发送任何数据了,在消费方可以通过语法 v, ok := <-ch 测试 channel 是否被关闭。如果 ok 返回 false,那么说明 channel 已经没有任何数据并且已经被关闭
package main
import (
"fmt"
_ "runtime"
)
func fibonacci(n int, c chan int) {
x, y := 1, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
//必须显示的 close, main 中的 range c 会阻塞到 close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
fmt.Print(i, " ")
}
}
//1 1 2 3 5 8 13 21 34 55
Select
如果存在多个 channel 的时候我们可以通过 select 来处理。Go 里面提供了一个关键字 select,通过 select 可以监听 channel 上的数据流动。select 默认是阻塞的,只有当监听的 channel 中有发送或接收可以进行时才会运行,当多个 channel 都准备好的时候,select 是随机的选择一个执行的
package main
import (
"fmt"
_ "runtime"
)
func fibonacci(c, quit chan int) {
x, y := 1, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Print(<-c, " ")
}
quit <- 0 //当 quit 中写入数据时, select 运行到第二个分支
}()
fibonacci(c, quit)
}
//1 1 2 3 5 8 13 21 34 55 quit
在 select 里面有类似于 switfch 中的 default 语法,default 就是当监听的 channel 都没有准备好的时候,默认执行的(select 不再阻塞等待 channel)
select {
case i := <-c:
// use i
default:
// 当c阻塞的时候执行这里
}
通过 select 处理因 goroutine 阻塞而死锁的情况
package main
import (
"fmt"
_ "runtime"
"time"
)
func main() {
c := make(chan int)
o := make(chan bool)
go func() {
for {
select {
case v := <-c:
fmt.Println(v)
case <-time.After(5 * time.Second):
fmt.Println("time out")
o <- true
break
}
}
}()
<-o
}
//time out
runtime 中其他处理 goroutine 的函数
- Goexit 退出当前执行的goroutine,但是defer函数还会继续调用
- Gosched 让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次某个时候从该位置恢复执行
- NumCPU 返回 CPU 核数量
- NumGoroutine 返回正在执行和排队的任务总数
- GOMAXPROCS 用来设置可以并行计算的CPU核数的最大值,并返回之前的值