goroutine

goroutine 是 go 并行设计的核心。goroutine 其实就是协程,它比线程更小,十几个 goroutine 可能体现在底层就是五六个线程,可同时运行成千上万个并发任务。goroutine 比 thread 更易用、更高效、更轻便

package main

import (
	"fmt"
	"runtime"
)

func say(s string) {
	for i := 0; i < 5; i++ {
		//runtime.Gosched()表示让CPU把时间片让给别人,下次某个时候继续恢复执行该goroutine
		runtime.Gosched()
		fmt.Println(s)
	}
}

func main() {
	go say("world")
	say("hello")
}

//某一次运行结果,多次运行结果可能不同
//hello
//world
//hello
//world
//hello
//world
//hello
//world
//hello

注意 runtime.Gosched() 表示让 CPU 把时间片让给别人,下次某个时候继续恢复执行该 goroutine
在 Go 1.5 以前调度器仅使用单线程,也就是说只实现了并发。想要发挥多核处理器的并行,需要在我们的程序中显式调用 runtime.GOMAXPROCS(n) 告诉调度器同时使用多个线程。GOMAXPROCS 设置了同时运行逻辑代码的系统线程的最大数量,并返回之前的设置。如果 n < 1,不会改变当前设置

channels

goroutine 运行在相同的地址空间,channels 用于 goroutine 之间通信 必须使用 make 创建 channel

ci := make(chan int)
cs := make(chan string)
cf := make(chan interface{})

channel 通过操作符 <- 来接收和发送数据

package main

import (
	"fmt"
	_ "runtime"
)

func sum(a []int, c chan int) {
	total := 0
	for _, v := range a {
		total += v
	}
	c <- total //发送 total 到 channel c
}

func main() {
	a := []int{7, 2, 8, -9, 4, 0}

	c := make(chan int)

	go sum(a[:len(a)/2], c)
	go sum(a[len(a)/2:], c)

	x, y := <-c, <-c //从 c 中接收数据

	fmt.Print(x, y, x+y) //-5 17 12
}

注意 默认情况下(无缓冲),channel 接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得 Goroutines 同步变的更加的简单,而不需要显式的 lock。所谓阻塞,也就是如果读取(value := <-ch)它将会被阻塞,直到有数据接收。其次,任何发送(ch<-5)将会被阻塞,直到数据被读出

Buffered Channels

默认情况下 channel 是无缓冲的,但是我们也可以指定 channel 的缓冲大小

ch := make(chan type, value)

当 value = 0 时,channel 是无缓冲阻塞读写的,当 value > 0 时,channel 有缓冲、是非阻塞的,直到写满 value 个元素才阻塞写入

package main

import (
	"fmt"
	_ "runtime"
)

func main() {
	x := make(chan int, 2) //修改2为1就报错,修改2为3可以正常运行

	x <- 1
	x <- 2
	//若 channel 通道缓冲大小为 1, 读入第二个元素时若前面读入的元素没有被读出则会报错
	fmt.Println(<-x)
	fmt.Println(<-x)
}

//1
//2
//修改为缓冲大小为 1 报如下的错误:
//fatal error: all goroutines are asleep - deadlock!
Range 和 Close

为了方便,我们可以通过 range 读取 channel 中的数据。for i := range c 能够不断的读取 channel 里面的数据,直到该 channel 被显式的关闭。生产者通过内置函数 close 关闭 channel。关闭 channel 之后就无法再发送任何数据了,在消费方可以通过语法 v, ok := <-ch 测试 channel 是否被关闭。如果 ok 返回 false,那么说明 channel 已经没有任何数据并且已经被关闭

package main

import (
	"fmt"
	_ "runtime"
)

func fibonacci(n int, c chan int) {
	x, y := 1, 1

	for i := 0; i < n; i++ {
		c <- x
		x, y = y, x+y
	}
	close(c)
	//必须显示的 close, main 中的 range c 会阻塞到 close(c)
}

func main() {
	c := make(chan int, 10)

	go fibonacci(cap(c), c)

	for i := range c {
		fmt.Print(i, " ")
	}
}

//1 1 2 3 5 8 13 21 34 55
Select

如果存在多个 channel 的时候我们可以通过 select 来处理。Go 里面提供了一个关键字 select,通过 select 可以监听 channel 上的数据流动。select 默认是阻塞的,只有当监听的 channel 中有发送或接收可以进行时才会运行,当多个 channel 都准备好的时候,select 是随机的选择一个执行的

package main

import (
	"fmt"
	_ "runtime"
)

func fibonacci(c, quit chan int) {
	x, y := 1, 1
	for {
		select {
		case c <- x:
			x, y = y, x+y
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}

func main() {
	c := make(chan int)
	quit := make(chan int)

	go func() {
		for i := 0; i < 10; i++ {
			fmt.Print(<-c, " ")
		}
		quit <- 0 //当 quit 中写入数据时, select 运行到第二个分支
	}()

	fibonacci(c, quit)
}

//1 1 2 3 5 8 13 21 34 55 quit

在 select 里面有类似于 switfch 中的 default 语法,default 就是当监听的 channel 都没有准备好的时候,默认执行的(select 不再阻塞等待 channel)

select {
case i := <-c:
    // use i
default:
    // 当c阻塞的时候执行这里
}

通过 select 处理因 goroutine 阻塞而死锁的情况

package main

import (
	"fmt"
	_ "runtime"
	"time"
)

func main() {
	c := make(chan int)
	o := make(chan bool)

	go func() {
		for {
			select {
			case v := <-c:
				fmt.Println(v)
			case <-time.After(5 * time.Second):
				fmt.Println("time out")
				o <- true
				break
			}
		}
	}()
	<-o
}

//time out
runtime 中其他处理 goroutine 的函数
  1. Goexit 退出当前执行的goroutine,但是defer函数还会继续调用
  2. Gosched 让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次某个时候从该位置恢复执行
  3. NumCPU 返回 CPU 核数量
  4. NumGoroutine 返回正在执行和排队的任务总数
  5. GOMAXPROCS 用来设置可以并行计算的CPU核数的最大值,并返回之前的值