Skip to content

Commit

Permalink
WIP: bench with persistent queue
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed May 16, 2024
1 parent 332ee78 commit 7978e89
Showing 1 changed file with 54 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

Expand All @@ -34,23 +38,63 @@ func BenchmarkLogsExporter(b *testing.B) {
}
}

type mockHost struct {
component.Host
ext map[component.ID]component.Component
}

func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
return nh.ext
}

func benchmarkLogs(b *testing.B, batchSize int) {
var generatedCount, observedCount atomic.Uint64

storage := filestorage.NewFactory()
storageCfg := storage.CreateDefaultConfig().(*filestorage.Config)
storageCfg.Directory = b.TempDir()
componentID := component.NewIDWithName(storage.Type(), "elasticsearch")
fileExtension, err := storage.CreateExtension(context.Background(),
extension.CreateSettings{
ID: componentID,
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
},
storageCfg)
require.NoError(b, err)

host := &mockHost{
ext: map[component.ID]component.Component{
componentID: fileExtension,
},
}

require.NoError(b, fileExtension.Start(context.Background(), host))
defer fileExtension.Shutdown(context.Background())

receiver := newElasticsearchDataReceiver(b)
factory := elasticsearchexporter.NewFactory()

cfg := factory.CreateDefaultConfig().(*elasticsearchexporter.Config)
//cfg.QueueSettings.Enabled = true
//cfg.QueueSettings.NumConsumers = 100
//cfg.QueueSettings.QueueSize = 100000
cfg.PersistentQueueConfig.Enabled = true
cfg.PersistentQueueConfig.NumConsumers = 100
cfg.PersistentQueueConfig.QueueSize = 100000
cfg.PersistentQueueConfig.StorageID = &componentID
cfg.Endpoints = []string{receiver.endpoint}
cfg.Flush.Interval = 10 * time.Millisecond
cfg.NumWorkers = 1
cfg.Flush.Interval = 100 * time.Millisecond
cfg.Flush.Bytes = 125 * 300
cfg.NumWorkers = 4

exporter, err := factory.CreateLogsExporter(
context.Background(),
exportertest.NewNopCreateSettings(),
cfg,
)
require.NoError(b, err)
exporter.Start(context.Background(), host)

provider := testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize})
provider.SetLoadGeneratorCounters(&generatedCount)
Expand All @@ -75,6 +119,13 @@ func benchmarkLogs(b *testing.B, batchSize int) {
b.StartTimer()
require.NoError(b, exporter.ConsumeLogs(ctx, logs))
}
// FIXME: persistent queue doesn't drain on shutdown
for {
if observedCount.Load() >= generatedCount.Load() {
break
}
time.Sleep(10 * time.Millisecond)
}
require.NoError(b, exporter.Shutdown(ctx))
require.Equal(b, generatedCount.Load(), observedCount.Load(), "failed to send all logs to backend")
require.Equal(b, int64(generatedCount.Load()), int64(observedCount.Load()), "failed to send all logs to backend")
}

0 comments on commit 7978e89

Please sign in to comment.