diff --git a/exporter/googlecloudpubsubexporter/exporter.go b/exporter/googlecloudpubsubexporter/exporter.go index b5de6072718f..0136d9183919 100644 --- a/exporter/googlecloudpubsubexporter/exporter.go +++ b/exporter/googlecloudpubsubexporter/exporter.go @@ -33,6 +33,10 @@ type pubsubExporter struct { metricsWatermarkFunc metricsWatermarkFunc logsMarshaler plog.Marshaler logsWatermarkFunc logsWatermarkFunc + + // To be overridden in tests + makeUUID func() (uuid.UUID, error) + makeClient func(ctx context.Context, cfg *Config, userAgent string) (publisherClient, error) } type encoding int @@ -61,7 +65,7 @@ func (ex *pubsubExporter) start(ctx context.Context, _ component.Host) error { ctx, ex.cancel = context.WithCancel(ctx) if ex.client == nil { - client, err := newPublisherClient(ctx, ex.config, ex.userAgent) + client, err := ex.makeClient(ctx, ex.config, ex.userAgent) if err != nil { return fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err) } @@ -82,7 +86,7 @@ func (ex *pubsubExporter) shutdown(_ context.Context) error { } func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time) error { - id, err := uuid.NewRandom() + id, err := ex.makeUUID() if err != nil { return err } diff --git a/exporter/googlecloudpubsubexporter/factory.go b/exporter/googlecloudpubsubexporter/factory.go index b6abc3ddb0a5..b60be1b0a8bc 100644 --- a/exporter/googlecloudpubsubexporter/factory.go +++ b/exporter/googlecloudpubsubexporter/factory.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/google/uuid" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -51,6 +52,8 @@ func ensureExporter(params exporter.Settings, pCfg *Config) *pubsubExporter { tracesMarshaler: &ptrace.ProtoMarshaler{}, metricsMarshaler: &pmetric.ProtoMarshaler{}, logsMarshaler: &plog.ProtoMarshaler{}, + makeUUID: uuid.NewRandom, + makeClient: newPublisherClient, } // we ignore the error here as the config is already validated with the same method receiver.ceCompression, _ = pCfg.parseCompression()