> 文章列表 > go语言并发编程

go语言并发编程

go语言并发编程

并发编程

    • 1.并发介绍
      • 1.1进程和线程
      • 1.2并发和并行
      • 1.3协程和线程
        • 1.协程
        • 2.线程
      • 1.4goroutine只是由官方实现的超级"线程池"
    • 2.Goroutine
      • 2.1使用Goroutine
        • 1.启动单个goroutine
        • 2.启动多个goroutine
        • 3.goroutine与线程
          • 3.1可增长的栈
          • 3.2goroutine调度
    • 3.runtime包
      • 3.1runtime.Gosched()
      • 3.2runtime.Goexit()
      • 3.3runtime.GOMAXPROCS
      • 3.4Go语言中的操作系统线程和goroutine的关系
    • 4.Channel
      • 4.1channel类型
      • 4.2创建channel
      • 4.3初始化channel
      • 4.4channel操作
        • 1.发送
        • 2.接收
        • 3.关闭
        • 4.注意点
      • 4.4无缓冲的通道
      • 4.5有缓冲的通道
      • 4.6close()
        • 1.从通道循环取值
      • 4.7单向通道
        • 1.注意点
      • 3.7通道总结
    • 5.Goroutine池
    • 6.定时器
      • 6.1Timer
      • 6.2Ticker
    • 7.select
      • 7.1select多路复用
        • 1.问题
        • 2.Go内置了select关键字,可以同时响应多个通道的操作
        • 3.select可以同时监听一个或多个channel,直到其中一个channel ready
        • 4.如果多个channel同时ready,则随机选择一个执行
        • 5.可以用于判断管道是否存满
    • 8.并发安全和锁
      • 8.1竞态问题
      • 8.2互斥锁
      • 8.3读写互斥锁
    • 9.Sync
      • 9.1sync.WaitGroup
      • 9.2sync.Once
        • 1.sync.Once源码解析
      • 9.3sync.Map
        • 1.sync.Map属于go语言中并发安全版
    • 10.原子操作(atomic包)

1.并发介绍

1.1进程和线程

  • 进程是程序操作系统中得一次执行过程,系统进行资源分配和调度的一个独立单元
  • 线程是进程的一个执行实体,是cpu调度分派的基本单位,它是比进程更小的能独立运行的基本单位
  • 一个进程可以创建和撤销多个线程,同一个进程中的多个线程之间可以并发执行

1.2并发和并行

  • 多线程程序在一个核的CPU上运行,就是并发
  • 多线程程序在多个核的CPU上运行,就是并行

1.3协程和线程

1.协程

  • 独立的栈空间共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的

2.线程

  • 一个线程可以跑多个协程,协程是轻量级的线程

1.4goroutine只是由官方实现的超级"线程池"

  • goroutine奉行通过通信来共享内存而不是共享内存来通信

2.Goroutine

2.1使用Goroutine

  • Go语言中使用Goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine
  • 一个goroutine必定对应一个函数可以创建多个goroutine去执行相同的函数

1.启动单个goroutine

  • 启动goroutine的方式非常简单,只需要在调用的函数(普通函数和匿名函数)前面加上一个go关键字
package mainimport ("fmt""time"
)func hello() {fmt.Println("Hello Goroutine!")
}func main() {// 直接调用  可以执行hello方法// hello()// 增加关键词  go  有概率不执行hello方法结束// 在程序启动时,Go程序就会main()函数创建一个默认的goroutine,默认结束,其他goroutine一同结束go hello()fmt.Println("main goroutine done!")// 最简单的方式就添加time.Sleep()time.Sleep(time.Second)
}

2.启动多个goroutine

