> 文章列表 > 【Go】六、并发编程

【Go】六、并发编程

【Go】六、并发编程

文章目录

  • 并发编程
    • 1、并发介绍
    • 2、Goroutine
      • 3、runtime包
    • 3、Channel
      • 3.1、channel相关信息
    • 4、Goroutine池(❌)
    • 5、定时器
    • 6、select多路复用
    • 7、并发安全和锁
    • 8、Sync
    • 9、原子操作(atomic包)

并发编程

1、并发介绍

1、进程和线程

​ 1)进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位

​ 2)线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位

​ 3)一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行

2、并发和并行

​ 1)多线程程序在一个核的CPU上运行,就是并发

​ 2)多线程程序在多个核的CPU上运行,就是并行

3、协程和线程

​ 1)协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似用户级线程,这些用户级线程的调度也是自己实现的

​ 2)线程:一个线程上可以跑多个协程,协程是轻量级线程

4、goroutine是官方实现的超级“线程池”

​ 每个实例4-5kb的栈内存占用和由于实现机制而大幅度减少的创建和销毁开销是go高并发的根本原因。

goroutine奉行通过通信共享内存,而不是共享内存在通信!!!

2、Goroutine

​ 在java/c++中实现并发编程的时候,需要自己维护线程池,包装一个又一个任务,需要自己去调度线程执行任务并维护上下文 => go语言的goroutine,类似于线程,但其有go运行时调度和管理,智能的将goroutine中的任务合理的分配给每个CPU

​ => 所以当需要让某个任务并发执行的时候,只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以。

1、使用goroutine

​ 使用的方式很简单,只需要在调用函数的时候,在前面加上go关键字就行,就实现为这个函数创建一个goroutine,一个goroutine必定对应一个函数,一个函数可以由多个goroutine去执行。

2、启动单个goroutine

1、正常
func hello() {fmt.Println("hello goroutine!") // 1
}func main() {hello()fmt.Println("main goroutine done!") // 2
}// 程序运行结果为先1再22、使用goroutine
func main() {go hello()fmt.Println("main goroutine done!") // 2
}// 程序运行结果:只答应main goroutine done!,却没有hello goroutine,为什么?
解释:1)程序启动的时候,go程序会为main()函数创建一个默认的goroutine;2)当main()函数返回goroutine就结束了,所有在main()函数中启动的goroutine会一同结束,main函数所在的goroutine就像是权利的游戏中的夜王,其他goroutine就是夜鬼,鬼王一死,其他人就GG3)所以就要想个办法,让main函数等一下hello函数,最简单的办法就是time.Sleep3、使用goroutine + time.Sleep
func main() {go hello()fmt.Println("main goroutine done!") // 2time.Sleep(time.Second)
}// 程序运行结果:先打印2 后打印1 => 原因就是创建goroutine是需要时间的,而这个main的goroutine是继续进行的(没有等待hello创建完才执行)

3、启动多个goroutine

​ 根据上个例子,我们发现go语言中实现并发好像很简单,那我们启动多个goroutine试试(使用sync.WaitGroup来实现goroutine的同步)

