Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

singleflight 源码阅读 #26

Open
LLLeon opened this issue May 9, 2021 · 0 comments
Open

singleflight 源码阅读 #26

LLLeon opened this issue May 9, 2021 · 0 comments
Labels
SourceCodeAnalysis_Go Golang source code analysis notes.

Comments

@LLLeon
Copy link
Owner

LLLeon commented May 9, 2021

缓存击穿

在缓存系统中,当某个热点数据的缓存过期时,如果瞬间有大量请求到达 DB,可能会导致 DB 出现问题。

怎么解决?可以让先到达的请求将最新数据更新到缓存,其它请求再使用缓存数据即可。

Go 中的 singleflight 包就可以用于这个场景。它可以只让其中一个请求得到执行,其余请求会阻塞到该请求返回执行结果并使用该结果,从而达到防止击穿的效果。

singleflight 源码

singleflight.go 的源码比较简单:

// 表示一组请求
type call struct {
  // 用来阻塞其余请求
	wg sync.WaitGroup

	// 被调用的函数返回的结果和 err 赋值给这两个字段
	val interface{}
	err error

	// forgotten indicates whether Forget was called with this call's key while the call was still in flight.
	forgotten bool

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	dups  int
	chans []chan<- Result
}

// 用来存储不同 key 的请求组
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

// 执行指定函数并返回结果
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
  // 延迟初始化
	if g.m == nil {
		g.m = make(map[string]*call)
	}
  // 如果当前请求之前已经有对该 key 的请求, 则阻塞当前请求
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock() // 释放锁
		c.wg.Wait() // wg 的这种用法妙啊

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
    // 使用前面请求返回的结果
		return c.val, c.err, true
	}
  
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c // 用 key 来标识一组同样的请求
	g.mu.Unlock()

  // 调用 fn 并返回结果
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

// 与 Do 方法效果一样,不过这里不阻塞当前请求,而是直接返回一个 channel 用于接收结果
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1) // 当前请求从这个 ch 接收结果
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}

	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

  // 异步调用 fn
	go g.doCall(c, key, fn)

	return ch
}

// doCall handles the single call for a key.
// 对 fn 执行调用, 并将结果
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// use double-defer to distinguish panic from runtime.Goexit,
	// more details see https://golang.org/cl/134395
  // 这个 defer 会后执行, 负责判断
	defer func() {
		// the given function invoked runtime.Goexit
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

		c.wg.Done()
		g.mu.Lock()
		defer g.mu.Unlock()
		if !c.forgotten { // 返回之前删除这个 key, 标识着本次对 key 的调用完成
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			// In order to prevent the waiting channels from being blocked forever,
			// needs to ensure that this panic cannot be recovered.
			if len(c.chans) > 0 {
				go panic(e)
				select {} // Keep this goroutine around so that it will appear in the crash dump.
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
      // doCall 返回之前将调用结果写入到所有其余等待结果的请求的 channel 中
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

  // 执行 fn 的部分, 而且是在一个匿名函数里面执行.
  // 这里又用到了一个 defer(这个 defer 会先执行), 用来区分 panic 和 runtime.Goexit.
	func() {
		defer func() {
			if !normalReturn {
				if r := recover(); r != nil {
					c.err = newPanicError(r) // 如果 panic 了, 修改 c.err 的值
				}
			}
		}()

		c.val, c.err = fn()
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
	g.mu.Lock()
	if c, ok := g.m[key]; ok {
		c.forgotten = true
	}
	delete(g.m, key)
	g.mu.Unlock()
}

代码虽然不多,里面一些巧妙的用法可以学习。

@LLLeon LLLeon added the SourceCodeAnalysis_Go Golang source code analysis notes. label May 9, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
SourceCodeAnalysis_Go Golang source code analysis notes.
Projects
None yet
Development

No branches or pull requests

1 participant