Skip to content

Commit

Permalink
Add bbolt FSync as a config option
Browse files Browse the repository at this point in the history
  • Loading branch information
natbur authored and nathan.burke committed Oct 6, 2023
1 parent 37233ae commit d2d1203
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 24 deletions.
18 changes: 18 additions & 0 deletions .chloggen/feat_fsync_option_filestorage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'entension/storage/filestorage'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: 'Add support for setting bbolt fsync option'

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [20266]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
10 changes: 5 additions & 5 deletions extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ type fileStorageClient struct {
closed bool
}

func bboltOptions(timeout time.Duration) *bbolt.Options {
func bboltOptions(timeout time.Duration, noSync bool) *bbolt.Options {
return &bbolt.Options{
Timeout: timeout,
NoSync: true,
NoSync: noSync,
NoFreelistSync: true,
FreelistType: bbolt.FreelistMapType,
}
}

func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig) (*fileStorageClient, error) {
options := bboltOptions(timeout)
func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig, fSync bool) (*fileStorageClient, error) {
options := bboltOptions(timeout, !fSync)
db, err := bbolt.Open(filePath, 0600, options)
if err != nil {
return nil, err
Expand Down Expand Up @@ -172,7 +172,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur
}()

// use temporary file as compaction target
options := bboltOptions(timeout)
options := bboltOptions(timeout, true)

c.compactionMutex.Lock()
defer c.compactionMutex.Unlock()
Expand Down
36 changes: 18 additions & 18 deletions extension/storage/filestorage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func TestClientOperations(t *testing.T) {
dbFile := filepath.Join(t.TempDir(), "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestClientBatchOperations(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestNewClientTransactionErrors(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand All @@ -204,7 +204,7 @@ func TestNewClientErrorsOnInvalidBucket(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.Error(t, err)
require.Nil(t, client)

Expand Down Expand Up @@ -259,7 +259,7 @@ func TestClientReboundCompaction(t *testing.T) {
CheckInterval: checkInterval,
ReboundNeededThresholdMiB: testCase.reboundNeededThresholdMiB,
ReboundTriggerThresholdMiB: testCase.reboundTriggerThresholdMiB,
})
}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestClientConcurrentCompaction(t *testing.T) {
CheckInterval: stepInterval * 2,
ReboundNeededThresholdMiB: 1,
ReboundTriggerThresholdMiB: 5,
})
}, false)
require.NoError(t, err)

t.Cleanup(func() {
Expand Down Expand Up @@ -408,7 +408,7 @@ func BenchmarkClientGet(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -428,7 +428,7 @@ func BenchmarkClientGet100(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -451,7 +451,7 @@ func BenchmarkClientSet(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -471,7 +471,7 @@ func BenchmarkClientSet100(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -493,7 +493,7 @@ func BenchmarkClientDelete(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -519,7 +519,7 @@ func BenchmarkClientSetLargeDB(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand Down Expand Up @@ -556,7 +556,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -575,7 +575,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) {
var tempClient *fileStorageClient
b.ResetTimer()
for n := 0; n < b.N; n++ {
tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.StopTimer()
err = tempClient.Close(ctx)
Expand All @@ -593,7 +593,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -620,7 +620,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) {
testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n))
err = os.Link(dbFile, testDbFile)
require.NoError(b, err)
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{})
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.StartTimer()
require.NoError(b, client.Compact(tempDir, time.Second, 65536))
Expand All @@ -637,7 +637,7 @@ func BenchmarkClientCompactDb(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -664,7 +664,7 @@ func BenchmarkClientCompactDb(b *testing.B) {
testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n))
err = os.Link(dbFile, testDbFile)
require.NoError(b, err)
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{})
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.StartTimer()
require.NoError(b, client.Compact(tempDir, time.Second, 65536))
Expand Down
3 changes: 3 additions & 0 deletions extension/storage/filestorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type Config struct {
Timeout time.Duration `mapstructure:"timeout,omitempty"`

Compaction *CompactionConfig `mapstructure:"compaction,omitempty"`

// FSync specifies that fsync should be called after each database write
FSync bool `mapstructure:"fsync,omitempty"`
}

// CompactionConfig defines configuration for optional file storage compaction.
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestLoadConfig(t *testing.T) {
CheckInterval: time.Second * 5,
},
Timeout: 2 * time.Second,
FSync: true,
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion extension/storage/filestorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, e
rawName = sanitize(rawName)
}
absoluteName := filepath.Join(lfs.cfg.Directory, rawName)
client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction)
client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, lfs.cfg.FSync)

if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func createDefaultConfig() component.Config {
CheckInterval: defaultCompactionInterval,
},
Timeout: time.Second,
FSync: false,
}
}

Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestFactory(t *testing.T) {
require.Equal(t, expected, cfg.Directory)
}
require.Equal(t, time.Second, cfg.Timeout)
require.Equal(t, false, cfg.FSync)

tests := []struct {
name string
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ file_storage/all_settings:
rebound_needed_threshold_mib: 128
max_transaction_size: 2048
timeout: 2s
fsync: true

0 comments on commit d2d1203

Please sign in to comment.