Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace dgraph-io/badger cache storage with etcd-io/bbolt #42571

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4f10346
add bbolt store for key-value cache
stefans-elastic Jan 31, 2025
fbe7ebf
fix unit test failures
stefans-elastic Feb 3, 2025
7fa4f37
delete expired keys
stefans-elastic Feb 3, 2025
1c4a798
set expireAt properly
stefans-elastic Feb 3, 2025
183b395
add unit tests
stefans-elastic Feb 3, 2025
36c3933
add missing license headers
stefans-elastic Feb 3, 2025
9b20f2d
go mod tidy, update NOTICE.txt
stefans-elastic Feb 3, 2025
f2303dd
fix linter issues
stefans-elastic Feb 3, 2025
19bed5b
add more godoc comments
stefans-elastic Feb 4, 2025
17372b1
Merge branch 'main' of github.com:stefans-elastic/beats into drop-dba…
stefans-elastic Feb 4, 2025
7ab7864
add CHANGELOG-developer.next.asciidoc entry
stefans-elastic Feb 4, 2025
91a3ac2
rename Ttl to TTL to comply with styleguide
stefans-elastic Feb 4, 2025
6ea76db
simplify Set method transaction code
stefans-elastic Feb 5, 2025
21e1b2f
Get/Set transaction refactoring
stefans-elastic Feb 5, 2025
ae592ab
adress PR comments
stefans-elastic Feb 5, 2025
6041c9b
Update x-pack/libbeat/kv/bbolt/bbolt.go
stefans-elastic Feb 5, 2025
aec3e8e
rename Kv -> KV
stefans-elastic Feb 5, 2025
26e8277
Merge branch 'drop-dbadger-io' of github.com:stefans-elastic/beats in…
stefans-elastic Feb 5, 2025
e474690
Merge branch 'main' of github.com:stefans-elastic/beats into drop-dba…
stefans-elastic Feb 6, 2025
8675a71
replace assert.NoError with require.NoError in unit test
stefans-elastic Feb 6, 2025
d606724
don't store TTL as part of KV value in bbolt store
stefans-elastic Feb 6, 2025
2cc0ccd
rename Connect -> Open
stefans-elastic Feb 10, 2025
2b9137d
Merge branch 'main' into drop-dbadger-io
stefans-elastic Feb 10, 2025
0614e03
add debug log upon TTL refreshing (if refreshOnAccess is true)
stefans-elastic Feb 10, 2025
94d8f6d
Merge branch 'drop-dbadger-io' of github.com:stefans-elastic/beats in…
stefans-elastic Feb 10, 2025
5f22d0f
update godoc comment for x-pack/libbeat/persistentcache/persistentcac…
stefans-elastic Feb 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,13 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
- Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]
- Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775]
- Simplified GCS input state checkpoint calculation logic. {issue}40878[40878] {pull}40937[40937]
- Simplified GCS input state checkpoint calculation logic. {issue}40878[40878] {pull}40937[40937]
- Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}40674[40674] {pull}40936[40936]
- Add field redaction package. {pull}40997[40997]
- Add support for marked redaction to x-pack/filebeat/input/internal/private {pull}41212[41212]
- Add support for collecting Okta role and factor data for users with filebeat entityanalytics input. {pull}41044[41044]
- Add CEL input program evaluation coverage collection support. {pull}41884[41884]
- Switch persistent storage from dgraph-io/badger to etcd-io/bbolt in x-pack/libbeat/persistentcache/persistentcache.go. {pull}42571[42571]

==== Deprecated

