Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace dgraph-io/badger cache storage with etcd-io/bbolt #42571

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4f10346
add bbolt store for key-value cache
stefans-elastic Jan 31, 2025
fbe7ebf
fix unit test failures
stefans-elastic Feb 3, 2025
7fa4f37
delete expired keys
stefans-elastic Feb 3, 2025
1c4a798
set expireAt properly
stefans-elastic Feb 3, 2025
183b395
add unit tests
stefans-elastic Feb 3, 2025
36c3933
add missing license headers
stefans-elastic Feb 3, 2025
9b20f2d
go mod tidy, update NOTICE.txt
stefans-elastic Feb 3, 2025
f2303dd
fix linter issues
stefans-elastic Feb 3, 2025
19bed5b
add more godoc comments
stefans-elastic Feb 4, 2025
17372b1
Merge branch 'main' of github.com:stefans-elastic/beats into drop-dba…
stefans-elastic Feb 4, 2025
7ab7864
add CHANGELOG-developer.next.asciidoc entry
stefans-elastic Feb 4, 2025
91a3ac2
rename Ttl to TTL to comply with styleguide
stefans-elastic Feb 4, 2025
6ea76db
simplify Set method transaction code
stefans-elastic Feb 5, 2025
21e1b2f
Get/Set transaction refactoring
stefans-elastic Feb 5, 2025
ae592ab
adress PR comments
stefans-elastic Feb 5, 2025
6041c9b
Update x-pack/libbeat/kv/bbolt/bbolt.go
stefans-elastic Feb 5, 2025
aec3e8e
rename Kv -> KV
stefans-elastic Feb 5, 2025
26e8277
Merge branch 'drop-dbadger-io' of github.com:stefans-elastic/beats in…
stefans-elastic Feb 5, 2025
e474690
Merge branch 'main' of github.com:stefans-elastic/beats into drop-dba…
stefans-elastic Feb 6, 2025
8675a71
replace assert.NoError with require.NoError in unit test
stefans-elastic Feb 6, 2025
d606724
don't store TTL as part of KV value in bbolt store
stefans-elastic Feb 6, 2025
2cc0ccd
rename Connect -> Open
stefans-elastic Feb 10, 2025
2b9137d
Merge branch 'main' into drop-dbadger-io
stefans-elastic Feb 10, 2025
0614e03
add debug log upon TTL refreshing (if refreshOnAccess is true)
stefans-elastic Feb 10, 2025
94d8f6d
Merge branch 'drop-dbadger-io' of github.com:stefans-elastic/beats in…
stefans-elastic Feb 10, 2025
5f22d0f
update godoc comment for x-pack/libbeat/persistentcache/persistentcac…
stefans-elastic Feb 10, 2025
dcd7bc7
PR comment resolved
stefans-elastic Feb 10, 2025
fa9f41c
rename boltKvStore -> bboltkv on import renaming to make it more idio…
stefans-elastic Feb 10, 2025
43d2f8f
include error in log message
stefans-elastic Feb 10, 2025
041ddb8
Merge branch 'main' into drop-dbadger-io
stefans-elastic Feb 11, 2025
477fb0a
update CHANGELOG-developer.next.asciidoc message
stefans-elastic Feb 12, 2025
3d37518
Merge branch 'main' into drop-dbadger-io
stefans-elastic Feb 12, 2025
c77b7f1
add CHANGELOG.next.asciidoc entry
stefans-elastic Feb 13, 2025
ddc31ed
Merge branch 'main' into drop-dbadger-io
stefans-elastic Feb 13, 2025
c47826e
Update CHANGELOG.next.asciidoc
stefans-elastic Feb 13, 2025
30a92e9
move entry to correct section in CHANGELOG.next.asciidoc
stefans-elastic Feb 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,13 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
- Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]
- Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775]
- Simplified GCS input state checkpoint calculation logic. {issue}40878[40878] {pull}40937[40937]
- Simplified GCS input state checkpoint calculation logic. {issue}40878[40878] {pull}40937[40937]
- Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}40674[40674] {pull}40936[40936]
- Add field redaction package. {pull}40997[40997]
- Add support for marked redaction to x-pack/filebeat/input/internal/private {pull}41212[41212]
- Add support for collecting Okta role and factor data for users with filebeat entityanalytics input. {pull}41044[41044]
- Add CEL input program evaluation coverage collection support. {pull}41884[41884]
- Switch persistent storage from dgraph-io/badger to etcd-io/bbolt in x-pack/libbeat/persistentcache/persistentcache.go. {pull}42571[42571]

==== Deprecated

