Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace dgraph-io/badger cache storage with etcd-io/bbolt #42571

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4f10346
add bbolt store for key-value cache
stefans-elastic Jan 31, 2025
fbe7ebf
fix unit test failures
stefans-elastic Feb 3, 2025
7fa4f37
delete expired keys
stefans-elastic Feb 3, 2025
1c4a798
set expireAt properly
stefans-elastic Feb 3, 2025
183b395
add unit tests
stefans-elastic Feb 3, 2025
36c3933
add missing license headers
stefans-elastic Feb 3, 2025
9b20f2d
go mod tidy, update NOTICE.txt
stefans-elastic Feb 3, 2025
f2303dd
fix linter issues
stefans-elastic Feb 3, 2025
19bed5b
add more godoc comments
stefans-elastic Feb 4, 2025
17372b1
Merge branch 'main' of github.com:stefans-elastic/beats into drop-dba…
stefans-elastic Feb 4, 2025
7ab7864
add CHANGELOG-developer.next.asciidoc entry
stefans-elastic Feb 4, 2025
91a3ac2
rename Ttl to TTL to comply with styleguide
stefans-elastic Feb 4, 2025
6ea76db
simplify Set method transaction code
stefans-elastic Feb 5, 2025
21e1b2f
Get/Set transaction refactoring
stefans-elastic Feb 5, 2025
ae592ab
adress PR comments
stefans-elastic Feb 5, 2025
6041c9b
Update x-pack/libbeat/kv/bbolt/bbolt.go
stefans-elastic Feb 5, 2025
aec3e8e
rename Kv -> KV
stefans-elastic Feb 5, 2025
26e8277
Merge branch 'drop-dbadger-io' of github.com:stefans-elastic/beats in…
stefans-elastic Feb 5, 2025
e474690
Merge branch 'main' of github.com:stefans-elastic/beats into drop-dba…
stefans-elastic Feb 6, 2025
8675a71
replace assert.NoError with require.NoError in unit test
stefans-elastic Feb 6, 2025
d606724
don't store TTL as part of KV value in bbolt store
stefans-elastic Feb 6, 2025
2cc0ccd
rename Connect -> Open
stefans-elastic Feb 10, 2025
2b9137d
Merge branch 'main' into drop-dbadger-io
stefans-elastic Feb 10, 2025
0614e03
add debug log upon TTL refreshing (if refreshOnAccess is true)
stefans-elastic Feb 10, 2025
94d8f6d
Merge branch 'drop-dbadger-io' of github.com:stefans-elastic/beats in…
stefans-elastic Feb 10, 2025
5f22d0f
update godoc comment for x-pack/libbeat/persistentcache/persistentcac…
stefans-elastic Feb 10, 2025
dcd7bc7
PR comment resolved
stefans-elastic Feb 10, 2025
fa9f41c
rename boltKvStore -> bboltkv on import renaming to make it more idio…
stefans-elastic Feb 10, 2025
43d2f8f
include error in log message
stefans-elastic Feb 10, 2025
041ddb8
Merge branch 'main' into drop-dbadger-io
stefans-elastic Feb 11, 2025
477fb0a
update CHANGELOG-developer.next.asciidoc message
stefans-elastic Feb 12, 2025
3d37518
Merge branch 'main' into drop-dbadger-io
stefans-elastic Feb 12, 2025
c77b7f1
add CHANGELOG.next.asciidoc entry
stefans-elastic Feb 13, 2025
ddc31ed
Merge branch 'main' into drop-dbadger-io
stefans-elastic Feb 13, 2025
c47826e
Update CHANGELOG.next.asciidoc
stefans-elastic Feb 13, 2025
30a92e9
move entry to correct section in CHANGELOG.next.asciidoc
stefans-elastic Feb 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,13 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
- Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]
- Added filebeat debug histograms for s3 object size and events per processed s3 object. {pull}40775[40775]
- Simplified GCS input state checkpoint calculation logic. {issue}40878[40878] {pull}40937[40937]
- Simplified GCS input state checkpoint calculation logic. {issue}40878[40878] {pull}40937[40937]
- Simplified Azure Blob Storage input state checkpoint calculation logic. {issue}40674[40674] {pull}40936[40936]
- Add field redaction package. {pull}40997[40997]
- Add support for marked redaction to x-pack/filebeat/input/internal/private {pull}41212[41212]
- Add support for collecting Okta role and factor data for users with filebeat entityanalytics input. {pull}41044[41044]
- Add CEL input program evaluation coverage collection support. {pull}41884[41884]
- Switch persistent storage from dgraph-io/badger to etcd-io/bbolt in x-pack/libbeat/persistentcache/persistentcache.go. This change affects only Cloudfoundry related functionality. {pull}42571[42571]

==== Deprecated

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Add `uppercase` processor. {issue}22254[22254] {pull}41535[41535]
- Replace `compress/gzip` with https://github.com/klauspost/compress/gzip library for gzip compression {pull}41584[41584]
- Add regex pattern matching to add_kubernetes_metadata processor {pull}41903[41903]
- Switch persistent storage from dgraph-io/badger to etcd-io/bbolt in x-pack/libbeat/persistentcache/persistentcache.go. This change impacts only the `add_cloudfoundry_metadata` processor. After updating to this version, its caches will be regenerated, potentially resulting in numerous calls to the Cloudfoundry API. {pull}42571[42571]