package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc hello(i int) {defer wg.Done() // goroutine结束就登记-1fmt.Println("Hello Goroutine!", i)
}func main() {for i := 0; i < 10; i++ {wg.Add(1) //启动一个goroutine就登记+1go hello(i)}wg.Wait() // 等待所有登记的goroutine都结束
}// 结果分析:多次执行上面代码,会发现每次打印数据的顺序都不一样,因为10个goroutine是并发执行的,而goroutine的调度是随机的

4、如果主协程退出了,其他任务还会执行吗?

package mainimport ("fmt""time"
)func main() {go func() {i := 0for {i++fmt.Printf("new goroutine: i = %d\\n", i)time.Sleep(time.Second)}}()i := 0for {i++fmt.Printf("main goroutine: i = %d\\n", i)time.Sleep(time.Second)if i == 2 {break}}
}// 运行结果:
main goroutine: i = 1
new goroutine: i = 1
main goroutine: i = 2
new goroutine: i = 2
new goroutine: i = 3=> 意味着main和new两个goroutine都是同步运行的,main当i==2时候停止,此时new的也会将当前的goroutine运行完就停止,不会创建新的goroutine

5、goroutine与线程

1、可增长的栈OS线程一般由固定的栈内存(通常为2MB),一个goroutine的栈在其声明周期开始只有很小的栈(典型情况下为2kb),goroutine的栈不是固定的,可以按需扩大或缩小(最大1GB)-> 所以在go语言中一次创建10w左右的goroutine是可以的。2、goroutine调度GPM是go语言运行时的实现,是go语言自己实现的一条调度系统,区别于操作系统调度OS线程1)G:goroutine,里面除了存放本goroutine信息外,还有所在P的绑定等信息。2)P:管理着一组goroutine队列,P会存储当前gorutine运行的上下文环境(函数指针、堆栈地址、地址边界),P会对自己管理等goroutine队列做一些调度(比如占用cpu时间较长的goroutine暂停、运行后续的goroutine等),当自己队列消费完就去全局队列取,如果全局队列夜消费完就去其他P队列里抢任务3)M:machine是GO运行时对操作系统运行内核线程的虚拟,M与内核线程一般时一一映射关系,一个goroutine最终是要放到M上执行的注意:P与M一般也是一一对应的,他们关系:P管理着一组G挂载在M上运行,当一个G长久的阻塞在一个M上的时候,runtiem会创建一个新的M,阻塞G所在的P会把其他G挂载在心间的M上,旧的G阻塞完成或认为其已经死掉的时候,回收旧的MP的个数正常是通过tuntime.GOMAXPROCS(最大256)

单从线程调度讲,go相比其他语言的优势在于OS线程是由OS内核调度的,goroutine则由go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为:m:n调度的技术(复用/调度m个goroutine到n个OS线程)。

1)goroutine是在用户态下完成的,不涉及内核态与用户态的频繁切换,包括内存的分配与释放,都是由用户态维护一块大的内存池,不直接调用系统的malloc函数

2)成本比调度OS线程低很多

3)充分利用多核的硬件资源,近似把若干goroutine放在物理线程上,再加上本身goroutine的超轻量,保证调度方面的性能

3、runtime包

1、runtime.Goscched()

​ 让出cpu时间片,重新等待安排任务。

import ("fmt""runtime"
)func main() {go func(s string) {for i :=0; i < 2; i++ {fmt.Println(s)}}("world")// 主协程for i := 0; i < 2; i++ {runtime.Gosched()fmt.Println("hello")}
}// 执行结果:意味着让出时间片,执行goroutine,再执行主协程
world
world
hello
hello

2、runtime.Goexit()

​ 退出当前协程(一边烧烤,一边相亲,突然发现相亲对象太丑影响烧烤,让她走,没有然后了)

