Skip to content

Latest commit

 

History

History
252 lines (200 loc) · 12 KB

04.00-Mutex:骇客编程,如何拓展额外功能?.md

File metadata and controls

252 lines (200 loc) · 12 KB

Mutex:骇客编程,如何拓展额外功能?

前面我们搞了互斥锁 Mutex 的基本用法、实现原理以及易错场景,可以说是涵盖了互斥锁的方方面面。如果你能熟练掌握这些内容,那么,在大多数的开发场景中,你都可以得心应手。

但是,在一些特定的场景中,这些基础功能是不足以应对的。这个时候,我们就需要开发一些扩展功能了。

比如说,我们知道,如果互斥锁被某个 goroutine 获取了,而且还没有释放,那么,其他请求这把锁的 goroutine,就会阻塞等待,直到有机会获得这把锁。有时候阻塞并不是一个很好的主意,比如你请求锁更新一个计数器,如果获取不到锁的话没必要等待,大不了这次不更新,我下次更新就好了,如果阻塞的话会导致业务处理能力的下降。

再比如,如果我们要监控锁的竞争情况,一个监控指标就是,等待这把锁的 goroutine 数量。我们可以把这个指标推送到时间序列数据库中,再通过一些监控系统(比如 Grafana)展示出来。要知道,锁是性能下降的“罪魁祸首”之一,所以,有效地降低锁的竞争,就能够很好地提高性能。因此,监控关键互斥锁上等待的 goroutine 的数量,是我们分析锁竞争的激烈程度的一个重要指标。

其实吧,你要是把资源安排的妥妥当当,又要等待吗?还需要阻塞吗?顺畅的一痞性能不久高起来了吗?或者这个资源不行,我们换个资源,先做其它的,然后再等就行了呀。所以我们可以通过 Hacker 的方式,为 Mutex 增加一些额外的功能。包括实现 TryLock,获取等待者的数量等指标,以及实现一个线程安全的队列。

TryLock

我们可以为 Mutex 添加一个 TryLock 的方法,也就是尝试获取排外锁。

这个方法具体是什么意思呢?当一个 goroutine 调用这个 TryLock 方法请求锁的时候,如果这把锁没有被其他 goroutine 所持有,那么,这个 goroutine 就持有了这把锁,并返回 true;如果这把锁已经被其他 goroutine 所持有,或者是正在准备交给某个被唤醒的 goroutine,那么,这个请求锁的 goroutine 就直接返回 false,不会阻塞在方法调用上。(厕所的门锁外是不是有个红绿标志,红色表示里面有人,绿色表示里面没人,是不是一看就懂!)

如下图所示,如果 Mutex 已经被一个 goroutine 持有,调用 Lock 的 goroutine 阻塞排队等待,调用 TryLock 的 goroutine 直接得到一个 false 返回。 TryLock图示 在实际开发中,如果要更新配置数据,我们通常需要加锁,这样可以避免同时有多个 goroutine 并发修改数据。有的时候,我们也会使用 TryLock。这样一来,当某个 goroutine 想要更改配置数据时,如果发现已经有 goroutine 在更改了,其他的 goroutine 调用 TryLock,返回了 false,这个 goroutine 就会放弃更改。

那怎么实现一个扩展 TryLock 方法的 Mutex 呢?我们直接来看代码。

package main
import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"
)
// 复制Mutex定义的常量
const (
	mutexLocked = 1 << iota // 加锁标识位置
	mutexWoken              // 唤醒标识位置
	mutexStarving           // 锁饥饿标识位置
	mutexWaiterShift = iota // 标识waiter的起始bit位置
)