*Auditbeat*

Expand Down
403 changes: 0 additions & 403 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/apigatewayv2 v1.24.8
github.com/aws/aws-sdk-go-v2/service/health v1.29.2
github.com/aws/smithy-go v1.22.1
github.com/dgraph-io/badger/v4 v4.4.0
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.9.0
Expand Down Expand Up @@ -284,7 +283,6 @@ require (
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 // indirect
github.com/cyphar/filepath-securejoin v0.2.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto/v2 v2.0.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2 h1:6+hM8KeYKV0Z9EIINNqIEDyyIRAcNc2FW+/TUYNmWyw=
github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgraph-io/badger/v4 v4.4.0 h1:rA48XiDynZLyMdlaJl67p9+lqfqwxlgKtCpYLAio7Zk=
github.com/dgraph-io/badger/v4 v4.4.0/go.mod h1:sONMmPPfbnj9FPwS/etCqky/ULth6CQJuAZSuWCmixE=
github.com/dgraph-io/ristretto/v2 v2.0.0 h1:l0yiSOtlJvc0otkqyMaDNysg8E9/F/TYZwMbxscNOAQ=
github.com/dgraph-io/ristretto/v2 v2.0.0/go.mod h1:FVFokF2dRqXyPyeMnK1YDy8Fc6aTe0IKgbcd03CYeEk=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/digitalocean/go-libvirt v0.0.0-20240709142323-d8406205c752 h1:NI7XEcHzWVvBfVjSVK6Qk4wmrUfoyQxCNpBjrHelZFk=
github.com/digitalocean/go-libvirt v0.0.0-20240709142323-d8406205c752/go.mod h1:/Ok8PA2qi/ve0Py38+oL+VxoYmlowigYRyLEODRYdgc=
github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TRo4=
Expand Down
175 changes: 175 additions & 0 deletions x-pack/libbeat/kv/bbolt/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package bbolt

import (
"encoding/json"
"fmt"
"os"
"path"
"time"

bolt "go.etcd.io/bbolt"
)

const (
defaultDbPath = "beat_cache.db"
defaultBucketName = "kv"
defaultDbFileMode = 0o600
)

// BboltValue - value type used for storage in bolt DB.
type BboltValue struct {
RawValue []byte `json:"rawValue"`

// ExpireAt - Unix timestamp (in nanoseconds) of time when value expires.
ExpireAt int64 `json:"expireAt"`
}

type Option func(bbolt *Bbolt)

type Bbolt struct {
dbPath string
dbFileMode os.FileMode
bucketName string

db *bolt.DB
}

// New creates and returns instance of bolt key-value cache implementation
func New(options ...Option) *Bbolt {
b := &Bbolt{
dbPath: defaultDbPath,
dbFileMode: defaultDbFileMode,
bucketName: defaultBucketName,
}
for _, opt := range options {
opt(b)
}

return b
}

func WithDbPath(path string) Option {
return func(b *Bbolt) {
b.dbPath = path
}
}

func WithDbFileMode(mode os.FileMode) Option {
return func(b *Bbolt) {
b.dbFileMode = mode
}
}

func WithBucketName(name string) Option {
return func(b *Bbolt) {
b.bucketName = name
}
}

// Open creates directories of a given path for bbolt DB file (if directories not already exist), creates DB file with given file permissions, creates bucket to store cache data.
func (b *Bbolt) Open() error {
var err error

dbDir := path.Dir(b.dbPath)
err = os.MkdirAll(dbDir, b.dbFileMode)
if err != nil {
return fmt.Errorf("bbolt: creation of the directory for DB failed: %w", err)
}

b.db, err = openDbFile(b.dbPath, b.dbFileMode)
if err != nil {
return fmt.Errorf("bbolt: openDbFile error: %w", err)
}
err = b.ensureBucketExists()
if err != nil {
return fmt.Errorf("bbolt: bucket opening error: %w", err)
}
return nil
}

// Get fetches value by key from bolt DB (returns nil if key is not present or expired)
func (b *Bbolt) Get(key []byte) (data []byte, err error) {
// we need writable transaction here in order to delete expired keys
err = b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(b.bucketName))

jsonVal := bucket.Get(key)
if jsonVal == nil { // no value in store
return nil
}

var val BboltValue
if err := json.Unmarshal(jsonVal, &val); err != nil {
return err
}
if val.ExpireAt != 0 && val.ExpireAt <= time.Now().UnixNano() { // value expired
return bucket.Delete(key)
}
Comment on lines +109 to +111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't ever remove entries that stop being accessed, potentially leading to memory or storage leaks. This can happen in the Cloudfoundry use case when applications are removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would you suggest to fix this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what would be the best option here. A naive approach can be to have a goroutine that periodically looks for expired entries, but not sure about the performance hit it could cause in cases with many entries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah
I've had the same idea but fetching and checking all the keys for expiration might be problematic in case of many keys being stored in store

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another idea would be to have a "system" key which would track all the keys and their expiration time which would eliminate the need to iterate over all keys in the store (but it would complicate the code meaning making it more prone to errors/bugs and in case there are many keys to delete it would still potentially have a hit on the performance).
And another variation of the first idea - store everything in one keys (to outside users it will look the same but under the hood it would be a single bolt key) but again, this way any operation would be an operation on a single bolt key but again - complication and i don't know if bolt has a limit on key (key's value) size

