kevinyan815/gocookbook

互斥锁的典型用法和常见误区

kevinyan815 opened this issue · 0 comments

互斥锁是并发控制的一个基本手段,是为了避免竞争而建立的一种并发控制机制。在学习它的具体实现原理前,我们要先搞懂一个概念,就是临界区。

在并发编程中,如果程序中的一部分会被并发访问或修改,那么,为了避免并发访问导致的意想不到的结果,这部分程序需要被保护起来,这部分被保护起来的程序,就叫做临界区。

如果很多线程同步访问临界区,就会造成访问或操作错误,这当然不是我们希望看到的结果。所以,我们可以使用互斥锁,限定临界区只能同时由一个线程持有。

Go 语言的互斥锁

在Go语言里sync.Mutex 是语言层面提供给我们的互斥锁,它实现了sync.Locker 接口

type Locker interface {
    Lock()
    Unlock()
}

所以互斥锁 Mutex 提供两个方法 Lock 和 Unlock:进入临界区之前调用 Lock 方法,退出临界区的时候调用 Unlock 方法:

  func(m *Mutex)Lock()
  func(m *Mutex)Unlock()

当一个 goroutine 通过调用 Lock 方法获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock 方法的调用上,直到锁被释放并且自己获取到了这个锁的拥有权。

Mutex 的典型使用方法

下面是一个使用Mutex 加锁的典型用法:我们创建了 10 个 goroutine,同时不断地对一个变量(count)进行加 1 操作,每个 goroutine 负责执行 10 万次的加 1 操作,我们期望的最后计数的结果是 10 * 100000 = 1000000 (一百万)。

import (
  "fmt"
  "sync"
)
    
func main() {
  var count = 0
  // 使用WaitGroup等待10个goroutine完成
  var wg sync.WaitGroup
  wg.Add(10)
  for i := 0; i < 10; i++ {
    go func() {
      defer wg.Done()
      // 对变量count执行10次加1
      for j := 0; j < 100000; j++ {
        count++
      }
    }()
  }
  // 等待10个goroutine完成
  wg.Wait()
  fmt.Println(count)
}

运行程序我们会发现变量count 最后的结果不是1000000。这是因为变量的累加并不是一个原子操作,它至少包含几个步骤,比如读取变量 count 的当前值,对这个值加 1,把结果再保存到 count 中。因为不是原子操作,在多线程环境下就会有并发的问题。

这里的共享资源是 count 变量,临界区是 count++,只要在临界区前面获取锁,在离开临界区的时候释放锁,就能完美地解决多线程并大的 data race 的问题了。

package main


import (
  "fmt"
  "sync"
)


func main() {
  // 互斥锁保护计数器
  var mu sync.Mutex
  // 计数器的值
  var count = 0

  // 辅助变量,用来确认所有的goroutine都完成
  var wg sync.WaitGroup
  wg.Add(10)

  // 启动10个gourontine
  for i := 0; i < 10; i++ {
    go func() {
      defer wg.Done()
      // 累加10万次
      for j := 0; j < 100000; j++ {
        mu.Lock()
        count++
        mu.Unlock()
      }
    }()
  }
  wg.Wait()
  fmt.Println(count)
}

这里有一点需要注意:Mutex 的零值是还没有 goroutine 等待的未加锁的状态,所以你不需要额外的初始化,直接声明变量(如 var mu sync.Mutex)即可

很多情况下,Mutex 会嵌入到其它 struct 中使用,比如下面的方式:

type Counter struct {
    mu    sync.Mutex
    Count uint64
}

在初始化嵌入的 struct 时,也不必初始化这个 Mutex 字段,不会因为没有初始化出现空指针或者是无法获取到锁的情况

甚至,你还可以把获取锁、释放锁、计数加一的逻辑封装成一个方法,对外不需要暴露锁等逻辑:

func main() {
    // 封装好的计数器
    var counter Counter

    var wg sync.WaitGroup
    wg.Add(10)

    // 启动10个goroutine
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            // 执行10万次累加
            for j := 0; j < 100000; j++ {
                counter.Incr() // 受到锁保护的方法
            }
        }()
    }
    wg.Wait()
    fmt.Println(counter.Count())
}

// 线程安全的计数器类型
type Counter struct {
    CounterType int
    Name        string

    mu    sync.Mutex
    count uint64
}

// 加1的方法,内部使用互斥锁保护
func (c *Counter) Incr() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

// 得到计数器的值,也需要锁保护
func (c *Counter) Count() uint64 {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

使用互斥锁的误区

  • Lock 和 Unlock 没有成对出现:

    • Lock/Unlock 没有成对出现,就意味着会出现死锁的情况,或者是因为 Unlock 一个未加锁的 Mutex 而导致 panic。
  • Copy 已使用的 Mutex:

    • 多发在参数传递嵌套了Mutex 的结构体时,此时参数应该设置成结构体的指针类型

    • type Counter struct {
          sync.Mutex
          Count int
      }
      
      
      func main() {
          var c Counter
          c.Lock()
          defer c.Unlock()
          c.Count++
          foo(c) // 复制锁
      }
      
      // 这里Counter的参数是通过复制的方式传入的
      func foo(c Counter) {
          c.Lock()
          defer c.Unlock()
          fmt.Println("in foo")
      }
  • Mutex 是一个不可重入锁,这点区别于Java的 sychronized 和 ReentrantLock,所以,一旦误用 Mutex 的重入,就会导致报错。下面是一个误用 Mutex 的重入例子:

    func foo(l sync.Locker) {
        fmt.Println("in foo")
        l.Lock()
        bar(l)
        l.Unlock()
    }
    
    
    func bar(l sync.Locker) {
        l.Lock()
        fmt.Println("in bar")
        l.Unlock()
    }
    
    
    func main() {
        l := &sync.Mutex{}
        foo(l)
    }
  • 死锁:先解释下什么是死锁。两个或两个以上的进程(或线程,goroutine)在执行过程中,因争夺共享资源而处于一种互相等待的状态,如果没有外部干涉,它们都将无法推进下去,此时,我们称系统处于死锁状态或系统产生了死锁。

    • 死锁一般是由环路等待造成的:存在一组等待进程,P={P1,P2,…,PN},P1 等待 P2 持有的资源,P2 等待 P3 持有的资源,依此类推,最后是 PN 等待 P1 持有的资源,这就形成了一个环路等待的死结。