Skip to content

Commit

Permalink
rework/add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinnoel-be committed Jan 21, 2025
1 parent 8cb1d09 commit 2617fee
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 132 deletions.
265 changes: 215 additions & 50 deletions exporter/googlecloudpubsubexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,73 +5,238 @@ package googlecloudpubsubexporter

import (
"context"
"fmt"
"testing"
"time"

pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/pstest"
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestExporterDefaultSettings(t *testing.T) {
ctx := context.Background()
// Start a fake server running locally.
srv := pstest.NewServer()
defer srv.Close()
_, err := srv.GServer.CreateTopic(ctx, &pb.Topic{
Name: "projects/my-project/topics/otlp",
const (
defaultUUID = "00000000-0000-0000-0000-000000000000"
defaultProjectID = "my-project"
defaultTopic = "projects/my-project/topics/otlp"
)

func TestExporterNoData(t *testing.T) {
exporter, publisher := newTestExporter(t, func(config *Config) {
config.Watermark.Behavior = "earliest"
})
assert.NoError(t, err)

factory := NewFactory()
cfg := factory.CreateDefaultConfig()
exporterConfig := cfg.(*Config)
exporterConfig.Endpoint = srv.Addr
exporterConfig.Insecure = true
exporterConfig.ProjectID = "my-project"
exporterConfig.Topic = "projects/my-project/topics/otlp"
exporterConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
Timeout: 12 * time.Second,
}
exporter := ensureExporter(exportertest.NewNopSettings(), exporterConfig)
assert.NoError(t, exporter.start(ctx, nil))
assert.NoError(t, exporter.consumeTraces(ctx, ptrace.NewTraces()))
assert.NoError(t, exporter.consumeMetrics(ctx, pmetric.NewMetrics()))
ctx := context.Background()
assert.NoError(t, exporter.consumeLogs(ctx, plog.NewLogs()))
assert.NoError(t, exporter.shutdown(ctx))
assert.NoError(t, exporter.consumeMetrics(ctx, pmetric.NewMetrics()))
assert.NoError(t, exporter.consumeTraces(ctx, ptrace.NewTraces()))

assert.Zero(t, publisher.requests)
}

func TestExporterCompression(t *testing.T) {
ctx := context.Background()
// Start a fake server running locally.
srv := pstest.NewServer()
defer srv.Close()
_, err := srv.GServer.CreateTopic(ctx, &pb.Topic{
Name: "projects/my-project/topics/otlp",
func TestExporterClientError(t *testing.T) {
cfg := NewFactory().CreateDefaultConfig().(*Config)
cfg.ProjectID = defaultProjectID
cfg.Topic = defaultTopic
require.NoError(t, cfg.Validate())

exporter := ensureExporter(exportertest.NewNopSettings(), cfg)
exporter.makeClient = func(context.Context, *Config, string) (publisherClient, error) {
return nil, fmt.Errorf("something went wrong")
}

require.Error(t, exporter.start(context.Background(), componenttest.NewNopHost()))
}

func TestExporterSimpleData(t *testing.T) {
t.Run("logs", func(t *testing.T) {
exporter, publisher := newTestExporter(t)

logs := plog.NewLogs()
logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message")

require.NoError(t, exporter.consumeLogs(context.Background(), logs))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.logs.v1",
"content-type": "application/protobuf",
})
})

t.Run("metrics", func(t *testing.T) {
exporter, publisher := newTestExporter(t)

metrics := pmetric.NewMetrics()
metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric.SetName("some.metric")
metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42)

require.NoError(t, exporter.consumeMetrics(context.Background(), metrics))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.metrics.v1",
"content-type": "application/protobuf",
})
})
assert.NoError(t, err)

t.Run("traces", func(t *testing.T) {
exporter, publisher := newTestExporter(t)

traces := ptrace.NewTraces()
span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetName("some span")

require.NoError(t, exporter.consumeTraces(context.Background(), traces))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.traces.v1",
"content-type": "application/protobuf",
})
})
}