package mainimport ("fmt""sync"
)// sync.WaitGroup来实现goroutine的同步
var wg sync.WaitGroupfunc hello(i int) {defer wg.Done()fmt.Println("hello,Goroutine", i)
}
func main() {for i := 0; i < 10; i++ {// 启动一个goroutine就登记一个wg.Add(1)// 10个goroutine是并发执行的,而goroutine的调度是随机的go hello(i)}//等待所有登记的goroutine都结束wg.Wait()
}

3.goroutine与线程

3.1可增长的栈
  • OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
3.2goroutine调度
  • GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。
    • G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
    • P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
    • M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;
    • P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。
    • P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

3.runtime包

3.1runtime.Gosched()

  • 让出CPU时间片,重新等待安排任务
package mainimport ("fmt""runtime"
)func main() {go func(s string) {for i := 0; i < 2; i++ {fmt.Println(s)}}("world")// 主协程for i := 0; i < 2; i++ {// 切一下 ,再次分配任务runtime.Gosched()fmt.Println("hello")}}

3.2runtime.Goexit()

  • 退出当前协程
package mainimport ("fmt""runtime"
)func main() {go func() {defer fmt.Println("A.defer")func() {defer fmt.Println("B.defer")// 结束协程runtime.Goexit()defer fmt.Println("C.defer")fmt.Println("B")}()}()for {}
}

3.3runtime.GOMAXPROCS

  • Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码
  • 默认值是机器上的CPU核心数
  • Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数
  • Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
package mainimport ("fmt""runtime""time"
)func a() {for i := 1; i < 10; i++ {fmt.Println("A:", i)}
}
func b() {for i := 1; i < 10; i++ {fmt.Println("B:", i)}}func main() {runtime.GOMAXPROCS(8)go a()go b()time.Sleep(time.Second)
}

3.4Go语言中的操作系统线程和goroutine的关系

  • 一个操作系统线程对应用户态多个goroutine。
  • go程序可以同时使用多个操作系统线程。
  • goroutine和OS线程是多对多的关系,即m:n。

4.Channel

  • Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信
  • Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

4.1channel类型

  • channel是一种类型,一种引用类型。声明通道类型的格式
var 变量 chan 元素类型

4.2创建channel

  • 通道是引用类型,通道类型的空值是nil
  • 声明的通道后需要使用make函数初始化之后才能使用
var ch chan int 
fmt.Println(ch)//nil

4.3初始化channel

  • 格式
make(chan 元素类型,[缓冲大小])
  • 代码
ch1 := make(chan int)

4.4channel操作

  • 通道有发送接收关闭三种操作
  • 发送和接收都使用 <-符号
//初始化通道
ch := make(chan int)

1.发送

  • 将一个值发送到通道中
ch<-10

2.接收

  • 从一个通道中接收值
// 从ch中接收值并赋值给变量x
x := <- ch
//从ch中接收者,忽略结果
<-ch

3.关闭

  • 通过内置的close函数来关闭通道
close(ch)

4.注意点

  • 对一个关闭的通道再发送值就会导致panic。
  • 对一个关闭的通道进行接收会一直获取值直到通道为空。
  • 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的默认值
  • 关闭一个已经关闭的通道会导致panic

4.4无缓冲的通道

  • 无缓冲的通道又称为阻塞的通道
// 编译能通过,但是不能执行,报 deadlock
ch := make(chan int)
ch <- 10
  • 无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
func recv(c chan int){ret <- cfmt.Println("接收到的值",ret)
}func main(){ch := make(chan int)//启用goroutine从通道接收值go recv()ch <- 10fmt.Println("发送成功")
}

在这里插入图片描述

4.5有缓冲的通道

  • 只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量
package mainimport "fmt"func main() {//创建一个容量为1的有缓冲区通道ch := make(chan int, 1)ch <- 10fmt.Println("发送成功")close(ch)
}

4.6close()

  • 可以通过内置的close()函数关闭channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)

1.从通道循环取值

package mainimport "fmt"func main() {ch1 := make(chan int)ch2 := make(chan int)// 开启goroutine将0~99的数发送到ch1go func() {for i := 0; i < 100; i++ {ch1 <- i}close(ch1)}()//开启goroutine从ch1中接收值,并将该值的平方发送到ch2中go func() {for {// 通道关闭后再取值 ok = falsei, ok := <-ch1if !ok {break}ch2 <- i * i}close(ch2)}()for i := range ch2 {fmt.Println(i)}
}

4.7单向通道

  • 通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收
package mainimport "fmt"func counter(out chan<- int) {for i := 0; i < 100; i++ {out <- i}close(out)
}
func squarer(out chan<- int, in <-chan int) {for i := range in {out <- i * i}close(out)
}
func printer(in <-chan int) {for i := range in {fmt.Println(i)}
}func main() {ch1 := make(chan int)ch2 := make(chan int)go counter(ch1)go squarer(ch2, ch1)printer(ch2)
}

1.注意点

  • chan<- int是一个只能发送的通道,可以发送但是不能接收
  • <-chan int是一个只能接收的通道,可以接收但是不能发送

3.7通道总结

channel nil 非空 空的 满了 没满
接收 阻塞 接收值 阻塞 接收值 接收值
发送 阻塞 发送值 发送值 阻塞 发送值
关闭 panic 关闭成功,读完数据后返回默认值 关闭成功,返回默认值 关闭成功,读完数据后返回默认值 关闭成功,读完数据后返回默认值

5.Goroutine池

package mainimport ("fmt""math/rand"
)type Job struct {Id      intRandNum int
}
type Result struct {job *Jobsum int
}// 创建工作池
// 参数1:开几个协程
func createPool(num int, jobChan chan *Job, resultChan chan *Result) {//根据开协程个数,去运行for i := 0; i < num; i++ {go func(jobChan chan *Job, resultChan chan *Result) {//执行运算//遍历job通道所有数据,数字进行相加for job := range jobChan {//随机数接过来r_num := job.RandNum//随机数每一位相加//定义返回值(结果)var sum intfor r_num != 0 {tmp := r_num % 10sum += tmpr_num /= 10}//想要得结果resultr := &Result{job: job,sum: sum,}//运算结果放到管道resultChan <- r}}(jobChan, resultChan)}
}func main() {//创建两个通道// job通道jobChan := make(chan *Job, 128)// 结果通道resultChan := make(chan *Result, 128)//创建工作池createPool(64, jobChan, resultChan)go func(resultChan chan *Result) {//遍历管道for result := range resultChan {fmt.Printf("job id:%v randnum:%v result:%d\\n", result.job.Id, result.job.RandNum, result.sum)}}(resultChan)var id int// 循环创建job,输入到管道for {id++//生成随机数r_num := rand.Int()job := &Job{Id:      id,RandNum: r_num,}jobChan <- job}
}

6.定时器

6.1Timer

  • 时间到了,执行一次
package mainimport ("fmt""time"
)func main() {// timer 基本使用// timer1 := time.NewTimer(2 * time.Second)// t1 := time.Now()// fmt.Printf("t1:%v\\n", t1)// t2 := <-timer1.C// fmt.Printf("t2:%v\\n", t2)// 验证timer只能响应一次// timer2 := time.NewTimer(time.Second)// for {// 	<-timer2.C// 	fmt.Println("时间到")// }// timer 实现延时的功能// time.Sleep(time.Second)// timer3 := time.NewTimer(2 * time.Second)// <-timer3.C// fmt.Println("时间到")// <-time.After(2 * time.Second)// fmt.Println("2秒到")// 停止定时器// timer4 := time.NewTimer(2 * time.Second)// go func() {// 	<-timer4.C// 	fmt.Println("定时器执行了")// }()// b := timer4.Stop()// if b {// 	fmt.Println("timer4 已经关闭")// }// 重置定时器timer5 := time.NewTimer(3 * time.Second)timer5.Reset(1 * time.Second)fmt.Println(time.Now())fmt.Println(<-timer5.C)for {}
}

6.2Ticker

  • 时间到了,多次执行
package mainimport ("fmt""time"
)func main() {//获取ticker对象ticker := time.NewTicker(1 * time.Second)i := 0//子协程go func() {for {//调用一次 <-ticker.C 就延迟1s//<-ticker.Ci++fmt.Println(<-ticker.C)if i == 5 {//stopticker.Stop()}}}()for {}
}

7.select

7.1select多路复用

1.问题

  • 在某些场景下需要同时从多个通道接收数据,通道在接收数据时,如果没有数据可以接收将会发生阻塞
for{//尝试从ch1接收值data,ok := <-ch1// 尝试从ch2接收值data,ok := <-ch2
}

2.Go内置了select关键字,可以同时响应多个通道的操作

  • select的使用类似于switch语句,它有一系列case分支和一个默认分支,每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句
select{
case <-ch1://如果ch1成功读到数据,则进行该case处理语句
case ch2 <-2//如果成功向ch2写入数据,则进行该case处理语句
default://如果上面都没成功,则进入default处理流程
}

3.select可以同时监听一个或多个channel,直到其中一个channel ready

package mainimport ("fmt""time"
)func test1(ch chan int) {time.Sleep(time.Second * 5)ch <- 1
}
func test2(ch chan int) {time.Sleep(time.Second * 2)ch <- 2
}func main() {ch1 := make(chan int)ch2 := make(chan int)//select 可以同时监听一个或多个channel,直到其中一个channel readygo test1(ch1)go test2(ch2)//用select监控  执行了case data2 := <-ch2select {case data1 := <-ch1:fmt.Println("data1: ", data1)case data2 := <-ch2:fmt.Println("data2: ", data2)}}

4.如果多个channel同时ready,则随机选择一个执行

	//创建两个管道int_chan := make(chan int, 1)string_chan := make(chan string, 1)go func() {// 协程线程休眠2s// time.Sleep(time.Second * 2)int_chan <- 1}()go func() {string_chan <- "hello"}()select {case value := <-int_chan:fmt.Println("int: ", value)case value := <-string_chan:fmt.Println("string: ", value)}fmt.Println("main结束")

5.可以用于判断管道是否存满

//创建管道ch := make(chan string, 10)//子协程写数据go write(ch)//取数据for str := range ch {fmt.Println("str: ", str)time.Sleep(time.Second)}func write(ch chan<- string) {for {select {//写数据case ch <- "hello":fmt.Println("write hello")default:fmt.Println("channel full")}time.Sleep(time.Millisecond * 500)}
}

8.并发安全和锁

8.1竞态问题

  • 在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)
package mainimport ("fmt""sync"
)var x int64
var wg sync.WaitGroupfunc add() {for i := 0; i < 5000; i++ {x = x + 1}wg.Done()
}func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println("x: ", x)
}

8.2互斥锁

  • 互斥锁是一种常用的控制共享资源访问的方法,能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁
package mainimport ("fmt""sync"
)var x int64
var wg sync.WaitGroup
var lock sync.Mutexfunc add() {for i := 0; i < 5000; i++ {//加锁lock.Lock()x = x + 1//解锁lock.Unlock()}wg.Done()
}func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println("x: ", x)
}

8.3读写互斥锁

  • 互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型

  • 读写锁分为两种:读锁和写锁

    • 当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待
    • 当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待
package mainimport ("fmt""sync""time"
)var (x      int64wg     sync.WaitGrouplock   sync.Mutexrwlock sync.RWMutex
)func write() {// 加互斥锁//lock.Lock()// 加写锁rwlock.Lock()x = x + 1time.Sleep(time.Millisecond * 10)// 解写锁rwlock.Unlock()// 解互斥锁// lock.Unlock()wg.Done()}
func read() {// 加互斥锁//lock.Lock()// 加读锁rwlock.RLock()time.Sleep(time.Millisecond)//解读锁rwlock.RUnlock()// 解互斥锁//lock.Unlock()wg.Done()}func main() {start := time.Now()for i := 0; i < 10; i++ {wg.Add(1)go write()}for i := 0; i < 1000; i++ {wg.Add(1)go read()}wg.Wait()end := time.Now()fmt.Println(end.Sub(start))
}

9.Sync

9.1sync.WaitGroup

  • 在实际代码中,使用time.Sleep不太合适,sync.WaitGroup提供了以下方法实现并发任务的同步
方法名 功能
(wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0
package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc hello() {defer wg.Done()fmt.Println("Hello Goroutine")
}
func main() {wg.Add(1)// 启动另外一个goroutine去执行hello函数go hello()fmt.Println("main goroutine done")wg.Wait()
}

9.2sync.Once

  • Go语言中的sync包中提供了一个针对只执行一次场景的解决方案–sync.Once。
  • sync.Once只有一个Do方法
    func (o * Once)Do(f func()){}
    
  • 注意:如果要执行的函数f需要传递参数就需要搭配闭包来使用

1.sync.Once源码解析

type Once struct{// done 字段用来判断某行为action是否已进行,因为hot path中被使用,放在结构体的第一字段能够减少机器指令done uint32m Mutex
}func (o *Once) Do(f func()) {// 原子加载标识值,判断是否已被执行过if atomic.LoadUint32(&o.done) == 0 {o.doSlow(f)}
}func (o *Once) doSlow(f func()) { // 还没执行过函数o.m.Lock()defer o.m.Unlock()if o.done == 0 { // 再次判断下是否已被执行过函数/**Once 本身的语义就是对外保证你传进来 f 执行过一次,若 f 在执行过程中 panic 了,会导致 		Do 也直接退出,但是退出前会把所有的 defer 都执行完,保证了 f 执行过一次。若放在 f() 后面,当 f 发生 panic 之后,done 就不能置为 1*/defer atomic.StoreUint32(&o.done, 1) // 原子操作:修改标识值f() // 执行函数}
}

9.3sync.Map

  • Go语言中内置的map不是并发安全的
package mainimport ("fmt""strconv""sync"
)var m = make(map[string]int)func get(key string) int {return m[key]
}
func set(key string, value int) {m[key] = value
}
func main() {wg := sync.WaitGroup{}for i := 0; i < 20; i++ {wg.Add(1)go func(n int) {key := strconv.Itoa(n)set(key, n)fmt.Printf("k=%v,v=%v\\n", key, get(key))wg.Done()}(i)wg.Wait()}
}

1.sync.Map属于go语言中并发安全版

var m = sync.Map{}func main() {wg := sync.WaitGroup{}for i := 0; i < 20; i++ {wg.Add(1)go func(n int) {key := strconv.Itoa(n)m.Store(key, n)value, _ := m.Load(key)fmt.Printf("k=:%v,v:=%v\\n", key, value)wg.Done()}(i)}wg.Wait()
}

10.原子操作(atomic包)

package mainimport ("fmt""sync""sync/atomic""time"
)var x int64
var l sync.Mutex
var wg sync.WaitGroup// 普通版函数
func add() {x++wg.Done()
}// 互斥锁版函数
func mutesAdd() {l.Lock()x++l.Unlock()wg.Done()
}// 原子操作版函数
func atomicAdd() {atomic.AddInt64(&x, 1)wg.Done()
}func main() {start := time.Now()for i := 0; i < 1000000; i++ {wg.Add(1)// 普通版add函数  不是并发安全的// 执行时间 3ms左右   值不固定//go add()//加锁版add函数,是并发安全的,但是加锁性能开销大// 执行时间 2~5ms  值固定//go mutesAdd()// 原子操作版add函数 是并发安全,性能优于加锁版// 执行时间 2~5ms 值固定go atomicAdd()}wg.Wait()end := time.Now()fmt.Println("x=", x)fmt.Println(end.Sub(start))
}