Expand Down
403 changes: 0 additions & 403 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/apigatewayv2 v1.24.8
github.com/aws/aws-sdk-go-v2/service/health v1.29.2
github.com/aws/smithy-go v1.22.1
github.com/dgraph-io/badger/v4 v4.4.0
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.9.0
Expand Down Expand Up @@ -284,7 +283,6 @@ require (
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 // indirect
github.com/cyphar/filepath-securejoin v0.2.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto/v2 v2.0.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2 h1:6+hM8KeYKV0Z9EIINNqIEDyyIRAcNc2FW+/TUYNmWyw=
github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgraph-io/badger/v4 v4.4.0 h1:rA48XiDynZLyMdlaJl67p9+lqfqwxlgKtCpYLAio7Zk=
github.com/dgraph-io/badger/v4 v4.4.0/go.mod h1:sONMmPPfbnj9FPwS/etCqky/ULth6CQJuAZSuWCmixE=
github.com/dgraph-io/ristretto/v2 v2.0.0 h1:l0yiSOtlJvc0otkqyMaDNysg8E9/F/TYZwMbxscNOAQ=
github.com/dgraph-io/ristretto/v2 v2.0.0/go.mod h1:FVFokF2dRqXyPyeMnK1YDy8Fc6aTe0IKgbcd03CYeEk=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/digitalocean/go-libvirt v0.0.0-20240709142323-d8406205c752 h1:NI7XEcHzWVvBfVjSVK6Qk4wmrUfoyQxCNpBjrHelZFk=
github.com/digitalocean/go-libvirt v0.0.0-20240709142323-d8406205c752/go.mod h1:/Ok8PA2qi/ve0Py38+oL+VxoYmlowigYRyLEODRYdgc=
github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TRo4=
Expand Down
190 changes: 190 additions & 0 deletions x-pack/libbeat/kv/bbolt/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package bbolt

import (
"encoding/json"
"fmt"
"os"
"path"
"time"

bolt "go.etcd.io/bbolt"
)

const (
defaultDbPath = "beat_cache.db"
defaultBucketName = "kv"
defaultDbFileMode = 0o600
)

// BboltValue - value type used for storage in bolt DB.
type BboltValue struct {
RawValue []byte `json:"rawValue"`

// ExpireAt - Unix timestamp (in nanoseconds) of time when value expires.
ExpireAt int64 `json:"expireAt"`

// TTL - Time To Live used for value. If 0 then the value doesn't expire
TTL time.Duration `json:"ttl"`
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
}

type Option func(bbolt *Bbolt)

type Bbolt struct {
dbPath string
dbFileMode os.FileMode
bucketName string

db *bolt.DB
}

// New - creates and returns instance of bolt key-value cache implementation
func New(options ...Option) *Bbolt {
b := &Bbolt{
dbPath: defaultDbPath,
dbFileMode: defaultDbFileMode,
bucketName: defaultBucketName,
}
for _, opt := range options {
opt(b)
}

return b
}

func WithDbPath(path string) Option {
return func(b *Bbolt) {
b.dbPath = path
}
}

func WithDbFileMode(mode os.FileMode) Option {
return func(b *Bbolt) {
b.dbFileMode = mode
}
}

func WithBucketName(name string) Option {
return func(b *Bbolt) {
b.bucketName = name
}
}

// Connect - creates directories of a given path for bbolt DB file (if directories not already exist), creates DB file with given file permissions, creates bucket to store cache data.
func (b *Bbolt) Connect() error {
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
var err error

dbDir := path.Dir(b.dbPath)
err = os.MkdirAll(dbDir, b.dbFileMode)
if err != nil {
return fmt.Errorf("bbolt: creation of the directory for DB failed: %w", err)
}

b.db, err = openDbFile(b.dbPath, b.dbFileMode)
if err != nil {
return fmt.Errorf("bbolt: openDbFile error: %w", err)
}
err = b.ensureBucketExists()
if err != nil {
return fmt.Errorf("bbolt: bucket opening error: %w", err)
}
return nil
}

// Get - fetches value by key from bolt DB (returns nil if key is not present or expired)
func (b *Bbolt) Get(key []byte) ([]byte, error) {
// we need writable transaction here because if value is present in DB but expired we need to delete it.
tx, err := b.db.Begin(true)
if err != nil {
return nil, err
}
defer func() {
_ = tx.Rollback()
}()
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
bucket := tx.Bucket([]byte(b.bucketName))

bboltValEncoded := bucket.Get(key)
if bboltValEncoded == nil { // no value in store
return nil, nil
}
var bboltVal BboltValue
err = json.Unmarshal(bboltValEncoded, &bboltVal)
if err != nil {
return nil, err
}
if bboltVal.TTL > 0 && bboltVal.ExpireAt <= time.Now().UnixNano() { // value expired
err = bucket.Delete(key) // since value has expired - no need to keep it in DB
if err != nil {
return nil, err
}
return nil, nil
}
return bboltVal.RawValue, nil
}

// Set - stores value by key in bolt DB. If TTL is 0 then value doesn't expire
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
func (b *Bbolt) Set(key []byte, value []byte, ttl time.Duration) error {
return b.db.Update(b.createSetClosure(key, value, ttl))
}

// Close - closes bolt DB file.
func (b *Bbolt) Close() error {
return b.db.Close()
}

func (b *Bbolt) createSetClosure(key []byte, value []byte, ttl time.Duration) func(tx *bolt.Tx) error {
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
return func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(b.bucketName))

bboltValEncoded, err := getMarshalledBboltValue(value, ttl)
if err != nil {
return err
}
err = bucket.Put(key, bboltValEncoded)
if err != nil {
return err
}

return nil
}
}