// 扩展一个Mutex结构
type Mutex struct {
	sync.Mutex
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
	// 如果能成功抢到锁
	if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {
		return true
	}

	// 如果处于唤醒、加锁或者饥饿状态,这次请求就不参与竞争了,返回false
	old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
	if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
		return false
	}

	// 尝试在竞争的状态下请求锁
	new := old | mutexLocked
	return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
}
/*
这个测试程序的工作机制是这样子的:程序运行时会启动一个 goroutine 持有这把我们自己实现的锁,
经过随机的时间才释放。主 goroutine 会尝试获取这把锁。
如果前一个 goroutine 一秒内释放了这把锁,那么,主 goroutine
就有可能获取到这把锁了,输出“got the lock”,
否则没有获取到也不会被阻塞,会直接输出“can't get the lock”。
 */
func try() {
	var mu Mutex
	go func() { // 启动一个goroutine持有一段时间的锁
		mu.Lock()
		time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
		mu.Unlock()
	}()

	time.Sleep(time.Second)

	ok := mu.TryLock() // 尝试获取到锁
	if ok { // 获取成功
		fmt.Println("got the lock")
		// do something
		mu.Unlock()

	}else {
		// 没有获取到
		fmt.Println("can't get the lock")
	}
	time.Sleep(time.Second)
	ok = mu.TryLock() // 尝试获取到锁
	if ok { // 获取成功
		fmt.Println("got the lock")
		// do something
		mu.Unlock()

	}else {
		// 没有获取到
		fmt.Println("can't get the lock")
	}

}

func main() {
	try()
}

获取等待者的数量等指标

在state字段中,上次有说到一个字段是设计了多个意思,通过对底层字节的切割,实现了对不同状态的显示。回顾一下Mutex的数据结构可知,它包含两个字段,state 和 sema。前四个字节(int32)就是 state 字段。

type Mutex struct {
    state int32
    sema  uint32
}

Mutex 结构中的 state 字段有很多个含义,通过 state 字段,你可以知道锁是否已经被某个 goroutine 持有、当前是否处于饥饿状态、是否有等待的 goroutine 被唤醒、等待者的数量等信息。但是,state 这个字段并没有暴露出来,所以,我们需要想办法获取到这个字段,并进行解析。

怎么获取未暴露的字段呢?很简单,我们可以通过 unsafe 的方式实现。

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
)

type Mutex struct {
    sync.Mutex
}

func (m *Mutex) Count() int {
    // 获取state字段的值
    v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    v = v >> mutexWaiterShift //得到等待者的数值
    v = v + (v & mutexLocked) //再加上锁持有者的数量,0或者1
    return int(v)
}

这个例子的第 14 行通过 unsafe 操作,我们可以得到 state 字段的值。第 15 行我们右移三位(这里的常量 mutexWaiterShift 的值为 3),就得到了当前等待者的数量。如果当前的锁已经被其他 goroutine 持有,那么,我们就稍微调整一下这个值,加上一个 1(第 16 行),你基本上可以把它看作是当前持有和等待这把锁的 goroutine 的总数。

state 这个字段的第一位是用来标记锁是否被持有,第二位用来标记是否已经唤醒了一个等待者,第三位标记锁是否处于饥饿状态,通过分析这个 state 字段我们就可以得到这些状态信息。我们可以为这些状态提供查询的方法,这样就可以实时地知道锁的状态了。

// 锁是否被持有
func (m *Mutex) IsLocked() bool {
    state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    return state&mutexLocked == mutexLocked
}

// 是否有等待者被唤醒
func (m *Mutex) IsWoken() bool {
    state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    return state&mutexWoken == mutexWoken
}

// 锁是否处于饥饿状态
func (m *Mutex) IsStarving() bool {
    state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    return state&mutexStarving == mutexStarving
}

我们可以写一个程序测试一下,比如,在 1000 个 goroutine 并发访问的情况下,我们可以把锁的状态信息输出出来

func count() {
    var mu Mutex
    for i := 0; i < 1000; i++ { // 启动1000个goroutine
        go func() {
            mu.Lock()
            time.Sleep(time.Second)
            mu.Unlock()
        }()
    }

    time.Sleep(time.Second)
    // 输出锁的信息
    fmt.Printf("waitings: %d, isLocked: %t, woken: %t,  starving: %t\n", mu.Count(), mu.IsLocked(), mu.IsWoken(), mu.IsStarving())
}