package mainimport ("fmt""runtime"
)func main() {go func() {defer fmt.Println("A defer")func() {defer fmt.Println("B defer")// 结束协程runtime.Goexit()defer fmt.Println("c defer")fmt.Println("B")}()fmt.Println("A")}()for {} // 保持程序一直执行
}// 执行结果:defer按照先入后出执行,然后runtime.Goexit()直接结束当前协程,没有其他操作,程序也没有终止,但也没有输出
B defer
A defer

3、runtime.GOMAXPROCS

​ go运行时调度器使用GOMAXPROCS参数来确定需要多少个OS线程来执行GO代码,默认时机器上的CPU核心数(GOMAXPROCS是m:n调度中的n),runtime.GOMAXPROCS()可以设置当前程序并发时占用的cpu逻辑核心数。

1、例子1
package mainimport ("fmt""runtime""time"
)func a() {for i := 0; i < 10; i++ {fmt.Println("A:", i)}
}func b() {for i := 0; i < 10; i++ {fmt.Println("B:", i)}
}func main() {runtime.GOMAXPROCS(1) // 只有一个核心可以让并发的线程执行go a()go b()time.Sleep(time.Second)
}// 执行结果:所以可以看到结果是B执行完之后,执行A!(先入后出的思想),顺序固定2、例子2
func main() {runtime.GOMAXPROCS(2) // 有两个核心可以让并发的线程执行go a()go b()time.Sleep(time.Second)
}// 执行结果:A先执行完 or B先执行完,顺序不定

GO语言中操作系统线程和goroutine的关系:

1、一个操作系统线程对应用户态多个goroutine

2、go程序可以使用多个操作系统线程

3、goroutine和OS线程是多对多的关系,即m:n

3、Channel

​ 在Go语言中,单纯的将函数并发执行是没有意义的,函数与函数之间交换数据才能体现并发执行函数的意义,虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题,为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

​ Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存实现通信。(重点强调:通信 -> 内存)

​ 如果说goroutine是go程序并发的执行体,那么channel就是他们之间的连接,channel可以让一个goroutine发送特定值到另外一个goroutine的通信机制。

​ go语言中的通道channel是一种特殊的类型,类似一个传送带或者队列,总是遵循先入后出规则,保证收发数据的顺序,每个通道都是一个具体类型的导管(声明channel的时候需要指定其元素类型)

3.1、channel相关信息

​ channel是一种类型,一种引用变量,格式:var 变量 chan 元素类型 => 具体例子:var ch1 chan int、var ch2 chan []int等

1、创建channel并初始化

// 通道是引用类型,通道类型的空值是nil
// 这里在复习一遍值类型和引用类型:值类型(int、float、bool、string)、引用类型(指针,slice,map,channel,接口,函数等)// 1、声明通道
var ch chan int
fmt.Println(ch) // <nil>// 2、初始化,使用make,以下是格式
make(chan 元素类型, [缓冲大小]) => 具体例子:ch := make(chan int) // 缓冲大小是可选的

2、channel操作

​ 1)发送send、2)接收receive、3)关闭close

// 1、定义一个通道
ch := make(chan int)// 2、发送,将一个值发送到通道中
ch <- 10 // 把10发送到ch中// 3、接收
方式1: x := <- ch // 从ch中接收并赋值给变量x
方式2: <- ch // 从ch中接收,但是忽略结果// 4、关闭: 需要注意的是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道,通道是可以被垃圾回收机制回收的,文件必须在操作结束后关闭,通道可以不用
close(ch)// 5、关闭后的通道的特点:1)对一个关闭的通道再发送信息就会导致panic;2)进行接收会一直获取值直到通道为空;3)对一个已经关闭且没有值的通道执行接收会得到对应的零值;4)关闭一个已经关闭的通道会导致panic

3、无缓冲通道

​ 无缓冲通道又称为阻塞的通道。下面举一个例子:

func main() {ch := make(chan int)ch <- 10fmt.Println("发送成功")
}// 执行结果:可以编译通过,但是会报错
// 原因:1)ch := make(chan int)属于无缓冲通道,需要有人接收才能发送值 -> 改进:在发送之前,先让一个值去接收,如下:
func recv(c chan int) {ret := <- cfmt.Println("接收成功", ret)
}func main() {ch := make(chan int)go recv(ch) // 启动goroutine从通道中接收值ch <- 10fmt.Println("发送成功")
}// 执行过程:无缓冲通道上的发送操作会阻塞,直到另外一个goroutine在该通道上执行接收操作,这个时候值才能发送成功,两个goroutine继续执行;相反如果接收操作先执行,接收方的goroutine将阻塞,知道另外一个goroutine在该通道上发送一个值

4、有缓冲通道

func main() {ch := make(chan int, 1) // 创建一个容量为1的缓冲区通道ch <- 10fmt.Println("发送成功")
}

5、通道关闭