Expand Down
403 changes: 0 additions & 403 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/apigatewayv2 v1.24.8
github.com/aws/aws-sdk-go-v2/service/health v1.29.2
github.com/aws/smithy-go v1.22.1
github.com/dgraph-io/badger/v4 v4.4.0
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.9.0
Expand Down Expand Up @@ -284,7 +283,6 @@ require (
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 // indirect
github.com/cyphar/filepath-securejoin v0.2.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto/v2 v2.0.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2 h1:6+hM8KeYKV0Z9EIINNqIEDyyIRAcNc2FW+/TUYNmWyw=
github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgraph-io/badger/v4 v4.4.0 h1:rA48XiDynZLyMdlaJl67p9+lqfqwxlgKtCpYLAio7Zk=
github.com/dgraph-io/badger/v4 v4.4.0/go.mod h1:sONMmPPfbnj9FPwS/etCqky/ULth6CQJuAZSuWCmixE=
github.com/dgraph-io/ristretto/v2 v2.0.0 h1:l0yiSOtlJvc0otkqyMaDNysg8E9/F/TYZwMbxscNOAQ=
github.com/dgraph-io/ristretto/v2 v2.0.0/go.mod h1:FVFokF2dRqXyPyeMnK1YDy8Fc6aTe0IKgbcd03CYeEk=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/digitalocean/go-libvirt v0.0.0-20240709142323-d8406205c752 h1:NI7XEcHzWVvBfVjSVK6Qk4wmrUfoyQxCNpBjrHelZFk=
github.com/digitalocean/go-libvirt v0.0.0-20240709142323-d8406205c752/go.mod h1:/Ok8PA2qi/ve0Py38+oL+VxoYmlowigYRyLEODRYdgc=
github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TRo4=
Expand Down
179 changes: 179 additions & 0 deletions x-pack/libbeat/kv/bbolt/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package bbolt

import (
"encoding/json"
"fmt"
"os"
"path"
"time"

bolt "go.etcd.io/bbolt"
)

const (
defaultDbPath = "beat_cache.db"
defaultBucketName = "kv"
defaultDbFileMode = 0o600
)

// BboltValue - value type used for storage in bolt DB.
type BboltValue struct {
RawValue []byte `json:"rawValue"`

// ExpireAt - Unix timestamp (in nanoseconds) of time when value expires.
ExpireAt int64 `json:"expireAt"`

// TTL - Time To Live used for value. If 0 then the value doesn't expire
TTL time.Duration `json:"ttl"`
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
}

type Option func(bbolt *Bbolt)

type Bbolt struct {
dbPath string
dbFileMode os.FileMode
bucketName string

db *bolt.DB
}

// New creates and returns instance of bolt key-value cache implementation
func New(options ...Option) *Bbolt {
b := &Bbolt{
dbPath: defaultDbPath,
dbFileMode: defaultDbFileMode,
bucketName: defaultBucketName,
}
for _, opt := range options {
opt(b)
}

return b
}

func WithDbPath(path string) Option {
return func(b *Bbolt) {
b.dbPath = path
}
}

func WithDbFileMode(mode os.FileMode) Option {
return func(b *Bbolt) {
b.dbFileMode = mode
}
}

func WithBucketName(name string) Option {
return func(b *Bbolt) {
b.bucketName = name
}
}

// Connect creates directories of a given path for bbolt DB file (if directories not already exist), creates DB file with given file permissions, creates bucket to store cache data.
func (b *Bbolt) Connect() error {
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
var err error

dbDir := path.Dir(b.dbPath)
err = os.MkdirAll(dbDir, b.dbFileMode)
if err != nil {
return fmt.Errorf("bbolt: creation of the directory for DB failed: %w", err)
}

b.db, err = openDbFile(b.dbPath, b.dbFileMode)
if err != nil {
return fmt.Errorf("bbolt: openDbFile error: %w", err)
}
err = b.ensureBucketExists()
if err != nil {
return fmt.Errorf("bbolt: bucket opening error: %w", err)
}
return nil
}

// Get fetches value by key from bolt DB (returns nil if key is not present or expired)
func (b *Bbolt) Get(key []byte) (data []byte, err error) {
// we need writable transaction here in order to delete expired keys
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(b.bucketName))

jsonVal := bucket.Get(key)
if jsonVal == nil { // no value in store
return nil
}

var val BboltValue
if err := json.Unmarshal(jsonVal, &val); err != nil {
return err
}
if val.TTL > 0 && val.ExpireAt <= time.Now().UnixNano() { // value expired
return bucket.Delete(key)
}
data = val.RawValue
return nil
})
return data, err
}

// Set stores a key-value pair in the database. If TTL is 0, the key does not expire.
func (b *Bbolt) Set(key []byte, value []byte, ttl time.Duration) error {
return b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(b.bucketName))

bboltValEncoded, err := getMarshalledBboltValue(value, ttl)
if err != nil {
return err
}
err = bucket.Put(key, bboltValEncoded)
if err != nil {
return err
}