有一点你需要注意一下,在获取 state 字段的时候,并没有通过 Lock 获取这把锁,所以获取的这个 state 的值是一个瞬态的值,可能在你解析出这个字段之后,锁的状态已经发生了变化。不过没关系,因为你查看的就是调用的那一时刻的锁的状态。

使用 Mutex 实现一个线程安全的队列

因为 Mutex 经常会和其他非线程安全(对于 Go 来说,我们其实指的是 goroutine 安全)的数据结构一起,组合成一个线程安全的数据结构。新数据结构的业务逻辑由原来的数据结构提供,而 Mutex 提供了锁的机制,来保证线程安全

比如队列,我们可以通过 Slice 来实现,但是通过 Slice 实现的队列不是线程安全的,出队(Dequeue)和入队(Enqueue)会有 data race 的问题。这个时候,Mutex 就要隆重出场了,通过它,我们可以在出队和入队的时候加上锁的保护。

其实呢就是在处理改动的时候将队列锁上嘛,太悲观了,但是也造成了现在的安全局面。

type SliceQueue struct {
    data []interface{}
    mu   sync.Mutex
}

func NewSliceQueue(n int) (q *SliceQueue) {
    return &SliceQueue{data: make([]interface{}, 0, n)}
}

// Enqueue 把值放在队尾
func (q *SliceQueue) Enqueue(v interface{}) {
    q.mu.Lock()
    q.data = append(q.data, v)
    q.mu.Unlock()
}

// Dequeue 移去队头并返回
func (q *SliceQueue) Dequeue() interface{} {
    q.mu.Lock()
    if len(q.data) == 0 {
        q.mu.Unlock()
        return nil
    }
    v := q.data[0]
    q.data = q.data[1:]
    q.mu.Unlock()
    return v
}

总结

Mutex 是 package sync 的基石,其他的一些同步原语也是基于它实现的,所以,我们“隆重”地用了四讲来深度学习它。学到后面,你一定能感受到,多花些时间来完全掌握 Mutex 是值得的。

今天这一讲我和你分享了几个 Mutex 的拓展功能,这些方法是不是给你带来了一种“骇客”的编程体验呢,通过 Hacker 的方式,我们真的可以让 Mutex 变得更强大。

我们学习了基于 Mutex 实现 TryLock,通过 unsafe 的方式读取到 Mutex 内部的 state 字段,这样,我们就解决了开篇列举的问题,一是不希望锁的 goroutine 继续等待,一是想监控锁。

另外,使用 Mutex 组合成更丰富的数据结构是我们常见的场景,今天我们就实现了一个线程安全的队列,未来我们还会讲到实现线程安全的 map 对象

到这里,Mutex 我们就系统学习完了,最后给你总结了一张 Mutex 知识地图,帮你复习一下。

mutex知识结构

思考题

你可以为 Mutex 获取锁时加上 Timeout 机制吗?会有什么问题吗?

优秀回答:最简单直接的是采用channel实现,用select监听锁和timeout两个channel,不在今天的讨论范围内。

  1. 用for循环+TryLock实现: 先记录开始的时间,用for循环判断是否超时:没有超时则反复尝试tryLock,直到获取成功;如果超时直接返回失败。

问题:高频的CAS自旋操作,如果失败的太多,会消耗大量的CPU。

  1. 优化1:TryLock的fast的拆分 TryLock的抢占实现分为两部分,一个是fast path,另一个是竞争状态下的,后者的cas操作很多。我会考虑减少slow方法的频率,比如调用n次fast path失败后,再调用一次整个Trylock。

  2. 优化2:借鉴TCP重试机制 for循环中的重试增加休眠时间,每次失败将休眠时间乘以一个系数(如1.5),直到达到上限(如10ms),减少自旋带来的性能损耗