​ 可以通过内置的close()函数关闭channel(如果管道里不存值或者取值的时候,一定记得关闭通道)

func main() {c := make(chan int)go func() {for i := 0; i < 5; i++ {c <- i}close(c)}()for {if data, ok := <- c; ok {fmt.Println(data)} else {break}}fmt.Println("main结束")
}

6、如何优雅的从通道循环取值

​ 当通过通道发送有限数据的时候,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待,当通道关闭时候,往该通道发送值会引发panic,从该通道接收的值一直都是类型零值,那么如何判断一个通道是否被关闭?

// channel 练习
func main() {ch1 := make(chan int)ch2 := make(chan int)// 开启goroutine将0~100的数发送到ch1中go func() {for i := 0; i < 100; i++ {ch1 <- i}close(ch1)}()// 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中go func() {for {i, ok := <-ch1 // 通道关闭后再取值ok=falseif !ok {break}ch2 <- i * i}close(ch2)}()// 在主goroutine中从ch2中接收值打印for i := range ch2 { // 通道关闭后会退出for range循环fmt.Println(i)}
} // 所以我们有两种方式,判断通道是否被关闭,通常使用for range方式

7、单向通道

​ 有时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道函数都会对其进行限制,比如限制通道在函数中智能接收或发送。

1chan<- int 是一个只能发送的通道,可以发送但不能接受
2<-chan int 是一个只能接收的通道,可以接收但不能发送注意:函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但是反过来是不行的

【Go】六、并发编程

4、Goroutine池(❌)

1、worker pool(goroutine池)

​ 1)本质上是生产者消费者模型

​ 2)可以有效控制goroutine数量,防止暴涨

​ 3)需求:如计算一个数字的各个位数之和,随机生成数字进行计算等;

5、定时器

​ Timer:时间到了,只执行一次。

package mainimport ("fmt""time"
)func main() {// 1、Time基本使用timer1 := time.NewTimer(2 * time.Second)t1 := time.Now()fmt.Printf("t1 : %v\\n", t1)t2 := <-timer1.Cfmt.Printf("t2 : %v\\n", t2)// 2、验证timer只能响应一次timer2 := time.NewTimer(time.Second)for {<-timer2.Cfmt.Println("时间到了")}// 3、实现timer的延时功能time.Sleep(time.Second)timer3 := time.NewTimer(2 * time.Second)<- timer3.Cfmt.Println("两秒到")<-time.After(2 * time.Second)fmt.Println("两秒到")// 4、停止定时器timer4 := time.NewTimer(2 * time.Second)go func() {<-timer4.Cfmt.Println("定时器执行超时啦")}()b := timer4.Stop()if b {fmt.Println("timer4已经关闭")}// 5、重置定时器timer5 := time.NewTimer(3 * time.Second)timer5.Reset(1 * time.Second)fmt.Println(time.Now())fmt.Println(<-timer5.C)
}

​ Ticker:时间到了,多次执行

func main() {// 1、获取ticker对象ticker := time.newTicker(1 * time.Second)i := 0// 子协程go func() {for {// <-ticker.Ci++fmt.Println(<-ticker.C)if i == 5 {// 停止ticker.Stop()}}}
}

6、select多路复用

​ 在某些场景下我们需要同多个通道接收数据,通道在接收数据的时候,如果没有数据可以接收,将会发生阻塞,你也许会写出如下代码,使用遍历方式来实现:

for {// 尝试从ch1接收值data, ok := <-ch1// 尝试从ch2接收值data, ok := <-ch2...
}// 上面的这种可以实现从多个通道接收需求,但是运行性能会差很多,为了应对这样的场景,Go内置了select关键字,可以同时响应多个通道的操作

1、select

​ 类似switch语句,它有一系列case分支和一个默认的分支,每个case会有对应一个通道的通信过程,select会一直等待,直到某个case通信操作完成,就会执行case分支对应的语句。