func TestExporterSimpleDataWithCompression(t *testing.T) {
withCompression := func(config *Config) {
config.Compression = "gzip"
}

t.Run("logs", func(t *testing.T) {
exporter, publisher := newTestExporter(t, withCompression)

logs := plog.NewLogs()
logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message")

require.NoError(t, exporter.consumeLogs(context.Background(), logs))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-id": "00000000-0000-0000-0000-000000000000",
"ce-source": "/opentelemetry/collector/googlecloudpubsub/latest",
"ce-specversion": "1.0",
"ce-type": "org.opentelemetry.otlp.logs.v1",
"content-type": "application/protobuf",
"content-encoding": "gzip",
})
})

t.Run("metrics", func(t *testing.T) {
exporter, publisher := newTestExporter(t, withCompression)

metrics := pmetric.NewMetrics()
metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric.SetName("some.metric")
metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42)

require.NoError(t, exporter.consumeMetrics(context.Background(), metrics))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.metrics.v1",
"content-type": "application/protobuf",
"content-encoding": "gzip",
})
})

t.Run("traces", func(t *testing.T) {
exporter, publisher := newTestExporter(t, withCompression)

traces := ptrace.NewTraces()
span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetName("some span")

require.NoError(t, exporter.consumeTraces(context.Background(), traces))
require.Len(t, publisher.requests, 1)

request := publisher.requests[0]
assert.Equal(t, defaultTopic, request.Topic)
assert.Len(t, request.Messages, 1)

message := request.Messages[0]
assert.NotEmpty(t, message.Data)
assert.Subset(t, message.Attributes, map[string]string{
"ce-type": "org.opentelemetry.otlp.traces.v1",
"content-type": "application/protobuf",
"content-encoding": "gzip",
})
})
}

// Helpers

func newTestExporter(t *testing.T, options ...func(*Config)) (*pubsubExporter, *mockPublisher) {
t.Helper()

factory := NewFactory()
cfg := factory.CreateDefaultConfig()
exporterConfig := cfg.(*Config)
exporterConfig.Endpoint = srv.Addr
exporterConfig.UserAgent = "test-user-agent"
exporterConfig.Insecure = true
exporterConfig.ProjectID = "my-project"
exporterConfig.Topic = "projects/my-project/topics/otlp"
exporterConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
Timeout: 12 * time.Second,
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ProjectID = defaultProjectID
cfg.Topic = defaultTopic
for _, option := range options {
option(cfg)
}
exporterConfig.Compression = "gzip"
exporter := ensureExporter(exportertest.NewNopSettings(), exporterConfig)
assert.NoError(t, exporter.start(ctx, nil))
assert.NoError(t, exporter.consumeTraces(ctx, ptrace.NewTraces()))
assert.NoError(t, exporter.consumeMetrics(ctx, pmetric.NewMetrics()))
assert.NoError(t, exporter.consumeLogs(ctx, plog.NewLogs()))
assert.NoError(t, exporter.shutdown(ctx))
require.NoError(t, cfg.Validate())

exporter := ensureExporter(exportertest.NewNopSettings(), cfg)
publisher := &mockPublisher{}
exporter.makeClient = func(context.Context, *Config, string) (publisherClient, error) {
return publisher, nil
}
exporter.makeUUID = func() (uuid.UUID, error) {
return uuid.Parse(defaultUUID)
}

require.NoError(t, exporter.start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { assert.NoError(t, exporter.shutdown(context.Background())) })

return exporter, publisher
}

type mockPublisher struct {
requests []*pb.PublishRequest
}

func (m *mockPublisher) Publish(_ context.Context, request *pb.PublishRequest, _ ...gax.CallOption) (*pb.PublishResponse, error) {
m.requests = append(m.requests, request)
return &pb.PublishResponse{}, nil
}

func (m *mockPublisher) Close() error {
return nil
}
5 changes: 0 additions & 5 deletions exporter/googlecloudpubsubexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ require (
)

require (
cloud.google.com/go v0.116.0 // indirect
cloud.google.com/go/auth v0.13.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
cloud.google.com/go/compute/metadata v0.6.0 // indirect
Expand All @@ -34,8 +33,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
Expand All @@ -49,8 +46,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
go.einride.tech/aip v0.68.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.117.1-0.20250119231113-f07ebc3afb51 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.117.1-0.20250119231113-f07ebc3afb51 // indirect
go.opentelemetry.io/collector/consumer/consumertest v0.117.1-0.20250119231113-f07ebc3afb51 // indirect
Expand Down
Loading

0 comments on commit 2617fee

Please sign in to comment.