-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
This implements a LRU cache on top of the FileHasher from hasher.go, it will be used in the new backend for the system process module on linux. The cache is indexed by file path and stores the metadata (what we get from stat(2)/statx(2)) along with the hashes of each file. When we want to hash a file: we stat() the file, then do cache lookup and compare against the stored metadata, if it differs, we rehash, if not we use the cached values. The cache ignores access time (atime), it's only interested in write modifications, if the machine doesn't support statx(2) it falls back to stat(2) but uses the same Unix.Statx_t. With this we end up with a stat() + lookup on the hotpath, and a stat() + stat() + insert on the cold path. The motivation for this is that the new backend ends up fetching "all processes", which in turn causes it to try to hash at every event, the current/old hasher just can't cope with it: 1. Hashing for each event is simply to expensive, in the 100us-50ms range on the default configuration, which puts us below 1000/s. 2. It has a scan rate throttling that on the default configuration ends easily at 40ms per event (25/s). With the cache things improve considerably, we stay below 5us (200k/s) in all cases: ``` MISSES "miss (/usr/sbin/sshd) took 2.571359ms" "miss (/usr/bin/containerd) took 52.099386ms" "miss (/usr/sbin/gssproxy) took 160us" "miss (/usr/sbin/atd) took 50.032us" HITS "hit (/usr/sbin/sshd) took 2.163us" "hit (/usr/lib/systemd/systemd) took 3.024us" "hit (/usr/lib/systemd/systemd) took 859ns" "hit (/usr/sbin/sshd) took 805ns" ``` (cherry picked from commit 8ec2e31) Co-authored-by: Christiano Haesbaert <haesbaert@elastic.co>
- Loading branch information
1 parent
05fd06c
commit 47b2060
Showing
3 changed files
with
373 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//go:build linux | ||
|
||
package hasher | ||
|
||
import ( | ||
"errors" | ||
"time" | ||
|
||
lru "github.com/hashicorp/golang-lru/v2" | ||
"golang.org/x/sys/unix" | ||
|
||
"github.com/elastic/elastic-agent-libs/logp" | ||
) | ||
|
||
// CachedHasher is a metadata aware FileHasher with a LRU cache on top of it. | ||
type CachedHasher struct { | ||
hasher *FileHasher | ||
hashLRU *lru.Cache[string, hashEntry] | ||
hasStatx bool | ||
stats CachedHasherStats | ||
log *logp.Logger | ||
} | ||
|
||
// CachedHasherStats are basics statistics for debugging and testing. | ||
type CachedHasherStats struct { | ||
Hits uint64 | ||
Misses uint64 | ||
Invalidations uint64 | ||
Evictions uint64 | ||
} | ||
|
||
// hashEntry is an entry in the LRU cache. | ||
type hashEntry struct { | ||
statx unix.Statx_t | ||
hashes map[HashType]Digest | ||
} | ||
|
||
// NewFileHasherWithCache creates a CachedHasher with space up to size elements. | ||
func NewFileHasherWithCache(c Config, size int) (*CachedHasher, error) { | ||
// We don't rate limit our hashes, we cache | ||
c.ScanRateBytesPerSec = 0 | ||
hasher, err := NewFileHasher(c, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
hashLRU, err := lru.New[string, hashEntry](size) | ||
if err != nil { | ||
return nil, err | ||
} | ||
var nada unix.Statx_t | ||
hasStatx := unix.Statx(-1, "/", 0, unix.STATX_ALL|unix.STATX_MNT_ID, &nada) != unix.ENOSYS | ||
|
||
return &CachedHasher{ | ||
hasher: hasher, | ||
hashLRU: hashLRU, | ||
hasStatx: hasStatx, | ||
log: logp.NewLogger("cached_hasher"), | ||
}, nil | ||
} | ||
|
||
// HashFile looks up a hashEntry in the cache, if the lookup fails, | ||
// the hash is computed, inserted in the cache, and returned. If the | ||
// lookup succeeds but the file metadata changed, the entry is evicted | ||
// and refreshed. | ||
func (ch *CachedHasher) HashFile(path string) (map[HashType]Digest, error) { | ||
var x time.Time | ||
if logp.IsDebug("cached_hasher") { | ||
x = time.Now() | ||
} | ||
|
||
// See if we have it stored | ||
if entry, ok := ch.hashLRU.Get(path); ok { | ||
statx, err := ch.statxFromPath(path) | ||
if err != nil { | ||
// No point in keeping an entry if we can't compare | ||
if !ch.hashLRU.Remove(path) { | ||
err := errors.New("can't remove existing entry, this is a bug") | ||
ch.log.Error(err) | ||
} | ||
return nil, err | ||
} | ||
// If metadata didn't change, this is a good entry, if not fall and rehash | ||
if statx == entry.statx { | ||
ch.log.Debugf("hit (%s) took %v", path, time.Since(x)) | ||
ch.stats.Hits++ | ||
return entry.hashes, nil | ||
} | ||
// Zap from lru | ||
if !ch.hashLRU.Remove(path) { | ||
err := errors.New("can't remove existing entry, this is a bug") | ||
ch.log.Error(err) | ||
return nil, err | ||
} else { | ||
ch.stats.Invalidations++ | ||
ch.log.Debugf("invalidate (%s)", path) | ||
} | ||
} | ||
// Nah, so do the hard work | ||
hashes, err := ch.hasher.HashFile(path) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// Fetch metadata | ||
statx, err := ch.statxFromPath(path) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// Insert | ||
entry := hashEntry{hashes: hashes, statx: statx} | ||
if ch.hashLRU.Add(path, entry) { | ||
ch.stats.Evictions++ | ||
ch.log.Debugf("evict (%s)") | ||
} | ||
|
||
ch.log.Debugf("miss (%s) took %v", path, time.Since(x)) | ||
ch.stats.Misses++ | ||
|
||
return entry.hashes, nil | ||
} | ||
|
||
// Close releases all resources | ||
func (ch *CachedHasher) Close() { | ||
ch.hashLRU.Purge() | ||
} | ||
|
||
// Stats returns basic stats suitable for debugging and testing | ||
func (ch *CachedHasher) Stats() CachedHasherStats { | ||
return ch.stats | ||
} | ||
|
||
// statxFromPath returns the metadata (unix.Statx_t) of path. In case | ||
// the system doesn't support statx(2), it uses stat(2) and fills the | ||
// corresponding members of unix.Statx_t, leaving the remaining members | ||
// with a zero value. | ||
func (ch *CachedHasher) statxFromPath(path string) (unix.Statx_t, error) { | ||
if ch.hasStatx { | ||
var tmpstx unix.Statx_t | ||
err := unix.Statx(-1, path, 0, unix.STATX_ALL|unix.STATX_MNT_ID, &tmpstx) | ||
if err != nil { | ||
return unix.Statx_t{}, err | ||
} | ||
|
||
// This might look stupid, but it guarantees we only compare | ||
// the members we are really interested, unix.Statx_t grows | ||
// with time, so if they ever add a member that changes all | ||
// the time, we don't introduce a bug where we compare things | ||
// we don't want to. | ||
return unix.Statx_t{ | ||
Mask: tmpstx.Mask, | ||
Blksize: tmpstx.Blksize, | ||
Attributes: tmpstx.Attributes, | ||
Nlink: tmpstx.Nlink, | ||
Uid: tmpstx.Uid, | ||
Gid: tmpstx.Gid, | ||
Mode: tmpstx.Mode, | ||
Ino: tmpstx.Ino, | ||
Size: tmpstx.Size, | ||
Blocks: tmpstx.Blocks, | ||
Attributes_mask: tmpstx.Attributes_mask, | ||
Btime: tmpstx.Btime, | ||
Ctime: tmpstx.Ctime, | ||
Mtime: tmpstx.Mtime, | ||
Rdev_minor: tmpstx.Rdev_minor, | ||
Rdev_major: tmpstx.Rdev_major, | ||
// no Atime | ||
// no Dio_mem_align | ||
// no Dio_offset_align | ||
// no Subvol | ||
// no Atomic_write_unit_min | ||
// no Atomic_write_unit_max | ||
// no Atomic_write_segments_max | ||
}, nil | ||
} | ||
|
||
// No statx(2), fallback to stat(2) | ||
var st unix.Stat_t | ||
if err := unix.Stat(path, &st); err != nil { | ||
return unix.Statx_t{}, err | ||
} | ||
|
||
return unix.Statx_t{ | ||
Dev_major: unix.Major(st.Dev), | ||
Dev_minor: unix.Minor(st.Dev), | ||
Ino: st.Ino, | ||
Nlink: uint32(st.Nlink), | ||
Mode: uint16(st.Mode), | ||
Uid: st.Uid, | ||
Gid: st.Gid, | ||
Rdev_major: unix.Major(st.Rdev), | ||
Rdev_minor: unix.Minor(st.Rdev), | ||
Size: uint64(st.Size), | ||
Blksize: uint32(st.Blksize), | ||
Blocks: uint64(st.Blocks), | ||
Mtime: unix.StatxTimestamp{ | ||
Nsec: uint32(st.Mtim.Nsec), | ||
Sec: st.Mtim.Sec, | ||
}, | ||
Ctime: unix.StatxTimestamp{ | ||
Nsec: uint32(st.Ctim.Nsec), | ||
Sec: st.Ctim.Sec, | ||
}, | ||
// no Atime | ||
}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//go:build linux | ||
|
||
package hasher | ||
|
||
import ( | ||
"io" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
type pattern struct { | ||
text []byte | ||
md5 string | ||
sha1 string | ||
sha256 string | ||
sha512 string | ||
sha3_384 string | ||
} | ||
|
||
var patternA = pattern{ | ||
text: []byte("Rather than love, than money, than fame, give me truth.\n"), | ||
md5: "572698a28f439d3c2647c67df75ed22f", | ||
sha1: "511c4040962d493ba9cb2c0748137c11e42eb46b", | ||
sha256: "19c76b22dd0bf97b0bf064e6587961938ba9f4ab73d034b0edac6c2c2829c0cd", | ||
sha512: "e339322ed81208f930047e8b94db504f40a3e8bb2af75511925e3469488104edcd8eb8c613ea7fd0b08199a4d7061690512a05f66b50b4427470d6c8cf2d74a3", | ||
sha3_384: "9961640983a079920f74f2503feb5ce63325d6a6cd0138905e9419c4307043fa324217587062ac8648cbf43138a33034", | ||
} | ||
|
||
var patternB = pattern{ | ||
text: []byte("From womb to tomb, in kindness and crime.\n"), | ||
md5: "e3d72a80f13b9c1e4b07a7182b934502", | ||
sha1: "90da69d7b93ef792e8e4506543506975018df980", | ||
sha256: "67606f88f25357b2b101e94bd02fc5da8dd2993391b88596c15bea77780a6a77", | ||
sha512: "23c3779d7c6a8d4be2ca7a0bf412a2c99ea2f8a95ac21f56e3b9cb1bd0c0427bf2db91bbb484128f53ef48fbbfc97e525b328e1c4c0f8d24dd8a3f438c449736", | ||
sha3_384: "2034d02ad7b46831b9f2bf09b2eaa77bfcf70ebd136f29b95e6723cc6bf94d0fb7aae972dd2297b5507bb568cb65563b", | ||
} | ||
|
||
var config = Config{ | ||
HashTypes: []HashType{MD5, SHA1, SHA256, SHA512, SHA3_384}, | ||
MaxFileSize: "1 KiB", | ||
MaxFileSizeBytes: 1024, | ||
} | ||
|
||
func TestCachedHasher(t *testing.T) { | ||
ch, err := NewFileHasherWithCache(config, 1) | ||
require.NoError(t, err) | ||
doTestCachedHasher(t, ch) | ||
} | ||
|
||
func TestCachedHasherWithStat(t *testing.T) { | ||
ch, err := NewFileHasherWithCache(config, 1) | ||
require.NoError(t, err) | ||
ch.hasStatx = false | ||
doTestCachedHasher(t, ch) | ||
} | ||
|
||
func doTestCachedHasher(t *testing.T, ch *CachedHasher) { | ||
// Create a file | ||
file := mkTemp(t) | ||
defer file.Close() | ||
|
||
// Write patternA and confirm first hash is a miss | ||
writePattern(t, file, patternA) | ||
ch.checkState(t, file.Name(), patternA, CachedHasherStats{Misses: 1}) | ||
|
||
// Prove a subsequent hash hits the cache | ||
ch.checkState(t, file.Name(), patternA, CachedHasherStats{Misses: 1, Hits: 1}) | ||
|
||
// Prove changing access time still causes a hit. | ||
// Note: we can't use os.Chtimes() to change _only_ atime, it | ||
// might end up modifying mtime since it can round/truncate | ||
// value we would get from file.Stat().ModTime() | ||
time.Sleep(time.Millisecond * 2) | ||
_, err := os.ReadFile(file.Name()) | ||
require.NoError(t, err) | ||
ch.checkState(t, file.Name(), patternA, CachedHasherStats{Misses: 1, Hits: 2}) | ||
|
||
// Prove changing mtime invalides the entry, and causes a miss | ||
ostat, err := file.Stat() | ||
require.NoError(t, err) | ||
mtime := ostat.ModTime().Add(time.Hour) | ||
require.NoError(t, os.Chtimes(file.Name(), mtime, mtime)) | ||
ch.checkState(t, file.Name(), patternA, CachedHasherStats{Misses: 2, Hits: 2, Invalidations: 1}) | ||
|
||
// Write the second pattern, prove it's a miss | ||
writePattern(t, file, patternB) | ||
ch.checkState(t, file.Name(), patternB, CachedHasherStats{Misses: 3, Hits: 2, Invalidations: 2}) | ||
|
||
// Hash something else, prove first one is evicted | ||
file2 := mkTemp(t) | ||
defer file2.Close() | ||
writePattern(t, file2, patternA) | ||
ch.checkState(t, file2.Name(), patternA, CachedHasherStats{Misses: 4, Hits: 2, Invalidations: 2, Evictions: 1}) | ||
|
||
// If we go back and lookup the original path, prove we should evict again and it's a miss | ||
ch.checkState(t, file.Name(), patternB, CachedHasherStats{Misses: 5, Hits: 2, Invalidations: 2, Evictions: 2}) | ||
|
||
// If we close, prove we purge | ||
require.Equal(t, ch.hashLRU.Len(), 1) | ||
ch.Close() | ||
require.Equal(t, ch.hashLRU.Len(), 0) | ||
} | ||
|
||
func mkTemp(t *testing.T) *os.File { | ||
file, err := os.CreateTemp(t.TempDir(), "cached_hasher_test_*") | ||
require.NoError(t, err) | ||
|
||
return file | ||
} | ||
|
||
func writePattern(t *testing.T, file *os.File, p pattern) { | ||
err := file.Truncate(0) | ||
require.NoError(t, err) | ||
_, err = file.Seek(0, io.SeekStart) | ||
require.NoError(t, err) | ||
n, err := file.Write(p.text) | ||
require.NoError(t, err) | ||
require.Equal(t, n, len(p.text)) | ||
} | ||
|
||
func (ch *CachedHasher) checkState(t *testing.T, path string, p pattern, stats CachedHasherStats) { | ||
hashes, err := ch.HashFile(path) | ||
require.NoError(t, err) | ||
require.Len(t, hashes, 5) | ||
require.Equal(t, p.md5, hashes["md5"].String()) | ||
require.Equal(t, p.sha1, hashes["sha1"].String()) | ||
require.Equal(t, p.sha256, hashes["sha256"].String()) | ||
require.Equal(t, p.sha512, hashes["sha512"].String()) | ||
require.Equal(t, p.sha3_384, hashes["sha3_384"].String()) | ||
require.Equal(t, stats, ch.Stats()) | ||
} |