// ensureBucketExists - creates bolt bucket if it doesn't already exist.
func (b *Bbolt) ensureBucketExists() error {
err := b.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(b.bucketName))
return err
})
return err
}

// openDbFile - opens bolt DB file and returns *bolt.DB instance
func openDbFile(path string, mode os.FileMode) (*bolt.DB, error) {
db, err := bolt.Open(path, mode, nil)
if err != nil {
return nil, err
}

return db, nil
}

// getMarshalledBboltValue - returns json encoded BboltValue constructed from raw value and TTL.
func getMarshalledBboltValue(value []byte, ttl time.Duration) ([]byte, error) {
return json.Marshal(newBboltValue(value, ttl))
}

// newBboltValue - creates BboltValue from raw value and TTL
func newBboltValue(value []byte, ttl time.Duration) BboltValue {
var expireAt int64
if ttl > 0 {
expireAt = time.Now().UnixNano() + ttl.Nanoseconds()
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
}
return BboltValue{
RawValue: value,
ExpireAt: expireAt,
TTL: ttl,
}
}
132 changes: 132 additions & 0 deletions x-pack/libbeat/kv/bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package bbolt

import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNew(t *testing.T) {
tests := []struct {
name string
testCase func() *Bbolt
expected *Bbolt
}{
{
name: "With no options",
testCase: func() *Bbolt {
return New()
},
expected: &Bbolt{
dbPath: defaultDbPath,
dbFileMode: defaultDbFileMode,
bucketName: defaultBucketName,
db: nil,
},
},
{
name: "With options",
testCase: func() *Bbolt {
return New(
WithDbPath("test/path"),
WithBucketName("test_bucket"),
WithDbFileMode(0777),
)
},
expected: &Bbolt{
dbPath: "test/path",
dbFileMode: 0777,
bucketName: "test_bucket",
db: nil,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(tInner *testing.T) {
boltCache := tt.testCase()
assert.Equal(tInner, tt.expected, boltCache)
})
}
}

func TestGetSet(t *testing.T) {
tests := []struct {
name string
testCase func(*testing.T, *Bbolt)
}{
{
name: "Simple Set and Get",
testCase: func(t *testing.T, bolt *Bbolt) {
err := bolt.Set([]byte("testKey"), []byte("test_value"), 0)
assert.NoError(t, err)

val, err := bolt.Get([]byte("testKey"))
assert.NoError(t, err)
assert.Equal(t, []byte("test_value"), val)
},
},
{
name: "Set with expiration",
testCase: func(t *testing.T, bolt *Bbolt) {
err := bolt.Set([]byte("testKeyWithExpiration"), []byte("test_value"), 5*time.Second)
assert.NoError(t, err)

val, err := bolt.Get([]byte("testKeyWithExpiration"))
assert.NoError(t, err)
assert.Equal(t, []byte("test_value"), val)
},
},
{
name: "Get expired key",
testCase: func(t *testing.T, bolt *Bbolt) {
err := bolt.Set([]byte("testKeyWithExpiration2"), []byte("test_value"), time.Nanosecond)
assert.NoError(t, err)

time.Sleep(time.Nanosecond) // make sure we wait until key in the cache is expired

val, err := bolt.Get([]byte("testKeyWithExpiration2"))
assert.NoError(t, err)
assert.Nil(t, val)
},
},
{
name: "Get not existent key",
testCase: func(t *testing.T, bolt *Bbolt) {
val, err := bolt.Get([]byte("doesNotExist"))
assert.NoError(t, err)
assert.Nil(t, val)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(tInner *testing.T) {
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
dbPath := filepath.Join(t.TempDir(), "test.db")

tInner.Cleanup(func() {
// Remove test DB file after test is done to not interfere with other tests
_ = os.Remove(dbPath)
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
})

bolt := &Bbolt{
dbPath: dbPath,
dbFileMode: 0o644,
bucketName: "test_bucket",
}

err := bolt.Connect()
assert.NoError(t, err)
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
defer bolt.Close()

tt.testCase(t, bolt)
})
}
}
14 changes: 14 additions & 0 deletions x-pack/libbeat/kv/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package kv

import "time"

type Kv interface {
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
Connect() error
Get([]byte) ([]byte, error)
Set([]byte, []byte, time.Duration) error
Close() error
}
Loading
Loading