select {case <-chan1:// cha1成功读取到数据,则进行case处理语句case <-chan2 <-1:// 如果成功向chan2写入数据,则进行case处理语句default:// 如果上面都没有成功,则进行default处理层
}

2、select可以同时监听一个或多个channel,直到其中一个channel ready

package mainimport ("fmt""time"
)func test1(ch chan string) {time.Sleep(time.Second * 5)ch <- "test1"
}
func test2(ch chan string) {time.Sleep(time.Second * 2)ch <- "test2"
}func main() {// 2个管道output1 := make(chan string)output2 := make(chan string)// 跑2个子协程,写数据go test1(output1)go test2(output2)// 用select监控select {case s1 := <-output1:fmt.Println("s1=", s1)case s2 := <-output2:fmt.Println("s2=", s2)}
}// 运行结果是:s2 = test2 
// 通过结果分析,我们知道我们同事对多个通道进行监听,因为没有while(true)语句,所以这个时候,监听到output2的时候,执行结束,完成操作,没有检测到output1
// 思考:如果多个通道,是在同一时刻就准备好了,那么怎么进行分配呢 => 答案是随机原则一个执行

3、判断通道是否存满

func main() {// 创建管道output1 := make(chan string, 10)// 子协程写数据go write(output1)// 取数据for s := range output1 {fmt.Println("res:", s)time.Sleep(time.Second)}
}func write(ch chan string) {for {select {// 写数据case ch <- "hello":fmt.Println("write hello")default:fmt.Println("channel full")}time.Sleep(time.Millisecond * 500)}
}// 结果就是执行和输出是同步运行的,最后会出现又写但是又是满的情况

7、并发安全和锁

​ go代码中可能存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态),比如十字路口汽车竞争,车厢卫生间竞争等。

1、例子
var x int64
var wg sync.WaitGroupfunc add() {for i := 0; i < 5000; i++ {x = x + 1}wg.Done()
}func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println(x)
}// 运行结果为5000,我们在代码中开启两个goroutine取累加变量x的值,但是这两个goroutine在访问和修改x变量的时候存在数据竞争,导致最后的结果与期待的不符合,下面我们来说说怎么改进

1、互斥锁

​ 互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源,go语言中使用sync包的Mutex类型来实现互斥锁,我们用实际的行动,看看是怎么进行修复的。

var x int64
var wg sync.WaitGroup
var lock sync.Mutex // 新增锁func add() {for i := 0; i < 5000; i++ {lock.lock() // 加锁x = x + 1lock.unlock() // 解锁}wg.Done()
}func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println(x)
}// 运行结果:10000,是符合我们的预期的

2、读写互斥锁

​ 互斥锁是完全互斥的,但是很多实际场景是读多写少,当我们并发的去读一个资源不涉及到资源修改的时候是没有必要加锁的,在这种场景下,读写锁是更好的一种选择,读写锁在Go语言中使用sync包中的RWMutex类型

​ 1)读锁:当一个goroutine获取读锁之后,其他的goroutine如果获取读锁会继续获得锁,如果是写锁则等待;

​ 2)写锁:当一个goroutine获取写锁之后,其他的goroutine不论获取读锁还是写锁都会等待。

var (x      int64wg     sync.WaitGrouplock   sync.Mutexrwlock sync.RWMutex
)func write() {// lock.Lock()   // 加互斥锁rwlock.Lock() // 加写锁x = x + 1time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒rwlock.Unlock()                   // 解写锁// lock.Unlock()                     // 解互斥锁wg.Done()
}func read() {// lock.Lock()                  // 加互斥锁rwlock.RLock()               // 加读锁time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒rwlock.RUnlock()             // 解读锁// lock.Unlock()                // 解互斥锁wg.Done()
}func main() {start := time.Now()for i := 0; i < 10; i++ {wg.Add(1)go write()}for i := 0; i < 1000; i++ {wg.Add(1)go read()}wg.Wait()end := time.Now()fmt.Println(end.Sub(start))
}

8、Sync