to be honest, I don't really like any of the ideas I've listed, just brainstorming

data = val.RawValue
return nil
})
return data, err
}

// Set stores a key-value pair in the database. If TTL is 0, the key does not expire.
func (b *Bbolt) Set(key []byte, value []byte, ttl time.Duration) error {
return b.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(b.bucketName))

bboltValEncoded, err := getMarshalledBboltValue(value, ttl)
if err != nil {
return err
}
err = bucket.Put(key, bboltValEncoded)
if err != nil {
return err
}

return nil
})
}

// Close closes the database.
func (b *Bbolt) Close() error {
return b.db.Close()
}

// ensureBucketExists - creates bolt bucket if it doesn't already exist.
func (b *Bbolt) ensureBucketExists() error {
err := b.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(b.bucketName))
return err
})
return err
}

// openDbFile opens bolt DB file and returns *bolt.DB instance
func openDbFile(path string, mode os.FileMode) (*bolt.DB, error) {
db, err := bolt.Open(path, mode, nil)
if err != nil {
return nil, err
}

return db, nil
}

// getMarshalledBboltValue returns json encoded BboltValue constructed from raw value and TTL.
func getMarshalledBboltValue(value []byte, ttl time.Duration) ([]byte, error) {
return json.Marshal(newBboltValue(value, ttl))
}

// newBboltValue creates BboltValue from raw value and TTL
func newBboltValue(value []byte, ttl time.Duration) BboltValue {
var expireAt int64
if ttl > 0 {
expireAt = time.Now().Add(ttl).UnixNano()
}
return BboltValue{
RawValue: value,
ExpireAt: expireAt,
}
}
127 changes: 127 additions & 0 deletions x-pack/libbeat/kv/bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package bbolt

import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNew(t *testing.T) {
tests := []struct {
name string
testCase func() *Bbolt
expected *Bbolt
}{
{
name: "With no options",
testCase: func() *Bbolt {
return New()
},
expected: &Bbolt{
dbPath: defaultDbPath,
dbFileMode: defaultDbFileMode,
bucketName: defaultBucketName,
db: nil,
},
},
{
name: "With options",
testCase: func() *Bbolt {
return New(
WithDbPath("test/path"),
WithBucketName("test_bucket"),
WithDbFileMode(0777),
)
},
expected: &Bbolt{
dbPath: "test/path",
dbFileMode: 0777,
bucketName: "test_bucket",
db: nil,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
boltCache := tt.testCase()
assert.Equal(t, tt.expected, boltCache)
})
}
}

func TestGetSet(t *testing.T) {
tests := []struct {
name string
testCase func(*testing.T, *Bbolt)
}{
{
name: "Simple Set and Get",
testCase: func(t *testing.T, bolt *Bbolt) {
err := bolt.Set([]byte("testKey"), []byte("test_value"), 0)
assert.NoError(t, err)

val, err := bolt.Get([]byte("testKey"))
assert.NoError(t, err)
assert.Equal(t, []byte("test_value"), val)
},
},
{
name: "Set with expiration",
testCase: func(t *testing.T, bolt *Bbolt) {
err := bolt.Set([]byte("testKeyWithExpiration"), []byte("test_value"), 5*time.Second)
assert.NoError(t, err)

val, err := bolt.Get([]byte("testKeyWithExpiration"))
assert.NoError(t, err)
assert.Equal(t, []byte("test_value"), val)
},
},
{
name: "Get expired key",
testCase: func(t *testing.T, bolt *Bbolt) {
err := bolt.Set([]byte("testKeyWithExpiration2"), []byte("test_value"), time.Nanosecond)
assert.NoError(t, err)

time.Sleep(time.Nanosecond) // make sure we wait until key in the cache is expired

val, err := bolt.Get([]byte("testKeyWithExpiration2"))
assert.NoError(t, err)
assert.Nil(t, val)
},
},
{
name: "Get not existent key",
testCase: func(t *testing.T, bolt *Bbolt) {
val, err := bolt.Get([]byte("doesNotExist"))
assert.NoError(t, err)
assert.Nil(t, val)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), "test.db")

bolt := &Bbolt{
dbPath: dbPath,
dbFileMode: 0o644,
bucketName: "test_bucket",
}

err := bolt.Open()
require.NoError(t, err)
defer bolt.Close()

tt.testCase(t, bolt)
})
}
}
14 changes: 14 additions & 0 deletions x-pack/libbeat/kv/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package kv

import "time"

type KV interface {
Open() error
Get([]byte) ([]byte, error)
Set([]byte, []byte, time.Duration) error
Close() error
}
Loading
Loading