return nil
})
}

// Close closes the database.
func (b *Bbolt) Close() error {
return b.db.Close()
}

// ensureBucketExists - creates bolt bucket if it doesn't already exist.
func (b *Bbolt) ensureBucketExists() error {
err := b.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(b.bucketName))
return err
})
return err
}

// openDbFile opens bolt DB file and returns *bolt.DB instance
func openDbFile(path string, mode os.FileMode) (*bolt.DB, error) {
db, err := bolt.Open(path, mode, nil)
if err != nil {
return nil, err
}

return db, nil
}

// getMarshalledBboltValue returns json encoded BboltValue constructed from raw value and TTL.
func getMarshalledBboltValue(value []byte, ttl time.Duration) ([]byte, error) {
return json.Marshal(newBboltValue(value, ttl))
}

// newBboltValue creates BboltValue from raw value and TTL
func newBboltValue(value []byte, ttl time.Duration) BboltValue {
var expireAt int64
if ttl > 0 {
expireAt = time.Now().UnixNano() + ttl.Nanoseconds()
}
return BboltValue{
RawValue: value,
ExpireAt: expireAt,
TTL: ttl,
}
}
126 changes: 126 additions & 0 deletions x-pack/libbeat/kv/bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package bbolt

import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNew(t *testing.T) {
tests := []struct {
name string
testCase func() *Bbolt
expected *Bbolt
}{
{
name: "With no options",
testCase: func() *Bbolt {
return New()
},
expected: &Bbolt{
dbPath: defaultDbPath,
dbFileMode: defaultDbFileMode,
bucketName: defaultBucketName,
db: nil,
},
},
{
name: "With options",
testCase: func() *Bbolt {
return New(
WithDbPath("test/path"),
WithBucketName("test_bucket"),
WithDbFileMode(0777),
)
},
expected: &Bbolt{
dbPath: "test/path",
dbFileMode: 0777,
bucketName: "test_bucket",
db: nil,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
boltCache := tt.testCase()
assert.Equal(t, tt.expected, boltCache)
})
}
}

func TestGetSet(t *testing.T) {
tests := []struct {
name string
testCase func(*testing.T, *Bbolt)
}{
{
name: "Simple Set and Get",
testCase: func(t *testing.T, bolt *Bbolt) {
err := bolt.Set([]byte("testKey"), []byte("test_value"), 0)
assert.NoError(t, err)

val, err := bolt.Get([]byte("testKey"))
assert.NoError(t, err)
assert.Equal(t, []byte("test_value"), val)
},
},
{
name: "Set with expiration",
testCase: func(t *testing.T, bolt *Bbolt) {
err := bolt.Set([]byte("testKeyWithExpiration"), []byte("test_value"), 5*time.Second)
assert.NoError(t, err)

val, err := bolt.Get([]byte("testKeyWithExpiration"))
assert.NoError(t, err)
assert.Equal(t, []byte("test_value"), val)
},
},
{
name: "Get expired key",
testCase: func(t *testing.T, bolt *Bbolt) {
err := bolt.Set([]byte("testKeyWithExpiration2"), []byte("test_value"), time.Nanosecond)
assert.NoError(t, err)

time.Sleep(time.Nanosecond) // make sure we wait until key in the cache is expired

val, err := bolt.Get([]byte("testKeyWithExpiration2"))
assert.NoError(t, err)
assert.Nil(t, val)
},
},
{
name: "Get not existent key",
testCase: func(t *testing.T, bolt *Bbolt) {
val, err := bolt.Get([]byte("doesNotExist"))
assert.NoError(t, err)
assert.Nil(t, val)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), "test.db")

bolt := &Bbolt{
dbPath: dbPath,
dbFileMode: 0o644,
bucketName: "test_bucket",
}

err := bolt.Connect()
assert.NoError(t, err)
stefans-elastic marked this conversation as resolved.
Show resolved Hide resolved
defer bolt.Close()

tt.testCase(t, bolt)
})
}
}
14 changes: 14 additions & 0 deletions x-pack/libbeat/kv/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package kv

import "time"

type KV interface {
Connect() error
Get([]byte) ([]byte, error)
Set([]byte, []byte, time.Duration) error
Close() error
}
Loading
Loading