​ 在前面我们看到sync.WaitGroup,我们在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务同步,其中sync.WaitGroup有以下几个方法:

​ 1)(wg *WaitGroup) Add(delta int): 计数器 + delta

​ 2)(wg *WaitGrooup) Done():计数器-1

​ 3)(wg *WaitGroup) Wait(): 阻塞知道计数器为0

在sync.WaitGroup内部维护这一个计数器,计数器的值可以增加或减少;例如当我们启动N个并发任务时,就将计数器的任务增加N,每个任务完成时,通过done()实现计数器-1,通过调用wait()来等待并发任务执行完,当计数器的值为0,表示所有并发任务已经完成。

var wg sync.Waitgroupfunc hello() {defer wg.Done() // 这个属于子协程里的,所以在原本主协程个数1 + 子协程1 - 1 => 现在执行完,只有主协程fmt.Println("hello Goroutine!")
}func main() {wg.Add(1) // 主协程在运行,计数器+1go hello() //启动另外一个goroutine去执行hello函数fmt.Println("main goroutine done")wg.wait()
}

1、sync.Once()

​ 这是一个进阶的知识点,在编程很多场景下,我们需要确保在高并发场景下只执行一次,例如只加载一次配置文件,只关闭一次通道等。

​ sync.Once就针对只执行一次场景的解决方案: func (o *Once) Do(f func()) {} => 如果需要执行的函数f需要参数传递,那么就要搭配闭包来使用

1、例子:加载配置文件示例
var icons map[string]image.Imagefunc loadIcons() {icons = map[string] image.Image{"left" : loadIcon("left.png").........}
}// Icon被多个goroutine调用时,不是并发安全的
func Icon(name string) image.Image {if icons == nil {loadIcons()}return icons[name]
}// 运行结果,由于编译器cpu认为美国goroutine满足串行一致的基础上自由的访问内存,所以结果可能被重新排布 => 出现了即使判断icons不是nil,也不意味着变量初始化完成了,考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons的时候,不会被其他goroutine操作,这样可能引发性能安全问题:
var icons map[string]image.Imagevar loadIconsOnce sync.Oncefunc loadIcons() {icons = map[string]image.Image{"left":  loadIcon("left.png"),"up":    loadIcon("up.png"),"right": loadIcon("right.png"),"down":  loadIcon("down.png"),}
}// Icon 是并发安全的
func Icon(name string) image.Image {loadIconsOnce.Do(loadIcons)return icons[name]
}// 可见,syncc.Once内部其实包含一个互斥锁和布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成,保证初始化操作的时候是并发安全的并且初始化事务不会被多次执行

9、原子操作(atomic包)

1、原子操作

​ 代码中的加锁操作是因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是go语言提供的方法在用户态就可以完成,因此性能也比加锁操作更好,go语言中原子操作有内置的标准库sync/atomic提供

​ 1)读取操作;2)写入操作;3)修改草走;4)交换操作;5)比较并操作

1、我们写一个示例:比较互斥锁和原子操作的性能
var x int64
var lock1 sync.Mutex
var wg sync.WaitGroup// 普通版本加函数
func add() {x++wg.Done()
}// 互斥锁版本加函数
func mutexAdd() {lock1.Lock()x++lock1.Unlock()wg.Done()
}// 原子版本加操作
func atmoicAdd() {start := time.Now()for i := 0; i < 10000; i++ {wg.Add(1)// go add() 属于普通版本add函数,不是并发安全的// go mutexAdd() 属于加锁版add函数,并发安全,但是加锁性能开销大go atomicAdd() // 原子版add函数,并发安全,性能优于加锁版}wg.Wait()end := time.Now()fmt.Println(x)fmt.Println(end.Sub(start))
}// atmoic包提供了底层的原子内存级别操作,对于同步算法实现很有用,这些函数必须谨慎的保证正确使用,除了某些特殊的底层应用,使用通道或者atomic包的函数/类型实现同步更好